# End-to-End Pipeline: Bring Your Own Container to SageMaker Pipelines
This notebook walks you through bringing your own container to [Amazon SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines/) with [Hugging Face](https://huggingface.co/). Hugging Face supports many NLP tasks, such as text classification, summarization, and text generation. In this tutorial, we use text classification as an example.

### Overview
<div align="center"><img src="images/byoc_mlops_nb.png" /></div>

---
- [1.Prepare the environment](#envpreparation)
- [2.Data preparation](#datapreparation)
- [3.Feature ingestion](#featureingestion)
- [4.Model building](#modelbuilding)
- [5.Asynchronous inference](#asyncInference)
- [6.Real-time inference](#realtimeInference)
- [7.Cleanup](#cleanup)

<a id="envpreparation"></a>
## 1. Prepare the environment

In [None]:
! pip install --upgrade pip
! python3 -m pip install sagemaker==2.72.1

In [None]:
import json
import logging
import boto3
import pandas as pd
import io
import glob
import os
import string
import re
import time
from time import strftime,gmtime
from botocore.exceptions import ClientError
import urllib.parse
import sys

import sagemaker
from sagemaker import get_execution_role

In [None]:
logger = logging.getLogger(name='byoc-pipeline')
sagemaker_session = sagemaker.Session()
boto_session = sagemaker_session.boto_session
sagemaker_client = boto_session.client('sagemaker')
sm_runtime = boto3.Session().client('sagemaker-runtime')
region = sagemaker_session.boto_region_name

role = get_execution_role()

client = boto3.client('sts')
account = client.get_caller_identity()['Account']

bucket = sagemaker_session.default_bucket()
prefix = "sm-pipeline-huggingface"
task_name = "text-classification"

### Docker Environment Preparation
Because the container images are larger than the free space on the root volume of the notebook instance, we move Docker's data directory to ```/home/ec2-user/SageMaker/docker```.

By default, Docker stores its data in ```/var/lib/docker/```. The cells below show the current daemon configuration and then run `scripts/prepare-docker.sh` to change the data directory to ```/home/ec2-user/SageMaker/docker```.

In [None]:
!cat /etc/docker/daemon.json

In [None]:
!bash ./scripts/prepare-docker.sh
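
The contents of `scripts/prepare-docker.sh` are not shown here. As a rough sketch of the kind of change involved, and assuming the script relocates Docker's storage through the `data-root` key of `/etc/docker/daemon.json`, the Python below merges that key into an existing daemon configuration while keeping the other keys. The function name `set_docker_data_root` is hypothetical, and the Docker daemon has to be restarted (with root privileges) before the new location takes effect.

```python
import json
import os


def set_docker_data_root(config_path: str, data_root: str) -> dict:
    """Hypothetical helper: set "data-root" in a Docker daemon.json file.

    Existing keys (for example "runtimes" on GPU instances) are preserved.
    Returns the configuration that was written.
    """
    config = {}
    if os.path.exists(config_path):
        with open(config_path, "r", encoding="utf-8") as f:
            content = f.read().strip()
            if content:
                config = json.loads(content)
    config["data-root"] = data_root
    with open(config_path, "w", encoding="utf-8") as f:
        json.dump(config, f, indent=4)
    return config
```

For example, `set_docker_data_root("/etc/docker/daemon.json", "/home/ec2-user/SageMaker/docker")` followed by a restart of the Docker service.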

<a id="datapreparation"></a>
## 2. Data preparation

Download the IMDB movie review dataset from the [Stanford AI Lab](https://ai.stanford.edu/~amaas/data/sentiment/).

In [None]:
%%time
!wget https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz -O aclImdb_v1.tar.gz
!tar --no-same-owner -xvzf aclImdb_v1.tar.gz
!rm aclImdb_v1.tar.gz

Stage the dataset for ingestion into [SageMaker Feature Store](https://aws.amazon.com/sagemaker/feature-store/). This pipeline expects the ingested text to contain no punctuation, so we remove punctuation from the raw reviews. Apply the same step when you use your own data.

In [None]:
punc_list = string.punctuation  # define your own set of punctuation characters to remove here

def remove_punctuation(text):
    """
    Remove the characters in punc_list from a string.
    Input (string): one review that may contain punctuation
    Output (string): the same review with those characters removed
    """
    translator = str.maketrans("", "", punc_list)
    return text.translate(translator)

def staging_data(data_dir):
    """Write <data_dir>/train.csv and <data_dir>/test.csv with one cleaned review per row."""
    for data_type in ["train", "test"]:
        data_list = []
        for label in ["neg", "pos"]:
            data_path = os.path.join(data_dir, data_type, label)
            for file_path in glob.glob(os.path.join(data_path, "*.txt")):
                # File names look like <id>_<rating>.txt; the stem becomes the record identifier
                data_id = os.path.basename(file_path).replace(".txt", "")
                with open(file_path, "r", encoding="utf-8") as f:
                    line = f.readline()  # each IMDB review is stored on a single line
                    line = remove_punctuation(line)
                    line = re.sub(r"\s+", " ", line)  # collapse runs of whitespace
                    data_list.append([data_id, line, label])

        data_df = pd.DataFrame(data_list, columns=["index", "text", "label"])
        data_df["event_time"] = time.time()  # Feature Store requires an event time feature
        data_df["data_type"] = data_type
        data_df.to_csv(f"{data_dir}/{data_type}.csv", index=False)

data_dir = f"{os.getcwd()}/aclImdb"
staging_data(data_dir)
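
To see what the staging step does to a single review, the standalone sketch below repeats the same two transformations (deleting `string.punctuation` characters, then collapsing whitespace) and derives the record identifier from a file name the same way `staging_data` does. The review strings and the file path are made-up examples, and `clean_review` and `record_id_from_path` are illustrative names.

```python
import os
import re
import string

# Same idea as remove_punctuation(): a translation table that deletes ASCII punctuation.
PUNCTUATION_TABLE = str.maketrans("", "", string.punctuation)


def clean_review(text: str) -> str:
    """Remove punctuation, then collapse runs of whitespace into single spaces."""
    text = text.translate(PUNCTUATION_TABLE)
    return re.sub(r"\s+", " ", text)


def record_id_from_path(path: str) -> str:
    """IMDB files are named like '<id>_<rating>.txt'; keep the name without the extension."""
    return os.path.basename(path).replace(".txt", "")


print(clean_review("Great movie!!  Loved it."))                 # Great movie Loved it
print(record_id_from_path("aclImdb/train/neg/3174_4.txt"))      # 3174_4
```

Note that HTML line breaks such as `<br />` in the raw reviews are not dropped by this step; only their punctuation is removed, so they survive as the word `br`.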

In [None]:
train_df = pd.read_csv(os.path.join(data_dir, 'train.csv'))
train_df.head()

Upload the staged data to the S3 bucket.

In [None]:
!aws s3 cp ./aclImdb/train.csv s3://$bucket/$prefix/raw_data/imdb_train.csv
!aws s3 cp ./aclImdb/test.csv s3://$bucket/$prefix/raw_data/imdb_test.csv

Create test data for inference

In [None]:
sample_data_dir = "./data"
if not os.path.exists(sample_data_dir):
    os.makedirs(sample_data_dir)
train_df["text"][:10].to_csv(f"{sample_data_dir}/sample_imdb.csv", header=None, index=None)

<a id="featureingestion"></a>
## 3. Feature ingestion

We ingest the dataset into SageMaker Feature Store with a SageMaker Processing job. Alternatively, you can complete this task with SageMaker Data Wrangler.

### 3.1 Create feature group

In [None]:
train_df.dtypes

In [None]:
train_df = train_df.astype({
    "index": "string",
    "text": "string",
    "label": "string",
    "data_type": "string"
})
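
`load_feature_definitions` infers each feature's type from the DataFrame dtypes, which is why the text columns are cast to pandas' `string` dtype first. As far as we know, the SageMaker Python SDK maps integer dtypes to `Integral`, float dtypes to `Fractional`, and `string` to `String`, and the SDK version installed above does not map plain `object` columns. The sketch below mirrors that rule on dtype names so you can check a schema before creating the group; `feature_type_for` is a hypothetical helper, not the SDK's implementation.

```python
# Simplified, hypothetical mirror of the dtype -> feature type inference rule.
INTEGER_DTYPES = {"int8", "int16", "int32", "int64", "uint8", "uint16", "uint32", "uint64"}
FLOAT_DTYPES = {"float16", "float32", "float64"}


def feature_type_for(dtype_name: str) -> str:
    """Return the Feature Store type for a pandas dtype name, or raise if it cannot be inferred."""
    if dtype_name in INTEGER_DTYPES:
        return "Integral"
    if dtype_name in FLOAT_DTYPES:
        return "Fractional"
    if dtype_name == "string":
        return "String"
    raise ValueError(f"Cannot infer a feature type for dtype {dtype_name!r}; cast the column first")


# Column dtypes of train_df after the astype() call above.
schema = {"index": "string", "text": "string", "label": "string",
          "event_time": "float64", "data_type": "string"}
print({column: feature_type_for(dtype) for column, dtype in schema.items()})
# {'index': 'String', 'text': 'String', 'label': 'String', 'event_time': 'Fractional', 'data_type': 'String'}
```

In the notebook, `{c: feature_type_for(str(t)) for c, t in train_df.dtypes.items()}` runs the same check on the real DataFrame.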

Create the feature group and load its feature definitions from `train_df`.

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup

feature_group_name = f"hugging-face-imdb-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
imdb_feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)
imdb_feature_group.load_feature_definitions(data_frame=train_df)

In [None]:
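def wait_for_feature_group_creation_complete(feature_group, timeout_seconds=600):
    """Poll the feature group until it leaves the 'Creating' state or the timeout expires."""
    status = feature_group.describe().get('FeatureGroupStatus')
    print(f'Initial status: {status}')
    deadline = time.time() + timeout_seconds  # 600 s default is an illustrative choice
    while status == 'Creating':
        if time.time() > deadline:
            raise TimeoutError(f'Feature group {feature_group.name} is still being created after {timeout_seconds} s')
        logger.info(f'Waiting for feature group: {feature_group.name} to be created ...')
        time.sleep(5)
        status = feature_group.describe().get('FeatureGroupStatus')
    if status != 'Created':
        raise RuntimeError(f'Failed to create feature group {feature_group.name}: {status}')
    print(f'FeatureGroup {feature_group.name} was successfully created.')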

In [None]:
imdb_feature_group.create(s3_uri=f's3://{bucket}/{prefix}/feature_store',
                          record_identifier_name='index',
                          event_time_feature_name='event_time',
                          role_arn=role,
                          enable_online_store=True)

In [None]:
wait_for_feature_group_creation_complete(imdb_feature_group)

Remove the local copy of the IMDB dataset.

In [None]:
!rm -r aclImdb

### 3.2 Feature engineering with SageMaker Processing

In [None]:
from sagemaker.spark.processing import PySparkProcessor

s3_uri_prefix = f's3://{bucket}/{prefix}/raw_data/*'

pyspark_processor = PySparkProcessor(framework_version='2.4', # Spark version
                                     role=role,
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2,
                                     base_job_name='sm-processing-pyspark-fs-ingestion',
                                     env={'AWS_DEFAULT_REGION': boto3.Session().region_name,
                                          'mode': 'python'},
                                     max_runtime_in_seconds=3600)

In [None]:
%%time
pyspark_processor.run(submit_app='./processing/batch_ingest_sm_pyspark.py', 
                      arguments = ['--feature_group_name', feature_group_name, 
                                   '--s3_uri_prefix', s3_uri_prefix], 
                      spark_event_logs_s3_uri=f's3://{bucket}/{prefix}/spark-logs', 
                      logs=False)  # set logs=True to enable logging
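
The processing job passes `--feature_group_name` and `--s3_uri_prefix` to `processing/batch_ingest_sm_pyspark.py`, whose source is not shown here. A minimal, hypothetical sketch of how such a script could read those two arguments is below; `parse_ingestion_args` and the example values are illustrative, and the script's actual ingestion logic is not reproduced.

```python
import argparse
from typing import List, Optional


def parse_ingestion_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse the two arguments that pyspark_processor.run() passes to the ingestion script."""
    parser = argparse.ArgumentParser(description="Batch-ingest CSV files into a feature group")
    parser.add_argument("--feature_group_name", required=True,
                        help="Name of the target SageMaker feature group")
    parser.add_argument("--s3_uri_prefix", required=True,
                        help="S3 glob of the raw CSV files, e.g. s3://bucket/prefix/raw_data/*")
    return parser.parse_args(argv)


args = parse_ingestion_args([
    "--feature_group_name", "hugging-face-imdb-example",
    "--s3_uri_prefix", "s3://example-bucket/sm-pipeline-huggingface/raw_data/*",
])
print(args.feature_group_name, args.s3_uri_prefix)
```

Inside a real processing container you would call `parse_ingestion_args()` with no argument so that `sys.argv` is parsed.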

### 3.3 Verify feature ingestion

In [None]:
featurestore_runtime = boto_session.client(
    service_name="sagemaker-featurestore-runtime", region_name=region
)

In [None]:
response = featurestore_runtime.get_record(
        FeatureGroupName=feature_group_name,
        RecordIdentifierValueAsString="3174_4",
    )

In [None]:
record = response["Record"]
df = pd.DataFrame(record).set_index('FeatureName').transpose()
df["text"].tolist()[0]
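
`get_record` returns the record as a list of `{"FeatureName": ..., "ValueAsString": ...}` entries, which the DataFrame transpose above reshapes into a single row. If you only need to look up a few values, a plain dictionary works as well. The sample record below is made up for illustration, and `record_to_dict` is an illustrative name.

```python
from typing import Dict, List


def record_to_dict(record: List[Dict[str, str]]) -> Dict[str, str]:
    """Turn a Feature Store get_record() 'Record' list into {feature_name: value_as_string}."""
    return {item["FeatureName"]: item["ValueAsString"] for item in record}


sample_record = [
    {"FeatureName": "index", "ValueAsString": "3174_4"},
    {"FeatureName": "text", "ValueAsString": "An example review without punctuation"},
    {"FeatureName": "label", "ValueAsString": "neg"},
    {"FeatureName": "data_type", "ValueAsString": "train"},
]
print(record_to_dict(sample_record)["label"])  # neg
```

With the live response, `record_to_dict(response["Record"])["text"]` returns the same value as the DataFrame lookup.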

<a id="modelbuilding"></a>
## 4. Model Building
With data in the feature store, you can now start the model building pipeline. You can leave the default parameter values.

### 4.1 Build and push your own docker to ECR

We prepare three containers: one each for training, batch inference, and real-time inference.

Build and push training image to ECR.

In [None]:
train_image_name = f"sagemaker-{task_name}-training"
tag = "tf2.5.1"
training_image = f"{account}.dkr.ecr.{region}.amazonaws.com/{train_image_name}:{tag}"

In [None]:
!bash containers/training/build_tools/build_and_push.sh {region} {train_image_name} {tag}

---
Build and push serving image to ECR.

In [None]:
serving_image_name = f"sagemaker-{task_name}-serving"
tag = "tf2.5.1"
serving_image = f"{account}.dkr.ecr.{region}.amazonaws.com/{serving_image_name}:{tag}"

In [None]:
!bash containers/serving/build_tools/build_and_push.sh {region} {serving_image_name} {tag}

---
Build and push the batch inference image to ECR.

In [None]:
batch_inference_image_name = f"sagemaker-{task_name}-batch-inference"
tag = "tf2.5.1"
batch_inference_image = f"{account}.dkr.ecr.{region}.amazonaws.com/{batch_inference_image_name}:{tag}"

In [None]:
!bash containers/batch_transform/build_tools/build_and_push.sh {region} {batch_inference_image_name} {tag}
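
All three image URIs follow the same Amazon ECR naming pattern, `<account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>`. The helper below is a hypothetical sketch that makes the pattern explicit; it is not part of the repository's build scripts. The AWS China Regions use the `amazonaws.com.cn` domain instead, which the f-strings above do not account for.

```python
def ecr_image_uri(account: str, region: str, repository: str, tag: str) -> str:
    """Build an Amazon ECR image URI from its parts (hypothetical helper)."""
    domain = "amazonaws.com.cn" if region.startswith("cn-") else "amazonaws.com"
    return f"{account}.dkr.ecr.{region}.{domain}/{repository}:{tag}"


print(ecr_image_uri("123456789012", "us-east-1", "sagemaker-text-classification-training", "tf2.5.1"))
# 123456789012.dkr.ecr.us-east-1.amazonaws.com/sagemaker-text-classification-training:tf2.5.1
```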

### 4.2 Define SageMaker pipeline for model building and model registry

In [None]:
import importlib

import pipelines.byoc_pipeline

# Reload first so that edits to pipelines/byoc_pipeline.py are picked up, then import the function
importlib.reload(pipelines.byoc_pipeline)
from pipelines.byoc_pipeline import create_pipeline

model_package_group_name = "huggingfaceImdb"
database_name = "huggingface_imdb_featurestore"

pipeline_configuration = {
        "fg_name": feature_group_name,
        "create_dataset_script_path": "processing/create_dataset.py",
        "prefix": f"{prefix}_byoc_build",
        "database_name": database_name,
        "model_package_group_name": model_package_group_name,
        "model_accuracy_threshold": 0.9,
        "containers": {
            "training_docker_image": training_image,
            "endpoint_docker_image": serving_image,
            "transform_docker_image": batch_inference_image
        },
        "metric_definitions": [
            {
                "Name": "loss",
                "Regex": "loss': ([0-9\\.]+)"
            },
            {
                "Name": "learning_rate",
                "Regex": "learning_rate': ([0-9e\\-\\.]+)"
            },
            {
                "Name": "eval_loss",
                "Regex": "eval_loss': ([0-9e\\-\\.]+)"
            },
            {
                "Name": "eval_accuracy",
                "Regex": "eval_accuracy': ([0-9e\\-\\.]+)"
            }
        ],
        "hpo_configuration":{
            "objective_metric": "eval_accuracy",
            "max_jobs": 1,
            "max_parallel_jobs": 1,
            "strategy": "Bayesian",
            "objective_type": "Maximize",
            "param_ranges": {
                "ContinuousParameter": [
                    {
                        "Name": "learning_rate",
                        "MaxValue": 5e-3,
                        "MinValue": 5e-6,
                        "ScalingType": "Logarithmic"
                    }
                ]
            },
            "static_hyperparameters": {
                "weight_decay": 0.01,
                "per_device_train_batch_size": 16,
                "per_device_eval_batch_size": 32,
                "num_train_epochs": 10,
                "warmup_steps": 500,
                "logging_steps": 10,
                "eval_steps": 500,
                "tokenizer_download_model": "enable"
            }
        },
        "hyperparameters": {
            "weight_decay": 0.01,
            "per_device_train_batch_size": 16,
            "per_device_eval_batch_size": 32,
            "num_train_epochs": 10,
            "warmup_steps": 500,
            "logging_steps": 10,
            "eval_steps": 500,
            "learning_rate": 5e-5
        },
        "feature_names": ["index", "text", "data_type"],
        "label_name": ["label"]
    }

pipeline = create_pipeline(role, "huggingface-pipeline", sagemaker_session, **pipeline_configuration)
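
SageMaker applies each `Regex` in `metric_definitions` to the training job's log output and records the captured number as the metric value. The sketch below checks the patterns against two log lines in the dictionary format that the Hugging Face `Trainer` typically prints; the log lines themselves are made-up examples. One thing it reveals: the `loss` pattern also matches inside `eval_loss` lines, so evaluation losses may be recorded under `loss` as well.

```python
import re

# Patterns copied from pipeline_configuration["metric_definitions"] (raw strings instead of "\\.").
METRIC_PATTERNS = {
    "loss": r"loss': ([0-9\.]+)",
    "learning_rate": r"learning_rate': ([0-9e\-\.]+)",
    "eval_loss": r"eval_loss': ([0-9e\-\.]+)",
    "eval_accuracy": r"eval_accuracy': ([0-9e\-\.]+)",
}


def extract_metrics(log_line: str) -> dict:
    """Return {metric_name: value} for every pattern that matches the log line."""
    metrics = {}
    for name, pattern in METRIC_PATTERNS.items():
        match = re.search(pattern, log_line)
        if match:
            metrics[name] = float(match.group(1))
    return metrics


train_line = "{'loss': 0.6914, 'learning_rate': 4.9e-05, 'epoch': 0.03}"
eval_line = "{'eval_loss': 0.2513, 'eval_accuracy': 0.9132, 'epoch': 1.0}"
print(extract_metrics(train_line))  # {'loss': 0.6914, 'learning_rate': 4.9e-05}
print(extract_metrics(eval_line))   # {'loss': 0.2513, 'eval_loss': 0.2513, 'eval_accuracy': 0.9132}
```

Anchoring the pattern, for example `"'loss': ([0-9\\.]+)"` with the leading quote, would keep `loss` from matching `eval_loss`.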

In [None]:
pipeline.upsert(role_arn=role)

parameters = {
    "TrainingInstance": "ml.p3.16xlarge",
    "ProcessingInstanceType": "ml.m5.xlarge"
}

start_response = pipeline.start(parameters=parameters)

Check the execution status in SageMaker Studio. With the default parameters, the pipeline execution takes about 1 hour. If you run more training jobs or more epochs, increase `delay` and `max_attempts` so that `delay * max_attempts` is longer than the total run time.

In [None]:
start_response.wait(delay=30, max_attempts=180)
start_response.describe()
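
`wait()` polls the execution every `delay` seconds for at most `max_attempts` polls, so the longest it waits is `delay * max_attempts` seconds: 30 × 180 = 5,400 seconds, or 90 minutes, with the values above. To size `max_attempts` for a longer run, divide the expected run time, padded with some margin, by the delay. The helper name and its 1.5× safety factor are illustrative assumptions.

```python
import math


def max_attempts_for(expected_runtime_seconds: float, delay: int = 30,
                     safety_factor: float = 1.5) -> int:
    """Number of polls needed so that delay * max_attempts covers the padded run time."""
    return math.ceil(expected_runtime_seconds * safety_factor / delay)


print(max_attempts_for(60 * 60))      # 180: about one hour of pipeline time
print(max_attempts_for(3 * 60 * 60))  # 540: three hours
```

For a three-hour run you would then call `start_response.wait(delay=30, max_attempts=max_attempts_for(3 * 60 * 60))`.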

Get the ARN of the latest model package from the model registry.

In [None]:
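# Sort newest first so that index 0 is the package registered by the latest pipeline run
model_list = sagemaker_client.list_model_packages(
    ModelPackageGroupName=model_package_group_name,
    SortBy="CreationTime",
    SortOrder="Descending",
)["ModelPackageSummaryList"]
model_package_arn = model_list[0]["ModelPackageArn"]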

When the objective metric `accuracy` exceeds the defined threshold, the model is registered in the model registry. Setting `ModelApprovalStatus` to `Approved` makes the latest model available for deployment to an inference endpoint. If the pipeline registers models with `ModelApprovalStatus` set to `PendingManualApproval`, you must approve the model manually, as in the cell below.

In [None]:
model_package_update_input_dict = {
    "ModelPackageArn" : model_package_arn,
    "ModelApprovalStatus" : "Approved"
}
model_package_update_response = sagemaker_client.update_model_package(**model_package_update_input_dict)
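
The summaries returned by `list_model_packages` include `ModelPackageArn`, `CreationTime`, and `ModelApprovalStatus`, and the API can also filter on approval status through its `ModelApprovalStatus` parameter. When a group holds several versions, a deployment step typically picks the newest approved one. The sketch below does that on summaries that have already been fetched; `latest_approved_arn`, the sample entries, and their ARNs are made up for illustration.

```python
from datetime import datetime, timezone
from typing import Dict, List, Optional


def latest_approved_arn(summaries: List[Dict]) -> Optional[str]:
    """Return the ARN of the most recently created Approved model package, or None."""
    approved = [s for s in summaries if s.get("ModelApprovalStatus") == "Approved"]
    if not approved:
        return None
    return max(approved, key=lambda s: s["CreationTime"])["ModelPackageArn"]


summaries = [
    {"ModelPackageArn": "arn:aws:sagemaker:us-east-1:123456789012:model-package/huggingfaceimdb/1",
     "CreationTime": datetime(2022, 1, 10, tzinfo=timezone.utc), "ModelApprovalStatus": "Approved"},
    {"ModelPackageArn": "arn:aws:sagemaker:us-east-1:123456789012:model-package/huggingfaceimdb/2",
     "CreationTime": datetime(2022, 1, 12, tzinfo=timezone.utc), "ModelApprovalStatus": "PendingManualApproval"},
]
print(latest_approved_arn(summaries))
```

In the notebook, pass `model_list` from the cell above to get the same selection from the live registry.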

Create a model for serving.

In [None]:
model_name = f"huggingface-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
print("Model name : {}".format(model_name))
container_list = [{'ModelPackageName': model_package_arn}]

create_model_response = sagemaker_client.create_model(
    ModelName = model_name,
    ExecutionRoleArn = role,
    Containers = container_list
)

<a id="asyncInference"></a>
## 5. Asynchronous inference

[Amazon SageMaker Asynchronous Inference](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference.html) is a new capability in SageMaker that queues incoming requests and processes them asynchronously. This option is ideal for requests with large payload sizes up to 1GB, long processing times, and near real-time latency requirements.

### 5.1 Deploy asynchronous endpoint

In [None]:
async_endpoint_config_name = f"BYOCAsyncEndpointConfig-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"

create_endpoint_config_response = sagemaker_client.create_endpoint_config(
    EndpointConfigName=async_endpoint_config_name,
    ProductionVariants=[
        {
            "VariantName": "variant1",
            "ModelName": model_name,
            "InstanceType": "ml.m5.large",
            "InitialInstanceCount": 1
        }
    ],
    AsyncInferenceConfig={
        "OutputConfig": {
            "S3OutputPath": f"s3://{bucket}/{prefix}/output",
            #  Optionally specify Amazon SNS topics
            #"NotificationConfig": {
            #  "SuccessTopic": success_topic,
            #  "ErrorTopic": error_topic,
            #}
        },
        "ClientConfig": {
            "MaxConcurrentInvocationsPerInstance": 2
        }
    }
)
print(f"Created EndpointConfig: {create_endpoint_config_response['EndpointConfigArn']}")

In [None]:
async_endpoint_name = f"byoc-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
create_endpoint_response = sagemaker_client.create_endpoint(EndpointName=async_endpoint_name, EndpointConfigName=async_endpoint_config_name)
print(f"Creating Endpoint: {create_endpoint_response['EndpointArn']}")

In [None]:
waiter = boto3.client('sagemaker').get_waiter('endpoint_in_service')
print("Waiting for endpoint to create...")
waiter.wait(EndpointName=async_endpoint_name)
resp = sagemaker_client.describe_endpoint(EndpointName=async_endpoint_name)
print(f"Endpoint Status: {resp['EndpointStatus']}")

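Enable autoscaling. Asynchronous endpoints can scale in to zero instances when there are no requests to process; the policy below scales on `ApproximateBacklogSizePerInstance`, the number of queued requests per instance.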

In [None]:
autoscaling_client = boto3.client('application-autoscaling')  # Application Auto Scaling handles SageMaker endpoints among other services

resource_id = f'endpoint/{async_endpoint_name}/variant/variant1'  # format Application Auto Scaling uses to reference an endpoint variant

response = autoscaling_client.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    MinCapacity=0,  # asynchronous endpoints can scale in to zero instances
    MaxCapacity=5
)

response = autoscaling_client.put_scaling_policy(
    PolicyName='Invocations-ScalingPolicy',
    ServiceNamespace='sagemaker',  # namespace of the AWS service that provides the resource
    ResourceId=resource_id,  # endpoint variant to scale
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',  # SageMaker supports only the instance count
    PolicyType='TargetTrackingScaling',  # 'StepScaling' | 'TargetTrackingScaling'
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 5.0,  # target number of queued requests per instance
        'CustomizedMetricSpecification': {
            'MetricName': 'ApproximateBacklogSizePerInstance',
            'Namespace': 'AWS/SageMaker',
            'Dimensions': [
                {'Name': 'EndpointName', 'Value': async_endpoint_name}
            ],
            'Statistic': 'Average',
        },
        # Cooldowns keep the policy from adding or removing instances before the effect of the
        # previous scaling activity is visible; tune them to your instance startup time.
        'ScaleInCooldown': 120,  # seconds after a scale-in activity before another scale-in can start
        'ScaleOutCooldown': 120  # seconds after a scale-out activity before another scale-out can start
    }
)
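
With target tracking on `ApproximateBacklogSizePerInstance`, Application Auto Scaling tries to keep the number of queued requests per instance near `TargetValue` (5 here), within `MinCapacity` and `MaxCapacity`. The function below is a simplified back-of-the-envelope model of that behavior, not the service's actual algorithm, which also involves cooldowns and CloudWatch alarm evaluation periods; `estimated_instances` is an illustrative name. AWS documentation for asynchronous inference also describes an additional step scaling policy on the `HasBacklogWithoutCapacity` metric for scaling out from zero instances, which is worth checking if you rely on `MinCapacity=0`.

```python
import math


def estimated_instances(backlog: int, target_per_instance: float = 5.0,
                        min_capacity: int = 0, max_capacity: int = 5) -> int:
    """Rough estimate: enough instances to bring the backlog per instance down to the target."""
    desired = math.ceil(backlog / target_per_instance) if backlog > 0 else 0
    return max(min_capacity, min(max_capacity, desired))


for backlog in (0, 3, 23, 100):
    print(backlog, estimated_instances(backlog))
# 0 0
# 3 1
# 23 5
# 100 5
```

A backlog of 100 requests would call for 20 instances at this target, but `MaxCapacity=5` caps the endpoint at five.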

### 5.2 Testing batch inference with asynchronous inference

Upload the sample data to S3.

In [None]:
input_s3_location = f"s3://{bucket}/{prefix}/sample_data/sample_imdb.csv"

!aws s3 cp ./data/sample_imdb.csv $input_s3_location

In [None]:
response = sm_runtime.invoke_endpoint_async(
    EndpointName=async_endpoint_name, 
    InputLocation=input_s3_location
)
output_location = response['OutputLocation']
print(f"OutputLocation: {output_location}")

In [None]:
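def get_output(output_location, timeout_seconds=900):
    """Poll S3 until the asynchronous inference output exists, then return it as a string."""
    output_url = urllib.parse.urlparse(output_location)
    output_bucket = output_url.netloc
    output_key = output_url.path.lstrip('/')
    deadline = time.time() + timeout_seconds  # 900 s default is an illustrative choice
    while True:
        try:
            return sagemaker_session.read_s3_file(bucket=output_bucket, key_prefix=output_key)
        except ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchKey' and time.time() < deadline:
                print("waiting for output...")
                time.sleep(2)
                continue
            raise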

In [None]:
output = get_output(output_location)
# sys.getsizeof() would report the Python object's memory footprint, not the payload size
print(f"Output size in bytes: {len(output.encode('utf-8'))}")

Download the output file and inspect the asynchronous inference results.

In [None]:
async_infer_res = "./data/async_res.json"

!aws s3 cp $output_location $async_infer_res

with open(async_infer_res, 'r') as f:
    async_res = json.load(f)
async_res

<a id="realtimeInference"></a>
## 6. Real-time inference
[Real-time inference](https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints.html) is ideal for inference workloads where you have real-time, interactive, low latency requirements. 

### 6.1 Deploy real-time inference endpoint

In [None]:
endpoint_config_name = f"BYOCEndpointConfig-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"

create_endpoint_config_response = sagemaker_client.create_endpoint_config(
    EndpointConfigName=endpoint_config_name,
    ProductionVariants=[
        {
            "VariantName": "variant1",
            "ModelName": model_name,
            "InstanceType": "ml.m5.large",
            "InitialInstanceCount": 1
        }
    ]
)
print(f"Created EndpointConfig: {create_endpoint_config_response['EndpointConfigArn']}")

In [None]:
endpoint_name = f"byoc-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
create_endpoint_response = sagemaker_client.create_endpoint(EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)
print(f"Creating Endpoint: {create_endpoint_response['EndpointArn']}")

In [None]:
waiter = boto3.client('sagemaker').get_waiter('endpoint_in_service')
print("Waiting for endpoint to create...")
waiter.wait(EndpointName=endpoint_name)
resp = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
print(f"Endpoint Status: {resp['EndpointStatus']}")

### 6.2 Testing real-time inference endpoint
Load the sample data.

In [None]:
sample_df = pd.read_csv('./data/sample_imdb.csv', header=None)
sample_df.columns = ["text"]
sample_df

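Real-time endpoints limit both the request payload size and the time allowed to process a request, so a request with many or long inputs can fail; asynchronous inference is the alternative for such requests. Here we send only the first 5 rows for real-time inference.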

In [None]:
sample_list = sample_df["text"].values.tolist()[:5]

In [None]:
df_record = pd.DataFrame({"inputs": sample_list})
csv_file = io.StringIO()
df_record.to_csv(csv_file, sep=",", header=False, index=False)
payload_as_csv = csv_file.getvalue()

response = sm_runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    Body= payload_as_csv,
    ContentType = 'text/csv'
)

body = response["Body"].read()
msg = body.decode("utf-8")
data = json.loads(msg)
data
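
The request body is a CSV document with one review per line and no header. The same payload can be built with the standard library's `csv` module, which quotes any field containing a comma or quote character so each review stays on a single CSV row. This sketch assumes the serving container reads each CSV row as one input text, as the pandas-based cell above implies; `to_csv_payload` is an illustrative name.

```python
import csv
import io
from typing import List


def to_csv_payload(texts: List[str]) -> str:
    """Serialize texts as a header-less, single-column CSV string (one text per row)."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for text in texts:
        writer.writerow([text])
    return buffer.getvalue()


print(repr(to_csv_payload(["great film", "long, but worth it"])))
# 'great film\n"long, but worth it"\n'
```

Because the reviews in this notebook had their punctuation removed during staging, no quoting is needed for them; the quoting matters once raw text is sent.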

We can see that the predictions from real-time inference match those from asynchronous inference for the same reviews.

<a id="cleanup"></a>
## 7. Cleanup

Delete the resources created above: endpoints, model, feature group, S3 data, and Docker images.

In [None]:
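from scripts.utils import delete_endpoint, delete_model, delete_s3, delete_ecr, delete_fg

# deregister the autoscaling target of the asynchronous endpoint before deleting it
boto3.client('application-autoscaling').deregister_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=f'endpoint/{async_endpoint_name}/variant/variant1',
    ScalableDimension='sagemaker:variant:DesiredInstanceCount'
)

# delete endpoints
delete_endpoint(sagemaker_client, async_endpoint_name, async_endpoint_config_name)
delete_endpoint(sagemaker_client, endpoint_name, endpoint_config_name)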

# delete models
delete_model(sagemaker_client, model_name, model_package_arn, model_package_group_name)

# delete feature group
delete_fg(sagemaker_client, feature_group_name)

# delete data in s3
delete_s3(bucket, prefix)

# delete docker image in ECR
for image_name in [train_image_name, serving_image_name, batch_inference_image_name]:
    delete_ecr(image_name)

Delete the local Docker images.

In [None]:
!docker rmi -f $(docker images -a | grep text-classification | tr -s ' ' | cut -d ' ' -f 3)