# Spark Machine Learning using linear regression


#### Topics covered in this example
* `VectorAssembler`, `LinearRegression` and `RegressionEvaluator` from `pyspark.ml`.

***

## Prerequisites
<div class="alert alert-block alert-info">
<b>NOTE:</b> To run this notebook as is, make sure the following prerequisites are met.</div>

* The EMR cluster attached to this notebook should have the `Spark` application installed.
* This example uses a public dataset, hence the EMR cluster attached to this notebook must have internet connectivity.
* This notebook uses the `PySpark` kernel.
***

## Introduction
In this example, we use PySpark to train a linear regression model that predicts the total cost of a taxi trip (`total_amount`), using the <a href="https://registry.opendata.aws/nyc-tlc-trip-records-pds/" target="_blank">New York City Taxi and Limousine Commission (TLC) Trip Record Data</a> from the <a href="https://registry.opendata.aws/" target="_blank">Registry of Open Data on AWS</a>.

***

## Example
Load the data set for trips into a Spark DataFrame.

```python
df = spark.read.format("csv").load(
    "s3://nyc-tlc/trip data/yellow_tripdata_2020-11.csv",
    inferSchema=True,
    header=True,
)
```

SparkSession available as 'spark'.



Mark the DataFrame for caching in memory, display its schema with the `printSchema` method to check the inferred data types, and get the dimensions of the data.

```python
# Mark the DataFrame for caching in memory (applied lazily, on the first action)
df.cache()

# Print the schema
df.printSchema()

# Get the dimensions of the data as (row count, column count)
df.count(), len(df.columns)
```


```
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

(1508985, 18)
```

```python
# Get the summary statistics of the label and tip columns
df.select("total_amount", "tip_amount").describe().show()

# Value counts of the VendorID column
df.groupBy("VendorID").count().show()
```


```
+-------+------------------+-----------------+
|summary|      total_amount|       tip_amount|
+-------+------------------+-----------------+
|  count|           1508985|          1508985|
|   mean| 17.45322861398119|1.979809931841758|
| stddev|123.92467136301887|2.213538077170879|
|    min|            -497.3|          -103.06|
|    max|         151522.07|            450.0|
+-------+------------------+-----------------+

+--------+------+
|VendorID| count|
+--------+------+
|    null| 99049|
|       1|446141|
|       2|963795|
+--------+------+
```
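In the output above, `describe()` reports, for each selected column, the number of non-null values, the mean, the sample standard deviation, the minimum and the maximum, while `groupBy("VendorID").count()` counts rows per distinct value, with null values (99049 rows here) forming a group of their own. The summary also shows negative and very large values of `total_amount` (min -497.3, max 151522.07), which this notebook does not filter out. The following plain-Python sketch spells out those definitions on a few hypothetical values; `describe` and `value_counts` are illustrative helper names, and this is not how Spark computes the statistics at scale.

```python
import statistics
from collections import Counter


def describe(values):
    """count/mean/stddev/min/max over the non-null values, like the rows of describe()."""
    present = [v for v in values if v is not None]
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        "stddev": statistics.stdev(present),  # sample standard deviation (n - 1 denominator)
        "min": min(present),
        "max": max(present),
    }


def value_counts(values):
    """Number of rows per distinct value; None plays the role of Spark's null group."""
    return Counter(values)


# Hypothetical sample values, not taken from the dataset
tips = [1.0, 2.0, 3.0, None]
vendor_ids = [1, 2, 2, None]
print(describe(tips))
print(value_counts(vendor_ids))
```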

### Use <a href="https://spark.apache.org/docs/2.4.7/ml-features#vectorassembler" target="_blank">VectorAssembler</a> to transform input columns into vectors
<a href="https://spark.apache.org/docs/2.3.1/api/python/pyspark.ml.html" target="_blank">pyspark.ml</a> provides DataFrame-based machine learning APIs for building and configuring machine learning pipelines.
A `VectorAssembler` combines a given list of columns into a single vector column. The cell below combines nine numeric columns into one vector column named `features`, the input format that `pyspark.ml` estimators such as `LinearRegression` expect.

```python
from pyspark.ml.feature import VectorAssembler

# Specify the input columns and the name of the output vector column
vectorAssembler = VectorAssembler(
    inputCols=[
        "trip_distance",
        "PULocationID",
        "DOLocationID",
        "fare_amount",
        "mta_tax",
        "tip_amount",
        "tolls_amount",
        "improvement_surcharge",
        "congestion_surcharge",
    ],
    outputCol="features",
)

# Transform the data: adds a "features" column to every row
v_df = vectorAssembler.transform(df)

# Keep only the feature vector and the label, then preview three rows
v_df = v_df.select(["features", "total_amount"])
v_df.show(3)
```


```
+--------------------+------------+
|            features|total_amount|
+--------------------+------------+
|[1.8,246.0,137.0,...|        17.9|
|[2.8,229.0,4.0,11...|        15.3|
|[8.1,144.0,83.0,2...|        32.8|
+--------------------+------------+
only showing top 3 rows
```
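Each `features` value is the input columns concatenated in the order given by `inputCols` and converted to doubles, which is why the integer `PULocationID` 246 appears as `246.0`. The plain-Python sketch below mimics this on the first two rows shown above, using only the first three input columns. It also mimics the three modes of the `handleInvalid` parameter of `VectorAssembler` (Spark 2.4 and later): `"error"` (the default) fails on a missing value, `"skip"` drops the row and `"keep"` writes NaN. The function name `assemble` and the third, incomplete row are hypothetical, and unlike Spark the sketch treats only `None` as invalid, not NaN.

```python
import math


def assemble(rows, input_cols, handle_invalid="error"):
    """Concatenate input_cols of each row into one list of floats (a dense feature vector).

    handle_invalid imitates VectorAssembler's modes for missing (None) values:
    "error" raises, "skip" drops the row, "keep" writes NaN.
    """
    vectors = []
    for row in rows:
        values = [row.get(c) for c in input_cols]
        if any(v is None for v in values):
            if handle_invalid == "error":
                raise ValueError(f"null value in {input_cols} for row {row}")
            if handle_invalid == "skip":
                continue
            values = [math.nan if v is None else v for v in values]  # "keep"
        vectors.append([float(v) for v in values])
    return vectors


cols = ["trip_distance", "PULocationID", "DOLocationID"]
rows = [
    {"trip_distance": 1.8, "PULocationID": 246, "DOLocationID": 137},  # first row above
    {"trip_distance": 2.8, "PULocationID": 229, "DOLocationID": 4},    # second row above
    {"trip_distance": None, "PULocationID": 1, "DOLocationID": 2},     # hypothetical bad row
]
print(assemble(rows, cols, handle_invalid="skip"))  # [[1.8, 246.0, 137.0], [2.8, 229.0, 4.0]]
```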

Randomly split the input dataset into a training set (about 70% of the rows) and a test set (about 30%).

```python
# Randomly split the rows roughly 70/30 into training and test sets
train_df, test_df = v_df.randomSplit([0.7, 0.3])
```
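`randomSplit` assigns rows at random in proportion to the normalized weights, so the split sizes are only approximately 70/30, and without its optional `seed` argument the split changes from run to run (passing one, for example `v_df.randomSplit([0.7, 0.3], seed=42)`, makes it repeatable on the same data and partitioning). The plain-Python sketch below shows the idea of a weighted random split; it is a conceptual illustration with a hypothetical `random_split` helper, not Spark's per-partition implementation.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds, e.g. [0.7, 1.0] for weights [0.7, 0.3]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    rng = random.Random(seed)  # a fixed seed makes the split reproducible
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(bounds):
            if u < bound:
                splits[i].append(row)
                break
        else:
            splits[-1].append(row)  # guard against rounding leaving the last bound below 1.0
    return splits


train_rows, test_rows = random_split(list(range(20)), [0.7, 0.3], seed=7)
print(len(train_rows), len(test_rows))  # roughly 70/30, not necessarily exactly 14/6
```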


### Train the model using <a href="https://spark.apache.org/docs/2.4.7/ml-classification-regression.html#linear-regression" target="_blank">LinearRegression</a> on the training set

```python
from pyspark.ml.regression import LinearRegression

# regParam is the regularization strength; elasticNetParam mixes the L1 (lasso)
# and L2 (ridge) penalties: 0.0 is pure L2, 1.0 is pure L1
lr = LinearRegression(
    featuresCol="features",
    labelCol="total_amount",
    maxIter=100,
    regParam=0.3,
    elasticNetParam=0.8,
)
lr_model = lr.fit(train_df)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
```


```
Coefficients: [0.0,0.0,0.0,0.9981329031685724,0.0,0.9345062568672848,0.8092398254016294,0.0,0.7608024111636874]
Intercept: 1.7511710812068897
```
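With `regParam = 0.3` (λ) and `elasticNetParam = 0.8` (α), Spark's documentation describes the regularization term as λ(α‖w‖₁ + (1−α)/2 ‖w‖₂²), so the penalty here is mostly L1. An L1 penalty can drive coefficients to exactly zero, which is consistent with the five 0.0 coefficients above (`trip_distance`, `PULocationID`, `DOLocationID`, `mta_tax` and `improvement_surcharge`). The sketch below computes that penalty and applies a glmnet-style coordinate-descent update for one standardized feature, where soft-thresholding snaps small updates to exactly zero. It illustrates the mechanism only: Spark uses its own optimizer, standardizes features by default (`standardization=True`), and the helper names and `raw_rhos` values are hypothetical.

```python
def elastic_net_penalty(weights, reg_param, elastic_net_param):
    """reg_param * (alpha * ||w||_1 + (1 - alpha) / 2 * ||w||_2^2)."""
    l1 = sum(abs(w) for w in weights)
    l2_squared = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1 - elastic_net_param) / 2 * l2_squared)


def soft_threshold(z, threshold):
    """Shrink z toward 0 by threshold; anything within [-threshold, threshold] becomes 0.0."""
    if z > threshold:
        return z - threshold
    if z < -threshold:
        return z + threshold
    return 0.0


def coordinate_update(rho, reg_param, elastic_net_param):
    """Elastic-net coordinate update for one standardized feature.

    rho is the (1/n) inner product of the feature with the current partial residual.
    """
    shrunk = soft_threshold(rho, reg_param * elastic_net_param)  # L1 part: may give exactly 0
    return shrunk / (1 + reg_param * (1 - elastic_net_param))    # L2 part: uniform shrinkage


reg_param, alpha = 0.3, 0.8
raw_rhos = [0.1, -0.2, 1.0, -0.9]  # hypothetical values
updates = [coordinate_update(rho, reg_param, alpha) for rho in raw_rhos]
print(updates)
print(elastic_net_penalty([1.0, 0.0, -0.5], reg_param, alpha))
```

The first two updates come out as exactly 0.0 because |rho| is below the L1 threshold λα = 0.24, while the larger ones are only shrunk.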

Report the model's performance on the training set using its training summary.

```python
training_summary = lr_model.summary
print("RMSE: %f" % training_summary.rootMeanSquaredError)
print("R squared (R2): %f" % training_summary.r2)
```

```
RMSE: 0.671089
R squared (R2): 0.999979
```
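Both metrics follow the standard definitions: RMSE is the square root of the mean squared difference between prediction and label, in the label's units (dollars here), and R² is 1 minus the ratio of the residual sum of squares to the total sum of squares around the mean label. A training RMSE of about 0.67 therefore corresponds to a typical error well under a dollar. The plain-Python sketch below computes both on three hypothetical prediction/label pairs; `rmse` and `r_squared` are illustrative helper names, not Spark APIs.

```python
import math


def rmse(predictions, labels):
    """Root mean squared error: sqrt(mean((prediction - label)^2))."""
    n = len(labels)
    return math.sqrt(sum((p - y) ** 2 for p, y in zip(predictions, labels)) / n)


def r_squared(predictions, labels):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for p, y in zip(predictions, labels))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


# Hypothetical predictions and labels
preds = [3.0, 5.0, 7.0]
labels = [2.0, 5.0, 9.0]
print(rmse(preds, labels))       # sqrt(5/3), about 1.291
print(r_squared(preds, labels))  # 1 - 15/74, about 0.797
```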

Generate predictions for the test set and, for trips with a total above $10, compare each prediction with the actual `total_amount`.

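```python
from pyspark.sql.functions import col

# Score the test set: adds a "prediction" column next to "total_amount"
predictions = lr_model.transform(test_df)

# For trips over $10, show the signed error (diff) and the error as a
# percentage of the actual total (diff%)
(
    predictions.filter(col("total_amount") > 10.0)
    .select("prediction", "total_amount")
    .withColumn("diff", col("prediction") - col("total_amount"))
    .withColumn("diff%", (col("diff") / col("total_amount")) * 100)
    .show()
)
```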


```
+------------------+------------+------------------+------------------+
|        prediction|total_amount|              diff|             diff%|
+------------------+------------+------------------+------------------+
|  26.7044936604212|        25.0|1.7044936604211998| 6.817974641684799|
|13.728765919229758|        12.0|1.7287659192297582|14.406382660247985|
|37.983395466226064|        36.3|1.6833954662260666| 4.637453075002939|
|13.728765919229758|        12.0|1.7287659192297582|14.406382660247985|
| 21.71382914457834|        20.0| 1.713829144578341| 8.569145722891705|
| 21.71382914457834|        20.0| 1.713829144578341| 8.569145722891705|
|28.002066434540346|        26.3|1.7020664345403453|6.4717354925488415|
|16.919608259344866|       15.36|1.5596082593448664|10.153699605109807|
|16.919608259344866|       15.36|1.5596082593448664|10.153699605109807|
|  68.3079423619948|        67.0|1.3079423619948045|1.9521527790967232|
| 61.63914527132123|        60.3| 1.339145271321236|2.2208047617
```
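The `diff` column is the signed error (positive when the model overestimates the total) and `diff%` is that error as a percentage of the actual total. Filtering to `total_amount > 10.0` presumably keeps the percentage away from zero, tiny and negative totals (the summary statistics show a minimum of -497.3), where it would be undefined, very large or change sign. In every row shown, the prediction is about $1.3 to $1.7 too high. The sketch below repeats the same arithmetic in plain Python on the first three rows of the table; `error_columns` is a hypothetical helper name.

```python
def error_columns(prediction, actual):
    """Signed error and error as a percentage of the actual total, like diff and diff%."""
    diff = prediction - actual
    diff_pct = (diff / actual) * 100
    return diff, diff_pct


# First three rows of the table: (prediction, total_amount)
rows = [
    (26.7044936604212, 25.0),
    (13.728765919229758, 12.0),
    (37.983395466226064, 36.3),
]
for prediction, actual in rows:
    print(error_columns(prediction, actual))
```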

### Report performance on the test set using <a href="https://spark.apache.org/docs/2.4.7/api/java/org/apache/spark/ml/evaluation/RegressionEvaluator.html" target="_blank">RegressionEvaluator</a>

```python
from pyspark.ml.evaluation import RegressionEvaluator

# Compute R2 of the test-set predictions against the actual totals
lr_evaluator = RegressionEvaluator(predictionCol="prediction", labelCol="total_amount", metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(predictions))
```


```
R Squared (R2) on test data = 0.997477
```
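Both R² values (0.999979 on the training set and 0.997477 on the test set) are very close to 1. A likely reason, based on the TLC data dictionary rather than on anything this notebook checks, is that `total_amount` is the total charged to the passenger, made up of the fare, surcharges, taxes, tolls and tips, and most of those components (`fare_amount`, `mta_tax`, `tip_amount`, `tolls_amount`, `improvement_surcharge`, `congestion_surcharge`) are among the features, so the model largely learns to add them up. The coefficients near 1 for `fare_amount` and `tip_amount` fit that picture. The sketch below reproduces the effect on synthetic data: the label is the sum of two features plus an omitted `extra` component, and an unregularized ordinary least-squares fit (plain-Python normal equations, unlike the regularized Spark model) recovers coefficients near 1 and an R² near 1. All values are randomly generated and the helper names are hypothetical.

```python
import random


def fit_ols(xs, ys):
    """Ordinary least squares with an intercept, solving (X^T X) b = X^T y by Gaussian elimination."""
    X = [[1.0] + list(x) for x in xs]  # leading column of ones for the intercept
    k = len(X[0])
    # Augmented matrix [X^T X | X^T y]
    A = [[sum(r[i] * r[j] for r in X) for j in range(k)]
         + [sum(r[i] * y for r, y in zip(X, ys))] for i in range(k)]
    for col in range(k):
        pivot = max(range(col, k), key=lambda row: abs(A[row][col]))  # partial pivoting
        A[col], A[pivot] = A[pivot], A[col]
        for row in range(col + 1, k):
            factor = A[row][col] / A[col][col]
            for c in range(col, k + 1):
                A[row][c] -= factor * A[col][c]
    b = [0.0] * k
    for i in range(k - 1, -1, -1):  # back substitution
        b[i] = (A[i][k] - sum(A[i][j] * b[j] for j in range(i + 1, k))) / A[i][i]
    return b[0], b[1:]


def r_squared(predictions, labels):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(labels) / len(labels)
    ss_res = sum((y - p) ** 2 for p, y in zip(predictions, labels))
    ss_tot = sum((y - mean_y) ** 2 for y in labels)
    return 1 - ss_res / ss_tot


rng = random.Random(0)
features, totals = [], []
for _ in range(1000):
    fare = rng.uniform(5.0, 50.0)
    tip = rng.uniform(0.0, 10.0)
    extra = rng.uniform(0.0, 2.5)      # component of the total that is NOT a feature
    features.append((fare, tip))
    totals.append(fare + tip + extra)  # the label is just the sum of its parts

intercept, coefs = fit_ols(features, totals)
preds = [intercept + sum(c * x for c, x in zip(coefs, row)) for row in features]
print(intercept, coefs, r_squared(preds, totals))
```

The fitted coefficients come out close to 1, the intercept absorbs the average of the omitted `extra`, and R² is close to 1 even though the model has learned little beyond addition.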