
# MLflow Webhooks & Testing

Webhooks trigger the execution of code (oftentimes tests) upon some event. This lesson explores how to employ webhooks to trigger automated tests against models in the model registry. 

## In this lesson you will:<br>
 - Explore the role of webhooks in ML pipelines
 - Create a job to test models in the model registry
 - Automate that job using MLflow webhooks
 - Create an HTTP webhook to send notifications to Slack


## Automated Testing

The backbone of the continuous integration, continuous deployment (CI/CD) process is the automated building, testing, and deployment of code. A **webhook or trigger** causes the execution of code based upon some event.  This is commonly when new code is pushed to a code repository.  In the case of machine learning jobs, this could be the arrival of a new model in the model registry.

The two types of <a href="https://docs.databricks.com/applications/mlflow/model-registry-webhooks.html" target="_blank">**MLflow Model Registry Webhooks**</a>:
 - Webhooks with Job triggers: Trigger a job in a Databricks workspace
 - Webhooks with HTTP endpoints: Send triggers to any HTTP endpoint
 
This lesson uses:
1. a **Job webhook** to trigger the execution of a Databricks job 
2. an **HTTP webhook** to send notifications to Slack
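
Both kinds of webhook are registered through the same create call and differ only in which spec the request body carries. The sketch below builds either body. `build_webhook_payload` is a hypothetical helper, the `job_spec` fields mirror the ones used later in this lesson, and the `http_url_spec` field name `url` is taken from the Databricks documentation as understood here, so verify it before relying on it. The model name, job ID, and URLs are placeholders.

```python
def build_webhook_payload(model_name, events, job_spec=None, http_url_spec=None, description=""):
    """Return the request body for a registry webhook (hypothetical helper)."""
    # Exactly one of the two specs must be supplied.
    if (job_spec is None) == (http_url_spec is None):
        raise ValueError("Provide exactly one of job_spec or http_url_spec")

    payload = {
        "model_name": model_name,
        "events": list(events),
        "description": description,
        "status": "ACTIVE",
    }
    if job_spec is not None:
        payload["job_spec"] = job_spec            # Job webhook
    else:
        payload["http_url_spec"] = http_url_spec  # HTTP webhook
    return payload


# Placeholder values only; none of these identify a real job or endpoint.
job_payload = build_webhook_payload(
    "webhook-demo",
    ["MODEL_VERSION_TRANSITIONED_STAGE"],
    job_spec={
        "job_id": "123",
        "workspace_url": "https://example.cloud.databricks.com",
        "access_token": "<token>",
    },
)
http_payload = build_webhook_payload(
    "webhook-demo",
    ["MODEL_VERSION_TRANSITIONED_STAGE"],
    http_url_spec={"url": "https://hooks.slack.com/services/<placeholder>"},
)
```

Keeping the two specs mutually exclusive in one helper makes it harder to send a body that mixes both kinds of trigger by accident.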

Upon the arrival of a new model version with a given name in the model registry, the function of the Databricks job is to:<br><br>
- Import the new model version
- Test the schema of its inputs and outputs
- Pass example input through the model

This covers many of the desired tests for ML models. Throughput testing could be performed using the same paradigm, and the model could also be promoted to the production stage in an automated fashion.
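
The schema test listed above can be sketched as a comparison of two column-to-type mappings. This is a minimal illustration, not the code of the job notebook: `check_schema` is a hypothetical helper, and in a real job the `actual` mapping would presumably be read from the signature logged with the model version. The column names and types are illustrative.

```python
def check_schema(expected, actual):
    """Return a list of human-readable mismatches between two {column: dtype} mappings."""
    problems = []
    for column, dtype in expected.items():
        if column not in actual:
            problems.append(f"missing column: {column}")
        elif actual[column] != dtype:
            problems.append(f"{column}: expected {dtype}, got {actual[column]}")
    for column in actual:
        if column not in expected:
            problems.append(f"unexpected column: {column}")
    return problems


# Illustrative schemas; a passing model yields an empty list.
expected_inputs = {"alcohol": "double", "pH": "double", "is_red": "long"}
actual_inputs = {"alcohol": "double", "pH": "float", "color": "string"}

problems = check_schema(expected_inputs, actual_inputs)
for problem in problems:
    print(problem)
```

Returning every mismatch rather than failing on the first one gives the job log a complete picture of what changed between model versions.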


## Create a Model and Job

The following steps will create a Databricks job using another notebook in this directory: **`Sesion 3 - Webhooks-Job-Demo`**

**Note:** 
* Ensure that you are an admin on this workspace and that you're not using Community Edition (which has jobs disabled). 
* If you are not an admin, ask the instructor to share their token with you. 
* Alternatively, you can set **`token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)`**.


### Create a user access token

Create a user access token using the following steps:<br><br>

1. Click the Settings icon
1. Click User Settings
1. Go to the Access Tokens tab
1. Click the Generate New Token button
1. Optionally enter a description (comment) and expiration period
1. Click the Generate button
1. Copy the generated token **and paste it in the following cell**

**Note:**
* Alternatively, you can set **`token = mlflow.utils.databricks_utils._get_command_context().apiToken().get()`**. However, this is not a best practice. We recommend you create your personal access token using the steps above and save it in your [secret scope](https://docs.databricks.com/security/secrets/secret-scopes.html). 


You can find details <a href="https://docs.databricks.com/dev-tools/api/latest/authentication.html" target="_blank">about access tokens here</a>.
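
One way to follow the secret-scope recommendation is to read the token at run time instead of pasting it into a cell. The sketch below is an assumption-laden illustration: `load_token` is a hypothetical helper, and the scope name `my-scope`, the key `databricks-token`, and the environment variable `DATABRICKS_TOKEN` are placeholders you would replace with your own.

```python
import os


def load_token(secret_getter=None, scope="my-scope", key="databricks-token",
               env_var="DATABRICKS_TOKEN"):
    """Fetch an access token from a secret store, falling back to an environment variable."""
    if secret_getter is not None:
        # In a Databricks notebook, pass dbutils.secrets.get here.
        return secret_getter(scope=scope, key=key)
    token = os.environ.get(env_var)
    if not token:
        raise RuntimeError(f"No token found: pass a secret_getter or set {env_var}")
    return token


# Stand-in for dbutils.secrets.get so the sketch runs outside Databricks.
def fake_secret_getter(scope, key):
    return f"token-from-{scope}/{key}"


demo_token = load_token(secret_getter=fake_secret_getter)
```

In the notebook itself the call would be `token = load_token(dbutils.secrets.get)`, which keeps the token value out of the notebook source and its revision history.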

In [0]:
# TODO
 
token = "<YOUR_DATABRICKS_TOKEN>"

In [0]:
import mlflow

# With the token, we can create our authorization header for our subsequent REST calls
headers = {"Authorization": f"Bearer {token}"}

instance = mlflow.utils.databricks_utils.get_webapp_url()


### Train and Register a Model

Build and log your model.

In [0]:
from mlflow.models.signature import infer_signature
from sklearn.metrics import roc_auc_score
import mlflow.sklearn
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

with mlflow.start_run(run_name="Webhook RF Experiment") as run:
    # Data prep
    white_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-white.csv", sep=";")
    red_wine = pd.read_csv("/dbfs/databricks-datasets/wine-quality/winequality-red.csv", sep=";")

    red_wine['is_red'] = 1
    white_wine['is_red'] = 0

    data = pd.concat([red_wine, white_wine], axis=0)

    # Remove spaces from column names
    data.rename(columns=lambda x: x.replace(' ', '_'), inplace=True)

    high_quality = (data.quality >= 7).astype(int)
    data.quality = high_quality

    data.dropna(inplace=True)
    data.reset_index(drop=True,inplace=True)
    

    train, test = train_test_split(data, random_state=123)
    X_train = train.drop(["quality"], axis=1)
    X_test = test.drop(["quality"], axis=1)
    y_train = train.quality
    y_test = test.quality

    signature = infer_signature(X_train, pd.DataFrame(y_train))
    example = X_train.head(3)

    # Train and log model
    rf = RandomForestClassifier(random_state=42)
    rf.fit(X_train, y_train)
    mlflow.sklearn.log_model(rf, "random-forest-model", signature=signature, input_example=example)
    # Score AUC on the predicted probability of the positive class rather than hard labels
    auc = roc_auc_score(y_test, rf.predict_proba(X_test)[:, 1])
    mlflow.log_metric("auc", auc)
    run_id = run.info.run_id
    experiment_id = run.info.experiment_id





Register the model.

In [0]:
suffix = "aml"
name = f"webhook-demo_{suffix}"
model_uri = f"runs:/{run_id}/random-forest-model"

model_details = mlflow.register_model(model_uri=model_uri, name=name)

Registered model 'webhook-demo_aml' already exists. Creating a new version of this model...
2024/04/11 10:42:49 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation.                     Model name: webhook-demo_aml, version 4
Created version '4' of model 'webhook-demo_aml'.



### Creating the Job

Create a job that executes the notebook **`Sesion 3 - Webhooks-Job-Demo`**, located in the same folder as this notebook:<br><br>

- Hover over the sidebar on the left of the Databricks UI and click **Job Runs**
- Click **Create Job**
  - Name your job
  - Select the notebook **`Sesion 3 - Webhooks-Job-Demo`**
  - Select the current cluster
- Copy the Job ID


Alternatively, the code below will programmatically create the job.

In [0]:
import requests

def find_job_id(instance, headers, job_name, offset_limit=1000):
    params = {"offset": 0}
    uri = f"{instance}/api/2.1/jobs/list"
    done = False
    job_id = None
    while not done:
        done = True
        res = requests.get(uri, params=params, headers=headers)
        assert res.status_code == 200, f"Job list not returned; {res.content}"
        
        jobs = res.json().get("jobs", [])
        if len(jobs) > 0:
            for job in jobs:
                if job.get("settings", {}).get("name", None) == job_name:
                    job_id = job.get("job_id", None)
                    break

            # if job_id not found; update the offset and try again
            if job_id is None:
                params["offset"] += len(jobs)
                if params["offset"] < offset_limit:
                    done = False
    
    return job_id

def get_job_parameters(job_name, cluster_id, notebook_path):
    params = {
            "name": job_name,
            "tasks": [{"task_key": "webhook_task", 
                       "existing_cluster_id": cluster_id,
                       "notebook_task": {
                           "notebook_path": notebook_path
                       }
                      }]
        }
    return params

def get_create_parameters(job_name, cluster_id, notebook_path):
    api = "api/2.1/jobs/create"
    return api, get_job_parameters(job_name, cluster_id, notebook_path)

def get_reset_parameters(job_name, cluster_id, notebook_path, job_id):
    api = "api/2.1/jobs/reset"
    params = {"job_id": job_id, "new_settings": get_job_parameters(job_name, cluster_id, notebook_path)}
    return api, params

def get_webhook_job(instance, headers, job_name, cluster_id, notebook_path):
    job_id = find_job_id(instance, headers, job_name)
    if job_id is None:
        api, params = get_create_parameters(job_name, cluster_id, notebook_path)
    else:
        api, params = get_reset_parameters(job_name, cluster_id, notebook_path, job_id)
    
    uri = f"{instance}/{api}"
    res = requests.post(uri, headers=headers, json=params)
    assert res.status_code == 200, f"Expected an HTTP 200 response, received {res.status_code}; {res.content}"
    job_id = res.json().get("job_id", job_id)
    return job_id
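

The offset-based paging used by `find_job_id` can be exercised without a workspace by swapping the HTTP call for a stub. The sketch below is a simplified restatement of that loop, not a replacement for it: `find_first`, `fake_fetch`, and the fake job records are hypothetical names introduced for illustration.

```python
def find_first(fetch_page, matches, offset_limit=1000):
    """Scan offset-paged results until `matches(item)` is true or the pages run out."""
    offset = 0
    while offset < offset_limit:
        items = fetch_page(offset)
        if not items:
            return None          # an empty page means there is nothing left to scan
        for item in items:
            if matches(item):
                return item
        offset += len(items)     # advance past the page just scanned
    return None


# Fake "jobs/list" backend: five jobs served two per page.
FAKE_JOBS = [{"job_id": i, "settings": {"name": f"job-{i}"}} for i in range(5)]


def fake_fetch(offset, page_size=2):
    return FAKE_JOBS[offset:offset + page_size]


found = find_first(fake_fetch, lambda job: job["settings"]["name"] == "job-3")
missing = find_first(fake_fetch, lambda job: job["settings"]["name"] == "nope")
```

Here `found` is the record for `job-3`, located on the second page, and `missing` is `None` once the pages are exhausted.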


In [0]:
notebook_path = mlflow.utils.databricks_utils.get_notebook_path().replace("Sesion 3 - Webhooks-and-Testing", "Sesion 3 - Webhooks-Job-Demo")

# A fixed prefix keeps the job name stable across runs, so get_webhook_job()
# resets the existing job instead of creating a duplicate.
prefix = "session3"
job_name = f"{prefix}_webhook-job"

# Create the job, or reset it if one with this name already exists.
job_id = get_webhook_job(instance, 
                         headers, 
                         job_name,
                         spark.conf.get("spark.databricks.clusterUsageTags.clusterId"),
                         notebook_path)

print(f"Job ID:   {job_id}")
print(f"Job name: {job_name}")

Job ID:   338420584022521
Job name: session3_webhook-job



## Create a Job Webhook

There are a few different events that can trigger a Webhook. In this notebook, we will be experimenting with triggering a job when our model transitions between stages.
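
A mistyped event name is an easy error to make when building the request, so it can help to check names before sending them. The sketch below assumes the event names listed in `KNOWN_EVENTS`; they are a subset recalled from the Databricks documentation, not an authoritative list, and `validate_events` is a hypothetical helper.

```python
# A subset of registry webhook event names (assumed; check the Databricks docs for the full list).
KNOWN_EVENTS = {
    "MODEL_VERSION_CREATED",
    "MODEL_VERSION_TRANSITIONED_STAGE",
    "TRANSITION_REQUEST_CREATED",
    "COMMENT_CREATED",
    "REGISTERED_MODEL_CREATED",
    "MODEL_VERSION_TAG_SET",
}


def validate_events(events):
    """Raise ValueError if any event name is not in KNOWN_EVENTS; otherwise return a list copy."""
    unknown = sorted(set(events) - KNOWN_EVENTS)
    if unknown:
        raise ValueError(f"Unrecognized webhook events: {unknown}")
    return list(events)


events = validate_events(["MODEL_VERSION_TRANSITIONED_STAGE"])
```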

In [0]:
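# Override only if the job was created via the UI (replace with your own Job ID); otherwise skip this cell.
job_id = 47669448112835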

In [0]:
import json
from mlflow.utils.rest_utils import http_request
from mlflow.utils.databricks_utils import get_databricks_host_creds

endpoint = "/api/2.0/mlflow/registry-webhooks/create"
host_creds = get_databricks_host_creds("databricks")

job_json = {"model_name": name,
            "events": ["MODEL_VERSION_TRANSITIONED_STAGE"],
            "description": "Job webhook trigger",
            "status": "Active",
            "job_spec": {"job_id": job_id,
                         "workspace_url": instance,
                         "access_token": token}
           }

response = http_request(
    host_creds=host_creds, 
    endpoint=endpoint,
    method="POST",
    json=job_json
)
assert response.status_code == 200, f"Expected HTTP 200, received {response.status_code}"




Now that we have registered the webhook, we can **test it by transitioning our model from stage `None` to `Staging` in the Experiment UI.** We should see in the Jobs tab that our Job has run.
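
The same transition can also be made from code rather than the UI. The sketch below wraps `MlflowClient.transition_model_version_stage`, which exists in MLflow but may emit a deprecation warning in recent releases. `promote_to_staging` and `_RecordingClient` are hypothetical names; the stub client only demonstrates the call shape and is not part of MLflow.

```python
def promote_to_staging(client, model_name, version):
    """Move one model version to Staging, which should fire MODEL_VERSION_TRANSITIONED_STAGE."""
    return client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Staging",
    )


# In the notebook (requires MLflow and a registered model):
#   from mlflow.tracking import MlflowClient
#   promote_to_staging(MlflowClient(), name, model_details.version)


class _RecordingClient:
    """Stand-in used only to show the call shape without a model registry."""

    def __init__(self):
        self.calls = []

    def transition_model_version_stage(self, name, version, stage):
        self.calls.append((name, version, stage))
        return {"name": name, "version": version, "current_stage": stage}


stub = _RecordingClient()
result = promote_to_staging(stub, "webhook-demo", "4")
```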


To get a list of active Webhooks, use a GET request with the LIST endpoint. Note that this command will return an error if no Webhooks have been created for the Model.

In [0]:
endpoint = f"/api/2.0/mlflow/registry-webhooks/list/?model_name={name.replace(' ', '%20')}"

response = http_request(
    host_creds=host_creds, 
    endpoint=endpoint,
    method="GET"
)
assert response.status_code == 200, f"Expected HTTP 200, received {response.status_code}"

print(json.dumps(response.json(), indent=4))

{
    "webhooks": [
        {
            "id": "2ea9474bee6147888985c2033f9cb55e",
            "events": [
                "MODEL_VERSION_TRANSITIONED_STAGE"
            ],
            "creation_timestamp": 1712832263711,
            "last_updated_timestamp": 1712832263711,
            "description": "Job webhook trigger",
            "status": "ACTIVE",
            "job_spec": {
                "job_id": "47669448112835",
                "workspace_url": "https://eastus-c3.azuredatabricks.net"
            },
            "model_name": "webhook-demo_aml"
        },
        {
            "id": "0fb3711043f149968fba4dc72b7b4296",
            "events": [
                "MODEL_VERSION_TRANSITIONED_STAGE"
            ],
            "creation_timestamp": 1712612412391,
            "last_updated_timestamp": 1712612412391,
            "description": "Job webhook trigger",
            "status": "ACTIVE",
            "job_spec": {
                "job_id": "338420584022521",
                "wo




Finally, delete the webhook by copying its ID into the request below. You can confirm that the webhook was deleted by running the list request again.
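
When several webhooks exist for the same model, as in the list output above, the ID to delete can be picked out of the response by job ID instead of by eye. This is a minimal sketch based on the response shape shown in that output; `webhook_ids_for_job` is a hypothetical helper and the sample IDs are made up.

```python
def webhook_ids_for_job(list_response, job_id):
    """Return the IDs of webhooks in a list response whose job_spec targets `job_id`."""
    ids = []
    for hook in list_response.get("webhooks", []):
        spec = hook.get("job_spec", {})
        # The list response reports job_id as a string, so compare as strings.
        if str(spec.get("job_id")) == str(job_id):
            ids.append(hook["id"])
    return ids


# Made-up response in the same shape as the list output.
sample = {
    "webhooks": [
        {"id": "aaa111", "job_spec": {"job_id": "101"}},
        {"id": "bbb222", "job_spec": {"job_id": "202"}},
        {"id": "ccc333", "http_url_spec": {"url": "https://example.com/hook"}},
    ]
}
ids = webhook_ids_for_job(sample, 101)
```

In the notebook, `webhook_ids_for_job(response.json(), job_id)` applied to the list response would give candidate values for `delete_hook`.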

In [0]:
# TODO
delete_hook = "<insert your webhook id here>"

In [0]:
new_json = {"id": delete_hook}
endpoint = "/api/2.0/mlflow/registry-webhooks/delete"

response = http_request(
    host_creds=host_creds, 
    endpoint=endpoint,
    method="DELETE",
    json=new_json
)
assert response.status_code == 200, f"Expected HTTP 200, received {response.status_code}"

print(json.dumps(response.json(), indent=4))