In [0]:
%pip install mlflow mysql-connector-python 

In [0]:
import os

from pyspark.sql.functions import col, count, monotonically_increasing_id
from pyspark.sql.types import IntegerType, DoubleType
import mlflow
import mlflow.spark
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
import mysql.connector

In [0]:
# Data loading and feature preparation happen once, in the dedicated "Load datasets" and
# "Prepare dataset" cells further down. They are deliberately not repeated here:
# - Nothing between this cell and those two reads lap_times, pit_stops, drivers,
#   pit_grouped, laps_joined or laps_features, so defining them here has no effect.
# - Reading the CSVs with inferSchema=True costs an extra pass over each file, so
#   loading them here too only adds time.

In [0]:
# Create the target MySQL database (the prediction tables are created later by the JDBC
# writes, which use mode("overwrite")).
# The password is read from the environment rather than committed with the notebook.
# The password that used to be hard-coded here is exposed in the notebook source: rotate it.
conn = mysql.connector.connect(
    host="yp2728-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com",
    user="admin",
    password=os.environ["GR5069_MYSQL_PASSWORD"],
)
try:
    cursor = conn.cursor()
    try:
        cursor.execute("CREATE DATABASE IF NOT EXISTS gr5069")
    finally:
        cursor.close()
finally:
    # Always release the connection, even if the DDL statement fails.
    conn.close()

In [0]:
# Load datasets: raw CSV extracts from the course S3 bucket, each read with a header row
# and Spark-inferred column types.
# NOTE(review): `drivers` is not referenced by any later cell shown here; drop it if unused.
csv_reader = spark.read.options(header=True, inferSchema=True)
lap_times = csv_reader.csv("s3://columbia-gr5069-main/raw/lap_times.csv")
pit_stops = csv_reader.csv("s3://columbia-gr5069-main/raw/pit_stops.csv")
drivers = csv_reader.csv("s3://columbia-gr5069-main/raw/drivers.csv")


In [0]:
# Prepare dataset: one row per lap with the lap number, position and the driver's total
# number of pit stops in that race, plus the lap time in milliseconds (the model label).
pit_grouped = pit_stops.groupBy("raceId", "driverId").agg(count("*").alias("pit_stop_count"))

# A (raceId, driverId) pair with no pit-stop rows gets a null pit_stop_count from the left
# join; treat that as zero stops.
# Only that column is filled. A blanket fillna(0) would also turn missing lap times or
# positions into 0, and those bogus rows would then survive the na.drop() below.
laps_joined = (
    lap_times
    .join(pit_grouped, on=["raceId", "driverId"], how="left")
    .fillna(0, subset=["pit_stop_count"])
)

# Keep only the label and the numeric predictors with explicit types; rows that still have
# a null (e.g. a value that failed to cast) are dropped.
laps_features = laps_joined.select(
    col("milliseconds").cast(DoubleType()),
    col("lap").cast(IntegerType()),
    col("position").cast(IntegerType()),
    col("pit_stop_count").cast(IntegerType()),
).na.drop()


In [0]:
# Assemble features: pack the three numeric predictors into the single vector column that
# Spark ML estimators read. The column order here is the order of the feature indices in
# the fitted models (coefficients, feature importances).
vec_assembler = VectorAssembler(inputCols=["lap", "position", "pit_stop_count"], outputCol="features")
laps_vector = vec_assembler.transform(laps_features)

# Split data 80/20 into train/test with a fixed seed so the split is repeatable.
trainDF, testDF = laps_vector.randomSplit([0.8, 0.2], seed=42)

# The gr5069 database is created once, in the "Create database and tables" cell earlier,
# so it is not re-created here.



In [0]:
# Train Linear Regression Model (default hyperparameters) and score the held-out test split.
lr = LinearRegression(featuresCol="features", labelCol="milliseconds")
lrModel = lr.fit(trainDF)
predDF_lr = lrModel.transform(testDF)

# Save predictions to MySQL.
# A separate name is used so predDF_lr keeps the full prediction frame (label included).
# NOTE(review): monotonically_increasing_id() is unique but not consecutive. The saved
# table has only (id, prediction): no label and no lap identifiers. Its rows therefore
# cannot be matched back to specific laps. Consider persisting those columns too.
# NOTE(review): com.mysql.jdbc.Driver is the legacy Connector/J class name (newer releases
# use com.mysql.cj.jdbc.Driver); confirm which driver the cluster provides before changing it.
lr_predictions_out = predDF_lr.withColumn("id", monotonically_increasing_id()).select("id", "prediction")
lr_predictions_out.write.format("jdbc").options(
    url="jdbc:mysql://yp2728-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com/gr5069",
    driver="com.mysql.jdbc.Driver",
    dbtable="lr_predictions_f1",
    user="admin",
    # Read from the environment instead of hard-coding the credential in the notebook.
    password=os.environ["GR5069_MYSQL_PASSWORD"],
).mode("overwrite").save()

In [0]:

# Train Random Forest Model (100 trees) and score the held-out test split.
# The explicit seed makes the forest reproducible. Without it, PySpark derives the default
# seed from a Python string hash, which may differ between interpreter sessions.
# 42 matches the train/test split seed.
rf = RandomForestRegressor(featuresCol="features", labelCol="milliseconds", numTrees=100, seed=42)
rfModel = rf.fit(trainDF)
predDF_rf = rfModel.transform(testDF)

# Save predictions to MySQL.
# A separate name is used so predDF_rf keeps the full prediction frame (label included).
# NOTE(review): as with the linear model, (id, prediction) rows cannot be matched back to
# specific laps. monotonically_increasing_id() is unique but not consecutive.
rf_predictions_out = predDF_rf.withColumn("id", monotonically_increasing_id()).select("id", "prediction")
rf_predictions_out.write.format("jdbc").options(
    url="jdbc:mysql://yp2728-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com/gr5069",
    driver="com.mysql.jdbc.Driver",
    dbtable="rf_predictions_f1",
    user="admin",
    # Read from the environment instead of hard-coding the credential in the notebook.
    password=os.environ["GR5069_MYSQL_PASSWORD"],
).mode("overwrite").save()


In [0]:
# Start MLflow experiment for Linear Regression.
# This cell tracks the estimator/model from the "Train Linear Regression Model" cell
# (lr / lrModel) instead of fitting a second copy. The logged model is therefore exactly
# the one whose predictions were written to MySQL.
with mlflow.start_run(run_name="LinearRegression_F1"):

    # Predict on the held-out test split.
    predDF_lr = lrModel.transform(testDF)

    # Evaluate predictions; each metric overrides the evaluator's metricName for one call.
    evaluator = RegressionEvaluator(labelCol="milliseconds", predictionCol="prediction")
    metrics = {
        metric_name: evaluator.evaluate(predDF_lr, {evaluator.metricName: metric_name})
        for metric_name in ("rmse", "mae", "r2", "mse")
    }

    # Log parameters (no hyperparameters tuned, just record default info).
    mlflow.log_param("elasticNetParam", lr.getElasticNetParam())
    mlflow.log_param("regParam", lr.getRegParam())

    # Log metrics.
    mlflow.log_metrics(metrics)

    # Log model.
    mlflow.spark.log_model(lrModel, "model")

    # Log a readable model summary. str(lrModel.summary) only yields the summary object's
    # default repr, so spell out the fitted coefficients and the training-set metrics.
    # Coefficients are labeled using the assembler's input column order.
    training_summary = lrModel.summary
    summary_lines = [f"intercept: {lrModel.intercept}"]
    summary_lines += [
        f"coefficient[{feature_name}]: {weight}"
        for feature_name, weight in zip(vec_assembler.getInputCols(), lrModel.coefficients.toArray())
    ]
    summary_lines += [
        f"training rmse: {training_summary.rootMeanSquaredError}",
        f"training mae: {training_summary.meanAbsoluteError}",
        f"training r2: {training_summary.r2}",
    ]
    mlflow.log_text("\n".join(summary_lines) + "\n", "lr_model_summary.txt")

In [0]:
# Start MLflow experiment for Random Forest.
# This cell tracks the estimator/model from the "Train Random Forest Model" cell
# (rf / rfModel) instead of fitting another 100-tree forest. The logged model is therefore
# exactly the one whose predictions were written to MySQL.
with mlflow.start_run(run_name="RandomForest_F1"):

    # Predict on the held-out test split.
    predDF_rf = rfModel.transform(testDF)

    # Evaluate predictions; each metric overrides the evaluator's metricName for one call.
    evaluator = RegressionEvaluator(labelCol="milliseconds", predictionCol="prediction")
    metrics = {
        metric_name: evaluator.evaluate(predDF_rf, {evaluator.metricName: metric_name})
        for metric_name in ("rmse", "mae", "r2", "mse")
    }

    # Log parameters, including the seed so the run can be reproduced.
    mlflow.log_param("numTrees", rf.getNumTrees())
    mlflow.log_param("maxDepth", rf.getMaxDepth())
    mlflow.log_param("seed", rf.getSeed())

    # Log metrics.
    mlflow.log_metrics(metrics)

    # Log model.
    mlflow.spark.log_model(rfModel, "model")

    # Log artifact (feature importance). featureImportances is an unlabeled vector indexed
    # in the assembler's input column order, so pair each weight with its column name.
    importances = rfModel.featureImportances.toArray()
    importance_lines = [
        f"{feature_name}: {weight}"
        for feature_name, weight in zip(vec_assembler.getInputCols(), importances)
    ]
    mlflow.log_text("\n".join(importance_lines) + "\n", "rf_feature_importances.txt")
