# Drift Detection Notebook

This notebook exercises the drift detection MLOps pipeline end to end: data preparation, training, deployment, monitoring, and retraining.

## Setup

👇 Set the project name for your drift pipeline

In [None]:
project_name = "<<project_name>>"  # << Update this drift detection project

Retrieve the project ID and region.

In [None]:
import sagemaker
import json

sess = sagemaker.session.Session()
region_name = sess.boto_region_name  # public attribute, instead of the private _region_name
sm_client = sess.sagemaker_client
project_id = sm_client.describe_project(ProjectName=project_name)["ProjectId"]

print(f"Project: {project_name} ({project_id})")

## Data Prep

Let's copy some trip data and taxi zone files to the input location

In [None]:
from sagemaker.s3 import S3Downloader, S3Uploader

# Download trip data and taxi zones to input folder
download_uri = "s3://nyc-tlc/trip data/green_tripdata_2018-02.csv"
S3Downloader().download(download_uri, "input/data")
download_uri = "s3://nyc-tlc/misc/taxi_zones.zip"
S3Downloader().download(download_uri, "input/zones")

# Upload input to the target location
artifact_bucket = f"sagemaker-project-{project_id}-{region_name}"
input_data_uri = f"s3://{artifact_bucket}/{project_id}/input"
S3Uploader().upload("input", input_data_uri)

print("Listing input files:")
for s3_uri in S3Downloader.list(input_data_uri):
    print(s3_uri)

## Train

Start the pipeline now that we have uploaded some data

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"{project_name}-pipeline"
pipeline = Pipeline(pipeline_name)

# Start pipeline
execution = pipeline.start()
execution_name = execution.arn.split("/")[-1]

print(f"Waiting for execution: {execution_name} for pipeline {pipeline_name}...")
execution.wait()
execution_status = execution.describe()["PipelineExecutionStatus"]
print(f"Status: {execution_status}")

List the execution steps. Note that we have baseline and training jobs.

In [None]:
for step in execution.list_steps():
    print("Step: {}, Status: {}".format(step["StepName"], step["StepStatus"]))
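For longer pipelines it can help to condense the step list into a per-status count. The following is a minimal sketch, assuming items shaped like the ones printed above (`StepName` and `StepStatus` keys). The sample data and the helper name `summarize_steps` are hypothetical and not part of the pipeline.

```python
from collections import Counter

# Hypothetical sample shaped like the items returned by execution.list_steps();
# the step names and statuses are illustrative, not taken from a real run.
sample_steps = [
    {"StepName": "PreprocessData", "StepStatus": "Succeeded"},
    {"StepName": "BaselineJob", "StepStatus": "Succeeded"},
    {"StepName": "TrainModel", "StepStatus": "Executing"},
]


def summarize_steps(steps):
    """Count steps per status and list the names of steps that have not succeeded."""
    counts = Counter(step["StepStatus"] for step in steps)
    not_succeeded = [s["StepName"] for s in steps if s["StepStatus"] != "Succeeded"]
    return dict(counts), not_succeeded


status_counts, pending_steps = summarize_steps(sample_steps)
print(status_counts, pending_steps)
```

With a live execution, the same helper could be called as `summarize_steps(execution.list_steps())`.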

### Evaluate

Get the estimator for the training job in the pipeline.

In [None]:
from sagemaker.estimator import Estimator


def get_execution_step(step_name):
    return [
        step["Metadata"]
        for step in execution.list_steps()
        if step["StepName"] == step_name
    ]


training_job_arn = get_execution_step("TrainModel")[0]["TrainingJob"]["Arn"]
training_job_name = training_job_arn.split("/")[-1]
estimator = Estimator.attach(training_job_name)
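The job name is taken as the last `/`-separated segment of the ARN. Here is a small sketch of that parsing, using made-up ARNs (the account ID and resource names are hypothetical). It also shows why the last segment, rather than the second, is the safer choice when a resource path has more than one `/`.

```python
def resource_name_from_arn(arn):
    """Return the text after the last '/' in an ARN."""
    return arn.rsplit("/", 1)[-1]


# Hypothetical ARNs for illustration only.
training_arn = "arn:aws:sagemaker:us-east-1:123456789012:training-job/example-train-job"
model_package_arn_example = (
    "arn:aws:sagemaker:us-east-1:123456789012:model-package/example-group/3"
)

print(resource_name_from_arn(training_arn))
print(resource_name_from_arn(model_package_arn_example))
```

For the model package ARN the last segment is the version number, while the second segment would be the group name.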

### Download the Debugger XGBoost training report

SageMaker Debugger generates an [XGBoost Training Report](https://docs.aws.amazon.com/sagemaker/latest/dg/debugger-training-xgboost-report.html) using a processing job that runs concurrently with the training job. Let's wait for it to complete.

In [None]:
# get name of the xgboost training report
xgb_report_job_name = [
    rule["RuleEvaluationJobArn"].split("/")[-1]
    for rule in estimator.latest_training_job.rule_job_summary()
    if "CreateXgboostReport" in rule["RuleConfigurationName"]
][0]

print("Waiting for XGBoost training report to complete...")
sm_client.get_waiter("processing_job_completed_or_stopped").wait(
    ProcessingJobName=xgb_report_job_name
)
print("Done")

ℹ️ The code below will download the output from the Debugger report into the `report` folder. Click the link to open the report.

In [None]:
from IPython.display import FileLink
from sagemaker.s3 import S3Downloader, S3Uploader

# Get the s3 output
report_uri = sm_client.describe_processing_job(ProcessingJobName=xgb_report_job_name)[
    "ProcessingOutputConfig"
]["Outputs"][0]["S3Output"]["S3Uri"]

# Download the HTML report from the processing job output
S3Downloader().download(f"{report_uri}/xgboost_report.html", "report")
FileLink("report/xgboost_report.html", result_html_prefix="Open Report: ")

### Approve Model

🛑 Once we are happy with this training job, we can [Update the Approval Status](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry-approve.html) of a model.

In [None]:
model_package_arn = get_execution_step("RegisterModel")[0]["RegisterModel"]["Arn"]
model_package_version = model_package_arn.split("/")[-1]
print(f"Model version: {model_package_version}")

Let's update the status to approved

In [None]:
model_package_update_input_dict = {
    "ModelPackageArn": model_package_arn,
    "ModelApprovalStatus": "Approved",
}
model_package_update_response = sm_client.update_model_package(
    **model_package_update_input_dict
)

## Deploy

Now that our model is approved, the deployment pipeline will kick off.

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

# Build a predictor for the endpoint of the given stage ("staging" or "prod")
def get_predictor(stage_name):
    endpoint_name = f"sagemaker-{project_name}-{stage_name}"
    return Predictor(
        endpoint_name, serializer=CSVSerializer(), deserializer=JSONDeserializer()
    )


predictor = get_predictor("staging")

print(f"Waiting for staging endpoint: {predictor.endpoint_name} to be deployed...")
sm_client.get_waiter("endpoint_in_service").wait(EndpointName=predictor.endpoint_name)
print("Ready")

### Test Staging

Let's send some traffic to the staging endpoint with the following payload:

| passenger_count | pickup_latitude | pickup_longitude | dropoff_latitude | dropoff_longitude | geo_distance | hour | weekday | month |
| - | - | - | - | - | - | - | - | - |
| 1 | -73.986114 | 40.685634 | -73.936794 | 40.715370 | 5.318025 | 7 | 0 | 2 |

We expect approximately a $20 fare:

In [None]:
payload = "1,-73.986114,40.685634,-73.936794,40.715370,5.318025,7,0,2"
predictor.predict(data=payload)
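The payload is a single CSV row whose values follow the column order of the table above. As a sketch, the row can be built from named features so that the order is explicit. `FEATURE_ORDER` and `build_csv_payload` are hypothetical helpers, and the column order is assumed from the table rather than read from the model.

```python
# Column order assumed from the payload table in this notebook.
FEATURE_ORDER = [
    "passenger_count",
    "pickup_latitude",
    "pickup_longitude",
    "dropoff_latitude",
    "dropoff_longitude",
    "geo_distance",
    "hour",
    "weekday",
    "month",
]


def build_csv_payload(features, order=FEATURE_ORDER):
    """Serialize a feature dict to one CSV row, failing loudly on missing columns."""
    missing = [name for name in order if name not in features]
    if missing:
        raise ValueError(f"Missing features: {missing}")
    return ",".join(str(features[name]) for name in order)


example_features = {
    "passenger_count": 1,
    "pickup_latitude": -73.986114,
    "pickup_longitude": 40.685634,
    "dropoff_latitude": -73.936794,
    "dropoff_longitude": 40.71537,
    "geo_distance": 5.318025,
    "hour": 7,
    "weekday": 0,
    "month": 2,
}
example_payload = build_csv_payload(example_features)
print(example_payload)
```

The resulting string could then be passed to `predictor.predict(data=example_payload)`.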

### Approve Staging

🛑 Head over to AWS CodePipeline and approve the staging deployment to kick off the production deployment.

### Test Production

Let's load our production endpoint

In [None]:
from botocore.exceptions import WaiterError

try:
    predictor = get_predictor("prod")
    print(f"Waiting for prod endpoint: {predictor.endpoint_name} to be deployed...")
    sm_client.get_waiter("endpoint_in_service").wait(
        EndpointName=predictor.endpoint_name
    )
    print("Ready")
except WaiterError as err:
    error_message = err.last_response["Error"]["Message"]
    if error_message.startswith("Could not find endpoint"):
        err = Exception("Approve Staging deployment to start Production deployment")
    raise err

And confirm that data capture is enabled.

In [None]:
data_capture = sm_client.describe_endpoint(EndpointName=predictor.endpoint_name)[
    "DataCaptureConfig"
]
print(f"Data capture is: {data_capture['CaptureStatus']}")

### Inspect Data Capture

Let's send some traffic to the production endpoint, which our [Data Quality Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-data-quality.html) should detect as drifting from the baseline.

In [None]:
%%time
for n in range(100):
    predictor.predict(data=payload)

Let's see if we have received some outputs to our data capture

In [None]:
data_capture_uri = data_capture["DestinationS3Uri"]
data_capture_files = S3Downloader.list(data_capture_uri)

print("Found {} files".format(len(data_capture_files)))

if data_capture["EnableCapture"] and len(data_capture_files) > 0:
    # Get the first line of the most recent file
    event = json.loads(S3Downloader.read_file(data_capture_files[-1]).split("\n")[0])
    print("\nLast file:\n{}".format(json.dumps(event, indent=2)))
elif len(data_capture_files) == 0:
    print("No files yet, please rerun this cell in a few seconds")
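Each line of a capture file is a JSON event. The sketch below pulls the input features back out of one event. It assumes the record layout described in the Model Monitor data capture documentation (`captureData.endpointInput.data` with a CSV `encoding`). The sample event, its ID, and its output value are made up for illustration, and `extract_input_features` is a hypothetical helper.

```python
import json

# Hypothetical capture event; layout assumed from the data capture docs.
sample_line = json.dumps(
    {
        "captureData": {
            "endpointInput": {
                "observedContentType": "text/csv",
                "mode": "INPUT",
                "data": "1,-73.986114,40.685634,-73.936794,40.715370,5.318025,7,0,2",
                "encoding": "CSV",
            },
            "endpointOutput": {
                "observedContentType": "application/json",
                "mode": "OUTPUT",
                "data": "[20.0]",  # made-up prediction
                "encoding": "JSON",
            },
        },
        "eventMetadata": {"eventId": "hypothetical-event-id"},
        "eventVersion": "0",
    }
)


def extract_input_features(line):
    """Parse one capture event and return its CSV input as a list of floats."""
    event = json.loads(line)
    endpoint_input = event["captureData"]["endpointInput"]
    if endpoint_input.get("encoding") != "CSV":
        raise ValueError("Only CSV-encoded inputs are handled in this sketch")
    return [float(value) for value in endpoint_input["data"].split(",")]


captured_features = extract_input_features(sample_line)
print(captured_features)
```

Base64-encoded captures would need decoding first, which this sketch does not attempt.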

Now let's download the validation dataset from the latest processing job, and tweak some of the columns to change the distribution of the data.

In [None]:
import boto3
import pandas as pd
import random
from sagemaker.s3 import S3Downloader


def get_latest_processed_data():
    execution_arn = sm_client.list_pipeline_executions(
        PipelineName=pipeline_name, SortBy="CreationTime"
    )["PipelineExecutionSummaries"][0]["PipelineExecutionArn"]
    steps = sm_client.list_pipeline_execution_steps(
        PipelineExecutionArn=execution_arn, SortOrder="Ascending"
    )["PipelineExecutionSteps"]
    preprocess_arn = next(
        item["Metadata"]["ProcessingJob"]["Arn"]
        for item in steps
        if item["StepName"] == "PreprocessData"
    )
    job_outputs = sm_client.describe_processing_job(
        ProcessingJobName=preprocess_arn.split("/")[1]
    )["ProcessingOutputConfig"]["Outputs"]
    validation_uri = next(
        item["S3Output"]["S3Uri"]
        for item in job_outputs
        if item["OutputName"] == "validation"
    )
    return validation_uri


dataset_location = get_latest_processed_data()
S3Downloader().download(dataset_location, "preprocessed")
df = pd.read_csv("preprocessed/validation.csv", header=None)

# Changing the distribution of data to artificially cause an alarm
df[1] = random.choices([1, 2, 3, 4, 5, 6], weights=[2, 1, 2, 5, 2, 1], k=df.shape[0])
df[6] = df[1].apply(lambda x: 70 * random.betavariate(2.5, 2))
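# Drop the first column, then split into rows; splitlines() avoids the
# trailing empty row that split("\n") would leave behind.
tweaked_rows = df.drop(0, axis=1).to_csv(header=False, index=False).splitlines()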
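To get a feel for how far the tweak moves the data, the expected values of the two resampled columns can be worked out from the parameters used above. This is only an illustrative sketch using the standard library. It does not reproduce the statistics Model Monitor computes, and the seed and sample size are arbitrary choices.

```python
import random
import statistics

# Parameters copied from the tweak applied to columns 1 and 6.
values = [1, 2, 3, 4, 5, 6]
weights = [2, 1, 2, 5, 2, 1]
alpha, beta, scale = 2.5, 2, 70

# Expected value of the weighted choice: sum(v * w) / sum(w).
expected_col1 = sum(v * w for v, w in zip(values, weights)) / sum(weights)
# Mean of a Beta(alpha, beta) variable is alpha / (alpha + beta).
expected_col6 = scale * alpha / (alpha + beta)

# Simulate the same draws with a fixed seed to check the arithmetic.
rng = random.Random(0)
n = 20000
sample_col1 = rng.choices(values, weights=weights, k=n)
sample_col6 = [scale * rng.betavariate(alpha, beta) for _ in range(n)]

print(f"column 1: expected {expected_col1:.2f}, sampled {statistics.mean(sample_col1):.2f}")
print(f"column 6: expected {expected_col6:.2f}, sampled {statistics.mean(sample_col6):.2f}")
```

Comparing these means with the same columns in the untouched validation data gives a rough idea of the shift the monitor is expected to flag.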

Then make a series of prediction requests with this data to cause an artificial model monitoring alarm to be triggered.

In [None]:
%%time
endpoint_requests = 10000
for i in range(endpoint_requests):
    predictor.predict(data=tweaked_rows[i % len(tweaked_rows)])
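The loop cycles through the rows with `i % len(tweaked_rows)`. The same round-robin selection can be sketched with `itertools`, with blank rows filtered out first so that no empty payload is sent. `iter_request_rows` is a hypothetical helper and the sample rows are placeholders.

```python
from itertools import cycle, islice


def iter_request_rows(rows, total):
    """Yield `total` rows, cycling through the non-blank rows in order."""
    non_blank = [row for row in rows if row.strip()]
    if not non_blank:
        raise ValueError("No non-blank rows to send")
    return islice(cycle(non_blank), total)


# Placeholder rows, including a trailing blank entry.
sample_rows = ["1,2,3", "4,5,6", ""]
selected = list(iter_request_rows(sample_rows, 5))
print(selected)
```

In the notebook, each yielded row would be passed to `predictor.predict(data=row)`.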

## Monitor

Let's check that we have a monitor configured and see when it is next scheduled to run.

In [None]:
from datetime import datetime, timedelta
from dateutil.tz import tzlocal

model_monitor = predictor.list_monitors()[0]
model_monitor_status = model_monitor.describe_schedule()["MonitoringScheduleStatus"]
print(f"Model Monitoring: {model_monitor_status}")

now = datetime.now(tzlocal())
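# Round up to the top of the next hour (zero the seconds too, so the estimate is not off by a minute)
next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)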
scheduled_diff = (next_hour - now).seconds // 60
print("Next schedule in {} minutes".format(scheduled_diff))
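The countdown above assumes the monitor runs at the top of every hour. Under that assumption, the calculation can be written as a small function of `now`, which makes it easy to check against a fixed timestamp. `minutes_until_next_hour` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def minutes_until_next_hour(now):
    """Whole minutes from `now` until the start of the next hour."""
    next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
    return int((next_hour - now).total_seconds() // 60)


# Fixed example timestamp: 10:42:30 UTC, so the next run is at 11:00:00.
example_now = datetime(2021, 1, 1, 10, 42, 30, tzinfo=timezone.utc)
minutes_left = minutes_until_next_hour(example_now)
print(f"Next schedule in {minutes_left} minutes")
```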

List the monitor executions and print the status of the latest one.

In [None]:
monitor_executions = model_monitor.list_executions()
if len(monitor_executions) == 0:
    raise Exception("Please wait, no monitor executions available yet")

# Get the latest monitor status
monitor_status = monitor_executions[0].describe()["ProcessingJobStatus"]
if monitor_status == "Completed":
    monitor_message = monitor_executions[0].describe()["ExitMessage"]
    print(f"Latest execution: {monitor_message}")
else:
    print(f"Latest execution: {monitor_status}")

### Inspect Model Monitor report

🛑 Browse to the model monitoring results in SageMaker Studio to download and run a report.

## Retrain

When the monitoring schedule detects drift, it raises an alarm, which in turn triggers a new pipeline execution to retrain the model.

In [None]:
latest_pipeline_execution = sm_client.list_pipeline_executions(
    PipelineName=pipeline_name,
)["PipelineExecutionSummaries"][0]
latest_execution_status = latest_pipeline_execution["PipelineExecutionStatus"]
time_ago = datetime.now(tzlocal()) - latest_pipeline_execution["StartTime"]

print(
    f"Latest pipeline: {pipeline_name} execution: {latest_execution_status} started {time_ago.total_seconds()/60:0.2f} mins ago"
)

And let's list the steps of that execution.

In [None]:
execution_steps = sm_client.list_pipeline_execution_steps(
    PipelineExecutionArn=latest_pipeline_execution["PipelineExecutionArn"],
)["PipelineExecutionSteps"]
for step in execution_steps:
    print("Step: {}, Status: {}".format(step["StepName"], step["StepStatus"]))

✅ Great, you have now completed all the steps.

## Clean up

Execute the following cell to delete any registered models.

In [None]:
response = sm_client.list_model_packages(ModelPackageGroupName=project_name)
for model_package in response["ModelPackageSummaryList"]:
    print("Deleting Version {}".format(model_package["ModelPackageArn"].split("/")[-1]))
    sm_client.delete_model_package(ModelPackageName=model_package["ModelPackageArn"])

Execute the following cell to delete the CloudFormation stacks for:

1. SageMaker prod endpoint
2. SageMaker staging endpoint
3. SageMaker Pipeline Workflow and Model Package Group

In [None]:
import boto3

cfn = boto3.client("cloudformation")

for stack_name in [
    f"sagemaker-{project_name}-deploy-prod",
    f"sagemaker-{project_name}-deploy-staging",
    f"sagemaker-{project_name}-pipeline",
]:
    print("Deleting stack: {}".format(stack_name))
    cfn.delete_stack(StackName=stack_name)
    cfn.get_waiter("stack_delete_complete").wait(StackName=stack_name)

The following code will clean up all objects in the artifact bucket and delete the SageMaker project.

In [None]:
s3_resource = boto3.resource("s3")
s3_artifact_bucket = s3_resource.Bucket(artifact_bucket)
s3_artifact_bucket.object_versions.delete()
print("Artifact bucket objects deleted")

sm_client.delete_project(ProjectName=project_name)
print("SageMaker Project deleted")