In [0]:
#%python
#%pip install mlflow==1.14.0

In [0]:
import pandas as pd
import numpy as np
import os
import boto3

import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import avg, current_date, col, year, date_diff,floor, count, concat_ws
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F
from pyspark.sql import Window

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor, HistGradientBoostingClassifier
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score


import mlflow.sklearn
import mlflow
import mlflow.spark


In [0]:
# Load the raw drivers table from the course S3 bucket; the first CSV row is the header.
drivers_df = (
    spark.read
    .option('header', True)
    .csv('s3://columbia-gr5069-main/raw/drivers.csv')
)
display(drivers_df)

In [0]:
# Load the raw driver standings table (header row included) from S3.
driver_standings_df = (
    spark.read
    .option('header', True)
    .csv('s3://columbia-gr5069-main/raw/driver_standings.csv')
)
display(driver_standings_df)

In [0]:
# Load the raw pit stops table (header row included) from S3.
pitstops_df = (
    spark.read
    .option('header', True)
    .csv('s3://columbia-gr5069-main/raw/pit_stops.csv')
)
display(pitstops_df)

In [0]:
# Attach driver details to each standings row, then attach that driver's pit
# stops for the same race (inner joins: rows without a match are dropped).
merged_df = (
    driver_standings_df
    .join(drivers_df, on='driverId')
    .join(pitstops_df, on=['raceId', 'driverId'])
)
display(merged_df)

In [0]:
# Creating an age column: each driver's age in whole years as of today.
# months_between(...) / 12 respects leap years and calendar months. A plain
# day count divided by 365 overstates age by ~0.24 days per year of age, so a
# driver a few days before their birthday would be counted a year older.
# NOTE(review): age is measured against current_date(), not the race date, so
# this feature changes over time and re-runs are not exactly reproducible.
merged_df = merged_df.withColumn(
    'age',
    floor(F.months_between(current_date(), F.to_date(col('dob'))) / 12),
)
display(merged_df)

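Running a regression model using random forest. The next cell selects driverId, raceId, age, wins, position, number of pit stops, and pit-stop duration and casts them to numeric types. Note that the target actually modeled below is `wins`, not `position`.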

In [0]:
# Keep only the modeling columns, cast them from CSV strings to numeric types,
# and drop every row where any value is missing or failed to cast.
# NOTE(review): a `duration` string that is not a plain number (e.g. an
# "mm:ss.sss" value, if the source has any) casts to null and its row is dropped.
rf_column_types = [
    ('age', 'int'),
    ('wins', 'int'),
    ('position', 'int'),
    ('stop', 'int'),
    ('duration', 'float'),
    ('raceId', 'int'),
    ('driverId', 'int'),
]
merged_rfVars_df = (
    merged_df
    .select(*[col(name).cast(dtype).alias(name) for name, dtype in rf_column_types])
    .dropna(how="any")
)
display(merged_rfVars_df)

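Train-test split. TODO: the split below is a random row-level split and does not yet group by raceId, so rows from the same race can land in both the train and test sets.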

I will use driverId, raceId, age, the pit-stop number (`stop`), and pit-stop duration to predict the number of wins a driver has (`wins` from the driver standings).

In [0]:
# Feature columns and regression target shared by both models below.
# NOTE(review): driverId and raceId are identifiers used as numeric features;
# a random row split lets the same driver/race appear in train and test.
features = ['driverId', 'age', 'raceId', 'duration', 'stop']
target = 'wins'

# Collect to pandas ONCE so X and y come from the same rows in the same order.
# Spark does not guarantee row order across two separate actions, so
# collecting features and target independently can misalign them.
model_pdf = merged_rfVars_df.select(*features, target).toPandas()

X_train, X_test, y_train, y_test = train_test_split(
    model_pdf[features],
    model_pdf[target].values,
    random_state=42,
)


In [0]:
# Close any MLflow run left open by an earlier (possibly interrupted) cell.
if mlflow.active_run() is not None:
    mlflow.end_run()

In [0]:

# Random forest hyper-parameters, defined once so the fitted model and the
# params logged to MLflow cannot drift apart.
RF_N_ESTIMATORS = 100
RF_MAX_DEPTH = 10
RF_RANDOM_STATE = 42  # seed the forest so re-runs produce the same model

with mlflow.start_run(run_name='RandomForest') as run:
  # Create model, train it, and create predictions.
  rf = RandomForestRegressor(
    n_estimators=RF_N_ESTIMATORS,
    max_depth=RF_MAX_DEPTH,
    random_state=RF_RANDOM_STATE,
  )
  rfFit = rf.fit(X_train, y_train)  # fit() returns the same estimator object
  predictionsRF = rfFit.predict(X_test)

  # Metrics. RMSE is derived from MSE because mean_squared_error's
  # `squared=False` argument is deprecated (sklearn 1.4) and removed (1.6).
  mse_rf = mean_squared_error(y_test, predictionsRF)
  rmse_rf = float(np.sqrt(mse_rf))
  mae_rf = mean_absolute_error(y_test, predictionsRF)
  r2_rf = r2_score(y_test, predictionsRF)

  # Log the fitted model once, then its metrics and hyper-parameters.
  mlflow.sklearn.log_model(rfFit, "RandomForestRegressor-model")
  mlflow.log_metrics({"rmse": rmse_rf, "r2": r2_rf, "mae": mae_rf, "mse": mse_rf})
  mlflow.log_params({
    "model_type": "RandomForestRegressor",
    "numTrees": RF_N_ESTIMATORS,
    "maxDepth": RF_MAX_DEPTH,
    "random_state": RF_RANDOM_STATE,
  })

  # Save test-set predictions (features, true target, prediction) as a CSV
  # and attach it to the run as an artifact.
  predRF_final = pd.DataFrame({
    'features': X_test.values.tolist(),
    'target': y_test,
    'prediction': predictionsRF,
  })
  rf_csv_path = "/tmp/rf_predictions.csv"
  predRF_final.to_csv(rf_csv_path, index=False)
  mlflow.log_artifact(rf_csv_path)

Next, I try a second model: linear regression.

In [0]:
# Linear regression tracked with MLflow, predicting the same target (`wins`)
# from the same train/test split as the random forest run.
mlflow.end_run()  # close any run left open by an earlier cell
with mlflow.start_run(run_name="LinearRegression"):
    lr = LinearRegression()
    lrFit = lr.fit(X_train, y_train)  # fit() returns the same estimator object
    predictionsLR = lrFit.predict(X_test)

    # Metrics. RMSE is derived from MSE because mean_squared_error's
    # `squared=False` argument is deprecated (sklearn 1.4) and removed (1.6).
    mse_lr = mean_squared_error(y_test, predictionsLR)
    rmse_lr = float(np.sqrt(mse_lr))
    mae_lr = mean_absolute_error(y_test, predictionsLR)
    r2_lr = r2_score(y_test, predictionsLR)

    # Log the fitted model once, then its metrics and model type.
    mlflow.sklearn.log_model(lrFit, "LinearRegression-model")
    mlflow.log_metrics({"rmse": rmse_lr, "r2": r2_lr, "mae": mae_lr, "mse": mse_lr})
    mlflow.log_param("model_type", "LinearRegression")

    # Save test-set predictions as a CSV and attach it to the run as an artifact.
    predLR_final = pd.DataFrame({
        'features': X_test.values.tolist(),
        'target': y_test,
        'prediction': predictionsLR,
    })
    lr_csv_path = "/tmp/lr_predictions.csv"
    predLR_final.to_csv(lr_csv_path, index=False)
    mlflow.log_artifact(lr_csv_path)

In [0]:
# Write both models' test-set predictions to MySQL (RDS) tables.
# The DB password is read from the environment instead of being hard-coded.
# NOTE(review): any password ever committed in this notebook should be rotated.
JDBC_URL = 'jdbc:mysql://dnd2129-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com/gr5069'
JDBC_DRIVER = 'com.mysql.jdbc.Driver'
JDBC_USER = 'admin'
JDBC_PASSWORD = os.environ.get('GR5069_DB_PASSWORD')
if not JDBC_PASSWORD:
    raise RuntimeError(
        'Set the GR5069_DB_PASSWORD environment variable to the RDS password before running this cell.'
    )


def _to_spark_predictions(pred_pdf):
    """Convert a pandas predictions frame to Spark, flattening `features`.

    `features` holds one list per row; it is joined into a single
    comma-separated string so it can be stored in a plain MySQL column.
    """
    return spark.createDataFrame(pred_pdf).withColumn(
        'features', concat_ws(',', col('features'))
    )


def _write_predictions_table(sdf, table):
    """Overwrite MySQL table `table` with the rows of Spark DataFrame `sdf`."""
    (
        sdf.write.format('jdbc')
        .options(
            url=JDBC_URL,
            driver=JDBC_DRIVER,
            dbtable=table,
            user=JDBC_USER,
            password=JDBC_PASSWORD,
        )
        .mode('overwrite')
        .save()
    )


predRF_final_spark = _to_spark_predictions(predRF_final)
predLR_final_spark = _to_spark_predictions(predLR_final)

_write_predictions_table(predRF_final_spark, 'rf_model_predictions')
_write_predictions_table(predLR_final_spark, 'lr_model_predictions')

In [0]:
# Read the two prediction tables back from MySQL (RDS) and display them.
# The DB password is read from the environment instead of being hard-coded.
JDBC_URL = 'jdbc:mysql://dnd2129-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com/gr5069'
JDBC_DRIVER = 'com.mysql.jdbc.Driver'
JDBC_USER = 'admin'
JDBC_PASSWORD = os.environ.get('GR5069_DB_PASSWORD')
if not JDBC_PASSWORD:
    raise RuntimeError(
        'Set the GR5069_DB_PASSWORD environment variable to the RDS password before running this cell.'
    )


def _read_predictions_table(table):
    """Load MySQL table `table` as a Spark DataFrame over JDBC."""
    return (
        spark.read.format("jdbc")
        .option("url", JDBC_URL)
        .option("driver", JDBC_DRIVER)
        .option("dbtable", table)
        .option("user", JDBC_USER)
        .option("password", JDBC_PASSWORD)
        .load()
    )


# Viewing Random Forest predictions
_read_predictions_table("rf_model_predictions").display()

# Viewing Linear Regression predictions
_read_predictions_table("lr_model_predictions").display()