# Homework #4: model building and tracking -- Yuxi Sun

Instructions: For this assignment, we’d like you to use the F1 Datasets we have been using for the class to build any ML model of your choice and track the model for each run using MLflow. Select any of the F1 datasets in AWS S3 to build your model. You are allowed to join multiple datasets.

[20 pts] Build any model of your choice with tunable hyperparameters

[20 pts] Create an experiment setup where - for each run - you log:

- the hyperparameters used in the model
- the model itself
- every possible metric from the model you chose
- at least two artifacts (plots, or csv files)

[20 pts] Track your MLFlow experiment and run at least 10 experiments with different parameters each

[20 pts] Select your best model run and explain why

### I am going to build a Decision Tree model that predicts a driver's finishing position as a numeric value via regression. I will use DecisionTreeRegressor for the regression and MAE, MSE, and R^2 as the metrics.
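
For reference, the three metrics named above can be sketched in a few lines of plain Python. The function name `regression_metrics` and the toy numbers are illustrative assumptions, not part of the notebook, which relies on the scikit-learn equivalents.

```python
def regression_metrics(actual, predicted):
    """Return (mae, mse, r2) for two equal-length sequences of numbers."""
    n = len(actual)
    errors = [a - p for a, p in zip(actual, predicted)]

    # mean absolute error: average size of the miss, in finishing positions
    mae = sum(abs(e) for e in errors) / n
    # mean squared error: penalizes large misses more heavily
    mse = sum(e * e for e in errors) / n

    # R^2: share of the label variance explained by the predictions
    mean_actual = sum(actual) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)
    ss_res = sum(e * e for e in errors)
    r2 = 1.0 - ss_res / ss_tot
    return mae, mse, r2


# toy values, chosen only to make the arithmetic easy to follow
actual = [1.0, 2.0, 3.0, 4.0]
predicted = [2.0, 2.0, 3.0, 5.0]
mae, mse, r2 = regression_metrics(actual, predicted)
print(mae, mse, r2)
```

Lower MAE and MSE are better, while R^2 is better the closer it gets to 1.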

## Data Preparation

In [0]:
import os
import logging

from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml import Pipeline

import mlflow
import mlflow.spark
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score


# the S3 path (no trailing slash, since "/<file>.csv" is appended below)
s3_path = "s3a://columbia-gr5069-main/raw"

# read the CSV files into spark dataFrames
df_races = (spark.read.csv(f"{s3_path}/races.csv", header = True))
df_results = (spark.read.csv(f"{s3_path}/results.csv", header = True))


# join results and races tables
df_joined = (
    df_results.alias("res")
    .join(df_races.alias("rac"), F.col("res.raceId") == F.col("rac.raceId"))
    .select(
        "res.raceId",
        "res.driverId",
        "res.constructorId",
        "res.grid",
        "res.positionOrder", 
        "res.laps",
        "res.milliseconds",
        "rac.year",
    )
)

# only keep rows with a positionOrder which is non-null and > 0
df_valid = df_joined.filter(
    (F.col("positionOrder").isNotNull()) & (F.col("positionOrder") > 0)
)

# select features and label
df_features = df_valid.select(
    F.col("grid").cast(FloatType()),
    F.col("laps").cast(FloatType()),
    F.col("milliseconds").cast(FloatType()),
    F.col("positionOrder").cast(FloatType()).alias("label")
).fillna(0, subset=["grid", "laps", "milliseconds"])

# split into train/test
train_df, test_df = df_features.randomSplit([0.7, 0.3], seed=42)
print(f"Train set size: {train_df.count()}")
print(f"Test set size:  {test_df.count()}")

Train set size: 18296
Test set size:  7784


## Build a Spark ML Pipeline with Decision Tree

In [0]:
assembler = VectorAssembler(
    inputCols=["grid", "laps", "milliseconds"],
    outputCol="features"
)

dt = DecisionTreeRegressor(
    labelCol="label",
    featuresCol="features"
)

pipeline = Pipeline(stages=[assembler, dt])

## Set Up an MLflow Experiment

In [0]:
mlflow.set_experiment("/Users/ys3874@columbia.edu/HW4/f1-decisionTree-exp")

<Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/541281381338533', creation_time=1744137922619, experiment_id='541281381338533', last_update_time=1744137922619, lifecycle_stage='active', name='/Users/ys3874@columbia.edu/HW4/f1-decisionTree-exp', tags={'mlflow.experiment.sourceName': '/Users/ys3874@columbia.edu/HW4/f1-decisionTree-exp',
 'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
 'mlflow.ownerEmail': 'ys3874@columbia.edu',
 'mlflow.ownerId': '6756958367820344'}>

## Train & Track Multiple Runs

In [0]:
logging.getLogger("mlflow.utils.environment").setLevel(logging.ERROR)

# hand-picked candidate values for each hyperparameter
maxDepth_values = [2, 6, 12]
maxBins_values = [16, 32, 64]
minInfoGain_values = [0.01, 0.05, 0.1]

# 10 hand-picked combinations of the values above, one MLflow run each
param_combinations = [
    (2, 16, 0.01),
    (2, 32, 0.05),
    (2, 64, 0.05),
    (6, 16, 0.01),
    (6, 32, 0.05),
    (6, 32, 0.1),
    (6, 64, 0.05),
    (12, 32, 0.01),
    (12, 32, 0.05),
    (12, 64, 0.1)
]

# track the best run so far (lowest test-set MSE)
best_mse = float("inf")
best_run_id = None
best_params = None

for (max_depth, max_bins, min_info_gain) in param_combinations:
    with mlflow.start_run() as run:
        run_id = run.info.run_id
        
        # log hyperparams
        mlflow.log_param("maxDepth", max_depth)
        mlflow.log_param("maxBins", max_bins)
        mlflow.log_param("minInfoGain", min_info_gain)
        
        # create a new pipeline with these hyperparams
        current_dt = DecisionTreeRegressor(
            labelCol="label",
            featuresCol="features",
            maxDepth=max_depth,
            maxBins=max_bins,
            minInfoGain=min_info_gain
        )
        current_pipeline = Pipeline(stages=[assembler, current_dt])
        
        # fit the model
        model = current_pipeline.fit(train_df)
        
        # evaluate via test set
        predictions = model.transform(test_df)
        
        # collect predictions
        pdf = predictions.select("label", "prediction").toPandas()
        
        # calculate metrics
        mse = mean_squared_error(pdf["label"], pdf["prediction"])
        mae = mean_absolute_error(pdf["label"], pdf["prediction"])
        r2 = r2_score(pdf["label"], pdf["prediction"])
        
        # log metrics
        mlflow.log_metric("mse", mse)
        mlflow.log_metric("mae", mae)
        mlflow.log_metric("r2", r2)
        
        # create plot: actual vs. predicted finishing position
        fig, ax = plt.subplots()
        ax.scatter(pdf["label"], pdf["prediction"], alpha=0.5)
        ax.set_xlabel("Actual Finishing Position")
        ax.set_ylabel("Predicted Finishing Position")
        ax.set_title("Actual vs. Predicted Finishing Position")
        
        # save plot
        plot_path = f"scatter_{run_id}.png"
        fig.savefig(plot_path)
        plt.close(fig)
        
        # log artifact
        mlflow.log_artifact(plot_path)
        
        # log predictions in a CSV file
        csv_path = f"predictions_{run_id}.csv"
        pdf.to_csv(csv_path, index=False)
        mlflow.log_artifact(csv_path)
        
        # log the Spark model
        mlflow.spark.log_model(model, artifact_path="decision-tree-regressor")
        
        # clean up
        os.remove(plot_path)
        os.remove(csv_path)
        
        # select best model based on MSE
        if mse < best_mse:
            best_mse = mse
            best_run_id = run_id
            best_params = (max_depth, max_bins, min_info_gain)

print(f"Best MSE: {best_mse}")
print(f"Best run ID: {best_run_id}")
print(f"Best params: (maxDepth={best_params[0]}, maxBins={best_params[1]}, minInfoGain={best_params[2]})")

2025/04/08 19:02:04 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().

Best MSE: 13.474520365813373
Best run ID: b6350355c72440bc95fad2e6b40c7a78
Best params: (maxDepth=6, maxBins=64, minInfoGain=0.05)
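
The ten combinations in this cell were picked by hand from three candidate values per hyperparameter. One alternative, sketched here as an assumption rather than what the notebook does, is to enumerate the full 3 x 3 x 3 grid with `itertools.product` and draw ten combinations with a fixed seed so the selection is reproducible. The names `full_grid` and `sampled` are hypothetical.

```python
import itertools
import random

# the same candidate values the notebook lists
max_depth_values = [2, 6, 12]
max_bins_values = [16, 32, 64]
min_info_gain_values = [0.01, 0.05, 0.1]

# every (maxDepth, maxBins, minInfoGain) combination: 3 * 3 * 3 = 27
full_grid = list(
    itertools.product(max_depth_values, max_bins_values, min_info_gain_values)
)

# draw 10 distinct combinations; the seed makes the draw repeatable
rng = random.Random(42)
sampled = sorted(rng.sample(full_grid, 10))

for combo in sampled:
    print(combo)
```

Sampling from the full grid avoids unintentionally favoring some values, at the cost of losing control over exactly which combinations get run.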


As the results show, the combination "maxDepth=6, maxBins=64, minInfoGain=0.05" obtains the lowest test-set MSE (about 13.47) of the ten runs, so I select it as the best model run.
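
The best run can also be looked up from the tracking server after the fact instead of being tracked in the training loop. The sketch below is a hedged illustration: `pick_best_run` and `best_run_from_mlflow` are hypothetical helper names, the run records passed to `pick_best_run` are placeholders rather than real results, and the query assumes the metric was logged under the key `mse`, as in the loop above.

```python
def pick_best_run(runs, metric="mse"):
    """Return the run record with the lowest value of `metric`."""
    return min(runs, key=lambda r: r["metrics"][metric])


def best_run_from_mlflow(experiment_name):
    """Ask MLflow for the single run with the lowest logged MSE."""
    # imported lazily so pick_best_run works without MLflow installed
    import mlflow

    runs = mlflow.search_runs(
        experiment_names=[experiment_name],
        order_by=["metrics.mse ASC"],
        max_results=1,
    )
    if runs.empty:
        raise ValueError(f"no runs found in experiment {experiment_name!r}")
    # search_runs returns a pandas DataFrame by default; take the top row
    return runs.iloc[0]


# placeholder records, NOT results from this experiment
placeholder_runs = [
    {"run_id": "run-a", "metrics": {"mse": 3.0}},
    {"run_id": "run-b", "metrics": {"mse": 1.5}},
    {"run_id": "run-c", "metrics": {"mse": 2.0}},
]
best = pick_best_run(placeholder_runs)
print(best["run_id"])
```

Querying the tracking server keeps the selection consistent with what was actually logged, which helps if runs are added to the experiment later.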

For max depth, the best value among 2, 6, and 12 in these runs is 6. A depth of 6 is deep enough to capture nuanced relationships, whereas 2 is too shallow and 12 is deep enough that the tree may overfit.
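
A rough way to see why depth 12 risks overfitting is to compare the maximum number of leaves with the training set size printed earlier (18296 rows). The sketch below computes only an upper bound: a binary tree of depth d has at most 2^d leaves, and the fitted trees may be smaller because of minInfoGain. The helper name `max_leaves` is hypothetical.

```python
TRAIN_ROWS = 18296  # train set size printed by the notebook


def max_leaves(depth):
    """Upper bound on the number of leaves in a binary tree of this depth."""
    return 2 ** depth


# average training rows per leaf if the tree were fully grown
rows_per_leaf = {d: TRAIN_ROWS / max_leaves(d) for d in (2, 6, 12)}

for depth, rows in rows_per_leaf.items():
    print(
        f"maxDepth={depth}: up to {max_leaves(depth)} leaves, "
        f"about {rows:.1f} training rows per leaf if fully grown"
    )
```

At depth 12 a fully grown tree would average fewer than five training rows per leaf, so each prediction could rest on very little data.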

For max bins, the best value among 16, 32, and 64 in these runs is 64. With 64 bins the model has finer-grained candidate split thresholds for continuous features, which helps the tree isolate patterns relevant to finishing position without causing significant overfitting.
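
To illustrate what the number of bins controls, the sketch below derives candidate split thresholds at evenly spaced quantiles of a feature. This is a simplification written for illustration, not Spark's actual binning code, and the function name `candidate_thresholds` and the sample values are assumptions.

```python
def candidate_thresholds(values, max_bins):
    """Return up to max_bins - 1 thresholds at evenly spaced quantiles."""
    ordered = sorted(values)
    n = len(ordered)
    thresholds = []
    for i in range(1, max_bins):
        t = ordered[(i * n) // max_bins]
        # skip repeated thresholds, which occur when values are tied
        if not thresholds or t != thresholds[-1]:
            thresholds.append(t)
    return thresholds


# a made-up continuous feature with 100 distinct values
continuous = list(range(100))
coarse = candidate_thresholds(continuous, 4)
fine = candidate_thresholds(continuous, 16)

# a made-up feature with only two distinct values
few_distinct = [1, 1, 1, 2, 2, 2]
capped = candidate_thresholds(few_distinct, 64)

print(len(coarse), len(fine), len(capped))
```

More bins give more thresholds only when the feature has enough distinct values, so a larger maxBins matters most for wide-ranging features such as milliseconds.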

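For minimum information gain, the best value among 0.01, 0.05, and 0.1 in these runs is 0.05. In Spark, minInfoGain is an absolute threshold on a split's impurity reduction (variance reduction for regression), not a percentage, so 0.05 requires each split to reduce the weighted variance by at least 0.05. That rejects more noisy, low-value splits than 0.01 does.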
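
The gain that minInfoGain is compared against can be sketched for a regression split as the parent's variance minus the size-weighted variances of the two children. The code below is a minimal illustration with made-up label values; the names `variance` and `split_gain` are hypothetical helpers, not Spark functions.

```python
def variance(values):
    """Population variance, the impurity measure for regression trees."""
    n = len(values)
    mean = sum(values) / n
    return sum((v - mean) ** 2 for v in values) / n


def split_gain(left, right):
    """Parent variance minus the size-weighted variance of the children."""
    parent = left + right
    n = len(parent)
    weighted_children = (
        len(left) / n * variance(left) + len(right) / n * variance(right)
    )
    return variance(parent) - weighted_children


# a split that cleanly separates low and high finishing positions
strong_gain = split_gain([1.0, 2.0, 3.0], [10.0, 11.0, 12.0])

# a split whose two sides look almost the same
weak_gain = split_gain([5.0, 5.2], [5.1, 5.3])

for threshold in (0.01, 0.05, 0.1):
    print(threshold, strong_gain >= threshold, weak_gain >= threshold)
```

The strong split clears all three thresholds, while the weak one is rejected by each of them, which is the filtering effect a higher minInfoGain provides.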

Overall, with maxDepth = 6 and maxBins = 64, the model can find useful splits, and setting the minimum information gain to 0.05 helps it avoid both overfitting and underfitting.