In [0]:
# ----------------------------------------
# 1. Import Necessary Libraries
# ----------------------------------------
import mlflow
import mlflow.spark

from pyspark.sql.functions import col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import os

# ----------------------------------------
# 2. Set MLflow Experiment
# ----------------------------------------
# Use a simple experiment name to avoid resource conflicts.
experiment_name = "f1_experiment_hw4"
try:
    mlflow.set_experiment(experiment_name)
except Exception as e:
    print("Warning: Could not set experiment. Continuing...\n", e)

# ----------------------------------------
# 3. Read Data from S3 (CSV files)
# ----------------------------------------
# Ensure your cluster has permission to access the S3 bucket.
df_laptimes = spark.read.csv("s3://columbia-gr5069-main/raw/lap_times.csv", header=True, inferSchema=True)
df_drivers  = spark.read.csv("s3://columbia-gr5069-main/raw/drivers.csv", header=True, inferSchema=True)
df_pitstops = spark.read.csv("s3://columbia-gr5069-main/raw/pit_stops.csv", header=True, inferSchema=True)
df_results  = spark.read.csv("s3://columbia-gr5069-main/raw/results.csv", header=True, inferSchema=True)
df_races    = spark.read.csv("s3://columbia-gr5069-main/raw/races.csv", header=True, inferSchema=True)

# ----------------------------------------
# 4. Data Preprocessing and Joining
# ----------------------------------------
# Join 'results' with 'drivers' on "driverId" and then join with 'races' on "raceId"
df_model = df_results.join(df_drivers, on="driverId", how="inner")
df_model = df_model.join(df_races, on="raceId", how="inner")

# Create a binary label "podium": 1 if position <= 3, 0 otherwise.
df_model = df_model.withColumn("podium", (col("position") <= 3).cast("integer"))

# Filter out any rows with NULL labels (to prevent training errors).
df_model = df_model.filter(col("podium").isNotNull())

# (Optional) Display the first few rows.
display(df_model.limit(5))

# ----------------------------------------
# 5. Construct a Spark ML Pipeline
# ----------------------------------------
# Features used: 
#   - "driverRef": a string feature (to be indexed and one-hot encoded)
#   - "grid": a numeric feature.
#
# a) Convert "driverRef" into numeric indices.
driver_indexer = StringIndexer(inputCol="driverRef", outputCol="driverRef_idx", handleInvalid="skip")
# b) One-Hot Encode the indexed "driverRef"
driver_encoder = OneHotEncoder(inputCols=["driverRef_idx"], outputCols=["driverRef_ohe"])
# c) Assemble "driverRef_ohe" and the numeric "grid" into one feature vector.
assembler = VectorAssembler(inputCols=["driverRef_ohe", "grid"], outputCol="features")
# d) Create a RandomForestClassifier to predict the binary "podium" label.
rf = RandomForestClassifier(
    labelCol="podium",
    featuresCol="features",
    maxDepth=5,    # initial value; will be tuned
    numTrees=50    # initial value; will be tuned
)

# Build the pipeline.
pipeline = Pipeline(stages=[driver_indexer, driver_encoder, assembler, rf])

# ----------------------------------------
# 6. Split the Data into Train and Test Sets
# ----------------------------------------
train_df, test_df = df_model.randomSplit([0.8, 0.2], seed=42)

# ----------------------------------------
# 7. MLflow Experiment: Hyperparameter Tuning and Logging
# ----------------------------------------
# Define evaluators for F1 score and Accuracy.
f1_evaluator = MulticlassClassificationEvaluator(labelCol="podium", predictionCol="prediction", metricName="f1")
accuracy_evaluator = MulticlassClassificationEvaluator(labelCol="podium", predictionCol="prediction", metricName="accuracy")

# Define hyperparameter grids:
# Use 4 different values for maxDepth and 3 for numTrees to yield 12 runs (>= 10 experiments).
maxDepth_values = [3, 5, 7, 9]
numTrees_values = [20, 50, 100]

# Loop over each combination of hyperparameters.
for maxDepth in maxDepth_values:
    for numTrees in numTrees_values:
        run_name = f"RF_md={maxDepth}_nt={numTrees}"
        with mlflow.start_run(run_name=run_name):
            # Update the classifier hyperparameters.
            rf.setMaxDepth(maxDepth)
            rf.setNumTrees(numTrees)
            
            # Train the model pipeline.
            model = pipeline.fit(train_df)
            
            # Generate predictions on the test set.
            predictions = model.transform(test_df)
            
            # Evaluate the model.
            f1_score = f1_evaluator.evaluate(predictions)
            accuracy_score = accuracy_evaluator.evaluate(predictions)
            
            # Log hyperparameters and metrics.
            mlflow.log_param("maxDepth", maxDepth)
            mlflow.log_param("numTrees", numTrees)
            mlflow.log_metric("f1_score", f1_score)
            mlflow.log_metric("accuracy", accuracy_score)
            
            # Log the trained Spark ML model as an artifact.
            mlflow.spark.log_model(model, artifact_path="spark_rf_model")
            
            # Artifact 1: Generate and log a confusion matrix plot.
            pdf = predictions.select("podium", "prediction").toPandas()
            cm = pd.crosstab(pdf["podium"], pdf["prediction"], rownames=["Actual"], colnames=["Predicted"])
            plt.figure(figsize=(5, 4))
            sns.heatmap(cm, annot=True, fmt="d", cmap="Blues")
            plt.title(f"Confusion Matrix (maxDepth={maxDepth}, numTrees={numTrees})")
            cm_plot_path = f"confusion_matrix_md{maxDepth}_nt{numTrees}.png"
            plt.savefig(cm_plot_path)
            mlflow.log_artifact(cm_plot_path)
            plt.close()
            
            # Artifact 2: Log a text file containing the model's feature importances.
            # Since our pipeline's last stage is the RandomForestClassifier, we extract its feature importances.
            rf_model = model.stages[-1]
            importances = rf_model.featureImportances
            fi_text_path = f"feature_importance_md{maxDepth}_nt{numTrees}.txt"
            with open(fi_text_path, "w") as f:
                f.write("Feature Importances:\n")
                f.write(str(importances))
            mlflow.log_artifact(fi_text_path)
            
            print(f"Run: {run_name} | F1 Score: {f1_score:.4f} | Accuracy: {accuracy_score:.4f}")

# ----------------------------------------
# 8. Next Steps: Review Results and Summarize Best Run
# ----------------------------------------
# After the experiments complete, go to the MLflow UI (found in the notebook sidebar under 'Experiments')
# to review all runs. Identify the run with the best evaluation metrics (e.g., highest F1 score) and include
# a brief explanation in your report on why that run is the best model.
#
# Finally, capture screenshots of:
#   (a) The MLflow Experiment Homepage (showing a table of runs with parameters and metrics).
#   (b) The Detailed Run Page for at least one run.
#
# These screenshots should be included in your GitHub Classroom submission.


2025/04/09 22:05:35 INFO mlflow.tracking.fluent: Experiment with name 'f1_experiment_hw4' does not exist. Creating a new experiment.


 INVALID_PARAMETER_VALUE: Got an invalid experiment name 'f1_experiment_hw4'. An experiment name must be an absolute path within the Databricks workspace, e.g. '/Users/<some-username>/my-experiment'.


raceId,driverId,resultId,constructorId,number,grid,position,positionText,positionOrder,points,laps,time,milliseconds,fastestLap,rank,fastestLapTime,fastestLapSpeed,statusId,driverRef,number.1,code,forename,surname,dob,nationality,url,year,round,circuitId,name,date,time.1,url.1,fp1_date,fp1_time,fp2_date,fp2_time,fp3_date,fp3_time,quali_date,quali_time,sprint_date,sprint_time,podium
18,1,1,1,22,1,1,1,1,10.0,58,1:34:50.616,5690616,39,2,1:27.452,218.3,1,hamilton,44,HAM,Lewis,Hamilton,1985-01-07,British,http://en.wikipedia.org/wiki/Lewis_Hamilton,2008,1,1,Australian Grand Prix,2008-03-16,04:30:00,http://en.wikipedia.org/wiki/2008_Australian_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,1
18,2,2,2,3,5,2,2,2,8.0,58,+5.478,5696094,41,3,1:27.739,217.586,1,heidfeld,\N,HEI,Nick,Heidfeld,1977-05-10,German,http://en.wikipedia.org/wiki/Nick_Heidfeld,2008,1,1,Australian Grand Prix,2008-03-16,04:30:00,http://en.wikipedia.org/wiki/2008_Australian_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,1
18,3,3,3,7,7,3,3,3,6.0,58,+8.163,5698779,41,5,1:28.090,216.719,1,rosberg,6,ROS,Nico,Rosberg,1985-06-27,German,http://en.wikipedia.org/wiki/Nico_Rosberg,2008,1,1,Australian Grand Prix,2008-03-16,04:30:00,http://en.wikipedia.org/wiki/2008_Australian_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,1
18,4,4,4,5,11,4,4,4,5.0,58,+17.181,5707797,58,7,1:28.603,215.464,1,alonso,14,ALO,Fernando,Alonso,1981-07-29,Spanish,http://en.wikipedia.org/wiki/Fernando_Alonso,2008,1,1,Australian Grand Prix,2008-03-16,04:30:00,http://en.wikipedia.org/wiki/2008_Australian_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,0
18,5,5,1,23,3,5,5,5,4.0,58,+18.014,5708630,43,1,1:27.418,218.385,1,kovalainen,\N,KOV,Heikki,Kovalainen,1981-10-19,Finnish,http://en.wikipedia.org/wiki/Heikki_Kovalainen,2008,1,1,Australian Grand Prix,2008-03-16,04:30:00,http://en.wikipedia.org/wiki/2008_Australian_Grand_Prix,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,0


2025/04/09 22:06:11 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Run: RF_md=3_nt=20 | F1 Score: 0.6845 | Accuracy: 0.7806


2025/04/09 22:07:14 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Run: RF_md=3_nt=50 | F1 Score: 0.6845 | Accuracy: 0.7806


2025/04/09 22:08:09 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Run: RF_md=3_nt=100 | F1 Score: 0.6845 | Accuracy: 0.7806


2025/04/09 22:09:12 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Run: RF_md=5_nt=20 | F1 Score: 0.6845 | Accuracy: 0.7806


2025/04/09 22:10:08 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Run: RF_md=5_nt=50 | F1 Score: 0.6845 | Accuracy: 0.7806


2025/04/09 22:11:11 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Run: RF_md=5_nt=100 | F1 Score: 0.6845 | Accuracy: 0.7806


2025/04/09 22:12:09 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Run: RF_md=7_nt=20 | F1 Score: 0.6845 | Accuracy: 0.7806


2025/04/09 22:13:10 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Run: RF_md=7_nt=50 | F1 Score: 0.6845 | Accuracy: 0.7806


2025/04/09 22:14:11 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Run: RF_md=7_nt=100 | F1 Score: 0.6845 | Accuracy: 0.7806


2025/04/09 22:15:10 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Run: RF_md=9_nt=20 | F1 Score: 0.6845 | Accuracy: 0.7806


2025/04/09 22:16:13 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Run: RF_md=9_nt=50 | F1 Score: 0.6845 | Accuracy: 0.7806


2025/04/09 22:17:14 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Run: RF_md=9_nt=100 | F1 Score: 0.6845 | Accuracy: 0.7806
