In [0]:
from pyspark.sql.functions import col, count
from pyspark.sql.types import IntegerType, DoubleType
import mlflow
import mlflow.spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
import config
import mysql.connector
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import monotonically_increasing_id


In [0]:
%pip install mlflow mysql-connector-python


In [0]:

# Load the raw lap times, pit stops, and driver data.
# NOTE(review): df_drivers is not used by any modeling cell in this notebook.
df_laps = spark.read.csv("s3://columbia-gr5069-main/raw/lap_times.csv", header=True, inferSchema=True)
df_pit = spark.read.csv("s3://columbia-gr5069-main/raw/pit_stops.csv", header=True, inferSchema=True)
df_drivers = spark.read.csv("s3://columbia-gr5069-main/raw/drivers.csv", header=True, inferSchema=True)

# Display schema and a sample of the lap times data
df_laps.printSchema()
df_laps.select("raceId", "driverId", "lap", "position", "milliseconds").show(5)

# Count the number of pit stops for each driver in each race.
# NOTE(review): this is the driver's total for the whole race, so every lap of that
# race (including laps before any stop happened) sees the same count.
df_pit_grouped = df_pit.groupBy("raceId", "driverId").agg(count("*").alias("pit_stop_count"))

# Left-join the pit stop count onto the lap times. Drivers with no pit stop rows get a
# null count from the join; fill ONLY that column with 0 so that genuinely missing lap
# times / positions are not silently turned into 0 (they are dropped by na.drop() below).
df_laps_joined = (
    df_laps
    .join(df_pit_grouped, on=["raceId", "driverId"], how="left")
    .fillna(0, subset=["pit_stop_count"])
)

# Select relevant features and cast them to appropriate types
df_laps_feat = df_laps_joined.select(
    col("milliseconds").cast(DoubleType()),       # target variable
    col("lap").cast(IntegerType()),               # lap number
    col("position").cast(IntegerType()),          # race position
    col("pit_stop_count").cast(IntegerType())     # total pit stops
).na.drop()

# Preview final dataset used for modeling
df_laps_feat.show(5)

# Assemble the predictors into the single vector column ("features") that the model
# cells below expect, then make a seeded train/test split so every downstream cell,
# and every re-run, uses the same trainDF / testDF.
FEATURE_COLS = ["lap", "position", "pit_stop_count"]
SEED = 42
assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
df_model = assembler.transform(df_laps_feat)
trainDF, testDF = df_model.randomSplit([0.8, 0.2], seed=SEED)


In [0]:
%sh
# Connectivity check: confirm the RDS MySQL endpoint accepts TCP connections on port 3306
# before the cells below try to connect (-z: probe the port only, -v: print the result).
nc -vz ht2668-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com 3306


[20 pts] Create two (2) new tables in your own database where you'll store the predictions from each model for this exercise.

In [0]:


# Linear Regression predictions -> MySQL table `lr_predictions_f1`.
# Fit the model in this cell (mirroring the Random Forest cell below) so the cell does
# not depend on a `predDF` left behind by some other, out-of-order cell.
lr = LinearRegression(featuresCol="features", labelCol="milliseconds")
lrModel = lr.fit(trainDF)
predDF = lrModel.transform(testDF)

# Add an ID column to the prediction DataFrame.
# NOTE: monotonically_increasing_id() is unique but NOT consecutive, so `id` is only a
# row label, not a key that can be joined back to (raceId, driverId, lap).
predDF_with_id = predDF.withColumn("id", monotonically_increasing_id())

# Keep only id and prediction; the `features` vector column cannot be written over JDBC.
predDF_clean = predDF_with_id.select("id", "prediction")

# Connect to RDS and create the database if it doesn't exist.
# try/finally guarantees the cursor and connection are closed even if the DDL fails.
conn = mysql.connector.connect(
    host="ht2668-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com",
    user="ht2668_gr5069",
    password=config.MYSQL_PASSWORD
)
try:
    cursor = conn.cursor()
    try:
        cursor.execute("CREATE DATABASE IF NOT EXISTS gr5069")
    finally:
        cursor.close()
finally:
    conn.close()

# Write the cleaned DataFrame into the MySQL database, replacing any existing table
predDF_clean.write.format("jdbc").options(
    url="jdbc:mysql://ht2668-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com/gr5069",
    driver="com.mysql.jdbc.Driver",
    dbtable="lr_predictions_f1",
    user="ht2668_gr5069",
    password=config.MYSQL_PASSWORD
).mode("overwrite").save()


In [0]:

# Random Forest predictions -> MySQL table `rf_predictions_f1`.
# A fixed seed makes the forest (and therefore the stored predictions) reproducible
# across re-runs; without it every run writes a different set of predictions.
rf = RandomForestRegressor(
    featuresCol="features", labelCol="milliseconds", numTrees=100, seed=42
)
rfModel = rf.fit(trainDF)

# Predict on test set
predDF_rf = rfModel.transform(testDF)

# Add an ID column. monotonically_increasing_id() is unique but NOT consecutive,
# so `id` is only a row label, not a key back to (raceId, driverId, lap).
predDF_rf_with_id = predDF_rf.withColumn("id", monotonically_increasing_id())

# Keep only id and prediction; the `features` vector column cannot be written over JDBC.
predDF_rf_clean = predDF_rf_with_id.select("id", "prediction")

# Write Random Forest predictions into the MySQL database, replacing any existing table.
# Assumes the gr5069 database already exists (it is created in the Linear Regression cell).
predDF_rf_clean.write.format("jdbc").options(
    url="jdbc:mysql://ht2668-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com/gr5069",
    driver="com.mysql.jdbc.Driver",
    dbtable="rf_predictions_f1",   # Table name for Random Forest predictions
    user="ht2668_gr5069",
    password=config.MYSQL_PASSWORD
).mode("overwrite").save()



[30 pts] Build two (2) predictive models using MLflow, logging hyperparameters, the model itself, four metrics, and two artifacts. Submit your MLflow experiments as part of your assignment.

In [0]:
# Start MLflow run for Linear Regression: trains on trainDF, evaluates on testDF and logs
# hyperparameters, four test-set metrics, the model, and two artifacts.
with mlflow.start_run(run_name="LinearRegression_F1"):

    # Train the model
    lr = LinearRegression(featuresCol="features", labelCol="milliseconds")
    lrModel = lr.fit(trainDF)

    # Predict on test set
    predDF_lr = lrModel.transform(testDF)

    # Evaluate predictions on the held-out test set
    evaluator = RegressionEvaluator(labelCol="milliseconds", predictionCol="prediction")
    rmse = evaluator.setMetricName("rmse").evaluate(predDF_lr)
    mae = evaluator.setMetricName("mae").evaluate(predDF_lr)
    r2 = evaluator.setMetricName("r2").evaluate(predDF_lr)
    mse = evaluator.setMetricName("mse").evaluate(predDF_lr)

    # Log hyperparameters (defaults, nothing tuned; recorded for reproducibility)
    mlflow.log_param("elasticNetParam", lr.getElasticNetParam())
    mlflow.log_param("regParam", lr.getRegParam())
    mlflow.log_param("maxIter", lr.getMaxIter())

    # Log test-set metrics
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("mae", mae)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mse", mse)

    # Log model
    mlflow.spark.log_model(lrModel, "model")

    # Artifact 1: model summary. str(lrModel.summary) only yields the summary object's
    # default repr, so write the fitted coefficients and training statistics explicitly.
    train_summary = lrModel.summary
    with open("/tmp/lr_model_summary.txt", "w") as f:
        f.write(f"intercept: {lrModel.intercept}\n")
        f.write(f"coefficients: {lrModel.coefficients}\n")
        f.write(f"train_num_instances: {train_summary.numInstances}\n")
        f.write(f"train_rmse: {train_summary.rootMeanSquaredError}\n")
        f.write(f"train_mae: {train_summary.meanAbsoluteError}\n")
        f.write(f"train_r2: {train_summary.r2}\n")
    mlflow.log_artifact("/tmp/lr_model_summary.txt")

    # Artifact 2 (the exercise asks for two): sample of actual vs. predicted lap times
    (
        predDF_lr.select("milliseconds", "prediction")
        .limit(1000)
        .toPandas()
        .to_csv("/tmp/lr_predictions_sample.csv", index=False)
    )
    mlflow.log_artifact("/tmp/lr_predictions_sample.csv")

In [0]:
# Start MLflow experiment for Random Forest
with mlflow.start_run(run_name="RandomForest_F1"):

    # Train the Random Forest model
    rf = RandomForestRegressor(featuresCol="features", labelCol="milliseconds", numTrees=100)
    rfModel = rf.fit(trainDF)

    # Predict on test set
    predDF_rf = rfModel.transform(testDF)

    # Evaluate predictions
    evaluator = RegressionEvaluator(labelCol="milliseconds", predictionCol="prediction")
    rmse = evaluator.setMetricName("rmse").evaluate(predDF_rf)
    mae = evaluator.setMetricName("mae").evaluate(predDF_rf)
    r2 = evaluator.setMetricName("r2").evaluate(predDF_rf)
    mse = evaluator.setMetricName("mse").evaluate(predDF_rf)

    # Log parameters
    mlflow.log_param("numTrees", rf.getNumTrees())
    mlflow.log_param("maxDepth", rf.getMaxDepth())

    # Log metrics
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("mae", mae)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mse", mse)

    # Log model
    mlflow.spark.log_model(rfModel, "model")

    # Log artifact (feature importance)
    feature_importances = rfModel.featureImportances
    with open("/tmp/rf_feature_importances.txt", "w") as f:
        f.write(str(feature_importances))

    mlflow.log_artifact("/tmp/rf_feature_importances.txt")


## MLflow Experiment Tracking Summary

Both models were tracked as MLflow runs within Databricks, under the following run names:

- Linear Regression: `LinearRegression_F1`
- Random Forest: `RandomForest_F1`

For each model, the following were logged:
- Parameters: model-specific hyperparameters
- Metrics: RMSE, MAE, R², and MSE
- Model: full Spark ML model
- Artifact: model summary (LR) and feature importances (RF)

See screenshots of the MLflow runs below:
- `mlflow_linear_regression_run.png`
- `mlflow_random_forest_run.png`
