# Safe MLOps Deployment Pipeline

This notebook steps through an MLOps pipeline to build, train, deploy, and monitor a model.

In [2]:
# Import the latest sagemaker and boto3 SDKs.
import sys

!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install -qU awscli boto3 "sagemaker>=2.1.0,<3" tqdm
!{sys.executable} -m pip install -qU "stepfunctions==2.0.0"
!{sys.executable} -m pip show sagemaker stepfunctions


Restart your SageMaker kernel, then continue with this notebook.

In [29]:
# Set PROJECT_NAME to the name you gave the SageMaker project when you created it.
# You can find it in the left panel in Studio.

PROJECT_NAME = "sd-t5-02"
PREFIX = "smsd"

assert PROJECT_NAME is not None and isinstance(
    PROJECT_NAME, str
), "Please specify the project name as string"

In [3]:
import boto3
from IPython.display import HTML, display


def get_provisioned_product_name(project_name):
    region = boto3.Session().region_name
    sc = boto3.client("servicecatalog")
    products = sc.search_provisioned_products(
        Filters={
            "SearchQuery": [
                project_name,
            ]
        }
    )
    pp = products["ProvisionedProducts"]
    if len(pp) != 1:
        print("Invalid provisioned product name. Open the link below and search manually")
        display(
            HTML(
                f'<a target="_blank" href="https://{region}.console.aws.amazon.com/servicecatalog/home?region={region}#provisioned-products">Service Catalog</a>'
            )
        )
        raise ValueError("Invalid provisioned product")

    return pp[0]["Name"]


PROVISIONED_PRODUCT_NAME = get_provisioned_product_name(PROJECT_NAME)
print(
    f"The Service Catalog provisioned product associated with this SageMaker project: {PROVISIONED_PRODUCT_NAME}"
)

The Service Catalog provisioned product associated with this SageMaker project: sd-t5-02-p-8up2y4ihhnzd


## Data Prep

In [77]:
# data_location_uri = "s3://t5-engine-bucket/training_data/dummy"
data_location_uri = "s3://t5-engine-bucket/training_data/full"

## Build

If you navigate to the CodePipeline instance, you will notice that the Source stage is initially in a `Failed` state. This happens because the dataset, which is one of the sources that can trigger the pipeline, has not yet been uploaded to the S3 location expected by the pipeline.

### Trigger Build

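Start the model build and deployment pipeline by packaging references to the datasets you prepared in the previous section (their S3 URIs), together with the hyperparameters and training image, into a zip file and uploading it to the S3 source location. The upload triggers the CodePipeline instance created by the project.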

In [78]:
import boto3
from botocore.exceptions import ClientError
import os
import time
from sagemaker import get_execution_role


def get_config(provisioned_product_name):
    sc = boto3.client("servicecatalog")
    outputs = sc.get_provisioned_product_outputs(ProvisionedProductName=provisioned_product_name)[
        "Outputs"
    ]
    config = {}
    for out in outputs:
        config[out["OutputKey"]] = out["OutputValue"]
    return config


config = get_config(PROVISIONED_PRODUCT_NAME)
region = config["Region"]
artifact_bucket = config["ArtifactBucket"]
pipeline_name = config["PipelineName"]
model_name = config["ModelName"]
workflow_pipeline_arn = config["WorkflowPipelineARN"]
role = get_execution_role()
account_id = role.split(":")[4]

print("region: {}".format(region))
print("artifact bucket: {}".format(artifact_bucket))
print("pipeline: {}".format(pipeline_name))
print("model name: {}".format(model_name))
print("workflow: {}".format(workflow_pipeline_arn))
print("role: {}".format(role))
print("account id: {}".format(account_id))

region: ap-southeast-1
artifact bucket: smsd-sd-t5-p-8up2y4ihhnzd
pipeline: smsd-sd-t5-p-8up2y4ihhnzd
model name: sd-t5
workflow: arn:aws:states:ap-southeast-1:852039983533:stateMachine:smsd-sd-t5
role: arn:aws:iam::852039983533:role/service-role/AmazonSageMaker-ExecutionRole-20211022T094935
account id: 852039983533
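
The account ID above comes from `role.split(":")[4]`. ARNs follow the pattern `arn:partition:service:region:account-id:resource`, so the fifth colon-separated field is the account ID (IAM ARNs leave the region field empty). The sketch below makes that parsing explicit; `parse_arn` is a hypothetical helper and the ARNs are illustrative, not real resources:

```python
def parse_arn(arn):
    """Split an ARN into named fields (hypothetical helper, for illustration).

    ARNs have the form arn:partition:service:region:account-id:resource.
    maxsplit=5 keeps any colons inside the resource part intact.
    """
    parts = arn.split(":", 5)
    if len(parts) != 6 or parts[0] != "arn":
        raise ValueError("Not an ARN: {}".format(arn))
    keys = ["arn", "partition", "service", "region", "account_id", "resource"]
    return dict(zip(keys, parts))


# Illustrative role ARN (not a real account).
example = parse_arn("arn:aws:iam::123456789012:role/service-role/ExampleRole")
print(example["account_id"])  # 123456789012
print(repr(example["region"]))  # '' because IAM is a global service
```

Splitting with a fixed `maxsplit` matters for resources such as Step Functions state machines, whose resource part itself contains a colon.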


From the AWS CodePipeline [documentation](https://docs.aws.amazon.com/codepipeline/latest/userguide/tutorials-simple-s3.html):

> When Amazon S3 is the source provider for your pipeline, you may zip your source file or files into a single .zip and upload the .zip to your source bucket. You may also upload a single unzipped file; however, downstream actions that expect a .zip file will fail.

In [79]:
from io import BytesIO
import zipfile
import json

input_data = {
    "TrainingUri": f"{data_location_uri}/train",
    "TestUri": f"{data_location_uri}/test",
    "BaselineUri": f"{data_location_uri}/train/train.csv",
}
hyperparameters = {"random_seed": 17}

ecr_image_name = "sagemaker-t5"
image_detail = {
    "ImageUri": f"{account_id}.dkr.ecr.{region}.amazonaws.com/{ecr_image_name}:latest",
}

zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, "w") as zf:
    zf.writestr("inputData.json", json.dumps(input_data))
    zf.writestr("hyperparameters.json", json.dumps(hyperparameters))
    zf.writestr("imageDetail.json", json.dumps(image_detail))
zip_buffer.seek(0)

data_source_key = "{}/data-source.zip".format(pipeline_name)
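
Before uploading, it can help to confirm that the archive really holds the three JSON manifests the pipeline reads. The sketch below builds a package the same way and reads it back from the in-memory buffer; the helper names and all values (bucket, account ID, image URI) are placeholders for illustration:

```python
import json
import zipfile
from io import BytesIO


def build_package(manifests):
    """Write each {filename: dict} entry as a JSON file into an in-memory zip."""
    buffer = BytesIO()
    with zipfile.ZipFile(buffer, "w") as zf:
        for filename, content in manifests.items():
            zf.writestr(filename, json.dumps(content))
    buffer.seek(0)
    return buffer


def read_package(buffer):
    """Return {filename: parsed JSON} for every file in the zip buffer."""
    with zipfile.ZipFile(buffer) as zf:
        return {name: json.loads(zf.read(name)) for name in zf.namelist()}


# Placeholder values for illustration only.
manifests = {
    "inputData.json": {"TrainingUri": "s3://example-bucket/data/train"},
    "hyperparameters.json": {"random_seed": 17},
    "imageDetail.json": {"ImageUri": "123456789012.dkr.ecr.us-east-1.amazonaws.com/example:latest"},
}
contents = read_package(build_package(manifests))
print(sorted(contents))
```

Against the real package, `read_package(zip_buffer)` followed by `zip_buffer.seek(0)` would perform the same check without disturbing the upload.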

Now upload the zip package to your artifact S3 bucket. This upload triggers the pipeline to train and deploy a model.

In [80]:
s3 = boto3.client("s3")
s3.put_object(Bucket=artifact_bucket, Key=data_source_key, Body=zip_buffer.getvalue())

{'ResponseMetadata': {'RequestId': 'V5HSDY1H945WZBW6',
  'HostId': 'Uoy0K3OiwXf74qEP3vnGt4LTkB14RB1ysR7myDFQKuskyr+G4PEwfa7YdCekdidHD7Dzl9WjtCc=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'Uoy0K3OiwXf74qEP3vnGt4LTkB14RB1ysR7myDFQKuskyr+G4PEwfa7YdCekdidHD7Dzl9WjtCc=',
   'x-amz-request-id': 'V5HSDY1H945WZBW6',
   'date': 'Mon, 04 Apr 2022 19:00:10 GMT',
   'x-amz-version-id': 'A5OaJ_mga1xEi1PUWeKTjI0ikbKncZsc',
   'etag': '"229a3d1b889d5c73187d9c58328be8db"',
   'server': 'AmazonS3',
   'content-length': '0'},
  'RetryAttempts': 0},
 'ETag': '"229a3d1b889d5c73187d9c58328be8db"',
 'VersionId': 'A5OaJ_mga1xEi1PUWeKTjI0ikbKncZsc'}

Open CodePipeline and check the result.
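
Stage progress can also be read from the notebook with the CodePipeline `get_pipeline_state` API, which returns a `stageStates` list in which each stage that has run carries a `latestExecution` dict with a `status`. A minimal sketch, assuming that response shape, that turns it into a stage-to-status map; the sample response is hand-written and `NotStarted` is a placeholder label chosen here for stages without an execution:

```python
def summarize_stages(pipeline_state):
    """Map each stage name to its latest execution status.

    `pipeline_state` is shaped like the response of
    boto3.client("codepipeline").get_pipeline_state(name=...).
    Stages that have never run have no "latestExecution" key.
    """
    summary = {}
    for stage in pipeline_state.get("stageStates", []):
        latest = stage.get("latestExecution", {})
        summary[stage["stageName"]] = latest.get("status", "NotStarted")
    return summary


# Hand-written sample; with AWS access you would pass
# boto3.client("codepipeline").get_pipeline_state(name=pipeline_name) instead.
sample_state = {
    "stageStates": [
        {"stageName": "Source", "latestExecution": {"status": "Succeeded"}},
        {"stageName": "Train", "latestExecution": {"status": "InProgress"}},
        {"stageName": "DeployDev"},
    ]
}
for name, status in summarize_stages(sample_state).items():
    print("{}: {}".format(name, status))
```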

## Train Model

### Inspect Training Job

Wait until the pipeline has started running the Train step before continuing with the next cells.

In [24]:
from stepfunctions.workflow import Workflow

while True:
    try:
        workflow = Workflow.attach(workflow_pipeline_arn)
        break
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

workflow

### Training Analytics

Once the training and baseline jobs are complete (around 5 minutes), we can inspect the experiment metrics. The code below displays the experiment's trial components in a table.

In [30]:
from sagemaker import analytics
import pandas as pd

experiment_name = "{}-{}".format(PREFIX, model_name)
model_analytics = analytics.ExperimentAnalytics(experiment_name=experiment_name)
analytics_df = model_analytics.dataframe()

if analytics_df.shape[0] == 0:
    raise Exception("Please wait. No training or baseline jobs")

pd.set_option("display.max_colwidth", 100)  # Increase column width to show the full component name
cols = [
    "TrialComponentName",
    "DisplayName",
    "SageMaker.InstanceType",
    "train:rmse - Last",
    "validation:rmse - Last",
]  # Return the last RMSE for training and validation
# Keep only the requested columns that exist (processing jobs report no RMSE)
analytics_df[analytics_df.columns.intersection(cols)].head(2)

|   | TrialComponentName | DisplayName | SageMaker.InstanceType |
|---|---|---|---|
| 0 | smsd-sd-t5-evl-997750c6-2400-4429-a472-47edb6672e53-aws-processing-job | Evaluation | ml.m5.2xlarge |
| 1 | smsd-sd-t5-pbl-997750c6-2400-4429-a472-47edb6672e53-aws-processing-job | Baseline | ml.m5.2xlarge |


## Deploy Dev

### Test Dev Deployment

When the pipeline has finished training a model, it automatically moves to the next step, where the model is deployed as a SageMaker endpoint. This endpoint is part of your dev deployment, so in this section we run some tests against it to decide whether to deploy the model to production.

In [34]:
codepipeline = boto3.client("codepipeline")


def get_pipeline_stage(pipeline_name, stage_name):
    """Return the state of the named stage, or None if the pipeline has no such stage."""
    response = codepipeline.get_pipeline_state(name=pipeline_name)
    for stage in response["stageStates"]:
        if stage["stageName"] == stage_name:
            return stage
    return None


deploy_dev = get_pipeline_stage(pipeline_name, "DeployDev")
if deploy_dev is None or "latestExecution" not in deploy_dev:
    raise Exception("Please wait. Deploy dev not started")

execution_id = deploy_dev["latestExecution"]["pipelineExecutionId"]
dev_endpoint_name = "{}-{}-dev-{}".format(PREFIX, model_name, execution_id)

print("endpoint name: {}".format(dev_endpoint_name))

endpoint name: smsd-sd-t5-dev-997750c6-2400-4429-a472-47edb6672e53


It can take up to 10 minutes for SageMaker to create an endpoint.

Run the code below to check the status of your endpoint. Wait until the status of the endpoint is 'InService'.

In [35]:
sm = boto3.client("sagemaker")

while True:
    try:
        response = sm.describe_endpoint(EndpointName=dev_endpoint_name)
        print("Endpoint status: {}".format(response["EndpointStatus"]))
        if response["EndpointStatus"] == "InService":
            break
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

Endpoint status: InService
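
The loop above waits indefinitely and keeps polling even if the endpoint ends up in `Failed`. A more defensive pattern adds a timeout and stops on failure states. The sketch below is a generic helper; the name `wait_for_status` and its parameters are hypothetical, not part of boto3 or the SageMaker SDK, and `describe` is any zero-argument callable that returns the current status. boto3 also provides an `endpoint_in_service` waiter on SageMaker clients for this specific case.

```python
import time


def wait_for_status(describe, target, failure_states=(), poll_seconds=10, timeout_seconds=1800):
    """Poll `describe()` until it returns `target`.

    Raises RuntimeError on a failure state and TimeoutError once
    `timeout_seconds` have elapsed without reaching the target.
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = describe()
        print("Status: {}".format(status))
        if status == target:
            return status
        if status in failure_states:
            raise RuntimeError("Reached failure state: {}".format(status))
        if time.monotonic() >= deadline:
            raise TimeoutError("Timed out waiting for {}".format(target))
        time.sleep(poll_seconds)


# Usage against the dev endpoint (requires AWS credentials):
# wait_for_status(
#     lambda: sm.describe_endpoint(EndpointName=dev_endpoint_name)["EndpointStatus"],
#     target="InService",
#     failure_states=("Failed",),
# )

# Offline demo with a scripted sequence of statuses.
statuses = iter(["Creating", "Creating", "InService"])
print(wait_for_status(lambda: next(statuses), "InService", poll_seconds=0))
```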


The endpoint is ready; run some tests.

In [36]:
import numpy as np
from tqdm import tqdm

from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer


def get_predictor(endpoint_name):
    predictor = Predictor(endpoint_name)
    predictor.serializer = CSVSerializer()
    return predictor


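def predict(predictor, data, rows=500):
    # Split into batches of roughly `rows` rows; always use at least one batch,
    # otherwise np.array_split fails when data has fewer than rows / 2 rows.
    n_batches = max(1, round(data.shape[0] / float(rows)))
    results = []
    for batch in tqdm(np.array_split(data, n_batches)):
        # Each response body is a comma-separated string of predictions
        results.append(predictor.predict(batch).decode("utf-8"))
    return np.fromstring(",".join(results), sep=",")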
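
`predict` splits the input into about `data.shape[0] / rows` batches, sends each batch as CSV, and concatenates the comma-separated responses; with 300 test rows and `rows=1` that is 300 requests, matching the 300/300 progress bar in the output further down. The sketch below reproduces the batching and parsing offline with a stub predictor so the logic can be checked without an endpoint; `StubPredictor` and `predict_batched` are hypothetical names, and the stub simply returns one `0` per row:

```python
import numpy as np


class StubPredictor:
    """Hypothetical stand-in for sagemaker.predictor.Predictor.

    Returns one prediction per input row as comma-separated bytes,
    mimicking a CSV response body.
    """

    def predict(self, batch):
        return ",".join("0" for _ in range(len(batch))).encode("utf-8")


def predict_batched(predictor, data, rows=500):
    # At least one batch, otherwise array_split raises on small inputs.
    n_batches = max(1, round(data.shape[0] / float(rows)))
    parts = [predictor.predict(b).decode("utf-8") for b in np.array_split(data, n_batches)]
    values = [float(x) for x in ",".join(parts).split(",")]
    return np.array(values), n_batches


data = np.zeros((300, 151))  # 300 rows, 151 feature columns like test_df
predictions, n_batches = predict_batched(StubPredictor(), data, rows=1)
print(n_batches, predictions.shape)  # 300 (300,)
```

Larger `rows` values mean fewer, bigger requests; the endpoint's maximum payload size bounds how large a batch can be.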

In [48]:
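import pandas as pd

s3 = boto3.client("s3")
# Note: this reads the test split of the *dummy* dataset, not data_location_uri
obj = s3.get_object(Bucket="t5-engine-bucket", Key="training_data/dummy/test/test.csv")
test_df = pd.read_csv(obj["Body"])  # "Body" holds a streaming, file-like object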
test_df.head()

|   | target | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... | 141 | 142 | 143 | 144 | 145 | 146 | 147 | 148 | 149 | 150 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0 | -0.184958 | -0.084 | -0.354 | 0.355 | -0.304 | -0.058 | 0.88 | 11.4674 | -0.049 | ... | 1.058 | 471.1 | -0.063 | 0.059045 | 11.109 | -1.194949 | -0.19373 | -0.12037 | 0.064384 | 17.097 |
| 1 | 0 | 0.662799 | -0.243 | -0.287 | -0.741 | 2.56 | -0.078 | -0.263 | 9.45962 | 3.606 | ... | 1.732 | 707.7 | -0.035 | 0.050669 | 10.006 | -1.01515 | -0.1796 | -0.10987 | 0.047429 | 17.3695 |
| 2 | 0 | 0.156325 | -0.132 | -0.235 | 0.752 | -0.324 | -0.438 | -0.351 | 10.0104 | -0.044 | ... | 0.715 | 675.4 | -0.012 | 0.041223 | -21.501 | 1.274784 | -0.1828 | -0.11017 | 0.04313 | 17.1175 |
| 3 | 0 | -0.661623 | -0.178 | -0.149 | -0.923 | -0.266 | -0.184 | 2.681 | 9.45939 | -0.059 | ... | 0.406 | 580.7 | -0.041 | 0.045162 | -1.673 | -1.193886 | -0.1817 | -0.12229 | 0.049549 | 17.5938 |
| 4 | 0 | 1.89651 | -0.085 | -0.359 | 0.764 | -0.311 | -0.078 | -0.299 | 10.539 | -0.051 | ... | -1.051 | 668.7 | -0.033 | 0.055048 | 10.125 | -1.205284 | -0.19094 | -0.11897 | 0.052379 | 17.5324 |


In [51]:
dev_predictor = get_predictor(dev_endpoint_name)
predictions = predict(dev_predictor, test_df[test_df.columns[1:]].values, rows=1)
predictions

100%|██████████| 300/300 [02:32<00:00,  1.97it/s]


array([0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0.,
       0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 4., 1., 0.,
       0., 0., 0., 0., 0., 0., 0., 0., 4., 3., 3., 3., 3., 1., 2., 1., 2.,
       1., 1., 1., 1., 1., 1., 2., 2., 3., 3., 3., 3., 1., 1., 2., 2., 2.,
       1., 2., 2., 1., 4., 4., 1., 4., 2., 4., 2., 4., 2., 3., 3., 3., 1.,
       2., 2., 1., 1., 2., 2., 1., 1., 2., 2., 3., 0., 2., 1., 2., 1., 0.,
       2., 4., 1., 1., 2., 1., 4., 2., 4., 3., 4., 1., 4., 2., 1., 2., 4.,
       2., 4., 1., 2., 2., 2., 1., 3., 3., 4., 1., 2., 1., 2., 2., 4., 1.,
       1., 1., 1., 1., 1., 1., 4., 1., 1., 4., 2., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 2., 1., 1., 1.,
       1., 1., 2., 1., 1., 3., 3., 3., 3., 4., 2., 4., 1., 4., 2., 2., 3.,
       3., 3., 3., 2., 4., 4., 3., 3., 3., 3., 1., 1., 1., 1., 1., 2., 2.,
       2., 1., 2., 1., 4.

Load the predictions into a DataFrame and join it with the test data. Then calculate the absolute error and display the results sorted by descending absolute error.

In [52]:
pred_df = pd.DataFrame({"predictions": predictions})
pred_df = test_df.join(pred_df)  # Join on the shared row index
pred_df["error"] = abs(pred_df["target"] - pred_df["predictions"])

pred_df.sort_values("error", ascending=False).head()

|   | target | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | ... | 143 | 144 | 145 | 146 | 147 | 148 | 149 | 150 | predictions | error |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 48 | 0 | -1.01375 | -0.285 | -0.298 | -0.776 | -0.263 | -0.118 | -0.314 | 8.86586 | -0.059 | ... | -0.023 | 0.04026 | 8.657 | -0.511029 | -0.17633 | -0.11704 | 0.040832 | 17.5005 | 4.0 | 4.0 |
| 113 | 3 | 0.636412 | -0.102 | -0.391 | 0.054 | -0.299 | -0.163 | -0.276 | 10.1746 | -0.045 | ... | -0.007 | 0.049283 | -19.927 | -1.260539 | -0.18735 | -0.12018 | 0.043947 | 17.2123 | 0.0 | 3.0 |
| 244 | 4 | 0.463505 | -0.088 | -0.257 | -0.738 | -0.246 | -0.163 | -0.341 | 9.97385 | -0.045 | ... | -0.01 | 0.053151 | -6.895 | -1.003716 | -0.18502 | -0.12348 | 0.046661 | 17.7874 | 1.0 | 3.0 |
| 137 | 1 | 13.885123 | -0.157 | -0.267 | 1.33 | -0.263 | -0.277 | 1.383 | 11.1761 | -0.051 | ... | -0.024 | 0.051215 | -34.035 | 0.950195 | -0.2245 | -0.11349 | 0.047213 | 17.2637 | 4.0 | 3.0 |
| 92 | 1 | 44.477735 | -0.113 | -0.222 | 0.518 | -0.354 | -0.107 | -0.333 | 9.31445 | -0.066 | ... | -0.027 | 0.04993 | -44.264 | -1.198845 | -0.17796 | -0.11234 | 0.048404 | 17.5352 | 4.0 | 3.0 |
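
The join works because both frames use the default `RangeIndex`, so row *i* of the predictions lines up with row *i* of the test data. A toy sketch of the same steps with made-up values (the `toy_` names are used so the notebook's `test_df` is not overwritten):

```python
import pandas as pd

# Made-up targets and predictions; both frames use the default RangeIndex 0..n-1.
toy_test_df = pd.DataFrame({"target": [0, 3, 1, 2]})
toy_pred_df = pd.DataFrame({"predictions": [4.0, 0.0, 1.0, 2.0]})

# join aligns on the index, so row i gets prediction i.
joined = toy_test_df.join(toy_pred_df)
joined["error"] = (joined["target"] - joined["predictions"]).abs()

ranked = joined.sort_values("error", ascending=False)
print(ranked)
```

If the test frame had a non-default index (for example after filtering rows), `join` would misalign; calling `reset_index(drop=True)` on it first restores positional alignment.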


## Deploy Prod

### Approve Deployment to Production

Go to CodePipeline, then review and approve the deployment to production.
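
Approval can also be given from the notebook. For an action waiting on manual approval, the `get_pipeline_state` response includes a `token` in that action's `latestExecution`, and CodePipeline's `put_approval_result` takes the stage name, action name, result, and token. The sketch below only locates the pending approval in a response; which stage and action hold the approval in this project is an assumption to check against the pipeline definition, and the sample response and its names are hand-written:

```python
def find_pending_approval(pipeline_state):
    """Return (stage_name, action_name, token) for the first in-progress
    action that carries an approval token, or None if nothing is pending.

    `pipeline_state` is shaped like codepipeline.get_pipeline_state(name=...).
    """
    for stage in pipeline_state.get("stageStates", []):
        for action in stage.get("actionStates", []):
            latest = action.get("latestExecution", {})
            if latest.get("status") == "InProgress" and "token" in latest:
                return stage["stageName"], action["actionName"], latest["token"]
    return None


# Hand-written sample; the action name below is hypothetical.
approval_sample_state = {
    "stageStates": [
        {
            "stageName": "DeployPrd",
            "actionStates": [
                {
                    "actionName": "ApproveDeploy",
                    "latestExecution": {"status": "InProgress", "token": "example-token"},
                }
            ],
        }
    ]
}
pending = find_pending_approval(approval_sample_state)
print(pending)

# With a real response, approving would look like:
# stage, action, token = pending
# codepipeline.put_approval_result(
#     pipelineName=pipeline_name,
#     stageName=stage,
#     actionName=action,
#     result={"summary": "Approved from notebook", "status": "Approved"},
#     token=token,
# )
```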

### Test Production Deployment

In [54]:
deploy_prd = get_pipeline_stage(pipeline_name, "DeployPrd")
if (
    deploy_prd is None
    or "latestExecution" not in deploy_prd
    or "latestExecution" not in deploy_prd["actionStates"][0]
):
    raise Exception("Please wait. Deploy prd not started")

execution_id = deploy_prd["latestExecution"]["pipelineExecutionId"]

In [55]:
# Check deployment resources
from datetime import datetime, timedelta
from dateutil.tz import tzlocal


def get_event_dataframe(events):
    stack_cols = [
        "LogicalResourceId",
        "ResourceStatus",
        "ResourceStatusReason",
        "Timestamp",
    ]
    stack_event_df = pd.DataFrame(events)[stack_cols].fillna("")
    stack_event_df["TimeAgo"] = datetime.now(tzlocal()) - stack_event_df["Timestamp"]
    return stack_event_df.drop("Timestamp", axis=1)


cfn = boto3.client("cloudformation")

stack_name = "{}-deploy-prd".format(pipeline_name)
print("stack name: {}".format(stack_name))

# Get latest stack events
while True:
    try:
        response = cfn.describe_stack_events(StackName=stack_name)
        break
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

get_event_dataframe(response["StackEvents"]).head()

stack name: smsd-sd-t5-p-8up2y4ihhnzd-deploy-prd


|   | LogicalResourceId | ResourceStatus | ResourceStatusReason | TimeAgo |
|---|---|---|---|---|
| 0 | smsd-sd-t5-p-8up2y4ihhnzd-deploy-prd | CREATE_COMPLETE | | 00:21:49.891453 |
| 1 | ApiFunctionInvokePermissionProd | CREATE_COMPLETE | | 00:21:51.418453 |
| 2 | ServerlessRestApiProdStage | CREATE_COMPLETE | | 00:21:54.162453 |
| 3 | ServerlessRestApiProdStage | CREATE_IN_PROGRESS | Resource creation Initiated | 00:21:54.570453 |
| 4 | ServerlessRestApiProdStage | CREATE_IN_PROGRESS | | 00:21:57.075453 |


The resource of most interest to us is the endpoint, which typically takes around 10 minutes to deploy.

Fetch the name of the endpoint, then run a loop to wait for the endpoint to be fully deployed. We need the status to be 'InService'.

In [56]:
prd_endpoint_name = "{}-{}-prd-{}".format(PREFIX, model_name, execution_id)
print("prod endpoint: {}".format(prd_endpoint_name))

prod endpoint: smsd-sd-t5-prd-997750c6-2400-4429-a472-47edb6672e53


In [57]:
sm = boto3.client("sagemaker")

while True:
    try:
        response = sm.describe_endpoint(EndpointName=prd_endpoint_name)
        print("Endpoint status: {}".format(response["EndpointStatus"]))
        # Wait until the endpoint is in service with data capture enabled
        if (
            response["EndpointStatus"] == "InService"
            and "DataCaptureConfig" in response
            and response["DataCaptureConfig"]["EnableCapture"]
        ):
            break
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

Endpoint status: InService


Check if you can send data to the endpoint and receive predictions in return.

In [58]:
prd_predictor = get_predictor(prd_endpoint_name)
sample_values = test_df[test_df.columns[1:]].sample(10).values
predictions = predict(prd_predictor, sample_values, rows=1)
predictions

100%|██████████| 10/10 [00:05<00:00,  1.79it/s]


array([2., 1., 1., 2., 3., 0., 0., 2., 1., 3.])

### Test REST API (API Gateway)

In [59]:
# Open the Lambda deployment application (named after the CloudFormation stack)
HTML(
    '<a target="_blank" href="https://{0}.console.aws.amazon.com/lambda/home?region={0}#/applications/{1}?tab=deploy">Lambda Deployment</a>'.format(
        region, stack_name
    )
)

Run the code below to confirm that the endpoint is in service. It completes once the REST API is available.

In [60]:
def get_stack_status(stack_name):
    response = cfn.describe_stacks(StackName=stack_name)
    if response["Stacks"]:
        stack = response["Stacks"][0]
        outputs = None
        if "Outputs" in stack:
            outputs = {o["OutputKey"]: o["OutputValue"] for o in stack["Outputs"]}
        return stack["StackStatus"], outputs


outputs = None
while True:
    try:
        status, outputs = get_stack_status(stack_name)
        response = sm.describe_endpoint(EndpointName=prd_endpoint_name)
        print("Endpoint status: {}".format(response["EndpointStatus"]))
        if outputs:
            break
        elif status.endswith("FAILED"):
            raise (Exception("Stack status: {}".format(status)))
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

if outputs:
    print("deployment application: {}".format(outputs["DeploymentApplication"]))
    print("rest api: {}".format(outputs["RestApi"]))

Endpoint status: InService
deployment application: smsd-sd-t5-p-8up2y4ihhnzd-deploy-prd-ServerlessDeploymentApplication-VSBUUAL5TYQT
rest api: https://e4c3gvi60d.execute-api.ap-southeast-1.amazonaws.com/Prod/api/


If you are updating an existing production deployment, expand the Lambda Deployment tab to reveal the resources, then click the **ApiFunctionAliaslive** link to watch the Lambda deployment in progress.

In [61]:
HTML(
    '<a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codedeploy/applications/{1}?region={0}">CodeDeploy application</a>'.format(
        region, outputs["DeploymentApplication"]
    )
)

CodeDeploy performs a canary deployment: it first shifts 10% of the traffic to the new version, then shifts the remaining 90% after 5 minutes.
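
Conceptually, while the canary step is active each invocation is routed to the new version with probability 0.1 and to the old one otherwise. The toy simulation below illustrates that split; `route_request` and the version labels are hypothetical, and the real shifting is done by CodeDeploy on the Lambda alias, not by code like this:

```python
import random


def route_request(rng, canary_weight=0.1):
    """Pick a version for one request: 'new' with probability canary_weight."""
    return "new" if rng.random() < canary_weight else "old"


rng = random.Random(42)  # fixed seed so the run is repeatable
n_requests = 10_000
hits = [route_request(rng) for _ in range(n_requests)]
share_new = hits.count("new") / n_requests
print("Share routed to new version: {:.3f}".format(share_new))
```

With 10,000 simulated requests the share stays close to 0.1, which is why only occasional responses from the new endpoint are expected during the canary window.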

We can invoke the REST API and inspect the returned headers to see which endpoint served the request. While traffic is shifting, the cell below may occasionally report a different endpoint; it settles on the new version once the stack update completes.

In [62]:
%%time

from urllib import error, request

headers = {"Content-type": "text/csv"}
payload = test_df[test_df.columns[1:]].head(1).to_csv(header=False, index=False).encode("utf-8")
rest_api = outputs["RestApi"]

while True:
    try:
        with request.urlopen(request.Request(rest_api, data=payload, headers=headers)) as resp:
            print(
                "Response code: %d: endpoint: %s"
                % (resp.getcode(), resp.getheader("x-sagemaker-endpoint"))
            )
        status, outputs = get_stack_status(stack_name)
        if status.endswith("COMPLETE"):
            print("Deployment complete\n")
            break
        elif status.endswith("FAILED"):
            raise Exception("Stack status: {}".format(status))
    except error.URLError as e:
        # urllib raises HTTPError (a URLError subclass) for non-2xx responses
        print("Request failed: {}".format(e))
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

Response code: 200: endpoint: smsd-sd-t5-prd-997750c6-2400-4429-a472-47edb6672e53
Deployment complete

CPU times: user 23 ms, sys: 6.15 ms, total: 29.2 ms
Wall time: 1.27 s


## Monitor

### Inspect Model Monitor

When we prepared the datasets for model training, we saved a baseline dataset (a copy of the train dataset). Then, when we approved the model for deployment into production, the pipeline set up a SageMaker endpoint with data capture enabled and a model monitoring schedule.

Fetch the latest production deployment execution ID.

In [63]:
deploy_prd = get_pipeline_stage(pipeline_name, "DeployPrd")
if deploy_prd is None or "latestExecution" not in deploy_prd:
    raise Exception("Please wait. Deploy prod not complete")

execution_id = deploy_prd["latestExecution"]["pipelineExecutionId"]

Under the hood, SageMaker Model Monitor runs as SageMaker Processing jobs. Use the execution ID to construct the names of the baseline processing job and the monitoring schedule.

In [64]:
processing_job_name = "{}-{}-pbl-{}".format(PREFIX, model_name, execution_id)
schedule_name = "{}-{}-pms".format(PREFIX, model_name)

print("processing job name: {}".format(processing_job_name))
print("schedule name: {}".format(schedule_name))

processing job name: smsd-sd-t5-pbl-997750c6-2400-4429-a472-47edb6672e53
schedule name: smsd-sd-t5-pms


### Explore Baseline

Now fetch the baseline results from the processing job. This cell throws an exception if the processing job is not complete; if that happens, wait several minutes and try again. <a id="view-baseline-results"></a>

In [65]:
import sagemaker
from sagemaker.model_monitor import BaseliningJob, MonitoringExecution
from sagemaker.s3 import S3Downloader

sagemaker_session = sagemaker.Session()
baseline_job = BaseliningJob.from_processing_name(sagemaker_session, processing_job_name)
status = baseline_job.describe()["ProcessingJobStatus"]
if status != "Completed":
    raise (Exception("Please wait. Processing job not complete, status: {}".format(status)))

baseline_results_uri = baseline_job.outputs[0].destination

SageMaker Model Monitor generates two types of files: statistics and constraints. Take a look at the statistics file first. It contains statistics for each feature of the dataset, including the mean, standard deviation, minimum value, maximum value, and more.

In [66]:
import pandas as pd
import json

baseline_statistics = baseline_job.baseline_statistics().body_dict
schema_df = pd.json_normalize(baseline_statistics["features"])
schema_df[
    [
        "name",
        "numerical_statistics.mean",
        "numerical_statistics.std_dev",
        "numerical_statistics.min",
        "numerical_statistics.max",
    ]
].head()

|   | name | numerical_statistics.mean | numerical_statistics.std_dev | numerical_statistics.min | numerical_statistics.max |
|---|---|---|---|---|---|
| 0 | 0 | 2.416416 | 6.766966 | -5.48092 | 47.147304 |
| 1 | 1 | 0.148897 | 1.147891 | -0.357 | 8.927 |
| 2 | 2 | 0.12842 | 0.80498 | -0.647 | 3.515 |
| 3 | 3 | -0.02839 | 0.997213 | -2.641 | 2.577 |
| 4 | 4 | 0.385867 | 1.080775 | -0.379 | 6.666 |
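
`pd.json_normalize` flattens the nested `numerical_statistics` dictionaries into dot-separated column names, which is where headers such as `numerical_statistics.mean` come from. A sketch with a hand-written, simplified feature list (real `statistics.json` files carry more fields, such as distributions, and the values here are made up):

```python
import pandas as pd

# Simplified, hypothetical excerpt of the "features" list in statistics.json.
features = [
    {
        "name": "0",
        "inferred_type": "Fractional",
        "numerical_statistics": {"mean": 2.4, "std_dev": 6.8, "min": -5.5, "max": 47.1},
    },
    {
        "name": "1",
        "inferred_type": "Fractional",
        "numerical_statistics": {"mean": 0.15, "std_dev": 1.1, "min": -0.36, "max": 8.9},
    },
]

# Nested keys become columns such as "numerical_statistics.mean".
flat = pd.json_normalize(features)
print(sorted(flat.columns))
```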


Now look at the suggested [constraints file](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html). These are the constraints that SageMaker Model Monitor recommends. If the live data sent to the production SageMaker endpoint violates these constraints, this indicates data drift, and Model Monitor can raise an alert to trigger retraining. You can also define different constraints based on the statistics viewed previously.

In [67]:
baseline_constraints = baseline_job.suggested_constraints().body_dict
constraints_df = pd.json_normalize(baseline_constraints["features"])
constraints_df.head()

|   | name | inferred_type | completeness | num_constraints.is_non_negative |
|---|---|---|---|---|
| 0 | 0 | Fractional | 1.0 | False |
| 1 | 1 | Fractional | 1.0 | False |
| 2 | 2 | Fractional | 1.0 | False |
| 3 | 3 | Fractional | 1.0 | False |
| 4 | 4 | Fractional | 1.0 | False |
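
The suggested constraints are a starting point. One way to tighten them is to edit a copy of the dictionary and save it as your own constraints file. The sketch below marks chosen features as non-negative using the `num_constraints.is_non_negative` field seen in the suggested constraints; the helper name, feature names, and sample dict are hypothetical, and how a custom file is attached to the monitoring schedule depends on the pipeline's configuration:

```python
import copy
import json


def require_non_negative(constraints, feature_names):
    """Return a copy of `constraints` with is_non_negative=True for the named features."""
    updated = copy.deepcopy(constraints)  # leave the suggested constraints untouched
    for feature in updated["features"]:
        if feature["name"] in feature_names:
            feature.setdefault("num_constraints", {})["is_non_negative"] = True
    return updated


# Hypothetical excerpt shaped like the suggested constraints.
suggested = {
    "features": [
        {"name": "feature_a", "inferred_type": "Fractional", "completeness": 1.0,
         "num_constraints": {"is_non_negative": False}},
        {"name": "feature_b", "inferred_type": "Fractional", "completeness": 1.0,
         "num_constraints": {"is_non_negative": False}},
    ]
}

custom = require_non_negative(suggested, {"feature_b"})
print(json.dumps(custom["features"][1], indent=2))
```

Only mark a feature non-negative if its baseline minimum really is non-negative; in the statistics above, features `0` to `4` all have negative minimums, so they would violate such a constraint immediately.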


### View data capture

When the "Deploy Production" stage of the MLOps pipeline deploys a SageMaker endpoint, it also enables data capture. This means the incoming requests to the endpoint, as well as the results from the ML model, are stored in an S3 location. Model monitor can analyze this data and compare it to the baseline to ensure that no constraints are violated. 

Check how many files have been created by data capture, and view the latest file in detail. Note that data capture relies on data being sent to the production endpoint. If you don't see any files yet, wait several minutes and try again.

In [82]:
bucket = sagemaker_session.default_bucket()
data_capture_logs_uri = "s3://{}/{}-{}/datacapture/{}".format(
    bucket, PREFIX, model_name, prd_endpoint_name
)

capture_files = S3Downloader.list(data_capture_logs_uri)
print("Found {} files".format(len(capture_files)))

if capture_files:
    # Get the first line of the most recent file
    event = json.loads(S3Downloader.read_file(capture_files[-1]).split("\n")[0])
    print("\nLast file:\n{}".format(json.dumps(event, indent=2)))

Found 3 files

Last file:
{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "-0.1849580089999999,-0.084,-0.354,0.355,-0.304,-0.0579999999999999,0.88,11.4674,-0.049,442.69300000000004,0.406,1.74968,1,0.0030513000000000003,-0.2769999999999999,17139,-0.043,-0.122,0.639639074,0.9983290000000001,121.69361509999999,72.3051,-0.423,868,0.002945,-0.00229,0.80223,-0.09300000000000001,-0.081,-0.0069999999999999,-0.153,0.00327827,0.6642100000000001,-0.0069999999999999,-0.032,3393.48,0.9,-0.048,2.14,0.935,-1.052474737,0.3929999999999999,93.6723,-0.29,-0.303,-0.02,-0.11900000000000001,-16.1959,113.914,1.4953299999999998,976.318,-1.4340000000000002,-0.021,-0.092,0.180292,13.63,476.236,-0.177,6106.6,-0.027000000000000003,110.77799999999999,0.995359,-0.0055299999999999,-0.852,0,0.005149,67275.6,-0.0055899999999999,-0.747,3.4836099999999997,0.00634091,0.180101,-0.271,-183.864,-0.01176,0.445600046,-0.0043799999999999,0.00993,-1.85922

### View monitoring schedule

In [83]:
!wget -O utils.py --quiet https://raw.githubusercontent.com/awslabs/amazon-sagemaker-examples/master/sagemaker_model_monitor/visualization/utils.py
import utils as mu

The [minimum scheduled run time](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-scheduling.html) for Model Monitor is one hour, which means we may need to wait up to an hour to see any results.

Check the schedule status and list the next run.

In [84]:
sm = boto3.client("sagemaker")

response = sm.describe_monitoring_schedule(MonitoringScheduleName=schedule_name)
print("Schedule Status: {}".format(response["MonitoringScheduleStatus"]))

now = datetime.now(tzlocal())
next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
scheduled_diff = int((next_hour - now).total_seconds() // 60)
print("Next schedule in {} minutes".format(scheduled_diff))

Schedule Status: Scheduled
Next schedule in 49 minutes
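
The schedule arithmetic can be isolated into a small function so it can be checked against fixed times. This sketch zeroes the seconds and microseconds along with the minutes, and uses `total_seconds()`, which (unlike `.seconds`) does not silently drop whole days; `minutes_until_next_hour` is a hypothetical helper name:

```python
from datetime import datetime, timedelta, timezone


def minutes_until_next_hour(now):
    """Whole minutes from `now` until the next top of the hour."""
    next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
    return int((next_hour - now).total_seconds() // 60)


print(minutes_until_next_hour(datetime(2022, 4, 4, 19, 10, 30, tzinfo=timezone.utc)))  # 49
```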


A couple of minutes after the model monitoring schedule has run, fetch the latest schedule status. A completed schedule run may have found violations.

In [85]:
processing_job_arn = None

while processing_job_arn is None:
    try:
        response = sm.list_monitoring_executions(
            MonitoringScheduleName=schedule_name,
            SortBy="CreationTime",
            SortOrder="Descending",  # Newest executions first
        )
    except ClientError as e:
        print(e.response["Error"]["Message"])
        time.sleep(10)
        continue
    for mon in response["MonitoringExecutionSummaries"]:
        status = mon["MonitoringExecutionStatus"]
        now = datetime.now(tzlocal())
        # total_seconds() also counts whole days, unlike .seconds
        created_diff = int((now - mon["CreationTime"]).total_seconds() // 60)
        print("Schedule status: {}, Created: {} minutes ago".format(status, created_diff))
        if status in ["Completed", "CompletedWithViolations"]:
            processing_job_arn = mon["ProcessingJobArn"]
            break
        if status == "InProgress":
            break
    else:
        raise Exception("Please wait. No schedules executing")
    time.sleep(10)

Schedule status: CompletedWithViolations, Created: 10 minutes ago


### View monitoring results

In [86]:
if processing_job_arn:
    execution = MonitoringExecution.from_processing_arn(
        sagemaker_session=sagemaker.Session(), processing_job_arn=processing_job_arn
    )
    exec_inputs = {inp["InputName"]: inp for inp in execution.describe()["ProcessingInputs"]}
    exec_results_uri = execution.output.destination

    print("Monitoring Execution results: {}".format(exec_results_uri))

Monitoring Execution results: s3://sagemaker-ap-southeast-1-852039983533/smsd-sd-t5/monitoring/reports/smsd-sd-t5-prd-997750c6-2400-4429-a472-47edb6672e53/smsd-sd-t5-pms/2022/04/04/19


Take a look at the files saved in the S3 output location. If violations were found, we should see a constraint violations file in addition to the statistics and constraints files viewed before.

In [87]:
!aws s3 ls $exec_results_uri/

2022-04-04 19:06:55      21567 constraint_violations.json
2022-04-04 19:06:55      23463 constraints.json
2022-04-04 19:06:55     300660 statistics.json


Now fetch the monitoring statistics and violations, then use the utils code to visualize the results in a table. It highlights any baseline drift found by Model Monitor. Drift can occur in categorical features (those inferred as strings) or in numerical features.

In [88]:
# Get the baseline and monitoring statistics & violations
baseline_statistics = baseline_job.baseline_statistics().body_dict
execution_statistics = execution.statistics().body_dict
violations = execution.constraint_violations().body_dict["violations"]

In [89]:
mu.show_violation_df(
    baseline_statistics=baseline_statistics,
    latest_statistics=execution_statistics,
    violations=violations,
)

|   | data_type | completeness | baseline_drift | categorical_values |
|---|---|---|---|---|
| 0 | Integral | 100.00% | | |
| 1 | Fractional | 100.00% | | |
| 10 | Fractional | 100.00% | 44.38% | |
| 100 | Fractional | 100.00% | 44.74% | |
| 101 | Fractional | 100.00% | 44.69% | |
| 102 | Fractional | 100.00% | 33.82% | |
| 103 | Fractional | 100.00% | 42.09% | |
| 104 | Fractional | 100.00% | 44.65% | |
| 105 | Fractional | 100.00% | 44.68% | |
| 106 | Fractional | 100.00% | 44.74% | |
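
The `baseline_drift` values report how far each feature's distribution in the captured traffic has moved from the baseline. To build intuition for such a number, here is a two-sample Kolmogorov-Smirnov statistic (the largest gap between two empirical CDFs) computed with NumPy. It is shown only as an example of a distribution distance and is not necessarily the exact computation Model Monitor performs; `ks_distance` is a hypothetical helper:

```python
import numpy as np


def ks_distance(baseline, current):
    """Two-sample Kolmogorov-Smirnov statistic: max |F_baseline(x) - F_current(x)|."""
    baseline = np.sort(np.asarray(baseline, dtype=float))
    current = np.sort(np.asarray(current, dtype=float))
    # Evaluate both empirical CDFs at every observed value; the maximum gap occurs there.
    grid = np.concatenate([baseline, current])
    cdf_baseline = np.searchsorted(baseline, grid, side="right") / baseline.size
    cdf_current = np.searchsorted(current, grid, side="right") / current.size
    return float(np.max(np.abs(cdf_baseline - cdf_current)))


rng = np.random.default_rng(0)
baseline = rng.normal(0.0, 1.0, 5_000)
same = rng.normal(0.0, 1.0, 5_000)
shifted = rng.normal(1.0, 1.0, 5_000)
print("same distribution: {:.3f}".format(ks_distance(baseline, same)))
print("shifted by 1 std:  {:.3f}".format(ks_distance(baseline, shifted)))
```

Samples from the same distribution give a value near 0, while a one-standard-deviation shift gives roughly 0.38; completely separated samples give 1.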


### Trigger Retraining

The CodePipeline instance is configured with [CloudWatch Events](https://docs.aws.amazon.com/codepipeline/latest/userguide/create-cloudtrail-S3-source.html) to start the pipeline for retraining when the drift detection triggers specific metric alarms.

We can simulate drift by putting a metric value above the threshold of `0.2` directly into CloudWatch. This triggers the alarm and starts the pipeline.

Tip: This alarm is configured only for the latest production endpoint, so re-training will only occur if we are putting metrics against the latest endpoint.

Trigger the metric alarm. Note that it can take a couple of minutes for everything to trigger.

In [92]:
from datetime import datetime, timedelta, timezone
import random
import time

cloudwatch = boto3.client("cloudwatch")

# Define the metric name and threshold
metric_name = "feature_baseline_drift_total_amount"
metric_threshold = 0.2

# Put a new metric datapoint to trigger an alarm
def put_drift_metric(value):
    print("Putting metric: {}".format(value))
    cloudwatch.put_metric_data(
        Namespace="aws/sagemaker/Endpoints/data-metrics",
        MetricData=[
            {
                "MetricName": metric_name,
                "Dimensions": [
                    {"Name": "MonitoringSchedule", "Value": schedule_name},
                    {"Name": "Endpoint", "Value": prd_endpoint_name},
                ],
                # Use an aware UTC timestamp; botocore treats naive datetimes as UTC
                "Timestamp": datetime.now(timezone.utc),
                "Value": value,
                "Unit": "None",
            },
        ],
    )


# Return the most recent average drift from the last two minutes (0 if no data yet)
def get_drift_stats():
    now = datetime.now(timezone.utc)
    response = cloudwatch.get_metric_statistics(
        Namespace="aws/sagemaker/Endpoints/data-metrics",
        MetricName=metric_name,
        Dimensions=[
            {"Name": "MonitoringSchedule", "Value": schedule_name},
            {"Name": "Endpoint", "Value": prd_endpoint_name},
        ],
        StartTime=now - timedelta(minutes=2),
        EndTime=now,
        Period=1,
        Statistics=["Average"],
        Unit="None",
    )
    datapoints = response.get("Datapoints", [])
    if datapoints:
        # Datapoints are not guaranteed to be in time order, so pick the latest one
        return max(datapoints, key=lambda d: d["Timestamp"])["Average"]
    return 0


# print("Simulate drift on endpoint: {}".format(prd_endpoint_name))

# while True:
#     put_drift_metric(round(random.uniform(metric_threshold, 1.0), 4))
#     drift_stats = get_drift_stats()
#     print("Average drift amount: {}".format(drift_stats))
#     if drift_stats > metric_threshold:
#         break
#     time.sleep(1)
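
The commented-out loop keeps running until the drift average crosses the threshold, with no upper bound on iterations. Below is a minimal bounded variant. It assumes you pass in the `put_drift_metric` and `get_drift_stats` functions from the cell above. `simulate_drift`, `max_attempts`, and `delay_seconds` are hypothetical names. Injecting `put_metric`, `get_average`, and `sleep` also lets the loop run without AWS.

```python
import random
import time


def simulate_drift(put_metric, get_average, threshold, max_attempts=120,
                   delay_seconds=1.0, sleep=time.sleep):
    """Put random values in [threshold, 1.0] until the reported average exceeds
    the threshold or max_attempts is reached. Returns the last average seen."""
    average = 0.0
    for _ in range(max_attempts):
        put_metric(round(random.uniform(threshold, 1.0), 4))
        average = get_average()
        print("Average drift amount: {}".format(average))
        if average > threshold:
            break
        sleep(delay_seconds)
    return average


# Usage in the notebook (commented out to avoid triggering the alarm by accident):
# simulate_drift(put_drift_metric, get_drift_stats, metric_threshold)
```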

Use the links below to open the CloudWatch alarm and the CodePipeline execution history.

In [90]:
# Output HTML links to the CloudWatch alarm and the CodePipeline execution history
metric_alarm_name = "mlops-{}-metric-gt-threshold".format(model_name)
HTML(
    """<a target="_blank" href="https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#alarmsV2:alarm/{1}">CloudWatch Alarm</a> triggers
     <a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codepipeline/pipelines/{2}/executions?region={0}">Code Pipeline Execution</a>""".format(
        region, metric_alarm_name, pipeline_name
    )
)

### Create a CloudWatch dashboard

Create a CloudWatch dashboard to visualize the key performance metrics and the alarms we have created.

This dashboard shows nine charts in three rows: the first row displays Lambda metrics, the second row displays SageMaker metrics, and the third row (shown in the screenshot below) displays the alarms set up for the pipeline.
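
The contents of `dashboard.json` are not shown here. As a hedged sketch of how such a template can be laid out, the code below builds a three-by-three grid of widgets on CloudWatch's 24-column dashboard grid. It leaves `$region`, `$account_id`, and `$model_name` placeholders for `Template.substitute`, mirroring the cell that follows. Widget titles, the alarm names, and the empty metric lists are hypothetical; a real dashboard needs concrete metric definitions.

```python
import json
from string import Template

# Hypothetical layout: three rows of three widgets on CloudWatch's 24-column grid.
# Metric definitions are left empty; only the layout and the $placeholders that
# Template.substitute fills in are illustrated.
ROWS = ["Lambda", "SageMaker", "Alarms"]
WIDTH, HEIGHT = 8, 6  # 3 widgets x 8 columns = 24 columns per row


def build_template(rows=ROWS, per_row=3):
    """Return a dashboard body string that still contains $placeholders."""
    widgets = []
    for row, name in enumerate(rows):
        for col in range(per_row):
            if name == "Alarms":
                # Alarm widgets list alarm ARNs; this alarm naming is hypothetical.
                widget_type = "alarm"
                properties = {
                    "title": f"Alarm {col + 1}",
                    "alarms": [
                        f"arn:aws:cloudwatch:$region:$account_id:alarm:mlops-$model_name-alarm-{col + 1}"
                    ],
                }
            else:
                widget_type = "metric"
                properties = {"title": f"{name} {col + 1}", "region": "$region", "metrics": []}
            widgets.append(
                {
                    "type": widget_type,
                    "x": col * WIDTH,
                    "y": row * HEIGHT,
                    "width": WIDTH,
                    "height": HEIGHT,
                    "properties": properties,
                }
            )
    return json.dumps({"widgets": widgets})


# Fill the placeholders the same way the notebook cell does (sample values).
body = Template(build_template()).substitute(
    region="us-east-1", account_id="123456789012", model_name="example"
)
widgets = json.loads(body)["widgets"]
```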

In [93]:
from string import Template

cloudwatch = boto3.client("cloudwatch")
sts = boto3.client("sts")
account_id = sts.get_caller_identity().get("Account")
dashboard_name = "{}-{}-{}".format(PREFIX, model_name, config["SageMakerProjectId"])

with open("dashboard.json") as f:
    dashboard_body = Template(f.read()).substitute(
        region=region, account_id=account_id, model_name=model_name
    )
    response = cloudwatch.put_dashboard(DashboardName=dashboard_name, DashboardBody=dashboard_body)

# Output a html link to the cloudwatch dashboard
HTML(
    '<a target="_blank" href="https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#dashboards:name={1}">CloudWatch Dashboard</a>'.format(
        region, dashboard_name
    )
)

## Full cleanup

In [98]:
cfn = boto3.client("cloudformation")

# Delete the prod, dev, workflow, and custom resource stacks in order,
# waiting for each deletion to complete before starting the next
for stack_name in [
    f"{pipeline_name}-deploy-prd",
    f"{pipeline_name}-deploy-dev",
    f"{pipeline_name}-workflow",
    f"{PREFIX}-{model_name}-{config['SageMakerProjectId']}-sagemaker-custom-resource",
]:
    print("Deleting stack: {}".format(stack_name))
    cfn.delete_stack(StackName=stack_name)
    cfn.get_waiter("stack_delete_complete").wait(StackName=stack_name)

Deleting stack: smsd-sd-t5-p-8up2y4ihhnzd-deploy-prd
Deleting stack: smsd-sd-t5-p-8up2y4ihhnzd-deploy-dev
Deleting stack: smsd-sd-t5-p-8up2y4ihhnzd-workflow
Deleting stack: smsd-sd-t5-p-8up2y4ihhnzd-sagemaker-custom-resource


The following code will delete the dashboard.

In [99]:
cloudwatch.delete_dashboards(DashboardNames=[dashboard_name])
print("Dashboard deleted")

Dashboard deleted


The following code deletes all objects (including every object version) in the artifact bucket and then deletes the SageMaker project.
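
The resource-level `object_versions.delete()` call hides the underlying requests. As a hedged sketch of the equivalent low-level approach, the generator below batches entries from `list_object_versions` pages (both object versions and delete markers) into groups of at most 1,000, the per-request limit of `delete_objects`. The function name `delete_batches` is hypothetical. The commented lines show how it could be wired to a boto3 S3 client.

```python
def delete_batches(pages, batch_size=1000):
    """Yield lists of {"Key", "VersionId"} dicts, at most batch_size each, built
    from list_object_versions response pages (versions and delete markers)."""
    batch = []
    for page in pages:
        for entry in page.get("Versions", []) + page.get("DeleteMarkers", []):
            batch.append({"Key": entry["Key"], "VersionId": entry["VersionId"]})
            if len(batch) == batch_size:
                yield batch
                batch = []
    # Flush the final, partially filled batch.
    if batch:
        yield batch


# Wiring it to boto3 (not executed here):
# s3 = boto3.client("s3")
# pages = s3.get_paginator("list_object_versions").paginate(Bucket=artifact_bucket)
# for batch in delete_batches(pages):
#     s3.delete_objects(Bucket=artifact_bucket, Delete={"Objects": batch, "Quiet": True})
```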

In [100]:
s3_resource = boto3.resource('s3')
s3_artifact_bucket = s3_resource.Bucket(artifact_bucket)
s3_artifact_bucket.object_versions.delete()
print("Artifact bucket objects deleted")

sm.delete_project(
    ProjectName=PROJECT_NAME
)
print("SageMaker Project deleted")

Artifact bucket objects deleted
SageMaker Project deleted


Finally, close this notebook.