In [0]:
import os

import mlflow
import mlflow.spark
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.regression import LinearRegression, RandomForestRegressor
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType
from pyspark.sql.types import DoubleType

## 1. Create two (2) new tables in your own database where you'll store the predictions from each model for this exercise.

In [0]:
# Load the raw F1 results CSV (first row is the header, the literal "\N" is
# read as null) and keep only rows where both "laps" and "rank" are present.
F1_DF = (
    spark.read
    .options(header=True, nullValue="\\N")
    .csv('s3://columbia-gr5069-main/raw/results.csv')
    .dropna(subset=["laps", "rank"])
)

In [0]:
# Persist the filtered results as Parquet, replacing any earlier output at this path.
F1_DF.write.parquet('s3://zl3373-gr5069/processed/F1_results', mode='overwrite')

In [0]:
# Render the filtered results table in the notebook for a visual check.
display(F1_DF)

In [0]:
# 80/20 random split of the results into training and test sets (seed 42).
trainDF, testDF = F1_DF.randomSplit([0.8, 0.2], seed=42)

# Mark the training set for caching, then run count() and print the row count.
trainDF.cache()
print(trainDF.count())

In [0]:
# Cast the predictor ("laps") and the label ("rank") to double in both splits,
# in case they were loaded as strings.
trainDF = (
    trainDF
    .withColumn("laps", col("laps").cast("double"))
    .withColumn("rank", col("rank").cast("double"))
)
testDF = (
    testDF
    .withColumn("laps", col("laps").cast("double"))
    .withColumn("rank", col("rank").cast("double"))
)

In [0]:
# Same 80/20 split with the same seed, but taken after repartitioning the data
# into 24 partitions; the printed training-set size can be compared with the
# count from the split of the un-repartitioned data.
repartitioned_splits = F1_DF.repartition(24).randomSplit([0.8, 0.2], seed=42)
trainRepartitionDF, testRepartitionDF = repartitioned_splits

print(trainRepartitionDF.count())

In [0]:
# Show just the label and predictor columns of the training set.
display(trainDF.select(col("rank"), col("laps")))

In [0]:
# Descriptive statistics for the label and predictor; the statistics are listed
# explicitly and are the same set summary() computes when called with no arguments.
display(
    trainDF
    .select("rank", "laps")
    .summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max")
)

In [0]:
# Render the full training split (all columns) for inspection.
display(trainDF)

In [0]:
# Pack the single predictor column into the "features" vector column that the
# regression models are configured to read.
feature_cols = ["laps"]
vecAssembler = VectorAssembler().setInputCols(feature_cols).setOutputCol("features")
trainDF_vec, testDF_vec = (vecAssembler.transform(split) for split in (trainDF, testDF))

In [0]:
# Select the MLflow experiment that the runs started below are recorded under.
mlflow.set_experiment(experiment_name="/Users/zl3373@columbia.edu/experimentsHW4")

In [0]:
# First model: elastic-net linear regression of "rank" on the "features" vector.
# The fit, the test-set predictions, four evaluation metrics, the hyperparameters
# and the fitted model are all recorded in a single MLflow run.
with mlflow.start_run(run_name="Linear Regression Model for F1") as run:
    lr = LinearRegression(
        featuresCol="features",
        labelCol="rank",
        regParam=0.1,
        elasticNetParam=0.5,
    )
    model_lr = lr.fit(trainDF_vec)

    # Score the held-out split with the fitted model.
    preds_lr = model_lr.transform(testDF_vec)

    # Evaluate the same predictions under each metric by overriding metricName
    # per call; the evaluator object itself is left unchanged.
    evaluator = RegressionEvaluator(labelCol="rank", predictionCol="prediction")
    rmse, mae, r2, mse = (
        evaluator.evaluate(preds_lr, {evaluator.metricName: metric_name})
        for metric_name in ("rmse", "mae", "r2", "mse")
    )

    # Hyperparameters and model description.
    mlflow.log_params({
        "model_type": "LinearRegression",
        "features": feature_cols,
        "regParam": lr.getRegParam(),
        "elasticNetParam": lr.getElasticNetParam(),
    })

    # Test-set metrics.
    mlflow.log_metrics({"rmse": rmse, "mae": mae, "r2": r2, "mse": mse})

    # Fitted Spark ML model, stored under the "model" artifact path.
    mlflow.spark.log_model(model_lr, "model")


In [0]:
# Inspect the linear-regression predictions on the test split.
display(preds_lr)

In [0]:
# Store the linear-regression predictions in the MySQL database, replacing any
# existing `linear_regression_predictions` table.
#
# Credentials are read from the environment rather than being hard-coded in the
# notebook. GR5069_DB_PASSWORD is required (a KeyError is raised before any
# write if it is unset); GR5069_DB_USER is optional and defaults to "admin".
# On Databricks these can be set as cluster environment variables backed by a
# secret scope.
# NOTE(review): the password that used to be committed here should be rotated.
preds_lr.select("raceId", "resultId", "laps","prediction","rank").write.format('jdbc').options(
    url='jdbc:mysql://zl3373-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com:3306/gr5069',
    driver='com.mysql.cj.jdbc.Driver',
    dbtable='linear_regression_predictions',
    user=os.environ.get("GR5069_DB_USER", "admin"),
    password=os.environ["GR5069_DB_PASSWORD"],
).mode('overwrite').save()

In [0]:
# Read the `linear_regression_predictions` table back from MySQL to confirm the
# predictions were saved, and print the first rows.
#
# Credentials are read from the environment rather than being hard-coded in the
# notebook. GR5069_DB_PASSWORD is required (a KeyError is raised if it is
# unset); GR5069_DB_USER is optional and defaults to "admin".
df = spark.read.format('jdbc').options(
    url='jdbc:mysql://zl3373-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com:3306/gr5069',
    driver='com.mysql.cj.jdbc.Driver',
    dbtable='linear_regression_predictions',
    user=os.environ.get("GR5069_DB_USER", "admin"),
    password=os.environ["GR5069_DB_PASSWORD"],
).load()

df.show()

In [0]:
# Second model: random forest regression of "rank" on the "features" vector,
# scored on the held-out split and recorded in its own MLflow run.
with mlflow.start_run(run_name="Random Forest Model for F1") as run:
    # An explicit seed fixes the forest's random sampling so the run can be
    # reproduced; 42 matches the seed used for the train/test split.
    rf = RandomForestRegressor(
        featuresCol="features",
        labelCol="rank",
        numTrees=35,
        maxDepth=5,
        seed=42,
    )
    model_rf = rf.fit(trainDF_vec)
    # apply the trained model to the test data
    preds_rf = model_rf.transform(testDF_vec)

    # Build the evaluator in this cell instead of relying on the one created
    # inside the linear-regression cell, so this cell does not depend on that
    # cell having been run first.
    evaluator = RegressionEvaluator(labelCol="rank", predictionCol="prediction")

    # same four metrics as the linear model
    rmse_rf = evaluator.evaluate(preds_rf, {evaluator.metricName: "rmse"})
    mae_rf = evaluator.evaluate(preds_rf, {evaluator.metricName: "mae"})
    r2_rf = evaluator.evaluate(preds_rf, {evaluator.metricName: "r2"})
    mse_rf = evaluator.evaluate(preds_rf, {evaluator.metricName: "mse"})

    # hyperparameters (including the seed, so the run can be reproduced)
    mlflow.log_param("model_type", "RandomForestRegressor")
    mlflow.log_param("features", feature_cols)
    mlflow.log_param("numTrees", rf.getNumTrees())
    mlflow.log_param("maxDepth", rf.getMaxDepth())
    mlflow.log_param("seed", rf.getSeed())

    # metrics
    mlflow.log_metric("rmse", rmse_rf)
    mlflow.log_metric("mae", mae_rf)
    mlflow.log_metric("r2", r2_rf)
    mlflow.log_metric("mse", mse_rf)

    # fitted model
    mlflow.spark.log_model(model_rf, "model")

In [0]:
# store predictions of random forest model into my database
# Credentials are read from the environment rather than being hard-coded in the
# notebook. GR5069_DB_PASSWORD is required (a KeyError is raised before any
# write if it is unset); GR5069_DB_USER is optional and defaults to "admin".
# The write replaces any existing `random_forest_model_predictions` table.
preds_rf.select("raceId", "resultId", "prediction", "rank", "laps") \
    .write.format('jdbc').options(
        url='jdbc:mysql://zl3373-gr5069.ccqalx6jsr2n.us-east-1.rds.amazonaws.com:3306/gr5069',
        driver='com.mysql.cj.jdbc.Driver',
        dbtable='random_forest_model_predictions',
        user=os.environ.get("GR5069_DB_USER", "admin"),
        password=os.environ["GR5069_DB_PASSWORD"],
    ).mode('overwrite').save()