# Challenger model validation

This notebook performs validation tasks on the candidate __Challenger__ model.

It goes through a few steps to validate the model before labelling it `Challenger` (by setting its alias).

When organizations first put MLOps processes in place, they should consider having a "human-in-the-loop" perform visual analyses to validate models before promoting them. As they become more familiar with the process, they can consider automating the steps in a __Workflow__. The benefit of automation is that these validation checks are performed systematically before new models are integrated into inference pipelines or deployed for real-time serving. Of course, organizations can retain a "human-in-the-loop" in any part of the process and adopt the degree of automation that suits their business needs.

<img src="https://github.com/databricks-demos/dbdemos-resources/blob/main/images/product/mlops/advanced/banners/mlflow-uc-end-to-end-advanced-4.png?raw=true" width="1200">

*Note: In a typical MLOps setup, this would run as part of an automated job to validate a new model. We'll run this demo as an interactive notebook.*



## General Validation Checks


In the context of MLOps, testing covers more than how accurate a model is. To ensure the stability of our ML system and compliance with any regulatory requirements, we subject each model added to the registry to a series of validation checks. These include, but are not limited to:
<br>
* __Model documentation__
* __Inference on production data__
* __Champion-Challenger testing to ensure that business KPIs are acceptable__

In this notebook, we explore some approaches to performing these tests, and how we can add metadata to our models by tagging whether they have passed a given test.

This part is typically specific to your line of business and quality requirements.

For each test, we'll add tags that record what has been validated for the model. We can also add comments to a model if needed.
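
As a minimal sketch of that tagging step, the helper below records the outcome of one check as a model version tag. The helper name `tag_validation_result` and the `PASSED`/`FAILED` values are illustrative assumptions, not part of this notebook. `client` is assumed to be an `mlflow.tracking.MlflowClient`, whose `set_model_version_tag` method the helper calls.

```python
def tag_validation_result(client, model_name, model_version, check_name, passed):
    """Record the outcome of one validation check as a model version tag.

    `client` is expected to expose set_model_version_tag(name, version, key, value),
    as mlflow.tracking.MlflowClient does. The helper name and the
    PASSED/FAILED values are illustrative assumptions.
    """
    value = "PASSED" if passed else "FAILED"
    client.set_model_version_tag(
        name=model_name,
        version=str(model_version),  # the registry takes the version as a string
        key=check_name,
        value=value,
    )
    return value
```

Once `client`, `model_name`, and `model_version` are defined further down, a call might look like `tag_validation_result(client, model_name, model_version, "has_description", True)`, where the `has_description` key is hypothetical.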

In [0]:
%pip install --quiet mlflow==2.19 databricks-feature-engineering==0.8.0
dbutils.library.restartPython()

In [0]:
# MLflow experiment name.
dbutils.widgets.text(
    "experiment_name",
    "/advanced_mlops_churn_experiment",
    label="Experiment Name",
)

# Unity Catalog registered model name to use for the trained model.
dbutils.widgets.text(
    "model_name", 
    "dev.koeppen_dabs_demo.advanced_mlops_churn_model", 
    label="Full (Three-Level) Model Name"
)


# Label table containing the churn labels.
dbutils.widgets.text(
    "advanced_churn_label_table",
    "dev.koeppen_dabs_demo.advanced_churn_label_table",
    label="Label Table",
)

# Feature table to store the computed features.
dbutils.widgets.text(
    "advanced_churn_feature_table",
    "dev.koeppen_dabs_demo.advanced_churn_feature_table",
    label="Feature Table",
)

# Unity Catalog function that computes the average price increase.
dbutils.widgets.text(
    "avg_price_increase",
    "dev.koeppen_dabs_demo.avg_price_increase",
    label="Avg Price Increase Function",
)

# Alias of the model version to validate.
dbutils.widgets.text(
    "model_alias",
    "challenger",
    label="Model Alias",
)

# Table where the validation results are recorded.
dbutils.widgets.text(
    "model_info_table",
    "dev.koeppen_dabs_demo.model_info_table",
    label="Model Information Table",
)

In [0]:
from mlflow.tracking import MlflowClient
advanced_churn_label_table = dbutils.widgets.get("advanced_churn_label_table")
advanced_churn_feature_table = dbutils.widgets.get("advanced_churn_feature_table")
experiment_name = dbutils.widgets.get("experiment_name")
model_name = dbutils.widgets.get("model_name")
avg_price_increase = dbutils.widgets.get("avg_price_increase")
model_alias = dbutils.widgets.get("model_alias")
model_info_table = dbutils.widgets.get("model_info_table")

In [0]:
# The first two parts of the three-level table name are the catalog and the schema
output_catalog, output_schema = advanced_churn_feature_table.split(".")[:2]
spark.sql(f"USE CATALOG {output_catalog}")
spark.sql(f"USE SCHEMA {output_schema}")

## Get model information for the alias being validated

In [0]:
client = MlflowClient()
model_details = client.get_model_version_by_alias(model_name, model_alias)
model_version = int(model_details.version)
model_uri = f"models:/{model_name}/{model_version}"
# Determine modeling method via tag
modeling_method = model_details.tags.get("modeling_method", "")

print(f"Validating {model_alias} model for {model_name} on model version {model_version}")
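
The cell above resolves the alias to a concrete version and builds a version-based URI, while the scoring cell further down also uses an alias-based URI. A version URI stays pinned to one model version, whereas an alias URI follows the alias when it is reassigned. A minimal sketch of the two forms, using hypothetical helper names that are not part of this notebook:

```python
def model_uri_for_version(model_name, version):
    """Build a URI pinned to one specific registered model version."""
    return f"models:/{model_name}/{version}"


def model_uri_for_alias(model_name, alias):
    """Build a URI that resolves to whichever version currently holds the alias."""
    return f"models:/{model_name}@{alias}"


# Example with a placeholder three-level model name
print(model_uri_for_version("catalog.schema.model", 3))
print(model_uri_for_alias("catalog.schema.model", "challenger"))
```

Pinning to the resolved version, as this notebook does for the AutoML path, keeps the validation result tied to the exact version that was scored even if the alias moves while the job runs.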

In [0]:
# Load label table
labels_df = spark.table(advanced_churn_label_table)
# Load feature table
features_df = spark.table(advanced_churn_feature_table)


In [0]:
from databricks.feature_engineering import FeatureEngineeringClient
from mlflow.tracking import MlflowClient
from mlflow import pyfunc
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import Row
import pyspark.sql.functions as F

# Load model version via alias
client = MlflowClient()
fe = FeatureEngineeringClient()

# AutoML models are often exported as MLflow pyfunc models, which are designed to accept input data as pandas DataFrames
if modeling_method == "AutoML":
    print("Using pyfunc for AutoML scoring...")

    model = pyfunc.load_model(model_uri)

    # Join labels + features
    features_df = features_df.withColumn("avg_price_increase",(F.col("monthly_charges") - (F.col("total_charges") / F.col("tenure"))))
    joined_df = labels_df.join(features_df, on=["customer_id", "transaction_ts"], how="inner")

    # Select only the columns the model expects
    input_schema = model.metadata.get_input_schema()
    expected_cols = [col.name for col in input_schema.inputs]

    validation_df = joined_df
    pdf_features = validation_df.select(*expected_cols).toPandas()
    # Convert all float64 columns to float32 to match model input schema
    pdf_features = pdf_features.astype({col: "float32" for col in pdf_features.select_dtypes("float64").columns})

    pdf_labels = validation_df.select("churn").toPandas()

    predictions = model.predict(pdf_features)

    from sklearn.metrics import f1_score
    # Binary F1 for the positive class "Yes"; pass the label column as a 1-D Series
    f1 = f1_score(pdf_labels["churn"], predictions, pos_label="Yes")

    print(f"F1 Score: {f1:.4f}")

else:
    print("Using Feature Store batch scoring...")

    model_uri_with_alias = f"models:/{model_name}@{model_alias}"

    # Score the label table; predictions are returned as strings ("Yes"/"No")
    scored_df = fe.score_batch(df=labels_df, model_uri=model_uri_with_alias, result_type="string")

    # Convert label + prediction to DoubleType for evaluator
    scored_df = (
        scored_df
        .withColumn("prediction", F.when(F.col("prediction") == "Yes", 1.0).otherwise(0.0))
        .withColumn("churn", F.when(F.col("churn") == "Yes", 1.0).otherwise(0.0))
    )

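    # Note: Spark's "f1" metric is the weighted F1 across classes, not the binary F1 of the "Yes" class
    evaluator = MulticlassClassificationEvaluator(labelCol="churn", predictionCol="prediction", metricName="f1")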
    f1 = evaluator.evaluate(scored_df)

    print(f"F1 Score: {f1:.4f}")

In [0]:
from datetime import datetime

# Record the validation result as a single row
validation_row = [(
    model_name,
    model_version,
    float(round(f1, 4)),
    model_alias,
    modeling_method,
    datetime.now(),
)]
columns = ["model_name", "model_version", "f1_score", "model_alias", "modeling_method", "validation_timestamp"]

# Append the row to the model information table
spark.createDataFrame(validation_row, columns).write.mode("append").saveAsTable(model_info_table)
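
The F1 scores appended to the model information table can feed the Champion-Challenger comparison listed among the validation checks. The sketch below shows one possible decision rule. The function name `challenger_beats_champion` and the `min_gain` threshold are hypothetical, and the rule assumes both scores were computed with the same metric on the same data.

```python
def challenger_beats_champion(challenger_f1, champion_f1, min_gain=0.0):
    """Return True when the challenger should be considered for promotion.

    champion_f1 may be None when no champion has been validated yet.
    min_gain is a hypothetical minimum improvement required over the champion.
    """
    if champion_f1 is None:
        # No champion to compare against, so the challenger passes by default
        return True
    return (challenger_f1 - champion_f1) >= min_gain


# Example with made-up scores
print(challenger_beats_champion(0.85, 0.80))
print(challenger_beats_champion(0.78, 0.80))
```

In practice the acceptable threshold and the KPI itself are specific to the line of business, so F1 here stands in for whichever metric the promotion decision is based on.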
