# Tableau Amazon SageMaker - MLOps Workshop

### Overview

In this notebook you will automate an MLOps pipeline to build, train, deploy and monitor an XGBoost regression model that automates the classification of unhappy customers for telecommunication service providers. The goal is to identify customers who may cancel their service soon so that you can entice them to stay. This is known as customer churn prediction.

The dataset we use is publicly available and was mentioned in the book *Discovering Knowledge in Data* by Daniel T. Larose. The author attributes it to the University of California Irvine Repository of Machine Learning Datasets.

This notebook takes you through a series of steps to execute the AWS CodePipeline stages outlined below:

1. [Data Prep / ETL Step](#Prepare-the-Dataset)
2. [Start Build](#Start-Build)
3. [Wait for Training Job](#Wait-for-Training-Job)
4. [Test Dev Deployment](#Test-Dev-Deployment)
5. [Approve Prod Deployment](#Approve-Prod-Deployment)
6. [Test Prod Deployment](#Test-Prod-Deployment)
7. [Model Monitoring](#Model-Monitoring)
8. [CloudWatch Monitoring](#CloudWatch-Monitoring)
9. [Retraining of the Model](#Retrain-Model) 

In [1]:
# Upgrade pip, then install the latest AWS CLI, boto3 and tqdm with SageMaker Python SDK v1 (>=1.71.0, <2.0.0)
import sys
!{sys.executable} -m pip install --upgrade pip
#!{sys.executable} -m pip install -qU awscli boto3 "sagemaker>=2.0.0" tqdm
!{sys.executable} -m pip install -qU awscli boto3 "sagemaker>=1.71.0,<2.0.0" tqdm
!{sys.executable} -m pip show sagemaker

Requirement already up-to-date: pip in /opt/conda/lib/python3.7/site-packages (20.2.3)
Name: sagemaker
Version: 1.72.1
Summary: Open source library for training and deploying models on Amazon SageMaker.
Home-page: https://github.com/aws/sagemaker-python-sdk/
Author: Amazon Web Services
Author-email: None
License: Apache License 2.0
Location: /opt/conda/lib/python3.7/site-packages
Requires: numpy, boto3, packaging, scipy, smdebug-rulesconfig, importlib-metadata, protobuf, protobuf3-to-dict
Required-by: 


In [2]:
import uuid
import logging
import boto3
import sagemaker
import pandas as pd

from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker import s3_input
from sagemaker.s3 import S3Uploader

#### Update the S3 Bucket information below

In [3]:
session = sagemaker.Session()
bucket = "mlops-customerchurn-artifact-us-east-2-325928439752"  # Change this
print(bucket)

mlops-customerchurn-artifact-us-east-2-325928439752


In [4]:
region = boto3.Session().region_name
id = uuid.uuid4().hex

In [5]:
glue_role = 'arn:aws:iam::325928439752:role/AWS-Glue-S3-Bucket-Access'  # Change this to your AWS Glue role ARN

## Prepare the Dataset



Name the AWS Glue job, then upload the raw dataset to S3 and define the training and validation output locations.

In [None]:
job_name = 'glue-customer-churn-etl-{}'.format(id)

In [None]:
project_name = 'customerchurn'

data_source = S3Uploader.upload(local_path='./data/customer-churn.csv',
                               desired_s3_uri='s3://{}/{}'.format(bucket, project_name),
                               session=session)

train_prefix = 'train'
val_prefix = 'validation'

train_data = 's3://{}/{}/{}/'.format(bucket, project_name, train_prefix)
validation_data = 's3://{}/{}/{}/'.format(bucket, project_name, val_prefix)


### Create the AWS Glue Job

In [None]:
glue_script_location = S3Uploader.upload(local_path='./code/glue_etl.py',
                               desired_s3_uri='s3://{}/{}'.format(bucket, project_name),
                               session=session)
glue_client = boto3.client('glue')

create_response = glue_client.create_job(
    Name=job_name,
    Description='PySpark job to extract the data and split it into training and validation data sets',
    Role=glue_role, # you can pass your existing AWS Glue role here if you have used Glue before
    ExecutionProperty={
        'MaxConcurrentRuns': 2
    },
    Command={
        'Name': 'glueetl',
        'ScriptLocation': glue_script_location,
        'PythonVersion': '3'
    },
    DefaultArguments={
        '--job-language': 'python'
    },
    GlueVersion='1.0',
    WorkerType='Standard',
    NumberOfWorkers=2,
    Timeout=60
)

### Run Glue Job

In [None]:
run_response = glue_client.start_job_run(
    JobName=create_response['Name'],
    Arguments={
        '--BUCKET': bucket,
        '--S3_SOURCE': data_source,
        '--S3_DEST': 's3a://{}/{}/'.format(bucket, project_name),
        '--TRAIN_KEY': train_prefix + '/',
        '--VAL_KEY': val_prefix + '/'
    })


In [None]:
status = glue_client.get_job_run(JobName=create_response['Name'], RunId=run_response['JobRunId'])

In [None]:
print (status['JobRun']['JobRunState'])
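
The cell above reads the job run state only once. A minimal polling sketch follows, assuming you pass in a zero-argument function that returns the current `JobRunState`, so the helper itself needs no AWS session. The helper name `wait_for_glue_job`, the terminal-state set (taken from the Glue `JobRunState` values as I understand them) and the poll interval are illustrative assumptions, not part of the workshop code.

```python
import time

# Terminal JobRunState values for an AWS Glue job run (assumption: check the
# Glue API reference for your SDK version; newer releases may add states)
TERMINAL_GLUE_STATES = {'SUCCEEDED', 'FAILED', 'STOPPED', 'TIMEOUT', 'ERROR'}


def wait_for_glue_job(get_state, poll_seconds=30, max_polls=120):
    """Call get_state() until it returns a terminal Glue job run state.

    get_state is any zero-argument callable that returns the current
    JobRunState string, so the helper does not need a live AWS session.
    """
    for _ in range(max_polls):
        state = get_state()
        print('Glue job run state: {}'.format(state))
        if state in TERMINAL_GLUE_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError('Glue job run not finished after {} polls'.format(max_polls))


# Example usage with the variables defined in the cells above:
# final_state = wait_for_glue_job(
#     lambda: glue_client.get_job_run(JobName=create_response['Name'],
#                                     RunId=run_response['JobRunId'])['JobRun']['JobRunState'])
```

Passing a callable rather than the Glue client keeps the polling logic separate from the AWS call, which also makes it easy to reuse for other job types.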

## Start Build

Set the pipeline variables (the commented-out `os.environ` lookups show how to load them from the environment instead).

In [6]:
import boto3
from botocore.exceptions import ClientError
import os
import time

region = boto3.Session().region_name
artifact_bucket = bucket #os.environ['ARTIFACT_BUCKET']
pipeline_name = "customerchurn" #os.environ['PIPELINE_NAME']
model_name = "customerchurn" #os.environ['MODEL_NAME']

print('region: {}'.format(region))
print('artifact bucket: {}'.format(artifact_bucket))
print('pipeline: {}'.format(pipeline_name))
print('model name: {}'.format(model_name))

region: us-east-2
artifact bucket: mlops-customerchurn-artifact-us-east-2-325928439752
pipeline: customerchurn
model name: customerchurn


In [16]:
from sklearn.model_selection import train_test_split

data_df = pd.read_csv('./data/training-dataset-with-header.csv')

# Hold out 20% of the rows, then split 5% of that holdout off as a small test set
train_df, val_df = train_test_split(data_df, test_size=0.20, random_state=42)
val_df, test_df = train_test_split(val_df, test_size=0.05, random_state=42)

# Reset the index of the test dataframe
test_df.reset_index(inplace=True, drop=True)

print('split train: {}, val: {}, test: {} '.format(train_df.shape[0], val_df.shape[0], test_df.shape[0]))

split train: 1866, val: 443, test: 24 
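
The printed counts follow from how `train_test_split` sizes a float `test_size`: to my understanding, when `train_size` is left unset, scikit-learn rounds the test share up with `ceil` and gives the remainder to the train share. The counts sum to 2333 rows, so the 20% holdout is ceil(466.6) = 467 rows, and the 5% test split of that holdout is ceil(23.35) = 24 rows, leaving 443 for validation. The hypothetical `split_sizes` helper below reproduces that arithmetic without scikit-learn.

```python
import math


def split_sizes(n_samples, test_size):
    """Return (n_train, n_test) the way scikit-learn sizes a float test_size
    when train_size is left unset: round the test share up, keep the rest."""
    n_test = math.ceil(test_size * n_samples)
    return n_samples - n_test, n_test


# The printed counts sum to 1866 + 443 + 24 = 2333 rows
n_train, n_holdout = split_sizes(2333, 0.20)  # first split: train vs. holdout
n_val, n_test = split_sizes(n_holdout, 0.05)  # second split: validation vs. test
print(n_train, n_val, n_test)  # 1866 443 24
```

Because the second split takes 5% of a 20% holdout, the test set is only about 1% of the data, which is why it holds just 24 rows.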


In [18]:
train_df.to_csv('./data/split/train.csv', index=False, header=False)
val_df.to_csv('./data/split/validation.csv', index=False, header=False)

# Save test and baseline with headers
test_df.to_csv('./data/split/test.csv', index=False, header=True)
train_df.to_csv('./data/split/baseline.csv', index=False, header=True)

In [19]:
import sagemaker

# Get the session and default bucket
session = sagemaker.session.Session()
bucket = session.default_bucket()

# Specify the data prefix
prefix = 'customerchurn'

# Upload the split files written by the previous cell
s3_train_uri = session.upload_data('data/split/train.csv', bucket, prefix + '/data/training')
s3_val_uri = session.upload_data('data/split/validation.csv', bucket, prefix + '/data/validation')
s3_baseline_uri = session.upload_data('data/split/baseline.csv', bucket, prefix + '/data/baseline')

Upload the data source metadata to the artifact bucket to trigger a new build.

In [20]:
from io import BytesIO
import zipfile
import json

input_data = {
    'TrainingUri': s3_train_uri,
    'ValidationUri': s3_val_uri,
    'BaselineUri': s3_baseline_uri
}

hyperparameters = {
    'num_round': 50
}

data_source_key = '{}/data-source.zip'.format(pipeline_name)

zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, 'a') as zf:
    zf.writestr('inputData.json', json.dumps(input_data))
    zf.writestr('hyperparameters.json', json.dumps(hyperparameters))
zip_buffer.seek(0)

s3 = boto3.client('s3')
s3.put_object(Bucket=artifact_bucket, Key=data_source_key, Body=bytearray(zip_buffer.read()))

{'ResponseMetadata': {'RequestId': '31E34F11A107F173',
  'HostId': 'ldwrrFD7x2uDFrYGUl+C1SRZVxpsYhvulXrAl+u+g6VjfC9Nc/o9DsVgOlQ5T3uIxuJ2fCiLSlI=',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'ldwrrFD7x2uDFrYGUl+C1SRZVxpsYhvulXrAl+u+g6VjfC9Nc/o9DsVgOlQ5T3uIxuJ2fCiLSlI=',
   'x-amz-request-id': '31E34F11A107F173',
   'date': 'Wed, 07 Oct 2020 16:04:48 GMT',
   'x-amz-version-id': 'FeAr07VfHLCCeaONisKJeqU.zGmvVOOp',
   'etag': '"cbea8cdbae4939488fd9b04afeb425ff"',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"cbea8cdbae4939488fd9b04afeb425ff"',
 'VersionId': 'FeAr07VfHLCCeaONisKJeqU.zGmvVOOp'}
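
The cell above builds the pipeline's source artifact in memory. Below is a sketch of the same step as two small functions, assuming the archive only needs `inputData.json` and `hyperparameters.json`, as in the cell above. It opens the archive with mode `'w'` rather than `'a'` (append also works on an empty buffer, but `'w'` states the intent) and adds a read-back check you can run before uploading. The function names are hypothetical.

```python
import io
import json
import zipfile


def build_data_source_zip(input_data, hyperparameters):
    """Return the bytes of a zip archive holding inputData.json and
    hyperparameters.json, the two files the source artifact carries."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w', compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr('inputData.json', json.dumps(input_data))
        zf.writestr('hyperparameters.json', json.dumps(hyperparameters))
    return buffer.getvalue()


def read_data_source_zip(zip_bytes):
    """Read both JSON files back, e.g. to check the artifact before uploading it."""
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
        return (json.loads(zf.read('inputData.json')),
                json.loads(zf.read('hyperparameters.json')))
```

With these helpers, the upload becomes `s3.put_object(Bucket=artifact_bucket, Key=data_source_key, Body=build_data_source_zip(input_data, hyperparameters))`.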

## Wait for Training Job

Follow the CodePipeline execution and wait until the training job is complete.

In [21]:
from IPython.core.display import HTML

HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codepipeline/pipelines/{1}/view?region={0}">Code Pipeline</a>'.format(region, pipeline_name))

While we wait for the pipeline to run, let's take a look at the model's `run.py` code.

* The XGBoost SageMaker estimator is defined in the `get_training_params` method.
* The `training_uri` and `validation_uri` are loaded from the `inputData.json` file in the data directory.

In [22]:
!pygmentize code/model/run.py

import argparse
import json
import os
import sys
import time

import boto3
import sagemaker
from sagemaker.workflow.airflow import training_config


def get_training_image(region=None):
    region = region or boto3.Session().region_name
    return sagemaker.image_uris.retrieve(
        region=region, framework="xgboost", version="1.0-1"
    )


def get_training_params(
    model_name,
    job_

Once the training and baseline jobs are complete, we can inspect the experiment metrics.

In [23]:
from sagemaker import analytics
model_analytics = analytics.ExperimentAnalytics(experiment_name=model_name)
analytics_df = model_analytics.dataframe()

if (analytics_df.shape[0] == 0):
    raise(Exception('Please wait.  No training or baseline jobs'))

pd.set_option('display.max_colwidth', 100) # Increase column width to show the full component name
cols = ['TrialComponentName', 'DisplayName', 'SageMaker.InstanceType', 
        'train:rmse - Last', 'validation:rmse - Last'] # return the last rmse for training and validation
# Select only the columns that exist (Index & list is deprecated set syntax in pandas)
analytics_df[analytics_df.columns.intersection(cols)].head(2)

|   | TrialComponentName | DisplayName | SageMaker.InstanceType | train:rmse - Last | validation:rmse - Last |
|---|---|---|---|---|---|
| 0 | mlops-customerchurn-edf18b2e-a503-4f60-9a7c-031c4c89add3-aws-training-job | Training | ml.m4.xlarge | 0.32926 | 0.35421 |
| 1 | mlops-customerchurn-pbl-edf18b2e-a503-4f60-9a7c-031c4c89add3-aws-processing-job | Baseline | ml.m5.xlarge |  |  |


## Test Dev Deployment

Once the endpoint has been deployed and is awaiting approval, we can begin some tests.

In [24]:
codepipeline = boto3.client('codepipeline')

def get_pipeline_stage(pipeline_name, stage_name):
    response = codepipeline.get_pipeline_state(name=pipeline_name)
    for stage in response['stageStates']:
        if stage['stageName'] == stage_name:
            return stage
        
# Get last execution id
deploy_dev = get_pipeline_stage(pipeline_name, 'DeployDev')
if not 'latestExecution' in deploy_dev:
    raise(Exception('Please wait.  Deploy dev not started'))
    
execution_id = deploy_dev['latestExecution']['pipelineExecutionId']
dev_endpoint_name = 'mlops-{}-dev-{}'.format(model_name, execution_id)

print('endpoint name: {}'.format(dev_endpoint_name))

endpoint name: mlops-customerchurn-dev-edf18b2e-a503-4f60-9a7c-031c4c89add3


Wait until the dev endpoint is in service (this can take up to 10 minutes).

In [25]:
sm = boto3.client('sagemaker')

while True:
    try:
        response = sm.describe_endpoint(EndpointName=dev_endpoint_name)
        print("Endpoint status: {}".format(response['EndpointStatus']))
        if response['EndpointStatus'] == 'InService':
            break
        # Stop polling if the endpoint can never become InService
        if response['EndpointStatus'] == 'Failed':
            raise(Exception('Endpoint failed: {}'.format(response.get('FailureReason'))))
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

Endpoint status: InService


In [26]:
import numpy as np
from tqdm import tqdm

try:
    # Support SageMaker v2 SDK: https://sagemaker.readthedocs.io/en/stable/v2.html
    from sagemaker.predictor import Predictor
    from sagemaker.serializers import CSVSerializer
    def get_predictor(endpoint_name):
        xgb_predictor = Predictor(endpoint_name)
        xgb_predictor.serializer = CSVSerializer()
        return xgb_predictor
except ImportError:
    # Fallback to SageMaker v1.70 SDK
    from sagemaker.predictor import RealTimePredictor, csv_serializer
    def get_predictor(endpoint_name):
        xgb_predictor = RealTimePredictor(endpoint_name)
        xgb_predictor.content_type = 'text/csv'
        xgb_predictor.serializer = csv_serializer
        return xgb_predictor



Invoke the dev endpoint with test data.

In [28]:
dev_predictor = get_predictor(dev_endpoint_name)

with open('./data/test_sample.csv', 'r') as f:
    for row in f:
        payload = row.rstrip('\n')
        predictions = dev_predictor.predict(data=payload)
        print(predictions)
        time.sleep(0.5)



b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.21538889408111572'
b'0.11626008152961731'
b'0.1974208950996399'
b'0.11626008152961731'
b'0.2893518805503845'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.17140549421310425'
b'0.11626008152961731'
b'0.2893518805503845'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.2893518805503845'
b'0.21538889408111572'
b'0.11626008152961731'
b'0.21538889408111572'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.2893518805503845'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.21538889408111572'
b'0.11626008152961731'
b'0.21538889408111572'
b'0.2893518805503845'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.21538889408111572'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.2893518805503845'
b'0.21538889408111572'
b'0.11626008152961

Load the response into a data frame, and join with predictions to calculate absolute error.

### TODO
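
The TODO above could be filled in along these lines. This is a minimal sketch, assuming you have a true label for each row you send: the REST API cell further down sends `test_df` without its first column, which suggests that column holds the `Churn` label. The `absolute_errors` function is hypothetical and uses plain Python; wrapping its rows in `pd.DataFrame` gives the data frame the text describes.

```python
def absolute_errors(predictions, labels):
    """Pair raw endpoint responses (bytes such as b'0.116...') with true labels.

    Returns one dict per row plus the mean absolute error (MAE).
    """
    if len(predictions) != len(labels):
        raise ValueError('predictions and labels must have the same length')
    rows = []
    for raw, label in zip(predictions, labels):
        score = float(raw.decode('utf-8'))  # the endpoint returns the score as CSV text
        rows.append({'label': label, 'prediction': score, 'abs_error': abs(score - label)})
    mae = sum(r['abs_error'] for r in rows) / len(rows) if rows else float('nan')
    return rows, mae


# Hypothetical usage with the dev predictor, assuming column 0 of test_df is Churn:
# features_csv = test_df[test_df.columns[1:]].to_csv(header=False, index=False)
# preds = [dev_predictor.predict(data=line) for line in features_csv.splitlines()]
# rows, mae = absolute_errors(preds, test_df[test_df.columns[0]].tolist())
# pd.DataFrame(rows)
```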

## Approve Prod Deployment

If we are happy with this metric, we can approve the deployment with the widget below, or manually in the CodePipeline console by clicking the "Review" button.

![Code pipeline](../docs/deploy-dev.png)

In [29]:
import ipywidgets as widgets

def on_click(obj):
    # CodePipeline expects the status 'Approved' or 'Rejected', which match the button labels
    result = { 'summary': approval_text.value, 'status': obj.description }
    response = codepipeline.put_approval_result(
      pipelineName=pipeline_name,
      stageName='DeployDev',
      actionName='ApproveDeploy',
      result=result,
      token=approval_action['token']
    )
    button_box.close()
    print(result)
    
# Create the widget if we are ready for approval
deploy_dev = get_pipeline_stage(pipeline_name, 'DeployDev')
if not 'latestExecution' in deploy_dev['actionStates'][-1]:
    raise(Exception('Please wait.  Deploy dev not complete'))

approval_action = deploy_dev['actionStates'][-1]['latestExecution']
if approval_action['status'] == 'Succeeded':
    print('Dev approved: {}'.format(approval_action['summary']))
elif 'token' in approval_action:
    approval_text = widgets.Text(placeholder='Optional approval message')   
    approve_btn = widgets.Button(description="Approved", button_style='success', icon='check')
    reject_btn = widgets.Button(description="Rejected", button_style='danger', icon='close')
    approve_btn.on_click(on_click)
    reject_btn.on_click(on_click)
    button_box = widgets.HBox([approval_text, approve_btn, reject_btn])
    display(button_box)
else:
    raise(Exception('Please wait.  No dev approval'))

Dev approved: Approved


## Test Prod Deployment

The prod deployment will start shortly after approval.

In [30]:
deploy_prd = get_pipeline_stage(pipeline_name, 'DeployPrd')
if not 'latestExecution' in deploy_prd or not 'latestExecution' in deploy_prd['actionStates'][0]:
    raise(Exception('Please wait.  Deploy prd not started'))
    
execution_id = deploy_prd['latestExecution']['pipelineExecutionId']

Production deployment is managed through a CloudFormation stack, which:

1. Creates a SageMaker endpoint with data capture and auto scaling enabled
2. Creates a model monitoring schedule with a CloudWatch alarm
3. Deploys the API Gateway and Lambda function with AWS CodeDeploy

![Code pipeline](../docs/cloud-formation.png)
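
For reference, data capture on a SageMaker endpoint is configured through a `DataCaptureConfig` block. The sketch below builds that block in the shape the boto3 `create_endpoint_config` call accepts; it is an illustration, not the workshop's CloudFormation template, which this notebook does not show. The destination prefix is an assumption inferred from the capture path the notebook lists under *View Data Capture* (as I understand it, SageMaker appends the endpoint and variant names to it). The function name is hypothetical.

```python
def build_data_capture_config(destination_s3_uri, sampling_percentage=100):
    """Return a DataCaptureConfig dict in the shape that boto3's
    sagemaker create_endpoint_config call accepts, capturing both the CSV
    requests sent to the endpoint and the responses it returns."""
    if not 0 <= sampling_percentage <= 100:
        raise ValueError('sampling_percentage must be between 0 and 100')
    return {
        'EnableCapture': True,
        'InitialSamplingPercentage': sampling_percentage,
        'DestinationS3Uri': destination_s3_uri,
        'CaptureOptions': [{'CaptureMode': 'Input'}, {'CaptureMode': 'Output'}],
        'CaptureContentTypeHeader': {'CsvContentTypes': ['text/csv']},
    }


# Hypothetical usage (the endpoint config name and variants are placeholders):
# sm.create_endpoint_config(
#     EndpointConfigName='<endpoint-config-name>',
#     ProductionVariants=[...],
#     DataCaptureConfig=build_data_capture_config(
#         's3://{}/{}/datacapture'.format(bucket, model_name)))
```

Capturing 100% of requests suits a workshop; a busy production endpoint would typically sample a smaller percentage to limit S3 volume.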

List the last events and how long ago they occurred.

In [31]:
from datetime import datetime, timedelta
from dateutil.tz import tzlocal

def get_event_dataframe(events):
    stack_cols = ['LogicalResourceId', 'ResourceStatus', 'ResourceStatusReason', 'Timestamp']
    stack_event_df = pd.DataFrame(events)[stack_cols].fillna('')
    stack_event_df['TimeAgo'] = (datetime.now(tzlocal())-stack_event_df['Timestamp'])
    return stack_event_df.drop('Timestamp', axis=1)

cfn = boto3.client('cloudformation')

stack_name = '{}-deploy-prd'.format(pipeline_name)
print('stack name: {}'.format(stack_name))

# Get latest stack events
while True:
    try:
        response = cfn.describe_stack_events(StackName=stack_name)
        break
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)
    
get_event_dataframe(response['StackEvents']).head()

stack name: customerchurn-deploy-prd


|   | LogicalResourceId | ResourceStatus | ResourceStatusReason | TimeAgo |
|---|---|---|---|---|
| 0 | customerchurn-deploy-prd | UPDATE_COMPLETE |  | 00:15:59.249340 |
| 1 | Model | DELETE_COMPLETE |  | 00:15:59.485340 |
| 2 | Model | DELETE_IN_PROGRESS |  | 00:16:00.682340 |
| 3 | EndpointConfig | DELETE_COMPLETE |  | 00:16:01.274340 |
| 4 | EndpointConfig | DELETE_IN_PROGRESS |  | 00:16:02.716340 |


Get the production endpoint name so that we can send it some traffic.

In [32]:
prd_endpoint_name='mlops-{}-prd-{}'.format(model_name, execution_id)
print('prod endpoint: {}'.format(prd_endpoint_name))


prod endpoint: mlops-customerchurn-prd-edf18b2e-a503-4f60-9a7c-031c4c89add3


Wait until the endpoint has finished updating before we send traffic.

In [33]:
sm = boto3.client('sagemaker')

while True:
    try:
        response = sm.describe_endpoint(EndpointName=prd_endpoint_name)
        print("Endpoint status: {}".format(response['EndpointStatus']))
        # Wait until the endpoint is in service with data capture enabled
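        if response['EndpointStatus'] == 'InService' \
            and 'DataCaptureConfig' in response \
            and response['DataCaptureConfig']['EnableCapture']:
            break
        # Stop polling if the endpoint can never become InService
        if response['EndpointStatus'] == 'Failed':
            raise(Exception('Endpoint failed: {}'.format(response.get('FailureReason'))))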
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

Endpoint status: InService


Send some inference requests to the production endpoint now that data capture is enabled.

In [35]:
prd_predictor = get_predictor(prd_endpoint_name)


with open('./data/test_sample.csv', 'r') as f:
    for row in f:
        payload = row.rstrip('\n')
        predictions = prd_predictor.predict(data=payload)
        print(predictions)
        time.sleep(0.5)

b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.21538889408111572'
b'0.11626008152961731'
b'0.1974208950996399'
b'0.11626008152961731'
b'0.2893518805503845'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.17140549421310425'
b'0.11626008152961731'
b'0.2893518805503845'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.2893518805503845'
b'0.21538889408111572'
b'0.11626008152961731'
b'0.21538889408111572'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.2893518805503845'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.21538889408111572'
b'0.11626008152961731'
b'0.21538889408111572'
b'0.2893518805503845'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.21538889408111572'
b'0.11626008152961731'
b'0.11626008152961731'
b'0.2893518805503845'
b'0.21538889408111572'
b'0.11626008152961

## Test REST API

Get the deployment progress and the REST API endpoint.

In [36]:
def get_stack_status(stack_name):
    response = cfn.describe_stacks(StackName=stack_name)
    if response['Stacks']:
        stack = response['Stacks'][0]
        outputs = None
        if 'Outputs' in stack:
            outputs = dict([(o['OutputKey'], o['OutputValue']) for o in stack['Outputs']])
        return stack['StackStatus'], outputs 

while True:
    try:
        status, outputs = get_stack_status(stack_name)
        print('stack status: {}'.format(status))
        if status.endswith('COMPLETE') or status.endswith('FAILED'):
            break
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)    
                

if outputs:
    print('deployment application: {}'.format(outputs['DeploymentApplication']))
    print('rest api: {}'.format(outputs['RestApi']))                

stack status: UPDATE_COMPLETE
deployment application: customerchurn-deploy-prd-ServerlessDeploymentApplication-1W3Q31G984RFW
rest api: https://qmkg5uvca8.execute-api.us-east-2.amazonaws.com/Prod/api/


Check the deployment application to see whether it has been created and has started shifting traffic.

In [37]:
#HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codedeploy/applications/{1}?region={0}">Deployment Application</a>'.format(region, outputs['DeploymentApplication']))

Now let's ping the REST endpoint to see which SageMaker endpoint it is hitting. The cell stops on its own once the stack update completes; press STOP to interrupt it earlier.

In [38]:
%%time

from urllib import request
from urllib.error import HTTPError, URLError

headers = {"Content-type": "text/csv"}
payload = test_df[test_df.columns[1:]].head(1).to_csv(header=False, index=False).encode('utf-8')
rest_api = outputs['RestApi']

while True:
    try:
        resp = request.urlopen(request.Request(rest_api, data=payload, headers=headers))
        print("Response code: %d: endpoint: %s" % (resp.getcode(), resp.getheader('x-sagemaker-endpoint')))
        status, outputs = get_stack_status(stack_name)
        if status.endswith('COMPLETE'):
            print('Deployment complete\n')
            break
    except (HTTPError, URLError) as e:
        # urllib raises these for HTTP error codes and connection failures
        print(e)
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

Response code: 200: endpoint: mlops-customerchurn-prd-edf18b2e-a503-4f60-9a7c-031c4c89add3
Deployment complete

CPU times: user 14.8 ms, sys: 4.66 ms, total: 19.5 ms
Wall time: 889 ms


## Model Monitoring

Get the latest production deployment

In [39]:
deploy_prd = get_pipeline_stage(pipeline_name, 'DeployPrd')
if not 'latestExecution' in deploy_prd:
    raise(Exception('Please wait.  Deploy prd not complete'))
    
execution_id = deploy_prd['latestExecution']['pipelineExecutionId']

### Baseline

Load the baseline processing job.

In [40]:
processing_job_name='mlops-{}-pbl-{}'.format(model_name, execution_id)
schedule_name='mlops-{}-pms-{}'.format(model_name, execution_id)

print('processing job name: {}'.format(processing_job_name))
print('schedule name: {}'.format(schedule_name))

processing job name: mlops-customerchurn-pbl-edf18b2e-a503-4f60-9a7c-031c4c89add3
schedule name: mlops-customerchurn-pms-edf18b2e-a503-4f60-9a7c-031c4c89add3


In [41]:
import sagemaker
from sagemaker.model_monitor import BaseliningJob, MonitoringExecution
from sagemaker.s3 import S3Downloader

sagemaker_session = sagemaker.Session()
baseline_job = BaseliningJob.from_processing_name(sagemaker_session, processing_job_name)
status = baseline_job.describe()['ProcessingJobStatus']
if status != 'Completed':
    raise(Exception('Please wait. Processing job not complete, status: {}'.format(status)))
    
baseline_results_uri  = baseline_job.outputs[0].destination

Explore the generated constraints and statistics

In [42]:
import pandas as pd
import json

baseline_statistics = baseline_job.baseline_statistics().body_dict
schema_df = pd.json_normalize(baseline_statistics["features"])
schema_df.head()

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.


|   | name | inferred_type | numerical_statistics.common.num_present | numerical_statistics.common.num_missing | numerical_statistics.mean | numerical_statistics.sum | numerical_statistics.std_dev | numerical_statistics.min | numerical_statistics.max | numerical_statistics.distribution.kll.buckets | numerical_statistics.distribution.kll.sketch.parameters.c | numerical_statistics.distribution.kll.sketch.parameters.k | numerical_statistics.distribution.kll.sketch.data |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Churn | Integral | 2333 | 0 | 0.139306 | 325.0 | 0.346265 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'count': 2008.0}, {'lower_bound': 0.1, 'upper_bound': ... | 0.64 | 2048.0 | [[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 0.0, 1.0, 0.0,... |
| 1 | Account Length | Integral | 2333 | 0 | 101.276897 | 236279.0 | 39.552442 | 1.0 | 243.0 | [{'lower_bound': 1.0, 'upper_bound': 25.2, 'count': 70.0}, {'lower_bound': 25.2, 'upper_bound': ... | 0.64 | 2048.0 | [[119.0, 100.0, 111.0, 181.0, 95.0, 104.0, 70.0, 120.0, 88.0, 111.0, 33.0, 106.0, 54.0, 87.0, 94... |
| 2 | VMail Message | Integral | 2333 | 0 | 8.214316 | 19164.0 | 13.776908 | 0.0 | 51.0 | [{'lower_bound': 0.0, 'upper_bound': 5.1, 'count': 1684.0}, {'lower_bound': 5.1, 'upper_bound': ... | 0.64 | 2048.0 | [[19.0, 0.0, 0.0, 40.0, 36.0, 0.0, 0.0, 24.0, 0.0, 0.0, 35.0, 0.0, 0.0, 0.0, 0.0, 41.0, 0.0, 0.0... |
| 3 | Day Mins | Fractional | 2333 | 0 | 180.226489 | 420468.4 | 53.987179 | 0.0 | 350.8 | [{'lower_bound': 0.0, 'upper_bound': 35.08, 'count': 14.0}, {'lower_bound': 35.08, 'upper_bound'... | 0.64 | 2048.0 | [[178.1, 160.3, 197.1, 105.2, 283.1, 113.6, 232.1, 212.7, 73.3, 176.9, 161.9, 128.6, 190.5, 223.... |
| 4 | Day Calls | Integral | 2333 | 0 | 100.259323 | 233905.0 | 20.165008 | 0.0 | 165.0 | [{'lower_bound': 0.0, 'upper_bound': 16.5, 'count': 2.0}, {'lower_bound': 16.5, 'upper_bound': 3... | 0.64 | 2048.0 | [[110.0, 138.0, 117.0, 61.0, 112.0, 87.0, 122.0, 73.0, 86.0, 128.0, 85.0, 83.0, 108.0, 109.0, 10... |


In [43]:
baseline_constraints = baseline_job.suggested_constraints().body_dict
constraints_df = pd.json_normalize(baseline_constraints["features"])
constraints_df.head()

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.


|   | name | inferred_type | completeness | num_constraints.is_non_negative |
|---|---|---|---|---|
| 0 | Churn | Integral | 1.0 | True |
| 1 | Account Length | Integral | 1.0 | True |
| 2 | VMail Message | Integral | 1.0 | True |
| 3 | Day Mins | Fractional | 1.0 | True |
| 4 | Day Calls | Integral | 1.0 | True |


### View Data Capture

Get the list of data capture files from the endpoint.

In [44]:
bucket = sagemaker_session.default_bucket()
data_capture_logs_uri = 's3://{}/{}/datacapture/{}'.format(bucket, model_name, prd_endpoint_name)

capture_files = S3Downloader.list(data_capture_logs_uri)
print('Found {} files'.format(len(capture_files)))

if capture_files:
    # Get the first line of the most recent file    
    event = json.loads(S3Downloader.read_file(capture_files[-1]).split('\n')[0])
    print('\nLast file:\n{}'.format(json.dumps(event, indent=2)))

Found 2 files

Last file:
{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "22,0,110.3,107,166.5,93,202.3,96,9.5,5,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,1,0,0,1,0,1,0",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.11626008152961731",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "2b785891-7b19-47bb-8eec-a12c45d4195c",
    "inferenceTime": "2020-10-07T16:07:25Z"
  },
  "eventVersion": "0"
}
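
Each capture file holds JSON Lines, one record per inference like the one above. A small parsing sketch follows, assuming CSV-encoded payloads as in this record; other encodings (for example base64) would need decoding first, so the function rejects them. The name `parse_capture_record` is hypothetical.

```python
import json


def parse_capture_record(line):
    """Split one JSON Lines data capture record into the CSV feature values
    that were sent, the score that came back, and the inference time."""
    event = json.loads(line)
    capture = event['captureData']
    for part in ('endpointInput', 'endpointOutput'):
        if capture[part]['encoding'] != 'CSV':
            # e.g. BASE64-encoded payloads would need decoding first
            raise ValueError('unsupported encoding in {}: {}'.format(part, capture[part]['encoding']))
    features = [float(v) for v in capture['endpointInput']['data'].split(',')]
    prediction = float(capture['endpointOutput']['data'])
    return features, prediction, event['eventMetadata']['inferenceTime']


# Hypothetical usage with the capture files listed above:
# for line in S3Downloader.read_file(capture_files[-1]).splitlines():
#     if line.strip():
#         features, score, when = parse_capture_record(line)
```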


### View Monitoring Schedule

The functions for plotting and rendering distribution statistics or constraint violations are implemented in a `utils` file, so let's download it.

In [45]:
!wget -O utils.py --quiet https://raw.githubusercontent.com/awslabs/amazon-sagemaker-examples/master/sagemaker_model_monitor/visualization/utils.py
import utils as mu

Check the schedule status and when the next hourly run is due.

In [46]:
sm = boto3.client('sagemaker')

response = sm.describe_monitoring_schedule(MonitoringScheduleName=schedule_name)
print('Schedule Status: {}'.format(response['MonitoringScheduleStatus']))

now = datetime.now(tzlocal())
next_hour = (now+timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
scheduled_diff = (next_hour-now).seconds//60
print('Next schedule in {} minutes'.format(scheduled_diff))

Schedule Status: Scheduled
Next schedule in 51 minutes


Get the latest completed monitoring execution (which may have violations).

In [47]:
processing_job_arn = None

while processing_job_arn is None:
    try:
        response = sm.list_monitoring_executions(MonitoringScheduleName=schedule_name)
    except ClientError as e:
        print(e.response["Error"]["Message"])
        time.sleep(10)
        continue
    for mon in response['MonitoringExecutionSummaries']:
        status = mon['MonitoringExecutionStatus']
        now = datetime.now(tzlocal())
        created_diff = (now-mon['CreationTime']).seconds//60
        print('Schedule status: {}, Created: {} minutes ago'.format(status, created_diff))
        if status in ['Completed', 'CompletedWithViolations']:
            processing_job_arn = mon['ProcessingJobArn']
            break
        if status == 'InProgress':
            break
    else:
        # No execution has completed or is in progress: the list is empty,
        # or the listed executions failed or were stopped
        raise(Exception('Please wait.  No completed monitoring executions yet'))
    time.sleep(10)

Schedule status: Failed, Created: 5 minutes ago


Exception: Please wait.  No Schedules created

Load the monitoring execution

In [48]:
execution = MonitoringExecution.from_processing_arn(sagemaker_session=sagemaker.Session(), 
                                                    processing_job_arn=processing_job_arn)
exec_inputs = {inp['InputName']: inp for inp in execution.describe()['ProcessingInputs']}
exec_results_uri = execution.output.destination

print('Monitoring Execution results: {}'.format(exec_results_uri))

AttributeError: 'NoneType' object has no attribute 'split'

List the constraints, statistics and violations if they exist. 

In [49]:
!aws s3 ls $exec_results_uri/

2019-07-18 19:10:01 aws-athena-query-results-325928439752-us-west-2
2020-06-25 22:39:13 aws-codestar-us-west-2-325928439752
2020-06-25 22:39:13 aws-codestar-us-west-2-325928439752-mlops-app
2020-06-25 22:39:13 aws-codestar-us-west-2-325928439752-mlops-pipe
2019-09-06 17:12:37 aws-deepracer-271c11d5-d2e5-4e3d-9570-2a0cd5688131
2019-09-04 20:49:56 aws-glue-325928439752-us-west-2
2019-07-18 18:18:37 aws-glue-scripts-325928439752-us-west-2
2019-07-18 18:18:38 aws-glue-temporary-325928439752-us-west-2
2020-05-18 17:29:42 aws-logs-325928439752-us-west-2
2020-08-11 16:02:52 bulkqa-rbulkqas3bucket-7ad8tb0w7dvj
2019-11-03 21:56:31 cdktoolkit-stagingbucket-1b4p98ozrvlxj
2020-09-24 04:00:43 cf-templates-nzbdlmvwhhpo-us-east-2
2020-03-30 17:37:15 cf-templates-nzbdlmvwhhpo-us-west-1
2019-06-24 03:38:14 cf-templates-nzbdlmvwhhpo-us-west-2
2020-07-01 09:26:16 cloudtrail-awslogs-325928439752-e0kcnfzn-isengard-do-not-delete
2020-09-23 16:17:15 do-not-delete-gatedgarden-audit-325928439752
2019-06-04 19:

In [50]:
# Get the baseline and monitoring statistics & violations
baseline_statistics = baseline_job.baseline_statistics().body_dict
execution_statistics = execution.statistics().body_dict
violations = execution.constraint_violations().body_dict['violations']

Parameter 'session' will be renamed to 'sagemaker_session' in SageMaker Python SDK v2.


NameError: name 'execution' is not defined

In [51]:
mu.show_violation_df(baseline_statistics=baseline_statistics, 
                     latest_statistics=execution_statistics, 
                     violations=violations)

NameError: name 'execution_statistics' is not defined

## CloudWatch Monitoring

AWS [CloudWatch Synthetics](https://aws.amazon.com/blogs/aws/new-use-cloudwatch-synthetics-to-monitor-sites-api-endpoints-web-workflows-and-more/) allows you to set up a canary that tests, on a regular interval, that your API returns an expected value. This is a great way to validate that the blue/green deployment is not causing any downtime for our end users.

### Create Canary

Let's set up a "canary" to continuously test the production API, and a dashboard to visualize the results.

In [53]:
from urllib.parse import urlparse
from string import Template
from io import BytesIO
import zipfile

# Format the canary_js with rest_api and payload
rest_url = urlparse(rest_api)

with open('./notebook/canary.js') as f:
    canary_js = Template(f.read()).substitute(hostname=rest_url.netloc, path=rest_url.path, 
                                              data=payload.decode('utf-8').strip())
# Write the zip file
zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, 'w') as zf:
    zip_path = 'nodejs/node_modules/apiCanaryBlueprint.js' # Node.js canary scripts are loaded from nodejs/node_modules/
    zip_info = zipfile.ZipInfo(zip_path)
    zip_info.external_attr = 0o0755 << 16 # Unix permissions rwxr-xr-x so the file is readable
    zf.writestr(zip_info, canary_js)
zip_buffer.seek(0)

# Create the canary
synth = boto3.client('synthetics')

role = sagemaker.get_execution_role()
s3_canary_uri = 's3://{}/{}'.format(artifact_bucket, model_name)
canary_name = 'mlops-{}'.format(model_name)

response = synth.create_canary(
    Name=canary_name,
    Code={
        'ZipFile': bytearray(zip_buffer.read()),
        'Handler': 'apiCanaryBlueprint.handler'
    },
    ArtifactS3Location=s3_canary_uri,
    ExecutionRoleArn=role,
    Schedule={ 
        'Expression': 'rate(10 minutes)', 
        'DurationInSeconds': 0 },
    RunConfig={
        'TimeoutInSeconds': 60,
        'MemoryInMB': 960
    },
    SuccessRetentionPeriodInDays=31,
    FailureRetentionPeriodInDays=31,
    RuntimeVersion='syn-1.0',
)

print('Creating canary: {}'.format(canary_name))

Creating canary: mlops-customerchurn
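The cell above packages the generated script at `nodejs/node_modules/apiCanaryBlueprint.js` and registers `apiCanaryBlueprint.handler` as the entry point, so the handler string is the script's file name (without `.js`) followed by the exported function name. The sketch below wraps that packaging step in a small helper and checks the archive layout locally before anything is sent to Synthetics. `build_canary_zip` is a hypothetical helper, and the one-line placeholder script is not the workshop's `canary.js`.

```python
import io
import zipfile


def build_canary_zip(script_source, script_name="apiCanaryBlueprint"):
    """Package a Node.js canary script using the same layout as the cell above."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as zf:
        info = zipfile.ZipInfo("nodejs/node_modules/{}.js".format(script_name))
        info.external_attr = 0o755 << 16  # Unix permissions rwxr-xr-x
        zf.writestr(info, script_source)
    # Handler format: <file name without .js>.<exported function name>
    handler = "{}.handler".format(script_name)
    return buffer.getvalue(), handler


# Placeholder script used only to inspect the archive layout.
zip_bytes, handler = build_canary_zip("exports.handler = async () => {};")
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as zf:
    print(zf.namelist())  # ['nodejs/node_modules/apiCanaryBlueprint.js']
print(handler)            # apiCanaryBlueprint.handler
```

The returned bytes could be passed as `Code={'ZipFile': zip_bytes, 'Handler': handler}` to `create_canary`.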


Create a CloudWatch alarm that goes into the ALARM state when the canary's success percentage drops below 90%.

In [54]:
cloudwatch = boto3.client('cloudwatch')

canary_alarm_name = '{}-synth-lt-threshold'.format(canary_name)

response = cloudwatch.put_metric_alarm(
    AlarmName=canary_alarm_name,
    ComparisonOperator='LessThanThreshold',
    EvaluationPeriods=1,
    DatapointsToAlarm=1,
    Period=600, # 10 minute interval
    Statistic='Average',
    Threshold=90.0,
    ActionsEnabled=False,
    AlarmDescription='SuccessPercent LessThanThreshold 90%',
    Namespace='CloudWatchSynthetics',
    MetricName='SuccessPercent',
    Dimensions=[
        {
          'Name': 'CanaryName',
          'Value': canary_name
        },
    ],
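    Unit='Percent' # SuccessPercent is reported in Percent; a mismatched unit can leave the alarm in INSUFFICIENT_DATA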
)

print('Creating alarm: {}'.format(canary_alarm_name))

Creating alarm: mlops-customerchurn-synth-lt-threshold
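CloudWatch evaluates an alarm over the most recent `EvaluationPeriods` periods and moves to `ALARM` when at least `DatapointsToAlarm` of them breach the threshold. With the settings above (one 10-minute period, one datapoint, `LessThanThreshold` 90), a single period whose average `SuccessPercent` is below 90 is enough. The function below is a simplified model of that "M out of N" rule for illustration only: it ignores missing-data treatment and other details of the real service, and `evaluate_alarm` is a hypothetical name. Because the alarm is created with `ActionsEnabled=False`, it changes state but triggers no actions.

```python
def evaluate_alarm(datapoints, threshold, evaluation_periods=1, datapoints_to_alarm=1):
    """Simplified M-of-N evaluation for a LessThanThreshold alarm.

    `datapoints` holds one aggregated value per period (for example the
    Average SuccessPercent per 10-minute period), oldest first.
    Missing-data handling is deliberately ignored.
    """
    if len(datapoints) < evaluation_periods:
        return "INSUFFICIENT_DATA"
    window = datapoints[-evaluation_periods:]
    breaching = sum(1 for value in window if value < threshold)
    return "ALARM" if breaching >= datapoints_to_alarm else "OK"


print(evaluate_alarm([100.0, 100.0, 80.0], threshold=90.0))  # ALARM
print(evaluate_alarm([80.0, 100.0], threshold=90.0))          # OK
print(evaluate_alarm([], threshold=90.0))                     # INSUFFICIENT_DATA
```

Raising `evaluation_periods` and `datapoints_to_alarm` (for example 2 out of 3) would make the alarm less sensitive to a single failed canary run.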


Let's wait for the canary to be ready, then start it and wait until it is running.

In [55]:
import time
from botocore.exceptions import ClientError
from IPython.display import HTML

while True:
    try:
        response = synth.get_canary(Name=canary_name)
        status = response['Canary']['Status']['State']    
        print('Canary status: {}'.format(status))
        if status == 'ERROR':
            raise(Exception(response['Canary']['Status']['StateReason']))    
        elif status == 'READY':
            synth.start_canary(Name=canary_name)
        elif status == 'RUNNING':
            break        
    except ClientError as e:
        if e.response["Error"]["Code"] == "ResourceNotFoundException":
            print('No canary found.')
            break
        print(e.response["Error"]["Message"])
    time.sleep(10)

# Output an HTML link to the canary in the CloudWatch console
HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#synthetics:canary/detail/{1}">CloudWatch Canary</a>'.format(region, canary_name))

Canary status: CREATING
Canary status: READY
Canary status: RUNNING
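The `while True` loop above has no upper bound, so a canary stuck in `CREATING` or `STARTING` would block the notebook indefinitely. A minimal sketch of a bounded alternative follows. `wait_for_state` is a hypothetical helper, and the injected `sleep` and `clock` parameters exist only so it can be exercised without real waiting.

```python
import time


def wait_for_state(get_state, target, on_state=None, timeout_s=600, poll_s=10,
                   sleep=time.sleep, clock=time.monotonic):
    """Poll get_state() until it returns `target`; raise on ERROR or timeout.

    `on_state(state)` is an optional hook called for every non-target state,
    e.g. to start a canary once it reports READY.
    """
    deadline = clock() + timeout_s
    while True:
        state = get_state()
        if state == target:
            return state
        if state == "ERROR":
            raise RuntimeError("Resource entered ERROR state")
        if on_state is not None:
            on_state(state)
        if clock() >= deadline:
            raise TimeoutError("Timed out waiting for state {}".format(target))
        sleep(poll_s)


# Simulated status sequence standing in for synth.get_canary(...) responses.
states = iter(["CREATING", "READY", "STARTING", "RUNNING"])
started = []
result = wait_for_state(
    lambda: next(states), "RUNNING",
    on_state=lambda s: started.append(s) if s == "READY" else None,
    sleep=lambda _: None,
)
print(result, started)  # RUNNING ['READY']
```

In the notebook, `get_state` would wrap `synth.get_canary(...)` and `on_state` would call `synth.start_canary(...)` on `READY`; that wiring is an assumption and would also need the `ClientError` handling used in the cell above.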


### Create Dashboard

Finally, let's create an Amazon CloudWatch dashboard to visualize the key performance metrics and alarms.

In [56]:
sts = boto3.client('sts')
account_id = sts.get_caller_identity().get('Account')
dashboard_name = 'mlops-{}'.format(model_name)

with open('./notebook/dashboard.json') as f:
    dashboard_body = Template(f.read()).substitute(region=region, account_id=account_id, model_name=model_name)
    response = cloudwatch.put_dashboard(
        DashboardName=dashboard_name,
        DashboardBody=dashboard_body
    )

# Output an HTML link to the CloudWatch dashboard
HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#dashboards:name={1}">CloudWatch Dashboard</a>'.format(region, dashboard_name))
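The dashboard cell fills the `$region`, `$account_id` and `$model_name` placeholders in `./notebook/dashboard.json` with `string.Template.substitute`. Because `Template` treats `$` as a placeholder marker, any literal dollar sign in the JSON must be written as `$$`, and a placeholder with no matching keyword raises `KeyError`. The sketch below uses a hypothetical one-widget template (not the workshop's actual file) and placeholder account ID, and parses the result with `json.loads` so a malformed body fails locally rather than in `put_dashboard`.

```python
import json
from string import Template

# Hypothetical one-widget template; the workshop's ./notebook/dashboard.json is not reproduced.
DASHBOARD_TEMPLATE = Template("""{
  "widgets": [{
    "type": "metric",
    "properties": {
      "title": "Canary success for $model_name",
      "region": "$region",
      "metrics": [["CloudWatchSynthetics", "SuccessPercent", "CanaryName", "mlops-$model_name"]],
      "period": 600,
      "stat": "Average"
    }
  }]
}""")


def render_dashboard(region, account_id, model_name):
    """Fill the template placeholders and confirm the result is valid JSON."""
    # substitute() raises KeyError for a missing placeholder; keys the template
    # does not reference (account_id here) are ignored.
    body = DASHBOARD_TEMPLATE.substitute(
        region=region, account_id=account_id, model_name=model_name
    )
    json.loads(body)  # raises json.JSONDecodeError on a malformed body
    return body


body = render_dashboard("us-west-2", "123456789012", "customerchurn")
props = json.loads(body)["widgets"][0]["properties"]
print(props["metrics"][0][3])  # mlops-customerchurn
```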

### Trigger Retraining
Our CodePipeline is configured with a [CloudWatch Events](https://docs.aws.amazon.com/codepipeline/latest/userguide/create-cloudtrail-S3-source.html) rule that starts the pipeline for retraining when the drift detection metric alarm fires.

We can simulate drift by putting a metric value of `0.5`, which is above the threshold of `0.2`. This triggers the alarm, which in turn starts the CodePipeline retraining.
Use the links below to open the alarm and the pipeline.

In [57]:
from datetime import datetime, timezone

# Put a new metric data point to trigger the drift alarm
response = cloudwatch.put_metric_data(
    Namespace='aws/sagemaker/Endpoints/data-metrics',
    MetricData=[
        {
            'MetricName': 'feature_baseline_drift_CustServ Calls',
            'Dimensions': [
                {
                    'Name': 'MonitoringSchedule',
                    'Value': schedule_name
                },
                {
                    'Name': 'Endpoint',
                    'Value': prd_endpoint_name
                },
            ],
            'Timestamp': datetime.now(timezone.utc), # Timezone-aware UTC timestamp
            'Value': 0.5, # This is over the configured threshold of 0.2
            'Unit': 'None'
        },
    ]
)

# Output HTML links to the drift alarm and the CodePipeline
alarm_name = 'mlops-{}-metric-gt-threshold'.format(model_name)
HTML('''<a target="_blank" href="https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#alarmsV2:alarm/{1}">CloudWatch Alarm</a> starts 
     <a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codepipeline/pipelines/{2}/view?region={0}">Code Pipeline</a>'''.format(region, alarm_name, pipeline_name))
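A CloudWatch Events (now Amazon EventBridge) rule starts the pipeline by matching alarm state-change events. The workshop's actual rule is defined in its CloudFormation template, which is not shown here, so the pattern below is an assumption: it matches events from `aws.cloudwatch` with detail type `CloudWatch Alarm State Change` for an alarm assumed to be named `mlops-customerchurn-metric-gt-threshold` that has entered `ALARM`. The `matches` function is a deliberately simplified matcher (nested objects recurse, lists enumerate allowed literal values) and does not implement EventBridge's full pattern syntax; both function names are hypothetical.

```python
def alarm_state_change_pattern(alarm_name):
    """Assumed event pattern for a CloudWatch alarm entering the ALARM state."""
    return {
        "source": ["aws.cloudwatch"],
        "detail-type": ["CloudWatch Alarm State Change"],
        "detail": {
            "alarmName": [alarm_name],
            "state": {"value": ["ALARM"]},
        },
    }


def matches(pattern, event):
    """Simplified matcher: dicts recurse, lists hold allowed literal values."""
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


pattern = alarm_state_change_pattern("mlops-customerchurn-metric-gt-threshold")
# Trimmed example event; fields not named in the pattern are ignored.
event = {
    "source": "aws.cloudwatch",
    "detail-type": "CloudWatch Alarm State Change",
    "detail": {
        "alarmName": "mlops-customerchurn-metric-gt-threshold",
        "state": {"value": "ALARM"},
        "previousState": {"value": "OK"},
    },
}
print(matches(pattern, event))  # True
```

Only the transition into `ALARM` matches, so the alarm returning to `OK` after retraining does not start the pipeline a second time.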

## Clean Up

First, delete the stacks the pipeline created for deployment, the training job, and the suggested baseline. For a model name of **customerchurn**, these are:

* *customerchurn*-deploy-prd
* *customerchurn*-deploy-dev
* *customerchurn*-training-job
* *customerchurn*-suggest-baseline
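If you prefer to remove these stacks from the notebook rather than the CloudFormation console, a minimal sketch with the boto3 CloudFormation client follows. It deletes the stacks one at a time in the order listed and waits on the `stack_delete_complete` waiter before moving on. The suffixes assume the deployment stacks are named `<model>-deploy-prd` and `<model>-deploy-dev`, so confirm the exact names in your account first; `delete_model_stacks` is a hypothetical helper.

```python
# Assumed stack-name suffixes; verify them in the CloudFormation console first.
STACK_SUFFIXES = ("deploy-prd", "deploy-dev", "training-job", "suggest-baseline")


def delete_model_stacks(cfn, model_name, suffixes=STACK_SUFFIXES):
    """Delete each model stack in order, waiting for each deletion to finish.

    `cfn` is expected to be a boto3 CloudFormation client,
    e.g. boto3.client("cloudformation").
    """
    waiter = cfn.get_waiter("stack_delete_complete")
    deleted = []
    for suffix in suffixes:
        stack_name = "{}-{}".format(model_name, suffix)
        cfn.delete_stack(StackName=stack_name)
        waiter.wait(StackName=stack_name)  # blocks until the stack is gone
        deleted.append(stack_name)
    return deleted
```

Usage in the notebook would be `delete_model_stacks(boto3.client('cloudformation'), model_name)`.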

The following code stops and deletes the canary you created.

In [None]:
while True:
    try:
        response = synth.get_canary(Name=canary_name)
        status = response['Canary']['Status']['State']    
        print('Canary status: {}'.format(status))
        if status == 'ERROR':
            raise(Exception(response['Canary']['Status']['StateReason']))    
        elif status in ('STOPPED', 'READY'):
            # A canary that is stopped (or was never started) can be deleted directly
            synth.delete_canary(Name=canary_name)
        elif status == 'RUNNING':
            synth.stop_canary(Name=canary_name)
    except ClientError as e:
        if e.response["Error"]["Code"] == "ResourceNotFoundException":
            print('Canary successfully deleted.')
            break
        print(e.response["Error"]["Message"])
    time.sleep(10)

The following code deletes the canary alarm and the dashboard.

In [None]:
cloudwatch.delete_alarms(AlarmNames=[canary_alarm_name])
print('Alarm deleted')

cloudwatch.delete_dashboards(DashboardNames=[dashboard_name])
print('Dashboard deleted')

Finally, delete the stack you created for the AWS CodePipeline and the notebook.
