In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import coalesce, lit
from pyspark.sql.functions import hour, when
from pyspark.sql.functions import col, unix_timestamp, when, coalesce, hour, to_timestamp
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

## Data Cleaning & Feature Engineering

In [None]:
spark = (
    SparkSession.builder
    .appName("NYC Taxi Project")
    .getOrCreate()
)

# Input CSVs live under /content (e.g. a Colab runtime); adjust if your files are elsewhere.
TRIP_CSV = "/content/taxi_trip_data.csv"
ZONE_CSV = "/content/taxi_zone_geo.csv"

# Both files have a header row; let Spark infer column types.
csv_options = dict(header=True, inferSchema=True)
trip_df = spark.read.csv(TRIP_CSV, **csv_options)
zone_df = spark.read.csv(ZONE_CSV, **csv_options)

In [None]:
# Print the inferred schema and the first five rows of each input, trips first.
for frame in (trip_df, zone_df):
    frame.printSchema()
    frame.show(5)


root
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- imp_surcharge: double (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)

+---------+----------------+----------------+---------------+-------------+---------+------------------+------------+-----------+-----+-------+----------+------------+-------------+------------------+-------------------+
|vendor_id| pickup_datetime|dropoff_datetime|passenge

In [None]:
# Parse the pickup/dropoff strings into timestamps.
# NOTE(review): assumes strings shaped like "1/31/2018 13:05" — confirm against the raw CSV.
for ts_column in ("pickup_datetime", "dropoff_datetime"):
    trip_df = trip_df.withColumn(ts_column, to_timestamp(ts_column, "M/d/yyyy H:mm"))

# Remove rows missing any essential field, keep only positive fares and distances,
# then drop exact duplicate rows.
essential_columns = ["pickup_datetime", "dropoff_datetime", "fare_amount", "trip_distance", "tip_amount"]
trip_df = (
    trip_df
    .dropna(subset=essential_columns)
    .filter((col("fare_amount") > 0) & (col("trip_distance") > 0))
    .dropDuplicates()
)

In [None]:
# Trip duration in minutes: unix_timestamp yields seconds, so divide the difference by 60.
trip_df = trip_df.withColumn(
    "trip_duration_minutes",
    (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")) / 60,
)

# Total cost is the sum of every charge component, with a missing component counted as 0.
cost_components = ["fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount", "imp_surcharge"]
total_cost = coalesce(col(cost_components[0]), lit(0))
for component in cost_components[1:]:
    total_cost = total_cost + coalesce(col(component), lit(0))
trip_df = trip_df.withColumn("total_trip_cost", total_cost)

In [None]:
# Hour of day (0-23) at pickup.
trip_df = trip_df.withColumn("pickup_hour", hour("pickup_datetime"))

# Bucket the pickup hour into four periods. Hours 0-5 (and a null hour) fall through to "Night".
pickup_hour_col = col("pickup_hour")
trip_df = trip_df.withColumn(
    "time_of_day",
    when(pickup_hour_col.between(6, 11), "Morning")
    .when(pickup_hour_col.between(12, 17), "Afternoon")
    .when(pickup_hour_col.between(18, 23), "Evening")
    .otherwise("Night"),
)

In [None]:
# Rename zone_df for pickup and dropoff joins
# Build pickup- and dropoff-side views of the zone lookup, keeping only the three
# columns we need. If zone_df has any other columns, joining the whole frame twice
# would copy each of them into trip_df twice under the same name, making later
# references to those columns ambiguous.
pickup_zones = zone_df.select(
    col("zone_id").alias("pickup_location_id"),
    col("zone_name").alias("pickup_zone"),
    col("borough").alias("pickup_borough"),
)

dropoff_zones = zone_df.select(
    col("zone_id").alias("dropoff_location_id"),
    col("zone_name").alias("dropoff_zone"),
    col("borough").alias("dropoff_borough"),
)

# Left joins keep trips whose location id has no match in the lookup;
# their zone/borough columns are NULL.
trip_df = (
    trip_df
    .join(pickup_zones, on="pickup_location_id", how="left")
    .join(dropoff_zones, on="dropoff_location_id", how="left")
)

## Analytical Queries

In [None]:
# Trip counts per (time_of_day, payment_type). Both sort keys are descending, so
# time_of_day is listed in reverse alphabetical order.
(
    trip_df.groupBy("time_of_day", "payment_type")
    .count()
    .orderBy(col("time_of_day").desc(), col("count").desc())
    .show()
)

+-----------+------------+------+
|time_of_day|payment_type| count|
+-----------+------------+------+
|      Night|           1| 68887|
|      Night|           2| 32040|
|      Night|           3|   684|
|      Night|           4|   191|
|    Morning|           1|181506|
|    Morning|           2| 75130|
|    Morning|           3|   796|
|    Morning|           4|   271|
|    Evening|           1|250964|
|    Evening|           2| 95029|
|    Evening|           3|  1282|
|    Evening|           4|   319|
|  Afternoon|           1|222548|
|  Afternoon|           2|108830|
|  Afternoon|           3|  1346|
|  Afternoon|           4|   404|
+-----------+------------+------+



In [None]:
# Trip volume and total revenue per pickup borough, highest revenue first.
# Explicit aliases avoid depending on Spark's auto-generated aggregate names
# (e.g. "count(1)"). withColumnRenamed silently does nothing when such a name
# doesn't match, which would only surface later as a missing-column error.
(
    trip_df.groupBy("pickup_borough")
    .agg(
        F.count("*").alias("trip_count"),
        F.sum("total_trip_cost").alias("total_revenue"),
    )
    .orderBy(F.col("total_revenue").desc())
    .show()
)


+--------------+----------+--------------------+
|pickup_borough|trip_count|       total_revenue|
+--------------+----------+--------------------+
|     Manhattan|    944995|1.3447441029941741E7|
|        Queens|     65224|   2937811.430001083|
|          NULL|     15837|   263706.2800000149|
|      Brooklyn|     12979|  238826.83000001457|
|         Bronx|      1152|  29699.299999999777|
|           EWR|        23|             1811.37|
| Staten Island|        17|              807.04|
+--------------+----------+--------------------+



In [None]:
# Mean tip amount for each passenger count, listed by passenger count.
tips_by_party_size = trip_df.groupBy("passenger_count").avg("tip_amount")
(
    tips_by_party_size
    .withColumnRenamed("avg(tip_amount)", "avg_tip")
    .orderBy(col("passenger_count"))
    .show()
)

+---------------+------------------+
|passenger_count|           avg_tip|
+---------------+------------------+
|              0|1.9129191001514152|
|              1|1.8702055254833465|
|              2|1.8625866752402052|
|              3| 1.834265957936398|
|              4|1.7031647053078713|
|              5| 1.876816759942987|
|              6|1.8648239812246603|
|              9|              0.98|
+---------------+------------------+



In [None]:
# The five busiest (pickup_zone, time_of_day) combinations by trip count.
zone_period_counts = trip_df.groupBy("pickup_zone", "time_of_day").count()
zone_period_counts.orderBy(col("count").desc()).show(5)


+--------------------+-----------+-----+
|         pickup_zone|time_of_day|count|
+--------------------+-----------+-----+
|Upper East Side S...|  Afternoon|17264|
|Upper East Side N...|  Afternoon|15991|
|      Midtown Center|    Evening|14938|
|      Midtown Center|  Afternoon|14661|
|Times Sq/Theatre ...|    Evening|14255|
+--------------------+-----------+-----+
only showing top 5 rows



In [None]:
# The five longest trips by duration, with their fare, zones and payment type.
# NOTE(review): the output shows several trips of exactly 1440 minutes (24 h) with
# ordinary fares — likely timestamp artifacts worth filtering before modelling.
longest_trip_columns = [
    "trip_duration_minutes",
    "fare_amount",
    "pickup_zone",
    "dropoff_zone",
    "payment_type",
]
(
    trip_df.select(*longest_trip_columns)
    .orderBy(col("trip_duration_minutes").desc())
    .show(5)
)


+---------------------+-----------+--------------------+--------------------+------------+
|trip_duration_minutes|fare_amount|         pickup_zone|        dropoff_zone|payment_type|
+---------------------+-----------+--------------------+--------------------+------------+
|               1440.0|       12.0|   Battery Park City|Meatpacking/West ...|           1|
|               1440.0|        6.0|            Union Sq|        East Village|           1|
|               1440.0|       12.0|            Gramercy|     Lower East Side|           1|
|               1440.0|       28.5|Times Sq/Theatre ...|              Inwood|           2|
|               1440.0|       15.5|Upper East Side N...|            Gramercy|           1|
+---------------------+-----------+--------------------+--------------------+------------+
only showing top 5 rows



In [None]:
# The ten most common pickup -> dropoff borough pairs by trip count.
borough_pair_counts = trip_df.groupBy("pickup_borough", "dropoff_borough").count()
borough_pair_counts.orderBy(col("count").desc()).show(10)


+--------------+---------------+------+
|pickup_borough|dropoff_borough| count|
+--------------+---------------+------+
|     Manhattan|      Manhattan|877706|
|        Queens|      Manhattan| 38059|
|     Manhattan|         Queens| 32177|
|     Manhattan|       Brooklyn| 26998|
|        Queens|         Queens| 15872|
|          NULL|           NULL| 13561|
|        Queens|       Brooklyn|  8954|
|      Brooklyn|       Brooklyn|  8185|
|     Manhattan|          Bronx|  4836|
|      Brooklyn|      Manhattan|  3842|
+--------------+---------------+------+
only showing top 10 rows



## SparkML Task — Trip Profiling: Predicting the Likelihood of High Tipping

In [None]:
# Binary label: 1 when the tip is strictly greater than 15% of the base fare, otherwise 0
# (a null comparison also yields 0).
# NOTE(review): if some payment types never record a tip in the source data, those trips
# are always labelled 0 — worth checking tip_amount by payment_type.
tip_exceeds_threshold = col("tip_amount") > 0.15 * col("fare_amount")
trip_df = trip_df.withColumn("high_tip", when(tip_exceeds_threshold, 1).otherwise(0))

In [None]:
# Fare per mile as an additional model feature. The cleaning step keeps only rows with
# trip_distance > 0, so this division never divides by zero.
# pickup_hour is not recomputed here: it was already derived from pickup_datetime with
# the same expression in the time-of-day cell.
trip_df = trip_df.withColumn("fare_per_mile", col("fare_amount") / col("trip_distance"))


In [None]:
from pyspark.ml.feature import StringIndexer

# Index the categorical features as numeric columns for the models. time_of_day was
# already derived from pickup_hour in the time-of-day cell, so it is reused as-is.
# handleInvalid='keep' maps labels the indexer cannot handle to an extra index instead
# of raising an error.
borough_indexer = StringIndexer(inputCol="pickup_borough", outputCol="pickup_borough_index", handleInvalid='keep')
time_indexer = StringIndexer(inputCol="time_of_day", outputCol="time_of_day_index", handleInvalid='keep')

# Drop index columns left over from a previous run of this cell, so re-running it does
# not collide with already-existing output columns.
trip_df = trip_df.drop("pickup_borough_index", "time_of_day_index")
trip_df = borough_indexer.fit(trip_df).transform(trip_df)
trip_df = time_indexer.fit(trip_df).transform(trip_df)

In [None]:
from pyspark.ml.feature import VectorAssembler

# Numeric and indexed-categorical columns fed to the classifiers.
selected_features = [
    "passenger_count",
    "trip_distance",
    "trip_duration_minutes",
    "pickup_hour",
    "fare_amount",
    "fare_per_mile",
    "pickup_borough_index",
    "time_of_day_index",
]

# Keep the features plus the label, drop rows with a null in any of them, and
# pack the features into a single vector column named "features".
model_input_df = trip_df.select(*selected_features, "high_tip").dropna()
assembler = VectorAssembler(inputCols=selected_features, outputCol="features")
assembled_df = assembler.transform(model_input_df)


In [None]:
from pyspark.ml.feature import MinMaxScaler

# Rescale each feature using the per-feature min/max observed in assembled_df.
# NOTE(review): this fit sees every row, including rows that later land in the test split.
scaler = MinMaxScaler(outputCol="scaled_features", inputCol="features")
scaler_model = scaler.fit(dataset=assembled_df)
scaled_df = scaler_model.transform(dataset=assembled_df)

In [None]:
# The feature assembler and MinMaxScaler are already built in the two cells above;
# rebuilding them here only repeated identical work, so that duplicate is removed.
# Instead, show the label balance: the accuracy figures reported below should be read
# against the share of the majority high_tip class.
assembled_df.groupBy("high_tip").count().show()

In [None]:
# Split the assembled (still unscaled) rows first, fit the MinMaxScaler on the training
# split only, then apply that same fitted scaler to both splits. Fitting on the full
# dataset before splitting would let test-set minima/maxima shape the scaling the models
# are trained with (data leakage).
train_raw, test_raw = assembled_df.randomSplit([0.8, 0.2], seed=42)
scaler_model = scaler.fit(train_raw)
train_data = scaler_model.transform(train_raw)
test_data = scaler_model.transform(test_raw)


In [None]:
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier

# All three classifiers read the scaled feature vector and the binary high_tip label.
label_and_features = dict(labelCol="high_tip", featuresCol="scaled_features")

lr = LogisticRegression(maxIter=20, **label_and_features)
dt = DecisionTreeClassifier(**label_and_features)
rf = RandomForestClassifier(numTrees=50, **label_and_features)

# Fit each estimator on the training split.
lr_model = lr.fit(train_data)
dt_model = dt.fit(train_data)
rf_model = rf.fit(train_data)


In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

# Score the held-out split with each fitted model (LR, DT, RF in that order).
lr_predictions, dt_predictions, rf_predictions = (
    fitted.transform(test_data) for fitted in (lr_model, dt_model, rf_model)
)

# Area under the ROC curve, computed from each model's rawPrediction column.
auc_evaluator = BinaryClassificationEvaluator(labelCol="high_tip", rawPredictionCol="rawPrediction")

# Metrics based on the hard `prediction` column.
f1_evaluator = MulticlassClassificationEvaluator(labelCol="high_tip", predictionCol="prediction", metricName="f1")
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="high_tip", predictionCol="prediction", metricName="accuracy")
precision_evaluator = MulticlassClassificationEvaluator(labelCol="high_tip", predictionCol="prediction", metricName="weightedPrecision")


def _score(predictions):
    """Return (auc, f1, accuracy, weighted precision) for one predictions DataFrame."""
    return tuple(
        evaluator.evaluate(predictions)
        for evaluator in (auc_evaluator, f1_evaluator, accuracy_evaluator, precision_evaluator)
    )


lr_auc, lr_f1, lr_acc, lr_precision = _score(lr_predictions)
dt_auc, dt_f1, dt_acc, dt_precision = _score(dt_predictions)
rf_auc, rf_f1, rf_acc, rf_precision = _score(rf_predictions)


In [None]:
# Held-out metrics for each model. MulticlassClassificationEvaluator's "f1" and
# "weightedPrecision" are averaged over both high_tip classes and weighted by class
# frequency, so they are labelled as weighted rather than as positive-class scores.
model_metrics = [
    ("🔹 Logistic Regression", lr_auc, lr_acc, lr_f1, lr_precision),
    ("🔹 Decision Tree", dt_auc, dt_acc, dt_f1, dt_precision),
    ("🔹 Random Forest", rf_auc, rf_acc, rf_f1, rf_precision),
]
for heading, auc, acc, f1, precision in model_metrics:
    print(heading)
    print(f"   AUC:                {auc:.4f}")
    print(f"   Accuracy:           {acc:.4f}")
    print(f"   Weighted F1:        {f1:.4f}")
    print(f"   Weighted Precision: {precision:.4f}")


🔹 Logistic Regression
   AUC:       0.5255
   Accuracy:  0.5725
   F1 Score:  0.4218
   Precision: 0.5546
🔹 Decision Tree
   AUC:       0.5287
   Accuracy:  0.5776
   F1 Score:  0.4538
   Precision: 0.5757
🔹 Random Forest
   AUC:       0.5502
   Accuracy:  0.5772
   F1 Score:  0.4431
   Precision: 0.5901
