# Forecasting the Sales of a Supermarket During a Festive Season

## Problem

* TODO: Write down the problem explicitly!

## Solution Overview

* We use the dataset provided by the [Walmart Recruiting - Store Sales Forecasting](https://www.kaggle.com/c/walmart-recruiting-store-sales-forecasting/data) challenge. It contains historical sales data for 45 Walmart stores located in different regions. Each store contains a number of departments. We will use this data to predict the department-wise sales for each store during the Christmas week.

* The dataset consists of 4 files (TODO: Add file description)
    * `stores.csv`
    * `train.csv`
    * `test.csv`
    * `features.csv`

## EDA and Data Cleaning

In [73]:
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql.functions import col
from pyspark.sql.functions import date_format

In [87]:
from datetime import datetime
from datetime import date

In [4]:
import warnings
# ignore warnings
warnings.filterwarnings("ignore")

In [5]:
spark = (SparkSession.builder
         .appName("Business Intelligence Capstone")
         .master("local[*]")
         .getOrCreate())

* Load the Stores

In [13]:
store_schema = types.StructType([
  types.StructField("Store", types.StringType(), nullable=False),
  types.StructField("Type", types.StringType(), nullable=False),
  types.StructField("Size", types.IntegerType(), nullable=False)
])


# Pass the schema via .schema(); it is not a reader option.
store_df = (spark.read
            .option("header", "true")
            .schema(store_schema)
            .csv("../data/stores.csv")
            .withColumnRenamed("Store", "store")
            .withColumnRenamed("Type", "type")
            .withColumnRenamed("Size", "size"))

In [14]:
store_df.show(5)

+-----+----+------+
|store|type|  size|
+-----+----+------+
|    1|   A|151315|
|    2|   A|202307|
|    3|   B| 37392|
|    4|   A|205863|
|    5|   B| 34875|
+-----+----+------+
only showing top 5 rows



In [15]:
store_df.createOrReplaceTempView("stores")

* Load the Train/Test and Features Dataset

In [49]:
%%time

train_df = (spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv("../data/train.csv")
            .select(
              col("Store").cast(types.StringType()).alias("store"),
              col("Dept").cast(types.StringType()).alias("dept"),
              col("Date").cast(types.DateType()).alias("date"),
              col("Weekly_Sales").alias("weekly_sales"),
              col("IsHoliday").alias("is_holiday")
            ))



CPU times: user 9.36 ms, sys: 4.53 ms, total: 13.9 ms
Wall time: 1.1 s


                                                                                

In [50]:
train_df.printSchema()

root
 |-- store: string (nullable = true)
 |-- dept: string (nullable = true)
 |-- date: date (nullable = true)
 |-- weekly_sales: double (nullable = true)
 |-- is_holiday: boolean (nullable = true)



In [51]:
train_df.createOrReplaceTempView("train")

In [52]:
features_df = (spark.read
               .option("header", "true")
               .option("inferSchema", "true")
               .csv("../data/features.csv")
               .select(
                 col("Store").cast(types.StringType()).alias("store"),
                 col("Date").cast(types.DateType()).alias("date"),
                 col("Temperature").alias("temperature"),
                 col("Fuel_Price").alias("fuel_price"),
                 col("MarkDown1").cast(types.DoubleType()).alias("markdown_1"),
                 col("MarkDown2").cast(types.DoubleType()).alias("markdown_2"),
                 col("MarkDown3").cast(types.DoubleType()).alias("markdown_3"),
                 col("MarkDown4").cast(types.DoubleType()).alias("markdown_4"),
                 col("MarkDown5").cast(types.DoubleType()).alias("markdown_5"),
                 col("CPI").cast(types.DoubleType()).alias("cpi"),
                 col("Unemployment").cast(types.DoubleType()).alias("unemployment"),
                 col("IsHoliday").alias("is_holiday")))

In [53]:
features_df.createOrReplaceTempView("features")

In [54]:
features_df.printSchema()

root
 |-- store: string (nullable = true)
 |-- date: date (nullable = true)
 |-- temperature: double (nullable = true)
 |-- fuel_price: double (nullable = true)
 |-- markdown_1: double (nullable = true)
 |-- markdown_2: double (nullable = true)
 |-- markdown_3: double (nullable = true)
 |-- markdown_4: double (nullable = true)
 |-- markdown_5: double (nullable = true)
 |-- cpi: double (nullable = true)
 |-- unemployment: double (nullable = true)
 |-- is_holiday: boolean (nullable = true)



* Merge `train` with `features`

In [57]:
train_features = spark.sql(
  """
  select t.store, t.date,
         t.dept, t.weekly_sales,
         f.temperature, f.fuel_price, 
         f.markdown_1, f.markdown_2, f.markdown_3, f.markdown_4, f.markdown_5,
         f.cpi, f.unemployment, f.is_holiday
  from train t inner join features f
  on (t.store = f.store and t.date = f.date)
  """)

In [59]:
train_features.createOrReplaceTempView("train_features")

In [78]:
spark.sql(
  """
  select distinct date_format(date, "EEEE") as day_of_week
  from train_features
  """
).show()

                                                                                

+-----------+
|day_of_week|
+-----------+
|     Friday|
+-----------+



* The `date` field does not identify an individual day; every value is the Friday of a given week. We will create two fields, `year` and `week`, to make the problem more general.
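
As a quick sanity check outside Spark, the mapping from a weekly date to a `(year, week)` pair can be sketched in plain Python. The helper name `year_and_week` is hypothetical, and the sketch assumes ISO 8601 week numbering (weeks start on Monday).

```python
from datetime import date


def year_and_week(d):
    """Return (calendar year, ISO 8601 week number) for a date."""
    return d.year, d.isocalendar()[1]


# 2010-12-24 is the Friday of the week that contains Christmas Day 2010.
friday = date(2010, 12, 24)
print(friday.weekday() == 4)   # weekday() == 4 means Friday
print(year_and_week(friday))
```

Note that the ISO week-year can differ from the calendar year for dates close to January 1st, so the pair above is only a convenient key, not a strict ISO year/week identifier.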

In [79]:
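from pyspark.sql.functions import weekofyear, year

# weekofyear() returns the ISO 8601 week number, matching Python's isocalendar().
train_features = (train_features
                  .withColumn("year", year(col("date")))
                  .withColumn("week", weekofyear(col("date"))))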

In [81]:
train_features.createOrReplaceTempView("train_features")

In [82]:
spark.sql(
  """
  select distinct year
  from train_features
  """).show()

                                                                                

+----+
|year|
+----+
|2012|
|2011|
|2010|
+----+



* Next, we mark the Christmas weeks (a Christmas week is a week containing Christmas Day). Christmas always falls on December 25th, so we mark the weeks containing `2010-12-25`, `2011-12-25`, and `2012-12-25` as Christmas weeks. We look up their ISO week numbers first.

In [90]:
print("year=2010, week={}".format(date(2010, 12, 25).isocalendar()[1]))
print("year=2011, week={}".format(date(2011, 12, 25).isocalendar()[1]))
print("year=2012, week={}".format(date(2012, 12, 25).isocalendar()[1]))

year=2010, week=51
year=2011, week=51
year=2012, week=52
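
Because every `date` in the dataset is a Friday, it can help to know which Friday belongs to each Christmas week. The following is a minimal sketch, assuming ISO weeks (Monday to Sunday); the helper name `christmas_friday` is hypothetical.

```python
from datetime import date, timedelta


def christmas_friday(year):
    """Return the Friday of the Monday-to-Sunday week containing December 25th."""
    christmas = date(year, 12, 25)
    # Step back to the Monday of that week, then forward four days to Friday.
    monday = christmas - timedelta(days=christmas.weekday())
    return monday + timedelta(days=4)


for y in (2010, 2011, 2012):
    print(y, christmas_friday(y))
```

The returned dates are the values one would expect to see in the `date` column for rows that fall in a Christmas week.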


In [101]:
train_features = spark.sql(
  """
  select *,
         case when year = 2010 and week = 51 then true
              when year = 2011 and week = 51 then true
              when year = 2012 and week = 52 then true
              else false
         end as is_christmas
  from train_features
  """)
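
The hard-coded `case` branches above could also be generated from a list of years. This is only a sketch under the same ISO week assumption; the helper name `christmas_week_predicate` is hypothetical and not part of the original notebook.

```python
from datetime import date


def christmas_week_predicate(years):
    """Build a SQL boolean expression that is true for the Christmas week of each year."""
    clauses = []
    for y in years:
        # ISO week number of the week containing December 25th of year y.
        week = date(y, 12, 25).isocalendar()[1]
        clauses.append(f"(year = {y} and week = {week})")
    return " or ".join(clauses)


predicate = christmas_week_predicate([2010, 2011, 2012])
print(predicate)
```

The resulting string could then be interpolated into the query, for example `spark.sql(f"select *, ({predicate}) as is_christmas from train_features")`, which avoids editing the query by hand when more years are added.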

In [103]:
train_features.where(col("is_christmas")).count()

                                                                                

5972

## Modeling