# Data Preperation Pipeline

### Hamilton Framework
https://hamilton.dagworks.io/en/latest/how-tos/use-in-jupyter-notebook/ 

### 0. Import Packages

In [1]:
import pandas as pd
import numpy as np

import sys
import altair as alt

import vegafusion as vf

import sklearn

from datetime import datetime, timedelta
from sklearn.pipeline import Pipeline, make_pipeline

### 1.1 Downcast and transform data
Update formatting of features to optimize memory and standardize column names.

In [2]:
def standardize_column_names(s):
    """Removes spaces from the column names."""
    return s.replace(" ", "")


def optimize_memory(df):
    """Optimize memory usage of a DataFrame by converting object columns to categorical
    and downcasting numeric columns to smaller types."""

    # Change: Objects to Categorical.
    object_cols = df.select_dtypes(include="object").columns
    if not object_cols.empty:
        print("Change: Objects to Categorical")
        df[object_cols] = df[object_cols].astype("category")

    # Change: Convert integers to smallest signed or unsigned integer and floats to smallest.
    for col in df.select_dtypes(include=["int"]).columns:
        if (df[col] >= 0).all():  # Check if all values are non-negative
            df[col] = pd.to_numeric(
                df[col], downcast="unsigned"
            )  # Downcast to unsigned
        else:
            df[col] = pd.to_numeric(df[col], downcast="integer")  # Downcast to signed

    # Downcast float columns
    for col in df.select_dtypes(include=["float"]).columns:
        df[col] = pd.to_numeric(df[col], downcast="float")

    return df


def month_year_to_int(df, i):

    # Change: Month and Year to integer.

    if i == 0:

        print("Change: Month and Year to integer")

        df = df.astype({"month": int, "year": int})

    return df


# Transform date-related columns to datetime format.


def transform_date_to_datetime(df, i):

    if i == 0:

        # print("Change: Transformed 'year', 'month', 'day' columns to Datetime feature")

        df["date"] = pd.to_datetime(df[["year", "month", "day"]], unit="us")

        print(
            "Change: Dropped 'year', 'month', 'day' columns and transformed to Datetime64[us]"
        )

        df.drop(columns=["day", "month", "year"], inplace=True)

    else:
        if "date" in df.columns:

            print("Change: Transformed 'date' column to Datetime Dtype")

            df["date"] = pd.to_datetime(df["date"]).dt.tz_localize(None)

    return df

In [3]:
def df_basic_info_before(df):
    print(
        f"-> Contains:                {df.shape[0]} observations and {df.shape[1]} features."
    )
    print(
        f"-> Has original size of    {round(sys.getsizeof(df)/1024/1024/1024, 2)} GB."
    )


def df_basic_info_after(df):
    print(
        f"-> Contains:                {df.shape[0]} observations and {df.shape[1]} features."
    )
    print(
        f"-> Has optimized size of    {round(sys.getsizeof(df)/1024/1024/1024, 2)} GB."
    )

### 1.2 Import data from local PATH
Import data trough pipeline to downcast the data and transformations

In [4]:
def f_get_data(i=0):

    # Define path.
    c_path = "C:/Users/alexander/Documents/0. Data Science and AI for Experts/EAISI_4B_Supermarket/data/raw/"

    # Identify file.
    v_file = (
        "history-per-year",  # 0
        "holidays_events",  # 1
        "items",  # 2
        "stores",  # 3
    )

    print(f"\nReading file {i}\n")

    # Load data.
    df = (
        pd.read_parquet(c_path + v_file[i] + ".parquet")
        .rename(columns=standardize_column_names)
        .pipe(optimize_memory)
        .pipe(month_year_to_int, i)
        .pipe(transform_date_to_datetime, i)
    )

    # Return data.
    return df

### 1.3 Importing data

In [5]:
# To-do: write this in function. But where executed?

# Sales History per year

df_sales = f_get_data(0)

df_basic_info_after(df_sales)


# Holidays

df_holidays = f_get_data(1)

df_basic_info_after(df_holidays)


# Items

df_items = f_get_data(2)

df_basic_info_after(df_items)


# Stores

df_stores = f_get_data(3)

df_basic_info_after(df_stores)


Reading file 0

Change: Month and Year to integer
Change: Dropped 'year', 'month', 'day' columns and transformed to Datetime64[us]
-> Contains:                125497040 observations and 6 features.
-> Has optimized size of    2.69 GB.

Reading file 1

Change: Objects to Categorical
Change: Transformed 'date' column to Datetime Dtype
-> Contains:                350 observations and 6 features.
-> Has optimized size of    0.0 GB.

Reading file 2

Change: Objects to Categorical
-> Contains:                4100 observations and 4 features.
-> Has optimized size of    0.0 GB.

Reading file 3

Change: Objects to Categorical
-> Contains:                54 observations and 5 features.
-> Has optimized size of    0.0 GB.


## 2.0 Exclude Stores + Vulcano Eruption holiday + Items

#### 2.1 Return list containing stores with less then 1670 operational days with sales

parameter: store_exclusion_cutoff_number = 1670 days

In [6]:
def stores_exclude_sales_days(df_sales, df_stores, store_exclusion_cutoff_number=1670):

    # Group the sales date by store and item
    df_sales_grouped = (
        df_sales.groupby(["store_nbr", "date"]).agg({"unit_sales": "sum"}).reset_index()
    )

    # Merge the grouped sales data with the store data
    df_sales_stores_merged = df_sales_grouped.merge(
        df_stores, left_on="store_nbr", right_on="store_nbr", how="inner"
    )

    # Count the number of daily sale records per store
    store_count = df_sales_stores_merged["store_nbr"].value_counts()

    # Get stores with counts less than the exclusion cutoff
    store_count_exclusion = store_count[store_count < store_exclusion_cutoff_number]

    # Get the list of store numbers to be excluded
    list_excluded_stores_sales_days = store_count_exclusion.index.tolist()

    return list_excluded_stores_sales_days

In [7]:
stores_exclude_sales_days(
    df_sales, df_stores, store_exclusion_cutoff_number=1670
)  # --> [30, 14, 12, 25, 24, 18, 36, 53, 20, 29, 21, 42, 22, 52]

[30, 14, 12, 25, 24, 18, 36, 53, 20, 29, 21, 42, 22, 52]

#### 2.2 Return list containing stores with cluster=10 in stores df

In [8]:
def stores_exclude_cluster(df_stores, cluster_number=10):

    # Get the list of store numbers that belong to cluster 10

    list_stores_cluster_10 = df_stores[df_stores["cluster"] == cluster_number][
        "store_nbr"
    ].tolist()

    return list_stores_cluster_10

In [9]:
stores_exclude_cluster(df_stores, cluster_number=10)  # --> [26, 28, 29, 31, 36, 43]

[26, 28, 29, 31, 36, 43]

##### 2.3 Function to exclude stores with less then 1670 sales days and related to cluster 10 

In [10]:
def df_sales_cleaned_stores(df_sales, store_exclusion_cutoff_number=1670):

    # Excluded less then 1670 salesdays
    list_excluded_stores_sales_days = stores_exclude_sales_days(
        df_sales, df_stores, store_exclusion_cutoff_number
    )

    df_sales = df_sales.drop(
        df_sales[df_sales["store_nbr"].isin(list_excluded_stores_sales_days)].index
    )

    # Cluster 10
    list_stores_cluster_10 = stores_exclude_cluster(df_stores, cluster_number=10)

    df_sales = df_sales.drop(
        df_sales[df_sales["store_nbr"].isin(list_stores_cluster_10)].index
    )

    return df_sales

In [11]:
# Execution of final function --> In pipeline
df_sales = df_sales_cleaned_stores(df_sales)

##### 2.4 Filter Vulcano Eruption from holiday df

In [12]:
def holiday_filter_vulcano_event(df_holidays, event_substring="Terremoto Manabi"):

    # Filter the DataFrame where 'description' contains the event_substring
    df_vulcano_event_filtered = df_holidays[
        df_holidays["description"].str.contains(event_substring)
    ]

    return df_vulcano_event_filtered

In [13]:
def df_holidays_cleaned(df_holidays):

    # Exclude holiday_filter_vulcano_event function to return filtered df
    df_vulcano_event_filtered = holiday_filter_vulcano_event(df_holidays)

    # Filter the specific holiday events from the holiday DataFrame
    df_holidays = df_holidays.loc[
        ~df_holidays.index.isin(df_vulcano_event_filtered.index)
    ]

    return df_holidays

In [14]:
# Execution of final function --> In pipeline?
df_holidays = df_holidays_cleaned(df_holidays)

2.5 Filter and exclude of Items

In [16]:
##Orginal, first try and check on item level


def item_check(start_date, x_days):

    # start_date = "2013-02-01"
    # x_days = 31

    # Convert start_date to datetime
    start_date = datetime.strptime(start_date, "%Y-%m-%d")
    print(start_date)

    # Calculate end_date
    end_date = start_date + timedelta(days=x_days)
    print(x_days)
    print(end_date)

    # Filter the DataFrame based on the date range
    df_sales_filtered = df_sales[
        (df_sales["date"] >= start_date) & (df_sales["date"] <= end_date)
    ]

    # Group by item_nbr and sum unit_sales, so this will be the same criteria for all stores.
    df_sales_item = (
        df_sales_filtered.groupby("item_nbr").agg({"unit_sales": "sum"}).reset_index()
    )

    # Get the list of store numbers to stay included, as they have sales within the first 28 days
    list_sales_items = df_sales_item["item_nbr"].tolist()
    unique_values_count = df_sales_item["item_nbr"].nunique()
    print(unique_values_count)

    # If sum_sales = 0 --> drop item

    # first 4 weeks
    # last 4 weeks

    # sum_total per item for these two months

In [17]:
# item_check("2016-01-01", 31)

In [18]:
# start_date = "2013-01-01"
# x_days = 28

# # Convert start_date to datetime
# start_date = datetime.strptime(start_date, "%Y-%m-%d")
# print(start_date)

# # Calculate end_date
# end_date = start_date + timedelta(days=x_days)
# print(x_days)
# print(end_date)

# # Filter the DataFrame based on the date range
# df_sales_filtered = df_sales[
#     (df_sales["date"] >= start_date) & (df_sales["date"] <= end_date)
# ]

# # Change the dtype for item_nbr from uint32 to int32
# df_sales["item_nbr"] = df_sales["item_nbr"].astype(int)
# df_items["item_nbr"] = df_items["item_nbr"].astype(int)

# # Merge the filtered sales data with the items data
# df_sales_items_merged = df_sales.merge(df_items, on="item_nbr", how="left")

# # print(df_sales_items_merged.info())
# # print(df_sales_items_merged.sample(5))

# df_sales_items_merged["class"] = df_sales_items_merged["class"].astype(str)

# # Group by item_nbr and sum unit_sales, so this will be the same criteria for all stores.
# df_sales_item = (
#     df_sales_filtered.groupby("class").agg({"unit_sales": "sum"}).reset_index()
# )

# # Get the list of store numbers to stay included, as they have sales within the first 28 days
# list_sales_items = df_sales_item["class"].tolist()
# unique_values_count = df_sales_item["class"].nunique()
# print(unique_values_count)

In [19]:
def get_unique(df, column_name):
    """Get the all values and the count for specific column"""
    unique_values_count = df[column_name].nunique()
    unique_values = df[column_name].unique()

    # Convert unique values to a single string to print
    unique_values_str = ", ".join(map(str, unique_values))

    print(f"Number of unique values in {column_name}: {unique_values_count}")
    print("Unique values:")
    print(unique_values_str)

    return

### 3.0 Prepare and Merge df_sales + df_items + df_stores + df_holidays

3.1 Prepare df_holidays

In [20]:
# Prepare df_holiday and df_stores by cleaning up df for merging with holidays by dropping unneeded columns
def clean_holidays_stores_prep(df_holidays, df_stores):

    df_holidays_cleaned = df_holidays.drop(
        columns=[
            "description",
            "transferred",
        ]
    )

    df_stores_cleaned = df_stores.drop(columns=["cluster", "type"])

    return df_holidays_cleaned, df_stores_cleaned

In [21]:
def holidays_prep_local(df_holidays, df_stores):

    df_holidays_cleaned, df_stores_cleaned = clean_holidays_stores_prep(
        df_holidays, df_stores
    )

    # select locale 'Local' from holiday df and merge with city stores df
    df_holidays_local = df_holidays_cleaned[df_holidays_cleaned["locale"] == "Local"]

    df_holidays_prep_local = df_holidays_local.merge(
        df_stores_cleaned, left_on="locale_name", right_on="city", how="left"
    )

    return df_holidays_prep_local

In [22]:
def holidays_prep_regional(df_holidays, df_stores):

    df_holidays_cleaned, df_stores_cleaned = clean_holidays_stores_prep(
        df_holidays, df_stores
    )

    # select locale 'Regional' from holiday df and merge with state stores df
    df_holidays_regional = df_holidays_cleaned[
        df_holidays_cleaned["locale"] == "Regional"
    ]

    df_holidays_prep_regional = df_holidays_regional.merge(
        df_stores_cleaned, left_on="locale_name", right_on="state", how="left"
    )

    return df_holidays_prep_regional

In [23]:
def holidays_prep_national(df_holidays, df_stores):

    df_holidays_cleaned, df_stores_cleaned = clean_holidays_stores_prep(
        df_holidays, df_stores
    )

    # Select locale 'Regional' from holiday df and merge with national stores df
    df_holidays_national = df_holidays_cleaned[
        df_holidays_cleaned["locale"] == "National"
    ]

    # Create extra column for merge on "Ecuador"
    df_stores_cleaned["national_merge"] = "Ecuador"

    df_holidays_prep_national = df_holidays_national.merge(
        df_stores_cleaned, left_on="locale_name", right_on="national_merge", how="left"
    )

    # Drop newly created column national_merge, not needed further
    df_holidays_prep_national = df_holidays_prep_national.drop(
        columns=["national_merge"]
    )

    return df_holidays_prep_national

In [24]:
def holidays_prep_merged(df_holidays, df_stores):

    # Load prep functions from local, Regional and National df's
    df_holidays_prep_local = holidays_prep_local(df_holidays, df_stores)

    df_holidays_prep_regional = holidays_prep_regional(df_holidays, df_stores)

    df_holidays_prep_national = holidays_prep_national(df_holidays, df_stores)

    # Combine local, regional and national dataframes into 1 merged dataframe
    df_holidays_merged = pd.concat(
        [df_holidays_prep_local, df_holidays_prep_regional, df_holidays_prep_national]
    )

    # Clean df_holidays_merged by dropping locale_name", "city", "state"
    df_holidays_merged = df_holidays_merged.drop(
        columns=["locale_name", "city", "state"]
    )

    # Rename 'type' of holiday to 'holiday_type'
    df_holidays_merged = df_holidays_merged.rename(columns={"type": "holiday_type"})

    return df_holidays_merged

In [25]:
# Fill newly created NaN columns, due to holiday join, with 'no' on thates where there are now holidays
def holidays_fill_no_normal(df):

    cat_col = df.select_dtypes(include=["category"]).columns

    for col in cat_col:

        if "no" not in df[col].cat.categories:

            df[col] = df[col].cat.add_categories("no")

    df[cat_col] = df[cat_col].fillna("no")

    return df

3.X Merge datasets

In [31]:
# Merge datasets
def merge_datasets(df_sales, df_items, df_stores, df_holidays):

    # Holidays prep
    df_holidays_merged = holidays_prep_merged(df_holidays, df_stores)

    # Holidays merge on sales
    df_merged = df_sales.merge(df_holidays_merged, on=["date", "store_nbr"], how="left")
    df_merged = holidays_fill_no_normal(df_merged)

    # Stores merged with sales+holidays
    df_merged = df_merged.merge(df_stores, on="store_nbr", how="left")

    # -------------------------------------------------------------------
    # To-do: Check if problem is in dtype of item_nbr of df_merged or df_items

    print(df_merged["item_nbr"].dtype)
    print(df_items["item_nbr"].dtype)

    # # Change the dtype for item_nbr from uint32 to int32
    df_merged["item_nbr"] = df_merged["item_nbr"].astype(int)
    df_items["item_nbr"] = df_items["item_nbr"].astype(int)
    print("-" * 30)
    print(df_merged["item_nbr"].dtype)
    print(df_items["item_nbr"].dtype)
    print("-" * 30)

    # -------------------------------------------------------------------

    # Items merged with sales+holidays+stores
    df_merged = df_merged.merge(df_items, on="item_nbr", how="left")

    return df_merged

In [None]:
df_merged = merge_datasets(df_sales, df_items, df_stores, df_holidays)  # --> 2.9 GB
df_merged.info()

In [28]:
df_merged.head(10)
# to-do: rename columns to holiday_locale, store_city, store_type, store_cluster, item_family, item_class

# to-do: re-order column order: date, store_nbr, item_nbr, unit_sales ,....

Unnamed: 0,id,store_nbr,item_nbr,unit_sales,onpromotion,date,holiday_type,locale,city,state,type,cluster,family,class,perishable
0,578,1,103665,2.0,,2013-01-02,no,no,Quito,Pichincha,D,13,BREAD/BAKERY,2712,1
1,579,1,105574,8.0,,2013-01-02,no,no,Quito,Pichincha,D,13,GROCERY I,1045,0
2,580,1,105575,15.0,,2013-01-02,no,no,Quito,Pichincha,D,13,GROCERY I,1045,0
3,581,1,105577,2.0,,2013-01-02,no,no,Quito,Pichincha,D,13,GROCERY I,1045,0
4,582,1,105737,2.0,,2013-01-02,no,no,Quito,Pichincha,D,13,GROCERY I,1044,0
5,583,1,105857,12.0,,2013-01-02,no,no,Quito,Pichincha,D,13,GROCERY I,1092,0
6,584,1,106716,2.0,,2013-01-02,no,no,Quito,Pichincha,D,13,GROCERY I,1032,0
7,585,1,108696,3.0,,2013-01-02,no,no,Quito,Pichincha,D,13,DELI,2636,1
8,586,1,108698,6.0,,2013-01-02,no,no,Quito,Pichincha,D,13,DELI,2644,1
9,587,1,108701,3.0,,2013-01-02,no,no,Quito,Pichincha,D,13,DELI,2644,1


In [29]:
df_merged.sample(10)

Unnamed: 0,id,store_nbr,item_nbr,unit_sales,onpromotion,date,holiday_type,locale,city,state,type,cluster,family,class,perishable
52470497,67847544,39,1113872,6.0,False,2016-01-16,no,no,Cuenca,Azuay,B,6,GROCERY I,1045,0
43870883,47651989,40,129296,6.0,False,2015-05-28,no,no,Machala,El Oro,C,3,GROCERY I,1032,0
54923315,94462931,47,258396,11.0,False,2016-10-20,no,no,Quito,Pichincha,A,14,GROCERY I,1010,0
8454429,6050884,23,268443,11.0,,2013-05-25,no,no,Ambato,Tungurahua,D,9,CLEANING,3026,0
49222232,54835059,27,559044,3.0,False,2015-08-25,no,no,Daule,Guayas,D,1,BREAD/BAKERY,2716,1
36030559,63271437,38,1173210,2.0,False,2015-11-27,Event,National,Loja,Loja,D,4,CLEANING,3040,0
5238794,2079036,49,752987,1.0,,2013-02-21,no,no,Quito,Pichincha,A,11,DAIRY,2116,1
90775044,121659336,15,1321497,3.0,False,2017-07-10,no,no,Ibarra,Imbabura,C,15,BREAD/BAKERY,2714,1
89161168,119319219,16,2081095,1.0,False,2017-06-18,no,no,Santo Domingo,Santo Domingo de los Tsachilas,C,3,PREPARED FOODS,2962,1
47662202,52700401,49,1113847,2.0,False,2015-07-30,no,no,Quito,Pichincha,A,11,DELI,2654,1


# 4 Data Manipulation

4.X Brainstorm ideas for imputing missing values

Creating NaN sales for missing days

In [None]:
# Forward and Backward fill --> items, stores, holidays
df["sales"].fillna(method="ffill", inplace=True)

df["sales"].fillna(method="bfill", inplace=True)

# Interpolate between missing datapoints --> sales

df["column_name"].interpolate(method="linear", inplace=True)

df["column_name"].interpolate(method="time", inplace=True)

df["column_name"].interpolate(method="polynomial", order=2, inplace=True)

In [None]:
#  include all daily dates in the range, filling missing dates with NaNs
df = df.reindex(pd.date_range(start=df.index.min(), end=df.index.max(), freq="D"))

all_dates = pd.date_range(start=df["date"].min(), end=df["date"].max(), freq="D")
all_combinations = pd.MultiIndex.from_product(
    [df["store_nbr"].unique(), df["item_nbr"].unique(), all_dates],
    names=["store_nbr", "item_nbr", "date"],
)

4.1 Missing sales data: Closed Stores

- Sales for all items a given store and date are NA
- Action: Impute with 0


4.2 Missing sales data: New product

•	Before the very first sale of an item, all observations are kept as NA

•	After the very first sale of an item, we go to step 3:  


4.3  Missing sales data: Stockout on store level

•      Perishable good: when there are missing values for two consecutive days for a given item and store 

•      Nonperishable goods: when there are missing values for 7 consecutive days for a given item and store

•      Action: Impute with algorithm 


4.4 Missing sales data: Zero sales

•	All other cases

•	Action: Impute with 0

4.X Negative values imputing to 0

In [None]:
df["unit_sales"] = df["unit_sales"].clip(lower=0)  # --> negatives inputed to zero

4.5 Promotional Data 

•   All missing values are interpreted a day with no promotion

•   Action: Inpute onpromotion N/A with False

In [None]:
# Fill missing N/A values in boolean columns with False
def sales_fill_onpromotion(df):

    df["onpromotion"] = df["onpromotion"].fillna(False)

    # df["onpromotion"] = df["onpromotion"].fillna(MISSING) #MISSING results in error, due to boolean dtype

    return df


# To-do: Discuss Does the N/A of onpromotion need to be filled with False or with MISSING?
MISSING = "missing"

In [None]:
# df_merged_test = sales_fill_onpromotion(df_merged)

# df_merged_test.head(10)

# 5 Feature construction

5.X Extracting datetime features

In [None]:
def extract_datetime_features(df):
    """
    Extracting datetime features
    year, month, day of month, weekday (1-7), week number-year, week_year_date
    """
    df = df.copy()

    # Ensure the date column is sorted
    df = df.sort_values("date")

    # df["year"] = df["date"].dt.year
    # df["month"] = df["date"].dt.month
    # df["day"] = df["date"].dt.day

    # Adjusting weekday to start from 1 (Monday) to 7 (Sunday)
    df["weekday"] = df["date"].dt.dayofweek + 1

    # Adding week number-year feature
    df["week_number"] = df["date"].dt.isocalendar().week
    df["week_year"] = df["week_number"].astype(str).str.zfill(2) + df["year"].astype(
        str
    )

    # Convert week_year to datetime with monday as startdate of week
    df["week_year_date"] = pd.to_datetime(
        df["year"].astype(str) + df["week_number"].astype(str).str.zfill(2) + "1",
        format="%Y%W%w",
    )

    # Adding trend feature: number of weeks since the start of the dataset
    start_date = df["date"].min()
    df["weeks_since_start"] = ((df["date"] - start_date).dt.days / 7).astype(int)

    return df

In [None]:
# df = extract_datetime_features(df)

5.X Promotion

The number of days a item was on promotion 

In [None]:
# COPY FROM OLD NOTEBOOK
# TO-DO 1: transform with new df names
# TO-DO 2: total promotion days month --> week


def onpromotion_month_count(df):

    if "onpromotion" in df.columns:

        df["onpromotion_month_count"] = df.groupby(
            ["item_nbr", "store_nbr", "day", "month", "year"]
        )["onpromotion"].transform("sum")

        print(
            "Change: 'onpromotion' column transformed to 'onpromotion_month_count' feature."
        )
    else:

        print("The DataFrame does not contain an 'onpromotion' column.")

    return df

In [None]:
df_0_agg = (
    onpromotion_month_count(df_0)  # Transformation to 'onpromotion_month_count' feature
    .drop(
        columns=["id", "date", "onpromotion"]
    )  # Drop unnecessary columns "id", "date", "onpromotion"
    .groupby(["month", "year", "store_nbr", "item_nbr"])
    .agg({"unit_sales": "sum", "onpromotion_month_count": "sum"})
    .reset_index()
)

5.X Store closed on 25-12 and 01-01 

--> can we also use this feature to include the excluded stores with >9 days data, due to closing or later openings?