<a id='05-nb'></a>

# Music Recommender Part 5: Model Monitor

----
In this notebook, we'll set up [SageMaker Model Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html) to detect when our model or data significantly deviates from its "normal" behavior. SageMaker Model Monitor monitors machine learning models in production and detects deviations in data quality relative to a baseline dataset (for example, the training dataset). This notebook walks you through enabling data capture and setting up continuous monitoring for an existing Endpoint.

This notebook covers the following:
* Update your existing SageMaker Endpoint to enable data capture for Model Monitor
* Analyze a baseline dataset to generate suggested constraints and statistics
* Set up a MonitoringSchedule to monitor deviations from the baseline

----
### Contents
- [Overview](00_overview_arch_data.ipynb)
- [Part 1: Data Prep using Data Wrangler](01_music_dataprep.flow)
- [Part 2a: Feature Store Creation - Tracks](02a_export_fg_tracks.ipynb)
- [Part 2b: Feature Store Creation - User Preferences](02b_export_fg_5star_features.ipynb)
- [Part 2c: Feature Store Creation - Ratings](02c_export_fg_ratings.ipynb)
- [Part 3: Train Model with Debugger Hooks. Set Artifacts and Register Model.](03_train_model_lineage_registry_debugger.ipynb)
- [Part 4: Deploy Model & Inference using Online Feature Store](04_deploy_inference_explainability.ipynb)
- [Part 5: Model Monitor](05_model_monitor.ipynb)
    - [Enable data capture](#05-capture)
    - [Baselining](#05-baseline)
    - [Enable continuous monitoring](#05-continuous)
- [Part 6: SageMaker Pipelines](06_pipeline.ipynb)
- [Part 7: Resource Cleanup](07_clean_up.ipynb)



<a id='05-capture'></a>

## Step 1: Enable real-time inference data capture

##### [back to top](#05-nb)

----

To enable data capture for monitoring model data quality, you specify a `DataCaptureConfig`. With this configuration you can capture the request payload, the response payload, or both. The capture config applies to all production variants. The Endpoint name is read from the parameter store in the cells below.

In [1]:
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.predictor import Predictor
from sagemaker import session
import boto3

In [2]:
import sys
import pprint
sys.path.insert(1, './code')
from parameter_store import ParameterStore
ps = ParameterStore(verbose=False)

parameters = ps.read('music-rec')

bucket = parameters['bucket']
dw_ecrlist = parameters['dw_ecrlist']
fg_name_ratings = parameters['fg_name_ratings']
fg_name_tracks = parameters['fg_name_tracks']
fg_name_user_preferences = parameters['fg_name_user_preferences']

flow_export_id = parameters['flow_export_id']
flow_s3_uri = parameters['flow_s3_uri']
pretrained_model_path = parameters['pretrained_model_path']
prefix = parameters['prefix']
ratings_data_source = parameters['ratings_data_source']
tracks_data_source = parameters['tracks_data_source']
endpoint_name = parameters['endpoint_name']
val_data_uri = parameters['val_data_uri']


In [3]:
sm_session = session.Session(boto3.Session())
region = boto3.Session().region_name

In [4]:
# Please fill in the following for enabling data capture
s3_capture_upload_path = f's3://{bucket}/{prefix}/endpoint-data-capture/' #example: s3://bucket-name/path/to/endpoint-data-capture/

##### 
## IMPORTANT
##
## Please make sure to add the "s3:PutObject" permission to the "role" you provided in the SageMaker Model
## behind this Endpoint. Otherwise, Endpoint data capture will not work.
## 
##### 
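The note above asks for `s3:PutObject` on the capture location. As a minimal sketch, assuming the capture destination is a plain `s3://bucket/prefix/` URI and no KMS key is involved, the IAM policy document could be built as follows. The helper `s3_put_object_policy` and the bucket `my-bucket` are hypothetical, and attaching the policy to the role is not shown.

```python
import json

def s3_put_object_policy(s3_uri: str) -> dict:
    """Build an IAM policy document allowing s3:PutObject under an S3 prefix.

    Hypothetical helper; attaching the policy to the Model's role is not shown.
    """
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"Expected an s3:// URI, got {s3_uri!r}")
    # "s3://bucket/path/" -> "bucket/path"
    bucket_and_key = s3_uri[len("s3://"):].rstrip("/")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:PutObject"],
                # Allow writes to every object under the capture prefix
                "Resource": [f"arn:aws:s3:::{bucket_and_key}/*"],
            }
        ],
    }

policy = s3_put_object_policy("s3://my-bucket/music-recommendation/endpoint-data-capture/")
print(json.dumps(policy, indent=2))
```

If you pass a `kms_key_id` to `DataCaptureConfig`, the role will likely need KMS permissions as well; this sketch does not cover that.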

In [5]:
%%time
# Change parameters as you like: adjust the sampling percentage and
#  choose to capture the request, the response, or both
data_capture_config = DataCaptureConfig(
    enable_capture = True,
    sampling_percentage=25,
    destination_s3_uri=s3_capture_upload_path,
    kms_key_id=None,
    capture_options=["REQUEST", "RESPONSE"],
    csv_content_types=["text/csv"],
    json_content_types=["application/json"]
)

# Now it is time to apply the new configuration and wait for it to be applied
predictor = Predictor(endpoint_name=endpoint_name)
predictor.update_data_capture_config(data_capture_config=data_capture_config)
sm_session.wait_for_endpoint(endpoint=endpoint_name)

CPU times: user 241 ms, sys: 25 ms, total: 266 ms
Wall time: 7min 32s


{'EndpointName': 'music-recommendation-model-endpoint-notebooks',
 'EndpointArn': 'arn:aws:sagemaker:us-east-2:738335684114:endpoint/music-recommendation-model-endpoint-notebooks',
 'EndpointConfigName': 'music-recommendation-model-endpoint-not-2021-08-05-06-42-13-585',
 'ProductionVariants': [{'VariantName': 'AllTraffic',
   'DeployedImages': [{'SpecifiedImage': '257758044811.dkr.ecr.us-east-2.amazonaws.com/sagemaker-xgboost:0.90-2-cpu-py3',
     'ResolvedImage': '257758044811.dkr.ecr.us-east-2.amazonaws.com/sagemaker-xgboost@sha256:0d098653ff2915993d61180da0cde0ed982805093463d40f30212b8050486f18',
     'ResolutionTime': datetime.datetime(2021, 8, 5, 6, 42, 17, 199000, tzinfo=tzlocal())}],
   'CurrentWeight': 1.0,
   'DesiredWeight': 1.0,
   'CurrentInstanceCount': 1,
   'DesiredInstanceCount': 1}],
 'DataCaptureConfig': {'EnableCapture': True,
  'CaptureStatus': 'Started',
  'CurrentSamplingPercentage': 25,
  'DestinationS3Uri': 's3://sagemaker-us-east-2-738335684114/music-recommenda
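Captured traffic is written under `s3_capture_upload_path` as JSON Lines files, one record per sampled invocation. The sketch below parses a single line. The field names (`captureData`, `endpointInput`, `endpointOutput`, `encoding`, `eventMetadata`) follow the capture format as we understand it from the SageMaker documentation, so check them against a real capture file before relying on this. `parse_capture_line` and the sample record are hypothetical.

```python
import base64
import json

def parse_capture_line(line: str) -> dict:
    """Extract the request and response payloads from one line of a data capture file."""
    record = json.loads(line)
    capture = record["captureData"]
    parsed = {"inference_time": record.get("eventMetadata", {}).get("inferenceTime")}
    for key, label in (("endpointInput", "request"), ("endpointOutput", "response")):
        part = capture.get(key)
        if part is None:
            # Only the request or only the response may have been captured
            parsed[label] = None
            continue
        data = part["data"]
        if part.get("encoding") == "BASE64":
            # Assumption: payloads not captured as CSV/JSON text arrive base64-encoded
            data = base64.b64decode(data).decode("utf-8")
        parsed[label] = data
    return parsed

# Hypothetical record for illustration only (not real captured traffic)
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "248.5,0.50,0.07", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv", "mode": "OUTPUT",
                           "data": "3.47", "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "example-id", "inferenceTime": "2021-08-05T07:00:00Z"},
    "eventVersion": "0",
})
parsed = parse_capture_line(sample_line)
print(parsed["request"], "->", parsed["response"])
```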

## Before you proceed
Currently, SageMaker Model Monitor supports Endpoints out of the box only for **tabular (CSV, flat JSON)** data. If your Endpoint uses another data format, the following steps will NOT work for you.
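To make "flat JSON" concrete, here is a minimal check based on our own working definition: a single JSON object whose values are all scalars (no nested objects or arrays). `is_flat_json` and the field names in the examples are hypothetical and not part of the SageMaker SDK; Model Monitor's own parser may be stricter or more lenient.

```python
import json

def is_flat_json(payload: str) -> bool:
    """Return True if payload is a single JSON object whose values are all scalars."""
    obj = json.loads(payload)
    if not isinstance(obj, dict):
        return False
    # Nested objects or arrays make the record non-flat
    return not any(isinstance(value, (dict, list)) for value in obj.values())

print(is_flat_json('{"track_id": "abc", "energy": 0.5}'))
print(is_flat_json('{"user": {"id": 1}}'))
```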


<a id='05-baseline'></a>

## Step 2: Model Monitor - Baselining

##### [back to top](#05-nb)

----

In addition to collecting the data, SageMaker lets you monitor and evaluate the data observed by the Endpoint. To do this:
1. We create a baseline against which real-time traffic is compared.
1. Once the baseline is ready, we set up a schedule to continuously evaluate incoming traffic against it.

## Constraint suggestion with baseline/training dataset

The dataset you trained the model with is usually a good baseline dataset. Note that the baseline dataset's schema and the inference data schema must match exactly (that is, the same number and order of features).

Here we use our validation dataset, which has the same schema as the training data, to ask SageMaker to suggest a set of baseline constraints and to generate descriptive statistics for exploring the data.
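Because the CSV data here has no header, features can only be matched by position, so a quick check on column counts can catch schema mismatches early. This sketch checks the number of columns only; it cannot verify column order. `check_csv_schema` and `expected_num_features` are hypothetical names; set the expected count to the number of columns your model's requests contain.

```python
import csv
import io

def check_csv_schema(row: str, expected_num_features: int) -> None:
    """Raise ValueError if a headerless CSV row does not have the expected number of columns.

    Only the column count is checked; column order cannot be verified without a header.
    """
    fields = next(csv.reader(io.StringIO(row)), [])
    if len(fields) != expected_num_features:
        raise ValueError(
            f"Row has {len(fields)} columns, but {expected_num_features} were expected"
        )

# Hypothetical request rows for a model expecting 3 features
check_csv_schema("248.5,0.50,0.07", expected_num_features=3)  # passes silently
try:
    check_csv_schema("248.5,0.50", expected_num_features=3)
except ValueError as err:
    print(err)
```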

In [6]:
## e.g. 's3://bucketname/path/to/baseline/data' - where your validation data is stored
baseline_data_uri = val_data_uri
## e.g. 's3://bucketname/path/to/baseline/results' - where the baselining results are written
baseline_results_uri = f's3://{bucket}/{prefix}/baseline/results'

print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri))

Baseline data uri: s3://sagemaker-us-east-2-738335684114/music-recommendation/data/val/val_data.csv
Baseline results uri: s3://sagemaker-us-east-2-738335684114/music-recommendation/baseline/results


### Create a baselining job with the validation dataset

Now that the validation data is ready in S3, let's kick off a job to `suggest` constraints. `DefaultModelMonitor.suggest_baseline(..)` starts a `ProcessingJob` that uses a SageMaker-provided Model Monitor container to generate the constraints. Edit the configuration to fit your needs.

In [7]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker import get_execution_role
import datetime

role = get_execution_role(sagemaker_session=sm_session)

datetime_stamp = datetime.datetime.now().strftime("%Y-%m-%d-%H%M%S")

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=2,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    base_job_name=f"{prefix}-monitor-{datetime_stamp}"
)

In [8]:
%%time

monitor_baseline = my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri,
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri=baseline_results_uri,
    job_name=f"{prefix}-monitor-baseline-{datetime_stamp}",
    wait=True
)


Job Name:  music-recommendation-monitor-baseline-2021-08-05-064946
Inputs:  [{'InputName': 'baseline_dataset_input', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-2-738335684114/music-recommendation/data/val/val_data.csv', 'LocalPath': '/opt/ml/processing/input/baseline_dataset_input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'monitoring_output', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-2-738335684114/music-recommendation/baseline/results', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
...........................2021-08-05 06:54:04.091618: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2021-08-05 06:54:04.091650: I tensorflow/stream_executo

### Exploratory Analysis of the Processing Jobs underlying SageMaker Monitor
The next few cells show how to look up and inspect the Processing Job that ran the baselining step.

In [9]:
import boto3

client = boto3.client('sagemaker', region_name=region)

def get_last_processing_job():
    """Return the name of the most recent completed baselining Processing Job."""
    response = client.list_processing_jobs(
        NameContains=f"{prefix}-monitor-baseline-{datetime_stamp}",
        StatusEquals='Completed',
        SortBy='CreationTime',
        SortOrder='Descending',
        MaxResults=20
    )
    summaries = response['ProcessingJobSummaries']
    if not summaries:
        raise ValueError('No completed baselining Processing Job found')
    pprint.pprint(summaries[0])
    return summaries[0]['ProcessingJobName']

In [10]:
from sagemaker.processing import ProcessingJob

my_default_monitor_name = get_last_processing_job()


{'CreationTime': datetime.datetime(2021, 8, 5, 6, 49, 46, 346000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 8, 5, 6, 56, 48, 211000, tzinfo=tzlocal()),
 'ProcessingEndTime': datetime.datetime(2021, 8, 5, 6, 56, 45, 338000, tzinfo=tzlocal()),
 'ProcessingJobArn': 'arn:aws:sagemaker:us-east-2:738335684114:processing-job/music-recommendation-monitor-baseline-2021-08-05-064946',
 'ProcessingJobName': 'music-recommendation-monitor-baseline-2021-08-05-064946',
 'ProcessingJobStatus': 'Completed'}


In [11]:
my_default_monitor_reload = ProcessingJob.from_processing_name(sm_session, my_default_monitor_name)

response = client.describe_processing_job(
    ProcessingJobName=my_default_monitor_name
)
pprint.pprint(response)

{'AppSpecification': {'ImageUri': '777275614652.dkr.ecr.us-east-2.amazonaws.com/sagemaker-model-monitor-analyzer'},
 'CreationTime': datetime.datetime(2021, 8, 5, 6, 49, 46, 346000, tzinfo=tzlocal()),
 'Environment': {'dataset_format': '{"csv": {"header": false, '
                                   '"output_columns_position": "START"}}',
                 'dataset_source': '/opt/ml/processing/input/baseline_dataset_input',
                 'output_path': '/opt/ml/processing/output',
                 'publish_cloudwatch_metrics': 'Disabled'},
 'ExitMessage': 'Completed: Job completed successfully with no violations.',
 'LastModifiedTime': datetime.datetime(2021, 8, 5, 6, 56, 48, 211000, tzinfo=tzlocal()),
 'ProcessingEndTime': datetime.datetime(2021, 8, 5, 6, 56, 45, 338000, tzinfo=tzlocal()),
 'ProcessingInputs': [{'AppManaged': False,
                       'InputName': 'baseline_dataset_input',
                       'S3Input': {'LocalPath': '/opt/ml/processing/input/baseline_dataset_

### Explore the generated constraints and statistics

In [12]:
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)



|  | name | inferred_type | numerical_statistics.common.num_present | numerical_statistics.common.num_missing | numerical_statistics.mean | numerical_statistics.sum | numerical_statistics.std_dev | numerical_statistics.min | numerical_statistics.max | numerical_statistics.distribution.kll.buckets | numerical_statistics.distribution.kll.sketch.parameters.c | numerical_statistics.distribution.kll.sketch.parameters.k | numerical_statistics.distribution.kll.sketch.data | string_statistics.common.num_present | string_statistics.common.num_missing | string_statistics.distinct_count |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | _c0 | Fractional | 45457.0 | 0.0 | 3.466221 | 157564.0 | 1.156234 | 1.0 | 5.0 | [{'lower_bound': 1.0, 'upper_bound': 1.4, 'cou... | 0.64 | 2048.0 | [[3.0], [5.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.... | | | |
| 1 | _c1 | Fractional | 45457.0 | 0.0 | 248.533625 | 11297590.0 | 101.047964 | 6.0 | 3024.0 | [{'lower_bound': 6.0, 'upper_bound': 307.8, 'c... | 0.64 | 2048.0 | [[252.0], [2520.0, 11.0, 22.0, 38.0, 41.0, 48.... | | | |
| 2 | _c2 | String | | | | | | | | | | | | 45457.0 | 0.0 | 18171.0 |
| 3 | _c3 | String | | | | | | | | | | | | 45457.0 | 0.0 | 15506.0 |
| 4 | _c4 | Fractional | 45457.0 | 0.0 | 0.502307 | 22833.37 | 0.258836 | 0.0 | 0.989 | [{'lower_bound': 0.0, 'upper_bound': 0.0989, '... | 0.64 | 2048.0 | [[0.11772576856016], [0.989, 0.0, 0.0, 0.0, 0.... | | | |
| 5 | _c5 | Fractional | 45457.0 | 0.0 | 0.065632 | 2983.443 | 0.047561 | 0.0228 | 0.212 | [{'lower_bound': 0.0228, 'upper_bound': 0.0417... | 0.64 | 2048.0 | [[0.1126937599200329], [0.212, 0.0228, 0.0228,... | | | |
| 6 | _c6 | String | | | | | | | | | | | | 45457.0 | 0.0 | 13839.0 |
| 7 | _c7 | Fractional | 45457.0 | 0.0 | 0.170558 | 7753.073 | 0.115393 | 0.0174 | 0.601 | [{'lower_bound': 0.0174, 'upper_bound': 0.0757... | 0.64 | 2048.0 | [[0.207105046487789], [0.601, 0.0174, 0.0174, ... | | | |
| 8 | _c8 | Fractional | 45457.0 | 0.0 | 123.433691 | 5610925.0 | 29.270197 | 49.464 | 219.004 | [{'lower_bound': 49.464, 'upper_bound': 66.418... | 0.64 | 2048.0 | [[143.62259146164408], [208.12375095395043, 52... | | | |
| 9 | _c9 | Fractional | 45457.0 | 0.0 | 0.231752 | 10534.75 | 0.088843 | 0.005001 | 0.453743 | [{'lower_bound': 0.0050012445137153, 'upper_bo... | 0.64 | 2048.0 | [[0.1880893033518223], [0.4366254056591849, 0.... | | | |
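The dotted column names above come from `json_normalize` flattening the nested statistics for each feature. A stdlib-only sketch of that flattening, applied to a trimmed `_c0` entry whose values are copied from the table above, shows where names like `numerical_statistics.common.num_present` come from. `flatten` is a hypothetical helper, not part of pandas or the SageMaker SDK; like `json_normalize`, it leaves list values (such as the KLL buckets) as they are.

```python
def flatten(record: dict, parent: str = "") -> dict:
    """Flatten nested dicts into dotted keys; non-dict values (including lists) are kept as-is."""
    flat = {}
    for key, value in record.items():
        full_key = f"{parent}.{key}" if parent else key
        if isinstance(value, dict):
            # Recurse into nested dicts, extending the dotted prefix
            flat.update(flatten(value, full_key))
        else:
            flat[full_key] = value
    return flat

# Trimmed _c0 entry; values copied from the statistics table above
c0_stats = {
    "name": "_c0",
    "inferred_type": "Fractional",
    "numerical_statistics": {
        "common": {"num_present": 45457, "num_missing": 0},
        "mean": 3.466221,
        "min": 1.0,
        "max": 5.0,
    },
}
for column, value in flatten(c0_stats).items():
    print(column, value)
```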


In [13]:
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df.head(10)



|  | name | inferred_type | completeness | num_constraints.is_non_negative |
|---|---|---|---|---|
| 0 | _c0 | Fractional | 1.0 | True |
| 1 | _c1 | Fractional | 1.0 | True |
| 2 | _c2 | String | 1.0 | |
| 3 | _c3 | String | 1.0 | |
| 4 | _c4 | Fractional | 1.0 | True |
| 5 | _c5 | Fractional | 1.0 | True |
| 6 | _c6 | String | 1.0 | |
| 7 | _c7 | Fractional | 1.0 | True |
| 8 | _c8 | Fractional | 1.0 | True |
| 9 | _c9 | Fractional | 1.0 | True |
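The suggested constraints line up with the statistics: no feature has missing values, so `completeness` is 1.0 throughout, and every numerical feature has a minimum of at least 0, so `is_non_negative` is `True`. The following is a simplified illustration of that relationship, not the Model Monitor container's actual suggestion logic. `suggest_simple_constraints` is hypothetical, and the inputs are trimmed copies of the `_c0` and `_c2` statistics shown above.

```python
def suggest_simple_constraints(feature: dict) -> dict:
    """Derive completeness and non-negativity for one feature from its statistics entry.

    Simplified illustration only; not Model Monitor's actual suggestion algorithm.
    """
    is_numeric = "numerical_statistics" in feature
    stats = feature["numerical_statistics"] if is_numeric else feature["string_statistics"]
    present = stats["common"]["num_present"]
    missing = stats["common"]["num_missing"]
    total = present + missing
    constraint = {
        "name": feature["name"],
        "inferred_type": feature["inferred_type"],
        # Fraction of rows in which the feature is present
        "completeness": present / total if total else 0.0,
    }
    if is_numeric:
        # Non-negative if the smallest observed value is >= 0
        constraint["num_constraints"] = {"is_non_negative": stats["min"] >= 0}
    return constraint

# Trimmed statistics entries; values copied from the baseline statistics table above
c0 = {"name": "_c0", "inferred_type": "Fractional",
      "numerical_statistics": {"common": {"num_present": 45457, "num_missing": 0},
                               "min": 1.0, "max": 5.0}}
c2 = {"name": "_c2", "inferred_type": "String",
      "string_statistics": {"common": {"num_present": 45457, "num_missing": 0},
                            "distinct_count": 18171}}
print(suggest_simple_constraints(c0))
print(suggest_simple_constraints(c2))
```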


Before enabling monitoring, you can edit the suggested constraints file (`constraints.json`, written to `baseline_results_uri`) to fine-tune the constraints.
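As one example of such fine-tuning, the sketch below lowers the `completeness` value for a single feature in a constraints document. It assumes the file has already been downloaded from `baseline_results_uri` and parsed into a dict; the excerpt is hypothetical and trimmed. After editing, you would upload the file and pass its S3 URI as `constraints=` when creating the schedule (not shown). `relax_completeness` is a hypothetical helper.

```python
import copy

def relax_completeness(constraints: dict, feature_name: str, threshold: float) -> dict:
    """Return a copy of a constraints document with a new completeness value for one feature."""
    if not 0.0 <= threshold <= 1.0:
        raise ValueError("completeness must be between 0 and 1")
    updated = copy.deepcopy(constraints)  # leave the original document untouched
    for feature in updated["features"]:
        if feature["name"] == feature_name:
            feature["completeness"] = threshold
            return updated
    raise KeyError(f"Feature {feature_name!r} not found in constraints")

# Hypothetical, trimmed constraints excerpt shaped like the table above
constraints = {
    "features": [
        {"name": "_c1", "inferred_type": "Fractional", "completeness": 1.0,
         "num_constraints": {"is_non_negative": True}},
        {"name": "_c2", "inferred_type": "String", "completeness": 1.0},
    ]
}
relaxed = relax_completeness(constraints, "_c2", 0.95)
print(relaxed["features"][1])
```

Lowering `completeness` makes the check more tolerant of missing values for that feature.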

<a id='05-continuous'></a>

## Step 3: Enable continuous monitoring

##### [back to top](#05-nb)

----

With data capture enabled and a baseline in place, we can now analyze and monitor the captured data with a MonitoringSchedule.

### Create a schedule

We are ready to create a model monitoring schedule for the Endpoint created earlier with the baseline resources (constraints and statistics).

In [14]:
from sagemaker.model_monitor import CronExpressionGenerator
import datetime


mon_schedule_name = 'music-rec-monitor-schedule-{}'.format(datetime.datetime.now().strftime("%Y-%m-%d-%H%M%S"))
s3_report_path = f's3://{bucket}/{prefix}/monitor/report'

try:
    my_default_monitor.create_monitoring_schedule(
        monitor_schedule_name=mon_schedule_name,
        endpoint_input=endpoint_name,
        output_s3_uri=s3_report_path,
        statistics=my_default_monitor.baseline_statistics(),
        constraints=my_default_monitor.suggested_constraints(),
        schedule_cron_expression=CronExpressionGenerator.daily(),
        enable_cloudwatch_metrics=True,
    )
    print(f"Created monitoring schedule {mon_schedule_name}")
except ValueError:
    # The SDK raises ValueError when this monitor object already owns a schedule;
    # in that case, update the existing schedule instead of creating a new one
    my_default_monitor.update_monitoring_schedule(
        endpoint_input=endpoint_name,
        schedule_cron_expression=CronExpressionGenerator.daily(),
        enable_cloudwatch_metrics=True,
    )
    print(f"Updated monitoring schedule {my_default_monitor.monitoring_schedule_name}")

Created monitoring schedule music-rec-monitor-schedule-2021-08-05-065703
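The schedule above uses `CronExpressionGenerator.daily()`. As far as we know, this defaults to midnight, and SageMaker interprets monitoring schedule cron expressions in UTC. The sketch below computes when such a daily schedule would next fire. `next_daily_run` is hypothetical, and the creation time is taken from the schedule name above, assuming the notebook's clock is UTC. Actual executions may start somewhat after the scheduled hour.

```python
import datetime as dt

def next_daily_run(now: dt.datetime, hour: int = 0) -> dt.datetime:
    """Return the next UTC time a daily-at-`hour` schedule would fire after `now` (timezone-aware)."""
    now_utc = now.astimezone(dt.timezone.utc)
    candidate = now_utc.replace(hour=hour, minute=0, second=0, microsecond=0)
    if candidate <= now_utc:
        # Today's slot has already passed, so the next run is tomorrow
        candidate += dt.timedelta(days=1)
    return candidate

# Creation time from the schedule name above, assumed to be UTC
created = dt.datetime(2021, 8, 5, 6, 57, 3, tzinfo=dt.timezone.utc)
print(next_daily_run(created))  # 2021-08-06 00:00:00+00:00
```

The sketch imports the module as `dt` so that it does not shadow the `datetime` module used by the notebook cells above.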


In [15]:
import time

# Poll every 30 seconds while the schedule is still being set up
desc_schedule_result = my_default_monitor.describe_schedule()
while desc_schedule_result['MonitoringScheduleStatus'] == 'Pending':
    print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))
    time.sleep(30)
    desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

Schedule status: Pending
Schedule status: Pending
Schedule status: Scheduled
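The polling loop above has no timeout. A more defensive variant, sketched below with a hypothetical `wait_for_status` helper, returns on `Scheduled`, raises on `Failed` or `Stopped`, and gives up after a timeout. It accepts any zero-argument status callable, so in this notebook you could pass `lambda: my_default_monitor.describe_schedule()['MonitoringScheduleStatus']`; the example uses a fake status sequence so it runs without AWS.

```python
import time
from typing import Callable, Iterable

def wait_for_status(get_status: Callable[[], str],
                    done: Iterable[str] = ("Scheduled",),
                    failed: Iterable[str] = ("Failed", "Stopped"),
                    poll_seconds: float = 30,
                    timeout_seconds: float = 900) -> str:
    """Poll get_status() until it returns a status in `done`; raise on `failed` or timeout."""
    done_set, failed_set = set(done), set(failed)
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        print(f"Schedule status: {status}")
        if status in done_set:
            return status
        if status in failed_set:
            raise RuntimeError(f"Schedule ended in status {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Schedule still {status} after {timeout_seconds} s")
        time.sleep(poll_seconds)

# Fake status sequence so the example runs without AWS
fake_statuses = iter(["Pending", "Pending", "Scheduled"])
final_status = wait_for_status(lambda: next(fake_statuses), poll_seconds=0)
```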


### All set
Your monitoring schedule has been created. Return to Amazon SageMaker Studio to list the executions for this schedule and observe the results going forward.
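Each monitoring execution writes its results under `s3_report_path`, including a constraint violations report. Assuming that report has been downloaded and parsed into a dict, the sketch below counts violations by check type. The `violations`, `feature_name`, `constraint_check_type`, and `description` keys follow the report format as we understand it, and the sample report is hypothetical, not output from this Endpoint. `summarize_violations` is likewise a hypothetical helper.

```python
from collections import Counter

def summarize_violations(report: dict) -> Counter:
    """Count violations per constraint check type in a parsed violations report."""
    return Counter(v["constraint_check_type"] for v in report.get("violations", []))

# Hypothetical report excerpt for illustration only (not output from this Endpoint)
report = {
    "violations": [
        {"feature_name": "_c1", "constraint_check_type": "data_type_check",
         "description": "Hypothetical description"},
        {"feature_name": "_c4", "constraint_check_type": "completeness_check",
         "description": "Hypothetical description"},
        {"feature_name": "_c5", "constraint_check_type": "completeness_check",
         "description": "Hypothetical description"},
    ]
}
print(summarize_violations(report).most_common())
```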