# Assignment 4: PySpark MLlib - NYC Taxi Fare Prediction

**Author:** Prasham Sheth
**Course:** CS131  
**Date:** 05/04/2025

In this assignment, we use PySpark MLlib to train a Decision Tree Regressor that predicts the `total_amount` of NYC taxi trips from the passenger count and the pickup and drop-off location IDs.

In [1]:
from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder.appName("NYC Taxi MLlib").getOrCreate()

25/05/05 21:37:11 WARN Utils: Your hostname, Prashams-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.1.6 instead (on interface en0)
25/05/05 21:37:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/05 21:37:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Load the Dataset and Select Required Columns

We use only four columns, identified here by their 1-based position in the CSV:
- `passenger_count` (4th col)
- `pulocationid` (8th col)
- `dolocationid` (9th col)
- `total_amount` (17th col)

In [3]:
# Load CSV
df = spark.read.csv("2019-01-h1.csv", header=True, inferSchema=True)

# Select required columns and rename for clarity
selected = df.select(
    df.columns[3],   # passenger_count (4th col)
    df.columns[7],   # pulocationid (8th col)
    df.columns[8],   # dolocationid (9th col)
    df.columns[16],  # total_amount (17th col)
).toDF("passenger_count", "pulocationid", "dolocationid", "total_amount")

# Show first 10 entries
selected.show(10)

+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            1.0|       151.0|       239.0|        9.95|
|            1.0|       239.0|       246.0|        16.3|
|            3.0|       236.0|       236.0|         5.8|
|            5.0|       193.0|       193.0|        7.55|
|            5.0|       193.0|       193.0|       55.55|
|            5.0|       193.0|       193.0|       13.31|
|            5.0|       193.0|       193.0|       55.55|
|            1.0|       163.0|       229.0|        9.05|
|            1.0|       229.0|         7.0|        18.5|
|            2.0|       141.0|       234.0|        13.0|
+---------------+------------+------------+------------+
only showing top 10 rows
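
The positional selection above relies on the mapping from 1-based column positions to 0-based `df.columns` indices. A minimal sketch of that mapping follows. The `header` list is an assumption (it follows the usual 2019 yellow-taxi trip record layout, and the actual file may use different names or casing), and `name_at` is a hypothetical helper, not part of PySpark.

```python
# Assumed header for illustration only; the real CSV header may differ.
header = [
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "passenger_count", "trip_distance", "RatecodeID", "store_and_fwd_flag",
    "PULocationID", "DOLocationID", "payment_type", "fare_amount", "extra",
    "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge",
    "total_amount", "congestion_surcharge",
]

def name_at(columns, position):
    """Return the column name at a 1-based position (hypothetical helper)."""
    return columns[position - 1]

# The 4th, 8th, 9th and 17th columns, as listed in the text.
selected_names = [name_at(header, p) for p in (4, 8, 9, 17)]
print(selected_names)
```

Checking the resolved names this way before training guards against a reordered header, which positional indexing would not detect.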



## Split the Data into Training and Testing Sets

In [4]:
# Split roughly 80% train / 20% test, with a fixed seed for reproducibility
trainDF, testDF = selected.randomSplit([0.8, 0.2], seed=42)

print(f"Training set count: {trainDF.count()}")
print(f"Test set count: {testDF.count()}")

print("Sample trainDF:")
trainDF.show(5)
print("Sample testDF:")
testDF.show(5)

Training set count: 2922047
Test set count: 728952
Sample trainDF:
+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            0.0|         1.0|         1.0|        90.0|
|            0.0|         1.0|         1.0|      116.75|
|            0.0|         4.0|         4.0|        5.75|
|            0.0|         4.0|        17.0|        20.3|
|            0.0|         4.0|        68.0|        15.8|
+---------------+------------+------------+------------+
only showing top 5 rows

Sample testDF:
+---------------+------------+------------+------------+
|passenger_count|pulocationid|dolocationid|total_amount|
+---------------+------------+------------+------------+
|            0.0|         4.0|         4.0|         4.3|
|            0.0|         4.0|        79.0|         5.8|
|            0.0|         4.0|        90.0|        10.8|
|            0.0|         4.0|       170.0|        12.7|
|        
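
`randomSplit` assigns rows to the splits at random, so the weights are targets rather than exact proportions. A quick check of the fractions actually obtained, using only the two counts printed above:

```python
# Counts copied from the notebook output above.
train_count = 2_922_047
test_count = 728_952

total = train_count + test_count
train_frac = train_count / total
test_frac = test_count / total

print(f"total rows: {total}")
print(f"train fraction: {train_frac:.4f}, test fraction: {test_frac:.4f}")
```

The fractions land close to, but not exactly on, 0.8 and 0.2.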

## Build a Decision Tree Regressor Pipeline

In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline

# Assemble features
assembler = VectorAssembler(
    inputCols=["passenger_count", "pulocationid", "dolocationid"],
    outputCol="features"
)

# Decision Tree Regressor
# maxBins (default 32) sets how many bins are used to discretize continuous features
dt = DecisionTreeRegressor(
    featuresCol="features",
    labelCol="total_amount",
    maxBins=1000,
)

# Pipeline
pipeline = Pipeline(stages=[assembler, dt])


## Train the Model

In [8]:
# Fit the pipeline
model = pipeline.fit(trainDF)

25/05/05 21:42:57 WARN MemoryStore: Not enough space to cache rdd_51_9 in memory! (computed 28.4 MiB so far)
25/05/05 21:42:57 WARN MemoryStore: Not enough space to cache rdd_51_1 in memory! (computed 8.0 MiB so far)
25/05/05 21:42:57 WARN BlockManager: Persisting block rdd_51_9 to disk instead.
25/05/05 21:42:57 WARN BlockManager: Persisting block rdd_51_1 to disk instead.
25/05/05 21:42:57 WARN MemoryStore: Not enough space to cache rdd_51_8 in memory! (computed 8.0 MiB so far)
25/05/05 21:42:57 WARN BlockManager: Persisting block rdd_51_8 to disk instead.
25/05/05 21:42:57 WARN MemoryStore: Not enough space to cache rdd_51_7 in memory! (computed 8.0 MiB so far)
25/05/05 21:42:57 WARN MemoryStore: Not enough space to cache rdd_51_0 in memory! (computed 8.0 MiB so far)
25/05/05 21:42:57 WARN BlockManager: Persisting block rdd_51_0 to disk instead.
25/05/05 21:42:57 WARN MemoryStore: Not enough space to cache rdd_51_3 in memory! (computed 8.0 MiB so far)

## Make Predictions on the Test Set and Show Results

In [9]:
# Predict
predictions = model.transform(testDF)

# Show predictions with features (first 10 entries)
predictions.select(
    "passenger_count", "pulocationid", "dolocationid", "total_amount", "prediction"
).show(10)


+---------------+------------+------------+------------+------------------+
|passenger_count|pulocationid|dolocationid|total_amount|        prediction|
+---------------+------------+------------+------------+------------------+
|            0.0|         4.0|         4.0|         4.3|17.919089195752292|
|            0.0|         4.0|        79.0|         5.8|17.919089195752292|
|            0.0|         4.0|        90.0|        10.8|17.919089195752292|
|            0.0|         4.0|       170.0|        12.7|17.919089195752292|
|            0.0|         7.0|         7.0|         9.1|17.919089195752292|
|            0.0|         7.0|       138.0|        10.8|17.919089195752292|
|            0.0|         7.0|       179.0|         3.8|17.919089195752292|
|            0.0|        10.0|       232.0|       60.72|17.919089195752292|
|            0.0|        13.0|        45.0|         9.8|17.919089195752292|
|            0.0|        13.0|        45.0|       13.55|17.919089195752292|
+---------------+------------+------------+------------+------------------+
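
The repeated prediction value in the table above is consistent with how a regression tree works: every row routed to the same leaf receives that leaf's prediction, which is the mean label of the training rows that fell into it. A toy one-split sketch of this behavior follows. The data and the `THRESHOLD` value are made up for illustration and are not the splits the fitted model learned.

```python
from statistics import mean

# Made-up training rows: (pulocationid, total_amount).
train = [(4.0, 6.0), (7.0, 10.0), (13.0, 14.0), (132.0, 60.0), (138.0, 40.0)]

THRESHOLD = 100.0  # hypothetical split point on pulocationid

def fit_stump(rows, threshold):
    """Return the mean label on each side of a single split."""
    left = [y for x, y in rows if x <= threshold]
    right = [y for x, y in rows if x > threshold]
    return mean(left), mean(right)

def predict(x, threshold, leaf_means):
    """Route x to a leaf and return that leaf's mean label."""
    left_mean, right_mean = leaf_means
    return left_mean if x <= threshold else right_mean

leaf_means = fit_stump(train, THRESHOLD)

# Different inputs that land in the same leaf get the same prediction.
preds = [predict(x, THRESHOLD, leaf_means) for x in (4.0, 7.0, 10.0, 13.0)]
print(leaf_means, preds)
```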

## Evaluate the Model with RMSE

In [12]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="total_amount",
    predictionCol="prediction",
    metricName="rmse"
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

Root Mean Squared Error (RMSE) on test data = 47.19257906771566
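
RMSE is the square root of the mean squared difference between label and prediction. As a minimal sketch, the same formula is applied by hand below to the ten rows printed in the predictions table. This is not the full test set, so the result will not match the value reported above. The `rmse` helper is written here for illustration and is not the MLlib implementation.

```python
import math

# The ten total_amount values from the predictions table; all ten rows
# share the same predicted value.
PREDICTION = 17.919089195752292
labels = [4.3, 5.8, 10.8, 12.7, 9.1, 10.8, 3.8, 60.72, 9.8, 13.55]

def rmse(y_true, y_pred):
    """Square root of the mean of squared errors."""
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

sample_rmse = rmse(labels, [PREDICTION] * len(labels))
print(f"RMSE on the ten displayed rows = {sample_rmse:.3f}")
```

Because errors are squared before averaging, the single 60.72 fare contributes more to this result than the other nine rows combined.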

In [14]:
spark.stop()
