# Enable Amazon SageMaker Model Monitor

Amazon SageMaker Model Monitor lets you monitor machine learning models in production and detect deviations in data quality relative to a baseline dataset (for example, the training dataset). This notebook walks you through enabling data capture and setting up continuous monitoring for a real-time SageMaker endpoint.

This notebook covers the following:
* Enable data capture on a SageMaker endpoint so that Model Monitor has inference data to analyze
* Analyze the training dataset to generate baseline statistics and constraints
* Set up a monitoring schedule that checks captured data for deviations from the baseline

---

In [2]:
!pip install --upgrade sagemaker

import boto3
import sagemaker
from sagemaker.session import Session
from sagemaker.estimator import Estimator
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker import get_execution_role



# Step 1: Enable real-time inference data capture

To enable data capture for monitoring data quality, you specify a capture configuration called `DataCaptureConfig`. With it you can capture the request payload, the response payload, or both. The capture configuration applies to all production variants of the endpoint. Provide the endpoint name in the following cell:

In [3]:
# Choose an endpoint name and the S3 location that will receive the captured data
from time import gmtime, strftime
endpoint_name = 'Insurance-xgb-churn-pred-model-monitor-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

print("EndpointName={}".format(endpoint_name))

s3_capture_upload_path = 's3://sagemaker-us-east-1-135151577600/sagemaker-featurestore-insurance/inference_input/'
## IMPORTANT
## Make sure the IAM role used by the SageMaker Model behind this endpoint has the "s3:PutObject"
## permission on this location. Otherwise, endpoint data capture will not work.


EndpointName=Insurance-xgb-churn-pred-model-monitor-2021-05-12-17-09-56
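The cell above notes that the role behind the model needs `s3:PutObject` on the capture location. A minimal sketch of a matching IAM policy document, built with the standard library, follows; `capture_write_policy` and the `example-bucket` URI are hypothetical. Such a document could be attached to the role, for example in the IAM console or with the boto3 IAM client's `put_role_policy`, which this sketch does not call.

```python
import json


def capture_write_policy(s3_uri):
    """Build an IAM policy document that allows s3:PutObject under an S3 URI prefix.

    Hypothetical helper for illustration; it only builds the JSON, it does not attach it.
    """
    if not s3_uri.startswith("s3://"):
        raise ValueError("not an S3 URI: {}".format(s3_uri))
    bucket, _, key_prefix = s3_uri[len("s3://"):].partition("/")
    key_prefix = key_prefix.rstrip("/")
    # Grant writes to every object below the prefix (or the whole bucket if no prefix)
    resource = ("arn:aws:s3:::{}/{}/*".format(bucket, key_prefix) if key_prefix
                else "arn:aws:s3:::{}/*".format(bucket))
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                "Resource": [resource],
            }
        ],
    }


policy = capture_write_policy("s3://example-bucket/sagemaker-featurestore-insurance/inference_input/")
print(json.dumps(policy, indent=2))
```

Scoping the resource to the capture prefix, rather than the whole bucket, keeps the permission as narrow as the data capture use case needs.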


## Set up Hosting for the Model

After training, we deploy the model as an Amazon SageMaker real-time hosted endpoint so that we can request predictions (inferences) from it. The endpoint does not have to run on the same instance type that was used for training. The cells below first create the boto3 and SageMaker sessions used for training and deployment; the deployment itself happens in the training cell further down and takes roughly 8 to 10 minutes.

In [4]:
region = boto3.Session().region_name

boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_featurestore_runtime_client=featurestore_runtime
)
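Once the endpoint is running, the built-in XGBoost container accepts `text/csv` request bodies with one header-less row per record and no label column, and only requests that actually reach the endpoint end up in the data capture location. The sketch below serializes feature rows into such a payload with the standard library; `rows_to_csv_payload` is a hypothetical helper. The resulting string could then be sent with the boto3 `sagemaker-runtime` client's `invoke_endpoint(EndpointName=..., ContentType='text/csv', Body=payload)`.

```python
import csv
import io


def rows_to_csv_payload(rows):
    """Serialize feature rows as header-less CSV text for a text/csv request body."""
    buffer = io.StringIO()
    # "\n" line endings instead of the csv module's default "\r\n"
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerows(rows)
    return buffer.getvalue()


payload = rows_to_csv_payload([[1, 0.5, 0], [0, 1.25, 1]])
print(repr(payload))  # '1,0.5,0\n0,1.25,1\n'
```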

In [5]:
# You can modify the following to use a bucket of your choosing
role = get_execution_role()
print(role)
default_s3_bucket_name = feature_store_session.default_bucket()
prefix = 'sagemaker-featurestore-insurance'
training_output_path = 's3://' + default_s3_bucket_name + '/' + prefix + '/training_output'
account_id = boto3.client('sts').get_caller_identity()["Account"]
s3_client = boto3.client('s3', region_name=region)
print("prefix " + prefix)
print("default bucket name " + default_s3_bucket_name)
print("account id " + account_id)


arn:aws:iam::135151577600:role/service-role/AmazonSageMaker-ExecutionRole-20210418T230749
prefix sagemaker-featurestore-insurance
default bucket name sagemaker-us-east-1-135151577600
account id 135151577600


## Constraint suggestion with baseline/training dataset

The dataset you trained the model on is usually a good baseline dataset. The schema of the baseline dataset and that of the inference data must match exactly, that is, the same number of features in the same order.
Using our training dataset, we'll ask SageMaker to suggest a set of baseline constraints and to generate descriptive statistics for exploring the data.
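Because the schemas must line up column for column, a cheap local check before baselining can catch an obvious mismatch. The sketch below compares column counts of header-less CSV text using only the standard library; `check_column_counts` and its `label_columns` parameter are hypothetical, the latter covering leading columns (for example a target) that exist in the training file but not in inference requests. It checks counts only, not feature order.

```python
import csv
import io


def check_column_counts(baseline_csv, inference_csv, label_columns=0):
    """Compare column counts of header-less CSV text; return a list of problems (empty if none).

    label_columns: number of leading baseline columns that inference requests are not
    expected to contain. This parameter is an assumption of the sketch, not a Model Monitor setting.
    """
    baseline_rows = [r for r in csv.reader(io.StringIO(baseline_csv)) if r]
    inference_rows = [r for r in csv.reader(io.StringIO(inference_csv)) if r]
    if not baseline_rows:
        return ["baseline has no rows"]
    expected = len(baseline_rows[0]) - label_columns
    problems = []
    for i, row in enumerate(inference_rows):
        if len(row) != expected:
            problems.append("row {}: expected {} columns, got {}".format(i, expected, len(row)))
    return problems


# Tiny hypothetical data: three baseline columns, the first being a label
print(check_column_counts("5.0,1,0\n0.0,0,1\n", "1,0\n0,1\n", label_columns=1))  # []
print(check_column_counts("5.0,1,0\n", "1,0,1\n", label_columns=1))  # ['row 0: expected 2 columns, got 3']
```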

In [None]:
baseline_data_uri = 's3://{}/{}/training_input'.format(default_s3_bucket_name, prefix)      # where the training data is stored
baseline_results_uri = 's3://{}/{}/training_output/'.format(default_s3_bucket_name, prefix)  # where the results are to be stored

In [None]:
print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri))

## Train and deploy the model using XGBoost

In [6]:
training_image = sagemaker.image_uris.retrieve("xgboost", region, "1.0-1")

# Built-in XGBoost estimator; model artifacts are written to training_output_path
training_model = Estimator(training_image,
                           role,
                           instance_count=1,
                           instance_type='ml.m5.2xlarge',
                           volume_size=5,
                           max_run=3600,
                           input_mode='File',
                           output_path=training_output_path,
                           sagemaker_session=feature_store_session)

# Header-less CSV under baseline_data_uri; the built-in XGBoost reads the first column as the label
train_data = sagemaker.inputs.TrainingInput(baseline_data_uri, distribution='FullyReplicated',
                                            content_type='text/csv', s3_data_type='S3Prefix')
data_channels = {'train': train_data}

training_model.set_hyperparameters(objective="reg:tweedie",
                                   num_round=50)
training_model.fit(inputs=data_channels, logs=True)
print("Fit Done")

# Deploy the trained model behind a real-time endpoint named endpoint_name
predictor = training_model.deploy(initial_instance_count=1,
                                  instance_type='ml.m5.xlarge',
                                  endpoint_name=endpoint_name)

2021-05-12 17:09:57 Starting - Starting the training job...ProfilerReport-1620839396: InProgress
......
2021-05-12 17:11:26 Starting - Launching requested ML instances...............
2021-05-12 17:13:57 Starting - Preparing the instances for training..............................
2021-05-12 17:18:51 Downloading - Downloading input data............
2021-05-12 17:20:57 Training - Downloading the training image....
INFO:sagemaker-containers:Imported framework sagemaker_xgboost_container.training
INFO:sagemaker-containers:Failed to parse hyperparameter objective value reg:tweedie to Json.
Returning the value itself
INFO:sagemaker-containers:No GPUs detected (normal if no gpus installed)
INFO:sagemaker_xgboost_container.training:Running XGBoost Sagemaker in algorithm mode
INFO:root:Determined delimiter of CSV input is ','
INFO:root:Determined delimiter of CSV input is ','
[17:21:29] 40095x60 matrix with 2405700 ent



## Before you proceed:
Currently, SageMaker supports monitoring endpoints out of the box only for **tabular (CSV, flat JSON)** datasets. If your endpoint uses another data format, the following steps will NOT work for you.
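If your requests are JSON, a quick way to spot payloads that are not flat is to look for nested objects or arrays. The sketch below does that with the standard library; `is_flat_json_record` is hypothetical, and reading "flat JSON" as "one object whose values are all scalars" is our assumption rather than a definition taken from the Model Monitor documentation.

```python
import json


def is_flat_json_record(text):
    """Return True if text is one JSON object whose values are all scalars.

    Treating "flat JSON" as "no nested objects or arrays" is an assumption of this sketch.
    """
    record = json.loads(text)
    if not isinstance(record, dict):
        return False
    return all(not isinstance(value, (dict, list)) for value in record.values())


print(is_flat_json_record('{"age": 42, "region": "R11", "bonus": null}'))  # True
print(is_flat_json_record('{"driver": {"age": 42}}'))  # False
```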


# Step 2: Model Monitor - Baselining

In addition to collecting data, SageMaker lets you monitor and evaluate the data observed by your endpoints. To do this:
1. We create a baseline to compare real-time traffic against.
1. Once the baseline is ready, we set up a schedule that continuously evaluates captured traffic against it.
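As a rough local model of how the two steps fit together, the sketch below checks a batch of rows against the two kinds of rules that the suggested constraints in this notebook contain (`completeness` and `num_constraints.is_non_negative`). It is a toy illustration, not how the Model Monitor container evaluates data: `check_constraints` is hypothetical, features are matched to columns by position, and `None` marks a missing value.

```python
def check_constraints(rows, features):
    """Toy evaluation of completeness and is_non_negative rules.

    rows: list of numeric rows; None marks a missing value.
    features: dicts shaped like the "features" entries of suggested constraints,
    matched to columns by position (an assumption of this sketch).
    Returns (feature_name, rule) pairs for every rule the batch breaks.
    """
    violations = []
    for idx, feature in enumerate(features):
        column = [row[idx] if idx < len(row) else None for row in rows]
        present = [value for value in column if value is not None]
        # Fraction of rows in which this feature is present
        completeness = len(present) / len(column) if column else 1.0
        if completeness < feature.get("completeness", 0.0):
            violations.append((feature["name"], "completeness"))
        if feature.get("num_constraints", {}).get("is_non_negative") and any(v < 0 for v in present):
            violations.append((feature["name"], "is_non_negative"))
    return violations


# Hypothetical baseline rules and a captured batch with one gap and one negative value
features = [
    {"name": "_c0", "completeness": 1.0, "num_constraints": {"is_non_negative": True}},
    {"name": "_c1", "completeness": 1.0, "num_constraints": {"is_non_negative": True}},
]
batch = [[1.0, 0.0], [2.0, None], [-1.0, 1.0]]
print(check_constraints(batch, features))  # [('_c0', 'is_non_negative'), ('_c1', 'completeness')]
```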

### Create a baselining job with the training dataset

First, the next cell turns on data capture for the endpoint deployed above. You can choose to capture the request, the response, or both.

In [None]:

from sagemaker.model_monitor import DataCaptureConfig

# Capture 50% of requests and responses to s3_capture_upload_path (defined in Step 1).
# Adjust sampling_percentage and capture_options as needed.
data_capture_config = DataCaptureConfig(
                        enable_capture=True,
                        sampling_percentage=50,
                        destination_s3_uri=s3_capture_upload_path,
                        kms_key_id=None,
                        capture_options=["REQUEST", "RESPONSE"],
                        csv_content_types=["text/csv"],
                        json_content_types=["application/json"])

# Apply the new configuration to the running endpoint; updating an endpoint can take several minutes
predictor.update_data_capture_config(data_capture_config=data_capture_config)
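With capture enabled, SageMaker writes captured requests and responses to `s3_capture_upload_path` as JSON Lines files, which, as we understand the documented layout, sit under `<endpoint-name>/<variant-name>/<yyyy>/<mm>/<dd>/<hh>/`. The sketch below parses one such line with the standard library. `parse_capture_line` is hypothetical, and the sample record only mirrors the field names we expect (`captureData`, `endpointInput`, `endpointOutput`, `eventMetadata`, `encoding`); compare it with a real capture file before relying on it.

```python
import base64
import csv
import io
import json


def _payload_text(payload):
    """Return the captured payload as text, decoding it if it was stored base64-encoded."""
    if payload.get("encoding") == "BASE64":
        return base64.b64decode(payload["data"]).decode("utf-8")
    return payload["data"]


def parse_capture_line(line):
    """Extract request features, prediction and inference time from one captured JSON line."""
    record = json.loads(line)
    capture = record["captureData"]
    result = {"inference_time": record.get("eventMetadata", {}).get("inferenceTime")}
    if "endpointInput" in capture:
        # A single CSV request row, e.g. "1,0,0.5" -> [1.0, 0.0, 0.5]
        row = next(csv.reader(io.StringIO(_payload_text(capture["endpointInput"]))))
        result["features"] = [float(value) for value in row]
    if "endpointOutput" in capture:
        result["prediction"] = _payload_text(capture["endpointOutput"]).strip()
    return result


# Hypothetical captured record, serialized the way one line of a capture file would be
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "1,0,0.5", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT",
                           "data": "123.4", "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "example-event-id", "inferenceTime": "2021-05-12T18:00:00Z"},
    "eventVersion": "0",
})
print(parse_capture_line(sample_line))
```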

Now that the training data is in S3, let's start a job that suggests constraints. `DefaultModelMonitor.suggest_baseline(..)` starts a `ProcessingJob` that uses a SageMaker-provided Model Monitor container to compute baseline statistics and suggest constraints. Edit the configuration to fit your needs.
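To get a feel for what the baselining job reports, the sketch below computes a few of the same per-column figures locally (present and missing counts, mean, sum, standard deviation, min, max) for header-less CSV text, naming columns `_c0`, `_c1`, ... as the job output further down does. It is an illustration only: `column_statistics` is hypothetical, the real job also produces KLL distribution sketches and constraint suggestions that this code does not attempt, and using the population standard deviation is our own choice.

```python
import csv
import io
import math


def column_statistics(csv_text):
    """Per-column numeric summary of header-less CSV text; empty cells count as missing."""
    rows = [r for r in csv.reader(io.StringIO(csv_text)) if r]
    n_cols = max(len(r) for r in rows)
    stats = []
    for c in range(n_cols):
        values, missing = [], 0
        for r in rows:
            cell = r[c].strip() if c < len(r) else ""
            if cell == "":
                missing += 1
            else:
                values.append(float(cell))
        present = len(values)
        total = sum(values)
        mean = total / present if present else float("nan")
        # Population standard deviation (our choice; the container's estimator is not stated here)
        std = math.sqrt(sum((v - mean) ** 2 for v in values) / present) if present else float("nan")
        stats.append({
            "name": "_c{}".format(c),
            "num_present": present,
            "num_missing": missing,
            "mean": mean,
            "sum": total,
            "std_dev": std,
            "min": min(values) if values else float("nan"),
            "max": max(values) if values else float("nan"),
        })
    return stats


for column in column_statistics("1,0\n3,1\n,1\n"):
    print(column)
```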

In [15]:
# baseline_data_uri holds dataset.csv: the training data written as CSV without a
# header row or index column, in the same column order that was used for training.
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

# Start a processing job that computes statistics and suggests constraints for dataset.csv
my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri + '/dataset.csv',
    dataset_format=DatasetFormat.csv(header=False),
)

Job Name:  baseline-suggestion-job-2021-05-12-18-05-26-514
Inputs:  [{'InputName': 'baseline_dataset_input', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-135151577600/sagemaker-featurestore-insurance/training_input/dataset.csv', 'LocalPath': '/opt/ml/processing/input/baseline_dataset_input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'monitoring_output', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-135151577600/model-monitor/baselining/baseline-suggestion-job-2021-05-12-18-05-26-514/results', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
2021-05-12 18:09:27,836 - __main__ - INFO - All params:{'ProcessingJobArn': 'arn:aws:sagemaker:us-east-1:135151577600:processing-job/baseline-suggestion-job-2021-05-12-18-05-26-514', 'ProcessingJobName': 'baseline-suggestion-job

### Explore the generated constraints and statistics

In [16]:
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)




| | name | inferred_type | numerical_statistics.common.num_present | numerical_statistics.common.num_missing | numerical_statistics.mean | numerical_statistics.sum | numerical_statistics.std_dev | numerical_statistics.min | numerical_statistics.max | numerical_statistics.distribution.kll.buckets | numerical_statistics.distribution.kll.sketch.parameters.c | numerical_statistics.distribution.kll.sketch.parameters.k | numerical_statistics.distribution.kll.sketch.data | string_statistics.common.num_present | string_statistics.common.num_missing | string_statistics.distinct_count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | _c0 | Fractional | 40095.0 | 0.0 | 906.726145 | 36355180.0 | 92540.591028 | 0.0 | 18307366.0 | [{'lower_bound': 0.0, 'upper_bound': 1830736.6... | 0.64 | 2048.0 | [[0.0, 0.0, 0.0, 0.0, 5036.047619047619, 0.0, ... | | | |
| 1 | _c1 | Fractional | 40095.0 | 0.0 | 0.323606 | 12975.0 | 0.467852 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0,... | | | |
| 2 | _c2 | Fractional | 40095.0 | 0.0 | 0.327223 | 13120.0 | 0.469199 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0,... | | | |
| 3 | _c3 | Fractional | 40095.0 | 0.0 | 0.349171 | 14000.0 | 0.476708 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0,... | | | |
| 4 | _c4 | Fractional | 40095.0 | 0.0 | 0.334506 | 13412.0 | 0.471817 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0,... | | | |
| 5 | _c5 | Fractional | 40095.0 | 0.0 | 0.331089 | 13275.0 | 0.470605 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0,... | | | |
| 6 | _c6 | Fractional | 40095.0 | 0.0 | 0.334406 | 13408.0 | 0.471782 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[1.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 1.0,... | | | |
| 7 | _c7 | Fractional | 40095.0 | 0.0 | 0.276643 | 11092.0 | 0.447338 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0,... | | | |
| 8 | _c8 | Fractional | 40095.0 | 0.0 | 0.023345 | 936.0 | 0.150995 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,... | | | |
| 9 | _c9 | Fractional | 40095.0 | 0.0 | 0.019504 | 782.0 | 0.138287 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,... | | | |


In [17]:
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df.head(10)


| | name | inferred_type | completeness | num_constraints.is_non_negative |
|---|---|---|---|---|
| 0 | _c0 | Fractional | 1.0 | True |
| 1 | _c1 | Fractional | 1.0 | True |
| 2 | _c2 | Fractional | 1.0 | True |
| 3 | _c3 | Fractional | 1.0 | True |
| 4 | _c4 | Fractional | 1.0 | True |
| 5 | _c5 | Fractional | 1.0 | True |
| 6 | _c6 | Fractional | 1.0 | True |
| 7 | _c7 | Fractional | 1.0 | True |
| 8 | _c8 | Fractional | 1.0 | True |
| 9 | _c9 | Fractional | 1.0 | True |


Before enabling monitoring, you can choose to edit the constraints file to fine-tune the constraints.
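As an illustration, suppose you want to tolerate a few missing values everywhere and allow negative values in one column. The sketch below edits a dict shaped like the `features` list shown above (as returned by `baseline_job.suggested_constraints().body_dict`); `relax_constraints` and its parameters are hypothetical. Saving the result as a constraints JSON file in S3 and passing that location, or a matching `Constraints` object, to `create_monitoring_schedule` is a separate step not shown here.

```python
import copy


def relax_constraints(constraints, max_required_completeness=None, allow_negative=()):
    """Return a loosened copy of a suggested-constraints dict; the input is left unchanged.

    max_required_completeness: cap every feature's required completeness at this value.
    allow_negative: feature names whose is_non_negative rule should be dropped.
    """
    updated = copy.deepcopy(constraints)
    for feature in updated.get("features", []):
        if max_required_completeness is not None and "completeness" in feature:
            feature["completeness"] = min(feature["completeness"], max_required_completeness)
        if feature.get("name") in allow_negative:
            feature.get("num_constraints", {}).pop("is_non_negative", None)
    return updated


# Two entries mirroring the suggested constraints shown above
suggested = {"features": [
    {"name": "_c0", "inferred_type": "Fractional", "completeness": 1.0,
     "num_constraints": {"is_non_negative": True}},
    {"name": "_c1", "inferred_type": "Fractional", "completeness": 1.0,
     "num_constraints": {"is_non_negative": True}},
]}
relaxed = relax_constraints(suggested, max_required_completeness=0.95, allow_negative={"_c1"})
print(relaxed["features"][1])
```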

# Step 3: Enable continuous monitoring

With data capture enabled and a baseline in place, we now analyze and monitor the captured data with a MonitoringSchedule.

### Create a schedule

We are ready to create a model monitoring schedule for the Endpoint created earlier with the baseline resources (constraints and statistics).

In [19]:
from sagemaker.model_monitor import CronExpressionGenerator

mon_schedule_name = 'InsuranceMonitoringJobX'
s3_report_path = 's3://{}/model-monitor/monitoring_output/'.format(default_s3_bucket_name)

# Run a daily data-quality check of captured traffic against the baseline statistics and constraints
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=predictor.endpoint_name,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.daily(),
    enable_cloudwatch_metrics=True
)

In [20]:
desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

Schedule status: Pending


### All set
Your monitoring schedule has been created. Return to Amazon SageMaker Studio to list the executions for this schedule and observe the results as they arrive.
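When executions complete, each one can write a `constraint_violations.json` report under `s3_report_path`. With the SageMaker Python SDK you can reach it through `my_default_monitor.list_executions()` and an execution's `constraint_violations()`; the sketch below sticks to the standard library and only tallies a report's `violations` by `constraint_check_type`. `summarize_violations` is hypothetical, the sample report is made up for illustration, and the field names are our assumption based on the documented report format.

```python
import json
from collections import Counter


def summarize_violations(report_text):
    """Count violations per constraint_check_type in a constraint_violations.json document."""
    report = json.loads(report_text)
    return Counter(v.get("constraint_check_type", "unknown") for v in report.get("violations", []))


# Hypothetical report content, for illustration only
sample_report = json.dumps({"violations": [
    {"feature_name": "_c3", "constraint_check_type": "data_type_check",
     "description": "example description"},
    {"feature_name": "_c8", "constraint_check_type": "completeness_check",
     "description": "example description"},
    {"feature_name": "_c9", "constraint_check_type": "completeness_check",
     "description": "example description"},
]})
for check_type, count in summarize_violations(sample_report).most_common():
    print(check_type, count)
```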