# Day 7 – ML Model Training, Evaluation & MLflow Tracking

### Created structured training data manually as a Spark DataFrame.

In [0]:
# Hand-written toy dataset: one tuple per user, values in the same order as
# the names listed in `columns` below.
rows = [
    # (total_events, purchases, total_spent, avg_price, purchased)
    (10, 1,  500.0, 500.0, 1),
    ( 5, 0,    0.0,   0.0, 0),
    (20, 3, 1500.0, 500.0, 1),
    ( 8, 0,    0.0,   0.0, 0),
    (15, 2,  800.0, 400.0, 1),
    ( 7, 0,    0.0,   0.0, 0),
    (25, 5, 3000.0, 600.0, 1),
    ( 4, 0,    0.0,   0.0, 0),
    (12, 1,  400.0, 400.0, 1),
    ( 6, 0,    0.0,   0.0, 0),
]

# Column names; the last one ("purchased") is used as the label further down.
columns = [
    "total_events",
    "purchases",
    "total_spent",
    "avg_price",
    "purchased",
]

# Build the Spark DataFrame from the local rows and print it for inspection.
training_data = spark.createDataFrame(data=rows, schema=columns)
training_data.show()

+------------+---------+-----------+---------+---------+
|total_events|purchases|total_spent|avg_price|purchased|
+------------+---------+-----------+---------+---------+
|          10|        1|      500.0|    500.0|        1|
|           5|        0|        0.0|      0.0|        0|
|          20|        3|     1500.0|    500.0|        1|
|           8|        0|        0.0|      0.0|        0|
|          15|        2|      800.0|    400.0|        1|
|           7|        0|        0.0|      0.0|        0|
|          25|        5|     3000.0|    600.0|        1|
|           4|        0|        0.0|      0.0|        0|
|          12|        1|      400.0|    400.0|        1|
|           6|        0|        0.0|      0.0|        0|
+------------+---------+-----------+---------+---------+



### Converted multiple feature columns into a single features vector.

In [0]:
from pyspark.ml.feature import VectorAssembler

# Columns packed into the single vector column that Spark ML estimators read.
# NOTE(review): in the hand-written rows, purchases / total_spent / avg_price
# are zero exactly when purchased == 0, so these features appear to leak the
# label -- confirm whether that is intended before trusting the metrics.
feature_cols = ["total_events", "purchases", "total_spent", "avg_price"]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Adds the "features" column next to the original columns.
final_data = assembler.transform(training_data)

### Split dataset into training and testing sets.

In [0]:
# Seeded random 80/20 split; the weights are proportions, so the actual
# row counts of the two parts are only approximately 80% / 20%.
splits = final_data.randomSplit(weights=[0.8, 0.2], seed=42)
train_data, test_data = splits

## 1. Trained classification model using RandomForest.

In [0]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest classifier that reads the assembled "features" vector and
# predicts the "purchased" label.
# The seed is fixed (same value as the train/test split) because random
# forests sample rows and features at random; without it the fitted model,
# and therefore the predictions and AUC below, can change between runs.
rf = RandomForestClassifier(
    labelCol="purchased",
    featuresCol="features",
    seed=42,
)

# Fit on the training split only; the test split is held out for evaluation.
model = rf.fit(train_data)

### Generated predictions on test dataset.

In [0]:
# Score the held-out rows with the fitted model.
predictions = model.transform(test_data)

# Show the true label next to the predicted class and class probabilities.
(
    predictions
    .select("purchased", "prediction", "probability")
    .show()
)

+---------+----------+-----------+
|purchased|prediction|probability|
+---------+----------+-----------+
|        0|       0.0|  [1.0,0.0]|
|        1|       1.0|  [0.0,1.0]|
|        1|       1.0|  [0.0,1.0]|
+---------+----------+-----------+



## 2. Evaluate Model Performance (AUC): Calculated Area Under ROC Curve.

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator that computes the area under the ROC curve from the model's
# "rawPrediction" column against the "purchased" label.
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="rawPrediction",
    labelCol="purchased",
)

# auc_value is reused by the MLflow logging cells further down.
auc_value = evaluator.evaluate(predictions)
print("AUC:", auc_value)

AUC: 1.0


### Switched catalog from default to workspace.

In [0]:
# List the catalogs visible to this Spark session.
catalogs = spark.sql("SHOW CATALOGS")
catalogs.show()

+---------+
|  catalog|
+---------+
|  samples|
|   system|
|workspace|
+---------+



In [0]:
# Switch the session's current catalog to `workspace`.
# The call is kept as the cell's last expression so its result is displayed.
use_catalog_stmt = "USE CATALOG workspace"
spark.sql(use_catalog_stmt)

DataFrame[]

### Log MLflow Parameters & Metrics

In [0]:
import mlflow

# End any run that is still active so start_run() below opens a fresh one.
mlflow.end_run()

# The context manager closes the run automatically when the block exits.
with mlflow.start_run():
    mlflow.log_param(key="model_type", value="RandomForest")
    mlflow.log_metric(key="AUC", value=auc_value)

print("Run Logged Successfully")

Run Logged Successfully


In [0]:
# List the volumes in workspace.default; the model-logging cell below
# uses a temp directory under one of them.
volumes = spark.sql("SHOW VOLUMES IN workspace.default")
volumes.show()

+--------+-----------+
|database|volume_name|
+--------+-----------+
| default|day3_volume|
| default|day4_volume|
| default|  my_volume|
+--------+-----------+



## 3. MLflow Experiment & Model Logging

In [0]:
import mlflow
import mlflow.spark

# Temp directory (inside a volume) passed to log_model as dfs_tmpdir.
model_tmp_dir = "/Volumes/workspace/default/my_volume/tmp"

# End any run that is still active so start_run() below opens a fresh one.
mlflow.end_run()

with mlflow.start_run():
    # Record what was trained and how well it scored.
    mlflow.log_param(key="model_type", value="RandomForest")
    mlflow.log_metric(key="AUC", value=auc_value)

    # Store the fitted Spark ML model as a run artifact named "rf_model".
    mlflow.spark.log_model(
        spark_model=model,
        artifact_path="rf_model",
        dfs_tmpdir=model_tmp_dir,
    )

print("Model Logged Successfully")




Model Logged Successfully


In [0]:
import mlflow

# NOTE(review): this cell logs the same parameter and metric as the earlier
# metrics-only logging cell, producing one more run with identical content.
# End any run that is still active so start_run() below opens a fresh one.
mlflow.end_run()

# The context manager closes the run automatically when the block exits.
with mlflow.start_run():
    mlflow.log_param(key="model_type", value="RandomForest")
    mlflow.log_metric(key="AUC", value=auc_value)

print("Run Logged Successfully")

Run Logged Successfully


In [0]:
# Calling mlflow.log_metric() with no active run makes MLflow start a run
# implicitly and leave it active, which then interferes with later
# start_run() calls. Reuse an already-active run if there is one; otherwise
# log inside a context-managed run so it is closed once the metric is written.
if mlflow.active_run() is not None:
    mlflow.log_metric("AUC", auc_value)
else:
    with mlflow.start_run():
        mlflow.log_metric("AUC", auc_value)