# Data Wrangling Walmart Data Set in Pyspark


In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import col, lit
from pyspark.sql.window import Window
import sys

In [2]:
features = spark.read.csv('features.csv', header = True)

In [3]:
stores = spark.read.csv('stores.csv', header = True)

In [4]:
train = spark.read.csv('train.csv', header = True)

In [5]:
stores.show(5)

+-----+----+------+
|Store|Type|  Size|
+-----+----+------+
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
+-----+----+------+
only showing top 5 rows



In [6]:
features.show(5)

+-----+----------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|Store|      Date|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|IsHoliday|
+-----+----------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+---------+
|    1|2010-02-05|      42.31|     2.572|       NA|       NA|       NA|       NA|       NA|211.0963582|       8.106|    FALSE|
|    1|2010-02-12|      38.51|     2.548|       NA|       NA|       NA|       NA|       NA|211.2421698|       8.106|     TRUE|
|    1|2010-02-19|      39.93|     2.514|       NA|       NA|       NA|       NA|       NA|211.2891429|       8.106|    FALSE|
|    1|2010-02-26|      46.63|     2.561|       NA|       NA|       NA|       NA|       NA|211.3196429|       8.106|    FALSE|
|    1|2010-03-05|       46.5|     2.625|       NA|       NA|       NA|       NA|       NA|211.3501429|       8

In [7]:
df = train.join(features, on=['Store', 'Date', 'IsHoliday'])

In [8]:
df1 = df.join(stores, on=['Store'])
df1.show(5)

+-----+----------+---------+----+------------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+----+------+
|Store|      Date|IsHoliday|Dept|Weekly_Sales|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|Type|  Size|
+-----+----------+---------+----+------------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+----+------+
|    1|2010-02-05|    FALSE|   1|     24924.5|      42.31|     2.572|       NA|       NA|       NA|       NA|       NA|211.0963582|       8.106|   A|151315|
|    1|2010-02-12|     TRUE|   1|    46039.49|      38.51|     2.548|       NA|       NA|       NA|       NA|       NA|211.2421698|       8.106|   A|151315|
|    1|2010-02-19|    FALSE|   1|    41595.55|      39.93|     2.514|       NA|       NA|       NA|       NA|       NA|211.2891429|       8.106|   A|151315|
|    1|2010-02-26|    FALSE|   1|    19403.54|      46.63|

In [9]:
df1.printSchema

<bound method DataFrame.printSchema of DataFrame[Store: string, Date: string, IsHoliday: string, Dept: string, Weekly_Sales: string, Temperature: string, Fuel_Price: string, MarkDown1: string, MarkDown2: string, MarkDown3: string, MarkDown4: string, MarkDown5: string, CPI: string, Unemployment: string, Type: string, Size: string]>

In [10]:
## Cast columns as proper data types
df2 = df1.withColumn('Store', F.col('Store').cast(T.IntegerType()))\
    .withColumn('Date', F.col('Date').cast(T.DateType()))\
    .withColumn('Dept', F.col('Dept').cast(T.IntegerType()))\
    .withColumn('Weekly_Sales', F.col('Weekly_Sales').cast(T.DoubleType()))\
    .withColumn('Temperature', F.col('Temperature').cast(T.DoubleType()))\
    .withColumn('Fuel_Price', F.col('Fuel_Price').cast(T.DoubleType()))\
    .withColumn('MarkDown1', F.col('MarkDown1').cast(T.DoubleType()))\
    .withColumn('MarkDown2', F.col('MarkDown2').cast(T.DoubleType()))\
    .withColumn('MarkDown3', F.col('MarkDown3').cast(T.DoubleType()))\
    .withColumn('MarkDown4', F.col('MarkDown4').cast(T.DoubleType()))\
    .withColumn('MarkDown5', F.col('MarkDown5').cast(T.DoubleType()))\
    .withColumn('CPI', F.col('CPI').cast(T.DoubleType()))\
    .withColumn('Unemployment', F.col('Unemployment').cast(T.DoubleType()))

In [11]:
df2.printSchema

<bound method DataFrame.printSchema of DataFrame[Store: int, Date: date, IsHoliday: string, Dept: int, Weekly_Sales: double, Temperature: double, Fuel_Price: double, MarkDown1: double, MarkDown2: double, MarkDown3: double, MarkDown4: double, MarkDown5: double, CPI: double, Unemployment: double, Type: string, Size: string]>

In [12]:
df2.show(5)

+-----+----------+---------+----+------------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+----+------+
|Store|      Date|IsHoliday|Dept|Weekly_Sales|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|Type|  Size|
+-----+----------+---------+----+------------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+----+------+
|    1|2010-02-05|    FALSE|   1|     24924.5|      42.31|     2.572|     null|     null|     null|     null|     null|211.0963582|       8.106|   A|151315|
|    1|2010-02-12|     TRUE|   1|    46039.49|      38.51|     2.548|     null|     null|     null|     null|     null|211.2421698|       8.106|   A|151315|
|    1|2010-02-19|    FALSE|   1|    41595.55|      39.93|     2.514|     null|     null|     null|     null|     null|211.2891429|       8.106|   A|151315|
|    1|2010-02-26|    FALSE|   1|    19403.54|      46.63|

In [13]:
### replace null values with 0
df5 = df2.fillna(0, subset= ['MarkDown1','MarkDown2', 'MarkDown3', 'MarkDown4', 'MarkDown5'])
   
df5.show(5)

+-----+----------+---------+----+------------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+----+------+
|Store|      Date|IsHoliday|Dept|Weekly_Sales|Temperature|Fuel_Price|MarkDown1|MarkDown2|MarkDown3|MarkDown4|MarkDown5|        CPI|Unemployment|Type|  Size|
+-----+----------+---------+----+------------+-----------+----------+---------+---------+---------+---------+---------+-----------+------------+----+------+
|    1|2010-02-05|    FALSE|   1|     24924.5|      42.31|     2.572|      0.0|      0.0|      0.0|      0.0|      0.0|211.0963582|       8.106|   A|151315|
|    1|2010-02-12|     TRUE|   1|    46039.49|      38.51|     2.548|      0.0|      0.0|      0.0|      0.0|      0.0|211.2421698|       8.106|   A|151315|
|    1|2010-02-19|    FALSE|   1|    41595.55|      39.93|     2.514|      0.0|      0.0|      0.0|      0.0|      0.0|211.2891429|       8.106|   A|151315|
|    1|2010-02-26|    FALSE|   1|    19403.54|      46.63|

In [14]:
df5.describe('MarkDown4').show()

+-------+------------------+
|summary|         MarkDown4|
+-------+------------------+
|  count|            421570|
|   mean|1083.1322675238291|
| stddev| 3894.529945443478|
|    min|               0.0|
|    max|          67474.85|
+-------+------------------+



In [15]:
df5.coalesce(1).write.format('com.databricks.spark.csv').save('Joined_data.csv',header = 'true')