# Amazon SageMaker Test


#### 1.1 Import necessary libraries

In [91]:
%%time

from datetime import datetime, timedelta, timezone
import json
import os
import re
import boto3
from time import sleep
from threading import Thread

import pandas as pd

from sagemaker import get_execution_role, session, Session, image_uris
from sagemaker.s3 import S3Downloader, S3Uploader
from sagemaker.processing import ProcessingJob
from sagemaker.serializers import CSVSerializer

from sagemaker.model import Model
from sagemaker.model_monitor import DataCaptureConfig

session = Session()

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
CPU times: user 33.1 ms, sys: 6.49 ms, total: 39.6 ms
Wall time: 66.3 ms


#### 1.2 AWS region and  IAM Role

In [92]:
# Get Execution role
role = get_execution_role()
print("RoleArn:", role)

region = session.boto_region_name
print("Region:", region)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
RoleArn: arn:aws:iam::752847213914:role/service-role/AmazonSageMakerServiceCatalogProductsExecutionRole
Region: us-east-1


#### 1.3 S3 bucket and prefixes

#### 2.3 Deploy the model with data capture enabled.
Next, deploy the SageMaker model on a specific instance with data capture enabled.

In [93]:
endpoint_name = 'Xgboost-Inference-endpoint-2023-11-07-1116' #'DEMO-mlops-xgboost-quality-monitor-2023-11-06-1605' 

#### 2.4 Create the SageMaker Predictor object from the endpoint to be used for invoking the model

In [74]:
from sagemaker.predictor import Predictor

predictor = Predictor(
    endpoint_name=endpoint_name, sagemaker_session=session, serializer=CSVSerializer()
)

In [94]:
churn_cutoff = 0.8
validate_dataset = "validation_with_predictions_test.csv"


In [111]:
limit = 20  # Need at least 200 samples to compute standard deviations
i = 0
with open(f"data/{validate_dataset}", "w") as baseline_file:
    baseline_file.write("probability,prediction,label\n")  # our header
    with open("data/validation.csv", "r") as f:
        for row in f:
            (label, input_cols) = row.split(",", 1)
            result = predictor.predict(input_cols).decode('utf-8')
            print(result)
            probability = float(predictor.predict(input_cols))
            prediction = str(probability)
            baseline_file.write(f"{probability},{prediction},{label}\n")
            i += 1
            if i > limit:
                break
            print(".", end="", flush=True)
            sleep(0.5)
print()
print("Done!")


0.0

.4.0

.4.0

.1.0

.4.0

.1.0

.4.0

.1.0

.3.0

.1.0

.3.0

.5.0

.4.0

.0.0

.0.0

.3.0

.3.0

.1.0

.3.0

.0.0

.2.0


Done!


In [84]:
#test_data = pd.read_csv('data/test_x.csv')


In [87]:
!head data/validation_with_predictions_test.csv


probability,prediction,label
0.0,0.0,0
0.0,0.0,4
0.0,0.0,4
1.0,1.0,2
0.0,0.0,4
1.0,1.0,1
0.0,0.0,4
1.0,1.0,1
0.0,0.0,3


In [85]:
response = pd.read_csv('data/validation_with_predictions_test.csv')

In [86]:
from sklearn.metrics import classification_report
print(classification_report(response['probability'], response['label']))

              precision    recall  f1-score   support

         0.0       0.08      0.02      0.03      1362
         1.0       0.69      0.38      0.49       581
         2.0       0.00      0.00      0.00         0
         3.0       0.00      0.00      0.00         0
         4.0       0.00      0.00      0.00         0
         5.0       0.01      0.03      0.01        58

    accuracy                           0.13      2001
   macro avg       0.13      0.07      0.09      2001
weighted avg       0.26      0.13      0.16      2001



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [57]:
pd.DataFrame(baseline_job.suggested_constraints().body_dict["multiclass_classification_constraints"]).T

Unnamed: 0,threshold,comparison_operator
accuracy,0.134328,LessThanThreshold
weighted_recall,0.134328,LessThanThreshold
weighted_precision,0.018044,LessThanThreshold
weighted_f0_5,0.021822,LessThanThreshold
weighted_f1,0.031815,LessThanThreshold
weighted_f2,0.058687,LessThanThreshold


In the above example you can see that model quality monitor suggested a constraint that will ensure that the model F2 score should note drop below 0.625. Few generated constraints _may_ be a tad aggressive like precision, where it will alert on any drops below 1.0. It is recommended to modify this file as necessary prior to using for monitoring.

#### 4.1 Generate prediction data for Model Quality  Monitoring

Start generating some artificial traffic.  The cell below starts a thread to send some traffic to the endpoint. Note that you need to stop the kernel to terminate this thread. If there is no traffic, the monitoring jobs are marked as `Failed` since there is no data to process.

In [58]:
endpoint_name


'Xgboost-Inference-endpoint-2023-11-07-1116'

In [59]:
def invoke_endpoint(ep_name, file_name):
    with open(file_name, "r") as f:
        i = 0
        for row in f:
            payload = row.rstrip("\n")
            response = session.sagemaker_runtime_client.invoke_endpoint(
                EndpointName=ep_name,
                ContentType="text/csv",
                Body=payload,
                InferenceId=str(i),  # unique ID per row
            )["Body"].read()
            i += 1
            sleep(1)


def invoke_endpoint_forever():
    while True:
        try:
            invoke_endpoint(endpoint_name, "data/test_x.csv")
        except session.sagemaker_runtime_client.exceptions.ValidationError:
            pass


thread = Thread(target=invoke_endpoint_forever)
thread.start()


Notice the new attribute `inferenceId`, which we're setting when invoking the endpoint. This is used to join the prediction data with the ground truth data.

#### 4.2 View captured data

Now list the data capture files stored in Amazon S3. You should expect to see different files from different time periods organized based on the hour in which the invocation occurred. The format of the Amazon S3 path is:

`s3://{destination-bucket-prefix}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/filename.jsonl`

In [63]:
print("Waiting for captures to show up", end="")
for _ in range(120):
    capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
    if capture_files:
        capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")
        capture_record = json.loads(capture_file[0])
        if "inferenceId" in capture_record["eventMetadata"]:
            break
    print(".", end="", flush=True)
    sleep(1)
print()
print("Found Capture Files:")
print("\n ".join(capture_files[-3:]))


Waiting for captures to show upsagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
.sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
.sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
.sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
.sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sa

Next, view the contents of a single capture file. Here you should see all the data captured in an Amazon SageMaker specific JSON-line formatted file. Take a quick peek at the first few lines in the captured file.

In [64]:
print("\n".join(capture_file[-3:-1]))


NameError: name 'capture_file' is not defined

Finally, the contents of a single line is present below in a formatted JSON file so that you can observe a little better.

Again, notice the `inferenceId` attribute that is set as part of the invoke_endpoint call.  If this is present, it will be used to join with ground truth data (otherwise `eventId` will be used):

In [143]:
print(json.dumps(capture_record, indent=2))

{
  "captureData": {
    "endpointInput": {
      "observedContentType": "text/csv",
      "mode": "INPUT",
      "data": "0.2775108399892457,0.4213949623176243,0.5776854083906318,0.7453982733885129,-0.6419521205443532,-0.18389844555064858,-0.0887819772092396,-0.06686049299075673,-0.09097838412590584,0.0939703335680906,-0.0021291967892783903,0.008632794314209756",
      "encoding": "CSV"
    },
    "endpointOutput": {
      "observedContentType": "text/csv; charset=utf-8",
      "mode": "OUTPUT",
      "data": "0.0\n",
      "encoding": "CSV"
    }
  },
  "eventMetadata": {
    "eventId": "63e722b3-ed3f-4450-86e9-d1d308bda1e3",
    "inferenceId": "5998",
    "inferenceTime": "2023-11-06T18:01:40Z"
  },
  "eventVersion": "0"
}


In [144]:
import random


def ground_truth_with_id(inference_id):
    random.seed(inference_id)  # to get consistent results
    rand = random.random()
    return {
        "groundTruthData": {
            "data": "1" if rand < 0.7 else "0",  # randomly generate positive labels 70% of the time
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }


def upload_ground_truth(records, upload_time):
    fake_records = [json.dumps(r) for r in records]
    data_to_upload = "\n".join(fake_records)
    target_s3_uri = f"{ground_truth_upload_path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(fake_records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)

In [145]:
NUM_GROUND_TRUTH_RECORDS = 334  # 334 are the number of rows in data we're sending for inference


def generate_fake_ground_truth_forever():
    j = 0
    while True:
        fake_records = [ground_truth_with_id(i) for i in range(NUM_GROUND_TRUTH_RECORDS)]
        upload_ground_truth(fake_records, datetime.utcnow())
        j = (j + 1) % 5
        sleep(60 * 60)  # do this once an hour


gt_thread = Thread(target=generate_fake_ground_truth_forever)
gt_thread.start()

Uploading 334 records to s3://sagemaker-us-east-1-752847213914/mlops-data/ground_truth_data/2023-11-06-14-16-55/2023/11/06/18/0256.jsonl
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml


In [146]:
##Monitoring schedule name
mlops_monitor_schedule_name = (
    f"DEMO-mlops-Xgboost-monitoring-schedule-{datetime.utcnow():%Y-%m-%d-%H%M}"
)

For the monitoring schedule you need to specify how to interpret an endpoint's output. Given that the endpoint in this notebook outputs CSV data, the below code specifies that the first column of the output, `0`, contains a probability (of churn in this example). You will further specify `0.5` as the cutoff  used to determine a positive label (that is, predict that a customer will churn).

In [152]:
# Create an enpointInput
endpointInput = EndpointInput(
    endpoint_name=predictor.endpoint_name,
    probability_attribute="0",
    probability_threshold_attribute=0.5,
    inference_attribute="0",
    destination="/opt/ml/processing/input_data",
)

In [153]:
# Create the monitoring schedule to execute every hour.
from sagemaker.model_monitor import CronExpressionGenerator

response = mlops_model_quality_monitor.create_monitoring_schedule(
    monitor_schedule_name=mlops_monitor_schedule_name,
    endpoint_input=endpointInput,
    output_s3_uri=baseline_results_uri,
    problem_type="MulticlassClassification",
    ground_truth_input=ground_truth_upload_path,
    constraints=baseline_job.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)

INFO:sagemaker.model_monitor.model_monitoring:Creating Monitoring Schedule with name: DEMO-mlops-Xgboost-monitoring-schedule-2023-11-06-1802


In [98]:
# Create the monitoring schedule
# You will see the monitoring schedule in the 'Scheduled' status
mlops_model_quality_monitor.describe_schedule()

ParamValidationError: Parameter validation failed:
Invalid type for parameter MonitoringScheduleName, value: None, type: <class 'NoneType'>, valid types: <class 'str'>

In [99]:
# Initially there will be no executions since the first execution happens at the top of the hour
# Note that it is common for the execution to luanch upto 20 min after the hour.
executions = mlops_model_quality_monitor.list_executions()
executions

ParamValidationError: Parameter validation failed:
Invalid type for parameter MonitoringScheduleName, value: None, type: <class 'NoneType'>, valid types: <class 'str'>

In [None]:
# Wait for the first execution of the monitoring_schedule
print("Waiting for first execution", end="")
while True:
    execution = mlops_model_quality_monitor.describe_schedule().get(
        "LastMonitoringExecutionSummary"
    )
    if execution:
        break
    print(".", end="", flush=True)
    sleep(10)
print()
print("Execution found!")

Waiting for first execution........................Uploading 334 records to s3://sagemaker-us-east-1-752847213914/mlops-data/ground_truth_data/2023-11-06-14-16-55/2023/11/06/18/2204.jsonl
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
....................................................Uploading 334 records to s3://sagemaker-us-east-1-752847213914/mlops-data/ground_truth_data/2023-11-06-14-16-55/2023/11/06/18/3055.jsonl
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /root/.config/sagemaker/config.yaml
..............................................................

In [None]:
while not executions:
    executions = churn_model_quality_monitor.list_executions()
    sleep(10)
latest_execution = executions[-1]
latest_execution.describe()

##### Inspect a specific execution (latest execution)
In the previous cell, you picked up the latest completed or failed scheduled execution. Here are the possible terminal states and what each of them mean: 
* Completed - This means the monitoring execution completed and no issues were found in the violations report.
* CompletedWithViolations - This means the execution completed, but constraint violations were detected.
* Failed - The monitoring execution failed, maybe due to client error (perhaps incorrect role permissions) or infrastructure issues. Further examination of FailureReason and ExitMessage is necessary to identify what exactly happened.
* Stopped - job exceeded max runtime or was manually stopped.

In [None]:
status = execution["MonitoringExecutionStatus"]

while status in ["Pending", "InProgress"]:
    print("Waiting for execution to finish", end="")
    latest_execution.wait(logs=False)
    latest_job = latest_execution.describe()
    print()
    print(f"{latest_job['ProcessingJobName']} job status:", latest_job["ProcessingJobStatus"])
    print(
        f"{latest_job['ProcessingJobName']} job exit message, if any:",
        latest_job.get("ExitMessage"),
    )
    print(
        f"{latest_job['ProcessingJobName']} job failure reason, if any:",
        latest_job.get("FailureReason"),
    )
    sleep(
        30
    )  # model quality executions consist of two Processing jobs, wait for second job to start
    latest_execution = churn_model_quality_monitor.list_executions()[-1]
    execution = churn_model_quality_monitor.describe_schedule()["LastMonitoringExecutionSummary"]
    status = execution["MonitoringExecutionStatus"]

print("Execution status is:", status)

if status != "Completed":
    print(execution)
    print(
        "====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures."
    )

In [None]:
latest_execution = churn_model_quality_monitor.list_executions()[-1]
report_uri = latest_execution.describe()["ProcessingOutputConfig"]["Outputs"][0]["S3Output"][
    "S3Uri"
]
print("Report Uri:", report_uri)

#### 4.5 View violations generated by monitoring schedule

If there are any violations compared to the baseline, they will be listed in the reports uploaded to S3.

In [None]:
pd.options.display.max_colwidth = None
violations = latest_execution.constraint_violations().body_dict["violations"]
violations_df = pd.json_normalize(violations)
violations_df.head(10)

Here you can see that one of the violations generated is that the f2 score is less than the threshold value set as part of baselining.

You can keep your endpoint running to continue capturing data. If you do not plan to collect more data or use this endpoint further, you should delete the endpoint to avoid incurring additional charges. Note that deleting your endpoint does not delete the data that was captured during the model invocations. That data persists in Amazon S3 until you delete it yourself.

But before that, you need to delete the schedule first.

In [None]:
churn_model_quality_monitor.delete_monitoring_schedule()
sleep(60)  # actually wait for the deletion

In [None]:
predictor.delete_model()
predictor.delete_endpoint()