In [21]:
# 0) Mount Drive
from google.colab import drive
drive.mount('/content/drive')

import os, glob, shutil, time

INPUT_CSV = "/content/drive/MyDrive/2023_Yellow_Taxi_Trip_Data.csv"
PROJECT_DIR = "/content/drive/MyDrive/7006SCN_YellowTaxi_Project_NEW1"
DATA_DIR = os.path.join(PROJECT_DIR, "data")
OUT_DIR  = os.path.join(PROJECT_DIR, "outputs")

os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(OUT_DIR, exist_ok=True)

print("INPUT_CSV:", INPUT_CSV)
print("PROJECT_DIR:", PROJECT_DIR)
print("DATA_DIR:", DATA_DIR)
print("OUT_DIR:", OUT_DIR)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
INPUT_CSV: /content/drive/MyDrive/2023_Yellow_Taxi_Trip_Data.csv
PROJECT_DIR: /content/drive/MyDrive/7006SCN_YellowTaxi_Project_NEW1
DATA_DIR: /content/drive/MyDrive/7006SCN_YellowTaxi_Project_NEW1/data
OUT_DIR: /content/drive/MyDrive/7006SCN_YellowTaxi_Project_NEW1/outputs


In [22]:
# 1) Install Java + PySpark
!apt-get -qq update
!apt-get -qq install -y openjdk-11-jdk
!pip -q install pyspark==3.5.1

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = (
    SparkSession.builder
    .appName("7006SCN_YellowTaxi_Anomaly")
    .config("spark.driver.memory", "8g")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.shuffle.partitions", "200")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "2g")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")
print("Spark:", spark.version)

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
Spark: 3.5.1


In [23]:
# 2) Read CSV from Google Drive and LIMIT to 200k immediately (no full scan)
df_raw = (spark.read
          .option("header", True)
          .option("inferSchema", True)
          .csv(INPUT_CSV))

TARGET_N = 200_000
df = df_raw.limit(TARGET_N).cache()
print("Rows loaded (should be 200k or less):", df.count())
df.show(5, truncate=False)

Rows loaded (should be 200k or less): 200000
+--------+----------------------+----------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime  |tpep_dropoff_datetime |passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+----------------------+----------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|2       |01/01/2023 12:32:10 AM|01/01/2023 12:40:36 AM|1              |0.97         |1         |N                 |161         |141 

In [26]:
# 3) Robust parsing + safe casting + feature engineering + cleaning
import pyspark.sql.functions as F

# --- 3.1 Robust datetime parsing (tries common formats) ---
pickup_ts = F.coalesce(
    F.to_timestamp("tpep_pickup_datetime", "MM/dd/yyyy hh:mm:ss a"), # Added for AM/PM
    F.to_timestamp("tpep_pickup_datetime", "yyyy-MM-dd HH:mm:ss"),
    F.to_timestamp("tpep_pickup_datetime", "MM/dd/yyyy HH:mm:ss"),
    F.to_timestamp("tpep_pickup_datetime", "MM/dd/yyyy HH:mm"),
    F.to_timestamp("tpep_pickup_datetime", "dd-MM-yyyy HH:mm"),
    F.to_timestamp("tpep_pickup_datetime")  # last fallback
)

dropoff_ts = F.coalesce(
    F.to_timestamp("tpep_dropoff_datetime", "MM/dd/yyyy hh:mm:ss a"), # Added for AM/PM
    F.to_timestamp("tpep_dropoff_datetime", "yyyy-MM-dd HH:mm:ss"),
    F.to_timestamp("tpep_dropoff_datetime", "MM/dd/yyyy HH:mm:ss"),
    F.to_timestamp("tpep_dropoff_datetime", "MM/dd/yyyy HH:mm"),
    F.to_timestamp("tpep_dropoff_datetime", "dd-MM-yyyy HH:mm"),
    F.to_timestamp("tpep_dropoff_datetime")
)

df2 = (df
       .withColumn("pickup_ts", pickup_ts)
       .withColumn("dropoff_ts", dropoff_ts))

print("pickup_ts nulls:", df2.filter(F.col("pickup_ts").isNull()).count())
print("dropoff_ts nulls:", df2.filter(F.col("dropoff_ts").isNull()).count())

# --- 3.2 Safe numeric casts (handles string columns too) ---
num_cols = [
    "passenger_count","trip_distance","RatecodeID","PULocationID","DOLocationID","payment_type",
    "fare_amount","extra","mta_tax","tip_amount","tolls_amount","improvement_surcharge",
    "total_amount","congestion_surcharge","airport_fee"
]
for c in num_cols:
    if c in df2.columns:
        df2 = df2.withColumn(c, F.col(c).cast("double"))

# --- 3.3 Features ---
df2 = df2.withColumn(
    "trip_duration_min",
    (F.col("dropoff_ts").cast("long") - F.col("pickup_ts").cast("long")) / 60.0
)

df2 = df2.withColumn(
    "speed_mph",
    F.when(F.col("trip_duration_min") > 0,
           F.col("trip_distance") / (F.col("trip_duration_min") / 60.0)
          ).otherwise(F.lit(None))
)

df2 = df2.withColumn(
    "tip_pct",
    F.when(F.col("fare_amount") > 0, F.col("tip_amount") / F.col("fare_amount")).otherwise(F.lit(0.0))
)

# --- 3.4 Fill defaults ---
df2 = df2.na.fill({
    "passenger_count": 0.0,
    "RatecodeID": 1.0,
    "store_and_fwd_flag": "N",
    "congestion_surcharge": 0.0,
    "airport_fee": 0.0,
    "tip_amount": 0.0,
    "tolls_amount": 0.0,
    "extra": 0.0,
    "mta_tax": 0.0,
    "improvement_surcharge": 0.0
})

# --- 3.5 Cleaning filters (realistic bounds) ---
# NOTE: Keep bounds moderate to avoid dropping all rows.
df3 = (df2
    .filter(F.col("pickup_ts").isNotNull() & F.col("dropoff_ts").isNotNull())
    .filter(F.col("trip_distance").isNotNull())
    .filter(F.col("total_amount").isNotNull())
    .filter((F.col("trip_distance") > 0) & (F.col("trip_distance") <= 60))
    .filter((F.col("trip_duration_min") > 0) & (F.col("trip_duration_min") <= 360))
    .filter((F.col("total_amount") > 0) & (F.col("total_amount") <= 800))
    .cache()
)

print("Rows after cleaning:", df3.count())
df3.select("trip_distance","trip_duration_min","speed_mph","total_amount","tip_pct").show(5, truncate=False)


pickup_ts nulls: 0
dropoff_ts nulls: 0
Rows after cleaning: 194737
+-------------+------------------+------------------+------------+------------------+
|trip_distance|trip_duration_min |speed_mph         |total_amount|tip_pct           |
+-------------+------------------+------------------+------------+------------------+
|0.97         |8.433333333333334 |6.901185770750987 |14.3        |0.0               |
|1.1          |6.316666666666666 |10.448548812664908|16.9        |0.5063291139240506|
|2.51         |12.75             |11.811764705882352|34.9        |1.006711409395973 |
|1.9          |9.616666666666667 |11.854419410745232|20.85       |0.0               |
|1.43         |10.833333333333334|7.92              |19.68       |0.287719298245614 |
+-------------+------------------+------------------+------------+------------------+
only showing top 5 rows



In [27]:
# 4) Create classification label: is_anomaly (0/1)
#    We create a RULE-BASED label (common approach for anomaly detection training when no ground truth).
#    You can explain this in the report as a proxy label for “suspicious trips”.

# Anomaly rules (tunable):
# - very high speed
# - very long distance
# - very high total amount
# - very long duration
df_labeled = (df3.withColumn(
    "is_anomaly",
    F.when(
        (F.col("speed_mph") >= 60) |
        (F.col("trip_distance") >= 30) |
        (F.col("total_amount") >= 250) |
        (F.col("trip_duration_min") >= 180),
        F.lit(1.0)
    ).otherwise(F.lit(0.0))
)).cache()

print("Label distribution:")
df_labeled.groupBy("is_anomaly").count().show()

# Keep ONLY 200k rows for the rest (still <=200k after cleaning)
# If cleaning reduced rows, we keep what's available.
df_final = df_labeled.cache()
print("Final rows used:", df_final.count())

Label distribution:
+----------+------+
|is_anomaly| count|
+----------+------+
|       0.0|194306|
|       1.0|   431|
+----------+------+

Final rows used: 194737


In [28]:
# 5) Save cleaned sample to Drive:
#    - Parquet in DATA_DIR
#    - Single CSV for Tableau in OUT_DIR

PARQUET_PATH = os.path.join(DATA_DIR, "yellow_taxi_200k_clean.parquet")
df_final.write.mode("overwrite").parquet(PARQUET_PATH)
print("Saved Parquet:", PARQUET_PATH)

CSV_TMP = os.path.join(OUT_DIR, "sample_200k_for_tableau_csv_tmp")
(df_final
 .coalesce(1)
 .write.mode("overwrite")
 .option("header", True)
 .csv(CSV_TMP))

part = glob.glob(CSV_TMP + "/part-*.csv")[0]
CSV_OUT = os.path.join(OUT_DIR, "sample_200k_for_tableau.csv")
shutil.copy(part, CSV_OUT)
print("Saved Tableau CSV:", CSV_OUT)

Saved Parquet: /content/drive/MyDrive/7006SCN_YellowTaxi_Project_NEW1/data/yellow_taxi_200k_clean.parquet
Saved Tableau CSV: /content/drive/MyDrive/7006SCN_YellowTaxi_Project_NEW1/outputs/sample_200k_for_tableau.csv


In [29]:
# 6) Build ML pipeline components (Spark MLlib)
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler

# Split
train_df, test_df = df_final.randomSplit([0.8, 0.2], seed=42)
train_df = train_df.cache()
test_df  = test_df.cache()
print("Train:", train_df.count(), "Test:", test_df.count())

# Columns
categorical_cols = ["VendorID","RatecodeID","store_and_fwd_flag","payment_type","PULocationID","DOLocationID"]
numeric_cols = [
    "passenger_count","trip_distance","fare_amount","extra","mta_tax","tip_amount",
    "tolls_amount","improvement_surcharge","congestion_surcharge","airport_fee",
    "trip_duration_min","speed_mph","tip_pct"
]

# Keep only existing columns (avoids errors if a column not present)
categorical_cols = [c for c in categorical_cols if c in df_final.columns]
numeric_cols = [c for c in numeric_cols if c in df_final.columns]

indexers = [StringIndexer(inputCol=c, outputCol=f"{c}_idx", handleInvalid="keep") for c in categorical_cols]
encoder  = OneHotEncoder(
    inputCols=[f"{c}_idx" for c in categorical_cols],
    outputCols=[f"{c}_ohe" for c in categorical_cols],
    handleInvalid="keep"
)

assembler = VectorAssembler(
    inputCols=numeric_cols + [f"{c}_ohe" for c in categorical_cols],
    outputCol="features_raw",
    handleInvalid="keep"
)

scaler = StandardScaler(inputCol="features_raw", outputCol="features", withStd=True, withMean=False)

prep_stages = indexers + [encoder, assembler, scaler]

Train: 155635 Test: 39102


In [30]:
# 7) Evaluators + helper for metrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator

acc_eval = MulticlassClassificationEvaluator(labelCol="is_anomaly", predictionCol="prediction", metricName="accuracy")
f1_eval  = MulticlassClassificationEvaluator(labelCol="is_anomaly", predictionCol="prediction", metricName="f1")
auc_eval = BinaryClassificationEvaluator(labelCol="is_anomaly", rawPredictionCol="rawPrediction", metricName="areaUnderROC")

metrics = []
ts = time.strftime("%Y-%m-%d %H:%M:%S")

In [31]:
# 8) Model 1: Logistic Regression (separate training, no loop)
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(labelCol="is_anomaly", featuresCol="features", maxIter=50, regParam=0.01, elasticNetParam=0.0)
pipe_lr = Pipeline(stages=prep_stages + [lr])

start = time.time()
lr_model = pipe_lr.fit(train_df)
lr_time = time.time() - start

lr_pred = lr_model.transform(test_df)

metrics.append({
    "timestamp": ts,
    "task": "classification_anomaly",
    "model": "LogisticRegression",
    "accuracy": float(acc_eval.evaluate(lr_pred)),
    "f1": float(f1_eval.evaluate(lr_pred)),
    "auc": float(auc_eval.evaluate(lr_pred)),
    "train_time_sec": float(lr_time)
})
print(metrics[-1])

{'timestamp': '2026-02-27 07:55:41', 'task': 'classification_anomaly', 'model': 'LogisticRegression', 'accuracy': 0.998158661961025, 'f1': 0.9976308606603832, 'auc': 0.9779640548840709, 'train_time_sec': 37.22212100028992}


In [32]:
# 9) Model 2: Random Forest Classifier (separate training)
from pyspark.ml.classification import RandomForestClassifier

rf = RandomForestClassifier(labelCol="is_anomaly", featuresCol="features", numTrees=120, maxDepth=12, seed=42)
pipe_rf = Pipeline(stages=prep_stages + [rf])

start = time.time()
rf_model = pipe_rf.fit(train_df)
rf_time = time.time() - start

rf_pred = rf_model.transform(test_df)

metrics.append({
    "timestamp": ts,
    "task": "classification_anomaly",
    "model": "RandomForestClassifier",
    "accuracy": float(acc_eval.evaluate(rf_pred)),
    "f1": float(f1_eval.evaluate(rf_pred)),
    "auc": float(auc_eval.evaluate(rf_pred)),
    "train_time_sec": float(rf_time)
})
print(metrics[-1])

{'timestamp': '2026-02-27 07:55:41', 'task': 'classification_anomaly', 'model': 'RandomForestClassifier', 'accuracy': 0.9978261981484323, 'f1': 0.9967404798616825, 'auc': 0.993736666822456, 'train_time_sec': 146.22498202323914}


In [33]:
# 10) Model 3: GBT Classifier (separate training)
from pyspark.ml.classification import GBTClassifier

gbt = GBTClassifier(labelCol="is_anomaly", featuresCol="features", maxIter=60, maxDepth=6, seed=42)
pipe_gbt = Pipeline(stages=prep_stages + [gbt])

start = time.time()
gbt_model = pipe_gbt.fit(train_df)
gbt_time = time.time() - start

gbt_pred = gbt_model.transform(test_df)

metrics.append({
    "timestamp": ts,
    "task": "classification_anomaly",
    "model": "GBTClassifier",
    "accuracy": float(acc_eval.evaluate(gbt_pred)),
    "f1": float(f1_eval.evaluate(gbt_pred)),
    "auc": float(auc_eval.evaluate(gbt_pred)),
    "train_time_sec": float(gbt_time)
})
print(metrics[-1])

{'timestamp': '2026-02-27 07:55:41', 'task': 'classification_anomaly', 'model': 'GBTClassifier', 'accuracy': 0.9990281827016521, 'f1': 0.9989913529584007, 'auc': 0.9986523219893592, 'train_time_sec': 207.82260274887085}


In [34]:
# 11) Model 4: Linear SVC (separate training)
# NOTE: LinearSVC does NOT output probability; it outputs rawPrediction.
# AUC works (uses rawPrediction), accuracy & f1 work too.

from pyspark.ml.classification import LinearSVC

svc = LinearSVC(labelCol="is_anomaly", featuresCol="features", maxIter=80, regParam=0.05)
pipe_svc = Pipeline(stages=prep_stages + [svc])

start = time.time()
svc_model = pipe_svc.fit(train_df)
svc_time = time.time() - start

svc_pred = svc_model.transform(test_df)

metrics.append({
    "timestamp": ts,
    "task": "classification_anomaly",
    "model": "LinearSVC",
    "accuracy": float(acc_eval.evaluate(svc_pred)),
    "f1": float(f1_eval.evaluate(svc_pred)),
    "auc": float(auc_eval.evaluate(svc_pred)),
    "train_time_sec": float(svc_time)
})
print(metrics[-1])

{'timestamp': '2026-02-27 07:55:41', 'task': 'classification_anomaly', 'model': 'LinearSVC', 'accuracy': 0.9979540688455834, 'f1': 0.9970458137745494, 'auc': 0.960583395774698, 'train_time_sec': 19.517208337783813}


In [35]:
# 12) Save metrics CSV to Drive
import pandas as pd

metrics_df = pd.DataFrame(metrics)
METRICS_PATH = os.path.join(OUT_DIR, "model_metrics_classification.csv")
metrics_df.to_csv(METRICS_PATH, index=False)

print("Saved metrics:", METRICS_PATH)
metrics_df

Saved metrics: /content/drive/MyDrive/7006SCN_YellowTaxi_Project_NEW1/outputs/model_metrics_classification.csv


Unnamed: 0,timestamp,task,model,accuracy,f1,auc,train_time_sec
0,2026-02-27 07:55:41,classification_anomaly,LogisticRegression,0.998159,0.997631,0.977964,37.222121
1,2026-02-27 07:55:41,classification_anomaly,RandomForestClassifier,0.997826,0.99674,0.993737,146.224982
2,2026-02-27 07:55:41,classification_anomaly,GBTClassifier,0.999028,0.998991,0.998652,207.822603
3,2026-02-27 07:55:41,classification_anomaly,LinearSVC,0.997954,0.997046,0.960583,19.517208


In [36]:
# 13) Save predictions for Tableau (use BEST model by AUC)
# Pick best model name from metrics_df
best_model_name = metrics_df.sort_values("auc", ascending=False).iloc[0]["model"]
print("Best model by AUC:", best_model_name)

# Choose fitted model object
best_fitted = None
if best_model_name == "LogisticRegression":
    best_fitted = lr_model
elif best_model_name == "RandomForestClassifier":
    best_fitted = rf_model
elif best_model_name == "GBTClassifier":
    best_fitted = gbt_model
else:
    best_fitted = svc_model

pred_out = (best_fitted.transform(test_df)
    .select(
        "VendorID","pickup_ts","dropoff_ts",
        "PULocationID","DOLocationID",
        "passenger_count","trip_distance","trip_duration_min","speed_mph",
        "fare_amount","tip_amount","total_amount","tip_pct",
        "is_anomaly",
        F.col("prediction").alias("pred_is_anomaly")
    )
)

# Save single CSV for Tableau
PRED_CSV_TMP = os.path.join(OUT_DIR, "test_predictions_csv_tmp")
(pred_out.coalesce(1)
 .write.mode("overwrite")
 .option("header", True)
 .csv(PRED_CSV_TMP))

part = glob.glob(PRED_CSV_TMP + "/part-*.csv")[0]
PRED_CSV = os.path.join(OUT_DIR, "test_predictions_for_tableau.csv")
shutil.copy(part, PRED_CSV)

print("Saved prediction CSV:", PRED_CSV)

Best model by AUC: GBTClassifier
Saved prediction CSV: /content/drive/MyDrive/7006SCN_YellowTaxi_Project_NEW1/outputs/test_predictions_for_tableau.csv
