# Deploying and Monitoring

In this notebook we will deploy the network traffic classification model that we have trained in the previous steps to Amazon SageMaker hosting, which will expose a fully-managed real-time endpoint to execute inferences.

Amazon SageMaker provides capabilities to monitor ML models in production and detect deviations in data quality with respect to a baseline dataset (e.g. the training dataset). They let you capture the metadata, inputs and outputs of invocations of the models that you deploy with Amazon SageMaker, and then analyze the captured data and monitor its quality.

We will deploy the model to a real-time endpoint with data capture enabled and start collecting some inference inputs/outputs. Then, we will create a baseline and finally enable model monitoring to compare inference data with respect to the baseline and analyze the quality.

First, we set some variables, including the AWS region we are working in, the IAM execution role of the notebook instance and the Amazon S3 bucket where we will store data and outputs.

In [None]:
import os
import boto3
import sagemaker

region = boto3.Session().region_name
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
bucket_name = sagemaker_session.default_bucket()
prefix = 'aim362'

print(region)
print(role)
print(bucket_name)

## Deployment with Data Capture

We are going to deploy the latest network traffic classification model that we have trained. To deploy a model using the SageMaker Python SDK, we need the Amazon S3 URI where the model artifacts are stored and the URI of the Docker container that will be used for hosting this model.

First, let's determine the Amazon S3 URI of the model artifacts by using a couple of utility functions that query the Amazon SageMaker service for the latest completed training job whose name contains 'nw-traffic-classification-xgb' and then describe that training job.

In [None]:
import boto3

def get_latest_training_job_name(base_job_name):
    client = boto3.client('sagemaker')
    response = client.list_training_jobs(NameContains=base_job_name, SortBy='CreationTime', 
                                         SortOrder='Descending', StatusEquals='Completed')
    if len(response['TrainingJobSummaries']) > 0 :
        return response['TrainingJobSummaries'][0]['TrainingJobName']
    else:
        raise Exception('Training job not found.')

def get_training_job_s3_model_artifacts(job_name):
    client = boto3.client('sagemaker')
    response = client.describe_training_job(TrainingJobName=job_name)
    s3_model_artifacts = response['ModelArtifacts']['S3ModelArtifacts']
    return s3_model_artifacts

latest_training_job_name = get_latest_training_job_name('nw-traffic-classification-xgb')
print(latest_training_job_name)
model_path = get_training_job_s3_model_artifacts(latest_training_job_name)
print(model_path)

For this model, we are going to use the same XGBoost Docker container we used for training, which also offers inference capabilities. As a consequence, we can just create the XGBoostModel object of the Amazon SageMaker Python SDK and then invoke its .deploy() method to execute deployment.

We will also provide an entry point script to be invoked at deployment/inference time. The purpose of this code is to deserialize and load the XGB model. In addition, we are redefining the output function because we want to extract the class value from the default array output. For example, for class 3 the XGB container would output [3.] but we want to extract only the value 3.

In [None]:
!pygmentize source_dir/deploy_xgboost.py
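
As a rough illustration of the output handling described above, the following minimal sketch converts an array-like prediction such as `[3.]` into the bare class value. The function name `format_prediction` is hypothetical and is not taken from `deploy_xgboost.py`; the actual script may handle this differently.

```python
def format_prediction(prediction):
    """Return the class index as a string, e.g. [3.] -> "3".

    Hypothetical helper for illustration only; the real logic lives in
    source_dir/deploy_xgboost.py.
    """
    # Take the first (and only) element of the array-like output and
    # convert the float class value to an integer.
    class_value = int(round(float(prediction[0])))
    return str(class_value)


print(format_prediction([3.0]))
```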

Now we are ready to create the XGBoostModel object.

In [None]:
from time import gmtime, strftime
from sagemaker.xgboost import XGBoostModel

model_name = 'nw-traffic-classification-xgb-model-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())

code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix)
xgboost_model = XGBoostModel(model_data=model_path,
                             entry_point='deploy_xgboost.py',
                             source_dir='source_dir/',
                             name=model_name,
                             code_location=code_location,
                             framework_version='0.90-2',
                             role=role, 
                             sagemaker_session=sagemaker_session)

Finally, we create an endpoint with data capture enabled, for monitoring the model data quality.
Data capture is enabled at the endpoint configuration level for the Amazon SageMaker real-time endpoint. You can choose to capture the request payload, the response payload or both; captured data is stored in JSON format.

In [None]:
from time import gmtime, strftime
from sagemaker.model_monitor import DataCaptureConfig

s3_capture_upload_path = 's3://{}/{}/monitoring/datacapture'.format(bucket_name, prefix)
print(s3_capture_upload_path)

endpoint_name = 'nw-traffic-classification-xgb-ep-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print(endpoint_name)

pred = xgboost_model.deploy(initial_instance_count=1,
                            instance_type='ml.m5.xlarge',
                            endpoint_name=endpoint_name,
                            data_capture_config=DataCaptureConfig(
                                enable_capture=True,
                                sampling_percentage=100,
                                destination_s3_uri=s3_capture_upload_path))

After the deployment has completed, we can use the Predictor object to send HTTPS requests to the deployed endpoint and get inference results.

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

pred = Predictor(endpoint_name)
pred.serializer = CSVSerializer()

# Expecting class 4
test_values = "80,1056736,3,4,20,964,20,0,6.666666667,11.54700538,964,0,241.0,482.0,931.1691850999999,6.6241710320000005,176122.6667,\
431204.4454,1056315,2,394,197.0,275.77164469999997,392,2,1056733,352244.3333,609743.1115,1056315,24,0,0,0,0,72,92,\
2.8389304419999997,3.78524059,0,964,123.0,339.8873763,115523.4286,0,0,1,1,0,0,0,1,1.0,140.5714286,6.666666667,\
241.0,0.0,0.0,0.0,0.0,0.0,0.0,3,20,4,964,8192,211,1,20,0.0,0.0,0,0,0.0,0.0,0,0,20,2,2018,1,0,1,0"

result = pred.predict(test_values)
print(result)

# Expecting class 7
test_values = "80,10151,2,0,0,0,0,0,0.0,0.0,0,0,0.0,0.0,0.0,197.0249237,10151.0,0.0,10151,10151,10151,10151.0,0.0,10151,10151,0,0.0,\
0.0,0,0,0,0,0,0,40,0,197.0249237,0.0,0,0,0.0,0.0,0.0,0,0,0,0,1,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2,0,0,0,32738,\
-1,0,20,0.0,0.0,0,0,0.0,0.0,0,0,21,2,2018,2,0,1,0"

result = pred.predict(test_values)
print(result)

# Expecting class 0
test_values = "80,54322832,2,0,0,0,0,0,0.0,0.0,0,0,0.0,0.0,0.0,0.0368169318,54322832.0,0.0,54322832,54322832,54322832,54322832.0,0.0,\
54322832,54322832,0,0.0,0.0,0,0,0,0,0,0,40,0,0.0368169318,0.0,0,0,0.0,0.0,0.0,0,0,0,0,1,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,\
0.0,0.0,0.0,0.0,2,0,0,0,279,-1,0,20,0.0,0.0,0,0,0.0,0.0,0,0,23,2,2018,4,0,1,0"

result = pred.predict(test_values)
print(result)

Now let's list the data capture files stored in S3. You should expect to see different files from different time periods organized based on the hour in which the invocation occurred.

**Note that delivery of capture data to Amazon S3 can take a couple of minutes, so the next cell might raise an error. If this happens, retry after a minute.**

In [None]:
s3_client = boto3.Session().client('s3')
current_endpoint_capture_prefix = '{}/monitoring/datacapture/{}'.format(prefix, endpoint_name)

result = s3_client.list_objects(Bucket=bucket_name, Prefix=current_endpoint_capture_prefix)
capture_files = ['s3://{0}/{1}'.format(bucket_name, capture_file.get("Key")) for capture_file in result.get('Contents')]

print("Capture Files: ")
print("\n ".join(capture_files))
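
Instead of retrying the cell by hand, the listing can be polled until capture files appear. The following is a minimal sketch, not part of the original workshop code: the helper name `wait_for_capture_files` and its timeout values are assumptions, and it relies only on the boto3 S3 client method `list_objects_v2`.

```python
import time


def wait_for_capture_files(s3_client, bucket, prefix, timeout_s=300, poll_s=15):
    """Poll S3 until at least one object exists under the given prefix.

    Hypothetical helper: returns the S3 URIs found, or raises TimeoutError
    if nothing shows up before the timeout expires.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        response = s3_client.list_objects_v2(Bucket=bucket, Prefix=prefix)
        # 'Contents' is absent from the response when no key matches.
        keys = [obj["Key"] for obj in response.get("Contents", [])]
        if keys:
            return ["s3://{0}/{1}".format(bucket, key) for key in keys]
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "No capture files under s3://{0}/{1}".format(bucket, prefix))
        time.sleep(poll_s)
```

With the variables defined in this notebook, it could be called as `wait_for_capture_files(s3_client, bucket_name, current_endpoint_capture_prefix)`.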

We can also read the contents of one of these files and see how capture records are organized in JSON lines format.

In [None]:
!aws s3 cp {capture_files[0]} datacapture/captured_data_example.jsonl
!head datacapture/captured_data_example.jsonl

In addition, we can pretty-print a single JSON line to better understand its content:

In [None]:
import json
with open ("datacapture/captured_data_example.jsonl", "r") as myfile:
    data=myfile.read()

print(json.dumps(json.loads(data.split('\n')[0]), indent=2))

For each inference request, the captured record contains the input data, the output data and some metadata such as the inference time.
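
To work with these records programmatically, each JSON line can be reduced to the fields of interest. The sketch below is illustrative only: the sample record is made up, and the field names (`captureData`, `endpointInput`, `endpointOutput`, `eventMetadata`, `inferenceTime`) are assumed from the data capture format and should be checked against your own capture files. The helper name `summarize_capture_record` is hypothetical.

```python
import json

# Made-up record for illustration; field names are an assumption about the
# capture format and should be verified against a real capture file.
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "80,10151,2,0", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv", "mode": "OUTPUT",
                           "data": "7", "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "example-id",
                      "inferenceTime": "2020-01-01T00:00:00Z"},
    "eventVersion": "0",
})


def summarize_capture_record(line):
    """Extract input payload, output payload and inference time from one JSON line."""
    record = json.loads(line)
    capture = record["captureData"]
    return {
        "input": capture["endpointInput"]["data"],
        "output": capture["endpointOutput"]["data"],
        "inference_time": record["eventMetadata"]["inferenceTime"],
    }


print(summarize_capture_record(sample_line))
```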

## Baselining

From our validation dataset, let's ask Amazon SageMaker to suggest a set of baseline constraints and generate descriptive statistics for our features. Note that we are using the validation dataset in this workshop to keep the baselining time short, and that the file extension needs to be changed because baselining jobs expect the .csv file extension by default.
In practice, you may want to use a larger dataset as the baseline.

In [None]:
import boto3

s3 = boto3.resource('s3')

bucket_key_prefix = "aim362/data/val/"
bucket = s3.Bucket(bucket_name)

for s3_object in bucket.objects.filter(Prefix=bucket_key_prefix):
    target_key = s3_object.key.replace('data/val/', 'monitoring/baselining/data/').replace('.part', '.csv')
    print('Copying {0} to {1} ...'.format(s3_object.key, target_key))
    
    copy_source = {
        'Bucket': bucket_name,
        'Key': s3_object.key
    }
    s3.Bucket(bucket_name).copy(copy_source, target_key)

In [None]:
baseline_data_path = 's3://{0}/{1}/monitoring/baselining/data'.format(bucket_name, prefix)
baseline_results_path = 's3://{0}/{1}/monitoring/baselining/results'.format(bucket_name, prefix)

print(baseline_data_path)
print(baseline_results_path)

Please note that running the baselining job will require 8-10 minutes. In the meantime, you can take a look at the Deequ library, used to execute these analyses with the default Model Monitor container: https://github.com/awslabs/deequ

In [None]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.c5.4xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

In [None]:
my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_path,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_path,
    wait=True
)

Let's display the statistics that were generated by the baselining job.

In [None]:
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
# pd.json_normalize replaces the deprecated pd.io.json.json_normalize (pandas >= 1.0).
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(10)

Then, we can also visualize the constraints.

In [None]:
# pd.json_normalize replaces the deprecated pd.io.json.json_normalize (pandas >= 1.0).
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df.head(10)

#### Results

The baselining job has inspected the validation dataset and generated constraints and statistics that will be used to monitor our endpoint.

## Generating violations artificially

To get results relevant to the monitoring analysis, we are going to artificially generate some inferences with feature values that cause specific violations, and then invoke the endpoint with this data.

This takes at least 2.5 minutes for 1000 inferences, since each request is followed by a 0.15-second pause.

In [None]:
import time
import numpy as np
dist_values = np.random.normal(1, 0.2, 1000)

# Flow Duration -> set to empty (missing value) [second feature]
# Tot Fwd Pkts -> set to float (expected integer) [third feature]
# Fwd Pkt Len Mean -> sampled from random normal distribution [ninth feature]

artificial_values = "22,,40.3,0,0,0,0,0,{0},0.0,0,0,0.0,0.0,0.0,0.0368169318,54322832.0,0.0,54322832,54322832,54322832,54322832.0,0.0,\
54322832,54322832,0,0.0,0.0,0,0,0,0,0,0,40,0,0.0368169318,0.0,0,0,0.0,0.0,0.0,0,0,0,0,1,0,0,0,0.0,0.0,0.0,0.0,0.0,0.0,\
0.0,0.0,0.0,0.0,2,0,0,0,279,-1,0,20,0.0,0.0,0,0,0.0,0.0,0,0,23,2,2018,4,0,1,0"

for i in range(1000):
    pred.predict(artificial_values.format(str(dist_values[i])))
    time.sleep(0.15)
    if i > 0 and i % 100 == 0 :
        print('Executed {0} inferences.'.format(i))
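
To make the intent of the artificial payload concrete, the following simplified sketch flags the two structural problems it introduces: an empty field and a non-integer value where an integer is expected. This is not how Model Monitor implements its checks; the function name `find_violations` and the type label `"Integral"` are assumptions used for illustration, and the returned labels simply reuse the names of the documented completeness and data type checks.

```python
def find_violations(csv_row, expected_types):
    """Return (field_index, check_name) pairs for a single CSV row.

    Simplified illustration only: flags empty fields and values that do not
    parse as integers where the expected type is "Integral".
    """
    violations = []
    values = csv_row.split(",")
    for index, (value, expected) in enumerate(zip(values, expected_types)):
        if value == "":
            violations.append((index, "completeness_check"))
        elif expected == "Integral":
            try:
                int(value)
            except ValueError:
                violations.append((index, "data_type_check"))
    return violations


# First four fields of the artificial payload, all assumed to be integers.
print(find_violations("22,,40.3,0", ["Integral"] * 4))
```

The zero-based indices 1 and 2 in the result correspond to the second and third fields of the payload.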

## Monitoring

Once we have built the baseline for our data, we can enable endpoint monitoring by creating a monitoring schedule.
When the schedule fires, a monitoring job will be kicked off and will inspect the data captured at the endpoint with respect to the baseline; then it will generate some report files that can be used to analyze monitoring results.

### Create Monitoring Schedule

Let's create the monitoring schedule for the previously created endpoint. When we create the schedule, we can also specify two scripts that will preprocess the records before the analysis takes place and execute post-processing at the end.
For this example, we are not going to use a record preprocessor, and we are just specifying a post-processor that outputs some text for demo purposes.

In [None]:
!pygmentize postprocessor.py

We copy the script to Amazon S3 and specify the path where the monitoring reports will be saved to.

In [None]:
import boto3

monitoring_code_prefix = '{0}/monitoring/code'.format(prefix)
print(monitoring_code_prefix)

boto3.Session().resource('s3').Bucket(bucket_name).Object(monitoring_code_prefix + '/postprocessor.py').upload_file('postprocessor.py')
postprocessor_path = 's3://{0}/{1}/monitoring/code/postprocessor.py'.format(bucket_name, prefix)
print(postprocessor_path)

reports_path = 's3://{0}/{1}/monitoring/reports'.format(bucket_name, prefix)
print(reports_path)

Finally, we create the monitoring schedule with hourly schedule execution.

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

endpoint_name = pred.endpoint_name  # 'endpoint' was renamed to 'endpoint_name' in SageMaker Python SDK v2

mon_schedule_name = 'nw-traffic-classification-xgb-mon-sch-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=endpoint_name,
    post_analytics_processor_script=postprocessor_path,
    output_s3_uri=reports_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True
)

### Describe Monitoring Schedule

In [None]:
desc_schedule_result = my_default_monitor.describe_schedule()
desc_schedule_result

### Delete Monitoring Schedule

Once the schedule is created, it will kick off jobs at the specified intervals. Note that if you list the executions right after creating the hourly schedule, you might find none.
You might have to wait until you cross the hour boundary (in UTC) to see executions kick off. Since we don't want to wait for the hour in this example, we can delete the schedule and use the code in the next steps to simulate what happens when a schedule is triggered, by running an Amazon SageMaker Processing Job.

In [None]:
# Note: this is just for the purpose of running this example.
my_default_monitor.delete_monitoring_schedule()
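
If you keep the hourly schedule instead of deleting it, it can be useful to know how long it is until the next UTC hour boundary. The following is a small sketch of that arithmetic; the helper name `seconds_until_next_hour` is hypothetical, and the actual start of a scheduled execution may lag behind the boundary.

```python
from datetime import datetime, timedelta, timezone


def seconds_until_next_hour(now):
    """Return the number of seconds from 'now' to the next full hour."""
    next_hour = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return (next_hour - now).total_seconds()


now = datetime.now(timezone.utc)
print("Next hourly boundary in {0:.0f} seconds".format(seconds_until_next_hour(now)))
```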

### Triggering execution manually

In order to trigger the execution manually, we first get all paths to data capture, baseline statistics, baseline constraints, etc.
Then, we use a utility function, defined in <a href="./monitoringjob_utils.py">monitoringjob_utils.py</a>, to run the processing job.

In [None]:
result = s3_client.list_objects(Bucket=bucket_name, Prefix=current_endpoint_capture_prefix)
capture_files = ['s3://{0}/{1}'.format(bucket_name, capture_file.get("Key")) for capture_file in result.get('Contents')]

print("Capture Files: ")
print("\n ".join(capture_files))

# Use the folder (hour partition) that contains the most recent capture file.
data_capture_path = capture_files[-1].rsplit('/', 1)[0]
statistics_path = baseline_results_path + '/statistics.json'
constraints_path = baseline_results_path + '/constraints.json'

print(data_capture_path)
print(postprocessor_path)
print(statistics_path)
print(constraints_path)
print(reports_path)

In [None]:
from monitoringjob_utils import run_model_monitor_job_processor

run_model_monitor_job_processor(region, 'ml.m5.xlarge', role, data_capture_path, statistics_path, constraints_path, reports_path,
                                postprocessor_path=postprocessor_path)

### Analysis

When the monitoring job completes, monitoring reports are saved to Amazon S3. Let's list the generated reports.

In [None]:
s3_client = boto3.Session().client('s3')
monitoring_reports_prefix = '{}/monitoring/reports/{}'.format(prefix, pred.endpoint_name)

result = s3_client.list_objects(Bucket=bucket_name, Prefix=monitoring_reports_prefix)
try:
    monitoring_reports = ['s3://{0}/{1}'.format(bucket_name, capture_file.get("Key")) for capture_file in result.get('Contents')]
    print("Monitoring Reports Files: ")
    print("\n ".join(monitoring_reports))
except:
    print('No monitoring reports found.')

We then copy monitoring reports locally.

In [None]:
!aws s3 cp {monitoring_reports[0]} monitoring/
!aws s3 cp {monitoring_reports[1]} monitoring/
!aws s3 cp {monitoring_reports[2]} monitoring/

Let's display the violations identified by the monitoring execution.

In [None]:
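import json
import pandas as pd

# Show full column contents; None disables truncation (pandas >= 1.0, where -1 is deprecated).
pd.set_option('display.max_colwidth', None)

with open('monitoring/constraint_violations.json', 'r') as violations_file:
    data = violations_file.read()

violations_df = pd.json_normalize(json.loads(data)['violations'])
violations_df.head(10)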

We can see that the violations identified correspond to the ones that we artificially generated and that there is a feature that is generating some drift from the baseline.
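
For a quick overview, violations can also be counted by check type. The sketch below runs on a made-up report rather than on actual monitoring output; the keys `violations`, `feature_name`, `constraint_check_type` and `description` are assumed from the violations report format, and the helper name `count_violations_by_type` is hypothetical.

```python
import json
from collections import Counter

# Made-up report for illustration only; not actual monitoring results.
sample_report = json.dumps({"violations": [
    {"feature_name": "Flow Duration",
     "constraint_check_type": "completeness_check", "description": "example"},
    {"feature_name": "Tot Fwd Pkts",
     "constraint_check_type": "data_type_check", "description": "example"},
    {"feature_name": "Fwd Pkt Len Mean",
     "constraint_check_type": "baseline_drift_check", "description": "example"},
]})


def count_violations_by_type(report_text):
    """Count violations per constraint check type in a violations report."""
    violations = json.loads(report_text).get("violations", [])
    return Counter(v["constraint_check_type"] for v in violations)


print(count_violations_by_type(sample_report))
```

To apply it to the real report, pass the contents of `monitoring/constraint_violations.json` instead of `sample_report`.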

### Advanced Hints

You might be wondering which types of violations are monitored and how drift from the baseline is computed.

The types of violations monitored are listed here: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-interpreting-violations.html. Most of them use configurable thresholds that are specified in the monitoring configuration section of the baseline constraints JSON. Let's take a look at this configuration from the baseline constraints file:

In [None]:
!aws s3 cp {statistics_path} baseline/
!aws s3 cp {constraints_path} baseline/

In [None]:
import json
with open ("baseline/constraints.json", "r") as myfile:
    data=myfile.read()

print(json.dumps(json.loads(data)['monitoring_config'], indent=2))

This configuration is interpreted when the monitoring job is executed and is used to compare captured data to the baseline. If you want to customize this section, you will have to update the **constraints.json** file and upload it back to Amazon S3 before launching the monitoring job.

When data distributions are compared to detect potential drift, you can choose either a _Simple_ or a _Robust_ comparison method; the latter is preferable when dealing with small datasets. Additional info: https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html.
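
As a minimal sketch of such a customization, the function below changes the comparison method and threshold in a constraints dictionary. The key names (`monitoring_config`, `distribution_constraints`, `comparison_method`, `comparison_threshold`) are assumed from the constraints schema and should be verified against the output of the previous cell; the helper name `set_drift_comparison` and the example values are hypothetical.

```python
import copy
import json


def set_drift_comparison(constraints, method, threshold):
    """Return a copy of the constraints with updated drift comparison settings."""
    if method not in ("Simple", "Robust"):
        raise ValueError("method must be 'Simple' or 'Robust'")
    updated = copy.deepcopy(constraints)
    # Create the nested sections if they are missing, then update them.
    config = updated.setdefault("monitoring_config", {})
    distribution = config.setdefault("distribution_constraints", {})
    distribution["comparison_method"] = method
    distribution["comparison_threshold"] = threshold
    return updated


# Example input with assumed key names and values.
example_constraints = {
    "monitoring_config": {
        "distribution_constraints": {
            "perform_comparison": "Enabled",
            "comparison_threshold": 0.1,
            "comparison_method": "Robust",
        }
    }
}

print(json.dumps(set_drift_comparison(example_constraints, "Simple", 0.2), indent=2))
```

The updated dictionary would then be written back to **constraints.json** and uploaded to Amazon S3 before launching the monitoring job.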

## Delete Endpoint

Finally, we can delete the endpoint to free up resources.

In [None]:
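# Delete the model first: looking it up relies on the endpoint configuration,
# which delete_endpoint() removes by default.
pred.delete_model()
pred.delete_endpoint()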

## References

A Realistic Cyber Defense Dataset (CSE-CIC-IDS2018) https://registry.opendata.aws/cse-cic-ids2018/