# Store Sales - Time Series Forecasting

Use machine learning to predict grocery sales. [source](https://www.kaggle.com/competitions/store-sales-time-series-forecasting/overview/description)

## Objective

In this Kaggle competition, the goal is to 

> build a model that more accurately predicts the unit sales for thousands of items sold at different Favorita stores.

The evaluation metric for this competition is ***Root Mean Squared Logarithmic Error***.

The `RMSLE` is calculated as:

$$\sqrt{ \frac{1}{n} \sum_{i=1}^n \left(\log (1 + \hat{y}_i) - \log (1 + y_i)\right)^2}$$

where:

- $n$ is the total number of instances,
- $\hat{y}_i$ is the predicted value of the target for instance $i$,
- $y_i$ is the actual value of the target for instance $i$, and
- $\log$ is the natural logarithm.
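
As a worked illustration of the formula above, here is a minimal sketch of RMSLE in plain Python. The function name `rmsle` and the sample values are illustrative and not part of the competition material.

```python
import math


def rmsle(y_true, y_pred):
    """Root Mean Squared Logarithmic Error, using the natural logarithm."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    # math.log1p(x) computes log(1 + x)
    squared_log_errors = [
        (math.log1p(p) - math.log1p(t)) ** 2 for t, p in zip(y_true, y_pred)
    ]
    return math.sqrt(sum(squared_log_errors) / len(squared_log_errors))


# identical predictions give an error of zero
perfect = rmsle([0.0, 3.0, 10.0], [0.0, 3.0, 10.0])

# predicting 1 when the actual value is 0 gives |log(2) - log(1)| = log(2)
off_by_one = rmsle([0.0], [1.0])
```

Because the error is taken on `log(1 + y)`, it penalises relative differences rather than absolute ones, so a miss of 10 units matters more for a slow-selling item than for a fast-selling one.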

For each id in the test set, you must predict a value for the sales variable. The file should contain a header and have the following format:

    ```
    id,sales
    3000888,0.0
    3000889,0.0
    3000890,0.0
    3000891,0.0
    3000892,0.0
    etc.
    ```
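
A submission in this format could be produced along the following lines. This is a sketch using only the standard library; the helper name `write_submission` and the clamping of negative predictions to zero are assumptions of this example, not competition requirements.

```python
import csv
import io


def write_submission(predictions, handle):
    """Write (id, sales) pairs as CSV with the `id,sales` header."""
    writer = csv.writer(handle, lineterminator="\n")
    writer.writerow(["id", "sales"])
    for row_id, sales in predictions:
        # assumption: sales cannot be negative, so clamp predictions at zero
        writer.writerow([row_id, max(float(sales), 0.0)])


# write to an in-memory buffer here; a real run would open a file instead
buffer = io.StringIO()
write_submission([(3000888, 0.0), (3000889, -0.5), (3000890, 4.25)], buffer)
submission_text = buffer.getvalue()
```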


## Libraries for this research notebook

Extra libraries for this version of the notebook.

Install with conda or pip, using the cells below (or copy and paste the command into a terminal):
- ydata-profiling
- tqdm
- connectorx
- pycaret

Needs to be installed separately:
- [pscale cli](https://github.com/planetscale/cli#installation)



In [None]:
# !conda install ydata-profiling tqdm connectorx pycaret

In [None]:
# !pip install ydata-profiling tqdm connectorx pycaret

In [1]:
import pandas as pd
from tqdm.auto import tqdm

# to overcome path issue for src
%reload_ext autoreload
%autoreload 2

from pathlib import Path
import sys

# set the path to the current file
current_file_path = Path().resolve()
print(f"current_file_path is {current_file_path}")

# set the path to the src folder
src_folder_path = current_file_path.parent / 'src'
print(f"src_folder_path is {src_folder_path}")

# add the src folder to the system path
sys.path.append(str(src_folder_path))

from data_loader import DBDataLoader
from logger import logging

current_file_path is /Users/galvangoh/Desktop/Tech/ai_practitioners/time-series-forecasting/time-series-forecasting/notebooks
src_folder_path is /Users/galvangoh/Desktop/Tech/ai_practitioners/time-series-forecasting/time-series-forecasting/src


In [2]:
import sqlalchemy
import pandas
import polars
print(f'{sqlalchemy.__version__  = }')
print(f'{pandas.__version__  = }')
print(f'{polars.__version__  = }')

sqlalchemy.__version__  = '1.4.40'
pandas.__version__  = '1.5.3'
polars.__version__  = '0.18.9'


## Initial Data Analysis

Making sure that the data imported from the CSV files is identical in the localhost and planetscale `time_series` databases.

### Dates in train.csv

```sql 
select count(*) from train;
``` 

|count(*)|
|--------|
| 3000888|

```sql
select
    min(tr.`date`) as oldest_date_train, 
    max(tr.`date`) as newest_date_train
from
    train as tr;
```

|oldest_date_train|newest_date_train|
|-----------------|-----------------|
|       2013-01-01|       2017-08-15|


### Dates in test.csv

```sql 
select count(*) from test;
``` 

|count(*)|
|--------|
|   28512|


```sql
select
    min(ts.`date`) as oldest_date_test, 
    max(ts.`date`) as newest_date_test
from
    test as ts;
```

|oldest_date_test|newest_date_test|
|----------------|----------------|
|      2017-08-16|      2017-08-31|


### attempt at overcoming pscale limitations

The *free tier* of the [remote database on planetscale](https://app.planetscale.com/ai-practitioners) limits each query to 100,000 returned records and 20 s of execution time. We therefore reduced the result sets of the queries further and saved them as SQL views for easier and faster retrieval.

Row counts of each table or view:

| table / view | records count | description |
|-----------------|-----------------|-----------------|
| train.csv | 3,000,888 | full count of raw train.csv |
| full_df | 3,054,348 | full df after join of 5 tables |
| quito | 1,000,296 | join of train, store & transaction, filtered to `quito` city for all years and stores |
| sales_year_state | 818,021 | sales statistics from `full_df` grouped by year, state, family, sales |
| year_2013 | 648,648 | join of train, store & transaction, filtered to year `2013` for all cities and stores |
| quito_2013 | 216,216 | join of train, store & transaction, filtered to year `2013` and `quito` city for all stores |
| quito_44 | 36,036 | join of train, store & transaction, filtered to `quito` city and store_nbr 44 for all years |


## Data Ingestion

Query data from MySQL

In [None]:
query='''SELECT * FROM train'''
CHUNKSIZE=100000

### (1.1) on localhost using connectorx

In [4]:
import connectorx as cx  # needed for cx.read_sql in the next cell

# no need to swap to "remote": the connectorx connection to planetscale fails (SSL/TLS required)
conn = DBDataLoader().get_connection_string("local", "connectorx")
print("[+] localhost Connection Successful using connectorx")

[+] localhost Connection Successful using connectorx


In [None]:
%%timeit

df=cx.read_sql(conn, query) #, partition_on="id", partition_num=10)
print(f'{df.shape = }')

### (1.2) on localhost using sqlalchemy

In [None]:
from sqlalchemy import create_engine

engine = create_engine(DBDataLoader().get_connection_string("local", "sqlalchemy")) # swap to "remote" for accessing planetscale
print("[+] localhost Connection Successful using sqlalchemy")

In [None]:
%%timeit

df = pd.read_sql(sql=query, con=engine)
print(f'{df.shape = }')

In [None]:
%%timeit

chunks = pd.read_sql(sql=query, con=engine, chunksize=CHUNKSIZE)
# print(f'{df.shape = }')
# print(f'chunks size: {sys.getsizeof(chunks)}')
# logging.info(f"chunks loaded {sys.getsizeof(chunks)}")

# read_sql with chunksize returns an iterator of DataFrames;
# concatenating once avoids re-copying the growing frame on every chunk
df_pandas = pd.concat(chunks, ignore_index=True)

print(f'{df_pandas.shape = }')
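
To make the chunked read above easier to follow, here is a small self-contained sketch. It uses an in-memory SQLite table as a stand-in for the MySQL `train` table (an assumption of this example; the table contents and the chunk size of 4 are made up) and shows that `pd.read_sql` with `chunksize` yields DataFrames one at a time.

```python
import sqlite3

import pandas as pd

# in-memory SQLite table standing in for the MySQL `train` table (illustrative only)
demo_conn = sqlite3.connect(":memory:")
pd.DataFrame({"id": range(10), "sales": [float(i) for i in range(10)]}).to_sql(
    "train", demo_conn, index=False
)

# with chunksize set, read_sql returns an iterator of DataFrames
chunk_iter = pd.read_sql("SELECT * FROM train", demo_conn, chunksize=4)

# collect the chunks, then concatenate them once
chunk_list = list(chunk_iter)
df_chunked = pd.concat(chunk_list, ignore_index=True)
demo_conn.close()
```

Ten rows with a chunk size of 4 arrive as three chunks (4, 4 and 2 rows). Chunking bounds the size of each fetch, but the concatenated result still has to fit in memory.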

### (1.3) on localhost using connectorx + polars

In [None]:
%%timeit
import polars as pl
query = "SELECT * FROM train"
df = pl.read_database(query=query, connection=conn, engine="connectorx")
print(f'{df.shape = }')

Notes on chunking / partitioning / yield_per

https://planetscale.com/blog/using-mysql-with-sql-alchemy-hands-on-examples

```python
from sqlalchemy import create_engine
connection_string = "mysql+mysqlconnector://user1:pscale_pw_abc123@us-east.connect.psdb.cloud:3306/sqlalchemy"
engine = create_engine(connection_string, echo=True)
```

> By default, SSL/TLS usage in mysql-connector-python is enabled, which is required to connect to PlanetScale. This means you do not need to pass it into create_engine() as a connection argument.

https://docs.sqlalchemy.org/en/20/orm/queryguide/api.html#orm-queryguide-yield-per
https://docs.sqlalchemy.org/en/20/core/connections.html#engine-stream-results

```python
from sqlalchemy import text

with engine.connect() as conn:
    with conn.execution_options(yield_per=100).execute(
        text("select * from table")
    ) as result:
        for partition in result.partitions():
            # partition is an iterable that will be at most 100 items
            for row in partition:
                print(f"{row}")
```



### (2.1) on remote using sqlalchemy

In [None]:
# remote access to planetscale
engine = create_engine(DBDataLoader().get_connection_string("remote", "sqlalchemy")) # swap to "local" for accessing localhost
print("[+] planetscale Connection Successful")

In [None]:
%%timeit
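from sqlalchemy import text

with engine.connect() as conn:
    # switch the session to planetscale's OLAP workload mode
    conn.execute(text("SET WORKLOAD = 'olap'"))
    # wrap raw SQL strings in text() so the call also works on SQLAlchemy 2.x
    result = conn.execute(text(query))
    rows = result.fetchall()
    df = pd.DataFrame(rows, columns=result.keys())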

print(f'{df.shape = }')

In [None]:
%%timeit

from sqlalchemy import text

# initialize empty list to store dataframes
dfs = []
# loop through chunks of data and append to list
with engine.connect() as conn:
    conn.execute(text("SET WORKLOAD = 'olap'"))
    result = conn.execute(text(query)).yield_per(CHUNKSIZE)
    columns = list(result.keys())  # keep the column names for every chunk
    for chunk in iter(lambda: result.fetchmany(CHUNKSIZE), []):
        dfs.append(pd.DataFrame(chunk, columns=columns))
        # print(f'chunk#{len(dfs)}')

# concatenate dataframes into one dataframe
big_df = pd.concat(dfs, ignore_index=True)
print(f'{big_df.shape = }')

### (2.2) on remote using connectorx + polars

In [None]:
# %%timeit
# df_polars = pl.read_database(query=query, connection_uri=get_connection_string("remote", "connectorx"), engine="connectorx")

(2.2) on remote using connectorx + polars is a NO GO. The connection fails with:

> MySqlError { ERROR 1105 (HY000): unknown error: Code: UNAVAILABLE
> server does not allow insecure connections, client must use SSL/TLS

## Summary 

Notes: for comparison, planetscale's limits are 20 s of execution time and 100,000 rows per query. Timings are approximate and vary from run to run.

### `time_series` on localhost
- MySQL Workbench fetches `query` in **5.969 s / 0.203 s (duration/fetch)** for 216,216 rows × 10 columns
- connectorx+pandas fetches `query` in **13.1 s ± 769 ms per loop** for 216,216 rows, with no partitioning
- sqlalchemy+pandas fetches `query` in **7.18 s ± 79.7 ms per loop** for 216,216 rows, without using chunks
- sqlalchemy+pandas fetches `query` in **7.32 s ± 43.5 ms per loop** for 216,216 rows, with CHUNKSIZE=100000
- connectorx+polars fetches `query` in **6.44 s ± 82.1 ms per loop** for 216,216 rows, without using chunks

### `time_series` on planetscale
- connectorx does not work on remote
- MySQL Workbench fetching `query`: hit the 100,000-row limit just before the 20 s mark
- sqlalchemy(mysqldb)+pandas: raised `OperationalError: (MySQLdb.OperationalError) (1105, 'charset/name utf8mb3 is not supported')`
- sqlalchemy(mysqlconnector)+pandas fetches `query` in **11.1 s ± 529 ms per loop** for 216,216 rows, without using chunks
- sqlalchemy(mysqlconnector)+pandas fetches `query` in **11.5 s ± 624 ms per loop** for 216,216 rows, with CHUNKSIZE=100000
- sqlalchemy(connectorx)+polars fetching `query`: ERROR *client must use SSL/TLS*

### "split df by city code" to CSV files

In [None]:
query='''
select 
    state, city, 
    `year`, month, day_of_month,
    count(id),
    coalesce(sum(sales), 0) as sales_total
from full_df
group by state, city, `year`, month, day_of_month
order by `year` asc, `month` asc, day_of_month asc
'''

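# set the path to the data folder and make sure it exists
data_folder_path = current_file_path.parent / 'data'
data_folder_path.mkdir(parents=True, exist_ok=True)
print(f"data_folder_path is {data_folder_path}")

# use the SQLAlchemy engine: `conn` is rebound by the `with engine.connect() as conn` blocks above
df = pd.read_sql(sql=query, con=engine)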

for city, group in df.groupby('city'):
    filename = data_folder_path / f'{city}.csv'
    group.to_csv(filename, index=False)
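
The split-by-city step can be tried out without a database. The sketch below uses a made-up three-row DataFrame and a temporary directory in place of the project's `data` folder; all values and variable names are illustrative.

```python
import tempfile
from pathlib import Path

import pandas as pd

# toy stand-in for the aggregated query result
toy = pd.DataFrame(
    {
        "city": ["Quito", "Quito", "Guayaquil"],
        "year": [2013, 2013, 2013],
        "sales_total": [10.0, 12.5, 7.0],
    }
)

# temporary directory standing in for the `data` folder
out_dir = Path(tempfile.mkdtemp())

# groupby yields one (key, sub-frame) pair per city; write each to its own CSV
written = {}
for city, group in toy.groupby("city"):
    path = out_dir / f"{city}.csv"
    group.to_csv(path, index=False)
    written[city] = path
```

Each file keeps the `city` column, so the per-city CSVs can later be concatenated back into one frame without losing information.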

## Still to review below onwards

In [None]:
df

In [None]:
df.shape

In [None]:
df.info()

DF loaded confirm: 1972674 rows × 14 columns

In [None]:
stmt = 'select * from VwDump1'

# `db` is assumed to be a DBDataLoader instance whose load() returns an iterator of DataFrame chunks
chunks = db.load(query=stmt)

# sys.getsizeof() reports the byte size of the iterator object, not the number of chunks,
# so iterate over the chunks directly and let tqdm count them as they arrive
view_df = pd.concat(tqdm(chunks, desc='Reading from View'), ignore_index=True)
logging.info(f"VwDump1 loaded {len(view_df)} rows")

In [None]:
view_df.head()

DF loaded confirm: 3000888 rows × 14 columns

In [None]:
view_df.info()

In [None]:
# trying out data ingestion with sqlalchemy + pymysql, with credentials read from .env
from sqlalchemy import create_engine, text
from dotenv import dotenv_values
config = dotenv_values()

In [None]:
# create engine to talk to database
engine = create_engine(
    f'mysql+pymysql://'             # dialect + driver
    f'{config.get("USERNAME")}'     # username
    f':{config.get("PASSWORD")}'    # password
    f'@{config.get("ENDPOINT")}'    # host
    f':{config.get("PORT")}'        # port
    f'/{config.get("DBNAME")}'      # database
)

In [None]:
# establish connection and make the query
with engine.connect() as cnxn:
    with open('../src/scripts/query_data.sql') as f:
        query = text(f.read())
        results = pd.read_sql(query, cnxn)

# runtime 1 min 1.4 secs

In [None]:
results.shape

In [None]:
from dotenv import load_dotenv
import os
import mysql.connector

load_dotenv()

In [None]:
DB_HOST = os.getenv("PS_HOST")
DB_USERNAME = os.getenv("PS_USERNAME")
DB_PASSWORD = os.getenv("PS_PASSWORD")
DB_DATABASE = os.getenv("PS_DATABASE")

In [None]:
connection = mysql.connector.connect(
    host=DB_HOST,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    database=DB_DATABASE,
    # ssl_verify_identity=True,
    # ssl_ca="/etc/ssl/certs/ca-certificates.crt"
)

In [None]:
cursor = connection.cursor()

batch_size = 100000  # stays within the 100,000-row limit per query
last_id = -1
rows = []

# keyset pagination: fetch the next batch of rows after the last id seen
while True:
    cursor.execute(
        """
        SELECT *
        FROM train
        WHERE id > %s
        ORDER BY id
        LIMIT %s
        """,
        (last_id, batch_size),
    )

    batch = cursor.fetchall()
    if not batch:
        break

    rows.extend(batch)
    last_id = batch[-1][0]  # assumes `id` is the first column

df = pd.DataFrame.from_records(rows, columns=[desc[0] for desc in cursor.description])
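
The batching loop above is a form of keyset pagination: each query asks for the next `batch_size` rows after the last `id` already seen. The sketch below shows the same idea against an in-memory SQLite table, so it runs without a MySQL server. The table contents, the batch size of 10 and the helper name `fetch_in_batches` are assumptions of this example.

```python
import sqlite3


def fetch_in_batches(conn, batch_size):
    """Yield batches of rows from `train`, ordered by id, using keyset pagination."""
    last_id = -1  # assumes ids are non-negative integers
    while True:
        batch = conn.execute(
            "SELECT id, sales FROM train WHERE id > ? ORDER BY id LIMIT ?",
            (last_id, batch_size),
        ).fetchall()
        if not batch:
            return
        yield batch
        last_id = batch[-1][0]  # resume after the last id of this batch


# small in-memory table standing in for `train`
page_conn = sqlite3.connect(":memory:")
page_conn.execute("CREATE TABLE train (id INTEGER PRIMARY KEY, sales REAL)")
page_conn.executemany(
    "INSERT INTO train VALUES (?, ?)", [(i, i * 1.5) for i in range(25)]
)

batches = list(fetch_in_batches(page_conn, batch_size=10))
all_rows = [row for batch in batches for row in batch]
page_conn.close()
```

Filtering on `id` lets the database seek through the primary key index, whereas `OFFSET`-based paging has to skip over all earlier rows on every query.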

In [None]:
connection = mysql.connector.connect(
    host=DB_HOST,
    user=DB_USERNAME,
    password=DB_PASSWORD,
    database=DB_DATABASE,
    # ssl_verify_identity=True,
    # ssl_ca="/etc/ssl/certs/ca-certificates.crt"
    connect_timeout=1000
)

cursor = connection.cursor()

# session settings first, then the actual query
cursor.execute("SET GLOBAL connect_timeout=60")
cursor.execute("SET WORKLOAD = 'olap'")
cursor.execute("SELECT * FROM full_df")

rows = cursor.fetchall()

print(len(rows))

df = pd.DataFrame.from_records(rows, columns=[desc[0] for desc in cursor.description])

cursor.close()
connection.close()

In [None]:
df.shape

## Data cleaning

In [None]:
df.dtypes

In [None]:
# ensure `date` is a datetime column, so that the index supports day_name() and df.loc['2013']
df['date'] = pd.to_datetime(df['date'])

In [None]:
df.set_index('date', drop=True, inplace=True)

In [None]:
df['weekday'] = df.index.day_name()

In [None]:
df.isnull().sum()

In [None]:
# pass the aggregations as a list; agg('sum', 'mean') would hand 'mean' to sum() as an argument
groupby_store = df.groupby(by=['store_nbr', 'family'], group_keys=True).agg(['sum', 'mean'])

In [None]:
groupby_store.info()

## Data profile

In [None]:
# from ydata_profiling import ProfileReport

In [None]:
# profile = ProfileReport(df, tsmode=True, title="Time-Series EDA Quito City")
# profile.to_notebook_iframe()
# profile.to_file("../artifacts/reports/quito_ProfileReport.html") # AttributeError: 'float' object has no attribute 'shape'

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
# Use seaborn style defaults and set the default figure size
sns.set(rc={'figure.figsize':(11, 4)})

In [None]:
df['sales_sum'].plot();

In [None]:
df.columns

In [None]:
cols_plot = ['onpromotion_sum', 'transactions_sum', 'sales_sum']
axes = df[cols_plot].plot(marker='.', alpha=0.5, linestyle='None', figsize=(11, 9), subplots=True)
for ax in axes:
    ax.set_ylabel('Daily Totals')

In [None]:
df_2013 = df.loc['2013']

In [None]:
df_2013

In [None]:
ax = df_2013.loc['2013', 'sales_sum'].plot()
ax.set_ylabel('Daily Sales for 2013');

### Seasonality

In [None]:
fig, axes = plt.subplots(3, 1, figsize=(11, 10), sharex=True)
for name, ax in zip(cols_plot, axes):
    sns.boxplot(data=df, x='weekday', y=name, ax=ax)
    ax.set_ylabel('Sum')
    ax.set_title(name)
    # Remove the automatic x-axis label from all but the bottom subplot
    if ax != axes[-1]:
        ax.set_xlabel('')

In [None]:
fig, axes = plt.subplots(3, 1, figsize=(11, 10), sharex=True)
for name, ax in zip(cols_plot, axes):
    sns.boxplot(data=df, x='month', y=name, ax=ax)
    ax.set_ylabel('Sum')
    ax.set_title(name)
    # Remove the automatic x-axis label from all but the bottom subplot
    if ax != axes[-1]:
        ax.set_xlabel('')

In [None]:
fig, axes = plt.subplots(3, 1, figsize=(11, 10), sharex=True)
for name, ax in zip(cols_plot, axes):
    sns.boxplot(data=df, x='store_nbr', y=name, ax=ax)
    ax.set_ylabel('Sum')
    ax.set_title(name)
    # Remove the automatic x-axis label from all but the bottom subplot
    if ax != axes[-1]:
        ax.set_xlabel('')

## Data preprocessing

In [None]:
df.store_nbr.value_counts()

Notes:

- the data has outliers that need treatment before we can analyse seasonality for `weekday` and `month`
- selected Store 44 as it has the widest spread for `transactions_sum`, meaning more activity; note the assumption that 0 transactions means the store is closed, as there are no sales on that day
- this tracks with 0 transactions occurring on holidays (holiday data not included, to keep the dataset small)

In [None]:
df_str44 = df[(df.store_nbr == 44)]

In [None]:
grp_df_str44 = df_str44.groupby(by=['date'], group_keys=True).agg('sum')

In [None]:
grp_df_str44.drop(columns=['id','store_nbr','year','month','day_of_month'], inplace=True)

In [None]:
grp_df_str44 = grp_df_str44.asfreq('D')
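
To illustrate what `asfreq('D')` does to the index, here is a small sketch with made-up values: a series that skips one calendar day is put on a regular daily grid, and the skipped day shows up as a row of `NaN`. This is presumably why the later `setup` call specifies imputation for the target and exogenous variables.

```python
import pandas as pd

# toy daily series with 2013-01-03 missing (for example, a day the store was closed)
daily = pd.DataFrame(
    {"sales_sum": [100.0, 120.0, 90.0]},
    index=pd.to_datetime(["2013-01-01", "2013-01-02", "2013-01-04"]),
)

# asfreq('D') reindexes onto a regular daily grid; absent days become NaN rows
daily_regular = daily.asfreq("D")
missing_days = daily_regular[daily_regular["sales_sum"].isna()].index
```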

## autoML with pycaret

EDA and ML


In [None]:
grp_df_str44.info()

In [None]:
# check installed version
import pycaret
pycaret.__version__

In [None]:
# import pycaret time series and init setup
from pycaret.time_series import *
s = setup(grp_df_str44,  
            target='sales_sum', 
            fh = 28, 
            session_id = 123, 
            profile=True,
            numeric_imputation_exogenous='mean',
            numeric_imputation_target="median",
            # ignore_features = ['id', 'family', 'store_nbr']
          )  


In [None]:
# check statistical tests on original data
check_stats()

sources for add_metric()

- https://towardsdatascience.com/predict-customer-churn-the-right-way-using-pycaret-8ba6541608ac
- https://github.com/pycaret/pycaret/issues/3491
- https://github.com/pycaret/pycaret/issues/1063

In [None]:
import numpy as np
from sklearn.metrics import mean_squared_log_error

# custom metric: RMSLE is the square root of MSLE
# (taking the root explicitly avoids the `squared` argument, which is not available in all scikit-learn versions)
def rmsle(y_true, y_pred):
    return np.sqrt(mean_squared_log_error(y_true, y_pred))

add_metric('msle', 'MSLE', mean_squared_log_error, greater_is_better=False)
add_metric('rmsle', 'RMSLE', rmsle, greater_is_better=False)  # the competition metric

In [None]:
# compare baseline models
best = compare_models()