# Banking Fraud with MLOps

# MLOps Demo

Julian Bright, Machine Learning Specialist @ Amazon Web Services


### Overview

This is the final piece of the Banking Fraud puzzle.

The steps below create a pipeline that deploys the endpoints, keeps them up to date and supplies metrics which are monitored in CloudWatch.

This is based upon the excellent example

https://github.com/aws-samples/amazon-sagemaker-safe-deployment-pipeline

This notebook adds further steps so that we make the best use of the information we have, and applies automation, security and monitoring to our models and endpoints.

We can of course still use a manual method to update our endpoints but that would be silly! :)

The steps we will be following are shown below:

![Code pipeline](img/code_pipeline.png)

We follow this series of steps to trigger the demo:

1. [Setup](#Setup)
2. [Data preparation](#Data-Preparation)
   1. [Splitting and standardising](#Splitting-and-standardising)
3. [Start the build](#Start-Build)
4. [Wait for training job](#Wait-for-Training-Job)
5. [Test dev deployment](#Test-Dev-Deployment)
6. [Approve prod endpoint](#Approve-Prod-Deployment)
7. [Test prod deployment](#Test-Prod-Deployment)
   1. [Test Rest API](#Test-rest-api)
8.  [CloudWatch monitoring](#CloudWatch-Monitoring)

## Setup

Rather than adding to the previous notebook, we will start again.

First, we import the AWS-specific modules; the S3 data locations are specified a little further down.

In [None]:
# import sys
# !{sys.executable} -m pip install --upgrade pip
# !{sys.executable} -m pip install --upgrade seaborn
# !{sys.executable} -m pip install --upgrade imbalanced-learn
import boto3
import os
from random import seed, sample
import sagemaker
import sagemaker.amazon.amazon_estimator
from sagemaker import get_execution_role
from sagemaker.serializers import CSVSerializer

Import python ML modules

In [None]:
import matplotlib.pyplot as plt
import matplotlib.lines as mlines
%matplotlib inline

import numpy as np
import pandas as pd
import seaborn as sns
sns.set_theme()
sns.set_context("paper")

from sklearn.decomposition import PCA
from sklearn.model_selection import cross_val_predict, cross_val_score, train_test_split
from sklearn import preprocessing
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, accuracy_score, roc_curve, auc, precision_score, recall_score

from sklearn.ensemble import RandomForestClassifier
# from xgboost.sklearn import XGBClassifier

In [None]:
bucket = 'sagemaker-pmelvin'
prefix = 'compile_xgb_v3'
hp_prefix = 'hp_tuning_v3'

csv_data = 'input-data/bb_banking_fraud.csv'
csv_removed_types = 'input-data/removed-specific-types/01a89090-e9b6-4b07-8a49-b0244d6dc035.csv'
full_data_location = 's3://{}/{}'.format(bucket, csv_data)
clean_data_location = 's3://{}/{}'.format(bucket, csv_removed_types)

#local_file = 'bb_banking_fraud.csv'

role = get_execution_role()
sess = sagemaker.Session()
smclient = boto3.Session().client('sagemaker')

## Data preparation

Now we reload the data

In [None]:
!aws s3 cp 's3://sagemaker-pmelvin/input-data/bb_banking_fraud.csv' 'bb_banking_fraud.csv'

In [None]:
%%time
# df_full = pd.read_csv(full_data_location)
df_full = pd.read_csv('bb_banking_fraud.csv')

In [None]:
# %%time
# df_clean = pd.read_csv(clean_data_location)

Change the feature names and move the **isFraud** column to the front

In [None]:
df_u = df_full.rename(columns={'oldbalanceOrg':'oldBalanceOrig', 'newbalanceOrig':'newBalanceOrig', 'oldbalanceDest':'oldBalanceDest', 'newbalanceDest':'newBalanceDest', 'isfraud':'isFraud'})

In [None]:
df_u_copy = df_u.copy()

df_u['hourOfDay'] = np.nan
df_u.hourOfDay = df_u_copy.step % 24

In [None]:
first_col = df_u.pop('isFraud')
df_u.insert(0, 'isFraud', first_col)

In [None]:
df_u.head()

In [None]:
sample = df_u.sample(n=10000, random_state=42)

Now that we have the updated dataset and the sample, we use the **RandomUnderSampler** to reduce the number of majority-class (legitimate) datapoints in the dataset and so rebalance the classes (just as before)

In [None]:
def get_features(df):
    
    selected_cols = [
        'type', 'amount', 'oldBalanceOrig', 'newBalanceOrig',
        'oldBalanceDest', 'newBalanceDest', 'isFraud', 'hourOfDay'
    ]
    
    df = df[selected_cols].copy()
    dummies = pd.get_dummies(df.type)
    df = pd.concat([df, dummies], axis=1).drop("type", axis=1)
    
    return df

In [None]:
def reduce_data(pca_df):
    pca_df = pca_df.copy()
    target = pca_df.pop("isFraud")
    scaler = StandardScaler()
    pca_df = scaler.fit_transform(pca_df)
    pca = PCA(n_components=2)
    components = pca.fit_transform(pca_df)

    comp_df = pd.DataFrame(components, columns=["X", "y"])
    target = target.reset_index(drop=True)
    plot_df = pd.concat([comp_df, target], axis=1)
    
    return plot_df

In [None]:
def fraud_plot(plot_df, maj_alpha=0.5, min_alpha=1, save=None):
    fig, ax = plt.subplots(figsize=(8, 6))
    ax = sns.scatterplot(x="X", y="y", alpha=maj_alpha, data=plot_df[plot_df.isFraud == 0], label="Legitimate")
    sns.scatterplot(x="X", y="y", alpha=min_alpha, data=plot_df[plot_df.isFraud == 1], ax=ax, label="Fraud")
    plt.title("Legitimate vs Fraudulent Purchases")
    plt.tight_layout()
    if save is not None:
        plt.savefig(save)
    plt.show()

In [None]:
# processed_data = get_features(sample)
# plot_df = reduce_data(processed_data)
# fraud_plot(plot_df)

In [None]:
# processed_data.shape

In [None]:
# processed_data.head(n=5)

In [None]:
from imblearn.under_sampling import RandomUnderSampler
RUS = RandomUnderSampler(sampling_strategy={0: 9589}, random_state=42)
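
To make the effect of `sampling_strategy={0: 9589}` concrete, here is a minimal standard-library sketch of random undersampling: the majority class is sampled down, without replacement, to a target count while every other row is kept. It is not the `imblearn` implementation; the function name `random_undersample` and the toy rows are assumptions made purely for illustration.

```python
import random
from collections import Counter

def random_undersample(rows, label_key, majority_label, target_count, seed=42):
    """Keep every non-majority row and a random subset of the majority rows."""
    rng = random.Random(seed)
    majority = [r for r in rows if r[label_key] == majority_label]
    others = [r for r in rows if r[label_key] != majority_label]
    # Sample without replacement; never ask for more rows than exist
    kept = rng.sample(majority, min(target_count, len(majority)))
    return kept + others

# Toy data (hypothetical): 95 legitimate rows and 5 fraudulent rows
rows = [{"amount": i, "isFraud": 0} for i in range(95)]
rows += [{"amount": 1000 + i, "isFraud": 1} for i in range(5)]

balanced = random_undersample(rows, "isFraud", majority_label=0, target_count=10)
counts = Counter(r["isFraud"] for r in balanced)
print(counts)
```

The fraud rows are untouched, so the resampled set is smaller than the original but much less skewed.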

In [None]:
def resample(df, method):
    processed_df = get_features(df)
    target = processed_df.pop('isFraud')

    processed_x, processed_y = method.fit_resample(processed_df, target)

    cols = list(processed_df.columns) + ["isFraud"]

    pdf_x = pd.DataFrame(processed_x, columns=processed_df.columns)
    pdf_y = pd.DataFrame(processed_y, columns=['isFraud'])
    resampled_df = pd.concat([pdf_x, pdf_y], axis=1)
    
    return resampled_df

Let's check our data, looking first at the shape and then at a plot

In [None]:
%%time
rus_resampled = resample(df_u, RUS)
print(rus_resampled.shape)
print(rus_resampled.isFraud.value_counts())

In [None]:
fraud_plot(reduce_data(rus_resampled), min_alpha=0.5)

In [None]:
# from imblearn.over_sampling import SMOTE
# SM = SMOTE(random_state=42)

In [None]:
# %%time
# sm_resampled = resample(df_u, SM)
# print(sm_resampled.shape)
# print(sm_resampled.isFraud.value_counts())

In [None]:
# sm_sample = sm_resampled.sample(n=10000, random_state=42)
# fraud_plot(reduce_data(sm_sample), min_alpha=0.5)


## Splitting and standardising

In [None]:
first_col = rus_resampled.pop('isFraud')
rus_resampled.insert(0, 'isFraud', first_col)

In [None]:
rus_resampled.shape

In [None]:
rus_resampled.head()

In [None]:
X = rus_resampled.drop('isFraud', axis=1)  # pass axis by keyword; the positional form was removed in pandas 2.0
y = rus_resampled.isFraud

We will split the dataset into 80% for training, 15% for validation and 5% for testing

In [None]:
train_ratio = 0.80
validation_ratio = 0.15
test_ratio = 0.05

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 1 - train_ratio, random_state=42)
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 1 - train_ratio, random_state=42, shuffle=True, stratify=y)

X_val, X_test, y_val, y_test = train_test_split(X_test, y_test, test_size=test_ratio/(test_ratio + validation_ratio), random_state=42) 
# X_val, X_test, y_val, y_test = train_test_split(X_test, y_test, test_size=test_ratio/(test_ratio + validation_ratio), random_state=42, shuffle=True, stratify=y) 

print('split train: {}, val: {}, test: {} '.format(X_train.shape[0], X_val.shape[0], X_test.shape[0]))
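
The arithmetic behind the two-stage split can be sketched without scikit-learn. The first cut keeps 80% for training; the remaining 20% is then divided so that 0.05 / (0.05 + 0.15) = 25% of it becomes the test set. The helper `three_way_split` is a hypothetical name, and its rounding may differ slightly from what `train_test_split` does internally.

```python
import random

def three_way_split(items, train_ratio, validation_ratio, test_ratio, seed=42):
    """Shuffle once, carve off the training share, then divide the hold-out."""
    if abs(train_ratio + validation_ratio + test_ratio - 1.0) > 1e-9:
        raise ValueError("ratios must sum to 1")
    shuffled = list(items)
    random.Random(seed).shuffle(shuffled)

    n_train = round(len(shuffled) * train_ratio)
    holdout = shuffled[n_train:]

    # Share of the hold-out that becomes the test set: 0.05 / (0.05 + 0.15) = 0.25
    test_share = test_ratio / (test_ratio + validation_ratio)
    n_test = round(len(holdout) * test_share)
    return shuffled[:n_train], holdout[n_test:], holdout[:n_test]

train_part, val_part, test_part = three_way_split(range(1000), 0.80, 0.15, 0.05)
print(len(train_part), len(val_part), len(test_part))
```

For 1,000 items this gives 800, 150 and 50 rows, matching the intended 80/15/5 proportions.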

We now specify the MinMaxScaler and apply it to the split datasets

In [None]:
scaler = MinMaxScaler()
# scaler = RobustScaler()
# scaler = StandardScaler()

scaler.fit(X_train)
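
A scaler is normally fitted on the training split only, and the minimum and maximum it learned are then applied unchanged to the validation and test splits. The plain-Python sketch below illustrates that idea for min-max scaling; `fit_min_max` and `apply_min_max` are hypothetical helpers, not part of scikit-learn.

```python
def fit_min_max(rows):
    """Learn the per-column minimum and maximum from the training rows."""
    columns = list(zip(*rows))
    return [min(c) for c in columns], [max(c) for c in columns]

def apply_min_max(rows, mins, maxs):
    """Scale rows with previously learned statistics (no re-fitting)."""
    scaled = []
    for row in rows:
        scaled.append([
            (v - lo) / (hi - lo) if hi > lo else 0.0
            for v, lo, hi in zip(row, mins, maxs)
        ])
    return scaled

# Toy data (hypothetical): two features per row
train_rows = [[0.0, 10.0], [5.0, 20.0], [10.0, 30.0]]
test_rows = [[2.5, 40.0]]

mins, maxs = fit_min_max(train_rows)
train_scaled = apply_min_max(train_rows, mins, maxs)
test_scaled = apply_min_max(test_rows, mins, maxs)
print(test_scaled)
```

Note that the second test feature scales to a value above 1 because it lies outside the training range; that is expected when the statistics come from the training data alone.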

In [None]:
#
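# The scaler was fitted on X_train above; reuse those statistics for every split
# so validation and test data are scaled consistently and no information leaks from them
X_train = pd.DataFrame(scaler.transform(X_train), columns=X_train.columns)
X_test  = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns)
X_val   = pd.DataFrame(scaler.transform(X_val), columns=X_val.columns)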

In [None]:
#
# A copy is made of each dataframe to use later, as we need to convert them into numpy arrays
X_train_ins = X_train.copy()
X_test_ins  = X_test.copy()
X_val_ins   = X_val.copy()

X_train_ins.insert(0, 'isFraud', y_train.values)
X_test_ins.insert(0, 'isFraud', y_test.values)
X_val_ins.insert(0, 'isFraud', y_val.values)

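In [None]:
# Write the frames that carry the isFraud label in the first column:
# the built-in XGBoost algorithm expects the target first in CSV input, with no header row
X_train_ins.to_csv('train.csv', index=False, header=False)
X_val_ins.to_csv('validation.csv', index=False, header=False)

# Save test and baseline with headers
X_test_ins.to_csv('test.csv', index=False, header=True)
X_train_ins.to_csv('baseline.csv', index=False, header=True)

In [None]:
# Convert the labelled frames to numpy arrays for later use;
# column 0 is isFraud, so the features are selected downstream with [:, 1:]
X_train = X_train_ins.to_numpy()
X_test  = X_test_ins.to_numpy()
X_val   = X_val_ins.to_numpy()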

Upload the csv files to S3

In [None]:
# Get the session and default bucket
session = sagemaker.session.Session()
bucket = session.default_bucket()

bb_prefix = 'blackbelt/v1'

s3_train_uri = sess.upload_data('train.csv', bucket, bb_prefix + '/data/training')
print('Uploaded training data location: {}'.format(s3_train_uri))

s3_val_uri = sess.upload_data('validation.csv', bucket, bb_prefix + '/data/validation')
print('Uploaded validation data location: {}'.format(s3_val_uri))

s3_baseline_uri = sess.upload_data('baseline.csv', bucket, bb_prefix + '/data/baseline')
print('Uploaded baseline data location: {}'.format(s3_baseline_uri))

s3_output_location = 's3://{}/{}/output'.format(bucket, bb_prefix)
print('Training artifacts will be uploaded to: {}'.format(s3_output_location))

## Start the build

The pipeline source has two inputs:

- A Git source repository containing the model definition and all supporting infrastructure
- An S3 data source that includes a reference to the training and validation datasets
 
First, we load variables from the environment

In [None]:
from botocore.exceptions import ClientError
import time

region = boto3.Session().region_name
artifact_bucket = os.environ['ARTIFACT_BUCKET']
pipeline_name = os.environ['PIPELINE_NAME']
model_name = os.environ['MODEL_NAME']

print('region: {}'.format(region))
print('artifact bucket: {}'.format(artifact_bucket))
print('pipeline: {}'.format(pipeline_name))
print('model name: {}'.format(model_name))

We upload the data source metadata to trigger a new build

In [None]:
from io import BytesIO
import zipfile
import json

input_data = {
    'TrainingUri': s3_train_uri,
    'ValidationUri': s3_val_uri,
    'BaselineUri': s3_baseline_uri
}

hyperparameters = {
    'num_round': 20
}

data_source_key = '{}/data-source.zip'.format(pipeline_name)

zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, 'a') as zf:
    zf.writestr('inputData.json', json.dumps(input_data))
    zf.writestr('hyperparameters.json', json.dumps(hyperparameters))
zip_buffer.seek(0)

s3 = boto3.client('s3')
s3.put_object(Bucket=artifact_bucket, Key=data_source_key, Body=bytearray(zip_buffer.read()))
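
Before uploading, it can be useful to confirm that the in-memory archive contains what the pipeline expects. The sketch below builds the same two-file zip and reads it back; the S3 URIs are placeholders and `build_data_source_zip` is a hypothetical helper, so treat it as an illustration rather than part of the pipeline.

```python
import json
import zipfile
from io import BytesIO

def build_data_source_zip(input_data, hyperparameters):
    """Return the bytes of a zip holding the two JSON files the build reads."""
    buffer = BytesIO()
    with zipfile.ZipFile(buffer, "w") as zf:
        zf.writestr("inputData.json", json.dumps(input_data))
        zf.writestr("hyperparameters.json", json.dumps(hyperparameters))
    return buffer.getvalue()

# Placeholder URIs (assumptions); the notebook uses the values returned by upload_data
example_input = {
    "TrainingUri": "s3://example-bucket/data/training/train.csv",
    "ValidationUri": "s3://example-bucket/data/validation/validation.csv",
    "BaselineUri": "s3://example-bucket/data/baseline/baseline.csv",
}
archive = build_data_source_zip(example_input, {"num_round": 20})

# Read the archive back to check its contents before it is uploaded
with zipfile.ZipFile(BytesIO(archive)) as zf:
    names = sorted(zf.namelist())
    round_tripped = json.loads(zf.read("inputData.json"))
print(names)
```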

## Wait for training Job

Follow the code pipeline to wait until the training job is complete

In [None]:
from IPython.display import HTML  # IPython.core.display is a deprecated import path

HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codepipeline/pipelines/{1}/view?region={0}">Code Pipeline</a>'.format(region, pipeline_name))

Once the training and baseline jobs are complete, we can inspect the experiment metrics.

In [None]:
from sagemaker import analytics
model_analytics = analytics.ExperimentAnalytics(experiment_name=model_name)
analytics_df = model_analytics.dataframe()

if (analytics_df.shape[0] == 0):
    raise(Exception('Please wait.  No training or baseline jobs'))

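pd.set_option('display.max_colwidth', 100) # Increase column width to show full component name
cols = ['TrialComponentName', 'DisplayName', 'SageMaker.InstanceType', 
        'train:rmse - Last', 'validation:rmse - Last'] # return the last rmse for training and validation
# Index.intersection keeps only the requested columns that are present;
# the `&` operator no longer performs a set intersection on an Index in pandas 2.0
analytics_df[analytics_df.columns.intersection(cols)].head(2)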

## Test dev deployment

Once the endpoint has been deployed and is awaiting approval, we can begin some tests.

In [None]:
codepipeline = boto3.client('codepipeline')

def get_pipeline_stage(pipeline_name, stage_name):
    response = codepipeline.get_pipeline_state(name=pipeline_name)
    for stage in response['stageStates']:
        if stage['stageName'] == stage_name:
            return stage
        
# Get last execution id
deploy_dev = get_pipeline_stage(pipeline_name, 'DeployDev')
if deploy_dev is None or 'latestExecution' not in deploy_dev:
    raise(Exception('Please wait.  Deploy dev not started'))
    
execution_id = deploy_dev['latestExecution']['pipelineExecutionId']
dev_endpoint_name = 'mlops-{}-dev-{}'.format(model_name, execution_id)

print('endpoint name: {}'.format(dev_endpoint_name))
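
The lookup above depends on the shape of the `get_pipeline_state` response: a list of `stageStates`, each with a `stageName` and, once it has run, a `latestExecution`. The sketch below shows the same lookup against a hand-written sample dictionary, returning `None` instead of failing when a stage is missing or has not started. The helper names and the sample values are assumptions for illustration.

```python
def find_stage(pipeline_state, stage_name):
    """Return the stage entry with the given name, or None if absent."""
    for stage in pipeline_state.get("stageStates", []):
        if stage.get("stageName") == stage_name:
            return stage
    return None

def latest_execution_id(pipeline_state, stage_name):
    """Return the stage's latest pipeline execution id, or None if it has not run."""
    stage = find_stage(pipeline_state, stage_name)
    if stage is None or "latestExecution" not in stage:
        return None
    return stage["latestExecution"]["pipelineExecutionId"]

# Hand-written sample (hypothetical values) mimicking the response structure
sample_state = {
    "stageStates": [
        {
            "stageName": "Source",
            "latestExecution": {"pipelineExecutionId": "abc-123", "status": "Succeeded"},
        },
        {"stageName": "DeployDev"},  # stage exists but has not started yet
    ]
}

print(latest_execution_id(sample_state, "Source"))
print(latest_execution_id(sample_state, "DeployDev"))
print(latest_execution_id(sample_state, "DeployPrd"))
```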

In [None]:
from tqdm import tqdm

try:
    # Support SageMaker v2 SDK: https://sagemaker.readthedocs.io/en/stable/v2.html
    from sagemaker.predictor import Predictor
    from sagemaker.serializers import CSVSerializer
    def get_predictor(endpoint_name):
        xgb_predictor = Predictor(endpoint_name)
        xgb_predictor.serializer = CSVSerializer()
        return xgb_predictor
except ImportError:
    # Fallback to SageMaker v1.70 SDK
    from sagemaker.predictor import RealTimePredictor, csv_serializer
    def get_predictor(endpoint_name):
        xgb_predictor = RealTimePredictor(endpoint_name)
        xgb_predictor.content_type = 'text/csv'
        xgb_predictor.serializer = csv_serializer
        return xgb_predictor

def predict(predictor, data, rows=500):
    # Use at least one section so that small inputs (fewer than rows / 2 records) do not fail
    split_array = np.array_split(data, max(1, round(data.shape[0] / float(rows))))
    predictions = ''
    for array in tqdm(split_array):
        predictions = ','.join([predictions, predictor.predict(array).decode('utf-8')])
    return np.fromstring(predictions[1:], sep=',')
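
The batching logic can be exercised without a live endpoint. This sketch sends rows in fixed-size chunks to a stand-in function that, like the predictor, returns comma-separated scores as bytes, and then collects the parsed floats. `fake_predict` and `predict_in_batches` are hypothetical names; a real endpoint's response format should be checked before relying on this parsing.

```python
def batched(rows, batch_size):
    """Yield consecutive slices of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]

def predict_in_batches(predict_fn, rows, batch_size=500):
    """Call predict_fn once per batch and flatten the CSV responses into floats."""
    scores = []
    for batch in batched(rows, batch_size):
        body = predict_fn(batch).decode("utf-8")
        # Accept either commas or newlines between scores
        scores.extend(float(v) for v in body.replace("\n", ",").split(",") if v.strip())
    return scores

def fake_predict(batch):
    # Stand-in for predictor.predict: one score per row, returned as CSV bytes
    return ",".join("%.2f" % (sum(row) / 10) for row in batch).encode("utf-8")

demo_rows = [[i, i + 1] for i in range(7)]
scores = predict_in_batches(fake_predict, demo_rows, batch_size=3)
print(scores)
```

Seven rows with a batch size of three produce three calls (3, 3 and 1 rows) and seven scores in the original order.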

We use the predictor with our datasets so that we can take a closer look at the model's predictions

In [None]:
dev_predictor = get_predictor(dev_endpoint_name)
predictions = predict(dev_predictor, X_test[:, 1:])

In [None]:
binary_predictions = np.where(predictions > 0.5, 1, 0)

In [None]:
y_train_preds = predict(dev_predictor, X_train[:, 1:])

In [None]:
y_valid_preds = predict(dev_predictor, X_val[:, 1:])

In [None]:
def calc_specificity(y_actual, y_pred, thresh):
    # calculates specificity
    return sum((y_pred < thresh) & (y_actual == 0)) /sum(y_actual ==0)

def print_report(y_actual, y_pred, thresh):
    
    auc = roc_auc_score(y_actual, y_pred)
    accuracy = accuracy_score(y_actual, (y_pred > thresh))
    recall = recall_score(y_actual, (y_pred > thresh))
    precision = precision_score(y_actual, (y_pred > thresh))
    specificity = calc_specificity(y_actual, y_pred, thresh)

    print('AUC:%.3f'%auc)
    print('accuracy:%.3f'%accuracy)
    print('recall:%.3f'%recall)
    print('precision:%.3f'%precision)
    print('specificity:%.3f'%specificity)
    print(' ')
    return auc, accuracy, recall, precision, specificity

In [None]:
thresh = 0.1

# print('XGBoost Classifier')
print('Training:')
xgb_train_auc, xgb_train_accuracy, xgb_train_recall, xgb_train_precision, xgb_train_specificity = print_report(y_train, y_train_preds, thresh)

print('Validation:')
xgb_valid_auc, xgb_valid_accuracy, xgb_valid_recall, xgb_valid_precision, xgb_valid_specificity = print_report(y_val, y_valid_preds, thresh)
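
The report depends heavily on the chosen threshold. As a rough illustration on made-up scores, the sketch below derives recall, precision and specificity from confusion-matrix counts at two cut-offs; the data and the helper `threshold_metrics` are hypothetical and independent of the scikit-learn functions used above.

```python
def threshold_metrics(y_actual, y_score, thresh):
    """Confusion-matrix counts and derived rates at a single cut-off."""
    tp = fp = tn = fn = 0
    for actual, score in zip(y_actual, y_score):
        predicted = 1 if score > thresh else 0
        if actual == 1 and predicted == 1:
            tp += 1
        elif actual == 0 and predicted == 1:
            fp += 1
        elif actual == 0 and predicted == 0:
            tn += 1
        else:
            fn += 1
    return {
        "recall": tp / (tp + fn) if tp + fn else 0.0,
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "specificity": tn / (tn + fp) if tn + fp else 0.0,
    }

# Made-up labels and scores for illustration only
y_actual_demo = [0, 0, 0, 0, 1, 1]
y_score_demo = [0.05, 0.20, 0.40, 0.08, 0.30, 0.90]

low_cut = threshold_metrics(y_actual_demo, y_score_demo, 0.1)
high_cut = threshold_metrics(y_actual_demo, y_score_demo, 0.5)
print(low_cut)
print(high_cut)
```

A low threshold such as 0.1 favours recall (fewer missed frauds) at the cost of precision and specificity; raising it reverses that trade-off.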

## Approve prod deployment

If we are happy with these metrics, we can go ahead and approve with the widget below, or manually in CodePipeline by clicking the "Review" button.

![Deploy Dev](img/deploy_dev.png)

In [None]:
import ipywidgets as widgets

def on_click(obj):
    result = { 'summary': approval_text.value, 'status': obj.description }
    response = codepipeline.put_approval_result(
      pipelineName=pipeline_name,
      stageName='DeployDev',
      actionName='ApproveDeploy',
      result=result,
      token=approval_action['token']
    )
    button_box.close()
    print(result)
    
# Create the widget if we are ready for approval
deploy_dev = get_pipeline_stage(pipeline_name, 'DeployDev')
if 'latestExecution' not in deploy_dev['actionStates'][-1]:
    raise(Exception('Please wait.  Deploy dev not complete'))

approval_action = deploy_dev['actionStates'][-1]['latestExecution']
if approval_action['status'] == 'Succeeded':
    print('Dev approved: {}'.format(approval_action['summary']))
elif 'token' in approval_action:
    approval_text = widgets.Text(placeholder='Optional approval message')   
    approve_btn = widgets.Button(description="Approved", button_style='success', icon='check')
    reject_btn = widgets.Button(description="Rejected", button_style='danger', icon='close')
    approve_btn.on_click(on_click)
    reject_btn.on_click(on_click)
    button_box = widgets.HBox([approval_text, approve_btn, reject_btn])
    display(button_box)
else:
    raise(Exception('Please wait.  No dev approval'))

## Test prod deployment

The prod deployment will start shortly after approval.

Production deployment is managed through a CloudFormation stack which performs the following:

1. Creates SageMaker Endpoint with Data Capture and AutoScaling enabled
2. Creates Model Monitoring Schedule with CloudWatch Alarm
3. Deploys an API Gateway Lambda with AWS Code Deploy

![Cloud Formation](img/cloud_formation.png)

List the last events and how long ago they occurred.

In [None]:
from datetime import datetime, timedelta
from dateutil.tz import tzlocal

def get_event_dataframe(events):
    stack_cols = ['LogicalResourceId', 'ResourceStatus', 'ResourceStatusReason', 'Timestamp']
    stack_event_df = pd.DataFrame(events)[stack_cols].fillna('')
    stack_event_df['TimeAgo'] = (datetime.now(tzlocal())-stack_event_df['Timestamp'])
    return stack_event_df.drop('Timestamp', axis=1)

cfn = boto3.client('cloudformation')

stack_name = '{}-deploy-prd'.format(pipeline_name)
print('stack name: {}'.format(stack_name))

# Get latest stack events
while True:
    try:
        response = cfn.describe_stack_events(StackName=stack_name)
        break
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)
    
get_event_dataframe(response['StackEvents']).head()

We can send some traffic to the production endpoint now

In [None]:
prd_endpoint_name='mlops-{}-prd-{}'.format(model_name, execution_id)
print('prod endpoint: {}'.format(prd_endpoint_name))

We wait until the endpoint has finished updating before we send some traffic

In [None]:
prd_predictor = get_predictor(prd_endpoint_name)
predictions = predict(prd_predictor, X_test[:, 1:])

## Test Rest API

Get the deployment progress and the REST API endpoint

In [None]:
def get_stack_status(stack_name):
    response = cfn.describe_stacks(StackName=stack_name)
    if response['Stacks']:
        stack = response['Stacks'][0]
        outputs = None
        if 'Outputs' in stack:
            outputs = dict([(o['OutputKey'], o['OutputValue']) for o in stack['Outputs']])
        return stack['StackStatus'], outputs 

outputs = None
while True:
    try:
        status, outputs = get_stack_status(stack_name)
        response = smclient.describe_endpoint(EndpointName=prd_endpoint_name)
        print("Endpoint status: {}".format(response['EndpointStatus']))
        if outputs:
            break
        elif status.endswith('FAILED'):
            raise(Exception('Stack status: {}'.format(status)))
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

if outputs:
    print('deployment application: {}'.format(outputs['DeploymentApplication']))
    print('rest api: {}'.format(outputs['RestApi']))
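
The loops in this notebook poll every ten seconds with no upper bound, so a stalled deployment keeps the cell running until it is interrupted. One possible alternative is a small polling helper with a deadline, sketched below. `poll_until` is a hypothetical helper, and the fake check stands in for a call such as `get_stack_status`.

```python
import time

def poll_until(check, timeout_seconds=1800, interval_seconds=10,
               sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns a non-None value or the deadline passes."""
    deadline = clock() + timeout_seconds
    while True:
        result = check()
        if result is not None:
            return result
        if clock() >= deadline:
            raise TimeoutError("condition not met within {}s".format(timeout_seconds))
        sleep(interval_seconds)

# Demo with a fake check that succeeds on its third call
attempts = {"count": 0}

def fake_check():
    attempts["count"] += 1
    if attempts["count"] >= 3:
        return {"RestApi": "https://example.invalid/api"}  # placeholder value
    return None

outputs_demo = poll_until(fake_check, timeout_seconds=5, interval_seconds=0)
print(attempts["count"], outputs_demo)
```

Passing `sleep` and `clock` as parameters keeps the helper easy to test; in the notebook the defaults would be used.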

Check the deployment application to see if it has been created and has started to shift traffic.

In [None]:
from IPython.display import HTML  # IPython.core.display is a deprecated import path

In [None]:
HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codedeploy/applications/{1}?region={0}">Deployment Application</a>'.format(region, outputs['DeploymentApplication']))

Now let's ping the REST endpoint to see which SageMaker endpoint it is hitting.

Press STOP once the deployment is complete

In [None]:
%%time

from urllib import request

headers = {"Content-type": "text/csv"}
payload = X_test_ins[X_test_ins.columns[1:]].head(1).to_csv(header=False, index=False).encode('utf-8')
rest_api = outputs['RestApi']

while True:
    try:
        resp = request.urlopen(request.Request(rest_api, data=payload, headers=headers))
        print("Response code: %d: endpoint: %s" % (resp.getcode(), resp.getheader('x-sagemaker-endpoint')))
        status, outputs = get_stack_status(stack_name) 
        if status.endswith('COMPLETE'):
            print('Deployment complete\n')
            break
        elif status.endswith('FAILED'):
            raise(Exception('Stack status: {}'.format(status)))
    except ClientError as e:
        print(e.response["Error"]["Message"])
    time.sleep(10)

## CloudWatch Monitoring

AWS [CloudWatch Synthetics](https://aws.amazon.com/blogs/aws/new-use-cloudwatch-synthetics-to-monitor-sites-api-endpoints-web-workflows-and-more/) allows you to set up a canary to test that your API is returning an expected value at a regular interval.  This is a great way to validate that the blue/green deployment is not causing any downtime for our end users.

### Create Canary

Let's set up a "canary" to continuously test the production API, and a dashboard to visualize the results.

In [None]:
from urllib.parse import urlparse
from string import Template
from io import BytesIO
import zipfile

# Format the canary_js with rest_api and payload
rest_url = urlparse(rest_api)

with open('canary.js') as f:
    canary_js = Template(f.read()).substitute(hostname=rest_url.netloc, path=rest_url.path, 
                                              data=payload.decode('utf-8').strip())
# Write the zip file
zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, 'w') as zf:
    zip_path = 'nodejs/node_modules/apiCanaryBlueprint.js' # Set a valid path
    zip_info = zipfile.ZipInfo(zip_path)
    zip_info.external_attr = 0o0755 << 16 # Ensure the file is readable
    zf.writestr(zip_info, canary_js)
zip_buffer.seek(0)

# Create the canary
synth = boto3.client('synthetics')

role = sagemaker.get_execution_role()
s3_canary_uri = 's3://{}/{}'.format(artifact_bucket, model_name)
canary_name = 'mlops-{}'.format(model_name)

response = synth.create_canary(
    Name=canary_name,
    Code={
        'ZipFile': bytearray(zip_buffer.read()),
        'Handler': 'apiCanaryBlueprint.handler'
    },
    ArtifactS3Location=s3_canary_uri,
    ExecutionRoleArn=role,
    Schedule={ 
        'Expression': 'rate(10 minutes)', 
        'DurationInSeconds': 0 },
    RunConfig={
        'TimeoutInSeconds': 60,
        'MemoryInMB': 960
    },
    SuccessRetentionPeriodInDays=31,
    FailureRetentionPeriodInDays=31,
    RuntimeVersion='syn-1.0',
)

print('Creating canary: {}'.format(canary_name))
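
The canary script is produced by filling placeholders in `canary.js` with `string.Template`. The contents of `canary.js` are not shown in this notebook, so the template text below is a made-up stand-in; the sketch only illustrates how `urlparse` and `substitute` combine, and that a missing placeholder value raises `KeyError`.

```python
from string import Template
from urllib.parse import urlparse

# Hypothetical template text; the real canary.js may look quite different
template_text = (
    "const hostname = '${hostname}';\n"
    "const path = '${path}';\n"
    "const data = '${data}';\n"
)

# Placeholder REST endpoint used only for this illustration
rest_url_demo = urlparse("https://abc123.execute-api.eu-west-1.amazonaws.com/prd/api")

rendered = Template(template_text).substitute(
    hostname=rest_url_demo.netloc,
    path=rest_url_demo.path,
    data="0.1,0.2,0.3",
)
print(rendered)

# substitute() is strict: leaving out a placeholder value raises KeyError
try:
    Template(template_text).substitute(hostname=rest_url_demo.netloc)
    missing_key_error = False
except KeyError:
    missing_key_error = True
```

Because `Template` treats `$` as special, any literal dollar sign in a JavaScript template has to be written as `$$`.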

Create a CloudWatch alarm that fires when the success percentage drops below 90% for that canary

In [None]:
cloudwatch = boto3.client('cloudwatch')

canary_alarm_name = '{}-synth-lt-threshold'.format(canary_name)

response = cloudwatch.put_metric_alarm(
    AlarmName=canary_alarm_name,
    ComparisonOperator='LessThanThreshold',
    EvaluationPeriods=1,
    DatapointsToAlarm=1,
    Period=600, # 10 minute interval
    Statistic='Average',
    Threshold=90.0,
    ActionsEnabled=False,
    AlarmDescription='SuccessPercent LessThanThreshold 90%',
    Namespace='CloudWatchSynthetics',
    MetricName='SuccessPercent',
    Dimensions=[
        {
          'Name': 'CanaryName',
          'Value': canary_name
        },
    ],
    Unit='Percent'  # SuccessPercent is reported as a percentage; a mismatched unit leaves the alarm without data
)

print('Creating alarm: {}'.format(canary_alarm_name))
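
As a rough mental model of the alarm settings, `EvaluationPeriods` is the number of most recent datapoints examined and `DatapointsToAlarm` is how many of them must breach the threshold. The sketch below applies that rule for a `LessThanThreshold` comparison. It is a simplification: the real service also deals with missing data and state transitions, and `is_breaching` is a hypothetical helper.

```python
def is_breaching(datapoints, threshold, evaluation_periods, datapoints_to_alarm):
    """True when enough of the most recent datapoints fall below the threshold."""
    recent = datapoints[-evaluation_periods:]
    breaches = sum(1 for value in recent if value < threshold)
    return breaches >= datapoints_to_alarm

# Made-up SuccessPercent values, one per 10-minute period
healthy = [100.0, 100.0, 100.0]
degraded = [100.0, 100.0, 80.0]

print(is_breaching(healthy, 90.0, evaluation_periods=1, datapoints_to_alarm=1))
print(is_breaching(degraded, 90.0, evaluation_periods=1, datapoints_to_alarm=1))
```

With both settings at 1, a single 10-minute period below 90% is enough to put the alarm into the alarm state.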

Let's wait for the canary to be ready, then start it and wait until it is running.

In [None]:
while True:
    try:
        response = synth.get_canary(Name=canary_name)
        status = response['Canary']['Status']['State']    
        print('Canary status: {}'.format(status))
        if status == 'ERROR':
            raise(Exception(response['Canary']['Status']['StateReason']))    
        elif status == 'READY':
            synth.start_canary(Name=canary_name)
        elif status == 'RUNNING':
            break        
    except ClientError as e:
        if e.response["Error"]["Code"] == "ResourceNotFoundException":
            print('No canary found.')
            break
        print(e.response["Error"]["Message"])
    time.sleep(10)

# Output a html link to the cloudwatch console
HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#synthetics:canary/detail/{1}">CloudWatch Canary</a>'.format(region, canary_name))

### Create Dashboard

Finally, let's create an AWS CloudWatch Dashboard to visualize the key performance metrics and alarms.

In [None]:
sts = boto3.client('sts')
account_id = sts.get_caller_identity().get('Account')
dashboard_name = 'mlops-{}'.format(model_name)

with open('dashboard.json') as f:
    dashboard_body = Template(f.read()).substitute(region=region, account_id=account_id, model_name=model_name)
    response = cloudwatch.put_dashboard(
        DashboardName=dashboard_name,
        DashboardBody=dashboard_body
    )

# Output a html link to the cloudwatch dashboard
HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#dashboards:name={1}">CloudWatch Dashboard</a>'.format(region, dashboard_name))

### Trigger Retraining

Our CodePipeline is configured with a [CloudWatch Events](https://docs.aws.amazon.com/codepipeline/latest/userguide/create-cloudtrail-S3-source.html) rule that starts the pipeline for retraining when the drift detection metric alarms.

We can simulate drift by putting metric `0.5` which is above the threshold of `0.2`.  This will trigger the alarm, and start the code pipeline retraining.

Click through to the Alarm and CodePipeline with the links below.

In [None]:
from datetime import datetime

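# NOTE: schedule_name is not defined earlier in this notebook; set it to the name of the
# monitoring schedule attached to the production endpoint before running this cell.
# Put a new metric to trigger an alarm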
response = cloudwatch.put_metric_data(
    Namespace='aws/sagemaker/Endpoints/data-metrics',
    MetricData=[
        {
            'MetricName': 'feature_baseline_drift_total_amount',
            'Dimensions': [
                {
                    'Name': 'MonitoringSchedule',
                    'Value': schedule_name
                },
                {
                    'Name': 'Endpoint',
                    'Value': prd_endpoint_name
                },
            ],
            'Timestamp': datetime.now(),
            'Value': 0.5, # This is over the configured threshold of 0.2
            'Unit': 'None'
        },
    ]
)

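# Output html links to the CloudWatch alarm and the code pipeline
# Assumes the alarm follows the 'mlops-<model name>-metric-gt-threshold' naming of the
# sample pipeline (where the model was called nyctaxi); adjust if your stack names it differently
alarm_name = 'mlops-{}-metric-gt-threshold'.format(model_name)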
HTML('''<a target="_blank" href="https://{0}.console.aws.amazon.com/cloudwatch/home?region={0}#alarmsV2:alarm/{1}">CloudWatch Alarm</a> starts 
     <a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codepipeline/pipelines/{2}/view?region={0}">Code Pipeline</a>'''.format(region, alarm_name, pipeline_name))


## Clean Up

First delete the stacks used as part of the pipeline for deployment, training job and suggest baseline

The following code will stop and delete the canary you created

In [None]:
while True:
    try:
        response = synth.get_canary(Name=canary_name)
        status = response['Canary']['Status']['State']    
        print('Canary status: {}'.format(status))
        if status == 'ERROR':
            raise(Exception(response['Canary']['Status']['StateReason']))    
        elif status == 'STOPPED':
            synth.delete_canary(Name=canary_name)
        elif status == 'RUNNING':
            synth.stop_canary(Name=canary_name)
    except ClientError as e:
        if e.response["Error"]["Code"] == "ResourceNotFoundException":
            print('Canary successfully deleted.')
            break
        print(e.response["Error"]["Message"])
    time.sleep(10)

The following code will delete the canary alarm and the dashboard.

In [None]:
cloudwatch.delete_alarms(AlarmNames=[canary_alarm_name])
print('Alarm deleted')

cloudwatch.delete_dashboards(DashboardNames=[dashboard_name])
print('Dashboard deleted')