In [2]:
%%time

from datetime import datetime, timedelta, timezone
import json
import os
import re
import boto3
from time import sleep
from threading import Thread

import pandas as pd

from sagemaker import get_execution_role, session, Session, image_uris
from sagemaker.s3 import S3Downloader, S3Uploader
from sagemaker.processing import ProcessingJob
from sagemaker.serializers import CSVSerializer

from sagemaker.model import Model
from sagemaker.model_monitor import DataCaptureConfig

session = Session()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
CPU times: user 1.61 s, sys: 148 ms, total: 1.75 s
Wall time: 2.76 s


### AWS Region and IAM Role + Setup

In [3]:
role = get_execution_role()
print(f"RoleArn: {role}")

sagemaker_session = Session()
sagemaker_client = sagemaker_session.sagemaker_client
sagemaker_runtime_client = sagemaker_session.sagemaker_runtime_client

region = sagemaker_session.boto_region_name
print(f"AWS region: {region}")

# A different bucket can be used, but make sure the role for this notebook has
# the s3:PutObject permissions. This is the bucket into which the data is captured
bucket = Session().default_bucket()
print(f"Demo Bucket: {bucket}")
prefix = "sagemaker/Final_Project_Model_Monitor"
s3_key = f"s3://{bucket}/{prefix}"
print(f"S3 key: {s3_key}")

s3_capture_upload_path = f"{s3_key}/datacapture/final_project"
ground_truth_upload_path = f"{s3_key}/ground_truth_data/final_project/{datetime.now():%Y-%m-%d-%H-%M-%S}"
s3_report_path = f"{s3_key}/reports"

print(f"Capture path: {s3_capture_upload_path}")
print(f"Ground truth path: {ground_truth_upload_path}")
print(f"Report path: {s3_report_path}")

baseline_results_uri = f"{s3_key}/baselining/final_project"
print(f"Baseline results uri: {baseline_results_uri}")

endpoint_instance_count = 1
endpoint_instance_type = "ml.m5.large"


RoleArn: arn:aws:iam::004608622582:role/LabRole
AWS region: us-east-1
Demo Bucket: sagemaker-us-east-1-004608622582
S3 key: s3://sagemaker-us-east-1-004608622582/sagemaker/Final_Project_Model_Monitor
Capture path: s3://sagemaker-us-east-1-004608622582/sagemaker/Final_Project_Model_Monitor/datacapture/final_project
Ground truth path: s3://sagemaker-us-east-1-004608622582/sagemaker/Final_Project_Model_Monitor/ground_truth_data/final_project/2024-06-21-20-05-04
Report path: s3://sagemaker-us-east-1-004608622582/sagemaker/Final_Project_Model_Monitor/reports
Baseline results uri: s3://sagemaker-us-east-1-004608622582/sagemaker/Final_Project_Model_Monitor/baselining/final_project


### Model Files and Data Files

In [4]:
base_path = "/root/AAI-540-Final-Project/"
model_file = f"{base_path}/Models/xgb_regressor_model.tar.gz"
test_dataset = f"{base_path}/Data/test_data_no_head.csv"
validation_dataset = f"{base_path}/Data/validation_data_head.csv"
dataset_type = "text/csv"

with open(validation_dataset) as f:
    headers_line = f.readline().rstrip()
all_headers = headers_line.split(",")
label_header = all_headers[0]

### Deploy model to Amazon SageMaker

In [5]:
model_name = f"Final-Project-xgb-regression-model-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"
print("Model name: ", model_name)
endpoint_name = f"Final-Project-xgb-regression-model-monitor-{datetime.utcnow():%Y-%m-%d-%H%M}"
print("Endpoint name: ", endpoint_name)

Model name:  Final-Project-xgb-regression-model-monitor-2024-06-21-2005
Endpoint name:  Final-Project-xgb-regression-model-monitor-2024-06-21-2005


In [6]:
#  Read in the modle URL from setup
%store -r model_url

### Invoke Deployed Model

In [7]:
image_uri = image_uris.retrieve("xgboost", region, "0.90-1")
print(f"XGBoost image uri: {image_uri}")
model = Model(
    role=role,
    name=model_name,
    image_uri=image_uri,
    model_data=model_url,
    sagemaker_session=sagemaker_session,
)

data_capture_config = DataCaptureConfig(
    enable_capture=True,
    sampling_percentage=100,
    destination_s3_uri=s3_capture_upload_path,
)
print(f"Deploying model {model_name} to endpoint {endpoint_name}")
model.deploy(
    initial_instance_count=endpoint_instance_count,
    instance_type=endpoint_instance_type,
    endpoint_name=endpoint_name,
    data_capture_config=data_capture_config,
)

XGBoost image uri: 683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:0.90-1-cpu-py3
Deploying model Final-Project-xgb-regression-model-monitor-2024-06-21-2005 to endpoint Final-Project-xgb-regression-model-monitor-2024-06-21-2005
------!

### Create Predictor

In [8]:
from sagemaker.predictor import Predictor

predictor = Predictor(
    endpoint_name=endpoint_name, sagemaker_session=sagemaker_session, serializer=CSVSerializer()
)

### Generate Baseline

In [9]:
import pandas as pd
df = pd.read_csv("Data/val_with_target.csv")
df.columns

Index(['AdjSquareFeet', 'DistancetoCoast', 'DistancetoSinkhole',
       'DistancetoFireDepartment', 'LocationWindSpeed', 'Terrain',
       'ValueofHome', 'NumberOfBuildings', 'NumberOfUnits', 'Age'],
      dtype='object')

In [10]:
from time import sleep

# 'predictor' is  regression model object
validate_dataset = "validation_with_predictions.csv"

limit = 200  # To control the number of predictions made
i = 0
with open(f"Data/{validate_dataset}", "w") as baseline_file:
    # Adjust the header for regression output
    baseline_file.write("actual_value,predicted_value\n")  # our header
    with open("Data/val_with_target_head.csv", "r") as f:
        header = next(f).strip().split(',')  # Get header to find target column index
        target_index = header.index('ValueofHome')  # Find the index of the target column
        
        for row in f:
            if i >= limit:
                break
            split_row = row.strip().split(',')
            
            # Extract the actual target value using the target_index
            actual_value = split_row[target_index]
            
            # Prepare the input columns by excluding the target column
            input_cols = split_row[:target_index] + split_row[target_index + 1:]
            
            # Convert the list back to a comma-separated string
            input_cols_string = ','.join(input_cols)
            
            # Predict the value
            predicted_value = predictor.predict(input_cols_string)
            
            # Write the actual and predicted values to the file
            baseline_file.write(f"{actual_value},{predicted_value}\n")
            i += 1
            print(".", end="", flush=True)
            sleep(0.5)
print()
print("Done!")


ModelError: An error occurred (ModelError) when calling the InvokeEndpoint operation: Received server error (502) from primary with message "<html>
<head><title>502 Bad Gateway</title></head>
<body bgcolor="white">
<center><h1>502 Bad Gateway</h1></center>
<hr><center>nginx/1.10.3 (Ubuntu)</center>
</body>
</html>
". See https://us-east-1.console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/aws/sagemaker/Endpoints/Final-Project-xgb-regression-model-monitor-2024-06-21-2005 in account 004608622582 for more information.

In [None]:
a

In [None]:
# Examine Prediction From Model
!head Data/validation_with_predictions.csv

In [None]:
# Upload Predictions as a baseline dataset
baseline_prefix = prefix + "/baselining"
baseline_data_prefix = baseline_prefix + "/data"
baseline_results_prefix = baseline_prefix + "/results"

baseline_data_uri = f"s3://{bucket}/{baseline_data_prefix}"
baseline_results_uri = f"s3://{bucket}/{baseline_results_prefix}"
print(f"Baseline data uri: {baseline_data_uri}")
print(f"Baseline results uri: {baseline_results_uri}")

In [None]:
baseline_dataset_uri = S3Uploader.upload(f"test_data/{validate_dataset}", baseline_data_uri)
baseline_dataset_uri

### Create a baselining job with validation dataset predictions

In [None]:
from sagemaker.model_monitor import ModelQualityMonitor
from sagemaker.model_monitor import EndpointInput
from sagemaker.model_monitor.dataset_format import DatasetFormat

In [None]:
# Create the model quality monitoring object
churn_model_quality_monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=session,
)

In [None]:
# Name of the model quality baseline job
baseline_job_name = f"DEMO-xgb-churn-model-baseline-job-{datetime.utcnow():%Y-%m-%d-%H%M}"

In [None]:
# Execute the baseline suggestion job.
# You will specify problem type, in this case Binary Classification, and provide other required attributes.
job = churn_model_quality_monitor.suggest_baseline(
    job_name=baseline_job_name,
    baseline_dataset=baseline_dataset_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    problem_type="BinaryClassification",
    inference_attribute="prediction",
    probability_attribute="probability",
    ground_truth_attribute="label",
)
job.wait(logs=False)

In [None]:
baseline_job = churn_model_quality_monitor.latest_baselining_job

In [None]:
# Check baseline
binary_metrics = baseline_job.baseline_statistics().body_dict["binary_classification_metrics"]
pd.json_normalize(binary_metrics).T

In [None]:
# View Constraints
pd.DataFrame(baseline_job.suggested_constraints().body_dict["binary_classification_constraints"]).T

### Setup continuous model monitoring to identify model quality drift

In [None]:
def invoke_endpoint(ep_name, file_name):
    with open(file_name, "r") as f:
        i = 0
        for row in f:
            payload = row.rstrip("\n")
            response = session.sagemaker_runtime_client.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType="text/csv",
                Body=payload,
                InferenceId=str(i),  # unique ID per row
            )["Body"].read()
            i += 1
            sleep(1)


def invoke_endpoint_forever():
    while True:
        try:
            invoke_endpoint(endpoint_name, "test_data/test-dataset-input-cols.csv")
        except session.sagemaker_runtime_client.exceptions.ValidationError:
            pass


thread = Thread(target=invoke_endpoint_forever)
thread.start()

In [None]:
# View Captured Data
print("Waiting for captures to show up", end="")
for _ in range(120):
    capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
    if capture_files:
        capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")
        capture_record = json.loads(capture_file[0])
        if "inferenceId" in capture_record["eventMetadata"]:
            break
    print(".", end="", flush=True)
    sleep(1)
print()
print("Found Capture Files:")
print("\n ".join(capture_files[-3:]))

In [None]:
print("\n".join(capture_file[-3:-1]))

In [None]:
print(json.dumps(capture_record, indent=2))

In [None]:
import random


def ground_truth_with_id(inference_id):
    random.seed(inference_id)  # to get consistent results
    rand = random.random()
    return {
        "groundTruthData": {
            "data": "1" if rand < 0.7 else "0",  # randomly generate positive labels 70% of the time
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }


def upload_ground_truth(records, upload_time):
    fake_records = [json.dumps(r) for r in records]
    data_to_upload = "\n".join(fake_records)
    target_s3_uri = f"{ground_truth_upload_path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(fake_records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)

In [None]:
NUM_GROUND_TRUTH_RECORDS = 334  # 334 are the number of rows in data we're sending for inference


def generate_fake_ground_truth_forever():
    j = 0
    while True:
        fake_records = [ground_truth_with_id(i) for i in range(NUM_GROUND_TRUTH_RECORDS)]
        upload_ground_truth(fake_records, datetime.utcnow())
        j = (j + 1) % 5
        sleep(60 * 60)  # do this once an hour


gt_thread = Thread(target=generate_fake_ground_truth_forever)
gt_thread.start()

In [None]:
# Monitoring schedule name
churn_monitor_schedule_name = (
    f"DEMO-xgb-churn-monitoring-schedule-{datetime.utcnow():%Y-%m-%d-%H%M}"
)

In [None]:
# Create an enpointInput
endpointInput = EndpointInput(
    endpoint_name=predictor.endpoint_name,
    probability_attribute="0",
    probability_threshold_attribute=0.5,
    destination="/opt/ml/processing/input_data",
)

In [None]:
# Create the monitoring schedule to execute every hour.
from sagemaker.model_monitor import CronExpressionGenerator

response = churn_model_quality_monitor.create_monitoring_schedule(
    monitor_schedule_name=churn_monitor_schedule_name,
    endpoint_input=endpointInput,
    output_s3_uri=baseline_results_uri,
    problem_type="BinaryClassification",
    ground_truth_input=ground_truth_upload_path,
    constraints=baseline_job.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

In [None]:
churn_model_quality_monitor.describe_schedule()

In [None]:
# #xamine Model Schedule execution
executions = churn_model_quality_monitor.list_executions()
executions

In [None]:
# Wait for the first execution of the monitoring_schedule
print("Waiting for first execution", end="")
while True:
    execution = churn_model_quality_monitor.describe_schedule().get(
        "LastMonitoringExecutionSummary"
    )
    if execution:
        break
    print(".", end="", flush=True)
    sleep(10)
print()
print("Execution found!")

In [None]:
while not executions:
    executions = churn_model_quality_monitor.list_executions()
    print(".", end="", flush=True)
    sleep(10)
latest_execution = executions[-1]
latest_execution.describe()

In [None]:
status = execution["MonitoringExecutionStatus"]

while status in ["Pending", "InProgress"]:
    print("Waiting for execution to finish", end="")
    latest_execution.wait(logs=False)
    latest_job = latest_execution.describe()
    print()
    print(f"{latest_job['ProcessingJobName']} job status:", latest_job["ProcessingJobStatus"])
    print(
        f"{latest_job['ProcessingJobName']} job exit message, if any:",
        latest_job.get("ExitMessage"),
    )
    print(
        f"{latest_job['ProcessingJobName']} job failure reason, if any:",
        latest_job.get("FailureReason"),
    )
    sleep(
        30
    )  # model quality executions consist of two Processing jobs, wait for second job to start
    latest_execution = churn_model_quality_monitor.list_executions()[-1]
    execution = churn_model_quality_monitor.describe_schedule()["LastMonitoringExecutionSummary"]
    status = execution["MonitoringExecutionStatus"]

print("Execution status is:", status)

if status != "Completed":
    print(execution)
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )

In [None]:
latest_execution = churn_model_quality_monitor.list_executions()[-1]
report_uri = latest_execution.describe()["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
    "S3Uri"
]
print("Report Uri:", report_uri)

### Analyze Cloudwatch Metics

In [None]:
# Create CloudWatch client
cw_client = boto3.Session().client("cloudwatch")

namespace = "aws/sagemaker/Endpoints/final-project-model-metrics"

cw_dimensions = [
    {"Name": "Endpoint", "Value": endpoint_name},
    {"Name": "MonitoringSchedule", "Value": churn_monitor_schedule_name},
]

In [None]:
# List metrics through the pagination interface
paginator = cw_client.get_paginator("list_metrics")

for response in paginator.paginate(Dimensions=cw_dimensions, Namespace=namespace):
    model_quality_metrics = response["Metrics"]
    for metric in model_quality_metrics:
        print(metric["MetricName"])

In [None]:
alarm_name = "MODEL_QUALITY_F2_SCORE"
alarm_desc = (
    "Trigger an CloudWatch alarm when the f2 score drifts away from the baseline constraints"
)
mdoel_quality_f2_drift_threshold = (
    0.625  ##Setting this threshold purposefully low to see the alarm quickly.
)
metric_name = "f2"
namespace = "aws/sagemaker/Endpoints/model-metrics"

cw_client.put_metric_alarm(
    AlarmName=alarm_name,
    AlarmDescription=alarm_desc,
    ActionsEnabled=True,
    MetricName=metric_name,
    Namespace=namespace,
    Statistic="Average",
    Dimensions=[
        {"Name": "Endpoint", "Value": endpoint_name},
        {"Name": "MonitoringSchedule", "Value": churn_monitor_schedule_name},
    ],
    Period=600,
    EvaluationPeriods=1,
    DatapointsToAlarm=1,
    Threshold=mdoel_quality_f2_drift_threshold,
    ComparisonOperator="LessThanOrEqualToThreshold",
    TreatMissingData="breaching",
)

In [None]:
a

### Clean Up

In [None]:
churn_model_quality_monitor.delete_monitoring_schedule()

In [None]:
invoke_endpoint_thread.terminate()
ground_truth_thread.terminate()

In [None]:
from sagemaker.predictor import Predictor

predictor = Predictor(endpoint_name, sagemaker_session=sagemaker_session)
model_monitors = predictor.list_monitors()
for model_monitor in model_monitors:
    model_monitor.stop_monitoring_schedule()
    wait_for_execution_to_finish(model_monitor)
    model_monitor.delete_monitoring_schedule()

In [None]:
predictor.delete_endpoint()
predictor.delete_model()

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>