# M5 Forecasting – Accuracy

In this notebook, I forecast 28 days of sales using several machine learning models in PySpark. <br>
The dataset used in this notebook is available at:<br>
https://www.kaggle.com/competitions/m5-forecasting-accuracy/data <br><br>
The goal of this notebook is not to build the best model but to understand, learn and apply distributed computing with Apache Spark through its Python library, PySpark.<br>

Check the Spark version and other package details.

In [1]:
!pip show pyspark

Name: pyspark
Version: 3.4.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/spark/python
Requires: py4j
Required-by: forecastflowml


Since I ran this notebook in Docker using the pyspark-notebook image from Jupyter, PySpark is already installed and I do not need to run pip install pyspark. If you are running this code on any other platform or notebook, uncomment and run the following command.

In [2]:
# !pip install pyspark

In [None]:
# install PrettyTable to display the results as a table
# !pip install PrettyTable

### Import the required Libraries

In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import LinearRegression, GBTRegressor, RandomForestRegressor, DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import Pipeline
from pyspark.sql.window import Window
from prettytable import PrettyTable


Initialize the Spark Session

In [4]:
spark = SparkSession.builder.appName("M5 Forecasting").getOrCreate()

In [5]:
spark

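I'm using Spark version 3.4.1, which is the latest version available in the Docker image from the Jupyter website.<br>
It runs on the local machine, i.e. in Spark's local mode (standalone mode refers to Spark's built-in cluster manager, which is not used here).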

### Import dataset

The downloaded dataset contains five files, but this notebook uses only three of them:
1. calendar.csv
2. sell_prices.csv
3. sales_train_evaluation.csv

Import <i>sales_train_evaluation.csv.</i>

In [6]:
data = spark.read.csv("m5-forecasting-accuracy/sales_train_evaluation.csv", header = True, inferSchema = True)

In [7]:
data.printSchema()

root
 |-- id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- dept_id: string (nullable = true)
 |-- cat_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- d_1: integer (nullable = true)
 |-- d_2: integer (nullable = true)
 |-- d_3: integer (nullable = true)
 |-- d_4: integer (nullable = true)
 |-- d_5: integer (nullable = true)
 |-- d_6: integer (nullable = true)
 |-- d_7: integer (nullable = true)
 |-- d_8: integer (nullable = true)
 |-- d_9: integer (nullable = true)
 |-- d_10: integer (nullable = true)
 |-- d_11: integer (nullable = true)
 |-- d_12: integer (nullable = true)
 |-- d_13: integer (nullable = true)
 |-- d_14: integer (nullable = true)
 |-- d_15: integer (nullable = true)
 |-- d_16: integer (nullable = true)
 |-- d_17: integer (nullable = true)
 |-- d_18: integer (nullable = true)
 |-- d_19: integer (nullable = true)
 |-- d_20: integer (nullable = true)
 |-- d_21: integer (nullable = tru

In [8]:
# data.show()
# too many to show in proper format

In [7]:
data.count()

30490

In [9]:
data.limit(10).toPandas()

Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,d_1,d_2,d_3,d_4,...,d_1932,d_1933,d_1934,d_1935,d_1936,d_1937,d_1938,d_1939,d_1940,d_1941
0,HOBBIES_1_001_CA_1_evaluation,HOBBIES_1_001,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,2,4,0,0,0,0,3,3,0,1
1,HOBBIES_1_002_CA_1_evaluation,HOBBIES_1_002,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,0,1,2,1,1,0,0,0,0,0
2,HOBBIES_1_003_CA_1_evaluation,HOBBIES_1_003,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,1,0,2,0,0,0,2,3,0,1
3,HOBBIES_1_004_CA_1_evaluation,HOBBIES_1_004,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,1,1,0,4,0,1,3,0,2,6
4,HOBBIES_1_005_CA_1_evaluation,HOBBIES_1_005,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,0,0,0,2,1,0,0,2,1,0
5,HOBBIES_1_006_CA_1_evaluation,HOBBIES_1_006,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,2,1,0,0,1,0,0,5,2,0
6,HOBBIES_1_007_CA_1_evaluation,HOBBIES_1_007,HOBBIES_1,HOBBIES,CA_1,CA,0,0,0,0,...,0,1,0,0,0,1,0,1,1,0
7,HOBBIES_1_008_CA_1_evaluation,HOBBIES_1_008,HOBBIES_1,HOBBIES,CA_1,CA,12,15,0,0,...,7,0,6,0,15,5,4,1,40,32
8,HOBBIES_1_009_CA_1_evaluation,HOBBIES_1_009,HOBBIES_1,HOBBIES,CA_1,CA,2,0,7,3,...,1,0,0,0,0,0,0,0,1,0
9,HOBBIES_1_010_CA_1_evaluation,HOBBIES_1_010,HOBBIES_1,HOBBIES,CA_1,CA,0,0,1,0,...,0,0,1,0,2,1,1,0,0,1


In [10]:
# the last day in the evaluation data is d_1941
# when d='d_1941', date=datetime.date(2016, 5, 22)
# you can see this in the calendar data below

Import <i>calendar.csv.</i>

In [11]:
calendar = spark.read.csv("m5-forecasting-accuracy/calendar.csv", header = True, inferSchema=True)

In [12]:
# insert other calendar cmds to see the dataset

In [13]:
calendar.limit(10).toPandas()

Unnamed: 0,date,wm_yr_wk,weekday,wday,month,year,d,event_name_1,event_type_1,event_name_2,event_type_2,snap_CA,snap_TX,snap_WI
0,2011-01-29,11101,Saturday,1,1,2011,d_1,,,,,0,0,0
1,2011-01-30,11101,Sunday,2,1,2011,d_2,,,,,0,0,0
2,2011-01-31,11101,Monday,3,1,2011,d_3,,,,,0,0,0
3,2011-02-01,11101,Tuesday,4,2,2011,d_4,,,,,1,1,0
4,2011-02-02,11101,Wednesday,5,2,2011,d_5,,,,,1,0,1
5,2011-02-03,11101,Thursday,6,2,2011,d_6,,,,,1,1,1
6,2011-02-04,11101,Friday,7,2,2011,d_7,,,,,1,0,0
7,2011-02-05,11102,Saturday,1,2,2011,d_8,,,,,1,1,1
8,2011-02-06,11102,Sunday,2,2,2011,d_9,SuperBowl,Sporting,,,1,1,1
9,2011-02-07,11102,Monday,3,2,2011,d_10,,,,,1,1,0


In [14]:
# total number of days
calendar.count()

1969

In [15]:
# days in the calendar minus days with sales in the evaluation data = forecast horizon
1969-1941

28
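The day labels map to calendar dates by simple offset arithmetic, because `d_1` is 2011-01-29 in calendar.csv. The plain-Python sketch below reproduces the dates quoted in this notebook and the 28-day forecast horizon. The helper name `d_to_date` is hypothetical; it is not part of the dataset or of PySpark.

```python
from datetime import date, timedelta

FIRST_DAY = date(2011, 1, 29)  # date of d_1 in calendar.csv


def d_to_date(d_label):
    """Convert a label such as 'd_1941' to its calendar date (hypothetical helper)."""
    day_number = int(d_label.split("_")[1])
    return FIRST_DAY + timedelta(days=day_number - 1)


last_sales_day = d_to_date("d_1941")     # last day with sales in the evaluation file
last_calendar_day = d_to_date("d_1969")  # last row of calendar.csv
horizon = (last_calendar_day - last_sales_day).days

print(last_sales_day, last_calendar_day, horizon)
```

The same arithmetic confirms that 2015-01-01 is `d_1434`, which is the cut-off used later when reducing the training dates.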

Import <i>sell_prices.csv.</i>

In [16]:
price = spark.read.csv("m5-forecasting-accuracy/sell_prices.csv", header = True, inferSchema=True)

In [17]:
price.printSchema()

root
 |-- store_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- wm_yr_wk: integer (nullable = true)
 |-- sell_price: double (nullable = true)



In [18]:
price.limit(10).toPandas()

Unnamed: 0,store_id,item_id,wm_yr_wk,sell_price
0,CA_1,HOBBIES_1_001,11325,9.58
1,CA_1,HOBBIES_1_001,11326,9.58
2,CA_1,HOBBIES_1_001,11327,8.26
3,CA_1,HOBBIES_1_001,11328,8.26
4,CA_1,HOBBIES_1_001,11329,8.26
5,CA_1,HOBBIES_1_001,11330,8.26
6,CA_1,HOBBIES_1_001,11331,8.26
7,CA_1,HOBBIES_1_001,11332,8.26
8,CA_1,HOBBIES_1_001,11333,8.26
9,CA_1,HOBBIES_1_001,11334,8.26


In [19]:
price.count()
# total number of rows

6841121

### Data preprocessing

Steps
1. To create the "price" df:<br>
The "calendar" and "price" dfs share the weekly "wm_yr_wk" column. <br>
Join them on it, so that each weekly price is repeated for every day "d" of that week, and build a new "id" column from "item_id" and "store_id".<br>
Then keep only "id", "d" and "sell_price" as the columns of the "price" df.

2. To create the "calendar" df:<br>
Select "d", "date" and the event and SNAP columns from the "calendar" df.
   
3. To combine all the dfs:<br>
Join the "data" df (after melting it to long format, one row per "id" and "d") with the "calendar" df on "d".<br>
Then join the result with the "price" df on "id" and "d".
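Step 1 can be illustrated without Spark. The sketch below is a plain-Python approximation with made-up toy rows (the prices and weeks are illustrative only): it expands each weekly price to one row per day of that week and builds the "id" the same way the sales file names its series. Unlike a left join, it simply skips price rows whose week has no calendar days.

```python
# Toy stand-ins for calendar.csv and sell_prices.csv (illustrative values only).
calendar_rows = [
    {"d": "d_1", "wm_yr_wk": 11101},
    {"d": "d_2", "wm_yr_wk": 11101},
    {"d": "d_8", "wm_yr_wk": 11102},
]
price_rows = [
    {"store_id": "CA_1", "item_id": "HOBBIES_1_001", "wm_yr_wk": 11101, "sell_price": 9.58},
    {"store_id": "CA_1", "item_id": "HOBBIES_1_001", "wm_yr_wk": 11102, "sell_price": 8.26},
]

# Group the calendar days by week so each weekly price can be expanded.
days_by_week = {}
for row in calendar_rows:
    days_by_week.setdefault(row["wm_yr_wk"], []).append(row["d"])

daily_prices = []
for p in price_rows:
    # Same pattern as the ids in the sales file: <item_id>_<store_id>_evaluation
    series_id = "_".join([p["item_id"], p["store_id"], "evaluation"])
    for d in days_by_week.get(p["wm_yr_wk"], []):
        daily_prices.append({"id": series_id, "d": d, "sell_price": p["sell_price"]})

for row in daily_prices:
    print(row)
```

Because the resulting rows carry both "id" and "d", they can later be joined to the long-format sales data on exactly those two columns.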

In [20]:
# price initial schema
price.printSchema()

root
 |-- store_id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- wm_yr_wk: integer (nullable = true)
 |-- sell_price: double (nullable = true)



In [21]:
def preprocess_price(price, calendar):
    """
    Expand the weekly prices to one row per day and keep only the necessary columns.
    """
    price = (
        price.join(calendar, on="wm_yr_wk", how="left")
        .withColumn(
            "id",
            F.concat_ws("_", F.col("item_id"), F.col("store_id"), F.lit("evaluation")),
        )
        .select("id", "d", "sell_price")
    )
    return price

In [22]:
price = preprocess_price(price, calendar)

In [23]:
# schema after pre-processing
price.printSchema()

root
 |-- id: string (nullable = false)
 |-- d: string (nullable = true)
 |-- sell_price: double (nullable = true)



In [24]:
price.limit(10).toPandas()

Unnamed: 0,id,d,sell_price
0,HOBBIES_1_001_CA_1_evaluation,d_903,9.58
1,HOBBIES_1_001_CA_1_evaluation,d_902,9.58
2,HOBBIES_1_001_CA_1_evaluation,d_901,9.58
3,HOBBIES_1_001_CA_1_evaluation,d_900,9.58
4,HOBBIES_1_001_CA_1_evaluation,d_899,9.58
5,HOBBIES_1_001_CA_1_evaluation,d_898,9.58
6,HOBBIES_1_001_CA_1_evaluation,d_897,9.58
7,HOBBIES_1_001_CA_1_evaluation,d_910,9.58
8,HOBBIES_1_001_CA_1_evaluation,d_909,9.58
9,HOBBIES_1_001_CA_1_evaluation,d_908,9.58


In [25]:
# select only required columns
calendar = calendar.select(
        "d",
        "date",
        "event_name_1",
        "event_type_1",
        "SNAP_TX",
        "SNAP_WI",
        "SNAP_CA",
    )

In [26]:
calendar.limit(10).toPandas()

Unnamed: 0,d,date,event_name_1,event_type_1,SNAP_TX,SNAP_WI,SNAP_CA
0,d_1,2011-01-29,,,0,0,0
1,d_2,2011-01-30,,,0,0,0
2,d_3,2011-01-31,,,0,0,0
3,d_4,2011-02-01,,,1,0,1
4,d_5,2011-02-02,,,0,1,1
5,d_6,2011-02-03,,,1,1,1
6,d_7,2011-02-04,,,0,0,1
7,d_8,2011-02-05,,,1,1,1
8,d_9,2011-02-06,SuperBowl,Sporting,1,1,1
9,d_10,2011-02-07,,,1,0,1


In [27]:
# last 30 rows of calendar
calendar.toPandas().tail(30)

Unnamed: 0,d,date,event_name_1,event_type_1,SNAP_TX,SNAP_WI,SNAP_CA
1939,d_1940,2016-05-21,,,0,0,0
1940,d_1941,2016-05-22,,,0,0,0
1941,d_1942,2016-05-23,,,0,0,0
1942,d_1943,2016-05-24,,,0,0,0
1943,d_1944,2016-05-25,,,0,0,0
1944,d_1945,2016-05-26,,,0,0,0
1945,d_1946,2016-05-27,,,0,0,0
1946,d_1947,2016-05-28,,,0,0,0
1947,d_1948,2016-05-29,,,0,0,0
1948,d_1949,2016-05-30,MemorialDay,National,0,0,0


In [28]:
# when d='d_1913', date=datetime.date(2016, 4, 24)

In [29]:
# check starting and ending dates
calendar.createOrReplaceTempView("temptable")
spark.sql("SELECT MIN(date) FROM temptable").collect()[0][0], \
spark.sql("SELECT MAX(date) FROM temptable").collect()[0][0]

(datetime.date(2011, 1, 29), datetime.date(2016, 6, 19))

### Some helper functions:

In [30]:
# the code below is adapted from
# https://www.kaggle.com/code/canerturkseven/forecastflowml-m5-forecasting-accuracy

# def reduce_training_dates(df):
#     """
#     Drop data before 2015 to reduce memory requirements.
#     """
#     return df.drop(*[f"d_{i}" for i in range(1, 1434)]) # Jan 1, 2015 is d_1434
    
def melt_dataframe(df):
    """
    Convert dataframe from wide to long format.
    """
    id_vars = ["id", "item_id", "dept_id", "cat_id", "store_id", "state_id"]
    # value_vars = [f"d_{i}" for i in range(1434, 1942)] # last day is 1941 # for Jan 1, 2015
    value_vars = [f"d_{i}" for i in range(1, 1942)] # last day is 1941
    var_name = "d"
    value_name = "sales"
    expr = ", ".join([f"'{col}', cast({col} as int)" for col in value_vars])
    df = df.selectExpr(
        *id_vars,
        f"stack({len(value_vars)}, {expr}) as ({var_name}, {value_name})",
    )
    return df

def combine_datasets(df, price, calendar):
    """
    Combine actuals with price and calendar.
    """
    df = (
        df.join(calendar, on="d", how="left")
        .join(price, on=["id", "d"], how="left")
        .drop("d")
        .withColumn("date", F.to_date(F.col("date")))
        .withColumn("sales", F.col("sales").cast("smallint"))
    )
    return df

def remove_leading_zeros(df):
    """
    Remove the rows before the first non-zero sale of each time series.
    """
    from pyspark.sql.window import Window

    w = (
        Window.partitionBy("id")
        .orderBy("date")
        .rowsBetween(Window.unboundedPreceding, 0)
    )
    df = (
        df.withColumn("no_sales", F.when(F.col("sales") == 0, 1).otherwise(0))
        .withColumn(
            "first_sales_date",
            F.when(
                F.count("no_sales").over(w) == F.sum("no_sales").over(w), 1
            ).otherwise(0),
        )
        .filter(F.col("first_sales_date") == 0)
        .drop("no_sales", "first_sales_date")
    )
    return df

def combine_snap_events(df):
    """
    Reduce three snap event columns to a single one.
    """
    df = df.withColumn(
        "snap",
        F.when(F.col("state_id") == "TX", F.col("SNAP_TX"))
        .when(F.col("state_id") == "WI", F.col("SNAP_WI"))
        .when(F.col("state_id") == "CA", F.col("SNAP_CA")),
    ).drop("SNAP_TX", "SNAP_WI", "SNAP_CA")
    return df
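To make the wide-to-long step easier to follow, the sketch below rebuilds the `stack(...)` expression for just three day columns and then performs the same reshaping on one row in plain Python. The column subset and the `wide_row` dictionary are illustrative assumptions; the real function uses all six id columns and 1,941 day columns.

```python
id_vars = ["id", "store_id"]                  # shortened list of id columns
value_vars = [f"d_{i}" for i in range(1, 4)]  # three day columns instead of 1,941

# Same string-building logic as melt_dataframe, on a tiny column list.
expr = ", ".join(f"'{c}', cast({c} as int)" for c in value_vars)
stack_sql = f"stack({len(value_vars)}, {expr}) as (d, sales)"
print(stack_sql)

# What the stack does to one wide row, written out in plain Python:
# every day column becomes its own (d, sales) row with the id columns repeated.
wide_row = {"id": "HOBBIES_1_008_CA_1_evaluation", "store_id": "CA_1",
            "d_1": 12, "d_2": 15, "d_3": 0}
long_rows = [
    {**{k: wide_row[k] for k in id_vars}, "d": c, "sales": int(wide_row[c])}
    for c in value_vars
]
for row in long_rows:
    print(row)
```

With 30,490 series and 1,941 day columns, this reshaping yields roughly 59 million rows before the leading zeros are removed.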

In [31]:
display(calendar)
# display() does not render a table in JupyterLab (it only prints the schema below);
# on Databricks it shows a scrollable table.
# From the calendar, 2015-01-01 is d_1434.
# Dropping the data before 2015 would reduce the dataset, since my machine struggles with this many rows.
# Without that reduction there are 45+ million rows to train on.

DataFrame[d: string, date: date, event_name_1: string, event_type_1: string, SNAP_TX: int, SNAP_WI: int, SNAP_CA: int]

In [32]:
# pre-process data
# df = reduce_training_dates(data)
# df = melt_dataframe(df)

df = melt_dataframe(data)
df = combine_datasets(df, price, calendar)
df = remove_leading_zeros(df)
df = combine_snap_events(df)
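Two of the helpers used in this cell are easier to follow on a single series. The plain-Python sketch below mirrors what `remove_leading_zeros` and `combine_snap_events` do per time series and per row; the function names `drop_leading_zeros` and `pick_snap` are hypothetical and only serve this illustration.

```python
def drop_leading_zeros(sales):
    """Drop the zeros that come before the first non-zero sale."""
    for position, value in enumerate(sales):
        if value != 0:
            return sales[position:]
    return []  # a series with no sales at all disappears entirely


def pick_snap(state_id, snap_flags):
    """Keep only the SNAP flag of the state the store is in."""
    return snap_flags[f"SNAP_{state_id}"]


print(drop_leading_zeros([0, 0, 0, 2, 0, 1]))
print(pick_snap("TX", {"SNAP_TX": 1, "SNAP_WI": 0, "SNAP_CA": 1}))
```

Zeros after the first sale are kept, because they are genuine days without demand rather than days before the product was on sale.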

In [33]:
%%time
# number of rows after combining the datasets.
df.count()

# with all dates 46,796,220
# from 2015 14,919,266

CPU times: user 158 ms, sys: 47.8 ms, total: 206 ms
Wall time: 4min 47s


46796220

There are more than 46 million rows, which is a large dataset for a single machine.

Now I save the df as a Parquet file to create a checkpoint, so that when I stop the Docker container and rerun the notebook I do not need to repeat all the preprocessing. This is also useful when PySpark crashes.<br>
I use Parquet because it is a compressed, columnar format: the files are smaller than the equivalent CSV files and can be read and written much faster.

In [34]:
%%time
# write formatted dataset
df.write.parquet("df.parquet", mode="overwrite")

CPU times: user 198 ms, sys: 64.6 ms, total: 263 ms
Wall time: 6min 38s


In [4]:
# read it back
df = spark.read.parquet("df.parquet")

In [5]:
df.limit(10).toPandas()

Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,sales,date,event_name_1,event_type_1,sell_price,snap
0,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,1,2011-01-30,,,2.0,0
1,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,1,2011-01-31,,,2.0,0
2,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-01,,,2.0,1
3,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-02,,,2.0,0
4,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-03,,,2.0,1
5,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-04,,,2.0,0
6,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-05,,,2.0,1
7,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,2,2011-02-06,SuperBowl,Sporting,2.0,1
8,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,1,2011-02-07,,,2.0,1
9,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-08,,,2.0,0


In [37]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- dept_id: string (nullable = true)
 |-- cat_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- sales: short (nullable = true)
 |-- date: date (nullable = true)
 |-- event_name_1: string (nullable = true)
 |-- event_type_1: string (nullable = true)
 |-- sell_price: double (nullable = true)
 |-- snap: integer (nullable = true)



In [None]:
# df.columns

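Save the preprocessed dataframe as CSV for data visualization in Tableau. Note that Spark writes a directory named data.csv containing one part file per partition, not a single file.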

In [6]:
# overwrite if it already exists
%time df.write.csv("data.csv", header=True, mode="overwrite")

CPU times: user 162 ms, sys: 67.1 ms, total: 229 ms
Wall time: 3min 3s


### Feature Engineering

Prepare the features and the target. <br>
Convert the string values of each categorical column to numerical indices using StringIndexer. <br>
Then convert each index into a one-hot vector using OneHotEncoder, so that the ML algorithms do not treat the indices as ordered numbers.
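As a rough illustration of these two steps, here is a simplified plain-Python sketch. It is not Spark's implementation: Spark returns sparse vectors, and the exact vector length depends on the encoder's `dropLast` and `handleInvalid` settings. The function names are hypothetical, and the tie-breaking rule (alphabetical) is an assumption of this sketch.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent label first."""
    counts = Counter(v for v in values if v is not None)
    # Sort by descending frequency, then alphabetically to break ties.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def to_index(mapping, value):
    """Unknown or missing labels share one extra index (the 'keep' bucket)."""
    return mapping.get(value, len(mapping))


def one_hot(index, size):
    """Dense 0/1 vector with a single 1 at position `index`."""
    return [1.0 if i == index else 0.0 for i in range(size)]


cat_ids = ["FOODS", "FOODS", "FOODS", "HOUSEHOLD", "HOUSEHOLD", "HOBBIES"]
mapping = fit_string_index(cat_ids)
size = len(mapping) + 1  # one extra slot for unknown or missing labels

print(mapping)
print(one_hot(to_index(mapping, "FOODS"), size))
print(one_hot(to_index(mapping, None), size))
```

The extra slot is why missing values such as an empty "event_name_1" still receive a valid encoding instead of causing an error.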

In [38]:
%%time
categorical_cols = ["store_id", "item_id", "dept_id", "cat_id", "state_id", "event_name_1", "event_type_1", "snap"]

# one StringIndexer per categorical column: string label -> numeric index
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="keep")
    for c in categorical_cols
]

# one OneHotEncoder per index column: numeric index -> one-hot vector
encoders = [
    OneHotEncoder(inputCol=f"{c}_index", outputCol=f"{c}_encoded", handleInvalid="keep")
    for c in categorical_cols
]

# create a pipeline: all indexers first, then all encoders
pipeline = Pipeline(stages=indexers + encoders)

# fit and transform data using the pipeline
transformed_df = pipeline.fit(df).transform(df)

CPU times: user 1.87 s, sys: 792 ms, total: 2.66 s
Wall time: 1min 21s


In [39]:
# Show the transformed DataFrame
# transformed_df.show()
transformed_df.limit(10).toPandas()

Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,sales,date,event_name_1,event_type_1,...,event_type_1_index,snap_index,store_id_encoded,item_id_encoded,dept_id_encoded,cat_id_encoded,state_id_encoded,event_name_1_encoded,event_type_1_encoded,snap_encoded
0,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,1,2011-01-30,,,...,4.0,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0)","(1.0, 0.0, 0.0)"
1,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,1,2011-01-31,,,...,4.0,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0)","(1.0, 0.0, 0.0)"
2,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-01,,,...,4.0,1.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0)","(0.0, 1.0, 0.0)"
3,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-02,,,...,4.0,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0)","(1.0, 0.0, 0.0)"
4,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-03,,,...,4.0,1.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0)","(0.0, 1.0, 0.0)"
5,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-04,,,...,4.0,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0)","(1.0, 0.0, 0.0)"
6,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-05,,,...,4.0,1.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0)","(0.0, 1.0, 0.0)"
7,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,2,2011-02-06,SuperBowl,Sporting,...,3.0,1.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, ...","(0.0, 0.0, 0.0, 1.0, 0.0)","(0.0, 1.0, 0.0)"
8,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,1,2011-02-07,,,...,4.0,1.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0)","(0.0, 1.0, 0.0)"
9,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-08,,,...,4.0,0.0,"(0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0)","(1.0, 0.0, 0.0, 0.0)","(0.0, 1.0, 0.0, 0.0)","(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...","(0.0, 0.0, 0.0, 0.0, 1.0)","(1.0, 0.0, 0.0)"


In [40]:
transformed_df.columns

['id',
 'item_id',
 'dept_id',
 'cat_id',
 'store_id',
 'state_id',
 'sales',
 'date',
 'event_name_1',
 'event_type_1',
 'sell_price',
 'snap',
 'store_id_index',
 'item_id_index',
 'dept_id_index',
 'cat_id_index',
 'state_id_index',
 'event_name_1_index',
 'event_type_1_index',
 'snap_index',
 'store_id_encoded',
 'item_id_encoded',
 'dept_id_encoded',
 'cat_id_encoded',
 'state_id_encoded',
 'event_name_1_encoded',
 'event_type_1_encoded',
 'snap_encoded']

In [41]:
transformed_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- item_id: string (nullable = true)
 |-- dept_id: string (nullable = true)
 |-- cat_id: string (nullable = true)
 |-- store_id: string (nullable = true)
 |-- state_id: string (nullable = true)
 |-- sales: short (nullable = true)
 |-- date: date (nullable = true)
 |-- event_name_1: string (nullable = true)
 |-- event_type_1: string (nullable = true)
 |-- sell_price: double (nullable = true)
 |-- snap: integer (nullable = true)
 |-- store_id_index: double (nullable = false)
 |-- item_id_index: double (nullable = false)
 |-- dept_id_index: double (nullable = false)
 |-- cat_id_index: double (nullable = false)
 |-- state_id_index: double (nullable = false)
 |-- event_name_1_index: double (nullable = false)
 |-- event_type_1_index: double (nullable = false)
 |-- snap_index: double (nullable = false)
 |-- store_id_encoded: vector (nullable = true)
 |-- item_id_encoded: vector (nullable = true)
 |-- dept_id_encoded: vector (nullable = true)
 |-- cat_i

Using the date feature, we can create additional time features that help the models learn trend and seasonality. <br>
The function below is adapted from the source code of the ForecastFlowML library.<br>
I do not use the library itself because this notebook focuses on using PySpark exclusively.

In [43]:
# https://github.com/canerturkseven/ForecastFlowML/blob/master/src/forecastflowml/preprocessing.py

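def add_date_features(df, date_col, features):
    """
    Add calendar features derived from date_col.
    day_of_week is numbered Monday=1 ... Sunday=7; weekend is 1 on Saturday and Sunday.
    """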
    supported_features = [
        "day_of_week",
        "day_of_year",
        "day_of_month",
        "week_of_year",
        "week_of_month",
        "weekend",
        "month",
        "quarter",
        "year",
    ]

    not_supported = set(features) - set(supported_features)
    if len(not_supported) > 0:
        raise ValueError(f"{', '.join(not_supported)} feature(s) not supported.")

    for feature in features:
        if feature == "day_of_week":
            df = df.withColumn(
                feature, F.dayofweek(F.col(date_col)).cast("tinyint") - 1
            )
            df = df.withColumn(
                feature, F.when(F.col(feature) == 0, 7).otherwise(F.col(feature))
            )
        if feature == "day_of_year":
            df = df.withColumn(feature, F.dayofyear(F.col(date_col)).cast("smallint"))
        if feature == "day_of_month":
            df = df.withColumn(feature, F.dayofmonth(F.col(date_col)).cast("tinyint"))
        if feature == "week_of_year":
            df = df.withColumn(feature, F.weekofyear(F.col(date_col)).cast("tinyint"))
        if feature == "week_of_month":
            df = df.withColumn(
                feature, F.ceil(F.dayofmonth(F.col(date_col)) / 7).cast("tinyint")
            )
        if feature == "weekend":
            df = df.withColumn(
                feature,
                F.when(F.dayofweek(F.col(date_col)).isin([1, 7]), 1)
                .otherwise(0)
                .cast("tinyint"),
            )
        if feature == "month":
            df = df.withColumn("month", F.month(F.col(date_col)).cast("tinyint"))
        if feature == "quarter":
            df = df.withColumn("quarter", F.quarter(F.col(date_col)).cast("tinyint"))
        if feature == "year":
            df = df.withColumn("year", F.year(F.col(date_col)).cast("smallint"))
    return df
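To check the definitions used in `add_date_features`, the sketch below computes the same nine features for a single date with the Python standard library only. It is an independent illustration (the function name `date_features` is hypothetical) and assumes that Spark's week of year follows ISO 8601, which is how `weekofyear` is documented.

```python
import math
from datetime import date


def date_features(day):
    """Plain-Python counterparts of the columns built by add_date_features."""
    iso_weekday = day.isoweekday()  # Monday=1 ... Sunday=7
    return {
        "day_of_week": iso_weekday,
        "day_of_year": day.timetuple().tm_yday,
        "day_of_month": day.day,
        "week_of_year": day.isocalendar()[1],  # ISO 8601 week number
        "week_of_month": math.ceil(day.day / 7),
        "weekend": 1 if iso_weekday in (6, 7) else 0,
        "month": day.month,
        "quarter": (day.month - 1) // 3 + 1,
        "year": day.year,
    }


print(date_features(date(2011, 1, 30)))
print(date_features(date(2011, 2, 8)))
```

Note that "week_of_month" is simply the day of the month divided by 7 and rounded up, so it does not align with calendar weeks.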

In [44]:
transformed_df = add_date_features(transformed_df, "date", [
        "day_of_week",
        "day_of_year",
        "day_of_month",
        "week_of_year",
        "week_of_month",
        "weekend",
        "month",
        "quarter",
        "year",
    ])

In [45]:
# with time features
transformed_df.limit(10).toPandas()

Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,sales,date,event_name_1,event_type_1,...,snap_encoded,day_of_week,day_of_year,day_of_month,week_of_year,week_of_month,weekend,month,quarter,year
0,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,1,2011-01-30,,,...,"(1.0, 0.0, 0.0)",7,30,30,4,5,1,1,1,2011
1,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,1,2011-01-31,,,...,"(1.0, 0.0, 0.0)",1,31,31,5,5,0,1,1,2011
2,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-01,,,...,"(0.0, 1.0, 0.0)",2,32,1,5,1,0,2,1,2011
3,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-02,,,...,"(1.0, 0.0, 0.0)",3,33,2,5,1,0,2,1,2011
4,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-03,,,...,"(0.0, 1.0, 0.0)",4,34,3,5,1,0,2,1,2011
5,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-04,,,...,"(1.0, 0.0, 0.0)",5,35,4,5,1,0,2,1,2011
6,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-05,,,...,"(0.0, 1.0, 0.0)",6,36,5,5,1,1,2,1,2011
7,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,2,2011-02-06,SuperBowl,Sporting,...,"(0.0, 1.0, 0.0)",7,37,6,5,1,1,2,1,2011
8,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,1,2011-02-07,,,...,"(0.0, 1.0, 0.0)",1,38,7,6,1,0,2,1,2011
9,FOODS_1_001_TX_1_evaluation,FOODS_1_001,FOODS_1,FOODS,TX_1,TX,0,2011-02-08,,,...,"(1.0, 0.0, 0.0)",2,39,8,6,2,0,2,1,2011


In [46]:
transformed_df.columns

['id',
 'item_id',
 'dept_id',
 'cat_id',
 'store_id',
 'state_id',
 'sales',
 'date',
 'event_name_1',
 'event_type_1',
 'sell_price',
 'snap',
 'store_id_index',
 'item_id_index',
 'dept_id_index',
 'cat_id_index',
 'state_id_index',
 'event_name_1_index',
 'event_type_1_index',
 'snap_index',
 'store_id_encoded',
 'item_id_encoded',
 'dept_id_encoded',
 'cat_id_encoded',
 'state_id_encoded',
 'event_name_1_encoded',
 'event_type_1_encoded',
 'snap_encoded',
 'day_of_week',
 'day_of_year',
 'day_of_month',
 'week_of_year',
 'week_of_month',
 'weekend',
 'month',
 'quarter',
 'year']

In [None]:
# if you want to add lag features with rolling mean, use the below function
# https://github.com/canerturkseven/ForecastFlowML/blob/master/src/forecastflowml/preprocessing.py

# import pyspark.sql.functions as F
# from pyspark.sql.window import Window

# def add_lag_window_summarizer(df, id_col, target_col, date_col, features):
#     w1 = Window.partitionBy(id_col).orderBy(date_col)
#     for key, values in features.items():
#         if key == "lag":
#             for lag in values:
#                 df = df.withColumn(f"lag_{lag}", F.lag(target_col, lag).over(w1))
#         else:
#             for window, lag in values:
#                 w2 = w1.rowsBetween(-(lag + window - 1), -lag)
#                 df = df.withColumn(
#                     f"window_{window}_lag_{lag}_{key}",
#                     F.expr(f"{key}({target_col})").over(w2),
#                 )
#     return df

# transformed_df = add_lag_window_summarizer(transformed_df, "id", "sales", "date", {
#                     "lag": [7 * (i + 1) for i in range(4)], 
#                     "mean": [
#                         [window, lag] for lag in [7, 14] for window in [7]
#                     ],
#                 })
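The frame `rowsBetween(-(lag + window - 1), -lag)` in the commented-out helper above can be hard to picture. The following is a plain-Python sketch (not Spark code) of which rows such a frame covers for a single series that is already sorted by date. The function name `lagged_window_mean` and the toy `sales` values are assumptions made for illustration only.

```python
# Plain-Python illustration of the frame rowsBetween(-(lag + window - 1), -lag)
# for one series that is already sorted by date.

def lagged_window_mean(values, window, lag):
    """For each position i, average the rows i-(lag+window-1) .. i-lag (inclusive)."""
    result = []
    for i in range(len(values)):
        start = max(0, i - (lag + window - 1))  # frame start, clipped to the series
        end = i - lag                           # frame end (inclusive)
        frame = values[start:end + 1] if end >= 0 else []
        # An empty frame has nothing to average; Spark would return null here.
        result.append(sum(frame) / len(frame) if frame else None)
    return result


sales = [2, 1, 1, 5, 1, 3, 1, 0, 1, 2]  # toy values, not taken from the dataset
means = lagged_window_mean(sales, window=3, lag=2)
print(means)
```

Because the frame ends `lag` rows before the current row, the feature never uses the current day's sales, which is what makes it usable as a model input.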

Lag features are also created to capture temporal dependencies and patterns in the time series. Lags of 7, 14, 21 and 28 days are used.

In [47]:
def add_lag_features(input_df, lag_periods):
    """
    Create lag features grouped by ids.
    """
    window_spec = Window.partitionBy("id").orderBy("date")
    
    lag_columns = []
    
    for lag_period in lag_periods:
        lag_col_name = f"lag_{lag_period}"
        lag_columns.append(lag_col_name)
        
        lag_expr = F.lag("sales", lag_period).over(window_spec)
        input_df = input_df.withColumn(lag_col_name, lag_expr)
    
    return input_df, lag_columns

# add lag periods
lag_periods = [7, 14, 21, 28]
transformed_df, lag_column_names = add_lag_features(transformed_df, lag_periods)

In [48]:
lag_column_names

['lag_7', 'lag_14', 'lag_21', 'lag_28']
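To make the behaviour of `F.lag(...).over(Window.partitionBy("id").orderBy("date"))` concrete, here is a minimal plain-Python sketch of the same idea on a few toy rows. The helper `add_lags` and the sample rows are hypothetical; the real computation in this notebook is done by Spark.

```python
from collections import defaultdict


def add_lags(rows, lag_periods):
    """Add lag_N keys to each row, computed separately for every id."""
    by_id = defaultdict(list)
    # Equivalent of partitionBy("id").orderBy("date")
    for row in sorted(rows, key=lambda r: (r["id"], r["date"])):
        by_id[row["id"]].append(row)

    out = []
    for series in by_id.values():
        for i, row in enumerate(series):
            new_row = dict(row)
            for lag in lag_periods:
                # The lag looks back `lag` ROWS within the same id, not `lag` calendar days
                new_row[f"lag_{lag}"] = series[i - lag]["sales"] if i >= lag else None
            out.append(new_row)
    return out


rows = [  # toy data, not from the dataset
    {"id": "A", "date": "2011-01-29", "sales": 2},
    {"id": "A", "date": "2011-01-30", "sales": 1},
    {"id": "A", "date": "2011-01-31", "sales": 5},
    {"id": "B", "date": "2011-01-29", "sales": 0},
    {"id": "B", "date": "2011-01-30", "sales": 3},
]
lagged = add_lags(rows, [1, 2])
for r in lagged:
    print(r)
```

Note that the offset counts rows, so a lag of 7 equals "7 days ago" only when each series has exactly one row per day with no gaps.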

In [49]:
# with lag features
%time transformed_df.limit(10).toPandas()

CPU times: user 336 ms, sys: 53.8 ms, total: 390 ms
Wall time: 3min 37s


Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,sales,date,event_name_1,event_type_1,...,week_of_year,week_of_month,weekend,month,quarter,year,lag_7,lag_14,lag_21,lag_28
0,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,2,2012-03-23,,,...,12,4,0,3,1,2012,,,,
1,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,1,2012-03-24,,,...,12,4,1,3,1,2012,,,,
2,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,1,2012-03-25,,,...,12,4,1,3,1,2012,,,,
3,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,5,2012-03-26,,,...,13,4,0,3,1,2012,,,,
4,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,1,2012-03-27,,,...,13,4,0,3,1,2012,,,,
5,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,3,2012-03-28,,,...,13,4,0,3,1,2012,,,,
6,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,1,2012-03-29,,,...,13,5,0,3,1,2012,,,,
7,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,0,2012-03-30,,,...,13,5,0,3,1,2012,2.0,,,
8,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,1,2012-03-31,,,...,13,5,1,3,1,2012,1.0,,,
9,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,2,2012-04-01,,,...,13,1,1,4,2,2012,1.0,,,


In [50]:
transformed_df.columns

['id',
 'item_id',
 'dept_id',
 'cat_id',
 'store_id',
 'state_id',
 'sales',
 'date',
 'event_name_1',
 'event_type_1',
 'sell_price',
 'snap',
 'store_id_index',
 'item_id_index',
 'dept_id_index',
 'cat_id_index',
 'state_id_index',
 'event_name_1_index',
 'event_type_1_index',
 'snap_index',
 'store_id_encoded',
 'item_id_encoded',
 'dept_id_encoded',
 'cat_id_encoded',
 'state_id_encoded',
 'event_name_1_encoded',
 'event_type_1_encoded',
 'snap_encoded',
 'day_of_week',
 'day_of_year',
 'day_of_month',
 'week_of_year',
 'week_of_month',
 'weekend',
 'month',
 'quarter',
 'year',
 'lag_7',
 'lag_14',
 'lag_21',
 'lag_28']

Lag features are added.

In [51]:
%time transformed_df.limit(10).toPandas()

CPU times: user 154 ms, sys: 18.7 ms, total: 173 ms
Wall time: 2min 47s


Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,sales,date,event_name_1,event_type_1,...,week_of_year,week_of_month,weekend,month,quarter,year,lag_7,lag_14,lag_21,lag_28
0,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,2,2012-03-23,,,...,12,4,0,3,1,2012,,,,
1,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,1,2012-03-24,,,...,12,4,1,3,1,2012,,,,
2,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,1,2012-03-25,,,...,12,4,1,3,1,2012,,,,
3,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,5,2012-03-26,,,...,13,4,0,3,1,2012,,,,
4,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,1,2012-03-27,,,...,13,4,0,3,1,2012,,,,
5,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,3,2012-03-28,,,...,13,4,0,3,1,2012,,,,
6,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,1,2012-03-29,,,...,13,5,0,3,1,2012,,,,
7,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,0,2012-03-30,,,...,13,5,0,3,1,2012,2.0,,,
8,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,1,2012-03-31,,,...,13,5,1,3,1,2012,1.0,,,
9,FOODS_1_017_WI_3_evaluation,FOODS_1_017,FOODS_1,FOODS,WI_3,WI,2,2012-04-01,,,...,13,1,1,4,2,2012,1.0,,,


Again, I save the DataFrame as a Parquet file so that, if Spark crashes or I stop the Docker container and rerun the notebook, I do not need to repeat all the preprocessing.

In [53]:
# write into file
transformed_df.write.parquet("transformed_df.parquet", mode="overwrite")

In [5]:
# read it back
transformed_df = spark.read.parquet("transformed_df.parquet")

In [5]:
transformed_df.limit(10).toPandas()

Unnamed: 0,id,item_id,dept_id,cat_id,store_id,state_id,sales,date,event_name_1,event_type_1,...,week_of_year,week_of_month,weekend,month,quarter,year,lag_7,lag_14,lag_21,lag_28
0,FOODS_1_009_CA_2_evaluation,FOODS_1_009,FOODS_1,FOODS,CA_2,CA,1,2012-03-08,Purim End,Religious,...,10,2,0,3,1,2012,,,,
1,FOODS_1_009_CA_2_evaluation,FOODS_1_009,FOODS_1,FOODS,CA_2,CA,0,2012-03-09,,,...,10,2,0,3,1,2012,,,,
2,FOODS_1_009_CA_2_evaluation,FOODS_1_009,FOODS_1,FOODS,CA_2,CA,0,2012-03-10,,,...,10,2,1,3,1,2012,,,,
3,FOODS_1_009_CA_2_evaluation,FOODS_1_009,FOODS_1,FOODS,CA_2,CA,0,2012-03-11,,,...,10,2,1,3,1,2012,,,,
4,FOODS_1_009_CA_2_evaluation,FOODS_1_009,FOODS_1,FOODS,CA_2,CA,0,2012-03-12,,,...,11,2,0,3,1,2012,,,,
5,FOODS_1_009_CA_2_evaluation,FOODS_1_009,FOODS_1,FOODS,CA_2,CA,1,2012-03-13,,,...,11,2,0,3,1,2012,,,,
6,FOODS_1_009_CA_2_evaluation,FOODS_1_009,FOODS_1,FOODS,CA_2,CA,0,2012-03-14,,,...,11,2,0,3,1,2012,,,,
7,FOODS_1_009_CA_2_evaluation,FOODS_1_009,FOODS_1,FOODS,CA_2,CA,7,2012-03-15,,,...,11,3,0,3,1,2012,1.0,,,
8,FOODS_1_009_CA_2_evaluation,FOODS_1_009,FOODS_1,FOODS,CA_2,CA,0,2012-03-16,,,...,11,3,0,3,1,2012,0.0,,,
9,FOODS_1_009_CA_2_evaluation,FOODS_1_009,FOODS_1,FOODS,CA_2,CA,0,2012-03-17,StPatricksDay,Cultural,...,11,3,1,3,1,2012,0.0,,,


In [6]:
# check starting and ending dates from the combined data
transformed_df.createOrReplaceTempView("temptable")
spark.sql("SELECT MIN(date) FROM temptable").collect()[0][0], \
spark.sql("SELECT MAX(date) FROM temptable").collect()[0][0]

# (datetime.date(2011, 1, 29), datetime.date(2016, 5, 22))

(datetime.date(2011, 1, 29), datetime.date(2016, 5, 22))

In [6]:
# drop rows with null lag values (lag_28 is the longest lag)
%time transformed_df = transformed_df.dropna(subset="lag_28")

CPU times: user 2.19 ms, sys: 3.05 ms, total: 5.24 ms
Wall time: 154 ms


Creating lag features leaves null values in the earliest rows of each series, because there is no earlier data to take the lagged value from. <br>
For example, with a lag of 7 days, the first 7 rows of each series are null.<br>
We need to drop these rows because our machine learning models cannot handle null values. Dropping on lag_28, the longest lag, also removes the nulls of the shorter lags.
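As a small sanity check of the reasoning above, the sketch below builds a hypothetical 40-day series in plain Python, marks each lag as null until enough earlier rows exist, and then keeps only rows where `lag_28` is present. The series length and the placeholder lag values are assumptions; only the start date 2011-01-29 comes from the data.

```python
from datetime import date, timedelta

lag_periods = [7, 14, 21, 28]
start = date(2011, 1, 29)  # first date in the combined data
n_days = 40                # hypothetical short series with one row per day

rows = []
for i in range(n_days):
    row = {"date": start + timedelta(days=i)}
    for lag in lag_periods:
        # A lag is null until at least `lag` earlier rows exist;
        # float(i - lag) is only a placeholder for the lagged sales value.
        row[f"lag_{lag}"] = None if i < lag else float(i - lag)
    rows.append(row)

# Equivalent of dropna(subset="lag_28")
kept = [r for r in rows if r["lag_28"] is not None]

print(len(rows), len(kept), kept[0]["date"])
```

For a series that starts on 2011-01-29, the first surviving row falls 28 days later, so the minimum date of the data moves forward after this step.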

Create input feature for the regressors using VectorAssembler.

In [7]:
# assemble features using VectorAssembler
feature_cols = ["store_id_encoded", "item_id_encoded", "dept_id_encoded", 
                "cat_id_encoded", "state_id_encoded", "event_name_1_encoded", 
                "event_type_1_encoded", "snap_encoded", "sell_price", 
                "day_of_week", "day_of_year", "day_of_month", "week_of_year",
                "week_of_month", "weekend", "month", "quarter", "year",
                "lag_7", "lag_14", "lag_21", "lag_28"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features", handleInvalid="keep")

In [8]:
%time transformed_df = assembler.transform(transformed_df)

CPU times: user 14.3 ms, sys: 8.82 ms, total: 23.1 ms
Wall time: 619 ms


In [11]:
type(transformed_df)

pyspark.sql.dataframe.DataFrame

In [12]:
transformed_df.columns

['id',
 'item_id',
 'dept_id',
 'cat_id',
 'store_id',
 'state_id',
 'sales',
 'date',
 'event_name_1',
 'event_type_1',
 'sell_price',
 'snap',
 'store_id_index',
 'item_id_index',
 'dept_id_index',
 'cat_id_index',
 'state_id_index',
 'event_name_1_index',
 'event_type_1_index',
 'snap_index',
 'store_id_encoded',
 'item_id_encoded',
 'dept_id_encoded',
 'cat_id_encoded',
 'state_id_encoded',
 'event_name_1_encoded',
 'event_type_1_encoded',
 'snap_encoded',
 'day_of_week',
 'day_of_year',
 'day_of_month',
 'week_of_year',
 'week_of_month',
 'weekend',
 'month',
 'quarter',
 'year',
 'lag_7',
 'lag_14',
 'lag_21',
 'lag_28',
 'features']

An additional column called "features" is created by VectorAssembler; it is the input feature column for the different regressors.
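Conceptually, the assembler concatenates the listed columns, scalar and vector-valued alike, into one flat vector per row. The following plain-Python sketch mimics that idea; the helper `assemble` and the values for `sell_price` and `lag_7` are hypothetical, and the real Spark output is a vector type (often sparse) rather than a Python list.

```python
def assemble(row, input_cols):
    """Concatenate scalar and vector-valued columns into one flat feature list."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(v) for v in value)  # e.g. one-hot encoded columns
        else:
            features.append(float(value))             # plain numeric columns
    return features


# snap_encoded and day_of_week mirror the first displayed row above;
# sell_price and lag_7 are made-up values for illustration.
row = {"snap_encoded": (1.0, 0.0, 0.0), "sell_price": 2.0, "day_of_week": 7, "lag_7": 1.0}
features = assemble(row, ["snap_encoded", "sell_price", "day_of_week", "lag_7"])
print(features)
```

The order of `inputCols` fixes the position of each feature in the vector, which matters later if you want to map feature importances back to column names.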

### Divide the Data into Train and Test

Before we divide the data into train and test sets, let us select only the data for Texas store 1 (TX_1).<br>
I am doing this because training on all the data (46+ million rows) takes a lot of time. <br>
If you want to use all the data, skip the following cell and jump to the next one.

In [9]:
# skip this cell if you want to use all the data
transformed_df = transformed_df.filter(F.col("store_id") == "TX_1")

In [None]:
# check starting and ending dates on calendar
calendar.createOrReplaceTempView("temptable")
spark.sql("SELECT MIN(date) FROM temptable").collect()[0][0], \
spark.sql("SELECT MAX(date) FROM temptable").collect()[0][0]

# (datetime.date(2011, 1, 29), datetime.date(2016, 6, 19))

In [16]:
# check starting and ending dates from the combined data
transformed_df.createOrReplaceTempView("temptable")
spark.sql("SELECT MIN(date) FROM temptable").collect()[0][0], \
spark.sql("SELECT MAX(date) FROM temptable").collect()[0][0]

# (datetime.date(2011, 2, 26), datetime.date(2016, 5, 22))

(datetime.date(2011, 2, 26), datetime.date(2016, 5, 22))

The last date with data is 2016-05-22.

Now, divide the data into train and test. <br>
I have used the last 28 days as the test data.

In [10]:
train_data = transformed_df.filter(F.col("date") <= "2016-04-24")
test_data = transformed_df.filter(F.col("date") > "2016-04-24")

In [18]:
train_data.count(), test_data.count()

(4619453, 85372)
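The split above can be cross-checked with a little date arithmetic. This sketch uses only the cutoff date, the last date, and the test row count reported above; the variable names are my own.

```python
from datetime import date

cutoff = date(2016, 4, 24)     # last training date used in the filter
last_date = date(2016, 5, 22)  # last date available in the data
test_rows = 85372              # test_data.count() reported above

# Number of days strictly after the cutoff, up to and including the last date
test_days = (last_date - cutoff).days

# If every series has one row per test day, the row count divides evenly
series_count, remainder = divmod(test_rows, test_days)

print(test_days, series_count, remainder)
```

A remainder of zero is consistent with every TX_1 series having exactly one row on each test day, although it does not prove it on its own.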

# Create Models

Although this is a time series problem, the date and lag features let us treat it as a supervised machine learning problem, specifically a regression on daily sales.

In [11]:
# Before creating the models, create one RMSE evaluator that is shared by all of them
evaluator = RegressionEvaluator(labelCol="sales", predictionCol="prediction", metricName="rmse")
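For reference, RMSE is the square root of the mean squared difference between actual and predicted values. Here is a minimal plain-Python version of the metric on made-up numbers; the helper `rmse` is illustrative and is not how `RegressionEvaluator` is implemented internally.

```python
import math


def rmse(actual, predicted):
    """Root mean squared error of two equally long sequences."""
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


actual = [3, 0, 2]           # hypothetical daily sales
predicted = [2.0, 0.0, 4.0]  # hypothetical model output
score = rmse(actual, predicted)  # sqrt((1 + 0 + 4) / 3)
print(score)
```

Because the errors are squared before averaging, a few large misses (for example on days with unusually high sales) weigh heavily on the score.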

### 1. Linear Regression

In [20]:
%%time
lr = LinearRegression(featuresCol="features", labelCol="sales")

# fit the model on the training data
lr_model = lr.fit(train_data)

CPU times: user 80 ms, sys: 34.9 ms, total: 115 ms
Wall time: 3min 3s


In [21]:
%%time
# make predictions on the test data
lr_predictions = lr_model.transform(test_data)

CPU times: user 31.9 ms, sys: 14.5 ms, total: 46.4 ms
Wall time: 278 ms


In [22]:
%%time
# evaluate the model
lr_rmse = evaluator.evaluate(lr_predictions)
print(f"Root Mean Squared Error (RMSE): {lr_rmse}")

Root Mean Squared Error (RMSE): 1.8214197716503682
CPU times: user 25.3 ms, sys: 21.5 ms, total: 46.9 ms
Wall time: 1min 16s


In [23]:
%%time
# Show the predictions
lr_predictions.limit(25).select("id", "sales", "prediction").toPandas()

CPU times: user 48.9 ms, sys: 2.51 ms, total: 51.5 ms
Wall time: 356 ms


Unnamed: 0,id,sales,prediction
0,FOODS_1_012_TX_1_evaluation,10,4.181226
1,FOODS_1_012_TX_1_evaluation,2,7.70116
2,FOODS_1_012_TX_1_evaluation,11,9.798426
3,FOODS_1_012_TX_1_evaluation,9,6.322064
4,FOODS_1_012_TX_1_evaluation,6,6.829528
5,FOODS_1_012_TX_1_evaluation,8,4.091129
6,FOODS_1_012_TX_1_evaluation,12,6.898606
7,FOODS_1_012_TX_1_evaluation,4,6.028119
8,FOODS_1_012_TX_1_evaluation,27,6.268582
9,FOODS_1_012_TX_1_evaluation,23,10.49926


### 2. Decision Tree Regressor

In [23]:
%%time
dt = DecisionTreeRegressor(featuresCol="features", labelCol="sales", maxDepth=2, seed=42)

# fit the model on the training data
dt_model = dt.fit(train_data)

CPU times: user 512 ms, sys: 146 ms, total: 659 ms
Wall time: 16min 57s


In [24]:
%%time
# make predictions on the test data
dt_predictions = dt_model.transform(test_data)

CPU times: user 21.9 ms, sys: 686 µs, total: 22.6 ms
Wall time: 436 ms


In [25]:
%%time
# evaluate the model
dt_rmse = evaluator.evaluate(dt_predictions)
print(f"Root Mean Squared Error (RMSE): {dt_rmse}")

Root Mean Squared Error (RMSE): 2.163706003513224
CPU times: user 421 ms, sys: 188 ms, total: 609 ms
Wall time: 3min 27s


In [26]:
%%time
# show actual sales and predictions
dt_predictions.limit(25).select("id", "sales", "prediction").toPandas()

CPU times: user 526 ms, sys: 61.7 ms, total: 588 ms
Wall time: 1min 49s


Unnamed: 0,id,sales,prediction
0,FOODS_1_012_TX_1_evaluation,10,4.188794
1,FOODS_1_012_TX_1_evaluation,2,4.188794
2,FOODS_1_012_TX_1_evaluation,11,11.823307
3,FOODS_1_012_TX_1_evaluation,9,4.188794
4,FOODS_1_012_TX_1_evaluation,6,4.188794
5,FOODS_1_012_TX_1_evaluation,8,4.188794
6,FOODS_1_012_TX_1_evaluation,12,4.188794
7,FOODS_1_012_TX_1_evaluation,4,0.675329
8,FOODS_1_012_TX_1_evaluation,27,4.188794
9,FOODS_1_012_TX_1_evaluation,23,11.823307


### 3. Random Forest Regressor

In [12]:
%%time
rf = RandomForestRegressor(featuresCol="features", labelCol="sales", numTrees=10, seed=42)
# numTrees=10 for short training time

# fit the model on the training data
rf_model = rf.fit(train_data)

CPU times: user 874 ms, sys: 382 ms, total: 1.26 s
Wall time: 20min 38s


In [13]:
%%time
# make predictions on the test data
rf_predictions = rf_model.transform(test_data)

CPU times: user 76.6 ms, sys: 20.1 ms, total: 96.7 ms
Wall time: 682 ms


In [14]:
%%time
# evaluate the model
rf_rmse = evaluator.evaluate(rf_predictions)
print(f"Root Mean Squared Error (RMSE): {rf_rmse}")

Root Mean Squared Error (RMSE): 1.865908138713067
CPU times: user 148 ms, sys: 107 ms, total: 255 ms
Wall time: 1min 48s


In [15]:
%%time
# show actual sales and predictions
rf_predictions.limit(25).select("id", "sales", "prediction").toPandas()

CPU times: user 3.61 s, sys: 1.45 s, total: 5.06 s
Wall time: 7.43 s


Unnamed: 0,id,sales,prediction
0,FOODS_1_012_TX_1_evaluation,10,3.540976
1,FOODS_1_012_TX_1_evaluation,2,7.634429
2,FOODS_1_012_TX_1_evaluation,11,10.845778
3,FOODS_1_012_TX_1_evaluation,9,5.11423
4,FOODS_1_012_TX_1_evaluation,6,6.530422
5,FOODS_1_012_TX_1_evaluation,8,4.045484
6,FOODS_1_012_TX_1_evaluation,12,6.555109
7,FOODS_1_012_TX_1_evaluation,4,5.617624
8,FOODS_1_012_TX_1_evaluation,27,4.965584
9,FOODS_1_012_TX_1_evaluation,23,10.906768


### 4. LightGBM

In [None]:
# LightGBM installation
# Go to Compute. Open the cluster you are using. Go to the Libraries tab. Click on "Install new".
# Select Maven as the Library Source.
# In Coordinates: com.microsoft.azure:synapseml_2.12:0.11.2-spark3.3
# In Repository: https://mmlspark.azureedge.net/maven

# Ensure this library is attached to your target cluster(s).
# Finally, ensure that your Spark cluster has at least Spark 3.2 and Scala 2.12.

In [None]:
from synapse.ml.lightgbm import LightGBMRegressor
from synapse.ml.train import ComputeModelStatistics

In [None]:
%%time
lgbm = LightGBMRegressor(featuresCol="features", labelCol="sales", objective="tweedie", learningRate=0.3, numLeaves=31)

# fit the model on the training data
lgbm_model = lgbm.fit(train_data)

In [None]:
# print(lgbm_model.getFeatureImportances()) 

In [None]:
%%time
# make predictions on the test data
lgbm_predictions = lgbm_model.transform(test_data)

In [None]:
%%time
# evaluate the model
lgbm_rmse = evaluator.evaluate(lgbm_predictions)
print(f"Root Mean Squared Error (RMSE): {lgbm_rmse}")

In [None]:
metrics = ComputeModelStatistics(
    evaluationMetric="regression", labelCol="sales", scoresCol="prediction"
).transform(lgbm_predictions)

display(metrics)

In [None]:
%%time
# show actual sales and predictions
lgbm_predictions.limit(25).select("id", "sales", "prediction").toPandas()

In [None]:
# xgb_predictions is not defined anywhere in this notebook (no XGBoost model is trained),
# so this cell is left commented out.
# xgb_predictions.limit(25).select("id", "sales", "prediction").toPandas()

### 5. Gradient Boosted Trees Regressor

Run the GBTRegressor only if you have a powerful machine!

In [None]:
%%time
gbt = GBTRegressor(featuresCol="features", labelCol="sales", maxDepth=2, maxIter=5, cacheNodeIds=True, seed=42)

# fit the model on the training data
gbt_model = gbt.fit(train_data)

In [None]:
%%time
# make predictions on the test data
gbt_predictions = gbt_model.transform(test_data)

In [None]:
%%time
# evaluate the model
gbt_rmse = evaluator.evaluate(gbt_predictions)
print(f"Root Mean Squared Error (RMSE): {gbt_rmse}")

In [None]:
%%time
# Show the predictions
gbt_predictions.limit(25).select("id", "sales", "prediction").toPandas()

# Comparing Models

In [9]:
# I copied and pasted these values because I trained and evaluated the models at different times.
# The values of the variables are lost when you restart Spark.
lr_rmse = 1.8214197716503682
dt_rmse = 2.163706003513224
rf_rmse = 1.865908138713067
lgbm_rmse = 1.7913037054709238

In [10]:
# compare the models using RMSE score
model_names = ["LinearRegressor", "DecisionTreeRegressor", "RandomForestRegressor", "LightGBMRegressor"]
rmse_scores = [lr_rmse, dt_rmse, rf_rmse, lgbm_rmse]

# create a PrettyTable
pt = PrettyTable()
pt.field_names = ["Model", "RMSE"]

for model, rmse in zip(model_names, rmse_scores):
    pt.add_row([model, rmse])

print(pt)

+-----------------------+--------------------+
|         Model         |        RMSE        |
+-----------------------+--------------------+
|    LinearRegressor    | 1.8214197716503682 |
| DecisionTreeRegressor | 2.163706003513224  |
| RandomForestRegressor | 1.865908138713067  |
|   LightGBMRegressor   | 1.7913037054709238 |
+-----------------------+--------------------+


Comparing the models by RMSE, we can select the LightGBMRegressor as the best model. It has the lowest RMSE (about 1.79) and trains in a short time.
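To make the comparison easier to read, the scores can also be sorted and expressed relative to the best model. This is a small optional sketch in plain Python using the RMSE values reported above; the dictionary and variable names are my own.

```python
# RMSE values copied from the results above
rmse_by_model = {
    "LinearRegressor": 1.8214197716503682,
    "DecisionTreeRegressor": 2.163706003513224,
    "RandomForestRegressor": 1.865908138713067,
    "LightGBMRegressor": 1.7913037054709238,
}

# Sort from lowest (best) to highest RMSE
ranked = sorted(rmse_by_model.items(), key=lambda kv: kv[1])
best_name, best_rmse = ranked[0]

for name, score in ranked:
    gap_pct = 100 * (score - best_rmse) / best_rmse  # how much worse than the best, in percent
    print(f"{name:<24}{score:.4f}  (+{gap_pct:.1f}% vs best)")
```

Keep in mind that the models were trained with different, mostly small hyperparameter settings (for example `maxDepth=2` for the decision tree), so the ranking reflects these particular configurations rather than the algorithms in general.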

<i>Thank you for reading my notebook.</i>


<i>Avishek K C </i>

<i>2023 </i>