# SageMaker Model Monitoring BYOC with Batch Records for Near Real-time Inference

This AWS code sample demonstrates how to use Amazon SageMaker Model Monitor when sending a “batch” of multiple inference records to an Amazon SageMaker Endpoint in a single request. This Model Monitoring example focuses on Model Quality Monitoring (MQM).


This notebook will execute the following steps:

### Steps

1. An AI/ML Developer will train an Amazon SageMaker Model, create Model Quality Baseline artifacts (e.g. F1 score, model accuracy), and save them to S3
2. An AI/ML Developer will create an Amazon SageMaker Endpoint from the SageMaker Model with data capture enabled
3. A user/application will send one payload containing multiple inference records to the Amazon SageMaker Endpoint and receive a single response payload with one output per inference record. Because each request contains multiple inference records, every payload must be parsed into its individual records before model quality metrics can be calculated.
4. The user/application will create ground truth data for the predictions and store it in Amazon S3
5. Create a BYOC (bring your own container) image and push it to Amazon ECR
6. A SageMaker Model Monitor job using the BYOC image will be created to:
    1. Split the “batch” records from the captured requests/responses into single records
    2. Merge the output of step 6.1 with the ground truth data submitted by the user
    3. Evaluate the merged data from step 6.2 and compare the results with the baseline artifacts from step 1
    4. Publish the calculated metrics to Amazon CloudWatch and output a constraint violations report (if violations are present)
7. Use SageMaker Studio Classic Endpoint console to observe Model Monitoring reports
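Step 6.1 is the core of the batch-record customization: one captured request/response holds several records, and each must become its own row. The sketch below is a minimal illustration, assuming the standard SageMaker data capture JSON Lines layout (`captureData.endpointInput` / `endpointOutput`, with the request's `InferenceId` under `eventMetadata.inferenceId`) and assuming the endpoint returns a JSON list with one list of class probabilities per input record. The real response format depends on `inference.py`, which is not shown here, and `flatten_capture_line` is a hypothetical name.

```python
import base64
import json


def _decode_payload(section: dict):
    """Parse the JSON payload of a captureData section, decoding BASE64 first if needed."""
    data = section["data"]
    if section.get("encoding") == "BASE64":
        data = base64.b64decode(data).decode("utf-8")
    return json.loads(data)


def flatten_capture_line(line: str) -> list:
    """Expand one captured line (a "batch" request/response) into one row per record.

    Assumption: the response is a list of per-record class-probability lists.
    """
    event = json.loads(line)
    inference_id = event["eventMetadata"]["inferenceId"]
    inputs = _decode_payload(event["captureData"]["endpointInput"])
    outputs = _decode_payload(event["captureData"]["endpointOutput"])
    rows = []
    for payload_index, (features, probabilities) in enumerate(zip(inputs, outputs)):
        rows.append(
            {
                "InferenceId": inference_id,
                "payload_index": payload_index,  # position of the record inside the request
                "features": features,
                # predicted class = index of the highest class probability
                "predictedLabel": max(range(len(probabilities)), key=probabilities.__getitem__),
            }
        )
    return rows


# Usage with a hand-made captured line (placeholder values, not real endpoint output)
sample_line = json.dumps(
    {
        "captureData": {
            "endpointInput": {"observedContentType": "application/json", "mode": "INPUT",
                              "data": json.dumps([{"Elevation": 2596}, {"Elevation": 2590}]),
                              "encoding": "JSON"},
            "endpointOutput": {"observedContentType": "application/json", "mode": "OUTPUT",
                               "data": json.dumps([[0.1, 0.7, 0.2], [0.6, 0.3, 0.1]]),
                               "encoding": "JSON"},
        },
        "eventMetadata": {"eventId": "example-event", "inferenceId": "0"},
        "eventVersion": "0",
    }
)
for row in flatten_capture_line(sample_line):
    print(row["InferenceId"], row["payload_index"], row["predictedLabel"])
```

Keeping `payload_index` alongside `InferenceId` is what later allows each flattened prediction to be matched to exactly one ground truth label.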
    

These steps are represented in the architecture diagram below:


![architecture image](images/architecture.png)

__NOTE:__ You will need permissions for the following AWS services:

* Amazon SageMaker Processing Jobs
* Amazon SageMaker Training Jobs
* Amazon SageMaker Models
* Amazon SageMaker Endpoint
* Amazon SageMaker Model Monitor
* AWS CodeBuild
* Amazon ECR
* Amazon S3 (default SageMaker Bucket)

We recommend using an IAM role with SageMaker Full Access for the best user experience. If this isn't possible, either add the required permissions for the services above or modify the notebook to fit your AWS configuration.

### Prerequisite recommendations
* Amazon SageMaker Studio Classic - We recommend executing this notebook in a SageMaker Studio Classic environment. To learn more about setting up Amazon SageMaker Studio, please read the [quick start guide](https://docs.aws.amazon.com/sagemaker/latest/dg/onboard-quick-start.html)
* Use an IAM role with the permissions mentioned earlier when setting up your Amazon SageMaker Studio environment. Otherwise, use this role when executing this notebook. 
* In your Amazon SageMaker Studio Classic environment, open a system terminal and use git to clone this notebook's [GitHub repository](https://github.com/aws-samples/customized-model-monitoring-for-near-real-time-batch-inference-with-amazon-sagemaker/tree/main)
* When launching this notebook in Amazon SageMaker Studio Classic, please use an instance size of `ml.t3.medium` with a kernel of `Data Science 3.0`


## 1. Import Libraries and Set Environment Variables

Here we import libraries, including the Amazon SageMaker Python SDK, and use the default Amazon SageMaker bucket. We use the public [Forest Covertype](https://archive.ics.uci.edu/dataset/31/covertype) dataset, which is accessible through the scikit-learn API.

In [None]:
import json
import time
import datetime as dt
from sklearn.datasets import fetch_covtype
from sklearn.model_selection import train_test_split
from sagemaker.analytics import TrainingJobAnalytics
import pandas as pd

In [None]:
import uuid
import sagemaker

sess = sagemaker.session.Session()
account_id = sess.account_id()
role = sagemaker.get_execution_role()
region = sess.boto_region_name
bucket = sess.default_bucket() # change bucket name here if needed
prefix_name = "sagemaker-model-monitor-byoc-batch-records-endpoint"
train_data_path = f's3://{bucket}/{prefix_name}/training-data/train/train.csv'
validation_data_path = f's3://{bucket}/{prefix_name}/training-data/validation/validation.csv'
test_data_path = f's3://{bucket}/{prefix_name}/training-data/test/test.csv'

## 2. Create Training Data

Here we load the dataset, split it into training, validation, and test sets, and save each set to S3 for an Amazon SageMaker Training Job using the XGBoost framework.

In [None]:
data = fetch_covtype(as_frame=True)

In [None]:
# shift labels from 1-7 to 0-6, since XGBoost expects class labels starting at 0
data.target = data.target.apply(lambda x: x-1)

Split data for training, validation, and testing

In [None]:
X_train, X_test, y_train, y_test = train_test_split(data.data, data.target, test_size=0.10, random_state=1)

X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.20, random_state=1)
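The two calls above first hold out 10% of the rows for testing, then take 20% of the remaining 90% for validation, which leaves roughly 72% / 18% / 10% for train / validation / test. A quick arithmetic check (actual row counts can differ by one row because scikit-learn rounds the test portion of each split up):

```python
def split_fractions(test_size: float, val_size: float) -> tuple:
    """Return (train, validation, test) fractions for two successive train_test_split calls."""
    test = test_size
    val = (1 - test_size) * val_size  # validation is carved out of the data left after the test split
    train = (1 - test_size) * (1 - val_size)
    return train, val, test


train_frac, val_frac, test_frac = split_fractions(test_size=0.10, val_size=0.20)
print(f"train={train_frac:.2f}, validation={val_frac:.2f}, test={test_frac:.2f}")
```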

Save Data To S3 Channels

In [None]:
## Save Training Dataset
pd.concat([y_train, X_train], axis=1).to_csv(train_data_path, index=False)

## Save Validation Dataset
pd.concat([y_val, X_val], axis=1).to_csv(validation_data_path, index=False)

## Save Test Dataset
pd.concat([y_test, X_test], axis=1).to_csv(test_data_path, index=False)

## Train Model

In [None]:
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.estimator import Estimator

hyperparameters = {
    "max_depth": 5,
    "eta": 0.36,
    "gamma": 2.88,
    "min_child_weight": 9.89,
    "subsample": 0.77,
    "objective": "multi:softprob",
    "num_class": 7,
    "num_round": 50
}
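With `"objective": "multi:softprob"` and `"num_class": 7`, XGBoost outputs a probability for each of the seven cover types (labels 0-6 after the shift above) rather than a single class label, so an accuracy calculation first needs the argmax of each probability vector. A minimal NumPy sketch using placeholder probabilities and labels:

```python
import numpy as np

# placeholder softprob output: one row per inference record, one column per class (num_class = 7)
probabilities = np.array(
    [
        [0.05, 0.70, 0.10, 0.05, 0.04, 0.03, 0.03],
        [0.60, 0.20, 0.05, 0.05, 0.04, 0.03, 0.03],
    ]
)
predicted_labels = probabilities.argmax(axis=1)  # index of the most likely class per record
true_labels = np.array([1, 2])                   # placeholder ground truth labels
accuracy = float((predicted_labels == true_labels).mean())
print(predicted_labels.tolist(), accuracy)
```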

xgb_estimator = XGBoost(
    entry_point="./src/train.py",
    hyperparameters=hyperparameters,
    role=role,
    instance_count=1,
    instance_type="ml.m5.2xlarge",
    framework_version="1.5-1",
    output_path=f's3://{bucket}/{prefix_name}/models'
)

In [None]:
xgb_estimator.fit(
    {
        "train": train_data_path,
        "validation": validation_data_path
    },
    wait=True,
    logs=True
)

## 3. Create Statistics and Constraints files

Here we will create metadata JSON files based on model evaluation metrics that will be used in the SageMaker Model Quality Monitoring Job.

__Review *validation:accuracy* metrics from recent Training Job__

In [None]:
analytics = TrainingJobAnalytics(
    training_job_name=xgb_estimator.jobs[-1].name,
    metric_names=[
        "validation:accuracy",
        "train:mlogloss",
        "validation:mlogloss"
    ]
)

In [None]:
metrics_dataframe = analytics.dataframe().drop_duplicates(["metric_name"],keep="last")
print(f"Accuracy Metric from Training Job:\n\n{metrics_dataframe}")
accuracy_value = metrics_dataframe[metrics_dataframe["metric_name"]=="validation:accuracy"]["value"].iloc[0]

### Create Statistics File

In [None]:
statistics_dict = {
    "metrics":{
        row["metric_name"]:row["value"] for index, row in metrics_dataframe.iterrows()
    }
}
    

# Serializing json
json_object = json.dumps(statistics_dict, indent=4)
 
# Write to statistics.json
with open("statistics.json", "w") as outfile:
    outfile.write(json_object)

Upload to S3

In [None]:
! aws s3 cp statistics.json s3://{bucket}/{prefix_name}/model-monitor/mqm/baseline-data/

### Create Constraints File

In [None]:
constraints_dict = {
    "accuracy":{
        "threshold": accuracy_value
    }
}
    

# Serializing json
json_object = json.dumps(constraints_dict, indent=4)
 
# Write to constraints.json
with open("constraints.json", "w") as outfile:
    outfile.write(json_object)

Upload to S3

In [None]:
! aws s3 cp constraints.json s3://{bucket}/{prefix_name}/model-monitor/mqm/baseline-data/
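Because these statistics and constraints files use a custom layout, interpreting them is up to the BYOC script. The sketch below shows one way a job might compare a newly computed accuracy against the `constraints.json` threshold and build a violations report. The helper name `check_accuracy` is hypothetical, and the report keys (`metric_name`, `constraint_check_type`, `description`) loosely follow the built-in Model Monitor violation format as an assumption.

```python
import json


def check_accuracy(constraints: dict, current_accuracy: float) -> dict:
    """Return a violations report if accuracy falls below the baseline threshold.

    Hypothetical helper: expects constraints shaped like the constraints.json created above.
    """
    threshold = float(constraints["accuracy"]["threshold"])
    violations = []
    if current_accuracy < threshold:
        violations.append(
            {
                "metric_name": "accuracy",
                "constraint_check_type": "LessThanThreshold",
                "description": f"accuracy {current_accuracy:.4f} is below baseline threshold {threshold:.4f}",
            }
        )
    return {"violations": violations}


constraints = json.loads('{"accuracy": {"threshold": 0.85}}')  # placeholder threshold
print(json.dumps(check_accuracy(constraints, current_accuracy=0.80), indent=2))
```

Since the threshold here is the validation accuracy itself, any small drop counts as a violation; subtracting a tolerance from the threshold is one way to reduce noisy alerts.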

Remove local Files

In [None]:
! rm constraints.json statistics.json

## Create Endpoint

In [None]:
from sagemaker.model_monitor import DataCaptureConfig

In [None]:
predictor = xgb_estimator.deploy(
    instance_type="ml.m5.large",
    initial_instance_count=1,
    wait=True,
    data_capture_config=DataCaptureConfig(
        enable_capture=True,
        sampling_percentage=100,
        destination_s3_uri=f"s3://{bucket}/{prefix_name}/model-monitor/data-capture"
    ),
    source_dir="./src",
    entry_point="inference.py"
)

## 4. Invoke SageMaker Endpoint with Multiple Payloads and Collect Ground Truth

In this section, we create ground truth values for each inference invocation. 

__Note__: Due to the batch inference payload customization, we create specific indexes such as `InferenceId` and `payload_index`, which will be used later when joining with the SageMaker Endpoint data capture within the SageMaker Model Monitoring Job. 

__Create Ground Truth for 2 inference records__

Since we are issuing a payload with multiple inference records, we group them by an `InferenceId`, which we will reference when invoking the Amazon SageMaker Endpoint.

For simplicity, we use the test dataset which provides records for inference along with labels for ground truth examples. 
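In step 6.2, each captured record can be matched to its label on the key pair (`InferenceId`, `payload_index`): `InferenceId` identifies the request, and `payload_index` identifies the record's position inside it. A minimal pandas sketch with placeholder values, assuming the captured batches have already been flattened into one row per record with a `predictedLabel` column (a hypothetical name):

```python
import pandas as pd

# placeholder flattened predictions: one row per record inside each captured payload
predictions = pd.DataFrame(
    {"InferenceId": ["0", "0", "1"], "payload_index": [0, 1, 0], "predictedLabel": [1, 0, 4]}
)
# placeholder ground truth in the same layout as ground_truth_df in this section
ground_truth = pd.DataFrame(
    {"InferenceId": ["0", "0", "1"], "payload_index": [0, 1, 0], "groundTruthLabel": [1, 1, 4]}
)

# inner join on both keys: records without a matching label (or prediction) are dropped
merged = predictions.merge(ground_truth, on=["InferenceId", "payload_index"], how="inner")
accuracy = (merged["predictedLabel"] == merged["groundTruthLabel"]).mean()
print(merged)
print(f"accuracy={accuracy:.3f}")
```

The keys must have the same dtype on both sides. The ground truth in this notebook stores `InferenceId` as a string, so a job that reads it back from JSON may need to cast it before merging.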

In [None]:
import boto3

In [None]:
inference_id = 0
ground_truth_df_ls = []

In [None]:
test_df = pd.concat([y_test, X_test], axis=1)
number_of_records = 2
test_records = json.dumps(test_df.iloc[0:number_of_records, 1:].to_dict(orient="records"))
ground_truth_df = pd.DataFrame(
    {
        "InferenceId": [str(inference_id)] * number_of_records,
        "payload_index": list(range(number_of_records)),
        "groundTruthLabel": test_df.iloc[0:number_of_records, 0]
    }
)
ground_truth_df_ls.append(ground_truth_df)

In [None]:
ground_truth_df

__Invoking the Amazon SageMaker Endpoint__

Here we pass our payload (which includes 2 inference records) to the Amazon SageMaker Endpoint. Note the `InferenceId` argument, which links this request to the ground truth records we created earlier.

In [None]:
sm_runtime = boto3.client("sagemaker-runtime")

In [None]:
response = sm_runtime.invoke_endpoint(
    EndpointName=predictor.endpoint_name,
    ContentType="application/json",
    Accept="application/json",
    Body=test_records,
    InferenceId="0"
)

response_data = json.loads(response["Body"].read().decode())
print("Multi-record response:\n")
for record in response_data:
    print(record)

__Execute 100 invocations with multiple Payloads__

Here we invoke the SageMaker Endpoint 100 times, each time with a payload containing multiple (i.e., a batch of) inference records. Along with each invocation, we create ground truth labels for every inference record in the payload.
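The batching and ground truth bookkeeping described here can also be packaged as a generator that yields one request body and its matching ground truth rows per `InferenceId`. This is a sketch, not the notebook's code: `iter_batches` is a hypothetical helper, and it assumes the label sits in the first column of the DataFrame, as in `test_df`.

```python
import json

import pandas as pd


def iter_batches(df: pd.DataFrame, batch_size: int, num_batches: int, first_inference_id: int = 1):
    """Yield (inference_id, json_body, ground_truth_df) for consecutive row slices of df.

    Assumes column 0 holds the label and the remaining columns hold the features.
    """
    for i in range(num_batches):
        start = i * batch_size
        batch = df.iloc[start:start + batch_size]
        if batch.empty:
            break  # ran out of rows
        inference_id = str(first_inference_id + i)
        body = json.dumps(batch.iloc[:, 1:].to_dict(orient="records"))
        ground_truth = pd.DataFrame(
            {
                "InferenceId": [inference_id] * len(batch),
                "payload_index": list(range(len(batch))),
                "groundTruthLabel": batch.iloc[:, 0].to_numpy(),
            }
        )
        yield inference_id, body, ground_truth


# Usage with a small placeholder frame (label first, then two features)
toy = pd.DataFrame({"label": [0, 1, 2, 3], "f1": [1.0, 2.0, 3.0, 4.0], "f2": [5.0, 6.0, 7.0, 8.0]})
for inference_id, body, gt in iter_batches(toy, batch_size=3, num_batches=2):
    print(inference_id, len(json.loads(body)), gt["groundTruthLabel"].tolist())
```

Each yielded `body` would be passed to `invoke_endpoint` with `InferenceId=inference_id`, and each `ground_truth` frame appended to the list that is later written to S3.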

In [None]:
number_of_records = 5
for i in range(100):
    inference_id = i + 1
    df_index = i * number_of_records
    batch_df = test_df.iloc[df_index:df_index + number_of_records]
    # payload: feature columns only (column 0 holds the label)
    test_records = json.dumps(batch_df.iloc[:, 1:].to_dict(orient="records"))
    # ground truth keyed by InferenceId and the record's position in the payload
    ground_truth_df = pd.DataFrame(
        {
            "InferenceId": [str(inference_id)] * number_of_records,
            "payload_index": list(range(number_of_records)),
            "groundTruthLabel": batch_df.iloc[:, 0]
        }
    )
    ground_truth_df_ls.append(ground_truth_df)
    response = sm_runtime.invoke_endpoint(
        EndpointName=predictor.endpoint_name,
        ContentType="application/json",
        Accept="application/json",
        Body=test_records,
        InferenceId=str(inference_id)
    )
    

__Store Ground Truth in S3__

In [None]:
gt_dataframe = pd.concat(ground_truth_df_ls, axis=0, ignore_index=True).reset_index(drop=True)

In [None]:
# save ground truth data under an S3 prefix partitioned by UTC timestamp (year/month/day/hour)
timestamp = dt.datetime.now(dt.timezone.utc)
timestamp_prefix_parts = [timestamp.year, timestamp.month, timestamp.day, timestamp.hour]
data_partition_prefix = "/".join([str(ts) for ts in timestamp_prefix_parts])

gt_dataframe.to_json(
    f"s3://{bucket}/{prefix_name}/model-monitor/mqm/ground_truth/{predictor.endpoint_name}/{data_partition_prefix}/ground_truth.json",
    orient="records"
)
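The cell above builds the partition prefix without zero padding (for example `2024/3/5/7`). SageMaker data capture, by contrast, writes its files under zero-padded `YYYY/MM/DD/HH` prefixes. Whichever layout you use for ground truth must match what the BYOC script (`model_quality_monitoring.py`, not shown here) reads, so this sketch only contrasts the two formats rather than prescribing one:

```python
import datetime as dt


def partition_prefixes(timestamp: dt.datetime) -> tuple:
    """Return the (unpadded, zero-padded) year/month/day/hour prefixes for a timestamp."""
    parts = [timestamp.year, timestamp.month, timestamp.day, timestamp.hour]
    unpadded = "/".join(str(part) for part in parts)  # same construction as the cell above
    padded = timestamp.strftime("%Y/%m/%d/%H")        # zero-padded, like data capture prefixes
    return unpadded, padded


example = dt.datetime(2024, 3, 5, 7, tzinfo=dt.timezone.utc)
print(partition_prefixes(example))
```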

## 5. Build BYOC Docker Image

Here we use the [Amazon SageMaker Studio Image Build CLI](https://aws.amazon.com/blogs/machine-learning/using-the-amazon-sagemaker-studio-image-build-cli-to-build-container-images-from-your-studio-notebooks/) to create a custom Docker image built from the base [Amazon SageMaker SKLearn image](https://github.com/aws/sagemaker-scikit-learn-container).

We will execute a shell script that installs the SageMaker Studio Image Build CLI, builds the image, and pushes it to ECR with the repository name and tag `sm-mm-mqm-byoc:1.0`. We run this script from the root directory of this repository. For more details, see `./scripts/build_push_ecr_image.sh`.

In [None]:
from sagemaker.image_uris import retrieve

__Create Dockerfile based on image URI__

In [None]:
%%writefile ./docker/Dockerfile
# we use the SageMaker pre-built SKLearn image as the base image
# the base image URI for the notebook's region is retrieved and appended in the next cell

In [None]:
base_image_uri = retrieve(framework="sklearn", region=region, py_version="py3", version="1.2-1")
dockerfile_from_line = f"FROM {base_image_uri}"
%store dockerfile_from_line >>./docker/Dockerfile

In [None]:
%%writefile  -a ./docker/Dockerfile

RUN python3 -m pip install awswrangler

ENV PYTHONUNBUFFERED=TRUE

ADD ./src/model_quality_monitoring.py /

ENTRYPOINT ["python3", "/model_quality_monitoring.py"]

__Run shell script to build and push Docker image to ECR__

In [None]:
! bash ./scripts/build_push_ecr_image.sh ./

## 6. Create Amazon SageMaker Model Monitoring Schedule

Here we use the SageMaker Python SDK to set up an Amazon SageMaker Model Monitoring job.

We will schedule an hourly SageMaker Model Monitoring job using our custom ECR image (i.e. BYOC) for our MQM use case. 

__Notes__: 
 * You can supply environment variables and entrypoint script arguments to the `ModelMonitor` class and its `create_monitoring_schedule` method for further customization. In this example: 
   * we provide the path `ground_truth_s3_uri_path` as an environment variable, which we use within the Amazon SageMaker Model Monitoring Job code. 
   * we provide the `--create-violation-tests` argument, which we use within the Amazon SageMaker Model Monitoring Job code to create a violation. This will issue a CloudWatch alert for this demonstration. 
 * After the monitoring job is created, its `MonitoringType` will have a value of `DataQuality`. This is the default value for this attribute when using a BYOC with SageMaker Model Monitor, and should not be confused with Data Quality Monitoring. 
 * We set the SageMaker Model Monitoring schedule to run hourly. For testing purposes, please ensure that both data capture and ground truth data are available in S3. You can re-execute the previous cells in this notebook to create more dummy data. 
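Inside the container, these customizations arrive as ordinary environment variables and command-line arguments. The sketch below shows how an entrypoint such as `model_quality_monitoring.py` might read them. `ground_truth_s3_uri_path` and `--create-violation-tests` come from this notebook; `dataset_source` and `output_path` are assumed to be supplied by the SageMaker Model Monitor BYOC container contract, and their defaults here are illustrative assumptions (the `output_path` default mirrors the `MonitoringOutput` source used below). Verify the names against the BYOC documentation.

```python
import argparse
import os


def read_job_config(argv=None, environ=None) -> dict:
    """Collect the monitoring job's settings from CLI arguments and environment variables."""
    environ = os.environ if environ is None else environ
    parser = argparse.ArgumentParser()
    # flag passed through create_monitoring_schedule(arguments=[...])
    parser.add_argument("--create-violation-tests", action="store_true")
    args = parser.parse_args(argv)
    return {
        "create_violation_tests": args.create_violation_tests,
        # set through ModelMonitor(env={...}) in this notebook
        "ground_truth_s3_uri_path": environ.get("ground_truth_s3_uri_path"),
        # assumed BYOC contract variables; defaults are illustrative only
        "dataset_source": environ.get("dataset_source", "/opt/ml/processing/input/endpoint"),
        "output_path": environ.get("output_path", "/opt/ml/processing/output"),
    }


config = read_job_config(
    argv=["--create-violation-tests"],
    environ={"ground_truth_s3_uri_path": "s3://example-bucket/ground_truth/example-endpoint"},
)
print(config)
```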

In [None]:
from sagemaker.model_monitor.model_monitoring import ModelMonitor, MonitoringOutput
from sagemaker.model_monitor.cron_expression_generator import CronExpressionGenerator

In [None]:
sm_mm_mqm = ModelMonitor(
    role=role, 
    image_uri=f"{account_id}.dkr.ecr.{region}.amazonaws.com/sm-mm-mqm-byoc:1.0", 
    instance_count=1, 
    instance_type='ml.m5.xlarge', 
    base_job_name="sm-mm-mqm-byoc",
    sagemaker_session=sess,
    env={
        "ground_truth_s3_uri_path": f"s3://{bucket}/{prefix_name}/model-monitor/mqm/ground_truth/{predictor.endpoint_name}"
    }
)

In [None]:
sm_mm_mqm.create_monitoring_schedule(
    endpoint_input=predictor.endpoint_name,
    output=MonitoringOutput(
        source="/opt/ml/processing/output",
        destination=f"s3://{bucket}/{prefix_name}/model-monitor/mqm/reports"
    ),
    statistics=f"s3://{bucket}/{prefix_name}/model-monitor/mqm/baseline-data/statistics.json",
    constraints=f"s3://{bucket}/{prefix_name}/model-monitor/mqm/baseline-data/constraints.json",
    monitor_schedule_name="sm-mm-byoc-batch-inf-schedule",
    schedule_cron_expression=CronExpressionGenerator().hourly(),
    arguments=[
        "--create-violation-tests"
    ]
)

__Verify model monitoring schedule is created__

In [None]:
! aws sagemaker describe-monitoring-schedule --monitoring-schedule-name {sm_mm_mqm.monitoring_schedule_name}

## 7. Observe SageMaker Model Monitor Output

Now that the Amazon SageMaker Model Monitor schedule is created and the Amazon SageMaker Endpoint was invoked in step 4, we can observe the SageMaker Model Monitoring results.

Since the SageMaker Model Monitoring Job runs on an hourly cron schedule, we'll view the results at the end of the hour. In SageMaker Studio Classic, navigate to the SageMaker Endpoint console and select the “Monitoring job history” panel to view status reports of the SageMaker Monitoring Job, as shown in the screenshot below:


![dashboard image](images/model-monitoring-dashboard.png)

If an issue is found, select the monitoring job name to review the report.
Here the custom model monitoring metric created in the BYOC flagged an accuracy score violation of -1 (this was done purposely for demonstration with the argument `--create-violation-tests` in step 6). 

![report image](images/monitoring-job-report.png)

This gives you the ability to monitor model quality violations for your custom SageMaker Model Quality Monitoring Job all within the Amazon SageMaker Studio console. If you desire to trigger Amazon CloudWatch Alarms given published CloudWatch Metrics, you must create these CloudWatch Metrics with your BYOC job.
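As a minimal sketch of publishing such a metric from the BYOC job, the helper below calls the CloudWatch `PutMetricData` API through a client supplied by the caller (for example `boto3.client("cloudwatch")`). The namespace, metric name, and dimension names are hypothetical choices for this example, and the job's IAM role would also need the `cloudwatch:PutMetricData` permission.

```python
def publish_accuracy(cloudwatch_client, endpoint_name: str, schedule_name: str, accuracy: float) -> dict:
    """Publish one accuracy datapoint and return the request that was sent."""
    request = {
        "Namespace": "Custom/ModelQuality",  # hypothetical namespace
        "MetricData": [
            {
                "MetricName": "accuracy",
                "Dimensions": [
                    {"Name": "Endpoint", "Value": endpoint_name},
                    {"Name": "MonitoringSchedule", "Value": schedule_name},
                ],
                "Value": float(accuracy),
                "Unit": "None",
            }
        ],
    }
    cloudwatch_client.put_metric_data(**request)
    return request


# Example call from inside the job (accuracy value computed by the job):
# publish_accuracy(boto3.client("cloudwatch"), endpoint_name, "sm-mm-byoc-batch-inf-schedule", accuracy)
```

Passing the client in, rather than creating it inside the helper, keeps the function easy to test with a stand-in object.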

For automated Model Monitoring alerts, we recommend creating an Amazon SNS topic that user email groups subscribe to, and using it as the notification action of a CloudWatch metric alarm.
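A minimal sketch of the alarm side, using the CloudWatch `PutMetricAlarm` API to notify an SNS topic. It assumes the BYOC job publishes an `accuracy` metric under a custom namespace (here the hypothetical `Custom/ModelQuality`, with `Endpoint` and `MonitoringSchedule` dimensions). The topic ARN, alarm name, and threshold are placeholders; the topic itself would be created, and its email subscriptions confirmed, separately (for example with the SNS `CreateTopic` and `Subscribe` APIs).

```python
def create_accuracy_alarm(cloudwatch_client, endpoint_name: str, schedule_name: str,
                          threshold: float, sns_topic_arn: str) -> dict:
    """Create an alarm that notifies an SNS topic when hourly accuracy falls below threshold."""
    request = {
        "AlarmName": f"{endpoint_name}-accuracy-below-baseline",  # hypothetical naming scheme
        "Namespace": "Custom/ModelQuality",  # must match the namespace the job publishes to
        "MetricName": "accuracy",
        "Dimensions": [
            {"Name": "Endpoint", "Value": endpoint_name},
            {"Name": "MonitoringSchedule", "Value": schedule_name},
        ],
        "Statistic": "Minimum",
        "Period": 3600,  # one evaluation window per hourly monitoring run
        "EvaluationPeriods": 1,
        "Threshold": float(threshold),
        "ComparisonOperator": "LessThanThreshold",
        "TreatMissingData": "notBreaching",  # hours without a datapoint do not trigger the alarm
        "AlarmActions": [sns_topic_arn],
    }
    cloudwatch_client.put_metric_alarm(**request)
    return request
```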

## 8. Clean Up

Here we remove the captured data and ground truth data, delete the model monitoring schedule, and delete the Amazon SageMaker model and Endpoint to minimize costs.

Delete Captured Data

In [None]:
! aws s3 rm s3://{bucket}/{prefix_name}/model-monitor/data-capture/{predictor.endpoint_name} --recursive
! aws s3 rm s3://{bucket}/{prefix_name}/model-monitor/mqm/ground_truth/{predictor.endpoint_name} --recursive

Delete Monitoring Schedule

In [None]:
sm_mm_mqm.delete_monitoring_schedule()
# wait for Model Monitoring Schedule to be deleted before deleting endpoint
time.sleep(10)

Delete Endpoint

In [None]:
predictor.delete_model()
predictor.delete_endpoint()