# Week11 - Machine Learning Using Spark's MLlib

In this lab, we will build supervised learning (linear regression) models using Apache Spark's MLlib.

You are also encouraged to work through the example from the Week11 lecture, which is available in the following directory on BrightSpace.

**Learning Materials -> Week11: Probabilistic Algorithms in Streams, ML For Big Data -> MLlib Linear Regression Examples**

https://brightspace.ucd.ie/d2l/le/content/249607/viewContent/2764212/View

We will install and set up Spark and PySpark as we've done in previous weeks.

# Section 0 - Setting up Spark

Install Java and Spark, then set up a Spark session as we've done before.

In [1]:
# Installing Java and Spark:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
#Switching java version to use as default (choose option 2)
!update-alternatives --config java
#Switching javac version to use as default (choose option 2)
!update-alternatives --config javac
#Switching jps version to use as default (choose option 2)
!update-alternatives --config jps

There are 2 choices for the alternative java (providing /usr/bin/java).

  Selection    Path                                            Priority   Status
------------------------------------------------------------
* 0            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      auto mode
  1            /usr/lib/jvm/java-11-openjdk-amd64/bin/java      1111      manual mode
  2            /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java   1081      manual mode

Press <enter> to keep the current choice[*], or type selection number: 2
update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode
There are 2 choices for the alternative javac (providing /usr/bin/javac).

  Selection    Path                                          Priority   Status
------------------------------------------------------------
* 0            /usr/lib/jvm/java-11-openjdk-amd64/bin/javac   1111      auto mode
  1            /usr/lib/jvm/java-11-o

In [3]:
!wget https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar xf spark-3.5.1-bin-hadoop3.tgz   # Archive name must match the version downloaded above
!rm spark-3.5.1-bin-hadoop3.tgz   # Tidying up

--2023-11-22 21:03:24--  https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400395283 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.0-bin-hadoop3.tgz’


2023-11-22 21:03:26 (205 MB/s) - ‘spark-3.5.0-bin-hadoop3.tgz’ saved [400395283/400395283]



In [4]:
#Checking Java default version
!java -version

openjdk version "1.8.0_382"
OpenJDK Runtime Environment (build 1.8.0_382-8u382-ga-1~22.04.1-b05)
OpenJDK 64-Bit Server VM (build 25.382-b05, mixed mode)


In [5]:
# Setting up our environmental variables:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"
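
The archive name and `SPARK_HOME` must refer to the same Spark version, so a quick check of both directories can catch a mismatch before `findspark.init()` fails. The following is a minimal sketch; `missing_dirs` is a hypothetical helper written for this illustration and is not part of Spark or findspark.

```python
import os

def missing_dirs(var_names, environ=os.environ):
    """Return the variables that are unset or do not point to an existing directory."""
    missing = []
    for name in var_names:
        path = environ.get(name)
        if path is None or not os.path.isdir(path):
            missing.append(name)
    return missing

# An empty list means both directories exist on this machine.
print(missing_dirs(["JAVA_HOME", "SPARK_HOME"]))
```

If `SPARK_HOME` appears in the printed list, check that the extracted folder name matches the path set above.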

In [6]:
!pip install -q findspark
import findspark
findspark.init()

In [7]:
from pyspark import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [8]:
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

# Section 1 - Linear Regression Model Using One Model Variable

Upload the **San Francisco housing data set**.

More details about the dataset below:
http://insideairbnb.com/get-the-data/

Dataset file: data/sf-airbnb/sf-airbnb-clean-100p.parquet.zip

See the header line of data/sf-airbnb/sf-airbnb.csv for the column names.

```text
id,listing_url,scrape_id,last_scraped,name,summary,space,description,experiences_offered,neighborhood_overview,notes,transit,access,interaction,house_rules,thumbnail_url,medium_url,picture_url,xl_picture_url,host_id,host_url,host_name,host_since,host_location,host_about,host_response_time,host_response_rate,host_acceptance_rate,host_is_superhost,host_thumbnail_url,host_picture_url,host_neighbourhood,host_listings_count,host_total_listings_count,host_verifications,...
```

In [16]:
from google.colab import files
uploaded = files.upload()

Saving sf-airbnb-clean-100p.parquet.zip to sf-airbnb-clean-100p.parquet.zip


In [17]:
!unzip sf-airbnb-clean-100p.parquet.zip

Archive:  sf-airbnb-clean-100p.parquet.zip
   creating: sf-airbnb-clean-100p.parquet/
  inflating: sf-airbnb-clean-100p.parquet/_SUCCESS  
  inflating: sf-airbnb-clean-100p.parquet/_committed_773940399323573814  
  inflating: sf-airbnb-clean-100p.parquet/_started_773940399323573814  
  inflating: sf-airbnb-clean-100p.parquet/part-00000-tid-773940399323573814-405a9fc3-d671-450e-b78b-78ebc9e2fada-16947-1-c000.snappy.parquet  
  inflating: sf-airbnb-clean-100p.parquet/part-00001-tid-773940399323573814-405a9fc3-d671-450e-b78b-78ebc9e2fada-16948-1-c000.snappy.parquet  
  inflating: sf-airbnb-clean-100p.parquet/part-00002-tid-773940399323573814-405a9fc3-d671-450e-b78b-78ebc9e2fada-16949-1-c000.snappy.parquet  
  inflating: sf-airbnb-clean-100p.parquet/part-00003-tid-773940399323573814-405a9fc3-d671-450e-b78b-78ebc9e2fada-16950-1-c000.snappy.parquet  
  inflating: sf-airbnb-clean-100p.parquet/part-00004-tid-773940399323573814-405a9fc3-d671-450e-b78b-78ebc9e2fada-16951-1-c000.snappy.parquet  


We will build a linear regression model. The model predicts the **price per night** for a rental property, given the **number of bedrooms** as the single model variable.

The first step is data ingestion and exploration.

In [18]:
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import LinearRegression

filePath = """sf-airbnb-clean-100p.parquet/"""
airbnbDF = spark.read.parquet(filePath)
airbnbDF.printSchema()  # printSchema() prints the schema itself and returns None
print(airbnbDF.schema)
airbnbDF.select("neighbourhood_cleansed", "room_type", "bedrooms", "bathrooms", "number_of_reviews", "price").show(5)

root
 |-- host_is_superhost: string (nullable = true)
 |-- cancellation_policy: string (nullable = true)
 |-- instant_bookable: string (nullable = true)
 |-- host_total_listings_count: double (nullable = true)
 |-- neighbourhood_cleansed: string (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- property_type: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- accommodates: double (nullable = true)
 |-- bathrooms: double (nullable = true)
 |-- bedrooms: double (nullable = true)
 |-- beds: double (nullable = true)
 |-- bed_type: string (nullable = true)
 |-- minimum_nights: double (nullable = true)
 |-- number_of_reviews: double (nullable = true)
 |-- review_scores_rating: double (nullable = true)
 |-- review_scores_accuracy: double (nullable = true)
 |-- review_scores_cleanliness: double (nullable = true)
 |-- review_scores_checkin: double (nullable = true)
 |-- review_scores

We will use 80% for the training set and 20% for the test set.

A random seed is used for reproducibility; its particular value is not important.

It ensures that the same data points go into the train and test datasets whenever the code is rerun.
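
The role of the seed can be illustrated without Spark. The sketch below assumes each row is assigned to the training set by an independent random draw with probability 0.8. This is similar in spirit to `randomSplit` but is not Spark's actual implementation, and `toy_random_split` is a hypothetical helper. It also suggests why the resulting set sizes are only approximately 80% and 20%.

```python
import random

def toy_random_split(rows, train_fraction, seed):
    """Assign each row to train or test using an independent, seeded random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        if rng.random() < train_fraction:
            train.append(row)
        else:
            test.append(row)
    return train, test

rows = list(range(1000))  # stand-in for DataFrame rows
train_a, test_a = toy_random_split(rows, 0.8, seed=42)
train_b, test_b = toy_random_split(rows, 0.8, seed=42)

# Same seed, same input order: the two splits are identical.
print(train_a == train_b and test_a == test_b)
# The sizes are roughly, not necessarily exactly, 800 and 200.
print(len(train_a), len(test_a))
```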

In [19]:
trainDF, testDF = airbnbDF.randomSplit([.8, .2], seed=42)
print(f"""There are {trainDF.count()} rows in the training set, and {testDF.count()} in the test set""")

There are 5770 rows in the training set, and 1376 in the test set


To apply linear regression, all the input features must be contained within a single vector column in your DataFrame.

We will use a transformer, which applies rule-based transformations to a DataFrame through its transform() method.

The VectorAssembler transformer takes a list of input columns and returns a new DataFrame with an additional column (here named features) that combines them into one vector.
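
What the assembler does to each row can be pictured in plain Python. This is only a conceptual sketch: `assemble_features` is a hypothetical function, the example row is made up, and a real VectorAssembler produces a Spark vector rather than a Python list.

```python
def assemble_features(row, input_cols, output_col="features"):
    """Return a copy of `row` with the input columns gathered into one list."""
    assembled = dict(row)  # leave the original row untouched
    assembled[output_col] = [float(row[col]) for col in input_cols]
    return assembled

# Made-up example row, not taken from the dataset.
row = {"bedrooms": 2.0, "bathrooms": 1.0, "price": 199.0}
print(assemble_features(row, ["bedrooms", "bathrooms"]))
```

The original columns are kept and one extra column is added, which mirrors what transform() returns for a DataFrame.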

In [20]:
vecAssembler = VectorAssembler(inputCols=["bedrooms"], outputCol="features")
vecTrainDF = vecAssembler.transform(trainDF)
vecTrainDF.select("bedrooms", "features", "price").show(10)

+--------+--------+------+
|bedrooms|features| price|
+--------+--------+------+
|     1.0|   [1.0]| 250.0|
|     1.0|   [1.0]| 115.0|
|     1.0|   [1.0]| 130.0|
|     1.0|   [1.0]| 130.0|
|     1.0|   [1.0]| 100.0|
|     2.0|   [2.0]| 199.0|
|     1.0|   [1.0]| 190.0|
|     1.0|   [1.0]|2010.0|
|     2.0|   [2.0]| 250.0|
|     3.0|   [3.0]| 200.0|
+--------+--------+------+
only showing top 10 rows



LinearRegression is a type of estimator, which takes in a DataFrame and returns a Model.

Estimators learn parameters from your data through an estimator_name.fit() method and are eagerly evaluated, unlike transformers, which are lazily evaluated.
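
The estimator/model relationship can be sketched in plain Python. The classes below are hypothetical toy classes and the data is made up. The fit uses ordinary least squares for a single variable, and the sketch does not model Spark's eager versus lazy evaluation.

```python
class ToyLinearRegression:
    """Estimator: fit() learns parameters from data and returns a model."""

    def fit(self, xs, ys):
        n = len(xs)
        mean_x = sum(xs) / n
        mean_y = sum(ys) / n
        sxx = sum((x - mean_x) ** 2 for x in xs)
        sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
        slope = sxy / sxx
        intercept = mean_y - slope * mean_x
        return ToyLinearRegressionModel(slope, intercept)


class ToyLinearRegressionModel:
    """Transformer: transform() applies parameters that were already learned."""

    def __init__(self, slope, intercept):
        self.slope = slope
        self.intercept = intercept

    def transform(self, xs):
        return [self.slope * x + self.intercept for x in xs]


# Toy data lying exactly on the line y = 2x + 1.
model = ToyLinearRegression().fit([1.0, 2.0, 3.0], [3.0, 5.0, 7.0])
print(model.slope, model.intercept)
print(model.transform([4.0]))
```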

In [21]:
lr = LinearRegression(featuresCol="features", labelCol="price")
lrModel = lr.fit(vecTrainDF)

m = round(lrModel.coefficients[0], 2)
b = round(lrModel.intercept, 2)
print(f"""The formula for the linear regression line is price = {m}*bedrooms + {b}""")

The formula for the linear regression line is price = 120.02*bedrooms + 52.63
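
As a quick worked check, the printed formula can be evaluated by hand for a few bedroom counts. The coefficients below are the rounded values printed above, so results may differ slightly from the model's own predictions; `predict_price` is a hypothetical helper.

```python
m = 120.02  # slope: estimated price increase per additional bedroom
b = 52.63   # intercept: estimated price at zero bedrooms

def predict_price(bedrooms):
    """Evaluate price = m * bedrooms + b with the rounded coefficients."""
    return m * bedrooms + b

for bedrooms in (1, 2, 3):
    print(bedrooms, round(predict_price(bedrooms), 2))
```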


The Pipeline API allows specifying the stages you want your data to pass through.

In Spark, Pipelines are estimators, whereas PipelineModels (fitted Pipelines) are transformers.

The Pipeline API determines which stages are estimators/transformers and applies name.fit() and name.transform() for each of the stages accordingly.
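
The fit-or-transform decision can be sketched in plain Python. All classes below are hypothetical toy classes: a stage with a fit() method is treated as an estimator, and everything else is treated as a transformer. This is a simplified picture of the idea, not Spark's implementation.

```python
class AddOne:
    """Toy transformer: has transform() only."""
    def transform(self, data):
        return [x + 1 for x in data]

class MeanCenter:
    """Toy estimator: fit() learns the mean and returns a fitted model."""
    def fit(self, data):
        return MeanCenterModel(sum(data) / len(data))

class MeanCenterModel:
    """Fitted model (a transformer) that subtracts the learned mean."""
    def __init__(self, mean):
        self.mean = mean
    def transform(self, data):
        return [x - self.mean for x in data]

class ToyPipeline:
    """Estimator that fits its stages in order and returns a ToyPipelineModel."""
    def __init__(self, stages):
        self.stages = stages
    def fit(self, data):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):      # estimator: fit it to get a transformer
                stage = stage.fit(data)
            data = stage.transform(data)   # pass transformed data to the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)

class ToyPipelineModel:
    """Transformer that applies every fitted stage in order."""
    def __init__(self, stages):
        self.stages = stages
    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data

pipeline_model = ToyPipeline([AddOne(), MeanCenter()]).fit([1.0, 2.0, 3.0])
print(pipeline_model.transform([4.0]))
```

Fitting once and reusing the fitted model on new data means the test set is transformed with parameters learned only from the training set.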

In [22]:
pipeline = Pipeline(stages=[vecAssembler, lr])
pipelineModel = pipeline.fit(trainDF)
predDF = pipelineModel.transform(testDF)
predDF.select("bedrooms", "features", "price", "prediction").show(10)

+--------+--------+------+------------------+
|bedrooms|features| price|        prediction|
+--------+--------+------+------------------+
|     1.0|   [1.0]|  70.0|172.65166349560025|
|     1.0|   [1.0]| 159.0|172.65166349560025|
|     1.0|   [1.0]| 115.0|172.65166349560025|
|     1.0|   [1.0]| 105.0|172.65166349560025|
|     3.0|   [3.0]| 170.0|412.69702083787956|
|     2.0|   [2.0]| 165.0| 292.6743421667399|
|     2.0|   [2.0]| 300.0| 292.6743421667399|
|     2.0|   [2.0]| 145.0| 292.6743421667399|
|     4.0|   [4.0]|1450.0| 532.7196995090192|
|     2.0|   [2.0]| 160.0| 292.6743421667399|
+--------+--------+------+------------------+
only showing top 10 rows



Root-mean-square error (RMSE) is used to evaluate the prediction accuracy of the LinearRegression model.

It ranges from 0 to ∞ and is expressed in the same units as the label (here, price). The closer it is to 0, the better the prediction accuracy.
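
The metric itself is the square root of the mean squared difference between labels and predictions. A minimal plain-Python sketch with made-up numbers follows; `rmse` is a hypothetical helper and not the RegressionEvaluator API.

```python
import math

def rmse(labels, predictions):
    """Square root of the mean of the squared prediction errors."""
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# Made-up values: the errors are 3 and 4, so the mean squared error is 12.5.
print(rmse([3.0, 5.0], [0.0, 1.0]))
```

Because the errors are squared before averaging, a few large misses (such as very expensive listings) can dominate the result.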

In [23]:
regressionEvaluator = RegressionEvaluator(
                          predictionCol="prediction",
                          labelCol="price",
                          metricName="rmse")
rmse = regressionEvaluator.evaluate(predDF)
print(f"RMSE is {rmse:.1f}")

RMSE is 267.6


# Section 2 - Linear Regression Using Multiple Model Variables.

Use the following independent model variables (features) to model the price per night using Linear Regression.

```python
airbnbMVars = [ "accommodates",
                "bathrooms",
                "bedrooms",
                "beds",
                "minimum_nights",
                "review_scores_rating" ]
```

Follow the steps below:

1). Data Ingestion.

2). Feature engineering using transformers.

3). Build the LR model.

4). Create an ML pipeline.

5). Evaluate the RMSE of the model.

In [None]:
# Solve: