In [0]:
%pip install mlflow mysql-connector-python
%pip install mlflow==2.21.3


import io
import os

import boto3
import mlflow
import mlflow.spark
import numpy as np
import pandas as pd
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


In [0]:
# Read the raw race results CSV from S3. The first row supplies the column
# names, and Spark infers column types from the data.
resultsDF = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('s3a://columbia-gr5069-main/raw/results.csv')
)

In [0]:
# This section sets up the data so the machine learning models below can run smoothly.

# Model inputs (assembled into one feature vector below) and the regression target.
features = ['grid', 'laps', 'milliseconds', 'fastestLapSpeed']
target = 'positionOrder'

# "milliseconds" and "fastestLapSpeed" were read as strings, so convert them to
# doubles BEFORE dropping missing values. Text that is not a number (presumably
# placeholders for missing data) is not null to dropna, but the cast turns it
# into null. Casting first lets dropna remove those rows. Otherwise the nulls
# would reach VectorAssembler, whose default handleInvalid="error" fails on them.
# NOTE(review): with spark.sql.ansi.enabled=true an unparseable string makes the
# cast raise instead of returning null. Confirm the session config.
resultsDF = resultsDF.withColumn("milliseconds", col("milliseconds").cast("double"))
resultsDF = resultsDF.withColumn("fastestLapSpeed", col("fastestLapSpeed").cast("double"))

# Drop rows with a missing value in any feature or in the target.
resultsDF = resultsDF.dropna(subset=features + [target])

# Use the vector assembler to combine the feature columns into a single vector column.
vecAssembler = VectorAssembler(inputCols=features, outputCol="features")
results_vecDF = vecAssembler.transform(resultsDF)

# 80/20 train/test split with a fixed seed of 42 for reproducibility.
trainDF, testDF = results_vecDF.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Linear-regression model for 'positionOrder', tracked as its own MLflow run.
with mlflow.start_run(run_name="LinearRegression_Spark"):

    # Fit on the training split and score the held-out test split.
    lr = LinearRegression(labelCol=target, featuresCol="features")
    lrModel = lr.fit(trainDF)
    predLR = lrModel.transform(testDF)

    # Two evaluators over the same prediction column: RMSE and R^2.
    evaluator = RegressionEvaluator(metricName="rmse", labelCol=target, predictionCol="prediction")
    r2_evaluator = RegressionEvaluator(metricName="r2", labelCol=target, predictionCol="prediction")
    rmse_lr, r2_lr = evaluator.evaluate(predLR), r2_evaluator.evaluate(predLR)

    # Record the fitted model first, then its metrics and model-type tag.
    mlflow.spark.log_model(lrModel, "linear_regression_model")
    mlflow.log_metric("rmse", rmse_lr)
    mlflow.log_metric("r2", r2_lr)
    mlflow.log_param("model_type", "LinearRegression")

In [0]:
# Predict the target 'positionOrder' with a random forest, tracked as its own MLflow run.
# This cell builds its own evaluators, so it does not rely on objects left over
# from the linear-regression cell.

# Single source of truth for the hyperparameters: the same dict configures the
# estimator and is logged to MLflow, so the logged values cannot drift from the
# ones actually used. The seed matches the train/test split seed so the forest
# (bootstrap sampling and feature subsets) is reproducible.
rf_params = {"numTrees": 100, "maxDepth": 10, "seed": 42}

with mlflow.start_run(run_name="RandomForest_Spark"):

    rf = RandomForestRegressor(featuresCol="features", labelCol=target, **rf_params)
    rfModel = rf.fit(trainDF)

    predRF = rfModel.transform(testDF)

    # Evaluate the held-out predictions with RMSE and R^2.
    rf_rmse_evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="rmse")
    rf_r2_evaluator = RegressionEvaluator(labelCol=target, predictionCol="prediction", metricName="r2")
    rmse_rf = rf_rmse_evaluator.evaluate(predRF)
    r2_rf = rf_r2_evaluator.evaluate(predRF)

    # Log model, metrics and parameters.
    mlflow.spark.log_model(rfModel, "random_forest_model")
    mlflow.log_metric("rmse", rmse_rf)
    mlflow.log_metric("r2", r2_rf)
    mlflow.log_param("model_type", "RandomForestRegressor")
    mlflow.log_params(rf_params)

In [0]:
# Prepare final DataFrames to save into the database: the feature columns, the
# true target, and each model's prediction for every test-split row.
predLR_final = predLR.select(*features, target, "prediction")
predRF_final = predRF.select(*features, target, "prediction")

# Connection settings for the MySQL database.
# Credentials are read from the environment instead of being hardcoded in the
# notebook. The password that was previously committed here should be rotated.
# On Databricks, a secret scope (dbutils.secrets.get) is an alternative source.
DB_URL = 'jdbc:mysql://jwj2123-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com/gr5069'
DB_DRIVER = 'com.mysql.jdbc.Driver'
DB_USER = os.environ.get('GR5069_DB_USER', 'admin')
DB_PASSWORD = os.environ.get('GR5069_DB_PASSWORD')
if not DB_PASSWORD:
    raise RuntimeError(
        "Database password not found: set the GR5069_DB_PASSWORD environment "
        "variable before writing predictions."
    )


def write_predictions(df, table):
    """Overwrite the MySQL table `table` with the rows of the Spark DataFrame `df` via JDBC."""
    (df.write.format('jdbc')
        .options(url=DB_URL, driver=DB_DRIVER, dbtable=table,
                 user=DB_USER, password=DB_PASSWORD)
        .mode('overwrite')
        .save())


# Saving predictions to tables
write_predictions(predLR_final, 'lr_model_predictions')
write_predictions(predRF_final, 'rf_model_predictions')

In [0]:
# Read the prediction tables back from MySQL to check that the writes worked.
# Credentials come from the environment rather than being hardcoded here.


def read_predictions(table):
    """Load the MySQL table `table` into a Spark DataFrame via JDBC.

    Raises RuntimeError if the GR5069_DB_PASSWORD environment variable is unset.
    """
    password = os.environ.get('GR5069_DB_PASSWORD')
    if not password:
        raise RuntimeError(
            "Database password not found: set the GR5069_DB_PASSWORD environment "
            "variable before reading predictions."
        )
    return (spark.read.format("jdbc")
            .option("url", "jdbc:mysql://jwj2123-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com/gr5069")
            .option("driver", "com.mysql.jdbc.Driver")
            .option("dbtable", table)
            .option("user", os.environ.get('GR5069_DB_USER', 'admin'))
            .option("password", password)
            .load())


# View the Linear Regression predictions
read_predictions("lr_model_predictions").display()

# View the Random Forest predictions
read_predictions("rf_model_predictions").display()