## Homework #5: Model Deployment

In [0]:
%pip install mlflow mysql-connector-python

In [0]:
import os

import mysql.connector

# Open a server-level connection (no default schema yet).
# The password is read from the environment instead of being stored in the
# notebook; a missing MYSQL_PASSWORD raises KeyError naming the variable.
# MYSQL_HOST / MYSQL_USER are optional overrides of the original values.
# NOTE(review): the password that used to be committed here should be rotated.
# NOTE(review): on Databricks a secret scope may be a better source — confirm.
conn = mysql.connector.connect(
    host=os.environ.get(
        "MYSQL_HOST", 'yq2397-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com'
    ),
    user=os.environ.get("MYSQL_USER", 'admin'),
    password=os.environ["MYSQL_PASSWORD"],
)

# Make sure the homework schema exists, then select it for the DDL that follows
cursor = conn.cursor()
cursor.execute("CREATE DATABASE IF NOT EXISTS gr5069")
cursor.execute("USE gr5069")

In [0]:
# Both prediction tables share one schema; only the table name differs
prediction_table_ddl = """
CREATE TABLE IF NOT EXISTS {table_name} (
    prediction_id INT AUTO_INCREMENT PRIMARY KEY,
    race_id INT,
    driver_id INT,
    actual_position INT,
    predicted_position FLOAT,
    prediction_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""

# Model 1's table first, then Model 2's
for prediction_table in ("model1_predictions", "model2_predictions"):
    cursor.execute(prediction_table_ddl.format(table_name=prediction_table))

# Persist the DDL
conn.commit()

# List what now exists in the selected database
cursor.execute("SHOW TABLES")
tables = cursor.fetchall()
print("Tables in the database:")
for table in tables:
    print(table[0])

# Release the cursor and connection opened in the previous cell
cursor.close()
conn.close()

In [0]:
import os
import re
import tempfile
from datetime import datetime

import matplotlib.pyplot as plt
import mlflow
import mlflow.spark
import numpy as np
import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, GBTRegressor
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

In [0]:
# Select the MLflow experiment that this notebook's runs are recorded under
experiment_name = "/Users/yq2397@columbia.edu/take-home-exercise-4"
mlflow.set_experiment(experiment_name=experiment_name)

In [0]:
# Read the three raw F1 CSVs (results, drivers, races) from S3, using the header row for column names
df_results, df_drivers, df_races = (
    spark.read.csv(f"s3://columbia-gr5069-main/raw/{dataset}.csv", header=True)
    for dataset in ("results", "drivers", "races")
)

In [0]:
# Data preparation
# Attach race attributes, then driver attributes, to every result row
merged_df = (
    df_results
    .join(df_races, on="raceId", how="left")
    .join(df_drivers, on="driverId", how="left")
)

# Cast the numeric modelling columns to double
for numeric_col in ("grid", "positionOrder", "laps"):
    merged_df = merged_df.withColumn(
        numeric_col, merged_df[numeric_col].cast(DoubleType())
    )

# Parse the date columns needed for the age calculation
for date_col in ("dob", "date"):
    merged_df = merged_df.withColumn(date_col, F.to_date(merged_df[date_col]))

# Driver age on race day, in years (days between dob and race date / 365.25)
age_in_years = F.datediff(merged_df["date"], merged_df["dob"]) / 365.25
merged_df = merged_df.withColumn("driver_age", age_in_years)

# Keep the identifiers, features and label; drop rows with any missing value
model_df = merged_df.select(
    "raceId", "driverId", "grid", "driver_age", "laps", "positionOrder"
).dropna()

print(f"Total records for modeling: {model_df.count()}")
display(model_df.limit(10))

In [0]:
# Feature columns and the assembler that packs them into a "features" vector
feature_cols = ["grid", "driver_age", "laps"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# 80/20 train/test split with a fixed seed
train_df, test_df = model_df.randomSplit(weights=[0.8, 0.2], seed=42)
for split_label, split_frame in (("Training", train_df), ("Testing", test_df)):
    print(f"{split_label} set count: {split_frame.count()}")

In [0]:
# Function to train model and log in MLflow
def train_and_log_model(model_type, params=None):
    """Train a regressor, log it to MLflow, and return its test-set predictions.

    Uses the notebook-level names ``assembler``, ``train_df``, ``test_df`` and
    ``feature_cols`` defined in earlier cells.

    Args:
        model_type: ``'rf'`` for ``RandomForestRegressor`` or ``'gbt'`` for
            ``GBTRegressor``.
        params: Optional dict of keyword arguments for the estimator
            constructor; each entry is also logged as an MLflow parameter.

    Returns:
        The DataFrame returned by the fitted model's ``transform`` on the
        assembled test set (it carries the ``prediction`` column).

    Raises:
        ValueError: If ``model_type`` is neither ``'rf'`` nor ``'gbt'``.
    """
    # Validate before opening a run: otherwise an unknown type would start an
    # MLflow run and then fail later with an unbound ``model`` variable.
    if model_type not in ("rf", "gbt"):
        raise ValueError(
            f"Unsupported model_type {model_type!r}; expected 'rf' or 'gbt'"
        )
    estimator_kwargs = dict(params) if params else {}
    estimator_cls = RandomForestRegressor if model_type == "rf" else GBTRegressor

    with mlflow.start_run():
        # Log parameters
        mlflow.log_param("model_type", model_type)
        for key, value in estimator_kwargs.items():
            mlflow.log_param(key, value)

        # Prepare data
        train_vector = assembler.transform(train_df)
        test_vector = assembler.transform(test_df)

        # Create and train model
        model = estimator_cls(
            featuresCol="features", labelCol="positionOrder", **estimator_kwargs
        )
        trained_model = model.fit(train_vector)
        predictions = trained_model.transform(test_vector)

        # Calculate metrics; the same evaluator is re-pointed at each metric
        evaluator = RegressionEvaluator(labelCol="positionOrder", predictionCol="prediction")
        rmse = evaluator.setMetricName("rmse").evaluate(predictions)
        r2 = evaluator.setMetricName("r2").evaluate(predictions)
        mae = evaluator.setMetricName("mae").evaluate(predictions)

        # Share (in percent) of rounded predictions within one place of the actual position
        pred_pd = predictions.select("positionOrder", "prediction").toPandas()
        accuracy_within_1 = np.mean(
            np.abs(pred_pd["positionOrder"] - np.round(pred_pd["prediction"])) <= 1
        ) * 100

        # Log metrics
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)
        mlflow.log_metric("accuracy_within_1", accuracy_within_1)

        # Plots are written to a throwaway directory so nothing is left behind
        # in the working directory; artifact file names are unchanged.
        with tempfile.TemporaryDirectory() as artifact_dir:
            scatter_path = os.path.join(artifact_dir, "prediction_scatter.png")
            importance_path = os.path.join(artifact_dir, "feature_importance.png")

            # First artifact: Actual vs Predicted plot (red dashed line is y = x)
            actual_min = pred_pd["positionOrder"].min()
            actual_max = pred_pd["positionOrder"].max()
            plt.figure(figsize=(8, 6))
            plt.scatter(pred_pd["positionOrder"], pred_pd["prediction"], alpha=0.5)
            plt.plot([actual_min, actual_max], [actual_min, actual_max], 'r--')
            plt.xlabel('Actual Position')
            plt.ylabel('Predicted Position')
            plt.title('Actual vs Predicted Race Positions')
            plt.savefig(scatter_path)
            plt.close()

            # Second artifact: Feature importance
            feature_importance = pd.DataFrame({
                'Feature': feature_cols,
                'Importance': trained_model.featureImportances.toArray()
            }).sort_values('Importance', ascending=False)

            plt.figure(figsize=(8, 6))
            plt.barh(feature_importance['Feature'], feature_importance['Importance'])
            plt.xlabel('Importance')
            plt.title('Feature Importance')
            plt.savefig(importance_path)
            plt.close()

            # Log artifacts while the files still exist
            mlflow.log_artifact(scatter_path)
            mlflow.log_artifact(importance_path)

        # Log model
        mlflow.spark.log_model(trained_model, f"{model_type}_model")

        print(f"Model: {model_type}, RMSE: {rmse:.4f}, R²: {r2:.4f}")

        return predictions

In [0]:
# Hyperparameters for the two candidate models
rf_params = dict(numTrees=200, maxDepth=10, seed=42)
gbt_params = dict(maxIter=300, stepSize=0.05, maxDepth=3, seed=42)

# Fit and log each model in turn, keeping its test-set predictions
print("Building Model 1: Random Forest")
rf_predictions = train_and_log_model('rf', rf_params)

print("\nBuilding Model 2: Gradient Boosting")
gbt_predictions = train_and_log_model('gbt', gbt_params)

In [0]:
# Function to store predictions in the MySQL database
def store_predictions_to_mysql(predictions_df, table_name):
    """Store model predictions in the specified table of the ``gr5069`` database.

    Selects ``raceId``, ``driverId``, ``positionOrder`` (cast to int) and
    ``prediction`` (rounded to 2 decimals) from ``predictions_df``, inserts
    them into ``table_name``, and prints the inserted and total row counts.

    Args:
        predictions_df: Spark DataFrame with ``raceId``, ``driverId``,
            ``positionOrder`` and ``prediction`` columns.
        table_name: Target table; must be a plain SQL identifier (letters,
            digits, underscore, not starting with a digit).

    Raises:
        ValueError: If ``table_name`` is not a plain identifier.
        KeyError: If the ``MYSQL_PASSWORD`` environment variable is not set.
    """
    # A table name cannot be bound as a query parameter, so it is interpolated
    # into the SQL text; only accept a bare identifier to rule out injection.
    if not isinstance(table_name, str) or not re.fullmatch(
        r"[A-Za-z_][A-Za-z0-9_]*", table_name
    ):
        raise ValueError(f"Invalid table name: {table_name!r}")

    # Read the secret from the environment rather than from the notebook;
    # fail before any Spark work if it is missing.
    # NOTE(review): the password previously committed here should be rotated.
    password = os.environ["MYSQL_PASSWORD"]

    # Convert predictions to a pandas DataFrame first
    pandas_df = predictions_df.select(
        "raceId",
        "driverId",
        F.col("positionOrder").cast("int").alias("actual_position"),
        F.round("prediction", 2).alias("predicted_position")
    ).toPandas()

    # Plain tuples in the selected column order, coerced to native Python types
    rows_to_insert = [
        (int(race_id), int(driver_id), int(actual_position), float(predicted_position))
        for race_id, driver_id, actual_position, predicted_position
        in pandas_df.itertuples(index=False, name=None)
    ]

    insert_query = f"""
    INSERT INTO {table_name} (race_id, driver_id, actual_position, predicted_position)
    VALUES (%s, %s, %s, %s)
    """

    # Connect to MySQL; MYSQL_HOST / MYSQL_USER optionally override the defaults
    conn = mysql.connector.connect(
        host=os.environ.get(
            "MYSQL_HOST", 'yq2397-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com'
        ),
        user=os.environ.get("MYSQL_USER", 'admin'),
        password=password,
        database='gr5069'
    )
    # try/finally so the cursor and connection are released even if a statement fails
    try:
        cursor = conn.cursor()
        try:
            # Insert predictions
            cursor.executemany(insert_query, rows_to_insert)
            conn.commit()

            # Verify insertion
            cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
            row_count = cursor.fetchone()[0]
        finally:
            cursor.close()
    finally:
        conn.close()

    print(f"Successfully stored {len(rows_to_insert)} predictions in {table_name}")
    print(f"Total rows in {table_name}: {row_count}")

# Write each model's predictions to its own table (Model 1 = RF, Model 2 = GBT)
for banner, model_predictions, target_table in (
    ("Storing Random Forest predictions...", rf_predictions, "model1_predictions"),
    ("\nStoring Gradient Boosting predictions...", gbt_predictions, "model2_predictions"),
):
    print(banner)
    store_predictions_to_mysql(model_predictions, target_table)