In [61]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import hour, dayofweek, month, radians, asin, sqrt, pow, dayofyear, log, exp, sin, cos, col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [62]:
spark = SparkSession.builder \
    .appName("Problem2 Structured API") \
    .master("local[*]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.log.level", "ERROR") \
    .getOrCreate()

Setting Spark log level to "ERROR".


In [63]:
# Load data
try:
    train_data = spark.read.csv("train.csv", header=True, inferSchema=True)
except Exception as e:
    print(f"Error loading train data: {e}")
    spark.stop()
    exit(1)

                                                                                

In [64]:
# Feature engineering function
def prepare_features(data):
    return data.withColumn("pickup_day_of_year", dayofyear("pickup_datetime")) \
        .withColumn("pickup_day_of_week", dayofweek("pickup_datetime")) \
        .withColumn("pickup_hour_of_day", hour("pickup_datetime")) \
        .withColumn("pickup_month", month("pickup_datetime")) \
        .withColumn("haversine",
                    6371 * 2 * asin(sqrt(pow(sin((radians("pickup_latitude") - radians("dropoff_latitude")) / 2), 2) +
                                         cos(radians("pickup_latitude")) * cos(radians("dropoff_latitude")) *
                                         pow(sin((radians("pickup_longitude") - radians("dropoff_longitude")) / 2),
                                             2))))


# Data cleaning and outlier removal using IQR
def remove_outliers(data, column, lower_quantile=0.25, upper_quantile=0.75, k=1.5):
    quantiles = data.approxQuantile(column, [lower_quantile, upper_quantile], 0.05)
    iqr = quantiles[1] - quantiles[0]
    lower_bound = quantiles[0] - k * iqr
    upper_bound = quantiles[1] + k * iqr
    return data.filter((col(column) >= lower_bound) & (col(column) <= upper_bound))


# Evaluate model
def evaluate_model(predictions, dataset_name):
    predictions = predictions.withColumn("predicted_duration", col("prediction"))
    metrics = {}
    for metric in ["rmse", "mse", "mae", "r2"]:
        evaluator = RegressionEvaluator(labelCol="trip_duration", predictionCol="predicted_duration", metricName=metric)
        value = evaluator.evaluate(predictions)
        metrics[metric] = value
        print(f"\n{metric.upper()} on {dataset_name} = {value}")
    return metrics

In [65]:
# Features
train_data = prepare_features(train_data)
train_data.cache()

# Remove outliers
train_data = train_data.filter("haversine > 0").filter("passenger_count > 0").filter("trip_duration > 0")
train_data = remove_outliers(train_data, "trip_duration")
train_data = remove_outliers(train_data, "haversine")

                                                                                

In [66]:
features = ["pickup_longitude", "pickup_latitude", "dropoff_longitude", "dropoff_latitude",
            "pickup_day_of_year", "pickup_day_of_week", "pickup_hour_of_day", "pickup_month",
            "haversine", "passenger_count"]

# VectorAssembler
assembler = VectorAssembler(inputCols=features, outputCol="features")
assembled_data = assembler.transform(train_data).select("id", "features", "trip_duration")
assembled_data.cache()

DataFrame[id: string, features: vector, trip_duration: int]

In [67]:
# Define Decision Tree Regressor
decision_tree = DecisionTreeRegressor(featuresCol="features", labelCol="trip_duration", seed=42)

# Define parameter grid for hyperparameter tuning
param_grid = ParamGridBuilder() \
    .addGrid(decision_tree.maxDepth, [13, 15, 17]) \
    .addGrid(decision_tree.minInstancesPerNode, [13, 15, 17]) \
    .build()

# Define evaluator
evaluator = RegressionEvaluator(labelCol="trip_duration", predictionCol="prediction", metricName="rmse")

# Set up CrossValidator
crossval = CrossValidator(estimator=decision_tree,
                          estimatorParamMaps=param_grid,
                          evaluator=evaluator,
                          numFolds=5,
                          seed=42)

In [68]:
# Train model
try:
    cv_model = crossval.fit(assembled_data)
    best_model = cv_model.bestModel
except Exception as e:
    print(f"Error during model training: {e}")
    spark.stop()
    exit(1)

# Split into training and validation sets for final evaluation
train, validation = assembled_data.randomSplit([0.8, 0.2], seed=42)

                                                                                

In [69]:
# Predictions and evaluation
train_predictions = best_model.transform(train)
print("\n=== Training Data Metrics ===")
train_metrics = evaluate_model(train_predictions, "training data")

validation_predictions = best_model.transform(validation)
print("\n=== Validation Data Metrics ===")
validation_metrics = evaluate_model(validation_predictions, "validation data")


=== Training Data Metrics ===


                                                                                


RMSE on training data = 206.6687132473464

MSE on training data = 42711.95703531389

MAE on training data = 149.83696476164894

R2 on training data = 0.6883834462327483

=== Validation Data Metrics ===

RMSE on validation data = 207.3154819234736

MSE on validation data = 42979.70904516211

MAE on validation data = 150.28094894216198

R2 on validation data = 0.688575926809168


In [70]:
# Model diagnostics
print("\n=== Model Diagnostics ===")
print("Feature Importances:")
for i, importance in enumerate(best_model.featureImportances):
    print(f"{features[i]}: {importance}")

print("Decision Tree Depth:", best_model.depth)
print("Number of Nodes:", best_model.numNodes)
print("Number of Features:", best_model.numFeatures)
print("Best maxDepth:", best_model.getMaxDepth())
print("Best minInstancesPerNode:", best_model.getMinInstancesPerNode())

try:
    test_data = spark.read.csv("test.csv", header=True, inferSchema=True)
except Exception as e:
    print(f"Error reading test CSV file: {e}")
    spark.stop()
    exit(1)

test_data = prepare_features(test_data)

# test_data = test_data.filter("haversine > 0").filter("passenger_count > 0")

test_assembled = assembler.transform(test_data).select("id", "features")

test_predictions = best_model.transform(test_assembled)

test_predictions = test_predictions.withColumn("trip_duration", col("prediction"))

test_output = test_predictions.select("id", "trip_duration")

try:
    test_output.coalesce(1).write.csv("test_predictions.csv", header=True, mode="overwrite")
    print("Predictions saved to test_predictions.csv")
except Exception as e:
    print(f"Error saving predictions: {e}")

train_data.unpersist()
assembled_data.unpersist()
spark.stop()


=== Model Diagnostics ===
Feature Importances:
pickup_longitude: 0.03318464502278229
pickup_latitude: 0.023754582305066663
dropoff_longitude: 0.03166196209901136
dropoff_latitude: 0.06300916238022077
pickup_day_of_year: 0.0139513480960523
pickup_day_of_week: 0.048524843840509924
pickup_hour_of_day: 0.10514798770599902
pickup_month: 0.0008609879923865999
haversine: 0.6791743478368988
passenger_count: 0.0007301327210722027
Decision Tree Depth: 17
Number of Nodes: 49379
Number of Features: 10
Best maxDepth: 17
Best minInstancesPerNode: 17


                                                                                

Predictions saved to test_predictions.csv
