In [1]:
#sql imports
from pyspark.sql import *
import pyspark.sql.types as typ
from pyspark.sql.session import SparkSession
import pyspark.sql.functions as fn

#spark context
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)


In [2]:
#creating training dataframe
labels_train= [('Store',typ.IntegerType()),
              ('Dept',typ.IntegerType()),
              ('Date',typ.DateType()),
              ('Weekly_sales',typ.DoubleType()),
              ('IsHoliday',typ.BooleanType())
         ]


schema_train = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels_train
])

In [3]:
train_csv_values = spark.read.csv('C:/Users/mouni/OneDrive/Documents/walmart-recruiting-store-sales-forecasting/train.csv', header=True,schema=schema_train)

In [4]:
train_csv_values.printSchema()
#train_csv_values.show(5)

root
 |-- Store: integer (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Weekly_sales: double (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



In [5]:
#creating feature dataframe
labels_features= [('Store',typ.IntegerType()),
              ('Date',typ.DateType()),
              ('Temperature',typ.DoubleType()),
            ('Fuel_Price',typ.DoubleType()),
                  ('MarkDown1',typ.StringType()),
                  ('MarkDown2',typ.StringType()),
                  ('MarkDown3',typ.StringType()),
                  ('MarkDown4',typ.StringType()),
                  ('MarkDown5',typ.StringType()),
                  ('CPI',typ.DoubleType()),
                  ('Unemployment',typ.DoubleType()),
              ('IsHoliday',typ.BooleanType())
         ]
schema_features= typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels_features
])
featuers_csv_values = spark.read.csv('C:/Users/mouni/OneDrive/Documents/walmart-recruiting-store-sales-forecasting/features.csv', header=True,schema=schema_features)
featuers_csv_values.printSchema()
#featuers_csv_values.show(5)

root
 |-- Store: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- MarkDown1: string (nullable = true)
 |-- MarkDown2: string (nullable = true)
 |-- MarkDown3: string (nullable = true)
 |-- MarkDown4: string (nullable = true)
 |-- MarkDown5: string (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)
 |-- IsHoliday: boolean (nullable = true)



In [6]:
#creating stores dataframe
labels_stores= [('Store',typ.IntegerType()),
              ('Type',typ.StringType()),
              ('Size',typ.IntegerType())
         ]
schema_stores= typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels_stores
])
stores_csv_values = spark.read.csv('C:/Users/mouni/OneDrive/Documents/walmart-recruiting-store-sales-forecasting/stores.csv', header=True,schema=schema_stores)
stores_csv_values.printSchema()
stores_csv_values.show(5)

root
 |-- Store: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
+-----+----+------+
only showing top 5 rows



In [7]:
train_csv_values.createOrReplaceTempView("train_csv_values")
featuers_csv_values.createOrReplaceTempView("featuers_csv_values")
stores_csv_values.createOrReplaceTempView("stores_csv_values")

In [8]:
data=spark.sql("""select train_csv_values.Store,train_csv_values.Dept,stores_csv_values.Type,stores_csv_values.Size,
train_csv_values.Date,train_csv_values.Weekly_Sales,
featuers_csv_values.Temperature,featuers_csv_values.Fuel_Price,featuers_csv_values.CPI,featuers_csv_values.Unemployment,
train_csv_values.IsHoliday from train_csv_values
join featuers_csv_values on train_csv_values.Store=featuers_csv_values.Store and train_csv_values.Date=featuers_csv_values.Date
Join stores_csv_values on train_csv_values.Store=stores_csv_values.Store
where train_csv_values.Weekly_Sales>0
""")
#where train_csv_values.Store<=10 and train_csv_values.Dept<=15
data.count()

420212

In [9]:
data.toPandas()\
.to_excel(r'C:/Users/mouni/OneDrive/Documents/walmart-recruiting-store-sales-forecasting/joined_data_7.xlsx', index = False)

PermissionError: [Errno 13] Permission denied: 'C:/Users/mouni/OneDrive/Documents/walmart-recruiting-store-sales-forecasting/joined_data_7.xlsx'

In [10]:
#Data Preprocessing 
print('Count of rows: {0}'.format(data.count()))

Count of rows: 420212


In [11]:
df = data.dropDuplicates()

In [12]:
print('Count of rows: {0}'.format(df.count()))

Count of rows: 420212


In [13]:
print('Count of distinct records: {0}'\
      .format(df.select([c for c in df.columns]).distinct().count()))

Count of distinct records: 420212


In [14]:
df.agg(*[
(1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') \
for c in df.columns
]).show()

+-------------+------------+------------+------------+------------+--------------------+-------------------+------------------+-----------+--------------------+-----------------+
|Store_missing|Dept_missing|Type_missing|Size_missing|Date_missing|Weekly_Sales_missing|Temperature_missing|Fuel_Price_missing|CPI_missing|Unemployment_missing|IsHoliday_missing|
+-------------+------------+------------+------------+------------+--------------------+-------------------+------------------+-----------+--------------------+-----------------+
|          0.0|         0.0|         0.0|         0.0|         0.0|                 0.0|                0.0|               0.0|        0.0|                 0.0|              0.0|
+-------------+------------+------------+------------+------------+--------------------+-------------------+------------------+-----------+--------------------+-----------------+



In [15]:
df.printSchema()

root
 |-- Store: integer (nullable = true)
 |-- Dept: integer (nullable = true)
 |-- Type: string (nullable = true)
 |-- Size: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Weekly_Sales: double (nullable = true)
 |-- Temperature: double (nullable = true)
 |-- Fuel_Price: double (nullable = true)
 |-- CPI: double (nullable = true)
 |-- Unemployment: double (nullable = true)
 |-- IsHoliday: boolean (nullable = true)

