In [0]:
from pyspark.sql.functions import col, count
from pyspark.sql.types import IntegerType, DoubleType
import mlflow
import mlflow.spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.ml.regression import RandomForestRegressor

In [0]:
%pip install mysql-connector-python

In [0]:
%pip install mlflow mysql-connector-python

In [0]:
# Raw CSV extracts from the course bucket. Only header=True is set (no schema
# inference), so every column is loaded as a string.
RAW_PREFIX = "s3://columbia-gr5069-main/raw"
df_laps, df_results, df_drivers = (
    spark.read.csv(f"{RAW_PREFIX}/{table}.csv", header=True)
    for table in ("lap_times", "results", "drivers")
)

In [0]:
# Keep only the columns needed downstream; rename positionOrder/nationality so the
# joined frame clearly distinguishes them from the lap-level columns.
laps_sel = df_laps.select("raceId", "driverId", "lap", "position", "time", "milliseconds")
results_sel = (
    df_results
    .select("raceId", "driverId", "constructorId", "grid", "positionOrder", "points", "statusId")
    .withColumnRenamed("positionOrder", "finalPosition")
)
drivers_sel = (
    df_drivers
    .select("driverId", "driverRef", "code", "forename", "surname", "nationality")
    .withColumnRenamed("nationality", "driverNationality")
)

# Each lap row gets that driver's race result, then the driver's attributes.
# Inner joins: rows without a match on the keys are dropped.
df_laps_results = laps_sel.join(results_sel, on=["raceId", "driverId"], how="inner")
df_final = df_laps_results.join(drivers_sel, on="driverId", how="inner")

# Inspect the result
df_final.printSchema()
df_final.display()


In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.sql.functions import col

# Model inputs and target. The CSVs were read without schema inference, so these
# columns are still strings here; VectorAssembler only accepts numeric columns.
feature_cols = ['grid', 'position', 'points', 'lap']  # You can add more
label_col = 'milliseconds'
model_cols = feature_cols + [label_col]

# Cast BEFORE filtering: a value that cannot be parsed becomes null after the cast,
# and VectorAssembler raises on null inputs by default, so drop rows with a null in
# ANY model column, not only in the label.
df_model = df_final
for model_col in model_cols:
    df_model = df_model.withColumn(model_col, col(model_col).cast('double'))
df_model = df_model.dropna(subset=model_cols)

assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')

# Fixed seed so the forest's random sampling is reproducible, matching the seeded split.
rf = RandomForestRegressor(labelCol=label_col, featuresCol='features', seed=42)

# Assemble features, then fit the forest
pipeline = Pipeline(stages=[assembler, rf])

# Seeded 80/20 train/test split
train, test = df_model.randomSplit([0.8, 0.2], seed=42)

In [0]:
%python
import mlflow
import mlflow.spark
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

# The CSV-sourced columns are strings; cast the four features and the label to
# double on both splits so the pipeline stages accept them.
for _num_col in ("grid", "position", "points", "lap", "milliseconds"):
    train = train.withColumn(_num_col, col(_num_col).cast("double"))
    test = test.withColumn(_num_col, col(_num_col).cast("double"))

# One MLflow run: feature list (param), test-set RMSE (metric), fitted pipeline (model).
with mlflow.start_run(run_name="RF_Model_LapTime") as run:
    model = pipeline.fit(train)
    predictions = model.transform(test)

    evaluator = RegressionEvaluator(
        labelCol="milliseconds",
        predictionCol="prediction",
        metricName="rmse",
    )
    rmse = evaluator.evaluate(predictions)

    mlflow.log_param("features", feature_cols)
    mlflow.log_metric("rmse", rmse)
    mlflow.spark.log_model(model, "random-forest-pipeline")

In [0]:
import os
from contextlib import closing

import mysql.connector

# Connection settings. The password is read from the environment (e.g. a cluster env
# var backed by a secret scope) instead of being hard-coded: a literal password in a
# notebook is readable by anyone with the file. The previously committed one should be rotated.
db_host = os.environ.get("GR5069_DB_HOST", "hh3098-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com")
db_user = os.environ.get("GR5069_DB_USER", "admin")
db_password = os.environ["GR5069_DB_PASSWORD"]  # KeyError here means the secret is not configured

# closing() releases the connection and cursor even if one of the statements raises.
with closing(mysql.connector.connect(host=db_host, user=db_user, password=db_password)) as conn:
    with closing(conn.cursor()) as cursor:
        # Create the database used by the JDBC writes below
        cursor.execute("CREATE DATABASE IF NOT EXISTS gr5069")
        cursor.execute("USE gr5069")

In [0]:
import os

from pyspark.sql.functions import col

# Join keys, the actual lap time and the prediction (renamed to predicted_ms),
# with both numeric columns cast to double before writing.
pred_df = (
    predictions
    .select("raceId", "driverId", "milliseconds", col("prediction").alias("predicted_ms"))
    .withColumn("milliseconds", col("milliseconds").cast("double"))
    .withColumn("predicted_ms", col("predicted_ms").cast("double"))
)

# Credentials come from the environment rather than a hard-coded literal in the
# notebook; the previously committed password should be treated as leaked and rotated.
db_host = os.environ.get("GR5069_DB_HOST", "hh3098-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com")

# Write to MySQL, replacing the table's previous contents
pred_df.write.format('jdbc').options(
    url=f'jdbc:mysql://{db_host}/gr5069',
    driver='com.mysql.jdbc.Driver',
    dbtable='lap_time_predictions',
    user=os.environ.get("GR5069_DB_USER", "admin"),
    password=os.environ["GR5069_DB_PASSWORD"],
).mode('overwrite').save()



[20 pts] Create two (2) new tables in your own database where you'll store the predictions from each model for this exercise.

In [0]:
# model a: random forest regressor on the assembled features
from pyspark.ml import Pipeline
from pyspark.ml.regression import RandomForestRegressor

# Without a seed the forest's random sampling differs on every run, so the
# predictions stored from this model could not be reproduced; pin it.
rf = RandomForestRegressor(featuresCol="features", labelCol="milliseconds", seed=42)
pipeline_rf = Pipeline(stages=[assembler, rf])
model_rf = pipeline_rf.fit(train)
predictions_model_a = model_rf.transform(test)


In [0]:
# model b: linear regression on the same assembled features
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(labelCol="milliseconds", featuresCol="features")
pipeline_lr = Pipeline(stages=[assembler, lr])

# Fit on the training split, then score the held-out split
model_lr = pipeline_lr.fit(train)
predictions_model_b = model_lr.transform(test)


In [0]:
def _prediction_table(preds):
    """Project a model's output onto (raceId, driverId, actual_ms, predicted_ms).

    IDs are cast to int and the actual/predicted lap times to double.
    """
    return preds.select(
        col("raceId").cast("int"),
        col("driverId").cast("int"),
        col("milliseconds").cast("double").alias("actual_ms"),
        col("prediction").cast("double").alias("predicted_ms"),
    )


predictions_model_a = _prediction_table(predictions_model_a)
predictions_model_b = _prediction_table(predictions_model_b)

In [0]:
# Write each model's predictions to its own MySQL table (overwriting previous contents).
import os

# Credentials are read from the environment instead of being hard-coded twice in the
# notebook; the password previously committed here should be treated as leaked and rotated.
db_host = os.environ.get("GR5069_DB_HOST", "hh3098-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com")
jdbc_options = {
    "url": f"jdbc:mysql://{db_host}/gr5069",
    "driver": "com.mysql.jdbc.Driver",
    "user": os.environ.get("GR5069_DB_USER", "admin"),
    "password": os.environ["GR5069_DB_PASSWORD"],
}

for table_name, predictions_df in (
    ("model_a_predictions", predictions_model_a),
    ("model_b_predictions", predictions_model_b),
):
    predictions_df.write.format("jdbc").options(dbtable=table_name, **jdbc_options).mode("overwrite").save()


[30 pts] Build two (2) predictive models using MLflow, logging hyperparameters, the model itself, four metrics, and two artifacts. Submit your MLflow experiments as part of your assignment.

In [0]:
def _log_prediction_artifacts(predictions, model_name, label_col):
    """Log test-set predictions as a CSV and an actual-vs-predicted scatter plot to MLflow.

    Intended to be called inside the active MLflow run. Files are written to a temporary
    directory that is removed after MLflow has logged them.
    """
    import os
    import tempfile

    import mlflow
    from matplotlib import pyplot as plt

    pred_pd = predictions.select("prediction", label_col).toPandas()
    lo, hi = pred_pd[label_col].min(), pred_pd[label_col].max()

    # TemporaryDirectory (unlike mkdtemp) cleans up the local copies afterwards.
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Artifact 1: predictions next to the actual values
        pred_path = os.path.join(tmp_dir, f"{model_name}_predictions.csv")
        pred_pd.to_csv(pred_path, index=False)
        mlflow.log_artifact(pred_path, artifact_path="predictions")

        # Artifact 2: actual vs predicted; the dashed red y = x line marks perfect predictions
        fig, ax = plt.subplots()
        ax.scatter(pred_pd[label_col], pred_pd["prediction"], alpha=0.5)
        ax.plot([lo, hi], [lo, hi], color='red', linestyle='--')
        ax.set_xlabel("Actual")
        ax.set_ylabel("Predicted")
        ax.set_title(f"{model_name} Predictions")
        plot_path = os.path.join(tmp_dir, f"{model_name}_scatter.png")
        fig.savefig(plot_path)  # save this figure explicitly, not pyplot's "current" one
        mlflow.log_artifact(plot_path, artifact_path="plots")


def log_model_with_mlflow(model, model_name, params, train_df, test_df, label_col="milliseconds",
                          feature_cols=("grid", "position", "lap")):
    """Fit ``model`` behind a VectorAssembler and record the run in MLflow.

    A single MLflow run named ``model_name`` logs the given hyperparameters and four
    test-set metrics (rmse, mae, r2, mse). It also logs the fitted Spark pipeline, a CSV
    of test-set predictions, and an actual-vs-predicted scatter plot.

    Args:
        model: Spark ML regressor estimator; expected to read the "features" column
            and to be configured with ``label_col`` as its label.
        model_name: MLflow run name; also used in the artifact file names.
        params: dict of hyperparameters to log (logged only, not applied to ``model``).
        train_df: DataFrame to fit on; needs numeric ``feature_cols`` and ``label_col``.
        test_df: DataFrame to evaluate on, with the same columns.
        label_col: label column used by the evaluators and the artifacts.
        feature_cols: columns assembled into "features"; defaults to grid, position, lap.
    """
    # Local imports so this function does not depend on names imported by other cells.
    import mlflow
    import mlflow.spark
    from pyspark.ml import Pipeline
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.feature import VectorAssembler

    assembler = VectorAssembler(inputCols=list(feature_cols), outputCol="features")
    pipeline = Pipeline(stages=[assembler, model])

    with mlflow.start_run(run_name=model_name):
        fitted_model = pipeline.fit(train_df)
        predictions = fitted_model.transform(test_df)

        # Same evaluator configuration for every metric; only metricName differs.
        metrics = {
            metric: RegressionEvaluator(
                labelCol=label_col, predictionCol="prediction", metricName=metric
            ).evaluate(predictions)
            for metric in ("rmse", "mae", "r2", "mse")
        }

        mlflow.log_params(params)
        mlflow.log_metrics(metrics)
        mlflow.spark.log_model(fitted_model, "model")

        _log_prediction_artifacts(predictions, model_name, label_col)


In [0]:
%python
import os  # kept: earlier versions of log_model_with_mlflow read os/tempfile from globals
import tempfile
from pyspark.ml.regression import RandomForestRegressor

# A single dict both configures the estimator and is logged, so the logged params
# cannot drift from the model. The seed makes the forest (and its metrics) reproducible.
rf_params = {"numTrees": 50, "maxDepth": 5, "seed": 42}
rf_model = RandomForestRegressor(featuresCol="features", labelCol="milliseconds", **rf_params)
log_model_with_mlflow(rf_model, "RandomForestModel", rf_params, train, test)

In [0]:
# One dict configures the estimator and is the set of params logged to MLflow.
lr_params = {"maxIter": 10, "regParam": 0.1}
lr_model = LinearRegression(featuresCol="features", labelCol="milliseconds", **lr_params)
log_model_with_mlflow(lr_model, "LinearRegressionModel", lr_params, train, test)
