# Imports

In [85]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import OneHotEncoder
import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.model_selection import GridSearchCV
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import seaborn as sb

import tensorflow as tf
from tensorflow import keras

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.compose import make_column_selector
from sklearn.model_selection import cross_val_score

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as F
from pyspark.sql.functions import sum, when, avg, max, col, dayofweek, count, month, lit, mean
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Imputer
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import expr
from pyspark.sql import Row
from pyspark.sql.types import DateType

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=abd671e141ce7b4f6a92c089028abdf73cee7bd902a2e2da1f4e870ce198835c
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
import warnings

warnings.filterwarnings("ignore", category=FutureWarning)

In [None]:
spark = SparkSession.builder.appName('RHODS').getOrCreate()

In [None]:
spark.catalog.clearCache()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1024 * 1024 * 1024)

# Data extraction

In [None]:
main_df = spark.read.csv('train.csv', header = True, inferSchema = True)
oil_df = spark.read.csv('oil.csv', header = True, inferSchema = True)
holidays_df = spark.read.csv('holidays_events.csv', header = True, inferSchema = True)
transactions_df = spark.read.csv('transactions.csv', header = True, inferSchema = True)

# Feature Engineering

In [None]:
main_df.show()

+---+----------+---------+-------------------+-----+-----------+
| id|      date|store_nbr|             family|sales|onpromotion|
+---+----------+---------+-------------------+-----+-----------+
|  0|2013-01-01|        1|         AUTOMOTIVE|  0.0|          0|
|  1|2013-01-01|        1|          BABY CARE|  0.0|          0|
|  2|2013-01-01|        1|             BEAUTY|  0.0|          0|
|  3|2013-01-01|        1|          BEVERAGES|  0.0|          0|
|  4|2013-01-01|        1|              BOOKS|  0.0|          0|
|  5|2013-01-01|        1|       BREAD/BAKERY|  0.0|          0|
|  6|2013-01-01|        1|        CELEBRATION|  0.0|          0|
|  7|2013-01-01|        1|           CLEANING|  0.0|          0|
|  8|2013-01-01|        1|              DAIRY|  0.0|          0|
|  9|2013-01-01|        1|               DELI|  0.0|          0|
| 10|2013-01-01|        1|               EGGS|  0.0|          0|
| 11|2013-01-01|        1|       FROZEN FOODS|  0.0|          0|
| 12|2013-01-01|        1

## Train_df Aggregation, Date transformation

In [None]:
# Aggregation of the main_df to get the sales volumes aggregatet over all stores for each product family and date
agg_df = main_df.groupBy('date', 'family').agg(sum('sales').alias("sales"), sum('onpromotion').alias("onpromotion"))

# Adding of features related to the day of the week and the month of each record based on the date
agg_df = agg_df.withColumn('day_of_week', dayofweek(agg_df.date)).withColumn('month', month(agg_df.date))
agg_df.sort(col("date"), col("family")).show(truncate = False)

+----------+-------------------+---------+-----------+-----------+-----+
|date      |family             |sales    |onpromotion|day_of_week|month|
+----------+-------------------+---------+-----------+-----------+-----+
|2013-01-01|AUTOMOTIVE         |0.0      |0          |3          |1    |
|2013-01-01|BABY CARE          |0.0      |0          |3          |1    |
|2013-01-01|BEAUTY             |2.0      |0          |3          |1    |
|2013-01-01|BEVERAGES          |810.0    |0          |3          |1    |
|2013-01-01|BOOKS              |0.0      |0          |3          |1    |
|2013-01-01|BREAD/BAKERY       |180.589  |0          |3          |1    |
|2013-01-01|CELEBRATION        |0.0      |0          |3          |1    |
|2013-01-01|CLEANING           |186.0    |0          |3          |1    |
|2013-01-01|DAIRY              |143.0    |0          |3          |1    |
|2013-01-01|DELI               |71.09    |0          |3          |1    |
|2013-01-01|EGGS               |46.0     |0        

## Oil_df Merge
In this section the agg_df and the oil_df are getting merged to use the oil price in the ML-Part of the project

### Preprocessing

In [86]:
# Aggregating the oil_df and the agg_df
agg_oil_merge_df = agg_df.join(oil_df, on='date', how='left') # left join agg_df and oil_df
agg_oil_merge_df = agg_oil_merge_df.dropDuplicates(['date']) # reduce df to one row per date

# Calculate the percentage of NaN values in the 'dcoilwtico' column
percentage_nan = (agg_oil_merge_df.filter(col("dcoilwtico").isNull()).count() / agg_oil_merge_df.count()) * 100
print(f'Procentual number of NaN-Values: {percentage_nan}%')

Procentual number of NaN-Values: 30.93824228028503%


Because of the high number of NaN-Values regarding the oilprice-value we decided to do a oilprice forecast

### Oilprice forecast

In [87]:
# Create a subset of agg_oil_merge_df to forecast the oil prices
oil_forecast_df = agg_oil_merge_df.select('date', 'dcoilwtico', 'day_of_week', 'month')

In [89]:
n_lags = 5
window_spec = Window.orderBy('date')
for i in range(1, n_lags+1):
  oil_forecast_df = oil_forecast_df.withColumn("lag_{}".format(i), F.lag("dcoilwtico", offset = i).over(window_spec).cast('float'))

oil_forecast_df = oil_forecast_df.withColumn("dcoilwtico", oil_forecast_df["dcoilwtico"].cast('float'))
oil_forecast_df.show()

+----------+----------+-----------+-----+-----+-----+-----+-----+-----+
|      date|dcoilwtico|day_of_week|month|lag_1|lag_2|lag_3|lag_4|lag_5|
+----------+----------+-----------+-----+-----+-----+-----+-----+-----+
|2013-01-01|      NULL|          3|    1| NULL| NULL| NULL| NULL| NULL|
|2013-01-02|     93.14|          4|    1| NULL| NULL| NULL| NULL| NULL|
|2013-01-03|     92.97|          5|    1|93.14| NULL| NULL| NULL| NULL|
|2013-01-04|     93.12|          6|    1|92.97|93.14| NULL| NULL| NULL|
|2013-01-05|      NULL|          7|    1|93.12|92.97|93.14| NULL| NULL|
|2013-01-06|      NULL|          1|    1| NULL|93.12|92.97|93.14| NULL|
|2013-01-07|      93.2|          2|    1| NULL| NULL|93.12|92.97|93.14|
|2013-01-08|     93.21|          3|    1| 93.2| NULL| NULL|93.12|92.97|
|2013-01-09|     93.08|          4|    1|93.21| 93.2| NULL| NULL|93.12|
|2013-01-10|     93.81|          5|    1|93.08|93.21| 93.2| NULL| NULL|
|2013-01-11|      93.6|          6|    1|93.81|93.08|93.21| 93.2

#### Data transformation/encoding/split, Pipeline creation

In [101]:
numerical_cols = ['lag_1', 'lag_2', 'lag_3', 'lag_4', 'lag_5']
categorical_cols = ['day_of_week', 'month']

# Defining the encoder for encoding the categorical values
encoder = OneHotEncoder(inputCols=categorical_cols,
                        outputCols=[f"{col}_encoded" for col in categorical_cols])

oil_forecast_encoded_df = encoder.fit(oil_forecast_df).transform(oil_forecast_df)

In [102]:
# Create the train set of data with oil prices
oil_fc_train = oil_forecast_encoded_df.filter(oil_forecast_encoded_df['dcoilwtico'].isNotNull())
# Create the prediction set of data without oil prices
oil_fc_pred = oil_forecast_encoded_df.filter(oil_forecast_encoded_df['dcoilwtico'].isNull())

In [103]:
# Set the imputer
numerical_imputer = Imputer(strategy="median", inputCols=numerical_cols, outputCols=[f"{col}_imputed" for col in numerical_cols])

# Assemble features into a single vector
feature_cols = [f"{col}_imputed" for col in numerical_cols] + [f"{col}_encoded" for col in categorical_cols]
assembler = VectorAssembler(inputCols=feature_cols,outputCol="features")

# Initialise the oilprice prediction model
rfr_oil = RandomForestRegressor(featuresCol='features', labelCol='dcoilwtico')

# Create the data preprocessing pipeline
oil_pipeline = Pipeline(stages=[numerical_imputer, assembler, rfr_oil]) # pipeline for cross validation

#### Model training

In [104]:
param_grid = ParamGridBuilder() \
    .addGrid(rfr_oil.maxDepth, [5,7]) \
    .build()

evaluator = RegressionEvaluator(labelCol="dcoilwtico", predictionCol="prediction", metricName="mae")

crossval = CrossValidator(estimator=oil_pipeline,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=5)

cv_model_oil = crossval.fit(oil_fc_train)

best_model_oil = cv_model_oil.bestModel # Model with the best parameters

oil_predictions = best_model_oil.transform(oil_fc_pred)

#### Oil price dataframe creation

In [107]:
oil_new_df = oil_fc_train.select('date', 'dcoilwtico').union(oil_predictions.select('date', 'prediction').withColumnRenamed("prediction",'dcoilwtico'))
oil_new_df.sort(col("date")).show(truncate = False)

+----------+-----------------+
|date      |dcoilwtico       |
+----------+-----------------+
|2013-01-01|51.27001419525241|
|2013-01-02|93.13999938964844|
|2013-01-03|92.97000122070312|
|2013-01-04|93.12000274658203|
|2013-01-05|93.24322576849536|
|2013-01-06|94.23897578407957|
|2013-01-07|93.19999694824219|
|2013-01-08|93.20999908447266|
|2013-01-09|93.08000183105469|
|2013-01-10|93.80999755859375|
|2013-01-11|93.5999984741211 |
|2013-01-12|93.8679537450954 |
|2013-01-13|93.56011075269237|
|2013-01-14|94.2699966430664 |
|2013-01-15|93.26000213623047|
|2013-01-16|94.27999877929688|
|2013-01-17|95.48999786376953|
|2013-01-18|95.61000061035156|
|2013-01-19|95.84087724370666|
|2013-01-20|94.70283648174117|
+----------+-----------------+
only showing top 20 rows



In [109]:
# Now the new_oil_df gets merged with the agg_df to do further preprocessing
agg_oilprice_merged_df = agg_df.join(oil_new_df, on='date', how='left')

# There are no NaN-Values left in the Column "dcoilwtico"
percentage_nan = (agg_oilprice_merged_df.filter(col("dcoilwtico").isNull()).count() / agg_oilprice_merged_df.count()) * 100
print(f'Procentual number of NaN-Values: {percentage_nan}%')

Procentual number of NaN-Values: 0.0%


## Transactions_df Merge
In this section we merge the aggregations_df with the agg_df to use the number of transaction per day for furter predictions

In [110]:
agg_transactions_df = transactions_df.groupBy("date").agg(sum("transactions").alias("transactions"))
agg_transactions_df.sort(col("date")).show(truncate = False)

+----------+------------+
|date      |transactions|
+----------+------------+
|2013-01-01|770         |
|2013-01-02|93215       |
|2013-01-03|78504       |
|2013-01-04|78494       |
|2013-01-05|93573       |
|2013-01-06|90464       |
|2013-01-07|75597       |
|2013-01-08|72325       |
|2013-01-09|71971       |
|2013-01-10|66383       |
|2013-01-11|70338       |
|2013-01-12|85511       |
|2013-01-13|86306       |
|2013-01-14|70078       |
|2013-01-15|71134       |
|2013-01-16|72471       |
|2013-01-17|68873       |
|2013-01-18|71371       |
|2013-01-19|88558       |
|2013-01-20|85281       |
+----------+------------+
only showing top 20 rows



In [119]:
merged_df = agg_oilprice_merged_df.join(agg_transactions_df, on='date', how='left')
merged_df = merged_df.na.drop()
merged_df.sort(col("date"), col("family")).show(truncate = False)

+----------+-------------------+---------+-----------+-----------+-----+-----------------+------------+
|date      |family             |sales    |onpromotion|day_of_week|month|dcoilwtico       |transactions|
+----------+-------------------+---------+-----------+-----------+-----+-----------------+------------+
|2013-01-01|AUTOMOTIVE         |0.0      |0          |3          |1    |51.27001419525241|770         |
|2013-01-01|BABY CARE          |0.0      |0          |3          |1    |51.27001419525241|770         |
|2013-01-01|BEAUTY             |2.0      |0          |3          |1    |51.27001419525241|770         |
|2013-01-01|BEVERAGES          |810.0    |0          |3          |1    |51.27001419525241|770         |
|2013-01-01|BOOKS              |0.0      |0          |3          |1    |51.27001419525241|770         |
|2013-01-01|BREAD/BAKERY       |180.589  |0          |3          |1    |51.27001419525241|770         |
|2013-01-01|CELEBRATION        |0.0      |0          |3         

### Holidays_df Merge
In this section the holidays_df gets merged to use the transaction data for the upcoming model prediction

In [120]:
holidays_df.show()

+----------+-------+--------+-------------+--------------------+-----------+
|      date|   type|  locale|  locale_name|         description|transferred|
+----------+-------+--------+-------------+--------------------+-----------+
|2012-03-02|Holiday|   Local|        Manta|  Fundacion de Manta|      false|
|2012-04-01|Holiday|Regional|     Cotopaxi|Provincializacion...|      false|
|2012-04-12|Holiday|   Local|       Cuenca| Fundacion de Cuenca|      false|
|2012-04-14|Holiday|   Local|     Libertad|Cantonizacion de ...|      false|
|2012-04-21|Holiday|   Local|     Riobamba|Cantonizacion de ...|      false|
|2012-05-12|Holiday|   Local|         Puyo|Cantonizacion del...|      false|
|2012-06-23|Holiday|   Local|     Guaranda|Cantonizacion de ...|      false|
|2012-06-25|Holiday|Regional|     Imbabura|Provincializacion...|      false|
|2012-06-25|Holiday|   Local|    Latacunga|Cantonizacion de ...|      false|
|2012-06-25|Holiday|   Local|      Machala|Fundacion de Machala|      false|

In [121]:
# We only want to use the Holiday/Additional,etc. data which counts for the whole country Ecuador
modified_holidays_df = holidays_df.filter((col("transferred") == False) & (col("locale") == "National"))
modified_holidays_df = modified_holidays_df.withColumn("type", when(col("type") == "Transfer", "Holiday").otherwise(col("type")))

modified_holidays_df = modified_holidays_df.select("date", "type")
modified_holidays_df.sort(col("date")).show(truncate = False)

+----------+----------+
|date      |type      |
+----------+----------+
|2012-08-10|Holiday   |
|2012-10-12|Holiday   |
|2012-11-02|Holiday   |
|2012-11-03|Holiday   |
|2012-12-21|Additional|
|2012-12-22|Additional|
|2012-12-23|Additional|
|2012-12-24|Bridge    |
|2012-12-24|Additional|
|2012-12-25|Holiday   |
|2012-12-26|Additional|
|2012-12-31|Bridge    |
|2012-12-31|Additional|
|2013-01-01|Holiday   |
|2013-01-05|Work Day  |
|2013-01-12|Work Day  |
|2013-02-11|Holiday   |
|2013-02-12|Holiday   |
|2013-04-29|Holiday   |
|2013-05-01|Holiday   |
+----------+----------+
only showing top 20 rows



In [123]:
# Doing the merge of modified_holidays_df and merged_df
merged_df = merged_df.join(modified_holidays_df, on="date", how="left")
# The dates on which no holiday or something else encounted in the holiday_df takes place we insert "Normal" as a value
merged_df = merged_df.withColumn("type", when(merged_df["type"].isNull(), "Normal").otherwise(merged_df["type"]))
merged_df.sort(col("date"), col('family')).show(truncate = False)

+----------+-------------------+---------+-----------+-----------+-----+-----------------+------------+-------+
|date      |family             |sales    |onpromotion|day_of_week|month|dcoilwtico       |transactions|type   |
+----------+-------------------+---------+-----------+-----------+-----+-----------------+------------+-------+
|2013-01-01|AUTOMOTIVE         |0.0      |0          |3          |1    |51.27001419525241|770         |Holiday|
|2013-01-01|BABY CARE          |0.0      |0          |3          |1    |51.27001419525241|770         |Holiday|
|2013-01-01|BEAUTY             |2.0      |0          |3          |1    |51.27001419525241|770         |Holiday|
|2013-01-01|BEVERAGES          |810.0    |0          |3          |1    |51.27001419525241|770         |Holiday|
|2013-01-01|BOOKS              |0.0      |0          |3          |1    |51.27001419525241|770         |Holiday|
|2013-01-01|BREAD/BAKERY       |180.589  |0          |3          |1    |51.27001419525241|770         |H

# Feature based prediction (fb)
In the first trained model for predicting the sales volumes we use a normal feature based prediction without taking into account previous values.

## Data transformation/encoding

In [124]:
transformed_df_fb = merged_df

transformed_df_fb = transformed_df_fb.withColumnRenamed("day_of_week", "day_of_week_index")
transformed_df_fb = transformed_df_fb.withColumnRenamed("month", "month_index")

str_cat_cols_fb = ["type", "family"]
cat_cols_fb = ["day_of_week", "month", "type", "family"]

indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in str_cat_cols_fb]
encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_encoded") for col in cat_cols_fb]

pipeline = Pipeline(stages=indexers + encoders)
model = pipeline.fit(transformed_df_fb)
data = model.transform(transformed_df_fb)
data.sort(col("date"), col('family')).show(truncate = False)

+----------+-------------------+---------+-----------+-----------------+-----------+-----------------+------------+-------+----------+------------+-------------------+--------------+-------------+---------------+
|date      |family             |sales    |onpromotion|day_of_week_index|month_index|dcoilwtico       |transactions|type   |type_index|family_index|day_of_week_encoded|month_encoded |type_encoded |family_encoded |
+----------+-------------------+---------+-----------+-----------------+-----------+-----------------+------------+-------+----------+------------+-------------------+--------------+-------------+---------------+
|2013-01-01|AUTOMOTIVE         |0.0      |0          |3                |1          |51.27001419525241|770         |Holiday|2.0       |0.0         |(7,[3],[1.0])      |(12,[1],[1.0])|(5,[2],[1.0])|(32,[0],[1.0]) |
|2013-01-01|BABY CARE          |0.0      |0          |3                |1          |51.27001419525241|770         |Holiday|2.0       |1.0         |(

## Model training

In [125]:
feature_cols = ["day_of_week_encoded", "month_encoded", "type_encoded", "family_encoded", "transactions", "dcoilwtico", "onpromotion"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

gbt = GBTRegressor(featuresCol="features", labelCol="sales", maxBins=33)

pipeline = Pipeline(stages=[assembler, gbt])

paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [4, 6])
             .addGrid(gbt.maxIter, [50, 100])
             .addGrid(gbt.stepSize, [0.1, 0.01])
             .build())

evaluator = RegressionEvaluator(labelCol="sales", predictionCol="prediction", metricName="mae")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2)

train_data, test_data = data.randomSplit([0.8, 0.2], seed=12)
cvModel = crossval.fit(train_data)

## Evaluation

### Overall evaluation

In [130]:
evaluation_fb_df = cvModel.transform(test_data)
clipped_evaluation_fb_df = evaluation_fb_df.withColumn("clipped_predictions", when(col("prediction") < 0, 0).otherwise(col("prediction")))

mae = evaluator.evaluate(clipped_evaluation_fb_df)

best_model = cvModel.bestModel

print("Mean Absolute Error (MAE):", mae)

Mean Absolute Error (MAE): 2019.3064947554606


In [133]:
# Show the true sales values compared to the predicted ones
clipped_evaluation_fb_df.select("sales", "clipped_predictions").show()

+------------------+-------------------+
|             sales|clipped_predictions|
+------------------+-------------------+
|               0.0|                0.0|
|             186.0|  16437.16335618371|
|               0.0|                0.0|
|           110.801| 3629.6335336039856|
|               0.0|                0.0|
|               0.0|                0.0|
|             255.0|  697.5648840967916|
|               0.0|  722.4296956057548|
|15754.500000000002| 12823.271693997933|
|            1476.0| 1332.8697452310762|
|               0.0| 11493.949004926975|
|           17204.0| 15528.730866883498|
| 5338.111975999999|  6397.462639051975|
|       1526.750002|  722.4296956057548|
|       18456.48002| 17919.562476878822|
|       4760.805009|  5197.111639483184|
|              37.0|  454.4587384580996|
|               0.0|  350.3133655566963|
|              15.0|  427.4880864793748|
|               0.0|  9017.018767647945|
+------------------+-------------------+
only showing top

### Product family wise evaluation

In [135]:
filtered_evaluation_fb_df = clipped_evaluation_fb_df.select("family", "sales", "clipped_predictions")
evaluator = RegressionEvaluator(labelCol="sales", predictionCol="clipped_predictions", metricName="mae")
unique_family_values = filtered_evaluation_fb_df.select("family").distinct().rdd.flatMap(lambda x: x).collect()

data = []
for value in unique_family_values:
  df_fb = filtered_evaluation_fb_df.filter(filtered_evaluation_fb_df['family'] == value)
  mae_fb = evaluator.evaluate(df_fb)
  mean_fb = df_fb.select(mean("sales")).collect()[0][0]
  row = Row(family=value, mean=mean_fb, mean_absolute_error=mae_fb)
  data.append(row)

family_evaluation_fb_df = spark.createDataFrame(data)

In [136]:
family_evaluation_fb_df.show()

+-------------------+------------------+-------------------+
|             family|              mean|mean_absolute_error|
+-------------------+------------------+-------------------+
|     PREPARED FOODS|5230.4515081210375|  420.6393269845602|
|         LADIESWEAR|382.79639175257734|  348.5981787217733|
|HOME AND KITCHEN II|  911.861581920904|  513.6927463111062|
|    LAWN AND GARDEN|330.26358695652175| 302.89053819749904|
|          GROCERY I| 202750.6754290032| 15814.814652801457|
|          BABY CARE| 6.491891891891892| 189.18802818432312|
|            PRODUCE| 70645.91190945453|  9371.646271386293|
|         AUTOMOTIVE|331.54597701149424|  274.3895864846023|
|          HOME CARE|10088.822784810127| 1276.5148066686402|
|          BEVERAGES|128460.99442896935| 13031.354819849461|
|       BREAD/BAKERY|25303.821485757315|  1453.215131417575|
|              BOOKS|3.5608974358974357| 199.91030165804156|
|           LINGERIE| 387.0639534883721| 274.58533873111946|
|        CELEBRATION| 45

# Feature based prediction with Time lags (tl)

## Preprocessing

In [137]:
tl_df = merged_df

In [138]:
window_spec = Window.partitionBy("family").orderBy("date")

tl_df = tl_df.withColumn("lag_1", F.lag("sales", 1).over(window_spec))
tl_df = tl_df.withColumn("lag_2", F.lag("sales", 2).over(window_spec))
tl_df = tl_df.withColumn("lag_3", F.lag("sales", 3).over(window_spec))

In [139]:
specific_date = "2013-01-04" # The Time lag dataframe should start from the "2013-01-04" because else there is no lag data for the first row
specific_date = spark.createDataFrame([(specific_date,)], ["specific_date"]).withColumn("specific_date", col("specific_date").cast(DateType()))
tl_df_filtered = tl_df.filter(col("date") >= specific_date.select("specific_date").collect()[0][0])
tl_df_filtered.show()

+----------+----------+-----+-----------+-----------+-----+-----------------+------------+--------+-----+-----+-----+
|      date|    family|sales|onpromotion|day_of_week|month|       dcoilwtico|transactions|    type|lag_1|lag_2|lag_3|
+----------+----------+-----+-----------+-----------+-----+-----------------+------------+--------+-----+-----+-----+
|2013-01-04|AUTOMOTIVE|169.0|          0|          6|    1|93.12000274658203|       78494|  Normal|161.0|255.0|  0.0|
|2013-01-05|AUTOMOTIVE|342.0|          0|          7|    1|93.24322576849536|       93573|Work Day|169.0|161.0|255.0|
|2013-01-06|AUTOMOTIVE|360.0|          0|          1|    1|94.23897578407957|       90464|  Normal|342.0|169.0|161.0|
|2013-01-07|AUTOMOTIVE|189.0|          0|          2|    1|93.19999694824219|       75597|  Normal|360.0|342.0|169.0|
|2013-01-08|AUTOMOTIVE|229.0|          0|          3|    1|93.20999908447266|       72325|  Normal|189.0|360.0|342.0|
|2013-01-09|AUTOMOTIVE|164.0|          0|          4|   

## Data transformation/encoding

In [142]:
transformed_df_tl = tl_df_filtered

transformed_df_tl = transformed_df_tl.withColumnRenamed("day_of_week", "day_of_week_index")
transformed_df_tl = transformed_df_tl.withColumnRenamed("month", "month_index")

str_cat_cols_tl = ["type", "family"]
cat_cols_tl = ["day_of_week", "month", "type", "family"]

indexers = [StringIndexer(inputCol=col, outputCol=col+"_index") for col in str_cat_cols_tl]
encoders = [OneHotEncoder(inputCol=col + "_index", outputCol=col + "_encoded") for col in cat_cols_tl]

pipeline = Pipeline(stages=indexers + encoders)
model = pipeline.fit(transformed_df_tl)
data = model.transform(transformed_df_tl)
data.sort(col("date"), col('family')).show(truncate = False)

+----------+-------------------+------------------+-----------+-----------------+-----------+-----------------+------------+------+------------------+------------------+---------+----------+------------+-------------------+--------------+-------------+---------------+
|date      |family             |sales             |onpromotion|day_of_week_index|month_index|dcoilwtico       |transactions|type  |lag_1             |lag_2             |lag_3    |type_index|family_index|day_of_week_encoded|month_encoded |type_encoded |family_encoded |
+----------+-------------------+------------------+-----------+-----------------+-----------+-----------------+------------+------+------------------+------------------+---------+----------+------------+-------------------+--------------+-------------+---------------+
|2013-01-04|AUTOMOTIVE         |169.0             |0          |6                |1          |93.12000274658203|78494       |Normal|161.0             |255.0             |0.0      |0.0       |0.0

## Model training

In [None]:
feature_cols = ["day_of_week_encoded", "month_encoded", "type_encoded", "family_encoded", "transactions", "dcoilwtico", "onpromotion", "lag_1", "lag_2", "lag_3"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

gbt = GBTRegressor(featuresCol="features", labelCol="sales", maxBins=33)

pipeline = Pipeline(stages=[assembler, gbt])

paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [4, 6])
             .addGrid(gbt.maxIter, [50, 100])
             .addGrid(gbt.stepSize, [0.1, 0.01])
             .build())

evaluator = RegressionEvaluator(labelCol="sales", predictionCol="prediction", metricName="mae")

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=2)

train_data, test_data = data.randomSplit([0.8, 0.2], seed=12)
cvModel = crossval.fit(train_data)

## Evaluation

### Overall evaluation

In [80]:
evaluation_tl_df = cvModel.transform(test_data)
clipped_evaluation_tl_df = evaluation_tl_df.withColumn("clipped_predictions", when(col("prediction") < 0, 0).otherwise(col("prediction")))

mae_tl = evaluator.evaluate(evaluation_tl_df)

best_model_tl = cvModel.bestModel

print("Mean Absolute Error (MAE):", mae_tl)

Mean Absolute Error (MAE): 1464.5554578706108


In [81]:
# Show the true sales values compared to the predicted ones
clipped_evaluation_tl_df.select("sales", "clipped_predictions").show()

+------------------+------------------+
|             sales|        prediction|
+------------------+------------------+
|               0.0| 50.63921736772478|
|           52064.0| 58658.64860187022|
|               0.0| 50.63921736772478|
|21625.963055000004| 18911.26506552606|
|               0.0| 50.63921736772478|
|               0.0| 50.63921736772478|
|             342.0|374.68346571214556|
|               0.0|56.400603564901104|
|       13734.94501|13706.028798341518|
|            1273.0|1291.9570861916554|
|               0.0|467.51022328055865|
|           16819.0| 16979.40859782955|
| 5830.073020000001|  5345.12210725686|
|1245.6370040000002|1433.4825076041284|
|         26861.665| 27567.48539930741|
|        6114.88201| 6471.431879522169|
|              87.0| 63.22694394547287|
|               0.0| 63.22694394547287|
|              17.0| 63.22694394547287|
|               0.0| 366.4738080168359|
+------------------+------------------+
only showing top 20 rows



### Product family wise evaluation

In [82]:
filtered_evaluation_fb_df = clipped_evaluation_tl_df.select("family", "sales", "clipped_predictions")
evaluator = RegressionEvaluator(labelCol="sales", predictionCol="clipped_predictions", metricName="mae")
unique_family_values = filtered_evaluation_fb_df.select("family").distinct().rdd.flatMap(lambda x: x).collect()

data = []
for value in unique_family_values:
  df_fb = filtered_evaluation_fb_df.filter(filtered_evaluation_fb_df['family'] == value)
  mae_fb = evaluator.evaluate(df_fb)
  mean_fb = df_fb.select(mean("sales")).collect()[0][0]
  row = Row(family=value, mean=mean_fb, mean_absolute_error=mae_fb)
  data.append(row)

family_evaluation_fb_df = spark.createDataFrame(data)

In [83]:
family_evaluation_fb_df.show()

+--------------------+------------------+-------------------+
|              family|              mean|mean_absolute_error|
+--------------------+------------------+-------------------+
|           BABY CARE| 6.612637362637362|  69.60637448054534|
|            CLEANING| 57567.14488636364|   3908.71758256646|
|          LADIESWEAR|405.37948717948717| 158.23994314484813|
|               MEATS| 18642.70083979641|  1435.506810630091|
|        PET SUPPLIES|223.17563739376772|  91.08303611610825|
|PLAYERS AND ELECT...| 343.0202312138728| 127.31106256037218|
|          AUTOMOTIVE| 330.0606936416185|   94.5886351657935|
|         CELEBRATION| 476.5057803468208|  214.8923749664194|
|                DELI|14437.514166687324| 1013.2221640842084|
|          GROCERY II|1167.7283582089551| 199.42468756904222|
|           HOME CARE| 9838.485714285714|  1056.115926951535|
|       PERSONAL CARE|14366.356741573034| 1302.7989269761647|
|      PREPARED FOODS|  5227.00813007826|  461.8630003295187|
|       