In [0]:
#%python
#%pip install mlflow==1.14.0

In [0]:
import pandas as pd
import numpy as np
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import avg, current_date, col, year, date_diff,floor, count
from pyspark.sql import functions as F
from pyspark.sql import Window
import boto3
from sklearn.model_selection import train_test_split
import mlflow.sklearn
from sklearn.ensemble import RandomForestRegressor, HistGradientBoostingClassifier
from sklearn.metrics import mean_squared_error

In [0]:
# Load the drivers table from the course S3 bucket (AWS).
# The first CSV row is treated as the header; no schema inference is requested.
drivers_df = (
    spark.read
    .option("header", True)
    .csv('s3://columbia-gr5069-main/raw/drivers.csv')
)
display(drivers_df)

In [0]:
# Load the driver standings table from S3; the first CSV row is the header.
driver_standings_df = (
    spark.read
    .option("header", True)
    .csv('s3://columbia-gr5069-main/raw/driver_standings.csv')
)
display(driver_standings_df)

In [0]:
# Load the pit stops table from S3; the first CSV row is the header.
pitstops_df = (
    spark.read
    .option("header", True)
    .csv('s3://columbia-gr5069-main/raw/pit_stops.csv')
)
display(pitstops_df)

In [0]:
# Combine the three tables with inner joins:
#   standings + driver attributes (by driverId), then + pit stops (by raceId and driverId).
# The result has one row per joined pit-stop record.
merged_df = (
    driver_standings_df
    .join(drivers_df, on='driverId')
    .join(pitstops_df, on=['raceId', 'driverId'])
)
display(merged_df)

In [0]:
# Add an `age` column: each driver's age in whole years as of today's date.
# Complete years are counted with months_between / 12. Dividing a day count by
# 365 ignores leap days and yields an age that is one year too high in the days
# just before a birthday.
# NOTE(review): age is measured against current_date(), not the race date, so
# the values depend on the day the notebook is run — confirm this is intended.
merged_df = merged_df.withColumn(
    'age',
    floor(F.months_between(current_date(), col('dob')) / 12),
)
display(merged_df)

Running a random forest regression model to predict position based on driverId, raceId, age, wins, the number of pit stops, and pit-stop duration.

In [0]:
# Keep only the modelling columns, cast them from CSV strings to numeric types
# (integers everywhere except the float `duration`), and drop any row that
# contains a null after casting.
merged_rfVars_df = merged_df.select(
    *[col(name).cast('int').alias(name) for name in ('age', 'wins', 'position', 'stop')],
    col('duration').cast('float').alias('duration'),
    *[col(name).cast('int').alias(name) for name in ('raceId', 'driverId')],
).dropna(how="any")
display(merged_rfVars_df)

Grouping by raceId before the train-test split (note: the cell below currently performs a random row-level split without any grouping).

In [0]:
# Collect features and target to pandas and split them at random (row level),
# with a fixed seed; `position` is flattened to a 1-D array for scikit-learn.
X_train, X_test, y_train, y_test = train_test_split(
    merged_rfVars_df.select('driverId', 'age', 'wins', 'raceId', 'duration', 'stop').toPandas(),
    merged_rfVars_df.select('position').toPandas().values.ravel(),
    random_state=42,
)


sklearn.ensemble.HistGradientBoostingClassifier

In [0]:
# Baseline run: a default-hyperparameter random forest logged to MLflow.
# Also captures `experimentID`, which the later tuning cells pass to log_rf.
with mlflow.start_run(run_name="Basic RF Experiment") as run:
  # Create model, train it, and create predictions.
  # The seed is fixed so the baseline is reproducible and comparable with the
  # tuned runs below, which all use random_state=42.
  rf = RandomForestRegressor(random_state=42)
  rf.fit(X_train, y_train)
  predictions = rf.predict(X_test)

  # Log model
  mlflow.sklearn.log_model(rf, "RandomForestRegressor-model")

  # Log the one non-default parameter so the run is self-describing
  mlflow.log_param("random_state", 42)

  # Create metrics
  mse = mean_squared_error(y_test, predictions)
  print("  mse: {}".format(mse))

  # Log metrics
  mlflow.log_metric("mse", mse)

  # Keep the identifiers so later cells can log into the same experiment
  runID = run.info.run_uuid
  experimentID = run.info.experiment_id

  print("Inside MLflow Run with run_id {} and experiment_id {}".format(runID, experimentID))

In [0]:
def log_rf(experimentID, run_name, params, X_train, X_test, y_train, y_test):
  """Train a RandomForestRegressor and record it as a single MLflow run.

  Logged to the run: the fitted model, every entry of ``params``, the test-set
  MSE / MAE / R2, a feature-importance CSV and a residual plot PNG.

  Args:
    experimentID: MLflow experiment id the run is created under.
    run_name: Display name of the MLflow run.
    params: Dict of keyword arguments passed to ``RandomForestRegressor``.
    X_train: Training features. If it exposes ``.columns`` (e.g. a pandas
      DataFrame) those names label the feature importances.
    X_test: Test features.
    y_train: Training target.
    y_test: Test target.

  Returns:
    The ``run_uuid`` of the MLflow run that was created.
  """
  import tempfile

  import matplotlib.pyplot as plt
  import mlflow.sklearn
  import seaborn as sns
  from sklearn.ensemble import RandomForestRegressor
  from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

  with mlflow.start_run(experiment_id=experimentID, run_name=run_name) as run:
    # Create model, train it, and create predictions
    rf = RandomForestRegressor(**params)
    rf.fit(X_train, y_train)
    predictions = rf.predict(X_test)

    # Log model
    mlflow.sklearn.log_model(rf, "RandomForestRegressor-model")

    # Log params
    for param, value in params.items():
      mlflow.log_param(param, value)

    # Create metrics
    mse = mean_squared_error(y_test, predictions)
    mae = mean_absolute_error(y_test, predictions)
    r2 = r2_score(y_test, predictions)
    print("  mse: {}".format(mse))
    print("  mae: {}".format(mae))
    print("  R2: {}".format(r2))

    # Log metrics
    mlflow.log_metric("mse", mse)
    mlflow.log_metric("mae", mae)
    mlflow.log_metric("r2", r2)

    # Create feature importance.
    # Names must come from the data the model was fitted on: the importances
    # follow the column order of X_train. Taking names from another frame
    # (different order, or including the target) mislabels every importance.
    feature_names = getattr(X_train, "columns", None)
    if feature_names is None:
      # Plain arrays carry no names; fall back to positional labels.
      feature_names = ["feature_{}".format(i) for i in range(len(rf.feature_importances_))]
    importance = pd.DataFrame(
      {"Feature": list(feature_names), "Importance": list(rf.feature_importances_)}
    ).sort_values("Importance", ascending=False)

    # Log importances using a temporary file (deleted when the block exits).
    # NOTE: the second argument of log_artifact is the artifact *directory*
    # inside the run; the value is kept unchanged so the artifact layout matches
    # earlier runs.
    with tempfile.NamedTemporaryFile(prefix="feature-importance-", suffix=".csv") as temp:
      importance.to_csv(temp.name, index=False)
      mlflow.log_artifact(temp.name, "feature-importance.csv")

    # Create plot on an explicit Axes so nothing depends on pyplot's "current" figure
    fig, ax = plt.subplots()
    sns.residplot(x=predictions, y=y_test, lowess=True, ax=ax)
    ax.set_xlabel("Predicted values for Position")
    ax.set_ylabel("Residual")
    ax.set_title("Residual Plot")

    # Log residuals using a temporary file (deleted when the block exits)
    with tempfile.NamedTemporaryFile(prefix="residuals-", suffix=".png") as temp:
      fig.savefig(temp.name)
      mlflow.log_artifact(temp.name, "residuals.png")

    display(fig)
    # Release the figure; repeated calls would otherwise accumulate open figures.
    plt.close(fig)
    return run.info.run_uuid

In [0]:
# Second run: 100 trees, depth capped at 5, fixed seed.
params = dict(n_estimators=100, max_depth=5, random_state=42)

log_rf(
  experimentID=experimentID,
  run_name="Second Run",
  params=params,
  X_train=X_train,
  X_test=X_test,
  y_train=y_train,
  y_test=y_test,
)

In [0]:
# Third run: 1000 trees, depth capped at 10, fixed seed.
params_1000_trees = dict(n_estimators=1000, max_depth=10, random_state=42)

log_rf(
  experimentID=experimentID,
  run_name="Third Run",
  params=params_1000_trees,
  X_train=X_train,
  X_test=X_test,
  y_train=y_train,
  y_test=y_test,
)

In [0]:
# Fourth run: 500 trees, depth capped at 10, fixed seed.
params_500_trees = dict(n_estimators=500, max_depth=10, random_state=42)

log_rf(
  experimentID=experimentID,
  run_name="Fourth Run",
  params=params_500_trees,
  X_train=X_train,
  X_test=X_test,
  y_train=y_train,
  y_test=y_test,
)

In [0]:
# Fifth run: 1000 trees, depth capped at 5, fixed seed.
params_1000_5trees = dict(n_estimators=1000, max_depth=5, random_state=42)

log_rf(
  experimentID=experimentID,
  run_name="fifth Run",
  params=params_1000_5trees,
  X_train=X_train,
  X_test=X_test,
  y_train=y_train,
  y_test=y_test,
)

In [0]:
# Sixth run: 1000 trees, depth capped at 15, fixed seed.
params_1000_15trees = dict(n_estimators=1000, max_depth=15, random_state=42)

log_rf(
  experimentID=experimentID,
  run_name="Sixth Run",
  params=params_1000_15trees,
  X_train=X_train,
  X_test=X_test,
  y_train=y_train,
  y_test=y_test,
)

In [0]:
# Seventh run: 10000 trees, depth capped at 10, fixed seed.
params_10000_trees = dict(n_estimators=10000, max_depth=10, random_state=42)

log_rf(
  experimentID=experimentID,
  run_name="Seventh Run",
  params=params_10000_trees,
  X_train=X_train,
  X_test=X_test,
  y_train=y_train,
  y_test=y_test,
)

In [0]:
# Eighth run: 1000 trees, depth capped at 20, fixed seed.
params_1000_20trees = dict(n_estimators=1000, max_depth=20, random_state=42)

log_rf(
  experimentID=experimentID,
  run_name="Eighth Run",
  params=params_1000_20trees,
  X_train=X_train,
  X_test=X_test,
  y_train=y_train,
  y_test=y_test,
)

In [0]:
# Ninth run: 1000 trees, depth capped at 25, fixed seed.
params_1000_25trees = dict(n_estimators=1000, max_depth=25, random_state=42)

log_rf(
  experimentID=experimentID,
  run_name="Ninth Run",
  params=params_1000_25trees,
  X_train=X_train,
  X_test=X_test,
  y_train=y_train,
  y_test=y_test,
)

In [0]:
# Tenth run: 2000 trees, depth capped at 20, fixed seed.
params_2000_20trees = dict(n_estimators=2000, max_depth=20, random_state=42)

log_rf(
  experimentID=experimentID,
  run_name="Tenth Run",
  params=params_2000_20trees,
  X_train=X_train,
  X_test=X_test,
  y_train=y_train,
  y_test=y_test,
)

The best model appears to be the one with 1000 estimators and max_depth = 25. This is the "Ninth Run"; it has the lowest mean squared error (MSE) and the lowest mean absolute error (MAE) of all the runs.