# Machine Learning model with PySpark

Predicting Boston housing prices

The goal is to build a model that predicts the median value of owner-occupied homes (MEDV) in a given Boston-area census tract.

Boston housing dataset columns:

- CRIM: per capita crime rate by town.
- ZN: proportion of residential land zoned for lots over 25,000 sq. ft.
- INDUS: proportion of non-retail business acres per town.
- CHAS: Charles River dummy variable (1 if the tract bounds the river, 0 otherwise).
- NOX: nitrogen oxides concentration (parts per 10 million).
- RM: average number of rooms per dwelling.
- AGE: proportion of owner-occupied units built prior to 1940.
- DIS: weighted mean of distances to five Boston employment centres.
- RAD: index of accessibility to radial highways.
- TAX: full-value property-tax rate per $10,000.
- PTRATIO: pupil-teacher ratio by town.
- B: 1000(Bk − 0.63)², where Bk is the proportion of Black residents by town (some descriptions call this column BLACK).
- LSTAT: % lower status of the population.
- MEDV: median value of owner-occupied homes in $1000s. This is the target variable.

In [2]:
import findspark
findspark.init()

In [40]:
import pandas as pd 
import numpy as np 

import pyspark
import pyspark.sql.functions as pyf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

In [59]:
from pyspark.ml.feature import VectorAssembler

In [7]:
# Create the Spark session

spark = SparkSession.builder.appName('boston_practice').getOrCreate()

In [8]:
spark

## LOAD THE DATA

In [20]:
boston_df = spark.read.csv('housingData.csv', header=True, inferSchema=True)
boston_df.show(3)

+-------+---+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|   CRIM| ZN|INDUS|CHAS|  NOX|   RM| AGE|   DIS|RAD|TAX|PTRATIO|     B|LSTAT|MEDV|
+-------+---+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
|0.00632| 18| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296|   15.3| 396.9| 4.98|24.0|
|0.02731|  0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242|   17.8| 396.9| 9.14|21.6|
|0.02729|  0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242|   17.8|392.83| 4.03|34.7|
+-------+---+-----+----+-----+-----+----+------+---+---+-------+------+-----+----+
only showing top 3 rows



## DATA EXPLORATION

In [21]:
boston_df.printSchema()

root
 |-- CRIM: string (nullable = true)
 |-- ZN: string (nullable = true)
 |-- INDUS: string (nullable = true)
 |-- CHAS: string (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: string (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: string (nullable = true)
 |-- MEDV: double (nullable = true)



In [61]:
# Convert the string columns to numeric types by casting

boston_df = boston_df.withColumn('AGE', pyf.col('AGE').cast(IntegerType())) \
                    .withColumn('LSTAT', pyf.col('LSTAT').cast(FloatType())) \
                    .withColumn('INDUS', pyf.col('INDUS').cast(FloatType())) \
                    .withColumn('CRIM', pyf.col('CRIM').cast(FloatType())) \
                    .withColumn('ZN', pyf.col('ZN').cast(IntegerType())) \
                    .withColumn('CHAS', pyf.col('CHAS').cast(IntegerType()))

In [62]:
boston_df.printSchema()

root
 |-- CRIM: float (nullable = true)
 |-- ZN: integer (nullable = true)
 |-- INDUS: float (nullable = true)
 |-- CHAS: integer (nullable = true)
 |-- NOX: double (nullable = true)
 |-- RM: double (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- DIS: double (nullable = true)
 |-- RAD: integer (nullable = true)
 |-- TAX: integer (nullable = true)
 |-- PTRATIO: double (nullable = true)
 |-- B: double (nullable = true)
 |-- LSTAT: float (nullable = true)
 |-- MEDV: double (nullable = true)
 |-- float: double (nullable = true)



In [63]:
# Dimensions of the dataset (rows, columns)
print((boston_df.count(), len(boston_df.columns)))

(506, 15)


In [64]:
boston_df.toPandas().info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 506 entries, 0 to 505
Data columns (total 15 columns):
 #   Column   Non-Null Count  Dtype  
---  ------   --------------  -----  
 0   CRIM     486 non-null    float32
 1   ZN       486 non-null    float64
 2   INDUS    486 non-null    float32
 3   CHAS     486 non-null    float64
 4   NOX      506 non-null    float64
 5   RM       506 non-null    float64
 6   AGE      486 non-null    float64
 7   DIS      506 non-null    float64
 8   RAD      506 non-null    int32  
 9   TAX      506 non-null    int32  
 10  PTRATIO  506 non-null    float64
 11  B        506 non-null    float64
 12  LSTAT    486 non-null    float32
 13  MEDV     506 non-null    float64
 14  float    486 non-null    float64
dtypes: float32(3), float64(10), int32(2)
memory usage: 49.5 KB


In [68]:
# Count null and NaN values in each column

boston_df.select([pyf.count(pyf.when(pyf.isnan(c) | pyf.col(c).isNull(), c)).alias(c) for c in boston_df.columns]).show()

+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+-----+
|CRIM| ZN|INDUS|CHAS|NOX| RM|AGE|DIS|RAD|TAX|PTRATIO|  B|LSTAT|MEDV|float|
+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+-----+
|  20| 20|   20|  20|  0|  0| 20|  0|  0|  0|      0|  0|   20|   0|   20|
+----+---+-----+----+---+---+---+---+---+---+-------+---+-----+----+-----+



In [71]:
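# Drop rows that contain any null. Each affected column is missing 20 of 506 values (about 4%),
# but the nulls fall in different rows, so na.drop() removes 112 rows (about 22%) and leaves 394,
# which is still enough data for a simple model.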

boston_df = boston_df.na.drop()

In [72]:
#Summarize the data

boston_df.toPandas().describe().T

| | count | mean | std | min | 25% | 50% | 75% | max |
|---|---|---|---|---|---|---|---|---|
| CRIM | 394.0 | 3.690136 | 9.202422 | 0.00632 | 0.081955 | 0.26888 | 3.435973 | 88.976196 |
| ZN | 394.0 | 11.444162 | 23.941578 | 0.0 | 0.0 | 0.0 | 12.0 | 100.0 |
| INDUS | 394.0 | 11.000878 | 6.908369 | 0.46 | 5.13 | 8.56 | 18.1 | 27.74 |
| CHAS | 394.0 | 0.068528 | 0.252971 | 0.0 | 0.0 | 0.0 | 0.0 | 1.0 |
| NOX | 394.0 | 0.553215 | 0.113112 | 0.389 | 0.453 | 0.538 | 0.624 | 0.871 |
| RM | 394.0 | 6.280015 | 0.697985 | 3.561 | 5.87925 | 6.2015 | 6.6055 | 8.78 |
| AGE | 394.0 | 68.497462 | 27.953911 | 2.0 | 45.0 | 77.0 | 94.0 | 100.0 |
| DIS | 394.0 | 3.805268 | 2.098571 | 1.1296 | 2.1101 | 3.1992 | 5.1167 | 12.1265 |
| RAD | 394.0 | 9.403553 | 8.633451 | 1.0 | 4.0 | 5.0 | 24.0 | 24.0 |
| TAX | 394.0 | 406.431472 | 168.312419 | 187.0 | 280.25 | 330.0 | 666.0 | 711.0 |


In [73]:
# Correlation between each numeric column and the target (MEDV). Values close to 1 indicate a strong positive correlation, values close to -1 a strong negative one, and values near 0 little linear relationship.

for col_name, dtype in boston_df.dtypes:
    if dtype != 'string':
        print("Correlation to MEDV for ", col_name, boston_df.stat.corr('MEDV', col_name))

Correlation to MEDV for  CRIM -0.39723005782332804
Correlation to MEDV for  ZN 0.40693969685738496
Correlation to MEDV for  INDUS -0.5108291685843283
Correlation to MEDV for  CHAS 0.1737011531689263
Correlation to MEDV for  NOX -0.4590543298280346
Correlation to MEDV for  RM 0.7239507648415225
Correlation to MEDV for  AGE -0.4075902875077575
Correlation to MEDV for  DIS 0.27954692634758843
Correlation to MEDV for  RAD -0.41663770616487195
Correlation to MEDV for  TAX -0.508864272782685
Correlation to MEDV for  PTRATIO -0.5438090137802115
Correlation to MEDV for  B 0.34725608800793734
Correlation to MEDV for  LSTAT -0.7434496284723267
Correlation to MEDV for  MEDV 1.0
Correlation to MEDV for  float -0.5108291616632262


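MEDV tends to rise with the average number of rooms (RM, correlation 0.72) and to fall as the share of lower-status population grows (LSTAT, -0.74). These are the two strongest linear relationships with the target.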
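DataFrame.stat.corr computes the Pearson correlation coefficient: the covariance of the two columns divided by the product of their standard deviations. A small NumPy sketch of that formula on made-up numbers (not the Boston data), checked against np.corrcoef:

```python
import numpy as np


def pearson(x, y):
    """Pearson correlation: cov(x, y) / (std(x) * std(y))."""
    x = np.asarray(x, dtype=float)
    y = np.asarray(y, dtype=float)
    dx = x - x.mean()
    dy = y - y.mean()
    return float((dx * dy).sum() / np.sqrt((dx ** 2).sum() * (dy ** 2).sum()))


# Illustrative values only: rooms per dwelling vs. median value in $1000s
rooms = [5.5, 6.0, 6.4, 7.1, 7.8]
value = [16.0, 20.5, 23.0, 30.1, 38.4]

r = pearson(rooms, value)
print(round(r, 4))
print(np.isclose(r, np.corrcoef(rooms, value)[0, 1]))  # True
print(pearson([1, 2, 3], [3, 2, 1]))  # -1.0
```

Pearson correlation only captures linear association, so a weak coefficient does not rule out a strong non-linear relationship.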

Prepare the data for the ML model. The model needs only two columns: a vector column holding the features and the label column (MEDV).

In [75]:
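# Assemble the input columns into a single vector column with VectorAssembler.
# Note: columns[1:] skips CRIM (index 0) and includes the label MEDV and the stray float column.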

feature = VectorAssembler(inputCols=boston_df.columns[1:], outputCol='features')
vector = feature.transform(boston_df)
vector = vector.select(['features', 'MEDV'])
vector.show(3)

+--------------------+----+
|            features|MEDV|
+--------------------+----+
|[18.0,2.309999942...|24.0|
|[0.0,7.0700001716...|21.6|
|[0.0,7.0700001716...|34.7|
+--------------------+----+
only showing top 3 rows



In [76]:
#Split the data

train_data, test_data = vector.randomSplit([.8, .2], seed=42)

## LINEAR REGRESSION

In [81]:
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(featuresCol='features', labelCol='MEDV', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(train_data)

In [83]:
print('Coefficients: ' + str(lr_model.coefficients))
print('Intercept: ' + str(lr_model.intercept))

Coefficients: [0.0,0.0,0.0,0.0,0.10091450582098951,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.9653771816148912,0.0]
Intercept: 0.13524502107248254
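The regParam and elasticNetParam arguments control an elastic-net penalty. Following the objective described in the Spark MLlib documentation for linear regression, with $\lambda$ = regParam = 0.3 and $\alpha$ = elasticNetParam = 0.8 here, training minimizes approximately:

$$
\min_{w,\,b}\; \frac{1}{2n}\sum_{i=1}^{n}\left(w^{\top}x_i + b - y_i\right)^2 \;+\; \lambda\left(\alpha\,\lVert w\rVert_1 + \frac{1-\alpha}{2}\,\lVert w\rVert_2^2\right)
$$

With $\alpha = 0.8$, most of the penalty is the L1 term, which can push coefficients exactly to 0; that helps explain why only two entries of the coefficient vector above are nonzero. Spark also standardizes features by default (standardization=True) before applying the penalty, so the exact solution depends on that setting as well.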


Summarize the model over the training set and print out some metrics

In [84]:
#Train data
trainSumm = lr_model.summary
print("RMSE: {0}".format(trainSumm.rootMeanSquaredError))
print("r2: {0}".format(trainSumm.r2))

RMSE: 0.27090760468703495
r2: 0.9991174765392707


In [85]:
train_data.describe().show()

+-------+------------------+
|summary|              MEDV|
+-------+------------------+
|  count|               337|
|   mean|22.215727002967373|
| stddev| 9.132788040978575|
|    min|               5.0|
|    max|              50.0|
+-------+------------------+



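RMSE measures the typical difference between the values predicted by the model and the actual values: the smaller the RMSE, the closer the predictions are to the observations.

RMSE on its own is hard to interpret until it is compared with the scale of MEDV. On the training set MEDV has a mean of about 22.2, a standard deviation of about 9.1, and ranges from 5 to 50, so an RMSE of about 0.27 is tiny.

An R² of 0.999 means that about 99.9% of the variance in MEDV is explained by the model. A fit this close to perfect is a red flag rather than a success: because the features were assembled from boston_df.columns[1:], MEDV itself is one of the inputs, and the coefficients confirm that the model relies almost entirely on it (0.965 for MEDV and 0.101 for RM, with every other coefficient at 0). These metrics therefore reflect label leakage, not the model's ability to predict house prices from the other variables.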
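Both metrics follow from simple formulas: RMSE is the square root of the mean squared residual, and R² is 1 minus the ratio of the residual sum of squares to the total sum of squares of the label around its mean. A small NumPy sketch of those formulas on made-up numbers (not the Boston data):

```python
import numpy as np


def rmse(y_true, y_pred):
    """Root mean squared error: sqrt(mean((y - y_hat)^2))."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))


def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    ss_res = np.sum((y_true - y_pred) ** 2)
    ss_tot = np.sum((y_true - y_true.mean()) ** 2)
    return float(1.0 - ss_res / ss_tot)


# Illustrative numbers only
y = [3.0, 5.0, 7.0]
y_hat = [2.0, 5.0, 8.0]
print(rmse(y, y_hat))  # sqrt(2/3), about 0.8165
print(r2(y, y_hat))    # 0.75
```

R² compares the residuals with the spread of the label itself, so it is scale-free, whereas RMSE is expressed in the label's units ($1000s for MEDV).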

### Predictions

In [90]:
#Test data
pred = lr_model.transform(test_data)
pred.select('prediction', 'MEDV').show(4)

+-----------------+----+
|       prediction|MEDV|
+-----------------+----+
|33.08504259874513|33.4|
|36.75165942777694|37.2|
|39.34085798704513|39.8|
|38.30962109703834|38.7|
+-----------------+----+
only showing top 4 rows



Evaluate the model

In [94]:
from pyspark.ml.evaluation import RegressionEvaluator

In [100]:
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='MEDV', metricName='r2')
print('R Squared on test data = %g' % evaluator.evaluate(pred))

R Squared on test data = 0.999179


In [104]:
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='MEDV', metricName='rmse')
print('RMSE on test data = %g' % evaluator.evaluate(pred))

RMSE on test data = 0.262293


In [105]:
spark.stop()