# Predicting Rossmann store sales using Spark framework

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark import SparkContext
import pandas as pd

sc=SparkContext('local','ex1')  # if using locally
sql_sc=SQLContext(sc)

In [2]:
spark=SparkSession.builder.appName('data_processing').getOrCreate()

df1=spark.read.csv('G:/Big_data_homework/store_sales/store.csv', inferSchema=True,header=True)
df_train=spark.read.csv('G:/Big_data_homework/store_sales/train.csv',inferSchema=True,header=True)
df_test=spark.read.csv('G:/Big_data_homework/store_sales/test.csv',inferSchema=True,header=True)

In [4]:
tr1=df_train.join(df1,df_train.Store==df1.Store,'left_outer')
ts1=df_test.join(df1,df_test.Store==df1.Store,'left_outer')

In [5]:
tr1.head(2)

[Row(Store=1, DayOfWeek=5, Date='2015-07-31', Sales=5263, Customers=555, Open=1, Promo=1, StateHoliday='0', SchoolHoliday=1, Store=1, StoreType='c', Assortment='a', CompetitionDistance=1270, CompetitionOpenSinceMonth=9, CompetitionOpenSinceYear=2008, Promo2=0, Promo2SinceWeek=None, Promo2SinceYear=None, PromoInterval=None),
 Row(Store=2, DayOfWeek=5, Date='2015-07-31', Sales=6064, Customers=625, Open=1, Promo=1, StateHoliday='0', SchoolHoliday=1, Store=2, StoreType='a', Assortment='a', CompetitionDistance=570, CompetitionOpenSinceMonth=11, CompetitionOpenSinceYear=2007, Promo2=1, Promo2SinceWeek=13, Promo2SinceYear=2010, PromoInterval='Jan,Apr,Jul,Oct')]

In [6]:
ts1.head(2)

[Row(Id=1, Store=1, DayOfWeek=4, Date='2015-09-17', Open=1, Promo=1, StateHoliday='0', SchoolHoliday=0, Store=1, StoreType='c', Assortment='a', CompetitionDistance=1270, CompetitionOpenSinceMonth=9, CompetitionOpenSinceYear=2008, Promo2=0, Promo2SinceWeek=None, Promo2SinceYear=None, PromoInterval=None),
 Row(Id=2, Store=3, DayOfWeek=4, Date='2015-09-17', Open=1, Promo=1, StateHoliday='0', SchoolHoliday=0, Store=3, StoreType='a', Assortment='a', CompetitionDistance=14130, CompetitionOpenSinceMonth=12, CompetitionOpenSinceYear=2006, Promo2=1, Promo2SinceWeek=14, Promo2SinceYear=2011, PromoInterval='Jan,Apr,Jul,Oct')]

In [7]:
tr1.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Date: string (nullable = true)
 |-- Sales: integer (nullable = true)
 |-- Customers: integer (nullable = true)
 |-- Open: integer (nullable = true)
 |-- Promo: integer (nullable = true)
 |-- StateHoliday: string (nullable = true)
 |-- SchoolHoliday: integer (nullable = true)
 |-- Store: integer (nullable = true)
 |-- StoreType: string (nullable = true)
 |-- Assortment: string (nullable = true)
 |-- CompetitionDistance: integer (nullable = true)
 |-- CompetitionOpenSinceMonth: integer (nullable = true)
 |-- CompetitionOpenSinceYear: integer (nullable = true)
 |-- Promo2: integer (nullable = true)
 |-- Promo2SinceWeek: integer (nullable = true)
 |-- Promo2SinceYear: integer (nullable = true)
 |-- PromoInterval: string (nullable = true)



In [8]:
trn1=tr1.select(['DayOfWeek','Customers','Open','Promo','StateHoliday','SchoolHoliday','StoreType','Assortment',
                  'CompetitionDistance','CompetitionOpenSinceYear','Promo2','Promo2SinceYear','PromoInterval','Sales'])

In [9]:
trn1.printSchema()

root
 |-- DayOfWeek: integer (nullable = true)
 |-- Customers: integer (nullable = true)
 |-- Open: integer (nullable = true)
 |-- Promo: integer (nullable = true)
 |-- StateHoliday: string (nullable = true)
 |-- SchoolHoliday: integer (nullable = true)
 |-- StoreType: string (nullable = true)
 |-- Assortment: string (nullable = true)
 |-- CompetitionDistance: integer (nullable = true)
 |-- CompetitionOpenSinceYear: integer (nullable = true)
 |-- Promo2: integer (nullable = true)
 |-- Promo2SinceYear: integer (nullable = true)
 |-- PromoInterval: string (nullable = true)
 |-- Sales: integer (nullable = true)



In [10]:
train_data_final = trn1.na.drop()

In [12]:
from pyspark.ml.feature import VectorAssembler,OneHotEncoder,StringIndexer

idx1=StringIndexer(inputCol='StateHoliday',outputCol='i1').fit(train_data_final).transform(train_data_final)
idx2=StringIndexer(inputCol='StoreType',outputCol='i2').fit(idx1).transform(idx1)
idx3=StringIndexer(inputCol='Assortment',outputCol='i3').fit(idx2).transform(idx2)
idx4=StringIndexer(inputCol='PromoInterval',outputCol='i4').fit(idx3).transform(idx3)

In [23]:
assembler=VectorAssembler(inputCols=['DayOfWeek','Customers','Open','Promo','SchoolHoliday','CompetitionDistance','Promo2',
                                     'CompetitionOpenSinceYear','Promo2SinceYear','i1','i2','i3','i4'],outputCol='features')
trn_vect=assembler.transform(idx4)

In [24]:
model_df=trn_vect.select('features','Sales')

In [25]:
model_df.head(3)

[Row(features=DenseVector([5.0, 625.0, 1.0, 1.0, 1.0, 570.0, 1.0, 2007.0, 2010.0, 0.0, 0.0, 0.0, 0.0]), Sales=6064),
 Row(features=DenseVector([5.0, 821.0, 1.0, 1.0, 1.0, 14130.0, 1.0, 2006.0, 2011.0, 0.0, 0.0, 0.0, 0.0]), Sales=8314),
 Row(features=DenseVector([5.0, 1236.0, 1.0, 1.0, 1.0, 960.0, 1.0, 2011.0, 2012.0, 0.0, 0.0, 1.0, 0.0]), Sales=10457)]

In [42]:
# Test file supplied with this cse study doen't contain "customer" column so its nat same as train data; 
                                                                                        #    that's why not used for testing.

train_data,test_data = model_df.randomSplit([0.7,0.3])

In [43]:
from pyspark.ml.regression import LinearRegression

lin_Reg=LinearRegression(labelCol='Sales')
lr_model=lin_Reg.fit(train_data)

In [44]:
training_predictions=lr_model.evaluate(train_data)
print(training_predictions.r2)

0.8836492062131855


In [46]:
test_predictions=lr_model.evaluate(test_data)
print(test_predictions.r2)
print(test_predictions.meanSquaredError)

0.8830567597503829
1436316.6789211857
