# Testing how to simulate model quality inferences

## Section 1 - Setup

><div class="alert alert-block alert-info"><b>NOTE: </b>We recommend using an <em>ml.m5.large</em> instance type and the <em>Python 3 (Data Science)</em> kernel to train the <b>CTGAN</b> model.</div>

In [1]:
# Install CTGAN (uncomment on first run; `%pip` installs into the running kernel's environment)
#%pip install ctgan

In [2]:
from datetime import datetime, timedelta, timezone
import json
import os
import re
import boto3
import io
import warnings

import pandas as pd

from time import sleep
from threading import Thread

from sagemaker import get_execution_role, session, Session, image_uris
from sagemaker.s3 import S3Downloader, S3Uploader
from sagemaker.processing import ProcessingJob
from sagemaker.serializers import CSVSerializer

from sagemaker.model import Model
from sagemaker.model_monitor import DataCaptureConfig

# NOTE: this rebinds `session`, shadowing the `session` module imported from
# `sagemaker` above; from here on `session` is a `Session` instance.
session = Session()

In [3]:
# Look up the IAM execution role and the AWS region of the active session
role = get_execution_role()
region = session.boto_region_name

# Echo both so the run context is visible in the cell output
print("RoleArn:", role)
print("Region:", region)

RoleArn: arn:aws:iam::500842391574:role/SageMaker
Region: us-east-2


In [4]:
# Set up S3 bucket parameters for the production logs bucket
bucket = 'proddeploymentstage-prodappl-logss3bucket004b0f70-3nv3l2whchah'
print("Inference Testing Bucket:", bucket)

## S3 prefixes
data_capture_prefix = 'endpoint-data-capture'
s3_capture_upload_path = f's3://{bucket}/{data_capture_prefix}'
# The ground-truth prefix is stamped with the time this cell ran, so every run
# gets its own prefix. NOTE: `datetime.now()` is naive local time, whereas the
# baseline job name further down uses `datetime.utcnow()`.
ground_truth_upload_path = f's3://{bucket}/ground-truth-data/{datetime.now():%Y-%m-%d-%H-%M-%S}'

## Get the model monitor image for the session's region
monitor_image_uri = image_uris.retrieve(framework="model-monitor", region=region)

print("Image URI:", monitor_image_uri)
print(f"Capture path: {s3_capture_upload_path}")
print(f"Ground truth path: {ground_truth_upload_path}")

Inference Testing Bucket: proddeploymentstage-prodappl-logss3bucket004b0f70-3nv3l2whchah
Image URI: 777275614652.dkr.ecr.us-east-2.amazonaws.com/sagemaker-model-monitor-analyzer
Capture path: s3://proddeploymentstage-prodappl-logss3bucket004b0f70-3nv3l2whchah/endpoint-data-capture
Ground truth path: s3://proddeploymentstage-prodappl-logss3bucket004b0f70-3nv3l2whchah/ground-truth-data/2021-04-30-23-16-51


In [5]:
from sagemaker.predictor import Predictor

# Instantiate a `Predictor` for the currently running production endpoint.
# It is bound to the shared `session` and configured with a `CSVSerializer`.
endpoint_name = 'abalone-prod-endpoint'
predictor = Predictor(endpoint_name=endpoint_name, sagemaker_session=session, serializer=CSVSerializer())

---

## Section 2 - Recreate Baseline Data

We will re-create the Model Quality baseline job (even though it was already created by the CDK Pipeline) to see the output of the SageMaker SDK when calling `create_monitoring_schedule()`, as well as to leverage the resultant constraints when creating the monitoring schedule.

In [6]:
# S3 layout used by the baselining job. The CDK Pipeline should already have
# created it, with `baseline.csv` sitting under the data prefix.
baseline_prefix = 'baselining'
baseline_data_prefix = f'{baseline_prefix}/data'
baseline_results_prefix = f'{baseline_prefix}/results'

# Full S3 URIs for the baseline input data and the job results
baseline_dataset_uri = 's3://{}/{}'.format(bucket, baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(bucket, baseline_results_prefix)

print(f'Baseline data uri: {baseline_dataset_uri}')
print(f'Baseline results uri: {baseline_results_uri}')

Baseline data uri: s3://proddeploymentstage-prodappl-logss3bucket004b0f70-3nv3l2whchah/baselining/data
Baseline results uri: s3://proddeploymentstage-prodappl-logss3bucket004b0f70-3nv3l2whchah/baselining/results


In [7]:
# Generate a new baseline job
from sagemaker.model_monitor import ModelQualityMonitor
from sagemaker.model_monitor import EndpointInput
from sagemaker.model_monitor.dataset_format import DatasetFormat

# Create the model quality monitoring object.
# Its jobs run under the notebook's execution role on a single ml.m5.xlarge
# instance with a 20 GB volume, and are capped at 1800 seconds of runtime.
model_quality_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=session
)

In [8]:
# Name of the model quality baseline job, stamped with the current UTC time
# at minute resolution. `datetime.now(timezone.utc)` is the timezone-aware
# replacement for `datetime.utcnow()`, which is deprecated as of Python 3.12;
# the formatted value is the same.
baseline_job_name = f'abalone-baseline-{datetime.now(timezone.utc):%Y-%m-%d-%H%M}'

In [9]:
# Execute the baseline suggestion job.
# Specify problem type, in this case Regression, and provide other required attributes.
job = model_quality_monitor.suggest_baseline(
    job_name=baseline_job_name,
    baseline_dataset=baseline_dataset_uri,
    dataset_format=DatasetFormat.csv(header=True),  # the baseline CSV is read with a header row
    output_s3_uri = baseline_results_uri,
    problem_type='Regression',
    inference_attribute= "prediction",  # name given for the model-prediction attribute
    ground_truth_attribute= "label"  # name given for the ground-truth attribute
)
# Wait for the job, with log output turned off
job.wait(logs=False)


Job Name:  abalone-baseline-2021-04-30-2316
Inputs:  [{'InputName': 'baseline_dataset_input', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://proddeploymentstage-prodappl-logss3bucket004b0f70-3nv3l2whchah/baselining/data', 'LocalPath': '/opt/ml/processing/input/baseline_dataset_input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'monitoring_output', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://proddeploymentstage-prodappl-logss3bucket004b0f70-3nv3l2whchah/baselining/results', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
.........................................................!

### Explore the Generated Metrics

In [10]:
baseline_job = model_quality_monitor.latest_baselining_job
# NOTE: despite its name, `binary_metrics` holds the "regression_metrics"
# section of the baseline statistics (the baseline job ran with
# problem_type='Regression').
binary_metrics = baseline_job.baseline_statistics().body_dict["regression_metrics"]
# Flatten the nested metric dict and transpose so each metric field is one row
pd.json_normalize(binary_metrics).T

Unnamed: 0,0
mae.value,1.395513
mae.standard_deviation,0.051972
mse.value,3.623911
mse.standard_deviation,0.278567
rmse.value,1.903657
rmse.standard_deviation,0.0737
r2.value,0.657472
r2.standard_deviation,0.013495


### Explore the Generated Constraints

In [11]:
# Tabulate the suggested regression constraints, transposed so each metric is one row
pd.DataFrame(baseline_job.suggested_constraints().body_dict["regression_constraints"]).T

Unnamed: 0,threshold,comparison_operator
mae,1.39551,GreaterThanThreshold
mse,3.62391,GreaterThanThreshold
rmse,1.90366,GreaterThanThreshold
r2,0.657472,LessThanThreshold


---

## Section 3 - Create Inference Data to Test the Model Quality Monitor

Model Quality Monitoring needs two additional inputs - predictions made by the deployed model endpoint and the ground truth data to be provided by the model consuming application. Since you already enabled data capture on the endpoint, prediction data is captured in S3. The ground truth data depends on what the model is predicting and what the business use case is.

### Generating Synthetic Abalone data

In order to generate prediction data we will need to create fake "new" data. To accomplish this, we will use the CTGAN package and train it on the "raw" abalone dataset. We will create $300$ samples of fake data.

In [12]:
import warnings

# Install a blanket "ignore" filter for Python warnings
warnings.filterwarnings('ignore')
s3 = boto3.client('s3')

# 'raw' data column names (passed to `read_csv` through `names=`)
names = [
    'sex',
    'length',
    'diameter',
    'height',
    'whole_weight',
    'shucked_weight',
    'viscera_weight',
    'shell_weight',
    'rings'
]

# Location of the 'raw' data.
# It gets its own variable so that the production logs `bucket` configured
# earlier in the notebook is not overwritten; reusing the name `bucket` here
# would make any later (or re-run) cell that builds paths from `bucket` point
# at the wrong bucket.
raw_data_bucket = 'data-us-east-2-500842391574'
key = 'input/raw/abalone.csv'
obj = s3.get_object(Bucket=raw_data_bucket, Key=key)
raw_data = pd.read_csv(io.BytesIO(obj['Body'].read()), encoding='utf8', names=names)
raw_data.head()

Unnamed: 0,sex,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight,rings
0,M,0.455,0.365,0.095,0.514,0.2245,0.101,0.15,15
1,M,0.35,0.265,0.09,0.2255,0.0995,0.0485,0.07,7
2,F,0.53,0.42,0.135,0.677,0.2565,0.1415,0.21,9
3,M,0.44,0.365,0.125,0.516,0.2155,0.114,0.155,10
4,I,0.33,0.255,0.08,0.205,0.0895,0.0395,0.055,7


In [13]:
from ctgan import CTGANSynthesizer

# Fit the CTGAN model, declaring the 'sex' column as the only discrete variable
# NOTE(review): no random seed is set in this cell, so the fitted model and the
# samples drawn from it will presumably differ between runs; confirm whether
# reproducibility matters here.
ctgan = CTGANSynthesizer()
ctgan.fit(raw_data, ['sex'])

In [14]:
# Generate 300 synthetic rows from the fitted CTGAN model
samples = ctgan.sample(300)

In [15]:
# Summary statistics of the raw data, as a reference for judging the synthetic samples
raw_data.describe()

Unnamed: 0,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight,rings
count,4177.0,4177.0,4177.0,4177.0,4177.0,4177.0,4177.0,4177.0
mean,0.523992,0.407881,0.139516,0.828742,0.359367,0.180594,0.238831,9.933684
std,0.120093,0.09924,0.041827,0.490389,0.221963,0.109614,0.139203,3.224169
min,0.075,0.055,0.0,0.002,0.001,0.0005,0.0015,1.0
25%,0.45,0.35,0.115,0.4415,0.186,0.0935,0.13,8.0
50%,0.545,0.425,0.14,0.7995,0.336,0.171,0.234,9.0
75%,0.615,0.48,0.165,1.153,0.502,0.253,0.329,11.0
max,0.815,0.65,1.13,2.8255,1.488,0.76,1.005,29.0


In [16]:
# Summary statistics of the synthetic samples, to compare with the raw data
# NOTE(review): the recorded output shows negative minimum values for the
# weight columns, which the raw data does not contain; consider whether such
# rows should be filtered or clipped before use.
samples.describe()

Unnamed: 0,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight,rings
count,300.0,300.0,300.0,300.0,300.0,300.0,300.0,300.0
mean,0.556872,0.394018,0.122367,0.747729,0.328284,0.152328,0.215608,8.82
std,0.11746,0.129434,0.044977,0.516733,0.238036,0.097742,0.137489,2.91248
min,0.211295,0.023799,0.006783,-0.100823,-0.056733,-0.023577,-0.035325,2.0
25%,0.487052,0.30522,0.089199,0.337754,0.122987,0.075526,0.097561,8.0
50%,0.573432,0.426495,0.125345,0.713368,0.301237,0.147471,0.214377,9.0
75%,0.647282,0.491942,0.154256,1.118157,0.50148,0.22456,0.314897,10.0
max,0.804465,0.604992,0.225714,2.195917,1.108183,0.451372,0.711524,22.0


In [17]:
# Save the samples as fake abalone data (the DataFrame index is not written)
samples.to_csv('fake-abalone.csv', index=False)

### Pre-Process the Synthetic Abalone Data

Since the synthetic data is based on the "raw" Abalone dataset, we need to transform it into the format our model was trained on before we can execute inferences against the production endpoint. To do this, we will follow the same procedure used for preprocessing the original Abalone data.

In [18]:
import os
import requests
import tempfile
import numpy as np
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# Column -> dtype maps used when reading the synthetic CSV: `sex` stays a
# string, every measurement column and the `rings` label is float64.
feature_columns_dtype = {'sex': str}
feature_columns_dtype.update(
    dict.fromkeys(
        [
            'length',
            'diameter',
            'height',
            'whole_weight',
            'shucked_weight',
            'viscera_weight',
            'shell_weight',
        ],
        np.float64,
    )
)
label_column_dtype = dict(rings=np.float64)


def merge_two_dicts(x, y):
    """Return a new mapping with the entries of ``x`` overlaid by those of ``y``.

    ``x`` is shallow-copied first, so neither argument is modified. Where both
    contain the same key, the value from ``y`` wins.
    """
    merged = x.copy()
    merged.update(y)
    return merged


# Load the synthetic data, forcing the declared dtypes for the feature
# columns and the label column
df = pd.read_csv(
    'fake-abalone.csv',
    dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)
)
# Bare expression: renders the loaded frame as the cell output
df

Unnamed: 0,sex,length,diameter,height,whole_weight,shucked_weight,viscera_weight,shell_weight,rings
0,F,0.473346,0.373729,0.052027,0.022790,0.074170,0.153352,0.008113,9.0
1,F,0.659510,0.523410,0.185893,0.592290,0.633474,0.163537,0.424848,11.0
2,M,0.379315,0.328262,0.103683,0.185517,0.217546,0.137134,0.070888,10.0
3,M,0.605946,0.490833,0.206117,0.737121,0.682045,0.265062,0.188534,8.0
4,F,0.562325,0.531530,0.139997,1.216950,0.377734,0.180970,0.250767,12.0
...,...,...,...,...,...,...,...,...,...
295,I,0.510545,0.317034,0.066453,0.219788,0.109977,0.048319,0.098592,8.0
296,F,0.682598,0.570601,0.143785,1.481404,0.449521,0.245297,0.237634,12.0
297,M,0.683741,0.477329,0.159061,1.001532,0.613875,0.250260,0.260278,8.0
298,M,0.685952,0.524627,0.188635,0.374681,0.340483,0.234227,0.115018,9.0


In [19]:
# `fake-abalone.csv` was written with a header row, so `df` already carries
# these column names; the lists below only select which columns are features
# and which one is the label.
feature_columns_names = [
    'sex',
    'length',
    'diameter',
    'height',
    'whole_weight',
    'shucked_weight',
    'viscera_weight',
    'shell_weight',
]
label_column = 'rings'


# Set up the processing pipeline for numerical feature scaling:
# median-impute missing values, then standardize.
numeric_features = [name for name in feature_columns_names if name != 'sex']
numeric_transformer = Pipeline(
    steps=[
        ('imputer', SimpleImputer(strategy='median')),
        ('scaler', StandardScaler())
    ]
)

# Set up the processing pipeline for categorical encoding:
# fill missing values with the constant 'missing', then one-hot encode.
categorical_features = ['sex']
categorical_transformer = Pipeline(
    steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))
    ]
)

# Create the transformer (numeric columns first, then the one-hot columns)
preprocess = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)
    ]
)

# Preprocess the data.
# Select the label and the features without mutating `df` (the original used
# `df.pop('rings')`, which made a second run of this cell fail with a KeyError).
# NOTE(review): the transformer is fitted on the synthetic data here rather
# than reusing a transformer fitted at training time, so the scaling may differ
# from what the deployed model saw - confirm this is intended.
y = df[label_column]
X_pre = preprocess.fit_transform(df[feature_columns_names])
y_pre = y.to_numpy().reshape(-1, 1)

# Create the inference dataset (no header, no index: one payload per line)
pd.DataFrame(X_pre).to_csv('inference-data.csv', header=False, index=False)

# Create the ground truth dataset (row order matches the inference dataset)
pd.DataFrame(y_pre).to_csv('ground-truth.csv', header=False, index=False)

We should now have the following synthetic data files:
- `fake-abalone.csv`: Synthetic "raw" abalone data.
- `inference-data.csv`: Preprocessed data to generate predictions from the Production Endpoint.
- `ground-truth.csv`: The ground truth labels from the synthetic data, against which to compare the quality of the model's predictions.

---

## Section 4 - Setup continuous model monitoring to identify model quality drift 

### Generate Inferences

In [24]:
#  with open('inference-data.csv', 'r') as f:
#         i = 0
#         for row in f:
#             payload = row.rstrip('\n')
#             response = session.sagemaker_runtime_client.invoke_endpoint(
#                 EndpointName=endpoint_name,
#                 ContentType='text/csv', 
#                 Body=payload,
#                 InferenceId=str(i), # unique ID per row
#             )["Body"].read()
#             i += 1
#             sleep(1)

def invoke_endpoint(ep_name, file_name, runtime_client=None, delay_seconds=1):
    """Send every line of a file to a SageMaker endpoint, one request per line.

    Each request is sent as ``text/csv`` and tagged with the line's zero-based
    index as its ``InferenceId``.

    Args:
        ep_name: Name of the endpoint to invoke. (The original body ignored
            this parameter and always used the global ``endpoint_name``.)
        file_name: Path of the text file to replay; every line, with its
            trailing newline stripped, is sent as one payload.
        runtime_client: Object exposing ``invoke_endpoint(**kwargs)``. Defaults
            to ``session.sagemaker_runtime_client``, as before.
        delay_seconds: Pause after each request, in seconds (default 1, the
            original hard-coded value).

    Returns:
        None.
    """
    if runtime_client is None:
        runtime_client = session.sagemaker_runtime_client

    with open(file_name, 'r') as f:
        for i, row in enumerate(f):
            payload = row.rstrip('\n')
            # The response body is read and discarded: nothing here uses the
            # prediction, but reading consumes the response stream.
            runtime_client.invoke_endpoint(
                EndpointName=ep_name,
                ContentType='text/csv',
                Body=payload,
                InferenceId=str(i),  # unique ID per row
            )["Body"].read()
            sleep(delay_seconds)
            
def invoke_endpoint_forever(stop_event=None):
    """Replay ``inference-data.csv`` against the endpoint in a loop.

    Args:
        stop_event: Optional object with an ``is_set()`` method (for example a
            ``threading.Event``). It is checked before each pass over the
            file; once it is set, the loop ends after the current pass. With
            the default ``None`` the loop never ends on its own, which is the
            original behavior.

    Returns:
        None, and only if ``stop_event`` was provided and has been set.
    """
    while stop_event is None or not stop_event.is_set():
        invoke_endpoint(endpoint_name, 'inference-data.csv')
        
# Generate traffic in the background so the notebook stays interactive.
# The thread is a daemon: its target loops forever, and a non-daemon thread
# would keep the Python process from ever exiting on its own.
thread = Thread(target=invoke_endpoint_forever, daemon=True)
thread.start()

View captured data stored in Amazon S3. You should expect to see different files from different time periods organized based on the hour in which the invocation occurred. The format of the Amazon S3 path is:

`s3://{destination-bucket-prefix}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/filename.jsonl`

In [25]:
# Poll S3 until the newest capture file starts with a record that carries an
# `inferenceId`, i.e. one produced by the traffic generated in this notebook.
print("Waiting for captures to show up", end="")
for _ in range(300):  # up to 300 polls, each followed by a 1 s pause
    capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
    if capture_files:
        capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")
        capture_record = json.loads(capture_file[0])
        if "inferenceId" in capture_record["eventMetadata"]:
            break
    print(".", end="", flush=True)
    sleep(1)
else:
    # The loop ran out without finding a matching record. Fail loudly instead
    # of printing "Found Capture Files:" over missing or stale data that the
    # following cells would then silently use.
    print()
    raise TimeoutError(
        f"No capture record with an inferenceId found under "
        f"{s3_capture_upload_path}/{endpoint_name} after 300 polls"
    )
print()
print("Found Capture Files:")
print("\n ".join(capture_files[-3:]))

Waiting for captures to show up.................................................
Found Capture Files:
s3://proddeploymentstage-prodappl-logss3bucket004b0f70-3nv3l2whchah/endpoint-data-capture/abalone-prod-endpoint/AllTraffic/2021/04/30/23/32-56-798-11287c29-2f52-4e7f-828c-acf15fc52012.jsonl


View the contents of a single capture file. Here you should see all the data captured in an Amazon SageMaker specific JSON-line formatted file. Take a quick peek at the last few lines in the captured file.

In [26]:
# Print the third- and second-from-last entries of the capture file. The slice
# stops before the final entry (presumably an empty string left by a trailing
# newline - TODO confirm).
print("\n".join(capture_file[-3:-1]))

{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","data":"-1.8310921854730318,-1.0181542824934053,-1.5104745959316217,-1.1646623906731637,-1.09564733070144,-0.9706885887724228,-1.3693667692469487,0.0,0.0,1.0","encoding":"CSV"},"endpointOutput":{"observedContentType":"text/csv; charset=utf-8","mode":"OUTPUT","data":"5.331427","encoding":"CSV"}},"eventMetadata":{"eventId":"2c98bf5f-4f34-4e76-a269-49eff8b9cd2d","inferenceId":"54","inferenceTime":"2021-04-30T23:33:54Z"},"eventVersion":"0"}
{"captureData":{"endpointInput":{"observedContentType":"text/csv","mode":"INPUT","data":"0.9836200421959147,0.04915048102475453,1.3495741719052026,1.8626551809122025,0.5588826437047746,0.1658164200742212,0.6227776091927976,1.0,0.0,0.0","encoding":"CSV"},"endpointOutput":{"observedContentType":"text/csv; charset=utf-8","mode":"OUTPUT","data":"35.175026","encoding":"CSV"}},"eventMetadata":{"eventId":"1aa0fc43-b037-4963-b87b-eb61f6297cd3","inferenceId":"56","inferenceTime":"20

The contents of a single record are shown below as formatted JSON, so that you can read them more easily.

In [27]:
# Pretty-print `capture_record`, the first record of the capture file parsed
# in the polling cell
print(json.dumps(capture_record, indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "-0.7122862910498597,-0.1570158056522693,-1.5665175360463932,-1.4052713473637781,-1.069329506518401,0.010495437770609581,-1.5116989283952513,1.0,0.0,0.0",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "11.798007",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "737092c2-5938-4868-88e4-00bb12fbb06e",
    "inferenceId": "0",
    "inferenceTime": "2021-04-30T23:32:56Z"
  },
  "eventVersion": "0"
}
