In [None]:
s3_bucket = "<INSERT S3 BUCKET HERE>"
prefix = "chapter08"

In [None]:
%store -r ll_package_arn

In [None]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker import ModelPackage
from sagemaker.predictor import Predictor


session = sagemaker.Session()
role = get_execution_role()

model = ModelPackage(
    role=role,
    model_package_arn=ll_package_arn,
    sagemaker_session=session
)

model.predictor_cls = Predictor

In [None]:
from sagemaker.model_monitor import DataCaptureConfig

base = f"s3://{s3_bucket}/{prefix}"
capture_upload_path = f"{base}/data-capture"

data_capture_config = DataCaptureConfig(
    enable_capture = True,
    sampling_percentage=100,
    destination_s3_uri=capture_upload_path,
    kms_key_id=None,
    capture_options=["REQUEST", "RESPONSE"],
    csv_content_types=["text/csv"],
    json_content_types=["application/json"]
)

In [None]:
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import CSVDeserializer

predictor = model.deploy(
    instance_type='ml.m5.xlarge', 
    initial_instance_count=1,
    serializer=CSVSerializer(),
    deserializer=CSVDeserializer(),
    data_capture_config=data_capture_config
)

In [None]:
payload = "1.5,2"

predictor.predict(data=payload)

In [None]:
%%bash

mkdir -p tmp
wget -O tmp/baseline.csv https://bit.ly/3td5vjx

In [None]:
base = f's3://{s3_bucket}/{prefix}'
baseline_source_uri = f'{base}/baseline.csv'
baseline_output_uri = f"{base}/baseline-output"

In [None]:
!aws s3 cp tmp/baseline.csv {baseline_source_uri}

In [None]:
from sagemaker.model_monitor import DefaultModelMonitor

default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.large',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

In [None]:
%%time

from sagemaker.model_monitor import dataset_format

dataset_format = dataset_format.DatasetFormat.csv(header=True)

baseline_dict = {
    'baseline_dataset': baseline_source_uri,
    'dataset_format': dataset_format,
    'output_s3_uri': baseline_output_uri,
    'wait': True
}

default_monitor.suggest_baseline(
    **baseline_dict
)

In [None]:
baseline_job = default_monitor.latest_baselining_job
stats = baseline_job.baseline_statistics()
schema_dict = stats.body_dict["features"]

import pandas as pd
schema_df = pd.json_normalize(schema_dict)
schema_df.head()

In [None]:
constraints = baseline_job.suggested_constraints()
constraints_dict = constraints.body_dict["features"]
constraints_df = pd.json_normalize(constraints_dict)
constraints_df.head()

In [None]:
constraints.body_dict['features'][1]['inferred_type'] = 'Integral'
constraints.body_dict['features'][2]['inferred_type'] = 'Integral'
constraints.save()

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

import random
from string import ascii_uppercase

def generate_schedule_name():
    chars = random.choices(ascii_uppercase, k=5)
    output = 'schedule-' + ''.join(chars)
    return output

In [None]:
schedule_name = generate_schedule_name()
schedule_name

In [None]:
s3_report_path = f'{base}/report'
baseline_statistics = default_monitor.baseline_statistics()
constraints = default_monitor.suggested_constraints()

In [None]:
cron_expression = CronExpressionGenerator.hourly()
cron_expression

In [None]:
try:
    default_monitor.delete_monitoring_schedule()
except:
    pass

In [None]:
default_monitor.create_monitoring_schedule(
    monitor_schedule_name=schedule_name,
    endpoint_input=predictor.endpoint,
    output_s3_uri=s3_report_path,
    statistics=baseline_statistics,
    constraints=constraints,
    schedule_cron_expression=cron_expression,
    enable_cloudwatch_metrics=True,
)

In [None]:
default_monitor.describe_schedule()

In [None]:
from time import sleep
sleep(300)

In [None]:
def generate_payload(x,y):
    return f"{x},{y}"

In [None]:
def perform_good_input():
    print("> PERFORM REQUEST WITH GOOD INPUT")
    payload = generate_payload(1, 2)
    predictor.predict(data=payload)

In [None]:
def perform_bad_input():
    print("> PERFORM REQUEST WITH BAD INPUT")
    payload = generate_payload(1, 2.5)
    predictor.predict(data=payload)

In [None]:
perform_good_input()
perform_bad_input()

In [None]:
dm = default_monitor
monitoring_violations = dm.latest_monitoring_constraint_violations()
monitoring_statistics = dm.latest_monitoring_statistics()

In [None]:
%%time

from time import sleep

violations = monitoring_violations

while not violations:
    print("No executions yet. Sleeping for 5 minutes...")
    sleep(60 * 5)
    
    perform_good_input()
    perform_bad_input()
    
    try:
        v = dm.latest_monitoring_constraint_violations()
        violations = v
    except:
        pass
    
print("Executions found!")

In [None]:
violations = dm.latest_monitoring_constraint_violations()
violations.__dict__

In [None]:
monitoring_statistics = dm.latest_monitoring_statistics()
monitoring_statistics.__dict__

In [None]:
default_monitor.delete_monitoring_schedule()

In [None]:
predictor.delete_endpoint()