# Amazon SageMaker Model Monitor - Custom Container
This notebook shows how to:
* Host a machine learning model in Amazon SageMaker and capture inference requests, results, and metadata 
* Analyze a training dataset to generate baseline constraints
* Monitor a live endpoint for violations against constraints

---
## Background

Amazon SageMaker provides every developer and data scientist with the ability to build, train, and deploy machine learning models quickly. Amazon SageMaker is a fully-managed service that encompasses the entire machine learning workflow. You can label and prepare your data, choose an algorithm, train a model, and then tune and optimize it for deployment. You can deploy your models to production with Amazon SageMaker to make predictions at a lower cost than was previously possible.

In addition, Amazon SageMaker enables you to capture the input, output and metadata for invocations of the models that you deploy. It also enables you to analyze the data and monitor its quality. In this notebook, you learn how Amazon SageMaker enables these capabilities.

---
## Setup

To get started, make sure you have completed these prerequisites.

* Specify an AWS Region to host your model.
* Make sure an IAM role ARN exists that gives Amazon SageMaker access to your data in Amazon Simple Storage Service (Amazon S3). See the documentation for how to fine-tune the required permissions.
* Create an S3 bucket to store the data used to train your model, any additional model data, and the data captured from model invocations. For demonstration purposes, this notebook uses the same bucket for all of these. In practice, you might want to separate them with different security policies.

In [5]:
%%time

# Handful of configuration

import os
import boto3
import re
import json
import sagemaker
from sagemaker import get_execution_role, session

boto_session = boto3.Session()
region = boto_session.region_name
sagemaker_session = sagemaker.Session(boto_session=boto_session)

role = get_execution_role()
print("RoleArn: {}".format(role))

# You can use a different bucket, but make sure the role you chose for this notebook
# has the s3:PutObject permissions. This is the bucket into which the data is captured.
bucket = sagemaker_session.default_bucket()
print("Demo Bucket: {}".format(bucket))
prefix = 'sagemaker/Custom-MM-Container'

data_capture_prefix = '{}/datacapture'.format(prefix)
s3_capture_upload_path = 's3://{}/{}'.format(bucket, data_capture_prefix)
reports_prefix = '{}/reports'.format(prefix)
s3_report_path = 's3://{}/{}'.format(bucket,reports_prefix)
code_prefix = '{}/code'.format(prefix)
# s3_code_preprocessor_uri = 's3://{}/{}/{}'.format(bucket,code_prefix, 'preprocessor.py')
# s3_code_postprocessor_uri = 's3://{}/{}/{}'.format(bucket,code_prefix, 'postprocessor.py')

print("Capture path: {}".format(s3_capture_upload_path))
print("Report path: {}".format(s3_report_path))
# print("Preproc Code path: {}".format(s3_code_preprocessor_uri))
# print("Postproc Code path: {}".format(s3_code_postprocessor_uri))

RoleArn: arn:aws:iam::757967535041:role/service-role/AmazonSageMakerServiceCatalogProductsUseRole
Demo Bucket: sagemaker-us-east-1-757967535041
Capture path: s3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/datacapture
Report path: s3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/reports
CPU times: user 202 ms, sys: 22 ms, total: 224 ms
Wall time: 305 ms


You can quickly verify that the execution role for this notebook has the necessary permissions to proceed. Put a simple test object into the S3 bucket you specified above. If this command fails, update the role to have `s3:PutObject` permission on the bucket and try again.

In [6]:
# Upload some test files
boto3.Session().resource('s3').Bucket(bucket).Object("test_upload/test.txt").upload_file('test_data/upload-test-file.txt')
print("Success! You are all set to proceed.")

Success! You are all set to proceed.


# PART A: Capturing real-time inference data from Amazon SageMaker endpoints
Create an endpoint to showcase the data capture capability in action.

### Upload the pre-trained model to Amazon S3
This code uploads a pre-trained XGBoost model that is ready for you to deploy. This model was trained using the XGB Churn Prediction Notebook in SageMaker. You can also use your own pre-trained model in this step. If you already have a pre-trained model in Amazon S3, you can use it instead by specifying its `s3_key`.
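The upload cell below builds the S3 key with `os.path.join`. That produces `/` on Linux notebook instances but would produce `\` on Windows. Here is a minimal sketch of a helper for keys that stay portable across operating systems. The helper, `s3_uri`, is hypothetical. It joins key parts with `posixpath` and returns the full `s3://` URI of an object such as the model artifact:

```python
import posixpath


def s3_uri(bucket: str, *key_parts: str) -> str:
    """Build an s3:// URI, joining key parts with '/' on every operating system."""
    # Strip stray slashes so posixpath.join does not treat a part as an absolute path
    key = posixpath.join(*(part.strip("/") for part in key_parts))
    return f"s3://{bucket}/{key}"


# Hypothetical bucket name, for illustration only
print(s3_uri("my-bucket", "sagemaker/Custom-MM-Container", "xgb-churn-prediction-model.tar.gz"))
```

An `s3://` URI like this one can be passed directly as the `model_data` of a SageMaker `Model`.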

In [7]:
s3_key = os.path.join(prefix, 'xgb-churn-prediction-model.tar.gz')
# Use a context manager so the local file handle is closed after the upload
with open("model/xgb-churn-prediction-model.tar.gz", 'rb') as model_file:
    boto3.Session().resource('s3').Bucket(bucket).Object(s3_key).upload_fileobj(model_file)

### Deploy the model to Amazon SageMaker
Start by deploying a pre-trained churn prediction model. Here, you create the model object with the image and model data.

In [8]:
from time import gmtime, strftime
from sagemaker.model import Model
from sagemaker.image_uris import retrieve

model_name = "custom-mm-" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
# Point model_data at the S3 object uploaded in the previous cell
model_url = 's3://{}/{}'.format(bucket, s3_key)

image_uri = retrieve('xgboost', region, '0.90-1')

model = Model(image_uri=image_uri, model_data=model_url, role=role, name=model_name)

To enable data capture for monitoring the model data quality, you specify the new capture option called `DataCaptureConfig`. You can capture the request payload, the response payload or both with this configuration. The capture config applies to all variants. Go ahead with the deployment.
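For reference, the SDK's `DataCaptureConfig` corresponds to the `DataCaptureConfig` block of the `CreateEndpointConfig` API. The sketch below assumes that block's field names: `EnableCapture`, `InitialSamplingPercentage`, `DestinationS3Uri`, `CaptureOptions`, and `CaptureContentTypeHeader`. Check them against the API reference. It defines a hypothetical helper that builds the block as a plain dictionary, which you could pass to a boto3 `create_endpoint_config` call. The helper also shows how to capture only the request or only the response:

```python
def build_data_capture_config(destination_s3_uri, sampling_percentage=20,
                              capture_input=True, capture_output=True):
    """Build a DataCaptureConfig dict in the shape assumed for the CreateEndpointConfig API."""
    if not 0 <= sampling_percentage <= 100:
        raise ValueError("sampling_percentage must be between 0 and 100")
    modes = []
    if capture_input:
        modes.append({"CaptureMode": "Input"})   # request payloads
    if capture_output:
        modes.append({"CaptureMode": "Output"})  # response payloads
    if not modes:
        raise ValueError("capture at least the request or the response")
    return {
        "EnableCapture": True,
        "InitialSamplingPercentage": sampling_percentage,
        "DestinationS3Uri": destination_s3_uri,
        "CaptureOptions": modes,
        # Treat text/csv payloads as CSV in the capture files
        "CaptureContentTypeHeader": {"CsvContentTypes": ["text/csv"]},
    }


# Hypothetical destination: capture only the requests
config = build_data_capture_config("s3://my-bucket/datacapture", capture_output=False)
print(config["CaptureOptions"])
```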

In [10]:
from sagemaker.model_monitor import DataCaptureConfig

endpoint_name = 'custom-mm-end-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("EndpointName={}".format(endpoint_name))

data_capture_config = DataCaptureConfig(
                        enable_capture=True,
                        sampling_percentage=20,
                        destination_s3_uri=s3_capture_upload_path)

predictor = model.deploy(initial_instance_count=1,
                instance_type='ml.m4.xlarge',
                endpoint_name=endpoint_name,
                data_capture_config=data_capture_config)

EndpointName=custom-mm-end-2021-04-04-22-21-20
-------------------!

## Invoke the deployed model

You can now send data to this endpoint to get inferences in real time. Because you enabled data capture in the previous steps, the request and response payloads, along with some additional metadata, are saved in the Amazon S3 location you specified in the `DataCaptureConfig`.

This step invokes the endpoint with the first 50 rows of the included sample data, one request per second, so it takes about a minute. Data is captured at the specified sampling percentage, and capture continues until you turn off the data capture option.
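Only a sample of requests is captured, so the number of captured records varies from run to run. You can get a rough estimate by assuming that each request is sampled independently with probability `sampling_percentage / 100`. That is a binomial model, not a documented guarantee. The helper below is hypothetical:

```python
import math


def expected_captured(n_requests: int, sampling_percentage: float):
    """Mean and standard deviation of the captured-record count under a binomial model."""
    p = sampling_percentage / 100.0
    mean = n_requests * p
    std = math.sqrt(n_requests * p * (1 - p))
    return mean, std


# 50 rows sent (head -50) with the 20% sampling configured above
mean, std = expected_captured(50, 20)
print(f"expected ~{mean:.0f} captured records (std {std:.1f})")
```

With 50 requests and 20% sampling, expect around 10 captured records, give or take about 3.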

In [40]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
import time

predictor = Predictor(endpoint_name=endpoint_name, serializer=CSVSerializer())

# get a subset of test data for a quick test
!head -50 test_data/test-dataset-input-cols.csv > test_data/test_sample.csv
print("Sending test traffic to the endpoint {}. \nPlease wait...".format(endpoint_name))

with open('test_data/test_sample.csv', 'r') as f:
    for row in f:
        payload = row.rstrip('\n')
        response = predictor.predict(data=payload)
        time.sleep(1)
        
print("Done!")

Sending test traffic to the endpoint custom-mm-end-2021-04-04-22-21-20. 
Please wait...
Done!


## View captured data

Now list the data capture files stored in Amazon S3. You should expect to see different files from different time periods organized based on the hour in which the invocation occurred. The format of the Amazon S3 path is:

`s3://{destination-bucket-prefix}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/filename.jsonl`
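To group or filter capture files by endpoint, variant, or hour, you can parse each key against the path format above. The sketch below uses a hypothetical `parse_capture_key` helper and assumes keys follow exactly that layout:

```python
import re
from datetime import datetime

# Matches .../{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/{filename}.jsonl
CAPTURE_KEY_RE = re.compile(
    r"(?P<endpoint>[^/]+)/(?P<variant>[^/]+)/"
    r"(?P<y>\d{4})/(?P<m>\d{2})/(?P<d>\d{2})/(?P<h>\d{2})/(?P<file>[^/]+\.jsonl)$"
)


def parse_capture_key(key: str) -> dict:
    """Split a data-capture S3 key into endpoint, variant, hour bucket, and file name."""
    match = CAPTURE_KEY_RE.search(key)
    if match is None:
        raise ValueError(f"not a data-capture key: {key}")
    hour = datetime(int(match["y"]), int(match["m"]), int(match["d"]), int(match["h"]))
    return {"endpoint": match["endpoint"], "variant": match["variant"],
            "hour": hour, "file": match["file"]}
```

For example, `[parse_capture_key(k)["hour"] for k in capture_files]` would list the hour buckets found by the listing cell below.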

In [266]:
s3_client = boto3.Session().client('s3')
current_endpoint_capture_prefix = '{}/{}'.format(data_capture_prefix, endpoint_name)
print(current_endpoint_capture_prefix)
result = s3_client.list_objects_v2(Bucket=bucket, Prefix=current_endpoint_capture_prefix)
# 'Contents' is absent when nothing has been captured yet
capture_files = [capture_file.get("Key") for capture_file in result.get('Contents', [])]
print("Found Capture Files:")
print("\n ".join(capture_files))

sagemaker/Custom-MM-Container/datacapture/custom-mm-end-2021-03-31-20-45-21
Found Capture Files:
sagemaker/Custom-MM-Container/datacapture/custom-mm-end-2021-03-31-20-45-21/AllTraffic/2021/03/31/20/56-47-657-324b064e-2a82-4e72-ad1f-1ad058bd6227.jsonl
 sagemaker/Custom-MM-Container/datacapture/custom-mm-end-2021-03-31-20-45-21/AllTraffic/2021/03/31/21/03-56-195-99ef05f8-db45-490e-a9b5-737c10d06cd0.jsonl


Next, view the contents of a single capture file. It holds all the captured data in a SageMaker-specific JSON Lines format. Take a quick peek at the first few lines of the captured file.

In [267]:
def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=bucket, Key=obj_key).get('Body').read().decode("utf-8")

capture_file = get_obj_body(capture_files[-1])
print(capture_file[:2000])

{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","data":"186,0.1,137.8,97,187.7,118,146.4,85,8.7,6,1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,0.10,0.11,0.12,0.13,0.14,0.15,0.16,0.17,1.1,0.18,0.19,0.20,0.21,0.22,0.23,0.24,0.25,0.26,0.27,0.28,0.29,0.30,0.31,0.32,0.33,0.34,0.35,0.36,0.37,0.38,0.39,0.40,0.41,0.42,0.43,0.44,0.45,0.46,0.47,0.48,0.49,0.50,0.51,0.52,0.53,1.2,1.3,0.54,1.4,0.55","encoding":"CSV"},"endpointOutput":{"observedContentType":"text/csv; charset=utf-8","mode":"OUTPUT","data":"0.01584203727543354","encoding":"CSV"}},"eventMetadata":{"eventId":"b8f9d777-7248-4aed-87e1-3a51736805fc","inferenceTime":"2021-03-31T21:03:56Z"},"eventVersion":"0"}
{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","data":"86,33,253.1,112,210.1,94,95.0,98,11.9,4,3,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,1,0,0,1","encoding":"CSV"},"endpointOutput":{"observedContentType":"text/cs

Finally, the following cell pretty-prints a single line as formatted JSON so that you can inspect it more easily.

In [15]:
import json
print(json.dumps(json.loads(capture_file.split('\n')[0]), indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "186,0.1,137.8,97,187.7,118,146.4,85,8.7,6,1,0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9,0.10,0.11,0.12,0.13,0.14,0.15,0.16,0.17,1.1,0.18,0.19,0.20,0.21,0.22,0.23,0.24,0.25,0.26,0.27,0.28,0.29,0.30,0.31,0.32,0.33,0.34,0.35,0.36,0.37,0.38,0.39,0.40,0.41,0.42,0.43,0.44,0.45,0.46,0.47,0.48,0.49,0.50,0.51,0.52,0.53,1.2,1.3,0.54,1.4,0.55",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.01584203727543354",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "999adc2e-3fd4-4738-ada5-82f727f6859a",
    "inferenceTime": "2021-04-04T22:32:39Z"
  },
  "eventVersion": "0"
}


As you can see, each inference request is captured in one line of the JSON Lines file. The line contains both the input and the output. In this example, you sent requests with the content type `text/csv`, which is reflected in the `observedContentType` value. The `encoding` value records how the input and output payloads are encoded in the capture format.
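To turn captured records into rows you can analyze, you can parse each line's `data` fields. The sketch below uses a hypothetical `parse_capture_line` helper. It assumes CSV-encoded numeric input and a single numeric prediction, as in this example. Payloads with other encodings would need decoding first:

```python
import json


def parse_capture_line(line: str):
    """Return (feature values, prediction) from one JSON Lines capture record."""
    record = json.loads(line)
    capture = record["captureData"]
    inp, out = capture["endpointInput"], capture["endpointOutput"]
    if inp["encoding"] != "CSV" or out["encoding"] != "CSV":
        raise ValueError("this sketch only handles CSV-encoded payloads")
    features = [float(v) for v in inp["data"].split(",")]
    prediction = float(out["data"])
    return features, prediction
```

Applied to the file read above, `[parse_capture_line(l) for l in capture_file.splitlines() if l]` would parse every record.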

To recap, you observed how you can enable capturing the input or output payloads to an endpoint with a new parameter. You have also observed what the captured format looks like in Amazon S3. Next, continue to explore how Amazon SageMaker helps with monitoring the data collected in Amazon S3.

# PART B: Model Monitor - Baselining and continuous monitoring

In addition to collecting the data, Amazon SageMaker provides the capability for you to monitor and evaluate the data observed by the endpoints. For this:
1. Create a baseline against which you compare the real-time traffic.
1. Once a baseline is ready, set up a schedule to continuously evaluate and compare against the baseline.

## Constraint suggestion with baseline/training dataset

The training dataset with which you trained the model is usually a good baseline dataset. Note that the schemas of the training dataset and the inference dataset should match exactly (that is, the number and order of the features).
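A quick sanity check for this requirement compares the number of feature columns in the training header with the number of values in a captured request. The sketch rests on two assumptions. First, the hypothetical `label_column` parameter names a target column that is present in the training data but not sent to the endpoint. Second, captured CSV rows carry no column names, so this check covers only the count, not the order. The function name `check_schema` is also hypothetical:

```python
import csv
import io


def check_schema(training_header: str, captured_row: str, label_column=None):
    """Raise if a captured CSV row has a different number of features than the training header."""
    header = next(csv.reader(io.StringIO(training_header)))
    # Drop the (hypothetical) label column, which is not part of inference requests
    features = [c for c in header if c != label_column]
    row = next(csv.reader(io.StringIO(captured_row)))
    if len(features) != len(row):
        raise ValueError(f"training has {len(features)} features, captured row has {len(row)}")
    return features
```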

From the training dataset you can ask Amazon SageMaker to suggest a set of baseline `constraints` and generate descriptive `statistics` to explore the data. For this example, upload the training dataset that was used to train the pre-trained model included in this example. If you already have it in Amazon S3, you can directly point to it.
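Here is a minimal sketch of the kind of logic a custom baselining container might run. It computes a per-column count, mean, and standard deviation, plus a min/max range per column that later records are checked against. The function names and output format are hypothetical and do not match the built-in Model Monitor `statistics.json` and `constraints.json` schemas:

```python
import csv
import io
import statistics


def suggest_baseline(csv_text: str):
    """Compute per-column statistics and min/max constraints from CSV text with a header row."""
    reader = csv.DictReader(io.StringIO(csv_text))
    columns = {name: [] for name in reader.fieldnames}
    for row in reader:
        for name in reader.fieldnames:
            columns[name].append(float(row[name]))
    stats, constraints = {}, {}
    for name, values in columns.items():
        stats[name] = {
            "count": len(values),
            "mean": statistics.mean(values),
            "stddev": statistics.pstdev(values),
        }
        constraints[name] = {"min": min(values), "max": max(values)}
    return stats, constraints


def find_violations(constraints, record):
    """Return names of features in `record` (a dict) that fall outside the baseline range."""
    return [
        name for name, value in record.items()
        if name in constraints
        and not constraints[name]["min"] <= value <= constraints[name]["max"]
    ]


stats, constraints = suggest_baseline("a,b\n1,10\n3,30\n")
print(find_violations(constraints, {"a": 5.0, "b": 20.0}))
```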

In [268]:
# copy over the training dataset to Amazon S3 (if you already have it in Amazon S3, you could reuse it)
baseline_prefix = prefix + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'

baseline_data_uri = 's3://{}/{}'.format(bucket,baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(bucket, baseline_results_prefix)
print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri)) 

Baseline data uri: s3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/baselining/data
Baseline results uri: s3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/baselining/results


In [18]:
# Upload the training set to the S3 bucket
s3_key = os.path.join(baseline_prefix, 'data', 'training-dataset-with-header.csv')
print(s3_key)
with open("test_data/training-dataset-with-header.csv", 'rb') as training_data_file:
    boto3.Session().resource('s3').Bucket(bucket).Object(s3_key).upload_fileobj(training_data_file)

sagemaker/Custom-MM-Container/baselining/data/training-dataset-with-header.csv


### Create custom container for Model Monitor

Refer to the documentation on bringing your own container (BYOC) for Model Monitor: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-containers.html
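Model Monitor passes job settings to a custom container through environment variables. For example, the baseline log output below prints `baseline_statistics`, `dataset_source`, and `dataset_format`. The following sketch reads these variables into a `SimpleNamespace`. The variable list is based on the container contract in the documentation linked above; check it against the current version. `THRESHOLD` is the custom variable this notebook passes through `env`, and `read_config` is a hypothetical name:

```python
import os
from types import SimpleNamespace

# Names assumed from the Model Monitor BYOC container contract (verify against the docs)
CONTRACT_VARS = [
    "baseline_constraints", "baseline_statistics", "dataset_format",
    "dataset_source", "end_time", "output_path", "publish_cloudwatch_metrics",
    "sagemaker_endpoint_name", "sagemaker_monitoring_schedule_name", "start_time",
]


def read_config(environ=None):
    """Collect contract variables (None when unset) plus the notebook's custom THRESHOLD."""
    environ = os.environ if environ is None else environ
    values = {name: environ.get(name) for name in CONTRACT_VARS}
    threshold = environ.get("THRESHOLD")
    values["threshold"] = float(threshold) if threshold is not None else None
    return SimpleNamespace(**values)
```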

In [290]:
%pycat docker/evaluation.py

[0;34m'''[0m
[0;34mCustom Model Monitoring script for baselining and analysis[0m
[0;34m[0m
[0;34m'''[0m[0;34m[0m
[0;34m[0m[0;31m# Python Built-Ins:[0m[0;34m[0m
[0;34m[0m[0;32mfrom[0m [0mcollections[0m [0;32mimport[0m [0mdefaultdict[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mdatetime[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mjson[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mos[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mtraceback[0m[0;34m[0m
[0;34m[0m[0;32mfrom[0m [0mtypes[0m [0;32mimport[0m [0mSimpleNamespace[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mcsv[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mjsonlines[0m[0;34m[0m
[0;34m[0m[0;34m[0m
[0;34m[0m[0;31m# External Dependencies:[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mnumpy[0m [0;32mas[0m [0mnp[0m[0;34m[0m
[0;34m[0m[0;32mimport[0m [0mpandas[0m [0;32mas[0m [0mpd[0m[0;34m[0m
[0;34m[0m[0;34m[0m
[0;34m[0m[0;31m#Method to get the envir

In [19]:
# Compose the ECR URI for the custom analysis and baselining container image (built and pushed in the next cell)

import boto3

account_id = boto3.client('sts').get_caller_identity().get('Account')
ecr_repository = 'sagemaker-custom-container'
tag = ':latest'

region = boto3.session.Session().region_name

uri_suffix = 'amazonaws.com'
if region in ['cn-north-1', 'cn-northwest-1']:
    uri_suffix = 'amazonaws.com.cn'
processing_repository_uri = f'{account_id}.dkr.ecr.{region}.{uri_suffix}/{ecr_repository + tag}'
print(processing_repository_uri)

757967535041.dkr.ecr.us-east-1.amazonaws.com/sagemaker-custom-container:latest


In [289]:
# Creating the ECR repository and pushing the container image

# SageMaker Classic Notebook Instance:
!docker build -t $ecr_repository docker
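# `aws ecr get-login` was removed in AWS CLI v2; get-login-password works with v1.17.10+ and v2
!aws ecr get-login-password --region {region} | docker login --username AWS --password-stdin {account_id}.dkr.ecr.{region}.{uri_suffix}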
!aws ecr create-repository --repository-name $ecr_repository
!docker tag {ecr_repository + tag} $processing_repository_uri
!docker push $processing_repository_uri

# SageMaker Studio:
# !cd docker && sm-docker build . --repository $ecr_repository$tag

Sending build context to Docker daemon  25.09kB
Step 1/6 : FROM python:3.7-slim-buster
 ---> 26de45a7328b
Step 2/6 : RUN pip3 install sagemaker
 ---> Using cache
 ---> 308306e0ab69
Step 3/6 : RUN pip3 install jsonlines
 ---> Using cache
 ---> cdc31f08e74f
Step 4/6 : ENV PYTHONUNBUFFERED=TRUE
 ---> Using cache
 ---> b5f8d2d703b1
Step 5/6 : ADD evaluation.py /
 ---> b0940fb035fc
Step 6/6 : ENTRYPOINT ["python3", "/evaluation.py"]
 ---> Running in 793e7db68a52
Removing intermediate container 793e7db68a52
 ---> d80e7ff9633b
Successfully built d80e7ff9633b
Successfully tagged sagemaker-custom-container:latest
https://docs.docker.com/engine/reference/commandline/login/#credentials-store

Login Succeeded

An error occurred (RepositoryAlreadyExistsException) when calling the CreateRepository operation: The repository with name 'sagemaker-custom-container' already exists in the registry with id '757967535041'
The push refers to repository [757967535041.dkr.ecr.us-east-1.amazonaws.com/sagemaker-

In [21]:
#Create model monitor 

from sagemaker.model_monitor import ModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

custom_monitor = ModelMonitor(
    role=role,
    image_uri=processing_repository_uri,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
    #This is an example to show how we can pass environment variables to the container
    env={ 'THRESHOLD':'0.15' },
)

In [282]:
custom_monitor

<sagemaker.model_monitor.model_monitoring.ModelMonitor at 0x7f979f52b9b0>

In [278]:
#Creating a run_baseline job to run the custom container with training dataset

from sagemaker.model_monitor import DatasetFormat
from sagemaker.processing import ProcessingInput, ProcessingOutput

custom_monitor.run_baseline(
    baseline_inputs=[ProcessingInput(input_name='endpointdata',
                                   source=f's3://{sagemaker_session.default_bucket()}/{s3_key}',
                                   destination='/opt/ml/processing/input/endpoint',
                                   s3_input_mode = 'File'
                                )],
    output='/opt/ml/processing/output',
    wait=True,
    logs=True,)


Job Name:  baseline-suggestion-job-2021-04-05-20-00-16-897
Inputs:  [{'InputName': 'endpointdata', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/baselining/data/training-dataset-with-header.csv', 'LocalPath': '/opt/ml/processing/input/endpoint', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'monitoring_output', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-757967535041/baseline-suggestion-job-2021-04-05-20-00-16-897/output', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
........................
[34mThe baseline_statistics file is:  None[0m
[34mThe dataset source is:  /opt/ml/processing/input/endpoint[0m
[34mThe dataset format is: None[0m
[34mCreate the baseline and constraints file...[0m
[34mfilename is.. training-dataset-with-header.csv[0m
[34mDone writing t

In [27]:
# Upload the baseline constraints and statistics JSON files to S3

s3_key1 = os.path.join(baseline_prefix, 'baseline_constraints.json')
s3_key2 = os.path.join(baseline_prefix, 'baseline_statistics.json')
with open("test_data/baseline_constraints.json", 'rb') as baseline_constraints_file:
    boto3.Session().resource('s3').Bucket(bucket).Object(s3_key1).upload_fileobj(baseline_constraints_file)
with open("test_data/baseline_statistics.json", 'rb') as baseline_statistics_file:
    boto3.Session().resource('s3').Bucket(bucket).Object(s3_key2).upload_fileobj(baseline_statistics_file)
print("Uploads completed..!")

Uploads completed..!


In [303]:
#Path to the baseline files

baseline_constraints_file_loc=f's3://{sagemaker_session.default_bucket()}/{s3_key1}'
baseline_statistics_file_loc=f's3://{sagemaker_session.default_bucket()}/{s3_key2}'
print(baseline_constraints_file_loc)
print(baseline_statistics_file_loc)

s3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/baselining/baseline_constraints.json
s3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/baselining/baseline_statistics.json


### Create a schedule

You can create a model monitoring schedule for the endpoint created earlier. Use the baseline resources (constraints and statistics) to compare against the real-time traffic.
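The `statistics` and `constraints` arguments are both S3 URIs, so they are easy to swap by accident. A hypothetical check on the `describe_schedule()` response can catch that; the output further down shows that response's `BaselineConfig` layout. The check is a heuristic that assumes the file names contain `statistics` and `constraints`, as they do in this notebook:

```python
def check_baseline_config(schedule_desc: dict) -> None:
    """Raise if the statistics and constraints URIs in a describe_schedule() response look swapped."""
    baseline = schedule_desc["MonitoringScheduleConfig"]["MonitoringJobDefinition"]["BaselineConfig"]
    # Compare only the file names, not the full prefixes
    constraints_file = baseline["ConstraintsResource"]["S3Uri"].rsplit("/", 1)[-1]
    statistics_file = baseline["StatisticsResource"]["S3Uri"].rsplit("/", 1)[-1]
    if "statistics" in constraints_file or "constraints" in statistics_file:
        raise ValueError(
            f"constraints={constraints_file!r}, statistics={statistics_file!r}: arguments look swapped"
        )
```

For example, `check_baseline_config(custom_monitor.describe_schedule())` would run the check after the schedule is created.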

In [33]:
#Next we define and attach a Model Monitor Schedule to the endpoint. 
#It will run our custom container on an hourly basis.

from sagemaker.model_monitor import CronExpressionGenerator, MonitoringOutput
from sagemaker.processing import ProcessingInput, ProcessingOutput

destination = f's3://{sagemaker_session.default_bucket()}/{data_capture_prefix}/endpoint/monitoring_schedule'
print(destination)
processing_output = ProcessingOutput(
    output_name='result',
    source='/opt/ml/processing/resultdata',
    destination=destination,
)
output = MonitoringOutput(source=processing_output.source, destination=processing_output.destination)

custom_monitor.create_monitoring_schedule(
    monitor_schedule_name='my-classfier-monitor',
    output=output,
    endpoint_input=predictor.endpoint_name,
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    #replace with your baseline constraints and statistics files
    statistics=baseline_statistics_file_loc,
    constraints=baseline_constraints_file_loc,
)

s3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/datacapture/endpoint/monitoring_schedule


In [279]:
#This is to delete the schedule and describe the schedule

# custom_monitor.delete_monitoring_schedule()
# time.sleep(60)
custom_monitor.describe_schedule()

{'MonitoringScheduleArn': 'arn:aws:sagemaker:us-east-1:757967535041:monitoring-schedule/my-classfier-monitor',
 'MonitoringScheduleName': 'my-classfier-monitor',
 'MonitoringScheduleStatus': 'Scheduled',
 'MonitoringType': 'DataQuality',
 'CreationTime': datetime.datetime(2021, 4, 5, 0, 47, 7, 568000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 4, 5, 20, 1, 45, 756000, tzinfo=tzlocal()),
 'MonitoringScheduleConfig': {'ScheduleConfig': {'ScheduleExpression': 'cron(0 * ? * * *)'},
  'MonitoringJobDefinition': {'BaselineConfig': {'ConstraintsResource': {'S3Uri': 's3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/baselining/baseline_constraints.json'},
    'StatisticsResource': {'S3Uri': 's3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/baselining/baseline_statistics.json'}},
   'MonitoringInputs': [{'EndpointInput': {'EndpointName': 'custom-mm-end-2021-04-04-22-21-20',
      'LocalPath': '/opt/ml/processing/input/endpoint',
      'S

In [283]:
jobs = custom_monitor.list_executions()
jobs

[<sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f97c91452b0>,
 <sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f97c9145780>,
 <sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f97c915ae80>,
 <sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f97c915a278>,
 <sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f97c915a160>,
 <sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f97c915a1d0>,
 <sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f97c915a128>,
 <sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f97c916f0b8>,
 <sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f97c916f9b0>,
 <sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f97c916f588>,
 <sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f979f773e80>,
 <sagemaker.model_monitor.model_monitoring.MonitoringExecution at 0x7f979f773f60>,
 <sa

In [51]:
if len(jobs) > 0:
    last_execution_desc = jobs[-1].describe()
    print(last_execution_desc)
    print(f'\nExit Message: {last_execution_desc.get("ExitMessage", "None")}')
else:
    print("""No processing job has been executed yet. 
    This means that one hour has not passed yet. 
    You can go to the next code cell and run the processing job manually""")

{'ProcessingInputs': [{'InputName': 'input_1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/datacapture/custom-mm-end-2021-04-04-22-21-20/AllTraffic/2021/04/05/00', 'LocalPath': '/opt/ml/processing/input/endpoint/custom-mm-end-2021-04-04-22-21-20/AllTraffic/2021/04/05/00', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'baseline', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/baselining/baseline_statistics.json', 'LocalPath': '/opt/ml/processing/baseline/stats', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated'}}, {'InputName': 'constraints', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/baselining/baseline_constraints.json', 'LocalPath': '/opt/ml/processing/ba

In [35]:
print(data_capture_prefix)
current_endpoint_capture_prefix = '{}/{}'.format(data_capture_prefix, endpoint_name)
source=f's3://{sagemaker_session.default_bucket()}/{current_endpoint_capture_prefix}'
print(source)

sagemaker/Custom-MM-Container/datacapture
s3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/datacapture/custom-mm-end-2021-04-04-22-21-20


In [291]:
# Instead of waiting for an hour, you can start the processing job manually to get some analysis results right away.
# To do this, define a Processor object that takes the image URI of the custom image. The job's input is the S3 location where the captured inference requests and responses are stored, and it writes results to the same destination that the scheduled jobs use.
# The input here is the captured data in JSON Lines format.


from sagemaker.processing import Processor

current_endpoint_capture_prefix = '{}/{}'.format(data_capture_prefix, endpoint_name)

processor = Processor(
    base_job_name='my-manual-mon',
    role=role,
    image_uri=processing_repository_uri,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    env={ 'THRESHOLD':'50' },
)
    
processor.run(
    [ProcessingInput(
        input_name='endpointdata',
        #source is captured data
        source=f's3://{sagemaker_session.default_bucket()}/{current_endpoint_capture_prefix}',
        destination='/opt/ml/processing/input/endpoint',
    )],
    [ProcessingOutput(
        output_name='result',
        source='/opt/ml/processing/resultdata',
        destination=destination,
    )],
)


Job Name:  my-manual-mon-2021-04-05-20-32-33-108
Inputs:  [{'InputName': 'endpointdata', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/datacapture/custom-mm-end-2021-03-31-20-45-21', 'LocalPath': '/opt/ml/processing/input/endpoint', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'result', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/datacapture/endpoint/monitoring_schedule', 'LocalPath': '/opt/ml/processing/resultdata', 'S3UploadMode': 'EndOfJob'}}]
.......................
The baseline_statistics file is:  None
The dataset source is:  /opt/ml/processing/input/endpoint
The dataset format is: None
Starting evaluation with config:
namespace(baseline_constraints=None, baseline_statistics=None, dataset_format=None, dat

### Describe and inspect the schedule
When you describe the schedule, observe that `MonitoringScheduleStatus` changes to `Scheduled`.

In [296]:
desc_schedule_result = custom_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

Schedule status: Scheduled


### List executions
The schedule starts jobs at the previously specified intervals. Here, you list the executions. If you run this right after creating the hourly schedule, the list might be empty, and you might have to wait until you cross the hour boundary (in UTC) to see executions start. The code below polls once a minute until the first execution appears.

Note: Even for an hourly schedule, Amazon SageMaker has a buffer period of 20 minutes to schedule your execution. You might see your execution start in anywhere from zero to ~20 minutes from the hour boundary. This is expected and done for load balancing in the backend.
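Given that buffer, you can estimate when the next execution should appear. The minimal sketch below assumes a strict hourly schedule in UTC and the 0 to 20 minute buffer described above. `next_execution_window` is a hypothetical helper:

```python
from datetime import datetime, timedelta, timezone


def next_execution_window(now: datetime, buffer_minutes: int = 20):
    """Return (earliest, latest) expected start times of the next hourly execution, in UTC."""
    if now.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime, e.g. datetime.now(timezone.utc)")
    now = now.astimezone(timezone.utc)
    next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return next_hour, next_hour + timedelta(minutes=buffer_minutes)


earliest, latest = next_execution_window(datetime.now(timezone.utc))
print(f"next execution expected between {earliest:%H:%M} and {latest:%H:%M} UTC")
```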

In [293]:
mon_executions = custom_monitor.list_executions()
print("We created an hourly schedule above, and it will kick off executions ON the hour (plus a 0-20 min buffer).\nWe will have to wait till we hit the hour...")

while len(mon_executions) == 0:
    print("Waiting for the 1st execution to happen...")
    time.sleep(60)
    mon_executions = custom_monitor.list_executions()

We created an hourly schedule above, and it will kick off executions ON the hour (plus a 0-20 min buffer).
We will have to wait till we hit the hour...


### Inspect a specific execution (latest execution)
In the next cell, you pick up the latest scheduled execution and wait for it to reach a terminal state. Here are the possible terminal states and what each of them means:
* Completed - The monitoring execution completed and no issues were found in the violations report.
* CompletedWithViolations - The execution completed, but constraint violations were detected.
* Failed - The monitoring execution failed, possibly due to a client error (for example, incorrect role permissions) or infrastructure issues. Examine FailureReason and ExitMessage to identify what happened.
* Stopped - The job exceeded its maximum runtime or was stopped manually.
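You can encode the list above as a small lookup so that a polling loop knows when to stop and what to check next. This sketch uses the monitoring execution states listed above. The `ProcessingJobStatus` field read in the next cell comes from the underlying processing job. To our understanding, that field does not report `CompletedWithViolations` separately. `next_step` is a hypothetical helper:

```python
# Terminal monitoring-execution states and the follow-up suggested above
NEXT_STEP = {
    "Completed": "done: no violations reported",
    "CompletedWithViolations": "open the violations report in the output location",
    "Failed": "check FailureReason and ExitMessage",
    "Stopped": "check whether max runtime was exceeded or the job was stopped manually",
}


def next_step(status: str) -> str:
    """Map a monitoring execution status to a follow-up action; other states mean keep waiting."""
    return NEXT_STEP.get(status, "wait: execution has not reached a terminal state")


print(next_step("CompletedWithViolations"))
```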

In [294]:
latest_execution = mon_executions[-1] # latest execution's index is -1, second to last is -2 and so on..
time.sleep(60)
latest_execution.wait(logs=False)

print("Latest execution status: {}".format(latest_execution.describe()['ProcessingJobStatus']))

latest_job = latest_execution.describe()
if latest_job['ProcessingJobStatus'] != 'Completed':
    print("====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures.")

Latest execution status: Completed


In [295]:
report_uri=latest_execution.output.destination
print('Report Uri: {}'.format(report_uri))

Report Uri: s3://sagemaker-us-east-1-757967535041/sagemaker/Custom-MM-Container/datacapture/endpoint/monitoring_schedule/custom-mm-end-2021-04-04-22-21-20/my-classfier-monitor/2021/04/05/01


### Other commands
You can also stop, start, and delete the monitoring schedule.

In [64]:
# custom_monitor.stop_monitoring_schedule()
# custom_monitor.delete_monitoring_schedule()
# time.sleep(60)
# custom_monitor.start_monitoring_schedule()


Stopping Monitoring Schedule with name: my-classfier-monitor

Deleting Monitoring Schedule with name: my-classfier-monitor


## Delete the resources

You can keep your endpoint running to continue capturing data. If you do not plan to collect more data or use this endpoint further, you should delete the endpoint to avoid incurring additional charges. Note that deleting your endpoint does not delete the data that was captured during the model invocations. That data persists in Amazon S3 until you delete it yourself.

Before you delete the endpoint, delete the monitoring schedule.
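The cleanup order can be captured in one function. This sketch assumes `custom_monitor` and `predictor` from this notebook, or any objects with the same methods. `clean_up` is a hypothetical name, and the 60-second pause mirrors the commented-out `time.sleep(60)` used earlier:

```python
import time


def clean_up(monitor, predictor, wait_seconds=60, sleep=time.sleep):
    """Delete the monitoring schedule, then the model(s), then the endpoint.

    `sleep` is injectable so the sketch can be exercised without AWS.
    """
    monitor.delete_monitoring_schedule()
    # Give the schedule deletion time to complete before touching the endpoint
    sleep(wait_seconds)
    # delete_model() looks up model names through the endpoint configuration,
    # so it runs before delete_endpoint()
    predictor.delete_model()
    predictor.delete_endpoint()
```

Calling `clean_up(custom_monitor, predictor)` would remove the schedule, model, and endpoint. The captured data in Amazon S3 is left in place.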

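In [None]:
# Delete the model before the endpoint: Predictor.delete_model() looks up the
# model names through the endpoint configuration, which delete_endpoint() removes
#predictor.delete_model()

In [None]:
#predictor.delete_endpoint()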