## Import libraries

In [15]:
%%time

from datetime import datetime, timedelta, timezone
import json
import os
import re
import boto3
from time import sleep
from threading import Thread

import pandas as pd

from sagemaker import get_execution_role, session, Session, image_uris
from sagemaker.s3 import S3Downloader, S3Uploader
from sagemaker.processing import ProcessingJob
from sagemaker.serializers import CSVSerializer

from sagemaker.model import Model
from sagemaker.model_monitor import DataCaptureConfig

CPU times: user 61 ms, sys: 1.84 ms, total: 62.8 ms
Wall time: 141 ms


## Setup

In [16]:
session = Session()

# Get Execution role
role = get_execution_role()
print("RoleArn:", role)

region = session.boto_region_name
print("Region:", region)

RoleArn: arn:aws:iam::570360337377:role/service-role/AmazonSageMaker-ExecutionRole-20211123T171162
Region: us-east-1


In [17]:
# Setup S3 bucket
bucket = session.default_bucket()
print("Demo Bucket:", bucket)
prefix = "sagemaker/Churn-ModelQualityMonitor"

##S3 prefixes
data_capture_prefix = f"{prefix}/datacapture"
s3_capture_upload_path = f"s3://{bucket}/{data_capture_prefix}"

ground_truth_upload_path = (
    f"s3://{bucket}/{prefix}/ground_truth_data/{datetime.now():%Y-%m-%d-%H-%M-%S}"
)

reports_prefix = f"{prefix}/reports"
s3_report_path = f"s3://{bucket}/{reports_prefix}"

##Get the model monitor image
monitor_image_uri = image_uris.retrieve(framework="model-monitor", region=region)

print("Image URI:", monitor_image_uri)
print(f"Capture path: {s3_capture_upload_path}")
print(f"Ground truth path: {ground_truth_upload_path}")
print(f"Report path: {s3_report_path}")

Demo Bucket: sagemaker-us-east-1-570360337377
Image URI: 156813124566.dkr.ecr.us-east-1.amazonaws.com/sagemaker-model-monitor-analyzer
Capture path: s3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/datacapture
Ground truth path: s3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/ground_truth_data/2021-11-23-21-43-22
Report path: s3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/reports


### Test acccess to the S3 bucket

In [18]:
S3Uploader.upload("test.csv", f"s3://{bucket}/test_upload")
print("Success! You are all set to proceed.")

Success! You are all set to proceed.


## Upload model

In [19]:
s3_key = f"s3://{bucket}/{prefix}"
model_url = S3Uploader.upload("xgb-churn-prediction-model.tar.gz", s3_key)
model_url

's3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/xgb-churn-prediction-model.tar.gz'

## Create SageMaker Model entity

In [20]:
model_name = f"DEMO-xgb-churn-pred-model-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"

image_uri = image_uris.retrieve(framework="xgboost", version="0.90-1", region=region)

model = Model(image_uri=image_uri, model_data=model_url, role=role, sagemaker_session=session)

## Deploy the model 

In [21]:
endpoint_name = f"DEMO-xgb-churn-model-quality-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"
print("EndpointName =", endpoint_name)

# enable data capture
data_capture_config = DataCaptureConfig(
    enable_capture=True, sampling_percentage=100, destination_s3_uri=s3_capture_upload_path
)

model.deploy(
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)

EndpointName = DEMO-xgb-churn-model-quality-monitor-2021-11-23-2143
----------------!

In [22]:
# Create the SageMaker Predictor object from the endpoint to be used for invoking the model
from sagemaker.predictor import Predictor

predictor = Predictor(
    endpoint_name=endpoint_name, sagemaker_session=session, serializer=CSVSerializer()
)

## Generate a baseline

In [26]:
i = 0
with open("data/validation_with_predictions.csv", "w") as baseline_file:
    baseline_file.write("probability,prediction,label\n")  # our header
    with open("data/validation.csv", "r") as f:
        for row in f:
            (label, input_cols) = row.split(",", 1)
            probability = float(predictor.predict(input_cols))
            prediction = "1" if probability > 0.8 else "0"
            baseline_file.write(f"{probability},{prediction},{label}\n")
            i += 1
            if i > 200:
                break
            print(".", end="", flush=True)
            sleep(0.5)
print()
print("Done!")

........................................................................................................................................................................................................
Done!


In [28]:
!head data/validation_with_predictions.csv

probability,prediction,label
0.01516005303710699,0,0
0.1684480607509613,0,0
0.21427156031131744,0,0
0.06330718100070953,0,0
0.02791607193648815,0,0
0.014169521629810333,0,0
0.00571369007229805,0,0
0.10534518957138062,0,0
0.025899196043610573,0,0


In [29]:
# setup and upload the predictions as a baseline dataset.
baseline_prefix = prefix + "/baselining"
baseline_data_prefix = baseline_prefix + "/data"
baseline_results_prefix = baseline_prefix + "/results"

baseline_data_uri = f"s3://{bucket}/{baseline_data_prefix}"
baseline_results_uri = f"s3://{bucket}/{baseline_results_prefix}"
print(f"Baseline data uri: {baseline_data_uri}")
print(f"Baseline results uri: {baseline_results_uri}")

Baseline data uri: s3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/baselining/data
Baseline results uri: s3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/baselining/results


In [31]:
baseline_dataset_uri = S3Uploader.upload(f"data/validation_with_predictions.csv", baseline_data_uri)
baseline_dataset_uri

's3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/baselining/data/validation_with_predictions.csv'

## Create a base job for model quality monitor

In [32]:
from sagemaker.model_monitor import ModelQualityMonitor
from sagemaker.model_monitor import EndpointInput
from sagemaker.model_monitor.dataset_format import DatasetFormat

In [33]:
# Create the model quality monitoring object
churn_model_quality_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=session,
)

In [34]:
# Name of the model quality baseline job
baseline_job_name = f"demo-xgb-churn-model-baseline-job-{datetime.utcnow():%Y-%m-%d-%H%M}"

In [35]:
# Execute the baseline suggestion job.
# You will specify problem type, in this case Binary Classification, and provide other required attributes.
job = churn_model_quality_monitor.suggest_baseline(
    job_name=baseline_job_name,
    baseline_dataset=baseline_dataset_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    problem_type="BinaryClassification",
    inference_attribute="prediction",
    probability_attribute="probability",
    ground_truth_attribute="label",
)
job.wait(logs=False)


Job Name:  demo-xgb-churn-model-baseline-job-2021-11-23-2202
Inputs:  [{'InputName': 'baseline_dataset_input', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/baselining/data/validation_with_predictions.csv', 'LocalPath': '/opt/ml/processing/input/baseline_dataset_input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'monitoring_output', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/baselining/results', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
....................................................................!

In [36]:
baseline_job = churn_model_quality_monitor.latest_baselining_job

In [37]:
binary_metrics = baseline_job.baseline_statistics().body_dict["binary_classification_metrics"]
pd.json_normalize(binary_metrics).T

Unnamed: 0,0
confusion_matrix.0.0,173
confusion_matrix.0.1,0
confusion_matrix.1.0,12
confusion_matrix.1.1,16
recall.value,0.571429
recall.standard_deviation,0.0424679
precision.value,1
precision.standard_deviation,0
accuracy.value,0.940299
accuracy.standard_deviation,0.00807129


In [38]:
pd.DataFrame(baseline_job.suggested_constraints().body_dict["binary_classification_constraints"]).T

Unnamed: 0,threshold,comparison_operator
recall,0.571429,LessThanThreshold
precision,1.0,LessThanThreshold
accuracy,0.940299,LessThanThreshold
true_positive_rate,0.571429,LessThanThreshold
true_negative_rate,1.0,LessThanThreshold
false_positive_rate,0.0,GreaterThanThreshold
false_negative_rate,0.428571,GreaterThanThreshold
auc,0.939513,LessThanThreshold
f0_5,0.869565,LessThanThreshold
f1,0.727273,LessThanThreshold


## Setup continuous model monitoring 

In [40]:
# Generate prediction data
def invoke_endpoint(ep_name, file_name):
    with open(file_name, "r") as f:
        i = 0
        for row in f:
            payload = row.rstrip("\n")
            response = session.sagemaker_runtime_client.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType="text/csv",
                Body=payload,
                InferenceId=str(i),  # unique ID per row
            )["Body"].read()
            i += 1
            sleep(1)


def invoke_endpoint_forever():
    while True:
        invoke_endpoint(endpoint_name, "data/test-dataset-input-cols.csv")


thread = Thread(target=invoke_endpoint_forever)
thread.start()

In [41]:
print("Waiting for captures to show up", end="")
for _ in range(120):
    capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
    if capture_files:
        capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")
        capture_record = json.loads(capture_file[0])
        if "inferenceId" in capture_record["eventMetadata"]:
            break
    print(".", end="", flush=True)
    sleep(1)
print()
print("Found Capture Files:")
print("\n ".join(capture_files[-3:]))

Waiting for captures to show up
Found Capture Files:
s3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/datacapture/DEMO-xgb-churn-model-quality-monitor-2021-11-23-2143/AllTraffic/2021/11/23/21/58-25-157-cf5aa6d8-03d2-4f1b-85cc-5bedf4be7386.jsonl
 s3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/datacapture/DEMO-xgb-churn-model-quality-monitor-2021-11-23-2143/AllTraffic/2021/11/23/22/13-10-143-1bf2dc89-b736-4225-9bb8-e90199b07747.jsonl
 s3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/datacapture/DEMO-xgb-churn-model-quality-monitor-2021-11-23-2143/AllTraffic/2021/11/23/22/14-10-972-ceea61f4-0a6f-4e54-a015-2d43c71c4bcc.jsonl


In [42]:
print("\n".join(capture_file[-3:-1]))

{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","data":"138,0,46.5,104,186.0,114,167.5,95,9.6,4,4,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,1,0","encoding":"CSV"},"endpointOutput":{"observedContentType":"text/csv; charset=utf-8","mode":"OUTPUT","data":"0.9562002420425415","encoding":"CSV"}},"eventMetadata":{"eventId":"27be2eea-5bcc-4d44-ae52-c6af194ed762","inferenceId":"118","inferenceTime":"2021-11-23T22:15:09Z"},"eventVersion":"0"}
{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","data":"93,0,176.1,103,199.7,130,263.9,96,8.5,6,2,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1,0","encoding":"CSV"},"endpointOutput":{"observedContentType":"text/csv; charset=utf-8","mode":"OUTPUT","data":"0.007474285550415516","encoding":"CSV"}},"eventMetadata":{"eventId":"2a79ba4f-79e6-4c4e-b206-bbb2a174dc

In [43]:
print(json.dumps(capture_record, indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "130,0,263.7,113,186.5,103,195.3,99,18.3,6,1,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,1,0",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.3791300654411316",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "d5526660-5241-4099-8894-a911a18c12b4",
    "inferenceId": "60",
    "inferenceTime": "2021-11-23T22:14:10Z"
  },
  "eventVersion": "0"
}


## Generate ground truth

In [44]:
import random


def ground_truth_with_id(inference_id):
    random.seed(inference_id)  # to get consistent results
    rand = random.random()
    return {
        "groundTruthData": {
            "data": "1" if rand < 0.7 else "0",  # randomly generate positive labels 70% of the time
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }


def upload_ground_truth(records, upload_time):
    fake_records = [json.dumps(r) for r in records]
    data_to_upload = "\n".join(fake_records)
    target_s3_uri = f"{ground_truth_upload_path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(fake_records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)

In [45]:
NUM_GROUND_TRUTH_RECORDS = 334  # 334 are the number of rows in data we're sending for inference


def generate_fake_ground_truth_forever():
    j = 0
    while True:
        fake_records = [ground_truth_with_id(i) for i in range(NUM_GROUND_TRUTH_RECORDS)]
        upload_ground_truth(fake_records, datetime.utcnow())
        j = (j + 1) % 5
        sleep(60 * 60)  # do this once an hour


gt_thread = Thread(target=generate_fake_ground_truth_forever)
gt_thread.start()

Uploading 334 records to s3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/ground_truth_data/2021-11-23-21-43-22/2021/11/23/22/1714.jsonl


## Create a monitoring schedule

In [46]:
##Monitoring schedule name
churn_monitor_schedule_name = (
    f"demo-xgb-churn-monitoring-schedule-{datetime.utcnow():%Y-%m-%d-%H%M}"
)

In [47]:
# Create an enpointInput
endpointInput = EndpointInput(
    endpoint_name=predictor.endpoint_name,
    probability_attribute="0",
    probability_threshold_attribute=0.5,
    destination="/opt/ml/processing/input_data",
)

In [48]:
# Create the monitoring schedule to execute every hour.
from sagemaker.model_monitor import CronExpressionGenerator

response = churn_model_quality_monitor.create_monitoring_schedule(
    monitor_schedule_name=churn_monitor_schedule_name,
    endpoint_input=endpointInput,
    output_s3_uri=baseline_results_uri,
    problem_type="BinaryClassification",
    ground_truth_input=ground_truth_upload_path,
    constraints=baseline_job.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

In [49]:
# Create the monitoring schedule
# You will see the monitoring schedule in the 'Scheduled' status
churn_model_quality_monitor.describe_schedule()

{'MonitoringScheduleArn': 'arn:aws:sagemaker:us-east-1:570360337377:monitoring-schedule/demo-xgb-churn-monitoring-schedule-2021-11-23-2217',
 'MonitoringScheduleName': 'demo-xgb-churn-monitoring-schedule-2021-11-23-2217',
 'MonitoringScheduleStatus': 'Pending',
 'MonitoringType': 'ModelQuality',
 'CreationTime': datetime.datetime(2021, 11, 23, 22, 17, 42, 821000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 11, 23, 22, 17, 42, 888000, tzinfo=tzlocal()),
 'MonitoringScheduleConfig': {'ScheduleConfig': {'ScheduleExpression': 'cron(0 * ? * * *)'},
  'MonitoringJobDefinitionName': 'model-quality-job-definition-2021-11-23-22-17-42-509',
  'MonitoringType': 'ModelQuality'},
 'EndpointName': 'DEMO-xgb-churn-model-quality-monitor-2021-11-23-2143',
 'ResponseMetadata': {'RequestId': '154f843d-d874-4048-a5d7-a9fc03363a64',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '154f843d-d874-4048-a5d7-a9fc03363a64',
   'content-type': 'application/x-amz-json-1.1',
   '

In [50]:
# Initially there will be no executions since the first execution happens at the top of the hour
# Note that it is common for the execution to luanch upto 20 min after the hour.
executions = churn_model_quality_monitor.list_executions()
executions

No executions found for schedule. monitoring_schedule_name: demo-xgb-churn-monitoring-schedule-2021-11-23-2217


[]

In [52]:
execution = churn_model_quality_monitor.describe_schedule().get("LastMonitoringExecutionSummary")
if execution:
    print("Execution found!")

In [54]:
executions = churn_model_quality_monitor.list_executions()
if executions:
    latest_execution = executions[-1]
    latest_execution.describe()

No executions found for schedule. monitoring_schedule_name: demo-xgb-churn-monitoring-schedule-2021-11-23-2217
Uploading 334 records to s3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/ground_truth_data/2021-11-23-21-43-22/2021/11/23/23/1715.jsonl
Uploading 334 records to s3://sagemaker-us-east-1-570360337377/sagemaker/Churn-ModelQualityMonitor/ground_truth_data/2021-11-23-21-43-22/2021/11/24/00/1715.jsonl
