In [0]:

from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.feature import VectorAssembler
import mlflow
import mlflow.spark
import os

In [0]:
# Load the raw race-results CSV; the first row of the file is treated as the
# header. No schema inference is requested here, and later cells cast the
# columns they need explicitly.
# The loads for the other raw tables are kept below, disabled.

# df_races = spark.read.csv('s3://columbia-gr5069-main/raw/races.csv', header=True)
df_results = spark.read.option("header", True).csv('s3://columbia-gr5069-main/raw/results.csv')
# df_driver = spark.read.csv('s3://columbia-gr5069-main/raw/drivers.csv', header=True)
# df_pit_stops = spark.read.csv('s3://columbia-gr5069-main/raw/pit_stops.csv', header=True)

In [0]:
# Quick sanity check: print the first 5 rows of the raw results.
df_results.show(n=5)

In [0]:
# Make sure both prediction tables exist before any model is trained.
# The two statements differ only in the table name, so they share one template.
# NOTE(review): the write cells further down save prediction / positionOrder /
# features into these tables, which does not match the columns declared here —
# confirm which schema is actually intended.
_PREDICTIONS_DDL = """
CREATE TABLE IF NOT EXISTS {table} (
    raceId INT,
    driverId INT,
    prediction DOUBLE,
    probability DOUBLE
)
"""

spark.sql(_PREDICTIONS_DDL.format(table="model1_predictions"))

spark.sql(_PREDICTIONS_DDL.format(table="model2_predictions"))


In [0]:
# Select a few useful columns as integers (more cleaning can be added per
# field as needed) and drop every row that has a null in any of them.
# NOTE(review): `df` and `df_features` are not referenced by any later cell
# visible in this notebook, and `assembler` is reassigned in a later cell.
_int_columns = ['raceId', 'driverId', 'constructorId', 'grid']
df = df_results.select(
    *[col(name).cast('int') for name in _int_columns],
    col('positionOrder').cast('int').alias('label'),  # finishing order is used as the label
).dropna()

# Pack the two predictor columns into a single 'features' vector column.
assembler = VectorAssembler(outputCol='features', inputCols=['constructorId', 'grid'])
df_features = assembler.transform(df)

In [0]:

# Keep only the columns used for modelling, cast them to double, and remove
# every row that has a null in any of them.
#
# The cast runs BEFORE the null filter on purpose: the CSV columns are cast
# from their loaded form, and a value that cannot be parsed as a number only
# becomes null as a result of the cast. Filtering first would let such rows
# through with null features or a null label.
# NOTE(review): presumably the raw file marks missing values with a text
# placeholder rather than an empty field — confirm against the data.
model_columns = [
    "grid", "points", "laps", "milliseconds", "fastestLap", "rank", "fastestLapSpeed", "positionOrder"
]

df_results = df_results.select(
    *[col(name).cast("double") for name in model_columns]
).dropna(subset=model_columns)
     

In [0]:

# Feature and label columns: the seven race statistics below are packed into
# one "features" vector; "positionOrder" is carried along as the label.
feature_cols = ['grid', 'points', 'laps', 'milliseconds', 'fastestLap', 'rank', 'fastestLapSpeed']
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features",
    handleInvalid="skip",  # rows with invalid feature values are filtered out
)
data = (
    assembler
    .transform(df_results)
    .select("features", "positionOrder")
)

# Split into training and test sets (80% / 20%) with a fixed seed.
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

display(train_data)

In [0]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import mlflow
import mlflow.spark

# Model 1: random-forest classifier predicting positionOrder, tracked as a
# single MLflow run (parameters, four metrics, the fitted model, two text
# artifacts).
#
# The hyper-parameters live in one dict so the values handed to the estimator
# and the values logged to MLflow cannot drift apart. An explicit seed is
# included so reruns on the same data build the same forest instead of relying
# on the estimator's default seed.
rf_params = {"numTrees": 50, "maxDepth": 10, "seed": 42}

# MLflow metric name -> metricName passed to MulticlassClassificationEvaluator.
rf_metric_names = {
    "accuracy": "accuracy",
    "f1": "f1",
    "precision": "weightedPrecision",
    "recall": "weightedRecall",
}

with mlflow.start_run(run_name="rf_model_v1"):
    rf = RandomForestClassifier(featuresCol="features", labelCol="positionOrder", **rf_params)
    model = rf.fit(train_data)

    # `predictions` is read by the following cell, which saves it to a table.
    predictions = model.transform(test_data)

    evaluator = MulticlassClassificationEvaluator(labelCol="positionOrder", predictionCol="prediction")

    # Log the parameters exactly as they were passed to the estimator.
    for param_name, param_value in rf_params.items():
        mlflow.log_param(param_name, param_value)

    # Log the four evaluation metrics; the evaluator's metricName is overridden per call.
    for mlflow_name, spark_metric in rf_metric_names.items():
        mlflow.log_metric(mlflow_name, evaluator.evaluate(predictions, {evaluator.metricName: spark_metric}))

    # Log the fitted model.
    mlflow.spark.log_model(model, "random_forest_model")

    # Log two artifacts (example): a short free-text summary ...
    with open("/tmp/summary.txt", "w") as f:
        f.write("Random Forest with engineered race features")
    mlflow.log_artifact("/tmp/summary.txt")

    # ... and the list of feature columns the model was trained on.
    with open("/tmp/features.txt", "w") as f:
        f.write(",".join(feature_cols))
    mlflow.log_artifact("/tmp/features.txt")

In [0]:
# Save the test-set predictions (predicted class, true positionOrder, feature
# vector) to the model1_predictions table, replacing its previous contents.
#
# `predictions` is whatever the most recently executed training cell assigned,
# so run this cell right after the random-forest cell.
#
# overwriteSchema replaces the table's schema with the columns written here.
# With mergeSchema the columns declared by the earlier CREATE TABLE
# (raceId, driverId, probability) would stay in the table even though this
# write never fills them.
# NOTE(review): assumes model1_predictions is a Delta table, since
# overwriteSchema is a Delta Lake write option — confirm.
predictions.select("prediction", "positionOrder", "features") \
    .write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("model1_predictions")

In [0]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Model 2: multiclass logistic regression predicting positionOrder, tracked as
# a single MLflow run (parameters, four metrics, the fitted model, one text
# artifact).
logreg_params = {"maxIter": 50, "regParam": 0.1}

# MLflow metric name -> metricName passed to MulticlassClassificationEvaluator.
logreg_metric_names = {
    "accuracy": "accuracy",
    "f1": "f1",
    "precision": "weightedPrecision",
    "recall": "weightedRecall",
}

with mlflow.start_run(run_name="logreg_model_v2"):
    lr = LogisticRegression(featuresCol="features", labelCol="positionOrder", **logreg_params)
    model = lr.fit(train_data)

    # `predictions` is read by the following cell, which saves it to a table.
    predictions = model.transform(test_data)

    evaluator = MulticlassClassificationEvaluator(labelCol="positionOrder", predictionCol="prediction")

    # Log the parameters.
    for param_name, param_value in logreg_params.items():
        mlflow.log_param(param_name, param_value)

    # Log the evaluation metrics; the evaluator's metricName is overridden per call.
    for mlflow_name, spark_metric in logreg_metric_names.items():
        mlflow.log_metric(mlflow_name, evaluator.evaluate(predictions, {evaluator.metricName: spark_metric}))

    # Log the fitted model.
    mlflow.spark.log_model(model, "logreg_model")

    # Log a short free-text summary as an artifact.
    with open("/tmp/summary_logreg.txt", "w") as f:
        f.write("Multiclass Logistic Regression using positionOrder")
    mlflow.log_artifact("/tmp/summary_logreg.txt")

In [0]:
# Save the test-set predictions (predicted class, true positionOrder, feature
# vector) to the model2_predictions table, replacing its previous contents.
#
# `predictions` is whatever the most recently executed training cell assigned,
# so run this cell right after the logistic-regression cell.
#
# overwriteSchema replaces the table's schema with the columns written here.
# With mergeSchema the columns declared by the earlier CREATE TABLE
# (raceId, driverId, probability) would stay in the table even though this
# write never fills them.
# NOTE(review): assumes model2_predictions is a Delta table, since
# overwriteSchema is a Delta Lake write option — confirm.
predictions.select("prediction", "positionOrder", "features") \
    .write.mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("model2_predictions")