# Pipeline

In [2]:
!pip3 install python-dotenv

Collecting python-dotenv
  Using cached python_dotenv-0.21.1-py3-none-any.whl (19 kB)
[33mDEPRECATION: pyodbc 4.0.0-unsupported has a non-standard version number. pip 23.3 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of pyodbc or contact the author to suggest that they release a version with a conforming version number. Discussion can be found at https://github.com/pypa/pip/issues/12063[0m[33m
[0mInstalling collected packages: python-dotenv
Successfully installed python-dotenv-0.21.1
[0m

In [3]:
from dotenv import find_dotenv, load_dotenv
load_dotenv(find_dotenv("config.env"))

True

In [4]:
import os
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession, LocalPipelineSession

role = sagemaker.get_execution_role()
bucket = os.environ["BUCKET"]
pipeline_session = PipelineSession(default_bucket=bucket)

In [5]:
sagemaker.__version__

'2.183.0'

In [6]:
import boto3

sagemaker_session = sagemaker.session.Session()
sagemaker_client = boto3.client("sagemaker")
iam_client = boto3.client("iam")
region = boto3.Session().region_name



In [7]:
ENDPOINT = "dogBreeds-endpoint"
DATA_CAPTURE_DESTINATION = f"s3://{bucket}/monitoring/data-capture"
MODEL_PACKAGE_GROUP = "dogBreeds"

In [8]:
config = {
    "session": pipeline_session,
    "instance_type": "ml.m5.xlarge",
    "image": None,
    "framework_version": "1.12",
    "py_version": "py38",
}

In [9]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="15d")

## Data preprocessing

In [10]:
# # For testing script locally

# import importlib
# from preprocessing import preprocess_data

# importlib.reload(preprocess_data)

# import tempfile
# import shutil

# from pathlib import Path

# directory = tempfile.mkdtemp()
# data_dir = "all/"
# train_ratio = 0.8
# output_dir = Path(directory) / "output"

# preprocess_data.preprocess_data(data_dir, output_dir, train_ratio)

In [11]:
# shutil.rmtree(directory)

In [12]:
from sagemaker.workflow.pipeline_context import LocalPipelineSession
local_pipeline_session = LocalPipelineSession()

In [13]:
from sagemaker.workflow.parameters import ParameterString

dataset_location = ParameterString(
    name="dataset_location",
    default_value=f"s3://{bucket}/all",
)

In [14]:
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig

pipeline_definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)

In [15]:
# dataset_location

In [16]:
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.pytorch.processing import PyTorchProcessor

# Define the script processor
pytorch_preprocessor = PyTorchProcessor(
    base_job_name="preprocess-data",
    image_uri=config["image"],
    framework_version=config["framework_version"],
    py_version=config["py_version"],
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
    # sagemaker_session=local_pipeline_session
)

preprocessing_step = ProcessingStep(
    name="preprocess-data",
    step_args=pytorch_preprocessor.run(
        code='preprocess_data.py',
        source_dir='preprocessing', 
        inputs=[
            ProcessingInput(
                source=dataset_location,
                destination='/opt/ml/processing/input'
            )
        ],
        outputs=[
            ProcessingOutput(
                output_name="train",
                source='/opt/ml/processing/output/train',
                destination=f's3://{bucket}/output/train'
            ),
            ProcessingOutput(
                output_name="test",
                source='/opt/ml/processing/output/test',
                destination=f's3://{bucket}/output/test'
            ),
            ProcessingOutput(
                output_name="data-baseline",
                source='/opt/ml/processing/output/data-baseline',
                destination=f's3://{bucket}/output/data-baseline'
            )
        ]
    ),
    cache_config=cache_config
)



## Training

In [17]:
# !pip install --upgrade torch torchvision

In [18]:
# # For testing script locally

# import importlib
# import train

# importlib.reload(train)

# import tempfile
# import shutil

# from pathlib import Path
# from train import main

# directory = tempfile.mkdtemp()
# data_dir = "all/"
# model_dir = Path(directory) / "model"
# output_dir = Path(directory) / "output"

# model_dir.mkdir(parents=True, exist_ok=True)
# output_dir.mkdir(parents=True, exist_ok=True)

# num_epochs = 1
# batch_size = 16
# debug = True

# main(str(data_dir), str(model_dir), str(output_dir), num_epochs, batch_size, debug)

In [19]:
from sagemaker.pytorch import PyTorch

estimator = PyTorch(
    base_job_name="dogbreeds-training",
    entry_point=f"train.py",

    hyperparameters={
        "epochs": 5,
        "batch_size": 32,
    },
    
    metric_definitions=[
        {"Name": "loss", "Regex": "Loss: ([0-9\\.]+)"},
        {"Name": "accuracy", "Regex": "Validation Accuracy: ([0-9\\.]+)"},
    ],
    image_uri=config["image"],
    framework_version=config["framework_version"],
    py_version=config["py_version"],
    instance_type="ml.g4dn.xlarge",
    instance_count=1,
    disable_profiler=True,
    sagemaker_session=config["session"],
    role=role
)

In [20]:
from sagemaker.workflow.steps import TrainingStep
from sagemaker.inputs import TrainingInput

train_model_step = TrainingStep(
    name="train-model",
    step_args=estimator.fit(
        inputs={
            "train": TrainingInput(
                s3_data=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri
            )
        }
    ),
    cache_config=cache_config,
)

## Evaluation

In [21]:
# # For testing script locally

# # if 'autoreload' not in get_ipython().extension_manager.loaded:
# #     %load_ext autoreload
# import importlib

# import tempfile
# import shutil

# from pathlib import Path
# from evaluate import evaluation

# importlib.reload(evaluation)
# # generate model.tar.gz
# import tarfile
# import os

# def tar_sagemaker_style(source_dir, output_filename):
#     with tarfile.open(output_filename, "w:gz") as tar:
#         for item in os.listdir(source_dir):
#             item_path = os.path.join(source_dir, item)
#             tar.add(item_path, arcname=item)

# # Tar the 'model/' folder
# output_file = 'model.tar.gz'
# tar_sagemaker_style(str(model_dir), model_dir / output_file)
# print("tared data")

# baseline_dir = output_dir.parent / "baseline"
# baseline_dir.mkdir(exist_ok=True)

# with tarfile.open(Path(directory) / "model.tar.gz") as tar:
#     tar.extractall(path=Path(model_dir))
# evaluation.main(str(model_dir), str(data_dir), str(output_dir), str(baseline_dir), debug=True)

In [22]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name="evaluation-report", output_name="evaluation", path="evaluation.json"
)

In [23]:
model_assets = train_model_step.properties.ModelArtifacts.S3ModelArtifacts
model_assets

<sagemaker.workflow.properties.Properties at 0x7f188cb45510>

In [24]:
config

{'session': <sagemaker.workflow.pipeline_context.PipelineSession at 0x7f18c86276d0>,
 'instance_type': 'ml.m5.xlarge',
 'image': None,
 'framework_version': '1.12',
 'py_version': 'py38'}

In [25]:
from sagemaker.pytorch.processing import PyTorchProcessor

pytorch_processor = PyTorchProcessor(
    base_job_name="evaluation-processor",
    image_uri=config["image"],
    framework_version=config["framework_version"],
    py_version=config["py_version"],
    instance_type=config["instance_type"],
    instance_count=1,
    role=role,
    sagemaker_session=config["session"],
)


In [26]:
evaluate_model_step = ProcessingStep(
    name="evaluate-model",
    step_args=pytorch_processor.run(
        code=f"evaluation.py",
        source_dir='evaluate',
        inputs=[
            ProcessingInput(
                source=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),
            ProcessingInput(
                source=model_assets,
                destination="/opt/ml/processing/model",
            ),
        ],
        outputs=[
            ProcessingOutput(
                output_name="evaluation", source="/opt/ml/processing/evaluation"
            ),
            ProcessingOutput(
                output_name="model-baseline", source="/opt/ml/processing/baseline"
            ),
        ],
    ),
    property_files=[evaluation_report],
    cache_config=cache_config,
)

## Data & Model Quality Check Step

In [27]:
GROUND_TRUTH_LOCATION = f"s3://{bucket}/monitoring/groundtruth"
DATA_QUALITY_LOCATION = f"s3://{bucket}/monitoring/data-quality"
MODEL_QUALITY_LOCATION = f"s3://{bucket}/monitoring/model-quality"

In [28]:
from sagemaker.workflow.quality_check_step import (
    QualityCheckStep,
    DataQualityCheckConfig,
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.model_monitor.dataset_format import DatasetFormat

data_quality_baseline_step = QualityCheckStep(
    name="generate-data-quality-baseline",
    check_job_config=CheckJobConfig(
        instance_type="ml.c5.xlarge",
        instance_count=1,
        volume_size_in_gb=20,
        sagemaker_session=pipeline_session,
        role=role,
    ),
    quality_check_config=DataQualityCheckConfig(
        baseline_dataset=preprocessing_step.properties.ProcessingOutputConfig.Outputs[
            "data-baseline"
        ].S3Output.S3Uri,
        dataset_format=DatasetFormat.csv(header=True, output_columns_position="START"),
        output_s3_uri=DATA_QUALITY_LOCATION,
    ),
    model_package_group_name=MODEL_PACKAGE_GROUP,
    skip_check=True,
    register_new_baseline=True,
    cache_config=cache_config,
)

In [29]:
from sagemaker.workflow.quality_check_step import ModelQualityCheckConfig

model_quality_baseline_step = QualityCheckStep(
    name="generate-model-quality-baseline",
    check_job_config=CheckJobConfig(
        instance_type="ml.c5.xlarge",
        instance_count=1,
        volume_size_in_gb=20,
        sagemaker_session=pipeline_session,
        role=role,
    ),
    quality_check_config=ModelQualityCheckConfig(
        baseline_dataset=evaluate_model_step.properties.ProcessingOutputConfig.Outputs[
            "model-baseline"
        ].S3Output.S3Uri,
        dataset_format=DatasetFormat.csv(header=True),
        problem_type="MulticlassClassification",
        # evaluate_model_step generated baseline csv of header "Label", "Predicted", "Confidence"
        ground_truth_attribute="Label",
        inference_attribute="Predicted",
        probability_attribute="Confidence",
        output_s3_uri=MODEL_QUALITY_LOCATION,
    ),
    model_package_group_name=MODEL_PACKAGE_GROUP,
    skip_check=True,
    register_new_baseline=True,
    cache_config=cache_config,
)

## Registering Model

In [30]:
config

{'session': <sagemaker.workflow.pipeline_context.PipelineSession at 0x7f18c86276d0>,
 'instance_type': 'ml.m5.xlarge',
 'image': None,
 'framework_version': '1.12',
 'py_version': 'py38'}

In [31]:
MODEL_PACKAGE_GROUP = "dogBreeds"

In [32]:
from sagemaker.pytorch.model import PyTorchModel

pytorch_model = PyTorchModel(
    model_data=model_assets,
    entry_point="inference.py",
    image_uri=config["image"],
    py_version=config["py_version"],
    framework_version=config["framework_version"],
    sagemaker_session=config["session"],
    role=role,
)

In [33]:
from sagemaker.model_metrics import ModelMetrics, MetricsSource
from sagemaker.drift_check_baselines import DriftCheckBaselines

model_metrics = ModelMetrics(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
    model_statistics=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.CalculatedBaselineStatistics,
        content_type="application/json",
    ),
    model_constraints=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.CalculatedBaselineConstraints,
        content_type="application/json",
    ),
)

drift_check_baselines = DriftCheckBaselines(
    model_data_statistics=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_data_constraints=MetricsSource(
        s3_uri=data_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
    model_statistics=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckStatistics,
        content_type="application/json",
    ),
    model_constraints=MetricsSource(
        s3_uri=model_quality_baseline_step.properties.BaselineUsedForDriftCheckConstraints,
        content_type="application/json",
    ),
)

In [34]:
from sagemaker.workflow.model_step import ModelStep

register_model_step = ModelStep(
    name="register-model",
    step_args=pytorch_model.register(
        model_package_group_name=MODEL_PACKAGE_GROUP,
        model_metrics=model_metrics,
        drift_check_baselines=drift_check_baselines,
        approval_status="PendingManualApproval",
        content_types=["application/json", "application/x-image"],
        response_types=["application/json"],
        inference_instances=["ml.m5.xlarge"],
        transform_instances=["ml.g4dn.xlarge"],
        domain="MACHINE_LEARNING",
        task="CLASSIFICATION",
        framework="PYTORCH",
        framework_version=config["framework_version"],
    ),
)

In [35]:
from sagemaker.workflow.parameters import ParameterFloat

accuracy_threshold = ParameterFloat(name="accuracy_threshold", default_value=0.50)

In [36]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

fail_step = FailStep(
    name="fail",
    error_message=Join(
        on=" ",
        values=[
            "Execution failed because the model's accuracy was lower than",
            accuracy_threshold,
        ],
    ),
)

In [37]:
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo

condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=evaluate_model_step.name,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=accuracy_threshold,
)

In [38]:
from sagemaker.workflow.condition_step import ConditionStep

condition_step = ConditionStep(
    name="check-model-accuracy",
    conditions=[condition],
    if_steps=[
        model_quality_baseline_step, 
        register_model_step
    ],
    else_steps=[fail_step],
)

In [39]:
from sagemaker.workflow.pipeline import Pipeline
training_pipeline = Pipeline(
    name="dogBreeds-training-pipeline",
    parameters=[dataset_location, accuracy_threshold],
    steps=[
        preprocessing_step,
        train_model_step,
        data_quality_baseline_step,
        evaluate_model_step,
        condition_step,
    ],
    pipeline_definition_config=pipeline_definition_config,
    sagemaker_session=config["session"],
)

In [40]:
training_pipeline.upsert(role_arn=role)

Using provided s3_resource


INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Using provided s3_resource
Using provided s3_resource


INFO:sagemaker.processing:Uploaded evaluate to s3://pochingto-testing/dogBreeds-training-pipeline/code/4db941f04ed748882e10300736cad13a/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://pochingto-testing/dogBreeds-training-pipeline/code/2c207c809cb0e0e9a1d77e5247f961f9/runproc.sh


Using provided s3_resource


INFO:sagemaker.processing:Uploaded preprocessing to s3://pochingto-testing/dogBreeds-training-pipeline/code/af7855aa5c91b75932b386e381849af6/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://pochingto-testing/dogBreeds-training-pipeline/code/0c8137ea235a6debf66cba8d901e144c/runproc.sh
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


Using provided s3_resource
Using provided s3_resource


INFO:sagemaker.processing:Uploaded evaluate to s3://pochingto-testing/dogBreeds-training-pipeline/code/4db941f04ed748882e10300736cad13a/sourcedir.tar.gz
INFO:sagemaker.processing:runproc.sh uploaded to s3://pochingto-testing/dogBreeds-training-pipeline/code/2c207c809cb0e0e9a1d77e5247f961f9/runproc.sh


Using provided s3_resource
Using provided s3_resource


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:681340771742:pipeline/dogBreeds-training-pipeline',
 'ResponseMetadata': {'RequestId': 'a233e3f8-afe5-4a49-86af-9b15a86dc0fc',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'a233e3f8-afe5-4a49-86af-9b15a86dc0fc',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '95',
   'date': 'Thu, 23 Nov 2023 20:28:09 GMT'},
  'RetryAttempts': 0}}

# Setup Automatic Deployment

In this part, my premission is not correctly configured. So I setup this part using AWS console UI. 

## Setup lambda

In [41]:
# # setup role for lambda to deploy endpoint
# import json

# lambda_role_name = "lambda-deployment-role"
# lambda_role_arn = None

# try:
#     response = iam_client.create_role(
#         RoleName=lambda_role_name,
#         AssumeRolePolicyDocument=json.dumps(
#             {
#                 "Version": "2012-10-17",
#                 "Statement": [
#                     {
#                         "Effect": "Allow",
#                         "Principal": {
#                             "Service": ["lambda.amazonaws.com", "events.amazonaws.com"]
#                         },
#                         "Action": "sts:AssumeRole",
#                     }
#                 ],
#             }
#         ),
#         Description="Lambda Endpoint Deployment",
#     )

#     lambda_role_arn = response["Role"]["Arn"]

#     iam_client.attach_role_policy(
#         PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
#         RoleName=lambda_role_name,
#     )

#     iam_client.attach_role_policy(
#         PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
#         RoleName=lambda_role_name,
#     )

#     print(f'Role "{lambda_role_name}" created with ARN "{lambda_role_arn}".')
# except iam_client.exceptions.EntityAlreadyExistsException:
#     response = iam_client.get_role(RoleName=lambda_role_name)
#     lambda_role_arn = response["Role"]["Arn"]
#     print(f'Role "{lambda_role_name}" already exists with ARN "{lambda_role_arn}".')

In [42]:
# ENDPOINT = "dogbreeds-endpoint"
# DATA_CAPTURE_DESTINATION = f"s3://{bucket}/monitoring/data-capture"

In [43]:
# from sagemaker.lambda_helper import Lambda

# deploy_lambda_fn = Lambda(
#     function_name="deploy_fn",
#     execution_role_arn=lambda_role_arn,
#     script="lambda.py",
#     handler="lambda.lambda_handler",
#     timeout=600,
#     session=sagemaker_session,
#     runtime="python3.11",
#     environment={
#         "Variables": {
#             "ENDPOINT": ENDPOINT,
#             "DATA_CAPTURE_DESTINATION": DATA_CAPTURE_DESTINATION,
#             "ROLE": role,
#         }
#     },
# )

# lambda_response = deploy_lambda_fn.upsert()
# lambda_response

## Setup Eventbridge

In [44]:
# MODEL_PACKAGE_GROUP

In [45]:
# event_pattern = f"""
# {{
#   "source": ["aws.sagemaker"],
#   "detail-type": ["SageMaker Model Package State Change"],
#   "detail": {{
#     "ModelPackageGroupName": ["{MODEL_PACKAGE_GROUP}"],
#     "ModelApprovalStatus": ["Approved"]
#   }}
# }}
# """

In [46]:
# event_pattern

In [47]:
# events_client = boto3.client("events")
# rule_response = events_client.put_rule(
#     Name="PipelineModelApprovedRule",
#     EventPattern=event_pattern,
#     State="ENABLED",
#     RoleArn=role,
# )

# Manually Deploy

In [48]:
from sagemaker.predictor import Predictor

ENDPOINT = "dogBreeds-endpoint"
DATA_CAPTURE_DESTINATION = f"{bucket}/monitoring/data-capture"
MODEL_PACKAGE_GROUP = "dogBreeds"

In [49]:
import boto3
import sagemaker

sagemaker_client = boto3.client("sagemaker")
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

## Sagemaker SDK

In [50]:
# from sagemaker import ModelPackage

# model_package = ModelPackage(
#     model_package_arn=package["ModelPackageArn"],
#     sagemaker_session=sagemaker_session,
#     role=role,
# )

In [51]:
# model_package.deploy(
#     endpoint_name=ENDPOINT, 
#     initial_instance_count=1, 
#     instance_type=config["instance_type"]
# )

## boto3 SDK

In [52]:
# response = sagemaker_client.list_model_packages(
#     ModelPackageGroupName=MODEL_PACKAGE_GROUP,
#     ModelApprovalStatus="Approved",
#     SortBy="CreationTime",
#     MaxResults=1,
# )

# package = (
#     response["ModelPackageSummaryList"][0]
#     if response["ModelPackageSummaryList"]
#     else None
# )
# package

In [53]:
# import time
# import boto3

# sagemaker_client = boto3.client("sagemaker")

# endpoint_name = "dogBreeds-endpoint"
# data_capture_destination = DATA_CAPTURE_DESTINATION # f"s3://{bucket}/monitoring/data-capture"

# timestamp = time.strftime("%m%d%H%M%S", time.localtime())
# model_name = f"{endpoint_name}-model-{timestamp}"
# endpoint_config_name = f"{endpoint_name}-config-{timestamp}"
# model_package_arn=package["ModelPackageArn"]

# sagemaker_client.create_model(
#     ModelName=model_name, 
#     ExecutionRoleArn=role, 
#     Containers=[{
#         "ModelPackageName": model_package_arn
#     }] 
# )
# sagemaker_client.create_endpoint_config(
#     EndpointConfigName=endpoint_config_name,
#     ProductionVariants=[{
#         "ModelName": model_name,
#         "InstanceType": "ml.m5.xlarge",
#         "InitialVariantWeight": 1,
#         "InitialInstanceCount": 1,
#         "VariantName": "AllTraffic",
#     }],

#     DataCaptureConfig={
#         "EnableCapture": True,
#         "InitialSamplingPercentage": 100,
#         "DestinationS3Uri": data_capture_destination,
#         "CaptureOptions": [
#             {
#                 "CaptureMode": "Input"
#             },
#             {
#                 "CaptureMode": "Output"
#             },
#         ],
#         "CaptureContentTypeHeader": {
#             "JsonContentTypes": [
#                 "application/json",
#                 "application/x-image"
#             ]
#         }
#     },
# )

In [54]:
# response = sagemaker_client.list_endpoints(NameContains=endpoint_name, MaxResults=1)

# if len(response["Endpoints"]) == 0:
#     sagemaker_client.create_endpoint(
#         EndpointName=endpoint_name, 
#         EndpointConfigName=endpoint_config_name,
#     )
# else:
#     sagemaker_client.update_endpoint(
#         EndpointName=endpoint_name, 
#         EndpointConfigName=endpoint_config_name,
#     )

# Setup Monitoring

In [55]:
from time import sleep
from sagemaker.model_monitor import MonitoringExecution
from sagemaker.s3 import S3Downloader

def describe_monitoring_schedules(endpoint_name):
    schedules = []
    response = sagemaker_client.list_monitoring_schedules(EndpointName=endpoint_name)[
        "MonitoringScheduleSummaries"
    ]
    for item in response:
        name = item["MonitoringScheduleName"]
        schedule = {
            "Name": name,
            "Type": item["MonitoringType"],
        }

        description = sagemaker_client.describe_monitoring_schedule(
            MonitoringScheduleName=name
        )
        # print(description)
        schedule["Status"] = description["MonitoringScheduleStatus"]

        last_execution_status = description["LastMonitoringExecutionSummary"][
            "MonitoringExecutionStatus"
        ]

        schedule["Last Execution Status"] = last_execution_status
        schedule["Last Execution Date"] = str(description["LastMonitoringExecutionSummary"]["LastModifiedTime"])

        if last_execution_status == "Failed":
            schedule["Failure Reason"] = description["LastMonitoringExecutionSummary"][
                "FailureReason"
            ]
        elif last_execution_status == "CompletedWithViolations":
            processing_job_arn = description["LastMonitoringExecutionSummary"][
                "ProcessingJobArn"
            ]
            execution = MonitoringExecution.from_processing_arn(
                sagemaker_session=sagemaker_session,
                processing_job_arn=processing_job_arn,
            )
            execution_destination = execution.output.destination

            violations_filepath = os.path.join(
                execution_destination, "constraint_violations.json"
            )
            violations = json.loads(S3Downloader.read_file(violations_filepath))[
                "violations"
            ]

            schedule["Violations"] = violations

        schedules.append(schedule)

    return schedules


def describe_monitoring_schedule(endpoint_name, monitoring_type):
    found = False

    schedules = describe_monitoring_schedules(endpoint_name)
    for schedule in schedules:
        if schedule["Type"] == monitoring_type:
            found = True
            print(json.dumps(schedule, indent=2))

    if not found:
        print(f"There's no {monitoring_type} Monitoring Schedule.")


def describe_data_monitoring_schedule(endpoint_name):
    describe_monitoring_schedule(endpoint_name, "DataQuality")


def describe_model_monitoring_schedule(endpoint_name):
    describe_monitoring_schedule(endpoint_name, "ModelQuality")


def delete_monitoring_schedule(endpoint_name, monitoring_type):
    attempts = 30
    found = False

    response = sagemaker_client.list_monitoring_schedules(EndpointName=endpoint_name)[
        "MonitoringScheduleSummaries"
    ]
    for item in response:
        if item["MonitoringType"] == monitoring_type:
            found = True
            
            summary = sagemaker_client.describe_monitoring_schedule(
                MonitoringScheduleName=item["MonitoringScheduleName"]
            )
            status = summary["MonitoringScheduleStatus"]

            if status == "Scheduled" and "LastMonitoringExecutionSummary" in summary and "MonitoringExecutionStatus" in summary["LastMonitoringExecutionSummary"]:
                status = summary["LastMonitoringExecutionSummary"]["MonitoringExecutionStatus"]

            while status in ("Pending", "InProgress") and attempts > 0:
                attempts -= 1
                print(
                    f"Monitoring schedule status: {status}. Waiting for it to finish."
                )
                sleep(30)

                status = sagemaker_client.describe_monitoring_schedule(
                    MonitoringScheduleName=item["MonitoringScheduleName"]
                )["MonitoringScheduleStatus"]

            if status not in ("Pending", "InProgress"):
                sagemaker_client.delete_monitoring_schedule(
                    MonitoringScheduleName=item["MonitoringScheduleName"]
                )
                print("Monitoring schedule deleted.")
            else:
                print("Waiting for monitoring schedule timed out")

    if not found:
        print(f"There's no {monitoring_type} Monitoring Schedule.")


def delete_data_monitoring_schedule(endpoint_name):
    delete_monitoring_schedule(endpoint_name, "DataQuality")


def delete_model_monitoring_schedule(endpoint_name):
    delete_monitoring_schedule(endpoint_name, "ModelQuality")

In [56]:
GROUND_TRUTH_LOCATION = f"s3://{bucket}/monitoring/groundtruth"
DATA_QUALITY_LOCATION = f"s3://{bucket}/monitoring/data-quality"
MODEL_QUALITY_LOCATION = f"s3://{bucket}/monitoring/model-quality"

In [58]:
# GROUND_TRUTH_LOCATION

## Data Monitoring

In [59]:
import json
import numpy as np
import io
import base64

from PIL import Image

def extract_rgb_features(image_data):
    img = Image.open(io.BytesIO(image_data))
    width, height = img.size
    print(f"Img size w x h: {width}, {height}")
    img_array = np.array(img)
    
    columns = ['width', 'height', 'red_mean', 'red_std', 'green_mean', 'green_std', 'blue_mean', 'blue_std']
    if img_array.ndim == 3:
        red_channel = img_array[:, :, 0]
        green_channel = img_array[:, :, 1]
        blue_channel = img_array[:, :, 2]

        red_mean, red_std = np.mean(red_channel), np.std(red_channel)
        print(f"red mean {red_mean} red std {red_std}")
        green_mean, green_std = np.mean(green_channel), np.std(green_channel)
        print(f"green mean {green_mean} green std {green_std}")
        blue_mean, blue_std = np.mean(blue_channel), np.std(blue_channel)
        print(f"blue mean {blue_mean} blue std {blue_std}")

        features = [width, height, red_mean, red_std, green_mean, green_std, blue_mean, blue_std]
    else:
        features = [width, height] + [-1.0] * 6
        
    response = {
        "width": width, 
        "height": height, 
        "red_mean": red_mean, 
        "red_std": red_std, 
        "green_mean": green_mean, 
        "green_std": green_std
    }
    print(f"Reponse: {response}")
    return response

def preprocess_handler(inference_record):
    data = json.loads(inference_record.endpoint_input.data)
    image_data = data["image_data"]
    image_data = base64.b64decode(image_data)
    return extract_rgb_features(image_data)

In [60]:
DATA_QUALITY_PREPROCESSOR = "data_quality_preprocessor.py"

In [61]:
s3_bucket = boto3.Session().resource("s3").Bucket(pipeline_session.default_bucket())
prefix = "dogbreeds/monitoring"
s3_bucket.Object(os.path.join(prefix, DATA_QUALITY_PREPROCESSOR)).upload_file(
    str(DATA_QUALITY_PREPROCESSOR)
)
data_quality_preprocessor = (
    f"s3://{os.path.join(s3_bucket.name, prefix, DATA_QUALITY_PREPROCESSOR)}"
)

In [62]:
# data_quality_preprocessor

In [59]:
from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor

data_monitor = DefaultModelMonitor(
    instance_type=config["instance_type"],
    instance_count=1,
    max_runtime_in_seconds=3600,
    role=role,
)

data_monitor.create_monitoring_schedule(
    monitor_schedule_name="dogbreeds-data-monitoring-schedule",
    endpoint_input=ENDPOINT,
    record_preprocessor_script=data_quality_preprocessor,
    statistics=f"{DATA_QUALITY_LOCATION}/statistics.json",
    constraints=f"{DATA_QUALITY_LOCATION}/constraints.json",
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    output_s3_uri=DATA_QUALITY_LOCATION,
    enable_cloudwatch_metrics=True,
)

In [83]:
# describe_data_monitoring_schedule(ENDPOINT)

{
  "Name": "dogbreeds-data-monitoring-schedule",
  "Type": "DataQuality",
  "Status": "Scheduled",
  "Last Execution Status": "CompletedWithViolations",
  "Last Execution Date": "2023-11-22 22:08:28.549000+00:00",
  "Violations": [
    {
      "feature_name": "height",
      "constraint_check_type": "data_type_check",
      "description": "Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 0.0% of data is Integral."
    },
    {
      "feature_name": "blue_std",
      "constraint_check_type": "data_type_check",
      "description": "Data type match requirement is not met. Expected data type: Fractional, Expected match: 100.0%. Observed: Only 0.0% of data is Fractional."
    },
    {
      "feature_name": "width",
      "constraint_check_type": "data_type_check",
      "description": "Data type match requirement is not met. Expected data type: Integral, Expected match: 100.0%. Observed: Only 0.0% of data is Integral."
    },
   

## Model Monitoring

In [63]:
from sagemaker.model_monitor import ModelQualityMonitor, EndpointInput

model_monitor = ModelQualityMonitor(
    instance_type=config["instance_type"],
    instance_count=1,
    max_runtime_in_seconds=1800,
    role=role
)

model_monitor.create_monitoring_schedule(
    monitor_schedule_name="dogbreeds-model-monitoring-schedule",
    
    endpoint_input = EndpointInput(
        endpoint_name=ENDPOINT,
        inference_attribute="prediction",
        destination="/opt/ml/processing/input_data",
    ),
    
    problem_type="MulticlassClassification",
    ground_truth_input=GROUND_TRUTH_LOCATION,
    constraints=f"{MODEL_QUALITY_LOCATION}/constraints.json",
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    output_s3_uri=MODEL_QUALITY_LOCATION,
    enable_cloudwatch_metrics=True,
)

In [84]:
# describe_model_monitoring_schedule(ENDPOINT)

{
  "Name": "dogbreeds-model-monitoring-schedule",
  "Type": "ModelQuality",
  "Status": "Scheduled",
  "Last Execution Status": "InProgress",
  "Last Execution Date": "2023-11-22 22:07:51.938000+00:00"
}


## Generate Traffic

In [65]:
from PIL import Image
import io
import base64

def load_and_preprocess_image(image_path):
    # Load the image
    image = Image.open(image_path)
    # image = image.resize((224, 224))

    # Convert the image to bytes
    img_byte_arr = io.BytesIO()
    image.save(img_byte_arr, format='JPEG')  # Adjust format if needed
    img_byte_arr = img_byte_arr.getvalue()
    image_data = base64.b64encode(img_byte_arr).decode('utf-8')

    return image_data

image_path = './test_images/chow-chow.jpg'
image_data = load_and_preprocess_image(image_path)

In [66]:
import boto3
import json

endpoint_name = "dogbreeds-endpoint"

client = boto3.client('sagemaker-runtime')

# content_type = "application/x-image"

# response = client.invoke_endpoint(
#     EndpointName=endpoint_name,
#     ContentType=content_type,
#     Body=image_data
# )

content_type = "application/json"

for i in range(10): 
    response = client.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType=content_type,
        Body=json.dumps({"image_data": image_data}),
        InferenceId=str(i)
    )

    # Parse the response
    response_body = response['Body'].read()
    predictions = json.loads(response_body)
    print(f"Inference id {i} : {predictions}")

Inference id 0 : {'prediction': 'Chow_chow', 'confidence': 0.7634961009025574}
Inference id 1 : {'prediction': 'Chow_chow', 'confidence': 0.7634961009025574}
Inference id 2 : {'prediction': 'Chow_chow', 'confidence': 0.7634961009025574}
Inference id 3 : {'prediction': 'Chow_chow', 'confidence': 0.7634961009025574}
Inference id 4 : {'prediction': 'Chow_chow', 'confidence': 0.7634961009025574}
Inference id 5 : {'prediction': 'Chow_chow', 'confidence': 0.7634961009025574}
Inference id 6 : {'prediction': 'Chow_chow', 'confidence': 0.7634961009025574}
Inference id 7 : {'prediction': 'Chow_chow', 'confidence': 0.7634961009025574}
Inference id 8 : {'prediction': 'Chow_chow', 'confidence': 0.7634961009025574}
Inference id 9 : {'prediction': 'Chow_chow', 'confidence': 0.7634961009025574}


In [109]:
image_data = load_and_preprocess_image(image_path)
image_data = base64.b64decode(image_data)
response = extract_rgb_features(image_data)

Img size w x h: 943, 1414
red mean 128.78669973496366 red std 67.0890291598068
green mean 85.30613798389383 green std 51.851484277429236
blue mean 47.351028422036265 blue std 41.72439149432255
Reponse: {'width': 943, 'height': 1414, 'red_mean': 128.78669973496366, 'red_std': 67.0890291598068, 'green_mean': 85.30613798389383, 'green_std': 51.851484277429236}


In [110]:
response

{'width': 943,
 'height': 1414,
 'red_mean': 128.78669973496366,
 'red_std': 67.0890291598068,
 'green_mean': 85.30613798389383,
 'green_std': 51.851484277429236}

## Generate Fake Ground Truth

In [77]:
GROUND_TRUTH_LOCATION

's3://pochingto-testing/monitoring/groundtruth'

In [63]:
import random
from datetime import datetime
from sagemaker.s3 import S3Uploader

records = []
for i in range(10):
    inference_id = str(i)

    records.append(json.dumps({
        "groundTruthData": {
            "data": "Chow_chow",
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }))

groundtruth_payload = "\n".join(records)
upload_time = datetime.utcnow()
uri = f"{GROUND_TRUTH_LOCATION}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
S3Uploader.upload_string_as_file_body(groundtruth_payload, uri)