## pySpark Exploration through Housing Data Example

### Purpose: Demonstrate machine learning workflow using pySpark Dataframes and built-in ML library

#### Example adapted from https://www.datacamp.com/community/tutorials/apache-spark-tutorial-machine-learning
#### Data source: http://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html

Date: 2018-11
By: Saul Lee

In [1]:
### Import the necessary packages


from matplotlib import pyplot as plt
import numpy as np

from pyspark.ml import Pipeline 
from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import DenseVector
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import FloatType, StructField, StructType



import seaborn as sns

%matplotlib inline

In [2]:
# create a spark session

sc = SparkSession \
    .builder \
    .appName("Housing Data Example") \
    .getOrCreate()

## 0. Data loading and exploration

In [3]:
# load sample data
filename = "CaliforniaHousing/cal_housing.data"
header_list = ["longitude","latitude","houseMedianAge","totalRooms", \
          "totalBedrooms","population","households","medianIncome","medianHouseValue"]

In [4]:
# build the schema struct

fields = []

for field_name in header_list:
    fields.append(StructField(field_name, FloatType(), True))

schema = StructType(fields)

In [5]:
# read in the csv as a dataframe
df = sc.read.csv(filename,
                 header="false",
                 schema=schema,
                 ignoreLeadingWhiteSpace=True,
                 ignoreTrailingWhiteSpace=True)

In [6]:
# get the columns
df.columns

['longitude',
 'latitude',
 'houseMedianAge',
 'totalRooms',
 'totalBedrooms',
 'population',
 'households',
 'medianIncome',
 'medianHouseValue']

In [7]:
# get the schema
df.printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- houseMedianAge: float (nullable = true)
 |-- totalRooms: float (nullable = true)
 |-- totalBedrooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- medianIncome: float (nullable = true)
 |-- medianHouseValue: float (nullable = true)



In [8]:
df.show(10)

+---------+--------+--------------+----------+-------------+----------+----------+------------+----------------+
|longitude|latitude|houseMedianAge|totalRooms|totalBedrooms|population|households|medianIncome|medianHouseValue|
+---------+--------+--------------+----------+-------------+----------+----------+------------+----------------+
|  -122.23|   37.88|          41.0|     880.0|        129.0|     322.0|     126.0|      8.3252|        452600.0|
|  -122.22|   37.86|          21.0|    7099.0|       1106.0|    2401.0|    1138.0|      8.3014|        358500.0|
|  -122.24|   37.85|          52.0|    1467.0|        190.0|     496.0|     177.0|      7.2574|        352100.0|
|  -122.25|   37.85|          52.0|    1274.0|        235.0|     558.0|     219.0|      5.6431|        341300.0|
|  -122.25|   37.85|          52.0|    1627.0|        280.0|     565.0|     259.0|      3.8462|        342200.0|
|  -122.25|   37.85|          52.0|     919.0|        213.0|     413.0|     193.0|      4.0368| 

In [9]:
# select columns
df.describe().show(truncate=True)

+-------+-------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|          longitude|         latitude|    houseMedianAge|        totalRooms|    totalBedrooms|        population|       households|      medianIncome|  medianHouseValue|
+-------+-------------------+-----------------+------------------+------------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              20640|            20640|             20640|             20640|            20640|             20640|            20640|             20640|             20640|
|   mean|-119.56970444871473|35.63186143109965|28.639486434108527|2635.7630813953488|537.8980135658915|1425.4767441860465|499.5396802325581|3.8706710030346416|206855.81690891474|
| stddev|  2.003531742932898|2.135952380602968| 12.58555761211163|2181.6152515827944| 421.247905943133|  

## 2. Feature Engineering

Let's add some extra features:

ln(MEDIAN AGE)
ln(TOTAL ROOMS/ POPULATION)
ln(BEDROOMS/ POPULATION) 
ln(POPULATION/ HOUSEHOLDS)
ln(HOUSEHOLDS)

In [10]:
# manipulate column data
# need to import pyspark.sql.functions to access the math functions
# add the new columns to the dataframe

df = df.withColumn("lnHouseMedianAge", log(col('houseMedianAge'))) \
        .withColumn("lnTotalRoomsPerCapita", log(col('totalRooms') / col('population'))) \
        .withColumn("lnBedRoomsPerCapita", log(col('totalBedRooms') / col('population'))) \
        .withColumn("lnPopulationHouseholds", log(col('population') / col('households'))) \
        .withColumn("lnHouseholds", log(col('households'))) \
        .withColumn("lnMedianHouseValue", log(col('medianHouseValue')))

In [11]:
# reordering columns, get just the numerical values
data_df = df.select("lnMedianHouseValue","houseMedianAge","totalRooms", \
          "totalBedrooms","population","households","medianIncome", \
            "lnHouseMedianAge", "lnTotalRoomsPerCapita", "lnBedRoomsPerCapita", \
            "lnPopulationHouseholds", "lnHouseholds", "lnMedianHouseValue")

data_df.head()

Row(lnMedianHouseValue=13.022764012181574, houseMedianAge=41.0, totalRooms=880.0, totalBedrooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, lnHouseMedianAge=3.713572066704308, lnTotalRoomsPerCapita=1.0053703619278438, lnBedRoomsPerCapita=-0.9147391411827362, lnPopulationHouseholds=0.9382696385929302, lnHouseholds=4.836281906951478, lnMedianHouseValue=13.022764012181574)

In [12]:
# convert the data into label, and a DenseVector of X features

rdd1 = data_df.rdd.map(lambda x: (x[0],DenseVector(x[1:])))

In [13]:
# create a new df for the label and features
data_df = sc.createDataFrame(rdd1,["label","features"])
data_df.head()

Row(label=13.022764012181574, features=DenseVector([41.0, 880.0, 129.0, 322.0, 126.0, 8.3252, 3.7136, 1.0054, -0.9147, 0.9383, 4.8363, 13.0228]))

In [14]:
# Split the data into train and test sets
train_data, test_data = data_df.randomSplit([.8,.2],seed=1234)

## 3. Model training

### 3.1 Build the preprocessing and model pipeline

In [15]:
# Initialize the `standardScaler`
standardScaler = StandardScaler(inputCol="features",outputCol="features_scaled")

In [16]:
# For the models, lets try linear regression

# Initialize 'lr'
lr = LinearRegression(labelCol="label",maxIter=10)

In [17]:
# Build the pipeline

pipeline = Pipeline(stages=[
                standardScaler,
                lr])

### 3.2 Setup the grid search crossvalidator

In [18]:
# use grid search to find the best hyperparameters

# build the grid
paramGrid = ParamGridBuilder() \
            .addGrid(lr.regParam, [0.1 , 1.0, 10.0, 100.0]) \
            .addGrid(lr.elasticNetParam, [0.0, 1.0]) \
            .build()

In [19]:
# define the evaluator function
evaluator = RegressionEvaluator()

print("The evaluation metric is:",evaluator.getMetricName())

The evaluation metric is: rmse


In [20]:
# Instantiate the crossvalidator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator = RegressionEvaluator(),
                          numFolds=5)

### 3.3 Perform grid search

In [21]:
# perform the grid search
cvModel = crossval.fit(train_data)

In [33]:
# print the results of the grid search

for cur_val in zip(cvModel.avgMetrics, paramGrid):
    print(cur_val,"\n")

(0.09508183824080504, {Param(parent='LinearRegression_4ba1b65c504689491bd6', name='regParam', doc='regularization parameter (>= 0).'): 0.1, Param(parent='LinearRegression_4ba1b65c504689491bd6', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 0.0}) 

(0.10002283787499881, {Param(parent='LinearRegression_4ba1b65c504689491bd6', name='regParam', doc='regularization parameter (>= 0).'): 0.1, Param(parent='LinearRegression_4ba1b65c504689491bd6', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.'): 1.0}) 

(0.32434473041445716, {Param(parent='LinearRegression_4ba1b65c504689491bd6', name='regParam', doc='regularization parameter (>= 0).'): 1.0, Param(parent='LinearRegression_4ba1b65c504689491bd6', name='elasticNetParam', doc='the ElasticNet mixing parameter, in range [0, 1

The best performance (lowest RMSE) was observed for regParam = 0.1 and elasticNetParam = 0 (L2 penalty)

In [23]:
# extract the best models
bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[1]
bestParams = bestModel.extractParamMap()

In [24]:
bestModel.explainParam('elasticNetParam')

'elasticNetParam: the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty (default: 0.0, current: 0.0)'

In [25]:
bestModel.explainParam('regParam')

'regParam: regularization parameter (>= 0) (default: 0.0, current: 0.1)'

In [26]:
# model weights
bestModel.coefficients

DenseVector([0.0012, -0.0, 0.0, -0.0, 0.0, 0.0433, 0.0104, -0.0198, 0.0217, -0.0811, 0.0123, 0.7603])

In [27]:
# model intercept
bestModel.intercept

2.696588819204139

In [28]:
# the r^2 of the model
bestModel.summary.r2

0.9722558303382451

### 3.4 Evaluation against test data

In [29]:
# Generate predictions
predicted = bestPipeline.transform(test_data)
predicted

DataFrame[label: double, features: vector, features_scaled: vector, prediction: double]

In [30]:
# Extract the prediction and the "known" correct labels
testSetDf = predicted.select('prediction','label')
testSetDf.show(5)

+------------------+------------------+
|        prediction|             label|
+------------------+------------------+
|10.082336334196505|  9.61573881119536|
|10.109147675523694|  9.61573881119536|
|10.046641265243546|  9.61573881119536|
|10.659304218229671| 10.25061708363133|
|10.745509084272205|10.507803519389457|
+------------------+------------------+
only showing top 5 rows



In [31]:
# evaluate the model

print("Test set RMSE: {:0.3f}".format(evaluator.evaluate(testSetDf)))
print("Test set R2: {:0.3f}".format(evaluator.evaluate(testSetDf,{evaluator.metricName: "r2"})))
print("Test set MAE: {:0.3f}".format(evaluator.evaluate(testSetDf,{evaluator.metricName: "mae"})))

Test set RMSE: 0.094
Test set R2: 0.972
Test set MAE: 0.073


## Cleaning up.  Stop the Spark session

In [32]:
sc.stop()