In [0]:
pip install imblearn

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
%restart_python

In [0]:
# =======================================================
# BAN6800 - Data Analytics Capstone
# Final Project: End-to-End ML Model Training, Registration, Deployment & Scoring
# Project Name – Databricks-Enabled Procurement Analytics Optimization (Train + Register in GOLD + Deploy 1 Endpoint + Score Model + Write Back to GOLD Schema)
# Author: Taiwo Babalola
# Learner ID: 162894
# Submitted to: DR Raphael Wanjiku
# =======================================================

# =======================================================
# Source  : abc.abc_dw_gold.abc_dw_gl_pr_po_kpi
# Output Predictiona table  : abc.abc_dw_gold.abc_dw_gl_pr_po_kpi_predictions
# Model Registry: Unity Catalog Model Registry -> abc.abc_dw_gold.abc_dw_gd_model_Procurement_Combined_Model_BAN6800
# Endpoint: One Databricks Model Serving endpoint (abc.abc_dw_gold.ban6800-procurement-sla-combined)
# =======================================================

# -----------------------------
# 0. INSTALLS (only if needed)
# -----------------------------
# %pip install -U mlflow databricks-sdk imbalanced-learn scikit-learn
# dbutils.library.restartPython()

# -----------------------------
# 1. IMPORTS
# -----------------------------
import os
import numpy as np
import pandas as pd

import mlflow
import mlflow.pyfunc
from mlflow.models import infer_signature
from mlflow.tracking import MlflowClient

from pyspark.sql import functions as F

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score, roc_auc_score,
    mean_absolute_error, mean_squared_error, r2_score
)

from imblearn.over_sampling import SMOTE

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
    ServedEntityInput,
    EndpointCoreConfigInput,
    TrafficConfig,
    Route
)

# -----------------------------
# 2. CONFIG (EDIT THESE)
# -----------------------------
CATALOG = "abc"
GOLD_SCHEMA = "abc_dw_gold"

GOLD_TABLE = f"{CATALOG}.{GOLD_SCHEMA}.abc_dw_gl_pr_po_kpi"
PRED_TABLE = f"{CATALOG}.{GOLD_SCHEMA}.abc_dw_gl_pr_po_kpi_predictions"

# ✅ UC Model name (must be 3-level for Unity Catalog)
MODEL_NAME = f"{CATALOG}.{GOLD_SCHEMA}.abc_dw_gd_model_Procurement_sla_Combined_Model_BAN6800"

# ✅ Workspace experiment (can remain workspace path)
EXPERIMENT_PATH = "/Shared/Procurement_SLA_Models"

# Endpoint name (must be unique in workspace)
ENDPOINT_NAME = "ban6800-procurement-sla-combined"

# Serving workload size (common values: "Small", "Medium", "Large")
WORKLOAD_SIZE = "Small"

# Classification threshold
SLA_THRESHOLD = 0.55

# Optional: only train/score meaningful matched records
RECORD_TYPE_FILTER = "PR_PO_MATCHED"

# -----------------------------
# 3. IMPORTANT: USE UC MODEL REGISTRY
# -----------------------------
# ✅ This is what makes the model “saved on GOLD schema” (Unity Catalog)
mlflow.set_registry_uri("databricks-uc")
mlflow.set_experiment(EXPERIMENT_PATH)

# -----------------------------
# 4. LOAD GOLD DATA (SPARK -> PANDAS)
# -----------------------------
df_spark = spark.table(GOLD_TABLE)
if "record_type" in df_spark.columns:
    df_spark = df_spark.filter(F.col("record_type") == RECORD_TYPE_FILTER)

df_pd = df_spark.toPandas()
print("Loaded Gold rows:", df_pd.shape[0], "cols:", df_pd.shape[1])

# =======================================================
# 5. TRAIN SLA CLASSIFIER (SMOTE + RF)
# =======================================================
clf_df = df_pd[df_pd["sla_breach_flag"].isin(["YES", "NO"])].copy()
clf_df["sla_breach_flag_bin"] = np.where(clf_df["sla_breach_flag"] == "YES", 1, 0)

# ---- Leak-free numeric/categorical features (ensure stable order!)
num_clf = [c for c in ["pr_orderqty", "po_orderquantity", "po_netamount"] if c in clf_df.columns]

cat_clf = [c for c in [
    "pr_companycode",
    "po_purchasingorgdesc",
    "po_companycode",
    "po_companycodedesc",
    "pr_plant",
    "po_plant",
    "pr_documenttype",
    "po_purchasingdoctypedesc",
    "po_purchasinggroupdesc",
    "po_countrykey",
    "materialgroupdesc",
    "materialtypedesc",
    "record_type",
] if c in clf_df.columns]

# enforce explicit order
num_clf = list(num_clf)
cat_clf = list(cat_clf)

clf_model_df = clf_df[num_clf + cat_clf + ["sla_breach_flag_bin"]].copy()
clf_model_df[num_clf] = clf_model_df[num_clf].fillna(0)
clf_model_df[cat_clf] = clf_model_df[cat_clf].fillna("Unknown")

X_clf = clf_model_df[num_clf + cat_clf]
y_clf = clf_model_df["sla_breach_flag_bin"]

X_train_clf, X_test_clf, y_train_clf, y_test_clf = train_test_split(
    X_clf, y_clf, test_size=0.2, random_state=42, stratify=y_clf
)

preprocessor_clf = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore"), cat_clf),
        ("num", "passthrough", num_clf),
    ],
    remainder="drop"
)

X_train_enc = preprocessor_clf.fit_transform(X_train_clf)
X_test_enc = preprocessor_clf.transform(X_test_clf)

sm = SMOTE(random_state=42)
X_train_bal, y_train_bal = sm.fit_resample(X_train_enc, y_train_clf)

rf_clf = RandomForestClassifier(
    n_estimators=120,
    max_depth=5,
    min_samples_leaf=10,
    class_weight="balanced",
    max_features="sqrt",
    random_state=42,
    bootstrap=True
)
rf_clf.fit(X_train_bal, y_train_bal)

y_proba = rf_clf.predict_proba(X_test_enc)[:, 1]
y_pred = (y_proba >= SLA_THRESHOLD).astype(int)

sla_metrics = {
    "sla_accuracy": float(accuracy_score(y_test_clf, y_pred)),
    "sla_precision": float(precision_score(y_test_clf, y_pred, zero_division=0)),
    "sla_recall": float(recall_score(y_test_clf, y_pred, zero_division=0)),
    "sla_f1": float(f1_score(y_test_clf, y_pred, zero_division=0)),
    "sla_auc": float(roc_auc_score(y_test_clf, y_proba)),
    "sla_threshold": float(SLA_THRESHOLD),
}
print("SLA metrics:", sla_metrics)

# =======================================================
# 6. TRAIN REGRESSOR (RF)
# =======================================================
if "pr_to_po_ageing" not in df_pd.columns:
    raise ValueError("pr_to_po_ageing column missing in Gold dataset.")

reg_df = df_pd[df_pd["pr_to_po_ageing"].notna()].copy()
target_reg = "pr_to_po_ageing"

num_reg = [c for c in [
    "pr_approval_ageing",
    "po_approval_ageing",
    "pr_orderqty",
    "po_orderquantity",
    "po_netamount",
] if c in reg_df.columns]

cat_reg = [c for c in [
    "pr_companycode",
    "po_purchasingorgdesc",
    "po_companycode",
    "po_companycodedesc",
    "pr_plant",
    "po_plant",
    "pr_documenttype",
    "po_purchasingdoctype",
    "po_purchasingdoctypedesc",
    "po_purchasinggroupdesc",
    "po_countrykey",
    "materialgroupdesc",
    "materialtypedesc",
    "record_type",
] if c in reg_df.columns]

# enforce explicit order
num_reg = list(num_reg)
cat_reg = list(cat_reg)

reg_model_df = reg_df[num_reg + cat_reg + [target_reg]].copy()
reg_model_df[num_reg] = reg_model_df[num_reg].fillna(0)
reg_model_df[cat_reg] = reg_model_df[cat_reg].fillna("Unknown")

X_reg = reg_model_df[num_reg + cat_reg]
y_reg = reg_model_df[target_reg]

X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(
    X_reg, y_reg, test_size=0.2, random_state=42
)

preprocessor_reg = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore"), cat_reg),
        ("num", "passthrough", num_reg),
    ],
    remainder="drop"
)

X_train_reg_enc = preprocessor_reg.fit_transform(X_train_reg)
X_test_reg_enc = preprocessor_reg.transform(X_test_reg)

rf_reg = RandomForestRegressor(n_estimators=200, random_state=42)
rf_reg.fit(X_train_reg_enc, y_train_reg)

y_pred_reg = rf_reg.predict(X_test_reg_enc)

reg_metrics = {
    "reg_mae": float(mean_absolute_error(y_test_reg, y_pred_reg)),
    "reg_rmse": float(np.sqrt(mean_squared_error(y_test_reg, y_pred_reg))),
    "reg_r2": float(r2_score(y_test_reg, y_pred_reg)),
}
print("REG metrics:", reg_metrics)

# =======================================================
# 7. ONE PYFUNC MODEL (COMBINED) FOR ONE ENDPOINT
# =======================================================
class ProcurementCombinedModel(mlflow.pyfunc.PythonModel):
    def __init__(self, pre_clf, clf, threshold, num_clf, cat_clf, pre_reg, reg, num_reg, cat_reg):
        self.pre_clf = pre_clf
        self.clf = clf
        self.threshold = float(threshold)
        self.num_clf = list(num_clf)
        self.cat_clf = list(cat_clf)
        self.pre_reg = pre_reg
        self.reg = reg
        self.num_reg = list(num_reg)
        self.cat_reg = list(cat_reg)

        # store strict column orders (prevents the “feature names should match/order” error)
        self._clf_cols = self.num_clf + self.cat_clf
        self._reg_cols = self.num_reg + self.cat_reg

    def predict(self, context, model_input: pd.DataFrame) -> pd.DataFrame:
        df_in = model_input.copy()

        # --- SLA inputs (strict order)
        Xc = df_in.reindex(columns=self._clf_cols, fill_value=np.nan).copy()
        if self.num_clf:
            Xc[self.num_clf] = Xc[self.num_clf].fillna(0)
        if self.cat_clf:
            Xc[self.cat_clf] = Xc[self.cat_clf].fillna("Unknown")
        Xc_enc = self.pre_clf.transform(Xc)
        proba = self.clf.predict_proba(Xc_enc)[:, 1]
        pred_bin = (proba >= self.threshold).astype(int)
        pred_lbl = np.where(pred_bin == 1, "YES", "NO")

        # --- REG inputs (strict order)
        Xr = df_in.reindex(columns=self._reg_cols, fill_value=np.nan).copy()
        if self.num_reg:
            Xr[self.num_reg] = Xr[self.num_reg].fillna(0)
        if self.cat_reg:
            Xr[self.cat_reg] = Xr[self.cat_reg].fillna("Unknown")
        Xr_enc = self.pre_reg.transform(Xr)
        pred_days = self.reg.predict(Xr_enc)

        return pd.DataFrame({
            "pred_sla_breach_probability": proba,
            "pred_sla_breach_bin": pred_bin,
            "pred_sla_breach_label": pred_lbl,
            "pred_pr_to_po_ageing": pred_days
        })

pyfunc_model = ProcurementCombinedModel(
    pre_clf=preprocessor_clf,
    clf=rf_clf,
    threshold=SLA_THRESHOLD,
    num_clf=num_clf,
    cat_clf=cat_clf,
    pre_reg=preprocessor_reg,
    reg=rf_reg,
    num_reg=num_reg,
    cat_reg=cat_reg
)

# =======================================================
# 8. LOG + REGISTER MODEL INTO UC GOLD SCHEMA
# =======================================================
mlflow.end_run()

# Build a valid input example that contains ALL required columns (union)
all_cols_union = []
for c in (num_clf + cat_clf + num_reg + cat_reg):
    if c not in all_cols_union:
        all_cols_union.append(c)

input_example = df_pd.reindex(columns=all_cols_union).head(5).copy()
sample_out = pyfunc_model.predict(None, input_example)
signature = infer_signature(input_example, sample_out)

client = MlflowClient()

with mlflow.start_run(run_name="BAN6800_UC_Combined_Model") as run:
    run_id = run.info.run_id

    mlflow.log_metrics({**sla_metrics, **reg_metrics})
    mlflow.log_params({
        "gold_table": GOLD_TABLE,
        "record_type_filter": RECORD_TYPE_FILTER,
        "sla_threshold": SLA_THRESHOLD,
        "clf_estimators": 120,
        "clf_max_depth": 5,
        "reg_estimators": 200,
    })

    mlflow.pyfunc.log_model(
        artifact_path="model",
        python_model=pyfunc_model,
        signature=signature,
        input_example=input_example
    )

    model_uri = f"runs:/{run_id}/model"

    # Create registered model in UC if missing
    try:
        _ = client.get_registered_model(MODEL_NAME)
    except Exception:
        # If it doesn't exist, try creating it
        try:
            client.create_registered_model(MODEL_NAME)
        except Exception:
            # register_model can still create in many cases; ignore if already exists or auto-creates
            pass

    reg = mlflow.register_model(model_uri, MODEL_NAME)
    model_version = str(reg.version)

print(f"✅ UC Model registered: {MODEL_NAME} v{model_version}")

# =======================================================
# 9. SCORE FULL GOLD + WRITE BACK TO GOLD SCHEMA
# =======================================================
score_in = df_pd.reindex(columns=all_cols_union).copy()
pred_pd = pyfunc_model.predict(None, score_in)
pred_pd["scoring_timestamp"] = pd.Timestamp.utcnow()

# Keep some identifiers if present
id_cols = [c for c in [
    "pr_purchaserequisition", "pr_itemnumber",
    "purchaseorder", "po_itemnumber",
    "pr_companycode", "po_companycode",
    "pr_plant", "po_plant",
    "pr_documenttype", "po_purchasingdoctypedesc",
    "po_countrykey", "record_type"
] if c in df_pd.columns]

out_pd = df_pd[id_cols].copy() if id_cols else pd.DataFrame(index=df_pd.index)
out_pd = pd.concat([out_pd.reset_index(drop=True), pred_pd.reset_index(drop=True)], axis=1)

pred_spark = spark.createDataFrame(out_pd)

(
    pred_spark.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(PRED_TABLE)
)
print(f"✅ Predictions written to: {PRED_TABLE}")
display(spark.table(PRED_TABLE).limit(10))

# =======================================================
# 10. DEPLOY/UPSERT ONE MODEL SERVING ENDPOINT
# =======================================================
#%python
def upsert_endpoint(endpoint_name: str, model_name: str, model_version: str):
    w = WorkspaceClient()

    served_entity_name = f"procurement-combined-{model_version}"  # must match route

    served = ServedEntityInput(
        name=served_entity_name,
        entity_name=model_name,            # UC model name: abc.abc_dw_gold.<model>
        entity_version=str(model_version),
        workload_size=WORKLOAD_SIZE,
        scale_to_zero_enabled=True
    )

    core_cfg = EndpointCoreConfigInput(
        served_entities=[served],
        traffic_config=TrafficConfig(
            routes=[Route(served_model_name=served_entity_name, traffic_percentage=100)]
        ),
    )

    try:
        w.serving_endpoints.get(name=endpoint_name)
        print(f"Updating endpoint: {endpoint_name}")
        w.serving_endpoints.update_config(
            name=endpoint_name,
            served_entities=core_cfg.served_entities,
            traffic_config=core_cfg.traffic_config
        )
    except Exception:
        print(f"Creating endpoint: {endpoint_name}")
        w.serving_endpoints.create(name=endpoint_name, config=core_cfg)

#    print(f"✅ Endpoint upsert submitted: {endpoint_name}")
upsert_endpoint(ENDPOINT_NAME, MODEL_NAME, model_version)

print("\n✅ DONE: UC model in GOLD schema + one endpoint + predictions table in GOLD")
print(f"   Model    : {MODEL_NAME} v{model_version}")
print(f"   Endpoint : {ENDPOINT_NAME}")
print(f"   Pred Tbl : {PRED_TABLE}")


Loaded Gold rows: 264 cols: 80
SLA metrics: {'sla_accuracy': 0.8269230769230769, 'sla_precision': 1.0, 'sla_recall': 0.7954545454545454, 'sla_f1': 0.8860759493670886, 'sla_auc': 0.9218749999999999, 'sla_threshold': 0.55}
REG metrics: {'reg_mae': 0.32346153846153847, 'reg_rmse': 1.2873564146970093, 'reg_r2': 0.9931832935807727}


2025/12/19 14:43:44 INFO mlflow.pyfunc: Validating input example against model signature
Registered model 'abc.abc_dw_gold.abc_dw_gd_model_Procurement_sla_Combined_Model_BAN6800' already exists. Creating a new version of this model...
Created version '7' of model 'abc.abc_dw_gold.abc_dw_gd_model_procurement_sla_combined_model_ban6800'.


✅ UC Model registered: abc.abc_dw_gold.abc_dw_gd_model_Procurement_sla_Combined_Model_BAN6800 v7
✅ Predictions written to: abc.abc_dw_gold.abc_dw_gl_pr_po_kpi_predictions


pr_purchaserequisition,pr_itemnumber,purchaseorder,po_itemnumber,pr_companycode,po_companycode,pr_plant,po_plant,pr_documenttype,po_purchasingdoctypedesc,po_countrykey,record_type,pred_sla_breach_probability,pred_sla_breach_bin,pred_sla_breach_label,pred_pr_to_po_ageing,scoring_timestamp
2000126351,480,4500229324,480,9900,9900,9900,9900,ZNPR,Local PO,GH,PR_PO_MATCHED,0.9826907011809316,1,YES,41.0,2025-12-19T14:43:51.956Z
2000130460,50,4500230817,50,9900,9900,9900,9900,ZNPR,Local PO,GH,PR_PO_MATCHED,0.3249423704133456,0,NO,24.0,2025-12-19T14:43:51.956Z
2000126540,10,4500223373,10,9900,9900,9900,9900,ZNPR,Local PO,GH,PR_PO_MATCHED,0.4286469049959318,0,NO,20.0,2025-12-19T14:43:51.956Z
2000126351,180,4500229324,180,9900,9900,9900,9900,ZNPR,Local PO,GH,PR_PO_MATCHED,0.9395665085827828,1,YES,41.0,2025-12-19T14:43:51.956Z
2000137276,130,4500234397,130,9900,9900,9900,9900,ZNPR,Local PO,GH,PR_PO_MATCHED,0.198730594223598,0,NO,1.0,2025-12-19T14:43:51.956Z
2000126351,120,4500229324,120,9900,9900,9900,9900,ZNPR,Local PO,GH,PR_PO_MATCHED,0.9856629234031538,1,YES,41.0,2025-12-19T14:43:51.956Z
2000130460,20,4500230817,20,9900,9900,9900,9900,ZNPR,Local PO,GH,PR_PO_MATCHED,0.3176766229088067,0,NO,24.0,2025-12-19T14:43:51.956Z
2000126351,80,4500229324,80,9900,9900,9900,9900,ZNPR,Local PO,GH,PR_PO_MATCHED,0.9860368550270854,1,YES,41.0,2025-12-19T14:43:51.956Z
2000137276,20,4500234397,20,9900,9900,9900,9900,ZNPR,Local PO,GH,PR_PO_MATCHED,0.3257964076804264,0,NO,1.0,2025-12-19T14:43:51.956Z
2000126351,240,4500229324,240,9900,9900,9900,9900,ZNPR,Local PO,GH,PR_PO_MATCHED,0.9860368550270854,1,YES,41.0,2025-12-19T14:43:51.956Z


Creating endpoint: ban6800-procurement-sla-combined

✅ DONE: UC model in GOLD schema + one endpoint + predictions table in GOLD
   Model    : abc.abc_dw_gold.abc_dw_gd_model_Procurement_sla_Combined_Model_BAN6800 v7
   Endpoint : ban6800-procurement-sla-combined
   Pred Tbl : abc.abc_dw_gold.abc_dw_gl_pr_po_kpi_predictions
