In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
import os
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from sklearn.metrics import mean_squared_error
import mlflow
import mlflow.spark
import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
from mlflow.tracking import MlflowClient
from pyspark.sql.functions import col, when

## Q1.Build any model of your choice with tunable hyperparameters

In [0]:
spark = SparkSession.builder.appName("F1_Model").getOrCreate()

In [0]:
df = spark.read.csv("s3://columbia-gr5069-main/raw/results.csv", header=True, inferSchema=True)
display(df)

In [0]:
df = df.na.drop()
df = df.withColumn('rank', when(col('rank').cast('double').isNotNull(), col('rank').cast('double')).otherwise(0)) \
       .withColumn('grid', when(col('grid').cast('double').isNotNull(), col('grid').cast('double')).otherwise(0)) \
       .withColumn('number', when(col('number').cast('double').isNotNull(), col('number').cast('double')).otherwise(0)) \
       .withColumn('points', when(col('points').cast('double').isNotNull(), col('points').cast('double')).otherwise(0)) \
       .withColumn('laps', when(col('laps').cast('double').isNotNull(), col('laps').cast('double')).otherwise(0))
assembler = VectorAssembler(inputCols=['grid', 'number', 'points', 'laps'], outputCol='features')
data = assembler.transform(df).select('features', 'rank').withColumnRenamed('rank', 'label')
train, test = data.randomSplit([0.8, 0.2], seed=42)
rf = RandomForestClassifier(labelCol='label', featuresCol='features')
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [5, 10])
             .addGrid(rf.maxDepth, [5, 10])
             .build())
cv = CrossValidator(estimator=rf, evaluator=MulticlassClassificationEvaluator(), estimatorParamMaps=paramGrid, numFolds=3)

In [0]:
mlflow.set_experiment("/Workspace/Users/zl3373@columbia.edu/F1_ML_Experiments")

## Combined Q2 and Q3 together. It may take more than 1 min for one run and approximatly 12 mins in total for 10 runs
## Q2.Create an experiment setup where - for each run - you log: 
- the hyperparameters used in the model
- the model itself
- every possible metric from the model you chose
- at least two artifacts (plots, or csv files)
## Q3.Track your MLFlow experiment and run at least 10 experiments with different parameters each

In [0]:
for i in range(10):
    with mlflow.start_run():
        cvModel = cv.fit(train)
        bestModel = cvModel.bestModel
        # MAE
        evaluator_mae = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='mae')
        predictions = bestModel.transform(test)
        mae = evaluator_mae.evaluate(predictions)
        # MSE
        evaluator_mse = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='mse')
        mse = evaluator_mse.evaluate(predictions)
        # R2
        evaluator_r2 = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='r2')
        r2 = evaluator_r2.evaluate(predictions)     
        # Log hyperparameters and metrics
        mlflow.log_params(bestModel.extractParamMap())
        mlflow.log_metric("mae", mae)
        mlflow.log_metric("mse", mse)
        mlflow.log_metric("r2", r2)
        mlflow.spark.log_model(bestModel, "random_forest_model")
        # Feature Importance
        feature_importances = bestModel.featureImportances.toArray()
        feature_names = assembler.getInputCols()
        fi = pd.DataFrame(list(zip(feature_names, feature_importances)), columns=["Feature", "Importance"])
        fi = fi.sort_values(by="Importance", ascending=False)
        fi.to_csv("/dbfs/tmp/feature_importance.csv", index=False)
        mlflow.log_artifact("/dbfs/tmp/feature_importance.csv", artifact_path="feature_importance")
        # Residuals
        residuals = predictions.withColumn("residual", col("label") - col("prediction"))
        residuals_pd = residuals.select("label", "prediction", "residual").toPandas()
        residuals_pd.to_csv("/dbfs/tmp/residuals.csv", index=False)
        mlflow.log_artifact("/dbfs/tmp/residuals.csv", artifact_path="residuals")
        # Confusion Matrix 
        cm = predictions.groupBy('label', 'prediction').count().toPandas()
        confusion_matrix = cm.pivot(index='label', columns='prediction', values='count').fillna(0)
        plt.figure(figsize=(8, 6))
        sns.heatmap(confusion_matrix, annot=True, fmt="g", cmap="Blues")
        plt.title("Confusion Matrix")
        plt.xlabel("Predicted")
        plt.ylabel("Actual")
        plt.savefig("confusion_matrix.png")
        mlflow.log_artifact("confusion_matrix.png")
        # CSV
        predictions.toPandas().to_csv("predictions.csv", index=False)
        mlflow.log_artifact("predictions.csv")


## Q4.Select your best model run and explain why

In [0]:
def get_best_model():
    client = MlflowClient()
    experiment_id = "1571216335170020"  
    runs = client.search_runs(experiment_id)
    metrics_data = []
    for run in runs:
        run_id = run.info.run_id
        metrics = run.data.metrics
        mae = metrics.get("mae")
        mse = metrics.get("mse")
        r2 = metrics.get("r2")
        metrics_data.append({
            "run_id": run_id,
            "mae": mae,
            "mse": mse,
            "r2": r2
        })
    df = pd.DataFrame(metrics_data)
    # Select the best model with lowest MAE and lowest MSE, and highest R²
    best_run = df.loc[(df['mae'].idxmin()) & (df['mse'].idxmin()) & (df['r2'].idxmax())]
    print("Best Model Run ID:", best_run["run_id"])
    print("MAE:", best_run["mae"])
    print("MSE:", best_run["mse"])
    print("R²:", best_run["r2"])

In [0]:
get_best_model()

 I finnally decide to choose the model with Run ID: 6bf61e1172a84db6bb11a38cb11dadfa. I utilized the methods to select one with relatively lower Mean Absolute Error (MAE), lower Mean Squared Error (MSE), and higher R-squared (R²) value. The model with Run ID 6bf61e1172a84db6bb11a38cb11dadfa achieved an MAE of approximately 2.87, indicating minimal deviation between predicted and actual values, and an MSE of around 39.97, reflecting a relatively small squared error.