# Data Cleaning, Merging and Feature Engineering

In [None]:
%load_ext lab_black
%load_ext autoreload
%autoreload 2

In [None]:
import os
import re
import time

import numpy as np
import pandas as pd
from isoweek import Week
from sklearn.pipeline import Pipeline

In [None]:
%aimport src.data_custom_transformers
import src.data_custom_transformers as ct

%aimport src.data_helpers
from src.data_helpers import left_merge_dfs

%aimport src.features_helpers
from src.features_helpers import add_datepart, get_elapsed

%aimport src.test_helpers
from src.test_helpers import test_train_test_target_var

<a id="toc"></a>

## [Table of Contents](#table-of-contents)
0.  [About](#about)
1.  [User Inputs](#user-inputs)
2.  [Load Data](#load-data)
3.  [Data Cleaning](#data-cleaning)
4.  [Feature Extraction](#feature-extraction)
5.  [Merging Data Sources](#merging-data-sources)
6.  [Handle Missing values](#handle-missing-values)
7.  [Feature Engineering](#feature-engineering)
    -   7.1. [Elapsed Time - Competitors](#elapsed-time---competitors)
    -   7.2. [Elapsed Time - Promo2](#elapsed-time---promo2)
    -   7.3. [Add time before and after special events](#add-time-before-and-after-special-events)
    -   7.4. [Weekly Rolling Average, of number of special-event days, by store](#weekly-rolling-average,-of-number-of-special-event-days,-by-store)
8.  [Dropping columns from `LEFT_JOIN`](#dropping-columns-from-`left-join`)
9.  [Export merged data](#export-merged-data)
10.  [Ideas for Exploratory Data Analysis](#ideas-for-exploratory-data-analysis)

<a id="about"></a>

## 0. [About](#about)

In this notebook, the raw store-wise sales data (the primary data source) will be processed and merged with the secondary datasets (weather, state names, etc.). Features will then be engineered from the merged data. This will be done separately for the training and testing sales datasets, and each merged dataset will then be exported to a [parquet](https://databricks.com/glossary/what-is-parquet) file.

<a id="user-inputs"></a>

## 1. [User Inputs](#user-inputs)

In [None]:
PROJ_ROOT_DIR = os.getcwd()

In [None]:
dataset_names = [
    "store",
    "state_names",
    "store_states",
    "test",
    "train",
    "weather",
]
train_date_max = "2015-07-25"

In [None]:
raw_data_path = os.path.join(PROJ_ROOT_DIR, "data", "raw")
processed_data_path = os.path.join(PROJ_ROOT_DIR, "data", "processed")
d = {tn: os.path.join(raw_data_path, "rossmann", f"{tn}.csv") for tn in dataset_names}

<a id="load-data"></a>

## 2. [Load Data](#load-data)

Each dataset will be loaded into a separate `DataFrame`

In [None]:
%%time
(
    store,
    state_names,
    store_states,
    test,
    train,
    weather,
) = [
    pd.read_csv(d[dataset_name], low_memory=False)
    for dataset_name in dataset_names
]
print(len(train),len(test))

In [None]:
display(store.head())
display(store.dtypes.to_frame().T)
display(state_names.head())
display(state_names.dtypes.to_frame().T)
display(store_states.head())
display(store_states.dtypes.to_frame().T)
display(train.head())
display(train.dtypes.to_frame().T)
display(weather.head())
display(weather.dtypes.to_frame().T)

**Observations**
1. The primary datasets available are `train` and `test`, and secondary datasets provided are `store`, `store_states`, `state_names` and `weather`.
2. `store` provides store-level details (e.g. store type, competition and `Promo2` information)
3. `state_names` provides the full state name and the state name acronym
4. `store_states` provides the state name acronym for each store (store number)
5. `train` and `test` provide store-wise data for each date, including the store number, date, day of week (Monday to Sunday), whether the store was open on that date, promotion, school holiday and state holiday; `Sales` and the number of customers are only provided in the `train`ing data
6. `weather` provides the weather conditions by date for each state (full state name)

<a id="data-cleaning"></a>

## 3. [Data Cleaning](#data-cleaning)

The `StateHoliday` column in the (primary) `train` and `test` datasets is a string that is equal to `0` on regular days and takes a different value (`a`, `b` or `c`) if the date on that row corresponds to a state holiday. This string will be converted to a boolean value (`True`, indicating the date is a state holiday, if the value is not equal to `0`).

In [None]:
%%time
str2bool_pipe = Pipeline(
    [
        ("str2bool", ct.DFStr2Bool("StateHoliday", "0", "ne")),
    ]
)
train = str2bool_pipe.fit_transform(train)
test = str2bool_pipe.fit_transform(test)
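
`ct.DFStr2Bool` comes from the project's `src.data_custom_transformers` module, whose source isn't shown here. Assuming `DFStr2Bool("StateHoliday", "0", "ne")` flags values that are *not equal* to `"0"`, a minimal pandas sketch of the same conversion could look like this (the function name `str_to_bool` and the toy data are hypothetical):

```python
import pandas as pd


def str_to_bool(df: pd.DataFrame, col: str, value: str = "0") -> pd.DataFrame:
    """Hypothetical sketch: True where `col` differs from `value`."""
    out = df.copy()
    # Cast to str first, so that an integer 0 and the string "0" are treated alike
    out[col] = out[col].astype(str).ne(value)
    return out


df_demo = pd.DataFrame({"StateHoliday": ["0", "a", "b", "c", 0]})
print(str_to_bool(df_demo, "StateHoliday")["StateHoliday"].tolist())
# [False, True, True, True, False]
```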

<a id="feature-extraction"></a>

## 4. [Feature Extraction](#feature-extraction)

`datetime` attributes will be extracted from the `Date` column of the `weather`, `train` and `test` datasets

In [None]:
%%time
add_datepart_pipe = Pipeline(
    [
        ("adddatepart", ct.DFAddDatePart("Date", False, False, False)),
    ]
)
weather = add_datepart_pipe.fit_transform(weather)
train = add_datepart_pipe.fit_transform(train)
test = add_datepart_pipe.fit_transform(test)

**Note**
1. In order to assist in merging these datasets, the `Date` column will be retained after extracting the `datetime` attributes.
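
The attributes produced by `ct.DFAddDatePart` (and the meaning of its three `False` arguments) aren't shown in this notebook. Purely as an illustration, the sketch below derives a few common calendar attributes with pandas' `.dt` accessor while keeping the original column, as described in the note above. The function and output column names are hypothetical, and `.dt.isocalendar()` requires pandas 1.1 or later.

```python
import pandas as pd


def add_date_parts(df: pd.DataFrame, col: str = "Date") -> pd.DataFrame:
    """Hypothetical sketch of calendar-feature extraction that keeps `col`."""
    out = df.copy()
    dates = pd.to_datetime(out[col])
    out[col] = dates  # retained (as datetime64) so it can be used as a merge key
    out["Year"] = dates.dt.year
    out["Month"] = dates.dt.month
    out["Week"] = dates.dt.isocalendar().week.astype(int)  # ISO week number
    out["Day"] = dates.dt.day
    out["Dayofweek"] = dates.dt.dayofweek  # Monday=0, Sunday=6
    out["Dayofyear"] = dates.dt.dayofyear
    out["Is_month_start"] = dates.dt.is_month_start
    out["Is_month_end"] = dates.dt.is_month_end
    return out


demo = add_date_parts(pd.DataFrame({"Date": ["2015-01-01", "2015-07-31"]}))
print(demo.T)
```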

<a id="merging-data-sources"></a>

## 5. [Merging Data Sources](#merging-data-sources)

All the secondary datasets (`store`, `store_states`, `state_names`, `weather`) will now be separately merged with the (primary) `train`ing and `test`ing data

First, merge <font color='green'>weather</font> data with the <font color='orange'>state</font> dataset

In [None]:
%%time
weather = weather.merge(state_names, left_on="file", right_on="StateName", how="left", suffixes=("", '_y'))

Next, merge <font color='red'>store</font> data with the name of the <font color='blue'>state</font> in which the store is located

In [None]:
%%time
store = left_merge_dfs(store, store_states, "Store")
len(store[store.State.isnull()])
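
`left_merge_dfs` is imported from `src.data_helpers` and its source isn't shown. A minimal sketch consistent with how it's used in this notebook (a left join on one or more key columns, with duplicated right-hand columns suffixed `_y`, which are dropped later) might look like this; `left_merge` and the toy frames are hypothetical:

```python
import pandas as pd


def left_merge(left: pd.DataFrame, right: pd.DataFrame, on) -> pd.DataFrame:
    """Hypothetical sketch of a left join that tags duplicated right-hand columns with `_y`."""
    return left.merge(right, how="left", on=on, suffixes=("", "_y"))


store_demo = pd.DataFrame({"Store": [1, 2, 3], "StoreType": ["c", "a", "a"]})
states_demo = pd.DataFrame({"Store": [1, 2], "State": ["HE", "TH"]})
merged = left_merge(store_demo, states_demo, "Store")
# Rows without a match in `right` get NaN, which is what the null-count check looks for
print(merged["State"].isnull().sum())  # 1
```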

Next, separately merge the raw training and testing <font color='purple'>sales</font> data with the merged <font color='red'>store</font>-<font color='orange'>state</font> data

In [None]:
%%time
joined = left_merge_dfs(train, store, "Store")
joined_test = left_merge_dfs(test, store, "Store")
len(joined[joined.StoreType.isnull()]), len(joined_test[joined_test.StoreType.isnull()])

Next, merge the merged <font color='purple'>sales</font>-<font color='red'>store</font>-<font color='orange'>state</font> data with the <font color='green'>weather</font> data
- the `Date` column will be part of this merge, since observations in each dataset involved in this merge are listed by date

In [None]:
%%time
joined = left_merge_dfs(joined, weather, ["State","Date"])
joined_test = left_merge_dfs(joined_test, weather, ["State","Date"])
len(joined[joined.Mean_TemperatureC.isnull()]),len(joined_test[joined_test.Mean_TemperatureC.isnull()])

We'll do a quick sanity check to verify that the target variable (`Sales`) is not in the merged testing data and that the merged testing data contains one less column than the merged training data

In [None]:
test_train_test_target_var(joined, joined_test, "Sales")
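
`test_train_test_target_var` lives in `src.test_helpers` and isn't shown here. A minimal sketch of the two checks described above, using a hypothetical function name and toy frames, could be:

```python
import pandas as pd


def check_target_only_in_train(train_df, test_df, target):
    """Hypothetical sketch: target present only in training data, one extra column there."""
    assert target in train_df.columns, f"{target} missing from training data"
    assert target not in test_df.columns, f"{target} leaked into testing data"
    assert train_df.shape[1] - test_df.shape[1] == 1, "unexpected column-count difference"
    return True


train_demo = pd.DataFrame({"Store": [1], "Customers": [5], "Sales": [10.0]})
test_demo = pd.DataFrame({"Store": [1], "Id": [1]})
print(check_target_only_in_train(train_demo, test_demo, "Sales"))  # True
```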

Not all of the columns shared by the two datasets in each merge were used as merge keys. Each shared column that was not a key therefore appears twice in the merged data, with the copy from the right-hand dataset given the suffix `_y`. So, next, these duplicated columns (names ending in `_y`) will be dropped.

In [None]:
%%time
drop_cols_by_suffix_pipe = Pipeline(
    [
        ("dropsuffix", ct.DFDropColsBySuffix("_y")),
    ]
)
joined = drop_cols_by_suffix_pipe.fit_transform(joined)
joined_test = drop_cols_by_suffix_pipe.fit_transform(joined_test)
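
Assuming `ct.DFDropColsBySuffix` simply drops every column whose name ends in the given suffix, its effect can be illustrated with plain pandas (the function name and toy data are hypothetical):

```python
import pandas as pd


def drop_cols_by_suffix(df: pd.DataFrame, suffix: str = "_y") -> pd.DataFrame:
    """Hypothetical sketch: drop every column whose name ends with `suffix`."""
    return df.loc[:, ~df.columns.str.endswith(suffix)]


left = pd.DataFrame({"Store": [1, 2], "StateName": ["Hessen", "Thueringen"]})
right = pd.DataFrame(
    {"Store": [1, 2], "StateName": ["Hessen", "Thueringen"], "State": ["HE", "TH"]}
)
# `StateName` is shared but is not a merge key, so the right-hand copy becomes `StateName_y`
merged = left.merge(right, how="left", on="Store", suffixes=("", "_y"))
print(list(merged.columns))  # ['Store', 'StateName', 'StateName_y', 'State']
print(list(drop_cols_by_suffix(merged).columns))  # ['Store', 'StateName', 'State']
```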

<a id="handle-missing-values"></a>

## 6. [Handle Missing values](#handle-missing-values)

Next, missing values in the competition and `Promo2` start-date columns will be handled. In both the merged training and testing datasets, missing years will be filled with the placeholder value 1900, and missing months or weeks with 1, so that a valid date can still be constructed for every row. Since the elapsed-time features engineered [below](#feature-engineering) are set to zero for any start year before 1990, rows filled with the year placeholder end up with zero elapsed time.

Also, since these columns are stored as floats (because of their missing values), each of them will be cast to an integer `dtype` once the placeholder values have been filled in.

The unique values (with their counts) occurring in each of these columns, ignoring missing values, are shown below

In [None]:
na_cols = [
    "CompetitionOpenSinceYear",
    "CompetitionOpenSinceMonth",
    "Promo2SinceYear",
    "Promo2SinceWeek",
]

In [None]:
for c in na_cols:
    display(joined[c].dropna().astype(int).value_counts().to_frame())

Missing values are now filled in and the `datatype` is changed to `int`

In [None]:
%%time
fillna_placeholder_pipe = Pipeline(
    [
        ("nancompetitionopensinceyear", ct.DFFillNaPlaceHolder("CompetitionOpenSinceYear", 1900)),
        ("competitionopensinceyearint", ct.DFDtypeChanger("CompetitionOpenSinceYear", np.int32)),        
        ("nancompetitionopensincemonth", ct.DFFillNaPlaceHolder("CompetitionOpenSinceMonth", 1)),
        ("competitionopensincemonthint", ct.DFDtypeChanger("CompetitionOpenSinceMonth", np.int32)),        
        ("nanpromo2sinceyear", ct.DFFillNaPlaceHolder("Promo2SinceYear", 1900)),
        ("promo2sinceyearint", ct.DFDtypeChanger("Promo2SinceYear", np.int32)),        
        ("nanpromo2sinceweek", ct.DFFillNaPlaceHolder("Promo2SinceWeek", 1)),
        ("promo2sinceweekint", ct.DFDtypeChanger("Promo2SinceWeek", np.int32)),        
    ]
)
joined = fillna_placeholder_pipe.fit_transform(joined)
joined_test = fillna_placeholder_pipe.fit_transform(joined_test)
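
Assuming `ct.DFFillNaPlaceHolder` and `ct.DFDtypeChanger` wrap `fillna` and `astype` respectively, each pair of pipeline steps above amounts to the sketch below. It also includes a hypothetical helper for checking whether a placeholder value already occurs among a column's non-missing values (both function names and the toy data are hypothetical).

```python
import numpy as np
import pandas as pd


def fill_with_placeholder(df, col, placeholder, dtype=np.int32):
    """Hypothetical sketch: fill NaNs with `placeholder`, then cast to an integer dtype."""
    out = df.copy()
    # NaNs force a float dtype; once they are filled, the cast to int is lossless
    out[col] = out[col].fillna(placeholder).astype(dtype)
    return out


def placeholder_is_unused(series, placeholder):
    """True if `placeholder` never occurs among the non-missing values."""
    return not (series.dropna() == placeholder).any()


demo = pd.DataFrame({"CompetitionOpenSinceYear": [2008.0, np.nan, 2013.0]})
print(placeholder_is_unused(demo["CompetitionOpenSinceYear"], 1900))  # True
filled = fill_with_placeholder(demo, "CompetitionOpenSinceYear", 1900)
print(filled["CompetitionOpenSinceYear"].tolist())  # [2008, 1900, 2013]
```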

<a id="feature-engineering"></a>

## 7. [Feature Engineering](#feature-engineering)

<a id="elapsed-time---competitors"></a>

### 7.1. [Elapsed Time - Competitors](#elapsed-time---competitors)

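We'll engineer the features `CompetitionOpenSince` (the competitor's opening date, taking the 15th of the month as the opening day) and `CompetitionDaysOpen` (the number of days between `Date` and that opening date).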

In [None]:
%%time
for df in (joined, joined_test):
    df["CompetitionOpenSince"] = pd.to_datetime(
        dict(
            year=df["CompetitionOpenSinceYear"],
            month=df["CompetitionOpenSinceMonth"],
            day=15,
        )
    )
    df["CompetitionDaysOpen"] = df["Date"].subtract(df["CompetitionOpenSince"]).dt.days

We'll replace some erroneous / outlying data
-   set the number of days for which competitors have been open (`CompetitionDaysOpen`) to zero if it is negative, i.e. if the competitor opened after the date of the observation
-   set `CompetitionDaysOpen` to zero if the competitor has been open since before 1990 (this includes the placeholder year 1900 used for missing values)

In [None]:
%%time
for df in (joined,joined_test):
    df.loc[df["CompetitionDaysOpen"] < 0, "CompetitionDaysOpen"] = 0
    df.loc[df["CompetitionOpenSinceYear"] < 1990, "CompetitionDaysOpen"] = 0

We add the `CompetitionMonthsOpen` field (`CompetitionDaysOpen` divided by 30, rounded down), capping it at 2 years (24 months) to limit the number of unique categories

In [None]:
%%time
for df in (joined,joined_test):
    # Convert days to months
    df["CompetitionMonthsOpen"] = df["CompetitionDaysOpen"] // 30
    # limit the max to 2 years (24 months) to limit number of unique categories
    df.loc[df.CompetitionMonthsOpen > 24, "CompetitionMonthsOpen"] = 24
print(joined.CompetitionMonthsOpen.unique())
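
The three cells above can also be written as one vectorized function. The sketch below (hypothetical function name, toy data) replaces the `.loc` assignments with `Series.clip` and `Series.where`; for the thresholds used here it applies the same rules: negative durations and pre-1990 start years become 0, and months are capped at 24.

```python
import pandas as pd


def add_competition_features(df, min_year=1990, max_months=24):
    """Hypothetical vectorized sketch of the CompetitionOpenSince/DaysOpen/MonthsOpen steps."""
    out = df.copy()
    # Use the 15th of the month as the (unknown) opening day, as in the cells above
    out["CompetitionOpenSince"] = pd.to_datetime(
        pd.DataFrame(
            {
                "year": out["CompetitionOpenSinceYear"],
                "month": out["CompetitionOpenSinceMonth"],
                "day": 15,
            }
        )
    )
    days = (out["Date"] - out["CompetitionOpenSince"]).dt.days
    # Negative durations and pre-1990 (incl. placeholder 1900) openings become 0
    days = days.clip(lower=0).where(out["CompetitionOpenSinceYear"] >= min_year, 0)
    out["CompetitionDaysOpen"] = days
    # ~30-day months, capped to limit the number of unique categories
    out["CompetitionMonthsOpen"] = (days // 30).clip(upper=max_months)
    return out


demo = pd.DataFrame(
    {
        "Date": pd.to_datetime(["2015-07-31"] * 4),
        "CompetitionOpenSinceYear": [2015, 2010, 1900, 2016],
        "CompetitionOpenSinceMonth": [7, 1, 1, 1],
    }
)
res = add_competition_features(demo)
print(res[["CompetitionDaysOpen", "CompetitionMonthsOpen"]])
```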

<a id="elapsed-time---promo2"></a>

### 7.2. [Elapsed Time - Promo2](#elapsed-time---promo2)

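We'll similarly engineer the features `Promo2Since` (the Monday of the ISO week in which `Promo2` started) and `Promo2Days` (the number of days between `Date` and that Monday).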

In [None]:
%%time
for df in (joined, joined_test):
    df["Promo2Since"] = pd.to_datetime(
        df.apply(lambda x: Week(x.Promo2SinceYear, x.Promo2SinceWeek).monday(), axis=1)
    )
    df["Promo2Days"] = df.Date.subtract(df["Promo2Since"]).dt.days

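As before, we'll replace some erroneous / outlying data: `Promo2Days` is set to zero if it is negative, or if `Promo2` started before 1990 (this includes the placeholder year 1900 used for missing values)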

In [None]:
%%time
for df in (joined,joined_test):
    df.loc[df["Promo2Days"] < 0, "Promo2Days"] = 0
    df.loc[df["Promo2SinceYear"] < 1990, "Promo2Days"] = 0

We add the `Promo2Weeks` field (`Promo2Days` divided by 7, rounded down), capping it at 25 weeks (about 6 months) to, as before, limit the number of unique categories

In [None]:
%%time
for df in (joined, joined_test):
    # Convert days to weeks
    df["Promo2Weeks"] = df["Promo2Days"] // 7
    # limit ourselves to 6 months to limit number of unique categories
    df.loc[df["Promo2Weeks"] < 0, "Promo2Weeks"] = 0
    df.loc[df["Promo2Weeks"] > 25, "Promo2Weeks"] = 25
    print(df["Promo2Weeks"].unique())
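
An equivalent vectorized sketch for `Promo2` is below (hypothetical function name, toy data). It swaps the third-party `isoweek` package for the standard library's `date.fromisocalendar` (Python 3.8+), which also returns the Monday of a given ISO year and week; note that it raises `ValueError` for a week number that does not exist in the given year.

```python
from datetime import date

import pandas as pd


def add_promo2_features(df, min_year=1990, max_weeks=25):
    """Hypothetical sketch of the Promo2Since/Promo2Days/Promo2Weeks steps."""
    out = df.copy()
    # Monday (ISO weekday 1) of each row's ISO year/week
    mondays = [
        date.fromisocalendar(int(y), int(w), 1)
        for y, w in zip(out["Promo2SinceYear"], out["Promo2SinceWeek"])
    ]
    out["Promo2Since"] = pd.Series(pd.to_datetime(mondays), index=out.index)
    days = (out["Date"] - out["Promo2Since"]).dt.days
    # Negative durations and pre-1990 (incl. placeholder 1900) start years become 0
    days = days.clip(lower=0).where(out["Promo2SinceYear"] >= min_year, 0)
    out["Promo2Days"] = days
    out["Promo2Weeks"] = (days // 7).clip(upper=max_weeks)  # cap at ~6 months
    return out


demo = pd.DataFrame(
    {
        "Date": pd.to_datetime(["2015-07-31"] * 4),
        "Promo2SinceYear": [2015, 2015, 1900, 2015],
        "Promo2SinceWeek": [30, 1, 1, 40],
    }
)
res = add_promo2_features(demo)
print(res[["Promo2Since", "Promo2Days", "Promo2Weeks"]])
```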

<a id="add-time-before-and-after-special-events"></a>

### 7.3. [Add time before and after special events](#add-time-before-and-after-special-events)

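**Notes**
1.  This will be done for a subset of columns (`Promo`, `StateHoliday` and `SchoolHoliday`) of the combined training and testing data
2.  This requires the `DataFrame` to be sorted by `Store` and then by `Date` (ascending for the time after an event, descending for the time before it)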

In [None]:
columns = ["Date", "Store", "Promo", "StateHoliday", "SchoolHoliday"]

In [None]:
# DataFrame.append was removed in pandas 2.0; pd.concat stacks the rows in the same way
df = pd.concat([train[columns], test[columns]])

In [None]:
%%time
fld = 'SchoolHoliday'
df = df.sort_values(['Store', 'Date'])
get_elapsed(df, fld, 'After')
df = df.sort_values(['Store', 'Date'], ascending=[True, False])
get_elapsed(df, fld, 'Before')

In [None]:
%%time
fld = 'StateHoliday'
df = df.sort_values(['Store', 'Date'])
get_elapsed(df, fld, 'After')
df = df.sort_values(['Store', 'Date'], ascending=[True, False])
get_elapsed(df, fld, 'Before')

In [None]:
%%time
fld = 'Promo'
df = df.sort_values(['Store', 'Date'])
get_elapsed(df, fld, 'After')
df = df.sort_values(['Store', 'Date'], ascending=[True, False])
get_elapsed(df, fld, 'Before')
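
`get_elapsed` (from `src.features_helpers`) isn't shown here, and its exact output, including the sign convention of the `Before` columns, isn't visible in this notebook. The sketch below is a vectorized alternative, not the helper's implementation: it returns non-negative day counts since the previous, and until the next, event day for the same store, leaving `NaN` where no such event exists. The function name is hypothetical, and it assumes a unique row index (e.g. after `reset_index(drop=True)`), since the combined `train`/`test` frame has duplicate index labels.

```python
import pandas as pd


def days_since_and_until(df, fld, store_col="Store", date_col="Date"):
    """Hypothetical sketch: per-store days since the last / until the next `fld` day."""
    ordered = df.sort_values([store_col, date_col])
    # Keep the date only on event days; NaT elsewhere
    event_dates = ordered[date_col].where(ordered[fld].astype(bool))
    groups = event_dates.groupby(ordered[store_col].to_numpy())
    last_event = groups.ffill()  # most recent event on or before each date
    next_event = groups.bfill()  # next event on or after each date
    after = (ordered[date_col] - last_event).dt.days
    before = (next_event - ordered[date_col]).dt.days
    # Results keep the original row labels, so they align on assignment
    return after, before


demo = pd.DataFrame(
    {
        "Store": [1, 1, 1, 2, 2],
        "Date": pd.to_datetime(
            ["2015-01-01", "2015-01-02", "2015-01-04", "2015-01-01", "2015-01-03"]
        ),
        "Promo": [1, 0, 0, 0, 1],
    }
)
after, before = days_since_and_until(demo, "Promo")
print(after.tolist(), before.tolist())
```

With a unique index, the two results can be assigned back directly, e.g. `df["AfterPromo"], df["BeforePromo"] = days_since_and_until(df, "Promo")`.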

In [None]:
df = df.set_index("Date")

Then set null values from elapsed field calculations to 0

In [None]:
%%time
columns = ['SchoolHoliday', 'StateHoliday', 'Promo']
for o in ['Before', 'After']:
    for p in columns:
        a = o+p
        df[a] = df[a].fillna(0).astype(int)

<a id="weekly-rolling-average,-of-number-of-special-event-days,-by-store"></a>

### 7.4. [Weekly Rolling Average, of number of special-event days, by store](#weekly-rolling-average,-of-number-of-special-event-days,-by-store)

Next, we'll count the special-event days for each store over a rolling weekly window. This gives, for each store, the rolling weekly number of
- school holidays
- state holidays
- promotions

For each observation in the sales data, the total number of special-event days in a 7-day window will be computed and inserted as a value in a new column. This will be done separately for each store in the sales dataset, and also separately for each of the special-event columns (`SchoolHoliday`, `StateHoliday`, `Promo`). Since the window spans 7 rows, it covers 7 days only when a store has one row per day.

This first requires sorting by date, so that the observations are in chronological order, and then summing the special-event days in the window, grouped by store. This gives a backward-looking rolling total, i.e. one that looks 7 days into the past (including the current day). If the dates are instead sorted in descending order (reverse chronological order, with later dates first), the same calculation gives a forward-looking rolling total. Both will be used here to engineer rolling-statistic features for each of the special-event columns.

The steps followed are
- calculate backward-looking rolling statistic (number of special events)
- calculate forward-looking rolling statistic
- remove `Store` level from the index of the forward- and backward-looking store-wise aggregations
- merge rolling statistic (total) with source data

The relevant columns (special events) as well as the `Store` column are shown below

In [None]:
few_store_numbers = [1]
df_few_stores = df[df["Store"].isin(few_store_numbers)][
    ["Store"] + columns
].sort_index()
display(df_few_stores.head(2 * 7))
display(df_few_stores.tail(2 * 7))

A preview (showing the first and last 14 days) of the backward-looking rolling total for store number 1 is shown below

In [None]:
%%time
df_few_stores_roll_stats = df_few_stores.sort_index().groupby("Store")[columns].rolling(7, min_periods=1).sum().astype(int)
df_few_stores_roll_stats["weekday"] = df_few_stores_roll_stats.index.get_level_values(1).day_name()
display(df_few_stores_roll_stats.head(2*7))
display(df_few_stores_roll_stats.tail(2*7))

and similarly, for the same store, in the forward-looking direction

In [None]:
%%time
df_few_stores_roll_stats_reverse_chron = df_few_stores.sort_index(ascending=False).groupby("Store")[columns].rolling(7, min_periods=1).sum().astype(int)
df_few_stores_roll_stats_reverse_chron["weekday"] = df_few_stores_roll_stats_reverse_chron.index.get_level_values(1).day_name()
display(df_few_stores_roll_stats_reverse_chron.head(2*7))
display(df_few_stores_roll_stats_reverse_chron.tail(2*7))

In [None]:
%%time
roll_stats_pipe = Pipeline(
    [
        ("rollstats", ct.DFMultiDirMultiColRollingStat("Store", columns)),
    ]
)
df = roll_stats_pipe.fit_transform(df)
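
`ct.DFMultiDirMultiColRollingStat` isn't shown in this notebook. The sketch below follows the four steps listed earlier, using the same `groupby(...).rolling(..., min_periods=1).sum()` pattern as the preview cells; the function name, the `_bw`/`_fw` suffixes and the use of `reset_index` (rather than dropping the `Store` level) are assumptions, not the transformer's actual output. It assumes a `DatetimeIndex` named `Date`, and the window counts rows, so it spans 7 days only if each store has one row per day.

```python
import pandas as pd


def add_rolling_event_counts(df, group_col, cols, window=7):
    """Hypothetical sketch of backward/forward rolling event counts per group."""
    keys = [group_col, df.index.name]
    # Backward-looking: chronological order, window = current + previous rows
    bwd = (
        df.sort_index()
        .groupby(group_col)[cols]
        .rolling(window, min_periods=1)
        .sum()
    )
    # Forward-looking: reverse chronological order, window = current + later rows
    fwd = (
        df.sort_index(ascending=False)
        .groupby(group_col)[cols]
        .rolling(window, min_periods=1)
        .sum()
    )
    # groupby().rolling() returns a (group, date) MultiIndex; turn both levels into columns
    out = df.reset_index()
    out = out.merge(bwd.reset_index(), how="left", on=keys, suffixes=("", "_bw"))
    out = out.merge(fwd.reset_index(), how="left", on=keys, suffixes=("", "_fw"))
    return out


dates = pd.to_datetime(["2015-01-01", "2015-01-02", "2015-01-03"] * 2)
demo = pd.DataFrame(
    {"Store": [1, 1, 1, 2, 2, 2], "Promo": [1, 0, 1, 0, 1, 1]},
    index=pd.DatetimeIndex(dates, name="Date"),
)
out = add_rolling_event_counts(demo, "Store", ["Promo"], window=2)
print(out)
```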

Convert the `Date` column to `datetime`

In [None]:
to_datetime_pipe = Pipeline(
    [
        ("todatetime", ct.DFToDatetime("Date")),
    ]
)
df = to_datetime_pipe.fit_transform(df)

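Merge the fully merged dataset from [above](#merging-data-sources) with the rolling-statistic features computed in this section. Rows with zero sales (e.g. days on which a store was closed) are then dropped from the training data.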

In [None]:
%%time
joined = left_merge_dfs(joined, df, ['Store', 'Date'])
joined = joined[joined.Sales!=0].reset_index()

In [None]:
%%time
joined_test = left_merge_dfs(joined_test, df, ['Store', 'Date']).reset_index()

<a id="dropping-columns-from-`left-join`"></a>

## 8. [Dropping columns from `LEFT_JOIN`](#dropping-columns-from-`left-join`)

In [None]:
list(
    joined.columns[
        joined.columns.str.contains(
            "|".join(["Promo", "StateHoliday", "SchoolHoliday"])
        )
    ]
)

In [None]:
%%time
joined = joined.drop(joined.columns[joined.columns.str.endswith("_y")].tolist(), axis=1)
joined_test = joined_test.drop(joined_test.columns[joined_test.columns.str.endswith("_y")].tolist(), axis=1)

In [None]:
print(joined.shape)
print(joined_test.shape)

In [None]:
joined.head()

In [None]:
display(joined.head().T)

<a id="export-merged-data"></a>

## 9. [Export merged data](#export-merged-data)

We'll define the paths to the (timestamped) parquet files to be saved in the processed data folder defined [above](#user-inputs)

In [None]:
timestr = time.strftime("%Y%m%d_%H%M%S")
train_parquet_filepath = os.path.join(
    processed_data_path, "cleaned_train" + "_" + timestr + ".parquet"
)
test_parquet_filepath = os.path.join(
    processed_data_path, "cleaned_test" + "_" + timestr + ".parquet"
)
print(train_parquet_filepath)
print(test_parquet_filepath)

We'll now save each merged dataset to a separate gzip-compressed `parquet` file

In [None]:
%%time
for df, split_type, fpath in zip(
    (joined, joined_test),
    ("train", "test"),
    (train_parquet_filepath, test_parquet_filepath),
):
    try:
        print(f"Saving {split_type}ing data to {fpath + '.gzip'}", end="...")
        df.to_parquet(
            fpath + ".gzip",
            engine="auto",
            index=False,
            compression="gzip",
        )
        print("done.")
    except Exception as e:
        print(str(e))

<a id="ideas-for-exploratory-data-analysis"></a>

## 10. [Ideas for Exploratory Data Analysis](#ideas-for-exploratory-data-analysis)

Based on the data cleaning and feature engineering above, the following is a preliminary list of relationships in the data to be examined via exploratory data analysis:
-   on a per store basis, explore rolling total of weekly special event days versus weekly sales
-   sales per state
-   sales on special-event days versus regular business days
-   sales on school or state holidays versus regular business days
-   sales by various datetime attributes
-   on a per store basis, sales as a function of time
-   sales per store
-   sales versus weather events [`Mean_Wind_SpeedKm_h`, `Precipitationmm`,`CloudCover` (maybe categorical), `Events` (categorical)]

---

<span style="float:left;">
    &#169; 2021 | <a href="https://github.com/edesz/streetcar-delays">@edesz</a> (MIT)
</span>
    
<span style="float:right;">
    <a href="./2_xgboost_trials.ipynb">2 - Regression Trials with XGBoost >></a>
</span>