# Orchestrating Jobs, Model Registration, and Continuous Deployment with Amazon SageMaker

Amazon SageMaker lets Machine Learning application developers and Machine Learning operations engineers orchestrate SageMaker jobs and author reproducible Machine Learning pipelines, deploy custom-built models for real-time, low-latency inference or for offline inference with Batch Transform, and track the lineage of artifacts. Through a simple interface, you can institute sound operational practices for deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage, while adhering to safety and best-practice paradigms for Machine Learning application development.

The SageMaker Pipelines service supports a SageMaker Machine Learning Pipeline Domain Specific Language (DSL), which is a declarative JSON specification. This DSL defines a Directed Acyclic Graph (DAG) of pipeline parameters and SageMaker job steps. The SageMaker Python Software Development Kit (SDK) streamlines the generation of the pipeline DSL using constructs that are already familiar to engineers and scientists alike.
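
To make the idea of a declarative DAG concrete, the sketch below builds a heavily simplified pipeline definition as a Python dictionary and derives an execution order that respects the dependencies. The step names and the reduced set of fields are assumptions chosen for illustration; the definition that the SDK generates carries far more detail per step.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Simplified, hypothetical definition: one parameter and four steps, where
# each step lists the steps it depends on.
definition = {
    "Parameters": [
        {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1},
    ],
    "Steps": [
        {"Name": "Preprocess", "Type": "Processing", "DependsOn": []},
        {"Name": "Train", "Type": "Training", "DependsOn": ["Preprocess"]},
        {"Name": "Evaluate", "Type": "Processing", "DependsOn": ["Train"]},
        {"Name": "RegisterIfGood", "Type": "Condition", "DependsOn": ["Evaluate"]},
    ],
}


def execution_order(pipeline_definition):
    """Return step names in an order that respects every dependency."""
    graph = {
        step["Name"]: set(step["DependsOn"])
        for step in pipeline_definition["Steps"]
    }
    # TopologicalSorter raises graphlib.CycleError if the graph has a cycle,
    # which is exactly the property a DAG must not violate.
    return list(TopologicalSorter(graph).static_order())


order = execution_order(definition)
print(order)
```

Because the definition is plain data, it can be validated, diffed, and version-controlled independently of the code that produced it.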

The SageMaker Model Registry is where trained models are stored, versioned, and managed. Data Scientists and Machine Learning Engineers can compare model versions, approve models for deployment, and deploy models from different AWS accounts, all from a single Model Registry. SageMaker helps customers follow MLOps best practices from the start: a complete end-to-end MLOps system can be stood up with a single API call.

## Layout of the SageMaker ModelBuild Project Template

The template provides a starting point for bringing your SageMaker Pipeline development to production.

```
|-- codebuild-buildspec.yml
|-- CONTRIBUTING.md
|-- pipelines
|   |-- birddetect
|   |   |-- evaluation.py
|   |   |-- __init__.py
|   |   |-- pipeline.py
|   |   `-- preprocess.py
|   |-- get_pipeline_definition.py
|   |-- __init__.py
|   |-- run_pipeline.py
|   |-- _utils.py
|   `-- __version__.py
|-- README.md
|-- sagemaker-pipelines-project.ipynb
|-- setup.cfg
|-- setup.py
|-- tests
|   `-- test_pipelines.py
`-- tox.ini
```

A description of some of the artifacts is provided below:
<br/><br/>
Your CodeBuild execution instructions:
```
|-- codebuild-buildspec.yml
```
<br/><br/>
Your pipeline artifacts, which include a pipeline module defining the required `get_pipeline` method that returns an instance of a SageMaker pipeline, a preprocessing script used in feature engineering, and a model evaluation script that measures the Mean Squared Error of the model trained by the pipeline:

```
|-- pipelines
|   |-- birddetect
|   |   |-- evaluation.py
|   |   |-- __init__.py
|   |   |-- pipeline.py
|   |   `-- preprocess.py

```

Additional subfolders containing code or artifacts that the pipeline needs must be packaged correctly by the `setup.py` file. For example, to package a `pipelines/source` folder:

* Include an `__init__.py` file within the `source` folder.
* Add the folder to the `setup.py` file's `package_data`, keyed by its dotted package path (the snippet uses `pipelines.my_pipeline.src` as the example key), like so:

```
...
    packages=setuptools.find_packages(),
    include_package_data=True,
    package_data={"pipelines.my_pipeline.src": ["*.txt"]},
    python_requires=">=3.6",
    install_requires=required_packages,
    extras_require=extras,
...
```
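
The `package_data` key is a dotted package name rather than a file path, which is easy to get wrong. The helper below is a small sketch of that translation; the function name `package_name_for` is hypothetical and not part of the template.

```python
from pathlib import Path


def package_name_for(folder, project_root="."):
    """Translate a folder such as 'pipelines/source' into the dotted name
    used as a `package_data` key, for example 'pipelines.source'."""
    package_dir = Path(project_root) / folder
    # setuptools.find_packages() only picks up folders containing __init__.py.
    if not (package_dir / "__init__.py").is_file():
        raise FileNotFoundError(
            f"{package_dir} has no __init__.py, so it is not a regular package"
        )
    return ".".join(Path(folder).parts)
```

For the folder in the example above, `package_name_for("pipelines/source")` would give the key to place in `package_data`, provided the `__init__.py` file exists.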

<br/><br/>
Utility modules for getting pipeline definition JSON and running pipelines:

```
|-- pipelines
|   |-- get_pipeline_definition.py
|   |-- __init__.py
|   |-- run_pipeline.py
|   |-- _utils.py
|   `-- __version__.py
```
<br/><br/>
Python package artifacts:
```
|-- setup.cfg
|-- setup.py
```
<br/><br/>
A stubbed testing module for testing your pipeline as you develop:
```
|-- tests
|   `-- test_pipelines.py
```
<br/><br/>
The `tox` testing framework configuration:
```
`-- tox.ini
```

### A SageMaker Pipeline

The pipeline we create follows a typical Machine Learning Application pattern: pre-processing, training, evaluation, and then model registration and publication, conditional on the quality of the model being sufficient.

![A typical ML Application pipeline](img/pipeline-full.png)

### Getting some constants

We get some constants from the local execution environment.

In [83]:
import sagemaker
import boto3
import os
import time
import datetime
import json
import numpy as np
import pprint as pp

from collections import namedtuple
from collections import defaultdict
from collections import Counter



import matplotlib.pyplot as plt

sagemaker_session = sagemaker.Session()

default_bucket = sagemaker_session.default_bucket()  # the session's default bucket; replace with your own bucket name if preferred
region = sagemaker_session.boto_region_name
account = sagemaker_session.account_id()
role = sagemaker.get_execution_role()

sess = sagemaker.Session()

base_job_prefix = 'End2End-Bird-detection'

In [67]:
!wget 'https://s3.amazonaws.com/fast-ai-imageclas/CUB_200_2011.tgz'
!tar xopf CUB_200_2011.tgz
!rm CUB_200_2011.tgz

--2022-09-22 12:59:23--  https://s3.amazonaws.com/fast-ai-imageclas/CUB_200_2011.tgz
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.41.112
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.41.112|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1150585339 (1.1G) [application/x-tar]
Saving to: ‘CUB_200_2011.tgz’


2022-09-22 12:59:45 (55.9 MB/s) - ‘CUB_200_2011.tgz’ saved [1150585339/1150585339]



In [None]:
!pip install opencv-python
!apt-get update
!apt-get install ffmpeg libsm6 libxext6  -y

In [None]:
import cv2
import glob
import random

img_array = []
image_dir = 'CUB_200_2011/images'

subset_data = ["013.Bobolink", "017.Cardinal", 
               "035.Purple_Finch", "036.Northern_Flicker",
              "047.American_Goldfinch","068.Ruby_throated_Hummingbird",
               "073.Blue_Jay","087.Mallard"]

# create an array of available image files
for sub_dir in os.listdir(image_dir):
    if sub_dir in subset_data:

        for filename in os.listdir(f'{image_dir}/{sub_dir}'):
            img_array.append(f'{image_dir}/{sub_dir}/{filename}')
            
# all images are expected to share one size, so take the frame size from the first image
img = cv2.imread(img_array[0])
height, width, layers = img.shape
size = (width,height)

output_path = '../2_deployment/bird.mp4'
out = cv2.VideoWriter(output_path,cv2.VideoWriter_fourcc(*'MP4V'), 1, size)

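# randomly sample images and append them as frames to the video
for i in range(200):
    rand_index = random.randint(0, len(img_array) - 1)
    img = cv2.imread(img_array[rand_index])
    # VideoWriter expects every frame to match `size`, so resize defensively
    out.write(cv2.resize(img, size))
out.release()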

print(f'video generated at {output_path}')

In [None]:
# Copy the images to the default S3 bucket and build the input and output manifests.

label_map = dict()
for i in range(len(subset_data)):
    label = subset_data[i].split('.')[-1]
    label_map[label] = i
        
s3 = boto3.client('s3')

input_manifest_name = 'input.manifest'
output_manifest_name = 'output.manifest'

# remove the manifest files if they already exist
if os.path.exists(input_manifest_name):
    os.remove(input_manifest_name)
    
if os.path.exists(output_manifest_name):
    os.remove(output_manifest_name)

f = open(input_manifest_name, 'a')
g = open(output_manifest_name, 'a')

json_body = {'labels':[]}

print('Processing......\n')

for raw_label in os.listdir(image_dir):
    
    if raw_label in subset_data:
    
        label_name = raw_label.split('.')[-1]

        print(label_name, label_map[label_name])
        json_body['labels'].append({"label": label_name})

        for filename in os.listdir(f'{image_dir}/{raw_label}'):
            if filename.endswith('.jpg'):
                key = f"{base_job_prefix}/unlabeled/images/{filename}"
                s3.upload_file(f'{image_dir}/{raw_label}/{filename}', default_bucket, key)

                img_path = f"s3://{default_bucket}/{key}"
                f.write('{"source-ref": "' + img_path + '"}\n')

                # build the output manifest, in case you don't want to go through the labeling job
                output_manifest = dict()

                output_manifest['source-ref'] = img_path
                output_manifest['label'] = label_map[label_name]
                label_metadata = dict()
                label_metadata['class-name'] = label_name
                # these are just placeholders to mimic an actual manifest file
                label_metadata['job-name'] = "labeling-job/bird-image-classification-1634678978"
                label_metadata['type'] = "groundtruth/image-classification"
                label_metadata['human-annotated'] = "yes"
                label_metadata['creation-date'] = str(datetime.datetime.now().isoformat(timespec='microseconds'))

                output_manifest['label-metadata'] = label_metadata

                g.write(f"{json.dumps(output_manifest)}\n")
        
        
f.close()
g.close()
        
        
input_manifest_key = f"{base_job_prefix}/unlabeled/manifest/{input_manifest_name}"
s3.upload_file(input_manifest_name, default_bucket, input_manifest_key)
        
s3_input_manifest = f"s3://{default_bucket}/{input_manifest_key}"

print("\nInput manifest file location:")
print(s3_input_manifest)

output_manifest_key = f"{base_job_prefix}/pipeline/manifest/{output_manifest_name}"
s3.upload_file(output_manifest_name, default_bucket, output_manifest_key)
        
s3_output_manifest = f"s3://{default_bucket}/{output_manifest_key}"

print("\nSynthetic output manifest file location: ")
print(s3_output_manifest)
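
Both manifests written above are JSON Lines files: one JSON object per line, each pointing at an image through `source-ref`. The sketch below isolates that record-building logic so it can be checked without touching S3. The function names are hypothetical, and the metadata is reduced to two of the fields used in the cell above.

```python
import json


def label_map_from(class_dirs):
    """Map 'Bobolink' -> 0, 'Cardinal' -> 1, ... following the list order."""
    return {name.split(".")[-1]: idx for idx, name in enumerate(class_dirs)}


def manifest_lines(bucket, key, label_name, label_map):
    """Return (input_line, output_line) for one image as JSON strings."""
    source_ref = f"s3://{bucket}/{key}"
    input_line = json.dumps({"source-ref": source_ref})
    output_line = json.dumps(
        {
            "source-ref": source_ref,
            "label": label_map[label_name],
            # Placeholder metadata that mimics a labeling job's output.
            "label-metadata": {"class-name": label_name, "human-annotated": "yes"},
        }
    )
    return input_line, output_line


labels = label_map_from(["013.Bobolink", "017.Cardinal"])
# "example-bucket" and the key are made-up values for illustration.
in_line, out_line = manifest_lines(
    "example-bucket", "prefix/images/bird.jpg", "Cardinal", labels
)
print(in_line)
print(out_line)
```

Using `json.dumps` for both lines avoids hand-assembled JSON strings, which break if a path ever contains a quote or backslash.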

In [None]:
!rm -rf ./CUB_200_2011
!rm -f attributes.txt

In [None]:
pp.pprint(json_body)

class_file_name = "class_labels.json"
with open(class_file_name, "w") as f:
    json.dump(json_body, f)

classes_key = f"{base_job_prefix}/unlabeled/classes/{class_file_name}"
s3.upload_file(class_file_name, default_bucket, classes_key)

s3_classes = f"s3://{default_bucket}/{classes_key}"

s3_classes

In [None]:
def make_template(test_template=False, save_fname="instructions.template"):
    template = r"""<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
        <crowd-form>
          <crowd-image-classifier
            name="crowd-image-classifier"
            src="{{ task.input.taskObject | grant_read_access }}"
            header="please classify"
            categories="{{ task.input.labels | to_json | escape }}"
          >
            <full-instructions header="Image classification instructions">
              <ol><li><strong>Read</strong> the task carefully and inspect the image.</li>
              <li><strong>Read</strong> the options and review the examples provided to understand more about the labels.</li>
              <li><strong>Choose</strong> the appropriate label that best suits the image.</li></ol>
            </full-instructions>
            <short-instructions>
              <p>Dear Annotator, please tell me what you can see in the image. Thank you!</p>
            </short-instructions>
          </crowd-image-classifier>
        </crowd-form>"""

    with open(save_fname, "w") as f:
        f.write(template)
    if test_template is False:
        print(template)

template_name = "instructions.template"
# make_template(test_template=True, save_fname="instructions.html")

make_template(test_template=False, save_fname=template_name)
templates_key = f"{base_job_prefix}/unlabeled/templates/{template_name}"
s3.upload_file(template_name, default_bucket, templates_key)

s3_templates = f"s3://{default_bucket}/{templates_key}"
print(f"S3 url: {s3_templates}")

In [None]:
# Specify ARNs for resources needed to run an image classification job.
ac_arn_map = {
    "us-west-2": "081040173940",
    "us-east-1": "432418664414",
    "us-east-2": "266458841044",
    "eu-west-1": "568282634449",
    "ap-northeast-1": "477331159723",
}

prehuman_arn = "arn:aws:lambda:{}:{}:function:PRE-ImageMultiClass".format(
    region, ac_arn_map[region]
)
acs_arn = "arn:aws:lambda:{}:{}:function:ACS-ImageMultiClass".format(region, ac_arn_map[region])

labeling_algorithm_specification_arn = "arn:aws:sagemaker:{}:027400017018:labeling-job-algorithm-specification/image-classification".format(
    region
)

#Update this code block if you want to use your own private workforce.
PRIVATE_WORKFORCE = False

public_workteam_arn = "arn:aws:sagemaker:{}:394669845002:workteam/public-crowd/default".format(region)

# private_workteam_arn = "<REPLACE WITH YOUR OWN PRIVATE TEAM ARN>"
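
The lookup `ac_arn_map[region]` raises a bare `KeyError` when the notebook runs in a region that is not in the map. As a sketch, the same ARN construction can be wrapped in a function that fails with a clearer message. The account IDs are copied from the cell above; the function name is hypothetical.

```python
# Account IDs copied from the cell above; other regions are not covered here.
AC_ARN_MAP = {
    "us-west-2": "081040173940",
    "us-east-1": "432418664414",
    "us-east-2": "266458841044",
    "eu-west-1": "568282634449",
    "ap-northeast-1": "477331159723",
}


def ground_truth_lambda_arns(region, task="ImageMultiClass"):
    """Return (pre_human_arn, consolidation_arn) for the given region."""
    try:
        account_id = AC_ARN_MAP[region]
    except KeyError:
        supported = ", ".join(sorted(AC_ARN_MAP))
        raise ValueError(
            f"no Lambda account configured for {region!r}; known regions: {supported}"
        ) from None
    base = f"arn:aws:lambda:{region}:{account_id}:function"
    return f"{base}:PRE-{task}", f"{base}:ACS-{task}"


pre_arn, acs = ground_truth_lambda_arns("us-east-1")
print(pre_arn)
print(acs)
```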

In [None]:
job_name = "bird-image-classification-" + str(int(time.time()))

# define human task configuration
human_task_config = {
    "AnnotationConsolidationConfig": {
        "AnnotationConsolidationLambdaArn": acs_arn,
    },
    "PreHumanTaskLambdaArn": prehuman_arn,
    "MaxConcurrentTaskCount": 200,  # 200 images will be sent at a time to the workteam.
    "NumberOfHumanWorkersPerDataObject": 3,  # 3 separate workers will be required to label each image.
    "TaskAvailabilityLifetimeInSeconds": 21600,  # Your workteam has 6 hours to complete all pending tasks.
    "TaskDescription": 'Carefully inspect the image and classify it by selecting one label from the categories provided.',
    "TaskKeywords": ["image", "classification", "birds"],
    "TaskTimeLimitInSeconds": 300,  # Each image must be labeled within 5 minutes.
    "TaskTitle": 'What bird is this',
    "UiConfig": {
        "UiTemplateS3Uri": s3_templates,
    },
}

# Use either the public or a private workforce. The public workforce requires price information.
if not PRIVATE_WORKFORCE:
    human_task_config["PublicWorkforceTaskPrice"] = {
        "AmountInUsd": {
            "Dollars": 0,
            "Cents": 1,
            "TenthFractionsOfACent": 2,
        }
    }
    human_task_config["WorkteamArn"] = public_workteam_arn
else:
    human_task_config["WorkteamArn"] = private_workteam_arn
    
ground_truth_request = {
    "InputConfig": {
        "DataSource": {
            "S3DataSource": {
                "ManifestS3Uri": s3_input_manifest
            }
        },
        "DataAttributes": {
            "ContentClassifiers": ["FreeOfPersonallyIdentifiableInformation", "FreeOfAdultContent"]
        },
    },
    "OutputConfig": {
        "S3OutputPath": f's3://{default_bucket}/{base_job_prefix}/labeled',
    },
    "HumanTaskConfig": human_task_config,
    "LabelingJobName": job_name,
    "RoleArn": role,
    "LabelAttributeName": "category",
    "LabelCategoryConfigS3Uri": s3_classes,
}

sagemaker_client = boto3.client("sagemaker")
sagemaker_client.create_labeling_job(**ground_truth_request)
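
The public workforce price is split across three integer fields, which makes the actual amount easy to misread. The sketch below converts the fields to a dollar value and estimates the worker payment for a job. It covers only the per-task worker price, not any other service charges, and the image count of 400 is a made-up figure.

```python
from decimal import Decimal


def task_price_usd(dollars, cents, tenth_fractions_of_a_cent):
    """Combine the three price fields into a single USD amount."""
    return (
        Decimal(dollars)
        + Decimal(cents) / 100
        + Decimal(tenth_fractions_of_a_cent) / 1000
    )


def worker_cost_usd(n_images, workers_per_object, price_per_task):
    """Each image is labeled once per worker, at one task price each."""
    return n_images * workers_per_object * price_per_task


# Values taken from the request above: 0 dollars, 1 cent, 2 tenths of a cent.
price = task_price_usd(0, 1, 2)
print(price)  # 0.012

# Hypothetical job: 400 images, 3 workers per image.
print(worker_cost_usd(400, 3, price))
```

`Decimal` is used instead of `float` so that small currency amounts add up exactly.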

In [None]:
# job_name = 'bird-image-classification-1647438119'
# sagemaker_client = boto3.client("sagemaker")
sagemaker_client.describe_labeling_job(LabelingJobName=job_name)
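
A labeling job can run for hours, so a single `describe_labeling_job` call rarely tells the whole story. One possible approach is to poll until the job reaches a terminal state. The sketch below takes the describe function as an argument so that it can be exercised without AWS; the function name, poll interval, and retry limit are assumptions.

```python
import time

# Status values after which the job will not change any further.
TERMINAL_STATES = {"Completed", "Failed", "Stopped"}


def wait_for_labeling_job(describe, job_name, poll_seconds=60,
                          max_polls=1000, sleep=time.sleep):
    """Poll `describe(LabelingJobName=...)` until a terminal status appears."""
    for _ in range(max_polls):
        status = describe(LabelingJobName=job_name)["LabelingJobStatus"]
        if status in TERMINAL_STATES:
            return status
        sleep(poll_seconds)
    raise TimeoutError(f"{job_name} did not finish after {max_polls} polls")


# Intended usage in this notebook (not executed here):
# final_status = wait_for_labeling_job(
#     sagemaker_client.describe_labeling_job, job_name
# )
```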

In [None]:
# Load the output manifest's annotations.
output_manifest = f"s3://{default_bucket}/{base_job_prefix}/labeled/{job_name}/manifests/intermediate/1/output.manifest"

!aws s3 cp {output_manifest} 'output.manifest'

with open("output.manifest", "r") as f:
    output = [json.loads(line.strip()) for line in f.readlines()]

# Create data arrays.
img_uris = [None] * len(output)
confidences = np.zeros(len(output))
groundtruth_labels = [None] * len(output)
human = np.zeros(len(output))

# Find the job name the manifest corresponds to.
keys = list(output[0].keys())
metakey = keys[np.where([("-metadata" in k) for k in keys])[0][0]]
jobname = metakey[:-9]

# Extract the data.
for datum_id, datum in enumerate(output):
    img_uris[datum_id] = datum["source-ref"]
    groundtruth_labels[datum_id] = str(datum[metakey]["class-name"])
    #confidences[datum_id] = datum[metakey]["confidence"]
    human[datum_id] = int(datum[metakey]["human-annotated"] == "yes")
groundtruth_labels = np.array(groundtruth_labels)
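
The extraction above relies on one convention: each manifest record carries a key ending in `-metadata`, and the part before that suffix is the label attribute name. The sketch below applies the same idea to a single line using only the standard library. The sample record mirrors the synthetic manifest built earlier, and the bucket name is made up.

```python
import json

SUFFIX = "-metadata"


def parse_manifest_line(line):
    """Extract the image URI, label attribute, class name, and human flag."""
    record = json.loads(line)
    # The first key ending in '-metadata' identifies the label attribute.
    meta_key = next(key for key in record if key.endswith(SUFFIX))
    meta = record[meta_key]
    return {
        "uri": record["source-ref"],
        "label_attribute": meta_key[: -len(SUFFIX)],
        "class_name": str(meta["class-name"]),
        "human": meta.get("human-annotated") == "yes",
    }


sample = json.dumps(
    {
        "source-ref": "s3://example-bucket/images/bird.jpg",
        "label": 0,
        "label-metadata": {"class-name": "Bobolink", "human-annotated": "yes"},
    }
)
parsed = parse_manifest_line(sample)
print(parsed)
```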

In [None]:
# Compute the number of annotations in each class.
n_classes = len(set(groundtruth_labels))
sorted_clnames, class_sizes = zip(*Counter(groundtruth_labels).most_common(n_classes))

# Find ids of human-annotated images.
human_sizes = [human[groundtruth_labels == clname].sum() for clname in sorted_clnames]
class_sizes = np.array(class_sizes)
human_sizes = np.array(human_sizes)

plt.figure(figsize=(9, 3), facecolor="white", dpi=100)
plt.title("Annotation histogram")
plt.bar(range(n_classes), human_sizes, color="gray", hatch="/", edgecolor="k", label="human")
plt.bar(
    range(n_classes),
    class_sizes - human_sizes,
    bottom=human_sizes,
    color="gray",
    edgecolor="k",
    label="machine",
)
plt.xticks(range(n_classes), sorted_clnames, rotation=90)
plt.ylabel("Annotation Count")
plt.legend()
plt.show()

In [None]:
# image location
s3_input_data = f"s3://{default_bucket}/{base_job_prefix}/unlabeled/images"
# labelled manifest location
s3_input_manifest = f"s3://{default_bucket}/{base_job_prefix}/pipeline/manifest"

In [None]:
!pygmentize './pipelines/birddetect/preprocess.py'

In [None]:
!pygmentize './pipelines/birddetect/code/train_debugger.py'

In [None]:
!pygmentize './pipelines/birddetect/evaluation.py'

In [None]:
!pygmentize './pipelines/birddetect/pipeline_tuning.py'

In [None]:
# from pipeline.pipeline import get_pipeline
from pipelines.birddetect.pipeline_tuning import get_pipeline

model_package_group_name = f"{base_job_prefix}ModelGroup"  # Model package group name in the Model Registry
pipeline_name = f"{base_job_prefix}Pipeline"  # SageMaker Pipeline name

pipeline = get_pipeline(
    region=region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name,
    pipeline_name=pipeline_name,
    base_job_prefix=base_job_prefix
)

In [None]:
pipeline.upsert(role_arn=role)

In [None]:
execution = pipeline.start(
    parameters=dict(
        InputDataUrl=s3_input_data,  # location of the raw data
        InputManifestUrl=s3_input_manifest,
        ProcessingInstanceCount=1,
        # ProcessingInstanceType="ml.m5.xlarge",
        # TrainingInstanceCount=1,
        # TrainingInstanceType="ml.p3.2xlarge",
        ModelApprovalStatus="PendingManualApproval"
    )
)

In [63]:
# INPUT REQUIRED - Please enter the S3 URI of the model artifact
# eg: s3://sagemaker-us-east-1-xxxxxxxx/BIRD-Sagemaker-Deployment/BIRD-Sagemaker-Deployment-2022-07-27-03-33-54-592/output/model.tar.gz
bird_model_path = 's3://sagemaker-us-east-1-729987989507/a8hovs5dbbn8-End2End-3xrsS1pWuZ-001-7d7393a3/output/model.tar.gz'
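
Since the model artifact URI is pasted in by hand, a quick sanity check before deployment can catch a truncated or malformed value. The helper below is a sketch using only the standard library; `split_s3_uri` is a hypothetical name and the bucket in the example is made up.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/key' into (bucket, key), rejecting malformed input."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc or not parsed.path.strip("/"):
        raise ValueError(f"not a complete S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_uri("s3://example-bucket/some-job/output/model.tar.gz")
print(bucket)
print(key)
```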

In [94]:
from sagemaker.serverless import ServerlessInferenceConfig

serverless_inf_config = ServerlessInferenceConfig(memory_size_in_mb=3072, max_concurrency=1)

In [96]:
from sagemaker.tensorflow import TensorFlowModel
TF_FRAMEWORK_VERSION = '2.4.1'
model = TensorFlowModel(
    model_data=bird_model_path, 
    role=role,
    framework_version=TF_FRAMEWORK_VERSION)


predictor = model.deploy(serverless_inference_config=serverless_inf_config)
tf_endpoint_name = str(predictor.endpoint_name)
print(f"Endpoint [{predictor.endpoint_name}] deployed")




----!Endpoint [tensorflow-inference-2022-09-22-10-29-12-372] deployed


In [85]:
from sagemaker import Predictor
from sagemaker.serializers import IdentitySerializer
from sagemaker.deserializers import JSONDeserializer

#Update the below variable with your endpoint name from previous cell output
#tf_endpoint_name='<SAGEMAKER DEPLOYED ENDPOINT NAME>'
tf_endpoint_name = 'tensorflow-inference-2022-09-22-10-29-12-372'

serializer = IdentitySerializer(content_type="application/x-image")
deserializer = JSONDeserializer(accept='application/json')

predictor = Predictor(endpoint_name=tf_endpoint_name, serializer=serializer, deserializer=deserializer)

In [None]:
import cv_utils

classes_file = f"s3://{default_bucket}/{base_job_prefix}/full/data/classes.txt"
classes = [13, 17, 35, 36, 47, 68, 73, 87]

possible_classes = cv_utils.get_classes_as_list(classes_file, classes)


In [90]:
possible_classes    

['013.Bobolink',
 '017.Cardinal',
 '035.Purple_Finch',
 '036.Northern_Flicker',
 '047.American_Goldfinch',
 '068.Ruby_throated_Hummingbird',
 '073.Blue_Jay',
 '087.Mallard']

In [72]:
import cv_utils
sample_images = cv_utils.get_n_random_images(default_bucket,prefix=f'{base_job_prefix}/outputs/test',n=1)

local_paths = cv_utils.download_images_locally(default_bucket,sample_images)
print(local_paths)

['./inference-test-data/Bobolink_0001_9261.jpg']


In [78]:
for inputfile in local_paths:
    cv_utils.predict_bird_from_file(inputfile,predictor,possible_classes)
    
 

Class: 013.Bobolink, Confidence :1.00 
 ./inference-test-data/Bobolink_0001_9261.jpg
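
The source of `cv_utils.predict_bird_from_file` is not shown here, so the following is only a sketch of how a class name and confidence might be derived from an endpoint response. It assumes a TensorFlow Serving style JSON body with a `predictions` key and assumes that the score order matches `possible_classes`; both are assumptions rather than confirmed behavior of the helper.

```python
def top_class(response, class_names):
    """Pick the highest-scoring class from a `predictions` response."""
    scores = response["predictions"][0]
    if len(scores) != len(class_names):
        raise ValueError("score vector and class list differ in length")
    # Index of the largest score (argmax without NumPy).
    best = max(range(len(scores)), key=scores.__getitem__)
    return class_names[best], scores[best]


# Made-up response for two classes.
fake_response = {"predictions": [[0.9, 0.1]]}
name, confidence = top_class(fake_response, ["013.Bobolink", "017.Cardinal"])
print(f"Class: {name}, Confidence: {confidence:.2f}")
```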


In [80]:
def delete_model_package_group(sm_client, package_group_name):
    try:
        model_versions = sm_client.list_model_packages(ModelPackageGroupName=package_group_name)

    except Exception as e:
        print("{} \n".format(e))
        return

    for model_version in model_versions["ModelPackageSummaryList"]:
        try:
            sm_client.delete_model_package(ModelPackageName=model_version["ModelPackageArn"])
        except Exception as e:
            print("{} \n".format(e))
        time.sleep(0.5)  # Ensure requests aren't throttled

    try:
        sm_client.delete_model_package_group(ModelPackageGroupName=package_group_name)
        print("{} model package group deleted".format(package_group_name))
    except Exception as e:
        print("{} \n".format(e))
    return


def delete_sagemaker_pipeline(sm_client, pipeline_name):
    try:
        sm_client.delete_pipeline(
            PipelineName=pipeline_name,
        )
        print("{} pipeline deleted".format(pipeline_name))
    except Exception as e:
        print("{} \n".format(e))
        return
    
def delete_sagemaker_project(sm_client, project_name):
    try:
        sm_client.delete_project(
            ProjectName=project_name,
        )
        print("{} project deleted".format(project_name))
    except Exception as e:
        print("{} \n".format(e))
        return
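
`delete_model_package_group` above calls `list_model_packages` once, and that API returns results in pages. For a group with many versions, one possible refinement is to follow `NextToken` until every version is collected. The sketch below shows that loop; the function name is hypothetical, and the client is passed in so the logic can be tested with a stand-in object.

```python
def all_model_package_arns(sm_client, package_group_name):
    """Collect the ARN of every model version in a group, across all pages."""
    arns = []
    kwargs = {"ModelPackageGroupName": package_group_name}
    while True:
        page = sm_client.list_model_packages(**kwargs)
        arns.extend(
            summary["ModelPackageArn"]
            for summary in page["ModelPackageSummaryList"]
        )
        token = page.get("NextToken")
        if not token:
            return arns
        # Ask for the next page on the following iteration.
        kwargs["NextToken"] = token


# Intended cleanup order in this notebook (not executed here):
# sm_client = boto3.client("sagemaker")
# delete_model_package_group(sm_client, model_package_group_name)
# delete_sagemaker_pipeline(sm_client, pipeline_name)
```

Model versions have to be removed before the group itself, which is why the group deletion comes last inside `delete_model_package_group`.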