In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import mlflow
import mlflow.spark
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

In [0]:
# Load the three raw CSV extracts from S3. The first row of each file is
# treated as the header and Spark infers the column types.
csv_options = {"header": True, "inferSchema": True}
df_results = spark.read.csv('s3://columbia-gr5069-main/raw/results.csv', **csv_options)
df_races = spark.read.csv('s3://columbia-gr5069-main/raw/races.csv', **csv_options)
df_drivers = spark.read.csv('s3://columbia-gr5069-main/raw/drivers.csv', **csv_options)


In [0]:
# Create the yq2396_db database if it does not exist yet (no-op otherwise).
create_db_sql = "CREATE DATABASE IF NOT EXISTS yq2396_db"
spark.sql(create_db_sql)


In [0]:
from pyspark.sql.functions import col, when

# Binary target: 1 when positionOrder is 3 or lower (a top-three finish),
# 0 for every other row.
is_top3 = col("positionOrder") <= 3
df_results = df_results.withColumn("label", when(is_top3, 1).otherwise(0))

In [0]:
# Attach race and driver columns to each result row. Left joins keep every
# result row even when no matching race or driver record is found.
df_with_races = df_results.join(df_races, on="raceId", how="left")
df_joined = df_with_races.join(df_drivers, on="driverId", how="left")

In [0]:
# Columns that feed the classifiers.
# NOTE(review): "laps" and "statusId" look like post-race outcomes; confirm
# they are acceptable predictors for the intended use of the model.
feature_cols = ["grid", "laps", "statusId"]

# Pack the feature columns into a single vector column, then keep only what
# the estimators read: the feature vector and the target.
vec = VectorAssembler(outputCol="features", inputCols=feature_cols)
df_assembled = vec.transform(df_joined)
df_model = df_assembled.select("features", "label")

In [0]:
# 80/20 train/test split with a fixed seed.
split_weights = [0.8, 0.2]
train_data, test_data = df_model.randomSplit(split_weights, seed=42)


In [0]:
# Logistic regression: classifier for the top-3 label, tracked as one MLflow run.

# Configure the estimator. The hyperparameters logged below are read back from
# this object so the MLflow record cannot drift from what was actually trained.
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=100, regParam=0.0)

with mlflow.start_run(run_name="LogReg_Top3"):

    # Fit on the training split and score the held-out split
    model_lr = lr.fit(train_data)
    preds_lr = model_lr.transform(test_data)

    # Log model
    mlflow.spark.log_model(model_lr, "logreg_model")

    # Log hyperparameters (taken from the estimator, not duplicated literals)
    mlflow.log_param("maxIter", lr.getMaxIter())
    mlflow.log_param("regParam", lr.getRegParam())

    # Log metrics: area under the ROC curve from the binary evaluator ...
    evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
    auc = evaluator.evaluate(preds_lr)
    mlflow.log_metric("AUC", auc)

    # ... and accuracy / F1 / precision from the multiclass evaluator.
    # Note that the value logged as "precision" is Spark's weightedPrecision.
    multiclass_eval = MulticlassClassificationEvaluator(labelCol="label")
    for mlflow_name, spark_metric in (
        ("accuracy", "accuracy"),
        ("f1", "f1"),
        ("precision", "weightedPrecision"),
    ):
        mlflow.log_metric(
            mlflow_name,
            multiclass_eval.evaluate(preds_lr, {multiclass_eval.metricName: spark_metric}),
        )

    # Log artifact: confusion matrix.
    # Collect label and prediction in ONE action so the two columns are
    # guaranteed to be row-aligned and the pipeline is executed only once.
    labels_and_preds = preds_lr.select("label", "prediction").toPandas()
    y_true = labels_and_preds["label"]
    y_pred = labels_and_preds["prediction"]

    cm = confusion_matrix(y_true, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm)
    disp.plot()
    # Use the display's own axes/figure instead of pyplot's "current" state.
    disp.ax_.set_title("Confusion Matrix - Logistic Regression")
    disp.figure_.savefig("/tmp/confusion_lr.png")
    mlflow.log_artifact("/tmp/confusion_lr.png")



In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

# Random forest: second classifier for the top-3 label, tracked as its own MLflow run.

# An explicit seed pins the forest's random sampling so the logged metrics can
# be reproduced; the value matches the seed used for the train/test split.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=50,
    maxDepth=5,
    seed=42,
)

with mlflow.start_run(run_name="RandomForest_Top3"):

    # Fit on the training split and score the held-out split
    model_rf = rf.fit(train_data)
    preds_rf = model_rf.transform(test_data)

    # Log model
    mlflow.spark.log_model(model_rf, "rf_model")

    # Log hyperparameters (taken from the estimator, not duplicated literals)
    mlflow.log_param("numTrees", rf.getNumTrees())
    mlflow.log_param("maxDepth", rf.getMaxDepth())
    mlflow.log_param("seed", rf.getSeed())

    # Log metrics: area under the ROC curve from the binary evaluator ...
    evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
    auc = evaluator.evaluate(preds_rf)
    mlflow.log_metric("AUC", auc)

    # ... and accuracy / F1 / precision from the multiclass evaluator.
    # Note that the value logged as "precision" is Spark's weightedPrecision.
    multiclass_eval = MulticlassClassificationEvaluator(labelCol="label")
    for mlflow_name, spark_metric in (
        ("accuracy", "accuracy"),
        ("f1", "f1"),
        ("precision", "weightedPrecision"),
    ):
        mlflow.log_metric(
            mlflow_name,
            multiclass_eval.evaluate(preds_rf, {multiclass_eval.metricName: spark_metric}),
        )

    # Confusion matrix artifact.
    # Collect label and prediction in ONE action so the two columns are
    # guaranteed to be row-aligned and the pipeline is executed only once.
    labels_and_preds_rf = preds_rf.select("label", "prediction").toPandas()
    y_true_rf = labels_and_preds_rf["label"]
    y_pred_rf = labels_and_preds_rf["prediction"]

    cm_rf = confusion_matrix(y_true_rf, y_pred_rf)
    disp_rf = ConfusionMatrixDisplay(confusion_matrix=cm_rf)
    disp_rf.plot()
    # Use the display's own axes/figure instead of pyplot's "current" state.
    disp_rf.ax_.set_title("Confusion Matrix - Random Forest")
    disp_rf.figure_.savefig("/tmp/confusion_rf.png")
    mlflow.log_artifact("/tmp/confusion_rf.png")


In [0]:
# Persist the (prediction, label) pairs of both models as tables in yq2396_db,
# overwriting any table that already exists under the same name.
for predictions, table_name in (
    (preds_lr, "yq2396_db.model1_predictions"),
    (preds_rf, "yq2396_db.model2_predictions"),
):
    predictions.select("prediction", "label").write.mode("overwrite").saveAsTable(table_name)

In [0]:
# List the databases visible to this Spark session.
databases = spark.sql("SHOW DATABASES")
databases.show()

In [0]:
# Print the first five rows of each saved prediction table.
for preview_sql in (
    "SELECT * FROM yq2396_db.model1_predictions LIMIT 5",
    "SELECT * FROM yq2396_db.model2_predictions LIMIT 5",
):
    spark.sql(preview_sql).show()

In [0]:
# Class balance: number of rows per label value in the modelling frame.
label_counts = df_model.groupBy("label").count()
label_counts.show()
