# Machine Learning Library (MLlib) in PySpark

MLlib is the **machine learning library of PySpark**. It makes machine learning models scalable and easy to apply.

In [1]:
import pyspark
import kagglehub
import pandas as pd 
from pyspark.sql import SparkSession 
from pyspark.ml.feature import Imputer 
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression 
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.sql.functions import isnan, when, count, col # loading libraries

In [3]:
# Create the Spark session used by every cell below (getOrCreate returns the
# already-running session when there is one).
spark = (
    SparkSession.builder
    .appName("Machine Learning Library (MLlib) Introduction")
    .getOrCreate()
)

In [4]:
spark # echo the Spark session created above as the cell output

In [6]:
path = kagglehub.dataset_download("camnugent/california-housing-prices") # download the Kaggle dataset and keep the local path it returns (presumably a directory - confirm); this path is given to the CSV reader

## Data Processing

In [7]:
# Read the downloaded CSV data into a Spark DataFrame: the first line holds the
# column names and the column types are inferred from the data.
df = spark.read.csv(path, header = True, inferSchema = True)

# Show the inferred column names and types.
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [8]:
df.show(5) # preview the first 5 rows of the dataframe

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

### Handling Missing Values

In [9]:
# Count the missing values (NULL or NaN) in every column.
# NULL is checked for all columns; NaN only for float/double columns, because
# isnan() on a string column such as `ocean_proximity` relies on an implicit
# string -> double cast, which can raise under ANSI SQL mode.
nan_capable_columns = {name for name, dtype in df.dtypes if dtype in ('float', 'double')}

df.select([
    count(when((isnan(c) | col(c).isNull()) if c in nan_capable_columns else col(c).isNull(), c)).alias(c)
    for c in df.columns
]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|           207|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



In [10]:
# Numerical predictors: every column except the variable we want to predict
# ('median_house_value') and the categorical variable ('ocean_proximity').
numerical_features = list(df.columns)
for excluded_column in ('median_house_value', 'ocean_proximity'):
    numerical_features.remove(excluded_column)

In [11]:
numerical_features # inspect the remaining (numerical) column names

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

In [28]:
# Fit an imputer on the numerical columns and use it to fill their missing values.
# Input and output column names are identical, so the existing columns are overwritten.
# NOTE(review): no strategy is passed, so the library default applies - presumably the mean; confirm.
# NOTE(review): the imputer is fitted on the full dataset, i.e. before the train/test split.
imputer = Imputer(inputCols = numerical_features,
                  outputCols = numerical_features).fit(df)

df = imputer.transform(df)

In [27]:
# Re-count the missing values (NULL or NaN) per column to confirm they were imputed.
# NULL is checked for all columns; NaN only for float/double columns, because
# isnan() on a string column such as `ocean_proximity` relies on an implicit
# string -> double cast, which can raise under ANSI SQL mode.
nan_capable_columns = {name for name, dtype in df.dtypes if dtype in ('float', 'double')}

df.select([
    count(when((isnan(c) | col(c).isNull()) if c in nan_capable_columns else col(c).isNull(), c)).alias(c)
    for c in df.columns
]).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|        0|       0|                 0|          0|             0|         0|         0|            0|                 0|              0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+



## Building and Evaluating a Regression Model

### Extracting and Transforming Features

In this section we combine the numerical features into a single vector with **VectorAssembler**, convert the string categorical feature into numerical indices with **StringIndexer**, and turn those indices into binary vectors with **OneHotEncoder**.

In [29]:
# Merge the numerical features into a single vector column.
# The input columns come from the `numerical_features` list built earlier instead
# of a second hard-coded copy, so the imputer and the assembler always work on
# the same set of columns.
# handleInvalid = "skip" filters out rows that still hold invalid (NULL/NaN) values.
numerical_vector_assembler = VectorAssembler(inputCols = numerical_features,
                                            outputCol = 'numerical_features',
                                            handleInvalid = "skip")

In [30]:
assembled_data = numerical_vector_assembler.transform(df) # apply the VectorAssembler: adds the 'numerical_features' vector column to the dataframe

In [31]:
assembled_data.show(5) # preview the first 5 rows, including the new vector column

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|  numerical_features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|[-122.23,37.88,41...|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|[-122.22,37.86,21...|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|[-122.24,37.85,52...|
|  -122.25|   37.85|              52.0|     12

In [32]:
train, test = assembled_data.randomSplit([0.8, 0.2], seed = 42) # split data in train (80%) and test (20%) sets; the fixed seed is meant to make the split repeatable

In [33]:
train.select('numerical_features').take(2) # first two numerical feature vectors of the training set

[Row(numerical_features=DenseVector([-124.35, 40.54, 52.0, 1820.0, 300.0, 806.0, 270.0, 3.0147])),
 Row(numerical_features=DenseVector([-124.3, 41.8, 19.0, 2672.0, 552.0, 1298.0, 478.0, 1.9797]))]

In [34]:
test.select('numerical_features').take(2) # first two numerical feature vectors of the testing set

[Row(numerical_features=DenseVector([-124.3, 41.84, 17.0, 2677.0, 531.0, 1244.0, 456.0, 3.0313])),
 Row(numerical_features=DenseVector([-124.23, 40.54, 52.0, 2694.0, 453.0, 1152.0, 435.0, 3.0806]))]

In [35]:
# Map the string labels of 'ocean_proximity' to label indices ('ocean_category').
# The mapping is learned from the training set only.
# NOTE(review): no handleInvalid is set - confirm how a label that appears only
# in the test set would be treated.
indexer = StringIndexer(
    inputCol = 'ocean_proximity',
    outputCol = 'ocean_category',
).fit(train)

In [36]:
# Add the column of label indices for the ocean proximity categories to both sets,
# using the indexer fitted on the training set.
train, test = indexer.transform(train), indexer.transform(test)

In [37]:
# Categories created by the indexer.
# distinct() deduplicates inside Spark, so only the few distinct category rows -
# not one Row per training record - are collected to the driver.
set(train.select('ocean_category').distinct().collect())

{Row(ocean_category=0.0),
 Row(ocean_category=1.0),
 Row(ocean_category=2.0),
 Row(ocean_category=3.0),
 Row(ocean_category=4.0)}

In [38]:
# Map the column of category indices ('ocean_category') to a column of binary
# vectors ('ocean_category_one_hot'); the encoder is fitted on the training set.
one_hot_encoder = OneHotEncoder(
    inputCol = 'ocean_category',
    outputCol = 'ocean_category_one_hot',
).fit(train)

In [39]:
# Apply the fitted one hot encoder to both train and test sets.
train, test = one_hot_encoder.transform(train), one_hot_encoder.transform(test)

In [40]:
set(train.select('ocean_category_one_hot').collect()) # distinct one-hot vectors in the training set (every row is collected, then deduplicated by set() on the driver)

{Row(ocean_category_one_hot=SparseVector(4, {0: 1.0})),
 Row(ocean_category_one_hot=SparseVector(4, {1: 1.0})),
 Row(ocean_category_one_hot=SparseVector(4, {2: 1.0})),
 Row(ocean_category_one_hot=SparseVector(4, {3: 1.0})),
 Row(ocean_category_one_hot=SparseVector(4, {}))}

In [41]:
# Merge both the numerical and the one-hot encoded categorical features into a
# single 'complete_features' vector.
assembler = VectorAssembler(
    inputCols = ['numerical_features', 'ocean_category_one_hot'],
    outputCol = 'complete_features',
)

# Add the complete feature vector to both train and test dataframes.
train, test = assembler.transform(train), assembler.transform(test)

In [42]:
train.show(5) # preview the first 5 rows of the training set with all engineered columns

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+--------------+----------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|  numerical_features|ocean_category|ocean_category_one_hot|   complete_features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+--------------+----------------------+--------------------+
|  -124.35|   40.54|              52.0|     1820.0|         300.0|     806.0|     270.0|       3.0147|           94600.0|     NEAR OCEAN|[-124.35,40.54,52...|           2.0|         (4,[2],[1.0])|[-124.35,40.54,52...|
|   -124.3|    41.8|              19.0|     2672.0|         552.0|    1298.0|     478.0|       1.9797|           85800.0|     NE

In [43]:
test.show(5) # preview the first 5 rows of the testing set with all engineered columns

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+--------------+----------------------+--------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|  numerical_features|ocean_category|ocean_category_one_hot|   complete_features|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+--------------+----------------------+--------------------+
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|          103600.0|     NEAR OCEAN|[-124.3,41.84,17....|           2.0|         (4,[2],[1.0])|[-124.3,41.84,17....|
|  -124.23|   40.54|              52.0|     2694.0|         453.0|    1152.0|     435.0|       3.0806|          106700.0|     NE

In [45]:
train.select('complete_features').take(5) # first 5 complete feature vectors (numerical values followed by the one-hot entries)

[Row(complete_features=DenseVector([-124.35, 40.54, 52.0, 1820.0, 300.0, 806.0, 270.0, 3.0147, 0.0, 0.0, 1.0, 0.0])),
 Row(complete_features=DenseVector([-124.3, 41.8, 19.0, 2672.0, 552.0, 1298.0, 478.0, 1.9797, 0.0, 0.0, 1.0, 0.0])),
 Row(complete_features=DenseVector([-124.27, 40.69, 36.0, 2349.0, 528.0, 1194.0, 465.0, 2.5179, 0.0, 0.0, 1.0, 0.0])),
 Row(complete_features=DenseVector([-124.26, 40.58, 52.0, 2217.0, 394.0, 907.0, 369.0, 2.3571, 0.0, 0.0, 1.0, 0.0])),
 Row(complete_features=DenseVector([-124.25, 40.28, 32.0, 1430.0, 419.0, 434.0, 187.0, 1.9417, 0.0, 0.0, 1.0, 0.0]))]

### Fitting a Linear Regression Model

In [50]:
# Define a linear regression model that predicts 'median_house_value' from the
# 'complete_features' vector. Nothing is fitted yet.
linear_regression_model = LinearRegression(
    labelCol = 'median_house_value',
    featuresCol = 'complete_features',
)

# Echo the estimator as the cell output.
linear_regression_model

LinearRegression_dfe052c5055b

In [51]:
# Fit the linear regression model to the training data.
# NOTE(review): the name is rebound from the estimator to the fitted model, so
# re-running this cell alone would call .fit on the fitted model instead of the
# estimator - presumably an error; re-run the defining cell first.
linear_regression_model = linear_regression_model.fit(train)

24/10/22 19:42:40 WARN Instrumentation: [ed93705e] regParam is zero, which might cause numerical instability and overfitting.
                                                                                

In [53]:
# Predict the median house values for the test set and give the prediction
# column a descriptive name.
predictions_test = (
    linear_regression_model
    .transform(test)
    .withColumnRenamed('prediction', 'predicted_median_house_value')
)

# Preview the first 3 rows of predictions.
predictions_test.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+--------------+----------------------+--------------------+----------------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|  numerical_features|ocean_category|ocean_category_one_hot|   complete_features|predicted_median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+--------------------+--------------+----------------------+--------------------+----------------------------+
|   -124.3|   41.84|              17.0|     2677.0|         531.0|    1244.0|     456.0|       3.0313|          103600.0|     NEAR OCEAN|[-124.3,41.84,17....|           2.0|         (4,[2],[1.0])|[-124.3,41.84,17....|          150428.91124387458|
|  -124.23| 

### Model Evaluation

To evaluate the model, we retrieve both the predicted and the actual values. The `RegressionMetrics` class requires an RDD of (prediction, observation) pairs, i.e. the predicted value followed by the observed value.

In [54]:
predictions_test_df = predictions_test.toPandas() # convert to a pandas dataframe (this brings every row of the test predictions into local memory)

In [55]:
# Keep only the predicted and the actual median house values.
# The two columns are selected directly on the Spark DataFrame: going through
# pandas and back would pull every test row into local memory only to hand it
# to Spark again.
# The column order (prediction first, observation second) is kept because the
# rows are later turned into tuples for the evaluation metrics.
predictions_observations = predictions_test.select('predicted_median_house_value',
                                                   'median_house_value')

In [59]:
predictions_observations_rdd = predictions_observations.rdd # convert to an RDD of Row objects
predictions_observations_rdd.take(2) # peek at the first two rows

[Row(predicted_median_house_value=150428.91124387458, median_house_value=103600.0),
 Row(predicted_median_house_value=217515.79790594103, median_house_value=106700.0)]

In [60]:
predictions_observations_rdd = predictions_observations_rdd.map(tuple) # turn each Row into a plain (predicted, actual) tuple
predictions_observations_rdd.take(2) # peek at the first two tuples

[(150428.91124387458, 103600.0), (217515.79790594103, 106700.0)]

In [61]:
metrics = RegressionMetrics(predictions_observations_rdd) # evaluation metrics built from the RDD of (predicted, actual) tuples

In [62]:
# Text report with the four evaluation metrics, one per line.
model_evaluation = f'''
Mean Squared Error: {metrics.meanSquaredError}
Root Mean Squared Error: {metrics.rootMeanSquaredError}
Mean Absolute Error: {metrics.meanAbsoluteError}
R-Squared: {metrics.r2}
'''

In [63]:
print(model_evaluation) # print the report so that each metric appears on its own line


Mean Squared Error: 5009982545.682196
Root Mean Squared Error: 70781.2301792092
Mean Absolute Error: 50855.05025358482
R-Squared: 0.6378987876472275



An $R^2$ of about 0.64 suggests a moderately good fit, but roughly 36% of the variability in house values remains unexplained, i.e. other factors or more complex relationships might be influencing house prices.

In [64]:
spark.stop() # stop the Spark session now that the analysis is complete