# Amazon SageMaker Model Monitor
This notebook shows how to:
* Host a machine learning model in Amazon SageMaker and capture inference requests, results, and metadata 
* Analyze a training dataset to generate baseline constraints
* Monitor a live endpoint for violations against constraints

---
## Background

Amazon SageMaker provides every developer and data scientist with the ability to build, train, and deploy machine learning models quickly. Amazon SageMaker is a fully-managed service that encompasses the entire machine learning workflow. You can label and prepare your data, choose an algorithm, train a model, and then tune and optimize it for deployment. You can deploy your models to production with Amazon SageMaker to make predictions and lower costs than was previously possible.

In addition, Amazon SageMaker enables you to capture the input, output and metadata for invocations of the models that you deploy. It also enables you to analyze the data and monitor its quality. In this notebook, you learn how Amazon SageMaker enables these capabilities.

---
## Setup

To get started, make sure you have these prerequisites completed.

* Specify an AWS Region to host your model.
* An IAM role ARN exists that is used to give Amazon SageMaker access to your data in Amazon Simple Storage Service (Amazon S3). See the documentation for how to fine tune the permissions needed. 
* Create an S3 bucket used to store the data used to train your model, any additional model data, and the data captured from model invocations. For demonstration purposes, you are using the same bucket for these. In reality, you might want to separate them with different security policies.

In [1]:
import boto3
import os
import sagemaker
from sagemaker import get_execution_role

region = boto3.Session().region_name
role = get_execution_role()
sess = sagemaker.session.Session()
bucket = sess.default_bucket() 
prefix = 'tf-2-workflow'

s3_capture_upload_path = 's3://{}/{}/monitoring/datacapture'.format(bucket, prefix)

reports_prefix = '{}/reports'.format(prefix)
s3_report_path = 's3://{}/{}'.format(bucket,reports_prefix)

print("Capture path: {}".format(s3_capture_upload_path))
print("Report path: {}".format(s3_report_path))

Capture path: s3://sagemaker-us-east-1-113147044314/tf-2-workflow/monitoring/datacapture
Report path: s3://sagemaker-us-east-1-113147044314/tf-2-workflow/reports


In [18]:
sagemaker.__version__

'2.29.1'

# PART A: Capturing real-time inference data from Amazon SageMaker endpoints
Create an endpoint to showcase the data capture capability in action.


### Deploy the model to Amazon SageMaker
Start with deploying the trained TensorFlow model from lab 03.

In [2]:
import boto3

def get_latest_training_job_name(base_job_name):
    client = boto3.client('sagemaker')
    response = client.list_training_jobs(NameContains=base_job_name, SortBy='CreationTime', 
                                         SortOrder='Descending', StatusEquals='Completed')
    if len(response['TrainingJobSummaries']) > 0 :
        return response['TrainingJobSummaries'][0]['TrainingJobName']
    else:
        raise Exception('Training job not found.')

def get_training_job_s3_model_artifacts(job_name):
    client = boto3.client('sagemaker')
    response = client.describe_training_job(TrainingJobName=job_name)
    s3_model_artifacts = response['ModelArtifacts']['S3ModelArtifacts']
    return s3_model_artifacts

latest_training_job_name = get_latest_training_job_name('tf-2-workflow')
print(latest_training_job_name)
model_path = get_training_job_s3_model_artifacts(latest_training_job_name)
print(model_path)

tf-2-workflow-09-13-19-53-015-472ef4b4
s3://sagemaker-us-east-1-113147044314/tf-2-workflow-09-13-19-53-015-472ef4b4/output/model.tar.gz


Here, you create the model object with the image and model data.

In [49]:
%%writefile handler.py

import json

def input_handler(data, context):
    """ Pre-process request input before it is sent to TensorFlow Serving REST API
    Args:
        data (obj): the request data, in format of dict or string
        context (Context): an object containing request and configuration details
    Returns:
        (dict): a JSON-serializable dict that contains request body and headers
    """
    if context.request_content_type == 'application/json':
        # pass through json (assumes it's correctly formed)
        d = data.read().decode('utf-8')
        return d if len(d) else ''

    if context.request_content_type == 'text/csv':
        # very simple csv handler
        return json.dumps({
            'instances': [float(x) for x in data.read().decode('utf-8').split(',')]
        })

    raise ValueError('{{"error": "unsupported content type {}"}}'.format(
        context.request_content_type or "unknown"))

def output_handler(data, context):
    """Post-process TensorFlow Serving output before it is returned to the client.
    Args:
        data (obj): the TensorFlow serving response
        context (Context): an object containing request and configuration details
    Returns:
        (bytes, string): data to return to client, response content type
    """
    #if data.status_code != 200:
    raise ValueError(data.content.decode('utf-8'))

    response_content_type = context.accept_header
    prediction = json.loads(data.content)['predictions'][0]
    print(prediction)
    return prediction, response_content_type

Overwriting handler.py


In [50]:
from sagemaker.tensorflow.model import TensorFlowModel

tensorflow_model = TensorFlowModel(
    model_data = model_path,
    role = role,
    entry_point="handler.py",
    framework_version = '2.3.1'
)


In [51]:
from time import gmtime, strftime
from sagemaker.model_monitor import DataCaptureConfig

endpoint_name = 'tf-2-workflow-endpoint-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)

predictor = tensorflow_model.deploy(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    endpoint_name=endpoint_name,
    data_capture_config=DataCaptureConfig(
        enable_capture=True,
        sampling_percentage=100,
        destination_s3_uri=s3_capture_upload_path
    )
)

update_endpoint is a no-op in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


tf-2-workflow-endpoint-2021-03-17-22-37-34
-------------!

In [13]:
import numpy as np
from tensorflow.python.keras.datasets import boston_housing
from sklearn.preprocessing import StandardScaler

(x_train, y_train), (x_test, y_test) = boston_housing.load_data()
scaler = StandardScaler()
scaler.fit(x_train)
x_train = scaler.transform(x_train)
x_test = scaler.transform(x_test)

## Invoke the deployed model

You can now send data to this endpoint to get inferences in real time. Because you enabled the data capture in the previous steps, the request and response payload, along with some additional metadata, is saved in the Amazon Simple Storage Service (Amazon S3) location you have specified in the DataCaptureConfig.

This step invokes the endpoint with included sample data for about 3 minutes. Data is captured based on the sampling percentage specified and the capture continues until the data capture option is turned off.

In [52]:
%%time 

import time

print("Sending test traffic to the endpoint {}. \nPlease wait...".format(endpoint_name))

flat_list =[]
for item in x_test:
    result = predictor.predict(item)['predictions'] 
    flat_list.append(float('%.1f'%(np.array(result))))
    time.sleep(0.8)
    
print("Done!")
print('predictions: \t{}'.format(np.array(flat_list)))

Sending test traffic to the endpoint tf-2-workflow-endpoint-2021-03-17-22-37-34. 
Please wait...
Done!
predictions: 	[13.7 19.9 20.3 33.3 26.1 20.1 32.5 26.9 16.  21.  17.8 20.  14.2 40.4
 13.9 23.1 27.1 25.2 18.3 35.5 13.7 13.7 20.1 19.7 22.5 21.5 31.9 28.3
 13.7 24.5 21.  13.7 32.4 25.6 19.4 13.7 13.8 17.6 18.9 32.  28.3 32.5
 13.7 40.  32.5 25.1 32.5 19.8 17.7 24.1 32.6 20.  13.7 13.5 34.1 32.4
 13.7 40.5 33.1 26.3 21.9 13.7 13.7 20.1 25.7 25.7 13.7 28.9 13.7 13.7
 36.7 29.  21.3 13.7 32.1 20.4 24.2 26.6 34.  13.7 21.4 39.8 19.8 13.7
 20.1 15.6 18.4 27.1 23.9 30.2 19.3 21.4 30.2 36.3 33.9 17.  39.3 41.2
 26.6 39.8 32.9 21. ]
CPU times: user 282 ms, sys: 6.18 ms, total: 288 ms
Wall time: 1min 23s


In [53]:
result

[[21.0472336]]

## View captured data

Now list the data capture files stored in Amazon S3. You should expect to see different files from different time periods organized based on the hour in which the invocation occurred. The format of the Amazon S3 path is:

`s3://{destination-bucket-prefix}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/filename.jsonl`

<b>Note that the delivery of capture data to Amazon S3 can require a couple of minutes so next cell might error. If this happens, please retry after a minute.</b>

In [54]:
s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=bucket, Prefix='tf-2-workflow/monitoring/datacapture/')
capture_files = [capture_file.get("Key") for capture_file in result.get('Contents')]
print("Found Capture Files:")
print("\n ".join(capture_files))

Found Capture Files:
tf-2-workflow/monitoring/datacapture/tf-2-workflow-endpoint-2021-03-17-22-37-34/AllTraffic/2021/03/17/22/44-06-962-3b5959f0-5ff0-470c-b6d4-8792ebfd9893.jsonl


Next, view the contents of a single capture file. Here you should see all the data captured in an Amazon SageMaker specific JSON-line formatted file. Take a quick peek at the first few lines in the captured file.

In [55]:
def get_obj_body(obj_key):
    return s3_client.get_object(Bucket=bucket, Key=obj_key).get('Body').read().decode("utf-8")

capture_file = get_obj_body(capture_files[-1])
print(capture_file[:2000])

{"captureData":{"endpointInput":{"observedContentType":"application/json","mode":"INPUT","data":"[1.5536935453162368, -0.4836154708652843, 1.0283257954396188, -0.2568327484687563, 1.0383806679462964, 0.23545815425123368, 1.1104882815291357, -0.9397693559689599, 1.6758857724016463, 1.5652874992218142, 0.7844763709927688, -3.484595532746983, 2.2509207364548414]","encoding":"JSON"},"endpointOutput":{"observedContentType":"application/json","mode":"OUTPUT","data":"{\n    \"predictions\": [[13.6559658]\n    ]\n}","encoding":"JSON"}},"eventMetadata":{"eventId":"291e94f0-f02e-4270-967d-ec9148fed292","inferenceTime":"2021-03-17T22:44:06Z"},"eventVersion":"0"}
{"captureData":{"endpointInput":{"observedContentType":"application/json","mode":"INPUT","data":"[-0.39242675047976094, -0.4836154708652843, -0.1608777304070192, -0.2568327484687563, -0.0884006055354238, -0.49947436060626255, 0.8560632883792862, -0.6839623468736831, -0.3960355701527182, 0.15707841264637773, -0.30759583189964385, 0.4273312

Finally, the contents of a single line is present below in a formatted JSON file so that you can observe a little better.

In [56]:
import json
print(json.dumps(json.loads(capture_file.split('\n')[0]), indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "application/json",
      "mode": "INPUT",
      "data": "[1.5536935453162368, -0.4836154708652843, 1.0283257954396188, -0.2568327484687563, 1.0383806679462964, 0.23545815425123368, 1.1104882815291357, -0.9397693559689599, 1.6758857724016463, 1.5652874992218142, 0.7844763709927688, -3.484595532746983, 2.2509207364548414]",
      "encoding": "JSON"
    },
    "endpointOutput": {
      "observedContentType": "application/json",
      "mode": "OUTPUT",
      "data": "{\n    \"predictions\": [[13.6559658]\n    ]\n}",
      "encoding": "JSON"
    }
  },
  "eventMetadata": {
    "eventId": "291e94f0-f02e-4270-967d-ec9148fed292",
    "inferenceTime": "2021-03-17T22:44:06Z"
  },
  "eventVersion": "0"
}


As you can see, each inference request is captured in one line in the jsonl file. The line contains both the input and output merged together. In the example, you provided the ContentType as `text/csv` which is reflected in the `observedContentType` value. Also, you expose the encoding that you used to encode the input and output payloads in the capture format with the `encoding` value.

To recap, you observed how you can enable capturing the input or output payloads to an endpoint with a new parameter. You have also observed what the captured format looks like in Amazon S3. Next, continue to explore how Amazon SageMaker helps with monitoring the data collected in Amazon S3.

# PART B: Model Monitor - Baselining and continuous monitoring

In addition to collecting the data, Amazon SageMaker provides the capability for you to monitor and evaluate the data observed by the endpoints. For this:
1. Create a baseline with which you compare the realtime traffic. 
1. Once a baseline is ready, setup a schedule to continously evaluate and compare against the baseline.

## 1. Constraint suggestion with baseline/training dataset

The training dataset with which you trained the model is usually a good baseline dataset. Note that the training dataset data schema and the inference dataset schema should exactly match (i.e. the number and order of the features).

From the training dataset you can ask Amazon SageMaker to suggest a set of baseline `constraints` and generate descriptive `statistics` to explore the data. For this example, upload the training dataset that was used to train the pre-trained model included in this example. If you already have it in Amazon S3, you can directly point to it.

### Prepare training dataset with headers

In [57]:
import pandas as pd
dt = pd.DataFrame(data = x_train, 
                  columns = ["CRIM", "ZN", "INDUS", "CHAS","NOX","RM","AGE","DIS","RAD","TAX","PTRATIO","B","LSTAT"])

dt.to_csv("training-dataset-with-header.csv", index = False)

In [58]:
# copy over the training dataset to Amazon S3 (if you already have it in Amazon S3, you could reuse it)
baseline_prefix = prefix + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'

baseline_data_uri = 's3://{}/{}'.format(bucket,baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(bucket, baseline_results_prefix)
print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri))


Baseline data uri: s3://sagemaker-us-east-1-113147044314/tf-2-workflow/baselining/data
Baseline results uri: s3://sagemaker-us-east-1-113147044314/tf-2-workflow/baselining/results


In [59]:
training_data_file = open("training-dataset-with-header.csv", 'rb')
s3_key = os.path.join(baseline_prefix, 'data', 'training-dataset-with-header.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(s3_key).upload_fileobj(training_data_file)

### Create a baselining job with training dataset

Now that you have the training data ready in Amazon S3, start a job to `suggest` constraints. `DefaultModelMonitor.suggest_baseline(..)` starts a `ProcessingJob` using an Amazon SageMaker provided Model Monitor container to generate the constraints.

In [60]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri+'/training-dataset-with-header.csv',
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True
)


Job Name:  baseline-suggestion-job-2021-03-17-22-45-31-237
Inputs:  [{'InputName': 'baseline_dataset_input', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-113147044314/tf-2-workflow/baselining/data/training-dataset-with-header.csv', 'LocalPath': '/opt/ml/processing/input/baseline_dataset_input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'monitoring_output', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-113147044314/tf-2-workflow/baselining/results', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
.......................[34m2021-03-17 22:49:04,323 - __main__ - INFO - All params:{'ProcessingJobArn': 'arn:aws:sagemaker:us-east-1:113147044314:processing-job/baseline-suggestion-job-2021-03-17-22-45-31-237', 'ProcessingJobName': 'baseline-suggestion-job-2021-03-17-22-45-31-237', 'Environment': {'dataset_format': '{

<sagemaker.processing.ProcessingJob at 0x7fb6c6797748>

### Explore the generated constraints and statistics

In [61]:
s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Files:")
print("\n ".join(report_files))

Found Files:
tf-2-workflow/baselining/results/constraints.json
 tf-2-workflow/baselining/results/statistics.json


In [62]:
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.io.json.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)



Unnamed: 0,name,inferred_type,numerical_statistics.common.num_present,numerical_statistics.common.num_missing,numerical_statistics.mean,numerical_statistics.sum,numerical_statistics.std_dev,numerical_statistics.min,numerical_statistics.max,numerical_statistics.distribution.kll.buckets,numerical_statistics.distribution.kll.sketch.parameters.c,numerical_statistics.distribution.kll.sketch.parameters.k,numerical_statistics.distribution.kll.sketch.data
0,CRIM,Fractional,404,0,-1.015414e-16,-4.102274e-14,1.0,-0.405101,9.234847,"[{'lower_bound': -0.40510053045456557, 'upper_...",0.64,2048.0,"[[-0.27224633436918744, -0.4034265123250389, 0..."
1,ZN,Fractional,404,0,1.0992310000000001e-17,4.440892e-15,1.0,-0.483615,3.72899,"[{'lower_bound': -0.4836154708652843, 'upper_b...",0.64,2048.0,"[[-0.4836154708652843, 2.991784193632328, -0.4..."
2,INDUS,Fractional,404,0,1.74338e-15,7.043255e-13,1.0,-1.564696,2.445374,"[{'lower_bound': -1.564696478470726, 'upper_bo...",0.64,2048.0,"[[-0.435761610917923, -1.3339116162236038, 1.0..."
3,CHAS,Fractional,404,0,-1.266863e-16,-5.118128e-14,1.0,-0.256833,3.893584,"[{'lower_bound': -0.2568327484687563, 'upper_b...",0.64,2048.0,"[[-0.2568327484687563, -0.2568327484687563, -0..."
4,NOX,Fractional,404,0,-5.253773e-15,-2.122524e-12,1.0,-1.471269,2.677335,"[{'lower_bound': -1.4712685320811714, 'upper_b...",0.64,2048.0,"[[-0.16522660145463205, -1.2151818790171445, 0..."
5,RM,Fractional,404,0,6.414149e-15,2.591316e-12,1.0,-3.81725,3.467186,"[{'lower_bound': -3.8172503201932715, 'upper_b...",0.64,2048.0,"[[-0.17644260263626882, 1.8943461340447383, -1..."
6,AGE,Fractional,404,0,2.984411e-16,1.205702e-13,1.0,-2.369042,1.110488,"[{'lower_bound': -2.369042258590638, 'upper_bo...",0.64,2048.0,"[[0.8130618810863538, -1.91036058079936, 1.110..."
7,DIS,Fractional,404,0,4.946538e-16,1.998401e-13,1.0,-1.287503,3.437406,"[{'lower_bound': -1.2875031560712922, 'upper_b...",0.64,2048.0,"[[0.11669830334066417, 1.247585237129712, -1.1..."
8,RAD,Fractional,404,0,1.126711e-17,4.551914e-15,1.0,-0.971569,1.675886,"[{'lower_bound': -0.9715692764178194, 'upper_b...",0.64,2048.0,"[[-0.6262490526587586, -0.8564625351647991, 1...."
9,TAX,Fractional,404,0,-1.981363e-16,-8.004708e-14,1.0,-1.311311,1.836097,"[{'lower_bound': -1.3113105494237356, 'upper_b...",0.64,2048.0,"[[-0.595170031037082, -0.34843254150890723, 1...."


In [63]:
constraints_df = pd.io.json.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df.head(10)

  if __name__ == '__main__':


Unnamed: 0,name,inferred_type,completeness,num_constraints.is_non_negative
0,CRIM,Fractional,1.0,False
1,ZN,Fractional,1.0,False
2,INDUS,Fractional,1.0,False
3,CHAS,Fractional,1.0,False
4,NOX,Fractional,1.0,False
5,RM,Fractional,1.0,False
6,AGE,Fractional,1.0,False
7,DIS,Fractional,1.0,False
8,RAD,Fractional,1.0,False
9,TAX,Fractional,1.0,False


## 2. Analyzing collected data for data quality issues

### Create a schedule

You can create a model monitoring schedule for the endpoint created earlier. Use the baseline resources (constraints and statistics) to compare against the realtime traffic.

From the analysis above, you saw how the captured data is saved - that is the standard input and output format for Tensorflow models. But Model Monitor is framework-agnostic, and expects a specific format [explained in the docs](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-pre-and-post-processing.html#model-monitor-pre-processing-script):
- Input
    - Flattened JSON `{"feature0": <value>, "feature1": <value>...}`
    - Tabular `"<value>, <value>..."`
- Output:
    - Flattened JSON `{"prediction0": <value>, "prediction1": <value>...}`
    - Tabular `"<value>, <value>..."`
    
We need to transform the input records to comply with this requirement. Model Monitor offers _pre-processing scripts_ in Python to transform the input. The cell below has the script that will work for our case.

In [92]:
%%writefile preprocessing.py

import json

def preprocess_handler(inference_record):
    input_data = json.loads(inference_record.endpoint_input.data)
    input_data = {f"feature{i}": val for i, val in enumerate(input_data)}
    
    output_data = json.loads(inference_record.endpoint_output.data)["predictions"][0][0]
    output_data = {"prediction0": output_data}
    
    return{**input_data, **output_data}

Overwriting preprocessing.py


We'll upload this script to an s3 destination and pass it as the `record_preprocessor_script` parameter to the `create_monitoring_schedule` call.

In [93]:
script_s3_dest_path = f"s3://{bucket}/{prefix}/artifacts/modelmonitor"
script_s3_dest = sagemaker.s3.S3Uploader.upload("preprocessing.py", script_s3_dest_path)
print(script_s3_dest)

s3://sagemaker-us-east-1-113147044314/tf-2-workflow/artifacts/modelmonitor/preprocessing.py


In [94]:
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

mon_schedule_name = 'DEMO-tf-2-workflow-model-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint,
    record_preprocessor_script=script_s3_dest,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

The endpoint attribute has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


### Generating violations artificially

In order to get some result relevant to monitoring analysis, you can try and generate artificially some inferences with feature values causing specific violations, and then invoke the endpoint with this data

Looking at our RM and AGE features:

- RM - average number of rooms per dwelling
- AGE - proportion of owner-occupied units built prior to 1940

Let's simulate a situation where the average number of rooms is 0, and proportion of owner-occupied units built is 1000.

In [65]:
df_with_violations = pd.read_csv("training-dataset-with-header.csv")
df_with_violations["RM"] = 0
df_with_violations["AGE"] = 1000
df_with_violations

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0,-0.272246,-0.483615,-0.435762,-0.256833,-0.165227,0,1000,0.116698,-0.626249,-0.595170,1.148500,0.448077,0.825220
1,-0.403427,2.991784,-1.333912,-0.256833,-1.215182,0,1000,1.247585,-0.856463,-0.348433,-1.718189,0.431906,-1.329202
2,0.124940,-0.483615,1.028326,-0.256833,0.628642,0,1000,-1.187439,1.675886,1.565287,0.784476,0.220617,-1.308500
3,-0.401494,-0.483615,-0.869402,-0.256833,-0.361560,0,1000,1.107180,-0.511142,-1.094663,0.784476,0.448077,-0.652926
4,-0.005634,-0.483615,1.028326,-0.256833,1.328612,0,1000,-0.578572,1.675886,1.565287,0.784476,0.389882,0.263497
...,...,...,...,...,...,...,...,...,...,...,...,...,...
399,-0.381973,-0.483615,-0.616568,-0.256833,-0.933487,0,1000,1.157680,-0.741356,-1.040501,-0.262093,0.448077,0.477421
400,-0.388221,0.358906,-0.609218,-0.256833,-0.796907,0,1000,0.339660,-0.741356,-1.100681,0.056428,0.448077,-0.848908
401,-0.402030,0.990797,-0.741515,-0.256833,-1.019702,0,1000,1.430403,-0.971569,-0.613224,-0.717123,0.079439,-0.677769
402,-0.172920,-0.483615,1.245881,-0.256833,2.677335,0,1000,-1.044075,-0.511142,-0.017443,-1.718189,-0.987644,0.420835


### Start generating some artificial traffic
The cell below starts a thread to send some traffic to the endpoint. Note that you need to stop the kernel to terminate this thread. If there is no traffic, the monitoring jobs are marked as `Failed` since there is no data to process.

In [66]:
from threading import Thread
from time import sleep
import time

def invoke_endpoint():
    for item in df_with_violations.to_numpy():
        result = predictor.predict(item)['predictions'] 
        time.sleep(1)

def invoke_endpoint_forever():
    while True:
        invoke_endpoint()
        
thread = Thread(target = invoke_endpoint_forever)
thread.start()

# Note that you need to stop the kernel to stop the invocations

### Describe and inspect the schedule
Once you describe, observe that the MonitoringScheduleStatus changes to Scheduled.

In [95]:
desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

Schedule status: Scheduled


### List executions
The schedule starts jobs at the previously specified intervals. Here, you list the latest five executions. Note that if you are kicking this off after creating the hourly schedule, you might find the executions empty. You might have to wait until you cross the hour boundary (in UTC) to see executions kick off. The code below has the logic for waiting.

Note: Even for an hourly schedule, Amazon SageMaker has a buffer period of 20 minutes to schedule your execution. You might see your execution start in anywhere from zero to ~20 minutes from the hour boundary. This is expected and done for load balancing in the backend.

In [96]:
mon_executions = my_default_monitor.list_executions()
print("We created a hourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer.\nWe will have to wait till we hit the hour...")

while len(mon_executions) == 0:
    print("Waiting for the 1st execution to happen...")
    time.sleep(60)
    mon_executions = my_default_monitor.list_executions()    

No executions found for schedule. monitoring_schedule_name: DEMO-tf-2-workflow-model-monitor-schedule-2021-03-18-00-18-10
We created a hourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer.
We will have to wait till we hit the hour...
Waiting for the 1st execution to happen...
No executions found for schedule. monitoring_schedule_name: DEMO-tf-2-workflow-model-monitor-schedule-2021-03-18-00-18-10
Waiting for the 1st execution to happen...
No executions found for schedule. monitoring_schedule_name: DEMO-tf-2-workflow-model-monitor-schedule-2021-03-18-00-18-10
Waiting for the 1st execution to happen...
No executions found for schedule. monitoring_schedule_name: DEMO-tf-2-workflow-model-monitor-schedule-2021-03-18-00-18-10
Waiting for the 1st execution to happen...
No executions found for schedule. monitoring_schedule_name: DEMO-tf-2-workflow-model-monitor-schedule-2021-03-18-00-18-10
Waiting for the 1st execution to happen...
No executions found for sc

### Inspect a specific execution (latest execution)
In the previous cell, you picked up the latest completed or failed scheduled execution. Here are the possible terminal states and what each of them mean: 
* Completed - This means the monitoring execution completed and no issues were found in the violations report.
* CompletedWithViolations - This means the execution completed, but constraint violations were detected.
* Failed - The monitoring execution failed, maybe due to client error (perhaps incorrect role premissions) or infrastructure issues. Further examination of FailureReason and ExitMessage is necessary to identify what exactly happened.
* Stopped - job exceeded max runtime or was manually stopped.

In [97]:
latest_execution = mon_executions[-1] # latest execution's index is -1, second to last is -2 and so on..
#time.sleep(60)
latest_execution.wait(logs=False)

print("Latest execution status: {}".format(latest_execution.describe()['ProcessingJobStatus']))
print("Latest execution result: {}".format(latest_execution.describe()['ExitMessage']))

latest_job = latest_execution.describe()
if (latest_job['ProcessingJobStatus'] != 'Completed'):
        print("====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures.")

.....................................................!Latest execution status: Completed
Latest execution result: CompletedWithViolations: Job completed successfully with 1 violations.


In [98]:
report_uri=latest_execution.output.destination
print('Report Uri: {}'.format(report_uri))

Report Uri: s3://sagemaker-us-east-1-113147044314/tf-2-workflow/reports/tf-2-workflow-endpoint-2021-03-17-22-37-34/DEMO-tf-2-workflow-model-monitor-schedule-2021-03-18-00-18-10/2021/03/18/01


### List the generated reports

In [99]:
from urllib.parse import urlparse
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip('/')
print('Report bucket: {}'.format(report_bucket))
print('Report key: {}'.format(report_key))

s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Report Files:")
print("\n ".join(report_files))

Report bucket: sagemaker-us-east-1-113147044314
Report key: tf-2-workflow/reports/tf-2-workflow-endpoint-2021-03-17-22-37-34/DEMO-tf-2-workflow-model-monitor-schedule-2021-03-18-00-18-10/2021/03/18/01
Found Report Files:
tf-2-workflow/reports/tf-2-workflow-endpoint-2021-03-17-22-37-34/DEMO-tf-2-workflow-model-monitor-schedule-2021-03-18-00-18-10/2021/03/18/01/constraint_violations.json


### Violations report

If there are any violations compared to the baseline, they will be listed here.

In [100]:
violations = my_default_monitor.latest_monitoring_constraint_violations()
pd.set_option('display.max_colwidth', -1)
constraints_df = pd.io.json.json_normalize(violations.body_dict["violations"])
constraints_df.head(10)

  from ipykernel import kernelapp as app
  app.launch_new_instance()


Unnamed: 0,feature_name,constraint_check_type,description
0,Extra columns,extra_column_check,"There are extra columns in current dataset. Number of columns in current dataset: 14, Number of columns in baseline constraints: 13"


## Delete the resources

You can keep your endpoint running to continue capturing data. If you do not plan to collect more data or use this endpoint further, you should delete the endpoint to avoid incurring additional charges. Note that deleting your endpoint does not delete the data that was captured during the model invocations. That data persists in Amazon S3 until you delete it yourself.

But before that, you need to delete the schedule first.

In [91]:
my_default_monitor.delete_monitoring_schedule()
time.sleep(60) # actually wait for the deletion


Deleting Monitoring Schedule with name: DEMO-tf-2-workflow-model-monitor-schedule-2021-03-17-23-26-58


In [None]:
predictor.delete_endpoint()