In [0]:
import mlflow
import mlflow.spark
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from pyspark.sql.functions import col

In [0]:
# Load the four raw F1 tables from S3. Every file has a header row and
# column types are inferred by Spark rather than declared up front.
csv_options = {"header": True, "inferSchema": True}

races = spark.read.csv("s3://columbia-gr5069-main/raw/races.csv", **csv_options)
results = spark.read.csv("s3://columbia-gr5069-main/raw/results.csv", **csv_options)
drivers = spark.read.csv("s3://columbia-gr5069-main/raw/drivers.csv", **csv_options)
circuits = spark.read.csv("s3://columbia-gr5069-main/raw/circuits.csv", **csv_options)

In [0]:
def prepare_data():
    """Build the modelling dataset from the notebook-level raw tables.

    Joins ``results`` with ``races``, ``drivers`` and ``circuits``, keeps the
    target plus the predictor columns, replaces missing values with 0 and
    appends an assembled ``features`` vector column.

    Returns:
        A Spark DataFrame holding the selected columns and ``features``.
    """
    # NOTE(review): "points" and "laps" look like race outcomes, so using them
    # as predictors may leak the target -- confirm this is intended.
    predictor_columns = ["grid", "laps", "points", "year", "round", "circuitId", "driverId"]
    selected_columns = [
        "positionOrder",  # Target variable
        "grid",
        "laps",
        "points",
        "year",
        "round",
        "circuitId",
        "driverId",
    ]

    # Join step by step on the shared key columns
    joined = results.join(races, "raceId")
    joined = joined.join(drivers, "driverId")
    joined = joined.join(circuits, "circuitId")
    selected = joined.select(*selected_columns)

    # Missing values become 0 before the vector is assembled
    filled = selected.na.fill(0)

    vector_builder = VectorAssembler(inputCols=predictor_columns, outputCol="features")
    return vector_builder.transform(filled)

In [0]:
# Quick visual check of the assembled dataset before any modelling.
prepared_preview = prepare_data()
prepared_preview.display()

In [0]:
# Predictor names, in the same order as the assembled feature vector;
# read by create_plots to label the feature-importance bars.
feature_cols = [
    "grid",
    "laps",
    "points",
    "year",
    "round",
    "circuitId",
    "driverId",
]

In [0]:
def create_plots(y_true, y_pred, run_id, feature_importances, feature_names=None):
    """Save a confusion-matrix bar chart and a feature-importance chart as PNGs.

    The images are written to ``/dbfs/confusion_matrix_<run_id>.png`` and
    ``/dbfs/feature_importance_<run_id>.png``.

    Args:
        y_true: 1D pandas Series of actual labels.
        y_pred: 1D pandas Series of predicted labels.
        run_id: Identifier embedded in the output file names.
        feature_importances: Importance scores, one per feature name.
        feature_names: Optional labels for the importance bars. When omitted,
            the notebook-level ``feature_cols`` list is used.
    """
    if feature_names is None:
        feature_names = feature_cols

    # Confusion Matrix
    # DataFrame.plot opens its own figure unless it is handed an Axes, which
    # would leave a separately created figure empty and never closed. Passing
    # ax keeps the requested size and lets us close exactly what we opened.
    fig, ax = plt.subplots(figsize=(10, 6))
    cm = pd.crosstab(y_true, y_pred)
    cm.plot(kind='bar', ax=ax)
    ax.set_title("Confusion Matrix")
    fig.savefig(f"/dbfs/confusion_matrix_{run_id}.png")
    plt.close(fig)

    # Feature Importance
    fig, ax = plt.subplots(figsize=(10, 6))
    positions = list(range(len(feature_names)))
    ax.bar(positions, feature_importances)
    ax.set_xticks(positions)
    ax.set_xticklabels(feature_names, rotation=45)
    ax.set_title("Feature Importance")
    fig.savefig(f"/dbfs/feature_importance_{run_id}.png")
    plt.close(fig)

In [0]:
# All runs from this notebook are recorded under this MLflow experiment.
experiment_path = "/Users/yw4407@columbia.edu/F1_Race_Prediction"
mlflow.set_experiment(experiment_path)

In [0]:
# Cache the assembled dataset: both splits are reused by every training run,
# and without caching each fit/transform/evaluate would re-read the CSVs and
# redo the joins. A materialised input also keeps the seeded split stable.
data = prepare_data().cache()
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Hyperparameter grid: every tree count paired with every depth
# (tree count varies slowest), giving 5 x 2 = 10 combinations.
num_trees_options = [10, 20, 30, 40, 50]
max_depth_options = [5, 10]

param_grid = [
    dict(numTrees=tree_count, maxDepth=depth)
    for tree_count in num_trees_options
    for depth in max_depth_options
]

In [0]:
# Train one random forest per hyperparameter combination and record the
# parameters, metrics, model and diagnostic artifacts in its own MLflow run.
for params in param_grid[:10]:  # Ensure exactly 10 runs
    with mlflow.start_run() as run:
        run_id = run.info.run_id

        # Log parameters up front so a run that fails later is still identifiable
        mlflow.log_params(params)

        # Create and train model
        rf = RandomForestClassifier(
            labelCol="positionOrder",
            featuresCol="features",
            numTrees=params["numTrees"],
            maxDepth=params["maxDepth"]
        )
        model = rf.fit(train_data)

        # Make predictions. Cached because the scored test set is read by four
        # evaluators and then collected; otherwise it is re-scored every time.
        predictions = model.transform(test_data).cache()

        # Calculate and log metrics
        metrics = {
            metric_name: MulticlassClassificationEvaluator(
                labelCol="positionOrder",
                predictionCol="prediction",
                metricName=metric_name
            ).evaluate(predictions)
            for metric_name in ("accuracy", "f1", "weightedPrecision", "weightedRecall")
        }
        mlflow.log_metrics(metrics)

        # Log model with explicit pip requirements to avoid warning
        mlflow.spark.log_model(model, "model", pip_requirements=["pyspark==3.5.0"])

        # Collect the predictions once. Labels and predictions taken from the
        # same pandas frame are guaranteed to be row-aligned, which separate
        # toPandas() calls on the Spark DataFrame do not promise.
        predictions_pdf = predictions.select("positionOrder", "prediction", "features").toPandas()
        predictions.unpersist()

        y_true = predictions_pdf["positionOrder"]
        y_pred = predictions_pdf["prediction"]

        # Create plot artifacts
        create_plots(y_true, y_pred, run_id, model.featureImportances.toArray())

        # Save predictions as CSV
        predictions_pdf.to_csv(f"/dbfs/predictions_{run_id}.csv")

        # Log artifacts
        mlflow.log_artifact(f"/dbfs/confusion_matrix_{run_id}.png")
        mlflow.log_artifact(f"/dbfs/feature_importance_{run_id}.png")
        mlflow.log_artifact(f"/dbfs/predictions_{run_id}.csv")

In [0]:
# Model selection and explanation
# Only finished runs are considered: a run that logged its accuracy but then
# failed while logging the model or artifacts must not be picked as the best.
runs = mlflow.search_runs(filter_string="attributes.status = 'FINISHED'")
if (
    runs.empty
    or "metrics.accuracy" not in runs.columns
    or runs["metrics.accuracy"].isna().all()
):
    raise RuntimeError(
        "No finished MLflow runs with an accuracy metric were found; "
        "run the training cell before selecting a model."
    )

# idxmax skips runs whose accuracy is missing
best_run = runs.loc[runs['metrics.accuracy'].idxmax()]
print(f"Best model Run Name: {best_run['tags.mlflow.runName']}")
print(f"Best model Run ID: {best_run['run_id']}")
print(f"Parameters: numTrees={best_run['params.numTrees']}, maxDepth={best_run['params.maxDepth']}")
print(f"Metrics: {best_run.filter(like='metrics').to_dict()}")
print("""
Explanation: The best model was selected based on highest accuracy score as it provides
the most reliable predictions for race position outcomes. This model balances complexity
(numTrees and maxDepth) with performance, avoiding overfitting while capturing key
patterns in the F1 race data.
""")