# Homework 5: model deployment -- Yuxi Sun

Instructions: Using the F1 dataset, build a predictive model, log it in MLflow, and write the model's predictions into a database.

[20 pts] Create two (2) new tables in your own database where you'll store the predictions from each model for this exercise.
[30 pts] Build two (2) predictive models using MLflow, logging hyperparameters, the model itself, four metrics, and two artifacts. Submit your MLflow experiments as part of your assignment.
[30 pts] For each model, store its predictions in the corresponding table you created in your own database. Ensure you are using your own database to store your predictions.
[20 pts] Push your code to GitHub upon completion.


In [0]:

import os
import tempfile

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import FloatType
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

import mlflow, mlflow.spark
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.metrics import (mean_squared_error, mean_absolute_error, r2_score, accuracy_score, precision_score, recall_score, f1_score, confusion_matrix)

# paths
# S3 prefix under which prepare_data looks for races.csv and results.csv.
S3_PATH = "s3a://columbia-gr5069-main/raw/"
# MLflow experiment that both training runs are recorded under.
MLFLOW_EXPERIMENT = "/Users/ys3874@columbia.edu/HW5/f1-finishpos-podium"
# Database that holds the two prediction tables written by the models.
DB_NAME = "f1_db_hw5"

# generate database
def create_databricks_db(spark):
    """Ensure the prediction database exists, then report that it is ready.

    Runs a ``CREATE DATABASE IF NOT EXISTS`` statement for ``DB_NAME`` through
    the supplied Spark session and prints a confirmation line.

    Args:
        spark: Spark session used to execute the DDL statement.
    """
    ddl = f"""
      CREATE DATABASE IF NOT EXISTS {DB_NAME}
      COMMENT 'Holds F1 model prediction tables';
    """
    spark.sql(ddl)

    ready_msg = f"Database `{DB_NAME}` is ready."
    print(ready_msg)

# data process
def _build_labeled_frame(df, label_col):
    """Return id columns, float-cast features and ``label_col`` for rows with a finish order.

    Rows whose ``positionOrder`` is null are dropped, and nulls left in the
    feature columns after casting are replaced with 0.
    """
    feature_cols = ["grid", "laps", "milliseconds"]
    return (
        df.filter(F.col("positionOrder").isNotNull())
          .select(
            "raceId", "driverId",
            *[F.col(name).cast(FloatType()) for name in feature_cols],
            label_col,
          )
          .na.fill(0, subset=feature_cols)
    )


def prepare_data(spark):
    """Load the F1 results and build train/test splits for both models.

    Args:
        spark: Spark session used to read the CSV files under ``S3_PATH``.

    Returns:
        A 4-tuple ``(train_reg, test_reg, train_clf, test_clf)``. The
        regression frames use ``positionOrder`` (cast to float) as ``label``;
        the classification frames use 1 when ``positionOrder <= 3`` and 0
        otherwise. Each pair is a 70/30 ``randomSplit`` with seed 42.
    """
    # S3_PATH may already end in "/"; strip it so the joined URI has no "//".
    base_path = S3_PATH.rstrip("/")
    df_races   = spark.read.csv(f"{base_path}/races.csv",   header=True, inferSchema=True)
    df_results = spark.read.csv(f"{base_path}/results.csv", header=True, inferSchema=True)

    # Inner join keeps only results whose raceId appears in races; no race
    # columns are selected afterwards.
    df = (
        df_results.alias("r")
          .join(df_races.alias("c"), F.col("r.raceId") == F.col("c.raceId"))
          .select("r.raceId", "r.driverId", "r.grid", "r.laps", "r.milliseconds",
                  "r.positionOrder", "r.points")
    )

    # regression df: finishing position
    df_reg = _build_labeled_frame(
        df, F.col("positionOrder").cast(FloatType()).alias("label")
    )
    train_reg, test_reg = df_reg.randomSplit([0.7, 0.3], seed=42)

    # classification df: podium
    df_clf = _build_labeled_frame(
        df, F.when(F.col("positionOrder") <= 3, 1).otherwise(0).alias("label")
    )
    train_clf, test_clf = df_clf.randomSplit([0.7, 0.3], seed=42)

    return train_reg, test_reg, train_clf, test_clf

# regression log and write (finish position)
def log_and_write_regressor(spark, train_df, test_df):
    """Train the finishing-position regressor, log it to MLflow, and store its predictions.

    Inside a single MLflow run this fits a ``VectorAssembler`` +
    ``DecisionTreeRegressor`` pipeline on ``train_df``, scores ``test_df``,
    logs the hyperparameters, four metrics (mse, mae, rmse, r2), two artifacts
    (feature-importance CSV, actual-vs-predicted scatter plot) and the fitted
    model, then appends the predictions to
    ``{DB_NAME}.finishing_position_predictions``.

    Args:
        spark: Spark session used to select the database before writing.
        train_df: Training frame with ``grid``, ``laps``, ``milliseconds`` and ``label``.
        test_df: Held-out frame with the same columns plus ``raceId`` and ``driverId``.
    """
    feature_cols = ["grid", "laps", "milliseconds"]
    # Single source of truth so the logged params cannot drift from the estimator's.
    hyperparams = {"maxDepth": 6, "maxBins": 64, "minInfoGain": 0.05}

    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    # decision tree regressor, used my own hyperparameters
    dt_reg = DecisionTreeRegressor(
        featuresCol="features", labelCol="label", **hyperparams
    )
    pipe = Pipeline(stages=[assembler, dt_reg])

    with mlflow.start_run(run_name="DTReg_FinishingPosition") as run:
        run_id = run.info.run_id

        # log parameters
        mlflow.log_params(hyperparams)

        model = pipe.fit(train_df)
        preds = model.transform(test_df)
        pdf = preds.select("label", "prediction").toPandas()

        # metrics
        mse  = mean_squared_error(pdf["label"], pdf["prediction"])
        mae  = mean_absolute_error(pdf["label"], pdf["prediction"])
        r2   = r2_score(pdf["label"], pdf["prediction"])
        rmse = mse ** 0.5

        # log metrics
        mlflow.log_metrics({"mse": mse, "mae": mae, "rmse": rmse, "r2": r2})

        # A temporary directory gives the artifacts stable, readable file names
        # and is removed even if logging fails, so no file or handle is leaked.
        with tempfile.TemporaryDirectory() as tmp_dir:
            # artifact 1: feature importances CSV
            importances = model.stages[-1].featureImportances.toArray()
            csv_path = os.path.join(tmp_dir, "feature_importances.csv")
            pd.DataFrame(
                {"feature": feature_cols, "importance": importances}
            ).to_csv(csv_path, index=False)
            mlflow.log_artifact(csv_path, artifact_path="feature_importances")

            # artifact 2: scatter plot
            png_path = os.path.join(tmp_dir, "actual_vs_predicted.png")
            fig, ax = plt.subplots()
            try:
                ax.scatter(pdf["label"], pdf["prediction"], alpha=0.5)
                ax.set_xlabel("Actual Position")
                ax.set_ylabel("Predicted Position")
                ax.set_title("Actual vs. Predicted")
                fig.savefig(png_path)
            finally:
                # Always release the figure, even when plotting or saving fails.
                plt.close(fig)
            mlflow.log_artifact(png_path, artifact_path="plots")

        mlflow.spark.log_model(model, artifact_path="model")  # log model

        # write into table
        spark.sql(f"USE {DB_NAME}")
        preds_to_write = (
            preds.select(
                "raceId", "driverId",
                F.col("prediction"),
                F.col("label").alias("actual"),
            )
            .withColumn("run_id", F.lit(run_id))
            .withColumn("prediction_time", F.current_timestamp())
        )

        preds_to_write.write.mode("append").saveAsTable(f"{DB_NAME}.finishing_position_predictions")

# classifier log and write (podium)
def log_and_write_classifier(spark, train_df, test_df):
    """Train the podium classifier, log it to MLflow, and store its predictions.

    Inside a single MLflow run this fits a ``VectorAssembler`` +
    ``DecisionTreeClassifier`` pipeline on ``train_df``, scores ``test_df``,
    logs the hyperparameters, four metrics (accuracy, precision, recall, f1),
    two artifacts (confusion-matrix plot, label/prediction CSV) and the fitted
    model, then appends the predictions to ``{DB_NAME}.podium_predictions``.

    Args:
        spark: Spark session used to select the database before writing.
        train_df: Training frame with ``grid``, ``laps``, ``milliseconds`` and ``label``.
        test_df: Held-out frame with the same columns plus ``raceId`` and ``driverId``.
    """
    feature_cols = ["grid", "laps", "milliseconds"]
    # Single source of truth so the logged params cannot drift from the estimator's.
    hyperparams = {"maxDepth": 5, "maxBins": 32, "minInfoGain": 0.01}

    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
    # decision tree classifier, used my own hyperparameters
    dt_clf = DecisionTreeClassifier(
        featuresCol="features", labelCol="label", **hyperparams
    )
    pipe = Pipeline(stages=[assembler, dt_clf])

    with mlflow.start_run(run_name="DTClf_Podium") as run:
        run_id = run.info.run_id

        # log parameters
        mlflow.log_params(hyperparams)

        model = pipe.fit(train_df)
        preds = model.transform(test_df)
        pdf = preds.select("label", "prediction").toPandas()

        # metrics (zero_division=0 reports 0 instead of warning when a
        # denominator is empty)
        acc  = accuracy_score(pdf["label"], pdf["prediction"])
        prec = precision_score(pdf["label"], pdf["prediction"], zero_division=0)
        rec  = recall_score(pdf["label"], pdf["prediction"], zero_division=0)
        f1   = f1_score(pdf["label"], pdf["prediction"], zero_division=0)

        # log metrics
        mlflow.log_metrics({"accuracy": acc, "precision": prec, "recall": rec, "f1": f1})

        # A temporary directory gives the artifacts stable, readable file names
        # and is removed even if logging fails, so no file or handle is leaked.
        with tempfile.TemporaryDirectory() as tmp_dir:
            # artifact 1: confusion matrix
            cm = confusion_matrix(pdf["label"], pdf["prediction"])
            png_path = os.path.join(tmp_dir, "confusion_matrix.png")
            fig, ax = plt.subplots()
            try:
                ax.matshow(cm)
                ax.set_xlabel("Predicted")
                ax.set_ylabel("Actual")
                fig.savefig(png_path)
            finally:
                # Always release the figure, even when plotting or saving fails.
                plt.close(fig)
            mlflow.log_artifact(png_path, artifact_path="plots")

            # artifact 2: predictions CSV
            csv_path = os.path.join(tmp_dir, "predictions.csv")
            pdf.to_csv(csv_path, index=False)
            mlflow.log_artifact(csv_path, artifact_path="predictions")

        mlflow.spark.log_model(model, artifact_path="model")  # log model

        # write into table
        spark.sql(f"USE {DB_NAME}")
        preds_to_write = (
            preds.select(
                "raceId", "driverId",
                F.col("prediction"),
                F.col("label").alias("actual"),
            )
            .withColumn("run_id", F.lit(run_id))
            .withColumn("prediction_time", F.current_timestamp())
        )

        preds_to_write.write.mode("append").saveAsTable(f"{DB_NAME}.podium_predictions")

# Driver: set up the session and experiment, make sure the target database
# exists, build the splits, then train/log/write each model in turn.
if __name__ == "__main__":
    # Reuses an already-running session if one exists, otherwise creates one.
    spark = SparkSession.builder.appName("F1ModelDeploymentDB").getOrCreate()
    # All runs started below are recorded under this experiment.
    mlflow.set_experiment(MLFLOW_EXPERIMENT)

    # The database must exist before either model writes its prediction table.
    create_databricks_db(spark)
    tr_reg, te_reg, tr_clf, te_clf = prepare_data(spark)

    # One MLflow run and one prediction table per model.
    log_and_write_regressor(spark, tr_reg, te_reg)
    log_and_write_classifier(spark, tr_clf, te_clf)


Database `f1_db_hw5` is ready.


2025/04/29 02:59:18 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
2025/04/29 03:00:15 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().
