In [0]:
# Load the raw Formula 1 CSV extracts from the course S3 bucket.
# Only the header row is parsed (no schema inference), so every column arrives as a
# string; later cells cast the columns they need explicitly.
df_pitstops, df_results, df_drivers, df_races, df_laptimes, df_sprint_results = (
    spark.read.csv(raw_path, header=True)
    for raw_path in (
        's3://columbia-gr5069-main/raw/pit_stops.csv',
        's3://columbia-gr5069-main/raw/results.csv',
        's3://columbia-gr5069-main/raw/drivers.csv',
        's3://columbia-gr5069-main/raw/races.csv',
        's3://columbia-gr5069-main/raw/lap_times.csv',
        's3://columbia-gr5069-main/raw/sprint_results.csv',
    )
)

1. [20 pts] Create two (2) new tables in your own database where you'll store the predictions from each model for this exercise.

In [0]:
# Create a personal database plus one table per model to hold its predictions.
# Model 1 (logistic classifier, Top 3 finish) stores an INT 0/1 prediction.
model1_table_ddl = """
CREATE TABLE IF NOT EXISTS my_f1_db.f1_model1_predictions (
    driverId INT,
    raceId INT,
    prediction INT
)
"""
# Model 2 (linear regression on positionOrder) stores a DOUBLE prediction.
model2_table_ddl = """
CREATE TABLE IF NOT EXISTS my_f1_db.f1_model2_predictions (
    driverId INT,
    raceId INT,
    prediction DOUBLE
)
"""

# The database must exist before either table can be created inside it.
spark.sql("CREATE DATABASE IF NOT EXISTS my_f1_db")
spark.sql(model1_table_ddl)
spark.sql(model2_table_ddl)

DataFrame[]

2. [30 pts] Build two (2) predictive models using MLflow, logging hyperparameters, the model itself, four metrics, and two artifacts. Submit your MLflow experiments as part of your assignment.

- Model 1: Logistic Regression

Goal: Predict whether a driver finishes in the Top 3 (positionOrder <= 3)

Logged 4 metrics: AUC (area under the ROC curve), Accuracy, F1 (class-weighted), Precision (class-weighted)

Logged model + parameters

Logged 2 artifacts: predictions CSV + coefficients TXT

In [0]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator
import mlflow
import mlflow.spark

# Model 1: logistic regression predicting a Top 3 finish (positionOrder <= 3).
#
# NOTE(review): laps, fastestLap and rank are only known after the race has been run,
# so these features describe the outcome rather than pre-race information. Read the
# metrics as diagnostics of that relationship, not as a forecasting benchmark.

FEATURE_COLS = ["grid", "laps", "fastestLap", "rank"]
SPLIT_WEIGHTS = [0.8, 0.2]
SPLIT_SEED = 42
CLS_MAX_ITER = 10

# Prepare data. The raw CSV was read without schema inference, so every column is a
# string: cast the features, and cast positionOrder before comparing it with 3
# instead of relying on implicit string/int coercion (Model 2 casts it explicitly too).
cls_df = df_results.select("driverId", "raceId", *FEATURE_COLS, "positionOrder")
for feature in FEATURE_COLS:
    cls_df = cls_df.withColumn(feature, col(feature).cast("int"))
cls_df = (
    cls_df
    .withColumn("label", (col("positionOrder").cast("int") <= 3).cast("int"))
    # Values that fail the int cast become null; drop any row with a null.
    .na.drop()
)

# `assembler` is also used by the Model 2 cell, so this name must stay.
assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
cls_df = assembler.transform(cls_df)
train_cls, test_cls = cls_df.randomSplit(SPLIT_WEIGHTS, seed=SPLIT_SEED)

# Train and log with MLflow
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=CLS_MAX_ITER)

# Spark's "f1" and "weightedPrecision" metrics are averaged over both classes, weighted
# by class frequency; they are not the F1/precision of the Top 3 class alone.
cls_evaluators = {
    "accuracy": MulticlassClassificationEvaluator(labelCol="label", metricName="accuracy"),
    "f1": MulticlassClassificationEvaluator(labelCol="label", metricName="f1"),
    "precision": MulticlassClassificationEvaluator(labelCol="label", metricName="weightedPrecision"),
}

with mlflow.start_run(run_name="F1_Logistic_Classifier") as run:
    model_cls = lr.fit(train_cls)
    preds_cls = model_cls.transform(test_cls)

    # Metrics on the held-out test split; AUC uses the evaluator's default
    # areaUnderROC metric.
    mlflow.log_metric("AUC", BinaryClassificationEvaluator(labelCol="label").evaluate(preds_cls))
    for metric_name, evaluator in cls_evaluators.items():
        mlflow.log_metric(metric_name, evaluator.evaluate(preds_cls))

    # Params are read back from the estimator so the logged values match what was used.
    mlflow.log_params({
        "maxIter": lr.getMaxIter(),
        "regParam": lr.getRegParam(),
        "elasticNetParam": lr.getElasticNetParam(),
        "features": ",".join(FEATURE_COLS),
        "split_weights": str(SPLIT_WEIGHTS),
        "split_seed": SPLIT_SEED,
    })
    mlflow.spark.log_model(model_cls, "logistic_model")

    # Artifacts: a 50-row sample of the test predictions...
    preds_cls.limit(50).toPandas().to_csv("/tmp/model1_preds.csv", index=False)
    mlflow.log_artifact("/tmp/model1_preds.csv")

    # ...and the fitted coefficients, one labelled line per feature plus the intercept.
    with open("/tmp/model1_coeffs.txt", "w") as f:
        for feature, weight in zip(FEATURE_COLS, model_cls.coefficients.toArray()):
            f.write(f"{feature}: {weight}\n")
        f.write(f"intercept: {model_cls.intercept}\n")
    mlflow.log_artifact("/tmp/model1_coeffs.txt")

2025/04/29 16:29:10 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


- Model 2: Linear Regression

Goal: Predict a driver's finishing position (positionOrder)

Logged 4 metrics: RMSE, MAE, MSE, R²

Logged model + parameters

Logged 2 artifacts: predictions CSV + model summary TXT

In [0]:
# Model 2: linear regression predicting the finishing position (positionOrder).
#
# Uses `col`, `LinearRegression`, `RegressionEvaluator`, `mlflow` and `assembler`
# from the Model 1 cell. The feature list is read back from `assembler`, so both
# models always use identical inputs.
# NOTE(review): as with Model 1, laps/fastestLap/rank are post-race values.

reg_feature_cols = assembler.getInputCols()
REG_MAX_ITER = 10
REG_SPLIT_WEIGHTS = [0.8, 0.2]
REG_SPLIT_SEED = 42

# Prepare data: cast the string columns and use positionOrder (as double) as the label.
reg_df = df_results.select("driverId", "raceId", *reg_feature_cols, "positionOrder")
for feature in reg_feature_cols:
    reg_df = reg_df.withColumn(feature, col(feature).cast("int"))
reg_df = reg_df.withColumn("label", col("positionOrder").cast("double")).na.drop()

reg_df = assembler.transform(reg_df)
train_reg, test_reg = reg_df.randomSplit(REG_SPLIT_WEIGHTS, seed=REG_SPLIT_SEED)

# Train and log with MLflow.
# NOTE(review): with the default solver ("auto") and only a few features, Spark may
# choose the normal-equation solver, in which case maxIter has no effect. Confirm
# whether this matters for the logged hyperparameters.
lr_reg = LinearRegression(featuresCol="features", labelCol="label", maxIter=REG_MAX_ITER)

with mlflow.start_run(run_name="F1_Linear_Regression") as run:
    model_reg = lr_reg.fit(train_reg)
    preds_reg = model_reg.transform(test_reg)

    # Metrics on the held-out test split
    for metric_name in ("rmse", "mae", "mse", "r2"):
        evaluator = RegressionEvaluator(labelCol="label", metricName=metric_name)
        mlflow.log_metric(metric_name, evaluator.evaluate(preds_reg))

    # Params are read back from the estimator so the logged values match what was used.
    mlflow.log_params({
        "maxIter": lr_reg.getMaxIter(),
        "regParam": lr_reg.getRegParam(),
        "elasticNetParam": lr_reg.getElasticNetParam(),
        "solver": lr_reg.getSolver(),
        "features": ",".join(reg_feature_cols),
        "split_weights": str(REG_SPLIT_WEIGHTS),
        "split_seed": REG_SPLIT_SEED,
    })
    mlflow.spark.log_model(model_reg, "linear_model")

    # Artifacts: a 50-row sample of the test predictions...
    preds_reg.limit(50).toPandas().to_csv("/tmp/model2_preds.csv", index=False)
    mlflow.log_artifact("/tmp/model2_preds.csv")

    # ...and a readable model summary. str(model_reg.summary) yields only the Python
    # object repr of the summary wrapper, so write the coefficients and the
    # training-set statistics explicitly instead.
    train_summary = model_reg.summary
    summary_lines = [
        f"{feature}: {weight}"
        for feature, weight in zip(reg_feature_cols, model_reg.coefficients.toArray())
    ]
    summary_lines += [
        f"intercept: {model_reg.intercept}",
        f"numInstances: {train_summary.numInstances}",
        f"r2: {train_summary.r2}",
        f"rootMeanSquaredError: {train_summary.rootMeanSquaredError}",
        f"meanAbsoluteError: {train_summary.meanAbsoluteError}",
        f"meanSquaredError: {train_summary.meanSquaredError}",
    ]
    with open("/tmp/model2_summary.txt", "w") as f:
        f.write("\n".join(summary_lines) + "\n")
    mlflow.log_artifact("/tmp/model2_summary.txt")

2025/04/29 16:34:46 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


3. [30 pts] For each model, store its predictions in the corresponding table you created in your own database. Ensure you are using your own database to store your predictions.

In [0]:
# Persist Model 1's test-set predictions into the personal database.
# Cast each column to the type declared for f1_model1_predictions;
# "overwrite" replaces whatever the table previously held.
model1_output_cols = [
    col(name).cast(dtype).alias(name)
    for name, dtype in (("driverId", "int"), ("raceId", "int"), ("prediction", "int"))
]
model1_writer = preds_cls.select(*model1_output_cols).write.mode("overwrite")
model1_writer.saveAsTable("my_f1_db.f1_model1_predictions")

In [0]:
# Read the stored Model 1 predictions back and print the first 20 rows (show's default).
model1_saved = spark.sql("SELECT * FROM my_f1_db.f1_model1_predictions")
model1_saved.show(20)

+--------+------+----------+
|driverId|raceId|prediction|
+--------+------+----------+
|       1|  1000|         1|
|       1|  1004|         1|
|       1|  1006|         1|
|       1|  1011|         1|
|       1|  1017|         1|
|       1|  1021|         1|
|       1|  1027|         1|
|       1|  1033|         1|
|       1|  1043|         1|
|       1|  1044|         0|
|       1|  1045|         1|
|       1|  1051|         1|
|       1|  1053|         1|
|       1|  1057|         1|
|       1|  1065|         0|
|       1|  1073|         1|
|       1|  1081|         0|
|       1|  1107|         0|
|       1|    20|         0|
|       1|    24|         1|
+--------+------+----------+
only showing top 20 rows



In [0]:
# Clean + Save Model 2 Predictions
preds_reg.select(
    col("driverId").cast("int").alias("driverId"),
    col("raceId").cast("int").alias("raceId"),
    col("prediction").cast("double").alias("prediction")
).write.mode("overwrite").saveAsTable("my_f1_db.f1_model2_predictions")

In [0]:
# Read the stored Model 2 predictions back and print the first 20 rows (show's default).
model2_saved = spark.sql("SELECT * FROM my_f1_db.f1_model2_predictions")
model2_saved.show(20)

+--------+------+------------------+
|driverId|raceId|        prediction|
+--------+------+------------------+
|       1|  1000| 3.318595383586363|
|       1|  1004| 4.386778208298769|
|       1|  1006| 3.151610982306848|
|       1|  1011| 4.009033004853754|
|       1|  1017|  4.15782079049644|
|       1|  1021| 2.842635288355594|
|       1|  1027|3.7832158723315636|
|       1|  1033|1.9514811380677717|
|       1|  1043|2.9287753412216526|
|       1|  1044|  6.92082154071144|
|       1|  1045|3.9843574839322624|
|       1|  1051|3.6589522515996515|
|       1|  1053|2.6189423616564076|
|       1|  1057| 5.008795121108141|
|       1|  1065|13.346934948265456|
|       1|  1073| 4.206541294581313|
|       1|  1081|   6.3018540733518|
|       1|  1107| 7.015796979854864|
|       1|    20|12.257026880719096|
|       1|    24|7.8811548350880445|
+--------+------+------------------+
only showing top 20 rows

