# Walmart Recruiting II: Sales in Stormy Weather With Spark

<img src="https://spark.apache.org/images/spark-logo-trademark.png" alt="Alt text that describes the graphic" title="Title text" />



This notebook will detail the same methods covered in the walmart_sales_forecasting notebook, but instead leveraging Spark to generate features and perform modeling.

# Notebook Contents
## Part I: Feature Generation
## Part II: Model Testing and Optimization
## Part III: Transforming Test Data
## Part IV: Generating Predictions
## Part V: Discussion of Results

# Part I: Feature Generation

In [1]:
# Import SparkSession
import pyspark.sql.functions
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import calendar
from pandas.tseries.holiday import USFederalHolidayCalendar
import pandas as pd
import numpy as np
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import datediff, to_date, lit, date_format
import datetime
from pyspark.sql.types import *
from pyspark.ml.feature import StandardScaler
from pyspark import SparkContext
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression

In [2]:
# Build the SparkSession
spark = SparkSession.builder \
   .master("local[*]") \
   .appName("Walmart Pyspark Implementation") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext
sc

In [3]:
df=spark.read.csv("datasets/train.csv",header=True)
print df.printSchema()
df.show()

root
 |-- date: string (nullable = true)
 |-- store_nbr: string (nullable = true)
 |-- item_nbr: string (nullable = true)
 |-- units: string (nullable = true)

None
+----------+---------+--------+-----+
|      date|store_nbr|item_nbr|units|
+----------+---------+--------+-----+
|2012-01-01|        1|       1|    0|
|2012-01-01|        1|       2|    0|
|2012-01-01|        1|       3|    0|
|2012-01-01|        1|       4|    0|
|2012-01-01|        1|       5|    0|
|2012-01-01|        1|       6|    0|
|2012-01-01|        1|       7|    0|
|2012-01-01|        1|       8|    0|
|2012-01-01|        1|       9|   29|
|2012-01-01|        1|      10|    0|
|2012-01-01|        1|      11|    0|
|2012-01-01|        1|      12|    0|
|2012-01-01|        1|      13|    0|
|2012-01-01|        1|      14|    0|
|2012-01-01|        1|      15|    0|
|2012-01-01|        1|      16|    0|
|2012-01-01|        1|      17|    0|
|2012-01-01|        1|      18|    0|
|2012-01-01|        1|      19|    0|

In [4]:
# extract year, month, day
split_col = pyspark.sql.functions.split(df['date'], '-')
df = df.withColumn('year', split_col.getItem(0))
df = df.withColumn('month', split_col.getItem(1))
df = df.withColumn('day', split_col.getItem(2))
# extract day of year
df = df.withColumn('day_of_year', pyspark.sql.functions.dayofyear(df['date']))
# extract day of cycle
first_date=df.agg({"date": "min"}).collect()[0][0]
df = df.withColumn("day_of_cycle", 
              datediff(to_date("date","yyyy-MM-dd"),to_date(lit(first_date))))
# extract day of week
df=df.withColumn("day_of_week", 
              date_format('date', 'u').alias('day_of_week'))
# extract weekend
df=df.withColumn("is_weekend", \
                 pyspark.sql.functions.when(df.day_of_week == 7, 1).when(df.day_of_week == 6, 1).otherwise(0))

In [5]:
# create dataframe of holidays
cal = USFederalHolidayCalendar()
more_holidays=['01/01/2012','04/08/2012','03/31/2013','04/20/2014','12/24/2012','12/24/2013','12/24/2014']
holidays_df=pd.concat([cal.holidays(start='2011-12-31', end='2014-12-31').to_frame().reset_index(drop=True),pd.DataFrame(more_holidays)]).apply(lambda x: pd.to_datetime(x)).reset_index(drop=True)
holidays_df=holidays_df.iloc[1:,:].sort_values([0]).reset_index(drop=True)
holidays_set=set(holidays_df[0].apply(lambda x: x.strftime('%Y-%m-%d')).tolist())

# is holiday
df=df.withColumn("is_holiday", \
                 pyspark.sql.functions.when(df.date.isin(holidays_set), 1).otherwise(0))

#within one day of holiday but not on holiday
holiday_1day=set(pd.concat([holidays_df.apply(lambda x: x + datetime.timedelta(days=1)),\
                            holidays_df.apply(lambda x: x + datetime.timedelta(days=-1))])[0]\
                 .apply(lambda x: x.strftime('%Y-%m-%d')).tolist())

df=df.withColumn("1d_away_holiday", \
                 pyspark.sql.functions.when(df.date.isin(holiday_1day), 1).otherwise(0))

#within two days of holiday but not on holiday
holiday_1day.update(pd.concat([holidays_df.apply(lambda x: x + datetime.timedelta(days=2)),\
                               holidays_df.apply(lambda x: x + datetime.timedelta(days=-2))])[0]\
                    .apply(lambda x: x.strftime('%Y-%m-%d')).tolist())

df=df.withColumn("2d_away_holiday", \
                 pyspark.sql.functions.when(df.date.isin(holiday_1day), 1).otherwise(0))

#within three days of holiday but not on holiday
holiday_1day.update(pd.concat([holidays_df.apply(lambda x: x + datetime.timedelta(days=3)),\
                               holidays_df.apply(lambda x: x + datetime.timedelta(days=-3))])[0]\
                    .apply(lambda x: x.strftime('%Y-%m-%d')).tolist())

df=df.withColumn("3d_away_holiday", \
                 pyspark.sql.functions.when(df.date.isin(holiday_1day), 1).otherwise(0))

#within seven days of holiday but not on holiday
holiday_1day.update(pd.concat([holidays_df.apply(lambda x: x + datetime.timedelta(days=7)),\
                               holidays_df.apply(lambda x: x + datetime.timedelta(days=-7))])[0]\
                    .apply(lambda x: x.strftime('%Y-%m-%d')).tolist())

df=df.withColumn("7d_away_holiday", \
                 pyspark.sql.functions.when(df.date.isin(holiday_1day), 1).otherwise(0))

In [14]:
# convert numerical columns to numerical and categorical to string
# Import all from `sql.types`

from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names, newType):
  for name in names: 
     df = df.withColumn(name, df[name].cast(newType))
  return df 

# Assign all column names to `columns`
cat_cols = ["store_nbr", "item_nbr",'is_weekend','is_holiday','1d_away_holiday','2d_away_holiday','3d_away_holiday','7d_away_holiday']
num_cols = ["year", "month", "day", "day_of_year",'day_of_cycle','day_of_week']
target_feat = ['units']

# Conver the `df` columns to `FloatType()`
df = convertColumn(df, num_cols, FloatType())
df = convertColumn(df, cat_cols, StringType())
df = convertColumn(df, target_feat, FloatType())

# log transform target var: units
df=df.withColumn("units_log", \
              pyspark.sql.functions.log(df.units+1))

print "dataframe types after transform"
df.dtypes


dataframe types after transform


[('date', 'string'),
 ('store_nbr', 'string'),
 ('item_nbr', 'string'),
 ('units', 'float'),
 ('year', 'float'),
 ('month', 'float'),
 ('day', 'float'),
 ('day_of_year', 'float'),
 ('day_of_cycle', 'float'),
 ('day_of_week', 'float'),
 ('is_weekend', 'string'),
 ('is_holiday', 'string'),
 ('1d_away_holiday', 'string'),
 ('2d_away_holiday', 'string'),
 ('3d_away_holiday', 'string'),
 ('7d_away_holiday', 'string'),
 ('units_log', 'double'),
 ('store_nbr_indexed', 'double'),
 ('item_nbr_indexed', 'double'),
 ('is_weekend_indexed', 'double'),
 ('is_holiday_indexed', 'double'),
 ('1d_away_holiday_indexed', 'double'),
 ('2d_away_holiday_indexed', 'double'),
 ('3d_away_holiday_indexed', 'double'),
 ('7d_away_holiday_indexed', 'double'),
 ('store_nbr_indexed_encoded', 'double'),
 ('item_nbr_indexed_encoded', 'double'),
 ('is_weekend_indexed_encoded', 'double'),
 ('is_holiday_indexed_encoded', 'double'),
 ('1d_away_holiday_indexed_encoded', 'double'),
 ('2d_away_holiday_indexed_encoded', 'doubl

In [7]:
# ensure that transformation was successful
df.select('units','units_log').show(20)

+-----+------------------+
|units|         units_log|
+-----+------------------+
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
| 29.0|3.4011973816621555|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
|  0.0|               0.0|
+-----+------------------+
only showing top 20 rows



In [8]:
# https://stackoverflow.com/questions/49853188/create-features-column-in-pyspark-with-both-numerical-and-categorical-variable?noredirect=1&lq=1
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler


# categorical pipeline
indexers = [StringIndexer(inputCol = c, outputCol="{0}_indexed".format(c)) for c in cat_cols]

encoders = [StringIndexer(inputCol = indexer.getOutputCol(), outputCol = "{0}_encoded".format(indexer.getOutputCol())) 
for indexer in indexers]

assemblerCat = VectorAssembler(inputCols = [encoder.getOutputCol() for encoder in encoders], outputCol = "cat")

pipelineCat = Pipeline(stages = indexers + encoders + [assemblerCat])
df = pipelineCat.fit(df).transform(df)

#numerical pipeline
assemblerNum = VectorAssembler(inputCols = num_cols, outputCol = "num")

pipelineNum = Pipeline(stages = [assemblerNum])
df = pipelineNum.fit(df).transform(df)

assembler = VectorAssembler(inputCols = ["cat", "num"], outputCol = "features_not_scaled")

pipeline = Pipeline(stages = [assembler])
df_final = pipeline.fit(df).transform(df)

In [9]:
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="num", outputCol="num_scaled")

# Fit the DataFrame to the scaler
scaler = standardScaler.fit(df_final)

# Transform the data in `df` with the scaler
scaled_df = scaler.transform(df_final)

assembler = VectorAssembler(inputCols = ["cat", "num_scaled"], outputCol = "features")

pipeline = Pipeline(stages = [assembler])
df_complete = pipeline.fit(scaled_df).transform(scaled_df)

In [10]:
# assemble final training vector
# Re-order and select columns
df_complete1 = df_complete.select("units_log", 'features')
df_complete1.show(1)

+---------+--------------------+
|units_log|            features|
+---------+--------------------+
|      0.0|[18.0,52.0,1.0,1....|
+---------+--------------------+
only showing top 1 row



In [11]:
df_complete1.first()

Row(units_log=0.0, features=DenseVector([18.0, 52.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 2499.6846, 0.2956, 0.114, 0.0097, 0.0, 3.4973]))

# Part II: Model Testing and Optimization
A linear regression model will be tested against a random forest model. The winning model of the previous notebook returned a 0.435944 RMSE score on the training set, so that will be the standard that these models will be compared with.

In [12]:
# Import `LinearRegression`
# Initialize `lr`
lr = LinearRegression(featuresCol="features",labelCol="units_log", predictionCol="units_pred", maxIter=10, regParam=0.3)

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.2,0.4,0.5]) \
    .build()

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(labelCol="units_log", predictionCol="units_pred"),
                          numFolds=3)

#model=crossval.fit(df_complete1)

In [13]:
# Split the data into train and test sets
train_data, test_data = df_complete1.randomSplit([.8,.2],seed=1234)

model1=crossval.fit(train_data)

predicted = model1.bestModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions = predicted.select("units_pred").rdd.map(lambda x: x[0])
labels = predicted.select("units_log").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel = predictions.zip(labels).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel[:5]

[(0.10077853022075889, 0.0),
 (0.1009473193743311, 0.0),
 (0.0992823995591543, 0.0),
 (0.10093760265621032, 0.0),
 (0.10078321560223102, 0.0)]

In [14]:
model1.bestModel.summary.rootMeanSquaredError

0.5135913940449095

In [13]:
train_data, test_data = df_complete1.randomSplit([.8,.2],seed=1234)
rf = RandomForestRegressor(featuresCol="features",labelCol="units_log", \
                           predictionCol="units_pred_rf", maxDepth=10, numTrees=20,maxBins=120)

paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10,30,40]) \
    .addGrid(rf.maxDepth, [5,10]) \
    .build()

crossval_rf = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid_rf,
                          evaluator= RegressionEvaluator(labelCol="units_log", predictionCol="units_pred_rf"),
                          numFolds=5)

model_rf=crossval_rf.fit(train_data)


In [16]:
predicted_rf = model_rf.bestModel.transform(test_data)

# Extract the predictions and the "known" correct labels
predictions_rf = predicted_rf.select("units_pred_rf").rdd.map(lambda x: x[0])
labels_rf = predicted_rf.select("units_log").rdd.map(lambda x: x[0])

# Zip `predictions` and `labels` into a list
predictionAndLabel_rf = predictions_rf.zip(labels_rf).collect()

# Print out first 5 instances of `predictionAndLabel` 
predictionAndLabel_rf[:5]

[(0.004809465615159759, 0.0),
 (0.010967348988976163, 0.0),
 (0.004693017123876308, 0.0),
 (0.007607799472912463, 0.0),
 (0.009039187172225157, 0.0)]

In [33]:
# Select (prediction, true label) and compute test error
evaluator = RegressionEvaluator(
    labelCol="units_log", predictionCol="units_pred_rf", metricName="rmse")

rmse = evaluator.evaluate(predicted_rf)
print("Root Mean Squared Error (RMSE) on training set = %g" % rmse)

Root Mean Squared Error (RMSE) on training set = 0.284088


The results of the RandomForestRegressor model are promising. 

# Part III: Transforming Test Data

In [41]:
test_df=spark.read.csv("datasets/test.csv",header=True)
sub_df=spark.read.csv("datasets/sampleSubmission.csv",header=True)
print test_df.printSchema()
test_df.show()

root
 |-- date: string (nullable = true)
 |-- store_nbr: string (nullable = true)
 |-- item_nbr: string (nullable = true)

None
+----------+---------+--------+
|      date|store_nbr|item_nbr|
+----------+---------+--------+
|2013-04-01|        2|       1|
|2013-04-01|        2|       2|
|2013-04-01|        2|       3|
|2013-04-01|        2|       4|
|2013-04-01|        2|       5|
|2013-04-01|        2|       6|
|2013-04-01|        2|       7|
|2013-04-01|        2|       8|
|2013-04-01|        2|       9|
|2013-04-01|        2|      10|
|2013-04-01|        2|      11|
|2013-04-01|        2|      12|
|2013-04-01|        2|      13|
|2013-04-01|        2|      14|
|2013-04-01|        2|      15|
|2013-04-01|        2|      16|
|2013-04-01|        2|      17|
|2013-04-01|        2|      18|
|2013-04-01|        2|      19|
|2013-04-01|        2|      20|
+----------+---------+--------+
only showing top 20 rows



In [61]:
sub_df.show(3)

+--------------+-----+
|            id|units|
+--------------+-----+
|2_1_2013-04-01|    0|
|2_2_2013-04-01|    0|
|2_3_2013-04-01|    0|
+--------------+-----+
only showing top 3 rows



In [60]:
# extract year, month, day
split_col = pyspark.sql.functions.split(test_df['date'], '-')
test_df = test_df.withColumn('year', split_col.getItem(0))
test_df = test_df.withColumn('month', split_col.getItem(1))
test_df = test_df.withColumn('day', split_col.getItem(2))
# extract day of year
test_df = test_df.withColumn('day_of_year', pyspark.sql.functions.dayofyear(test_df['date']))
# extract day of cycle
first_date=test_df.agg({"date": "min"}).collect()[0][0]
test_df = test_df.withColumn("day_of_cycle", 
              datediff(to_date("date","yyyy-MM-dd"),to_date(lit(first_date))))
# extract day of week
test_df=test_df.withColumn("day_of_week", 
              date_format('date', 'u').alias('day_of_week'))
# extract weekend
test_df=test_df.withColumn("is_weekend", \
                 pyspark.sql.functions.when(test_df.day_of_week == 7, 1).when(test_df.day_of_week == 6, 1).otherwise(0))

In [45]:
# create dataframe of holidays
cal = USFederalHolidayCalendar()
more_holidays=['01/01/2012','04/08/2012','03/31/2013','04/20/2014','12/24/2012','12/24/2013','12/24/2014']
holidays_df=pd.concat([cal.holidays(start='2011-12-31', end='2014-12-31').to_frame().reset_index(drop=True),pd.DataFrame(more_holidays)]).apply(lambda x: pd.to_datetime(x)).reset_index(drop=True)
holidays_df=holidays_df.iloc[1:,:].sort_values([0]).reset_index(drop=True)
holidays_set=set(holidays_df[0].apply(lambda x: x.strftime('%Y-%m-%d')).tolist())


# is holiday
test_df=test_df.withColumn("is_holiday", \
                 pyspark.sql.functions.when(test_df.date.isin(holidays_set), 1).otherwise(0))

#within one day of holiday but not on holiday
holiday_1day=set(pd.concat([holidays_df.apply(lambda x: x + datetime.timedelta(days=1)),\
                            holidays_df.apply(lambda x: x + datetime.timedelta(days=-1))])[0]\
                 .apply(lambda x: x.strftime('%Y-%m-%d')).tolist())

test_df=test_df.withColumn("1d_away_holiday", \
                 pyspark.sql.functions.when(test_df.date.isin(holiday_1day), 1).otherwise(0))

#within two days of holiday but not on holiday
holiday_1day.update(pd.concat([holidays_df.apply(lambda x: x + datetime.timedelta(days=2)),\
                               holidays_df.apply(lambda x: x + datetime.timedelta(days=-2))])[0]\
                    .apply(lambda x: x.strftime('%Y-%m-%d')).tolist())

test_df=test_df.withColumn("2d_away_holiday", \
                 pyspark.sql.functions.when(test_df.date.isin(holiday_1day), 1).otherwise(0))

#within three days of holiday but not on holiday
holiday_1day.update(pd.concat([holidays_df.apply(lambda x: x + datetime.timedelta(days=3)),\
                               holidays_df.apply(lambda x: x + datetime.timedelta(days=-3))])[0]\
                    .apply(lambda x: x.strftime('%Y-%m-%d')).tolist())

test_df=test_df.withColumn("3d_away_holiday", \
                 pyspark.sql.functions.when(test_df.date.isin(holiday_1day), 1).otherwise(0))

#within seven days of holiday but not on holiday
holiday_1day.update(pd.concat([holidays_df.apply(lambda x: x + datetime.timedelta(days=7)),\
                               holidays_df.apply(lambda x: x + datetime.timedelta(days=-7))])[0]\
                    .apply(lambda x: x.strftime('%Y-%m-%d')).tolist())

test_df=test_df.withColumn("7d_away_holiday", \
                 pyspark.sql.functions.when(test_df.date.isin(holiday_1day), 1).otherwise(0))

In [52]:
# convert numerical columns to numerical and categorical to string
# Import all from `sql.types`

from pyspark.sql.types import *

# Write a custom function to convert the data type of DataFrame columns
def convertColumn(test_df, names, newType):
  for name in names: 
     test_df = test_df.withColumn(name, test_df[name].cast(newType))
  return test_df 

# Assign all column names to `columns`
cat_cols = ["store_nbr", "item_nbr",'is_weekend','is_holiday','1d_away_holiday','2d_away_holiday','3d_away_holiday','7d_away_holiday']
num_cols = ["year", "month", "day", "day_of_year",'day_of_cycle','day_of_week']

# Conver the `test_df` columns to `FloatType()`
test_df = convertColumn(test_df, num_cols, FloatType())
test_df = convertColumn(test_df, cat_cols, StringType())

print "dataframe types after transform"
test_df.dtypes


dataframe types after transform


[('date', 'string'),
 ('store_nbr', 'string'),
 ('item_nbr', 'string'),
 ('year', 'float'),
 ('month', 'float'),
 ('day', 'float'),
 ('day_of_year', 'float'),
 ('day_of_cycle', 'float'),
 ('day_of_week', 'float'),
 ('is_weekend', 'string'),
 ('is_holiday', 'string'),
 ('1d_away_holiday', 'string'),
 ('2d_away_holiday', 'string'),
 ('3d_away_holiday', 'string'),
 ('7d_away_holiday', 'string')]

In [53]:
# https://stackoverflow.com/questions/49853188/create-features-column-in-pyspark-with-both-numerical-and-categorical-variable?noredirect=1&lq=1
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StandardScaler


# categorical pipeline
indexers = [StringIndexer(inputCol = c, outputCol="{0}_indexed".format(c)) for c in cat_cols]

encoders = [StringIndexer(inputCol = indexer.getOutputCol(), outputCol = "{0}_encoded".format(indexer.getOutputCol())) 
for indexer in indexers]

assemblerCat = VectorAssembler(inputCols = [encoder.getOutputCol() for encoder in encoders], outputCol = "cat")

pipelineCat = Pipeline(stages = indexers + encoders + [assemblerCat])
test_df = pipelineCat.fit(test_df).transform(test_df)

#numerical pipeline
assemblerNum = VectorAssembler(inputCols = num_cols, outputCol = "num")

pipelineNum = Pipeline(stages = [assemblerNum])
test_df = pipelineNum.fit(test_df).transform(test_df)

assembler = VectorAssembler(inputCols = ["cat", "num"], outputCol = "features_not_scaled")

pipeline = Pipeline(stages = [assembler])
test_df = pipeline.fit(test_df).transform(test_df)
test_df.take(2)

[Row(date=u'2013-04-01', store_nbr=u'2', item_nbr=u'1', year=2013.0, month=4.0, day=1.0, day_of_year=91.0, day_of_cycle=0.0, day_of_week=1.0, is_weekend=u'0', is_holiday=u'0', 1d_away_holiday=u'1', 2d_away_holiday=u'1', 3d_away_holiday=u'1', 7d_away_holiday=u'1', store_nbr_indexed=4.0, item_nbr_indexed=69.0, is_weekend_indexed=0.0, is_holiday_indexed=0.0, 1d_away_holiday_indexed=1.0, 2d_away_holiday_indexed=1.0, 3d_away_holiday_indexed=1.0, 7d_away_holiday_indexed=1.0, store_nbr_indexed_encoded=4.0, item_nbr_indexed_encoded=52.0, is_weekend_indexed_encoded=0.0, is_holiday_indexed_encoded=0.0, 1d_away_holiday_indexed_encoded=1.0, 2d_away_holiday_indexed_encoded=1.0, 3d_away_holiday_indexed_encoded=1.0, 7d_away_holiday_indexed_encoded=1.0, cat=DenseVector([4.0, 52.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0]), num=DenseVector([2013.0, 4.0, 1.0, 91.0, 0.0, 1.0]), features_not_scaled=DenseVector([4.0, 52.0, 0.0, 0.0, 1.0, 1.0, 1.0, 1.0, 2013.0, 4.0, 1.0, 91.0, 0.0, 1.0])),
 Row(date=u'2013-04-01', sto

In [54]:
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="num", outputCol="num_scaled")

# Fit the DataFrame to the scaler
scaler_test = standardScaler.fit(test_df)

# Transform the data in `df` with the scaler
test_df = scaler_test.transform(test_df)

assembler = VectorAssembler(inputCols = ["cat", "num_scaled"], outputCol = "features")

pipeline = Pipeline(stages = [assembler])
test_df = pipeline.fit(test_df).transform(test_df)

In [58]:
test_complete = test_df.select('features')
test_complete.show(1)

+--------------------+
|            features|
+--------------------+
|[4.0,52.0,0.0,0.0...|
+--------------------+
only showing top 1 row



# Part IV: Generating Predictions

In [108]:
test_predicted_rf = model_rf.bestModel.transform(test_complete)
test_predicted_rf = test_predicted_rf.withColumn("actual_units", pyspark.sql.functions.expm1(test_predicted_rf.units_pred_rf))
test_predicted_rf.show(10)

+--------------------+--------------------+--------------------+
|            features|       units_pred_rf|        actual_units|
+--------------------+--------------------+--------------------+
|[4.0,52.0,0.0,0.0...|0.009658993904179295|0.009705792540354115|
|[4.0,73.0,0.0,0.0...|0.031625026901596034|0.032130411596397096|
|[4.0,106.0,0.0,0....|0.009658993904179295|0.009705792540354115|
|[4.0,75.0,0.0,0.0...|0.009139632375107622|0.009181526349657524|
|[4.0,32.0,0.0,0.0...|   3.166398091154888|   22.72188621883388|
|[4.0,37.0,0.0,0.0...|0.005222504277240718|0.005236165323975045|
|[4.0,2.0,0.0,0.0,...|0.009658993904179295|0.009705792540354115|
|[4.0,50.0,0.0,0.0...|0.006321803590176853|0.006341828365819665|
|[4.0,88.0,0.0,0.0...|  0.9217504015089155|   1.513686501979287|
|[4.0,53.0,0.0,0.0...|0.014041481872560522|0.014140526514044726|
+--------------------+--------------------+--------------------+
only showing top 10 rows



In [98]:
# list of ids and matching ids
test_predictions=test_predicted_rf.select('actual_units').rdd.flatMap(lambda x: x).collect()
sub_ids=sub_df.select('id').rdd.flatMap(lambda x: x).collect()

test_predictions_export_df=spark.createDataFrame(zip(sub_ids,test_predictions) , ["id", "units"])

In [104]:
test_predictions_export_df.toPandas().to_csv('submissions/pyspark_sub.csv',index=False)
test_predictions_export_df.show()

+---------------+--------------------+
|             id|               units|
+---------------+--------------------+
| 2_1_2013-04-01|0.009705792540354115|
| 2_2_2013-04-01|0.032130411596397096|
| 2_3_2013-04-01|0.009705792540354115|
| 2_4_2013-04-01|0.009181526349657524|
| 2_5_2013-04-01|   22.72188621883388|
| 2_6_2013-04-01|0.005236165323975045|
| 2_7_2013-04-01|0.009705792540354115|
| 2_8_2013-04-01|0.006341828365819665|
| 2_9_2013-04-01|   1.513686501979287|
|2_10_2013-04-01|0.014140526514044726|
|2_11_2013-04-01|0.007437953008162322|
|2_12_2013-04-01|0.014097478746625642|
|2_13_2013-04-01|0.009705792540354115|
|2_14_2013-04-01|0.009181526349657524|
|2_15_2013-04-01|  0.1291213063078504|
|2_16_2013-04-01|0.013961469509229968|
|2_17_2013-04-01| 0.07204212757354697|
|2_18_2013-04-01|0.007437953008162322|
|2_19_2013-04-01|0.009181526349657524|
|2_20_2013-04-01|0.009705792540354115|
+---------------+--------------------+
only showing top 20 rows



In [106]:
pd.read_csv('submissions/pyspark_sub.csv').describe()

Unnamed: 0,units
count,526917.0
mean,0.315809
std,3.012295
min,0.0
25%,0.005566
50%,0.009217
75%,0.017443
max,102.73276


In [None]:
spark.stop()

# Part V: Discussion of Results
When uploaded to kaggle, the random forest model returns a 0.51073 RMSLE score, worse compared to the sub .10 RMSLE score achieved with xgboost. However, a caveat of using the pyspark technique is that I used the entire dataset to generate predictions on the test data, whereas the xgboost score was achieved by generating predictions on only items that had historical sales and filling 0 sales for items that had none. This approach was taken as training on the entire dataset led to memory capacity errors, which is why Spark is favored to handling massive datasets. I suspect that if most of the dataset were not 0's, the Spark approach would beat out the traditional approach levearging pandas and sklearn libraries.