# 🤖 Phase 4: Churn Prediction Model (PySpark ML + MLflow)
**Project:** "Olist-Next" Retention Engine
**Target:** `is_churn_risk` (1 = At Risk, 0 = Retained)

## 🧠 Modeling Strategy
We are training a **Random Forest Classifier** to predict churn probability.
* **Leakage Prevention:** We explicitly **drop** `recency_days` and `last_purchase_date`. These columns were used to *create* the target label, so a model given them as features could simply reconstruct the label instead of learning real churn signals (data leakage).
* **Feature Engineering:** We use a `Pipeline` to index and encode the categorical column (`favorite_category`) and assemble numerical features.
* **Governance:** The training is tracked by **MLflow**, and the final model is registered to **Unity Catalog**.
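
To make the leakage point concrete, here is a minimal pure-Python sketch. It assumes, hypothetically, that the label was derived with a rule like `recency_days > 180`; the real rule used in the Gold layer is not stated in this notebook, and the customer rows are made up.

```python
# Hypothetical labeling rule: the real threshold is not given in this notebook.
CHURN_THRESHOLD_DAYS = 180

def make_label(recency_days: int) -> int:
    """Label a customer as at risk (1) when they have been inactive too long."""
    return 1 if recency_days > CHURN_THRESHOLD_DAYS else 0

# Made-up customers: (recency_days, frequency)
customers = [(12, 5), (200, 1), (365, 2), (90, 1), (181, 7), (30, 3)]
labels = [make_label(recency) for recency, _ in customers]

def leaky_model(recency_days: int) -> int:
    """A 'model' that just re-applies the labeling rule to the leaked feature."""
    return 1 if recency_days > CHURN_THRESHOLD_DAYS else 0

leaky_predictions = [leaky_model(recency) for recency, _ in customers]
accuracy = sum(p == y for p, y in zip(leaky_predictions, labels)) / len(labels)
print(f"Accuracy with leaked feature: {accuracy:.2f}")
```

A perfect score here says nothing about predicting future churn: the "model" has only rediscovered how the label was built, which is why both columns are excluded from the feature list.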

### Setup & Data Loading 
Load the Gold data and enforce the strict leakage prevention constraints.

In [0]:
from pyspark.sql.functions import col
import mlflow

# Set Context
spark.sql("USE CATALOG olist_hackathon")
spark.sql("CREATE SCHEMA IF NOT EXISTS ml_models")

# 1. Load Gold Table
df_gold = spark.table("gold.customer_360")

# 2. Select Features & Target (STRICT LEAKAGE PREVENTION)
# We deliberately exclude 'recency_days' and 'last_purchase_date'
feature_cols = ["frequency", "monetary_value", "avg_review_score", "favorite_category"]
target_col = "is_churn_risk"

df_ml = df_gold.select(feature_cols + [target_col])

# 3. Split Data (80% Train, 20% Test)
# Seed 42 makes the split repeatable, provided the input DataFrame's contents and partitioning do not change between runs
train_df, test_df = df_ml.randomSplit([0.8, 0.2], seed=42)

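print("✅ Data loaded.")
print(f"   Training Rows: {train_df.count()}")
print(f"   Testing Rows:  {test_df.count()}")
# Check the claim instead of hard-coding it
leaked_cols = {"recency_days", "last_purchase_date"} & set(df_ml.columns)
print(f"   Leakage columns dropped? {'YES' if not leaked_cols else 'NO: ' + str(sorted(leaked_cols))}")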
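
The 80/20 split above is probabilistic: each row is assigned by a random draw, so the resulting sizes are approximate rather than exact. The pure-Python sketch below mimics that idea with a hypothetical `random_split` helper. It is not Spark's implementation, only an illustration of why a fixed seed plus an unchanged input order reproduces the same split.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to train or test by an independent seeded random draw."""
    rng = random.Random(seed)
    train_weight = weights[0] / sum(weights)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_weight else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for customer rows
train_a, test_a = random_split(rows, [0.8, 0.2], seed=42)
train_b, test_b = random_split(rows, [0.8, 0.2], seed=42)

print(len(train_a), len(test_a))  # roughly 800 / 200; exact counts depend on the draws
print(train_a == train_b)         # same seed and same input order give the same split
```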

In [0]:
# Identify categorical columns (assuming string type)
categorical_columns = [
    col_name for col_name in train_df.columns
    if train_df.schema[col_name].dataType.simpleString() == "string"
]

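# Exclude columns with only one unique value (they carry no signal)
# The loop variable is named col_name so it does not reuse the name of the imported `col` function
filtered_categorical_columns = [
    col_name for col_name in categorical_columns
    if train_df.select(col_name).distinct().count() > 1
]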

print("Filtered categorical columns for encoding:", filtered_categorical_columns)


### The ML Pipeline Construction 
Define the stages: Indexer -> Encoder -> Assembler -> Classifier.

In [0]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Stage 1: Convert filtered categorical columns to indices
# handleInvalid="keep" routes unseen or null categories to an extra index instead of failing
indexers = [
    StringIndexer(inputCol=c, outputCol=f"{c}_index", handleInvalid="keep")
    for c in filtered_categorical_columns
]

# Stage 2: One-Hot Encode the indices
encoders = [
    OneHotEncoder(inputCols=[f"{c}_index"], outputCols=[f"{c}_vec"])
    for c in filtered_categorical_columns
]

# Stage 3: Assemble all features into a single 'features' vector
# Numerical features + all categorical vectors
assembler_inputs = ["frequency", "monetary_value", "avg_review_score"]
assembler_inputs += [f"{c}_vec" for c in filtered_categorical_columns]
assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="features"
)

# Stage 4: The Random Forest Classifier
rf = RandomForestClassifier(
    labelCol="is_churn_risk", 
    featuresCol="features",
    numTrees=50,
    maxDepth=10,
    seed=42
)

# Create the Pipeline
pipeline = Pipeline(stages=indexers + encoders + [assembler, rf])

print("✅ ML Pipeline constructed successfully.")
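
For intuition about what the Indexer -> Encoder -> Assembler stages do to a single row, here is a simplified pure-Python sketch. The helper names and the sample data are made up for illustration. Spark's `OneHotEncoder` emits sparse vectors and drops the last category by default, so the real vector layout differs from this dense version.

```python
from collections import Counter

def fit_string_index(values):
    """Map each category to an index, most frequent first (ties broken alphabetically)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {category: i for i, category in enumerate(ordered)}

def one_hot(index, size):
    """Return a dense one-hot list of length `size` with 1.0 at position `index`."""
    return [1.0 if i == index else 0.0 for i in range(size)]

def assemble(row, index_map, numeric_cols, cat_col):
    """Concatenate the numeric values and the one-hot category into one feature list."""
    unseen_index = len(index_map)  # extra bucket, in the spirit of handleInvalid="keep"
    idx = index_map.get(row[cat_col], unseen_index)
    return [float(row[c]) for c in numeric_cols] + one_hot(idx, len(index_map) + 1)

# Made-up training categories and one made-up customer row
train_categories = ["toys", "health_beauty", "toys", "auto", "toys", "auto"]
index_map = fit_string_index(train_categories)
numeric_cols = ["frequency", "monetary_value", "avg_review_score"]
row = {"frequency": 3, "monetary_value": 250.0, "avg_review_score": 4.5,
       "favorite_category": "auto"}

features = assemble(row, index_map, numeric_cols, "favorite_category")
print(index_map)  # {'toys': 0, 'auto': 1, 'health_beauty': 2}
print(features)   # [3.0, 250.0, 4.5, 0.0, 1.0, 0.0, 0.0]
```

A category that never appeared in training lands in the final slot, which is the behavior `handleInvalid="keep"` is meant to provide at scoring time.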


### Training & MLflow Tracking (Python)
Train the model, evaluate it, and log the parameters, metrics, and the fitted pipeline inside a single MLflow run.

In [0]:
%sql
-- Create the temporary schema required by MLflow for staging
CREATE SCHEMA IF NOT EXISTS olist_hackathon.tmp;

In [0]:
%sql
-- Create the volume required for MLflow artifact logging
CREATE VOLUME IF NOT EXISTS olist_hackathon.tmp.mlflow;

In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from mlflow.models import infer_signature

# Define Experiment Path (Optional: organizes runs in the UI)
experiment_name = "/Users/your_email@databricks.com/olist_churn_experiment"
# mlflow.set_experiment(experiment_name) # Uncomment if you want a specific path

print("⏳ Starting Training Run...")

with mlflow.start_run(run_name="Olist_RF_PySpark") as run:
    
    # 1. Train the Model (Fit the Pipeline)
    model = pipeline.fit(train_df)
    
    # 2. Make Predictions on Test Data
    predictions = model.transform(test_df)
    
    # 3. Evaluate Metrics
    # AUC (Area Under ROC) - Standard for Binary Classification
    binary_evaluator = BinaryClassificationEvaluator(labelCol="is_churn_risk", metricName="areaUnderROC")
    auc = binary_evaluator.evaluate(predictions)
    
    # F1 Score - Balances Precision and Recall. Note: metricName="f1" is averaged over both classes, weighted by class frequency
    multi_evaluator = MulticlassClassificationEvaluator(labelCol="is_churn_risk", metricName="f1")
    f1_score = multi_evaluator.evaluate(predictions)
    
    print("📊 Evaluation Metrics:")
    print(f"   AUC: {auc:.4f}")
    print(f"   F1 Score: {f1_score:.4f}")
    
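    # 4. Log to MLflow (read the values from the estimator so they cannot drift from the code above)
    mlflow.log_param("num_trees", rf.getNumTrees())
    mlflow.log_param("max_depth", rf.getMaxDepth())
    mlflow.log_metric("auc", auc)
    mlflow.log_metric("f1_score", f1_score)
    
    # 5. Infer model signature from a small sample
    # Inputs are the raw feature columns only (no label); the output is the prediction column.
    # Sampling avoids collecting the full training set to the driver and keeps Spark
    # vector columns such as 'probability' out of the signature.
    input_sample = test_df.select(feature_cols).limit(100).toPandas()
    output_sample = predictions.select("prediction").limit(100).toPandas()
    signature = infer_signature(input_sample, output_sample)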
    
    # 6. Log the Spark Model
    # This saves the entire pipeline (feature engineering + model)
    mlflow.spark.log_model(
        spark_model=model,
        artifact_path="model",
        registered_model_name="olist_hackathon.ml_models.churn_predictor", # Registers automatically to UC
        dfs_tmpdir="/Volumes/olist_hackathon/tmp/mlflow/", # <-- UC volume path required for serverless clusters
        signature=signature
    )
    
    print("✅ Run Complete. Model logged and registered.")
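
To show what the two logged metrics measure, here is a pure-Python sketch on made-up labels and scores. It computes AUC as the share of correctly ranked (positive, negative) pairs and F1 as a class-frequency-weighted average. This is a conceptual illustration, not Spark's exact computation, and the 0.5 decision threshold is an assumption of the example.

```python
def auc_pairwise(labels, scores):
    """AUC as the share of (positive, negative) pairs ranked correctly; ties count half."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

def f1_for_class(labels, preds, cls):
    """F1 for one class, treating `cls` as the positive label."""
    tp = sum(1 for y, p in zip(labels, preds) if y == cls and p == cls)
    fp = sum(1 for y, p in zip(labels, preds) if y != cls and p == cls)
    fn = sum(1 for y, p in zip(labels, preds) if y == cls and p != cls)
    return 2 * tp / (2 * tp + fp + fn) if tp else 0.0

def weighted_f1(labels, preds):
    """Average the per-class F1 scores, weighted by how often each class occurs."""
    classes = sorted(set(labels))
    return sum(f1_for_class(labels, preds, c) * labels.count(c) for c in classes) / len(labels)

# Made-up, imbalanced example: 2 churners out of 8 customers
labels = [1, 0, 0, 0, 1, 0, 0, 0]
scores = [0.9, 0.2, 0.6, 0.1, 0.4, 0.3, 0.2, 0.1]
preds = [1 if s >= 0.5 else 0 for s in scores]

print(f"AUC:           {auc_pairwise(labels, scores):.4f}")
print(f"F1 (churners): {f1_for_class(labels, preds, 1):.4f}")
print(f"Weighted F1:   {weighted_f1(labels, preds):.4f}")
```

On imbalanced data the weighted figure can look healthier than the churn-class F1 on its own, so it may be worth logging the per-class score next to it.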


### Validation (Python)
Load the model back from the registry to prove it works.

In [0]:
# Verify Model Registration
model_name = "olist_hackathon.ml_models.churn_predictor"
print(f"🔍 Verifying model: {model_name}...")

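# Load the registered model WITH the required volume path
# Version 1 is hard-coded; if the training cell has run more than once, point this at the version you want
loaded_model = mlflow.spark.load_model(
    f"models:/{model_name}/1",
    dfs_tmpdir="/Volumes/olist_hackathon/tmp/mlflow/" # <-- same UC volume used when logging the model
)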

# Test on a small sample
sample_pred = loaded_model.transform(test_df.limit(5))
display(sample_pred.select("favorite_category", "frequency", "prediction", "probability"))
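
Because every training run registers a new version, a hard-coded version number can go stale. The sketch below shows one possible way to resolve the newest version. `latest_version_uri` and `fetch_versions` are hypothetical helper names, and `fetch_versions` assumes the MLflow client is already configured to talk to the Unity Catalog registry.

```python
def latest_version_uri(model_name, versions):
    """Build a models:/ URI for the highest version number in `versions`."""
    if not versions:
        raise ValueError(f"No registered versions found for {model_name}")
    return f"models:/{model_name}/{max(int(v) for v in versions)}"

def fetch_versions(model_name):
    """List registered version numbers through the MLflow client."""
    # Imported lazily so the helper above can be used without MLflow installed
    from mlflow.tracking import MlflowClient
    client = MlflowClient()
    return [mv.version for mv in client.search_model_versions(f"name='{model_name}'")]

# Offline illustration with made-up version numbers
example_uri = latest_version_uri("olist_hackathon.ml_models.churn_predictor", ["1", "3", "2"])
print(example_uri)  # models:/olist_hackathon.ml_models.churn_predictor/3
```

In the notebook, the result of `latest_version_uri(model_name, fetch_versions(model_name))` could be passed to `mlflow.spark.load_model` in place of the fixed `/1` URI.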