# Chapter 8: Machine Learning with SparkML Code

In [0]:
# Read data in a Spark dataframe from dbfs path
housing_data = spark.read.csv("file:/Workspace/Users/saba.shah@databricks.com/housing_data/HousePricePrediction.csv", header=True)
housing_data.show()

+---+----------+--------+-------+---------+--------+-----------+---------+------------+-----------+----------+-----------+---------+
| Id|MSSubClass|MSZoning|LotArea|LotConfig|BldgType|OverallCond|YearBuilt|YearRemodAdd|Exterior1st|BsmtFinSF2|TotalBsmtSF|SalePrice|
+---+----------+--------+-------+---------+--------+-----------+---------+------------+-----------+----------+-----------+---------+
|  0|        60|      RL|   8450|   Inside|    1Fam|          5|     2003|        2003|    VinylSd|         0|        856|   208500|
|  1|        20|      RL|   9600|      FR2|    1Fam|          8|     1976|        1976|    MetalSd|         0|       1262|   181500|
|  2|        60|      RL|  11250|   Inside|    1Fam|          5|     2001|        2002|    VinylSd|         0|        920|   223500|
|  3|        70|      RL|   9550|   Corner|    1Fam|          5|     1915|        1970|    Wd Sdng|         0|        756|   140000|
|  4|        60|      RL|  14260|      FR2|    1Fam|          5|     

In [0]:
housing_data.display(5)

Id,MSSubClass,MSZoning,LotArea,LotConfig,BldgType,OverallCond,YearBuilt,YearRemodAdd,Exterior1st,BsmtFinSF2,TotalBsmtSF,SalePrice
0,60,RL,8450,Inside,1Fam,5,2003,2003,VinylSd,0.0,856.0,208500.0
1,20,RL,9600,FR2,1Fam,8,1976,1976,MetalSd,0.0,1262.0,181500.0
2,60,RL,11250,Inside,1Fam,5,2001,2002,VinylSd,0.0,920.0,223500.0
3,70,RL,9550,Corner,1Fam,5,1915,1970,Wd Sdng,0.0,756.0,140000.0
4,60,RL,14260,FR2,1Fam,5,2000,2000,VinylSd,0.0,1145.0,250000.0
5,50,RL,14115,Inside,1Fam,5,1993,1995,VinylSd,0.0,796.0,143000.0
6,20,RL,10084,Inside,1Fam,5,2004,2005,VinylSd,0.0,1686.0,307000.0
7,60,RL,10382,Corner,1Fam,6,1973,1973,HdBoard,32.0,1107.0,200000.0
8,50,RM,6120,Inside,1Fam,5,1931,1950,BrkFace,0.0,952.0,129900.0
9,190,RL,7420,Corner,2fmCon,6,1939,1950,MetalSd,0.0,991.0,118000.0


In [0]:
housing_data.printSchema

<bound method DataFrame.printSchema of DataFrame[Id: string, MSSubClass: string, MSZoning: string, LotArea: string, LotConfig: string, BldgType: string, OverallCond: string, YearBuilt: string, YearRemodAdd: string, Exterior1st: string, BsmtFinSF2: string, TotalBsmtSF: string, SalePrice: string]>

In [0]:
housing_data.count()

2919

In [0]:
# Remove rows with missing values
cleaned_data = housing_data.dropna()
cleaned_data.count()
# cleaned_data.show(5)


1460

In [0]:
from pyspark.sql.functions import col

cleaned_data = cleaned_data.withColumn("Id", col("Id").cast("long")) \
                         .withColumn("MSSubClass", col("MSSubClass").cast("long")) \
                         .withColumn("LotArea", col("LotArea").cast("long")) \
                         .withColumn("OverallCond", col("OverallCond").cast("long")) \
                         .withColumn("YearBuilt", col("YearBuilt").cast("long")) \
                         .withColumn("YearRemodAdd", col("YearRemodAdd").cast("long")) \
                         .withColumn("BsmtFinSF2", col("BsmtFinSF2").cast("long")) \
                         .withColumn("TotalBsmtSF", col("TotalBsmtSF").cast("long")) \
                         .withColumn("SalePrice", col("SalePrice").cast("long"))

In [0]:
#import required libraries
from pyspark.ml.feature import StringIndexer
mszoning_indexer = StringIndexer(inputCol="MSZoning",
outputCol="MSZoningIndex")
#Fits a model to the input dataset with optional parameters.
df_mszoning = mszoning_indexer.fit(cleaned_data).transform(cleaned_data)
df_mszoning.show()

+---+----------+--------+-------+---------+--------+-----------+---------+------------+-----------+----------+-----------+---------+-------------+
| Id|MSSubClass|MSZoning|LotArea|LotConfig|BldgType|OverallCond|YearBuilt|YearRemodAdd|Exterior1st|BsmtFinSF2|TotalBsmtSF|SalePrice|MSZoningIndex|
+---+----------+--------+-------+---------+--------+-----------+---------+------------+-----------+----------+-----------+---------+-------------+
|  0|        60|      RL|   8450|   Inside|    1Fam|          5|     2003|        2003|    VinylSd|         0|        856|   208500|          0.0|
|  1|        20|      RL|   9600|      FR2|    1Fam|          8|     1976|        1976|    MetalSd|         0|       1262|   181500|          0.0|
|  2|        60|      RL|  11250|   Inside|    1Fam|          5|     2001|        2002|    VinylSd|         0|        920|   223500|          0.0|
|  3|        70|      RL|   9550|   Corner|    1Fam|          5|     1915|        1970|    Wd Sdng|         0|        

In [0]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml import Pipeline
mszoning_indexer = StringIndexer(inputCol="MSZoning",
outputCol="MSZoningIndex")
lotconfig_indexer = StringIndexer(inputCol="LotConfig",
outputCol="LotConfigIndex")
bldgtype_indexer = StringIndexer(inputCol="BldgType",
outputCol="BldgTypeIndex")
exterior1st_indexer = StringIndexer(inputCol="Exterior1st",
outputCol="Exterior1stIndex")
onehotencoder_mszoning_vector = OneHotEncoder(inputCol="MSZoningIndex",
                                              outputCol="MSZoningVector")
onehotencoder_lotconfig_vector = OneHotEncoder(inputCol="LotConfigIndex", outputCol="LotConfigVector")
onehotencoder_bldgtype_vector = OneHotEncoder(inputCol="BldgTypeIndex", outputCol="BldgTypeVector")
onehotencoder_exterior1st_vector = OneHotEncoder(inputCol="Exterior1stIndex",
outputCol="Exterior1stVector")
#Create pipeline and pass all stages
pipeline = Pipeline(stages=[mszoning_indexer,
                            lotconfig_indexer,
                            bldgtype_indexer,
                            exterior1st_indexer,
                            onehotencoder_mszoning_vector,
                            onehotencoder_lotconfig_vector,
                            onehotencoder_bldgtype_vector,
                            onehotencoder_exterior1st_vector])

In [0]:
df_transformed = pipeline.fit(cleaned_data).transform(cleaned_data)
df_transformed.show(5)

Downloading artifacts:   0%|          | 0/58 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

+---+----------+--------+-------+---------+--------+-----------+---------+------------+-----------+----------+-----------+---------+-------------+--------------+-------------+----------------+--------------+---------------+--------------+-----------------+
| Id|MSSubClass|MSZoning|LotArea|LotConfig|BldgType|OverallCond|YearBuilt|YearRemodAdd|Exterior1st|BsmtFinSF2|TotalBsmtSF|SalePrice|MSZoningIndex|LotConfigIndex|BldgTypeIndex|Exterior1stIndex|MSZoningVector|LotConfigVector|BldgTypeVector|Exterior1stVector|
+---+----------+--------+-------+---------+--------+-----------+---------+------------+-----------+----------+-----------+---------+-------------+--------------+-------------+----------------+--------------+---------------+--------------+-----------------+
|  0|        60|      RL|   8450|   Inside|    1Fam|          5|     2003|        2003|    VinylSd|         0|        856|   208500|          0.0|           0.0|          0.0|             0.0| (4,[0],[1.0])|  (4,[0],[1.0])| (4,[0

In [0]:
drop_column_list = ["Id", "MSZoning","LotConfig","BldgType", "Exterior1st"]
df_dropped_cols = df_transformed.select([column for column in df_transformed.columns if column not in drop_column_list])
df_dropped_cols.columns

['MSSubClass',
 'LotArea',
 'OverallCond',
 'YearBuilt',
 'YearRemodAdd',
 'BsmtFinSF2',
 'TotalBsmtSF',
 'SalePrice',
 'MSZoningIndex',
 'LotConfigIndex',
 'BldgTypeIndex',
 'Exterior1stIndex',
 'MSZoningVector',
 'LotConfigVector',
 'BldgTypeVector',
 'Exterior1stVector']

In [0]:
from pyspark.ml.feature import VectorAssembler
#Assembling features
feature_assembly = VectorAssembler(inputCols = ['MSSubClass',
 'LotArea',
 'OverallCond',
 'YearBuilt',
 'YearRemodAdd',
 'BsmtFinSF2',
 'TotalBsmtSF',
 'MSZoningIndex',
 'LotConfigIndex',
 'BldgTypeIndex',
 'Exterior1stIndex',
 'MSZoningVector',
 'LotConfigVector',
 'BldgTypeVector',
 'Exterior1stVector'], outputCol = 'features')
output = feature_assembly.transform(df_dropped_cols)
output.show(3)

+----------+-------+-----------+---------+------------+----------+-----------+---------+-------------+--------------+-------------+----------------+--------------+---------------+--------------+-----------------+--------------------+
|MSSubClass|LotArea|OverallCond|YearBuilt|YearRemodAdd|BsmtFinSF2|TotalBsmtSF|SalePrice|MSZoningIndex|LotConfigIndex|BldgTypeIndex|Exterior1stIndex|MSZoningVector|LotConfigVector|BldgTypeVector|Exterior1stVector|            features|
+----------+-------+-----------+---------+------------+----------+-----------+---------+-------------+--------------+-------------+----------------+--------------+---------------+--------------+-----------------+--------------------+
|        60|   8450|          5|     2003|        2003|         0|        856|   208500|          0.0|           0.0|          0.0|             0.0| (4,[0],[1.0])|  (4,[0],[1.0])| (4,[0],[1.0])|   (14,[0],[1.0])|(37,[0,1,2,3,4,6,...|
|        20|   9600|          8|     1976|        1976|         

In [0]:
#Normalizing the features
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(output)

# Normalize each feature to have unit standard deviation.
scaledOutput = scalerModel.transform(output)
scaledOutput.show(3)

+----------+-------+-----------+---------+------------+----------+-----------+---------+-------------+--------------+-------------+----------------+--------------+---------------+--------------+-----------------+--------------------+--------------------+
|MSSubClass|LotArea|OverallCond|YearBuilt|YearRemodAdd|BsmtFinSF2|TotalBsmtSF|SalePrice|MSZoningIndex|LotConfigIndex|BldgTypeIndex|Exterior1stIndex|MSZoningVector|LotConfigVector|BldgTypeVector|Exterior1stVector|            features|      scaledFeatures|
+----------+-------+-----------+---------+------------+----------+-----------+---------+-------------+--------------+-------------+----------------+--------------+---------------+--------------+-----------------+--------------------+--------------------+
|        60|   8450|          5|     2003|        2003|         0|        856|   208500|          0.0|           0.0|          0.0|             0.0| (4,[0],[1.0])|  (4,[0],[1.0])| (4,[0],[1.0])|   (14,[0],[1.0])|(37,[0,1,2,3,4,6,...|(3

In [0]:
#Selecting input and output column from output
df_model_final = scaledOutput.select(['SalePrice', 'scaledFeatures'])
df_model_final.show(3)

+---------+--------------------+
|SalePrice|      scaledFeatures|
+---------+--------------------+
|   208500|(37,[0,1,2,3,4,6,...|
|   181500|(37,[0,1,2,3,4,6,...|
|   223500|(37,[0,1,2,3,4,6,...|
+---------+--------------------+
only showing top 3 rows



In [0]:
#test train split
df_train, df_test = df_model_final.randomSplit([0.75, 0.25])

In [0]:
from pyspark.ml.regression import LinearRegression
# Instantiate the linear regression model
regressor = LinearRegression(featuresCol = 'scaledFeatures', labelCol = 'SalePrice')
# Fit the model on the training data
regressor = regressor.fit(df_train)

Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/4 [00:00<?, ?it/s]

In [0]:
#MSE for the train data

pred_results = regressor.evaluate(df_train)
print("The train MSE for the model is: %2f"% pred_results.meanAbsoluteError)
print("The train r2 for the model is: %2f"% pred_results.r2)

The train MSE for the model is: 32558.987138
The train r2 for the model is: 0.606228


In [0]:
#Checking test performance
pred_results = regressor.evaluate(df_test)
print("The test MSE for the model is: %2f"% pred_results.meanAbsoluteError)
print("The test r2 for the model is: %2f"% pred_results.r2)

The test MSE for the model is: 31419.955249
The test r2 for the model is: 0.646428
