In [21]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("TaxiFarePrediction") \
    .getOrCreate()

In [22]:
# Load Dataset
# file is named '2019-04.csv' and is in the same directory
df = spark.read.csv("2019-04.csv", header=True, inferSchema=True)
df.printSchema()
df.show(5)



root
 |-- vendorid: double (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- ratecodeid: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- pulocationid: double (nullable = true)
 |-- dolocationid: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+---

                                                                                

In [23]:
# Select Relevant Columns
# 4th = passenger_count, 8th = PULocationID, 9th = DOLocationID, 17th = total_amount

selected = df.select("passenger_count", "PULocationID", "DOLocationID", "total_amount")
selected.show(10)

+---------------+------------+------------+------------+
|passenger_count|PULocationID|DOLocationID|total_amount|
+---------------+------------+------------+------------+
|            1.0|       239.0|       239.0|         8.8|
|            1.0|       230.0|       100.0|         8.3|
|            1.0|        68.0|       127.0|       47.75|
|            1.0|        68.0|        68.0|         7.3|
|            1.0|        50.0|        42.0|       23.15|
|            1.0|        95.0|       196.0|         9.8|
|            1.0|       211.0|       211.0|         6.8|
|            1.0|       237.0|       162.0|         7.8|
|            1.0|       148.0|        37.0|        20.3|
|            1.0|       265.0|       265.0|        0.31|
+---------------+------------+------------+------------+
only showing top 10 rows


In [24]:
# Handle Missing or Null Data
selected = selected.dropna()

In [25]:
# Train-Test Split
trainDF, testDF = selected.randomSplit([0.8, 0.2], seed=42)

In [26]:
# Assemble Features
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["passenger_count", "PULocationID", "DOLocationID"],
    outputCol="features"
)

In [27]:
# Initialize Decision Tree Regressor
from pyspark.ml.regression import DecisionTreeRegressor

dt = DecisionTreeRegressor(featuresCol="features", labelCol="total_amount")

In [28]:
# Create Pipeline
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[assembler, dt])

In [None]:
# Train the Model
model = pipeline.fit(trainDF)

[Stage 32:>                                                       (0 + 12) / 12]

In [19]:
# Make Predictions
predictions = model.transform(testDF)
predictions.select("passenger_count", "PULocationID", "DOLocationID", "prediction").show(10)

[Stage 24:>                                                         (0 + 1) / 1]

+---------------+------------+------------+------------------+
|passenger_count|PULocationID|DOLocationID|        prediction|
+---------------+------------+------------+------------------+
|            0.0|         3.0|       136.0|18.005014552242418|
|            0.0|         4.0|        75.0| 15.64471842503167|
|            0.0|         4.0|        87.0| 15.64471842503167|
|            0.0|         4.0|       113.0| 15.64471842503167|
|            0.0|         4.0|       234.0|18.005014552242418|
|            0.0|         7.0|       121.0|18.005014552242418|
|            0.0|         7.0|       193.0|18.005014552242418|
|            0.0|        10.0|       100.0| 15.64471842503167|
|            0.0|        12.0|       142.0|18.005014552242418|
|            0.0|        12.0|       211.0|18.005014552242418|
+---------------+------------+------------+------------------+
only showing top 10 rows


                                                                                

In [20]:
# Evaluate with RMSE
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="total_amount",
    predictionCol="prediction",
    metricName="rmse"
)

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse:.2f}")

[Stage 25:>                                                       (0 + 12) / 12]

Root Mean Squared Error (RMSE): 12.14


                                                                                