## Spark Initialization and Data Ingestion
Create local spark session and ingest data from cloud storage.

In [1]:
from pyspark.sql import SparkSession
import findspark
from pyspark.sql.functions import monotonically_increasing_id, col
from pyspark.ml.regression import RandomForestRegressor
findspark.init()
spark = SparkSession.builder.master("local") \
    .appName("TAXI_ML").getOrCreate()

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType, IntegerType

# Define the schema with proper types for numerical fields
schema = StructType([
    StructField("medallion", StringType(), True),
    StructField("hack_license", StringType(), True),
    StructField("vendor_id", StringType(), True),
    StructField("rate_code", StringType(), True),
    StructField("store_and_fwd_flag", IntegerType(), True),
    StructField("pickup_datetime", StringType(), True),
    StructField("dropoff_datetime", StringType(), True),
    StructField("passenger_count", IntegerType(), True),  # Integer for count
    StructField("trip_time_in_secs", LongType(), True),   # Long for time in seconds
    StructField("trip_distance", DoubleType(), True),     # Double for distance
    StructField("pickup_longitude", DoubleType(), True),  # Double for coordinates
    StructField("pickup_latitude", DoubleType(), True),   # Double for coordinates
    StructField("dropoff_longitude", DoubleType(), True), # Double for coordinates
    StructField("dropoff_latitude", DoubleType(), True)   # Double for coordinates
])

Infer Schema was making every field a string type so we manually define the schema.

In [3]:
# File path to OneDrive containing raw data.
trip_data_path = r"C:\Users\nicol\OneDrive - University of Calgary\filteredTrips1,2,6\raw_trip_data\*.csv"
#Load from data source
df = spark.read.format("csv").schema(schema).option("header", "true").load(trip_data_path)

We determined these fields would not be useful to train our ML classifiers. Medallion and hack_license are identifiers for the taxi cab. Rate code is a passenger provided rating that does would likely
not directly impact trip times and the flag boolean refers to if the onboard taxi info was sent off to un-external server. store_and_fwd_flag in particular had many null values.

In [4]:
#Drop unnecessary columns
df = df.drop("medallion")
df = df.drop("hack_license")
df = df.drop("rate_code")
df = df.drop("store_and_fwd_flag")

# Add a monotonically increasing ID column for future reference if needed.
df = df.withColumn("id", monotonically_increasing_id())

# Reorder columns to place `id` at the beginning
df = df[['id'] + df.columns[:-1]]

# Print the schema to check types
df.printSchema()

root
 |-- id: long (nullable = false)
 |-- vendor_id: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_time_in_secs: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)



Show first 5 entries of transformed loaded data.

In [5]:
df.show(5)

+---+---------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
| id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+---+---------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+
|  0|      CMT|2013-01-01 15:11:48|2013-01-01 15:18:10|              4|              382|          1.0|      -73.978165|      40.757977|       -73.989838|       40.751171|
|  1|      CMT|2013-01-06 00:18:35|2013-01-06 00:22:54|              1|              259|          1.5|      -74.006683|      40.731781|       -73.994499|        40.75066|
|  2|      CMT|2013-01-05 18:49:41|2013-01-05 18:54:23|              1|              282|          1.1|      -74.004707|       40.73777|    

## Data Cleaning
While passenger_count and trip_distance remain useful to us. 0's in these fields would indicate invalid trips and should not be used in training.

In [6]:
df = df.dropna()
df = df.filter(df["passenger_count"] != 0)
df = df.filter(df["trip_distance"] != 0)
df = df.dropDuplicates()

# Gradient Boosted Trees Classifier
With Gradient Boosted Trees we again chose to keep the features as is without cyclical encoding.

In [19]:
features_Needed = ["trip_distance", "passenger_count","hour_of_day", "day_of_week", "is_weekend"]

# in regression models , we need to specify the label or a target we wish to predict , in this case
# we are trying to predict trip_times in secs
label = "trip_time_in_secs"

# now we need to assemble the features into a single vector
# source used :
# https://www.machinelearningplus.com/pyspark/pyspark-gradient-boosting-model/

Assemble the feature vector, split the dataset 80-20, and train.

In [20]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=features_Needed, outputCol="features")

prepared_data_gbt = assembler.transform(df)
prepared_data_gbt.show(5)

+---+---------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+-----------+-----------+----------+--------------------+
| id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|day_of_week|hour_of_day|is_weekend|            features|
+---+---------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+-----------+-----------+----------+--------------------+
|  0|      CMT|2013-01-01 15:11:48|2013-01-01 15:18:10|              4|              382|          1.0|      -73.978165|      40.757977|       -73.989838|       40.751171|          3|         15|         0|[1.0,4.0,15.0,3.0...|
|  1|      CMT|2013-01-06 00:18:35|2013-01-06 00:22:54|              1|              259

In [21]:
from pyspark.ml.regression import GBTRegressor

train_data, test_data = prepared_data_gbt.randomSplit([0.8, 0.2], seed=42)

gbt = GBTRegressor(featuresCol="features", labelCol="trip_time_in_secs", maxIter=10 )
model = gbt.fit(train_data)
model.save(r"models\gbt_model")

## Evaluate

In [22]:
from pyspark.ml.evaluation import RegressionEvaluator

predictions = model.transform(test_data)
evaluator_rmse = RegressionEvaluator(labelCol="trip_time_in_secs", predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol="trip_time_in_secs", predictionCol="prediction", metricName="mae")
evaluator_r2 = RegressionEvaluator(labelCol="trip_time_in_secs", predictionCol="prediction", metricName="r2")

# Compute metrics
rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
r2 = evaluator_r2.evaluate(predictions)

# View required metrics
print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"R² (Coefficient of Determination): {r2}")

Root Mean Squared Error (RMSE): 15772.044832113776
Mean Absolute Error (MAE): 292.7362954156453
R² (Coefficient of Determination): 0.0010817964917416711
