# Deploy our ML Model

**SageMaker Studio Kernel**: Data Science

In this exercise you will:
 - Run a Preprocessing Job using Amazon SageMaker Processing Job
 - Run a TensorFlow Training Job using Amazon SageMaker Training Job
 - Register a new version of the trained model in the Amazon SageMaker Model Registry

***

## Part 1/4 - Setup
Here we'll import some libraries and define some variables.

### Import required modules

In [None]:
import boto3
from datetime import datetime
import logging
from sagemaker import get_execution_role
from sagemaker.model_monitor import DataCaptureConfig, EndpointInput
import sagemaker.session
from sagemaker.tensorflow.model import TensorFlowPredictor
import traceback

In [None]:
# Low-level boto3 clients for S3 and for the SageMaker API.
s3_client = boto3.client("s3")
sagemaker_client = boto3.client("sagemaker")

In [None]:
# Configure the root logger at INFO level and create the logger used by the
# rest of the notebook.
logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger(__name__)

***

## Part 2/4 - Create Model Predictor
During this step, we are creating a model predictor for a previously created SageMaker Endpoint

In [None]:
# Region of the default boto3 session.
region = boto3.session.Session().region_name

# Resolve the current AWS account id once via STS; it is needed both for the
# execution-role ARN and for the KMS key ARN.
account_id = boto3.client("sts").get_caller_identity().get("Account")

role_name = "mlops-sagemaker-execution-role"
role = "arn:aws:iam::{}:role/{}".format(account_id, role_name)

kms_account_id = account_id

# Name of the S3 bucket used for inference/monitoring data.
# Left empty here: fill it in before running the notebook.
bucket_inference = ""

kms_alias = "ml-kms"

model_package_group = "ml-end-to-end-group"

# Key prefix under which the endpoint's captured data is looked up later on.
monitoring_output_path = "data/monitoring/captured"

In [None]:
# ARN of the KMS key, addressed through its alias.
kms_key = f"arn:aws:kms:{region}:{kms_account_id}:alias/{kms_alias}"

In [None]:
# One boto3 session pinned to the region; both SageMaker clients below are
# created from it.
boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client("sagemaker")
runtime_client = boto_session.client("sagemaker-runtime")

# SageMaker SDK session that reuses the clients above and uses the inference
# bucket as its default bucket.
sagemaker_session = sagemaker.session.Session(
    boto_session=boto_session,
    sagemaker_client=sagemaker_client,
    sagemaker_runtime_client=runtime_client,
    default_bucket=bucket_inference
)

In [None]:
from sagemaker.deserializers import CSVDeserializer
from sagemaker.serializers import CSVSerializer
from sagemaker.tensorflow.model import TensorFlowPredictor

# Predictor bound to the "<model_package_group>-dev" endpoint, using CSV for
# both request serialization and response deserialization.
# NOTE(review): `sagemaker_session` is not passed here, so the session built
# earlier is presumably not the one this predictor uses -- confirm intended.
predictor = TensorFlowPredictor(
    endpoint_name=model_package_group + "-dev",
    model_name="saved_model",
    model_version=1,
    accept_type="text/csv",
    serializer=CSVSerializer(),
    deserializer=CSVDeserializer()
)

In [None]:
# Smoke-test the endpoint with one sample sentence and log the raw response.
inputs = ["Sei disgustoso"]
result = predictor.predict(inputs)
LOGGER.info("%s", result)

## Part 3/4 - Monitoring
Here we are creating monitoring jobs for extracting metrics from our SageMaker Endpoint

### Create a Baseline for the monitoring job

From our train dataset, let's select the relevant attributes and generate a dataset for baselining. Then we use Amazon SageMaker Model Monitor to suggest a set of baseline constraints and descriptive statistics.

In [None]:
from ast import literal_eval
import numpy as np
import pandas as pd
from sagemaker.model_monitor import CronExpressionGenerator, ModelQualityMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat
from time import gmtime, strftime

In [None]:
# S3 bucket names. Both are left empty here: fill them in before running.
bucket_artifacts = ""
bucket_inference = ""

# Key prefixes: where the baseline input file is uploaded, and where the
# train/test CSV files are read from.
monitoring_input_files_path = "data/monitoring/input"
processing_output_files_path = "data/output"

In [None]:
# S3 URI of the training CSV under the processing-output prefix.
train_data = f"s3://{bucket_artifacts}/{processing_output_files_path}/train/train.csv"

# Only these two columns are loaded from the CSV.
columns = ["text", "Sentiment"]

In [None]:
# S3 URI that receives the output of the baselining job.
baseline_output_path = f"s3://{bucket_inference}/data/monitoring/output"

### Execute predictions using the validation dataset

In [None]:
! rm -rf /tmp/validation_with_predictions.csv

In [None]:
# Build the baseline dataset: run predictions for up to `limit` rows of the
# training CSV and write "probability,prediction,label" lines to a local file.
limit = 200  # Need at least 200 samples to compute standard deviations
i = 0

# Pre-bind the names printed by the error handler, so that a failure before
# the first row is processed (e.g. while reading the CSV) surfaces the real
# error instead of a NameError.
text = None
Sentiment = None

try:
    # pandas already consumes the CSV header line, so every row in `df` is a
    # data row and none has to be skipped. Reading happens before the output
    # file is opened so a failed read does not leave a header-only file.
    df = pd.read_csv(train_data, usecols=columns)
    df = df.dropna()

    with open("/tmp/validation_with_predictions.csv", "w") as baseline_file:
        baseline_file.write("probability,prediction,label\n")

        for _, row in df.head(limit).iterrows():
            text = row["text"]
            Sentiment = row["Sentiment"]

            inputs = [text]

            results = predictor.predict(inputs)

            # The first response row is read as [prediction, probability]
            # -- TODO confirm against the model's output format.
            probability = results[0][1]
            prediction = results[0][0]

            baseline_file.write(f"{probability},{prediction},{Sentiment}\n")
            i += 1
            print(".", end="", flush=True)

    LOGGER.info("Done!")
except Exception:
    # Show the last sample that was being processed, log the traceback and
    # re-raise with the original traceback intact.
    print(text)
    print(Sentiment)
    stacktrace = traceback.format_exc()
    LOGGER.error("{}".format(stacktrace))

    raise

In [None]:
! head /tmp/validation_with_predictions.csv

In [None]:
# Key prefix under which the baseline dataset is uploaded.
monitoring_input_files_path = "data/monitoring/input"

In [None]:
baseline_local_file = "/tmp/validation_with_predictions.csv"

# S3 keys are exact object names: deleting the bare prefix removes nothing, so
# a previously uploaded baseline has to be deleted by its full object key
# (the upload below stores the file as <key_prefix>/<file name>).
baseline_object_key = "{}/{}".format(monitoring_input_files_path, "validation_with_predictions.csv")
s3_client.delete_object(Bucket=bucket_inference, Key=baseline_object_key)

# Upload the freshly generated baseline file and keep its S3 URI for the
# baselining job.
baseline_dataset_uri = sagemaker_session.upload_data(baseline_local_file, key_prefix=monitoring_input_files_path)

print(baseline_dataset_uri)

Please note that running the baselining job will require 8-10 minutes. In the meantime, you can take a look at the Deequ library, used to execute these analyses with the default Model Monitor container: https://github.com/awslabs/deequ

In [None]:
# Model-quality monitor: one ml.c5.4xlarge instance, a 20 GB volume and a
# 1800-second (30-minute) runtime cap, using the SageMaker session built earlier.
monitor = ModelQualityMonitor(
    role=role,
    instance_count=1,
    instance_type="ml.c5.4xlarge",
    volume_size_in_gb=20,
    max_runtime_in_seconds=1800,
    sagemaker_session=sagemaker_session,
)

In [None]:
# Run the baselining job on the uploaded CSV (which has a header row). The
# attribute names match the header written to the baseline file:
# "probability,prediction,label". `wait=True` blocks until the job finishes.
monitor.suggest_baseline(
    baseline_dataset=baseline_dataset_uri,
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_output_path,
    problem_type="MulticlassClassification",
    inference_attribute="prediction",
    probability_attribute="probability",
    ground_truth_attribute="label",
    wait=True
)

### Explore the results of the baselining job
You can see that the baseline constraints and statistics files have been uploaded to the S3 location.

In [None]:
# Handle to the most recent baselining job started by this monitor.
baseline_job = monitor.latest_baselining_job

##### View the metrics generated
You can see that the baseline statistics and constraints files have already been uploaded to S3.

In [None]:
# Pull the multiclass metrics out of the baseline statistics and display them
# as a transposed (one metric per row) table.
metrics = baseline_job.baseline_statistics().body_dict["multiclass_classification_metrics"]
pd.json_normalize(metrics).T

##### View the constraints generated

In [None]:
# Display the suggested multiclass constraints as a transposed table.
pd.DataFrame(baseline_job.suggested_constraints().body_dict["multiclass_classification_constraints"]).T

### Generate Prediction data for Model Quality Monitoring

In [None]:
# S3 URI of the test CSV under the processing-output prefix.
test_data = f"s3://{bucket_artifacts}/{processing_output_files_path}/test/test.csv"

# Only these two columns are loaded from the CSV.
columns = ["text", "Sentiment"]

In [None]:
# Send up to `limit` rows of the test CSV to the endpoint. The responses are
# discarded; the calls only generate traffic against the endpoint.
limit = 200
i = 0

# Pre-bind the names printed by the error handler, so that a failure before
# the first row is processed (e.g. while reading the CSV) surfaces the real
# error instead of a NameError.
text = None
Sentiment = None

try:
    # Use the test split defined for this step (not the training split that
    # the baseline was built from). pandas already consumes the CSV header
    # line, so no row has to be skipped.
    df = pd.read_csv(test_data, usecols=columns)
    df = df.dropna()

    for _, row in df.head(limit).iterrows():
        text = row["text"]
        Sentiment = row["Sentiment"]

        inputs = [text]

        predictor.predict(inputs)
        i += 1
        print(".", end="", flush=True)

    LOGGER.info("Done!")
except Exception:
    # Show the last sample that was being processed, log the traceback and
    # re-raise with the original traceback intact.
    print(text)
    print(Sentiment)
    stacktrace = traceback.format_exc()
    LOGGER.error("{}".format(stacktrace))

    raise

### View captured data

Now list the data capture files stored in Amazon S3. You should expect to see different files from different time periods, organized based on the hour in which the invocation occurred. The format of the Amazon S3 path is:

`s3://{destination-bucket-prefix}/{endpoint-name}/{variant-name}/yyyy/mm/dd/hh/filename.jsonl`

In [None]:
import json
from sagemaker.s3 import S3Downloader, S3Uploader
from time import sleep

In [None]:
# Bucket that holds the captured inference data (empty here: fill in before running).
bucket_inference = ""

# The endpoint is named after the model package group, with a "-dev" suffix.
endpoint_name = f"{model_package_group}-dev"

# Key prefix under which the endpoint's capture files are listed.
monitoring_output_path = "data/monitoring/captured"

#### Saving eventIds in a list for creating a ground truth file

In [None]:
print("Waiting for captures to show up", end="")

event_ids = []

# Poll up to 120 times, one second apart, until a capture file is available.
for _ in range(120):
    capture_files = sorted(S3Downloader.list("s3://{}/{}/{}".format(bucket_inference, monitoring_output_path, endpoint_name)))
    if capture_files:
        # The newest (last in sort order) file is read as one JSON record per line.
        capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")
        capture_record = json.loads(capture_file[0])
        if "inferenceId" in capture_record["eventMetadata"] or "eventId" in capture_record["eventMetadata"]:
            for record in capture_file:
                # Splitting on "\n" yields empty strings for blank/trailing lines.
                if not record.strip():
                    continue
                try:
                    event_ids.append(json.loads(record)["eventMetadata"]["eventId"])
                except (ValueError, KeyError, TypeError):
                    # Best effort: skip lines that are not valid JSON or that
                    # carry no eventId, but let unrelated errors propagate.
                    continue
            break
    print(".", end="", flush=True)
    sleep(1)


print("Found Capture Files:")
print("\n ".join(capture_files[-3:]))

Finally, the contents of a single captured record are shown below as formatted JSON so that they are easier to read.

Again, notice the `inferenceId` attribute (present when an inference ID is passed as part of the `invoke_endpoint` call) and the `eventId` attribute. If `inferenceId` is present, it is used to join the captured data with the ground truth data; otherwise `eventId` is used.

In [None]:
# Print the third- and second-to-last lines of the capture file (the slice
# leaves out the final element of the split).
print("\n".join(capture_file[-3:-1]))

In [None]:
# Pretty-print the first record of the capture file.
print(json.dumps(capture_record, indent=2))

### Generate synthetic ground truth

Next, start generating ground truth data. The model quality job will fail if there's no ground truth data to merge.

In [None]:
# Key prefix under which the synthetic ground-truth files are uploaded.
ground_truth_upload_path = "data/monitoring/ground_truth"

In [None]:
import random

def ground_truth_with_id(inference_id):
    """Build a synthetic ground-truth record for one captured event.

    The label is pseudo-random (0, 1 or 2) but fully determined by
    ``inference_id``, so the same id always yields the same record.

    Args:
        inference_id: Identifier of the captured event; it seeds the label
            and is stored, as a string, under ``eventMetadata.eventId``.

    Returns:
        dict: Record with ``groundTruthData`` (CSV-encoded ``"<label>,1.0"``),
        ``eventMetadata`` and ``eventVersion`` entries.
    """
    # A private generator seeded with the id gives consistent results per id
    # without reseeding (and thereby disturbing) the module-level RNG.
    rng = random.Random(inference_id)
    # Discard one draw so the label sequence stays identical to seeding the
    # global generator, drawing once, and then sampling.
    rng.random()
    label = rng.sample(range(0, 3), 1)[0]
    return {
        "groundTruthData": {
            "data": "{},1.0".format(label),
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }

def upload_ground_truth(records, upload_time):
    """Serialize records as JSON Lines and upload them to the ground-truth prefix.

    Args:
        records: Iterable of JSON-serializable records, one per output line.
        upload_time: Datetime used to build the ``YYYY/MM/DD/HH/MMSS.jsonl``
            part of the target key.
    """
    payload_lines = list(map(json.dumps, records))
    target_s3_uri = f"s3://{bucket_inference}/{ground_truth_upload_path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(payload_lines)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body("\n".join(payload_lines), target_s3_uri)

In [None]:
# One synthetic ground-truth record for every captured event id, walking the
# list from last to first. `event_ids` is left untouched, so the cell can be
# re-run, and no id is dropped.
fake_records = [ground_truth_with_id(event_id) for event_id in reversed(event_ids)]

upload_ground_truth(fake_records, datetime.utcnow())

### Create Monitoring Scheduler

Here we are creating our monitoring scheduler. It will execute monitoring jobs with hourly schedule execution. When we create the schedule, we can also specify two scripts that will preprocess the records before the analysis takes place and execute post-processing at the end. For this example, we are not going to use a record preprocessor, and we are just specifying a post-processor that outputs some text for demo purposes.

In [None]:
# Full S3 URI under which the monitoring reports are written.
reports_path = f"s3://{bucket_inference}/data/monitoring/reports"

LOGGER.info(reports_path)

In [None]:
# Create the EndpointInput that tells the monitoring job which endpoint to
# analyze and where its data is made available inside the processing container.
# NOTE(review): "0" and "1" presumably are column indexes of the prediction and
# probability fields in the captured CSV output -- confirm against the model output.
endpointInput = EndpointInput(
    endpoint_name=predictor.endpoint_name,
    inference_attribute="0",
    probability_attribute="1",
    probability_threshold_attribute=0.5,
    destination="/opt/ml/processing/input_data",
)

In [None]:
endpoint_name = predictor.endpoint_name

# The schedule name is the current UTC timestamp.
mon_schedule_name = strftime("%Y-%m-%d-%H-%M-%S", gmtime())

# `reports_path` may already be a full "s3://..." URI; only prepend the bucket
# when it is a bare key prefix, so the output never becomes
# "s3://<bucket>/s3://<bucket>/...".
if reports_path.startswith("s3://"):
    monitoring_output_uri = reports_path
else:
    monitoring_output_uri = "s3://{}/{}".format(bucket_inference, reports_path)

# Hourly model-quality schedule that joins captured predictions with the
# uploaded ground truth and checks them against the suggested constraints.
monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=endpointInput,
    #record_preprocessor_script=preprocessor_path,
    #post_analytics_processor_script=postprocessor_path,
    problem_type="MulticlassClassification",
    output_s3_uri=monitoring_output_uri,
    ground_truth_input="s3://{}/{}".format(bucket_inference, ground_truth_upload_path),
    constraints=monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True
)

In [None]:
# Fetch and display the description of the monitoring schedule.
desc_schedule_result = monitor.describe_schedule()
desc_schedule_result

In [None]:
# Take the last listed execution and read the S3 URI of its first output.
latest_execution = monitor.list_executions()[-1]
execution_outputs = latest_execution.describe()["ProcessingOutputConfig"]["Outputs"]
report_uri = execution_outputs[0]["S3Output"]["S3Uri"]
print("Report Uri:", report_uri)

In [None]:
# Do not truncate wide columns when displaying the table.
pd.options.display.max_colwidth = None
# Flatten the constraint violations of the latest execution into a DataFrame
# and show the first ten rows.
violations = latest_execution.constraint_violations().body_dict["violations"]
violations_df = pd.json_normalize(violations)
violations_df.head(10)

### Delete Scheduler

Once the schedule is created, it will kick off jobs at the specified intervals. Note that if you check right after creating the hourly schedule, you might find the list of executions empty. You might have to wait until you cross the hour boundary (in UTC) to see executions start. Since we don't want to wait for the hour in this example, we can delete the schedule and use the code in the next steps to simulate what happens when a schedule is triggered, by running an Amazon SageMaker Processing Job.

In [None]:
# Remove the monitoring schedule attached to this monitor.
monitor.delete_monitoring_schedule()

### Manual monitoring execution

In order to trigger the execution manually, we first get all paths to data capture, baseline statistics, baseline constraints, etc. Then, we use a utility function, defined in monitoringjob_utils.py, to run the processing job.

In [None]:
import os
import sys
import boto3

In [None]:
# Put the sibling `scripts` directory first on the import path so its modules
# can be imported.
sys.path.insert(0, os.path.abspath('./../scripts'))

In [None]:
from monitoringjob_utils import run_model_monitor_job_processor

In [None]:
# S3 bucket names (empty here: fill them in before running).
bucket_artifacts = ""
bucket_inference = ""

endpoint_name = predictor.endpoint_name
# Key prefix holding the capture files of this endpoint.
current_endpoint_capture_prefix = f"data/monitoring/captured/{endpoint_name}"

In [None]:
monitoring_code_prefix = "artifact/monitoring"

# Upload the pre- and post-processing scripts to the artifacts bucket.
for object_key, local_path in (
    (monitoring_code_prefix + "/preprocess.py", "./../algorithms/monitoring/src/preprocess.py"),
    (monitoring_code_prefix + "/postprocess.py", "./../algorithms/monitoring/src/postprocess.py"),
):
    boto3.Session().resource("s3").Bucket(bucket_artifacts).Object(object_key).upload_file(local_path)

# S3 URIs of the two uploaded scripts.
preprocessor_path = f"s3://{bucket_artifacts}/{monitoring_code_prefix}/preprocess.py"
postprocessor_path = f"s3://{bucket_artifacts}/{monitoring_code_prefix}/postprocess.py"

for script_uri in (preprocessor_path, postprocessor_path):
    LOGGER.info(script_uri)

In [None]:
def _list_s3_uris(bucket, prefix):
    """Return the s3:// URIs of the objects listed under `prefix`, failing clearly when there are none."""
    response = s3_client.list_objects(Bucket=bucket, Prefix=prefix)
    contents = response.get("Contents")
    if not contents:
        # Without this check an empty listing fails later with an opaque
        # "'NoneType' object is not iterable".
        raise ValueError("No objects found under s3://{}/{}".format(bucket, prefix))
    return ["s3://{0}/{1}".format(bucket, s3_object.get("Key")) for s3_object in contents]


region = boto3.session.Session().region_name
role_name = "mlops-sagemaker-execution-role"
role = "arn:aws:iam::{}:role/{}".format(boto3.client('sts').get_caller_identity().get('Account'), role_name)

# The data-capture path is the "directory" of the last listed capture file.
# NOTE(review): a single list_objects call presumably returns only the first
# page of keys, so with many capture files the last entry may not be the
# newest one -- confirm and paginate if needed.
capture_files = _list_s3_uris(bucket_inference, current_endpoint_capture_prefix)
data_capture_path = capture_files[-1].rsplit("/", 1)[0]

# Same approach for the ground-truth files.
ground_truth_upload_path = "data/monitoring/ground_truth"
ground_truth_files = _list_s3_uris(bucket_inference, ground_truth_upload_path)
ground_truth_path = ground_truth_files[-1].rsplit("/", 1)[0]

LOGGER.info("Capture Files: ")
LOGGER.info("\n ".join(capture_files))
LOGGER.info("Ground Truth path: {}".format(ground_truth_path))

In [None]:
# Run the monitoring analysis once as a processing job on an ml.m5.xlarge
# instance, using the capture and ground-truth paths resolved above.
# The optional pre/post-processor scripts are left disabled.
run_model_monitor_job_processor(
    region, 
    "ml.m5.xlarge", 
    role, 
    endpoint_name,
    data_capture_path, 
    ground_truth_path,
    reports_path,
    #preprocessor_path=preprocessor_path,
    #postprocessor_path=postprocessor_path
)

### Analysis

Here we are analyzing the report created by our Monitoring Job

In [None]:
import json
import pandas as pd

In [None]:
# Bucket holding the monitoring reports (empty here: fill in before running).
bucket_inference = ""

# Key prefix (relative to the bucket) under which the reports are listed.
reports_path = "data/monitoring/reports"

In [None]:
result = s3_client.list_objects(Bucket=bucket_inference, Prefix=reports_path)

# The response has no "Contents" entry when nothing matches the prefix, so
# default to an empty list. `monitoring_reports` is then always defined, and
# unrelated errors are no longer hidden by a catch-all handler.
monitoring_reports = [
    's3://{0}/{1}'.format(bucket_inference, report_object.get("Key"))
    for report_object in result.get('Contents', [])
]

if monitoring_reports:
    print("Monitoring Reports Files: ")
    print("\n ".join(monitoring_reports))
else:
    print('No monitoring reports found.')

In [None]:
# Copy the first four listed report files into the local monitoring folder.
# NOTE(review): assumes `monitoring_reports` holds at least four entries.
!aws s3 cp {monitoring_reports[0]} ./../data/monitoring/
!aws s3 cp {monitoring_reports[1]} ./../data/monitoring/
!aws s3 cp {monitoring_reports[2]} ./../data/monitoring/
!aws s3 cp {monitoring_reports[3]} ./../data/monitoring/

In [None]:
# Do not truncate wide columns when displaying the table.
pd.set_option('display.max_colwidth', None)

# Read the downloaded violations report; the context manager closes the file
# handle even if reading fails.
with open('./../data/monitoring/constraint_violations.json', 'r') as file:
    data = file.read()

# Flatten the "violations" entries into a DataFrame for display.
violations_df = pd.json_normalize(json.loads(data)['violations'])
violations_df