# Abstract

> The main objective of this project is to predict the sales of the 111 potentially weather sensitive products like (umbrellas, bread, milk) around the time of major waeather events at 45 of the Walmart's retail locations using statistical and machine learning techiniques. 
This is an important problem for any big retailer as it will help the replenishment manager to correctly plan and manage the inventory to avoid being out-of-stock or overstock during and after a storm. This will also help in timely intervention to manage the risks arising from the other unforseen circumstances.

# Introduction

[Walmart](https://en.wikipedia.org/wiki/Walmart) is an American multinational retail corporation that operates a chain of hypermarkets, discount department stores, and grocery stores, headquartered in Bentonville, Arkansas. The company was founded by Sam Walton in 1962 and incorporated on October 31, 1969. It also owns and operates Sam's Club retail warehouses. As of October 31, 2019, Walmart has 11,438 stores and clubs in 27 countries, operating under 55 different names.The company operates under the name Walmart in the United States and Canada, as Walmart de México y Centroamérica in Mexico and Central America, as Asda in the United Kingdom, as the Seiyu Group in Japan, and as Best Price in India. It has wholly owned operations in Argentina, Chile, Canada, and South Africa. Since August 2018, Walmart only holds a minority stake in Walmart Brasil, which was renamed Grupo Big in August 2019, with 20 percent of the company's shares, and private equity firm Advent International holding 80 percent ownership of the company.

**Inventory Management:** Inventory management is a tricky function of any business and especially with business that are consumer facing, the management of the inventory for different stock keeping units at an optimal quantity becomes one of the critical parts of the business. Anything more than the optimal value might create overhead costs which might not be helpful as it might bring down the profitability and anything below the optimal number can cause consumers to leave the store and buy somewhere else. Hence, it become a key function for any consumer facing business to maintain an optimal invertory for each sku. 
The problem of inventory managment boils down to one important metric and that is nothing but the demand of a paricular item - which is difficult to predit without any scientific method and also time consuming. Therefore, the goal for a firm is to predict the demand for sku's and matain those. 

**Problem faced by Walmart:** With more than 11,000 stores across different geographies, inventory management becomes a necessay task for Walmart to focus on their custormers needs and to make sure the right products are available at the right place in the right quantity. The prediction problem at hand is for a subset of sku's - weather senstitive items like umbrellas, bread etc. around the time of the major weather events that makes it even more variable to predict the demand. Therefore, the task is to use analytical techniques to predict the demand for those essential sku's during the major weather events.

# Methods

## Data

The data has been download from the Kaggle website. This dataset contains sales data for 111 products whose sales may be affected by weather (such as milk, bread , umbrellasetc.) These 111 products are sold in stores at 45 different Walmart locations. Some of the products may be a similar item (such as milk) but have a different id in different stores/regions/suppliers. The 45 locations are covered by 20 weather stations (i.e. some of the stores are nearby and share a weather station).

For the purposes of this project, a weather event is defined as any day in which more than an inch of rain or two inches of snow was observed. The prediction is the units sold for a window of ±3 days surrounding each storm.

The following graphic shows the layout of the test windows. The green dots are the training set days, the red dots are the test set days, and the event=True are the days with storms. Note that this plot is for the 20 weather stations. All days prior to 2013-04-01 are given out as training data.

![Image](https://storage.googleapis.com/kaggle-competitions/kaggle/4332/media/weather_events.png)

The descriptions of the data files and the field descriptions are given below.

**Field descriptions**

- `date` - the day of sales or weather
- `store_nbr` - an id representing one of the 45 stores
- `station_nbr` - an id representing one of 20 weather stations
- `item_nbr` - an id representing one of the 111 products
- `units` - the quantity sold of an item on a given day
- `id` - a triplet representing a store_nbr, item_nbr, and date. Form the id by concatenating these (in that order) with an underscore. E.g. "2_1_2013-04-01" represents store 2, item 1, sold on 2013-04-01.

**File descriptions**

- `key.csv` - the relational mapping between stores and the weather stations that cover them
- `sampleSubmission.csv` - file that gives the prediction format
- `train.csv` - sales data for all stores & dates in the training set
- `test.csv` - stores & dates for forecasting (missing 'units' that I have to predict)
- `weather.csv` - a file containing the NOAA weather information for each station and day
- `noaa_weather_qclcd_documentation.pdf` - a guide to understand the data provided in the weather.csv file

In [4]:
spark

In [5]:
# Read train, test, key and weather dataset stored as spark table 
walmrt_trn = spark.table("dbagchi2_train_csv")
walmrt_tst = spark.table("dbagchi2_test_csv")
walmrt_key = spark.table("dbagchi2_key_csv")
walmrt_weather = spark.table("dbagchi2_weather_csv")

In [6]:
# Join the train data with the key to merge the weather data for each store
from pyspark.sql import functions as F
def merge_df(df, walmrt_key, walmrt_weather):
  df_key = df.join(walmrt_key, ["store_nbr"]) # Inner Join
  df_weather = df_key.join(walmrt_weather, ["station_nbr", "date"])
  '''
  Concatenate store_nbr and item_nbr to create a primary key to get the 
  non-zero sales units( We are using only 255 combinations for modeling 
  as the sum of the sales units of other combinations is zero)
  '''
  df_weather = df_weather.withColumn("store_nbr_item_nbr", F.concat(F.col("store_nbr"), F.lit("_"), F.col("item_nbr")))
  return df_weather

walmrt_trn_weather = merge_df(walmrt_trn, walmrt_key, walmrt_weather)
walmrt_tst_weather = merge_df(walmrt_tst, walmrt_key, walmrt_weather)


In [7]:
# Grouby by the store and item - primary key, to get the total sales at a store item level
walmrt_trn_weather.createOrReplaceTempView("walmrt_trn_weather")
non_na_sls = spark.sql("SELECT store_nbr_item_nbr, sum(units) as total_units from walmrt_trn_weather group by store_nbr_item_nbr")

#This returns a dataframe with store and item combinations that has zero sales
zero_sales_df = non_na_sls.filter(F.col("total_units") == 0) # I am not using it 

'''
get the non zero sales store and item combinatios
'''
array_of_non_zero_store_item = non_na_sls.filter(F.col("total_units") > 0).select(F.col("store_nbr_item_nbr")).collect()
list_of_non_zero_store_item = [row.store_nbr_item_nbr for row in array_of_non_zero_store_item]
#print(list_of_non_zero_store_item)

In [8]:
display(non_na_sls.sort(F.col("total_units").desc()))

store_nbr_item_nbr,total_units
33_44,189903
30_44,136473
17_9,135367
16_25,135046
2_44,117125
4_9,117123
33_9,101586
25_9,98560
15_45,92320
34_45,87419


In [9]:
len(list_of_non_zero_store_item)

In [10]:
'''
This returns a data set with just 255 combinatios of store 
and item id with greater thatn 0 total sales
'''
walmrt_trn_subset = walmrt_trn_weather.filter(F.col("store_nbr_item_nbr").isin(list_of_non_zero_store_item))
#print(walmrt_trn_subset.select(F.col("store_nbr_item_nbr")).distinct().count())


In [11]:
print(walmrt_trn_subset.dtypes)

In [12]:
display(walmrt_trn_subset.summary())

summary,station_nbr,store_nbr,item_nbr,units,tmax,tmin,tavg,depart,dewpoint,wetbulb,heat,cool,sunrise,sunset,codesum,snowfall,preciptotal,stnpressure,sealevel,resultspeed,resultdir,avgspeed,store_nbr_item_nbr
count,236038.0,236038.0,236038.0,236038.0,236038,236038,236038,236038,236038,236038,236038,236038,236038,236038,236038,236038,236038,236038,236038,236038,236038,236038,236038
mean,11.161524839220805,22.761868851625582,53.17968293240919,19.30620069649802,71.02752502374356,48.876312437906755,60.160510215860555,1.7580453246167103,44.92084651400948,52.22986371191136,10.255547465347766,5.416057681208322,593.7944175550705,1825.504254847996,,0.021043076900462337,0.08428222501968483,28.6179919202359,30.013575380145152,6.2814341940227845,18.577738852125773,7.950316473666525,
stddev,4.911769444213652,13.132051688472274,34.33472367594489,39.42500991920799,19.433739746193478,19.169234548382597,18.892474775390568,7.62188070868811,19.607066658968762,16.91651297004788,13.762100813334685,7.51268380024089,88.71349093322866,88.63255509439799,,0.35156324170825753,0.2924117884782939,1.9617005919208035,0.1926706750427921,4.177247162617685,9.705649156120586,3.877083881247155,
min,1.0,1.0,1.0,0.0,-1,-1,-1,0,-1,-1,0,0,-,-,,T,T,23.72,29.16,0.0,01,0.0,10_21
25%,7.0,12.0,18.0,0.0,59.0,34.0,47.0,-3.0,29.0,39.0,0.0,0.0,531.0,1747.0,,0.0,0.0,28.69,29.89,3.1,13.0,5.2,
50%,12.0,22.0,50.0,1.0,74.0,51.0,63.0,2.0,48.0,55.0,2.0,0.0,607.0,1833.0,,0.0,0.0,29.33,30.01,5.4,18.0,7.3,
75%,15.0,34.0,88.0,24.0,86.0,65.0,76.0,6.0,62.0,67.0,18.0,11.0,652.0,1913.0,,0.0,0.02,29.83,30.13,8.6,26.0,10.0,
max,20.0,45.0,111.0,5568.0,M,M,M,M,M,M,M,M,0740,1949,VCTS,M,M,M,M,M,M,M,9_93


## Imputation 

The are two kinds of missing data present in the merged data set. The missing data are mainly present in the weather data set - mainly due to unavilability of the data or due to the limitation of the measuring instrument to capture small but non-zero values. Two different missing values are defined below.

- M : Missing Value
- T : Trace (When the value is not missing but it is very small for mesurement)

**Missing Data** 

In statistics, missing data, or missing values, occur when no data value is stored for the variable in an observation. Missing data are a common occurrence and can have a significant effect on the conclusions that can be drawn from the data.

[Missing data](https://en.wikipedia.org/wiki/Missing_data) can occur because of nonresponse: no information is provided for one or more items or for a whole unit ("subject"). Some items are more likely to generate a nonresponse than others: for example items about private subjects such as income. Attrition is a type of missingness that can occur in longitudinal studies—for instance studying development where a measurement is repeated after a certain period of time. Missingness occurs when participants drop out before the test ends and one or more measurements are missing.

**Types of Missing Data**

- **Missing Completely at Random:** Values in a data set are **missing completely at random (MCAR)** if the events that lead to any particular data-item being missing are independent both of observable variables and of unobservable parameters of interest, and occur entirely at random. When data are MCAR, the analysis performed on the data is unbiased; however, data are rarely MCAR.

- **Missing at Random:** **Missing at random (MAR)** occurs when the missingness is not random, but where missingness can be fully accounted for by variables where there is complete information.Since MAR is an assumption that is impossible to verify statistically, we must rely on its substantive reasonableness. An example is that males are less likely to fill in a depression survey but this has nothing to do with their level of depression, after accounting for maleness. Depending on the analysis method, these data can still induce parameter bias in analyses due to the contingent emptiness of cells (male, very high depression may have zero entries). However, if the parameter is estimated with Full Information Maximum Likelihood, MAR will provide asymptotically unbiased estimates.

- **Missing not at Random:** **Missing not at random (MNAR)** (also known as nonignorable nonresponse) is data that is neither MAR nor MCAR (i.e. the value of the variable that's missing is related to the reason it's missing). To extend the previous example, this would occur if men failed to fill in a depression survey because of their level of depression.

[Reference 1](https://en.wikipedia.org/wiki/Missing_data)

In [14]:
'''Imputation for tavg, depart, preciptotal, snowfall'''
def imputaion(df):
  df = df.withColumn("tavg_impute", 
                     F.when(F.trim(F.col("tavg")) == "M", 0).otherwise(F.trim(F.col("tavg"))))
  df = df.withColumn("depart_impute", 
                     F.when(F.trim(F.col("depart")) == "M", 0).otherwise(F.trim(F.col("depart"))))
  df = df.withColumn("preciptotal_impute", 
                     F.when(F.trim(F.col("preciptotal")) == "M", 0).when(F.trim(F.col("preciptotal")) == "T",0.001).otherwise(F.trim(F.col("preciptotal"))))
  df = df.withColumn("snowfall_impute", 
                     F.when(F.trim(F.col("snowfall")) == "M", 0).when(F.trim(F.col("snowfall")) == "T", 0.01).otherwise(F.trim(F.col("snowfall"))))
  return df

# Training set - Imputation
walmrt_trn_impute = imputaion(walmrt_trn_subset)

# Testing set - Imputation
walmrt_tst_impute = imputaion(walmrt_tst_weather)

## Feature Engineering

Feature enginnering is an important step in applied data science and machine learning because it decides the predictive power of a modeling technique. Hence, using the right features is an important step in creating a model that predicts with high accuracy. In this analysis I have used some of the important features that will be helpful in predicting the sales of an sku for a day. The features are the following. 

- ***days:*** No of days from 2012-01-01 (The data set starts from this date)
- ***year:*** The year of the date 
- ***month:*** The month of the date
- ***day_of_month:*** The day of the month 
- ***day_of_week:*** The day of the week
- ***day_of_year:*** The day of the year
- ***is_weekend:*** A flag to see if the date was a weekend or not

> Coming up with features is difficult, time-consuming, requires expert knowledge. "Applied machine learning" is basically feature engineering.

-<span style="color:blue"> *Andrew Ng* </span>, _Machine Learning and AI via Brain simulations_.

In [16]:
'''
Function to create new features using date column.
'''
def feature_engineering(df):
  # Create a new column called, StartDateTime 
  start_dt = '2012-01-01'
  df = df.withColumn("StartDateTime", F.to_date(F.lit(start_dt),'yyyy-MM-dd'))

  # days: number of days after 2012-01-01
  df = df.withColumn("days",F.datediff(F.col("date"), F.col("StartDateTime")).cast("int"))

  # Year Month and day
  df = df.withColumn("year", F.year(F.col("date")))
  df = df.withColumn("month", F.month(F.col("date")))
  df = df.withColumn("day_of_month", F.dayofmonth(F.col("date")))

  # Day of the week and day of the year
  df = df.withColumn("day_of_week", F.dayofweek(F.col("date")))
  df = df.withColumn("day_of_year", F.dayofyear(F.col("date")))

  # Is weekend or not
  df = df.withColumn("is_weekend", F.when(F.col("day_of_week").isin([1, 7]), "y").otherwise("n"))
  
  return df

# Training set - Feature Engineering
walmrt_trn_feature_eng = feature_engineering(walmrt_trn_impute)

# Testing set - Feature Engineering
walmrt_tst_feature_eng = feature_engineering(walmrt_tst_impute)

## Transforming Response Variable

The response variable **sales units** has been converted into log of the sales units plus one to stabilize the error variances as the spread of the data is large. The minumum value of the sales units is 0 and the maximum value across all the different sku's is 5568 units. 

The spread of these values sometimes cause the model fit to be less accurate. In additon, the accuracy metric on kaggle is defined below - that also gives a hint of transforming the response variable. The metric is given below.

**Root Mean Squared Logarithmic Error (RMSLE)**

$$\sqrt{\frac{1}{n} \sum_{i=1}^n (\log(p_i + 1) - \log(a_i+1))^2 }$$

Where:

- \\(n\\) is the number of rows in the test set
- \\(p_i\\) is your predicted units sold
- \\(a_i\\) is the actual units sold
- \\(log(x)\\) is the natural logarithm

In [18]:
'''Transform the response variable into log(1 + units)'''
walmrt_trn_feature_eng = walmrt_trn_feature_eng.withColumn("log1_units", F.log(F.col("units") + F.lit(1)))

In [19]:
display(walmrt_trn_feature_eng)

station_nbr,date,store_nbr,item_nbr,units,tmax,tmin,tavg,depart,dewpoint,wetbulb,heat,cool,sunrise,sunset,codesum,snowfall,preciptotal,stnpressure,sealevel,resultspeed,resultdir,avgspeed,store_nbr_item_nbr,tavg_impute,depart_impute,preciptotal_impute,snowfall_impute,StartDateTime,days,year,month,day_of_month,day_of_week,day_of_year,is_weekend,log1_units
1,2012-01-01T00:00:00.000+0000,1,9,29,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_9,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,3.4011973816621555
1,2012-01-01T00:00:00.000+0000,1,28,2,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_28,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,1.0986122886681098
1,2012-01-01T00:00:00.000+0000,1,40,0,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_40,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,0.0
1,2012-01-01T00:00:00.000+0000,1,47,0,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_47,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,0.0
1,2012-01-01T00:00:00.000+0000,1,51,1,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_51,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,0.6931471805599453
1,2012-01-01T00:00:00.000+0000,1,89,0,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_89,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,0.0
1,2012-01-01T00:00:00.000+0000,1,93,0,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_93,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,0.0
1,2012-01-01T00:00:00.000+0000,1,99,0,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_99,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,0.0
14,2012-01-01T00:00:00.000+0000,2,5,191,50,34,42,5,25,35,23,0,0739,1729,,0.0,0.00,29.13,30.52,11.4,32,11.3,2_5,42,5,0.0,0.0,2012-01-01,0,2012,1,1,1,1,y,5.2574953720277815
14,2012-01-01T00:00:00.000+0000,2,11,0,50,34,42,5,25,35,23,0,0739,1729,,0.0,0.00,29.13,30.52,11.4,32,11.3,2_11,42,5,0.0,0.0,2012-01-01,0,2012,1,1,1,1,y,0.0


### Final Training-Set

In [21]:
''' 
Selecting the relevant predictors from the train data set for modeling 
'''
walmrt_trn = walmrt_trn_feature_eng.select(F.col("store_nbr_item_nbr"), 
                                        F.col("date"), 
                                        F.col("station_nbr"), 
                                        F.col("store_nbr"), 
                                        F.col("item_nbr"),
                                        F.col("log1_units"),
                                        F.col("tavg_impute").cast("float"),
                                        F.col("preciptotal_impute").cast("float"),
                                        F.col("snowfall_impute").cast("float"),
                                        F.col("depart_impute").cast("float"),
                                        F.col("StartDateTime"), 
                                        F.col("days"),
                                        F.col("year"),
                                        F.col("month").cast("string"),
                                        F.col("day_of_month"),
                                        F.col("day_of_week").cast("string"),
                                        F.col("day_of_year"),
                                        F.col("is_weekend")
                                       )

In [22]:
display(walmrt_trn.dtypes)

_1,_2
store_nbr_item_nbr,string
date,timestamp
station_nbr,int
store_nbr,int
item_nbr,int
log1_units,double
tavg_impute,float
preciptotal_impute,float
snowfall_impute,float
depart_impute,float


In [23]:
display(walmrt_trn.summary())

summary,store_nbr_item_nbr,station_nbr,store_nbr,item_nbr,log1_units,tavg_impute,preciptotal_impute,snowfall_impute,depart_impute,days,year,month,day_of_month,day_of_week,day_of_year,is_weekend
count,236038,236038.0,236038.0,236038.0,236038.0,236038.0,236038.0,236038.0,236038.0,236038.0,236038.0,236038.0,236038.0,236038.0,236038.0,236038
mean,,11.161524839220805,22.761868851625582,53.17968293240919,1.4763715414320924,57.40784534693567,0.0713210245109876,0.0122311661813043,0.5445860412306494,494.5514069768427,2012.882823952076,6.174467670459841,15.806865843635348,3.9959667511163457,172.70983485709928,
stddev,,4.911769444213652,13.132051688472274,34.33472367594489,1.757893492378327,22.32978971245385,0.2704472664168333,0.2671162516958565,4.319263121420713,298.6950895692456,0.8046348516204421,3.377718688093788,8.775634401261224,2.0002119897935926,103.2880537043144,
min,10_21,1.0,1.0,1.0,0.0,-16.0,0.0,0.0,-32.0,0.0,2012.0,1.0,1.0,1.0,1.0,n
25%,,7.0,12.0,18.0,0.0,44.0,0.0,0.0,0.0,236.0,2012.0,3.0,8.0,2.0,82.0,
50%,,12.0,22.0,50.0,0.6931471805599453,61.0,0.0,0.0,0.0,472.0,2013.0,6.0,16.0,4.0,171.0,
75%,,15.0,34.0,88.0,3.218875824868201,75.0,0.001,0.0,0.0,755.0,2014.0,9.0,23.0,6.0,260.0,
max,9_93,20.0,45.0,111.0,8.624970783589669,100.0,7.36,16.2,33.0,1034.0,2014.0,9.0,31.0,7.0,366.0,y


### Final Testing-Set (Without *Sales Units*)

In [25]:
walmrt_tst = walmrt_tst_feature_eng.select(F.col("store_nbr_item_nbr"), 
                                        F.col("date"), 
                                        F.col("station_nbr"), 
                                        F.col("store_nbr"), 
                                        F.col("item_nbr"),
                                        #F.col("units"),
                                        F.col("tavg_impute").cast("float"),
                                        F.col("preciptotal_impute").cast("float"),
                                        F.col("snowfall_impute").cast("float"),
                                        F.col("depart_impute").cast("float"),
                                        F.col("StartDateTime"), 
                                        F.col("days"),
                                        F.col("year"),
                                        F.col("month").cast("string"),
                                        F.col("day_of_month"),
                                        F.col("day_of_week").cast("string"),
                                        F.col("day_of_year"),
                                        F.col("is_weekend")
                                       )

In [26]:
display(walmrt_tst)

store_nbr_item_nbr,date,station_nbr,store_nbr,item_nbr,tavg_impute,preciptotal_impute,snowfall_impute,depart_impute,StartDateTime,days,year,month,day_of_month,day_of_week,day_of_year,is_weekend
2_1,2013-04-01T00:00:00.000+0000,14,2,1,57.0,0.001,0.0,1.0,2012-01-01,456,2013,4,1,2,91,n
2_2,2013-04-01T00:00:00.000+0000,14,2,2,57.0,0.001,0.0,1.0,2012-01-01,456,2013,4,1,2,91,n
2_3,2013-04-01T00:00:00.000+0000,14,2,3,57.0,0.001,0.0,1.0,2012-01-01,456,2013,4,1,2,91,n
2_4,2013-04-01T00:00:00.000+0000,14,2,4,57.0,0.001,0.0,1.0,2012-01-01,456,2013,4,1,2,91,n
2_5,2013-04-01T00:00:00.000+0000,14,2,5,57.0,0.001,0.0,1.0,2012-01-01,456,2013,4,1,2,91,n
2_6,2013-04-01T00:00:00.000+0000,14,2,6,57.0,0.001,0.0,1.0,2012-01-01,456,2013,4,1,2,91,n
2_7,2013-04-01T00:00:00.000+0000,14,2,7,57.0,0.001,0.0,1.0,2012-01-01,456,2013,4,1,2,91,n
2_8,2013-04-01T00:00:00.000+0000,14,2,8,57.0,0.001,0.0,1.0,2012-01-01,456,2013,4,1,2,91,n
2_9,2013-04-01T00:00:00.000+0000,14,2,9,57.0,0.001,0.0,1.0,2012-01-01,456,2013,4,1,2,91,n
2_10,2013-04-01T00:00:00.000+0000,14,2,10,57.0,0.001,0.0,1.0,2012-01-01,456,2013,4,1,2,91,n


### ML Pipeline

This is the step where the categorical vaiables and numerical variables are converted into vectors for modeling purposes using Spark MLLib.

Tere are two type of variables used for the modeling purpose. They are the follwing.

1. Categorical Variables

  - `is_weekend`: If the day is on weekend(Saturday or Sunday)
  - `day_of_week`: Which day of the week it is
  - `month`: Month of the year

2. Numerical Variables

  - `tavg_impute`: Average temperature of the day (Degrees Fahrenheit)
  - `preciptotal_impute`: Total precipitation of the day (INCHES (24-HR PERIOD ENDING AT INDICATED LOCAL STANDARD TIME))
  - `snowfall_impute`: Total snowfall in the day (INCHES)
  - `depart_impute`: Departure from the normal temperature
  - `days`: Number of days after 2012-01-01
  - `year`: Year of the date
  - `day_of_month`: Day of the month
  - `day_of_year`: Day of the year

In [28]:
'''Transform categorical variables into one-hot encoding'''
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
categorical_columns = ["is_weekend", "day_of_week", "month"]
stages = [] # Stages in our pipeline
for factors in categorical_columns:
  string_indexer = StringIndexer(inputCol = factors, outputCol = factors + "index")
  encoder = OneHotEncoderEstimator(inputCols = [string_indexer.getOutputCol()], outputCols = [factors + "class_vec"])
  stages += [string_indexer, encoder]

In [29]:
# Transform all features into a vector using VectorAssembler
numeric_cols = ["tavg_impute", "preciptotal_impute", "snowfall_impute", 
                "depart_impute", "days", "year", "day_of_month", 
                "day_of_year"]
#numeric_cols = ["days"]
assembler_inputs = [f + "class_vec" for f in categorical_columns] + numeric_cols
assembler = VectorAssembler(inputCols = assembler_inputs, outputCol = "features") 
stages += [assembler]

In [30]:
# Transform the train data into a suitable format for modeling
partial_pipeline = Pipeline().setStages(stages)
pipeline_model = partial_pipeline.fit(walmrt_trn)
walmrt_trn_trnsf = pipeline_model.transform(walmrt_trn)

# Transform the test data into a suitable format for modeling
partial_pipeline = Pipeline().setStages(stages)
pipeline_model = partial_pipeline.fit(walmrt_tst)
walmrt_tst_trnsf = pipeline_model.transform(walmrt_tst)

In [31]:
display(walmrt_trn_trnsf.filter(F.col("store_nbr_item_nbr") == "33_44"))

store_nbr_item_nbr,date,station_nbr,store_nbr,item_nbr,log1_units,tavg_impute,preciptotal_impute,snowfall_impute,depart_impute,StartDateTime,days,year,month,day_of_month,day_of_week,day_of_year,is_weekend,is_weekendindex,is_weekendclass_vec,day_of_weekindex,day_of_weekclass_vec,monthindex,monthclass_vec,features
33_44,2012-01-01T00:00:00.000+0000,3,33,44,5.872117789475416,45.0,0.0,0.0,9.0,2012-01-01,0,2012,1,1,1,1,y,1.0,"List(0, 1, List(), List())",1.0,"List(0, 6, List(1), List(1.0))",1.0,"List(0, 11, List(1), List(1.0))","List(0, 26, List(2, 8, 18, 21, 23, 24, 25), List(1.0, 1.0, 45.0, 9.0, 2012.0, 1.0, 1.0))"
33_44,2012-01-02T00:00:00.000+0000,3,33,44,3.9318256327243257,35.0,0.0,0.0,-1.0,2012-01-01,1,2012,1,2,2,2,n,0.0,"List(0, 1, List(0), List(1.0))",0.0,"List(0, 6, List(0), List(1.0))",1.0,"List(0, 11, List(1), List(1.0))","List(0, 26, List(0, 1, 8, 18, 21, 22, 23, 24, 25), List(1.0, 1.0, 1.0, 35.0, -1.0, 1.0, 2012.0, 2.0, 2.0))"
33_44,2012-01-03T00:00:00.000+0000,3,33,44,5.187385805840755,38.0,0.0,0.0,2.0,2012-01-01,2,2012,1,3,3,3,n,0.0,"List(0, 1, List(0), List(1.0))",6.0,"List(0, 6, List(), List())",1.0,"List(0, 11, List(1), List(1.0))","List(0, 26, List(0, 8, 18, 21, 22, 23, 24, 25), List(1.0, 1.0, 38.0, 2.0, 2.0, 2012.0, 3.0, 3.0))"
33_44,2012-01-04T00:00:00.000+0000,3,33,44,5.225746673713202,47.0,0.0,0.0,11.0,2012-01-01,3,2012,1,4,4,4,n,0.0,"List(0, 1, List(0), List(1.0))",2.0,"List(0, 6, List(2), List(1.0))",1.0,"List(0, 11, List(1), List(1.0))","List(0, 26, List(0, 3, 8, 18, 21, 22, 23, 24, 25), List(1.0, 1.0, 1.0, 47.0, 11.0, 3.0, 2012.0, 4.0, 4.0))"
33_44,2012-01-05T00:00:00.000+0000,3,33,44,5.8805329864007,50.0,0.0,0.0,14.0,2012-01-01,4,2012,1,5,5,5,n,0.0,"List(0, 1, List(0), List(1.0))",4.0,"List(0, 6, List(4), List(1.0))",1.0,"List(0, 11, List(1), List(1.0))","List(0, 26, List(0, 5, 8, 18, 21, 22, 23, 24, 25), List(1.0, 1.0, 1.0, 50.0, 14.0, 4.0, 2012.0, 5.0, 5.0))"
33_44,2012-01-06T00:00:00.000+0000,3,33,44,5.3706380281276624,55.0,0.0,0.0,19.0,2012-01-01,5,2012,1,6,6,6,n,0.0,"List(0, 1, List(0), List(1.0))",3.0,"List(0, 6, List(3), List(1.0))",1.0,"List(0, 11, List(1), List(1.0))","List(0, 26, List(0, 4, 8, 18, 21, 22, 23, 24, 25), List(1.0, 1.0, 1.0, 55.0, 19.0, 5.0, 2012.0, 6.0, 6.0))"
33_44,2012-01-07T00:00:00.000+0000,3,33,44,5.442417710521793,47.0,0.0,0.0,11.0,2012-01-01,6,2012,1,7,7,7,y,1.0,"List(0, 1, List(), List())",5.0,"List(0, 6, List(5), List(1.0))",1.0,"List(0, 11, List(1), List(1.0))","List(0, 26, List(6, 8, 18, 21, 22, 23, 24, 25), List(1.0, 1.0, 47.0, 11.0, 6.0, 2012.0, 7.0, 7.0))"
33_44,2012-01-08T00:00:00.000+0000,3,33,44,5.455321115357702,43.0,0.0,0.0,7.0,2012-01-01,7,2012,1,8,1,8,y,1.0,"List(0, 1, List(), List())",1.0,"List(0, 6, List(1), List(1.0))",1.0,"List(0, 11, List(1), List(1.0))","List(0, 26, List(2, 8, 18, 21, 22, 23, 24, 25), List(1.0, 1.0, 43.0, 7.0, 7.0, 2012.0, 8.0, 8.0))"
33_44,2012-01-09T00:00:00.000+0000,3,33,44,5.117993812416755,40.0,0.0,0.0,4.0,2012-01-01,8,2012,1,9,2,9,n,0.0,"List(0, 1, List(0), List(1.0))",0.0,"List(0, 6, List(0), List(1.0))",1.0,"List(0, 11, List(1), List(1.0))","List(0, 26, List(0, 1, 8, 18, 21, 22, 23, 24, 25), List(1.0, 1.0, 1.0, 40.0, 4.0, 8.0, 2012.0, 9.0, 9.0))"
33_44,2012-01-10T00:00:00.000+0000,3,33,44,5.963579343618446,41.0,0.0,0.0,5.0,2012-01-01,9,2012,1,10,3,10,n,0.0,"List(0, 1, List(0), List(1.0))",6.0,"List(0, 6, List(), List())",1.0,"List(0, 11, List(1), List(1.0))","List(0, 26, List(0, 8, 18, 21, 22, 23, 24, 25), List(1.0, 1.0, 41.0, 5.0, 9.0, 2012.0, 10.0, 10.0))"


### Random Forest

[Random forests](https://en.wikipedia.org/wiki/Random_forest) are an ensemble learning method for classification, regression and other tasks that operates by constructing a multitude of decision trees at training time and outputting the class that is the mode of the classes (classification) or mean prediction (regression) of the individual trees.Random decision forests correct for decision trees' habit of overfitting to their training set.

The first algorithm for random decision forests was created by Tin Kam Ho using the random subspace method, which, in Ho's formulation, is a way to implement the "stochastic discrimination" approach to classification proposed by Eugene Kleinberg.

An extension of the algorithm was developed by Leo Breiman and Adele Cutler who registered "Random Forests" as a trademark (as of 2019, owned by Minitab, Inc.). The extension combines Breiman's "bagging" idea and random selection of features, introduced first by Ho and later independently by Amit and Geman in order to construct a collection of decision trees with controlled variance.

![Random Forest](https://i.stack.imgur.com/iY55n.jpg)

- [Reference 1](https://en.wikipedia.org/wiki/Random_forest)
- [Reference 2](https://i.stack.imgur.com/iY55n.jpg)

In [33]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# Evaluate the model with rmse 
from pyspark.ml.evaluation import RegressionEvaluator

# Regression Evaluator
reg_evaluator = RegressionEvaluator(predictionCol = "prediction", labelCol = "log1_units", metricName = "rmse")

# Create an initial random forest classifier
rf = RandomForestRegressor(labelCol = "log1_units", featuresCol = "features")

param_grid = (ParamGridBuilder()
               .addGrid(rf.maxDepth, [2, 4, 6])
               .addGrid(rf.maxBins, [20, 60])
               .addGrid(rf.numTrees, [5, 20])
               .build())

# Create 10 fold Cross Validator
cv = CrossValidator(estimator = rf, estimatorParamMaps = param_grid, evaluator = reg_evaluator, numFolds = 10)

list_of_store_item_nbr = ["33_44", "30_44"]
rmse_cal = []
for sku in list_of_store_item_nbr:
  # Train the model on the training set 
  rf_cv_mod = cv.fit(walmrt_trn_trnsf.filter(F.col("store_nbr_item_nbr") == sku))
  # Predicting the price in the testing data-set using cross_validated random forest(rf_cv_mod)
  cv_predictions = rf_cv_mod.transform(walmrt_trn_trnsf.filter(F.col("store_nbr_item_nbr") == sku))
  # cvModel uses the best model found from the Cross Validation
  # Evaluate best model
  rmse_cal.append(reg_evaluator.evaluate(cv_predictions)) 

In [34]:
print(rmse_cal)

# Conclusion and Next Steps

I have used Random Forest to predict the sales of items at different store locations around the storms. The conclusion of this analysis has been about two main important issues. First, the forecasting problem that I am trying to solve and second, is the computational aspect of the problem. The prediction problem was majorly about cleaning the data and creating the right features to predict the sales units. This has been a challenging as well as a learning moment for me. 

The second issue - that is the major component, of the project has been about changing the programming paradigm and working in the spark dataframe. There has been a signification learning in that frontier where I not only learnt about the spark framework but also about the way the distributive computing works. 

The biggest challenge that I have faced in this project is to run the same model for all the 255 combination in parallel using native spark code and hence I had to pivot to the python way of doing things and I looped over two highest selling items rather than focussing on all the 255 items. My next step is to learn and implement the distributive way of doing the modelling in parallel.

# Exploratory Data Analysis

## Appendix

In [37]:
display(walmrt_trn_feature_eng)

station_nbr,date,store_nbr,item_nbr,units,tmax,tmin,tavg,depart,dewpoint,wetbulb,heat,cool,sunrise,sunset,codesum,snowfall,preciptotal,stnpressure,sealevel,resultspeed,resultdir,avgspeed,store_nbr_item_nbr,tavg_impute,depart_impute,preciptotal_impute,snowfall_impute,StartDateTime,days,year,month,day_of_month,day_of_week,day_of_year,is_weekend,log1_units
1,2012-01-01T00:00:00.000+0000,1,9,29,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_9,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,3.4011973816621555
1,2012-01-01T00:00:00.000+0000,1,28,2,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_28,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,1.0986122886681098
1,2012-01-01T00:00:00.000+0000,1,40,0,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_40,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,0.0
1,2012-01-01T00:00:00.000+0000,1,47,0,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_47,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,0.0
1,2012-01-01T00:00:00.000+0000,1,51,1,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_51,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,0.6931471805599453
1,2012-01-01T00:00:00.000+0000,1,89,0,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_89,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,0.0
1,2012-01-01T00:00:00.000+0000,1,93,0,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_93,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,0.0
1,2012-01-01T00:00:00.000+0000,1,99,0,52,31,42,M,36,40,23,0,-,-,RA FZFG BR,M,0.05,29.78,29.92,3.6,20,4.6,1_99,42,0,0.05,0.0,2012-01-01,0,2012,1,1,1,1,y,0.0
14,2012-01-01T00:00:00.000+0000,2,5,191,50,34,42,5,25,35,23,0,0739,1729,,0.0,0.00,29.13,30.52,11.4,32,11.3,2_5,42,5,0.0,0.0,2012-01-01,0,2012,1,1,1,1,y,5.2574953720277815
14,2012-01-01T00:00:00.000+0000,2,11,0,50,34,42,5,25,35,23,0,0739,1729,,0.0,0.00,29.13,30.52,11.4,32,11.3,2_11,42,5,0.0,0.0,2012-01-01,0,2012,1,1,1,1,y,0.0


In [38]:
one_df_33 = walmrt_trn_feature_eng.filter((F.col("store_nbr") == 33) & (F.col("item_nbr").isin([44, 9])))

In [39]:
display(one_df_33)

station_nbr,date,store_nbr,item_nbr,units,tmax,tmin,tavg,depart,dewpoint,wetbulb,heat,cool,sunrise,sunset,codesum,snowfall,preciptotal,stnpressure,sealevel,resultspeed,resultdir,avgspeed,store_nbr_item_nbr,tavg_impute,depart_impute,preciptotal_impute,snowfall_impute,StartDateTime,days,year,month,day_of_month,day_of_week,day_of_year,is_weekend,log1_units
3,2012-01-01T00:00:00.000+0000,33,9,169,55,34,45,9,24,36,20,0,735,1720,,0.0,0.00,29.77,30.47,9.9,31,10.0,33_9,45,9,0.0,0.0,2012-01-01,0,2012,1,1,1,1,y,5.135798437050262
3,2012-01-01T00:00:00.000+0000,33,44,354,55,34,45,9,24,36,20,0,735,1720,,0.0,0.00,29.77,30.47,9.9,31,10.0,33_44,45,9,0.0,0.0,2012-01-01,0,2012,1,1,1,1,y,5.872117789475416
3,2012-01-02T00:00:00.000+0000,33,9,195,45,24,35,-1,11,28,30,0,735,1721,,0.0,0.00,29.89,30.63,7.7,32,8.2,33_9,35,-1,0.0,0.0,2012-01-01,1,2012,1,2,2,2,n,5.278114659230517
3,2012-01-02T00:00:00.000+0000,33,44,50,45,24,35,-1,11,28,30,0,735,1721,,0.0,0.00,29.89,30.63,7.7,32,8.2,33_44,35,-1,0.0,0.0,2012-01-01,1,2012,1,2,2,2,n,3.9318256327243257
3,2012-01-03T00:00:00.000+0000,33,9,182,55,21,38,2,19,32,27,0,735,1721,,0.0,0.00,29.55,30.34,11.6,17,11.8,33_9,38,2,0.0,0.0,2012-01-01,2,2012,1,3,3,3,n,5.209486152841421
3,2012-01-03T00:00:00.000+0000,33,44,178,55,21,38,2,19,32,27,0,735,1721,,0.0,0.00,29.55,30.34,11.6,17,11.8,33_44,38,2,0.0,0.0,2012-01-01,2,2012,1,3,3,3,n,5.187385805840755
3,2012-01-04T00:00:00.000+0000,33,9,70,63,31,47,11,26,37,18,0,735,1722,,0.0,0.00,29.53,30.27,2.5,31,4.9,33_9,47,11,0.0,0.0,2012-01-01,3,2012,1,4,4,4,n,4.262679877041316
3,2012-01-04T00:00:00.000+0000,33,44,185,63,31,47,11,26,37,18,0,735,1722,,0.0,0.00,29.53,30.27,2.5,31,4.9,33_44,47,11,0.0,0.0,2012-01-01,3,2012,1,4,4,4,n,5.225746673713202
3,2012-01-05T00:00:00.000+0000,33,9,343,68,31,50,14,28,41,15,0,735,1723,,0.0,0.00,29.35,30.11,7.3,19,7.6,33_9,50,14,0.0,0.0,2012-01-01,4,2012,1,5,5,5,n,5.840641657373398
3,2012-01-05T00:00:00.000+0000,33,44,357,68,31,50,14,28,41,15,0,735,1723,,0.0,0.00,29.35,30.11,7.3,19,7.6,33_44,50,14,0.0,0.0,2012-01-01,4,2012,1,5,5,5,n,5.8805329864007


In [40]:
display(one_df_33.select(F.col("units"), F.col("date"), F.col("item_nbr")).filter(F.col("item_nbr").isin([44, 9])))

units,date,item_nbr
169,2012-01-01T00:00:00.000+0000,9
354,2012-01-01T00:00:00.000+0000,44
195,2012-01-02T00:00:00.000+0000,9
50,2012-01-02T00:00:00.000+0000,44
182,2012-01-03T00:00:00.000+0000,9
178,2012-01-03T00:00:00.000+0000,44
70,2012-01-04T00:00:00.000+0000,9
185,2012-01-04T00:00:00.000+0000,44
343,2012-01-05T00:00:00.000+0000,9
357,2012-01-05T00:00:00.000+0000,44


In [41]:
display(one_df_33.select(F.col("units"), F.col("tavg_impute"), F.col("date"), F.col("item_nbr")).filter(F.col("item_nbr").isin([44, 9])))

units,tavg_impute,date,item_nbr
169,45,2012-01-01T00:00:00.000+0000,9
354,45,2012-01-01T00:00:00.000+0000,44
195,35,2012-01-02T00:00:00.000+0000,9
50,35,2012-01-02T00:00:00.000+0000,44
182,38,2012-01-03T00:00:00.000+0000,9
178,38,2012-01-03T00:00:00.000+0000,44
70,47,2012-01-04T00:00:00.000+0000,9
185,47,2012-01-04T00:00:00.000+0000,44
343,50,2012-01-05T00:00:00.000+0000,9
357,50,2012-01-05T00:00:00.000+0000,44


In [42]:
display(one_df_33.select(F.col("units"), F.col("is_weekend"), F.col("date"), F.col("item_nbr")).filter(F.col("item_nbr").isin([44, 9])))

units,is_weekend,date,item_nbr
169,y,2012-01-01T00:00:00.000+0000,9
354,y,2012-01-01T00:00:00.000+0000,44
195,n,2012-01-02T00:00:00.000+0000,9
50,n,2012-01-02T00:00:00.000+0000,44
182,n,2012-01-03T00:00:00.000+0000,9
178,n,2012-01-03T00:00:00.000+0000,44
70,n,2012-01-04T00:00:00.000+0000,9
185,n,2012-01-04T00:00:00.000+0000,44
343,n,2012-01-05T00:00:00.000+0000,9
357,n,2012-01-05T00:00:00.000+0000,44
