### Imports

In [0]:
import os

from pyspark.sql import functions as F

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import (
    LogisticRegression,
    DecisionTreeClassifier,
    RandomForestClassifier
)
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator


import mlflow
import mlflow.spark

### Load Source Data

In [0]:
# Load the raw event table by its fully qualified name.
SOURCE_TABLE = "workspace.ecommerce.ecommerce_events_delta"
df = spark.table(SOURCE_TABLE)

### Target Variable Definition

In [0]:
# Binary target: 1 when the row's event_type equals "purchase", otherwise 0.
is_purchase = F.col("event_type") == "purchase"
df = df.withColumn("label", F.when(is_purchase, 1).otherwise(0))

### Feature Selection, Train-Test Split & Feature Vector Assembly

In [0]:
# Columns fed to every model; they are assembled into one vector column below.
feature_cols = ["price", "product_id", "category_id"]

# Keep only the model inputs plus the target, then split 80/20 with a fixed
# seed so the same split is requested on every run.
model_input_df = df.select(*feature_cols, "label")
train_df, test_df = model_input_df.randomSplit([0.8, 0.2], seed=42)

# Packs the feature columns into the single "features" vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

### Model Definitions

In [0]:
# Candidate classifiers keyed by the name used for the MLflow run and the
# comparison output. Dict order is the order in which the models are trained.
models = {
    "logistic_regression": LogisticRegression(),
    "decision_tree": DecisionTreeClassifier(maxDepth=5),
    # The forest is the one randomized learner here, so its seed is pinned
    # explicitly (same value as the train/test split) instead of being left
    # to the library default, keeping model comparisons repeatable.
    "random_forest": RandomForestClassifier(numTrees=50, seed=42)
}

### Model Evaluation Setup

In [0]:
# Area under the ROC curve, measured against the "label" column.
evaluator = BinaryClassificationEvaluator(metricName="areaUnderROC", labelCol="label")

In [0]:
# Plain accuracy, measured against the "label" column.
accuracy_evaluator = MulticlassClassificationEvaluator(metricName="accuracy", labelCol="label")

### MLflow Environment Setup

In [0]:
# `os` is imported in the notebook's import cell rather than here, so all
# dependencies are declared in one place.
#
# MLFLOW_DFS_TMP names the temporary directory for MLflow's Spark model
# staging. The same path is also passed explicitly as `dfs_tmpdir` in the
# training cell. NOTE(review): the /Volumes/... prefix looks like a Unity
# Catalog volume path - confirm it exists and is writable.
os.environ["MLFLOW_DFS_TMP"] = "/Volumes/workspace/ecommerce/ecommerce_data/mlflow_tmp"


### Model Training, Evaluation & MLflow Logging

In [0]:
# One (model name, AUC, accuracy) tuple is appended per trained model.
results = []

for name, model in models.items():

    # Every candidate shares the same assembler stage; only the final
    # estimator differs.
    pipeline = Pipeline(stages=[assembler, model])

    # One MLflow run per candidate. nested=True only matters when a run is
    # already active; this cell does not open a parent run itself.
    with mlflow.start_run(run_name=name, nested=True):

        mlflow.log_param("model_name", name)

        # Fit on the training split and score the held-out split.
        trained_model = pipeline.fit(train_df)
        predictions = trained_model.transform(test_df)

        auc = evaluator.evaluate(predictions)
        accuracy = accuracy_evaluator.evaluate(predictions)

        mlflow.log_metric("AUC", auc)
        mlflow.log_metric("Accuracy", accuracy)

        # Persist the whole fitted pipeline (assembler + model) with the run.
        mlflow.spark.log_model(
            trained_model,
            artifact_path="model",
            dfs_tmpdir="/Volumes/workspace/ecommerce/ecommerce_data/mlflow_tmp"
        )

        results.append((name, auc, accuracy))
        # The arrow was previously stored as mojibake; it is now a real U+2192.
        print(f"{name} → AUC = {auc:.4f}, Accuracy = {accuracy:.4f}")


### Model Comparison

In [0]:
 print("\n📊 Model Comparison:")
for name, auc, accuracy in results:
    print(f"{name:<20} | AUC: {auc:.4f} | Accuracy: {accuracy:.4f}")


### Identify Top-Performing Model

In [0]:
# Pick the tuple with the highest AUC (second field). On a tie, max() keeps
# the first one encountered, i.e. the earliest-trained model.
best_model = max(results, key=lambda row: row[1])
best_name, best_auc, best_accuracy = best_model

print("\n🏆 Best Model Selected:")
print(f"Model Name : {best_name}")
print(f"AUC        : {best_auc:.4f}")
print(f"Accuracy   : {best_accuracy:.4f}")
