In [None]:
"""
WALMART SALES PREDICTION USING PYSPARK 

Before we make predictions of sales, we will broadly decide the steps to make the prediction.

AIM: TO PREDICT THE WEEKLY SALES OF WALMART RETAIL STORES FOR EACH PRODUCT/SKU

STEPS:

1. MERGING DIFFERENT DATA TO MAKE DATA MODEL READY.
WE HAVE DATA AVAILABLE IN DIFFERENT CSV FILES as 
A] TRAINING DATA
B] TEST DATA
C] FEATURES
D] STORE

We will merge the data available in a single data frame so that we can perform pre-processing on the data

2. DATA PRE-PROCESSING
This is a very important step as we do multiple processes so that the data becomes easier to interpret and predict for the model that will be used.

A] Handling the categorical columns
Time Series and machine learning models perform better when the features are numerical.
We use different methodes to encode the categorical variable into numerical ones.

B] Handling the null values
This is a very important step as values of all feature variables for the future are not available. Hence we have to separately make a time-series problem 
which will predict the future values of these feature variables. We have made use of fbProphet Library to make these predictions

3. Feature Selection
Again this becomes very important so as to avoid the dimensionality curse. But since all the models will be created using python udf's function in spark 
and for each store-department group a separate model is trained, it becomes difficult to do feature selection with limited computational power

4. Model Training
In this case we have trained on two different models - fbProphet and XGBoost
As mentioned earlier spark is used to enable parallel processing which allows us to train hundreds of models parallelly which would otherwise be a tedious 
rather impossible task. 
Pandas Udf function allows us to pass grouped data and apply a function on the grouped data.
Using pandas udf in our case is beneficial as we can group our sales data by stores and the respective departments and make indivual models for 
individual units in each store
"""

In [None]:
import pandas as pd
import numpy as np
import matplotlib as mp
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings("ignore")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [None]:
spark = SparkSession.builder.getOrCreate()

PREPARING THE DATA

Reading the Data 

In [None]:
train = spark.read.options(infer_schema = True, header = True).csv("/FileStore/tables/train.csv")
test = spark.read.options(infer_schema = True, header = True).csv("/FileStore/tables/test.csv")
features = spark.read.options(infer_schema = True, header = True).csv("/FileStore/tables/features.csv")
stores = spark.read.options(infer_schema = True, header = True).csv("/FileStore/tables/stores.csv")

In [None]:
train.tail(5)

Out[6]: [Row(Store='45', Dept='98', Date='2012-09-28', Weekly_Sales='508.37', IsHoliday='FALSE'),
 Row(Store='45', Dept='98', Date='2012-10-05', Weekly_Sales='628.1', IsHoliday='FALSE'),
 Row(Store='45', Dept='98', Date='2012-10-12', Weekly_Sales='1061.02', IsHoliday='FALSE'),
 Row(Store='45', Dept='98', Date='2012-10-19', Weekly_Sales='760.01', IsHoliday='FALSE'),
 Row(Store='45', Dept='98', Date='2012-10-26', Weekly_Sales='1076.8', IsHoliday='FALSE')]

In [None]:
test.show(5)

+-----+----+----------+---------+
|Store|Dept|      Date|IsHoliday|
+-----+----+----------+---------+
|    1|   1|2012-11-02|    FALSE|
|    1|   1|2012-11-09|    FALSE|
|    1|   1|2012-11-16|    FALSE|
|    1|   1|2012-11-23|     TRUE|
|    1|   1|2012-11-30|    FALSE|
+-----+----+----------+---------+
only showing top 5 rows



Train data is from 05-02-2010 to 25-10-2012                 
Test data is from 02-11-2012 to 26-07-2013

In [None]:
train.count()

Out[8]: 421570

In [None]:
test.count()

Out[9]: 115064

In [None]:
print(f" Total number of stores: {train.select('Store').distinct().count()} \n Total number of departments: {train.select('Dept').distinct().count()}")

 Total number of stores: 45 
 Total number of departments: 81


In [None]:
features.show(5)

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|Store|      Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|IsHoliday|
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|    1|2010-02-05|      42.31|     2.572|       NA|       NA|       NA|       NA|       NA|211.0963582|       8.106|    FALSE|
|    1|2010-02-12|      38.51|     2.548|       NA|       NA|       NA|       NA|       NA|211.2421698|       8.106|     TRUE|
|    1|2010-02-19|      39.93|     2.514|       NA|       NA|       NA|       NA|       NA|211.2891429|       8.106|    FALSE|
|    1|2010-02-26|      46.63|     2.561|       NA|       NA|       NA|       NA|       NA|211.3196429|       8.106|    FALSE|
|    1|2010-03-05|       46.5|     2.625|       NA|       NA|       NA|       NA|       NA|211.3501429|       8

In [None]:
stores.show(5)

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
+-----+----+------+
only showing top 5 rows



Combining training and testing data so that any all transformations are done at once. Later we separate it for model feature selection and model training

In [None]:
train = train.withColumn("is_test", F.lit(0))
test = test.withColumn("Weekly_Sales", F.lit(None)).withColumn("is_test", F.lit(1))
weekly_sales = train.union(test)

In [None]:
print(f"No of rows in training data {train.count()}\nNo of rows in testing data {test.count()}\nNo of rows after merging {weekly_sales.count()}")

No of rows in training data 421570
No of rows in testing data 115064
No of rows after merging 536634


In [None]:
weekly_sales.printSchema()

root
 |-- Store: string (nullable = true)
 |-- Dept: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Weekly_Sales: string (nullable = true)
 |-- IsHoliday: string (nullable = true)
 |-- is_test: integer (nullable = false)



In [None]:
from pyspark.sql.types import IntegerType, DateType, FloatType, StringType

Joining Weekly_Sales dataframe with the features dataframe              
Note: We won't use store data as anyways each store and dept will be trained separately and thus this data won't have any effect as a feature while training the time series model

In [None]:
df_store = weekly_sales.join(features, ["Date", "Store", "IsHoliday"], how= "leftOuter")
df_store.show(5)

+----------+-----+---------+----+------------+-------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|      Date|Store|IsHoliday|Dept|Weekly_Sales|is_test|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|
+----------+-----+---------+----+------------+-------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+
|2010-02-05|    1|    FALSE|   1|     24924.5|      0|      42.31|     2.572|       NA|       NA|       NA|       NA|       NA|211.0963582|       8.106|
|2010-02-12|    1|     TRUE|   1|    46039.49|      0|      38.51|     2.548|       NA|       NA|       NA|       NA|       NA|211.2421698|       8.106|
|2010-02-19|    1|    FALSE|   1|    41595.55|      0|      39.93|     2.514|       NA|       NA|       NA|       NA|       NA|211.2891429|       8.106|
|2010-02-26|    1|    FALSE|   1|    19403.54|      0|      46.63|     2.561|     

Transforming the data into appropriate datatypes

In [None]:
df_store.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Store: string (nullable = true)
 |-- IsHoliday: string (nullable = true)
 |-- Dept: string (nullable = true)
 |-- Weekly_Sales: string (nullable = true)
 |-- is_test: integer (nullable = false)
 |-- Temperature: string (nullable = true)
 |-- Fuel_Price: string (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- CPI: string (nullable = true)
 |-- Unemployment: string (nullable = true)



In [None]:
df_store = df_store.withColumn("Temperature", F.col("Temperature").cast(FloatType())).withColumn("Fuel_Price", F.col("Fuel_Price").cast(FloatType())).withColumn("MarkDown1", F.col("MarkDown1").cast(FloatType())).withColumn("MarkDown2", F.col("MarkDown2").cast(FloatType())).withColumn("MarkDown3", F.col("MarkDown3").cast(FloatType())).withColumn("MarkDown4", F.col("MarkDown4").cast(FloatType())).withColumn("MarkDown5", F.col("MarkDown5").cast(FloatType())).withColumn("CPI", F.col("CPI").cast(FloatType())).withColumn("Unemployment", F.col("Unemployment").cast(FloatType())).withColumn("Date", F.col("Date").cast(DateType())).withColumn("Weekly_Sales", F.col("Weekly_Sales").cast(FloatType())).withColumn("Store", F.col("Store").cast(IntegerType()))

DATA DICTIONARY

Store - the store number                 
Dept - the department number                 
Date - the week                           
Weekly_Sales -  sales for the given department in the given store                       
IsHoliday - whether the week is a special holiday week                            
Temperature - average temperature in the region                      
Fuel_Price - cost of fuel in the region                          
MarkDown1-5 - anonymized data related to promotional markdowns that Walmart is running. MarkDown data is only available after Nov 2011, and is not available for all stores all the time. Any missing value is marked with an NA.                 
CPI - the consumer price index                                  
Unemployment - the unemployment rate

FEATURE ENGINEERING AND FEATURE SELECTION

Ordinal Label Encoding FOR IsHoliday column

In [None]:
df_store = df_store.withColumn("IsHoliday",F.when(F.col("IsHoliday") == "True", 1).otherwise(0))

In [None]:
print("The total number of weeks without Walmart markdown data: {}".format(df_store.filter("Date <= date'2011-11-04'").select("Date").distinct().count()))

The total number of weeks without Walmart markdown data: 92


In [None]:
print("The total number of weeks in training data: {}".format(df_store.filter(F.col("is_test") == 0).select("Date").distinct().count()))

The total number of weeks in training data: 143


INSIGHT: Of 143 weeks of training data, promotional markdown data is known only for 51 weeks of data. We will exclude these markdowns

In [None]:
df = df_store.drop(*("MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4", "MarkDown5"))

In [None]:
df.show()

+----------+-----+---------+----+------------+-------+-----------+----------+---------+------------+
|      Date|Store|IsHoliday|Dept|Weekly_Sales|is_test|Temperature|Fuel_Price|      CPI|Unemployment|
+----------+-----+---------+----+------------+-------+-----------+----------+---------+------------+
|2010-02-05|    1|        0|   1|     24924.5|      0|      42.31|     2.572|211.09636|       8.106|
|2010-02-12|    1|        0|   1|    46039.49|      0|      38.51|     2.548|211.24217|       8.106|
|2010-02-19|    1|        0|   1|    41595.55|      0|      39.93|     2.514|211.28914|       8.106|
|2010-02-26|    1|        0|   1|    19403.54|      0|      46.63|     2.561|211.31964|       8.106|
|2010-03-05|    1|        0|   1|     21827.9|      0|       46.5|     2.625|211.35014|       8.106|
|2010-03-12|    1|        0|   1|    21043.39|      0|      57.79|     2.667|211.38065|       8.106|
|2010-03-19|    1|        0|   1|    22136.64|      0|      54.58|      2.72|211.21564|    

Handling Null Values if any

In [None]:
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+----+-----+---------+----+------------+-------+-----------+----------+------+------------+
|Date|Store|IsHoliday|Dept|Weekly_Sales|is_test|Temperature|Fuel_Price|   CPI|Unemployment|
+----+-----+---------+----+------------+-------+-----------+----------+------+------------+
|   0|    0|        0|   0|      115064|      0|     115064|    115064|115064|      115064|
+----+-----+---------+----+------------+-------+-----------+----------+------+------------+



Note: We see that features such as Temperature, Fuel_Price, CPI, Unemployment are not given in the test data. We'll use time series to predict these values in the future

In [None]:
df_feature = df.select(["Date", "Store", "Temperature", "Fuel_Price", "CPI", "Unemployment",  "is_test"]).distinct().sort(F.col("Date"), F.col("Store"))

In [None]:
df_feature.count()

Out[27]: 8190

In [None]:
df_feature.filter(F.col("is_test") == 1).count()

Out[28]: 1755

In [None]:
from pyspark.sql.types import StructType, StructField
schema = StructType(
    [
        StructField("Store", IntegerType()),
        StructField("Date", DateType()),
        StructField("Temperature", FloatType()),
        StructField("Fuel_Price", FloatType()),
        StructField("CPI", FloatType()),
        StructField("Unemployment", FloatType())
        
    ]
    )

In [None]:
df.select(F.dayofweek("Date")).collect()[0]

Out[30]: Row(dayofweek(Date)=6)

In [None]:
import fbprophet
from fbprophet import Prophet

In [None]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def forecast_feature(data):

    test = data[data["is_test"] == 1][["Date"]]
    test.columns = ["ds"]
    test["ds"] = pd.to_datetime(test["ds"])
    results_combined = pd.DataFrame(test)
    
    features = ["Temperature", "Fuel_Price", "CPI", "Unemployment"]
    
    
    
    for feature in features:
        train = data[data["is_test"] == 0][["Date",feature]]
        train.columns = ["ds", "y"]
        train["ds"] = pd.to_datetime(train["ds"])
        
        # instantiate the model
        model = Prophet()
        
        # fit the model
        model.fit(train)
        
        future_pd = model.make_future_dataframe(periods=test.shape[0],freq='W-FRI',include_history=False)
        
        # make predictions
        
        results_pd = model.predict(future_pd)
        results_pd.rename(columns = {"yhat": feature}, inplace = True)
        results_combined = pd.merge(results_combined, results_pd[["ds", feature]], on = "ds", how = "inner")
    
    # return predictions
    return pd.DataFrame({'Store': data._get_value(0, "Store"),'Date': results_combined["ds"], 'Temperature': results_combined["Temperature"], 'Fuel_Price': results_combined["Fuel_Price"],'CPI': results_combined["CPI"], 'Unemployment': results_combined["Unemployment"]})

In [None]:
results = (
    df_feature
    .groupBy('Store')
    .apply(forecast_feature)
    )

In [None]:
results = results.cache()

In [None]:
results.show()

+-----+----------+-----------+----------+---------+------------+
|Store|      Date|Temperature|Fuel_Price|      CPI|Unemployment|
+-----+----------+-----------+----------+---------+------------+
|   31|2012-11-02|  62.463303| 3.4760756|223.08025|   6.2060027|
|   31|2012-11-09|  60.323032| 3.4589472|223.04842|    6.201559|
|   31|2012-11-16|   57.43928| 3.4435263|223.01009|   6.1673317|
|   31|2012-11-23|  53.883717| 3.4265876|222.98979|   6.1267176|
|   31|2012-11-30|   50.46428| 3.4092705|223.01224|    6.109187|
|   31|2012-12-07|  48.049553|  3.401732|223.07225|    6.118647|
|   31|2012-12-14|   46.74868| 3.4127457|223.13518|    6.125961|
|   31|2012-12-21|   45.92714| 3.4378996|223.17876|   6.0930696|
|   31|2012-12-28|   45.03206|  3.463723|223.22963|    6.006984|
|   31|2013-01-04|  44.174603|  3.484071|223.34447|      5.8931|
|   31|2013-01-11|  43.746548| 3.5075586|223.54837|    5.796365|
|   31|2013-01-18|  43.662586| 3.5453598|223.79878|   5.7474074|
|   31|2013-01-25|    43.

JOB HALF DONE! We have successfully found the missing values of the features for the test data, which will now help us make the final model more comprehensive and complex as we can count on these features

Dividing the actual dataframe into test and train data, as we want to join the forecasted features into the actual data

In [None]:
test = df.filter(F.col("is_test") == 1)
train = df.filter(F.col("is_test") == 0)

In [None]:
test = test.alias("t").join(results.alias("r"), on = ["Store", "Date"], how = "leftOuter").select(F.col("t.Date"), F.col("t.Store"), F.col("t.IsHoliday"),  F.col("t.Dept"), F.col("t.Weekly_Sales"), F.col("t.is_test"), F.col("r.Temperature"), F.col("r.Fuel_Price"), F.col("r.CPI"), F.col("r.Unemployment"))

In [None]:
test.count()

Out[38]: 115064

In [None]:
df_final = train.union(test)

In [None]:
df_final = df_final.cache()

In [None]:
df_final.show(10)

+----------+-----+---------+----+------------+-------+-----------+----------+---------+------------+
|      Date|Store|IsHoliday|Dept|Weekly_Sales|is_test|Temperature|Fuel_Price|      CPI|Unemployment|
+----------+-----+---------+----+------------+-------+-----------+----------+---------+------------+
|2010-02-05|    1|        0|   1|     24924.5|      0|      42.31|     2.572|211.09636|       8.106|
|2010-02-12|    1|        0|   1|    46039.49|      0|      38.51|     2.548|211.24217|       8.106|
|2010-02-19|    1|        0|   1|    41595.55|      0|      39.93|     2.514|211.28914|       8.106|
|2010-02-26|    1|        0|   1|    19403.54|      0|      46.63|     2.561|211.31964|       8.106|
|2010-03-05|    1|        0|   1|     21827.9|      0|       46.5|     2.625|211.35014|       8.106|
|2010-03-12|    1|        0|   1|    21043.39|      0|      57.79|     2.667|211.38065|       8.106|
|2010-03-19|    1|        0|   1|    22136.64|      0|      54.58|      2.72|211.21564|    

In [None]:
df_final.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df_final.columns]).show()

+----+-----+---------+----+------------+-------+-----------+----------+---+------------+
|Date|Store|IsHoliday|Dept|Weekly_Sales|is_test|Temperature|Fuel_Price|CPI|Unemployment|
+----+-----+---------+----+------------+-------+-----------+----------+---+------------+
|   0|    0|        0|   0|      115064|      0|          0|         0|  0|           0|
+----+-----+---------+----+------------+-------+-----------+----------+---+------------+



Our dataframe is ready for model training! We will again use pandas udf to do the time series prediction of the features of test data.

MODEL TRAINING.             
We will leverage the power of pandas udf to train multiple models for each store and dept parallelly

1. MODEL TRAINING USING FB PROPHET

In [None]:
result_schema = StructType(
    [
        StructField("Store", IntegerType()),
        StructField("Date", DateType()),
        StructField("Dept", IntegerType()),
        StructField("Weekly_Sales", FloatType())
    ]
    )

In [None]:
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [None]:
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def sales_forecast_fbProphet(df1):
    import datetime
    df1.rename(columns = {"Date": "ds", "Weekly_Sales": "y"}, inplace = True)
    df1["ds"] = pd.to_datetime(df1["ds"])
    df_test = df1[df1["is_test"] == 1]
    df_train = df1[df1["is_test"] == 0]
    results_combined = pd.DataFrame(df_test)
    results_combined[["Store", "Dept"]] = results_combined[["Store", "Dept"]].apply(pd.to_numeric)
    
    if df_test.shape[0] > 0:
        if df_train.shape[0] >= 2:

            # Multi-variate forecasting using fbProphet

            # instantiate the model
            model = Prophet()

            #Adding regressors
            model.add_regressor("IsHoliday")
            model.add_regressor("Temperature")
            model.add_regressor("Fuel_Price")
            model.add_regressor("CPI")
            model.add_regressor("Unemployment")

            # fit the model
            model.fit(df_train[["ds", "y", "IsHoliday", "Temperature", "Fuel_Price" , "CPI", "Unemployment"]])

            future_pd = model.make_future_dataframe(periods=df_test.shape[0],freq='W-FRI',include_history=False)
            future_pd = df_test[["ds", "IsHoliday", "Temperature", "Fuel_Price" , "CPI", "Unemployment"]]

            # make predictions
            results_pd = model.predict(future_pd)
            results_combined = pd.merge(results_combined, results_pd[["ds", "yhat"]], on = "ds", how = "left")
            
            return pd.DataFrame({'Store': results_combined._get_value(0, "Store"),'Date': results_combined["ds"], 'Dept': results_combined["Dept"],'Weekly_Sales': results_combined["yhat"]})
        else:
            return pd.DataFrame({'Store': results_combined["Store"],'Date': results_combined["ds"], 'Dept': results_combined["Dept"],'Weekly_Sales': results_combined["y"]})
    else:
        date = pd.Series([])
        date = pd.to_datetime(date)
        return pd.DataFrame({'Store': pd.Series([], dtype = "int64"),'Date': date,'Dept': pd.Series([], dtype = "int64"), 'Weekly_Sales': pd.Series([], dtype = "float")})

In [None]:
final_result = (
    df_final
    .groupBy("Store", "Dept")
    .apply(sales_forecast)
)

In [None]:
final_result = final_result.select(F.concat_ws('_',final_result.Store.astype(StringType()) , final_result.Dept.astype(StringType()) , final_result.Date.astype(StringType())).alias("Id"), "Weekly_Sales").sort("Store", "Dept", "Date")

Replacing null values and negative values with zero

In [None]:
final_result = final_result.withColumn("Weekly_Sales", F.when(((F.col("Weekly_Sales") < 0) | (F.col("Weekly_Sales").isNull())), 0).otherwise(F.col("Weekly_Sales")))

In [None]:
final_result.cache()

Out[61]: DataFrame[Id: string, Weekly_Sales: float]

In [None]:
final_result.show()

+--------------+------------+
|            Id|Weekly_Sales|
+--------------+------------+
|1_1_2012-11-02|   33310.984|
|1_1_2012-11-09|   27762.264|
|1_1_2012-11-16|   19833.027|
|1_1_2012-11-23|   16897.316|
|1_1_2012-11-30|   22920.828|
|1_1_2012-12-07|    34474.39|
|1_1_2012-12-14|    43389.71|
|1_1_2012-12-21|   43426.688|
|1_1_2012-12-28|    34762.98|
|1_1_2013-01-04|   22948.361|
|1_1_2013-01-11|   14426.078|
|1_1_2013-01-18|   12888.094|
|1_1_2013-01-25|   18347.828|
|1_1_2013-02-01|    27828.89|
|1_1_2013-02-08|   36459.863|
|1_1_2013-02-15|    39359.96|
|1_1_2013-02-22|   34593.305|
|1_1_2013-03-01|   25284.668|
|1_1_2013-03-08|    18142.46|
|1_1_2013-03-15|   18486.209|
+--------------+------------+
only showing top 20 rows



2. MODEL TRAINING USING XGBOOST

In [None]:
@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)
def sales_forecast_xgboost(df2):
    
    import pandas as pd
    import xgboost
    from xgboost import XGBRegressor
    from sklearn.preprocessing import StandardScaler
    from sklearn.model_selection import GridSearchCV
    import math
    import warnings
    warnings.filterwarnings("ignore")
    
    df2["Date"] = pd.to_datetime(df2["Date"])
    df2["Weekly_Sales"].fillna(df2["Weekly_Sales"].mean(), inplace = True)

    #Adding lag as a feature
    df2["lag_1"] = df2["Weekly_Sales"].shift(1)
    df2["lag_1"].fillna("bfill", inplace = True)

    #Adding week of month as a feature
    df2['week_of_month'] = pd.to_numeric(df2["Date"].dt.day/7)
    df2['week_of_month'] = df2['week_of_month'].apply(lambda x: math.ceil(x))
    
    df2.set_index("Date", inplace = True)

    #Preparing the training data
    df_train = df2[df2["is_test"] == 0]    
    X_train = df_train[["IsHoliday", "Temperature", "Fuel_Price" , "CPI", "Unemployment"]]
    df_train.reset_index(inplace = True)
    y_train = df_train["Weekly_Sales"]

    #Preparing the test data
    df_test = df2[df2["is_test"] == 1]
    X_test = df_test[["IsHoliday", "Temperature", "Fuel_Price" , "CPI", "Unemployment"]]
    

    results_combined = pd.DataFrame(df_test)
    results_combined[["Store", "Dept"]] = results_combined[["Store", "Dept"]].apply(pd.to_numeric)

    if df_test.shape[0] > 0:
        xgb = XGBRegressor()
        xgb.fit(X_train, y_train)
        prediction = xgb.predict(X_test)
        df_test.reset_index(inplace = True)

        return pd.DataFrame({'Store': df_test["Store"],'Date': df_test["Date"], 'Dept': df_test["Dept"],'Weekly_Sales': pd.Series(prediction)})
    else:
        date = pd.Series([])
        date = pd.to_datetime(date)
        return pd.DataFrame({'Store': pd.Series([], dtype = "int64"),'Date': date,'Dept': pd.Series([], dtype = "int64"), 'Weekly_Sales': pd.Series([], dtype = "float")})

In [None]:
final_result_xgboost = (
    df_final
    .groupBy("Store", "Dept")
    .apply(sales_forecast_xgboost)
)

In [None]:
final_result_xgboost = final_result_xgboost.select(F.concat_ws('_',final_result_xgboost.Store.astype(StringType()) , final_result_xgboost.Dept.astype(StringType()) , final_result_xgboost.Date.astype(StringType())).alias("Id"), "Weekly_Sales").sort("Store", "Dept", "Date")

Replacing null values and negative values with zero

In [None]:
final_result_xgboost = final_result_xgboost.withColumn("Weekly_Sales", F.when(((F.col("Weekly_Sales") < 0) | (F.col("Weekly_Sales").isNull())), 0).otherwise(F.col("Weekly_Sales")))

In [None]:
final_result_xgboost.cache()

Out[51]: DataFrame[Id: string, Weekly_Sales: float]

In [None]:
final_result_xgboost.show()

+--------------+------------+
|            Id|Weekly_Sales|
+--------------+------------+
|1_1_2012-11-02|    24892.76|
|1_1_2012-11-09|   25464.375|
|1_1_2012-11-16|   26853.299|
|1_1_2012-11-23|   26475.893|
|1_1_2012-11-30|   25543.693|
|1_1_2012-12-07|   36648.652|
|1_1_2012-12-14|    38755.16|
|1_1_2012-12-21|    42290.37|
|1_1_2012-12-28|   30497.705|
|1_1_2013-01-04|   32512.557|
|1_1_2013-01-11|   33047.047|
|1_1_2013-01-18|   27126.342|
|1_1_2013-01-25|   30656.611|
|1_1_2013-02-01|   29818.873|
|1_1_2013-02-08|   29998.264|
|1_1_2013-02-15|   26544.326|
|1_1_2013-02-22|   20816.533|
|1_1_2013-03-01|   25759.291|
|1_1_2013-03-08|   15893.155|
|1_1_2013-03-15|   16583.146|
+--------------+------------+
only showing top 20 rows



MODEL SELECTION. - OF THE 2 Methods we used to predict the future sales data, we find that the fbProphet performs well while prediction. Hence fbProphet is selected