### Import Libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    count, when, col, mean, sum, length, to_timestamp,
    hour, dayofweek, month, year, unix_timestamp, rand, isnan, isnull
)
from pyspark.sql.types import DoubleType
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import RandomForestRegressor, LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator

In [42]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("NYC_Yellow_Cab_Outlier_Detection") \
    .config("spark.executor.memory", "10g") \
    .config("spark.driver.memory", "10g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.warehouse.dir", "file:///C:/tmp/spark-warehouse") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")


In [43]:
file_paths = [
    "yellow_tripdata_2024-10.parquet",
    "yellow_tripdata_2024-11.parquet",
    "yellow_tripdata_2024-12.parquet",
    "yellow_tripdata_2025-01.parquet"
]

In [44]:
df = spark.read.parquet(file_paths[0])
for path in file_paths[1:]:
    new_df = spark.read.parquet(path)
    df = df.union(new_df)

In [45]:
df.cache()

DataFrame[VendorID: int, tpep_pickup_datetime: timestamp_ntz, tpep_dropoff_datetime: timestamp_ntz, passenger_count: bigint, trip_distance: double, RatecodeID: bigint, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, Airport_fee: double]

### Exploratory Data Analysis

In [46]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [47]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [48]:
df.describe().show()

+-------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------------+------------------+---------------------+------------------+--------------------+-------------------+
|summary|          VendorID|   passenger_count|    trip_distance|        RatecodeID|store_and_fwd_flag|      PULocationID|      DOLocationID|      payment_type|       fare_amount|             extra|            mta_tax|       tip_amount|      tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|        Airport_fee|
+-------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------------+------------------+---------------------+------------------+--------------------+-------

In [49]:
df.summary().show()

+-------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------------+------------------+---------------------+------------------+--------------------+-------------------+
|summary|          VendorID|   passenger_count|    trip_distance|        RatecodeID|store_and_fwd_flag|      PULocationID|      DOLocationID|      payment_type|       fare_amount|             extra|            mta_tax|       tip_amount|      tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|        Airport_fee|
+-------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+-----------------+------------------+---------------------+------------------+--------------------+-------

In [None]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       0|                   0|                    0|        1634009|            0|   1634009|           1634009|           0|           0|           0|          0|    0|      0|         

In [51]:
df.groupBy("passenger_count").count().show()

+---------------+--------+
|passenger_count|   count|
+---------------+--------+
|              0|  119193|
|              7|      17|
|              6|   56748|
|              9|      13|
|              5|   84507|
|              1|10069181|
|              3|  450809|
|              8|      49|
|              2| 1906059|
|              4|  303152|
|           NULL| 1634009|
+---------------+--------+



In [52]:
df.groupBy("congestion_surcharge").count().show()

+--------------------+--------+
|congestion_surcharge|   count|
+--------------------+--------+
|                 0.0|  996085|
|                 2.5|11786839|
|                0.75|       4|
|                -2.5|  206791|
|               -0.75|       1|
|                 1.0|       8|
|                NULL| 1634009|
+--------------------+--------+



In [53]:
df.groupBy("Airport_fee").count().show()


+-----------+--------+
|Airport_fee|   count|
+-----------+--------+
|        0.0|11908077|
|       1.25|      11|
|      -1.75|   40883|
|       1.75| 1040748|
|       NULL| 1634009|
|       0.75|       1|
|        5.0|       7|
|       6.75|       1|
+-----------+--------+



In [None]:
total_count = df.count()
unique_count = df.dropDuplicates().count()

if total_count > unique_count:
    print(f"There are {total_count - unique_count} duplicate rows.")
else:
    print("There are no duplicate rows.")


There are 2 duplicate rows.


In [None]:
numerical_columns = ["trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount",
                     "tolls_amount", "improvement_surcharge", "total_amount",
                     "congestion_surcharge", "Airport_fee", "passenger_count"]

outlier_columns = []
for column in numerical_columns:
    quantiles = df.approxQuantile(column, [0.25, 0.75], 0.0)
    Q1, Q3 = quantiles[0], quantiles[1]
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    outlier_count = df.filter((col(column) < lower_bound) | (col(column) > upper_bound)).count()
    if outlier_count > 0:
        outlier_columns.append(column)

if outlier_columns:
    print(f"Columns with outliers: {outlier_columns}")
    df.select(outlier_columns).show()
else:
    print("No columns with outliers found.")


Columns with outliers: ['trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'Airport_fee', 'passenger_count']
+-------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------+
|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|passenger_count|
+-------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------+
|          3.0|       18.4|  1.0|    0.5|       1.5|         0.0|                  1.0|        24.9|                 2.5|        0.0|              1|
|          2.2|       14.2|  3.5|    0.5|       3.8|         0.0|                  1.0|        23.0|                 2.5|        0.0|              1|
|          2.7|       13.5|  3.5|    0.5|    

### Preprocessing

In [56]:
df = df.na.drop(subset=["RatecodeID", "store_and_fwd_flag"])

In [None]:
mean_values = df.select(
    mean("passenger_count").alias("mean_passenger_count"),
    mean("congestion_surcharge").alias("mean_congestion_surcharge"),
    mean("Airport_fee").alias("mean_airport_fee")
).collect()[0]

df = df.na.fill({
    "passenger_count": mean_values["mean_passenger_count"],
    "congestion_surcharge": mean_values["mean_congestion_surcharge"],
    "Airport_fee": mean_values["mean_airport_fee"]
})


In [None]:
outlier_columns = ["trip_distance", "fare_amount", "extra", "mta_tax", "tip_amount",
                   "tolls_amount", "improvement_surcharge", "total_amount",
                   "congestion_surcharge", "Airport_fee", "passenger_count"]

for column in outlier_columns:
    quantiles = df.approxQuantile(column, [0.25, 0.75], 0.0)
    Q1, Q3 = quantiles[0], quantiles[1]
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR

    df = df.filter((col(column) >= lower_bound) & (col(column) <= upper_bound))


In [59]:
df.describe().show()

+-------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-----------------+-------+------------------+------------+---------------------+------------------+--------------------+-----------+
|summary|           VendorID|passenger_count|     trip_distance|        RatecodeID|store_and_fwd_flag|      PULocationID|      DOLocationID|       payment_type|       fare_amount|            extra|mta_tax|        tip_amount|tolls_amount|improvement_surcharge|      total_amount|congestion_surcharge|Airport_fee|
+-------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+-----------------+-------+------------------+------------+---------------------+------------------+--------------------+-----------+
|  count|            7894427|        7894427|           7894427|

### Feature Engineering

In [None]:
df = df.withColumn("pickup_hour", hour("tpep_pickup_datetime")) \
       .withColumn("pickup_dayofweek", dayofweek("tpep_pickup_datetime")) \
       .withColumn("pickup_month", month("tpep_pickup_datetime")) \
       .withColumn("pickup_year", year("tpep_pickup_datetime"))

df = df.withColumn("trip_duration",
                   (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60)

df = df.withColumn("distance_per_passenger", col("trip_distance") / col("passenger_count"))
df = df.withColumn("fare_per_distance", col("fare_amount") / col("trip_distance"))
df = df.withColumn("total_per_passenger", col("total_amount") / col("passenger_count"))


location_aggregates = df.groupBy("PULocationID").agg(
    mean("fare_amount").alias("avg_fare_by_location"),
    mean("trip_duration").alias("avg_duration_by_location")
)

df = df.join(location_aggregates, on="PULocationID", how="left")


In [61]:
df.show(5)

+------------+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+----------------+------------+-----------+------------------+----------------------+-----------------+-------------------+--------------------+------------------------+
|PULocationID|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|pickup_hour|pickup_dayofweek|pickup_month|pickup_year|     trip_duration|distance_per_passenger|fare_per_distance|total_per_passenger|avg_fare_by_location|avg_duration_by_location|
+------------+--------+--------------------+---------------------+---------------+-------------+--

### Check feature imporatnce for fare price prediction

In [62]:
input_features = ["PULocationID", "VendorID", "passenger_count", "trip_distance", "RatecodeID",
                  "DOLocationID", "payment_type", "extra", "mta_tax", "tip_amount",
                  "tolls_amount", "improvement_surcharge", "congestion_surcharge",
                  "Airport_fee", "pickup_hour", "pickup_dayofweek", "pickup_month",
                  "pickup_year", "trip_duration", "distance_per_passenger",
                  "fare_per_distance", "total_per_passenger", "avg_fare_by_location",
                  "avg_duration_by_location"]
for feature in input_features:
    correlation = df.stat.corr("fare_amount", feature)
    print(f"Correlation between fare_amount and {feature}: {correlation}")


Correlation between fare_amount and PULocationID: -0.05731761376031353
Correlation between fare_amount and VendorID: 0.03193474224763642
Correlation between fare_amount and passenger_count: nan
Correlation between fare_amount and trip_distance: 0.8426892946973084
Correlation between fare_amount and RatecodeID: -0.01049632237016641
Correlation between fare_amount and DOLocationID: -0.07870574956778548
Correlation between fare_amount and payment_type: -0.04142807756311245
Correlation between fare_amount and extra: -0.028787498940742975
Correlation between fare_amount and mta_tax: nan
Correlation between fare_amount and tip_amount: 0.4249995821961977
Correlation between fare_amount and tolls_amount: nan
Correlation between fare_amount and improvement_surcharge: nan
Correlation between fare_amount and congestion_surcharge: 0.0468977029458104
Correlation between fare_amount and Airport_fee: nan
Correlation between fare_amount and pickup_hour: 0.03399407386197581
Correlation between fare_amo

In [None]:
target_column = "fare_amount"
selected_features = [
    "trip_distance", "tip_amount", "trip_duration", "distance_per_passenger",
    "fare_per_distance", "total_per_passenger", "avg_fare_by_location",
    "avg_duration_by_location"
]

columns_to_keep = [target_column] + selected_features
df = df.select(*columns_to_keep)

In [None]:
df.printSchema()

root
 |-- fare_amount: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- trip_duration: double (nullable = true)
 |-- distance_per_passenger: double (nullable = true)
 |-- fare_per_distance: double (nullable = true)
 |-- total_per_passenger: double (nullable = true)
 |-- avg_fare_by_location: double (nullable = true)
 |-- avg_duration_by_location: double (nullable = true)



### Building Linear Regression model

In [65]:
df = df.drop("features", "out_features")


In [66]:
df.printSchema()

root
 |-- fare_amount: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- trip_duration: double (nullable = true)
 |-- distance_per_passenger: double (nullable = true)
 |-- fare_per_distance: double (nullable = true)
 |-- total_per_passenger: double (nullable = true)
 |-- avg_fare_by_location: double (nullable = true)
 |-- avg_duration_by_location: double (nullable = true)



In [None]:
for column in [target_column] + selected_features:
    df = df.withColumn(column, col(column).cast(DoubleType()))

for column in [target_column] + selected_features:
    df = df.filter(
        ~isnan(col(column)) &
        ~isnull(col(column)) &
        (col(column) != float("inf")) &
        (col(column) != float("-inf"))
    )

for col_name in ['features', 'scaled_features']:
    if col_name in df.columns:
        df = df.drop(col_name)

assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
df = assembler.transform(df)

scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df)
df = scaler_model.transform(df)

train, test = df.randomSplit([0.8, 0.2], seed=42)

lr = LinearRegression(featuresCol="scaled_features", labelCol=target_column)
lr_model = lr.fit(train)

predictions = lr_model.transform(test)

evaluator_rmse = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")
evaluator_mae = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="mae")
evaluator_mse = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="mse")

rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
mse = evaluator_mse.evaluate(predictions)

print(f"Root Mean Squared Error (RMSE): {rmse}")
print(f"Mean Absolute Error (MAE): {mae}")
print(f"Mean Squared Error (MSE): {mse}")


Root Mean Squared Error (RMSE): 0.966536943606636
Mean Absolute Error (MAE): 0.7979545068576801
Mean Squared Error (MSE): 0.9341936633564575


In [None]:
random_predictions = predictions.select(target_column, "prediction").orderBy(rand()).limit(10)

random_predictions.show()

+-----------+------------------+
|fare_amount|        prediction|
+-----------+------------------+
|        7.2| 6.539133765982203|
|        8.6| 8.572008953876939|
|       17.0| 18.29373354874657|
|       14.2|16.079133473220104|
|       19.8| 18.80060688276303|
|       25.4|26.053411719759914|
|        8.6|7.9143755584109705|
|       12.8|11.850415569179678|
|        4.4|3.8688101578050595|
|       14.9|14.238204651586402|
+-----------+------------------+



In [70]:
df.printSchema()

root
 |-- fare_amount: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- trip_duration: double (nullable = true)
 |-- distance_per_passenger: double (nullable = true)
 |-- fare_per_distance: double (nullable = true)
 |-- total_per_passenger: double (nullable = true)
 |-- avg_fare_by_location: double (nullable = true)
 |-- avg_duration_by_location: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaled_features: vector (nullable = true)



### Checking for Overfitting or Underfitting

In [None]:
train_predictions = lr_model.transform(train)
train_rmse = evaluator.evaluate(train_predictions, {evaluator.metricName: "rmse"})
print(f"Training RMSE: {train_rmse}")

test_predictions = lr_model.transform(test)
test_rmse = evaluator.evaluate(test_predictions, {evaluator.metricName: "rmse"})
print(f"Test RMSE: {test_rmse}")

if train_rmse < test_rmse * 0.7:
    print("The model might be overfitting.")
elif train_rmse > test_rmse * 1.3:
    print("The model might be underfitting.")
else:
    print("The model seems to be well-fitted.")


Training RMSE: 0.9674816150693553
Test RMSE: 0.966536943606636
The model seems to be well-fitted.


In [None]:
evaluator_rmse = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="rmse")

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.1, 0.3])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])  
             .addGrid(lr.maxIter, [50, 100])
             .build())

crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator_rmse,
                          numFolds=3,
                          parallelism=2)

cv_model = crossval.fit(train)

best_model = cv_model.bestModel
predictions = best_model.transform(test)

rmse = evaluator_rmse.evaluate(predictions)
print(f"Best Model - Root Mean Squared Error (RMSE): {rmse}")
print(f"Best Params:")
print(f" - regParam: {best_model._java_obj.getRegParam()}")
print(f" - elasticNetParam: {best_model._java_obj.getElasticNetParam()}")
print(f" - maxIter: {best_model._java_obj.getMaxIter()}")

evaluator_mae = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="mae")
evaluator_mse = RegressionEvaluator(labelCol=target_column, predictionCol="prediction", metricName="mse")

mae = evaluator_mae.evaluate(predictions)
mse = evaluator_mse.evaluate(predictions)

print("Additional Metrics from Best Model (based on RMSE tuning):")
print(f" - Mean Absolute Error (MAE): {mae}")
print(f" - Mean Squared Error (MSE): {mse}")


Best Model - Root Mean Squared Error (RMSE): 0.96678768892694
Best Params:
 - regParam: 0.01
 - elasticNetParam: 0.5
 - maxIter: 50
Additional Metrics from Best Model (based on RMSE tuning):
 - Mean Absolute Error (MAE): 0.7981660850225181
 - Mean Squared Error (MSE): 0.9346784354606936


In [None]:
spark.stop()