In [0]:
print('Welcome to Big Data Analytics - Technical Project - Walmart Sales Analytics Prediction')

############################################# Kowsik Bhattacharjee ##################################
############################################# Big Data Analytics Technical Project ##################
################# Walmart Sales Data Analysis and Model Preparation using Big Data Spark ############

#Big Data Analytics - Technical Report
#Author - Kowsik Bhattacharjee : MSc Big Data analytics
#Title - Walmart Store Sales Forecasting
#Lecturer - Shagufta Henna 

#This lab aims to utilize spark for Big Data Analytics Technical Project.

#The lab primarily focuses to i) data analysis ii) preparing datasets to analyze, and feed to machine learning #models.

#This lab requires to analyze three datasets under the given sections.

#Used DataSets are:

#1) features.csv
#2) stores.csv
#3) train.csv
#####################################################################################################
#####################################################################################################

In [0]:
# Import standard Spark Libraries
import pyspark
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark import SparkFiles
from pyspark.sql.types import StringType, IntegerType, DoubleType, StructField, StructType, ArrayType, MapType

In [0]:
# Schema define in spark for Stores dataset
# Store,Type,Size

schema = StructType([
        StructField("Store", IntegerType(), True),
        StructField("Type", StringType(), True),
        StructField("Size", IntegerType(), True),
         ])

In [0]:
############# Reading the file stores.csv with spark.read function ##############

stores_sdf = spark.read.format("csv").option("header", "true").schema(schema).load("/FileStore/shared_uploads/reach2kowsik@gmail.com/stores.csv")
  
# Print Scehamof Stores data set
stores_sdf.printSchema()

# Showing top 5 records
stores_sdf.show(5)

In [0]:
stores_sdf.columns

In [0]:
# Schema define in spark for features dataset
# Store,	Date,	Temperature,	Fuel_Price,	MarkDown1,	MarkDown2,	
# MarkDown3,	MarkDown4,	MarkDown5,	CPI,	Unemployment,	IsHoliday

schema = StructType([
        StructField("Store", IntegerType(), True),
        StructField("Date", StringType(), True),
        StructField("Temperature", DoubleType(), True),
        StructField("Fuel_Price", FloatType(), True),
        StructField("MarkDown1", StringType(), True),
        StructField("MarkDown2", StringType(), True),
        StructField("MarkDown3", StringType(), True),
        StructField("MarkDown4", StringType(), True),
        StructField("MarkDown5", StringType(), True),
        StructField("CPI", FloatType(), True),
        StructField("Unemployment", FloatType(), True),
        StructField("IsHoliday", StringType(), True),
         ]) 

In [0]:
############# Reading the file features.csv with spark.read function ##############
#features_sdf = spark.read.csv('/FileStore/shared_uploads/reach2kowsik@gmail.com/features.csv', inferSchema=True, header=True)

features_sdf = spark.read.format("csv").option("header", "true").schema(schema).load("/FileStore/shared_uploads/reach2kowsik@gmail.com/features.csv")

# Print Sceham of Features data set
features_sdf.printSchema()

# Showing top 5 records
features_sdf.show(5)

#df1 = spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/reach2kowsik@gmail.com/stores.csv")
#df2 = spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/reach2kowsik@gmail.com/features.csv")
#df3 = spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/reach2kowsik@gmail.com/train.csv")

In [0]:
features_sdf.columns

In [0]:
# Schema define in spark for train dataset
# Store,Dept,Date,Weekly_Sales,IsHoliday

schema = StructType([
        StructField("Store", IntegerType(), True),
        StructField("Dept", StringType(), True),
        StructField("Date", StringType(), True),
        StructField("Weekly_Sales", FloatType(), True),
        StructField("IsHoliday", StringType(), True),
         ])

In [0]:
############# Reading the file train.csv with spark.read function ##############
#train_sdf = spark.read.csv('/FileStore/shared_uploads/reach2kowsik@gmail.com/train.csv', inferSchema=True, header=True)

train_sdf = spark.read.format("csv").option("header", "true").schema(schema).load("/FileStore/shared_uploads/reach2kowsik@gmail.com/train.csv")

# Print Sceham of Train data set
train_sdf.printSchema()

# Showing top 5 records
train_sdf.show(5)

#df1 = spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/reach2kowsik@gmail.com/stores.csv")
#df2 = spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/reach2kowsik@gmail.com/features.csv")
#df3 = spark.read.format("csv").load("dbfs:/FileStore/shared_uploads/reach2kowsik@gmail.com/train.csv")

In [0]:
train_sdf.columns

In [0]:
##### Data Cleaning Activity on Spark Data Frame #####

#1. Drop duplicate rows
#2. Drop Null rows
#3. Instead Dropping rows that have values as 'NaN' or 'NA' will replace any empty cells with 0 instead.

#4. Drop columns those are not neccessary for the rest of the section.

#5. Rename Columns as necessary

#Save the result to final_walmart_df data frame

#########################

print('Data Cleaning Activity')

In [0]:
# Drop duplicates from all 3 data frames
train_sdf = train_sdf.dropDuplicates()
features_sdf = features_sdf.dropDuplicates()
stores_sdf = stores_sdf.dropDuplicates()


In [0]:
# Rename Column in train_sdf
train_sdf_r = train_sdf.withColumnRenamed('Date','Tr_Date').withColumnRenamed('IsHoliday','Tr_IsHoliday').withColumnRenamed('Store', 'Tr_Store')
print(train_sdf_r.columns)
train_sdf_r.show(5, truncate = False)


In [0]:
# Rename Column in features_sdf
features_sdf_r = features_sdf.withColumnRenamed('Date', 'Fr_Date').withColumnRenamed('IsHoliday', 'Fr_IsHoliday').withColumnRenamed('Store', 'Fr_Store')
print(features_sdf_r.columns)
features_sdf_r.show(5, truncate = False)


In [0]:
# Performing Merge operation between 3 data frames using SQL

features_sdf.createOrReplaceTempView("Feature")
stores_sdf.createOrReplaceTempView("Store")
train_sdf.createOrReplaceTempView("Train")

# Right outer Join between Feature and Train data frame
#f_t_sdf = spark.sql("""SELECT Tr_Store, Dept, Tr_Date, Weekly_Sales, Tr_IsHoliday, Fr_Store, Fr_Date, Temperature, Fuel_Price, MarkDown1, MarkDown2, MarkDown3, MarkDown4, MarkDown5, CPI, Unemployment, Fr_IsHoliday
#FROM Feature f LEFT JOIN Train t ON t.Tr_Store = f.Fr_Store""")

f_t_sdf = spark.sql("""SELECT f.Store, t.Dept, f.Date, Weekly_Sales, Temperature, Fuel_Price, MarkDown1, MarkDown2, MarkDown3, MarkDown4, MarkDown5, CPI, Unemployment, f.IsHoliday
FROM Train t RIGHT JOIN Feature f ON t.Store = f.Store and t.IsHoliday = f.IsHoliday""")

f_t_sdf.show(10)

In [0]:
display(f_t_sdf)

Store,Dept,Date,Weekly_Sales,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,IsHoliday
1,98,2012-09-21,10139.42,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,98,2012-09-21,11254.16,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,97,2012-09-21,30023.13,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,97,2012-09-21,37394.9,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,96,2012-09-21,24137.46,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,94,2012-09-21,69906.91,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,93,2012-09-21,67370.04,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,92,2012-09-21,147179.81,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,82,2012-09-21,17824.37,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,74,2012-09-21,12090.3,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False


In [0]:
# making previously merged data frame as temp view for final merge with Store data frame

f_t_sdf.createOrReplaceTempView("mer_fet_tra")

In [0]:
# Join between Merged data frame and Store data frame

f_t_s_sdf = spark.sql("""SELECT s.Store, Type, Size, Dept, Date, Weekly_Sales, Temperature, Fuel_Price, MarkDown1, MarkDown2, MarkDown3, MarkDown4, MarkDown5, CPI, Unemployment, IsHoliday
FROM mer_fet_tra m INNER JOIN Store s ON s.Store = m.Store""")

f_t_s_sdf.show(10)

In [0]:
display(f_t_s_sdf)

Store,Type,Size,Dept,Date,Weekly_Sales,Temperature,Fuel_Price,MarkDown1,MarkDown2,MarkDown3,MarkDown4,MarkDown5,CPI,Unemployment,IsHoliday
1,A,151315,98,2012-09-21,10139.42,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,A,151315,98,2012-09-21,11254.16,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,A,151315,97,2012-09-21,30023.13,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,A,151315,97,2012-09-21,37394.9,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,A,151315,96,2012-09-21,24137.46,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,A,151315,94,2012-09-21,69906.91,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,A,151315,93,2012-09-21,67370.04,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,A,151315,92,2012-09-21,147179.81,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,A,151315,82,2012-09-21,17824.37,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False
1,A,151315,74,2012-09-21,12090.3,69.87,3.721,6352.3,7.64,4.69,1010.06,6456.71,222.78185,6.908,False


In [0]:
drop_list_sdf = ['Date', 'MarkDown1', 'MarkDown2', 'MarkDown3', 'MarkDown4', 'MarkDown5', 'Temperature', 'Fuel_Price', 'CPI', 'Unemployment', 'Type', 'Size']

final_walmart_sdf = f_t_s_sdf.select([column for column in f_t_s_sdf.columns if column not in drop_list_sdf])

display(final_walmart_sdf)

Store,Dept,Weekly_Sales,IsHoliday
1,98,10139.42,False
1,98,11254.16,False
1,97,30023.13,False
1,97,37394.9,False
1,96,24137.46,False
1,94,69906.91,False
1,93,67370.04,False
1,92,147179.81,False
1,82,17824.37,False
1,74,12090.3,False


In [0]:
from pyspark.sql.functions import *

# replacing string to boolean for further Model processing 
# Converting Categorical Variable 'IsHoliday' into Numerical Variables.

final_walmart_sdf = final_walmart_sdf.withColumn('IsHoliday', translate('IsHoliday', 'TRUE', '1'))
final_walmart_sdf = final_walmart_sdf.withColumn('IsHoliday', translate('IsHoliday', 'FALSE', '0'))
print(final_walmart_sdf.columns)

In [0]:
final_walmart_sdf.dropna()
final_walmart_sdf.fillna(0)
display(final_walmart_sdf)

Store,Dept,Weekly_Sales,IsHoliday
1,98,10139.42,0
1,98,11254.16,0
1,97,30023.13,0
1,97,37394.9,0
1,96,24137.46,0
1,94,69906.91,0
1,93,67370.04,0
1,92,147179.81,0
1,82,17824.37,0
1,74,12090.3,0


In [0]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

In [0]:
# Now, for the Spark ML, we need to create a feature column that has all features concatenated and a single column for labels.

#You can use VectorAssembler() to create a feature vector from all categorical and numerical features. Let us call #the call the final vector as “features”.

#Now, list all columns in the data and store it in a list named 'all_columns'
print('create a feature column and a single column for labels')

In [0]:
all_columns = final_walmart_sdf.columns # Task
all_columns

In [0]:
# Now create a list of columns which you don't wan't to include in your features, i.e., the labels 
drop_columns = ['Store','label'] 

drop_columns

In [0]:

columns_to_use = [i for i in all_columns if i not in drop_columns]
columns_to_use

In [0]:
# create a VectorAssembler object with columns you want to use for the ML models. Let us Name the output column as 'features'. These are the features that you will use later. Let us name the vector assembler object 'assembler'

print('create a VectorAssembler object')

In [0]:
assembler = VectorAssembler(inputCols=columns_to_use,outputCol='features')
print('stat_assembler', (str(assembler.params), columns_to_use))

In [0]:
# create a pipeline with a single stage - the assembler. Fit the pipeline to your data and create the transformed dataframe and name it 'modified_data_sdf'.

In [0]:
# converting string datatype to Integer for pipeline

final_walmart_sdf = final_walmart_sdf.withColumn("Dept", final_walmart_sdf['Dept'].cast('int'))
final_walmart_sdf = final_walmart_sdf.withColumn("IsHoliday", final_walmart_sdf['IsHoliday'].cast('int'))

In [0]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler])
model = pipeline.fit(final_walmart_sdf)
modified_data_sdf = model.transform(final_walmart_sdf)
modified_data_sdf

In [0]:
import numpy as np 
import pandas as pd
import json
import matplotlib
import matplotlib.pyplot as plt
from matplotlib import cm
from datetime import datetime
import glob
import seaborn as sns
import re
import os
import datetime as dt

In [0]:
#Print results

pipeline_stat = pd.DataFrame(modified_data_sdf.take(5), columns=modified_data_sdf.columns)
print('check_pipeline', (pipeline_stat.columns.values, pipeline_stat['features'][0].size))

In [0]:
#create our train and test sets. Let us, split into an 80-20 ratio between the train and test sets. Name these 'train_sdf' and 'test_sdf'

In [0]:
train_sdf,test_sdf = modified_data_sdf.randomSplit([0.8, 0.2])

In [0]:
# Print results 
print('check_split', (train_sdf.count(), test_sdf.count()))

In [0]:
# Renamed Column Store as Label

train_sdf = train_sdf.withColumnRenamed("Store", "label") 
train_sdf
test_sdf = train_sdf.withColumnRenamed("Store", "label") 
test_sdf

In [0]:
# Import the Library
from pyspark.ml.classification import LogisticRegression

# Logistic Regression Features Training the model #

lr = LogisticRegression(maxIter=3, regParam=0.2, elasticNetParam=0)

# Train model with Training Data
lrModel = lr.fit(train_sdf)

In [0]:
# Model Prediction 
predictions = lrModel.transform(test_sdf)
predictions


In [0]:
### Accuracy of Logistic Regression ###

# Import Libraries
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label",predictionCol="prediction")
evaluator.evaluate(predictions)

In [0]:
# train Linear Regression model to our data and predict. This prediction should be based on Spark ML's linear regression. 
#Create a model using this library, fit the training data. Afterwards, print the summary stats of the model, i.e, the RMSE error, R2 score 
#In this section, we will train the model without any regularization!
from pyspark.ml.regression import LinearRegression

# Add your code here

lr = LinearRegression()
lr_model = lr.fit(train_sdf)
lr_model.transform(test_sdf)

In [0]:
trainingSum = lr_model.summary

print("RMSE: %f" % trainingSum.rootMeanSquaredError)
print("r2: %f" % trainingSum.r2)

In [0]:
#Let us investigate that if the model actually overfits the training data.

#Predict the views for your test data (Note: it is called 'transform' in spark ml). Evaluate the performance using 'RegressionEvaluator' in the Spark ML Regression library. Name prediction column as 'prediction'.

predictions = lr_model.transform(test_sdf);
predictions

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Task: Compute RMSr on the test set

evaluator = RegressionEvaluator(
labelCol = "label", predictionCol="prediction", metricName="rmse")
test_rmse_orig = evaluator.evaluate(predictions)

In [0]:
#Print results here

predictions_to_print = predictions.toPandas()
lranswer = [test_rmse_orig, predictions_to_print['prediction'][0:50], predictions_to_print['label'][0:50]]
print('result_lr_test', lranswer)

In [0]:
#implement regularization to avoid overfitting. you can try different regularization parameters, e.g., try LASSO (L1), Ridge (L2) and elastic net (combination of L1 and L2).

#Try different regularization hyperparameters to initialize three different regularized linear regression models. Compare these regularization methods with each other and the non-regularized method above.

# Compute predictions using each of the models

l1_predictions = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=1)
l2_predictions = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0)
elastic_net_predictions = LinearRegression(maxIter=10, regParam=0.3,elasticNetParam=0.8)

l1_predictionsf = l1_predictions.fit(train_sdf)
l2_predictionsf = l2_predictions.fit(train_sdf)
elastic_net_predictionsf = elastic_net_predictions.fit(train_sdf)

l1_predictionst = l1_predictionsf.evaluate(test_sdf)
l1_predictionst = l2_predictionsf.evaluate(test_sdf)
elastic_net_predictionst = elastic_net_predictionsf.evaluate(test_sdf)

# Calculate the root mean squared error (RMSE) on test set for each of your models

test_rmse_l1 = l1_predictionst.rootMeanSquaredError
test_rmse_l2 = l1_predictionst.rootMeanSquaredError
test_rmse_elastic = elastic_net_predictionst.rootMeanSquaredError

In [0]:
# Print your results here

result = [test_rmse_l1, test_rmse_l2, test_rmse_elastic]
print('result_lr_all', result)

In [0]:
#### Accuracy calculation with Random Forest Model ####
from pyspark.ml.classification import RandomForestClassifier

## Create an initial RandomForest model.

rf = RandomForestClassifier(labelCol = "label", \
                            featuresCol = "features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)

# Train model with Training Data

rfModel = rf.fit(train_sdf)


In [0]:
### Model Prediction ###

predictions = rfModel.transform(test_sdf)

predictions.filter(predictions['prediction'] == 0) \
    .select("Dept","Weekly_Sales","IsHoliday","label","prediction","probability") \
    .orderBy("probability", ascending = False) \
    .show(n = 10, truncate = 30)

In [0]:
##### Accuracy Claculation with Random Forest #####

evaluator = MulticlassClassificationEvaluator(predictionCol = "prediction")
evaluator.evaluate(predictions)