# Now let's monitor the training/deployment process

In [None]:
# tqdm provides the progress bar shown while sending sample traffic to the endpoint further below.
!pip install tqdm

In [None]:
import boto3
import json
import ipywidgets as widgets
import time

from IPython.display import display

## Helper functions

In [None]:
def get_actions():
    """Flatten the pipeline state into one dict per action, in pipeline order.

    Each dict carries ``stageName``, ``stageStatus``, ``actionName`` and
    ``actionStatus``. The execution id of the first stage that has a latest
    execution is treated as the current run. Any later stage whose latest
    execution belongs to a different run gets ``stageStatus`` 'Old'. An action
    gets ``actionStatus`` 'Old' when its stage is 'Old' or when it has no latest
    execution at all.
    """
    state = codepipeline.get_pipeline_state(name=pipeline_name)
    current_run = None
    flattened = []
    for stage in state['stageStates']:
        stage_name = stage['stageName']
        stage_status = None
        stage_run = stage.get('latestExecution')
        if stage_run is not None:
            stage_status = stage_run['status']
            run_id = stage_run['pipelineExecutionId']
            if current_run is None:
                # The first stage that has run defines which execution is "current".
                current_run = run_id
            elif run_id != current_run:
                stage_status = 'Old'
        for action in stage['actionStates']:
            action_name = action['actionName']
            action_run = action.get('latestExecution')
            if action_run is not None and stage_status != 'Old':
                action_status = action_run['status']
            else:
                action_status = 'Old'
            flattened.append({'stageName': stage_name,
                              'stageStatus': stage_status,
                              'actionName': action_name,
                              'actionStatus': action_status})
    return flattened

In [None]:
def get_approval_token():
    """Return the token of the in-progress DeployDev/ApproveDeploy approval, else None.

    Returns None immediately if the ApproveDeploy action has never executed.
    """
    state = codepipeline.get_pipeline_state(name=pipeline_name)
    pending_token = None
    for stage in state['stageStates']:
        if stage['stageName'] != 'DeployDev':
            continue
        for action in stage['actionStates']:
            if action['actionName'] != 'ApproveDeploy':
                continue
            execution = action.get('latestExecution')
            if execution is None:
                return None
            # Only an approval that is still waiting carries a usable token.
            if execution['status'] == 'InProgress':
                pending_token = execution['token']
    return pending_token

In [None]:
def approval(token, result):
    """Submit ``result`` for the DeployDev/ApproveDeploy manual approval action.

    ``result`` is the approval payload (``summary``/``status`` dict). Nothing is
    sent when ``token`` is None, i.e. when no approval is pending.
    """
    if token is not None:
        codepipeline.put_approval_result(pipelineName=pipeline_name,
                                         stageName='DeployDev',
                                         actionName='ApproveDeploy',
                                         result=result,
                                         token=token)

In [None]:
def approve(b):
    """Approve-button callback: approve the pending deployment and resume monitoring.

    ``b`` is the clicked button widget; it is not used.
    """
    decision = dict(summary='This is a great model! Put into production.',
                    status='Approved')
    pending = get_approval_token()
    approval(pending, decision)
    # Hide the buttons before polling again.
    button_box.close()
    start_monitoring()

In [None]:
def reject(b):
    """Reject-button callback: reject the pending deployment and resume monitoring.

    ``b`` is the clicked button widget; it is not used.
    """
    decision = dict(summary='This is a rubbish model. Discard it',
                    status='Rejected')
    pending = get_approval_token()
    approval(pending, decision)
    # Hide the buttons before polling again.
    button_box.close()
    start_monitoring()

In [None]:
def start_monitoring():
    """Poll the pipeline every 2 seconds and mirror its progress in ``label``/``bar``.

    Polling stops when any of the following happens:
    - every action has succeeded;
    - an action has failed (the bar turns red and the failing action is named);
    - a manual approval is pending. In that case the approve/reject buttons are
      displayed, and their callbacks call this function again.
    """
    while True:
        actions = get_actions()
        if not actions:
            # Without any action there is nothing to report, and the loop
            # variables below would never be bound.
            label.value = 'No pipeline actions found'
            return

        steps_ok = 0
        awaiting_approval = False
        for k, action in enumerate(actions):
            status = action['actionStatus']
            if status == 'Failed':
                bar.bar_style = 'danger'
                label.value = 'Ops! Something went wrong Stage[{}] Action[{}]'.format(
                    action['stageName'], action['actionName'])
                return
            elif status == 'InProgress':
                # An in-progress action exposing an approval token is waiting on a human.
                if get_approval_token() is not None:
                    display(button_box)
                    awaiting_approval = True
                break
            elif status == 'Old':
                # The action has not run in the current execution yet.
                break
            elif status == 'Succeeded':
                steps_ok += 1

        label.value = "Actions {}/{} - Current: Stage[{}] Action[{}]".format(
            k + 1, max_actions, action['stageName'], action['actionName'])
        bar.value = steps_ok

        # Stop without the extra 2 s sleep once done or handed over to the buttons.
        if awaiting_approval or steps_ok == max_actions:
            return
        time.sleep(2)

## Job monitoring

In [None]:
import os

# CodePipeline client plus the pipeline/model this notebook tracks.
# Both environment variables are read with [] and must be set.
codepipeline = boto3.client('codepipeline')
pipeline_name = os.environ['PIPELINE_NAME']
model_name = os.environ['MODEL_NAME']

print('pipeline: {}\nmodel name: {}'.format(pipeline_name, model_name))

In [None]:
# Manual-approval buttons; start_monitoring() displays them when an approval is pending.
approve_btn = widgets.Button(
    description="Approve",
    button_style='success',
    icon='check',
)
reject_btn = widgets.Button(
    description="Reject",
    button_style='danger',
    icon='close',
)
approve_btn.on_click(approve)
reject_btn.on_click(reject)
button_box = widgets.HBox([approve_btn, reject_btn])

# Progress display: the bar advances by one for each succeeded action.
max_actions = len(get_actions())
label = widgets.Label(value="Loading...")
bar = widgets.IntProgress(
    value=0,
    min=0,
    max=max_actions,
    step=1,
    bar_style='info',
)
info_box = widgets.VBox([label, bar])

display(info_box)
start_monitoring()

## If everything went well, we can now test our model

In [None]:
# Take the execution id from the final pipeline stage. The resource names below are
# derived from it as mlops-<model>-<kind>-<execution id>:
#   prd = production endpoint, pbl = baseline processing job, pms = monitoring schedule.
response = codepipeline.get_pipeline_state(name=pipeline_name)
executionId = response['stageStates'][-1]['latestExecution']['pipelineExecutionId']

endpoint_name = '-'.join(['mlops', model_name, 'prd', executionId])
processing_job_name = '-'.join(['mlops', model_name, 'pbl', executionId])
schedule_name = '-'.join(['mlops', model_name, 'pms', executionId])

print('execution id: ' + executionId)

In [None]:
# To inspect a specific past execution instead of the latest one, uncomment and edit:
#executionId = '46d4a3c0-517c-4e77-a2b7-3f84cb6e4738'
endpoint_name = '-'.join(['mlops', model_name, 'prd', executionId])
processing_job_name = '-'.join(['mlops', model_name, 'pbl', executionId])
schedule_name = '-'.join(['mlops', model_name, 'pms', executionId])

Call the endpoint with some expected and some unexpected data.

In [None]:
sm_runtime = boto3.client('sagemaker-runtime')

def test_endpoint(endpoint_name, payload, content_type='text/csv', custom_attributes=''):
    """Invoke ``endpoint_name`` with ``payload`` and return the raw response body.

    ``content_type`` and ``custom_attributes`` are forwarded as the request's
    ContentType and CustomAttributes.
    """
    request = {
        'EndpointName': endpoint_name,
        'Body': payload,
        'ContentType': content_type,
        'CustomAttributes': custom_attributes,
    }
    body = sm_runtime.invoke_endpoint(**request)['Body']
    return body.read()

In [None]:
# Smoke test: one CSV request (header line "text" plus one row) must reach the endpoint.
test_endpoint(endpoint_name, b'text\nthis is a test')

In [None]:
# Sample transaction descriptions: the first five look like typical data,
# the remaining ones are deliberately outside the usual sample.
monitor_sample = [
    # Typical data
    'cool asian food pty melbourne au',  # eating out
    'woolworths 3188 brunswick au',  # groceries
    'airbnb * blaa surry hills au',  # travel
    'lido cinemas hawthorn au',  # entertainment
    'northcote indoor spo thornbury au',  # health
    # Out-of-bounds data
    'one',
    '1',
    'fdkslfjkdlsjfkdsfkldjsklfmdskfjkdlsjfkldsjfkldsjfkldsjklfjsdkjfklds',
    'this is a very long sentance that should skew the character and word count',
]

# Send 1000 rounds of individual requests, one request per sample per round.
from tqdm import tqdm
sample_payloads = ['text\n{}'.format(sample).encode('utf-8') for sample in monitor_sample]
for _ in tqdm(range(1000)):
    for payload in sample_payloads:
        test_endpoint(endpoint_name, payload).decode('utf-8')

## Load baseline

Load the baseline processing job.

In [None]:
import boto3
import sagemaker
from sagemaker.model_monitor import BaseliningJob
from sagemaker.model_monitor import MonitoringExecution
from sagemaker.s3 import S3Downloader
import pandas as pd

# Low-level boto3 clients for S3 listings and SageMaker API calls.
s3, sm = boto3.client('s3'), boto3.client('sagemaker')

# High-level SageMaker SDK session, shared by the SDK helpers below.
sagemaker_session = sagemaker.Session()

In [None]:
baseline_job = BaseliningJob.from_processing_name(sagemaker_session, processing_job_name)
status = baseline_job.describe()['ProcessingJobStatus']
# A processing job that finished successfully reports 'Completed';
# 'Stopped' means it was halted before finishing.
if status != 'Completed':
    raise Exception('Processing job not complete, status: {}'.format(status))

# The first output destination holds the generated statistics.json / constraints.json.
baseline_results_uri = baseline_job.outputs[0].destination
print('baseline results uri: {}'.format(baseline_results_uri))

### Explore the generated constraints and statistics

In [None]:
# Flatten the per-feature baseline statistics into a table.
# pd.json_normalize is the public API since pandas 1.0; pd.io.json.json_normalize
# was deprecated then and removed in pandas 2.0.
baseline_statistics = baseline_job.baseline_statistics()
schema_df = pd.json_normalize(baseline_statistics.body_dict["features"])
schema_df

In [None]:
# Flatten the suggested per-feature constraints into a table
# (pd.json_normalize replaces pd.io.json.json_normalize, which was removed in pandas 2.0).
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df

### View Data Capture

In [None]:
# Captured endpoint traffic is expected under <model>/datacapture/<endpoint>/AllTraffic/
# in the session's default bucket (presumably as configured by the pipeline — confirm).
bucket = sagemaker_session.default_bucket()
data_capture_prefix = '/'.join([model_name, 'datacapture', endpoint_name, 'AllTraffic', ''])
print('data capture prefix: ' + data_capture_prefix)

In [None]:
# Get capture files for this new endpoint.
# NOTE: a single list_objects call returns at most 1,000 keys; with more capture
# files only the first page of the listing is seen here.
result = s3.list_objects(Bucket=bucket, Prefix=data_capture_prefix)
if 'Contents' not in result:
    raise Exception('No results available yet for location: {}'.format(data_capture_prefix))

# Reverse the S3 listing order (lexicographic by key).
capture_files = ['s3://{0}/{1}'.format(bucket, capture_file.get("Key"))
                 for capture_file in result.get('Contents')][::-1]
print("Captured Files: {}, top 3:".format(len(capture_files)))
print("\n ".join(capture_files[:3]))

In [None]:
# Copy one capture file locally for inspection.
# NOTE(review): capture_files is in reversed S3 listing order, and this picks index 1
# (the second entry), so it needs at least two capture files — confirm [1] rather than [0] is intended.
!mkdir -p output/datacapture
!aws s3 cp {capture_files[1]} output/datacapture/captured_data_example.jsonl

In [None]:
import json

# Each line of the JSON Lines capture file is one captured event; show the first one.
with open('output/datacapture/captured_data_example.jsonl', 'r') as capture_file:
    first_line = capture_file.readline()
    event = json.loads(first_line)
    print(event)

### View Monitoring Schedule

The functions for plotting and rendering distribution statistics or constraint violations are implemented in a `utils` file so let's grab that.

In [None]:
!wget https://raw.githubusercontent.com/awslabs/amazon-sagemaker-examples/master/sagemaker_model_monitor/visualization/utils.py
import utils as mu

Load the most recent successful execution of the monitoring schedule.

In [None]:
# Newest executions first, made explicit rather than relying on the API defaults.
response = sm.list_monitoring_executions(MonitoringScheduleName=schedule_name,
                                         SortBy='CreationTime',
                                         SortOrder='Descending')
# An execution that finished reports 'Completed', or 'CompletedWithViolations' when
# constraint violations were found. 'Stopped' means it was halted before finishing.
finished_statuses = ('Completed', 'CompletedWithViolations')
schedules = [m for m in response['MonitoringExecutionSummaries']
             if m['MonitoringExecutionStatus'] in finished_statuses]
if not schedules:
    raise Exception('No completed schedules')

schedule = schedules[0]
processing_job_arn = schedule['ProcessingJobArn']
print('Schedule status: {}\n{}'.format(schedule['MonitoringExecutionStatus'], processing_job_arn))

List results from the underlying processing job

In [None]:
exec_results = schedule_processing_job.outputs[0].destination
print(exec_results)

!aws s3 ls $exec_results

Inspect the current status of the processing job.

In [None]:
from sagemaker.processing import ProcessingJob

# Rehydrate the processing job behind the selected monitoring execution and show its description.
schedule_processing_job = ProcessingJob.from_processing_arn(
    sagemaker_session=sagemaker_session,
    processing_job_arn=processing_job_arn,
)
schedule_processing_job.describe()

### Run Schedule immediately

In [None]:
import os, sys
from urllib.parse import urlparse
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput

def get_model_monitor_container_uri(region):
    """Return the ECR URI of the SageMaker Model Monitor analyzer image for ``region``.

    Raises:
        KeyError: if ``region`` has no entry in the account table below.
    """
    container_uri_format = '{0}.dkr.ecr.{1}.amazonaws.com/sagemaker-model-monitor-analyzer'

    # AWS account hosting the analyzer image in each region. Each region must appear
    # once: a duplicated dict key silently shadows the earlier entry. An earlier
    # version listed 'us-east-2' twice, so the 680080141114 (eu-west-3) entry was lost.
    regions_to_accounts = {
        'eu-north-1': '895015795356',
        'me-south-1': '607024016150',
        'ap-south-1': '126357580389',
        'eu-west-3': '680080141114',
        'us-east-2': '777275614652',
        'eu-west-1': '468650794304',
        'eu-central-1': '048819808253',
        'sa-east-1': '539772159869',
        'ap-east-1': '001633400207',
        'us-east-1': '156813124566',
        'ap-northeast-2': '709848358524',
        'eu-west-2': '749857270468',
        'ap-northeast-1': '574779866223',
        'us-west-2': '159807026194',
        'us-west-1': '890145073186',
        'ap-southeast-1': '245545462676',
        'ap-southeast-2': '563025443158',
        'ca-central-1': '536280801234'
    }

    try:
        account = regions_to_accounts[region]
    except KeyError:
        raise KeyError('No Model Monitor analyzer image known for region {!r}'.format(region)) from None
    return container_uri_format.format(account, region)

def get_file_name(url):
    """Return the final path component of ``url`` (empty if the path ends with '/')."""
    _, tail = os.path.split(urlparse(url).path)
    return tail

def run_model_monitor_job_processor(region, instance_type, role, data_capture_path, statistics_path, constraints_path, reports_path,
                                    instance_count=1, preprocessor_path=None, postprocessor_path=None, publish_cloudwatch_metrics='Disabled'):
    """Run the Model Monitor analyzer once, as a SageMaker Processing job.

    Captured traffic under ``data_capture_path`` is analysed against the baseline
    ``statistics_path`` and ``constraints_path``. Results are uploaded continuously
    to ``reports_path`` + '/' + the part of the capture path after 'datacapture/'.
    ``preprocessor_path`` / ``postprocessor_path`` optionally point to user scripts
    that are mounted into the container.

    Returns:
        The ``Processor`` on which ``run`` has been called.
    """
    def s3_file_input(name, source, destination):
        # Every input is downloaded from S3 as a prefix, in File mode.
        return ProcessingInput(input_name=name,
                               source=source,
                               destination=destination,
                               s3_data_type='S3Prefix',
                               s3_input_mode='File')

    # Keep only what follows the last 'datacapture/<segment>/' marker... i.e. everything
    # after 'datacapture/' (e.g. '<endpoint>/AllTraffic/...').
    marker_tail = data_capture_path[data_capture_path.rfind('datacapture/'):]
    capture_sub_path = marker_tail[marker_tail.find('/') + 1:]
    report_destination = reports_path + '/' + capture_sub_path

    inputs = [
        s3_file_input('input_1', data_capture_path,
                      '/opt/ml/processing/input/endpoint/' + capture_sub_path),
        s3_file_input('baseline', statistics_path, '/opt/ml/processing/baseline/stats'),
        s3_file_input('constraints', constraints_path, '/opt/ml/processing/baseline/constraints'),
    ]

    result_output = ProcessingOutput(output_name='result',
                                     source='/opt/ml/processing/output',
                                     destination=report_destination,
                                     s3_upload_mode='Continuous')

    env = {
        'baseline_constraints': '/opt/ml/processing/baseline/constraints/' + get_file_name(constraints_path),
        'baseline_statistics': '/opt/ml/processing/baseline/stats/' + get_file_name(statistics_path),
        'dataset_format': '{"sagemakerCaptureJson":{"captureIndexNames":["endpointInput","endpointOutput"]}}',
        'dataset_source': '/opt/ml/processing/input/endpoint',
        'output_path': '/opt/ml/processing/output',
        'publish_cloudwatch_metrics': publish_cloudwatch_metrics,
    }

    # Optional user scripts: each is mounted in its own code directory and referenced
    # from the analyzer environment. The post-processor is registered before the pre-processor.
    optional_scripts = (
        ('post_analytics_processor_script', 'post_processor_script',
         postprocessor_path, '/opt/ml/processing/code/postprocessing'),
        ('record_preprocessor_script', 'pre_processor_script',
         preprocessor_path, '/opt/ml/processing/code/preprocessing'),
    )
    for env_key, input_name, script_path, mount_dir in optional_scripts:
        if not script_path:
            continue
        env[env_key] = mount_dir + '/' + get_file_name(script_path)
        inputs.append(s3_file_input(input_name, script_path, mount_dir))

    processor = Processor(image_uri=get_model_monitor_container_uri(region),
                          instance_count=instance_count,
                          instance_type=instance_type,
                          role=role,
                          env=env)

    processor.run(inputs=inputs, outputs=[result_output])
    return processor

In [None]:
# Capture prefix to analyse: the S3 "folder" that contains the last entry of capture_files.
# NOTE(review): capture_files was reversed when it was listed, so [-1] is the key that
# sorted first in the S3 listing — confirm this is the intended capture folder.
last_capture = capture_files[-1]
s3_data_capture_path = last_capture[: last_capture.rfind('/')]
# Baseline statistics/constraints come from the baseline job's output location.
s3_statistics_path = baseline_results_uri + '/statistics.json'
s3_constraints_path = baseline_results_uri + '/constraints.json'

for s3_path in (s3_data_capture_path, s3_statistics_path, s3_constraints_path):
    print(s3_path)

s3_reports_path = 's3://{0}/{1}/monitoring/reports'.format(bucket, model_name)
print(s3_reports_path)

In [None]:
from sagemaker import get_execution_role

role = get_execution_role()
region = boto3.Session().region_name

# Run the analyzer once against the captured traffic (instance_count defaults to 1).
processor = run_model_monitor_job_processor(
    region=region,
    instance_type='ml.m5.xlarge',
    role=role,
    data_capture_path=s3_data_capture_path,
    statistics_path=s3_statistics_path,
    constraints_path=s3_constraints_path,
    reports_path=s3_reports_path,
)

In [None]:
# Get the processing job ARN of the run just started.
# Processor has no describe() of its own; run() exposes the started job as latest_job.
processing_job_arn = processor.latest_job.describe()['ProcessingJobArn']
processing_job_arn

## Monitoring results

The code below shows the violations and constraint checks across all features in a simple table.

In [None]:
# Reuse the notebook's existing session rather than constructing a second sagemaker.Session().
execution = MonitoringExecution.from_processing_arn(sagemaker_session=sagemaker_session, processing_job_arn=processing_job_arn)
# Index the execution's inputs by name (e.g. 'baseline', 'constraints').
exec_inputs = {inp['InputName']: inp for inp in execution.describe()['ProcessingInputs']}
exec_results = execution.output.destination

In [None]:
# List what the monitoring execution wrote to its output location.
!aws s3 ls $exec_results/

In [None]:
import posixpath

# Load the baseline results.
# S3 URIs always use '/', so join them with posixpath rather than the OS-dependent os.path.
baseline_statistics_filepath = exec_inputs['baseline']['S3Input']['S3Uri'] if 'baseline' in exec_inputs else None
execution_statistics_filepath = posixpath.join(exec_results, 'statistics.json')
violations_filepath = posixpath.join(exec_results, 'constraint_violations.json')

# The baseline input may be absent; execution statistics and violations are read unconditionally.
baseline_statistics = (json.loads(S3Downloader.read_file(baseline_statistics_filepath))
                       if baseline_statistics_filepath is not None else None)
execution_statistics = json.loads(S3Downloader.read_file(execution_statistics_filepath))
violations = json.loads(S3Downloader.read_file(violations_filepath))['violations']

In [None]:
# Tabulate the constraint violations next to the baseline and latest statistics.
mu.show_violation_df(
    baseline_statistics=baseline_statistics,
    latest_statistics=execution_statistics,
    violations=violations,
)

## Distributions

This section visualizes the distributions and renders the distribution statistics for all features.

In [None]:
# Per-feature views: first of the latest execution's statistics, then of the baseline's.
features = mu.get_features(execution_statistics)
feature_baselines = mu.get_features(baseline_statistics)

In [None]:
# Distributions of the latest execution's features on their own.
mu.show_distributions(features)

### Execution Stats vs Baseline

In [None]:
# Same view with the baseline features passed in, for execution-vs-baseline comparison.
mu.show_distributions(features, feature_baselines)