# Orchestrating Jobs, Model Registration, and Continuous Deployment with Amazon SageMaker

Amazon SageMaker offers Machine Learning application developers and Machine Learning operations engineers the ability to orchestrate SageMaker jobs and author reproducible Machine Learning pipelines, deploy custom-build models for inference in real-time with low latency or offline inferences with Batch Transform, and track lineage of artifacts. You can institute sound operational practices in deploying and monitoring production workflows, deployment of model artifacts, and track artifact lineage through a simple interface, adhering to safety and best-practice paradigmsfor Machine Learning application development.

## Layout of the SageMaker ModelBuild Project Template

The template provides a starting point for bringing your SageMaker Pipeline development to production.

```
|-- codebuild-buildspec.yml
|-- CONTRIBUTING.md
|-- pipelines
|   |-- abalone
|   |   |-- evaluate.py
|   |   |-- __init__.py
|   |   |-- pipeline.py
|   |   `-- preprocess.py
|   |-- get_pipeline_definition.py
|   |-- __init__.py
|   |-- run_pipeline.py
|   |-- _utils.py
|   `-- __version__.py
|-- README.md
|-- sagemaker-pipelines-project.ipynb
|-- setup.cfg
|-- setup.py
|-- tests
|   `-- test_pipelines.py
`-- tox.ini
```

### A SageMaker Pipeline

The pipeline that we create follows a typical Machine Learning Application pattern of pre-processing, training, evaluation, and conditional model registration and publication, if the quality of the model is sufficient.

### Getting some constants

We get some constants from the local execution environment.

In [2]:
import boto3
import sagemaker

region = boto3.Session().region_name
account_number = boto3.client('sts').get_caller_identity().get('Account')
role = sagemaker.get_execution_role()
default_bucket = sagemaker.session.Session().default_bucket()
account_number = boto3.client('sts').get_caller_identity().get('Account')
client = boto3.client('sagemaker')



In [3]:
model_package_group_name = f"CustomerChurnGenericModel"
pipeline_name = f"CustomerChurnPipeline"
generic_model_data_bucket_name = f"sagemaker-mlaas-pooled-{region}-{account_number}"

## Upload Sample Training Data

In [4]:
s3_bucket_prefix = "sample-data"
local_path = "data/AnyCompany"
sagemaker.Session().upload_data(path=local_path,bucket=generic_model_data_bucket_name,key_prefix=s3_bucket_prefix)

's3://sagemaker-mlaas-pooled-us-east-1-854977427211/sample-data'

### Get the pipeline instance
Here we get the pipeline instance from your pipeline module so that we can work with it.

In [5]:
from pipeline import get_pipeline

pipeline = get_pipeline(
    region=region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name,
    pipeline_name=pipeline_name,
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


### Submit the pipeline to SageMaker and start execution
Let's submit our pipeline definition to the workflow service. The role passed in will be used by the workflow service to create all the jobs defined in the steps.

In [6]:
pipeline.upsert(role_arn=role)

Popping out 'TrainingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.
Popping out 'ModelPackageName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:854977427211:pipeline/CustomerChurnPipeline',
 'ResponseMetadata': {'RequestId': '91a55fc1-e511-4c62-98b9-04caffac0dff',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '91a55fc1-e511-4c62-98b9-04caffac0dff',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '89',
   'date': 'Fri, 29 Sep 2023 01:45:42 GMT'},
  'RetryAttempts': 0}}

We'll start the pipeline, accepting all the default parameters.

Values can also be passed into these pipeline parameters on starting of the pipeline, and will be covered later. 

In [7]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ProcessingInstanceCount=1,
        TrainDataPath = f"s3://{generic_model_data_bucket_name}/sample-data/train/train.csv",
        TestDataPath = f"s3://{generic_model_data_bucket_name}/sample-data/test/test.csv",
        ValidationDataPath = f"s3://{generic_model_data_bucket_name}/sample-data/validation/validation.csv",
        ModelPath = f"s3://{generic_model_data_bucket_name}/model_artifacts",
        BucketName = f"{generic_model_data_bucket_name}",
        ObjectKey = f"basic_tier_model_artifacts",
        ModelPackageGroupName = f"{model_package_group_name}",
        ModelVersion = "0"
    )
)


### Pipeline Operations: examining and waiting for pipeline execution

Now we describe execution instance and list the steps in the execution to find out more about the execution.

In [8]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:854977427211:pipeline/CustomerChurnPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:854977427211:pipeline/CustomerChurnPipeline/execution/jtf24iz099p0',
 'PipelineExecutionDisplayName': 'execution-1695951946572',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'customerchurnpipeline',
  'TrialName': 'jtf24iz099p0'},
 'CreationTime': datetime.datetime(2023, 9, 29, 1, 45, 46, 493000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 9, 29, 1, 45, 46, 493000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:854977427211:user-profile/d-xtqedkdldism/mlaas-provider-user',
  'UserProfileName': 'mlaas-provider-user',
  'DomainId': 'd-xtqedkdldism'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:854977427211:user-profile/d-xtqedkdldism/mlaas-provider-user',
  'UserProfileName': 'mlaas-provider-user',
  'DomainId': 'd-xtqedkdldis

We can wait for the execution by invoking `wait()` on the execution:

In [9]:
execution.wait()

We can list the execution steps to check out the status and artifacts:

## Create Basic Tier SageMaker Endpoint

We will deploy the model to a SageMaker dedicated endpoint.

In [None]:
model_name='GenericModel'
endpoint_config_name = f"EndpointConfig-{model_name}"
endpoint_name = f"Endpoint-{model_name}"
shared_inference_api_url = "https://lmla2luuak.execute-api.us-east-1.amazonaws.com/" # Example: https://3289dfakjkak.execute-api.us-east-1.amazonaws.com/


# get image URI
image_uri = sagemaker.image_uris.retrieve(
        framework="xgboost",
        region=region,
        version="1.0-1",
        py_version="py3",
        instance_type='ml.t2.medium',
    )

# create sagemaker model
create_model_api_response = client.create_model(
                                    ModelName=model_name,
                                    PrimaryContainer={
                                        'Image': image_uri,
                                        'ModelDataUrl': f"s3://{generic_model_data_bucket_name}/basic_tier_model_artifacts/sample-data.model.0.tar.gz",
                                        'Environment': {}
                                    },
                                    ExecutionRoleArn=role
                            )
print ("create_model API response", create_model_api_response)

# create sagemaker endpoint config
create_endpoint_config_api_response = client.create_endpoint_config(
                                            EndpointConfigName=f"EndpointConfig-{model_name}",
                                            ProductionVariants=[
                                                {
                                                    'VariantName': 'prod1',
                                                    'ModelName': model_name,
                                                    'InitialInstanceCount': 1,
                                                    'InstanceType': 'ml.t2.medium'
                                                },
                                            ]
                                       )

print ("create_endpoint_config API response", create_endpoint_config_api_response)

# create sagemaker endpoint
create_endpoint_api_response = client.create_endpoint(
                                    EndpointName=endpoint_name,
                                    EndpointConfigName=endpoint_config_name,
                                )

print ("create_endpoint API response", create_endpoint_api_response)    

print(f"Creating endpoint {endpoint_name}...")
waiter = client.get_waiter('endpoint_in_service')
waiter.wait(EndpointName=endpoint_name)
print(f"Endpoint {endpoint_name} is in service.")

create_model API response {'ModelArn': 'arn:aws:sagemaker:us-east-1:854977427211:model/genericmodel', 'ResponseMetadata': {'RequestId': '6b3d8a69-f555-4396-b727-3d23c2f79179', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6b3d8a69-f555-4396-b727-3d23c2f79179', 'content-type': 'application/x-amz-json-1.1', 'content-length': '74', 'date': 'Fri, 29 Sep 2023 01:53:33 GMT'}, 'RetryAttempts': 0}}
create_endpoint_config API response {'EndpointConfigArn': 'arn:aws:sagemaker:us-east-1:854977427211:endpoint-config/endpointconfig-genericmodel', 'ResponseMetadata': {'RequestId': '98fcbb24-2706-48f7-b680-a0ef2a40d284', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '98fcbb24-2706-48f7-b680-a0ef2a40d284', 'content-type': 'application/x-amz-json-1.1', 'content-length': '108', 'date': 'Fri, 29 Sep 2023 01:53:33 GMT'}, 'RetryAttempts': 0}}
create_endpoint API response {'EndpointArn': 'arn:aws:sagemaker:us-east-1:854977427211:endpoint/endpoint-genericmodel', 'ResponseMetadata':

## Onboard Basic Tier Tenants

The following Script retrieves the SaaS Control Plane URL and the username and password you require to login.

<div class="alert alert-block alert-info">
<font color='black'><b>!!! Follow instructions in the Workshop Studio to onboard 2 (two) Basic Tier Tenants</b></font>
</div>

## Test Inference

Execute the cell below to initialize the inference helper functions

In [47]:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import json
import base64
import requests
import argparse
import os

def get_jwt(username, password, tenant_name, api_url):

    auth_str = f'{username}:{password}'
    byte_str = auth_str.encode('ascii')
    auth_b64 = base64.b64encode(byte_str)

    headers = {
        'Authorization': 'Basic {}'.format(auth_b64.decode('ascii')),
        'tenant-name': tenant_name
    }

    try:
        print("Getting JWT")
        response = requests.get(api_url + 'v1/jwt', headers=headers)
        print(response.text if response.text else response.reason)
        jwt = json.loads(response.text)['jwt']
        return jwt
    except Exception as e:
        print("Error getting JWT", e)

def run_inference(username, password, tenant_name, request, api_url):

    jwt = get_jwt(username, password, tenant_name, api_url)

    headers = {
        'Authorization': 'Bearer {}'.format(jwt),
        'Content-Type': 'text/csv'
    }
    
    try:
        print(f"Inference request with: {request}")
        response = requests.post(api_url + 'v1/basic_inference', headers=headers, data=request)
        print(response.text if response.text else response.reason)
    except Exception as e:
        print("Error executing inference request", e)

NOTE: Provide the following data before executing the run_inference python function
> First, run inference for basic tenant 1

In [48]:
basic_tenant_1_username = "salyekh+basic_tenant_1@amazon.com" # Example: mlaas+basic1@amazon.com
basic_tenant_1_name = "basic_tenant_1" # Example: basic1
basic_tenant_1_request = "84,1,3,98,2,4" 

In [49]:
run_inference(username=basic_tenant_1_username,password="Mlaa$1234",tenant_name=basic_tenant_1_name,api_url=shared_inference_api_url,request=basic_tenant_1_request)

Getting JWT
{"jwt": "eyJraWQiOiJuTmIyVGhSTnNvVUVycnRaWFB1dk9LMFFxUlpqVDBzUWJsTlwvaGUzZmQrZz0iLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiIxMjAyMGUzNC02YTdmLTQ1Y2EtYjlhMS1hZGZmOGU5Nzk2MzQiLCJjb2duaXRvOmdyb3VwcyI6WyI4aTJndDVwN3VhNWp1ZWdxeHZzdGdxIl0sImlzcyI6Imh0dHBzOlwvXC9jb2duaXRvLWlkcC51cy1lYXN0LTEuYW1hem9uYXdzLmNvbVwvdXMtZWFzdC0xX3Jwck1yeXpINCIsImNvZ25pdG86dXNlcm5hbWUiOiJzYWx5ZWtoK2Jhc2ljX3RlbmFudF8xQGFtYXpvbi5jb20iLCJvcmlnaW5fanRpIjoiOTg1NTgyNDktNzU2NS00NTlhLTg1YWMtYzkzZjNlZWQ2ZGEzIiwiY3VzdG9tOnRlbmFudElkIjoiOGkyZ3Q1cDd1YTVqdWVncXh2c3RncSIsImF1ZCI6IjU3aTRkb2tiYXVrNHJ0dmM1M3VvYnRlbmxkIiwiZXZlbnRfaWQiOiI4MzgwYTFmMy0yYzYyLTQ3ZWItODNkOS0xNGViNDNkNzY2ZGIiLCJjdXN0b206dXNlclJvbGUiOiJUZW5hbnRBZG1pbiIsInRva2VuX3VzZSI6ImlkIiwiYXV0aF90aW1lIjoxNjk0MDU4MzU5LCJleHAiOjE2OTQwNjE5NTksImlhdCI6MTY5NDA1ODM1OSwianRpIjoiMTZhY2VlNTktZjVkYS00NGQxLThlNjMtZGZhMTRlMmQ3MDVmIiwiZW1haWwiOiJzYWx5ZWtoK2Jhc2ljX3RlbmFudF8xQGFtYXpvbi5jb20ifQ.GoMmqPtPNj_OqYf4TKMKfSrVZIVWbll8DgoolpGzv5lJs-fBwmYFWnLts-TscmlSznyqA8OGjsQHcU17yR5IXIjMGke

NOTE: Provide the following data before executing the run_inference python function
> First, run inference for basic tenant 2

In [None]:
basic_tenant_2_username = "basic2@example.com" # Example: mlaas+basic2@amazon.com
basic_tenant_2_name = "basic2" # Example: basic2
basic_tenant_2_request = "18,0,3,105,2,9" 

In [None]:
run_inference(username=basic_tenant_2_username,password="Mlaa$1234",tenant_name=basic_tenant_2_name,api_url=shared_inference_api_url,request=basic_tenant_2_request)

## Lab 2

## Onboard Advanced Tier Tenants

<div class="alert alert-block alert-info">
<font color='black'><b>!!! Follow instructions in the Workshop Studio to onboard 2 (two) Advanced Tier Tenants</b></font>
</div>

## Upload Training Data

Execute the cell below to initialize the file upload helper function

In [52]:
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import json
import base64
import requests
import argparse
import os
        
def upload_file(username, password, tenant_name, file, file_type):
    
    jwt = get_jwt(username, password, tenant_name, api_url)
    file_name =  os.path.basename(file)
    
    headers = {
        'Authorization': 'Bearer {}'.format(jwt),
        'Content-Type': 'text/csv',
        'file-name': file_name,
        'file-type': file_type
    }

    try:
        print(f"Uploading file: {file}")
        response = requests.put(
            api_url + 'v1/upload', headers=headers, data=open(file, 'rb'))
        print(response.text if response.text else response.reason)
    except Exception as e:
        print("Error uploading file", e)

Run the python function above to upload your advanced tenant training data. This function will handle both, getting a JWT Token and uploading the training data to the correct S3 tenant prefix.

NOTE: Provide the following data before executing the upload_file python function
> First, upload the data for advanced tenant 1

In [53]:
advanced_tenant_1_username = "advanced1@example.com" 
advanced_tenant_1_name = "advanced1" 
advanced_tenant_1_file = "data/Advanced-Tenant1/advanced1_dataset.csv" 

In [54]:
upload_file(username=advanced_tenant_1_username, password="Mlaa$1234",tenant_name=advanced_tenant_1_name,file_type="csv",file=advanced_tenant_1_file)

NameError: name 'api_url' is not defined

NOTE: Provide the following data before executing the upload_file python function

In [116]:
advanced_tenant_1_username = "advanced2@example.com" 
advanced_tenant_1_name = "advanced2" 
advanced_tenant_1_file = "data/Advanced-Tenant2/advanced2_dataset.csv" 

In [107]:
upload_file(username=advanced_tenant_2_username, password="Mlaa$1234",tenant_name=advanced_tenant_2_name,file_type="csv",file=advanced_tenant_2_file)

Getting JWT
Error getting JWT Invalid URL 'v1/jwt': No scheme supplied. Perhaps you meant https://v1/jwt?
Uploading file: tenant-training-data/basic-tenant-1-training-data.csv
Error uploading file [Errno 2] No such file or directory: 'tenant-training-data/basic-tenant-1-training-data.csv'


<div class="alert alert-block alert-info">
<font color='black'><b>!!! Verify that execution of 2 new training pipelines has started. Use SageMaker console to check with execution status. Wait for the execution of the pipelines to complete</b></font>
</div>

## Test Inference

In [121]:
advanced1_request = "18,0,3,105,2,9" 
run_inference(username=advanced_tenant_1_username,password="Mlaa$1234",tenant_name=advanced_tenant_1_name,api_url=shared_inference_api_url,request=advanced_tenant_1_request)

Getting JWT
Error getting JWT Invalid URL 'v1/jwt': No scheme supplied. Perhaps you meant https://v1/jwt?
Inference request with: 1993,2244,5,2,1.18,2
Error executing inference request Invalid URL 'v2/inference': No scheme supplied. Perhaps you meant https://v2/inference?


In [123]:
advanced2_request = "18,0,3,105,2,9" 
run_inference(username=advanced_tenant_2_username,password="Mlaa$1234",tenant_name=advanced_tenant_2_name,api_url=shared_inference_api_url,request=advanced_tenant_2_request)

Getting JWT
Error getting JWT Invalid URL 'v1/jwt': No scheme supplied. Perhaps you meant https://v1/jwt?
Inference request with: 1993,2244,5,2,1.18,2
Error executing inference request Invalid URL 'v2/inference': No scheme supplied. Perhaps you meant https://v2/inference?


### Lab 3

<div class="alert alert-block alert-info">
<font color='black'><b>!!! Follow instructions in the Workshop Studio to onboard 1 Premium Tier Tenant named Premium1</b></font>
</div>

## Test Inference

In [None]:
premium_tenant_1_username = "premium1@example.com" 
premium_tenant_1_name = "premium1" 
dedicated_inference_api_url_for_premium1 = ""

In [121]:
premium1_request = "18,0,3,105,2,9" 
run_inference(username=premium_tenant_1_username,password="Mlaa$1234",tenant_name=premium_tenant_1_name,api_url=advanced_tenant_1_api_url,request=request)

Getting JWT
Error getting JWT Invalid URL 'v1/jwt': No scheme supplied. Perhaps you meant https://v1/jwt?
Inference request with: 1993,2244,5,2,1.18,2
Error executing inference request Invalid URL 'v2/inference': No scheme supplied. Perhaps you meant https://v2/inference?
