In [1]:
# install and set up Spark -- PySpark

# SYSTEM SETUP
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# ✅ Correct download link for Spark 3.4.1 (tested)
!wget -q https://archive.apache.org/dist/spark/spark-3.4.1/spark-3.4.1-bin-hadoop3.tgz

# ✅ Extract Spark package
!tar -xzf spark-3.4.1-bin-hadoop3.tgz

# Install Spark Python bindings
!pip install -q findspark
!pip install -q pyspark py4j

import os
import findspark

# ✅ Set environment paths
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.4.1-bin-hadoop3"
os.environ["PYSPARK_SUBMIT_ARGS"] = "--driver-memory 12g pyspark-shell"

# ✅ Initialize Spark
findspark.init()

from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder \
    .appName("NYC Yellow Taxi ML problems") \
    .getOrCreate()

spark

[33m0% [Working][0m            Hit:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease
Hit:3 http://security.ubuntu.com/ubuntu jammy-security InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:6 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:7 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Hit:8 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:9 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
35 packages can be upgraded. Run 'apt list --upgradable' to see them.
[1;33mW: [0mSkipping acquire of configured file 'main/source/Sources' as repository 

In [2]:
spark

In [3]:
# access the same taxi_zones dataset
from google.colab import files
uploaded = files.upload()

Saving taxi_zone_lookup.csv to taxi_zone_lookup (2).csv


In [4]:
# Then the Yellow Taxi data for 6 months Jan-June 2024
uploaded = files.upload()

Saving yellow_tripdata_2024-01.parquet to yellow_tripdata_2024-01 (2).parquet
Saving yellow_tripdata_2024-02.parquet to yellow_tripdata_2024-02 (2).parquet
Saving yellow_tripdata_2024-03.parquet to yellow_tripdata_2024-03 (2).parquet
Saving yellow_tripdata_2024-04.parquet to yellow_tripdata_2024-04 (2).parquet
Saving yellow_tripdata_2024-05.parquet to yellow_tripdata_2024-05 (2).parquet
Saving yellow_tripdata_2024-06.parquet to yellow_tripdata_2024-06 (2).parquet


In [5]:
spark= SparkSession \
       .builder \
       .appName("NYC Yellow Taxi ML problems") \
       .getOrCreate()

# Read CSV (taxi lookup)
taxi_zones = spark.read.csv('/content/taxi_zone_lookup.csv', header=True, inferSchema=True)

# Read Parquet (yellow trip data)
tripdata1 = spark.read.parquet('/content/yellow_tripdata_2024-01.parquet', header=True, inferSchema=True)
tripdata2 = spark.read.parquet('/content/yellow_tripdata_2024-02.parquet', header=True, inferSchema=True)
tripdata3 = spark.read.parquet('/content/yellow_tripdata_2024-03.parquet', header=True, inferSchema=True)
tripdata4 = spark.read.parquet('/content/yellow_tripdata_2024-04.parquet', header=True, inferSchema=True)
tripdata5 = spark.read.parquet('/content/yellow_tripdata_2024-05.parquet', header=True, inferSchema=True)
tripdata6 = spark.read.parquet('/content/yellow_tripdata_2024-06.parquet', header=True, inferSchema=True)

In [6]:
# merge the 6 monthly tripdata files uploaded into one file

tripdata = tripdata1.union(tripdata2).union(tripdata3).union(tripdata4).union(tripdata5).union(tripdata6)
tripdata.count()

20332093

In [7]:
# replace null entries with their column average

tripdatav1 = tripdata.drop("RatecodeID", "store_and_fwd_flag")

from pyspark.sql.functions import avg
averages = tripdatav1.select(
    avg("passenger_count").alias("pass_count_avg"),
    avg("congestion_surcharge").alias("conges_sur_avg"),
    avg("airport_fee").alias("airp_fee_avg")
).collect()[0]

avg_passenger = averages["pass_count_avg"]
avg_congestion = averages["conges_sur_avg"]
avg_airport = averages["airp_fee_avg"]

# then fill the null values now that we have each column average
tripdatav2 = tripdatav1.fillna({
    "passenger_count": avg_passenger,
    "congestion_surcharge": avg_congestion,
    "airport_fee": avg_airport})

In [8]:
# match and update the dataset with the taxi_zone dataset

# identify each matching columns across both taxi_zone and yellow_taxi dataset above
PULocationID = tripdatav2['PULocationID']
DOLocationID = tripdatav2['DOLocationID']
LocationID = taxi_zones['LocationID']
Borough = taxi_zones['Borough']
Zone = taxi_zones['Zone']
service_zone = taxi_zones['service_zone']

In [9]:
# match taxi zone columns to PU(pickup) column and rename the new taxi_zones columns
tripdatav2_1 = tripdatav2.join(taxi_zones, tripdatav2['PULocationID']== taxi_zones['LocationID'], how='left').withColumnRenamed('Borough', 'PUBorough').withColumnRenamed('Zone', 'PUZone').withColumnRenamed('service_zone', 'PUService_zone')
# take out Location ID from taxi_zones after matching, as is no more needed
tripdatav2_1 = tripdatav2_1.drop('LocationID')

In [10]:
# match taxi zone columns to DO(dropoff) column with the updated DF above and also rename the new taxi_zones columns
tripdatav2_2 = tripdatav2_1.join(taxi_zones, tripdatav2['DOLocationID']== taxi_zones['LocationID'], how='left').withColumnRenamed('Borough', 'DOBorough').withColumnRenamed('Zone', 'DOZone').withColumnRenamed('service_zone', 'DOService_zone')
# also take out Location ID from taxi_zones after matching, as is no more needed
tripdatav2_2 = tripdatav2_2.drop('LocationID')

In [11]:
# ....rearrange the columns closer to the PU & DO codes
columns = tripdatav2_2.columns

col_num = [0, 1, 2, 3, 4, 5, 17, 18, 19, 6, 20, 21, 22, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
rearr_columns = [columns[i] for i in col_num]

tripdatav4 = tripdatav2_2.select(rearr_columns)

In [12]:
"""
TASK 3

ML Problem 1 - to predict the daily taxes across Jan_June 2025
--------------------------------------------------------------
"""

'\nTASK 3\n\nML Problem 1 - to predict the daily taxes across Jan_June 2025\n--------------------------------------------------------------\n'

In [12]:
from pyspark.sql.functions import month, year, to_date
from pyspark.sql.functions import sum as Fsum, concat_ws
from pyspark.sql.functions import rank
from pyspark.sql.types import DecimalType
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

# focus on rides within Jan-June of 2024
months_tripdata = tripdatav4.filter(
    (year("tpep_pickup_datetime") == 2024) & (month("tpep_pickup_datetime").isin(1, 2, 3, 4, 5, 6)))

# aiming to predict total monthly taxes
months_tripdata1 = months_tripdata.withColumn("pickup_date", to_date("tpep_pickup_datetime")) \
       .withColumn("month", month("pickup_date")) \
       .withColumn("year", year("pickup_date"))

tax_tripdata1 = months_tripdata1.groupBy("year", "month", "pickup_date").agg(
    Fsum("mta_tax").alias("total_mta_tax"),
    Fsum("congestion_surcharge").alias("total_congestn"),
    Fsum("improvement_surcharge").alias("total_improvmnt"),
    Fsum("airport_fee").alias("total_airp"))

tax_tripdata1 = tax_tripdata1.withColumn("tol_tax",
                                   col("total_mta_tax") + col("total_congestn") +
                                   col("total_improvmnt") + col("total_airp"))

In [13]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# and then tax model features
tax_features = ["total_mta_tax", "total_congestn", "total_improvmnt", "total_airp"]
# using VectorAssemble
tax_assembler = VectorAssembler(inputCols=tax_features, outputCol="t_features")
tax_updated = tax_assembler.transform(tax_tripdata1).select("t_features", "tol_tax")

# the usual data split; 80:20(train:test)
tax_train_data, tax_test_data = tax_updated.randomSplit([0.8, 0.2], seed=0)
# the tax model
lr = LinearRegression(featuresCol="t_features", labelCol="tol_tax")
tax_model = lr.fit(tax_train_data)

# model evaluation and prediction and give it a unique identifer for other predictions
lr = lr.setPredictionCol("reg_prediction")
# for tuning of hyperparameters
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.5])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [50, 100])
             .build())

reg_evaluator = RegressionEvaluator(labelCol="tol_tax", predictionCol="reg_prediction", metricName="rmse")

# set up CrossValidation for 4 folds
tax_cross = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=reg_evaluator, numFolds=4, seed=0)
tax_cross_model = tax_cross.fit(tax_train_data)

tax_ok_model = tax_cross_model.bestModel
tax_predictions = tax_ok_model.transform(tax_test_data)

tax_test_rmse = reg_evaluator.evaluate(tax_predictions)
print(f"Root Mean Squared Error (RMSE) = {tax_test_rmse:.3f}")

Root Mean Squared Error (RMSE) = 0.167


In [15]:
"""
ML Problem 2 - to predict if a driver will be tip or not
--------------------------------------------------------
"""

'\nML Problem 2 - to predict if a driver will be tip or not\n--------------------------------------------------------\n'

In [25]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import when

# create an additional column if tip is given = 1 else 0
tip_tripdata = months_tripdata.withColumn("tip_given", when(col("tip_amount") > 0, 1).otherwise(0))

tip_features = ["fare_amount", "trip_distance", "passenger_count", "total_amount"]

tip_assembler = VectorAssembler(inputCols=tip_features, outputCol="ti_features")
tip_updated= tip_assembler.transform(tip_tripdata).select("ti_features", "tip_given")

# the usual data split; 80:20(train:test)
tip_train_data, tip_test_data = tip_updated.randomSplit([0.8, 0.2], seed=0)
tip_lr = LogisticRegression(featuresCol="ti_features", labelCol="tip_given")
tip_lr = tip_lr.setPredictionCol("acc_prediction")


# model evaluation metrics using "Accuracy"
acc_evaluator = MulticlassClassificationEvaluator(labelCol="tip_given", predictionCol="acc_prediction", metricName="accuracy")

tip_model = tip_lr.fit(tip_train_data)
tip_predictions = tip_model.transform(tip_test_data)

tip_accuracy = acc_evaluator.evaluate(tip_predictions)
print(f"Tip Accuracy = {tip_accuracy:.3f}")

Tip Accuracy = 0.913


In [30]:
paramGrid1 = (ParamGridBuilder()
             .addGrid(tip_lr.regParam, [0.01, 0.1, 0.5])
             .addGrid(tip_lr.elasticNetParam, [0.0, 0.5, 1.0])
             .build())
crossval = CrossValidator(
    estimator=tip_lr,
    estimatorParamMaps=paramGrid1,
    evaluator=acc_evaluator,
    numFolds=3,
    parallelism=2)

cv_lr_model = crossval.fit(tip_train_data)
tip_cv_predictions = cv_lr_model.transform(tip_test_data)
tip_cv_accuracy = acc_evaluator.evaluate(tip_cv_predictions)
print(f"Tip Accuracy after Hyperparameter Tuning = {tip_cv_accuracy:.3f}")

Tip Accuracy after Hyperparameter Tuning = 0.813


In [20]:
# to further show tip and no tip breakdown
tip_tripdata.groupBy("tip_given").count().show()

+---------+--------+
|tip_given|   count|
+---------+--------+
|        1|14622457|
|        0| 5709597|
+---------+--------+



In [None]:
"""
ML Problem 3 - to predict the trip payment type
-----------------------------------------------
"""

In [21]:
# we check all the payment types code(integers) available

# how many zones does each Borough have
tip_tripdata.groupBy("payment_type").count().orderBy("count", ascending=False).show()

+------------+--------+
|payment_type|   count|
+------------+--------+
|           1|15111906|
|           2| 2773053|
|           0| 1975985|
|           4|  337758|
|           3|  133350|
|           5|       2|
+------------+--------+



In [28]:
# we proceed to use FeedForward Neural Network to predict the payment type

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator


payment_features = ["trip_distance", "passenger_count", "tip_amount", "total_amount"]

payment_assembler = VectorAssembler(inputCols=payment_features, outputCol="pay_features")
payment_updated = payment_assembler.transform(tip_tripdata)

indexer = StringIndexer(inputCol="payment_type", outputCol="payment_label")
payment_updated = indexer.fit(payment_updated).transform(payment_updated)

# the usual data split; 80:20(train:test)
pay_train_data, pay_test_data = payment_updated.randomSplit([0.8, 0.2], seed=0)

# model architecture
pay_layers = [4, 32, 16, 6]  # --> features, hidden1, hidden2, payment types
mlp = MultilayerPerceptronClassifier(featuresCol="pay_features", labelCol="payment_label", predictionCol="pay_prediction",
                                     maxIter=50, layers=pay_layers, blockSize=128, seed=42)


payf1_evaluator = MulticlassClassificationEvaluator(labelCol="payment_label", predictionCol="pay_prediction", metricName="f1")


pay_model = mlp.fit(pay_train_data)
pay_f1predictions = pay_model.transform(pay_test_data)

pay_f1accuracy = payf1_evaluator.evaluate(pay_f1predictions)
print(f"The Payment Type Option F1 score = {pay_f1accuracy:.3f}")

The Payment Type Option F1 score = 0.816
