In [0]:
import pandas as pd
import mlflow
import mlflow.sklearn

In [0]:
# Root location of the raw CSV files.
base_path = "s3://columbia-gr5069-main/raw/"


def _read_raw_csv(file_name):
    """Load one headered CSV found under ``base_path`` as a Spark DataFrame.

    Only the ``header`` option is set (no schema inference is requested), so
    downstream cells are responsible for any numeric conversion they need.
    """
    return spark.read.option("header", True).csv(base_path + file_name)


# One Spark DataFrame per raw file.
spark_results = _read_raw_csv("results.csv")
spark_drivers = _read_raw_csv("drivers.csv")
spark_races = _read_raw_csv("races.csv")
spark_status_map = _read_raw_csv("status.csv")

# Pull each table onto the driver as a pandas DataFrame for use with scikit-learn.
results = spark_results.toPandas()
drivers = spark_drivers.toPandas()
races = spark_races.toPandas()
status_map = spark_status_map.toPandas()

1. [20 pts] Create two (2) new tables in your own database where you'll store the predictions from each model for this exercise.

In [0]:
# Create the f1_db database on first run; a no-op if it already exists.
spark.sql("""
CREATE DATABASE IF NOT EXISTS f1_db
""")

# Make f1_db the session's current database so the unqualified table names in
# the CREATE TABLE statements below are created inside it.
spark.sql("""
USE f1_db
""")

# Delta table that receives model 1's predictions.
# `id` is an identity column (starts at 1, increments by 1) generated by the
# table itself, so writers only supply actual_value and predicted_value.
spark.sql("""
CREATE TABLE IF NOT EXISTS model1_predictions (
    id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
    actual_value INT,
    predicted_value INT
) USING DELTA
""")

# Delta table that receives model 2's predictions; same layout as model1_predictions.
spark.sql("""
CREATE TABLE IF NOT EXISTS model2_predictions (
    id BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
    actual_value INT,
    predicted_value INT
) USING DELTA
""")

# Print the tables in f1_db to confirm both prediction tables are present.
spark.sql("SHOW TABLES IN f1_db").show()


+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
|   f1_db|model1_predictions|      false|
|   f1_db|model2_predictions|      false|
+--------+------------------+-----------+



2. [30 pts] Build two (2) predictive models using MLflow, logging hyperparameters, the model itself, four metrics, and two artifacts. Submit your MLflow experiments as part of your assignment.

In [0]:
# Feature columns used by both models; `points` is only needed to derive the target.
feature_cols = ['driverId', 'raceId', 'grid', 'positionOrder']

# Select the modelling columns, drop incomplete rows, and convert everything to
# numeric dtypes once, up front. The CSVs are loaded without schema inference,
# so these columns may be strings; converting here means every model receives
# real numbers rather than object-dtype columns. pd.to_numeric raises on any
# value that is not a number, so bad data fails loudly instead of being dropped.
data = (
    results[feature_cols + ['points']]
    .dropna()
    .apply(pd.to_numeric)
)

# Binary target: 1 when the driver scored any points in the race, else 0.
# Vectorised comparison replaces the row-by-row float() lambda.
data['scored'] = (data['points'] > 0).astype(int)

# NOTE(review): positionOrder presumably is the finishing position, which would
# largely determine whether points were scored -- confirm that using it as a
# feature (target leakage) is intended for this exercise.
X = data[feature_cols]
y = data['scored']


In [0]:

from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
)

# Hold out 20% of the rows for evaluation; the fixed seed keeps the split
# identical across runs so both models are scored on the same test rows.
split_parts = train_test_split(X, y, random_state=42, test_size=0.2)
X_train, X_test, y_train, y_test = split_parts

In [0]:
from sklearn.linear_model import LogisticRegression

# Single source of truth for the hyperparameters: the same dict configures the
# estimator and is logged to MLflow, so the run cannot record a value that the
# trained model did not actually use.
lr_params = {"max_iter": 500}

with mlflow.start_run(run_name="LogisticRegression_scored"):
    model1 = LogisticRegression(**lr_params)
    model1.fit(X_train, y_train)

    y_pred1 = model1.predict(X_test)

    # log hyperparameters (plus the model family, for filtering runs in the UI)
    mlflow.log_params({"model": "LogisticRegression", **lr_params})

    # log the four evaluation metrics, computed on the held-out test set
    mlflow.log_metrics({
        "accuracy": accuracy_score(y_test, y_pred1),
        "precision": precision_score(y_test, y_pred1),
        "recall": recall_score(y_test, y_pred1),
        "f1_score": f1_score(y_test, y_pred1),
    })

    # log model
    mlflow.sklearn.log_model(model1, "model1_logistic_regression")

    # artifact 1: row-level predictions next to the true labels
    pred1_df = pd.DataFrame({'actual_value': y_test.values, 'predicted_value': y_pred1})
    pred1_df.to_csv('/tmp/model1_logistic_predictions.csv', index=False)
    mlflow.log_artifact('/tmp/model1_logistic_predictions.csv')

    # artifact 2: confusion matrix (rows = actual, columns = predicted), so the
    # run carries the two artifacts the exercise asks for
    confusion1_df = pd.crosstab(pred1_df['actual_value'], pred1_df['predicted_value'])
    confusion1_df.to_csv('/tmp/model1_logistic_confusion_matrix.csv')
    mlflow.log_artifact('/tmp/model1_logistic_confusion_matrix.csv')




In [0]:
%pip install xgboost

Collecting xgboost
  Obtaining dependency information for xgboost from https://files.pythonhosted.org/packages/63/f1/653afe1a1b7e1d03f26fd4bd30f3eebcfac2d8982e1a85b6be3355dcae25/xgboost-3.0.0-py3-none-manylinux_2_28_x86_64.whl.metadata
  Downloading xgboost-3.0.0-py3-none-manylinux_2_28_x86_64.whl.metadata (2.1 kB)
Collecting nvidia-nccl-cu12 (from xgboost)
  Obtaining dependency information for nvidia-nccl-cu12 from https://files.pythonhosted.org/packages/9f/30/aa24e8e02cd860d80a31ee32cc3a0db9ffb93efb2556705db3ce6c924926/nvidia_nccl_cu12-2.26.2.post1-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata
  Downloading nvidia_nccl_cu12-2.26.2.post1-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (2.0 kB)
Downloading xgboost-3.0.0-py3-none-manylinux_2_28_x86_64.whl (253.9 MB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/253.9 MB[0m [31m?[0m eta [36m-:--:--[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.8/253.9 MB

In [0]:
import xgboost as xgb

# XGBoost needs numeric feature columns; convert in case they are still
# string-typed from the CSV load. The conversion is a no-op on numeric data,
# so re-running this cell is safe.
X_train = X_train.apply(pd.to_numeric)
X_test = X_test.apply(pd.to_numeric)

# Single source of truth for the hyperparameters: the same dict configures the
# estimator and is logged to MLflow, so the two cannot drift apart.
xgb_params = {"max_depth": 3, "n_estimators": 100, "learning_rate": 0.1}

with mlflow.start_run(run_name="XGBoost_scored"):
    model2 = xgb.XGBClassifier(**xgb_params)
    model2.fit(X_train, y_train)

    y_pred2 = model2.predict(X_test)

    # log hyperparameters; "model" is logged too so this run carries the same
    # identifying parameter as the logistic-regression run
    mlflow.log_params({"model": "XGBClassifier", **xgb_params})

    # log the four evaluation metrics, computed on the held-out test set
    mlflow.log_metrics({
        "accuracy": accuracy_score(y_test, y_pred2),
        "precision": precision_score(y_test, y_pred2),
        "recall": recall_score(y_test, y_pred2),
        "f1_score": f1_score(y_test, y_pred2),
    })

    # log model (XGBClassifier follows the scikit-learn estimator API)
    mlflow.sklearn.log_model(model2, "model2_xgboost")

    # artifact 1: row-level predictions next to the true labels
    pred2_df = pd.DataFrame({'actual_value': y_test.values, 'predicted_value': y_pred2})
    pred2_df.to_csv('/tmp/model2_xgboost_predictions.csv', index=False)
    mlflow.log_artifact('/tmp/model2_xgboost_predictions.csv')

    # artifact 2: confusion matrix (rows = actual, columns = predicted), so the
    # run carries the two artifacts the exercise asks for
    confusion2_df = pd.crosstab(pred2_df['actual_value'], pred2_df['predicted_value'])
    confusion2_df.to_csv('/tmp/model2_xgboost_confusion_matrix.csv')
    mlflow.log_artifact('/tmp/model2_xgboost_confusion_matrix.csv')




3. [30 pts] For each model, store its predictions in the corresponding table you created in your own database. Ensure you are using your own database to store your predictions.

In [0]:
# Model 1 predictions
pred1_df = pd.DataFrame({
    'actual_value': y_test.values,
    'predicted_value': y_pred1
})

# Model 2 predictions
pred2_df = pd.DataFrame({
    'actual_value': y_test.values,
    'predicted_value': y_pred2
})

In [0]:
# Convert both pandas prediction frames to Spark DataFrames (model 1 first,
# then model 2) so they can be written to the database tables.
spark_pred1, spark_pred2 = [
    spark.createDataFrame(pandas_frame) for pandas_frame in (pred1_df, pred2_df)
]

In [0]:
# Register the Spark prediction DataFrames as temporary views so the INSERT
# statements below can select from them by name.
spark_pred1.createOrReplaceTempView("temp_pred1")
spark_pred2.createOrReplaceTempView("temp_pred2")

# Append model 1's predictions to its table. Only the two value columns are
# listed; `id` is declared GENERATED ALWAYS AS IDENTITY in the table's DDL.
# NOTE(review): INSERT INTO appends, so re-running this cell adds the same
# rows again -- confirm whether the table should be cleared between runs.
spark.sql("""
INSERT INTO f1_db.model1_predictions (actual_value, predicted_value)
SELECT actual_value, predicted_value
FROM temp_pred1
""")

# Append model 2's predictions to its table; same pattern as above.
spark.sql("""
INSERT INTO f1_db.model2_predictions (actual_value, predicted_value)
SELECT actual_value, predicted_value
FROM temp_pred2
""")


DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
# List what is visible in f1_db after the inserts. The session's temporary
# views are listed as well, flagged in the isTemporary column.
spark.sql("SHOW TABLES IN f1_db").show()

+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
|   f1_db|model1_predictions|      false|
|   f1_db|model2_predictions|      false|
|        |        temp_pred1|       true|
|        |        temp_pred2|       true|
+--------+------------------+-----------+



In [0]:
# Print a ten-row sample from each prediction table as a sanity check.
# The queries have no ORDER BY, so which ten rows appear is up to the engine.
for table_name in ("model1_predictions", "model2_predictions"):
    spark.sql(f"SELECT * FROM f1_db.{table_name} LIMIT 10").show()


+---+------------+---------------+
| id|actual_value|predicted_value|
+---+------------+---------------+
|  1|           1|              0|
|  2|           0|              0|
|  3|           1|              1|
|  4|           1|              1|
|  5|           1|              1|
|  6|           0|              1|
|  7|           0|              0|
|  8|           0|              0|
|  9|           0|              0|
| 10|           1|              1|
+---+------------+---------------+

+---+------------+---------------+
| id|actual_value|predicted_value|
+---+------------+---------------+
|  1|           1|              1|
|  2|           0|              0|
|  3|           1|              1|
|  4|           1|              1|
|  5|           1|              1|
|  6|           0|              0|
|  7|           0|              0|
|  8|           0|              0|
|  9|           0|              0|
| 10|           1|              1|
+---+------------+---------------+

