# Monitoring and Analyzing Data Quality for XGBoost Churn Models With Amazon SageMaker Model Monitor

This notebook demonstrates how to use Amazon SageMaker Model Monitor to capture inference data from a deployed machine learning model and check it for data quality issues on a regular schedule. You will work with a pre-trained XGBoost model for customer churn prediction and see how Model Monitor can automatically detect data quality issues that might affect the model's performance in production.

To use this notebook:

- Clone the repository containing this notebook and associated files.
- Open the notebook in SageMaker Studio or a SageMaker Notebook Instance.

**Learning Objectives:**

1. Set up a SageMaker environment for model monitoring
2. Deploy a model to a SageMaker endpoint with data capture enabled
3. Create a baseline for model monitoring
4. Implement a monitoring schedule for a deployed model
5. Analyze monitoring results and detect data quality issues

## Set Up the SageMaker Environment

Initialize the SageMaker session, look up the default S3 bucket, and define the key variables and S3 prefixes used throughout the notebook.

**Actions:**
1. Import necessary libraries
2. Create a SageMaker session
3. Get the execution role
4. Set up S3 bucket and prefixes for data capture and reports
5. Print out the paths for verification

In [None]:
import os
import json
import boto3
import sagemaker
from sagemaker import get_execution_role

sm_session = sagemaker.Session()
region = sm_session.boto_region_name

role = get_execution_role()
print("Role ARN: {}".format(role))

bucket = sm_session.default_bucket()
print("Demo Bucket: {}".format(bucket))
prefix = "sagemaker/DEMO-ModelMonitor"

data_capture_prefix = "{}/datacapture".format(prefix)
s3_capture_upload_path = "s3://{}/{}".format(bucket, data_capture_prefix)
reports_prefix = "{}/reports".format(prefix)
s3_report_path = "s3://{}/{}".format(bucket, reports_prefix)

print("Capture path: {}".format(s3_capture_upload_path))
print("Report path: {}".format(s3_report_path))

## Upload the Pre-trained Model

Upload the pre-trained XGBoost model to Amazon S3. SageMaker requires the model file to be accessible in S3 for deployment.

**Actions:**
1. Open the pre-trained model file
2. Define the S3 key for the model
3. Upload the model to the S3 bucket
4. Print a confirmation message

This code assumes that the "xgb-churn-prediction-model.tar.gz" file is present in the "model" directory of your repository. If you've cloned the repository correctly, this file should already be in place.

In [None]:
s3_key = f"{prefix}/xgb-churn-prediction-model.tar.gz"
# Use a context manager so the file handle is closed after the upload
with open("model/xgb-churn-prediction-model.tar.gz", "rb") as model_file:
    boto3.Session().resource("s3").Bucket(bucket).Object(s3_key).upload_fileobj(model_file)

print(f"Model uploaded successfully to s3://{bucket}/{s3_key}")

## Create a SageMaker Model Object

Create a SageMaker model object using the uploaded pre-trained model.

**Actions:**
1. Generate a unique model name
2. Construct the S3 URL for the uploaded model
3. Retrieve the Docker image URI for XGBoost
4. Create a SageMaker Model object
5. Print model details for verification

In [None]:
from time import gmtime, strftime
from sagemaker.model import Model
from sagemaker.image_uris import retrieve

model_name = "DEMO-xgb-churn-pred-model-monitor-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(f"Model name: {model_name}")

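# Use an s3:// URI rather than the legacy "https://<bucket>.s3-<region>.amazonaws.com"
# URL format, which is not supported in every region
model_url = "s3://{}/{}/xgb-churn-prediction-model.tar.gz".format(bucket, prefix)
print(f"Model URL: {model_url}")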

image_uri = retrieve("xgboost", region, "0.90-1")
print(f"Docker image URI: {image_uri}")

# Pass the generated name so the SageMaker model is created under it
model = Model(image_uri=image_uri, model_data=model_url, role=role, name=model_name)
print("SageMaker Model object created successfully")
print(f"Model data location: {model.model_data}")
print(f"Model role ARN: {model.role}")

## Deploy the Model with Data Capture

Deploy the SageMaker model to an endpoint with data capture enabled for model monitoring.

**Actions:**
1. Generate a unique endpoint name
2. Configure data capture settings
3. Deploy the model to a SageMaker endpoint
4. Print the endpoint name for reference

In [None]:
from sagemaker.model_monitor import DataCaptureConfig

endpoint_name = "DEMO-xgb-churn-pred-model-monitor-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("EndpointName={}".format(endpoint_name))

data_capture_config = DataCaptureConfig(
    enable_capture=True, sampling_percentage=100, destination_s3_uri=s3_capture_upload_path
)

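# deploy() on a generic Model returns None because no predictor_cls is set,
# so a Predictor for this endpoint is created in the next step
model.deploy(
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)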
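
`DataCaptureConfig` is a convenience wrapper: when the endpoint configuration is created, the SDK sends an equivalent `DataCaptureConfig` structure to the `CreateEndpointConfig` API. The sketch below builds that structure by hand, which can help when reading an endpoint configuration back with boto3's `describe_endpoint_config`. The helper name `build_data_capture_request` and the bucket in the example are hypothetical, and the field names reflect our reading of the API reference rather than anything in this notebook.

```python
def build_data_capture_request(destination_s3_uri, sampling_percentage=100,
                               capture_input=True, capture_output=True):
    """Build a CreateEndpointConfig-style DataCaptureConfig dictionary.

    Hypothetical helper: mirrors the low-level API shape, not the SageMaker SDK class.
    """
    if not 0 <= sampling_percentage <= 100:
        raise ValueError("sampling_percentage must be between 0 and 100")
    capture_options = []
    if capture_input:
        capture_options.append({"CaptureMode": "Input"})    # Request payloads
    if capture_output:
        capture_options.append({"CaptureMode": "Output"})   # Model responses
    if not capture_options:
        raise ValueError("capture at least one of input or output")
    return {
        "EnableCapture": True,
        "InitialSamplingPercentage": sampling_percentage,
        "DestinationS3Uri": destination_s3_uri,
        "CaptureOptions": capture_options,
    }

# Hypothetical bucket; in the notebook this would be s3_capture_upload_path
config = build_data_capture_request("s3://my-bucket/sagemaker/DEMO-ModelMonitor/datacapture")
print(config)
```

With `sampling_percentage=100`, as in the notebook, every request and response is captured; lower values trade monitoring coverage for less S3 storage.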

## Prepare Test Data and Send Test Traffic to the Endpoint

Prepare a sample of test data and send it to the deployed model endpoint to generate inference data for monitoring.

**Actions:**
1. Create a Predictor object for the deployed endpoint
2. Prepare a subset of test data
3. Send test data to the endpoint
4. Print confirmation message

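This code assumes that the "test-dataset-input-cols.csv" file is present in the "test_data" directory of your repository. If you've cloned the repository correctly, this file should already be in place. The "test_sample.csv" file does not need to exist beforehand: the cell creates it from the first 180 rows of "test-dataset-input-cols.csv".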

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
import time

predictor = Predictor(endpoint_name=endpoint_name, serializer=CSVSerializer())

# Get a subset of test data for a quick test
!head -180 test_data/test-dataset-input-cols.csv > test_data/test_sample.csv
print("Sending test traffic to the endpoint {}. \nPlease wait...".format(endpoint_name))

with open("test_data/test_sample.csv", "r") as f:
    for row in f:
        payload = row.rstrip("\n")
        response = predictor.predict(data=payload)
        time.sleep(1)

print("Done!")

## Verify Data Capture

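Verify that data is being captured correctly from the model endpoint. Captured data is delivered to S3 in batches, so the first files can take a few minutes to appear; if no capture files are found yet, wait briefly and rerun the cell.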

**Actions:**
1. Set up an S3 client
2. List objects in the data capture S3 prefix
3. Print the captured files

In [None]:
s3_client = boto3.Session().client("s3")
current_endpoint_capture_prefix = "{}/{}".format(data_capture_prefix, endpoint_name)
result = s3_client.list_objects_v2(Bucket=bucket, Prefix=current_endpoint_capture_prefix)
# "Contents" is absent when nothing has been captured yet, so default to an empty list
capture_files = [capture_file.get("Key") for capture_file in result.get("Contents", [])]
print("Found Capture Files:")
print("\n".join(capture_files))
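
A single `list_objects` or `list_objects_v2` call returns at most 1,000 keys, which is plenty here but can fall short once an endpoint has been capturing data for many hours. A minimal sketch of a paginated listing using boto3's `get_paginator("list_objects_v2")`; the helper name `list_s3_keys` is hypothetical.

```python
def list_s3_keys(s3_client, bucket, prefix):
    """Return every object key under `prefix`, following S3 pagination.

    `s3_client` is expected to be a boto3 S3 client (or anything exposing
    get_paginator("list_objects_v2")).
    """
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # Pages with no matching objects omit the "Contents" key entirely
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys
```

In the notebook this could replace the single call: `capture_files = list_s3_keys(s3_client, bucket, current_endpoint_capture_prefix)`. The same helper works for the baseline and report listings later on.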

## Inspect Captured Data

Retrieve and display a sample of the captured data for inspection.

**Actions:**
1. Define a function to get the body of an S3 object
2. Retrieve the content of the latest capture file
3. Print a portion of the captured data

In [None]:
def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=bucket, Key=obj_key).get("Body").read().decode("utf-8")

capture_file = get_obj_body(capture_files[-1])
print(capture_file[:2000])
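
Taking `capture_files[-1]` as the latest file relies on the capture layout: as we understand the SageMaker documentation, captures are written under `{destination}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/{file}.jsonl`, and S3 lists keys in lexicographic order, so zero-padded date parts sort chronologically down to the hour. The sketch below splits a key into those parts under that assumed layout; the helper name and the example key are hypothetical.

```python
from datetime import datetime

def parse_capture_key(key, capture_prefix):
    """Split a data capture object key into endpoint, variant, hour, and file name.

    Assumes the layout {capture_prefix}/{endpoint}/{variant}/yyyy/mm/dd/hh/{file}.jsonl.
    """
    base = capture_prefix.rstrip("/") + "/"
    if not key.startswith(base):
        raise ValueError(f"{key!r} is not under {capture_prefix!r}")
    parts = key[len(base):].split("/")
    if len(parts) != 7:
        raise ValueError(f"unexpected capture key layout: {key!r}")
    endpoint, variant, year, month, day, hour, file_name = parts
    return {
        "endpoint": endpoint,
        "variant": variant,
        # Hour-level timestamp of the partition the file was written to
        "hour": datetime(int(year), int(month), int(day), int(hour)),
        "file": file_name,
    }

# Hypothetical key for illustration; real keys come from capture_files
example_key = (
    "sagemaker/DEMO-ModelMonitor/datacapture/DEMO-xgb-churn-pred-model-monitor-2024-05-01-12-00-00/"
    "AllTraffic/2024/05/01/13/example-file.jsonl"
)
print(parse_capture_key(example_key, "sagemaker/DEMO-ModelMonitor/datacapture"))
```

Applied to the notebook's data, `parse_capture_key(capture_files[-1], data_capture_prefix)` shows which endpoint variant and hour the inspected file belongs to.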

## Examine a Single Captured Record

Parse and display a single record from the captured data to understand its structure in detail.

**Actions:**
1. Import the json module
2. Extract the first record from the captured data
3. Parse the record as JSON
4. Print the formatted JSON record

In [None]:
import json

print(json.dumps(json.loads(capture_file.split("\n")[0]), indent=2))
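
Each line of a capture file is one JSON record pairing the request (`captureData.endpointInput`) with the response (`captureData.endpointOutput`), plus `eventMetadata`. A minimal sketch that pulls the CSV features and the churn score out of one record, assuming CSV-encoded payloads as in this notebook; the helper name and the sample values are hypothetical.

```python
import csv
import io
import json

def parse_capture_record(line):
    """Extract the input features and the model's score from one capture line.

    Assumes the record layout printed above and CSV-encoded input and output.
    """
    record = json.loads(line)
    endpoint_input = record["captureData"]["endpointInput"]
    endpoint_output = record["captureData"]["endpointOutput"]
    for part in (endpoint_input, endpoint_output):
        if part.get("encoding") != "CSV":
            raise ValueError(f"unsupported encoding: {part.get('encoding')}")
    features = next(csv.reader(io.StringIO(endpoint_input["data"])))
    return {
        "event_id": record["eventMetadata"]["eventId"],
        "inference_time": record["eventMetadata"]["inferenceTime"],
        "features": features,
        "prediction": float(endpoint_output["data"]),  # Churn probability from XGBoost
    }

# Hypothetical record for illustration; real lines come from capture_file.split("\n")
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "132,25,113.2,96", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT",
                           "data": "0.036", "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "example-event-id", "inferenceTime": "2024-05-01T13:05:12Z"},
    "eventVersion": "0",
})
parsed = parse_capture_record(sample_line)
print(parsed["features"], parsed["prediction"])
```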

## Set Up Baseline Data for Model Monitoring

Prepare the environment for creating a baseline against which future data will be compared for model monitoring.

**Actions:**
1. Define S3 prefixes for baseline data and results
2. Construct S3 URIs for baseline data and results
3. Print the URIs for verification

In [None]:
baseline_prefix = prefix + "/baselining"
baseline_data_prefix = baseline_prefix + "/data"
baseline_results_prefix = baseline_prefix + "/results"

baseline_data_uri = "s3://{}/{}".format(bucket, baseline_data_prefix)
baseline_results_uri = "s3://{}/{}".format(bucket, baseline_results_prefix)
print("Baseline data uri: {}".format(baseline_data_uri))
print("Baseline results uri: {}".format(baseline_results_uri))

## Upload Baseline Data to S3

Upload the training dataset to S3, which will serve as the baseline for model monitoring.

**Actions:**
1. Open the training dataset file
2. Define the S3 key for the baseline data
3. Upload the file to the S3 bucket
4. Print confirmation message with the S3 path

This step assumes that the file "training-dataset-with-header.csv" is present in the "test_data" directory of your repository. If you've cloned the repository correctly, this file should already be in place. Ensure that the file exists before running this step.

In [None]:
s3_key = f"{baseline_data_prefix}/training-dataset-with-header.csv"
# Use a context manager so the file handle is closed after the upload
with open("test_data/training-dataset-with-header.csv", "rb") as training_data_file:
    boto3.Session().resource("s3").Bucket(bucket).Object(s3_key).upload_fileobj(training_data_file)

print(f"Training data uploaded to s3://{bucket}/{s3_key}")

## Create and Configure Default Model Monitor

Set up the default model monitor with the baseline data, which will be used to detect deviations in future data.

**Actions:**
1. Import necessary modules from SageMaker Model Monitor
2. Create a DefaultModelMonitor instance
3. Suggest a baseline using the uploaded training data
4. Wait for the baseline suggestion process to complete

In [None]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri + "/training-dataset-with-header.csv",
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True,
)

## Review Baseline Results

Retrieve and display the files generated during the baseline creation process.

**Actions:**
1. Set up an S3 client
2. List objects in the baseline results S3 prefix
3. Print the names of the generated baseline files

In [None]:
s3_client = boto3.Session().client("s3")
result = s3_client.list_objects_v2(Bucket=bucket, Prefix=baseline_results_prefix)
# Default to an empty list in case the baselining job wrote nothing under this prefix
report_files = [report_file.get("Key") for report_file in result.get("Contents", [])]
print("Found Files:")
print("\n".join(report_files))

## Examine Baseline Statistics

Retrieve and display the baseline statistics generated during the baseline creation process.

**Actions:**
1. Import pandas library for data manipulation
2. Get the latest baselining job
3. Retrieve the baseline statistics
4. Convert the statistics to a pandas DataFrame
5. Display the first 10 rows of the statistics

In [None]:
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)
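
To build intuition for the per-feature numbers in `schema_df`, the sketch below computes similar quantities locally: the fraction of non-missing values (completeness) and, for fully numeric columns, the minimum, maximum, and mean. This is a rough illustration using only the standard library, not the algorithm the Model Monitor baselining job uses; the helper name and sample rows are hypothetical.

```python
import csv
import io

def summarize_csv_columns(csv_text):
    """Per-column completeness plus min/max/mean for fully numeric columns.

    Rough local illustration of baseline-style statistics, not Model Monitor's implementation.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    rows = list(reader)
    summary = {}
    for column in reader.fieldnames or []:
        values = [row[column] for row in rows]
        present = [v for v in values if v not in (None, "")]
        stats = {"completeness": len(present) / len(values) if values else 0.0}
        try:
            numbers = [float(v) for v in present]
        except ValueError:
            numbers = []  # Any non-numeric value makes this a string column
        if numbers:
            stats.update(min=min(numbers), max=max(numbers), mean=sum(numbers) / len(numbers))
        summary[column] = stats
    return summary

# Hypothetical rows; in practice you could read test_data/training-dataset-with-header.csv
sample_csv = "Account Length,State\n132,OH\n,NJ\n87,TX\n"
print(summarize_csv_columns(sample_csv))
```

Comparing a few of these values with the corresponding rows of `schema_df` is a quick sanity check that the baseline was computed on the file you expected.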

## Review Suggested Constraints

Retrieve and display the constraints suggested by the model monitor based on the baseline data.

**Actions:**
1. Retrieve the suggested constraints from the latest baselining job
2. Convert the constraints to a pandas DataFrame
3. Display the first 10 rows of the constraints

In [None]:
constraints_df = pd.json_normalize(
    baseline_job.suggested_constraints().body_dict["features"]
)
constraints_df.head(10)
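
Each suggested constraint pairs a feature with its inferred type, expected completeness, and, for numeric features, rules such as non-negativity. The sketch below applies simplified versions of those rules to a single record to show what a violation means. The field names (`name`, `inferred_type`, `completeness`, `num_constraints.is_non_negative`) are assumed to match the columns of `constraints_df`, so check them against your own output; the rule labels returned are this sketch's own, not Model Monitor check types, and the real monitoring job evaluates constraints over a whole batch of captured data rather than record by record.

```python
def check_record(record, feature_constraints):
    """Flag simple constraint problems in one record (feature name -> raw string value).

    Simplified illustration: completeness is treated per record instead of as a batch ratio.
    """
    problems = []
    for constraint in feature_constraints:
        name = constraint["name"]
        value = record.get(name)
        if value in (None, ""):
            # The baseline had no missing values here, so a gap is suspicious
            if constraint.get("completeness", 0.0) >= 1.0:
                problems.append((name, "completeness"))
            continue
        if constraint.get("inferred_type") not in ("Integral", "Fractional"):
            continue  # Only numeric features are checked in this sketch
        try:
            number = float(value)
        except ValueError:
            problems.append((name, "data_type"))
            continue
        if constraint["inferred_type"] == "Integral" and not number.is_integer():
            problems.append((name, "data_type"))
        if constraint.get("num_constraints", {}).get("is_non_negative") and number < 0:
            problems.append((name, "is_non_negative"))
    return problems

# Hypothetical constraints; real ones come from baseline_job.suggested_constraints().body_dict["features"]
example_constraints = [
    {"name": "Account Length", "inferred_type": "Integral", "completeness": 1.0,
     "num_constraints": {"is_non_negative": True}},
    {"name": "Day Mins", "inferred_type": "Fractional", "completeness": 1.0,
     "num_constraints": {"is_non_negative": True}},
]
print(check_record({"Account Length": "-5", "Day Mins": "abc"}, example_constraints))
```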

## Create a Monitoring Schedule

Set up a regular monitoring schedule for the deployed model, using the baseline statistics and constraints.

**Actions:**
1. Import the CronExpressionGenerator
2. Generate a unique name for the monitoring schedule
3. Create the monitoring schedule using the default monitor
4. Set up hourly monitoring with CloudWatch metrics enabled
5. Print the name of the created schedule

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator

mon_schedule_name = "DEMO-xgb-churn-pred-monitor-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

print("Creating monitoring schedule...")
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint_name,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

print(f"Created monitoring schedule: {mon_schedule_name}")

## Generate Sample Traffic for Monitoring

Generate sample traffic to the endpoint to provide data for the monitoring process.

**Actions:**
1. Define a function to generate sample traffic
2. Open the test dataset file
3. Send multiple requests to the endpoint
4. Introduce a short delay between requests
5. Print confirmation messages

In [None]:
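import itertools
import time

def generate_sample_traffic(num_requests=50):
    # islice stops cleanly if the file has fewer than num_requests rows,
    # whereas calling next(f) would raise StopIteration
    with open("test_data/test-dataset-input-cols.csv", "r") as f:
        for row in itertools.islice(f, num_requests):
            predictor.predict(row.strip())
            time.sleep(0.5)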

print("Generating sample traffic...")
generate_sample_traffic()
print("Sample traffic generation complete.")
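
The traffic above comes from the same test set the model was evaluated on, so it may trigger few violations. To see the monitor flag problems, one option is to send some deliberately degraded rows. The sketch below blanks out or negates randomly chosen fields of a CSV row; the helper name and default rates are hypothetical, and it assumes plain comma-separated numeric rows without quoting.

```python
import random

def perturb_row(row, rng, missing_rate=0.1, negate_rate=0.1):
    """Return a copy of a CSV row with some fields blanked out or negated.

    Negated values are written as floats (e.g. "-25.0"), which may itself
    look like a type change for integer features.
    """
    perturbed = []
    for field in row.split(","):
        draw = rng.random()
        if draw < missing_rate:
            perturbed.append("")  # Simulate a missing value
        elif draw < missing_rate + negate_rate:
            try:
                perturbed.append(str(-float(field)))  # Simulate an out-of-range value
            except ValueError:
                perturbed.append(field)  # Leave non-numeric fields unchanged
        else:
            perturbed.append(field)
    return ",".join(perturbed)

rng = random.Random(42)  # Seeded so the perturbations are reproducible
print(perturb_row("132,25,113.2,96", rng))
```

Inside `generate_sample_traffic`, you could send `perturb_row(row.strip(), rng)` in place of the clean payload for some of the requests.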

## Check Monitoring Schedule Status

Check the status of the newly created monitoring schedule.

**Actions:**
1. Set up a loop to check the status multiple times
2. Retrieve the current status of the monitoring schedule
3. Print the status
4. Wait for 30 seconds between checks
5. Break the loop if the status is "Scheduled"

In [None]:
for _ in range(10):  # Check for up to ~5 minutes
    status = my_default_monitor.describe_schedule()["MonitoringScheduleStatus"]
    print(f"Monitoring schedule status: {status}")
    if status == "Scheduled":
        break
    time.sleep(30)
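
The loop above silently stops after ten checks even if the schedule never reaches `Scheduled`. A small reusable polling helper makes the timeout and failure cases explicit. This is a sketch; the helper name is hypothetical, and `sleep` is a parameter only so the function can be exercised without real waiting.

```python
import time

def wait_for_status(get_status, target_statuses, failure_statuses=(),
                    interval_s=30, timeout_s=300, sleep=time.sleep):
    """Poll get_status() until it returns one of target_statuses.

    Returns the final status; raises RuntimeError on a failure status and
    TimeoutError if timeout_s elapses first.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status()
        if status in target_statuses:
            return status
        if status in failure_statuses:
            raise RuntimeError(f"reached failure status {status!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"still {status!r} after {timeout_s} s")
        sleep(interval_s)
```

For the schedule in this notebook, a call might look like `wait_for_status(lambda: my_default_monitor.describe_schedule()["MonitoringScheduleStatus"], {"Scheduled"}, failure_statuses={"Failed"})`.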

## Wait for and Check Monitoring Results

Wait for the first monitoring execution to complete, then check the results.

**Actions:**
1. Wait for an hour to allow the first monitoring execution to run
2. List the monitoring executions
3. Retrieve the latest execution
4. Check the status of the latest execution
5. If completed, check for any constraint violations
6. Print the results of the monitoring

In [None]:
print("Waiting for the first execution to complete...")
time.sleep(3600)  # Wait for an hour

executions = my_default_monitor.list_executions()
if executions:
    latest_execution = executions[-1]
    execution_status = latest_execution.describe()["ProcessingJobStatus"]
    print(f"Latest execution status: {execution_status}")
    
    if execution_status == "Completed":
        violations = my_default_monitor.latest_monitoring_constraint_violations()
        if violations.body_dict["violations"]:
            print("Violations detected:")
            for violation in violations.body_dict["violations"]:
                print(f"- Feature: {violation['feature_name']}, Type: {violation['constraint_check_type']}")
        else:
            print("No violations detected.")
else:
    print("No executions found yet.")
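
A fixed one-hour sleep can end before the first execution has started, or long after it finished. `CronExpressionGenerator.hourly()` fires at the top of each hour, and, as we understand the SageMaker documentation, hourly executions can start up to about 20 minutes after the hour boundary. The sketch below computes a wait until the next hour plus that buffer; the helper name and the 20-minute default are assumptions.

```python
from datetime import datetime, timedelta, timezone

def seconds_until_next_hour(now=None, buffer_minutes=20):
    """Seconds from `now` until the next top of the hour plus a start-up buffer."""
    if now is None:
        now = datetime.now(timezone.utc)
    next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return (next_hour + timedelta(minutes=buffer_minutes) - now).total_seconds()

print(seconds_until_next_hour(datetime(2024, 5, 1, 13, 45, tzinfo=timezone.utc)))  # prints 2100.0
```

In the notebook you could call `time.sleep(seconds_until_next_hour())` instead of `time.sleep(3600)`, then call `latest_execution.wait(logs=False)` (inherited from `ProcessingJob`) to block until the processing job itself finishes.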

## Retrieve the Monitoring Report URI

Retrieve the S3 URI of the monitoring report generated by the latest execution.

**Actions:**
1. Access the output destination of the latest execution
2. Print the report URI

In [None]:
report_uri = latest_execution.output.destination
print("Report Uri: {}".format(report_uri))

## List Monitoring Report Files

List the files generated in the latest monitoring report.

**Actions:**
1. Import the urlparse function
2. Parse the report URI to extract the bucket and key
3. Set up an S3 client
4. List objects in the report S3 prefix
5. Print the names of the report files

In [None]:
from urllib.parse import urlparse

s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip("/")
print("Report bucket: {}".format(report_bucket))
print("Report key: {}".format(report_key))

s3_client = boto3.Session().client("s3")
result = s3_client.list_objects_v2(Bucket=report_bucket, Prefix=report_key)
# Default to an empty list in case the execution wrote no files
report_files = [report_file.get("Key") for report_file in result.get("Contents", [])]
print("Found Report Files:")
print("\n".join(report_files))

## Analyze Constraint Violations in Detail

Retrieve and display detailed information about any constraint violations detected during the monitoring process.

**Actions:**
1. Set pandas display option to show full column width
2. Retrieve the latest monitoring constraint violations
3. Convert the violations data to a pandas DataFrame
4. Display the first 10 rows of the violations data

In [None]:
pd.set_option("display.max_colwidth", None)
violations = my_default_monitor.latest_monitoring_constraint_violations()
# Use a new name so the baseline constraints_df from earlier is not overwritten
violations_df = pd.json_normalize(violations.body_dict["violations"])
violations_df.head(10)
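
When a report contains many violations, a per-type summary is often easier to scan than the full table. A minimal sketch using `collections.Counter`, relying on the `feature_name` and `constraint_check_type` keys already used earlier in this notebook; the helper name and example body are hypothetical.

```python
from collections import Counter

def summarize_violations(violations_body):
    """Count violations per check type and list the affected features for each type."""
    items = violations_body.get("violations", [])
    counts = Counter(v["constraint_check_type"] for v in items)
    features = {}
    for v in items:
        features.setdefault(v["constraint_check_type"], []).append(v["feature_name"])
    return counts, features

# Hypothetical body; in the notebook pass violations.body_dict
example_body = {"violations": [
    {"feature_name": "Account Length", "constraint_check_type": "data_type_check"},
    {"feature_name": "Day Mins", "constraint_check_type": "data_type_check"},
    {"feature_name": "Churn", "constraint_check_type": "completeness_check"},
]}
counts, features = summarize_violations(example_body)
print(counts.most_common())
print(features)
```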

## Clean Up Resources

Remove the AWS resources created during this notebook to prevent unnecessary charges.

**Actions:**
1. Delete the Monitoring Schedule in the SageMaker console under "Governance"
2. Delete the SageMaker Endpoint in the SageMaker console under "Inference" > "Endpoints"
3. Delete the SageMaker Model in the SageMaker console under "Inference" > "Models"
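
The same cleanup can be scripted with SageMaker Python SDK methods: `delete_monitoring_schedule()` on the monitor, and `delete_model()` and `delete_endpoint()` on the predictor. In the sketch below, the schedule is deleted first, matching the order above. `delete_model()` runs before `delete_endpoint()` because, to our understanding, it finds the model names through the endpoint configuration that `delete_endpoint()` removes. The 60-second pause is an assumption to give the asynchronous schedule deletion time to finish. The wrapper name `clean_up` is hypothetical.

```python
import time

def clean_up(monitor, predictor, wait_seconds=60, sleep=time.sleep):
    """Delete the monitoring schedule, then the model and the endpoint."""
    monitor.delete_monitoring_schedule()
    # Schedule deletion is asynchronous; pause before removing the endpoint
    sleep(wait_seconds)
    predictor.delete_model()     # Looks up models via the endpoint config
    predictor.delete_endpoint()  # Also deletes the endpoint config by default
```

In this notebook the call would be `clean_up(my_default_monitor, predictor)`. Captured data, baseline files, and monitoring reports stay in S3 until you delete them separately.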