In [45]:
#sql imports
from pyspark.sql import *
import pyspark.sql.types as typ
from pyspark.sql.session import SparkSession
import pyspark.sql.functions as fn

#spark context
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)


In [46]:
#creating training dataframe
labels_train= [('Store',typ.IntegerType()),
              ('Dept',typ.IntegerType()),
              ('Date',typ.DateType()),
              ('Weekly_sales',typ.DoubleType()),
              ('IsHoliday',typ.BooleanType())
         ]


schema_train = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels_train
])

In [47]:
train_csv_values = spark.read.csv('./train/train.csv', header=True,schema=schema_train)

In [48]:
train_csv_values.printSchema()
#train_csv_values.show(5)

root
 |-- Store: integer (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Weekly_sales: double (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



In [49]:
#creating feature dataframe
labels_features= [('Store',typ.IntegerType()),
              ('Date',typ.DateType()),
              ('Temperature',typ.DoubleType()),
            ('Fuel_Price',typ.DoubleType()),
                  ('MarkDown1',typ.StringType()),
                  ('MarkDown2',typ.StringType()),
                  ('MarkDown3',typ.StringType()),
                  ('MarkDown4',typ.StringType()),
                  ('MarkDown5',typ.StringType()),
                  ('CPI',typ.DoubleType()),
                  ('Unemployment',typ.DoubleType()),
              ('IsHoliday',typ.BooleanType())
         ]
schema_features= typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels_features
])
featuers_csv_values = spark.read.csv('./features/features.csv', header=True,schema=schema_features)
featuers_csv_values.printSchema()
#featuers_csv_values.show(5)

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



In [50]:
#creating stores dataframe
labels_stores= [('Store',typ.IntegerType()),
              ('Type',typ.StringType()),
              ('Size',typ.IntegerType())
         ]
schema_stores= typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels_stores
])
stores_csv_values = spark.read.csv('stores.csv', header=True,schema=schema_stores)
stores_csv_values.printSchema()
stores_csv_values.show(5)

root
 |-- Store: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
+-----+----+------+
only showing top 5 rows



In [51]:
train_csv_values.createOrReplaceTempView("train_csv_values")
featuers_csv_values.createOrReplaceTempView("featuers_csv_values")
stores_csv_values.createOrReplaceTempView("stores_csv_values")
featuers_csv_values.columns

['Store',
 'Date',
 'Temperature',
 'Fuel_Price',
 'MarkDown1',
 'MarkDown2',
 'MarkDown3',
 'MarkDown4',
 'MarkDown5',
 'CPI',
 'Unemployment',
 'IsHoliday']

In [52]:
data=spark.sql("""select train_csv_values.Store,train_csv_values.Dept,stores_csv_values.Type,stores_csv_values.Size,
train_csv_values.Date,train_csv_values.Weekly_Sales,
featuers_csv_values.Temperature,featuers_csv_values.Fuel_Price,featuers_csv_values.CPI, featuers_csv_values.Unemployment, featuers_csv_values.MarkDown1, featuers_csv_values.MarkDown2, featuers_csv_values.MarkDown3, featuers_csv_values.MarkDown4, featuers_csv_values.MarkDown5,
train_csv_values.IsHoliday from train_csv_values
join featuers_csv_values on train_csv_values.Store=featuers_csv_values.Store and train_csv_values.Date=featuers_csv_values.Date
Join stores_csv_values on train_csv_values.Store=stores_csv_values.Store
where train_csv_values.Weekly_Sales>0
""")
#where train_csv_values.Store<=10 and train_csv_values.Dept<=15
data.count()

420212

In [54]:
data.toPandas()\
.to_csv('joined_data.csv', index = False)

In [55]:
#Data Preprocessing 
print('Count of rows: {0}'.format(data.count()))

Count of rows: 420212


In [56]:
df = data.dropDuplicates()

In [57]:
print('Count of rows: {0}'.format(df.count()))

Count of rows: 420212


In [58]:
print('Count of distinct records: {0}'\
      .format(df.select([c for c in df.columns]).distinct().count()))

Count of distinct records: 420212


In [59]:
df.agg(*[
(1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') \
for c in df.columns
]).show()

+-------------+------------+------------+------------+------------+--------------------+-------------------+------------------+-----------+--------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|Store_missing|Dept_missing|Type_missing|Size_missing|Date_missing|Weekly_Sales_missing|Temperature_missing|Fuel_Price_missing|CPI_missing|Unemployment_missing|MarkDown1_missing|MarkDown2_missing|MarkDown3_missing|MarkDown4_missing|MarkDown5_missing|IsHoliday_missing|
+-------------+------------+------------+------------+------------+--------------------+-------------------+------------------+-----------+--------------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|          0.0|         0.0|         0.0|         0.0|         0.0|                 0.0|                0.0|               0.0|        0.0|                 0.0|              0.0|              0

In [60]:
df.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Weekly_Sales: double (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



In [61]:
import pandas as pd
import numpy as np

In [62]:
df = pd.read_csv('joined_data.csv')

In [63]:
df['MarkDown1'] = df['MarkDown1'].replace(np.nan, 0)

In [64]:
print(df[['MarkDown1']])

        MarkDown1
0            0.00
1            0.00
2            0.00
3            0.00
4            0.00
...           ...
420207    4556.61
420208    5046.74
420209    1956.28
420210    2004.02
420211    4018.91

[420212 rows x 1 columns]


In [65]:
df['MarkDown2'] = df['MarkDown2'].replace(np.nan, 0)

In [66]:
df['MarkDown3'] = df['MarkDown3'].replace(np.nan, 0)

In [67]:
df['MarkDown4'] = df['MarkDown4'].replace(np.nan, 0)

In [68]:
df['MarkDown5'] = df['MarkDown5'].replace(np.nan, 0)

In [69]:
print(df[['MarkDown1','MarkDown2','MarkDown3','MarkDown4','MarkDown5']])

        MarkDown1  MarkDown2  MarkDown3  MarkDown4  MarkDown5
0            0.00       0.00       0.00       0.00       0.00
1            0.00       0.00       0.00       0.00       0.00
2            0.00       0.00       0.00       0.00       0.00
3            0.00       0.00       0.00       0.00       0.00
4            0.00       0.00       0.00       0.00       0.00
...           ...        ...        ...        ...        ...
420207    4556.61      20.64       1.50    1601.01    3288.25
420208    5046.74       0.00      18.82    2253.43    2340.01
420209    1956.28       0.00       7.89     599.32    3990.54
420210    2004.02       0.00       3.18     437.73    1537.49
420211    4018.91      58.08     100.00     211.94     858.33

[420212 rows x 5 columns]


In [70]:
# df=pd.get_dummies(df,drop_first=True)

In [71]:
print(df.head())

   Store  Dept Type    Size        Date  Weekly_Sales  Temperature  \
0      1     1    A  151315  2010-02-05      24924.50        42.31   
1      1     1    A  151315  2010-02-12      46039.49        38.51   
2      1     1    A  151315  2010-02-19      41595.55        39.93   
3      1     1    A  151315  2010-02-26      19403.54        46.63   
4      1     1    A  151315  2010-03-05      21827.90        46.50   

   Fuel_Price         CPI  Unemployment  MarkDown1  MarkDown2  MarkDown3  \
0       2.572  211.096358         8.106        0.0        0.0        0.0   
1       2.548  211.242170         8.106        0.0        0.0        0.0   
2       2.514  211.289143         8.106        0.0        0.0        0.0   
3       2.561  211.319643         8.106        0.0        0.0        0.0   
4       2.625  211.350143         8.106        0.0        0.0        0.0   

   MarkDown4  MarkDown5  IsHoliday  
0        0.0        0.0      False  
1        0.0        0.0       True  
2        0.

In [72]:
print(df[['Type']])

       Type
0         A
1         A
2         A
3         A
4         A
...     ...
420207    B
420208    B
420209    B
420210    B
420211    B

[420212 rows x 1 columns]


In [73]:
del df['Date']

In [74]:
df=pd.get_dummies(df,drop_first=True)

In [75]:
print(df.head())

   Store  Dept    Size  Weekly_Sales  Temperature  Fuel_Price         CPI  \
0      1     1  151315      24924.50        42.31       2.572  211.096358   
1      1     1  151315      46039.49        38.51       2.548  211.242170   
2      1     1  151315      41595.55        39.93       2.514  211.289143   
3      1     1  151315      19403.54        46.63       2.561  211.319643   
4      1     1  151315      21827.90        46.50       2.625  211.350143   

   Unemployment  MarkDown1  MarkDown2  MarkDown3  MarkDown4  MarkDown5  \
0         8.106        0.0        0.0        0.0        0.0        0.0   
1         8.106        0.0        0.0        0.0        0.0        0.0   
2         8.106        0.0        0.0        0.0        0.0        0.0   
3         8.106        0.0        0.0        0.0        0.0        0.0   
4         8.106        0.0        0.0        0.0        0.0        0.0   

   IsHoliday  Type_B  Type_C  
0      False       0       0  
1       True       0       0  