# MLOps SageMaker

This notebook contains examples of managing an end-to-end ML workflow for packaging ML models for edge environments using SageMaker services:

1. SageMaker Training Job
2. SageMaker Neo Compile Job
3. SageMaker Edge Packaging Job
4. SageMaker Model Registry

    4.1 Create SageMaker Model Package Group
    
    4.2 Create SageMaker Model Package
    
    4.3 Register ML model

***

## Import required modules

In [None]:
import boto3
from datetime import datetime
import logging
import os
import pandas as pd
from sagemaker import get_execution_role, image_uris
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput
from sagemaker.tensorflow import TensorFlow
import time
import traceback
import yaml

In [None]:
# Configure the root logger at INFO level so the progress messages logged by the
# functions in this notebook are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by every function defined in this notebook.
LOGGER = logging.getLogger(__name__)

***

## Global configurations

Configuration variables used for Training, Compilation and Packaging jobs

In [None]:
# Account and region of the current AWS session; both are embedded in the ARNs and in the
# ECR image URI built below.
account_id = boto3.client('sts').get_caller_identity().get('Account')
region = boto3.session.Session().region_name
# Name of the IAM role passed to every SageMaker job. It is empty here and has to be filled
# in before running, otherwise the role ARN below ends with "role/".
role_name = ""
role = "arn:aws:iam::{}:role/{}".format(account_id, role_name)
# KMS key (referenced through its alias) passed to the jobs as output encryption key.
kms_alias = "ml-kms-dev"
kms_key = "arn:aws:kms:{}:{}:alias/{}".format(region, account_id, kms_alias)

# Name and version given to the model by the Edge Packaging job.
edge_model_name = "mlops-iot-regressor"
edge_model_version = "1"
# Model Package Group used by the Model Registry functions.
model_package_group = {
    "name": "mlops-iot-regressor",
    "description": "model package group used for versioning ml models for edge environments"
}
# Processing job: container arguments and custom image location in ECR.
processing_arguments = ["--input_file", "bottle.csv"]
processing_ecr_image_name = "mlops-iot-processing"
processing_ecr_image_tag = "latest"
processing_ecr_image = "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(account_id, region, processing_ecr_image_name, processing_ecr_image_tag)
# Framework and Python versions passed to the TensorFlow estimator.
sagemaker_framework_version = "2.4"
sagemaker_python_version = "py37"
# S3 layout: every path below is a prefix inside s3_bucket_name.
s3_bucket_name = "isengard-bpistone-mlops-iot-dev"
s3_compiled_files_path = "output/compiled"
s3_packaged_files_path = "output/packaged"
s3_processing_artifact_path = "artifact/processing"
s3_processing_input_files_path = "input/data"
s3_processing_output_files_path = "output/data"
s3_training_artifact_path = "artifact/training"
s3_training_artifact_name = "sourcedir_dnn.tar.gz"
# Same prefix as s3_processing_output_files_path: training reads what processing wrote.
s3_training_input_files_path = "output/data"
s3_training_output_files_path = "output/model"
training_hyperparameters = {
    "epochs": 10,
    "dataset_percentage": 100,
    "input_file": "bottle.csv"
}

# Compute resources for the training job and for the processing job.
instance_type = "ml.m5.xlarge"
instance_count = 1
processing_instance_type = "ml.t3.large"
processing_instance_count = 1
# Target platform of the Neo compilation job.
platform_arch = "x86_64"
# Must be an assignment: a bare annotation (`platform_os: "LINUX"`) never binds the name,
# and the compilation job would fail with a NameError when reading it.
platform_os = "LINUX"

In [None]:
# Low-level SageMaker client shared by the compilation, edge packaging and
# model registry functions below.
sagemaker_client = boto3.client("sagemaker")

***

## Create Processor

This method creates the processor by using the custom processing image stored in Amazon ECR

In [None]:
def create_preprocessing_processor():
    """Build the SageMaker Processor used for the data processing step.

    The processor is configured from the global settings: the processing image
    URI, the IAM role, the processing instance count/type and the KMS key used
    for the output.

    Returns:
        The configured ``Processor`` instance.

    Raises:
        Exception: any error raised while building the processor; the stack
            trace is logged before the error is propagated.
    """
    try:
        return Processor(
            image_uri=processing_ecr_image,
            role=role,
            instance_count=processing_instance_count,
            instance_type=processing_instance_type,
            output_kms_key=kms_key
        )
    except Exception as err:
        LOGGER.error("{}".format(traceback.format_exc()))

        raise err

## Processing method

This method can be used for running a SageMaker Processing Job that reads the input data stored in the S3 bucket and writes the processed data back to S3

In [None]:
def process(processor):
    """Run a SageMaker Processing Job and wait for it to finish.

    The job input is the processing input prefix of the bucket, mounted at
    ``/opt/ml/processing/input``; whatever the container writes to
    ``/opt/ml/processing/output`` is uploaded to the processing output prefix.

    Args:
        processor: processor on which ``run`` is called.

    Returns:
        The name of the job, read from ``processor._current_job_name``.

    Raises:
        Exception: any error raised while running the job; the stack trace is
            logged before the error is propagated.
    """
    try:
        source_uri = "s3://{}/{}".format(s3_bucket_name, s3_processing_input_files_path)
        job_input = ProcessingInput(
            input_name="input",
            source=source_uri,
            destination="/opt/ml/processing/input"
        )

        destination_uri = "s3://{}/{}".format(s3_bucket_name, s3_processing_output_files_path)
        job_output = ProcessingOutput(
            output_name="output",
            source="/opt/ml/processing/output",
            destination=destination_uri
        )

        processor.run(
            inputs=[job_input],
            outputs=[job_output],
            arguments=processing_arguments,
            wait=True
        )

        return processor._current_job_name
    except Exception as err:
        LOGGER.error("{}".format(traceback.format_exc()))

        raise err

***

## Create Estimator

This method creates the estimator by using the built-in SageMaker image for the TensorFlow framework

In [None]:
def create_training_estimator():
    """Build the TensorFlow estimator used for the training step.

    The training code is taken from the source archive stored under the
    training artifact prefix (entry point ``train.py``), and the training
    result is written under the training output prefix of the bucket.

    Returns:
        The configured ``TensorFlow`` estimator.

    Raises:
        Exception: any error raised while building the estimator; the stack
            trace is logged before the error is propagated.
    """
    try:
        code_location = "s3://{}/{}/{}".format(
            s3_bucket_name,
            s3_training_artifact_path,
            s3_training_artifact_name
        )
        model_location = "s3://{}/{}".format(s3_bucket_name, s3_training_output_files_path)

        return TensorFlow(
            entry_point="train.py",
            framework_version=sagemaker_framework_version,
            py_version=sagemaker_python_version,
            source_dir=code_location,
            output_path=model_location,
            hyperparameters=training_hyperparameters,
            enable_sagemaker_metrics=True,
            role=role,
            instance_count=instance_count,
            instance_type=instance_type,
            output_kms_key=kms_key
        )
    except Exception as err:
        LOGGER.error("{}".format(traceback.format_exc()))

        raise err

## Train method

This method can be used for running a SageMaker Training Job that trains the provided toy ML model by using the artifact stored in the S3 bucket

In [None]:
def train(estimator):
    """Run a SageMaker Training Job on the given estimator.

    The job receives a single ``train`` channel pointing at the training input
    prefix of the bucket.

    Args:
        estimator: estimator on which ``fit`` is called.

    Returns:
        The name of the job, read from ``estimator._current_job_name``.

    Raises:
        Exception: any error raised while training; the stack trace is logged
            before the error is propagated.
    """
    try:
        channels = {
            "train": "s3://{}/{}".format(s3_bucket_name, s3_training_input_files_path)
        }
        estimator.fit(inputs=channels, logs="Rules")

        return estimator._current_job_name
    except Exception as err:
        LOGGER.error("{}".format(traceback.format_exc()))

        raise err

## Compile method

This method can be used for running a SageMaker Compilation Job with Neo for compiling the model for the target platform defined in the configuration parameters

In [None]:
def compile_neo(training_job_name):
    """Compile the trained model with SageMaker Neo and wait for the job to end.

    Args:
        training_job_name: name of the training job whose ``model.tar.gz``
            has to be compiled.

    Returns:
        The name of the compilation job. The name is returned whatever the
        final status is; a failure is only reported through the log.

    Raises:
        Exception: any error raised while creating or polling the job; the
            stack trace is logged before the error is propagated.
    """
    try:
        neo_job_name = "sagemaker-neo-job-keras-{}".format(datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))

        # The estimator is created with output_path set to the training *output* prefix,
        # so that is where the model artifact has to be read from (not the training
        # input prefix, which only holds the processed dataset).
        model_s3_uri = 's3://{}/{}/{}/output/model.tar.gz'.format(
            s3_bucket_name,
            s3_training_output_files_path,
            training_job_name
        )

        LOGGER.info("{}".format(model_s3_uri))

        sagemaker_client.create_compilation_job(
            CompilationJobName=neo_job_name,
            RoleArn=role,
            InputConfig={
                'S3Uri': model_s3_uri,
                'DataInputConfig': '{"input_token": [1, 1, 1, 1]}',
                'Framework': 'KERAS'
            },
            OutputConfig={
                'S3OutputLocation': "s3://{}/{}/{}".format(
                    s3_bucket_name,
                    s3_compiled_files_path,
                    neo_job_name
                ),
                'TargetPlatform': {
                    'Os': platform_os,
                    'Arch': platform_arch,
                },
                'KmsKeyId': kms_key
            },
            StoppingCondition={'MaxRuntimeInSeconds': 900}
        )

        # Poll once a minute until the job leaves the STARTING/INPROGRESS states.
        while True:
            resp = sagemaker_client.describe_compilation_job(CompilationJobName=neo_job_name)
            status = resp['CompilationJobStatus']

            if status not in ('STARTING', 'INPROGRESS'):
                LOGGER.info("{} - {}".format(neo_job_name, status))

                if status == "FAILED" and "FailureReason" in resp:
                    LOGGER.error("{}".format(resp["FailureReason"]))
                break

            LOGGER.info("{}: Running...".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
            time.sleep(60)

        return neo_job_name
    except Exception as e:
        stacktrace = traceback.format_exc()

        LOGGER.error("[ERROR]: {}".format(stacktrace))

        raise e

## Package method

This method can be used for running a SageMaker Edge Packaging Job for creating the artifact ready to be deployed on the target edge device

In [None]:
def package_edge_manager(neo_job_name):
    """Package a compiled model with a SageMaker Edge Packaging Job.

    The job is created from the given compilation job and polled once a minute
    until its status is neither STARTING nor INPROGRESS.

    Args:
        neo_job_name: name of the Neo compilation job to package.

    Returns:
        The name of the edge packaging job, whatever its final status is.

    Raises:
        Exception: any error raised while creating or polling the job; the
            stack trace is logged and printed before the error is propagated.
    """
    try:
        job_timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        edge_manager_job_name = "edge-manager-job-keras-{}".format(job_timestamp)

        packaged_location = "s3://{}/{}/{}".format(
            s3_bucket_name,
            s3_packaged_files_path,
            edge_manager_job_name
        )

        sagemaker_client.create_edge_packaging_job(
            EdgePackagingJobName=edge_manager_job_name,
            CompilationJobName=neo_job_name,
            ModelName=edge_model_name,
            ModelVersion=edge_model_version,
            RoleArn=role,
            OutputConfig={
                'S3OutputLocation': packaged_location,
                'KmsKeyId': kms_key
            }
        )

        while True:
            job_details = sagemaker_client.describe_edge_packaging_job(
                EdgePackagingJobName=edge_manager_job_name
            )
            polled_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            job_status = job_details['EdgePackagingJobStatus']

            if job_status not in ['STARTING', 'INPROGRESS']:
                # Job left the running states: dump the full description and stop polling.
                LOGGER.info("{}".format(job_details))
                LOGGER.info("{} - {}".format(edge_manager_job_name, job_status))

                break

            LOGGER.info("{}: Running...".format(polled_at))
            time.sleep(60)

        return edge_manager_job_name
    except Exception as err:
        failure_trace = traceback.format_exc()

        LOGGER.error("[ERROR]: {}".format(failure_trace))
        print("[ERROR]: {}".format(failure_trace))

        raise err

***

## Model Registry Section

This section contains the methods for managing SageMaker Model Registry

1. Describe Model Package Group
2. Create Model Package Group
3. Create Model Package

### Describe Model Package Group

This method can be used for checking if a Model Package Group exists in the SageMaker Model Registry environment

In [None]:
def describe_model_package_group():
    """Describe the configured Model Package Group.

    Returns:
        The response of ``describe_model_package_group``, or an empty string
        when the call fails for any reason (the stack trace is logged, the
        error is not propagated).
    """
    try:
        group_name = model_package_group["name"]
        LOGGER.info("Describing {}".format(group_name))

        group_details = sagemaker_client.describe_model_package_group(
            ModelPackageGroupName=group_name
        )
        LOGGER.info("{}".format(group_details))
    except Exception:
        LOGGER.error("{}".format(traceback.format_exc()))

        return ""
    else:
        return group_details

### Create Model Package Group

This method can be used for creating a Model Package Group

In [None]:
def create_model_package_group(tags=None):
    """Create the configured Model Package Group in the SageMaker Model Registry.

    Args:
        tags: optional list of tags to attach to the group. ``None`` (the
            default) sends an empty tag list.

    Returns:
        The response of ``create_model_package_group``.

    Raises:
        Exception: any error raised by the call; the stack trace is logged
            before the error is propagated.
    """
    try:
        LOGGER.info("Creating {}".format(model_package_group["name"]))

        response = sagemaker_client.create_model_package_group(
            ModelPackageGroupName=model_package_group["name"],
            ModelPackageGroupDescription=model_package_group["description"],
            # A None default avoids sharing one mutable list between calls.
            Tags=[] if tags is None else tags
        )

        LOGGER.info("{}".format(response))

        return response
    except Exception as e:
        stacktrace = traceback.format_exc()
        LOGGER.error("{}".format(stacktrace))

        raise e

### Create Model Package

This method can be used for registering a new model version, as Model Package, in the previously created Model Package Group

To register a Model Package, the following information is required:

1. ECR image uri: the image uri that can eventually be used for performing inference on SageMaker. In our case, we are taking the TensorFlow inference image, which will not be used on the Edge device
2. Model Url: S3 path to the model. In our case this is the path to the packaged model
3. Job Name: SageMaker Job name used for tracking the model lineage, the sequence of the operations that led to the trained model
4. Approval Status: Status to assign to the registered model. There are three possible statuses: PendingManualApproval, Rejected, Approved

In [None]:
def register_model_package(image_uri, model_url, training_job_name, approval_status="PendingManualApproval", tags=None):
    """Register a new model version (Model Package) in the configured group.

    Args:
        image_uri: container image stored in the inference specification.
        model_url: S3 location of the model data stored in the inference
            specification.
        training_job_name: accepted for interface compatibility; the value is
            currently not used by this function.
        approval_status: approval status assigned to the new model package.
        tags: optional list of tags to attach to the model package. ``None``
            (the default) sends an empty tag list.

    Returns:
        The ARN of the created model package.

    Raises:
        Exception: any error raised by the call; the stack trace is logged
            before the error is propagated.
    """
    try:
        create_model_package_input_dict = {
            "ModelPackageGroupName": model_package_group["name"],
            "ModelPackageDescription": "Model for {}".format(model_package_group["description"]),
            "ModelApprovalStatus": approval_status,
            # A None default avoids sharing one mutable list between calls.
            "Tags": [] if tags is None else tags,
            "InferenceSpecification": {
                "Containers": [
                    {
                        "Image": image_uri,
                        "ModelDataUrl": model_url
                    }
                ],
                "SupportedContentTypes": ["text/csv"],
                "SupportedResponseMIMETypes": ["text/csv"],
            }
        }

        create_model_package_response = sagemaker_client.create_model_package(**create_model_package_input_dict)
        model_package_arn = create_model_package_response["ModelPackageArn"]

        LOGGER.info('ModelPackage Version ARN : {}'.format(model_package_arn))

        return model_package_arn

    except Exception as e:
        stacktrace = traceback.format_exc()
        LOGGER.error("{}".format(stacktrace))

        raise e

### Update model package status

This method can be used for updating the approval status of a registered model

In [None]:
def update_model_package_status(model_package_arn, status, description=""):
    """Change the approval status of a registered model package.

    Args:
        model_package_arn: ARN of the model package to update.
        status: new approval status.
        description: approval description sent together with the status.

    Returns:
        The response of ``update_model_package``.

    Raises:
        Exception: any error raised by the call; the stack trace is logged
            before the error is propagated.
    """
    try:
        LOGGER.info("Updating {}".format(model_package_arn))
        LOGGER.info("To status {}".format(status))

        update_request = {
            "ModelPackageArn": model_package_arn,
            "ModelApprovalStatus": status,
            "ApprovalDescription": description
        }
        update_result = sagemaker_client.update_model_package(**update_request)

        LOGGER.info("{}".format(update_result))

        return update_result

    except Exception as err:
        LOGGER.error("{}".format(traceback.format_exc()))

        raise err

### List Model Packages

This method can be used for listing all the registered models in a Model Package Group

In [None]:
def list_model_packages(next_token=None):
    """List every model package registered in the configured group.

    Result pages are followed through ``NextToken`` until the listing is
    exhausted, and the accumulated data is always returned (previously a
    paginated listing returned ``None``).

    Args:
        next_token: optional pagination token to start from; ``None`` starts
            from the first page.

    Returns:
        A dict of equally sized column lists, ready to be loaded into a
        ``pandas.DataFrame``, with the keys ``Model Package Group``,
        ``Package ARN``, ``Package Description``, ``Package Cration Time``
        and ``Package Approval Status``.

    Raises:
        Exception: any error raised while listing; the stack trace is logged
            before the error is propagated.
    """
    try:
        # NOTE: the 'Package Cration Time' spelling is kept as-is because the keys are
        # part of the returned structure and become the DataFrame column names.
        data = {
            'Model Package Group': [],
            'Package ARN': [],
            'Package Description': [],
            'Package Cration Time': [],
            'Package Approval Status': []
        }

        while True:
            request = {
                "ModelPackageGroupName": model_package_group["name"],
                "SortBy": 'CreationTime',
                "SortOrder": 'Descending',
                "MaxResults": 100
            }
            if next_token:
                request["NextToken"] = next_token

            results = sagemaker_client.list_model_packages(**request)

            for el in results.get("ModelPackageSummaryList", []):
                data['Model Package Group'].append(model_package_group["name"])
                data['Package ARN'].append(el["ModelPackageArn"])
                # The description may be missing from a summary: fall back to an empty
                # value instead of failing the whole listing with a KeyError.
                data['Package Description'].append(el.get("ModelPackageDescription", ""))
                data['Package Cration Time'].append(el["CreationTime"])
                data['Package Approval Status'].append(el.get("ModelApprovalStatus", ""))

            # An absent or empty token means this was the last page.
            next_token = results.get("NextToken")
            if not next_token:
                break

        return data
    except Exception as e:
        stacktrace = traceback.format_exc()
        LOGGER.error("{}".format(stacktrace))

        raise e

***

## Process data

In [None]:
# Build the processor for the data processing step.
processor = create_preprocessing_processor()

In [None]:
# Run the processing job and keep its name.
processing_job_name = process(processor)

***

## Train model

In [None]:
# Build the estimator for the training step.
estimator = create_training_estimator()

In [None]:
# Run the training job; its name is needed by the compilation step.
training_job_name = train(estimator)

***

## Compile model

In [None]:
# Compile the model produced by the training job; the compilation job name is
# needed by the packaging step and for building the model URL to register.
neo_job_name = compile_neo(training_job_name)

***

## Package model

In [None]:
# Package the compiled model with an Edge Packaging job.
edge_manager_job_name = package_edge_manager(neo_job_name)

***

## Create model package group

In [None]:
# Holds the describe response, or "" when the describe call failed.
check_model_package_group = describe_model_package_group()

In [None]:
# describe_model_package_group() returns "" when the describe call fails;
# in that case the group is created.
if check_model_package_group == "":
    create_model_package_group()

## Register model

In [None]:
# Inference image stored in the model package. The session region is used instead of a
# hard-coded one, and the instance type lets the SDK pick the matching image variant.
image_uri = image_uris.retrieve(
    framework='tensorflow',
    image_scope="inference",
    region=region,
    version='2.2',
    instance_type=instance_type
)
# Location of the compiled model. The configuration variables are lowercase
# (s3_bucket_name / s3_compiled_files_path); the uppercase names were never defined
# and raised a NameError.
model_url = "s3://{}/{}/{}/model-{}_{}.tar.gz".format(
    s3_bucket_name,
    s3_compiled_files_path,
    neo_job_name,
    "LINUX",
    "X86_64"
)

In [None]:
# Register the model; note that the compilation job name is passed in the
# training_job_name position of register_model_package.
register_model_package(image_uri, model_url, neo_job_name)

***

## List registered models

In [None]:
# Column lists describing the model packages registered in the group.
data = list_model_packages()

# Tabular view of the registered model packages.
df = pd.DataFrame(data=data)

# `display` is not imported in this file; presumably the IPython/Jupyter built-in —
# verify when running outside a notebook.
display(df)

***

## Update package status

In [None]:
# Approve the package in the first row of df. list_model_packages() requests the
# packages sorted by CreationTime in Descending order, so this is the newest one.
update_model_package_status(df["Package ARN"][0], "Approved")