In [0]:
import mlflow
import boto3
import pandas as pd

In [0]:
# Raw F1 CSVs live in the course S3 bucket; create one client and reuse it.
s3 = boto3.client('s3')
bucket = "columbia-gr5069-main"

# Logical table name -> object key inside the bucket.
keys = {
    "races": "raw/races.csv",
    "results": "raw/results.csv",
}

# Stream every object straight into pandas and keep the frames by table name.
dataframes = {}
for table_name, object_key in keys.items():
    response = s3.get_object(Bucket=bucket, Key=object_key)
    frame = pd.read_csv(response['Body'])
    dataframes[table_name] = frame
    n_rows, n_cols = frame.shape
    print(f"Loaded {table_name} ({n_rows} rows, {n_cols} columns)")

# Named handles for the two frames the rest of the notebook uses.
races_df = dataframes['races']
results_df = dataframes['results']

# Render both frames with the notebook's table viewer.
display(races_df)
display(results_df)

In [0]:
from pyspark.sql.functions import col

# Move both pandas frames into Spark so the join and filtering run there.
races_df_spark = spark.createDataFrame(races_df)
results_df_spark = spark.createDataFrame(results_df)

# Modelling frame: each race result gets its season ("year") from races, keeps only
# the model columns, and drops rows whose position is null or the CSV '\N' marker.
# The filter is applied before the integer cast of "position".
race_years = races_df_spark.select("raceId", "year")
data_df = (
    results_df_spark
    .join(race_years, on="raceId", how="inner")
    .select("grid", "constructorId", "driverId", "year", "position")
    .where(col("position").isNotNull() & (col("position") != '\\N'))
    .withColumn("position", col("position").cast("int"))
)

# Quick sanity check of the first few rows.
data_df.show(5)

Question 1: Create two (2) new tables in your own database where you'll store the predictions from each model for this exercise.

In [0]:
%pip install mysql-connector-python
import mysql.connector

# Connect to database
conn = mysql.connector.connect(
    host='ml4995-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com',
    user='admin',
    password='123456789',
    database='gr5069'
)

cursor = conn.cursor()

# Create table for Linear Regression predictions
cursor.execute("""
    CREATE TABLE IF NOT EXISTS linear_model_preds (
        id INT AUTO_INCREMENT PRIMARY KEY,
        grid INT,
        constructorId INT,
        driverId INT,
        year INT,
        true_position INT,
        predicted_position DOUBLE
    )
""")

# Create table for Random Forest predictions
cursor.execute("""
    CREATE TABLE IF NOT EXISTS rf_model_preds (
        id INT AUTO_INCREMENT PRIMARY KEY,
        grid INT,
        constructorId INT,
        driverId INT,
        year INT,
        true_position INT,
        predicted_position DOUBLE
    )
""")

# Close
cursor.close()
conn.close()

2. Build two (2) predictive models using MLflow, logging hyperparameters, the model itself, four metrics, and two artifacts. Submit your MLflow experiments as part of your assignment.

In [0]:
# for the linear model prediction
import mlflow
import mlflow.spark
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
import matplotlib.pyplot as plt
import pandas as pd

# Model inputs. The same raw columns are carried through to the prediction table.
FEATURE_COLS = ["grid", "constructorId", "driverId", "year"]
SPLIT_SEED = 42

# Every hyperparameter passed to the model, logged as-is so MLflow records exactly
# what was used. NOTE: Spark scales the elastic-net penalty by regParam, so with
# regParam=0.0 (Spark's default, kept here) elasticNetParam has no effect. Raise
# regParam if regularisation is actually intended.
lr_params = {"regParam": 0.0, "elasticNetParam": 0.3}

# 1. Vector Assembler: keep the raw feature columns and the label next to the
#    vector so predictions can be reported per row without any join.
vec_assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
vec_data = vec_assembler.transform(data_df).select("features", "position", *FEATURE_COLS)

# 2. Train/Test Split
train_data, test_data = vec_data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# 3. Initialize model with hyperparameters
lr = LinearRegression(featuresCol="features", labelCol="position", **lr_params)

# 4. MLflow Logging
with mlflow.start_run(run_name="LinearRegression_F1_Logged") as run:

    # Fit on the training split, score the held-out split.
    model = lr.fit(train_data)
    predictions = model.transform(test_data)

    # Four regression metrics on the test split.
    evaluator = RegressionEvaluator(labelCol="position", predictionCol="prediction")
    rmse = evaluator.evaluate(predictions, {evaluator.metricName: "rmse"})
    mae = evaluator.evaluate(predictions, {evaluator.metricName: "mae"})
    r2 = evaluator.evaluate(predictions, {evaluator.metricName: "r2"})
    mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})

    mlflow.log_params(lr_params)
    mlflow.log_param("split_seed", SPLIT_SEED)
    mlflow.log_metrics({"rmse": rmse, "mae": mae, "r2": r2, "mse": mse})

    mlflow.spark.log_model(model, "model")

    # Test-set predictions with their own rows' features and true label. These
    # columns are already on `predictions` (carried from `vec_data`). Do NOT join
    # back to `vec_data` on the feature vector: identical vectors occur in several
    # rows, so that join duplicates rows and attaches true positions from unrelated
    # (including training) rows.
    predictions_full = predictions.select(
        *FEATURE_COLS, col("position").alias("true_position"), "prediction"
    )

    # Artifact 1: prediction CSV.
    pred_pd = predictions_full.toPandas()
    pred_pd.to_csv("linear_predictions.csv", index=False)
    mlflow.log_artifact("linear_predictions.csv")

    # Artifact 2: true-vs-predicted scatter (kept under its original file name);
    # the dashed y = x line marks perfect predictions.
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.scatter(pred_pd["true_position"], pred_pd["prediction"], alpha=0.6)
    diag = [pred_pd["true_position"].min(), pred_pd["true_position"].max()]
    ax.plot(diag, diag, linestyle="--", color="grey")
    ax.set(xlabel="True Position", ylabel="Predicted Position",
           title="Linear Regression: True vs Predicted")
    ax.grid(True)
    fig.tight_layout()
    fig.savefig("residual_plot.png")
    mlflow.log_artifact("residual_plot.png")

    print(f"✅ Logged to MLflow. RMSE: {rmse:.2f}, MAE: {mae:.2f}, R²: {r2:.2f}, MSE: {mse:.2f}")

In [0]:
# for the random forest model prediction (the second model)
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow
import mlflow.spark
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql.functions import col

# Same inputs and split seed as the linear model so the two runs are comparable.
FEATURE_COLS = ["grid", "constructorId", "driverId", "year"]
SPLIT_SEED = 42

# Every hyperparameter passed to the forest, logged as-is. `seed` is pinned
# explicitly so the forest is reproducible across sessions instead of relying on
# PySpark's default seed.
rf_params = {"numTrees": 100, "maxDepth": 5, "seed": 42}

# 1. Vector Assembler: keep raw feature columns and the label next to the vector
#    so predictions can be reported per row without any join.
vec_assembler_rf = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
vec_data_rf = vec_assembler_rf.transform(data_df).select("features", "position", *FEATURE_COLS)

# 2. Train/Test Split
train_data_rf, test_data_rf = vec_data_rf.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

# 3. Initialize Random Forest
rf = RandomForestRegressor(featuresCol="features", labelCol="position", **rf_params)

# 4. MLflow Logging
with mlflow.start_run(run_name="RandomForest_F1") as run:

    # Fit on the training split, score the held-out split.
    rf_model = rf.fit(train_data_rf)
    predictions_rf = rf_model.transform(test_data_rf)

    # Four regression metrics on the test split.
    evaluator_rf = RegressionEvaluator(labelCol="position", predictionCol="prediction")
    rmse_rf = evaluator_rf.evaluate(predictions_rf, {evaluator_rf.metricName: "rmse"})
    mae_rf = evaluator_rf.evaluate(predictions_rf, {evaluator_rf.metricName: "mae"})
    r2_rf = evaluator_rf.evaluate(predictions_rf, {evaluator_rf.metricName: "r2"})
    mse_rf = evaluator_rf.evaluate(predictions_rf, {evaluator_rf.metricName: "mse"})

    mlflow.log_params(rf_params)
    mlflow.log_param("split_seed", SPLIT_SEED)
    mlflow.log_metrics({"rmse": rmse_rf, "mae": mae_rf, "r2": r2_rf, "mse": mse_rf})

    mlflow.spark.log_model(rf_model, "model")

    # Test-set predictions with their own rows' features and true label, taken
    # directly from `predictions_rf`. Joining back to `vec_data_rf` on the feature
    # vector would duplicate rows and mix in labels from unrelated (training) rows.
    predictions_rf_full = predictions_rf.select(
        *FEATURE_COLS, col("position").alias("true_position"), "prediction"
    )

    # Artifact 1: prediction CSV.
    pred_rf_pd = predictions_rf_full.toPandas()
    pred_rf_pd.to_csv("rf_predictions.csv", index=False)
    mlflow.log_artifact("rf_predictions.csv")

    # Artifact 2: true-vs-predicted scatter (kept under its original file name);
    # the dashed y = x line marks perfect predictions.
    fig, ax = plt.subplots(figsize=(6, 4))
    ax.scatter(pred_rf_pd["true_position"], pred_rf_pd["prediction"], alpha=0.6)
    diag = [pred_rf_pd["true_position"].min(), pred_rf_pd["true_position"].max()]
    ax.plot(diag, diag, linestyle="--", color="grey")
    ax.set(xlabel="True Position", ylabel="Predicted Position",
           title="Random Forest: True vs Predicted")
    ax.grid(True)
    fig.tight_layout()
    fig.savefig("rf_residual_plot.png")
    mlflow.log_artifact("rf_residual_plot.png")

    print(f"✅ Random Forest Logged! RMSE: {rmse_rf:.2f}, MAE: {mae_rf:.2f}, R²: {r2_rf:.2f}, MSE: {mse_rf:.2f}")

3. For each model, store its predictions in the corresponding table you created in your own database. Ensure you are using your own database to store your predictions.

In [0]:
# Save predictions from Linear Regression model to MySQL
import os

# Credentials come from the environment rather than being stored in the notebook.
db_password = os.environ.get("GR5069_DB_PASSWORD")
if not db_password:
    raise RuntimeError("Set the GR5069_DB_PASSWORD environment variable before writing predictions.")
db_host = os.environ.get("GR5069_DB_HOST", "ml4995-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com")
db_name = os.environ.get("GR5069_DB_NAME", "gr5069")
db_user = os.environ.get("GR5069_DB_USER", "admin")

# - `prediction` is renamed to the `predicted_position` column created for
#   Question 1 (withColumnRenamed is a no-op if it is already named that way).
# - `truncate=true` makes overwrite empty the existing table instead of dropping and
#   recreating it, so the Question 1 schema (including AUTO_INCREMENT `id`) is kept
#   and re-running the cell does not duplicate rows.
(
    predictions_full
    .withColumnRenamed("prediction", "predicted_position")
    .write
    .format("jdbc")
    .option("url", f"jdbc:mysql://{db_host}/{db_name}")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "linear_model_preds")
    .option("user", db_user)
    .option("password", db_password)
    .option("truncate", "true")
    .mode("overwrite")
    .save()
)

In [0]:
# Save predictions from Random Forest model to MySQL
import os

# Credentials come from the environment rather than being stored in the notebook.
db_password = os.environ.get("GR5069_DB_PASSWORD")
if not db_password:
    raise RuntimeError("Set the GR5069_DB_PASSWORD environment variable before writing predictions.")
db_host = os.environ.get("GR5069_DB_HOST", "ml4995-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com")
db_name = os.environ.get("GR5069_DB_NAME", "gr5069")
db_user = os.environ.get("GR5069_DB_USER", "admin")

# - `prediction` is renamed to the `predicted_position` column created for
#   Question 1 (withColumnRenamed is a no-op if it is already named that way).
# - `truncate=true` makes overwrite empty the existing table instead of dropping and
#   recreating it, so the Question 1 schema (including AUTO_INCREMENT `id`) is kept
#   and re-running the cell does not duplicate rows.
(
    predictions_rf_full
    .withColumnRenamed("prediction", "predicted_position")
    .write
    .format("jdbc")
    .option("url", f"jdbc:mysql://{db_host}/{db_name}")
    .option("driver", "com.mysql.jdbc.Driver")
    .option("dbtable", "rf_model_preds")
    .option("user", db_user)
    .option("password", db_password)
    .option("truncate", "true")
    .mode("overwrite")
    .save()
)