# Step 1: Make the base dataset

In this notebook we create the base dataset used for feature engineering and modelling. The base dataset is the result of joining all the relevant information (e.g., units sold, price, special events) into a single table. The raw data from Kaggle comes as a set of csv files. We load each csv file and join them to create the base dataset, then compute features from it in later notebooks.

In a business setting the data would likely be stored in separate database tables, so a common pattern is to build the base dataset with SQL or Spark. If we were producing a regular forecast as a batch process (e.g., a weekly forecast), we would rerun this step periodically as the source data gets updated.
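
To make the SQL pattern concrete, here is a minimal sketch using an in-memory SQLite database. The three toy tables and all of their values are made up for illustration; they only mimic the shape of the sales, calendar, and price data, and a real warehouse would have its own schema.

```python
import sqlite3

import pandas as pd

# Toy stand-ins for the three source tables (all values are made up).
sales = pd.DataFrame(
    {
        "item_id": ["A", "A", "B"],
        "store_id": ["S1", "S1", "S1"],
        "d": ["d_1", "d_2", "d_1"],
        "y": [0, 2, 1],
    }
)
calendar = pd.DataFrame(
    {
        "d": ["d_1", "d_2"],
        "date": ["2011-01-29", "2011-01-30"],
        "wm_yr_wk": [11101, 11101],
    }
)
prices = pd.DataFrame(
    {
        "item_id": ["A", "B"],
        "store_id": ["S1", "S1"],
        "wm_yr_wk": [11101, 11101],
        "sell_price": [9.58, 3.97],
    }
)

# Load the toy tables into an in-memory database.
con = sqlite3.connect(":memory:")
sales.to_sql("sales", con, index=False)
calendar.to_sql("calendar", con, index=False)
prices.to_sql("prices", con, index=False)

# Join sales to the calendar on the day key, then to prices on item, store, and week.
query = """
SELECT s.item_id, s.store_id, c.date, s.y, p.sell_price
FROM sales AS s
JOIN calendar AS c ON s.d = c.d
JOIN prices AS p
  ON s.item_id = p.item_id
 AND s.store_id = p.store_id
 AND c.wm_yr_wk = p.wm_yr_wk
ORDER BY s.item_id, c.date
"""
base = pd.read_sql_query(query, con)
con.close()
print(base)
```

The rest of this notebook performs the same joins with pandas.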

In [1]:
from pathlib import Path

import numpy as np
import pandas as pd

# Config paths

Specify the directories that we will read data from and write data to. In practice we would store this information in a separate configuration file rather than duplicate it across all notebooks. For simplicity, we specify the paths in the notebook itself.

In [2]:
# Directory containing the raw data
data_sources = Path("../data_sources")

# Output processed data (i.e., the base dataset)
processed_data_dir = Path("../processed_data")
processed_data_dir.mkdir(exist_ok=True)

Specify the paths to the datasets we will use.

In [3]:
sales_data = data_sources / "sales_train_evaluation.csv"
calendar_and_promos_data = data_sources / "calendar.csv"
price_data = data_sources / "sell_prices.csv"
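
As noted earlier, these paths would normally live in a shared configuration file. The following is a minimal sketch of that idea, assuming a JSON file named `config.json` with keys `data_sources` and `processed_data_dir`. The file name and key names are hypothetical and not part of the original project; the file is written to a temporary directory only to keep the example self-contained.

```python
import json
import tempfile
from pathlib import Path

# Hypothetical contents of a shared config file; the key names are assumptions.
config_text = json.dumps(
    {
        "data_sources": "../data_sources",
        "processed_data_dir": "../processed_data",
    }
)

with tempfile.TemporaryDirectory() as tmp:
    config_file = Path(tmp) / "config.json"  # hypothetical file name
    config_file.write_text(config_text)

    # Each notebook would load the same file instead of hard-coding paths.
    config = json.loads(config_file.read_text())

data_sources = Path(config["data_sources"])
processed_data_dir = Path(config["processed_data_dir"])
sales_data = data_sources / "sales_train_evaluation.csv"
print(sales_data)
```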

# Load datasets

We will specify the `dtype` of each column and pass the mapping to `pd.read_csv` when loading the data. This lets us be more memory efficient (e.g., using `category` rather than `object` for categorical variables in our pandas dataframes).
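
To illustrate the memory saving, the sketch below compares the footprint of the same synthetic column stored as `object` and as `category`. The data is randomly generated for illustration; the exact byte counts depend on the pandas version and platform.

```python
import numpy as np
import pandas as pd

# Synthetic column: 100,000 rows drawn from only three distinct strings.
rng = np.random.default_rng(0)
values = rng.choice(["CA_1", "TX_1", "WI_1"], size=100_000)

as_object = pd.Series(values, dtype="object")
as_category = as_object.astype("category")

# deep=True counts the memory used by the Python string objects themselves.
object_bytes = as_object.memory_usage(deep=True)
category_bytes = as_category.memory_usage(deep=True)
print(f"object:   {object_bytes:,} bytes")
print(f"category: {category_bytes:,} bytes")
```

The saving comes from storing each distinct string once and representing the rows as small integer codes, so it is largest for columns with few distinct values.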

**Load sales data**

In [4]:
sales_dtypes = {
    "id": "category",
    "item_id": "category",
    "dept_id": "category",
    "cat_id": "category",
    "store_id": "category",
    "state_id": "category",
    **{f"d_{i}": np.uint64 for i in range(1, 1942)},  # Day columns d_1 ... d_1941
}

df = pd.read_csv(sales_data, dtype=sales_dtypes)
df.head()

Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,d_1,d_2,d_3,d_4,...,d_1932,d_1933,d_1934,d_1935,d_1936,d_1937,d_1938,d_1939,d_1940,d_1941
0,HOBBIES_1_001_CA_1_evaluation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,2,4,0,0,0,0,3,3,0,1
1,HOBBIES_1_002_CA_1_evaluation,HOBBIES_1_002,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,0,1,2,1,1,0,0,0,0,0
2,HOBBIES_1_003_CA_1_evaluation,HOBBIES_1_003,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,1,0,2,0,0,0,2,3,0,1
3,HOBBIES_1_004_CA_1_evaluation,HOBBIES_1_004,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,1,1,0,4,0,1,3,0,2,6
4,HOBBIES_1_005_CA_1_evaluation,HOBBIES_1_005,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,0,0,0,2,1,0,0,2,1,0


**Load calendar & promos data**

In [5]:
cal_dtypes = {
    "d": "category",
    "wm_yr_wk": np.uint16,
    "event_name_1": "category",
    "event_type_1": "category",
    "event_name_2": "category",
    "event_type_2": "category",
    "snap_CA": np.uint8,
    "snap_TX": np.uint8,
    "snap_WI": np.uint8,
}
df_cal = pd.read_csv(calendar_and_promos_data, dtype=cal_dtypes, parse_dates=["date"])

df_cal.head()

Unnamed: 0,date,wm_yr_wk,weekday,wday,month,year,d,event_name_1,event_type_1,event_name_2,event_type_2,snap_CA,snap_TX,snap_WI
0,2011-01-29,11101,Saturday,1,1,2011,d_1,,,,,0,0,0
1,2011-01-30,11101,Sunday,2,1,2011,d_2,,,,,0,0,0
2,2011-01-31,11101,Monday,3,1,2011,d_3,,,,,0,0,0
3,2011-02-01,11101,Tuesday,4,2,2011,d_4,,,,,1,1,0
4,2011-02-02,11101,Wednesday,5,2,2011,d_5,,,,,1,0,1


The `event` columns represent the absence of an event with a `NaN` value. Missing data will cause issues with feature engineering steps and some ML models. So we will impute the missing values with an additional category called `"no_event"`.

In [6]:
for event_col in df_cal.filter(like="event").columns:
    df_cal[event_col] = df_cal[event_col].cat.add_categories("no_event").fillna("no_event")
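
The same pattern on a toy column, as a minimal sketch (the event names are only sample values). The new label has to be registered with `add_categories` first, because filling a categorical column with a value that is not one of its categories raises an error.

```python
import numpy as np
import pandas as pd

# Toy event column with missing values, mirroring `event_name_1`.
events = pd.Series(["SuperBowl", np.nan, np.nan, "Easter"], dtype="category")

# "no_event" must be registered as a category before it can be used as a fill value.
filled = events.cat.add_categories("no_event").fillna("no_event")

print(filled.tolist())
print(filled.cat.categories.tolist())
```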

**Load price data**

In [7]:
price_dtypes = {
    "store_id": "category",
    "item_id": "category",
    "wm_yr_wk": np.uint16,
    "sell_price": np.float64,
}
df_price = pd.read_csv(price_data, dtype=price_dtypes)

df_price.head()

Unnamed: 0,store_id,item_id,wm_yr_wk,sell_price
0,CA_1,HOBBIES_1_001,11325,9.58
1,CA_1,HOBBIES_1_001,11326,9.58
2,CA_1,HOBBIES_1_001,11327,8.26
3,CA_1,HOBBIES_1_001,11328,8.26
4,CA_1,HOBBIES_1_001,11329,8.26


# Create a panel view of the time series

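Create a panel view of the time series using `melt`. This reshapes the data from wide format (one column per day) to long format (one row per time series per day).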

In [8]:
df_sales = df.melt(
    id_vars=["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"],
    var_name="d",
    value_name="y", # `y` is the number of units sold on a given day
)

df_sales.head()

Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,d,y
0,HOBBIES_1_001_CA_1_evaluation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,d_1,0
1,HOBBIES_1_002_CA_1_evaluation,HOBBIES_1_002,HOBBIES_1,HOBBIES,CA_1,CA,d_1,0
2,HOBBIES_1_003_CA_1_evaluation,HOBBIES_1_003,HOBBIES_1,HOBBIES,CA_1,CA,d_1,0
3,HOBBIES_1_004_CA_1_evaluation,HOBBIES_1_004,HOBBIES_1,HOBBIES,CA_1,CA,d_1,0
4,HOBBIES_1_005_CA_1_evaluation,HOBBIES_1_005,HOBBIES_1,HOBBIES,CA_1,CA,d_1,0
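
A minimal sketch of the same reshape on a toy frame, with made-up ids and values, shows how `melt` turns each day column into rows:

```python
import pandas as pd

# Wide format: one row per time series, one column per day (toy values).
wide = pd.DataFrame(
    {
        "id": ["item_A", "item_B"],
        "d_1": [0, 3],
        "d_2": [1, 0],
        "d_3": [2, 5],
    }
)

# Long (panel) format: one row per (time series, day) pair.
long = wide.melt(id_vars=["id"], var_name="d", value_name="y")
print(long)
```

Two series observed over three days become six rows, each identified by `id` and `d`.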


# Create base dataset by joining date, special events, promos, and price columns to the sales dataframe

Join calendar data to sales dataframe.

In [9]:
df_sales = df_sales.merge(
    right=df_cal[
        [
            "d",
            "date",
            "wm_yr_wk",
            "event_name_1",
            "event_type_1",
            "event_name_2",
            "event_type_2",
            "snap_CA",
            "snap_TX",
            "snap_WI",
        ]
    ],
    on="d",
)

Join price column to sales dataframe.

In [None]:
df_sales = df_sales.merge(right=df_price, on=["store_id", "item_id", "wm_yr_wk"])
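
Note that `merge` defaults to `how="inner"`, so any sales row without a matching price record is dropped by the join above. The sketch below shows this on toy data (all values are made up) and one way to count unmatched rows with `how="left"` and `indicator=True`. The `validate="many_to_one"` argument is an optional safeguard, not something the original notebook uses.

```python
import pandas as pd

# Toy sales rows: item "B" has no price for week 11101 (values are made up).
sales = pd.DataFrame(
    {
        "store_id": ["CA_1", "CA_1", "CA_1"],
        "item_id": ["A", "A", "B"],
        "wm_yr_wk": [11101, 11102, 11101],
        "y": [1, 0, 0],
    }
)
prices = pd.DataFrame(
    {
        "store_id": ["CA_1", "CA_1"],
        "item_id": ["A", "A"],
        "wm_yr_wk": [11101, 11102],
        "sell_price": [9.58, 9.58],
    }
)

keys = ["store_id", "item_id", "wm_yr_wk"]

# Default inner join: the unmatched row for item "B" is dropped.
# validate="many_to_one" raises if the price table has duplicate keys.
inner = sales.merge(prices, on=keys, validate="many_to_one")

# Left join with an indicator column keeps every sales row and flags the gaps.
left = sales.merge(prices, on=keys, how="left", indicator=True)
n_unpriced = (left["_merge"] == "left_only").sum()

print(len(sales), len(inner), n_unpriced)
```

Whether dropping unpriced rows is desirable depends on the use case. Comparing row counts before and after the join is a cheap way to confirm the behaviour is intended.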

Sort by time series id and date, and drop columns that are not needed. 

In [None]:
df_sales = (
    df_sales.sort_values(by=["id", "date"])  # Sort by time series id and date.
    .drop(columns=["wm_yr_wk", "d"])  # Drop columns that are no longer needed.
)

Let's look at the final dataset.

In [None]:
df_sales.head()


In [None]:
display(df_sales.shape)

Let's save the full dataset as parquet, a compressed columnar format that is typically far smaller on disk than csv and preserves column dtypes. In this example, we partition the data by `store_id` so that the data for a single store can be read on its own.

See these links to learn more about working with the parquet file format:  
* https://parquet.apache.org/docs/
* https://towardsdatascience.com/easy-parquet-tutorial-best-practices-237955e46cb7
* https://blog.datasyndrome.com/python-and-parquet-performance-e71da65269ce

In [None]:
# We specify a directory rather than a file when saving partitioned data.
f_out = processed_data_dir / "data"
df_sales.to_parquet(
    f_out,
    index=False,
    engine="pyarrow",
    partition_cols=["store_id"],
)

# An aside on row groups

See the article below on predicate pushdown filtering and row groups. Predicate pushdown lets us apply filters before the data is read into memory, which is pretty neat!

http://peter-hoffmann.com/2020/understand-predicate-pushdown-on-rowgroup-level-in-parquet-with-pyarrow-and-python.html

In [None]:
f_in = processed_data_dir / "data"
df = pd.read_parquet(
    path=f_in,
    engine="pyarrow",
    filters=[
        ("store_id", "=", "CA_1"),  # Only load this partition.
        ("date", ">=", pd.to_datetime("2012-01-01")),  # Filter on date if desired.
    ],
)

In [None]:
import pyarrow.parquet as pq

Let's look at a specific parquet file in our partition.

In [None]:
# File names inside a partition are generated at write time, so look one up instead of hard-coding it.
pq_file = pq.ParquetFile(next((f_out / "store_id=CA_1").glob("*.parquet")))

In [None]:
display(["columns:", pq_file.metadata.num_columns],
        ["rows:", pq_file.metadata.num_rows],
        ["row_groups:", pq_file.metadata.num_row_groups])

In [None]:
pq_file.metadata.row_group(0)

In [None]:
pq_file.read_row_group(0).to_pandas()

In [None]:
# Metadata (including min/max statistics) for the column at index 6 of the first row group.
pq_file.metadata.row_group(0).column(6)