## Initiating Spark and a Spark Session

In [1]:
# let us first install Pyspark in Jupyter notebook

#import pysark and check version

import pyspark

pyspark.__version__

'2.4.5'

In [2]:
# Create a SparkSession to interact with Spark

from pyspark.sql import SparkSession

# Create a local cluster using a SparkSession builder and let us all cores of our machine

spark = SparkSession.builder \
.master('local[*]') \
.appName('House Prices in Spark') \
.getOrCreate()

# before we start, let us set the spark configuration to make output more like pandas and less like SQL

# see this in https://towardsdatascience.com/data-prep-with-spark-dataframes-3629478a1041

spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

# Close connection to Spark

#spark.stop()

spark


## Reading data from CSV into Spark DataFrame

In [3]:
# import train data as a Spark DataFrame

# we will specify header = True which will take the first row as header

# we also ask Spark  to assume the datatype of each column with the inferSchema command

# also a good idea to tell spark how null values in data are represented, which in this case is 'NA'

df_train = spark.read.csv(r'C:\Users\Arun\Desktop\Kaggle Competitions\House-Prices-PySpark\house-prices-advanced-regression-techniques\train.csv', header=True, inferSchema=True, nullValue='NA')

# show first 5 records , similar to pandas head method

df_train.show(5)

+---+----------+--------+-----------+-------+------+-----+--------+-----------+---------+---------+---------+------------+----------+----------+--------+----------+-----------+-----------+---------+------------+---------+--------+-----------+-----------+----------+----------+---------+---------+----------+--------+--------+------------+------------+----------+------------+----------+---------+-----------+-------+---------+----------+----------+--------+--------+------------+---------+------------+------------+--------+--------+------------+------------+-----------+------------+----------+----------+-----------+----------+-----------+------------+----------+----------+----------+----------+----------+----------+-----------+-------------+---------+-----------+--------+------+-----+-----------+-------+------+------+--------+-------------+---------+
| Id|MSSubClass|MSZoning|LotFrontage|LotArea|Street|Alley|LotShape|LandContour|Utilities|LotConfig|LandSlope|Neighborhood|Condition1|Condition

In [4]:
# PySpark views as you can see can be quite dismal

# one thing, we can do is to use the getpandas function and visualise

df_train.toPandas().head()

Unnamed: 0,Id,MSSubClass,MSZoning,LotFrontage,LotArea,Street,Alley,LotShape,LandContour,Utilities,...,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition,SalePrice
0,1,60,RL,65.0,8450,Pave,,Reg,Lvl,AllPub,...,0,,,,0,2,2008,WD,Normal,208500
1,2,20,RL,80.0,9600,Pave,,Reg,Lvl,AllPub,...,0,,,,0,5,2007,WD,Normal,181500
2,3,60,RL,68.0,11250,Pave,,IR1,Lvl,AllPub,...,0,,,,0,9,2008,WD,Normal,223500
3,4,70,RL,60.0,9550,Pave,,IR1,Lvl,AllPub,...,0,,,,0,2,2006,WD,Abnorml,140000
4,5,60,RL,84.0,14260,Pave,,IR1,Lvl,AllPub,...,0,,,,0,12,2008,WD,Normal,250000


In [5]:
# use print schema in pyspark to check the info of the data

df_train.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = true)
 |-- LotFrontage: integer (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = true)
 |-- Alley: string (nullable = true)
 |-- LotShape: string (nullable = true)
 |-- LandContour: string (nullable = true)
 |-- Utilities: string (nullable = true)
 |-- LotConfig: string (nullable = true)
 |-- LandSlope: string (nullable = true)
 |-- Neighborhood: string (nullable = true)
 |-- Condition1: string (nullable = true)
 |-- Condition2: string (nullable = true)
 |-- BldgType: string (nullable = true)
 |-- HouseStyle: string (nullable = true)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = true)
 |-- RoofMatl: string (nullable = true)
 |-- Exterior1st: string (nullable = true)
 |-

In [6]:
# to find no of null values across each column in pyspark we are going to take a detour with pandas

# let us follow below method

from pyspark.sql.functions import when, count, col

null_columns = df_train.select([count(when(col(c).isNull(), c)).alias(c) for c in df_train.columns]).toPandas()

# this converts to a pandas dataframe wow!

null_columns.head()


Unnamed: 0,Id,MSSubClass,MSZoning,LotFrontage,LotArea,Street,Alley,LotShape,LandContour,Utilities,...,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition,SalePrice
0,0,0,0,259,0,0,1369,0,0,0,...,0,1453,1179,1406,0,0,0,0,0,0


In [7]:
# let us use the pandas melt method to provide a better view 

# as we are only looking for null columns, let us filter out other columns

import pandas as pd

null_columns3 = pd.melt(null_columns, var_name='Column', value_name = 'Null count')

null_columns5 = null_columns3[null_columns3['Null count']>0].sort_values(by='Null count', ascending=False)

null_columns5

Unnamed: 0,Column,Null count
72,PoolQC,1453
74,MiscFeature,1406
6,Alley,1369
73,Fence,1179
57,FireplaceQu,690
3,LotFrontage,259
58,GarageType,81
59,GarageYrBlt,81
60,GarageFinish,81
63,GarageQual,81


## Imputing missing values in Categorical columns- PySpark

We know that there are quite a few missing values in numerical and categorical data columns with the training data

We will need to define an imputation strategy which works as far as possible within a Pipeline.

This won't be 100% smooth as in Scikit learn Pipelines as PySpark's Imputer cannot deal with categorical value and there in column transformer to write this in one line of code

In [8]:
# first we will create a defualt dict which is a dictionary splitting each column in the dataframe by it's data type

# https://stackoverflow.com/questions/5900578/how-does-collections-defaultdict-work

from collections import defaultdict

data_types = defaultdict(list)
for entry in df_train.schema.fields:
 data_types[str(entry.dataType)].append(entry.name)


In [9]:
# let us see output of schema.fields

df_train.schema.fields

[StructField(Id,IntegerType,true),
 StructField(MSSubClass,IntegerType,true),
 StructField(MSZoning,StringType,true),
 StructField(LotFrontage,IntegerType,true),
 StructField(LotArea,IntegerType,true),
 StructField(Street,StringType,true),
 StructField(Alley,StringType,true),
 StructField(LotShape,StringType,true),
 StructField(LandContour,StringType,true),
 StructField(Utilities,StringType,true),
 StructField(LotConfig,StringType,true),
 StructField(LandSlope,StringType,true),
 StructField(Neighborhood,StringType,true),
 StructField(Condition1,StringType,true),
 StructField(Condition2,StringType,true),
 StructField(BldgType,StringType,true),
 StructField(HouseStyle,StringType,true),
 StructField(OverallQual,IntegerType,true),
 StructField(OverallCond,IntegerType,true),
 StructField(YearBuilt,IntegerType,true),
 StructField(YearRemodAdd,IntegerType,true),
 StructField(RoofStyle,StringType,true),
 StructField(RoofMatl,StringType,true),
 StructField(Exterior1st,StringType,true),
 StructF

In [10]:
# let us try the categorical imputation method tried in this well written article below

# where null values in categorical columns as just imputed with 'missing'

# http://people.stat.sc.edu/haigang/sparkCaseStudy.html

strings_used = [var for var in data_types["StringType"]]

missing_data_fill = {}
 
for var in strings_used:
 missing_data_fill[var] = "missing"

df_train = df_train.fillna(missing_data_fill)


In [11]:
# we know 'GarageCond' was a categorical column

# let us examine if the null values have been removed

df_train.filter('GarageCond IS NULL').count()

0

That's amazing! We have been able to impute all categorical values!

## String Indexer and One hot encoding in PySpark

Now we have to one hot encode categorical columns

In Pyspark this is done in two stages, a String Indexer and then a One Hot Encoder. First a string indexer will convert all categories to a numerical values.We will then one-hot encode these columns. Again, things are slightly different in PySpark. A string indexer can only take one column at a time. And this is a problem as in most instances, we will have more than a single categorical column to one hot encode. The best way is to combine several StringIndex and One Hot Encoders on a list and use a Pipeline to execute them all.

Please refer https://stackoverflow.com/questions/36942233/apply-stringindexer-to-several-columns-in-a-pyspark-dataframe and

http://people.stat.sc.edu/haigang/sparkCaseStudy.html


In [12]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer

indexers = [StringIndexer(inputCol=c, outputCol= c+"string_encoded") for c in strings_used]

# note that strings_used contains list of all columns which are categories

# let us apply PySpark one hot encoding on the output of the String Indexer

onehot = [OneHotEncoder(inputCol= c+"string_encoded", outputCol= c+ "onehot") for c in strings_used]

from pyspark.ml import Pipeline

categorical_pipeline = Pipeline(stages=indexers + onehot)

# train_testing = categorical_pipeline.fit(df_train).transform(df_train)

# it seems even with OneHotEncoderEstimator the input has to be numeric! so string indexer has to be a pre-requisite


## Imputing missing values in numerical columns - PySpark

In [13]:
# we want to use the PySpark Imputer class for Imputing missing numerical values

# But Imputer's can only deal with double datatype which is nothing but double precision floats

# let us convert all int types to double type

for c in data_types["IntegerType"]:
    df_train = df_train.withColumn(c+ "_cast_to_double", df_train[c].cast("double"))

# see if this has worked

df_train.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- MSSubClass: integer (nullable = true)
 |-- MSZoning: string (nullable = false)
 |-- LotFrontage: integer (nullable = true)
 |-- LotArea: integer (nullable = true)
 |-- Street: string (nullable = false)
 |-- Alley: string (nullable = false)
 |-- LotShape: string (nullable = false)
 |-- LandContour: string (nullable = false)
 |-- Utilities: string (nullable = false)
 |-- LotConfig: string (nullable = false)
 |-- LandSlope: string (nullable = false)
 |-- Neighborhood: string (nullable = false)
 |-- Condition1: string (nullable = false)
 |-- Condition2: string (nullable = false)
 |-- BldgType: string (nullable = false)
 |-- HouseStyle: string (nullable = false)
 |-- OverallQual: integer (nullable = true)
 |-- OverallCond: integer (nullable = true)
 |-- YearBuilt: integer (nullable = true)
 |-- YearRemodAdd: integer (nullable = true)
 |-- RoofStyle: string (nullable = false)
 |-- RoofMatl: string (nullable = false)
 |-- Exterior1st: string (nulla

In [14]:
# one problem though is that the above script will duplicate each numerical columns as int and double

# let us just select columns which are cast to double

cast_vars = [var for var in  df_train.columns if var.endswith("_cast_to_double")]

# let us also remove sales price feature which was cast to double

# we will keep the int- sales price column as this we will label as our label col in the ML model

cast_vars.remove('SalePrice_cast_to_double')

# we also need to define the output columns

numericals_imputed  = [var+ "imputed" for var in cast_vars]


In [15]:
# let us see what cast_vars represent

cast_vars

['Id_cast_to_double',
 'MSSubClass_cast_to_double',
 'LotFrontage_cast_to_double',
 'LotArea_cast_to_double',
 'OverallQual_cast_to_double',
 'OverallCond_cast_to_double',
 'YearBuilt_cast_to_double',
 'YearRemodAdd_cast_to_double',
 'MasVnrArea_cast_to_double',
 'BsmtFinSF1_cast_to_double',
 'BsmtFinSF2_cast_to_double',
 'BsmtUnfSF_cast_to_double',
 'TotalBsmtSF_cast_to_double',
 '1stFlrSF_cast_to_double',
 '2ndFlrSF_cast_to_double',
 'LowQualFinSF_cast_to_double',
 'GrLivArea_cast_to_double',
 'BsmtFullBath_cast_to_double',
 'BsmtHalfBath_cast_to_double',
 'FullBath_cast_to_double',
 'HalfBath_cast_to_double',
 'BedroomAbvGr_cast_to_double',
 'KitchenAbvGr_cast_to_double',
 'TotRmsAbvGrd_cast_to_double',
 'Fireplaces_cast_to_double',
 'GarageYrBlt_cast_to_double',
 'GarageCars_cast_to_double',
 'GarageArea_cast_to_double',
 'WoodDeckSF_cast_to_double',
 'OpenPorchSF_cast_to_double',
 'EnclosedPorch_cast_to_double',
 '3SsnPorch_cast_to_double',
 'ScreenPorch_cast_to_double',
 'PoolAre

In [16]:
# let us see what the output columns will look like

numericals_imputed

['Id_cast_to_doubleimputed',
 'MSSubClass_cast_to_doubleimputed',
 'LotFrontage_cast_to_doubleimputed',
 'LotArea_cast_to_doubleimputed',
 'OverallQual_cast_to_doubleimputed',
 'OverallCond_cast_to_doubleimputed',
 'YearBuilt_cast_to_doubleimputed',
 'YearRemodAdd_cast_to_doubleimputed',
 'MasVnrArea_cast_to_doubleimputed',
 'BsmtFinSF1_cast_to_doubleimputed',
 'BsmtFinSF2_cast_to_doubleimputed',
 'BsmtUnfSF_cast_to_doubleimputed',
 'TotalBsmtSF_cast_to_doubleimputed',
 '1stFlrSF_cast_to_doubleimputed',
 '2ndFlrSF_cast_to_doubleimputed',
 'LowQualFinSF_cast_to_doubleimputed',
 'GrLivArea_cast_to_doubleimputed',
 'BsmtFullBath_cast_to_doubleimputed',
 'BsmtHalfBath_cast_to_doubleimputed',
 'FullBath_cast_to_doubleimputed',
 'HalfBath_cast_to_doubleimputed',
 'BedroomAbvGr_cast_to_doubleimputed',
 'KitchenAbvGr_cast_to_doubleimputed',
 'TotRmsAbvGrd_cast_to_doubleimputed',
 'Fireplaces_cast_to_doubleimputed',
 'GarageYrBlt_cast_to_doubleimputed',
 'GarageCars_cast_to_doubleimputed',
 'Ga

In [17]:
# that's enough for us to use the Imputer Class

# let us import the data

from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=cast_vars, outputCols=numericals_imputed)

#train_tested = imputer.fit(df_train).transform(df_train)

#train_tested.schema.fields

## Combining to a single vector with Vector Assembler

PySpark ML models require all the feature columns in  a single vector of predictors to operate.

So we will use the Vector Assembler transformer to combine all the transfromed feature columns

In [18]:
from pyspark.ml.feature import VectorAssembler

# assemble all feature transformations as input Cols 'features'

# the following will eliminate duplicates and select simply the transformed features we need

features = numericals_imputed  \
              + [var + "onehot" for var in strings_used]

assembler = VectorAssembler(inputCols=features, outputCol='features')

pipeline = Pipeline(stages=[categorical_pipeline, imputer, assembler])

train_tested = pipeline.fit(df_train).transform(df_train)


In [19]:
# check out how the features list looks like

features

['Id_cast_to_doubleimputed',
 'MSSubClass_cast_to_doubleimputed',
 'LotFrontage_cast_to_doubleimputed',
 'LotArea_cast_to_doubleimputed',
 'OverallQual_cast_to_doubleimputed',
 'OverallCond_cast_to_doubleimputed',
 'YearBuilt_cast_to_doubleimputed',
 'YearRemodAdd_cast_to_doubleimputed',
 'MasVnrArea_cast_to_doubleimputed',
 'BsmtFinSF1_cast_to_doubleimputed',
 'BsmtFinSF2_cast_to_doubleimputed',
 'BsmtUnfSF_cast_to_doubleimputed',
 'TotalBsmtSF_cast_to_doubleimputed',
 '1stFlrSF_cast_to_doubleimputed',
 '2ndFlrSF_cast_to_doubleimputed',
 'LowQualFinSF_cast_to_doubleimputed',
 'GrLivArea_cast_to_doubleimputed',
 'BsmtFullBath_cast_to_doubleimputed',
 'BsmtHalfBath_cast_to_doubleimputed',
 'FullBath_cast_to_doubleimputed',
 'HalfBath_cast_to_doubleimputed',
 'BedroomAbvGr_cast_to_doubleimputed',
 'KitchenAbvGr_cast_to_doubleimputed',
 'TotRmsAbvGrd_cast_to_doubleimputed',
 'Fireplaces_cast_to_doubleimputed',
 'GarageYrBlt_cast_to_doubleimputed',
 'GarageCars_cast_to_doubleimputed',
 'Ga

## Model Fitting 

In this part, we will run a few PySpark ML models and check which of these have the best performance



In [20]:
# let us bring the scikit classifiers and also bring in XG boost

from pyspark.ml.regression import LinearRegression

from pyspark.ml.regression import DecisionTreeRegressor

from pyspark.ml.regression import RandomForestRegressor

lasso = LinearRegression(labelCol='SalePrice', elasticNetParam=1, regParam=0.1)

tree = DecisionTreeRegressor(labelCol='SalePrice')

rf = RandomForestRegressor(labelCol='SalePrice')

# let us evaluate with a metric

from pyspark.ml.evaluation import RegressionEvaluator

# create a training and test set

df_train_train, df_train_test = df_train.randomSplit([0.8, 0.2], seed=23)

# Define a list called classifier that contains the tuples (classifier_name, classifier)

classifiers = [('lasso Regression', lasso),('Decision Tree', tree),('Random Forest', rf)]

In [21]:
# Iterate over the defined list of tuples containing the classifiers

for clf_name, clf in classifiers:
    
# create the full pipeline to the training set

   pipeline = Pipeline(stages=[categorical_pipeline, imputer, assembler, clf])

   predictions=pipeline.fit(df_train_train).transform(df_train_test)

# let us evaluate with a metric

   from pyspark.ml.evaluation import RegressionEvaluator

# evaluate rmse

   evaluator1 = RegressionEvaluator(labelCol='SalePrice', predictionCol='prediction', metricName='rmse')

   rmse = evaluator1.evaluate(predictions)

   #print("Root Mean Squared Error (RMSE) on test data = %g". format(clf_name,rmse))
    
   print('{:s} : {:.3f}'.format(clf_name, rmse))
    
# evaluate r2

   evaluator2 = RegressionEvaluator(labelCol='SalePrice', predictionCol='prediction', metricName='r2')

   r2 = evaluator2.evaluate(predictions)

   print('{:s} : {:.3f}'.format(clf_name, r2))             


lasso Regression : 31485.694
lasso Regression : 0.880
Decision Tree : 48900.131
Decision Tree : 0.711
Random Forest : 40677.747
Random Forest : 0.800


In [22]:
# let us have a look at the lasso predictions

# complete the full pyspark pipeline

pipeline = Pipeline(stages=[categorical_pipeline, imputer, assembler, lasso])

# train the model on the train data we split

lasso_model = pipeline.fit(df_train_train)

# make predictions on the testing set we held out

lasso_predictions = lasso_model.transform(df_train_test)

In [23]:
# select out the output fields for comparison

lasso_output = lasso_predictions.select('Id','SalePrice','prediction')

In [24]:
# compare actual saleprice and prediction

lasso_output.show(5)

+---+---------+------------------+
| Id|SalePrice|        prediction|
+---+---------+------------------+
| 12|   345000|399528.55501494725|
| 13|   144000|138212.96531838604|
| 14|   279500|218056.41585143388|
| 20|   139000|114211.09995860013|
| 24|   129900|136243.62454474036|
+---+---------+------------------+
only showing top 5 rows



In [25]:
# access lasso regression coefficients

lasso_model.stages[3].coefficients

DenseVector([1.6359, -110.4567, 13.7274, 0.6665, 7616.735, 4500.4303, 275.089, 67.0897, 16.9279, 11.2654, 0.0902, -1.2915, 15.7849, 24.4738, 35.6737, 4.0874, 23.7637, 3606.3395, -1223.7905, 3424.0843, 243.6872, -4772.6782, -17453.8666, 1843.6044, 4806.3727, -20.2035, 5635.8699, 7.8791, 14.91, 0.0878, 12.5485, 29.7966, 54.7437, 557.7962, 5.7428, -552.6152, -1014.2198, 11065.4891, 10547.5996, 8449.1754, 7048.1709, 55582.7374, 590.5162, 6002.4486, 6999.6869, 4147.7159, 8327.6212, 15956.853, 11375.3115, 20767.9958, 43516.1078, -63.9795, 917.7192, 12120.9734, -3979.5896, 30878.3325, 38245.9766, -8456.0617, -1980.2342, -14349.482, -16823.9271, 19716.063, -5407.8195, 30390.7336, -4113.5156, -7296.2687, -1603.869, 13345.3063, 4378.7782, -13524.1948, 61.8372, 28975.3343, -15239.8282, 8041.4417, 52773.9858, 8362.0951, -9354.0352, 7280.7993, 11133.9632, 5962.7559, 22707.3354, 5541.474, -3396.1194, -3877.7721, 1043.3823, 2428.2788, -24041.1229, 758.5374, -7049.399, -43785.6645, -44049.89, -35514.1

#  Making predictions on the test data

In [26]:
# In this portion, we will import the test data and make predictions

# import test data as a Spark DataFrame

df_test = spark.read.csv(r'C:\Users\Arun\Desktop\Kaggle Competitions\House-Prices-PySpark\house-prices-advanced-regression-techniques\test.csv', header=True, inferSchema=True, nullValue='NA')

# show first 5 records , similar to pandas head method

df_test.toPandas().head()


Unnamed: 0,Id,MSSubClass,MSZoning,LotFrontage,LotArea,Street,Alley,LotShape,LandContour,Utilities,...,ScreenPorch,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition
0,1461,20,RH,80.0,11622,Pave,,Reg,Lvl,AllPub,...,120,0,,MnPrv,,0,6,2010,WD,Normal
1,1462,20,RL,81.0,14267,Pave,,IR1,Lvl,AllPub,...,0,0,,,Gar2,12500,6,2010,WD,Normal
2,1463,60,RL,74.0,13830,Pave,,IR1,Lvl,AllPub,...,0,0,,MnPrv,,0,3,2010,WD,Normal
3,1464,60,RL,78.0,9978,Pave,,IR1,Lvl,AllPub,...,0,0,,,,0,6,2010,WD,Normal
4,1465,120,RL,43.0,5005,Pave,,IR1,HLS,AllPub,...,144,0,,,,0,1,2010,WD,Normal


In [27]:
# As already noted, PySpark doesn't let us have the full pipeline benefits like scikit learn

# we will have to make a few transformation as we did in train data before calling on pipeline artchitecture

# first thing we did out of pipeline in imputing categorical values

# let us do it again in the test data

# first we will create a defualt dict which is a dictionary splitting each column in the dataframe by it's data type

# https://stackoverflow.com/questions/5900578/how-does-collections-defaultdict-work

from collections import defaultdict

data_types = defaultdict(list)
for entry in df_test.schema.fields:
 data_types[str(entry.dataType)].append(entry.name)


In [28]:
# let us try the categorical imputation method tried in this well written article below

# http://people.stat.sc.edu/haigang/sparkCaseStudy.html

strings_used_test = [var for var in data_types["StringType"]]

missing_data_fill = {}
 
for var in strings_used_test:
 missing_data_fill[var] = "missing"

df_test = df_test.fillna(missing_data_fill)



In [29]:
# we will also have to repeat the cast conversion that we did with the training data

# let us convert all int types to double type

for c in data_types["IntegerType"]:
    df_test = df_test.withColumn(c+ "_cast_to_double", df_test[c].cast("double"))



In [30]:
# that's all we did out of pipeline architecture

# let us now make predictions

lasso_predictions_test= lasso_model.transform(df_test)