### Machine learning with PySpack

In this notebook we are going to create a machine learning model using pyspack. We are going to use the dataset that was obtained from kaggle:

https://www.kaggle.com/datasets/camnugent/california-housing-prices

First we are going to start by installing all required packages.

In [18]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# install findspark using pip
!pip install -q findspark

The after that we are going to initialize the sparks context.

In [19]:
# set your spark folder to your system path environment.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In the following code cell we are going to import all the packages that we will be using in this notebook.

In [49]:
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import udf, monotonically_increasing_id,  mean
from pyspark.ml.feature import Imputer, VectorAssembler, StandardScaler, StringIndexer, OneHotEncoder
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics

We are going to load the dataset that was downloaded from kaggle.

https://www.kaggle.com/datasets/camnugent/california-housing-prices

We only care about the `train.csv` file.


In the following code cell we are then going to load our data using sparks.

In [33]:
dataframe = spark.read.format("csv").load("housing.csv", header=True, inferSchema=True)

dataframe.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



We can use the method called `show` which will show the first `n` values from a dataframe

In [34]:
dataframe.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows



Next we want to add the `id` column at the begeining of the dataframe. We can do that as follows:


In [36]:
dataframe = dataframe.withColumn('id', monotonically_increasing_id())
dataframe = dataframe[['id'] + dataframe.columns[:-1]]
dataframe.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only s

We can use the ``count`` method to count how many rows of data do we have in this dataset.

In [37]:
print(f"Number of rows: {dataframe.count()}")

Number of rows: 20640


We can do data aggregation based on the columns name. Here is an example

In [39]:
dataframe.select('total_rooms').agg({'total_rooms': 'avg'}).show()

+------------------+
|  avg(total_rooms)|
+------------------+
|2635.7630813953488|
+------------------+



We can calculate the mean valued per each column using the `mean` function. Here is an example

In [40]:
dataframe.select(*[mean(c) for c in dataframe.columns]).show()

+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|avg(id)|     avg(longitude)|   avg(latitude)|avg(housing_median_age)|  avg(total_rooms)|avg(total_bedrooms)|   avg(population)|  avg(households)|avg(median_income)|avg(median_house_value)|avg(ocean_proximity)|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+------------------+-----------------------+--------------------+
|10319.5|-119.56970445736148|35.6318614341087|     28.639486434108527|2635.7630813953488|  537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|     206855.81690891474|                null|
+-------+-------------------+----------------+-----------------------+------------------+-------------------+------------------+-----------------+----------

We can calculate the avarage based on the rows thar are grouped by `ocean_proximity` from column 3 to the second last.

In [41]:
dataframe.groupby('ocean_proximity').agg({col: 'avg' for col in dataframe.columns[3:-1]}).show()

+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|ocean_proximity|   avg(households)|   avg(population)|avg(total_bedrooms)|avg(median_income)|  avg(total_rooms)|avg(median_house_value)|avg(housing_median_age)|
+---------------+------------------+------------------+-------------------+------------------+------------------+-----------------------+-----------------------+
|         ISLAND|             276.6|             668.0|              420.4|2.7444200000000003|            1574.6|               380440.0|                   42.4|
|     NEAR OCEAN|501.24454477050415|1354.0086531226486|  538.6156773211568| 4.005784800601957| 2583.700902934537|     249433.97742663656|     29.347253574115875|
|       NEAR BAY| 488.6161572052402|1230.3174672489083|  514.1828193832599| 4.172884759825336| 2493.589519650655|     259212.31179039303|      37.73013100436681|
|      <1H OCEAN| 517.744964

We can add a column based on the computed valued of other columns to the dataframe using the `withColumn` function.

In [44]:
def squared(value):
  return value * value
squared_udf = udf(squared, FloatType())
dataframe.withColumn('total_rooms_squared', squared_udf('total_rooms')).show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|total_rooms_squared|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+-------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|           774400.0|
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|          5.03958E7|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|          2152089.0|
+---+---------+--------+----

Let's split our dataset into 2 sets the ``train`` and the `test` using the `randomSplit` method.

In [46]:
train, test = dataframe.randomSplit([0.7, 0.3])

Next, let's grab the numerical columns from our dataframe.

In [47]:
numerical_features_lst = train.columns
numerical_features_lst.remove('median_house_value')
numerical_features_lst.remove('id')
numerical_features_lst.remove('ocean_proximity')
numerical_features_lst

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income']

Next we are going to handle the missing values based on the columns that we have. For that we are going to use the ``Imputer`

> In PySpark, the Imputer is a feature transformer that helps handle missing values in a DataFrame. Missing values can be a common issue in real-world datasets, and imputation is the process of filling in those missing values with estimated or calculated values. - https://medium.com/@sujathamudadla1213/what-is-the-purpose-of-the-pyspark-imputer-and-how-does-it-handle-missing-values-in-a-dataframe-d117b4d16074#:~:text=2%20min%20read,with%20estimated%20or%20calculated%20values.

In [50]:
imputer = Imputer(inputCols=numerical_features_lst,
                  outputCols=numerical_features_lst)
imputer = imputer.fit(train)
train = imputer.transform(train)
test = imputer.transform(test)
train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only s

Next we are going to convert all the numerical column values into a vector, For that we will use the `VectorAssembler`

In [51]:
numerical_vector_assembler = VectorAssembler(inputCols=numerical_features_lst,
                                             outputCol='numerical_feature_vector')

train = numerical_vector_assembler.transform(train)
test = numerical_vector_assembler.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    [-122.24,37.85,52...|
|  3|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|    [-122.25,37.85,52...

In [52]:
train.select('numerical_feature_vector').take(2)

[Row(numerical_feature_vector=DenseVector([-122.22, 37.86, 21.0, 7099.0, 1106.0, 2401.0, 1138.0, 8.3014])),
 Row(numerical_feature_vector=DenseVector([-122.24, 37.85, 52.0, 1467.0, 190.0, 496.0, 177.0, 7.2574]))]

Next we are going to scale our `numerical_feature_vector` column using a `StandardScaler`.

In [53]:
scaler = StandardScaler(
    inputCol='numerical_feature_vector',
    outputCol='scaled_numerical_feature_vector',
    withStd=True, withMean=True
)

scaler = scaler.fit(train)
train = scaler.transform(train)
test = scaler.transform(test)
train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|           [-1.3300210332126...|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|    [-122.24,37.85,52...|           [-1.3400196910238...|
|  3|

We want to convert our categorical feature to be numbers specifically one hot encoded vectors. We are going to start by using the `StringIndexer` to create indecies based on the `ocean_proximity` column.

In [54]:
indexer = StringIndexer(inputCol='ocean_proximity',
                        outputCol='ocean_category_index')

indexer = indexer.fit(train)
train = indexer.transform(train)
test = indexer.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|           [-1.3300210332126...|                 3.0|
|  2|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          3521

Next we are going to one-hot encode them.

In [55]:
one_hot_encoder = OneHotEncoder(inputCol='ocean_category_index',
                                outputCol='ocean_category_one_hot')

one_hot_encoder = one_hot_encoder.fit(train)

train = one_hot_encoder.transform(train)
test = one_hot_encoder.transform(test)

train.show(3)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|           [-1.3300210332126...|                 3.0|         (4,[3],[1.0])|
|  2|  -122.24|   37.85|    

Then we are going to join the `scaled_numerical_feature_vector` with the `ocean_category_one_hot`	using the `VectorAssembler`.

In [56]:
assembler = VectorAssembler(inputCols=['scaled_numerical_feature_vector',
                                       'ocean_category_one_hot'],
                            outputCol='final_feature_vector')

train = assembler.transform(train)
test = assembler.transform(test)

In [57]:
train.show(2)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|    [-122.22,37.86,21...|           [-1.3300210332126...|          

In [58]:
train.select('final_feature_vector').take(2)

[Row(final_feature_vector=DenseVector([-1.33, 1.0529, -0.6156, 2.048, 1.3514, 0.8616, 1.6723, 2.3274, 0.0, 0.0, 0.0, 1.0])),
 Row(final_feature_vector=DenseVector([-1.34, 1.0482, 1.8369, -0.5262, -0.8136, -0.8025, -0.8303, 1.7792, 0.0, 0.0, 0.0, 1.0]))]

Next we are going to build our model. For that we are going to use the `LinearRegression` algorithim.

In [63]:
lr = LinearRegression(featuresCol='final_feature_vector',
                      labelCol='median_house_value')
model = lr.fit(train)

In [66]:
model.setPredictionCol('predicted_median_house_value')
model.setFeaturesCol('final_feature_vector')

LinearRegressionModel: uid=LinearRegression_12868fc2edf0, numFeatures=12

Now we can make predictions using our model.

In [69]:
pred_train_df = model.transform(train).withColumnRenamed(
    'prediction',
    'predicted_median_house_value'
)

pred_train_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  1|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          35850

Let's do the predictions on the test dataset.

In [71]:
pred_test_df = model.transform(test).withColumnRenamed('prediction', 'predicted_median_house_value')
pred_test_df.show(5)

+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
| id|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|numerical_feature_vector|scaled_numerical_feature_vector|ocean_category_index|ocean_category_one_hot|final_feature_vector|predicted_median_house_value|
+---+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+------------------------+-------------------------------+--------------------+----------------------+--------------------+----------------------------+
|  0|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          45260

We can convert our dataframe to pandas dataframe as follows:

In [72]:
pred_test_pd_df = pred_test_df.toPandas()

pred_test_pd_df.head(2)

Unnamed: 0,id,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity,numerical_feature_vector,scaled_numerical_feature_vector,ocean_category_index,ocean_category_one_hot,final_feature_vector,predicted_median_house_value
0,0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY,"[-122.23, 37.88, 41.0, 880.0, 129.0, 322.0, 12...","[-1.335020362118287, 1.0623359017244918, 0.966...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.335020362118287, 1.0623359017244918, 0.966...",408885.411197
1,9,-122.25,37.84,52.0,3549.0,707.0,1551.0,714.0,3.6912,261100.0,NEAR BAY,"[-122.25, 37.84, 52.0, 3549.0, 707.0, 1551.0, ...","[-1.345019019929461, 1.0435462124206465, 1.836...",3.0,"(0.0, 0.0, 0.0, 1.0)","[-1.345019019929461, 1.0435462124206465, 1.836...",267333.47984


In [73]:
predictions_and_actuals = pred_test_df[['predicted_median_house_value',
                                        'median_house_value']]

predictions_and_actuals_rdd = predictions_and_actuals.rdd

predictions_and_actuals_rdd.take(2)

[Row(predicted_median_house_value=408885.4111965421, median_house_value=452600.0),
 Row(predicted_median_house_value=267333.47984038526, median_house_value=261100.0)]

We can map our predictions to tuples as follows.

In [74]:
predictions_and_actuals_rdd = predictions_and_actuals_rdd.map(tuple)
predictions_and_actuals_rdd.take(2)

[(408885.4111965421, 452600.0), (267333.47984038526, 261100.0)]

Let's calculate the metrics.

In [75]:
metrics = RegressionMetrics(predictions_and_actuals_rdd)

s = '''
Mean Squared Error:      {0}
Root Mean Squared Error: {1}
Mean Absolute Error:     {2}
R**2:                    {3}
'''.format(metrics.meanSquaredError,
           metrics.rootMeanSquaredError,
           metrics.meanAbsoluteError,
           metrics.r2
           )

print(s)


Mean Squared Error:      4811706935.201839
Root Mean Squared Error: 69366.46837775323
Mean Absolute Error:     50260.78108241543
R**2:                    0.6458374066706432

