In [1]:
import sagemaker
import numpy as np
import boto3
import os
import pandas as pd
from sklearn import datasets
from sagemaker import get_execution_role
from sagemaker.serializers import CSVSerializer

role = get_execution_role()
iris = datasets.load_iris()
X = iris.data
y = iris.target

dataset = np.insert(X, 0, y,axis=1)
pd.DataFrame(data=dataset, columns=['iris_id'] + iris.feature_names).to_csv('full_dataset.csv', index=None)

sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()

prefix='mlops/iris'
endpoint_name = open('endpoint_name.txt', 'r').read().strip() if os.path.isfile('endpoint_name.txt') else None
endpoint_name2 = open('endpoint_name2.txt', 'r').read().strip() if os.path.isfile('endpoint_name2.txt') else None

try:
    xgb_predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
    xgb_predictor.serializer = CSVSerializer()
except Exception as e:
    raise Exception("You must run Part 1 before this. There, you will train/deploy a Model and use it here")

In [2]:
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

endpoint_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)
endpoint_monitor.suggest_baseline(
    baseline_dataset='full_dataset.csv',
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri='s3://{}/{}/monitoring/baseline'.format(bucket, prefix),
    wait=True,
    logs=False
)


Job Name:  baseline-suggestion-job-2022-09-01-13-01-59-127
Inputs:  [{'InputName': 'baseline_dataset_input', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-ap-south-1-523666378432/model-monitor/baselining/baseline-suggestion-job-2022-09-01-13-01-59-127/input/baseline_dataset_input', 'LocalPath': '/opt/ml/processing/input/baseline_dataset_input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'monitoring_output', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-ap-south-1-523666378432/mlops/iris/monitoring/baseline', 'LocalPath': '/opt/ml/processing/output', 'S3UploadMode': 'EndOfJob'}}]
.................................................................!

<sagemaker.processing.ProcessingJob at 0x7fcc72601700>

In [3]:
baseline_job = endpoint_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
report_df = schema_df.merge(constraints_df)
report_df.drop([
    'numerical_statistics.distribution.kll.buckets',
    'numerical_statistics.distribution.kll.sketch.data',
    'numerical_statistics.distribution.kll.sketch.parameters.c'
], axis=1).head(10)

Unnamed: 0,name,inferred_type,numerical_statistics.common.num_present,numerical_statistics.common.num_missing,numerical_statistics.mean,numerical_statistics.sum,numerical_statistics.std_dev,numerical_statistics.min,numerical_statistics.max,numerical_statistics.distribution.kll.sketch.parameters.k,completeness,num_constraints.is_non_negative
0,iris_id,Fractional,150,0,1.0,150.0,0.816497,0.0,2.0,2048.0,1.0,True
1,sepal length (cm),Fractional,150,0,5.843333,876.5,0.825301,4.3,7.9,2048.0,1.0,True
2,sepal width (cm),Fractional,150,0,3.057333,458.6,0.434411,2.0,4.4,2048.0,1.0,True
3,petal length (cm),Fractional,150,0,3.758,563.7,1.759404,1.0,6.9,2048.0,1.0,True
4,petal width (cm),Fractional,150,0,1.199333,179.9,0.759693,0.1,2.5,2048.0,1.0,True


In [4]:
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

endpoint_monitor.create_monitoring_schedule(
    endpoint_input=endpoint_name,
    output_s3_uri='s3://{}/{}/monitoring/reports'.format(bucket, prefix),
    statistics=endpoint_monitor.baseline_statistics(),
    constraints=endpoint_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

In [5]:
# This is how you can list all the monitoring schedules you created in your account
!aws sagemaker list-monitoring-schedules

{
    "MonitoringScheduleSummaries": [
        {
            "MonitoringScheduleName": "monitoring-schedule-2022-09-01-13-07-26-337",
            "MonitoringScheduleArn": "arn:aws:sagemaker:ap-south-1:523666378432:monitoring-schedule/monitoring-schedule-2022-09-01-13-07-26-337",
            "CreationTime": 1662037646.523,
            "LastModifiedTime": 1662037646.543,
            "MonitoringScheduleStatus": "Pending",
            "EndpointName": "sagemaker-xgboost-2022-08-29-18-26-49-482",
            "MonitoringJobDefinitionName": "data-quality-job-definition-2022-09-01-13-07-26-337",
            "MonitoringType": "DataQuality"
        }
    ]
}


In [6]:
import random
import time 
from threading import Thread

traffic_generator_running=True
def invoke_endpoint_forever():
    print('Invoking endpoint forever!')
    while traffic_generator_running:
        ## This will create an invalid set of features
        ## The idea is to violate two monitoring constraings: not_null and data_drift
        null_idx = random.randint(0,3)
        sample = [random.randint(500,2000) / 100.0 for i in range(4)]
        sample[null_idx] = None
        xgb_predictor.predict(sample)
        time.sleep(0.5)
    print('Endpoint invoker has stopped')
Thread(target = invoke_endpoint_forever).start()

Invoking endpoint forever!


In [11]:
import time
import datetime
import boto3

def process_monitoring_logs(endpoint_monitor):
    sm = boto3.client('sagemaker')
    now = datetime.datetime.today()
    suffix = now.strftime("%Y/%m/%d/%H")
    start_time = datetime.datetime(now.year, now.month, now.day, now.hour)
    end_time = start_time + datetime.timedelta(hours=1)

    # get the monitoring metadata
    base_desc = endpoint_monitor.describe_latest_baselining_job()
    sche_desc = endpoint_monitor.describe_schedule()
    baseline_path = base_desc['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
    endpoint_name = sche_desc['EndpointName']

    variant_name = sm.describe_endpoint(EndpointName=endpoint_name)['ProductionVariants'][0]['VariantName']
    logs_path = "%s/%s/%s" % (endpoint_name,variant_name,suffix)
    
    s3_output = {
        "S3Uri": 's3://{}/{}/monitoring/{}'.format(bucket, prefix, logs_path),
        "LocalPath": "/opt/ml/processing/output",
        "S3UploadMode": "Continuous"
    }
    # values for the processing job input
    values = [
        [ 'input_1', 's3://{}/{}/monitoring/{}'.format(bucket, prefix, logs_path),
            '/opt/ml/processing/input/endpoint/{}'.format(logs_path) ], 
        [ 'baseline', '%s/statistics.json' % baseline_path,
            '/opt/ml/processing/baseline/stats'],
        [ 'constraints', '%s/constraints.json' % baseline_path,
            '/opt/ml/processing/baseline/constraints']
    ]
    job_params = {
        'ProcessingJobName': 'model-monitoring-%s' % time.strftime("%Y%m%d%H%M%S"),
        'ProcessingInputs': [{
            'InputName': o[0],
            'S3Input': { 
                'S3Uri': o[1], 'LocalPath': o[2], 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 
                'S3CompressionType': 'None', 'S3DataDistributionType': 'FullyReplicated'
            }} for o in values],
        'ProcessingOutputConfig': { 'Outputs': [ {'OutputName': 'result','S3Output': s3_output } ] },
        'ProcessingResources': base_desc['ProcessingResources'],
        'AppSpecification': base_desc['AppSpecification'],
        'RoleArn': base_desc['RoleArn'],
        'Environment': {
            'baseline_constraints': '/opt/ml/processing/baseline/constraints/constraints.json',
            'baseline_statistics': '/opt/ml/processing/baseline/stats/statistics.json',
            'dataset_format': '{"sagemakerCaptureJson":{"captureIndexNames":["endpointInput","endpointOutput"]}}',
            'dataset_source': '/opt/ml/processing/input/endpoint',      
            'output_path': '/opt/ml/processing/output',
            'publish_cloudwatch_metrics': 'Enabled',
            'sagemaker_monitoring_schedule_name': sche_desc['MonitoringScheduleName'],
            'sagemaker_endpoint_name': endpoint_name,
            'start_time': start_time.strftime("%Y-%m-%dT%H:%M:%SZ"),
            'end_time': end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        }
    }
    sm.create_processing_job(**job_params)
    waiter = sm.get_waiter('processing_job_completed_or_stopped')
    waiter.wait( ProcessingJobName=job_params['ProcessingJobName'], WaiterConfig={'Delay': 30,'MaxAttempts': 20} )
    return job_params['ProcessingJobName'], s3_output['S3Uri']

In [12]:
import pandas as pd
## The processing job takes something like 5mins to run
job_name, s3_output = process_monitoring_logs(endpoint_monitor)
tokens = s3_output.split('/', 3)
df = pd.read_json(sagemaker_session.read_s3_file(tokens[2], '%s/constraint_violations.json' % tokens[3]))
df = pd.json_normalize(df.violations)
df.head()

Unnamed: 0,feature_name,constraint_check_type,description
0,sepal width (cm),completeness_check,Data completeness requirement is not met. Expe...
1,sepal length (cm),completeness_check,Data completeness requirement is not met. Expe...
2,petal length (cm),completeness_check,Data completeness requirement is not met. Expe...
3,petal width (cm),completeness_check,Data completeness requirement is not met. Expe...
4,sepal length (cm),baseline_drift_check,Baseline drift distance: 0.4724000476322821 ex...


In [13]:
traffic_generator_running=False
time.sleep(3)
endpoint_monitor.delete_monitoring_schedule()
time.sleep(10) # wait for 10 seconds before trying to delete the endpoint

Endpoint invoker has stopped

Deleting Monitoring Schedule with name: monitoring-schedule-2022-09-01-13-07-26-337


In [14]:
try:
    xgb_predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name, sagemaker_session=sagemaker_session)
    xgb_predictor.delete_endpoint()
except Exception as e:
    print(e)
try:
    xgb_predictor2 = sagemaker.predictor.Predictor(endpoint_name=endpoint_name2, sagemaker_session=sagemaker_session)
    xgb_predictor2.delete_endpoint()
except Exception as e:
    print(e)

Parameter validation failed:
Invalid type for parameter EndpointName, value: None, type: <class 'NoneType'>, valid types: <class 'str'>
