In [0]:
# Commenting this out.
# This notebook is called from the 06.7 Model Monitoring notebook in the RAI demo
# At the point where it is called, the config variables will already have been initialized
#%run ../config

In [0]:
# Initialize workspace client
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import sql

# Workspace client built with no explicit arguments; it is used further down
# in this notebook to list the available SQL warehouses.
w = WorkspaceClient()

In [0]:
# Set demo variable names
# NOTE(review): `catalog` and `db` are not defined in this notebook; presumably
# they are set by the config that the calling notebook has already run
# (see the commented-out `%run ../config` above) - confirm.

# Folder name appended to the workspace path when the monitoring assets path is built
rai_demo_folder_name = "06-Responsible-AI"
# Fully qualified (catalog.schema.table) name of the inference log table
inference_log_table_name = f"{catalog}.{db}.credit_decisioning_inferencelogs"
# Table read by the alert query: the inference log table name with a "_profile_metrics" suffix
inference_log_profile_table_name = f"{inference_log_table_name}_profile_metrics"

In [0]:
# SQL statement for the query used for the alert.
#
# What the statement does, step by step:
#   1. `profile_metrics`: rows of the profile metrics table with no slice
#      (NULL slice_key and slice_value) and model_id "*".
#   2. `last_window_in_inspection_range`: the most recent `window.start` in
#      those rows, with a single granularity picked deterministically
#      (ORDER BY ... LIMIT 1).
#   3. `profile_metrics_inspected`: the rows of step 1 that have that granularity.
#   4. Final SELECT: rounded accuracy / precision / recall / F1 values for the
#      last window, restricted to log_type "INPUT" and column_name ":table".
#
# The table name is interpolated by the f-string from
# `inference_log_profile_table_name`, so that variable must be set first.
sql_query = f"""
WITH 
profile_metrics AS (
  SELECT * FROM {inference_log_profile_table_name}
  WHERE isnull(slice_key) AND isnull(slice_value) -- default to "No Slice"
  AND   `model_id`   = "*" -- default to all model ids
),
last_window_in_inspection_range AS (
  SELECT window.start AS Window, granularity AS Granularity FROM profile_metrics
  WHERE window.start = (SELECT MAX(window.start) FROM profile_metrics) 
  ORDER BY Granularity LIMIT 1 -- order to ensure the `granularity` selected is stable
),
profile_metrics_inspected AS (
  SELECT * FROM profile_metrics
  WHERE Granularity = (SELECT Granularity FROM last_window_in_inspection_range)
)
SELECT
  concat(window.start," - ", window.end) AS Window,
  ROUND(accuracy_score, 2) as accuracy_score,
  ROUND(precision.macro,2) as precision_macro,
  ROUND(precision.weighted,2) as precision_weighted,
  ROUND(recall.macro,2) as recall_macro,
  ROUND(recall.weighted,2) as recall_weighted,
  ROUND(f1_score.macro, 2) as f1_score_macro,
  ROUND(f1_score.weighted, 2) as f1_score_weighted,
  granularity AS Granularity,
    `model_id`   AS `Model Id`,
  COALESCE(slice_key, "No slice") AS `Slice key`,
  COALESCE(slice_value, "No slice") AS `Slice value`
FROM profile_metrics_inspected
WHERE
  window.start = (SELECT Window FROM last_window_in_inspection_range) -- limit to last window
  AND log_type = "INPUT"
  AND column_name = ":table"
ORDER BY slice_key ASC
"""

In [0]:
import re

# Work out the workspace folder in which the query and the alert will be saved.

notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
notebook_path = notebook_context.notebookPath().getOrElse(None)

# Split the notebook path on "/" and drop the trailing two components (the
# notebook name and the folder containing it), then descend into the RAI demo
# folder. Per the original author's note this lands at the
# lakehouse-fsi-credit-decisioning level before the demo folder is appended.
path_parts = re.split('/', notebook_path)[:-2]
path_parts.append(rai_demo_folder_name)

workspace_relative_path = '/'.join(path_parts)
monitoring_assets_path = '/Workspace' + workspace_relative_path + '/monitoring'

print(f"Monitoring assets will be saved in {monitoring_assets_path}")

In [0]:
import time
import requests
import json

# Function to set up parameters
def setup_rest_params(endpoint_name=None):
    """Build the base URL and HTTP headers for the Databricks SQL REST API.

    The host and the bearer token are both read from the notebook context.
    You can also use a PAT (Personal Access Token) or a Service Principal
    token as the API key required to access the REST API.

    Args:
        endpoint_name: Not used. Kept (and made optional) only so that
            existing callers passing a positional argument keep working.

    Returns:
        A tuple ``(endpoint_url, headers)`` where ``endpoint_url`` is
        ``<host>/api/2.0/sql`` and ``headers`` carries the ``Authorization``
        bearer token and a JSON ``Content-Type``.

    Raises:
        RuntimeError: If the notebook context provides no API URL or token.
    """
    context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()

    # Both lookups fall back to None when the context has no value.
    databricks_host = context.apiUrl().getOrElse(None)
    api_key = context.apiToken().getOrElse(None)

    # Fail early with a clear message instead of sending a request to
    # "None/api/2.0/sql" or with a "Bearer None" header.
    if not databricks_host or not api_key:
        raise RuntimeError(
            "Could not read the workspace API URL and/or API token from the notebook context."
        )

    # Define the endpoint_url
    endpoint_url = f"{databricks_host}/api/2.0/sql"

    # Define the headers
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    return endpoint_url, headers


# Send Databricks SQL REST API request
def send_request(api_name, request_body, timeout=60):
    """POST ``request_body`` as JSON to ``<host>/api/2.0/sql/<api_name>``.

    Args:
        api_name: One of ``"queries"``, ``"alerts"`` or ``"warehouses"``.
        request_body: JSON-serializable payload for the request.
        timeout: Seconds to wait for the server before giving up, so that an
            unresponsive endpoint cannot hang the notebook indefinitely.

    Returns:
        The decoded JSON body of the response.

    Raises:
        AssertionError: If ``api_name`` is not one of the supported names.
        requests.HTTPError: If the server answers with an error status.
    """
    assert api_name in ["queries", "alerts", "warehouses"], (
        "Invalid API name. Must be one of 'queries', 'alerts' or 'warehouses'."
    )

    # The helper needs no argument; the previous call passed an undefined
    # global name (`endpoint_name`) and raised NameError.
    endpoint_url, headers = setup_rest_params()

    response = requests.post(
        f"{endpoint_url}/{api_name}",
        headers=headers,
        data=json.dumps(request_body),
        timeout=timeout,
    )
    response.raise_for_status()  # Raise an exception for HTTP errors
    return response.json()

In [0]:
# Get a serverless warehouse
# Note that the user has to have manage permissions on it
warehouse_id = None

# Materialize the listing so it can be checked for emptiness and scanned.
wh = list(w.warehouses.list())
if not wh:
  raise RuntimeError("No SQL warehouse is available to this user; cannot create the monitoring query.")

# Prefer a warehouse that actually reports serverless compute. The flag is
# only read here: setting it on the local object would not change the
# warehouse itself and would hide its real type.
# Fall back to the first listed warehouse so the demo can still proceed.
serverless_wh = next(
  (warehouse for warehouse in wh if warehouse.enable_serverless_compute),
  wh[0],
)
if not serverless_wh.enable_serverless_compute:
  print(f"No serverless SQL warehouse found; falling back to warehouse {serverless_wh.id}.")

warehouse_id = serverless_wh.id

In [0]:
# Request body sent to the "queries" API to create the SQL query.
# The query is stored under the monitoring assets folder and is bound to the
# warehouse selected earlier in this notebook.
_query_definition = {}
_query_definition["display_name"] = "rai_credit_decisioning_performance_last_window"
_query_definition["description"] = "Inference metrics for last window"
_query_definition["query_text"] = sql_query
_query_definition["parent_path"] = monitoring_assets_path
_query_definition["warehouse_id"] = warehouse_id
_query_definition["run_as_mode"] = "OWNER"

sql_query_request_body = {"query": _query_definition}

In [0]:
# Create the SQL query
# Start from known values so that a failed request leaves `query_id` as None
# instead of raising NameError on an undefined (or stale) `response`.
response = {}
query_id = None
try:
  response = send_request("queries", sql_query_request_body)
except Exception as e:
  # Best effort: report the failure and carry on with query_id = None.
  print(f"Cannot create SQL query. Error: {e}")
else:
  # Read fields with .get() so a missing key in a successful response is not
  # misreported as a creation failure.
  query_id = response.get("id")
  print(f"SQL query '{response.get('display_name')}' created successfully in the 'monitoring' folder. ID: {query_id}.")

In [0]:
# Request body sent to the "alerts" API to create the SQL alert.
# The condition compares the `f1_score_weighted` column of the query result
# against 0.9 with LESS_THAN.
# NOTE(review): the display name says "accuracy" while the condition watches
# f1_score_weighted - confirm this is intended.
_alert_condition = {
    "op": "LESS_THAN",
    "operand": {"column": {"name": "f1_score_weighted"}},
    "threshold": {"value": {"double_value": 0.9}},
}

_alert_definition = {
    "seconds_to_retrigger": 0,
    "display_name": "rai_credit_decisioning_accuracy_alert",
    "condition": _alert_condition,
    "query_id": query_id,
    "parent_path": monitoring_assets_path,
}

sql_alert_request_body = {"alert": _alert_definition}

In [0]:
# Create the alert
# Reset `response` first: otherwise a failed request would leave the response
# of an earlier call in place and `alert_id` would silently pick up that id.
response = {}
alert_id = None
try:
  response = send_request("alerts", sql_alert_request_body)
except Exception as e:
  # Best effort: report the failure and carry on with alert_id = None.
  print(f"Cannot create SQL alert. Error: {e}")
else:
  # Read fields with .get() so a missing key in a successful response is not
  # misreported as a creation failure.
  alert_id = response.get("id")
  print(f"SQL alert '{response.get('display_name')}' created successfully in the 'monitoring' folder. ID: {alert_id}.")

In [0]:
# from databricks.sdk import WorkspaceClient
# from databricks.sdk.service import sql

# w = WorkspaceClient()

# w.queries.delete(query_id)
# w.alerts.delete(alert_id)