In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer
import Transformers
import Estimators
import Evaluators
import json
import utils
import sys

In [2]:
# Create (or reuse) a SparkSession that runs locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("M5-forecasting")
    .getOrCreate()
)


In [3]:
# Reading Files
# Both CSVs are read with a header row and Spark-inferred column types.

sales = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("./M5-forecasting/sales_train_evaluation.csv")
)

# sales.printSchema()

calendar = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("./M5-forecasting/calendar.csv")
)

# calendar.printSchema()

# Run configuration (department to train on, lag values, split settings,
# hyper-parameter spaces). The context manager guarantees the file handle is
# closed instead of leaking it for the lifetime of the kernel.
with open("config.json") as config_file:
    config = json.load(config_file)
# config


In [4]:
# Restrict the sales data to the department named in the config, then run the
# project's ExplodingDays transformer over the identifier columns.
# NOTE(review): ExplodingDays presumably turns the per-day columns into rows
# with a `day` column (the join below relies on it) -- confirm in Transformers.
id_cols = ['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'state_id']
dept_condition = "dept_id == '{}'".format(config["train_on_dept_id"])

fltr = Transformers.FilterDF(filterCond=dept_condition)
explode_days = Transformers.ExplodingDays(inputCols=id_cols)

pipeline = Pipeline(stages=[fltr, explode_days])
model = pipeline.fit(sales)
filtered_df = model.transform(sales)

# Attach the calendar columns by matching `day` against the calendar's `d`.
filtered_df = filtered_df.join(calendar, on=(filtered_df.day == calendar.d))

# filtered_df.show()


In [5]:
# Feature Engineering
# The lag configuration is read once and reused everywhere so that the lag
# columns that get created, the null filter, and the assembled feature vector
# always agree. (Previously the feature vector hard-coded lags 1..12, which
# breaks or silently drops features for any other `lag_values_to_create`.)
lag_values = config["lag_values_to_create"]
# Column names follow the same "lag_sales_<lag>" pattern the null filter uses.
lag_cols = ["lag_sales_" + str(lag) for lag in lag_values]

# Sum sales per store / year / month.
groupBy = Transformers.GroupByTransformer(groupByCols=["store_id", "year", "month"], aggExprs={"sales": "sum"})
# Log-transform the target; the forecasts are mapped back with AntiLogTransformer at export time.
log_sales = Transformers.LogTransformer(inputCols=["sales"])
# Lagged sales per store, ordered by year and month.
lag_sales = Transformers.LagFeatures(inputCol="sales", lagVals=lag_values, partCols=["store_id"], orderCols=["year", "month"])
# Drop rows whose last configured lag is missing.
# NOTE(review): assumes the last entry of `lag_values_to_create` is the largest lag -- confirm in config.json.
fltr_null_lags = Transformers.FilterDF(filterCond="lag_sales_{} is not null".format(lag_values[-1]))
store_indxr = StringIndexer(inputCol="store_id", outputCol="store_id_index")
vectorize = VectorAssembler(inputCols=["store_id_index", "month", "year"] + lag_cols, outputCol="features")


pipeline = Pipeline(stages=[groupBy, log_sales, lag_sales, fltr_null_lags, store_indxr, vectorize])
model = pipeline.fit(filtered_df)
transformed_df = model.transform(filtered_df)

#transformed_df.show(truncate=7)

# Test Train Split
trainDF, testDF = utils.df_split(transformed_df, **config["train_test_split"])

In [6]:
# Hyper-parameter search for the PySpark random forest.
# The search space comes from the config; maxIter is passed as 40
# (the recorded run below shows 40 trials).
space_pyspark = utils.json_to_space(config["hp_space_pyspark_RF"])

rf_estimator = Estimators.RandomForestEstimator(
    featuresCol="features",
    labelCol="sales",
    hyperParamsSpace=space_pyspark,
    maxIter=40,
    train_validation_split=config["train_validation_split"],
)
bestRFModel = rf_estimator.fit(trainDF)


Tunning hyper-parameters
100%|██████████| 40/40 [1:02:47<00:00, 94.19s/trial, best loss: 0.8990709922423087] 
bestParams:  {'maxBins': 64.0, 'maxDepth': 21.0, 'minInfoGain': 0.0, 'numTrees': 30.0}


In [7]:
# Hyper-parameter search for the XGBoost model.
# The search space comes from the config; maxIter is passed as 10
# (the recorded run below shows 10 trials).
space_xgboost = utils.json_to_space(config["hp_space_xgboost_RF"])

xgb_estimator = Estimators.XGBoostEstimator(
    featuresCol="features",
    labelCol="sales",
    hyperParamsSpace=space_xgboost,
    maxIter=10,
    train_validation_split=config["train_validation_split"],
)
bestXGBModel = xgb_estimator.fit(trainDF)


Tunning hyper-parameters
100%|██████████| 10/10 [1:14:38<00:00, 447.83s/trial, best loss: 0.9583901758494806]
bestParams:  {'colsample_bytree': 0.7000000000000001, 'max_depth': 74.0, 'n_estimators': 1105.0, 'subsample': 0.9}


In [8]:
# Evaluating the best models
# Each tuned model is scored on testDF with the project's MAPE evaluator; the
# one with the lowest loss is kept together with its test predictions.

evaluator = Evaluators.MAPE(labelCol="sales", predictionCol="prediction")

bestModels = [
    {"model_name": "PySpark_Random_Forest", "model": bestRFModel, "loss": 0},
    {"model_name": "XGBoost_Random_Forest", "model": bestXGBModel, "loss": 0},
]

# Sentinel entry: any real loss below the largest float replaces it.
bestModel = {"model_name": "", "model": None, "loss": sys.float_info.max, "predictions": None}

for model_info in bestModels:
    print("Evaluating: {}".format(model_info["model_name"]), end='\t')
    preds = model_info["model"].transform(testDF)
    model_info["loss"] = evaluator.evaluate(preds)
    print("Loss: {}".format(model_info["loss"]))

    # Saving the Best Model (strict comparison: on a tie the earlier model wins)
    if model_info["loss"] < bestModel["loss"]:
        bestModel = dict(model_info, predictions=preds)


Evaluating: PySpark_Random_Forest	Loss: 1.0905980554387458
Evaluating: XGBoost_Random_Forest	Loss: 1.0785129402881208


In [9]:
# Apply the project's AntiLogTransformer to the actual and predicted sales of
# the winning model, then export the forecasts to
# "<model_name>_<dept_id>_forecasts.csv".
anti_log = Transformers.AntiLogTransformer(inputCols=["sales", "prediction"])
predictions = anti_log.transform(bestModel["predictions"])

forecast_cols = ["store_id", "year", "month", "sales", "prediction"]
output_path = "{}_{}_forecasts.csv".format(bestModel["model_name"], config["train_on_dept_id"])

predictions.select(forecast_cols).toPandas().to_csv(output_path, index=False, header=True)