In [0]:
import mlflow
import mlflow.spark
from pyspark.sql import functions as F

# Reaching this line means the imports above completed without raising.
print("MLflow setup successful!")

In [0]:
from pyspark.sql import functions as F

# Source tables: the raw event log and the per-user feature table.
events = spark.table("workspace.ecommerce.events_delta")
features_df = spark.table("workspace.ecommerce.silver_user_features")

# Binary target: flag each event as purchase (1) or not (0), then take the
# max per user, so a user is labelled 1 if any of their events is a purchase.
is_purchase = F.when(F.col("event_type") == "purchase", 1).otherwise(0)
label_df = events.groupBy("user_id").agg(F.max(is_purchase).alias("purchased"))

# Attach the label to the features (default inner join on user_id).
training_data = features_df.join(label_df, on="user_id")

print("Training dataset recreated successfully!")
training_data.show(5)

In [0]:
from pyspark.ml.feature import VectorAssembler

# 80/20 random split with a fixed seed.
train_df, test_df = training_data.randomSplit([0.8, 0.2], seed=42)

# Pack the numeric user columns into a single "features" vector column.
# NOTE(review): total_purchases (and possibly the spend columns) may be derived
# from the same purchase events that define the "purchased" label, which would
# be target leakage -- confirm against how silver_user_features is built.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=["total_events", "total_purchases", "total_spent", "avg_spent"],
)

# Keep only what the classifier needs: the feature vector and the target,
# with "purchased" renamed to "label".
train_ml = (
    assembler.transform(train_df)
    .select("features", F.col("purchased").alias("label"))
)
test_ml = (
    assembler.transform(test_df)
    .select("features", F.col("purchased").alias("label"))
)

print("Train/Test ML datasets prepared successfully!")
print("Train ML count:", train_ml.count())
print("Test ML count:", test_ml.count())


In [0]:
from pyspark.ml.classification import RandomForestClassifier

# Random forest configuration: 50 trees, depth capped at 10, fixed seed.
rf = RandomForestClassifier(
    labelCol="label",
    featuresCol="features",
    maxDepth=10,
    numTrees=50,
    seed=42,
)

# Fit on the assembled training set.
rf_model = rf.fit(train_ml)

print("RandomForest model trained for MLflow logging!")

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Score the held-out rows with the fitted forest.
predictions = rf_model.transform(test_ml)

# Area under the ROC curve, read from the rawPrediction column against "label".
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="rawPrediction",
    labelCol="label",
)
auc = evaluator.evaluate(predictions)

print("AUC calculated for MLflow logging:", auc)

In [0]:
# Class balance of the test set: row count per label value.
test_ml.groupBy("label").count().show()

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Re-score the test set so this cell does not rely on an earlier `predictions`.
predictions = rf_model.transform(test_ml)

# Eyeball a few rows: true label next to the predicted class and probabilities.
predictions.select("label", "prediction", "probability").show(5, truncate=False)

# ROC AUC from the rawPrediction column (same evaluator settings as the
# previous evaluation cell); this overwrites `auc` with the recomputed value.
evaluator = BinaryClassificationEvaluator(
    metricName="areaUnderROC",
    rawPredictionCol="rawPrediction",
    labelCol="label",
)
auc = evaluator.evaluate(predictions)

print("Correct AUC for MLflow logging:", auc)

In [0]:
import os

# Point MLflow's temporary directory (MLFLOW_DFS_TMP) at a Unity Catalog
# Volume path; this must be set before the model-logging cell runs.
mlflow_tmp_dir = "/Volumes/workspace/ecommerce/ecommerce_data/mlflow_tmp"
os.environ["MLFLOW_DFS_TMP"] = mlflow_tmp_dir

print("MLflow UC volume temp path configured!")

In [0]:
# Record one MLflow run holding the model's hyperparameters, its test-set AUC,
# and the fitted Spark ML model itself.
with mlflow.start_run(run_name="Day7_RandomForest_Model"):

    # Read parameters back from the objects that were actually used for
    # training instead of retyping them: the previously hard-coded feature
    # list ("purchases", "avg_price") did not match the assembler's real
    # input columns ("total_purchases", "avg_spent"), so the run metadata
    # misdescribed the model.
    mlflow.log_param("model_type", "RandomForest")
    mlflow.log_param("num_trees", rf.getNumTrees())
    mlflow.log_param("max_depth", rf.getMaxDepth())
    mlflow.log_param("features", assembler.getInputCols())

    # Test-set ROC AUC computed by the evaluation cell.
    mlflow.log_metric("AUC", auc)

    # Persist the fitted forest as a run artifact.
    mlflow.spark.log_model(
        spark_model=rf_model,
        artifact_path="random_forest_model"
    )

print("MLflow run logged successfully!")