# Q2 & Q3 - Step2 (run on Spark EMR cluster)

## Q2 - Initial Setup

In [34]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1715914439535_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1715914439535_0003,pyspark,idle,Link,Link,✔


In [35]:
sc.install_pypi_package("pandas==1.0.5", "https://pypi.org/simple")
sc.install_pypi_package("scipy==1.4.1", "https://pypi.org/simple")
sc.install_pypi_package("matplotlib==3.2.1", "https://pypi.org/simple")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting pandas==1.0.5
  Using cached https://files.pythonhosted.org/packages/af/f3/683bf2547a3eaeec15b39cef86f61e921b3b187f250fcd2b5c5fb4386369/pandas-1.0.5-cp37-cp37m-manylinux1_x86_64.whl
Collecting python-dateutil>=2.6.1 (from pandas==1.0.5)
  Using cached https://files.pythonhosted.org/packages/ec/57/56b9bcc3c9c6a792fcbaf139543cee77261f3651ca9da0c93f5c1221264b/python_dateutil-2.9.0.post0-py2.py3-none-any.whl
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-1.0.5 python-dateutil-2.9.0.post0

Collecting scipy==1.4.1
  Using cached https://files.pythonhosted.org/packages/dd/82/c1fe128f3526b128cfd185580ba40d01371c5d299fcf7f77968e22dfcc2e/scipy-1.4.1-cp37-cp37m-manylinux1_x86_64.whl
Installing collected packages: scipy
Successfully installed scipy-1.4.1

Collecting matplotlib==3.2.1
  Using cached https://files.pythonhosted.org/packages/b2/c2/71fcf957710f3ba1f09088b35776a799ba7dd95f7c2b195ec800933b276b/matplotlib-3.2.1-cp37-cp37m-manylinux1_x86_64.

In [36]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, Bucketizer
import matplotlib.pyplot as plt
from pyspark.sql.functions import dayofweek
import pandas as pd
import numpy as np

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
# Read the parquet file
data = spark.read.parquet('s3://nyc-tlc/trip data/yellow_tripdata_2015*.parquet')

# Clean the dataset
data = data.filter(col('tip_amount') >= 0)
data = data.filter(col('tip_amount') < 2000)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

##  Q2a - Engineer Additional Features

### Feature1 - Weekend

This is the feature in the existing linear regression model.

In [38]:
# Add day_of_week and weekend features
data = data.withColumn('day_of_week', F.dayofweek(F.col('tpep_pickup_datetime'))) \
           .withColumn('weekend', F.col('day_of_week').isin({7, 1}).cast('integer'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Feature2 - Hour of Day (date/time feature, one-hot encoded)

Add "Hour of Day" to the model because we observed in Q1's visualization that it influences the tip amount. Specifically, people tend to tip more around 5am and 7pm.

In [39]:
# Add hour_of_day feature
data = data.withColumn('hour_of_day', F.hour(data.tpep_pickup_datetime))

# Index the hour_of_day
hour_indexer = StringIndexer(inputCol="hour_of_day", outputCol="hour_of_day_index").fit(data)
data = hour_indexer.transform(data)

# One-hot encode the indexed hour_of_day
hour_encoder = OneHotEncoder(inputCol="hour_of_day_index", outputCol="hour_of_day_vector")
hour_encoder_model = hour_encoder.fit(data)
data = hour_encoder_model.transform(data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
# Get the order of one-hot encoding index and the baseline category of hour_of_day feature

# Collect the frequencies into a list and sort by count in descending order
frequency_df = data.groupBy('hour_of_day').count()
frequencies = frequency_df.orderBy(col('count').desc()).collect()

# Get ordered list of feature category based on the frequencies
hour_to_index = {row['hour_of_day']: idx for idx, row in enumerate(frequencies)}
sorted_hours = sorted(hour_to_index.items(), key=lambda item: item[1])
hour_features = [f"hour_of_day_{hour}" for hour, _ in sorted_hours]

# Remove the baseline category and store it in a variable
hour_baseline = hour_features.pop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Feature3 - Day of Week (categorical feature, one-hot encoded) 

Add "Day of Week" to the model because we observed in Q1's visualization that it influences the tip amount. Specifically, people tip more during the weekdays.

In [41]:
# StringIndexer for day_of_week
indexer = StringIndexer(inputCol='day_of_week', outputCol='day_of_week_index')
data = indexer.fit(data).transform(data)

# OneHotEncoder for day_of_week_index
encoder = OneHotEncoder(inputCol='day_of_week_index', outputCol='day_of_week_vector')
data = encoder.fit(data).transform(data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
# Get the order of one-hot encoding index and the baseline category of day_of_week feature

# Calculate the frequency of each day_of_week and sort by count in descending order
frequency_df = data.groupBy('day_of_week').count()
frequencies = frequency_df.orderBy(col('count').desc()).collect()

# Get ordered list of feature category based on the frequencies
day_to_index = {row['day_of_week']: idx for idx, row in enumerate(frequencies)}
sorted_days = sorted(day_to_index.items(), key=lambda item: item[1])
day_features = [f"day_of_week_{day}" for day, _ in sorted_days]

# Remove the baseline category and store it in a variable
day_baseline = day_features.pop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Feature4 - Passenger Count (integer feature, bucketized and one-hot encoded)

Add "Passenger Count" to the model because we observed in Q1's visualization that it influences the tip amount. Specifically, larger groups of people (more than 6) tend to tip more significantly. However, note that this is a driver-entered value, and there are many entries with 0, possibly because drivers don't always bother to enter the passenger count.

In [43]:
# Define the splits for passenger_count buckets
passenger_splits = [-float('inf'), 1.0, 2.0, 5.0, float('inf')]

# Initialize the Bucketizer for passenger_count
passenger_bucketizer = Bucketizer(splits=passenger_splits, inputCol="passenger_count", outputCol="passenger_count_bucket")

# Transform the data for passenger_count buckets
data = passenger_bucketizer.setHandleInvalid("skip").transform(data)

# Index the passenger_count_bucket
passenger_count_indexer = StringIndexer(inputCol="passenger_count_bucket", outputCol="passenger_count_index").fit(data)
data = passenger_count_indexer.transform(data)

# One-hot encode the indexed passenger_count_bucket
passenger_count_encoder = OneHotEncoder(inputCol="passenger_count_index", outputCol="passenger_count_vector")
passenger_count_encoder_model = passenger_count_encoder.fit(data)
data = passenger_count_encoder_model.transform(data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
# Show the distribution
distribution = data.groupBy('passenger_count').count()
distribution.show()

# Show the vector distribution to clarify the order of one-hot encoding features
distribution = data.groupBy('passenger_count_vector').count()
distribution.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------+---------+
|passenger_count|    count|
+---------------+---------+
|              5|  7937313|
|              6|  5123157|
|              9|      165|
|              1|102928494|
|              3|  6133758|
|              2| 20893560|
|              0|    40681|
|              7|      227|
|              8|      178|
|              4|  2980689|
+---------------+---------+

+----------------------+---------+
|passenger_count_vector|    count|
+----------------------+---------+
|         (3,[2],[1.0])| 13061040|
|         (3,[1],[1.0])| 30008007|
|         (3,[0],[1.0])|102928494|
|             (3,[],[])|    40681|
+----------------------+---------+

In [45]:
# Get the order of one-hot encoding features and the baseline category based on the distribution
# Because it has change the order of index during the bucketization

passenger_count_features = ["passenger_count_1", "passenger_count_2_4", "passenger_count_5_", "passenger_count_NA"]

# Remove the baseline category and store it in a variable
passenger_count_baseline = passenger_count_features.pop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Feature5 - Borough to Pickup (spatial feature, one-hot encoded)

Add "Borough to Pickup" in the model to reflect the potential variability in tipping habits or travel purposes among passengers originating from different boroughs, which could impact the tip amount.

In [46]:
# Get the csv file from s3 bucket
zone_lookup_path = 's3://qixin-a3-bucket/jupyter/jovyan/taxi_zone_lookup.csv'
zone_lookup = spark.read.option("header", "true").csv(zone_lookup_path)

# Show the schema and a few rows of the lookup data
zone_lookup.printSchema()
zone_lookup.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows

In [47]:
# Add the PUBorough feature to the dataset
zone_lookup = zone_lookup.withColumnRenamed("LocationID", "PULocationID").withColumnRenamed("Borough", "PUBorough")
data = data.join(zone_lookup, on="PULocationID", how="left")

# Add the DOBorough feature to the dataset
zone_lookup = zone_lookup.withColumnRenamed("PULocationID", "DOLocationID").withColumnRenamed("PUBorough", "DOBorough")
data = data.join(zone_lookup, on="DOLocationID", how="left")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [48]:
# Index and one-hot encode the PUBorough
pu_borough_indexer = StringIndexer(inputCol="PUBorough", outputCol="PUBorough_index").fit(data)
data = pu_borough_indexer.transform(data)
pu_borough_encoder = OneHotEncoder(inputCol="PUBorough_index", outputCol="PUBorough_vector")
pu_borough_encoder_model = pu_borough_encoder.fit(data)
data = pu_borough_encoder_model.transform(data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
# Get the order of one-hot encoding index 

# Collect the frequencies into a list and sort by count in descending order
frequency_df = data.groupBy('PUBorough').count()
frequencies = frequency_df.orderBy(col('count').desc()).collect()

# Get ordered list of feature category based on the frequencies
PUBorough_to_index = {row['PUBorough']: idx for idx, row in enumerate(frequencies)}
sorted_PUBorough = sorted(PUBorough_to_index.items(), key=lambda item: item[1])
PUBorough_features = [f"PUBorough_{Borough}" for Borough, _ in sorted_PUBorough]

# Remove the baseline category and store it in a variable
PUBorough_baseline = PUBorough_features.pop()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Feature6 - Trip Distance (floating point feature) & Feature7 - Fare Amount (floating point feature)

Add "Trip Distance" to the model because people might give more tips on long-distance trips due to increased fare amounts or longer travel times.

Add "Fare Amount" to the model because people might give more tips when the fare amount is higher.

## Q2a - Build Linear Regression Model and Test

In [50]:
#data = data.drop('features')  #Remove the existing 'features' column if there's one before

features = ['weekend', 'trip_distance',  'fare_amount', 'passenger_count_vector', 'hour_of_day_vector', 'day_of_week_vector', 'PUBorough_vector']
assembler = VectorAssembler(inputCols=features, outputCol='features')

data = assembler.transform(data)
data[['tip_amount', 'features']].show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------+--------------------+
|tip_amount|            features|
+----------+--------------------+
|       1.4|(42,[1,2,3,22,31,...|
|       0.0|(42,[1,2,3,22,31,...|
|       2.9|(42,[1,2,3,22,31,...|
|      2.37|(42,[1,2,3,22,31,...|
|       0.0|(42,[1,2,3,22,31,...|
+----------+--------------------+
only showing top 5 rows

In [51]:
# Use a subset of data for trial - prepare to run the full dataset in the pipeline

sampled_data = data.sample(fraction=0.001, seed=42)
train, test = sampled_data.randomSplit([0.8, 0.2], seed=42)

# Persist data in memory to speed up I/O during training/testing
train.persist()
test.persist()

# confirm they're cached
print(train.is_cached, test.is_cached)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

True True

In [52]:
# Create a Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="tip_amount")

# Fit the model to the training data
lr_model = lr.fit(train)

# Print the coefficients and intercept for linear regression
print(f"Coefficients: {lr_model.coefficients}")
print(f'Intercept: {lr_model.intercept}')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Coefficients: [4.360983518826533,0.1770216631997958,0.06835356323241612,0.7971570878457249,0.67371430857873,0.7765339425756856,0.286785174329209,0.24808160327506928,0.2762210079117851,0.2443631730995019,0.2805118149078387,0.09274658798445717,0.15660175749607408,0.16076066186226132,0.12151273255705293,0.22901747132162056,0.11137339169389483,0.12848346201503935,0.19536933309429008,0.12835382048850996,0.23617532585193213,0.13696056303973525,0.18155044088149902,0.17297574220734027,0.13061393649648717,0.052309848045051476,0.07238224965818747,-0.06263907810482393,-0.15908296072249628,-4.501480609860989,0.03291189629744201,0.06613546044265059,0.07094999360923764,0.07120477971190783,-4.471068859894281,0.7500198810103271,0.6931163943324465,0.7782816415996743,0.8450668967407127,0.7323505393570933,2.3062560008422976,13.196164410992406]
Intercept: -1.4103424961522115

In [53]:
# Evaluate with test data using `evaluate` method
lr_evaluation_summary = lr_model.evaluate(test)

# Print out evaluation metrics like RMSE:
print(f'Test RMSE: {lr_evaluation_summary.rootMeanSquaredError}')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Test RMSE: 1.9679645393447491

In [54]:
# Extract the coefficients and feature names
coefficients = lr_model.coefficients
feature_names = ['weekend', 'trip_distance', 'fare_amount'] + \
                  passenger_count_features + \
                  hour_features + \
                  day_features + \
                  PUBorough_features

# Display the baseline category for one-hot encoded features
baselines = {
    "day_baseline": day_baseline,
    "hour_baseline": hour_baseline,
    "PUBorough_baseline": PUBorough_baseline,
    "passenger_count_baseline": passenger_count_baseline
}
for name, value in baselines.items():
    print(f"{name}: {value}")

# Display feature importance in descending order (of absolute value)
feature_importance = pd.DataFrame(list(zip(feature_names, coefficients)), columns=["Feature", "Coefficient"])
feature_importance_sorted = feature_importance.reindex(feature_importance.Coefficient.abs().sort_values(ascending=False).index)
print(feature_importance_sorted)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

day_baseline: day_of_week_2
hour_baseline: hour_of_day_5
PUBorough_baseline: PUBorough_Staten Island
passenger_count_baseline: passenger_count_NA
                Feature  Coefficient
41        PUBorough_EWR    13.196164
29        day_of_week_7    -4.501481
34        day_of_week_1    -4.471069
0               weekend     4.360984
40        PUBorough_N/A     2.306256
38    PUBorough_Unknown     0.845067
3     passenger_count_1     0.797157
37   PUBorough_Brooklyn     0.778282
5    passenger_count_5_     0.776534
35  PUBorough_Manhattan     0.750020
39      PUBorough_Bronx     0.732351
36     PUBorough_Queens     0.693116
4   passenger_count_2_4     0.673714
6        hour_of_day_19     0.286785
10       hour_of_day_22     0.280512
8        hour_of_day_20     0.276221
7        hour_of_day_18     0.248082
9        hour_of_day_21     0.244363
20        hour_of_day_8     0.236175
15       hour_of_day_23     0.229017
18        hour_of_day_9     0.195369
22        hour_of_day_0     0.181550
1  

## Q2b - Organize All Transformers and Estimators into a Reproducible Machine Learning Pipeline

In [55]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Bucketizer, StringIndexer, OneHotEncoder, VectorAssembler, SQLTransformer
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Clean the dataset
data = spark.read.parquet('s3://nyc-tlc/trip data/yellow_tripdata_2015*.parquet')
data = data.filter(col('tip_amount') >= 0)
data = data.filter(col('tip_amount') < 2000)

# Get Borough features from csv file
zone_lookup_path = "s3://qixin-a3-bucket/jupyter/jovyan/taxi_zone_lookup.csv"
zone_lookup = spark.read.option("header", "true").csv(zone_lookup_path)
zone_lookup = zone_lookup.withColumnRenamed("LocationID", "PULocationID").withColumnRenamed("Borough", "PUBorough")
data = data.join(zone_lookup, on="PULocationID", how="left")
zone_lookup = zone_lookup.withColumnRenamed("PULocationID", "DOLocationID").withColumnRenamed("PUBorough", "DOBorough")
data = data.join(zone_lookup, on="DOLocationID", how="left")

# Define the transformations as stages in the pipeline
stages = []

# Use SQLTransformer to add day_of_week and weekend features
sql_transformer = SQLTransformer(statement="""
    SELECT *,
           dayofweek(tpep_pickup_datetime) AS day_of_week,
           CASE WHEN dayofweek(tpep_pickup_datetime) IN (1, 7) THEN 1 ELSE 0 END AS weekend
    FROM __THIS__
""")
stages += [sql_transformer]

# Index and one-hot encode the PUBorough
pu_borough_indexer = StringIndexer(inputCol="PUBorough", outputCol="PUBorough_index")
pu_borough_encoder = OneHotEncoder(inputCol="PUBorough_index", outputCol="PUBorough_vector")
stages += [pu_borough_indexer, pu_borough_encoder]

# Extract, index and one-hot encode the hour_of_day
data = data.withColumn('hour_of_day', F.hour(data.tpep_pickup_datetime))
hour_indexer = StringIndexer(inputCol="hour_of_day", outputCol="hour_of_day_index")
hour_encoder = OneHotEncoder(inputCol="hour_of_day_index", outputCol="hour_of_day_vector")
stages += [hour_indexer, hour_encoder]

# Index and one-hot encode the day_of_week
day_indexer = StringIndexer(inputCol="day_of_week", outputCol="day_of_week_index")
day_encoder = OneHotEncoder(inputCol="day_of_week_index", outputCol="day_of_week_vector")
stages += [day_indexer, day_encoder]

# Buckeize the passenger_count
passenger_splits = [-float('inf'), 1.0, 2.0, 5.0, float('inf')]
passenger_bucketizer = Bucketizer(splits=passenger_splits, inputCol="passenger_count", outputCol="passenger_count_bucket")
stages += [passenger_bucketizer]

# Index and one-hot encode the passenger_count_bucket
passenger_count_indexer = StringIndexer(inputCol="passenger_count_bucket", outputCol="passenger_count_index")
passenger_count_encoder = OneHotEncoder(inputCol="passenger_count_index", outputCol="passenger_count_vector")
stages += [passenger_count_indexer, passenger_count_encoder]

# Assemble the feature columns into a single feature vector
feature_columns = [
    "weekend",
    "trip_distance",
    "fare_amount",
    "passenger_count_vector",
    "hour_of_day_vector",
    "day_of_week_vector",
    "PUBorough_vector",
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
stages += [assembler]

# Create a Linear Regression model
lr = LinearRegression(featuresCol="features", labelCol="tip_amount")
stages += [lr]

# Create the pipeline
pipeline = Pipeline(stages=stages)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [56]:
# Start with the subset of data - it should get the same outcome with Q2a
sampled_data = data.sample(fraction=0.001, seed=42)
train, test = sampled_data.randomSplit([0.8, 0.2], seed=42)

# Persist data in memory to speed up I/O during training/testing
train.persist()
test.persist()

# confirm they're cached
print(train.is_cached, test.is_cached)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

True True

In [57]:
# Fit the pipeline to the training data and make predictions on the test data
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

# Show the predictions
predictions.select("prediction", "tip_amount", "features").show(5, truncate=False)

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(labelCol="tip_amount", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+----------+--------------------------------------------------------+
|prediction        |tip_amount|features                                                |
+------------------+----------+--------------------------------------------------------+
|12.941511103766606|0.0       |(42,[2,3,15,31,41],[2.5,1.0,1.0,1.0,1.0])               |
|7.103404608040409 |10.8      |(42,[0,1,2,3,16,34,35],[1.0,15.6,61.5,1.0,1.0,1.0,1.0]) |
|8.41971505496696  |17.65     |(42,[0,1,2,5,14,29,35],[1.0,19.57,70.5,1.0,1.0,1.0,1.0])|
|8.126635576798902 |16.0      |(42,[1,2,4,16,31,35],[17.8,70.0,1.0,1.0,1.0,1.0])       |
|9.072528579155577 |18.0      |(42,[1,2,3,20,35],[20.6,75.5,1.0,1.0,1.0])              |
+------------------+----------+--------------------------------------------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data: 1.9679645393447491

In [58]:
# Extract the coefficients and feature names
linear_model = pipeline_model.stages[-1]
coefficients = linear_model.coefficients
feature_names = ['weekend', 'trip_distance', 'fare_amount'] + \
                  passenger_count_features + \
                  hour_features + \
                  day_features + \
                  PUBorough_features

# Display the baseline category for one-hot encoded features
baselines = {
    "day_baseline": day_baseline,
    "hour_baseline": hour_baseline,
    "PUBorough_baseline": PUBorough_baseline,
    "passenger_count_baseline": passenger_count_baseline
}
for name, value in baselines.items():
    print(f"{name}: {value}")

# Display feature importance in descending order (of absolute value)
feature_importance = pd.DataFrame(list(zip(feature_names, coefficients)), columns=["Feature", "Coefficient"])
feature_importance_sorted = feature_importance.reindex(feature_importance.Coefficient.abs().sort_values(ascending=False).index)
print(feature_importance_sorted)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

day_baseline: day_of_week_2
hour_baseline: hour_of_day_5
PUBorough_baseline: PUBorough_Staten Island
passenger_count_baseline: passenger_count_NA
                Feature  Coefficient
41        PUBorough_EWR    13.196164
29        day_of_week_7    -4.343046
34        day_of_week_1    -4.312634
0               weekend     4.202549
40        PUBorough_N/A     2.306256
38    PUBorough_Unknown     0.845067
3     passenger_count_1     0.797157
37   PUBorough_Brooklyn     0.778282
5    passenger_count_5_     0.776534
35  PUBorough_Manhattan     0.750020
39      PUBorough_Bronx     0.732351
36     PUBorough_Queens     0.693116
4   passenger_count_2_4     0.673714
6        hour_of_day_19     0.286785
10       hour_of_day_22     0.280512
8        hour_of_day_20     0.276221
7        hour_of_day_18     0.248082
9        hour_of_day_21     0.244363
19       hour_of_day_10     0.236175
13       hour_of_day_12     0.229017
18        hour_of_day_9     0.195369
22        hour_of_day_0     0.181550
1  

In [59]:
# Scale training data up to the full dataset
full_data = data
train, test = full_data.randomSplit([0.8, 0.2], seed=42)

# Persist data in memory to speed up I/O during training/testing
train.persist()
test.persist()

# confirm they're cached
print(train.is_cached, test.is_cached)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

True True

In [60]:
# Fit the pipeline to the training data of full dataset and make predictions on the test data
pipeline_model = pipeline.fit(train)
predictions = pipeline_model.transform(test)

# Show the predictions
predictions.select("prediction", "tip_amount", "features").show(5, truncate=False)

# Evaluate the model using RMSE
evaluator = RegressionEvaluator(labelCol="tip_amount", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+----------+------------------------------------------------+
|prediction        |tip_amount|features                                        |
+------------------+----------+------------------------------------------------+
|10.180062794762545|0.0       |(42,[2,4,23,31,41],[82.0,1.0,1.0,1.0,1.0])      |
|10.18504640529684 |0.0       |(42,[2,3,14,31,41],[0.01,1.0,1.0,1.0,1.0])      |
|10.187047172271036|0.0       |(42,[2,3,14,31,41],[2.5,1.0,1.0,1.0,1.0])       |
|10.219750325470214|5.0       |(42,[1,2,4,16,31,41],[6.3,82.0,1.0,1.0,1.0,1.0])|
|10.22361105954122 |0.0       |(42,[2,4,7,31,41],[75.0,1.0,1.0,1.0,1.0])       |
+------------------+----------+------------------------------------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data: 2.5143160191883713

In [61]:
# Extract the coefficients and feature names
linear_model = pipeline_model.stages[-1]
coefficients = linear_model.coefficients
feature_names = ['weekend', 'trip_distance', 'fare_amount'] + \
                  passenger_count_features + \
                  hour_features + \
                  day_features + \
                  PUBorough_features

# Display the baseline category for one-hot encoded features
baselines = {
    "day_baseline": day_baseline,
    "hour_baseline": hour_baseline,
    "PUBorough_baseline": PUBorough_baseline,
    "passenger_count_baseline": passenger_count_baseline
}

# Print the variable names along with their values
for name, value in baselines.items():
    print(f"{name}: {value}")
    
feature_importance = pd.DataFrame(list(zip(feature_names, coefficients)), columns=["Feature", "Coefficient"])
# Sort by the absolute value of the coefficient in descending order
feature_importance_sorted = feature_importance.reindex(feature_importance.Coefficient.abs().sort_values(ascending=False).index)

# Display the sorted feature importance
print(feature_importance_sorted)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

day_baseline: day_of_week_2
hour_baseline: hour_of_day_5
PUBorough_baseline: PUBorough_Staten Island
passenger_count_baseline: passenger_count_NA
                Feature   Coefficient
39      PUBorough_Bronx -5.105274e+00
35  PUBorough_Manhattan -4.670595e+00
37   PUBorough_Brooklyn -4.372531e+00
38    PUBorough_Unknown -4.355740e+00
41        PUBorough_EWR  3.953106e+00
36     PUBorough_Queens -1.745025e+00
40        PUBorough_N/A  8.771515e-01
19       hour_of_day_10 -3.942180e-01
17       hour_of_day_11 -3.876403e-01
13       hour_of_day_12 -3.833229e-01
23        hour_of_day_7 -3.683635e-01
14       hour_of_day_13 -3.647642e-01
11       hour_of_day_14 -3.339801e-01
16       hour_of_day_15 -3.286762e-01
18        hour_of_day_9 -3.239464e-01
6        hour_of_day_19 -3.192633e-01
7        hour_of_day_18 -3.191906e-01
25        hour_of_day_6 -3.095514e-01
12       hour_of_day_17 -3.080025e-01
27        hour_of_day_3 -3.037761e-01
20        hour_of_day_8 -2.984289e-01
21       hour_of_d

## Q2c - Describe What Happens Under the Hood in Spark

When specifying a series of transformations in a Spark pipeline, the DataFrame is not immediately processed. Instead, Spark uses lazy evaluation to build a logical plan outlining the transformations. This plan is optimized but not executed until an action like fit, transform, or collect is called. Upon triggering an action, Spark's Catalyst optimizer converts the logical plan into a physical plan, which is then executed across the cluster.

This approach allows Spark to optimize the execution plan, ensuring efficient resource use. In contrast, Dask also employs lazy evaluation, building a task graph that is executed only when compute is called. Both systems delay execution for optimization, but Spark uses Catalyst for query optimization, while Dask relies on its task scheduler.

## Q3 - Finding Optimal Model Parameters

To run Q3, I increased the size of my EMR cluster to 8 m5.xlarge EC2 instances (1 primary node and 7 core nodes) to speed up the whole process.

In [62]:
# Use the subset of the data for a 5-fold cross-validated grid search
sample_data = data.sample(fraction=0.001, seed=42)
train, test = sample_data.randomSplit([0.8, 0.2], seed=42)

# Persist data in memory to speed up I/O during training/testing
train.persist()
test.persist()

# confirm they're cached
print(train.is_cached, test.is_cached)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

True True

In [63]:
# Define the parameter grid using the specified ranges
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, np.arange(0, .1, .01))
             .addGrid(lr.elasticNetParam, [0, 1])
             .build())

# Create the evaluator
evaluator = RegressionEvaluator(labelCol="tip_amount", predictionCol="prediction", metricName="rmse")

# Create the CrossValidator
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5) 

# Run cross-validation, and choose the best set of parameters.
cv_model = crossval.fit(train)

# Make predictions on the test data and show the prediction
predictions = cv_model.transform(test)
predictions.select("prediction", "tip_amount", "features").show(5, truncate=False)

# Evaluate the model using RMSE
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data: {rmse}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+----------+--------------------------------------------------------+
|prediction        |tip_amount|features                                                |
+------------------+----------+--------------------------------------------------------+
|12.698637238215763|0.0       |(42,[2,3,15,31,41],[2.5,1.0,1.0,1.0,1.0])               |
|6.94784247622159  |10.8      |(42,[0,1,2,3,16,34,35],[1.0,15.6,61.5,1.0,1.0,1.0,1.0]) |
|8.226841491489846 |17.65     |(42,[0,1,2,5,14,29,35],[1.0,19.57,70.5,1.0,1.0,1.0,1.0])|
|7.948019542653354 |16.0      |(42,[1,2,4,16,31,35],[17.8,70.0,1.0,1.0,1.0,1.0])       |
|8.868740911395433 |18.0      |(42,[1,2,3,20,35],[20.6,75.5,1.0,1.0,1.0])              |
+------------------+----------+--------------------------------------------------------+
only showing top 5 rows

Root Mean Squared Error (RMSE) on test data: 1.967958258509648

In [64]:
# Get the best model's parameters
best_model = cv_model.bestModel
print(f"Best regParam: {best_model.stages[-1]._java_obj.getRegParam()}")
print(f"Best elasticNetParam: {best_model.stages[-1]._java_obj.getElasticNetParam()}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Best regParam: 0.09
Best elasticNetParam: 0.0

In [65]:
# Extract the coefficients and feature names
coefficients = best_model.stages[-1].coefficients
feature_names = ['weekend', 'trip_distance', 'fare_amount'] + \
                  passenger_count_features + \
                  hour_features + \
                  day_features + \
                  PUBorough_features

# Display the baseline category for one-hot encoded features
baselines = {
    "day_baseline": day_baseline,
    "hour_baseline": hour_baseline,
    "PUBorough_baseline": PUBorough_baseline,
    "passenger_count_baseline": passenger_count_baseline
}
for name, value in baselines.items():
    print(f"{name}: {value}")

# Display feature importance in descending order (of absolute value)
feature_importance = pd.DataFrame(list(zip(feature_names, coefficients)), columns=["Feature", "Coefficient"])
feature_importance_sorted = feature_importance.reindex(feature_importance.Coefficient.abs().sort_values(ascending=False).index)
print(feature_importance_sorted)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

day_baseline: day_of_week_2
hour_baseline: hour_of_day_5
PUBorough_baseline: PUBorough_Staten Island
passenger_count_baseline: passenger_count_NA
                Feature  Coefficient
41        PUBorough_EWR    12.148250
40        PUBorough_N/A     1.599610
28        hour_of_day_4    -0.276413
27        hour_of_day_3    -0.189775
1         trip_distance     0.174381
6        hour_of_day_19     0.141296
10       hour_of_day_22     0.138313
8        hour_of_day_20     0.132826
7        hour_of_day_18     0.104940
9        hour_of_day_21     0.102066
19       hour_of_day_10     0.093880
13       hour_of_day_12     0.089109
25        hour_of_day_6    -0.080138
29        day_of_week_7    -0.072869
38    PUBorough_Unknown     0.070633
0               weekend    -0.070324
32        day_of_week_4     0.066281
33        day_of_week_3     0.066075
2           fare_amount     0.065893
31        day_of_week_5     0.062111
26        hour_of_day_2    -0.061890
3     passenger_count_1     0.059450
4  

The model evaluation shows a Root Mean Squared Error (RMSE) of 1.967, indicating that predictions are, on average, about 1.967 units away from actual tip amounts. The model uses a regularization parameter (regParam) of 0.09 and an elastic net parameter (elasticNetParam) of 0.0, emphasizing L2 regularization.

Key predictors of tip amounts include pickup location and time of day. Trips from Newark (EWR) significantly increase tips, while early morning hours and weekends tend to decrease them. We can tell that hour_of_day and PUBorough might be the most relevant features for predicting tip amount because many of their one-hot encoded features have high coefficients among all features.