# Forest CoverType (3): Model Monitoring and Drift Detection

We've already covered setting up a pipeline to canary deploy new model variants to an endpoint with data capture enabled...

Our next mission is to regularly analyze that captured data to detect drift and trigger remedial activities (e.g. warnings, re-training, etc.).

## Libraries and config...

In [None]:
%load_ext autoreload
%autoreload 2

# Python Built-Ins:
import json
import math
import os
import time

# External Dependencies:
import boto3
from botocore import exceptions as botoexceptions
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sagemaker
from sagemaker.pytorch.estimator import PyTorch as PyTorchEstimator
from sagemaker.pytorch.model import PyTorchModel, PyTorchPredictor
import seaborn as sn
from sklearn import metrics
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
from smexperiments.trial_component import TrialComponent
from smexperiments.tracker import Tracker
from tqdm import tqdm  # (Progress bars)

# Local Dependencies:
import util

In [None]:
%store -r experiment_name
%store -r preproc_trial_component_name
%store -r project_id
%store -r target_model

smclient = boto3.client("sagemaker")
role = sagemaker.get_execution_role()
smsess = sagemaker.session.Session()

project = util.project.init(project_id, role)
print(project)

sandbox_bucket = boto3.resource("s3").Bucket(project.sandbox.sandbox_bucket)

In [None]:
preproc_trial_component = TrialComponent.load(preproc_trial_component_name)

## Generating some traffic

Our TabNet endpoint is quite flexible in the data formats it supports: For example we demonstrated mini-batched inference on `application/x-npy` input at the end of the TabNet notebook.

...But we'd like to use the [DefaultModelMonitor](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_monitor.model_monitoring.DefaultModelMonitor), rather than taking on the challenge of writing a custom monitoring processor at this stage.

So for our production endpoint, we'll stick to **single-record requests in text/csv format**:

In [None]:
predictor = PyTorchPredictor(
    target_model,
    # By default, PyTorch predictors use application/x-npy on the wire... Which is nice except default model
    # monitor can't understand the captures! So we'll use CSV instead:
    serializer=sagemaker.serializers.CSVSerializer(),
    deserializer=sagemaker.deserializers.CSVDeserializer(),
) 
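
To make "single-record `text/csv`" concrete, the sketch below builds the kind of request body we expect each call to carry: one comma-separated line of feature values, with no header and no label. It is a plain-Python illustration rather than the SDK's own serializer, and both the helper name `to_csv_record` and the feature values are made up for this example.

```python
def to_csv_record(values):
    """Render one record as a single comma-separated line (no header, no label)."""
    return ",".join(str(v) for v in values)


# Made-up feature values, for illustration only:
example_payload = to_csv_record([2596, 51, 3, 258.0])
print(example_payload)
```

Keeping each request to one such line means each captured event corresponds to exactly one record.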

In [None]:
# Load up our test dataframe from local file:
with open("data/columns.json", "r") as f:
    train_columns = json.load(f)

df_test = pd.read_csv(
    "data/test-noheader.csv",
    names=train_columns
)
df_test.head()

The cell below loops through the entire test dataframe exactly once, one record at a time...

That can take a long time and we're just trying to generate some sample data, so feel free to interrupt it!

In [None]:
%%time

inf_batch_size = 1  # Important to keep = 1 if we want DefaultModelMonitor to work!
n_inf_batches = math.ceil(len(df_test) / inf_batch_size)

print(f"Sending test data for inference in {n_inf_batches} batches of {inf_batch_size} records...")
iterator = tqdm(
    df_test.drop("Cover_Type", axis=1).groupby(np.arange(len(df_test))//inf_batch_size),
    total=n_inf_batches
)

def predict_batch(it):
    ixbatch, group = it
    result = np.array(predictor.predict(group.to_numpy()), dtype="float")
    time.sleep(0.5)
    return result

# This utility fn will show us a tqdm progress bar *without* it getting messed up by notebook interrupts
last_result = util.progress.notebook_safe_tqdm_loop(
    iterator,
    predict_batch
)

print("Done!")
print("Last batch result:")
last_result
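
The loop above relies on a local helper, `util.progress.notebook_safe_tqdm_loop`, whose implementation isn't shown in this notebook. As a rough idea of what "safe to interrupt" can mean, here is a minimal sketch under the assumption that the helper simply stops on a keyboard interrupt and hands back the last completed result. The name `interrupt_safe_loop` is hypothetical and this is not the helper's actual code.

```python
def interrupt_safe_loop(iterable, fn):
    """Call fn on each item, stopping quietly on Ctrl+C or a notebook interrupt.

    Returns the result of the last call that completed (None if none did).
    """
    last_result = None
    try:
        for item in iterable:
            last_result = fn(item)
    except KeyboardInterrupt:
        print("Interrupted: stopping early")
    return last_result


def fake_predict(item):
    """Stand-in for a prediction call, which simulates an interrupt on item 3."""
    if item == 3:
        raise KeyboardInterrupt
    return item * 10


print(interrupt_safe_loop(range(5), fake_predict))
```

In this toy run, items 0 to 2 complete before the simulated interrupt, so the function returns the result for item 2.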

## Setting up a DefaultModelMonitor

Just like with an `Estimator`, creating a ModelMonitor doesn't actually *do* anything yet - we're just defining configuration:

In [None]:
my_default_monitor = sagemaker.model_monitor.DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    volume_size_in_gb=5,
    max_runtime_in_seconds=3600,
)

In [None]:
# (Or attach to an existing monitoring schedule - later in this notebook we name ours after the endpoint)
#my_default_monitor = sagemaker.model_monitor.DefaultModelMonitor.attach(
#    target_model
#)

## Baselining from the training data set

First we want to summarize a **baseline** data distribution, to track deviations observed in production.

A logical choice in this scenario is to use the dataset the model was actually trained on.

The `DefaultModelMonitor.suggest_baseline()` function:
- Sets up a **SageMaker Processing Job**, which
- Analyses **the data set in S3**, to
- *Suggest* **summary statistics and constraints** for each field in the data

Like any other Processing Job these outputs will appear as files in our S3 output location, but DefaultModelMonitor will give us some additional convenience methods for analysing their structure.

In [None]:
%%time

train_baseline_uri = "s3://{}/baselines/{}/{} {}/train".format(
    project.monitoring_bucket,
    target_model,
    preproc_trial_component.display_name,  # (Not unique)
    preproc_trial_component.trial_component_name,  # (Not human-interpretable)
)

train_baseline_job = my_default_monitor.suggest_baseline(
    baseline_dataset=preproc_trial_component.output_artifacts["train-csv"].value,
    dataset_format=sagemaker.model_monitor.DatasetFormat.csv(header=True),
    output_s3_uri=train_baseline_uri,
    wait=True
)

It's important to note that these statistics and constraints can be downloaded, inspected **and edited**: If you'd like to impose different alerting constraints from human domain understanding, that's fine!

In [None]:
!mkdir -p data/monitoring/baselines
baselines_uri = f"s3://{project.monitoring_bucket}/baselines"
!aws s3 sync $baselines_uri data/monitoring/baselines

In [None]:
baseline_job = my_default_monitor.latest_baselining_job
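# (pd.json_normalize is the supported top-level name from pandas 1.0; pd.io.json.json_normalize is deprecated)
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])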
schema_df.head(10)

In [None]:
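# (pd.json_normalize is the supported top-level name from pandas 1.0; pd.io.json.json_normalize is deprecated)
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])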
constraints_df.head(20)
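
Because the suggested constraints are just JSON, editing them can be done with ordinary dictionary manipulation. The sketch below works on a small hand-written excerpt. The layout (a `features` list whose entries carry `name`, `inferred_type`, `completeness` and `num_constraints`) is an assumption about what the downloaded file looks like, and the feature names and the `override_feature` helper are hypothetical, so check them against your own copy under `data/monitoring/baselines`.

```python
import copy
import json

# Hypothetical excerpt of a suggested constraints file (field names are assumed):
suggested_constraints = {
    "version": 0.0,
    "features": [
        {
            "name": "Elevation",
            "inferred_type": "Integral",
            "completeness": 1.0,
            "num_constraints": {"is_non_negative": True},
        },
        {
            "name": "Slope",
            "inferred_type": "Integral",
            "completeness": 1.0,
            "num_constraints": {"is_non_negative": True},
        },
    ],
}


def override_feature(constraints, feature_name, **overrides):
    """Return an edited copy of `constraints` with fields of one feature replaced."""
    edited = copy.deepcopy(constraints)  # Leave the suggested version untouched
    for feature in edited["features"]:
        if feature["name"] == feature_name:
            feature.update(overrides)
            return edited
    raise KeyError(f"No feature named {feature_name!r} in constraints")


# e.g. insist that Elevation should arrive as a fractional value:
edited_constraints = override_feature(
    suggested_constraints, "Elevation", inferred_type="Fractional"
)
print(json.dumps(edited_constraints["features"][0], indent=2))
```

Working on a copy keeps the original suggestion available for comparison, which is handy when deciding whether a hand-edited constraint is too strict.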

## Baselining from live data

An **alternative** strategy is to calculate statistics and proposed constraints from live captured data. This might be more relevant to online learning use-cases, where the "training set" is less well-defined.

In [None]:
%%time

live_baseline_uri = "s3://{}/baselines/{}/live/{}".format(
    project.monitoring_bucket,
    target_model,
    util.append_timestamp("baseline"),
)

live_baseline_job = my_default_monitor.suggest_baseline(
    baseline_dataset="s3://{}/capture/{}".format(
        project.monitoring_bucket,
        target_model,
        # TODO: Will you filter down to a subset of this path?
    ),
    # We don't need any fancy pre/post processing scripts for this use case:
    #record_preprocessor_script="src-monitoring/preprocessor.py",
    #post_analytics_processor_script=None,
    dataset_format=sagemaker.model_monitor.DatasetFormat.sagemaker_capture_json(),
    output_s3_uri=live_baseline_uri,
    wait=True
)

### Extension Exercise:

Can you modify one or more of the suggested constraints, and create your monitoring schedule (below) from that modified copy to trigger additional violations?

## Setting up a monitoring schedule

With a baseline established and endpoint data capture configured, we'd like SageMaker to regularly check the data coming through our model and warn us about violations.

Since we're using constraint/statistic and data capture formats that the DefaultModelMonitor understands, this is a simple SDK call:

In [None]:
my_default_monitor.create_monitoring_schedule(
    target_model,
    # We'll try to avoid accidentally setting up multiple schedules on one endpoint:
    monitor_schedule_name=target_model,
    # We don't need any fancy pre/post processing scripts for this use case:
    #record_preprocessor_script="src-monitoring/preprocessor.py",
    #post_analytics_processor_script=None,
    output_s3_uri="s3://{}/schedule-results/{}".format(
        project.monitoring_bucket,
        target_model,
    ),
    statistics=baseline_job.baseline_statistics(),
    constraints=baseline_job.suggested_constraints(),
    # Watch out: Only a subset of Cron (scheduling) expressions are supported!
    schedule_cron_expression=sagemaker.model_monitor.CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

In [None]:
my_default_monitor.describe_schedule()

## Generate some *deliberately biased* traffic

Remember that biased testing subset we extracted in the first notebook? Let's put it to use!

In [None]:
df_biased = pd.read_csv(
    "data/test-bias-noheader.csv",
    names=train_columns
)
df_biased.head()

In [None]:
%%time

inf_batch_size = 1  # Important to keep = 1 if we want DefaultModelMonitor to work!
n_inf_batches = math.ceil(len(df_biased) / inf_batch_size)

print(f"Sending biased test data for inference in {n_inf_batches} batches of {inf_batch_size} records...")
iterator = tqdm(
    df_biased.drop("Cover_Type", axis=1).groupby(np.arange(len(df_biased))//inf_batch_size),
    total=n_inf_batches
)

def predict_batch(it):
    ixbatch, group = it
    result = np.array(predictor.predict(group.to_numpy()), dtype="float")
    time.sleep(0.5)
    return result

# This utility fn will show us a tqdm progress bar *without* it getting messed up by notebook interrupts
last_result = util.progress.notebook_safe_tqdm_loop(
    iterator,
    predict_batch
)

print("Done!")
print("Last batch result:")
last_result

## Explore the results

Hopefully our biased traffic has now been running for long enough to collect some good data and trigger a scheduled monitoring job.

We can explore what we've collected through the SageMaker Studio UI, but also through the APIs.

Let's start by copying the contents of the monitoring bucket to the notebook:

In [None]:
data_capture_uri = f"s3://{project.monitoring_bucket}"
!aws s3 sync $data_capture_uri data/monitoring
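
Among the synced files, the endpoint captures are stored as JSON Lines: one JSON object per request. The sketch below parses a single made-up line to pull out the request and response payloads. It assumes each record has a `captureData` object holding `endpointInput` and `endpointOutput` (each with a CSV-encoded `data` string) alongside an `eventMetadata` object. The helper name `parse_capture_line` and all the values are hypothetical, so compare against a real file before relying on it.

```python
import json

# Made-up capture record following the assumed layout:
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {
            "observedContentType": "text/csv",
            "mode": "INPUT",
            "data": "2596,51,3,258.0",
            "encoding": "CSV",
        },
        "endpointOutput": {
            "observedContentType": "text/csv",
            "mode": "OUTPUT",
            "data": "0.1,0.7,0.2",
            "encoding": "CSV",
        },
    },
    "eventMetadata": {"eventId": "example-id", "inferenceTime": "2020-01-01T00:00:00Z"},
    "eventVersion": "0",
})


def parse_capture_line(line):
    """Extract the event ID plus numeric input and output values from one capture line."""
    record = json.loads(line)
    capture = record["captureData"]
    return {
        "event_id": record["eventMetadata"]["eventId"],
        "input": [float(x) for x in capture["endpointInput"]["data"].split(",")],
        "output": [float(x) for x in capture["endpointOutput"]["data"].split(",")],
    }


parsed = parse_capture_line(sample_line)
print(parsed["input"], parsed["output"])
```

Applying a function like this to every line of a capture file gives a quick way to load live traffic into a dataframe for manual inspection.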

In [None]:
my_default_monitor.list_executions()  # Hopefully one or more by now!
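
With an hourly schedule, the list above may well be empty the first time you run it. Rather than re-running the cell by hand, one option is a small polling loop. The sketch below is a generic helper with a hypothetical name (`wait_for`). It knows nothing about SageMaker and just calls whatever function it is given until that function returns something truthy.

```python
import time


def wait_for(check, timeout_secs=3600, poll_secs=60, sleep=time.sleep):
    """Call `check` until it returns something truthy or the timeout would be exceeded.

    Returns the truthy value, or None if we gave up.
    """
    deadline = time.monotonic() + timeout_secs
    while True:
        result = check()
        if result:
            return result
        if time.monotonic() + poll_secs > deadline:
            return None  # Another wait would overrun the timeout
        sleep(poll_secs)


# Toy check that comes back empty twice before "finding" something:
_responses = iter([[], [], ["first-execution"]])
print(wait_for(lambda: next(_responses), sleep=lambda secs: None))
```

In this notebook the check could be `my_default_monitor.list_executions`, which is empty (and so falsy) until the first scheduled job has started.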

In [None]:
latest_monitoring_job = my_default_monitor.list_executions()[-1]
latest_monitoring_job.describe()

### Extension: Visualize the results

Drawing on the public sample code from the official repository below, can you present the violations here in the notebook? This API-driven interaction will be useful practice for using Model Monitor in classic SageMaker Notebook Instances, outside Studio.

https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker_model_monitor/introduction/SageMaker-ModelMonitoring.ipynb
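
As a starting point for this exercise, the sketch below counts violations by check type from a report dictionary. It assumes the report has a top-level `violations` list whose entries carry `feature_name`, `constraint_check_type` and `description`. The sample report, its feature names and the `summarize_violations` helper are all made up for illustration, so verify the layout against a real `constraint_violations.json` from your schedule output.

```python
from collections import Counter

# Made-up violations report following the assumed layout:
sample_report = {
    "violations": [
        {
            "feature_name": "Elevation",
            "constraint_check_type": "baseline_drift_check",
            "description": "Example description of a drift violation",
        },
        {
            "feature_name": "Slope",
            "constraint_check_type": "baseline_drift_check",
            "description": "Example description of a drift violation",
        },
        {
            "feature_name": "Aspect",
            "constraint_check_type": "data_type_check",
            "description": "Example description of a type violation",
        },
    ]
}


def summarize_violations(report):
    """Count violations per constraint check type."""
    counts = Counter(v["constraint_check_type"] for v in report.get("violations", []))
    return dict(counts)


for check_type, count in summarize_violations(sample_report).items():
    print(f"{check_type}: {count}")
```

In the notebook, the report dictionary would come from the monitor itself (the SDK exposes the latest report via `latest_monitoring_constraint_violations()`, whose `body_dict` should have this shape) rather than from a hand-written sample.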

In [None]:
#my_default_monitor.delete_monitoring_schedule()