# Amazon SageMaker Model Bias Monitor


## Section 1 - Setup <a id='setup'></a>

In this section, you will import the necessary libraries, set up variables, and examine the data that was used to train the XGBoost customer churn model provided with this notebook.

Let's start by specifying:

* The AWS region used to host your model.
* The IAM role associated with this SageMaker notebook instance.
* The S3 bucket used to store the data used to train your model, any additional model data, and the data captured from model invocations.

#### 1.1 Import necessary libraries

In [11]:
%%time

from datetime import datetime, timedelta, timezone
import json
import os
import re
import boto3
from time import sleep
from threading import Thread

import pandas as pd

from sagemaker import get_execution_role, Session, image_uris
from sagemaker.s3 import S3Downloader, S3Uploader
from sagemaker.processing import ProcessingJob
from sagemaker.serializers import CSVSerializer

from sagemaker.model import Model
from sagemaker.model_monitor import (
    BiasAnalysisConfig,
    CronExpressionGenerator,
    DataCaptureConfig,
    EndpointInput,
    ExplainabilityAnalysisConfig,
    ModelBiasMonitor,
    ModelExplainabilityMonitor,
)

from sagemaker.clarify import (
    BiasConfig,
    DataConfig,
    ModelConfig,
    ModelPredictedLabelConfig,
    SHAPConfig,
)

session = Session()

CPU times: user 137 ms, sys: 31.9 ms, total: 168 ms
Wall time: 271 ms


#### 1.2 AWS region, IAM role, and SageMaker clients

In [12]:
# Get Execution role
role = get_execution_role()
print("RoleArn:", role)

region = session.boto_region_name
print("Region:", region)

RoleArn: arn:aws:iam::796598873577:role/LabRole
Region: us-east-1


In [14]:
sagemaker_session = Session()
sagemaker_client = sagemaker_session.sagemaker_client
sagemaker_runtime_client = sagemaker_session.sagemaker_runtime_client

#### 1.3 S3 bucket and prefixes

In [15]:
# Set up the S3 bucket.
# You can use a different bucket, but make sure the role you chose for this notebook
# has the s3:PutObject permission. This is the bucket into which the data is captured.
bucket = session.default_bucket()
print("Demo Bucket:", bucket)
prefix = "sagemaker/clarify-bias-monitor"

##S3 prefixes
data_capture_prefix = f"{prefix}/datacapture"
s3_capture_upload_path = f"s3://{bucket}/{data_capture_prefix}"

ground_truth_upload_path = (
    f"s3://{bucket}/{prefix}/ground_truth_data/{datetime.now():%Y-%m-%d-%H-%M-%S}"
)

reports_prefix = f"{prefix}/reports"
s3_report_path = f"s3://{bucket}/{reports_prefix}"

##Get the model monitor image
monitor_image_uri = image_uris.retrieve(framework="model-monitor", region=region)

print("Image URI:", monitor_image_uri)
print(f"Capture path: {s3_capture_upload_path}")
print(f"Ground truth path: {ground_truth_upload_path}")
print(f"Report path: {s3_report_path}")

Demo Bucket: sagemaker-us-east-1-796598873577
Image URI: 156813124566.dkr.ecr.us-east-1.amazonaws.com/sagemaker-model-monitor-analyzer
Capture path: s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/datacapture
Ground truth path: s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/ground_truth_data/2025-02-11-07-05-56
Report path: s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/reports


#### 1.4 Test access to the S3 bucket
Let's quickly verify that the notebook has the right permissions to access the S3 bucket specified above.
Upload a simple test object to the S3 bucket. If this command fails, data capture and model monitoring will not work from this notebook. To fix this, grant the role associated with this notebook instance the `s3:PutObject` permission, then run this validation again.

In [16]:
# Upload some test files
S3Uploader.upload("test_data/upload-test-file.txt", f"s3://{bucket}/test_upload")
print("Success! You are all set to proceed.")

Success! You are all set to proceed.


In [17]:
# Setup variables 
model_file = "model/xgb-churn-prediction-model.tar.gz"
test_file = "test_data/upload-test-file.txt"
test_dataset = "test_data/test-dataset-input-cols.csv"
validation_dataset = "test_data/validation-dataset-with-header.csv"
dataset_type = "text/csv"

## Some data understanding 

In [19]:
# Taken from https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker_model_monitor/model_monitor_batch_transform/test_data/test-dataset-with-header.csv
df = pd.read_csv('test_data/test-dataset-with-header.csv')

# Display the first few rows of the dataset
display(df.head())

display(df.describe())

Unnamed: 0,Account Length,VMail Message,Day Mins,Day Calls,Eve Mins,Eve Calls,Night Mins,Night Calls,Intl Mins,Intl Calls,...,State_WI,State_WV,State_WY,Area Code_408,Area Code_415,Area Code_510,Int'l Plan_no,Int'l Plan_yes,VMail Plan_no,VMail Plan_yes
0,106,0,274.4,120,198.6,82,160.8,62,6.0,3,...,0,0,0,0,0,1,1,0,1,0
1,28,0,187.8,94,248.6,86,208.8,124,10.6,5,...,0,0,1,0,1,0,1,0,1,0
2,148,0,279.3,104,201.6,87,280.8,99,7.9,2,...,0,0,0,0,1,0,1,0,1,0
3,132,0,191.9,107,206.9,127,272.0,88,12.6,2,...,0,0,0,0,0,1,1,0,1,0
4,92,29,155.4,110,188.5,104,254.9,118,8.0,4,...,0,0,0,0,0,1,1,0,0,1


Unnamed: 0,Account Length,VMail Message,Day Mins,Day Calls,Eve Mins,Eve Calls,Night Mins,Night Calls,Intl Mins,Intl Calls,...,State_WI,State_WV,State_WY,Area Code_408,Area Code_415,Area Code_510,Int'l Plan_no,Int'l Plan_yes,VMail Plan_no,VMail Plan_yes
count,2333.0,2333.0,2333.0,2333.0,2333.0,2333.0,2333.0,2333.0,2333.0,2333.0,...,2333.0,2333.0,2333.0,2333.0,2333.0,2333.0,2333.0,2333.0,2333.0,2333.0
mean,101.276897,8.214316,180.226489,100.259323,200.050107,99.573939,201.388598,100.227175,10.253065,4.495071,...,0.021432,0.034719,0.022718,0.243035,0.502357,0.254608,0.898414,0.101586,0.721389,0.278611
std,39.560922,13.779861,53.998753,20.169332,50.026651,19.679796,50.638815,19.286163,2.779361,2.48867,...,0.144849,0.183107,0.149033,0.429008,0.500102,0.435734,0.302168,0.302168,0.448412,0.448412
min,1.0,0.0,0.0,0.0,31.2,12.0,23.2,42.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,74.0,0.0,145.4,87.0,165.8,86.0,167.1,87.0,8.5,3.0,...,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0
50%,100.0,0.0,180.9,100.0,199.9,99.0,201.2,100.0,10.3,4.0,...,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,1.0,0.0
75%,127.0,20.0,215.9,114.0,233.7,113.0,236.3,113.0,12.1,6.0,...,0.0,0.0,0.0,0.0,1.0,1.0,1.0,0.0,1.0,1.0
max,243.0,51.0,350.8,165.0,361.8,170.0,395.0,175.0,18.4,20.0,...,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0


In [20]:
validation_df = pd.read_csv(validation_dataset)
all_headers = validation_df.columns.to_list()
print(all_headers)

label_header = all_headers[0]
print(label_header)

['Churn', 'Account Length', 'VMail Message', 'Day Mins', 'Day Calls', 'Eve Mins', 'Eve Calls', 'Night Mins', 'Night Calls', 'Intl Mins', 'Intl Calls', 'CustServ Calls', 'State_AK', 'State_AL', 'State_AR', 'State_AZ', 'State_CA', 'State_CO', 'State_CT', 'State_DC', 'State_DE', 'State_FL', 'State_GA', 'State_HI', 'State_IA', 'State_ID', 'State_IL', 'State_IN', 'State_KS', 'State_KY', 'State_LA', 'State_MA', 'State_MD', 'State_ME', 'State_MI', 'State_MN', 'State_MO', 'State_MS', 'State_MT', 'State_NC', 'State_ND', 'State_NE', 'State_NH', 'State_NJ', 'State_NM', 'State_NV', 'State_NY', 'State_OH', 'State_OK', 'State_OR', 'State_PA', 'State_RI', 'State_SC', 'State_SD', 'State_TN', 'State_TX', 'State_UT', 'State_VA', 'State_VT', 'State_WA', 'State_WI', 'State_WV', 'State_WY', 'Area Code_408', 'Area Code_415', 'Area Code_510', "Int'l Plan_no", "Int'l Plan_yes", 'VMail Plan_no', 'VMail Plan_yes']
Churn
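The header list printed above is reused later when the bias analysis is configured: the first column is treated as the label and the remaining columns as features. A minimal sketch of that split follows. The helper names `split_headers` and `facet_position` are hypothetical (they are not part of the SageMaker SDK), and the header list is shortened for illustration.

```python
def split_headers(all_headers):
    """Split a header list into (label, features), assuming the label comes first."""
    if not all_headers:
        raise ValueError("header list is empty")
    return all_headers[0], all_headers[1:]


def facet_position(feature_headers, facet_name):
    """Return the zero-based position of a facet column among the feature columns."""
    try:
        return feature_headers.index(facet_name)
    except ValueError:
        raise KeyError(f"facet {facet_name!r} is not a feature column") from None


# Shortened header list for illustration; the real list is much longer.
example_headers = ["Churn", "Account Length", "VMail Message", "Day Mins"]
label, features = split_headers(example_headers)
print(label)                                       # Churn
print(facet_position(features, "Account Length"))  # 0
```

Checking the facet name against the headers up front can surface a typo before a monitoring job is scheduled.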


## Section 2 - Deploy pre-trained model with data capture enabled <a id='deploy'></a>

In this section, you will upload the pre-trained model to the S3 bucket, create an Amazon SageMaker model, create an Amazon SageMaker real-time endpoint, and enable data capture on the endpoint to record invocations, predictions, and metadata.

#### 2.1 Upload the pre-trained model to S3

This code uploads a pre-trained XGBoost model that is ready for you to deploy. This model was trained using the XGB Churn Prediction Notebook in SageMaker. You can also use your own pre-trained model in this step. If you already have a pretrained model in Amazon S3, you can add it instead by specifying the s3_key.


In [21]:
##Upload the pretrained model to S3
s3_key = f"s3://{bucket}/{prefix}"
model_url = S3Uploader.upload("model/xgb-churn-prediction-model.tar.gz", s3_key)
model_url

's3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/xgb-churn-prediction-model.tar.gz'

#### 2.2 Create SageMaker Model entity

This step creates an Amazon SageMaker model from the model file uploaded to S3.

In [23]:
model_name = f"DEMO-xgb-churn-pred-model-monitor-{datetime.now(timezone.utc):%Y-%m-%d-%H%M}"

image_uri = image_uris.retrieve(framework="xgboost", version="0.90-1", region=region)

model = Model(image_uri=image_uri, model_data=model_url, role=role, sagemaker_session=session)

#### 2.3 Deploy the model with data capture enabled.
Next, deploy the SageMaker model on a specific instance with data capture enabled.

In [24]:
endpoint_name = f"DEMO-xgb-churn-model-bias-monitor-{datetime.now(timezone.utc):%Y-%m-%d-%H%M}"
print("EndpointName =", endpoint_name)

data_capture_config = DataCaptureConfig(
    enable_capture=True, sampling_percentage=100, destination_s3_uri=s3_capture_upload_path
)

model.deploy(
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)

EndpointName = DEMO-xgb-churn-model-bias-monitor-2025-02-11-0706
-------!

## Add some fake traffic & Ground truth data

In [31]:
print(f"Sending test traffic to the endpoint {endpoint_name}. \nPlease wait", end="")
test_dataset_size = 0  # counts every row in the test dataset; only the first 120 rows are sent for inference
with open(test_dataset, "r") as f:
    for row in f:
        if test_dataset_size < 120:
            payload = row.rstrip("\n")
            response = sagemaker_runtime_client.invoke_endpoint(
                EndpointName=endpoint_name,
                Body=payload,
                ContentType=dataset_type,
            )
            prediction = response["Body"].read()
            if float(prediction) >= 0.5:
                print(".", end="", flush=True)
            else:
                print("-", end="", flush=True)
            sleep(0.5)
        test_dataset_size += 1

print()
print("Done!")

Sending test traffic to the endpoint DEMO-xgb-churn-model-bias-monitor-2025-02-11-0706. 
Please wait------.--------------.-----.-------.---------.-----------.-----------------.---------.----------.-.------.------------.-
Done!
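The progress line above encodes one prediction per character: `.` when the returned score is at least 0.5 and `-` otherwise. The following sketch shows that decoding step in isolation, without calling an endpoint. The function names and the sample response bodies are made up for illustration.

```python
def classify(body: bytes, threshold: float = 0.5) -> int:
    """Turn a CSV response body holding one score into a 0/1 label."""
    score = float(body.decode("utf-8").strip())
    return 1 if score >= threshold else 0


def progress_marks(bodies, threshold: float = 0.5) -> str:
    """Render one character per prediction, mirroring the loop's output."""
    return "".join("." if classify(b, threshold) else "-" for b in bodies)


# Illustrative response bodies, not real endpoint output.
sample_bodies = [b"0.77", b"0.1\n", b"0.5"]
print(progress_marks(sample_bodies))  # .-.
```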


In [32]:
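# View capture data.
# Note: the test traffic sent in the previous cell does not set InferenceId, so the
# "inferenceId" check below never matches those records and the loop runs for all
# 120 iterations before printing the capture files it found.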
print("Waiting for captures to show up", end="")
for _ in range(120):
    capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
    if capture_files:
        capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")
        capture_record = json.loads(capture_file[0])
        if "inferenceId" in capture_record["eventMetadata"]:
            break
    print(".", end="", flush=True)
    sleep(1)
print()
print("Found Capture Files:")
print("\n ".join(capture_files[-3:]))

Waiting for captures to show up........................................................................................................................
Found Capture Files:
s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/datacapture/DEMO-xgb-churn-model-bias-monitor-2025-02-11-0706/AllTraffic/2025/02/11/07/13-36-396-0dda26ab-9a63-4bee-ad0e-ca6734a544b4.jsonl
 s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/datacapture/DEMO-xgb-churn-model-bias-monitor-2025-02-11-0706/AllTraffic/2025/02/11/07/15-38-319-1645232c-dbf4-4912-8241-672f09f7ab10.jsonl
 s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/datacapture/DEMO-xgb-churn-model-bias-monitor-2025-02-11-0706/AllTraffic/2025/02/11/07/16-38-450-20878bb6-d6ed-4c0b-bfd4-573423da5356.jsonl


In [33]:
print(json.dumps(capture_record, indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "118,21,156.5,122,209.2,125,158.7,81,11.1,3,4,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,1,0,1",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.7764694094657898",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "6f1b6aba-7bd6-4f32-a7dc-4f0405f392cf",
    "inferenceTime": "2025-02-11T07:16:38Z"
  },
  "eventVersion": "0"
}
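Each line of a capture file is a JSON record shaped like the one printed above. As a rough sketch, the input features, the predicted score, and an identifier for later matching can be pulled out as shown below. The function name is hypothetical, the input row is truncated to three values for brevity, and the fallback from `inferenceId` to `eventId` reflects my understanding of how Model Monitor matches records, so treat it as an assumption.

```python
import json


def parse_capture_record(line: str):
    """Extract (features, score, join_id) from one captured JSON line."""
    record = json.loads(line)
    capture = record["captureData"]
    features = [float(v) for v in capture["endpointInput"]["data"].split(",")]
    score = float(capture["endpointOutput"]["data"])
    meta = record["eventMetadata"]
    # Assumption: prefer inferenceId when the caller supplied one, else eventId.
    join_id = meta.get("inferenceId", meta["eventId"])
    return features, score, join_id


# Same structure as the record above; the input row is truncated for brevity.
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "118,21,156.5", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv; charset=utf-8",
                           "mode": "OUTPUT", "data": "0.7764694094657898",
                           "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "6f1b6aba-7bd6-4f32-a7dc-4f0405f392cf",
                      "inferenceTime": "2025-02-11T07:16:38Z"},
    "eventVersion": "0",
})

features, score, join_id = parse_capture_record(sample_line)
print(features, score, join_id)
```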


In [36]:
import threading

# Source: https://sagemaker-examples.readthedocs.io/en/latest/sagemaker_model_monitor/fairness_and_explainability/SageMaker-Model-Monitor-Fairness-and-Explainability.html
class WorkerThread(threading.Thread):
    def __init__(self, do_run, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.__do_run = do_run
        self.__terminate_event = threading.Event()

    def terminate(self):
        self.__terminate_event.set()

    def run(self):
        while not self.__terminate_event.is_set():
            self.__do_run(self.__terminate_event)

In [37]:
def invoke_endpoint(terminate_event):
    with open(test_dataset, "r") as f:
        i = 0
        for row in f:
            payload = row.rstrip("\n")
            response = sagemaker_runtime_client.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType="text/csv",
                Body=payload,
                InferenceId=str(i),  # unique ID per row
            )
            i += 1
            response["Body"].read()
            sleep(1)
            if terminate_event.is_set():
                break

# Keep invoking the endpoint with test data
invoke_endpoint_thread = WorkerThread(do_run=invoke_endpoint)
invoke_endpoint_thread.start()

In [38]:
import random

def ground_truth_with_id(inference_id):
    random.seed(inference_id)  # to get consistent results
    rand = random.random()
    # format required by the merge container
    return {
        "groundTruthData": {
            "data": "1" if rand < 0.75 else "0",  # randomly generate positive labels 75% of the time
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }


def upload_ground_truth(upload_time):
    records = [ground_truth_with_id(i) for i in range(test_dataset_size)]
    fake_records = [json.dumps(r) for r in records]
    data_to_upload = "\n".join(fake_records)
    target_s3_uri = f"{ground_truth_upload_path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(fake_records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)

In [39]:
# Generate data for the last hour
upload_ground_truth(datetime.now(timezone.utc) - timedelta(hours=1))

Uploading 334 records to s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/ground_truth_data/2025-02-11-07-05-56/2025/02/11/06/2444.jsonl
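The upload above writes a JSON Lines file under a key partitioned by year, month, day, and hour. The sketch below reproduces the key layout and the record serialization without touching S3. The helper names are hypothetical, and the timestamp is chosen to match the key suffix printed above.

```python
import json
from datetime import datetime


def ground_truth_key(upload_time: datetime) -> str:
    """Build the time-partitioned key suffix used by upload_ground_truth."""
    return f"{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"


def ground_truth_record(inference_id, label: int) -> dict:
    """Build one ground truth record in the format used in this notebook."""
    return {
        "groundTruthData": {"data": str(label), "encoding": "CSV"},
        "eventMetadata": {"eventId": str(inference_id)},
        "eventVersion": "0",
    }


def to_json_lines(records) -> str:
    """Serialize records as JSON Lines: one JSON object per line."""
    return "\n".join(json.dumps(r) for r in records)


print(ground_truth_key(datetime(2025, 2, 11, 6, 24, 44)))  # 2025/02/11/06/2444.jsonl
body = to_json_lines(ground_truth_record(i, i % 2) for i in range(3))
print(body)
```

Because the key ends at the hour directory plus a minutes-and-seconds file name, labels uploaded within the same hour land under the same hourly prefix.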


In [40]:
# Generate data once an hour
def generate_fake_ground_truth(terminate_event):
    upload_ground_truth(datetime.now(timezone.utc))
    for _ in range(0, 60):
        sleep(60)
        if terminate_event.is_set():
            break


ground_truth_thread = WorkerThread(do_run=generate_fake_ground_truth)
ground_truth_thread.start()

Uploading 334 records to s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/ground_truth_data/2025-02-11-07-05-56/2025/02/11/07/2447.jsonl
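The ground truth records only become useful once they are matched with captured predictions. In this notebook the endpoint is invoked with `InferenceId=str(i)` and each ground truth record carries `eventId` equal to that same ID. The sketch below is a simplified illustration of that matching idea and is not the merge job's actual implementation. The function name and the sample data are made up.

```python
def merge_by_id(captures, ground_truths):
    """Pair each captured prediction with the ground truth label sharing its ID."""
    labels = {
        gt["eventMetadata"]["eventId"]: gt["groundTruthData"]["data"]
        for gt in ground_truths
    }
    merged = []
    for cap in captures:
        meta = cap["eventMetadata"]
        # Assumption: match on inferenceId when present, otherwise on eventId.
        key = meta.get("inferenceId", meta["eventId"])
        if key in labels:
            prediction = cap["captureData"]["endpointOutput"]["data"]
            merged.append((key, prediction, labels[key]))
    return merged


# Made-up records for illustration only.
captures = [
    {"captureData": {"endpointOutput": {"data": "0.91"}},
     "eventMetadata": {"eventId": "aaa", "inferenceId": "0"}},
    {"captureData": {"endpointOutput": {"data": "0.12"}},
     "eventMetadata": {"eventId": "bbb"}},  # no inferenceId and no matching label
]
ground_truths = [
    {"groundTruthData": {"data": "1", "encoding": "CSV"},
     "eventMetadata": {"eventId": "0"}, "eventVersion": "0"},
]
print(merge_by_id(captures, ground_truths))  # [('0', '0.91', '1')]
```

The second capture is dropped because nothing in the ground truth set shares its ID, which is why traffic sent without an `InferenceId` cannot be paired with the synthetic labels generated here.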


## Section 3 - Set up the bias monitor

In [41]:
model_bias_monitor = ModelBiasMonitor(
    role=role,
    sagemaker_session=sagemaker_session,
    max_runtime_in_seconds=1800,
)

model_bias_config = BiasConfig(
    label_values_or_threshold=[1],
    facet_name="Account Length", # sensitive feature to check for bias
    facet_values_or_threshold=[100],
)

model_bias_analysis_config = BiasAnalysisConfig(
    model_bias_config,
    headers=all_headers,
    label=label_header,
)
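To make the configuration above concrete, the sketch below computes one post-training bias metric by hand: the difference in positive proportions in predicted labels (DPPL) between two facet groups. It assumes that rows with `Account Length` above 100 form the sensitive group and that a prediction counts as positive when its probability exceeds 0.8, the value passed as `probability_threshold_attribute` when the schedule is created. Both assumptions, the function names, and the sample rows are illustrative; the monitoring job computes its metrics itself.

```python
def positive_rate(scores, threshold):
    """Fraction of scores strictly above the threshold."""
    if not scores:
        raise ValueError("group is empty")
    return sum(s > threshold for s in scores) / len(scores)


def dppl(rows, facet_threshold=100, probability_threshold=0.8):
    """Difference in positive prediction rates: other group minus sensitive group.

    rows is an iterable of (account_length, predicted_probability) pairs.
    """
    rows = list(rows)
    sensitive = [p for length, p in rows if length > facet_threshold]
    others = [p for length, p in rows if length <= facet_threshold]
    return (positive_rate(others, probability_threshold)
            - positive_rate(sensitive, probability_threshold))


# Made-up rows: two accounts longer than 100 days, two shorter.
sample_rows = [(120, 0.9), (150, 0.3), (80, 0.95), (60, 0.85)]
print(dppl(sample_rows))  # 0.5
```

A value near zero would indicate that both groups receive positive predictions at similar rates; the sign shows which group is favored.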

### Setup Monitor Schedule

In [46]:
# every hour
schedule_expression = CronExpressionGenerator.hourly()


model_bias_monitor.create_monitoring_schedule(
    analysis_config=model_bias_analysis_config,
    output_s3_uri=s3_report_path,
    endpoint_input=EndpointInput(
        endpoint_name=endpoint_name,
        destination="/opt/ml/processing/input/endpoint",
        start_time_offset="-PT1H",
        end_time_offset="-PT0H",
        probability_threshold_attribute=0.8,
    ),
    ground_truth_input=ground_truth_upload_path,
    schedule_cron_expression=schedule_expression,
)
print(f"Model bias monitoring schedule: {model_bias_monitor.monitoring_schedule_name}")

Model bias monitoring schedule: monitoring-schedule-2025-02-11-07-32-59-785
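The `start_time_offset="-PT1H"` and `end_time_offset="-PT0H"` arguments are ISO 8601 durations relative to the scheduled run time, so each hourly execution analyzes the preceding hour of captured data. The sketch below works out that window for a given run time. It only handles whole-hour offsets of the form `-PT<n>H`, and the function names are hypothetical.

```python
import re
from datetime import datetime, timedelta, timezone

_HOUR_OFFSET = re.compile(r"^-PT(\d+)H$")


def parse_hour_offset(offset: str) -> timedelta:
    """Convert an offset such as '-PT1H' into a negative timedelta."""
    match = _HOUR_OFFSET.match(offset)
    if match is None:
        raise ValueError(f"unsupported offset: {offset!r}")
    return -timedelta(hours=int(match.group(1)))


def analysis_window(scheduled_time, start_offset="-PT1H", end_offset="-PT0H"):
    """Return the (start, end) of the data window for one scheduled run."""
    return (scheduled_time + parse_hour_offset(start_offset),
            scheduled_time + parse_hour_offset(end_offset))


scheduled = datetime(2025, 2, 11, 8, 0, tzinfo=timezone.utc)
start, end = analysis_window(scheduled)
print(start.isoformat(), end.isoformat())
# 2025-02-11T07:00:00+00:00 2025-02-11T08:00:00+00:00
```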


In [None]:
# restart schedule if needed 

# model_bias_monitor.stop_monitoring_schedule()

# model_bias_monitor.start_monitoring_schedule()

In [47]:
# Check for start 
def wait_for_execution_to_start(model_monitor):
    print(
        "A hourly schedule was created above and it will kick off executions ON the hour (plus 0 - 20 min buffer)."
    )

    print("Waiting for the first execution to happen", end="")
    schedule_desc = model_monitor.describe_schedule()
    while "LastMonitoringExecutionSummary" not in schedule_desc:
        schedule_desc = model_monitor.describe_schedule()
        print(".", end="", flush=True)
        sleep(60)
    print()
    print("Done! Execution has been created")

    print("Now waiting for execution to start", end="")
    while schedule_desc["LastMonitoringExecutionSummary"]["MonitoringExecutionStatus"] == "Pending":
        schedule_desc = model_monitor.describe_schedule()
        print(".", end="", flush=True)
        sleep(10)

    print()
    print("Done! Execution has started")
        
wait_for_execution_to_start(model_bias_monitor)

A hourly schedule was created above and it will kick off executions ON the hour (plus 0 - 20 min buffer).
Waiting for the first execution to happen............................
Done! Execution has been created
Now waiting for execution to start.
Done! Execution has started


In [53]:
# Waits for the schedule to have last execution in a terminal status.
def wait_for_execution_to_finish(model_monitor):
    schedule_desc = model_monitor.describe_schedule()
    execution_summary = schedule_desc.get("LastMonitoringExecutionSummary")
    if execution_summary is not None:
        print("Waiting for execution to finish", end="")
        while execution_summary["MonitoringExecutionStatus"] not in [
            "Completed",
            "CompletedWithViolations",
            "Failed",
            "Stopped",
        ]:
            print(".", end="", flush=True)
            sleep(60)
            schedule_desc = model_monitor.describe_schedule()
            execution_summary = schedule_desc["LastMonitoringExecutionSummary"]
        print()
        print("Done! Execution has finished")
    else:
        print("Last execution not found")
        
        
wait_for_execution_to_finish(model_bias_monitor)

Waiting for execution to finish.........
Done! Execution has finished
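The waiting loops above poll without an upper bound, so a stuck execution would block the notebook indefinitely. One possible variant, sketched below, adds a timeout and takes the describe call as a parameter so it can be exercised without AWS. The function name and its parameters are hypothetical; the status names are the ones used in the cell above.

```python
import time

TERMINAL_STATUSES = {"Completed", "CompletedWithViolations", "Failed", "Stopped"}


def wait_for_terminal_status(describe, poll_seconds=60, timeout_seconds=3600,
                             sleep=time.sleep, clock=time.monotonic):
    """Poll describe() until the last execution reaches a terminal status."""
    deadline = clock() + timeout_seconds
    while True:
        summary = describe().get("LastMonitoringExecutionSummary")
        status = summary["MonitoringExecutionStatus"] if summary else None
        if status in TERMINAL_STATUSES:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"no terminal status after {timeout_seconds}s")
        sleep(poll_seconds)


# Fake describe() responses so the sketch runs without a monitoring schedule.
responses = iter([
    {},
    {"LastMonitoringExecutionSummary": {"MonitoringExecutionStatus": "InProgress"}},
    {"LastMonitoringExecutionSummary": {"MonitoringExecutionStatus": "Completed"}},
])
status = wait_for_terminal_status(lambda: next(responses), sleep=lambda s: None)
print(status)  # Completed
```

In the notebook this could be called as `wait_for_terminal_status(model_bias_monitor.describe_schedule)`.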


## Check Results

In [54]:
schedule_desc = model_bias_monitor.describe_schedule()
execution_summary = schedule_desc.get("LastMonitoringExecutionSummary")
if execution_summary and execution_summary["MonitoringExecutionStatus"] in [
    "Completed",
    "CompletedWithViolations",
]:
    last_model_bias_monitor_execution = model_bias_monitor.list_executions()[-1]
    last_model_bias_monitor_execution_report_uri = (
        last_model_bias_monitor_execution.output.destination
    )
    print(f"Report URI: {last_model_bias_monitor_execution_report_uri}")
    last_model_bias_monitor_execution_report_files = sorted(
        S3Downloader.list(last_model_bias_monitor_execution_report_uri)
    )
    print("Found Report Files:")
    print("\n ".join(last_model_bias_monitor_execution_report_files))
else:
    last_model_bias_monitor_execution = None
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )

Report URI: s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/reports/DEMO-xgb-churn-model-bias-monitor-2025-02-11-0706/monitoring-schedule-2025-02-11-07-32-59-785/2025/02/11/08
Found Report Files:
s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/reports/DEMO-xgb-churn-model-bias-monitor-2025-02-11-0706/monitoring-schedule-2025-02-11-07-32-59-785/2025/02/11/08/analysis.json
 s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/reports/DEMO-xgb-churn-model-bias-monitor-2025-02-11-0706/monitoring-schedule-2025-02-11-07-32-59-785/2025/02/11/08/report.html
 s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/reports/DEMO-xgb-churn-model-bias-monitor-2025-02-11-0706/monitoring-schedule-2025-02-11-07-32-59-785/2025/02/11/08/report.ipynb
 s3://sagemaker-us-east-1-796598873577/sagemaker/clarify-bias-monitor/reports/DEMO-xgb-churn-model-bias-monitor-2025-02-11-0706/monitoring-schedule-2025-02-11-07-32-59-785/2025/02/11/08/r

## Clean up <a id='cleanup'></a>  

You can keep your endpoint running to continue capturing data. If you do not plan to collect more data or use this endpoint further, you should delete the endpoint to avoid incurring additional charges. Note that deleting your endpoint does not delete the data that was captured during the model invocations. That data persists in Amazon S3 until you delete it yourself.

Before deleting the endpoint, stop the traffic threads and delete the monitoring schedule.

In [None]:
invoke_endpoint_thread.terminate()
ground_truth_thread.terminate()

In [None]:
from sagemaker.predictor import Predictor

predictor = Predictor(endpoint_name, sagemaker_session=sagemaker_session)
model_monitors = predictor.list_monitors()
for model_monitor in model_monitors:
    model_monitor.stop_monitoring_schedule()
    wait_for_execution_to_finish(model_monitor)
    model_monitor.delete_monitoring_schedule()

In [None]:
predictor.delete_endpoint()
predictor.delete_model()

## References

https://sagemaker-examples.readthedocs.io/en/latest/sagemaker_model_monitor/fairness_and_explainability/SageMaker-Model-Monitor-Fairness-and-Explainability.html