# MLOps Demo

Julian Bright, Machine Learning Specialist @ Amazon Web Services


### Overview

Follow these steps to run the demo:

1. [Data Prep](#Data-Prep)
2. [Start Build](#Start-Build)
3. [Wait for Training Job](#Wait-for-Training-Job)
4. [Test Dev Deployment](#Test-Dev-Deployment)
5. [Test Prod Endpoint](#Test-Prod-Endpoint)
6. [Monitor](#Monitor)

In [None]:
!pip install -q tqdm

## Data Prep

Download a sample of the New York City Taxi [dataset](https://registry.opendata.aws/nyc-tlc-trip-records-pds/)

In [None]:
!aws s3 cp 's3://nyc-tlc/trip data/green_tripdata_2018-02.csv' 'nyc-tlc.csv'

Load the dataset into a pandas data frame (this should take approximately 20 seconds)

In [None]:
%%time
import pandas as pd

parse_dates = ['lpep_dropoff_datetime', 'lpep_pickup_datetime']
trip_df = pd.read_csv('nyc-tlc.csv', parse_dates=parse_dates)

Feature engineering: derive the trip duration in minutes from the parsed pickup and drop-off timestamps

In [None]:
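# Use total_seconds() so that durations longer than a day (or negative durations) are not wrapped
trip_df['duration_minutes'] = (trip_df['lpep_dropoff_datetime'] - trip_df['lpep_pickup_datetime']).dt.total_seconds() / 60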
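
To see why the choice between `seconds` and `total_seconds()` matters when deriving a duration, here is a minimal standard-library sketch. Python's `datetime.timedelta` exposes the same two attributes as the pandas `.dt` accessor; the timestamps below are made up for illustration.

```python
from datetime import datetime, timedelta

# Hypothetical pickup/drop-off pair that spans more than one day
pickup = datetime(2018, 2, 1, 23, 50)
dropoff = datetime(2018, 2, 3, 0, 5)
delta = dropoff - pickup

# .seconds only holds the sub-day remainder, so whole days are silently dropped
minutes_from_seconds = delta.seconds / 60
# .total_seconds() covers the full span
minutes_from_total = delta.total_seconds() / 60

print(minutes_from_seconds)  # 15.0
print(minutes_from_total)    # 1455.0

# A drop-off recorded five minutes before the pickup (bad data)
negative = timedelta(minutes=-5)
print(negative.seconds / 60)          # 1435.0, looks like a long but valid trip
print(negative.total_seconds() / 60)  # -5.0, easy to filter out
```

With `total_seconds()` the later range filter on `duration_minutes` can reject both kinds of rows.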

Select a sample of columns for our machine learning model

In [None]:
cols = ['total_amount', 'duration_minutes', 'passenger_count', 'trip_distance']
data_df = trip_df[cols]
print(data_df.shape)
data_df.head()

Exclude any outliers and drop any null values

In [None]:
data_df = data_df[(data_df.total_amount > 0) & (data_df.total_amount < 200) & 
                  (data_df.duration_minutes > 0) & (data_df.duration_minutes < 120) & 
                  (data_df.trip_distance > 0) & (data_df.trip_distance < 1000) & 
                  (data_df.passenger_count > 0)].dropna()
print(data_df.shape)
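
The same filter can be read as a per-row predicate. The sketch below restates it in plain Python over a few invented rows; `is_valid_trip` is a hypothetical helper and not part of the notebook.

```python
def is_valid_trip(row):
    """Return True when a row passes the same bounds as the pandas filter."""
    # Rows with missing values are dropped, mirroring dropna()
    if any(value is None for value in row.values()):
        return False
    return (0 < row['total_amount'] < 200
            and 0 < row['duration_minutes'] < 120
            and 0 < row['trip_distance'] < 1000
            and row['passenger_count'] > 0)

# Invented rows for illustration only
rows = [
    {'total_amount': 12.5, 'duration_minutes': 9.0, 'passenger_count': 1, 'trip_distance': 2.1},
    {'total_amount': -3.0, 'duration_minutes': 4.0, 'passenger_count': 1, 'trip_distance': 0.8},    # negative fare
    {'total_amount': 30.0, 'duration_minutes': 600.0, 'passenger_count': 2, 'trip_distance': 5.0},  # too long
    {'total_amount': 8.0, 'duration_minutes': 6.0, 'passenger_count': None, 'trip_distance': 1.0},  # missing value
]

kept = [row for row in rows if is_valid_trip(row)]
print('kept {} of {} rows'.format(len(kept), len(rows)))
```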

### Visualize 

Sample and plot distribution of fields

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

plt.style.use('fivethirtyeight')

In [None]:
sample_df = data_df.sample(1000)
sample_df.hist(bins=100, layout=(2,2))

Plot duration vs trip distance

In [None]:
sample_df.plot.scatter(x='duration_minutes', y='trip_distance')

Plot duration vs total amount and we see a similar pattern

In [None]:
sample_df.plot.scatter(x='duration_minutes', y='total_amount')

We are now ready to split the dataset into train/validation/test sets

In [None]:
from sklearn.model_selection import train_test_split
train_df, val_df = train_test_split(data_df, test_size=0.20, random_state=42)
val_df, test_df = train_test_split(val_df, test_size=0.05, random_state=42)

# Reset the index of the test dataframe so it can later be joined with the predictions by position
test_df.reset_index(inplace=True, drop=True)

print('split train: {}, val: {}, test: {} '.format(train_df.shape[0], val_df.shape[0], test_df.shape[0]))
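
As a rough check on the proportions, the two calls leave about 80% of the rows for training, 19% for validation and 1% for testing. The sketch below reproduces that arithmetic with a hypothetical `split_sizes` helper. It assumes the held-out share is rounded up, which is our understanding of how scikit-learn handles a fractional `test_size`, so exact counts may differ by a row.

```python
from math import ceil

def split_sizes(n_rows, first_test_size=0.20, second_test_size=0.05):
    # First split: hold out 20% of all rows
    holdout = ceil(first_test_size * n_rows)
    train = n_rows - holdout
    # Second split: 5% of the held-out rows become the test set
    test = ceil(second_test_size * holdout)
    val = holdout - test
    return train, val, test

train, val, test = split_sizes(1000)
print('train: {}, val: {}, test: {}'.format(train, val, test))
```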

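Save the splits as CSV files. The training and validation files are written without headers, while the test and baseline files keep them.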

In [None]:
%%time

train_cols = ['total_amount', 'duration_minutes','passenger_count','trip_distance']
train_df.to_csv('train.csv', index=False, header=False)
val_df.to_csv('validation.csv', index=False, header=False)

# Save test and baseline with headers
test_df.to_csv('test.csv', index=False, header=True)
train_df.to_csv('baseline.csv', index=False, header=True)

Upload files to S3

In [None]:
import sagemaker

# Get the session and default bucket
session = sagemaker.session.Session()
bucket = session.default_bucket()

# Specify data prefix version
prefix = 'nyc-tlc/v1'

s3_train_uri = session.upload_data('train.csv', bucket, prefix + '/data/training')
s3_val_uri = session.upload_data('validation.csv', bucket, prefix + '/data/validation')
s3_baseline_uri = session.upload_data('baseline.csv', bucket, prefix + '/data/baseline')
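
`upload_data` returns the S3 URI of the uploaded object. For a single file, the URI follows the pattern `s3://<bucket>/<key_prefix>/<filename>`. The sketch below spells that out with a hypothetical helper and a made-up bucket name, without calling AWS.

```python
import os

def expected_s3_uri(local_path, bucket, key_prefix):
    # Hypothetical helper: mirrors the URI layout for a single uploaded file
    return 's3://{}/{}/{}'.format(bucket, key_prefix, os.path.basename(local_path))

prefix = 'nyc-tlc/v1'
uri = expected_s3_uri('train.csv', 'example-bucket', prefix + '/data/training')
print(uri)  # s3://example-bucket/nyc-tlc/v1/data/training/train.csv
```

Bumping the version in `prefix` (for example to `nyc-tlc/v2`) keeps each dataset revision under its own key space.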

## Start Build

Load variables from environment

In [None]:
import boto3
import os
import time

region = boto3.Session().region_name
artifact_bucket = os.environ['ARTIFACT_BUCKET']
pipeline_name = os.environ['PIPELINE_NAME']
model_name = os.environ['MODEL_NAME']

print('region: {}'.format(region))
print('artifact bucket: {}'.format(artifact_bucket))
print('pipeline: {}'.format(pipeline_name))
print('model name: {}'.format(model_name))

Upload data source metadata to trigger a new build

In [None]:
from io import BytesIO
import zipfile
import json

input_data = {
    'TrainingUri': s3_train_uri,
    'ValidationUri': s3_val_uri,
    'BaselineUri': s3_baseline_uri
}

hyperparameters = {
    'num_round': 50
}

data_source_key = '{}/data-source.zip'.format(pipeline_name)

# Build the zip archive in memory
zip_buffer = BytesIO()
with zipfile.ZipFile(zip_buffer, 'w') as zf:
    zf.writestr('inputData.json', json.dumps(input_data))
    zf.writestr('hyperparameters.json', json.dumps(hyperparameters))

# Upload the archive as the data source object for the pipeline
s3 = boto3.client('s3')
s3.put_object(Bucket=artifact_bucket, Key=data_source_key, Body=zip_buffer.getvalue())
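
Before uploading, it can help to confirm what the archive contains. The following sketch builds the same kind of in-memory zip with placeholder values and reads it back; nothing is sent to S3.

```python
from io import BytesIO
import json
import zipfile

# Placeholder values standing in for the real S3 URIs
input_data = {'TrainingUri': 's3://example-bucket/train.csv'}
hyperparameters = {'num_round': 50}

buffer = BytesIO()
with zipfile.ZipFile(buffer, 'w') as zf:
    zf.writestr('inputData.json', json.dumps(input_data))
    zf.writestr('hyperparameters.json', json.dumps(hyperparameters))

# Re-open the archive from its bytes to confirm what would be uploaded
with zipfile.ZipFile(BytesIO(buffer.getvalue())) as zf:
    names = zf.namelist()
    restored = json.loads(zf.read('hyperparameters.json'))

print(names)
print(restored)
```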

## Wait for Training Job

Follow the CodePipeline link below and wait until the training job is complete

In [None]:
from IPython.core.display import HTML

HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codepipeline/pipelines/{1}/view?region={0}">Code Pipeline</a>'.format(region, pipeline_name))

## Test Dev Deployment

Once the endpoint has been deployed and is awaiting approval, we can begin some tests

In [None]:
codepipeline = boto3.client('codepipeline')

def get_pipeline_stage(pipeline_name, stage_name):
    # Return the state of the named stage, or None if the pipeline has no such stage
    response = codepipeline.get_pipeline_state(name=pipeline_name)
    for stage in response['stageStates']:
        if stage['stageName'] == stage_name:
            return stage
    return None

# Get last execution id
deploy_dev = get_pipeline_stage(pipeline_name, 'DeployDev')
if deploy_dev is None or 'latestExecution' not in deploy_dev:
    raise Exception('Please wait.  Deploy dev not complete')
    
execution_id = deploy_dev['latestExecution']['pipelineExecutionId']
dev_endpoint_name = 'mlops-{}-dev-{}'.format(model_name, execution_id)

print('endpoint name: {}'.format(dev_endpoint_name))
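
The stage lookup can be tried offline against a mocked response. In this sketch, `find_stage` is a hypothetical variant that takes the response dict directly, and the pipeline state, execution id and model name are invented and trimmed to the keys this notebook reads.

```python
def find_stage(pipeline_state, stage_name):
    # Hypothetical variant of get_pipeline_stage that works on a response dict
    for stage in pipeline_state['stageStates']:
        if stage['stageName'] == stage_name:
            return stage
    return None

# Invented response for illustration
pipeline_state = {
    'stageStates': [
        {'stageName': 'Source', 'latestExecution': {'pipelineExecutionId': 'abc-123', 'status': 'Succeeded'}},
        {'stageName': 'DeployDev', 'latestExecution': {'pipelineExecutionId': 'abc-123', 'status': 'InProgress'}},
        {'stageName': 'DeployPrd'},  # not started yet, so no latestExecution key
    ]
}

stage = find_stage(pipeline_state, 'DeployDev')
endpoint_name = 'mlops-{}-dev-{}'.format('nyctaxi', stage['latestExecution']['pipelineExecutionId'])
print(endpoint_name)
```

A stage that has not run yet has no `latestExecution` key, and an unknown stage name yields `None`, so both cases need a check before the execution id is read.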

Wait until the dev endpoint is in service (this can take up to 10 minutes)

In [None]:
sm = boto3.client('sagemaker')

while True:
    try:
        response = sm.describe_endpoint(EndpointName=dev_endpoint_name)
        print("Endpoint status: {}".format(response['EndpointStatus']))
        if response['EndpointStatus'] == 'InService':
            break
    except sm.exceptions.ClientError:
        # The endpoint may not exist yet; keep polling
        pass
    time.sleep(10)
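
A polling loop like this one never exits if the endpoint fails to deploy. The sketch below shows one way to bound the wait; `wait_for_status` is a hypothetical helper, and the status sequence is simulated rather than fetched from SageMaker.

```python
import time

def wait_for_status(get_status, target='InService', failed=('Failed',),
                    timeout=600, interval=10, sleep=time.sleep):
    """Poll get_status() until it returns target; give up on failure or timeout."""
    waited = 0
    while waited <= timeout:
        status = get_status()
        if status == target:
            return status
        if status in failed:
            raise RuntimeError('Endpoint entered status {}'.format(status))
        sleep(interval)
        waited += interval
    raise TimeoutError('Timed out waiting for status {}'.format(target))

# Simulated status sequence in place of sm.describe_endpoint
statuses = iter(['Creating', 'Creating', 'InService'])
result = wait_for_status(lambda: next(statuses), sleep=lambda seconds: None)
print(result)
```

In the notebook, `get_status` would wrap `sm.describe_endpoint(...)['EndpointStatus']`. The boto3 SageMaker client also provides an `endpoint_in_service` waiter, which may be a simpler option.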

In [None]:
from sagemaker.predictor import RealTimePredictor, csv_serializer
import numpy as np
from tqdm import tqdm

def get_predictor(endpoint_name):
    xgb_predictor = RealTimePredictor(endpoint_name)
    xgb_predictor.content_type = 'text/csv'
    xgb_predictor.serializer = csv_serializer
    return xgb_predictor

def predict(predictor, data, rows=500):
    # Split the data into batches of roughly `rows` records (always at least one batch)
    split_array = np.array_split(data, max(1, round(data.shape[0] / float(rows))))
    predictions = ''
    for array in tqdm(split_array):
        predictions = ','.join([predictions, predictor.predict(array).decode('utf-8')])
    return np.fromstring(predictions[1:], sep=',')
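
To make the batching arithmetic concrete, here is a plain-Python sketch of the same idea. `split_batches` and `parse_csv_responses` are hypothetical helpers; the split imitates near-equal chunking, and the responses are invented strings rather than real endpoint output.

```python
def split_batches(records, rows=500):
    # Number of batches, rounded like the predict() helper but never below one
    n_batches = max(1, round(len(records) / float(rows)))
    base, extra = divmod(len(records), n_batches)
    batches, start = [], 0
    for i in range(n_batches):
        size = base + (1 if i < extra else 0)  # the first `extra` batches get one more record
        batches.append(records[start:start + size])
        start += size
    return batches

def parse_csv_responses(responses):
    # Each response is a comma-separated string of predictions
    return [float(value) for response in responses for value in response.split(',')]

batches = split_batches(list(range(1234)), rows=500)
print([len(b) for b in batches])
print(parse_csv_responses(['1.5,2.5', '3.0']))
```

Because the batch count is rounded, individual batches can be somewhat larger than `rows` (617 records each in this example), so `rows` is a target rather than a hard limit.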

Invoke the dev endpoint with test data

In [None]:
dev_predictor = get_predictor(dev_endpoint_name)
predictions = predict(dev_predictor, test_df[test_df.columns[1:]].values)

Load the response into a dataframe, and join with predictions to calculate absolute error

In [None]:
pred_df = pd.DataFrame({'total_amount_predictions': predictions })
pred_df = test_df.join(pred_df) # Join on index
pred_df['error'] = abs(pred_df['total_amount']-pred_df['total_amount_predictions'])

We can see some of the largest errors are high amounts for low distance

In [None]:
pred_df.sort_values('error', ascending=False).head()

Plot the actual total amount against the predicted amount, colored by error, to inspect some outliers

In [None]:
ax = pred_df.tail(1000).plot.scatter(x='total_amount_predictions', y='total_amount', 
                                     c='error', title='actual amount vs pred')

Calculate the root mean squared error for the predicted total

In [None]:
from math import sqrt
from sklearn.metrics import mean_squared_error

def rmse(pred_df):
    return sqrt(mean_squared_error(pred_df['total_amount'], pred_df['total_amount_predictions']))

print('RMSE: {}'.format(rmse(pred_df)))
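
The root mean squared error is the square root of the mean of the squared differences between actual and predicted values. As a worked example with invented numbers, the standard-library sketch below computes it by hand; every prediction is off by exactly 2, so the result is 2.

```python
from math import sqrt

def rmse(actual, predicted):
    # Square each error, average the squares, then take the square root
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return sqrt(sum(squared_errors) / len(squared_errors))

# Invented fares for illustration
actual = [10.0, 12.0, 8.0, 20.0]
predicted = [12.0, 10.0, 10.0, 18.0]
print(rmse(actual, predicted))  # 2.0
```

Because the errors are squared before averaging, a few large misses raise the RMSE more than many small ones.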

If we are happy with this metric, we can go ahead and approve the deployment

In [None]:
import ipywidgets as widgets

def on_click(obj):
    result = { 'summary': approval_text.value, 'status': obj.description }
    response = codepipeline.put_approval_result(
      pipelineName=pipeline_name,
      stageName='DeployDev',
      actionName='ApproveDeploy',
      result=result,
      token=approval_action['token']
    )
    button_box.close()
    print(result)

# Create the widget if we are ready for approval
approval_action = get_pipeline_stage(pipeline_name, 'DeployDev')['actionStates'][-1]['latestExecution']
if 'token' in approval_action:
    approval_text = widgets.Text(placeholder='Optional approval message')   
    approve_btn = widgets.Button(description="Approved", button_style='success', icon='check')
    reject_btn = widgets.Button(description="Rejected", button_style='danger', icon='close')
    approve_btn.on_click(on_click)
    reject_btn.on_click(on_click)
    button_box = widgets.HBox([approval_text, approve_btn, reject_btn])
    display(button_box)
else:
    raise Exception('Please wait.  No dev approval')

## List Prod Events

List the current state of the production deployment

In [None]:
cfn = boto3.client('cloudformation')

stack_name = '{}-deploy-prd'.format(pipeline_name)
print('stack name: {}'.format(stack_name))

List the last events and how long ago they occurred

In [None]:
from datetime import datetime
from dateutil.tz import tzlocal

def get_event_dataframe(events):
    stack_cols = ['LogicalResourceId', 'ResourceStatus', 'ResourceStatusReason', 'Timestamp']
    stack_event_df = pd.DataFrame(events)[stack_cols].fillna('')
    stack_event_df['TimeAgo'] = (datetime.now(tzlocal())-stack_event_df['Timestamp'])
    return stack_event_df.drop('Timestamp', axis=1)

# Get latest stack events
response = cfn.describe_stack_events(StackName=stack_name)
get_event_dataframe(response['StackEvents']).head()

## Test Prod Endpoint

We can send some traffic to the production endpoint now

In [None]:
prd_endpoint_name='mlops-{}-prd-{}'.format(model_name, execution_id)
print('prod endpoint: {}'.format(prd_endpoint_name))

Wait until the endpoint has finished updating before we send some traffic

In [None]:
sm = boto3.client('sagemaker')

while True:
    try:
        response = sm.describe_endpoint(EndpointName=prd_endpoint_name)
        print("Endpoint status: {}".format(response['EndpointStatus']))
        # Wait until the endpoint is in service with data capture enabled
        if response['EndpointStatus'] == 'InService' \
            and 'DataCaptureConfig' in response \
            and response['DataCaptureConfig']['EnableCapture']:
            break
    except sm.exceptions.ClientError:
        # The endpoint may not exist yet; keep polling
        pass
    time.sleep(10)

Send some inference requests to the production endpoint now that data capture is enabled. Use single records so that the monitoring schedule can map them to the baseline.

In [None]:
prd_predictor = get_predictor(prd_endpoint_name)
sample_values = test_df[test_df.columns[1:]].sample(100).values
predictions = predict(prd_predictor, sample_values, rows=1)

## Test Rest API

Get the deployment progress and the REST API endpoint

In [None]:
def get_stack_status(stack_name):
    response = cfn.describe_stacks(StackName=stack_name)
    if response['Stacks']:
        stack = response['Stacks'][0]
        return stack['StackStatus'], dict([(o['OutputKey'], o['OutputValue']) for o in stack['Outputs']])

status, outputs = get_stack_status(stack_name)
            
print('stack status: {}'.format(status))
print('deployment application: {}'.format(outputs['DeploymentApplication']))
print('rest api: {}'.format(outputs['RestApi']))                
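
CloudFormation reports stack outputs as a list of key/value records, which is why they are turned into a dictionary. The sketch below shows that conversion on an invented stack description. Using `.get('Outputs', [])` is a defensive assumption for stacks that do not report any outputs yet.

```python
# Invented stack description, trimmed to the keys this notebook reads
stack = {
    'StackStatus': 'UPDATE_IN_PROGRESS',
    'Outputs': [
        {'OutputKey': 'DeploymentApplication', 'OutputValue': 'example-app'},
        {'OutputKey': 'RestApi', 'OutputValue': 'https://example.com/prod/api'},
    ],
}

# Map each output key to its value for easy lookup
outputs = {o['OutputKey']: o['OutputValue'] for o in stack.get('Outputs', [])}
print(outputs['RestApi'])

# A stack without outputs simply yields an empty dictionary
empty_outputs = {o['OutputKey']: o['OutputValue'] for o in {'StackStatus': 'CREATE_IN_PROGRESS'}.get('Outputs', [])}
print(empty_outputs)
```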

Check the deployment application to see if it has been created and has started to shift traffic.

In [None]:
HTML('<a target="_blank" href="https://{0}.console.aws.amazon.com/codesuite/codedeploy/applications/{1}?region={0}">Deployment Application</a>'.format(region, outputs['DeploymentApplication']))

Now let's ping the REST endpoint to see which SageMaker endpoint it is hitting. Press STOP once the deployment is complete.

In [None]:
%%time

from urllib import request

headers = {"Content-type": "text/csv"}
payload = test_df[test_df.columns[1:]].head(1).to_csv(header=False, index=False).encode('utf-8')

while True:
    try:
        resp = request.urlopen(request.Request(outputs['RestApi'], data=payload, headers=headers))
        print("Response code: %d: endpoint: %s" % (resp.getcode(), resp.getheader('x-sagemaker-endpoint')))
        status, outputs = get_stack_status(stack_name) 
        if status.endswith('COMPLETE'):
            print('Deployment complete\n')
            break
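    except Exception as e:
        # Report the failure and keep polling until the deployment completes
        print('Request failed: {}'.format(e))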
    time.sleep(10)
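
For reference, the request sent in that loop is a plain HTTP POST with a CSV body. This sketch builds such a request with `urllib` and inspects it without sending anything; the URL and the record are made up.

```python
from urllib import request

# Hypothetical URL; in the notebook this comes from the stack outputs
url = 'https://example.com/prod/api'
# One CSV record without the target column (invented values)
payload = '9.5,1,2.3\n'.encode('utf-8')

req = request.Request(url, data=payload, headers={'Content-type': 'text/csv'})

# urllib switches to POST automatically when a body is supplied
print(req.get_method())
print(req.get_header('Content-type'))
```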

## Monitor

Get the latest production deployment

In [None]:
# Get last execution id
deploy_prd = get_pipeline_stage(pipeline_name, 'DeployPrd')
if deploy_prd is None or 'latestExecution' not in deploy_prd:
    raise Exception('Please wait.  Deploy prd not complete')
    
execution_id = deploy_prd['latestExecution']['pipelineExecutionId']

### Baseline

Load baseline processing job

In [None]:
processing_job_name='mlops-{}-pbl-{}'.format(model_name, execution_id)
schedule_name='mlops-{}-pms-{}'.format(model_name, execution_id)

print('processing job name: {}'.format(processing_job_name))
print('schedule name: {}'.format(schedule_name))

In [None]:
import sagemaker
from sagemaker.model_monitor import BaseliningJob, DefaultModelMonitor, MonitoringExecution
from sagemaker.s3 import S3Downloader

sagemaker_session = sagemaker.Session()
baseline_job = BaseliningJob.from_processing_name(sagemaker_session, processing_job_name)
status = baseline_job.describe()['ProcessingJobStatus']
if status != 'Completed':
    raise(Exception('Please wait. Processing job not complete, status: {}'.format(status)))
    
baseline_results_uri  = baseline_job.outputs[0].destination

Explore the generated constraints and statistics

In [None]:
import pandas as pd
import json

baseline_statistics = baseline_job.baseline_statistics().body_dict
schema_df = pd.json_normalize(baseline_statistics["features"])  # requires pandas >= 1.0
schema_df.head()

In [None]:
baseline_constraints = baseline_job.suggested_constraints().body_dict
constraints_df = pd.json_normalize(baseline_constraints["features"])  # requires pandas >= 1.0
constraints_df.head()

### View Data Capture

Get the list of data capture files from the endpoint

In [None]:
bucket = sagemaker_session.default_bucket()
data_capture_logs_uri = 's3://{}/{}/datacapture/{}'.format(bucket, model_name, prd_endpoint_name)

capture_files = S3Downloader.list(data_capture_logs_uri)
print('Found {} files'.format(len(capture_files)))

if capture_files:
    # Get the first line of the most recent file    
    event = json.loads(S3Downloader.read_file(capture_files[-1]).split('\n')[0])
    print('\nLast file:\n{}'.format(json.dumps(event, indent=2)))
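
Capture files are read here as JSON Lines, one event per line. The sketch below parses a small invented file and pairs each request with its response. The field names (`captureData`, `endpointInput`, `endpointOutput`) are an assumption about the capture format and should be checked against a real event printed by the cell above.

```python
import json

# Invented JSON Lines content; the field names are an assumption about the capture format
capture_file = '\n'.join([
    json.dumps({'captureData': {'endpointInput': {'data': '9.5,1,2.3', 'encoding': 'CSV'},
                                'endpointOutput': {'data': '14.2', 'encoding': 'CSV'}},
                'eventMetadata': {'eventId': 'example-1'}}),
    json.dumps({'captureData': {'endpointInput': {'data': '22.0,2,7.8', 'encoding': 'CSV'},
                                'endpointOutput': {'data': '31.7', 'encoding': 'CSV'}},
                'eventMetadata': {'eventId': 'example-2'}}),
]) + '\n'

# Parse every non-empty line as one captured event
events = [json.loads(line) for line in capture_file.split('\n') if line]

# Pair the CSV input with the numeric prediction
pairs = [(e['captureData']['endpointInput']['data'],
          float(e['captureData']['endpointOutput']['data'])) for e in events]
print(pairs)
```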

### View Monitoring Schedule

The functions for plotting and rendering distribution statistics or constraint violations are implemented in a `utils` file so let's grab that.

In [None]:
!wget -O utils.py --quiet https://raw.githubusercontent.com/awslabs/amazon-sagemaker-examples/master/sagemaker_model_monitor/visualization/utils.py
import utils as mu

Load the last successful monitoring execution

In [None]:
# Find the first monitoring execution in the list that has completed
sm = boto3.client('sagemaker')
response = sm.list_monitoring_executions(MonitoringScheduleName=schedule_name)

status = None
expected_status = ['Completed', 'CompletedWithViolations']
for mon in response['MonitoringExecutionSummaries']:
    processing_job_arn = mon['ProcessingJobArn']
    status = mon['MonitoringExecutionStatus']
    if status in expected_status:
        break

if status not in expected_status:
    raise Exception('Please wait.  No completed monitoring executions')
    
print('Schedule status: {}'.format(status))
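
The selection logic can be exercised on a mocked list of execution summaries. `first_completed` is a hypothetical helper, and the summaries and ARNs are invented, trimmed to the two keys the loop reads.

```python
def first_completed(summaries, expected=('Completed', 'CompletedWithViolations')):
    # Return the first summary whose status counts as finished, or None
    for summary in summaries:
        if summary['MonitoringExecutionStatus'] in expected:
            return summary
    return None

# Invented summaries for illustration
summaries = [
    {'MonitoringExecutionStatus': 'InProgress', 'ProcessingJobArn': 'arn:example:job-3'},
    {'MonitoringExecutionStatus': 'CompletedWithViolations', 'ProcessingJobArn': 'arn:example:job-2'},
    {'MonitoringExecutionStatus': 'Completed', 'ProcessingJobArn': 'arn:example:job-1'},
]

match = first_completed(summaries)
print(match['ProcessingJobArn'], match['MonitoringExecutionStatus'])
```

Returning `None` when nothing has finished keeps the "please wait" case explicit instead of relying on the last value seen in the loop.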

Load the monitoring execution

In [None]:
execution = MonitoringExecution.from_processing_arn(sagemaker_session=sagemaker.Session(), 
                                                    processing_job_arn=processing_job_arn)
exec_inputs = {inp['InputName']: inp for inp in execution.describe()['ProcessingInputs']}
exec_results_uri = execution.output.destination

print('Monitoring Execution results: {}'.format(exec_results_uri))

List the constraints, statistics and violations if they exist. 

In [None]:
!aws s3 ls $exec_results_uri/

In [None]:
# Get the baseline and monitoring statistics & violations
baseline_statistics = baseline_job.baseline_statistics().body_dict
execution_statistics = execution.statistics().body_dict
violations = execution.constraint_violations().body_dict['violations']

In [None]:
mu.show_violation_df(baseline_statistics=baseline_statistics, 
                     latest_statistics=execution_statistics, 
                     violations=violations)