In [None]:
# pip install loguru

In [None]:
# # If running inside Databricks, add the bundle's src directory to sys.path:
# import sys

# repo_path = "/Workspace/Users/opolo.holtz@amaris.com/.bundle/marvelous-databricks-course-OpoloHOLTZ/dev/files/src"
# sys.path.append(repo_path)

In [None]:
# Databricks notebook source
import hashlib

import mlflow
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import (
    EndpointCoreConfigInput,
    ServedEntityInput,
)
from mlflow.models import infer_signature
from pyspark.sql import SparkSession

from defaultccc.config import ProjectConfig, Tags
from defaultccc.models.model_basic import BasicModel

In [10]:
# MLflow tracking and the Unity Catalog registry both use the "opoloholtz" profile.
mlflow.set_tracking_uri("databricks://opoloholtz")
mlflow.set_registry_uri("databricks-uc://opoloholtz")

# Spark session shared by every later cell.
spark = SparkSession.builder.getOrCreate()

# Project settings; the catalog and schema are reused to build table and model names.
config = ProjectConfig.from_yaml(config_path="../project_config.yml")
catalog_name, schema_name = config.catalog_name, config.schema_name

# Tags attached to the models registered from this notebook.
tags = Tags(git_sha="abcd12345", branch="week3")

In [3]:
# Train & register model A with its own hyper-parameter set.
basic_model_a = BasicModel(config=config, tags=tags, spark=spark)
# NOTE(review): this attribute was previously misspelled `paramaters`, which only
# created a new, unused attribute, so `config.parameters_a` was never applied.
# Assumes BasicModel reads its hyper-parameters from `parameters` -- confirm
# against defaultccc.models.model_basic.
basic_model_a.parameters = config.parameters_a
basic_model_a.model_name = f"{catalog_name}.{schema_name}.default_ccc_model_basic_A"
basic_model_a.load_data()
basic_model_a.prepare_features()
basic_model_a.train_model()
basic_model_a.log_model()
basic_model_a.register_model()
# Reload the model behind the `latest-model` alias for use in the A/B wrapper.
model_A = mlflow.sklearn.load_model(f"models:/{basic_model_a.model_name}@latest-model")

[32m2025-03-07 17:30:34.115[0m | [1mINFO    [0m | [36mdefaultccc.models.model_basic[0m:[36mload_data[0m:[36m40[0m - [1mLoading data from maven_training_databricks.default_ccc tables train and test...[0m
[32m2025-03-07 17:30:37.141[0m | [1mINFO    [0m | [36mdefaultccc.models.model_basic[0m:[36mload_data[0m:[36m53[0m - [1mData succesfully loaded.[0m
[32m2025-03-07 17:30:37.142[0m | [1mINFO    [0m | [36mdefaultccc.models.model_basic[0m:[36mprepare_features[0m:[36m64[0m - [1mStarting the preprocesing with a pipeline...[0m
[32m2025-03-07 17:30:37.143[0m | [1mINFO    [0m | [36mdefaultccc.models.model_basic[0m:[36mprepare_features[0m:[36m77[0m - [1mPreprocessing data pipeline succeded[0m
[32m2025-03-07 17:30:37.143[0m | [1mINFO    [0m | [36mdefaultccc.models.model_basic[0m:[36mtrain_model[0m:[36m83[0m - [1mStarting to train...[0m
[32m2025-03-07 17:30:37.698[0m | [1mINFO    [0m | [36mdefaultccc.models.model_basic[0m:[36mtrain_m

In [4]:
# Train & register model B with its own hyper-parameter set.
basic_model_b = BasicModel(config=config, tags=tags, spark=spark)
# NOTE(review): this attribute was previously misspelled `paramaters`, which only
# created a new, unused attribute, so `config.parameters_b` was never applied.
# Assumes BasicModel reads its hyper-parameters from `parameters` -- confirm
# against defaultccc.models.model_basic.
basic_model_b.parameters = config.parameters_b
basic_model_b.model_name = f"{catalog_name}.{schema_name}.default_ccc_model_basic_B"
basic_model_b.load_data()
basic_model_b.prepare_features()
basic_model_b.train_model()
basic_model_b.log_model()
basic_model_b.register_model()
# Reload the model behind the `latest-model` alias for use in the A/B wrapper.
model_B = mlflow.sklearn.load_model(f"models:/{basic_model_b.model_name}@latest-model")

[32m2025-03-07 17:31:13.122[0m | [1mINFO    [0m | [36mdefaultccc.models.model_basic[0m:[36mload_data[0m:[36m40[0m - [1mLoading data from maven_training_databricks.default_ccc tables train and test...[0m
[32m2025-03-07 17:31:15.285[0m | [1mINFO    [0m | [36mdefaultccc.models.model_basic[0m:[36mload_data[0m:[36m53[0m - [1mData succesfully loaded.[0m
[32m2025-03-07 17:31:15.286[0m | [1mINFO    [0m | [36mdefaultccc.models.model_basic[0m:[36mprepare_features[0m:[36m64[0m - [1mStarting the preprocesing with a pipeline...[0m
[32m2025-03-07 17:31:15.286[0m | [1mINFO    [0m | [36mdefaultccc.models.model_basic[0m:[36mprepare_features[0m:[36m77[0m - [1mPreprocessing data pipeline succeded[0m
[32m2025-03-07 17:31:15.287[0m | [1mINFO    [0m | [36mdefaultccc.models.model_basic[0m:[36mtrain_model[0m:[36m83[0m - [1mStarting to train...[0m
[32m2025-03-07 17:31:15.802[0m | [1mINFO    [0m | [36mdefaultccc.models.model_basic[0m:[36mtrain_m

In [5]:
class HousePriceModelWrapper(mlflow.pyfunc.PythonModel):
    """Pyfunc wrapper that routes a request to one of two models (A/B test).

    The model is chosen from the MD5 hash of the ``ID`` value in the first
    input row: an odd hash selects model A, an even hash selects model B.
    """

    def __init__(self, models):
        """Keep the model pair; ``models[0]`` is model A, ``models[1]`` is model B."""
        self.models = models
        self.model_a, self.model_b = models[0], models[1]

    def predict(self, context, model_input):
        """Score the input with the model selected by the first row's ``ID``.

        :param context: Pyfunc context; not used by this method.
        :param model_input: Table-like input with an ``ID`` column; ``ID`` is
            dropped before the features are passed to the chosen model.
        :return: Dict with the first prediction under ``"Prediction"`` and the
            label of the model that produced it under ``"model"``.
        """
        first_id = str(model_input["ID"].values[0])
        digest = hashlib.md5(first_id.encode(encoding="UTF-8")).hexdigest()
        # The hex digest is read as a base-16 integer; its parity picks the arm.
        routed_to_a = int(digest, 16) % 2 == 1
        if routed_to_a:
            chosen_model, label = self.model_a, "Model A"
        else:
            chosen_model, label = self.model_b, "Model B"
        predictions = chosen_model.predict(model_input.drop(["ID"], axis=1))
        return {"Prediction": predictions[0], "model": label}

In [6]:
# Columns the wrapper expects: every model feature plus the ID used for routing.
feature_columns = config.num_features + config.cat_features + ["ID"]

# The Spark handle of the training table is kept alongside its pandas copy.
train_set_spark = spark.table(f"{catalog_name}.{schema_name}.train_set")
train_set = train_set_spark.toPandas()
test_set = spark.table(f"{catalog_name}.{schema_name}.test_set").toPandas()

X_train = train_set[feature_columns]
X_test = test_set[feature_columns]

In [7]:
# Smoke-test the wrapper locally on one test-set row.
models = [model_A, model_B]
wrapped_model = HousePriceModelWrapper(models)  # the wrapper receives the already-loaded models
example_input = X_test.head(1)  # first row only, kept as a one-row DataFrame
example_prediction = wrapped_model.predict(context=None, model_input=example_input)
print("Example Prediction:", example_prediction)

Example Prediction: {'Prediction': 0, 'model': 'Model B'}


In [11]:
# Log the A/B wrapper as a pyfunc model, then register it under `model_name`.
mlflow.set_experiment(experiment_name="/Shared/default-ccc-ab-testing")
model_name = f"{catalog_name}.{schema_name}.default_ccc_model_pyfunc_ab_test"
artifact_name = "pyfunc-default-ccc-model-ab"

with mlflow.start_run() as run:
    run_id = run.info.run_id

    # The signature output mirrors the dict returned by the wrapper's predict().
    sample_output = {"Prediction": 1, "model": "Model B"}
    signature = infer_signature(model_input=X_train, model_output=sample_output)

    # Record the training table as the run's input dataset.
    dataset = mlflow.data.from_spark(
        train_set_spark,
        table_name=f"{catalog_name}.{schema_name}.train_set",
        version="0",
    )
    mlflow.log_input(dataset, context="training")

    mlflow.pyfunc.log_model(
        python_model=wrapped_model,
        artifact_path=artifact_name,
        signature=signature,
    )

# Registration happens after the run has closed, using the artifact logged above.
model_version = mlflow.register_model(
    model_uri=f"runs:/{run_id}/{artifact_name}",
    name=model_name,
    tags=tags.dict(),
)

2025/03/07 17:33:39 INFO mlflow.tracking._tracking_service.client: 🏃 View run amazing-hen-75 at: https://adb-4498504268974234.14.azuredatabricks.net/ml/experiments/3772110128274668/runs/07e5526649db4c7a81d048576a1585b2.
2025/03/07 17:33:39 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: https://adb-4498504268974234.14.azuredatabricks.net/ml/experiments/3772110128274668.
Registered model 'maven_training_databricks.default_ccc.default_ccc_model_pyfunc_ab_test' already exists. Creating a new version of this model...
Created version '3' of model 'maven_training_databricks.default_ccc.default_ccc_model_pyfunc_ab_test'.


In [12]:
# Deploy the registered A/B pyfunc model version to a model serving endpoint.
workspace = WorkspaceClient()
endpoint_name = f"{artifact_name}-serving"
served_entities = [
    ServedEntityInput(
        entity_name=model_name,
        scale_to_zero_enabled=True,
        workload_size="Small",
        entity_version=model_version.version,
    )
]

# An unconditional `create` cannot be re-run once the endpoint exists, so the
# cell would break on every run after the first. Create the endpoint on the
# first run and point it at the new model version afterwards.
endpoint_exists = any(endpoint.name == endpoint_name for endpoint in workspace.serving_endpoints.list())

if endpoint_exists:
    # NOTE(review): the update may be rejected while a previous config update
    # is still being applied -- confirm and re-run once the endpoint is ready.
    workspace.serving_endpoints.update_config(
        name=endpoint_name,
        served_entities=served_entities,
    )
else:
    workspace.serving_endpoints.create(
        name=endpoint_name,
        config=EndpointCoreConfigInput(
            served_entities=served_entities,
        ),
    )

<databricks.sdk.service._internal.Wait at 0x7f265a41b510>