### Setup and Data Loading

In [0]:
import mlflow
import mlflow.spark
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Use the catalog/schema context established in the first notebook
spark.sql("USE CATALOG ecommerce_capstone")
spark.sql("USE SCHEMA churn_analysis")

# Load the Gold data
data = spark.read.table("gold_user_features")

print("Data loaded from Gold table. Total records:", data.count())

Data loaded from Gold table. Total records: 5881


### Feature Engineering & Validation Split

In [0]:
# 1. Assemble features into a vector (Requirement: Feature selection)
feature_cols = ["Frequency", "Monetary", "Recency"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# 2. Transform the data and select only what the model needs
final_data = assembler.transform(data).select("features", "label")

# 3. Proper train/test split (80% training, 20% testing)
train_df, test_df = final_data.randomSplit([0.8, 0.2], seed=42)

print("Features assembled and data split into train_df and test_df.")

Features assembled and data split into train_df and test_df.


### Creating the Volume

In [0]:
%sql
-- Create a volume to store ML models
CREATE VOLUME IF NOT EXISTS ecommerce_capstone.churn_analysis.model_storage;

We selected Logistic Regression for its interpretability in a business context. We used a proper 80/20 train/test split and tracked our experiments using MLflow to ensure reproducibility and performance monitoring.

### Training with Manual MLflow Tracking

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# 1. Define the Volume path for model storage
# Format: /Volumes/<catalog>/<schema>/<volume_name>/
volume_path = "/Volumes/ecommerce_capstone/churn_analysis/model_storage"

# 2. Initialize the evaluator
evaluator = BinaryClassificationEvaluator(labelCol="label")

# 3. Start the MLflow run
with mlflow.start_run(run_name="Churn_Logistic_Regression"):
    
    # Select and Fit Model
    lr = LogisticRegression(featuresCol="features", labelCol="label")
    lr_model = lr.fit(train_df)
    
    # Make predictions
    predictions = lr_model.transform(test_df)
    
    # Evaluate Performance
    auc_score = evaluator.evaluate(predictions)
    
    # 4. Log Parameters and Metrics
    mlflow.log_param("model_type", "Logistic Regression")
    mlflow.log_metric("auc_roc", auc_score)
    
    # 5. Save the model using the Volume path (Fixes the MlflowException)
    # We pass the volume path to the dfs_tmpdir argument
    mlflow.spark.log_model(
        spark_model=lr_model, 
        artifact_path="churn_model",
        dfs_tmpdir=volume_path
    )
    
    print(f"Model Training Complete! Test AUC: {auc_score}")
    print(f"Model successfully saved to Volume: {volume_path}")



Model Training Complete! Test AUC: 1.0
Model successfully saved to Volume: /Volumes/ecommerce_capstone/churn_analysis/model_storage


In [0]:

last_run_id = mlflow.last_active_run().info.run_id
print(f"COPY THIS RUN ID: {last_run_id}")

COPY THIS RUN ID: 9b9d1c8c8838499c85c0010a05a4a661
