# Batch Pipeline Notebook

This notebook exercises the drift detection MLOps `batch pipeline`.

## Setup

Retrieve the project name stored by your build pipeline notebook.

In [None]:
%store -r project_name

Look up the project ID and region.

In [None]:
import json

import sagemaker

sess = sagemaker.session.Session()
region_name = sess.boto_region_name  # public attribute; avoids the private _region_name
sm_client = sess.sagemaker_client
project_id = sm_client.describe_project(ProjectName=project_name)["ProjectId"]
artifact_bucket = f"sagemaker-project-{project_id}-{region_name}"

print(f"Project: {project_name} ({project_id})")

Your batch pipeline should now be running. Click the link below to open AWS CodePipeline in a new window.

In [None]:
from IPython.display import HTML

HTML(
    f'Open <a target="_blank" href="https://{region_name}.console.aws.amazon.com/codesuite/codepipeline/pipelines/sagemaker-{project_name}-batch/view?region={region_name}">CodePipeline</a> in a new window'
)

## Data Prep

Download the test dataset output from the pre-processing job in our build pipeline, which we will use for input to batch scoring.

In [None]:
import random

import boto3
import pandas as pd


def get_latest_processed_data(pipeline_name, step_name, output_name):
    execution_arn = sm_client.list_pipeline_executions(
        PipelineName=pipeline_name, SortBy="CreationTime", SortOrder="Descending"
    )["PipelineExecutionSummaries"][0]["PipelineExecutionArn"]
    steps = sm_client.list_pipeline_execution_steps(
        PipelineExecutionArn=execution_arn, SortOrder="Ascending"
    )["PipelineExecutionSteps"]
    if "monitoring" in step_name:
        links = next(
            item["Metadata"]["QualityCheck"]
            for item in steps
            if item["StepName"] == step_name
        )
        return list(filter(lambda x: str(x).startswith("s3://"), links.values()))

    preprocess_arn = next(
        item["Metadata"]["ProcessingJob"]["Arn"]
        for item in steps
        if item["StepName"] == step_name
    )
    job_outputs = sm_client.describe_processing_job(
        ProcessingJobName=preprocess_arn.split("/")[1]
    )["ProcessingOutputConfig"]["Outputs"]
    return next(
        item["S3Output"]["S3Uri"]
        for item in job_outputs
        if item["OutputName"] == output_name
    )


pipeline_name = f"{project_name}-build"
test_uri = get_latest_processed_data(pipeline_name, "PreprocessData", "test")

# Load the test dataset into a dataframe
test_df = pd.read_csv(f"{test_uri}/test.csv")
print(test_df.shape)
test_df
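
The step lookup in `get_latest_processed_data` calls `next(...)` without a default, so a misspelled step name surfaces as a bare `StopIteration`. The following is a minimal sketch of a more descriptive lookup. The helper name `find_processing_job_name` and the sample step records (including the account ID and job name) are hypothetical and only mimic the shape of the `list_pipeline_execution_steps` response used above.

```python
def find_processing_job_name(steps, step_name):
    """Return the processing job name for `step_name`, or raise a descriptive KeyError."""
    for step in steps:
        if step["StepName"] == step_name:
            arn = step["Metadata"]["ProcessingJob"]["Arn"]
            # The job name is the last path segment of the ARN
            return arn.split("/")[-1]
    available = [step["StepName"] for step in steps]
    raise KeyError(f"Step {step_name!r} not found; available steps: {available}")


# Hypothetical step records, for illustration only
sample_steps = [
    {
        "StepName": "PreprocessData",
        "Metadata": {
            "ProcessingJob": {
                "Arn": "arn:aws:sagemaker:us-east-1:111122223333:processing-job/example-preprocess-job"
            }
        },
    },
    {
        "StepName": "TrainModel",
        "Metadata": {
            "TrainingJob": {
                "Arn": "arn:aws:sagemaker:us-east-1:111122223333:training-job/example-training-job"
            }
        },
    },
]

print(find_processing_job_name(sample_steps, "PreprocessData"))
```

With this shape, a wrong step name reports which step names were actually present in the execution.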

Upload the test dataset to the batch staging input location.

In [None]:
batch_staging_uri = f"s3://{artifact_bucket}/batch-input/staging/test.csv"
test_df.to_csv(batch_staging_uri, header=False, index=False)

## Test Staging

A staging SageMaker Pipeline is created by AWS CloudFormation in the `Batch_CFN_Staging` stage of the above AWS CodePipeline.

Once it is created, run the next cell to start the pipeline.

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = f"{project_name}-batch-staging"
pipeline = Pipeline(pipeline_name)

# Start pipeline
execution = pipeline.start(parameters={"DataInputUri": batch_staging_uri})
execution_name = execution.arn.split("/")[-1]

print(f"Waiting for execution: {execution_name} for pipeline {pipeline_name}...")
execution.wait()
execution_status = execution.describe()["PipelineExecutionStatus"]
print(f"Status: {execution_status}")

Once this has completed, download the batch scoring results, which contain the predicted `fare_amount` values.

In [None]:
from sagemaker.experiments.trial import _Trial
from sagemaker.experiments.trial_component import _TrialComponent

latest_trial_name = list(
    _Trial.list(
        experiment_name=pipeline_name, sort_by="CreationTime", sort_order="Descending"
    )
)[0].trial_name
transform_trial_component_name = [
    k
    for k in _TrialComponent.list(trial_name=latest_trial_name)
    if k.trial_component_source.get("SourceType") == "SageMakerTransformJob"
][0].trial_component_name

trial_component = _TrialComponent.load(
    trial_component_name=transform_trial_component_name
)
transformed_data_uri = trial_component.output_artifacts[
    "SageMaker.TransformOutput"
].value

In [None]:
# Load the predicted scores
pred_df = pd.read_csv(
    f"{transformed_data_uri}/test.csv.out",
    names=["fare_amount_prediction"],
)
pred_df

### Evaluate

Calculate the root mean square error (RMSE) to evaluate the performance of this model. 

In [None]:
from math import sqrt

from sklearn.metrics import mean_squared_error

mse = mean_squared_error(test_df["fare_amount"], pred_df["fare_amount_prediction"])
rmse = sqrt(mse)
print(f"RMSE: {rmse}")
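
As a sanity check on what the metric measures, here is a small worked sketch of RMSE computed by hand with the standard library. The `rmse` helper and the fare values are hypothetical and are not taken from the pipeline output.

```python
from math import sqrt


def rmse(actual, predicted):
    """Root mean square error of two equal-length sequences."""
    if len(actual) != len(predicted):
        raise ValueError("actual and predicted must have the same length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical fares and predictions, for illustration only
actual_fares = [10.0, 12.0, 8.0, 20.0]
predicted_fares = [11.0, 10.0, 8.0, 24.0]

# Squared errors are 1, 4, 0 and 16, so the mean squared error is 5.25
example_rmse = rmse(actual_fares, predicted_fares)
print(f"RMSE: {example_rmse:.3f}")
```

Because the errors are squared before averaging, the single 4.0 miss contributes most of the total, which is why RMSE is sensitive to large individual errors.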

Plot the residuals to see where the errors are relative to the fare amount.

In [None]:
import seaborn as sns

sns.residplot(
    x=test_df["fare_amount"], y=pred_df["fare_amount_prediction"], lowess=True
);

### Approve Staging

🛑  Head back to AWS CodePipeline and approve the staging batch scoring to kick off the production batch scoring.

## Test Production

Before we test production, let's tweak some of the columns to change the distribution of the data. 

This represents a simulation of reality where the distribution of the incoming data has changed due to changes in the environment.

Note: Once this runs, the pipeline execution will be shown as "Failed". This is expected, because a data quality violation will have been detected. Opening the relevant step in SageMaker Studio will show that it failed with the message "Quality Check failed. See violation report at: ..."

In [None]:
test_df["passenger_count"] = random.choices(
    [1, 2, 3, 4, 5, 6], weights=[2, 1, 2, 5, 2, 1], k=test_df.shape[0]
)
test_df["geo_distance"] = test_df["passenger_count"].apply(
    lambda x: 70 * random.betavariate(2.5, 2)
)
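
To see roughly what this tweak does to the data, the sketch below samples from the same two distributions on their own, without touching `test_df`. The sample size and the fixed seed are assumptions chosen for illustration. The expected values follow from the weights (which sum to 13) and from the mean of a Beta(α, β) distribution, α / (α + β).

```python
import random
from collections import Counter

random.seed(0)  # fixed seed so the sketch is repeatable
n = 10_000  # hypothetical sample size

values = [1, 2, 3, 4, 5, 6]
weights = [2, 1, 2, 5, 2, 1]

# Weighted sampling: a passenger_count of 4 is drawn with probability 5/13
counts = Counter(random.choices(values, weights=weights, k=n))
expected = {v: w / sum(weights) for v, w in zip(values, weights)}
for value in values:
    observed = counts[value] / n
    print(f"passenger_count={value}: observed {observed:.3f}, expected {expected[value]:.3f}")

# geo_distance is a Beta(2.5, 2) draw scaled to the range [0, 70]
distances = [70 * random.betavariate(2.5, 2) for _ in range(n)]
mean_distance = sum(distances) / n
expected_mean = 70 * 2.5 / (2.5 + 2)
print(f"geo_distance mean: observed {mean_distance:.2f}, expected {expected_mean:.2f}")
```

If the baseline data has a different spread for these columns, this is the kind of shift the data quality check is meant to flag.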

Upload the tweaked dataset to the production input location.

In [None]:
batch_prod_uri = f"s3://{artifact_bucket}/batch-input/prod/tweaked.csv"
test_df.to_csv(batch_prod_uri, header=False, index=False)

After a few minutes our production batch pipeline will be ready for scoring.   

Start the production batch pipeline and wait for it to finish.

In [None]:
pipeline_name = f"{project_name}-batch-prod"
pipeline = Pipeline(pipeline_name)

# Start pipeline
execution = pipeline.start(parameters={"DataInputUri": batch_prod_uri})
execution_name = execution.arn.split("/")[-1]

print(f"Waiting for execution: {execution_name} for pipeline {pipeline_name}...")
execution.wait()
execution_status = execution.describe()["PipelineExecutionStatus"]
print(f"Status: {execution_status}")

Let's list the steps. The last step should be the `EvaluateDrift` Lambda function.

In [None]:
for step in execution.list_steps():
    print("Step: {}, Status: {}".format(step["StepName"], step["StepStatus"]))

## Monitor

Let's download the files produced by the Model Monitor job.

In [None]:
from sagemaker.s3 import S3Downloader

monitor_uri = get_latest_processed_data(
    pipeline_name, "MonitorDataQuality-monitoring", "monitoring_output"
)

print("Downloading monitor files:")
for s3_uri in monitor_uri:
    print(s3_uri.split("/")[-1])
    S3Downloader().download(s3_uri, "monitor")

If the job produced a `constraint_violations.json` file, load it and display the violations.

In [None]:
import json
import os

violations = None
if "constraint_violations.json" in os.listdir("monitor"):
    with open("monitor/constraint_violations.json", "r") as f:
        violations = json.load(f)["violations"]
else:
    print("No violations")

violations
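
A long list of violations can be easier to read when grouped by check type. The sketch below assumes each violation is a dictionary with `feature_name`, `constraint_check_type` and `description` keys, which is the shape Model Monitor violation reports usually take. The helper name `group_by_check_type` and the sample records are hypothetical.

```python
from collections import defaultdict


def group_by_check_type(violations):
    """Map each constraint check type to the list of features that violated it."""
    grouped = defaultdict(list)
    for violation in violations or []:  # tolerate None when no report was produced
        check_type = violation.get("constraint_check_type", "unknown")
        grouped[check_type].append(violation.get("feature_name", "unknown"))
    return dict(grouped)


# Hypothetical violations, for illustration only
sample_violations = [
    {
        "feature_name": "passenger_count",
        "constraint_check_type": "baseline_drift_check",
        "description": "example description",
    },
    {
        "feature_name": "geo_distance",
        "constraint_check_type": "baseline_drift_check",
        "description": "example description",
    },
    {
        "feature_name": "fare_amount",
        "constraint_check_type": "data_type_check",
        "description": "example description",
    },
]

summary = group_by_check_type(sample_violations)
for check_type, features in summary.items():
    print(f"{check_type}: {', '.join(features)}")
```

In the notebook, the same helper could be called on the `violations` variable loaded above, since it accepts `None` as well as a list.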

## Retrain

The `EvaluateDrift` Lambda will read the contents of `constraint_violations.json` and will publish Amazon [CloudWatch Metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-interpreting-cloudwatch.html).  

If drift is detected for a metric above the threshold defined in `prod-config.json` in the batch pipeline, the Amazon CloudWatch alarm fires, which triggers retraining through the SageMaker build pipeline.

To see the CloudWatch metric alarm, click the link below.

In [None]:
alarm_name = f"sagemaker-{pipeline_name}-threshold"

HTML(
    f'Open <a target="_blank" href="https://{region_name}.console.aws.amazon.com/cloudwatch/home?region={region_name}#alarmsV2:alarm/{alarm_name}">CloudWatch Alarm</a> in a new window'
)

This will result in a new SageMaker pipeline execution starting.

In [None]:
from datetime import datetime

from dateutil.tz import tzlocal

pipeline_name = f"{project_name}-build"

latest_pipeline_execution = sm_client.list_pipeline_executions(
    PipelineName=pipeline_name,
)["PipelineExecutionSummaries"][0]
latest_execution_status = latest_pipeline_execution["PipelineExecutionStatus"]
time_ago = datetime.now(tzlocal()) - latest_pipeline_execution["StartTime"]

print(
    f"Latest pipeline: {pipeline_name} execution: {latest_execution_status} started {time_ago.total_seconds()/60:0.2f} mins ago"
)

We can verify that this execution was triggered by drift by inspecting the `InputSource` parameter:

In [None]:
params = sm_client.list_pipeline_parameters_for_execution(
    PipelineExecutionArn=latest_pipeline_execution["PipelineExecutionArn"],
)
input_source = [
    p["Value"] for p in params["PipelineParameters"] if p["Name"] == "InputSource"
][0]
print(f"Pipeline execution started with InputSource: {input_source}")

And let's list the steps of that execution.  

In [None]:
execution_steps = sm_client.list_pipeline_execution_steps(
    PipelineExecutionArn=latest_pipeline_execution["PipelineExecutionArn"],
)["PipelineExecutionSteps"]
for step in execution_steps:
    print("Step: {}, Status: {}".format(step["StepName"], step["StepStatus"]))

✅ Great, you have now completed all the steps.

## Clean up

Execute the following cell to delete the AWS CloudFormation stacks:

1. SageMaker batch prod pipeline
2. SageMaker batch staging pipeline

In [None]:
import boto3

cfn = boto3.client("cloudformation")

for stack_name in [
    f"sagemaker-{project_name}-batch-prod",
    f"sagemaker-{project_name}-batch-staging",
]:
    print("Deleting stack: {}".format(stack_name))
    cfn.delete_stack(StackName=stack_name)
    cfn.get_waiter("stack_delete_complete").wait(StackName=stack_name)

You can return to the [build-pipeline](build-pipeline.ipynb) notebook to complete the cleanup.