In [217]:
#sql imports
from pyspark.sql import *
import pyspark.sql.types as typ
from pyspark.sql.session import SparkSession
import pyspark.sql.functions as fn

#spark context
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)


In [218]:
#creating training dataframe
labels_train= [('Store',typ.IntegerType()),
              ('Dept',typ.IntegerType()),
              ('Date',typ.DateType()),
              ('Weekly_sales',typ.DoubleType()),
              ('IsHoliday',typ.BooleanType())
         ]


schema_train = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels_train
])

In [219]:
train_csv_values = spark.read.csv('./train/train.csv', header=True,schema=schema_train)

In [220]:
train_csv_values.printSchema()
#train_csv_values.show(5)

root
 |-- Store: integer (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Weekly_sales: double (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



In [221]:
#creating feature dataframe
labels_features= [('Store',typ.IntegerType()),
              ('Date',typ.DateType()),
              ('Temperature',typ.DoubleType()),
            ('Fuel_Price',typ.DoubleType()),
                  ('MarkDown1',typ.StringType()),
                  ('MarkDown2',typ.StringType()),
                  ('MarkDown3',typ.StringType()),
                  ('MarkDown4',typ.StringType()),
                  ('MarkDown5',typ.StringType()),
                  ('CPI',typ.DoubleType()),
                  ('Unemployment',typ.DoubleType()),
              ('IsHoliday',typ.BooleanType())
         ]
schema_features= typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels_features
])
featuers_csv_values = spark.read.csv('./features/features.csv', header=True,schema=schema_features)
featuers_csv_values.printSchema()
#featuers_csv_values.show(5)

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



In [222]:
#creating stores dataframe
labels_stores= [('Store',typ.IntegerType()),
              ('Type',typ.StringType()),
              ('Size',typ.IntegerType())
         ]
schema_stores= typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels_stores
])
stores_csv_values = spark.read.csv('stores.csv', header=True,schema=schema_stores)
stores_csv_values.printSchema()
stores_csv_values.show(5)

root
 |-- Store: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
+-----+----+------+
only showing top 5 rows



In [223]:
train_csv_values.createOrReplaceTempView("train_csv_values")
featuers_csv_values.createOrReplaceTempView("featuers_csv_values")
stores_csv_values.createOrReplaceTempView("stores_csv_values")
featuers_csv_values.columns

['Store',
 'Date',
 'Temperature',
 'Fuel_Price',
 'MarkDown1',
 'MarkDown2',
 'MarkDown3',
 'MarkDown4',
 'MarkDown5',
 'CPI',
 'Unemployment',
 'IsHoliday']

In [224]:
data=spark.sql("""select train_csv_values.Store,train_csv_values.Dept,stores_csv_values.Type,stores_csv_values.Size,
train_csv_values.Date,train_csv_values.Weekly_Sales,
featuers_csv_values.Temperature,featuers_csv_values.Fuel_Price,featuers_csv_values.CPI, featuers_csv_values.Unemployment, featuers_csv_values.MarkDown1, featuers_csv_values.MarkDown2, featuers_csv_values.MarkDown3, featuers_csv_values.MarkDown4, featuers_csv_values.MarkDown5,
train_csv_values.IsHoliday from train_csv_values
join featuers_csv_values on train_csv_values.Store=featuers_csv_values.Store and train_csv_values.Date=featuers_csv_values.Date
Join stores_csv_values on train_csv_values.Store=stores_csv_values.Store
where train_csv_values.Weekly_Sales>0
""")
#where train_csv_values.Store<=10 and train_csv_values.Dept<=15
data.count()

420212

In [225]:
data.toPandas()\
.to_csv('joined_data.csv', index = False)

In [226]:
#Data Preprocessing 
print('Count of rows: {0}'.format(data.count()))

Count of rows: 420212


In [227]:
df = data.dropDuplicates()

In [228]:
print('Count of rows: {0}'.format(df.count()))

Count of rows: 420212


In [229]:
print('Count of distinct records: {0}'\
      .format(df.select([c for c in df.columns]).distinct().count()))

Count of distinct records: 420212


In [230]:
df.agg(*[
(1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') \
for c in df.columns
]).show()

+-------------+------------+------------+------------+------------+--------------------+-------------------+------------------+-----------+--------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|Store_missing|Dept_missing|Type_missing|Size_missing|Date_missing|Weekly_Sales_missing|Temperature_missing|Fuel_Price_missing|CPI_missing|Unemployment_missing|MarkDown1_missing|MarkDown2_missing|MarkDown3_missing|MarkDown4_missing|MarkDown5_missing|IsHoliday_missing|
+-------------+------------+------------+------------+------------+--------------------+-------------------+------------------+-----------+--------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|          0.0|         0.0|         0.0|         0.0|         0.0|                 0.0|                0.0|               0.0|        0.0|                 0.0|              0.0|              0

In [231]:
df.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Weekly_Sales: double (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



In [232]:
import pandas as pd
import numpy as np

In [233]:
df = pd.read_csv('joined_data.csv')

In [234]:
df['MarkDown1'] = df['MarkDown1'].replace(np.nan, 0)

In [235]:
print(df[['MarkDown1']])

        MarkDown1
0            0.00
1            0.00
2            0.00
3            0.00
4            0.00
...           ...
420207    4556.61
420208    5046.74
420209    1956.28
420210    2004.02
420211    4018.91

[420212 rows x 1 columns]


In [236]:
df['MarkDown2'] = df['MarkDown2'].replace(np.nan, 0)

In [237]:
df['MarkDown3'] = df['MarkDown3'].replace(np.nan, 0)

In [238]:
df['MarkDown4'] = df['MarkDown4'].replace(np.nan, 0)

In [239]:
df['MarkDown5'] = df['MarkDown5'].replace(np.nan, 0)

In [240]:
print(df[['MarkDown1','MarkDown2','MarkDown3','MarkDown4','MarkDown5']])

        MarkDown1  MarkDown2  MarkDown3  MarkDown4  MarkDown5
0            0.00       0.00       0.00       0.00       0.00
1            0.00       0.00       0.00       0.00       0.00
2            0.00       0.00       0.00       0.00       0.00
3            0.00       0.00       0.00       0.00       0.00
4            0.00       0.00       0.00       0.00       0.00
...           ...        ...        ...        ...        ...
420207    4556.61      20.64       1.50    1601.01    3288.25
420208    5046.74       0.00      18.82    2253.43    2340.01
420209    1956.28       0.00       7.89     599.32    3990.54
420210    2004.02       0.00       3.18     437.73    1537.49
420211    4018.91      58.08     100.00     211.94     858.33

[420212 rows x 5 columns]


In [241]:
df=pd.get_dummies(df,columns=['Type'])

In [242]:
import datetime

In [243]:
df['Date']= pd.to_datetime(df['Date']) 

In [244]:
df['month'] = df['Date'].dt.month
df['Year'] = df['Date'].dt.year

In [245]:
df[['Date','month', 'Year']].head()

Unnamed: 0,Date,month,Year
0,2010-02-05,2,2010
1,2010-02-12,2,2010
2,2010-02-19,2,2010
3,2010-02-26,2,2010
4,2010-03-05,3,2010


In [246]:
df['quarter'] = df['Date'].dt.quarter
df[['Date','quarter']].head()

Unnamed: 0,Date,quarter
0,2010-02-05,1
1,2010-02-12,1
2,2010-02-19,1
3,2010-02-26,1
4,2010-03-05,1


In [247]:
df['dayofweek_name'] = df['Date'].dt.day_name()
df[['Date','dayofweek_name']].head()

Unnamed: 0,Date,dayofweek_name
0,2010-02-05,Friday
1,2010-02-12,Friday
2,2010-02-19,Friday
3,2010-02-26,Friday
4,2010-03-05,Friday


In [248]:
df['is_weekend'] = np.where(df['dayofweek_name'].isin(['Sunday','Saturday']),1,0)
df[['Date','is_weekend']].head()

Unnamed: 0,Date,is_weekend
0,2010-02-05,0
1,2010-02-12,0
2,2010-02-19,0
3,2010-02-26,0
4,2010-03-05,0


In [249]:

df["IsHoliday"] = df["IsHoliday"].astype(int)
del df['dayofweek_name']
# del df['Date']
print(df.head())

   Store  Dept    Size       Date  Weekly_Sales  Temperature  Fuel_Price  \
0      1     1  151315 2010-02-05      24924.50        42.31       2.572   
1      1     1  151315 2010-02-12      46039.49        38.51       2.548   
2      1     1  151315 2010-02-19      41595.55        39.93       2.514   
3      1     1  151315 2010-02-26      19403.54        46.63       2.561   
4      1     1  151315 2010-03-05      21827.90        46.50       2.625   

          CPI  Unemployment  MarkDown1  ...  MarkDown4  MarkDown5  IsHoliday  \
0  211.096358         8.106        0.0  ...        0.0        0.0          0   
1  211.242170         8.106        0.0  ...        0.0        0.0          1   
2  211.289143         8.106        0.0  ...        0.0        0.0          0   
3  211.319643         8.106        0.0  ...        0.0        0.0          0   
4  211.350143         8.106        0.0  ...        0.0        0.0          0   

   Type_A  Type_B  Type_C  month  Year  quarter  is_weekend  


In [250]:
df.to_csv('joined_data_refined.csv', index=False)