## PART-3

-----
### Baseline Model

In [0]:
# Import PySpark SQL functions module
from pyspark.sql import functions as F

In [0]:
# Load full taxi data with time features
df = spark.table("final_taxi_data") \
    .withColumn("year", F.year("pickup_datetime")) \
    .withColumn("month", F.month("pickup_datetime")) \
    .withColumn("day_of_week", F.dayofweek("pickup_datetime")) \
    .withColumn("hour", F.hour("pickup_datetime"))

In [0]:
# Split into train & test
train_df = df.filter(
    ((F.col("year") >= 2009) & (F.col("year") <= 2023)) |
    ((F.col("year") == 2024) & (F.col("month") <= 9))
)

test_df = df.filter(
    (F.col("year") == 2024) & (F.col("month") >= 10)
)


In [0]:
# Baseline model (train only)
baseline_stats = train_df.groupBy(
    "taxi_colour",
    "pickup_borough",
    "dropoff_borough",
    "month",
    "day_of_week",
    "hour"
).agg(F.avg("total_amount").alias("baseline_pred"))

# Join predictions with train set
train_with_pred = train_df.join(
    baseline_stats,
    on=["taxi_colour", "pickup_borough", "dropoff_borough", "month", "day_of_week", "hour"],
    how="left"
)


In [0]:
# RMSE Calculation on train
rmse_df = train_with_pred.withColumn(
    "squared_error",
    (F.col("total_amount") - F.col("baseline_pred"))**2
)

baseline_rmse = rmse_df.agg(
    F.sqrt(F.avg("squared_error")).alias("rmse")
).collect()[0]["rmse"]

print(f"Baseline RMSE on train set: {baseline_rmse:.2f}")


Baseline RMSE on train set: 8.26


In [0]:
# Baseline model (test only)
baseline_stats = test_df.groupBy(
    "taxi_colour",
    "pickup_borough",
    "dropoff_borough",
    "month",
    "day_of_week",
    "hour"
).agg(F.avg("total_amount").alias("baseline_pred"))

# Join predictions with train set
test_with_pred = test_df.join(
    baseline_stats,
    on=["taxi_colour", "pickup_borough", "dropoff_borough", "month", "day_of_week", "hour"],
    how="left"
)


In [0]:
# RMSE Calculation on test
rmse_df = test_with_pred.withColumn(
    "squared_error",
    (F.col("total_amount") - F.col("baseline_pred"))**2
)

baseline_rmse = rmse_df.agg(
    F.sqrt(F.avg("squared_error")).alias("rmse")
).collect()[0]["rmse"]

print(f"Baseline RMSE on test set: {baseline_rmse:.2f}")


Baseline RMSE on test set: 10.21


------
## Model - 1 (SGDRegressor)

In [0]:
# Create a dataframe with only 2024 year data
df_2024 = df.filter(F.col("year") == 2024)

In [0]:
# Check the schema
df_2024.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- trip_time: decimal(27,6) (nullable = true)
 |-- speed_mph: double (nullable = true)
 |-- airport_fee: d

In [0]:
# List of columns selected for modeling
features = [
    "total_amount",
    "passenger_count",
    "trip_distance",
    "trip_time",
    "speed_mph",
    "year",
    "month",
    "day_of_week",
    "hour",
    "pickup_borough",
    "dropoff_borough",
    "RatecodeID",
    "payment_type",
    "extra",
    "mta_tax",
    "tip_amount",
    "ehail_fee",
    "improvement_surcharge",
    "congestion_surcharge",
    "airport_fee",
    "trip_type",
    "taxi_colour"
]

In [0]:
# Select only the required features
df_2024 = df_2024.select(*features)


In [0]:
# Check the schema again
df_2024.printSchema()


root
 |-- total_amount: double (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- trip_time: decimal(27,6) (nullable = true)
 |-- speed_mph: double (nullable = true)
 |-- month: integer (nullable = true)
 |-- day_of_week: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- pickup_borough: string (nullable = true)
 |-- dropoff_borough: string (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- payment_type: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- taxi_colour: string (nullable = true)



In [0]:
# Identify categorical and numerical columns for scikit-learn
categorical_cols = ["pickup_borough", "dropoff_borough", "RatecodeID", "payment_type", "trip_type", "taxi_colour"]
numerical_cols = [
    "passenger_count", "trip_distance", "trip_time", "speed_mph", "extra", "mta_tax",
    "tip_amount", "ehail_fee", "improvement_surcharge", "congestion_surcharge", "airport_fee",
]
# Time features are numerical, so add them
numerical_cols.extend(['month', 'day_of_week', 'hour'])

In [0]:
# Split the dataframe into train and test dataframes
train_df_2024= df_2024.filter((F.month("pickup_datetime") <= 9))
test_df_2024 = df_2024.filter((F.month("pickup_datetime") >= 10))

In [0]:
# Save the dataframes as Delta tables
train_df_2024.write.mode("overwrite").format("delta").saveAsTable("train_df_2024_delta")
test_df_2024.write.mode("overwrite").format("delta").saveAsTable("test_df_2024_delta")

In [0]:
# Check the shape of the dataframes
print(f"train_df_2024: rows={train_df_2024.count()}, columns={len(train_df_2024.columns)}")
print(f"test_df_2024: rows={test_df_2024.count()}, columns={len(test_df_2024.columns)}")

train_df_2024: rows=25689161, columns=21
test_df_2024: rows=9518622, columns=21


In [0]:
# Register the PySpark DataFrame as a temporary view
train_df_2024.createOrReplaceTempView("train_df_2024")
test_df_2024.createOrReplaceTempView("test_df_2024")


In [0]:
# Import the required Packages
import numpy as np
import pandas as pd
from sklearn.linear_model import SGDRegressor
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline as sk_pipeline
import joblib
import time
import os


In [0]:
# Set up dataset and columns
df = train_df_2024 

# Categorical and numerical columns
categorical_cols = ["pickup_borough", "dropoff_borough", "RatecodeID", "payment_type", "trip_type", "taxi_colour"]
numerical_cols = [
    "passenger_count", "trip_distance", "trip_time", "speed_mph", "extra", "mta_tax",
    "tip_amount", "ehail_fee", "improvement_surcharge", "congestion_surcharge", "airport_fee",
    "month", "day_of_week", "hour"
]
target_col = "total_amount"
all_cols_for_preprocessor = categorical_cols + numerical_cols

In [0]:
# Fit preprocessor on a small sample
print("⏳ Sampling 200k rows to fit preprocessor...")
sample_df = df.limit(200_000).toPandas()

numerical_transformer = sk_pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value=0)),
    ('scaler', StandardScaler(with_mean=False))  # with_mean=False for sparse matrices
])
categorical_transformer = sk_pipeline(steps=[
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=True))
])

preprocessor = ColumnTransformer(
    transformers=[
        ('num', numerical_transformer, numerical_cols),
        ('cat', categorical_transformer, categorical_cols)
    ],
    remainder='drop'  # drop any column not listed
)

preprocessor.fit(sample_df[all_cols_for_preprocessor])
print("✅ Preprocessor fitted successfully.")

⏳ Sampling 200k rows to fit preprocessor...
✅ Preprocessor fitted successfully.


In [0]:

# Initialize SGDRegressor for partial_fit
model = SGDRegressor()

BATCH_SIZE = 200_000
offset = 0
total_rows = df.count()
num_batches = int(np.ceil(total_rows / BATCH_SIZE))

print(f"Total rows: {total_rows}")
print(f"Training in {num_batches} batches of up to {BATCH_SIZE} rows.")


# Batch-wise training
while offset < total_rows:
    start_time = time.time()

    # Use Spark SQL row_number for batching (avoid limit/offset issues)
    batch_df = spark.sql(f"""
        SELECT *
        FROM (
            SELECT *, ROW_NUMBER() OVER (ORDER BY total_amount) AS rn
            FROM train_df_2024
        ) tmp
        WHERE rn > {offset} AND rn <= {offset + BATCH_SIZE}
    """)

    batch_pandas_df = batch_df.toPandas()
    if batch_pandas_df.empty:
        break

    # Split features and target
    X_batch = batch_pandas_df[all_cols_for_preprocessor]
    y_batch = batch_pandas_df[target_col]

    # Preprocess
    X_batch_processed = preprocessor.transform(X_batch)

    # Partial fit
    model.partial_fit(X_batch_processed, y_batch)

    offset += len(X_batch)
    end_time = time.time()
    print(f"✅ Batch trained: {len(X_batch)} rows, Time: {end_time - start_time:.2f}s, Total rows processed: {offset}")

print("🎉 Model trained on full dataset using partial_fit.")

Total rows: 25689161
Training in 129 batches of up to 200000 rows.
✅ Batch trained: 200000 rows, Time: 15.50s, Total rows processed: 200000
✅ Batch trained: 200000 rows, Time: 15.71s, Total rows processed: 400000
✅ Batch trained: 200000 rows, Time: 15.84s, Total rows processed: 600000
✅ Batch trained: 200000 rows, Time: 15.93s, Total rows processed: 800000
✅ Batch trained: 200000 rows, Time: 15.49s, Total rows processed: 1000000
✅ Batch trained: 200000 rows, Time: 15.75s, Total rows processed: 1200000
✅ Batch trained: 200000 rows, Time: 15.22s, Total rows processed: 1400000
✅ Batch trained: 200000 rows, Time: 15.05s, Total rows processed: 1600000
✅ Batch trained: 200000 rows, Time: 15.43s, Total rows processed: 1800000
✅ Batch trained: 200000 rows, Time: 15.31s, Total rows processed: 2000000
✅ Batch trained: 200000 rows, Time: 15.04s, Total rows processed: 2200000
✅ Batch trained: 200000 rows, Time: 15.21s, Total rows processed: 2400000
✅ Batch trained: 200000 rows, Time: 15.00s, Total

In [0]:
# Save preprocessor and model

# Define the path
save_path = "/Volumes/workspace/bde/assignment2"

# Ensure directory exists
os.makedirs(save_path, exist_ok=True)

# Save them
joblib.dump(model, os.path.join(save_path, "sdgregressor_model.joblib"))
joblib.dump(preprocessor, os.path.join(save_path, "preprocessor.joblib"))

print("✅ Model and preprocessor saved successfully.")

✅ Model and preprocessor saved successfully.


In [0]:
# Convert the test_df to pandas dataframe
test_df = test_df_2024.toPandas()

In [0]:
# Create X_test and y_test
X_test = test_df.drop("total_amount",axis=1)
y_test = test_df["total_amount"]


In [0]:
# Preprocess the X_test
X_test_processed = preprocessor.transform(X_test)

In [0]:
# Due to long running time sometimes it shows there is no packages imported
# So for safer side import numpy and os again
import numpy as np
import os

save_path = "/Volumes/workspace/bde/assignment2"

np.save(os.path.join(save_path, "X_test_processed.npy"), X_test_processed)
np.save(os.path.join(save_path, "y_test.npy"), y_test.to_numpy())

print("✅ Saved X_test_processed and y_test as .npy files")


✅ Saved X_test_processed and y_test as .npy files


In [0]:
# Calculate RMSE for the test set
from sklearn.metrics import root_mean_squared_error
y_pred = model.predict(X_test_processed)
rmse = root_mean_squared_error(y_test, y_pred)
print(f"RMSE: {rmse:.2f}")

RMSE: 73.64
