In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.feature import VectorAssembler
import mlflow
import mlflow.spark
import os


In [0]:
# Load the raw results table from S3. The "header" option makes Spark take the
# column names from the first line of the file.
# The sibling raw files under the same prefix (races.csv, drivers.csv,
# pit_stops.csv) are not loaded in this notebook.
results_reader = spark.read.option("header", True)
df_results = results_reader.csv('s3://columbia-gr5069-main/raw/results.csv')

In [0]:
# Print the first five rows as a quick sanity check of the loaded table.
df_results.show(n=5)

In [0]:
# Columns used downstream: seven predictors plus the label ("positionOrder").
numeric_cols = [
    "grid", "points", "laps", "milliseconds",
    "fastestLap", "rank", "fastestLapSpeed", "positionOrder",
]

# Cast first, then drop nulls. The CSV is read without a schema, so the columns
# arrive as strings; a value that cannot be parsed as a number becomes null
# during the cast (Spark's default, non-ANSI cast behaviour). Filtering nulls
# before the cast would let such rows through, and a null label would then
# reach the classifier because the assembler only validates feature columns.
df_results = (
    df_results
    .select([col(name).cast("double") for name in numeric_cols])
    .dropna(subset=numeric_cols)
)

In [0]:
# Predictor columns; "positionOrder" is kept aside as the label.
feature_cols = [
    'grid',
    'points',
    'laps',
    'milliseconds',
    'fastestLap',
    'rank',
    'fastestLapSpeed',
]

# Pack the predictors into a single vector column named "features".
# handleInvalid="skip" asks the assembler to filter out rows with invalid
# feature values instead of raising.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="skip",
)
assembled = assembler.transform(df_results)
data = assembled.select("features", "positionOrder")

# 80/20 train/test split; the fixed seed keeps the split repeatable.
splits = data.randomSplit([0.8, 0.2], seed=42)
train_data, test_data = splits

display(train_data)

In [0]:
%python
import mlflow
import mlflow.spark
import os

# 创建本地目录存放 artifact
os.makedirs("./artifacts", exist_ok=True)

# 设置 MLflow 实验名
experiment_path = "/Users/rh3243@columbia.edu/f1_spark_exp"  # 替换成你自己的路径

# 创建实验目录
dbutils.fs.mkdirs(experiment_path)

# 设置实验
mlflow.set_experiment(experiment_path)

In [0]:
import pandas as pd
import matplotlib.pyplot as plt

# RandomForestClassifier and MulticlassClassificationEvaluator are imported in
# the notebook's first cell, so they are not re-imported here.

# The evaluator does not depend on the hyperparameters: build it once.
evaluator = MulticlassClassificationEvaluator(
    labelCol="positionOrder",
    predictionCol="prediction",
    metricName="accuracy",
)

# Sweep ten configurations: numTrees 10, 15, ..., 55 paired with
# maxDepth 3, 4, ..., 12. Each configuration is tracked as its own MLflow run.
for i in range(10):
    numTrees = 10 + i * 5
    maxDepth = 3 + i

    with mlflow.start_run():
        rf = RandomForestClassifier(
            featuresCol="features",
            labelCol="positionOrder",
            numTrees=numTrees,
            maxDepth=maxDepth,
            seed=42,
        )

        model = rf.fit(train_data)
        predictions = model.transform(test_data)

        # Accuracy on the held-out split.
        acc = evaluator.evaluate(predictions)

        # Log hyperparameters and the metric.
        mlflow.log_param("numTrees", numTrees)
        mlflow.log_param("maxDepth", maxDepth)
        mlflow.log_metric("accuracy", acc)

        # Log the fitted model.
        mlflow.spark.log_model(model, "model")

        # Artifact 1: label/prediction pairs as CSV.
        pred_pd = predictions.select("positionOrder", "prediction").toPandas()
        pred_path = f"./artifacts/pred_{i}.csv"
        pred_pd.to_csv(pred_path, index=False)
        mlflow.log_artifact(pred_path)

        # Artifact 2: confusion-matrix image.
        cm = pd.crosstab(
            pred_pd["positionOrder"],
            pred_pd["prediction"],
            rownames=["Actual"],
            colnames=["Predicted"],
        )
        # crosstab only emits labels that actually occur, so the actual and
        # predicted axes can differ. Reindex both to the union of labels so the
        # matrix is square and its diagonal really means "predicted == actual".
        labels = sorted(set(cm.index) | set(cm.columns))
        cm = cm.reindex(index=labels, columns=labels, fill_value=0)

        plot_path = f"./artifacts/cm_{i}.png"
        fig, ax = plt.subplots()
        try:
            im = ax.imshow(cm.values, interpolation='nearest')
            fig.colorbar(im, ax=ax)
            ax.set_title("Confusion Matrix")
            ax.set_xlabel("Predicted")
            ax.set_ylabel("Actual")
            fig.savefig(plot_path)
        finally:
            # Release the figure; otherwise every iteration leaves one open.
            # The image stays available as an MLflow artifact.
            plt.close(fig)
        mlflow.log_artifact(plot_path)

4. [20 pts] Select your best model run and explain why

The best run uses maxDepth = 12 and numTrees = 55; I selected it because it achieved the highest test-set accuracy of the ten runs.

5. [20 pts] As part of your GitHub classroom submission include screenshots

The screenshots are in the `screenshots` directory.