# ML Ops and monitoring in production

<p><span style="font-weight: 400;">The time has come to start using the model in production!</span></p>
<p><span style="font-weight: 400;">The data science team has trained the model and deployed it to production, and now it&rsquo;s time to start using it. First, the team makes sure that the endpoint can leverage <strong><span style="color: #ff6600;">AWS&rsquo;s autoscaling capabilities</span></strong> in order to handle large bursts of requests.</span></p>

<p><span style="font-weight: 400;">We can change the endpoint configuration using the UI. Here you can see that the endpoint has only one instance, which receives all the traffic, and that autoscaling is not configured. We will click the &lsquo;<strong><em><span style="color: #3366ff;">configure auto scaling</span></em></strong>&rsquo; button to set up the autoscaling policy.</span></p>

<img src="imgs/auto_scaling_before.png" alt="Endpoint configuration before autoscaling">

<p>We will set up the <span style="color: #3366ff;"><strong>criteria</strong></span> for autoscaling (the metric and target value that trigger scaling) and its <strong><span style="color: #3366ff;">boundaries</span></strong> (the minimum and maximum number of instances).</p>

<img src="imgs/auto_scaling_configuration.png" alt="Autoscaling configuration" width="600">
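
To make the criteria and boundaries concrete, here is a simplified sketch of target-tracking arithmetic. It assumes the criterion is invocations per instance per minute (the idea behind SageMaker's predefined `SageMakerVariantInvocationsPerInstance` metric) and uses hypothetical target and capacity values. AWS's actual algorithm also applies cooldown periods and other rules, so this only approximates how the desired instance count follows the load while staying inside the boundaries.

```python
import math


def desired_instance_count(total_invocations_per_minute: float,
                           target_invocations_per_instance: float,
                           min_capacity: int,
                           max_capacity: int) -> int:
    """Simplified target tracking: enough instances to keep the per-instance
    load at or below the target, clamped to [min_capacity, max_capacity]."""
    if target_invocations_per_instance <= 0:
        raise ValueError("target must be positive")
    needed = math.ceil(total_invocations_per_minute / target_invocations_per_instance)
    return max(min_capacity, min(max_capacity, needed))


# Hypothetical settings: target of 1000 invocations/instance, 1 to 4 instances.
# A quiet period, a burst, and a burst beyond the upper boundary:
for load in (0, 2500, 10000):
    print(load, desired_instance_count(load, target_invocations_per_instance=1000,
                                       min_capacity=1, max_capacity=4))
```

The lower boundary keeps at least one instance alive when traffic drops to zero, and the upper boundary caps cost during extreme bursts.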

<p><span style="font-weight: 400; font-size: 30px;">Finally, we can see that the <strong>endpoint</strong> will <strong>autoscale</strong>!</span></p>

<img src="imgs/auto_scaling_after.png" alt="Endpoint configuration after autoscaling">
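
The same console steps can also be scripted through the Application Auto Scaling API. The sketch below only builds the keyword arguments for boto3's `register_scalable_target` and `put_scaling_policy` calls, so it runs without AWS credentials. The variant name `AllTraffic` (the SageMaker Python SDK's default), the policy name, the capacities and the target value are assumptions chosen for illustration.

```python
def autoscaling_requests(endpoint_name: str,
                         variant_name: str = "AllTraffic",  # assumed variant name
                         min_capacity: int = 1,
                         max_capacity: int = 4,
                         target_invocations_per_instance: float = 1000.0):
    """Build the kwargs for Application Auto Scaling's register_scalable_target
    (the boundaries) and put_scaling_policy (the criterion) calls."""
    resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"
    dimension = "sagemaker:variant:DesiredInstanceCount"
    target = {
        "ServiceNamespace": "sagemaker",
        "ResourceId": resource_id,
        "ScalableDimension": dimension,
        "MinCapacity": min_capacity,  # lower boundary
        "MaxCapacity": max_capacity,  # upper boundary
    }
    policy = {
        "PolicyName": f"{endpoint_name}-invocations-scaling",  # hypothetical name
        "ServiceNamespace": "sagemaker",
        "ResourceId": resource_id,
        "ScalableDimension": dimension,
        "PolicyType": "TargetTrackingScaling",
        "TargetTrackingScalingPolicyConfiguration": {
            "TargetValue": target_invocations_per_instance,  # scaling criterion
            "PredefinedMetricSpecification": {
                "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance"
            },
        },
    }
    return target, policy


target, policy = autoscaling_requests("fraud-detection-endpoint-2021-01-10-16-42-59")
print(target["ResourceId"])
# With AWS credentials, these could then be passed to boto3:
#   client = boto3.client("application-autoscaling")
#   client.register_scalable_target(**target)
#   client.put_scaling_policy(**policy)
```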

# Monitoring model quality 

<p><span style="font-weight: 400;">The deployed model was trained on the entire history of transactions that the company had, but&hellip; when it comes to fraud, it&rsquo;s always a cat-and-mouse game. Fraudsters will probably notice that their transactions are now being blocked and will try to find other ways around the system. <span style="color: #000080;"><strong>How will we be able to monitor these events</strong></span>?</span></p>
<p><span style="font-weight: 400;">This phenomenon is called <span style="color: #3366ff;"><em><strong>drift</strong></em></span>: the distribution of the features, or the underlying target function, <em><span style="color: #3366ff;">deviates from the original distribution</span></em> on which the model was trained.</span></p>
<p><span style="font-weight: 400;"><span style="color: #ff6600;"><strong>Amazon SageMaker</strong></span> offers a set of tools (SageMaker Model Monitor) to detect when such drift occurs, and can publish the related metrics to standard AWS tools like <strong><span style="color: #ff6600;">CloudWatch</span></strong>!</span></p>
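
One common way to put a number on feature drift is the two-sample Kolmogorov-Smirnov statistic: the largest gap between the empirical cumulative distributions of the training (baseline) values and the newly observed values. The sketch below computes it in plain Python. It illustrates the idea only; Model Monitor's own drift checks may use a different distance and threshold, and the sample values are hypothetical.

```python
from bisect import bisect_right


def ks_distance(baseline, current):
    """Two-sample Kolmogorov-Smirnov statistic: the largest absolute gap
    between the empirical CDFs of the baseline and current samples."""
    a, b = sorted(baseline), sorted(current)
    if not a or not b:
        raise ValueError("both samples must be non-empty")
    # The empirical CDFs only change at observed values, so checking those suffices
    points = sorted(set(a) | set(b))
    return max(abs(bisect_right(a, x) / len(a) - bisect_right(b, x) / len(b))
               for x in points)


# Hypothetical log-amounts: the current batch is shifted upwards
baseline_amounts = [4.1, 4.5, 4.7, 4.9, 5.2, 5.6]
current_amounts = [5.0, 5.4, 5.8, 6.1, 6.3, 6.9]
print(round(ks_distance(baseline_amounts, current_amounts), 3))
```

A distance of 0 means identical empirical distributions and 1 means no overlap at all; an alert would fire when the distance exceeds a chosen threshold.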

In [114]:
from sagemaker.model_monitor import DefaultModelMonitor, ModelQualityMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from sagemaker import get_execution_role, Session
from sqlalchemy import create_engine
from sagemaker.s3 import S3Downloader, S3Uploader
import json
import pandas as pd
import boto3
import os
import yaml

In [2]:
role = get_execution_role()
session = Session()

## Validating data quality

In [3]:
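# safe_load avoids constructing arbitrary Python objects and works with PyYAML >= 6,
# where yaml.load() requires an explicit Loader argument
with open('./vars.yaml', 'r') as f:
    conf = yaml.safe_load(f)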

rs_username = conf['REDSHIFT_USER']
rs_password = conf['REDSHIFT_PASSWORD']
rs_db = conf['REDSHIFT_DB']
rs_host = conf['REDSHIFT_HOST']
bucket = conf['PROD_BUCKET']
prefix = 'model_monitoring'

In [4]:
# The redshift+psycopg2 dialect is provided by the sqlalchemy-redshift package
engine = create_engine(f'redshift+psycopg2://{rs_username}:{rs_password}@{rs_host}:5439/{rs_db}')
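
The connection URL above embeds the password directly, so a password containing characters such as `@`, `:` or `/` would break URL parsing. The SQLAlchemy documentation recommends percent-encoding such values with `urllib.parse.quote_plus`. A minimal sketch, with a hypothetical helper name and hypothetical credentials:

```python
from urllib.parse import quote_plus


def redshift_url(user: str, password: str, host: str, db: str, port: int = 5439) -> str:
    """Build a SQLAlchemy URL for the redshift+psycopg2 dialect, percent-encoding
    the credentials so reserved characters don't break URL parsing."""
    return (f"redshift+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
            f"@{host}:{port}/{db}")


# Hypothetical credentials containing reserved characters
print(redshift_url("analyst", "p@ss:word/1", "example-cluster.local", "dev"))
```

The resulting string could be passed to `create_engine` in place of the f-string above.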

In [5]:
sql_get_dataset = '''
SELECT
  isfraud
, amount
, diff_dest_equal_amount
, diff_origin_equal_amount
, is_cash_in
, is_cash_out
, is_debit
, is_payment
, is_transfer
, namedest_c
, nameorig_c
, newbalancedest
, newbalanceorig
, oldbalancedest
, oldbalanceorg
FROM dataset
WHERE step <= 399
'''

In [6]:
df = pd.read_sql(sql_get_dataset, con=engine)

In [7]:
baseline_prefix = prefix + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'

baseline_data_uri = 's3://{}/{}'.format(bucket,baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(bucket, baseline_results_prefix)

In [8]:
s3_train_data_path = os.path.join(baseline_data_uri, 'train_with_header.csv')

In [9]:
df.to_csv(s3_train_data_path, header=True, index=False)

In [None]:
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)


my_default_monitor.suggest_baseline(
    baseline_dataset=s3_train_data_path,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True
)

In [11]:
s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Files:")
print("\n ".join(report_files))

Found Files:
model_monitoring/baselining/results/constraints.json
 model_monitoring/baselining/results/statistics.json


In [12]:
baseline_job = my_default_monitor.latest_baselining_job
# pd.io.json.json_normalize is deprecated; pd.json_normalize is the public API
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

| | name | inferred_type | numerical_statistics.common.num_present | numerical_statistics.common.num_missing | numerical_statistics.mean | numerical_statistics.sum | numerical_statistics.std_dev | numerical_statistics.min | numerical_statistics.max | numerical_statistics.distribution.kll.buckets | numerical_statistics.distribution.kll.sketch.parameters.c | numerical_statistics.distribution.kll.sketch.parameters.k | numerical_statistics.distribution.kll.sketch.data |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | isfraud | Integral | 5756120 | 0 | 0.000775 | 4461.0 | 0.027828 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[], [], [], [1.0], [0.0], [0.0, 0.0, 0.0, 0.0... |
| 1 | amount | Fractional | 5756120 | 0 | 4.711305 | 27118840.0 | 0.787519 | 0.0 | 7.965886 | [{'lower_bound': 0.0, 'upper_bound': 0.7965885... | 0.64 | 2048.0 | [[], [], [], [6.93218330923897], [6.7864725538... |
| 2 | diff_dest_equal_amount | Integral | 5756120 | 0 | 0.17012 | 979233.0 | 0.375738 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[], [], [], [1.0], [1.0], [1.0, 0.0, 0.0, 0.0... |
| 3 | diff_origin_equal_amount | Integral | 5756120 | 0 | 0.147035 | 846353.0 | 0.354141 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[], [], [], [1.0], [1.0], [1.0, 0.0, 0.0, 0.0... |
| 4 | is_cash_in | Integral | 5756120 | 0 | 0.219679 | 1264501.0 | 0.414029 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[], [], [], [1.0], [1.0], [1.0, 0.0, 0.0, 0.0... |
| 5 | is_cash_out | Integral | 5756120 | 0 | 0.353781 | 2036404.0 | 0.478142 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[], [], [], [1.0], [1.0], [1.0, 0.0, 0.0, 0.0... |
| 6 | is_debit | Integral | 5756120 | 0 | 0.006416 | 36934.0 | 0.079845 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[], [], [], [1.0], [1.0], [1.0, 0.0, 0.0, 0.0... |
| 7 | is_payment | Integral | 5756120 | 0 | 0.336953 | 1939542.0 | 0.472669 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[], [], [], [1.0], [1.0], [1.0, 0.0, 0.0, 0.0... |
| 8 | is_transfer | Integral | 5756120 | 0 | 0.08317 | 478739.0 | 0.27614 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[], [], [], [1.0], [1.0], [1.0, 0.0, 0.0, 0.0... |
| 9 | namedest_c | Integral | 5756120 | 0 | 0.663047 | 3816578.0 | 0.472669 | 0.0 | 1.0 | [{'lower_bound': 0.0, 'upper_bound': 0.1, 'cou... | 0.64 | 2048.0 | [[], [], [], [1.0], [1.0], [1.0, 0.0, 0.0, 0.0... |


In [13]:
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df.head(10)

| | name | inferred_type | completeness | num_constraints.is_non_negative |
|---|---|---|---|---|
| 0 | isfraud | Integral | 1.0 | True |
| 1 | amount | Fractional | 1.0 | True |
| 2 | diff_dest_equal_amount | Integral | 1.0 | True |
| 3 | diff_origin_equal_amount | Integral | 1.0 | True |
| 4 | is_cash_in | Integral | 1.0 | True |
| 5 | is_cash_out | Integral | 1.0 | True |
| 6 | is_debit | Integral | 1.0 | True |
| 7 | is_payment | Integral | 1.0 | True |
| 8 | is_transfer | Integral | 1.0 | True |
| 9 | namedest_c | Integral | 1.0 | True |
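
The two tables above summarize each column (counts, mean, standard deviation, range) and suggest constraints such as `completeness` and `is_non_negative`. The following is a simplified, pure-Python sketch of how such a summary and such constraints could be derived for a single numeric column. The real baselining job computes more (for example the KLL distribution sketches), the helper name is hypothetical, and the use of the population standard deviation is an assumption.

```python
import math


def baseline_for_column(values):
    """Summarize one numeric column (None marks a missing value) and suggest
    two constraints, loosely mirroring the baseline tables above."""
    present = [v for v in values if v is not None]
    n = len(present)
    mean = sum(present) / n if n else float("nan")
    # Population standard deviation (an assumption; the job may differ)
    std = math.sqrt(sum((v - mean) ** 2 for v in present) / n) if n else float("nan")
    stats = {
        "num_present": n,
        "num_missing": len(values) - n,
        "mean": mean,
        "std_dev": std,
        "min": min(present) if n else None,
        "max": max(present) if n else None,
    }
    constraints = {
        # Fraction of rows where the value is present
        "completeness": n / len(values) if values else 0.0,
        # Suggested only if every observed baseline value is >= 0
        "is_non_negative": all(v >= 0 for v in present),
    }
    return stats, constraints


# Hypothetical isfraud-like column with one missing value
stats, constraints = baseline_for_column([0, 1, 0, 0, None])
print(stats)
print(constraints)
```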


## Monitoring the data quality

<p><span style="font-weight: 400;">To verify that the distribution of the data doesn&rsquo;t change, we first have to <strong><span style="color: #3366ff;">record the original distribution</span></strong>. We do this by generating statistics and constraints on the original dataset, as in the baselining step above, and then comparing newly <span style="color: #008000;">captured data against the learned statistics</span>! This requires data capture to be enabled on the endpoint, so that its requests and responses are stored in S3 for the monitoring jobs to analyze.</span></p>
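
Comparing captured data against the learned constraints boils down to recomputing a few quantities per feature on each new batch and flagging the ones that break the baseline. A pure-Python sketch under stated assumptions: the helper, the check names and the sample batch are hypothetical, and only the `completeness` and `is_non_negative` constraints from the table above are checked.

```python
def check_batch(batch, constraints):
    """Compare a batch (column name -> list of values, None = missing) against
    per-column baseline constraints and return a list of violation dicts."""
    violations = []
    for name, rules in constraints.items():
        values = batch.get(name)
        if values is None:
            violations.append({"feature_name": name, "check": "missing_column"})
            continue
        present = [v for v in values if v is not None]
        completeness = len(present) / len(values) if values else 0.0
        if completeness < rules["completeness"]:
            violations.append({"feature_name": name, "check": "completeness",
                               "observed": completeness})
        if rules.get("is_non_negative") and any(v < 0 for v in present):
            violations.append({"feature_name": name, "check": "non_negative"})
    return violations


# Constraints shaped like the suggested ones above; the batch is hypothetical
baseline_constraints = {
    "amount": {"completeness": 1.0, "is_non_negative": True},
    "is_transfer": {"completeness": 1.0, "is_non_negative": True},
}
new_batch = {"amount": [4.2, None, 5.1, -0.3], "is_transfer": [0, 1, 0, 0]}
for violation in check_batch(new_batch, baseline_constraints):
    print(violation)
```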

In [33]:
from time import gmtime, strftime
import io
from io import StringIO
from sagemaker.model_monitor import CronExpressionGenerator
from urllib.parse import urlparse

import time

In [15]:
predictor_endpoint='fraud-detection-endpoint-2021-01-10-16-42-59'
reports_prefix = '{}/reports'.format(prefix)
s3_report_path = 's3://{}/{}'.format(bucket,reports_prefix)

In [16]:
monitor_schedule_name = 'fraud-detection-scheduled-monitoring-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
ms = my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=monitor_schedule_name,
    endpoint_input=predictor_endpoint,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
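
`CronExpressionGenerator.hourly()` produces an hourly cron expression (minute 0 of every hour), so the first monitoring execution can only start at or after the next full hour following the schedule's creation. The sketch below computes that hour; the helper name is hypothetical, and the creation time is read from the schedule name, which is built with `gmtime()` and is therefore in UTC.

```python
from datetime import datetime, timedelta, timezone


def next_top_of_hour(now: datetime) -> datetime:
    """Return the first whole hour strictly after `now`, i.e. the next time
    an hourly schedule at minute 0 would fire."""
    truncated = now.replace(minute=0, second=0, microsecond=0)
    return truncated + timedelta(hours=1)


# Creation time taken from the schedule name ...-2021-01-10-19-50-09 (UTC)
created = datetime(2021, 1, 10, 19, 50, 9, tzinfo=timezone.utc)
print(next_top_of_hour(created).isoformat())
```

This matches the `.../2021/01/10/20` prefix of the report key printed later in this notebook.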

In [17]:
# Invoke the endpoint

In [19]:
runtime = boto3.Session().client('sagemaker-runtime')
new_data_path = f"s3://{conf['DATA_BUCKET']}/data/new_data/new_data.csv"
df_new_data = pd.read_csv(new_data_path, header=None)
df_new_data.columns = ['isfraud', 'amount', 'diff_dest_equal_amount', 'diff_origin_equal_amount',
                       'is_cash_in', 'is_cash_out', 'is_debit', 'is_payment',
                       'is_transfer', 'namedest_c', 'nameorig_c', 'newbalancedest', 
                       'newbalanceorig', 'oldbalancedest', 'oldbalanceorg']

for _ in range(10):
    csv_file = io.StringIO()
    df_new_data.drop('isfraud', axis=1).sample(1000).to_csv(csv_file, header=False, index=False)
    payload_data = csv_file.getvalue()
    response = runtime.invoke_endpoint(EndpointName=predictor_endpoint, ContentType='text/csv', Body=payload_data)
    csv_file.close()
    print('Submitted batch of samples!')

Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
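
Each request above sends 1,000 feature rows (with the `isfraud` label dropped) as a `text/csv` body without a header line or index column. The sketch below shows the same batching idea with only the standard library; the helper name and the rows are hypothetical.

```python
import csv
import io


def csv_payloads(rows, batch_size):
    """Yield text/csv request bodies of at most `batch_size` rows each,
    with no header line, like the header=False, index=False calls above."""
    for start in range(0, len(rows), batch_size):
        buffer = io.StringIO()
        # lineterminator="\n" keeps one row per line (the csv default is "\r\n")
        csv.writer(buffer, lineterminator="\n").writerows(rows[start:start + batch_size])
        yield buffer.getvalue()


# Hypothetical feature rows (label column already removed)
rows = [[4.2, 0, 1], [5.1, 1, 0], [3.9, 0, 0]]
for body in csv_payloads(rows, batch_size=2):
    print(repr(body))
```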


In [20]:
desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

Schedule status: Scheduled


In [22]:
mon_executions = my_default_monitor.list_executions()
while len(mon_executions) == 0:
    print("Waiting for the 1st execution to happen...")
    time.sleep(60*5)
    mon_executions = my_default_monitor.list_executions()
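
The loop above polls every five minutes with no upper bound, so it would wait forever if the schedule never produced an execution. A small sketch of polling with a timeout; `wait_until` is a hypothetical helper, not part of the SageMaker SDK, and its `sleep` and `clock` functions are injectable so it can be tested without real waiting.

```python
import time


def wait_until(check, timeout_s, interval_s, sleep=time.sleep, clock=time.monotonic):
    """Call `check()` every `interval_s` seconds until it returns a truthy
    value or `timeout_s` elapses; return that value, or raise TimeoutError."""
    deadline = clock() + timeout_s
    while True:
        result = check()
        if result:
            return result
        if clock() >= deadline:
            raise TimeoutError(f"condition not met within {timeout_s} s")
        sleep(interval_s)


# In the notebook this could replace the loop above (an empty list is falsy):
#   mon_executions = wait_until(my_default_monitor.list_executions,
#                               timeout_s=2 * 3600, interval_s=300)
```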

In [50]:
monitor_run = mon_executions[-1]
monitor_run.wait(logs=False)
latest_job = monitor_run.describe()
report_uri=monitor_run.output.destination

!

In [51]:
s3uri = urlparse(report_uri)
report_bucket = s3uri.netloc
report_key = s3uri.path.lstrip('/')
print('Report key: {}'.format(report_key))

s3_client = boto3.Session().client('s3')
result = s3_client.list_objects(Bucket=report_bucket, Prefix=report_key)
report_files = [report_file.get("Key") for report_file in result.get('Contents')]
print("Found Report Files:")
print("\n ".join(report_files))

Report key: model_monitoring/reports/fraud-detection-endpoint-2021-01-10-16-42-59/fraud-detection-scheduled-monitoring-2021-01-10-19-50-09/2021/01/10/20
Found Report Files:
model_monitoring/reports/fraud-detection-endpoint-2021-01-10-16-42-59/fraud-detection-scheduled-monitoring-2021-01-10-19-50-09/2021/01/10/20/constraint_violations.json


In [None]:
violations = my_default_monitor.latest_monitoring_constraint_violations()
# None disables column truncation (-1 is deprecated in recent pandas versions)
pd.set_option('display.max_colwidth', None)
violations_df = pd.json_normalize(violations.body_dict["violations"])
violations_df.head(10)
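
The `constraint_violations.json` report lists one entry per violation, each with a `feature_name`, a `constraint_check_type` and a `description`. Grouping the violated features by check type is a quick way to decide what to alert on. A sketch with a hypothetical helper and a hypothetical report body (these are not results from this notebook):

```python
import json

# Hypothetical report content shaped like constraint_violations.json
report_text = """
{"violations": [
  {"feature_name": "amount", "constraint_check_type": "baseline_drift_check",
   "description": "hypothetical example"},
  {"feature_name": "oldbalanceorg", "constraint_check_type": "baseline_drift_check",
   "description": "hypothetical example"},
  {"feature_name": "is_debit", "constraint_check_type": "data_type_check",
   "description": "hypothetical example"}
]}
"""


def summarize_violations(report: dict) -> dict:
    """Map each constraint check type to the list of features that violated it."""
    summary = {}
    for v in report.get("violations", []):
        summary.setdefault(v["constraint_check_type"], []).append(v["feature_name"])
    return summary


summary = summarize_violations(json.loads(report_text))
for check_type, features in sorted(summary.items()):
    print(check_type, features)
```

In the notebook, `violations.body_dict` could be passed to the same function.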

In [41]:
my_default_monitor.stop_monitoring_schedule()
my_default_monitor.delete_monitoring_schedule()


Stopping Monitoring Schedule with name: fraud-detection-scheduled-monitoring-2021-01-10-19-50-09


## Validating model quality

In [56]:
from sagemaker.model_monitor import ModelQualityMonitor
from sagemaker.model_monitor import EndpointInput
from sagemaker.model_monitor.dataset_format import DatasetFormat
from datetime import datetime

In [53]:
# Create the model quality monitoring object
fraud_model_quality_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=session
)

### Generate a prediction baseline

In [108]:
new_data_path = f"s3://{conf['DATA_BUCKET']}/data/new_data/new_data.csv"
df_new_data = pd.read_csv(new_data_path, header=None)
df_new_data.columns = ['isfraud', 'amount', 'diff_dest_equal_amount', 'diff_origin_equal_amount',
                       'is_cash_in', 'is_cash_out', 'is_debit', 'is_payment',
                       'is_transfer', 'namedest_c', 'nameorig_c', 'newbalancedest', 
                       'newbalanceorig', 'oldbalancedest', 'oldbalanceorg']
responses = []
for i in range(10):
    start_index = i*1000
    end_index = (i+1) *1000
    csv_file = io.StringIO()
    df_new_data.drop('isfraud', axis=1).iloc[start_index:end_index].to_csv(csv_file, header=False, index=False)
    payload_data = csv_file.getvalue()
    response = runtime.invoke_endpoint(EndpointName=predictor_endpoint, ContentType='text/csv', Body=payload_data)
    responses+=response['Body'].read().decode().split(',')
    csv_file.close()
    print('Submitted batch of samples!')

Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!
Submitted batch of samples!


In [80]:
responses = [float(x) for x in responses]

In [81]:
df_validation = df_new_data.iloc[0:10000].copy()

In [156]:
df_validation.head()

| | prediction | probability | label |
|---|---|---|---|
| 0 | 0 | 0.0001339156 | 0 |
| 1 | 0 | 3.902226e-06 | 0 |
| 2 | 0 | 3.702656e-06 | 0 |
| 3 | 0 | 2.901372e-05 | 0 |
| 4 | 0 | 4.751983e-07 | 0 |


In [87]:
df_validation['probability'] = responses
# Convert the predicted probabilities into hard 0/1 predictions with a 0.5 cut-off
df_validation['prediction'] = (df_validation['probability'] > 0.5).astype(int)
df_validation.rename(columns={'isfraud':'label'}, inplace=True)
df_validation = df_validation[['prediction', 'probability', 'label']]
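
The 0.5 cut-off converts each fraud probability into a hard prediction, and the model quality metrics are computed from how those predictions line up with the labels. A pure-Python sketch of that step; the helper is hypothetical, and apart from the first probability (taken from the `df_validation.head()` output above) the values are made up for illustration.

```python
def confusion_counts(probabilities, labels, threshold=0.5):
    """Threshold probabilities (strictly greater than `threshold` means fraud)
    and count true/false positives/negatives against the 0/1 labels."""
    tp = fp = tn = fn = 0
    for p, y in zip(probabilities, labels):
        predicted = int(p > threshold)
        if predicted and y:
            tp += 1
        elif predicted and not y:
            fp += 1
        elif not predicted and y:
            fn += 1
        else:
            tn += 1
    return {"tp": tp, "fp": fp, "tn": tn, "fn": fn}


probabilities = [0.0001339156, 0.97, 0.62, 0.04, 0.5]
labels = [0, 1, 0, 1, 0]
print(confusion_counts(probabilities, labels))
print(confusion_counts(probabilities, labels, threshold=0.03))
```

Lowering the threshold catches more fraud (fewer false negatives) at the price of more false positives, which matters for a problem where positives are as rare as they are here.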

In [91]:
# Note: this reuses (and overwrites) the file used for the data quality baseline above
df_validation.to_csv(s3_train_data_path, header=True, index=False)

In [92]:
baseline_job_name = f"fraud-model-baseline-job-{datetime.utcnow():%Y-%m-%d-%H%M}"

In [93]:
job = fraud_model_quality_monitor.suggest_baseline(
    job_name=baseline_job_name,
    baseline_dataset=s3_train_data_path,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    problem_type='BinaryClassification',
    inference_attribute="prediction",
    probability_attribute="probability",
    ground_truth_attribute="label"
)
job.wait(logs=False)


Job Name:  fraud-model-baseline-job-2021-01-10-2126
Inputs:  [{'InputName': 'baseline_dataset_input', 'S3Input': {'S3Uri': 's3://gad-capstone-prod/model_monitoring/baselining/data/train_with_header.csv', 'LocalPath': '/opt/ml/processing/input/baseline_dataset_input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'monitoring_output', 'S3Output': {'S3Uri': 's3://gad-capstone-prod/model_monitoring/baselining/results', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
.............................................................!

In [94]:
baseline_job = fraud_model_quality_monitor.latest_baselining_job

In [95]:
binary_metrics = baseline_job.baseline_statistics().body_dict["binary_classification_metrics"]
pd.json_normalize(binary_metrics).T

| | 0 |
|---|---|
| confusion_matrix.0.0 | 9994 |
| confusion_matrix.0.1 | 0 |
| confusion_matrix.1.0 | 0 |
| confusion_matrix.1.1 | 6 |
| recall.value | 1 |
| recall.standard_deviation | 0 |
| precision.value | 1 |
| precision.standard_deviation | 0 |
| accuracy.value | 1 |
| accuracy.standard_deviation | 0 |
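
The baseline confusion matrix above has 9,994 correct negatives and 6 correct positives on its diagonal and no off-diagonal errors, which is why recall, precision and accuracy are all exactly 1. The sketch below recomputes the usual metrics from confusion-matrix counts; `classification_metrics` is a hypothetical helper, and reading the diagonal cells as true negatives and true positives is the only assumption it makes about the matrix layout.

```python
def classification_metrics(tp, fp, tn, fn):
    """Standard binary-classification metrics from confusion-matrix counts.
    Metrics whose denominator is zero are returned as None."""
    def ratio(num, den):
        return num / den if den else None

    precision = ratio(tp, tp + fp)
    recall = ratio(tp, tp + fn)  # also the true positive rate
    f1 = (2 * precision * recall / (precision + recall)
          if precision and recall else None)
    return {
        "accuracy": ratio(tp + tn, tp + fp + tn + fn),
        "precision": precision,
        "recall": recall,
        "false_positive_rate": ratio(fp, fp + tn),
        "false_negative_rate": ratio(fn, fn + tp),
        "f1": f1,
    }


# Counts from the baseline above: 9994 true negatives, 6 true positives, no errors
print(classification_metrics(tp=6, fp=0, tn=9994, fn=0))
```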


In [96]:
pd.DataFrame(baseline_job.suggested_constraints().body_dict["binary_classification_constraints"]).T

| | threshold | comparison_operator |
|---|---|---|
| recall | 1 | LessThanThreshold |
| precision | 1 | LessThanThreshold |
| accuracy | 1 | LessThanThreshold |
| true_positive_rate | 1 | LessThanThreshold |
| true_negative_rate | 1 | LessThanThreshold |
| false_positive_rate | 0 | GreaterThanThreshold |
| false_negative_rate | 0 | GreaterThanThreshold |
| auc | 1 | LessThanThreshold |
| f0_5 | 1 | LessThanThreshold |
| f1 | 1 | LessThanThreshold |
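
Each suggested constraint pairs a threshold with a comparison operator. As read here, `LessThanThreshold` flags a violation when the observed metric falls below the threshold, and `GreaterThanThreshold` when it rises above it. Because the baseline was perfect, the thresholds sit at 1 and 0, so a single misclassification in a monitored window would already violate several of them; in practice these thresholds would likely need relaxing. A sketch of the evaluation with a hypothetical helper and hypothetical observed metrics:

```python
import operator

# How each comparison operator from the constraints table is read here:
# the constraint is violated when the comparison is true.
VIOLATES = {
    "LessThanThreshold": operator.lt,
    "GreaterThanThreshold": operator.gt,
}


def violated_constraints(metrics, constraints):
    """Return the names of metrics whose observed value breaks its constraint."""
    broken = []
    for name, rule in constraints.items():
        value = metrics.get(name)
        if value is None:
            continue  # metric not computed for this window
        if VIOLATES[rule["comparison_operator"]](value, rule["threshold"]):
            broken.append(name)
    return broken


# Thresholds from the suggested constraints above; observed metrics are hypothetical
constraints = {
    "recall": {"threshold": 1, "comparison_operator": "LessThanThreshold"},
    "precision": {"threshold": 1, "comparison_operator": "LessThanThreshold"},
    "false_positive_rate": {"threshold": 0, "comparison_operator": "GreaterThanThreshold"},
}
observed = {"recall": 0.83, "precision": 1.0, "false_positive_rate": 0.0}
print(violated_constraints(observed, constraints))
```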
