# Amazon SageMaker Model Monitoring Beta Data Capture
_**Hosting a Model in Amazon SageMaker and Capturing Inference requests, results, and metadata**_

*NOTE - THIS FEATURE IS CONFIDENTIAL AND SHARED UNDER NDA. THE FEATURE IS IN BETA AND SHOULD NOT BE USED FOR PRODUCTION ENDPOINTS. THIS FEATURE IS CURRENTLY IN DEVELOPMENT AND THE API SPECIFICATIONS MAY CHANGE*

---
## Background

Amazon SageMaker provides every developer and data scientist with the ability to build, train, and deploy machine learning models quickly. Amazon SageMaker is a fully-managed service that covers the entire machine learning workflow. You can label and prepare your data, choose an algorithm, train a model, and then tune and optimize it for deployment. Amazon SageMaker gets your models into production to make predictions or take actions with less effort and lower costs than was previously possible.

Amazon SageMaker is adding new capabilities that monitor ML models in production and detect deviations in data quality compared to a baseline dataset (for example, the training dataset). These capabilities let you capture the metadata, input, and output of invocations of the models that you deploy with Amazon SageMaker, and then analyze that data and monitor its quality.

This notebook shows you how to capture model invocation data from an endpoint and view that data in S3, and then how to baseline the training data and schedule monitoring jobs that analyze the captured data.

---
## Setup

Let's start by specifying:

* The AWS region used to host your model.
* The IAM role ARN used to give training and hosting access to your data. See the documentation for how to specify these.
* The S3 bucket used to store the data used to train your model, any additional model data, and the data captured from model invocations.

#### Add the SageMaker Internal Model
This step is required to enable the data capture feature for beta.

In [None]:
!aws configure add-model --service-model file://sagemaker-internal-model.json --service-name sagemaker

In [None]:
%%time

import os
import boto3
import re
import json
from sagemaker import get_execution_role, session

region = boto3.Session().region_name

role = get_execution_role()
print("RoleArn: {}".format(role))

# You can modify the bucket to be your own, but make sure the role you chose for this notebook
# has s3:PutObject permissions. This is the bucket into which the data will be captured
bucket = session.Session(boto3.Session()).default_bucket()
print("Recommendations Bucket: {}".format(bucket))
prefix = 'sagemaker/Recommendations-ModelMonitor'

data_capture_prefix = '{}/datacapture'.format(prefix)
s3_capture_upload_path = 's3://{}/{}'.format(bucket, data_capture_prefix)
reports_prefix = '{}/reports'.format(prefix)
s3_report_path = 's3://{}/{}'.format(bucket,reports_prefix)
code_prefix = '{}/code'.format(prefix)
s3_code_preprocessor_uri = 's3://{}/{}/{}'.format(bucket,code_prefix, 'preprocessor.py')
s3_code_postprocessor_uri = 's3://{}/{}/{}'.format(bucket,code_prefix, 'postprocessor.py')

print("Capture path: {}".format(s3_capture_upload_path))
print("Report path: {}".format(s3_report_path))
print("Preproc Code path: {}".format(s3_code_preprocessor_uri))
print("Postproc Code path: {}".format(s3_code_postprocessor_uri))

sm_client = boto3.client('sagemaker')


In [None]:

import pathlib

#PRETRAINED_MODELS_BUCKET_PREFIX="https://reinvent-sagemaker-deployment-2019.s3-us-west-2.amazonaws.com/recommendations"

LOCAL_MODELS_DIR='../../models'
LOCAL_DATA_DIR='../../data'

MOVIE_RECOMMENDATION_MODEL='movie-rec-all-features.tar.gz'
MUSIC_RECOMMENDATION_MODEL='music-rec-kiran.tar.gz'

MOVIE_RECOMMENDATION_TRAIN_DATA='movie_train.csv'

MOVIE_RECOMMENDATION_TEST_DATA='movie_test.csv'
MUSIC_RECOMMENDATION_TEST_DATA='music_test.csv'


## Optional: uncomment these lines (and PRETRAINED_MODELS_BUCKET_PREFIX above) to download
## the pre-trained models and datasets when they are not already present locally.
# if not pathlib.Path(LOCAL_MODELS_DIR + "/" + MOVIE_RECOMMENDATION_MODEL).exists():
#     !wget $PRETRAINED_MODELS_BUCKET_PREFIX/models/movie-recommendation-models/$MOVIE_RECOMMENDATION_MODEL -P $LOCAL_MODELS_DIR

# if not pathlib.Path(LOCAL_MODELS_DIR + "/" + MUSIC_RECOMMENDATION_MODEL).exists():
#     !wget $PRETRAINED_MODELS_BUCKET_PREFIX/models/music-recommendation-models/$MUSIC_RECOMMENDATION_MODEL -P $LOCAL_MODELS_DIR

# if not pathlib.Path(LOCAL_DATA_DIR + "/" + MOVIE_RECOMMENDATION_TRAIN_DATA).exists():
#     !wget $PRETRAINED_MODELS_BUCKET_PREFIX/data/movie-recommendations-data/$MOVIE_RECOMMENDATION_TRAIN_DATA -P $LOCAL_DATA_DIR

# if not pathlib.Path(LOCAL_DATA_DIR + "/" + MOVIE_RECOMMENDATION_TEST_DATA).exists():
#     !wget $PRETRAINED_MODELS_BUCKET_PREFIX/data/movie-recommendations-data/$MOVIE_RECOMMENDATION_TEST_DATA -P $LOCAL_DATA_DIR

# if not pathlib.Path(LOCAL_DATA_DIR + "/" + MUSIC_RECOMMENDATION_TEST_DATA).exists():
#     !wget $PRETRAINED_MODELS_BUCKET_PREFIX/data/music-recommendations-data/$MUSIC_RECOMMENDATION_TEST_DATA -P $LOCAL_DATA_DIR




Let's quickly test that the notebook has the permissions needed for the demo. We put a simple test object into the S3 bucket specified above. If this command fails, the demo will not work as intended. You can fix this by granting the role associated with this notebook "s3:PutObject" permissions and then running this validation again.

In [None]:
# Running this cell once is enough.
# Upload a small test file to verify that the role can write to the bucket.
boto3.Session().resource('s3').Bucket(bucket).Object("test_upload/test.txt").upload_file('test_data/upload-test-file.txt')
print("Success! You are all set to proceed.")

## Deploy model on a SageMaker Endpoint

### Upload the pre-trained model to S3

This code uploads a pre-trained XGBoost model that is ready for you to deploy. You can also use your own pre-trained model in this step. If you already have a pre-trained model in S3, you can use it instead by specifying its S3 key.

In [None]:
## Copy a local model artifact to the S3 bucket under the notebook prefix.
def copy_model_to_s3(model_name):
    key = prefix + "/" + model_name
    local_path = LOCAL_MODELS_DIR + '/' + model_name
    print("Uploading", local_path, "to bucket", bucket, "as", key)
    boto3.Session().resource('s3').Bucket(bucket).Object(key).upload_file(local_path)

In [None]:
##Copy movie recommendation model to S3
copy_model_to_s3(MOVIE_RECOMMENDATION_MODEL)

# PART A: Enable capturing real-time inference data from SageMaker Endpoints
Let's first create an Endpoint to showcase the data capture capability in action.

### Create SageMaker Model entity

This step creates an Amazon SageMaker model from the model file previously uploaded to S3. If you have already created an Amazon SageMaker model, you can skip this step.

In [None]:
from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'xgboost', '0.90-1')

In [None]:
%%time
from time import gmtime, strftime

model_name = "MoviePredictions-EndpointDataCaptureModel-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

sm_client = boto3.client('sagemaker')

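# Point ModelDataUrl at the model artifact uploaded earlier, using the standard s3:// form
# rather than a region-specific HTTPS endpoint.
model_url = 's3://{}/{}/{}'.format(bucket, prefix, MOVIE_RECOMMENDATION_MODEL)

print(model_url)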

primary_container = {
    'Image': container,
    'ModelDataUrl': model_url,
}

create_model_response = sm_client.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    PrimaryContainer = primary_container)

print(create_model_response['ModelArn'])

### Create Endpoint Configuration

This step is required to deploy an Amazon SageMaker model on an endpoint. To enable data capture for monitoring model data quality, you specify the new capture option called "DataCaptureConfig". With this configuration you can capture the request payload, the response payload, or both. Data capture is configured at the endpoint configuration level and applies to all variants. The captured data is stored in JSON Lines format (one JSON record per line). If you are using your own Amazon SageMaker model, you still need to complete this step to create a new endpoint configuration. The comments highlight the new API parameters for data capture.
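As a sketch of how the pieces of `DataCaptureConfig` fit together, the helper below assembles the same dictionary that the next cell builds by hand and rejects obviously invalid values. The function name `build_data_capture_config`, the `s3://my-bucket/datacapture` URI, and the 0 to 100 range check are illustrative assumptions (the notebook only states that the maximum is 100%); the beta API may enforce different rules.

```python
VALID_CAPTURE_MODES = {"Input", "Output"}


def build_data_capture_config(destination_s3_uri, sampling_percentage=100,
                              capture_modes=("Input", "Output"),
                              csv_content_types=("text/csv",),
                              json_content_types=("application/json",)):
    """Return a DataCaptureConfig dict shaped like the one used in this notebook."""
    if not 0 <= sampling_percentage <= 100:
        raise ValueError("sampling_percentage must be between 0 and 100")
    if not destination_s3_uri.startswith("s3://"):
        raise ValueError("destination_s3_uri must be an s3:// URI")
    unknown = set(capture_modes) - VALID_CAPTURE_MODES
    if unknown or not capture_modes:
        raise ValueError("capture_modes must be a non-empty subset of {}".format(sorted(VALID_CAPTURE_MODES)))
    return {
        "EnableCapture": True,
        "InitialSamplingPercentage": sampling_percentage,
        "DestinationS3Uri": destination_s3_uri,
        # One entry per captured direction (request and/or response)
        "CaptureOptions": [{"CaptureMode": mode} for mode in capture_modes],
        # Content types that tell the capture logic how to decode the payload
        "CaptureContentTypeHeader": {
            "CsvContentTypes": list(csv_content_types),
            "JsonContentTypes": list(json_content_types),
        },
    }


config = build_data_capture_config("s3://my-bucket/datacapture", sampling_percentage=90)
print(config["CaptureOptions"])
```

Centralizing the dictionary in one function makes it easier to create several endpoint configurations with different sampling rates without copy-pasting the nested structure.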

In [None]:
from time import gmtime, strftime

data_capture_sub_folder = "datacapture-xgboost-movie-recommendations"
s3_capture_upload_path = 's3://{}/{}'.format(bucket, data_capture_sub_folder)

print("Capture path:"+ s3_capture_upload_path)

data_capture_configuration = {
    "EnableCapture": True, # flag turns data capture on and off
    "InitialSamplingPercentage": 90, # sampling rate to capture data. max is 100%
    "DestinationS3Uri": s3_capture_upload_path, # s3 location where captured data is saved
    "CaptureOptions": [
        {
            "CaptureMode": "Output" # The type of capture this option enables. Values can be: [Output/Input]
        },
        {
            "CaptureMode": "Input" # The type of capture this option enables. Values can be: [Output/Input]
        }
    ],
    "CaptureContentTypeHeader": {
       "CsvContentTypes": ["text/csv"], # headers which should signal to decode the payload into CSV format 
       "JsonContentTypes": ["application/json"] # headers which should signal to decode the payload into JSON format 
     }
}

endpoint_config_name = 'XGBoost-MovieRec-DataCaptureEndpointConfig-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_config_name)
create_endpoint_config_response = sm_client.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.m5.xlarge',
        'InitialInstanceCount':1,
        'InitialVariantWeight':1,
        'ModelName':model_name,
        'VariantName':'AllTrafficVariant'
    }],
    DataCaptureConfig = data_capture_configuration) # This is where the new capture options are applied

print("Endpoint Config Arn: " + create_endpoint_config_response['EndpointConfigArn'])

### Create Endpoint
This step uses the endpoint configuration specified above to create an endpoint. This takes a few minutes (approximately 9 minutes) to complete.
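The endpoint-creation cell below and the baselining cell later both poll a describe call until a status changes. A minimal, hypothetical helper (`wait_while`) that captures that pattern is sketched here; the `sleep` parameter is injectable so the loop can be exercised without actually waiting. boto3 also provides built-in waiters for some SageMaker resources (for example, `sm_client.get_waiter('endpoint_in_service')`), which you could use instead.

```python
import time


def wait_while(describe, status_key, in_progress_states, poll_seconds=60, sleep=time.sleep):
    """Poll describe() until its status leaves in_progress_states; return the last response."""
    resp = describe()
    status = resp[status_key]
    print("Status: " + status)
    while status in in_progress_states:
        sleep(poll_seconds)
        resp = describe()
        status = resp[status_key]
        print("Status: " + status)
    return resp

# Example usage in this notebook (not executed here):
# resp = wait_while(lambda: sm_client.describe_endpoint(EndpointName=endpoint_name),
#                   'EndpointStatus', {'Creating'})
# resp = wait_while(lambda: sm_client.describe_processing_job(ProcessingJobName=job_name),
#                   'ProcessingJobStatus', {'InProgress'})
```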

In [None]:
%%time
import time

endpoint_name = 'XGBoost-MovieRec-DataCaptureEndpoint-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)
create_endpoint_response = sm_client.create_endpoint(
    EndpointName=endpoint_name,
    EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])

resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
status = resp['EndpointStatus']
print("Status: " + status)

while status=='Creating':
    time.sleep(60)
    resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
    status = resp['EndpointStatus']
    print("Status: " + status)

print("Arn: " + resp['EndpointArn'])
print("Status: " + status)

## Invoke the Deployed Model

You can now send data to this endpoint to get inferences in real time. Because we enabled data capture in the previous steps, the request and response payloads, along with some additional metadata, are saved to the S3 location you specified.
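Each row of the movie test file starts with the original rating, which the invocation loop below pops off before sending the remaining features. The hypothetical helper below isolates that step so it can be checked on its own; the sample row is made up, and unlike the notebook's loop it also strips the trailing newline.

```python
def split_label_and_payload(csv_line):
    """Split a test-set CSV row into (original_rating, payload_without_rating)."""
    fields = csv_line.rstrip("\n").split(",")
    original_value = fields.pop(0)  # first column is the rating the model should predict
    return original_value, ",".join(fields)


label, payload = split_label_and_payload("4,1,0,1,0.5\n")
print(label, payload)
```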

In [None]:
runtime_client = boto3.client('runtime.sagemaker')

# Send every row of a CSV file to the endpoint and print the decoded prediction.
# Note: rows are sent as-is, so the file must contain only the model's input columns.
def invoke_endpoint(ep_name, file_name, runtime_client):
    with open(file_name, 'r') as f:
        for row in f:
            payload = row.rstrip('\n')
            response = runtime_client.invoke_endpoint(EndpointName=ep_name,
                                                      ContentType='text/csv',
                                                      Body=payload)
            print(response['Body'].read().decode('utf-8'))
            time.sleep(0.5)

In [None]:
#endpoint_name='demo-data-capture-endpoint-2019-11-11-00-34-05'
#!head -5 test_data/test-dataset-input-cols.csv > test_data/test_sample.csv
### NOT WORKING BECAUSE OF A COLUMN-COUNT MISMATCH

#!head -5 data/movie_test.csv > test_data/test_sample.csv

#print("Sending test traffic to the endpoint... Please wait...")
#invoke_endpoint(endpoint_name, 'test_data/test_sample.csv', runtime_client)

In [None]:
##For data capturing we will just perform predictions with a subset of test data.

runtime_client = boto3.client('runtime.sagemaker')

with open(LOCAL_DATA_DIR+"/"+MOVIE_RECOMMENDATION_TEST_DATA, 'r') as f:
    contents = f.readlines()
    
for i in range(0, 30):
    line = contents[i]
    split_data = line.split(',')
    #Remove the original rating value from data used for prediction
    original_value = split_data.pop(0)
    
    payload = ','.join(split_data)
    
    response = runtime_client.invoke_endpoint(EndpointName=endpoint_name,
                                              ContentType='text/csv', 
                                              Body=payload)
    prediction = response['Body'].read().decode('utf-8')

    print("Original Value ", original_value , "Prediction : ", float(prediction))
    

This step invokes the endpoint with more of the included sample data, pausing one second between requests (the cell is currently disabled; see its TODO). Data is captured according to the sampling percentage you specified. As long as your endpoint keeps receiving traffic, its data will continue to be captured and saved.
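To reason about how many records to expect in S3, assume each request is captured independently with probability `InitialSamplingPercentage / 100`. That is an assumption about the sampling behavior, not documented beta semantics, and `simulate_capture` is a hypothetical helper. With the 30 requests sent above and the 90% rate configured earlier, the expected count is 30 × 0.9 = 27; any single run will vary around that.

```python
import random


def simulate_capture(num_requests, sampling_percentage, seed=0):
    """Count requests captured if each one is sampled independently at the given percentage."""
    rng = random.Random(seed)
    return sum(1 for _ in range(num_requests) if rng.random() * 100 < sampling_percentage)


num_requests = 30
sampling_percentage = 90
expected = num_requests * sampling_percentage / 100
print("Expected captured records:", expected)
print("One simulated run:", simulate_capture(num_requests, sampling_percentage))
```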

In [None]:
## TODO: Fix the test data so we can run this for the entire dataset
# for i in range(0, 40):
#     # Process the test content
#     line = contents[i]
#     split_data = line.split(',')
#     original_value = split_data.pop(0)
#     payload = ','.join(split_data)
#
#     response = runtime_client.invoke_endpoint(EndpointName=endpoint_name,
#                                               ContentType='text/csv',
#                                               Body=payload)
#
#     print("Original Value ", original_value, "Prediction : ", float(response['Body'].read().decode('utf-8')))
#     time.sleep(1)

## View Captured Data

Now let's list the data capture files stored in S3. You should expect to see different files from different time periods, organized by the hour in which the invocation occurred. The format of the S3 path is:

`s3://bucket-name/endpoint-name/year/month/day/hour/variant-name/filename.jsonl`

**NOTE:** This path is subject to change during the beta period.
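Because files are organized by hour, it can help to split each key into its parts. The sketch below assumes the beta layout shown above, optionally preceded by the `DestinationS3Uri` prefix (this notebook uses `datacapture-xgboost-movie-recommendations`), and assumes the hour is in UTC. The function name and sample key are hypothetical; adjust the parsing if the path changes during the beta.

```python
from datetime import datetime, timezone


def parse_capture_key(key):
    """Split a capture-file S3 key into prefix, endpoint, variant, UTC hour, and file name."""
    parts = key.split("/")
    # The last seven components are assumed to be endpoint/yyyy/mm/dd/hh/variant/file.jsonl
    if len(parts) < 7 or not parts[-1].endswith(".jsonl"):
        raise ValueError("Unexpected capture file key: " + key)
    endpoint, year, month, day, hour, variant, filename = parts[-7:]
    return {
        "prefix": "/".join(parts[:-7]),
        "endpoint": endpoint,
        "variant": variant,
        "hour_utc": datetime(int(year), int(month), int(day), int(hour), tzinfo=timezone.utc),
        "filename": filename,
    }

# Example usage in this notebook (not executed here):
# for key in capture_files:
#     info = parse_capture_key(key)
#     print(info["hour_utc"], info["variant"], info["filename"])
```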

In [None]:
s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=bucket, Prefix=data_capture_sub_folder)
print("data_capture_sub_folder : ", data_capture_sub_folder)
# 'Contents' is absent when nothing has been captured yet; captured files may take a few minutes to appear.
capture_files = [capture_file.get("Key") for capture_file in result.get('Contents', [])]
print("Found Capture Files:")
print("\n ".join(capture_files))

Next, let's view the contents of a single capture file. Here you should see all the data captured in a JSON Lines formatted file. For simplicity, the code shows only the first 2,000 characters of the file.
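The next cell prints the raw text truncated to 2,000 characters, which can cut a record in half. Since each line of a JSON Lines file is a complete JSON document, a small helper (hypothetical name `parse_json_lines`) can decode whole records instead.

```python
import json


def parse_json_lines(text):
    """Parse JSON Lines text (one JSON object per line) into a list of dicts, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

# Example usage in this notebook (not executed here):
# records = parse_json_lines(capture_file)
# print(len(records), "records in", capture_files[-1])
```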

In [None]:
def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=bucket, Key=obj_key).get('Body').read().decode("utf-8")

capture_file = get_obj_body(capture_files[-1])
print(capture_file[:2000])

Finally, let's view the contents of a single line, pretty-printed. Its structure follows the capture options (input, output, and content types) that you provided during the endpoint configuration setup.
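Because the record layout may change during the beta, it can be easier to list the field paths in a record than to hard-code them. The sketch below (hypothetical `flatten_keys` helper) walks a nested dictionary and returns the dotted path of every leaf value; the sample dictionary is made up and does not reflect the real capture schema.

```python
def flatten_keys(record, parent=""):
    """Return the dotted path of every leaf value in a nested dict."""
    paths = []
    for key, value in record.items():
        path = parent + "." + key if parent else key
        if isinstance(value, dict):
            paths.extend(flatten_keys(value, path))
        else:
            paths.append(path)
    return paths


sample = {"outer": {"inner": 1, "nested": {"leaf": 2}}, "top": 3}
print(flatten_keys(sample))

# Example usage in this notebook (not executed here):
# print("\n".join(flatten_keys(json.loads(capture_file.split('\n')[0]))))
```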

In [None]:
import json
print(json.dumps(json.loads(capture_file.split('\n')[0]), indent=2))

# PART B: Model Monitoring - Baselining and continuous monitoring

## 1. Constraint suggestion with baseline/training dataset

From our training dataset, let's ask SageMaker to suggest a set of baseline constraints and generate descriptive statistics to explore the data. But first, let's upload the training dataset (if it is already in S3, you can point to it directly). In this case, the training dataset that was used to train the XGBoost model is packaged with this example for convenience.
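To build intuition for what the baselining job produces, here is a deliberately simplified sketch of per-column statistics and type inference for header-less CSV text (the way this notebook reads the movie files). It is not how SageMaker computes `statistics.json` or `constraints.json`; the function names are hypothetical, and the Integral/Fractional/String labels are borrowed for illustration only. Under this rule, a column of 0/1 flags is Integral, while any column containing a value such as 0.5 is Fractional.

```python
import csv
import io
import statistics


def infer_type(values):
    """Label a column Integral, Fractional, or String based on its non-empty values."""
    try:
        numbers = [float(v) for v in values]
    except ValueError:
        return "String"
    return "Integral" if all(n.is_integer() for n in numbers) else "Fractional"


def describe_columns(csv_text):
    """Return simple per-column statistics (missing count, type, min/max/mean) for header-less CSV."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    summary = []
    for index, column in enumerate(zip(*rows)):
        present = [v for v in column if v.strip() != ""]
        stats = {"column": index,
                 "missing": len(column) - len(present),
                 "inferred_type": infer_type(present)}
        if stats["inferred_type"] != "String" and present:
            numbers = [float(v) for v in present]
            stats.update(min=min(numbers), max=max(numbers), mean=statistics.mean(numbers))
        summary.append(stats)
    return summary

# Example usage in this notebook (not executed here):
# with open(LOCAL_DATA_DIR + "/" + MOVIE_RECOMMENDATION_TRAIN_DATA) as f:
#     for column_stats in describe_columns(f.read())[:5]:
#         print(column_stats)
```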

In [None]:
# copy over the training dataset to S3 (if you already have it in S3, you could reuse it)
baseline_prefix = prefix + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'

baseline_data_uri = 's3://{}/{}'.format(bucket,baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(bucket, baseline_results_prefix)
print(baseline_data_uri)
print(baseline_results_uri)

In [None]:
# Upload the training data under the baselining data prefix (upload_file opens and closes the file for us).
s3_key = baseline_data_prefix + '/movie_train.csv'
boto3.Session().resource('s3').Bucket(bucket).Object(s3_key).upload_file(LOCAL_DATA_DIR + "/" + MOVIE_RECOMMENDATION_TRAIN_DATA)

## Create a baselining job with training dataset

In [None]:
%%time
import time

from processingjob_wrapper import ProcessingJob
from time import gmtime, strftime

job_name = 'MOVIE-REC-baseline-xgb-model-monitor-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
processing_job = ProcessingJob(sm_client, role).create(job_name, baseline_data_uri, baseline_results_uri)

resp = sm_client.describe_processing_job(ProcessingJobName=job_name)
status = resp['ProcessingJobStatus']
print("Status: " + status)

while status=='InProgress':
    time.sleep(60)
    resp = sm_client.describe_processing_job(ProcessingJobName=job_name)
    status = resp['ProcessingJobStatus']
    print("Status: " + status)

if status=='Failed':
    print(resp)  

## Explore the generated constraints and statistics

In [None]:
s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Files:")
print("\n ".join(report_files))

def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=bucket, Key=obj_key).get('Body').read().decode("utf-8")


In [None]:
##TODO : CAN WE SHOW THE WHOLE FILE?
constraints_file = get_obj_body(baseline_results_prefix+'/constraints.json')
print(constraints_file[:4000])
##TODO : Why are the genres treated as fractional?
##TODO : Would making the rating fractional make more sense?
##TODO : Which inferred_type values are available, and can we show them all?

In [None]:
##TODO : CAN WE SHOW THE WHOLE FILE?
statistics_file = get_obj_body(baseline_results_prefix+'/statistics.json')
print(statistics_file[:4000])

## 2. Analyzing collected data for data quality issues

We have collected the data above; now let's analyze and monitor it with monitoring schedules.

### Create a schedule
Let's create a monitoring schedule for the previously created endpoint.

In [None]:
# first copy over some test scripts to the S3 bucket so that they can be used for pre and post processing
## HOW SHOULD WE USE preprocessor /postprocessor FOR THE WORKSHOP??
boto3.Session().resource('s3').Bucket(bucket).Object(code_prefix+"/preprocessor.py").upload_file('preprocessor.py')
boto3.Session().resource('s3').Bucket(bucket).Object(code_prefix+"/postprocessor.py").upload_file('postprocessor.py')

In [None]:
from schedule_wrapper import MonitoringSchedule
from time import gmtime, strftime

# MonitoringSchedule is a Python helper that hides the large CreateMonitoringSchedule input payload. You can find it
# in schedule_wrapper.py in this package.

ms = MonitoringSchedule(sm_client, role)
mon_schedule_name = 'REC-xgb-movie-rec-model-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
schedule = ms.create(mon_schedule_name, endpoint_name, s3_report_path, 
                     record_preprocessor_source_uri=s3_code_preprocessor_uri, 
                     post_analytics_source_uri=s3_code_postprocessor_uri,
                     baseline_statistics_uri=baseline_results_uri + '/statistics.json',
                     baseline_constraints_uri=baseline_results_uri+ '/constraints.json')
schedule

In [None]:
#mon_schedule_name = ''
desc_schedule_result = sm_client.describe_monitoring_schedule( MonitoringScheduleName=mon_schedule_name)
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

### Start generating some artificial traffic
The cell below starts a background thread that keeps sending traffic to the created endpoint. If there is no traffic, the scheduled monitoring jobs will start to fail later on, so the thread keeps running until you stop it. Note that you need to stop the kernel to terminate this thread.
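Stopping the kernel is the only way to end the thread in the next cell. As an alternative, the sketch below (hypothetical `start_background_traffic` helper) runs any callable on a daemon thread and returns a `threading.Event` you can set to stop it. Because each batch in this notebook sends up to 30 requests with one-second pauses, a stop request takes effect only after the current batch finishes.

```python
import threading


def start_background_traffic(send_batch, pause_seconds=1.0):
    """Call send_batch() repeatedly on a daemon thread until the returned Event is set."""
    stop_event = threading.Event()

    def run():
        while not stop_event.is_set():
            send_batch()
            # wait() returns early as soon as stop_event is set
            stop_event.wait(pause_seconds)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return stop_event, thread

# Example usage in this notebook (not executed here):
# stop_event, traffic_thread = start_background_traffic(
#     lambda: invoke_endpoint(endpoint_name, LOCAL_DATA_DIR + "/" + MOVIE_RECOMMENDATION_TEST_DATA, runtime_client))
# ...later, when you are done generating traffic:
# stop_event.set(); traffic_thread.join()
```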

In [None]:
from threading import Thread
import time

runtime_client = boto3.client('runtime.sagemaker')

# (just repeating code from above for convenience / to be able to run this section independently)
## CAN WE USE DIFFERENT TEST CSVs to FIND DIFFERENT DRIFT
def invoke_endpoint(ep_name, file_name, runtime_client):
    with open(file_name, 'r') as f:
        contents = f.readlines()

    # Send (at most) the first 30 rows, one request per second
    for line in contents[:30]:
        split_data = line.split(',')
        # Remove the original rating value from the data used for prediction
        original_value = split_data.pop(0)

        payload = ','.join(split_data)

        response = runtime_client.invoke_endpoint(EndpointName=ep_name,
                                                  ContentType='text/csv',
                                                  Body=payload)
        prediction = response['Body'].read().decode('utf-8')

        #print("Original Value ", original_value, "Prediction : ", float(prediction))

        time.sleep(1)


def invoke_endpoint_forever():
    while True:
        invoke_endpoint(endpoint_name, LOCAL_DATA_DIR + "/" + MOVIE_RECOMMENDATION_TEST_DATA, runtime_client)

thread = Thread(target=invoke_endpoint_forever)
thread.start()

# Note that you need to stop the kernel to stop the invocations
##TODO : CAN WE INVOKE THIS TRAFFIC FROM A SCRIPT that can be kicked off with the CloudFormation script??

### Describe and inspect the schedule
After you describe the schedule, you should see MonitoringScheduleStatus change to Scheduled.

In [None]:
#mon_schedule_name = ''
desc_schedule_result = sm_client.describe_monitoring_schedule( MonitoringScheduleName=mon_schedule_name)
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

### List executions
Once the schedule is in the Scheduled state, it kicks off jobs at the specified interval. Here we list the latest 5 executions. Note that if you run this right after creating the hourly schedule, the list of executions may be empty; you might have to wait until the next hour boundary (in UTC) for executions to start. The code below waits for that.
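To estimate how long the wait loop below may run, compute the time remaining until the next UTC hour boundary. This is only a rough lower bound under the assumption that an hourly schedule fires at the top of the hour; the first execution may start somewhat later. The helper name is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def seconds_until_next_utc_hour(now=None):
    """Return how many seconds remain until the next top of the hour in UTC."""
    now = now or datetime.now(timezone.utc)
    next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return (next_hour - now).total_seconds()


print("Seconds until the next UTC hour:", seconds_until_next_utc_hour())
```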

In [None]:
# Poll for monitoring executions. A pending execution may not have a ProcessingJobArn yet,
# so look it up with .get() and re-fetch the summaries on every polling iteration.
def list_recent_executions():
    return sm_client.list_monitoring_executions(MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5)

mon_executions = list_recent_executions()
latest_execution = None

# Wait till an execution occurs
while not mon_executions['MonitoringExecutionSummaries']:
    print("Waiting for the 1st execution to happen...")
    time.sleep(60)
    mon_executions = list_recent_executions()

# If there is only one execution, wait for it to reach a terminal state
if len(mon_executions['MonitoringExecutionSummaries']) == 1:
    while True:
        execution = mon_executions['MonitoringExecutionSummaries'][0]
        if execution.get('ProcessingJobArn'):
            job_name = execution['ProcessingJobArn'].split('/')[1]
            resp = sm_client.describe_processing_job(ProcessingJobName=job_name)
            status = resp['ProcessingJobStatus']
            print("Status: " + status)
            if status != 'InProgress':
                break
        time.sleep(60)
        mon_executions = list_recent_executions()
    # Refresh once more so the execution status reflects the finished job
    mon_executions = list_recent_executions()

# Now print the execution details and pick the latest execution in a terminal state
for execution_summary in mon_executions['MonitoringExecutionSummaries']:
    job_arn = execution_summary.get('ProcessingJobArn')
    print("ProcessingJob: {}".format(job_arn.split('/')[1] if job_arn else '(not started)'))
    print('MonitoringExecutionStatus: {} \n'.format(execution_summary['MonitoringExecutionStatus']))
    if not latest_execution:
        exec_status = execution_summary['MonitoringExecutionStatus']
        if exec_status in ('Completed', 'Failed', 'CompletedWithViolations'):
            latest_execution = execution_summary

In [None]:
sm_client.list_monitoring_executions(MonitoringScheduleName=mon_schedule_name.lower(), MaxResults=5)


### Inspect a specific execution (latest execution here)
In the previous cell, we picked up the latest Completed/Failed scheduled execution. Let's explore what went right or wrong. Here are the possible terminal states and what each of them means:

* Completed - the monitoring execution completed and no issues were found in the violations report.
* CompletedWithViolations - the execution completed, but constraint violations were detected.
* Failed - the monitoring execution failed, possibly due to a client error (for example, role issues) or infrastructure issues. Examine FailureReason and ExitMessage to identify exactly what happened.
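The mapping from terminal state to next step can be captured in a small helper. The sketch below (hypothetical `summarize_execution`) takes one entry of `MonitoringExecutionSummaries`; reading a `FailureReason` field from the summary itself is an assumption, so it falls back to pointing at the processing job's `FailureReason` and `ExitMessage`.

```python
def summarize_execution(summary):
    """Return a one-line, human-readable description of a monitoring execution summary dict."""
    status = summary.get("MonitoringExecutionStatus", "Unknown")
    job_arn = summary.get("ProcessingJobArn")
    job = job_arn.split("/")[-1] if job_arn else "(no processing job yet)"
    if status == "Completed":
        meaning = "no violations found"
    elif status == "CompletedWithViolations":
        meaning = "constraint violations detected; inspect the violations report"
    elif status == "Failed":
        meaning = "failed: " + summary.get("FailureReason", "see the processing job's FailureReason and ExitMessage")
    else:
        meaning = "not finished yet"
    return "{} [{}]: {}".format(job, status, meaning)

# Example usage in this notebook (not executed here):
# for execution_summary in mon_executions['MonitoringExecutionSummaries']:
#     print(summarize_execution(execution_summary))
```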

In [None]:
if latest_execution:
    job_name=latest_execution['ProcessingJobArn'].split('/')[1]
    job_status=latest_execution['MonitoringExecutionStatus']
    desc_analytics_job_result=sm_client.describe_processing_job(ProcessingJobName=job_name)
    
    if job_status == 'Completed' or job_status == 'CompletedWithViolations':
        report_uri=desc_analytics_job_result['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
        print('Report Uri: {}'.format(report_uri))
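    else:
        print('Job failed. FailureReason: {}'.format(desc_analytics_job_result.get('FailureReason')))
        print('ExitMessage: {}'.format(desc_analytics_job_result.get('ExitMessage')))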
else:
    print("====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures.")

#### List the generated reports

In [None]:
from urllib.parse import urlparse
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip('/')
print('Report bucket: {}'.format(report_bucket))
print('Report key: {}'.format(report_key))

s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Report Files:")
print("\n ".join(report_files))

### Violations report

If there are any violations compared to the baseline, they are listed in the constraint_violations.json report. Let's view the violations.

NOTE: The violation descriptions are not user-friendly yet; we are working on rewording them.
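Once you have the raw violations text, grouping entries by feature makes it easier to scan. This sketch assumes the file is a JSON object with a `violations` list whose entries carry `feature_name` and `constraint_check_type` fields, which is the layout used by the generally available Model Monitor and may differ in this beta. The helper name and the sample values in the usage are hypothetical.

```python
import json


def summarize_violations(violations_json):
    """Group violation check types by feature name, assuming a {"violations": [...]} layout."""
    report = json.loads(violations_json)
    by_feature = {}
    for violation in report.get("violations", []):
        feature = violation.get("feature_name", "<unknown feature>")
        by_feature.setdefault(feature, []).append(violation.get("constraint_check_type", "<unknown check>"))
    return by_feature

# Example usage in this notebook (not executed here):
# print(summarize_violations(get_obj_body(report_key + '/constraint_violations.json')))
```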

In [None]:
def get_obj_body(obj_key, obj_bucket=bucket):
    return s3_client.get_object(Bucket=obj_bucket, Key=obj_key).get('Body').read().decode("utf-8")

# Read the report from the bucket parsed out of report_uri above
violations_file = get_obj_body(report_key + '/constraint_violations.json', report_bucket)
print(violations_file[:2000])

### Other commands
You can also list, stop, and start monitoring schedules.
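The SageMaker client exposes `start_monitoring_schedule` and `stop_monitoring_schedule` alongside `list_monitoring_schedules`. A small hypothetical wrapper that pauses or resumes a schedule and reports its status might look like this; the method names are those of the boto3 SageMaker client in the generally available release and may differ under the beta service model. The reported status can take a moment to change after the call.

```python
def set_schedule_running(sm_client, schedule_name, running):
    """Start or stop a monitoring schedule, then return its reported MonitoringScheduleStatus."""
    if running:
        sm_client.start_monitoring_schedule(MonitoringScheduleName=schedule_name)
    else:
        sm_client.stop_monitoring_schedule(MonitoringScheduleName=schedule_name)
    return sm_client.describe_monitoring_schedule(
        MonitoringScheduleName=schedule_name)["MonitoringScheduleStatus"]

# Example usage in this notebook (not executed here):
# print(set_schedule_running(sm_client, mon_schedule_name, running=False))  # pause
# print(set_schedule_running(sm_client, mon_schedule_name, running=True))   # resume
```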

In [None]:
sm_client.list_monitoring_schedules()

### (Optional) Delete the Resources

You can keep your endpoint running to continue capturing data. If you do not plan to collect more data or use this endpoint further, delete the endpoint to avoid incurring additional charges. Stop the kernel first so that the background traffic thread stops invoking the endpoint. Note that deleting your endpoint does not delete the data that was captured during the model invocations. That data persists in S3 until you delete it yourself.

In [None]:
sm_client.delete_monitoring_schedule(MonitoringScheduleName=mon_schedule_name)

In [None]:
sm_client.delete_endpoint(EndpointName=endpoint_name)

In [None]:
sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)

In [None]:
sm_client.delete_model(ModelName=model_name)