# Deploying an Amazon Comprehend Model Using SageMaker Pipelines

This example shows how to deploy an Amazon Comprehend model with SageMaker Pipelines. It covers preprocessing, training, evaluation, and deployment, each implemented with one of the step types that SageMaker Pipelines provides.

This example is based on: https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/nlp/amazon_comprehend_sagemaker_pipeline/sm_pipeline_with_comprehend.ipynb

## Prerequisites

In [77]:
# Import general modules
import boto3
import sagemaker

In [78]:
# Set global variables and initialize the sessions used throughout the notebook

region = "eu-west-2" # Replace with your region
sagemaker_session = sagemaker.session.Session()
default_bucket = sagemaker_session.default_bucket() # Replace if you have another bucket in mind
prefix_bucket = "sagemaker-training-pipelines" # Use it in case you want to put your artifacts inside another directory

# Get the execution role ARN used to perform operations in the cloud
try:
    # get_execution_role() only works inside SageMaker Studio or a notebook instance
    role_arn = sagemaker.get_execution_role()
except ValueError:
    # Otherwise, look up the role ARN by role name through an IAM client
    iam = boto3.client('iam')
    role_arn = iam.get_role(RoleName='AmazonSageMaker-ExecutionRole-20220817T160055')['Role']['Arn']
    print("Role ARN successfully extracted")

Couldn't call 'get_role' to get Role ARN from role name francisco-admin to get Role path.


Role ARN successfully extracted


The execution role needs the correct policies/permissions attached to it.
For this example, the following managed policies are needed:
- ComprehendFullAccess
- AmazonSageMakerFullAccess
- AWSLambda_FullAccess
- IAMFullAccess
- AmazonS3FullAccess
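
To confirm that a role has all of the policies listed above, a small check along the following lines can help. This is a minimal sketch: `missing_policies` and `attached_policy_names` are hypothetical helper names introduced here, and the second one assumes the caller is allowed to call IAM `list_attached_role_policies` (it only reads the first page of results).

```python
# Managed policies this example expects on the execution role (from the list above)
REQUIRED_POLICIES = [
    "ComprehendFullAccess",
    "AmazonSageMakerFullAccess",
    "AWSLambda_FullAccess",
    "IAMFullAccess",
    "AmazonS3FullAccess",
]


def missing_policies(attached_names, required=REQUIRED_POLICIES):
    """Return the required policy names that are not in attached_names."""
    attached = set(attached_names)
    return [name for name in required if name not in attached]


def attached_policy_names(role_name):
    """List the managed policy names attached to a role (first page only)."""
    import boto3  # imported lazily so missing_policies works without AWS access

    iam = boto3.client("iam")
    response = iam.list_attached_role_policies(RoleName=role_name)
    return [policy["PolicyName"] for policy in response["AttachedPolicies"]]


# Offline example with a made-up list of attached policies
print(missing_policies(["AmazonS3FullAccess", "IAMFullAccess"]))
```

With AWS credentials available, `missing_policies(attached_policy_names("<your-role-name>"))` would report what still needs to be attached.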

## Download data

To get the data, we use the AWS CLI to copy the files from a public S3 bucket to local storage, and then upload them to our default SageMaker bucket.

In [27]:
# Download data from s3 bucket to local
!aws s3 cp s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-train.csv .
!aws s3 cp s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-test.csv .


In [28]:
# Upload local files (train and test files) to our default sagemaker bucket

!aws s3 cp ./comprehend-train.csv s3://$default_bucket/$prefix_bucket/data/
!aws s3 cp ./comprehend-test.csv s3://$default_bucket/$prefix_bucket/data/

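
If the AWS CLI is not available, the same download-then-upload flow can be sketched with boto3. The helper name `stage_file` is an assumption made for this illustration; it takes the S3 client as an argument so it can be exercised without touching AWS, and it mirrors the `<prefix>/data/` layout used by the CLI commands above.

```python
def stage_file(s3_client, src_bucket, src_key, dst_bucket, dst_prefix, local_dir="."):
    """Download one object locally, then upload it under <dst_prefix>/data/.

    s3_client is expected to behave like boto3.client("s3").
    Returns the S3 URI of the uploaded copy.
    """
    filename = src_key.rsplit("/", 1)[-1]
    local_path = f"{local_dir}/{filename}"
    dst_key = f"{dst_prefix}/data/{filename}"

    # boto3 signatures: download_file(Bucket, Key, Filename), upload_file(Filename, Bucket, Key)
    s3_client.download_file(src_bucket, src_key, local_path)
    s3_client.upload_file(local_path, dst_bucket, dst_key)
    return f"s3://{dst_bucket}/{dst_key}"
```

A call such as `stage_file(boto3.client("s3"), "aws-ml-blog", "artifacts/comprehend-custom-classification/comprehend-train.csv", default_bucket, prefix_bucket)` would then correspond to the two CLI commands for the train file.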

## Pipeline

Once we have initial data to work with, we can start defining the pipeline steps. For this example, we define three steps that run in a specific order, but this can change depending on the use case.

### Parameter definition and Processor Definition

Some parameters need to be defined beforehand, such as the instance count and type, and the location of the data used in the pipeline.

In [79]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.sklearn.processing import SKLearnProcessor

In [80]:
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)  # Number of instances to use for jobs
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.t3.2xlarge")  # Type of instance to use

# Location of train data (in S3)
input_train = ParameterString(
    name="TrainData",
    default_value=f"s3://{default_bucket}/{prefix_bucket}/data/comprehend-train.csv",
)

# Location of test data (in S3)
input_test = ParameterString(
    name="TestData",
    default_value=f"s3://{default_bucket}/{prefix_bucket}/data/comprehend-test.csv",
)

# Location to store trained model
model_output = ParameterString(name="ModelOutput", default_value=f"s3://{default_bucket}/model")

In [81]:
# Define an SKLearn processor to run our code in. It is a prebuilt container with scikit-learn and other libraries installed.
# AWS provides several prebuilt containers, which you can use through the corresponding SDK classes.

# If you need custom dependencies, you can build your own container.
sklearn_processor = SKLearnProcessor(
    framework_version="1.0-1",
    instance_type=processing_instance_type.default_value,
    instance_count=processing_instance_count.default_value,
    base_job_name="data-process-comprehend",
    sagemaker_session=sagemaker_session,
    role=role_arn,
)

### Data Preprocessing Step

We will create a simple data preprocessing step. The preprocessing script, `prepare_data.py`, is shown below (running the cell creates the file for you :)).

In [82]:
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import ProcessingInput, ProcessingOutput

In [83]:
%%writefile prepare_data.py

#!/usr/bin/env python

import os
import pandas as pd
from glob import glob
from tqdm import tqdm

os.system("du -a /opt/ml")

SRCTRAINFILE = glob("/opt/ml/processing/input_train/*.csv")[0]
print(SRCTRAINFILE)
SRCTESTFILE = glob("/opt/ml/processing/input_test/*.csv")[0]
print(SRCTESTFILE)

DSTTRAINFILE = "/opt/ml/processing/train/train.csv"
DSTTESTFILE = "/opt/ml/processing/test/test.csv"

# Load the train and test sets
trainFrame = pd.read_csv(SRCTRAINFILE, header=None)
testFrame = pd.read_csv(SRCTESTFILE, header=None)

# AWS recommends that you train an Amazon Comprehend model with at least 50 training documents for
# each class. See: https://docs.aws.amazon.com/comprehend/latest/dg/how-document-classification-training-data.html
#
# The dataset we use has 100,000 documents per class. To limit the costs and training times of this demo,
# we will limit it to 1000 documents per class
#
# If you want to test Amazon Comprehend on the full dataset, set MAXITEM to 100000

MAXITEM = 1000
# Keeping MAXITEM for each label
for i in trainFrame[0].unique():
    num = len(trainFrame[trainFrame[0] == i])
    dropnum = max(0, num - MAXITEM)
    indextodrop = trainFrame[trainFrame[0] == i].sample(n=dropnum).index
    trainFrame.drop(indextodrop, inplace=True)

# Escaping commas in preparation to write the data to a CSV file
trainFrame[1] = trainFrame[1].str.replace(",", "&#44;")
# Writing csv file
trainFrame.to_csv(
    path_or_buf=DSTTRAINFILE,
    header=False,
    index=False,
    escapechar="\\",
    doublequote=False,
    quotechar='"',
)

# Escaping commas in preparation to write the data to a CSV file
testFrame[0] = testFrame[0].str.replace(",", "&#44;")
# Writing csv file
testFrame.to_csv(
    path_or_buf=DSTTESTFILE,
    header=False,
    index=False,
    escapechar="\\",
    doublequote=False,
    quotechar='"',
)


Overwriting prepare_data.py
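
To see what the two transformations in `prepare_data.py` do (capping the number of documents per label and escaping commas before writing the CSV), here is a small standard-library sketch on toy rows. It does not use pandas, the function names `cap_per_label` and `to_comprehend_csv` are introduced only for this illustration, and the labels and texts are made up.

```python
import csv
import io
import random
from collections import defaultdict


def cap_per_label(rows, max_items, seed=0):
    """Keep at most max_items randomly chosen (label, text) rows per label."""
    rng = random.Random(seed)
    by_label = defaultdict(list)
    for label, text in rows:
        by_label[label].append((label, text))

    kept = []
    for items in by_label.values():
        if len(items) > max_items:
            items = rng.sample(items, max_items)
        kept.extend(items)
    return kept


def to_comprehend_csv(rows):
    """Write (label, text) rows as header-less CSV, escaping commas in the text."""
    buf = io.StringIO()
    writer = csv.writer(
        buf, quotechar='"', escapechar="\\", doublequote=False, lineterminator="\n"
    )
    for label, text in rows:
        # Same replacement as the script: commas become the HTML entity &#44;
        writer.writerow([label, text.replace(",", "&#44;")])
    return buf.getvalue()


# Toy data (made up for illustration)
rows = [
    ("SPORTS", "Goal, then a second goal"),
    ("SPORTS", "A late win"),
    ("SPORTS", "Extra time"),
    ("NEWS", "Markets rise, then fall"),
]
capped = cap_per_label(rows, max_items=2)
print(to_comprehend_csv(capped))
```

Because the commas inside the text are replaced before writing, each output line contains exactly one delimiter comma between the label and the document.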


In [84]:
preprocess = ProcessingStep(
    name="ComprehendProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_train, destination="/opt/ml/processing/input_train"),
        ProcessingInput(source=input_test, destination="/opt/ml/processing/input_test"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="prepare_data.py",
)

### Train and Evaluation Step

We will create a simple train and evaluation step (a single step for both tasks). The script, `train_eval_comprehend.py`, is shown below (running the cell creates the file for you :)).

The step also writes an evaluation report file once training finishes. The report is based on an evaluation data set that Comprehend reserves automatically during training. We can use this score to check whether the model performance passes a certain threshold, and deploy only if it does.

**Note:** For this example, we have the train and evaluation in one step, but this can be separated into their own steps if needed/wanted.

In [85]:
from sagemaker.workflow.properties import PropertyFile

In [86]:
%%writefile train_eval_comprehend.py

#!/usr/bin/env python

import os
import json
import sys
import pathlib
import argparse
import datetime
import time
import boto3

os.system("du -a /opt/ml")
print(os.environ)


def train(args):
    print(args)
    comprehend = boto3.client("comprehend", region_name=os.environ["AWS_REGION"])

    s3_train_data = args.train_input_file
    s3_train_output = args.train_output_path

    role_arn = args.iam_role_arn

    id_ = str(int(time.time()))  # Epoch seconds; strftime("%s") is not portable across platforms
    create_custom_classify_response = comprehend.create_document_classifier(
        DocumentClassifierName="DEMO-custom-classifier-" + id_,
        DataAccessRoleArn=role_arn,
        InputDataConfig={"DataFormat": "COMPREHEND_CSV", "S3Uri": s3_train_data},
        OutputDataConfig={"S3Uri": s3_train_output},
        LanguageCode="en",
    )

    jobArn = create_custom_classify_response["DocumentClassifierArn"]

    max_time = time.time() + 3 * 60 * 60  # 3 hours
    while time.time() < max_time:
        describe_custom_classifier = comprehend.describe_document_classifier(
            DocumentClassifierArn=jobArn
        )
        status = describe_custom_classifier["DocumentClassifierProperties"]["Status"]
        print("Custom classifier: {}".format(status))

        if status == "IN_ERROR":
            sys.exit(1)

        if status == "TRAINED":
            evaluation_metrics = describe_custom_classifier[
                "DocumentClassifierProperties"
            ]["ClassifierMetadata"]["EvaluationMetrics"]

            acc = evaluation_metrics.get("Accuracy")
            arn = describe_custom_classifier["DocumentClassifierProperties"][
                "DocumentClassifierArn"
            ]

            evaluation_output_dir = "/opt/ml/processing/evaluation"
            pathlib.Path(evaluation_output_dir).mkdir(parents=True, exist_ok=True)

            print(f"Writing out evaluation report with Accuracy: {acc}")
            evaluation_path = f"{evaluation_output_dir}/evaluation.json"
            with open(evaluation_path, "w") as f:
                f.write(json.dumps(evaluation_metrics))

            arn_output_dir = "/opt/ml/processing/arn"
            pathlib.Path(arn_output_dir).mkdir(parents=True, exist_ok=True)

            print(f"Writing out classifier arn {arn}")
            arn_path = f"{arn_output_dir}/arn.txt"
            with open(arn_path, "w") as f:
                f.write(arn)

            break

        time.sleep(60)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--train-input-file", type=str, help="Path of input training file"
    )
    parser.add_argument("--train-output-path", type=str, help="s3 output folder")
    parser.add_argument(
        "--iam-role-arn",
        type=str,
        help="ARN of role allowing SageMaker to trigger Comprehend",
    )
    args = parser.parse_args()
    print(args)

    train(args)

Overwriting train_eval_comprehend.py
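
The polling loop in the script above exits without an error if the three hours elapse before the classifier reaches `TRAINED`. One possible variation, sketched below, makes the timeout explicit. `wait_for_status` is a hypothetical helper, not part of boto3; `describe` stands for any callable that returns the current status string, for example a small wrapper around `describe_document_classifier`.

```python
import time


def wait_for_status(describe, done, failed, timeout_s, poll_s,
                    sleep=time.sleep, clock=time.time):
    """Poll describe() until it returns a status in done.

    Raises RuntimeError on a status in failed and TimeoutError when
    timeout_s seconds pass without reaching a done status.
    """
    deadline = clock() + timeout_s
    while clock() < deadline:
        status = describe()
        if status in failed:
            raise RuntimeError(f"Resource entered failure state: {status}")
        if status in done:
            return status
        sleep(poll_s)
    raise TimeoutError(f"Timed out after {timeout_s} seconds")


# Offline demonstration with a scripted sequence of statuses and no real sleeping
scripted = iter(["SUBMITTED", "TRAINING", "TRAINED"])
final = wait_for_status(
    describe=lambda: next(scripted),
    done={"TRAINED"},
    failed={"IN_ERROR"},
    timeout_s=60,
    poll_s=1,
    sleep=lambda seconds: None,
)
print(final)
```

Raising on timeout makes the processing job fail visibly, instead of finishing without having written `evaluation.json` and `arn.txt`.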


In [87]:
# Define the property file that exposes the model's evaluation metrics to the pipeline
evaluation_report = PropertyFile(
    name="ComprehendEvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

In [88]:
# Define train and evaluation step
# Note: We can reference the outputs of previous steps, which gives us the exact location of the train data (in S3).
# The outputs of this step will be used in the following steps.

comprehend_train_and_eval = ProcessingStep(
    name="ComprehendTrainAndEval",
    processor=sklearn_processor,
    job_arguments=[
        "--train-input-file",
        preprocess.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        "--train-output-path",
        model_output,
        "--iam-role-arn",
        role_arn,
    ],
    code="train_eval_comprehend.py",
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        ProcessingOutput(output_name="arn", source="/opt/ml/processing/arn"),
    ],
    property_files=[evaluation_report],
)

### Deploy Model Step

We will create a deployment step. The script, `deploy_comprehend.py`, is shown below (running the cell creates the file for you :)).

The script reads the ARN of the Comprehend model trained in the previous step and creates an endpoint from it. See the script for the details.



In [89]:
%%writefile deploy_comprehend.py

#!/usr/bin/env python
import os
import sys
import time
import argparse
from urllib.parse import urlparse
import pathlib

import boto3

comprehend = boto3.client("comprehend", region_name=os.environ["AWS_REGION"])

print(sys.argv)


def deploy(args):
    s3 = boto3.client("s3")
    arn = (
        s3.get_object(
            Bucket=urlparse(args.arn_path).netloc,
            Key=urlparse(f"{args.arn_path}/arn.txt").path[1:],
        )["Body"]
        .read()
        .decode()
        .strip()
    )

    endpoint_response = comprehend.create_endpoint(
        EndpointName=f'DEMO-classifier-{time.strftime("%Y-%m-%d-%H-%M-%S")}',
        ModelArn=arn,
        DesiredInferenceUnits=10,
    )

    endpoint_arn = endpoint_response["EndpointArn"]

    max_time = time.time() + 15 * 60  # 15 min
    while time.time() < max_time:
        describe_endpoint = comprehend.describe_endpoint(EndpointArn=endpoint_arn)
        status = describe_endpoint["EndpointProperties"]["Status"]

        if status == "IN_ERROR":
            sys.exit(1)

        if status == "IN_SERVICE":
            endpoint_arn_output_dir = "/opt/ml/processing/endpoint_arn"
            pathlib.Path(endpoint_arn_output_dir).mkdir(parents=True, exist_ok=True)

            print(f"Writing out endpoint arn {endpoint_arn}")
            endpoint_arn_path = f"{endpoint_arn_output_dir}/endpoint_arn.txt"
            with open(endpoint_arn_path, "w") as f:
                f.write(endpoint_arn)
            break

        # Pause between polls so the loop does not call the Comprehend API continuously
        time.sleep(30)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--arn-path", type=str, help="Path to the Arn on S3")
    args = parser.parse_args()
    print(args)

    deploy(args)

Overwriting deploy_comprehend.py
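
The deploy script turns an S3 URI into the `Bucket` and `Key` arguments of `get_object` with `urlparse`. The sketch below isolates that step. `split_s3_uri` is a hypothetical helper name; unlike the inline version in the script, it also strips a trailing slash before appending the file name and rejects non-S3 URIs.

```python
from urllib.parse import urlparse


def split_s3_uri(uri, filename=None):
    """Split s3://bucket/prefix into (bucket, key), optionally appending filename."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri}")

    key = parsed.path.lstrip("/")
    if filename:
        # Avoid a double slash when the prefix already ends with "/"
        key = f"{key.rstrip('/')}/{filename}" if key else filename
    return parsed.netloc, key


# Example with a made-up bucket and prefix
print(split_s3_uri("s3://my-bucket/pipeline/output/arn", "arn.txt"))
```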


In [90]:
# Define deploy step
# Note: We are using the output from previous step as a job argument

step_deploy_model = ProcessingStep(
    name="ComprehendDeploy",
    processor=sklearn_processor,
    job_arguments=[
        "--arn-path",
        comprehend_train_and_eval.properties.ProcessingOutputConfig.Outputs["arn"].S3Output.S3Uri,
    ],
    code="deploy_comprehend.py",
    outputs=[
        ProcessingOutput(output_name="endpoint_arn", source="/opt/ml/processing/endpoint_arn")
    ],
)

In [91]:
# A useful feature is conditional deployment of models. In simple terms, if a trained model's accuracy is below a certain threshold,
# we can skip the deployment step. To do this, we use some classes from sagemaker.workflow.conditions.

from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# Define the condition itself. It reads the accuracy from the evaluation report produced by the training step and checks whether it is at least 0.65.
cond_gte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name="ComprehendTrainAndEval",
        property_file=evaluation_report,
        json_path="Accuracy",
    ),
    right=0.65,
)

# Define a condition step, which decides what happens after the condition is checked (one branch if it holds, another if it does not).
# If the accuracy meets the threshold, we run the deployment step defined above; otherwise no further step runs.
step_cond = ConditionStep(
    name="ComprehendAccuracyCondition",
    conditions=[cond_gte],
    if_steps=[step_deploy_model],
    else_steps=[],
)
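
To make the condition concrete, the following sketch reproduces the same check in plain Python on the contents of an `evaluation.json` file: read the `Accuracy` key and compare it with the threshold. The function name `should_deploy` and the metric values are assumptions for illustration; in the pipeline this comparison is carried out by the condition step itself.

```python
import json


def should_deploy(evaluation_json, threshold=0.65, metric="Accuracy"):
    """Return True when the metric in the report is at least the threshold."""
    metrics = json.loads(evaluation_json)
    value = metrics.get(metric)
    return value is not None and value >= threshold


# Made-up report contents, shaped like the metrics written by the training script
print(should_deploy('{"Accuracy": 0.91, "F1Score": 0.9}'))
print(should_deploy('{"Accuracy": 0.5}'))
```

A report without an `Accuracy` key is treated here as "do not deploy", which is a design choice of this sketch.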

### Test Model Endpoint Step (using Lambda Function)

Apart from running SageMaker jobs as pipeline steps, we can also use a Lambda function to perform operations. In our case, we use it to test the endpoint created in the deployment step.

We use a helper script, `iam_helper.py`, to create the role that the Lambda function runs under. The code of the Lambda function itself is in `test_comprehend_lambda.py`.

In [92]:
%%writefile iam_helper.py

import boto3
import json

iam = boto3.client("iam")


def create_lambda_role(role_name):
    try:
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(
                {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Effect": "Allow",
                            "Principal": {"Service": "lambda.amazonaws.com"},
                            "Action": "sts:AssumeRole",
                        }
                    ],
                }
            ),
            Description="Role for Lambda to call SageMaker functions",
        )

        role_arn = response["Role"]["Arn"]

        response = iam.attach_role_policy(
            RoleName=role_name, PolicyArn="arn:aws:iam::aws:policy/AWSLambda_FullAccess"
        )

        response = iam.attach_role_policy(
            RoleName=role_name,
            PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
        )

        response = iam.attach_role_policy(
            RoleName=role_name, PolicyArn="arn:aws:iam::aws:policy/ComprehendFullAccess"
        )

        return role_arn

    except iam.exceptions.EntityAlreadyExistsException:
        print(f"Using ARN from existing role: {role_name}")
        response = iam.get_role(RoleName=role_name)
        return response["Role"]["Arn"]


def delete_lambda_role(role_name):
    response = iam.detach_role_policy(
        RoleName=role_name, PolicyArn="arn:aws:iam::aws:policy/AWSLambda_FullAccess"
    )

    response = iam.detach_role_policy(
        RoleName=role_name,
        PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess",
    )

    response = iam.detach_role_policy(
        RoleName=role_name, PolicyArn="arn:aws:iam::aws:policy/ComprehendFullAccess"
    )

    response = iam.delete_role(RoleName=role_name)

    return response

Overwriting iam_helper.py


In [93]:
%%writefile test_comprehend_lambda.py

#!/usr/bin/env python
import os
import sys
import json
import argparse
from urllib.parse import urlparse

import boto3

comprehend = boto3.client("comprehend", region_name=os.environ["AWS_REGION"])
s3 = boto3.client("s3")


def lambda_handler(event, context):

    endpoint_arn_path = event["endpoint_arn_path"]
    text = event["text"]

    endpoint_arn = (
        s3.get_object(
            Bucket=urlparse(endpoint_arn_path).netloc,
            Key=urlparse(f"{endpoint_arn_path}/endpoint_arn.txt").path[1:],
        )["Body"]
        .read()
        .decode()
        .strip()
    )

    endpoint_response = comprehend.classify_document(
        Text=text, EndpointArn=endpoint_arn
    )

    return {"statusCode": 200, "body": json.dumps(endpoint_response)}

Overwriting test_comprehend_lambda.py
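
The handler above returns the raw `classify_document` response as a JSON string in `body`. A caller would typically want the highest-scoring class, which could be extracted as sketched below. `top_class` is a hypothetical helper, and the sample payload is made up; it only follows the `Classes` / `Name` / `Score` shape of the Comprehend response.

```python
import json


def top_class(lambda_result):
    """Return (name, score) of the best class in the handler's result, or None."""
    body = json.loads(lambda_result["body"])
    classes = body.get("Classes", [])
    if not classes:
        return None
    best = max(classes, key=lambda item: item["Score"])
    return best["Name"], best["Score"]


# Made-up handler result for illustration; real labels depend on the training data
sample_result = {
    "statusCode": 200,
    "body": json.dumps(
        {
            "Classes": [
                {"Name": "LABEL_A", "Score": 0.12},
                {"Name": "LABEL_B", "Score": 0.85},
                {"Name": "LABEL_C", "Score": 0.03},
            ]
        }
    ),
}
print(top_class(sample_result))
```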


In [94]:
# import iam_helper script (created above)
import iam_helper

lambda_role_name = "DEMO-test-comprehend-lambda-role" # Define a name for lambda execution role
lambda_role = iam_helper.create_lambda_role(lambda_role_name) # Create role

Using ARN from existing role: DEMO-test-comprehend-lambda-role


In [95]:
# Define example text to be used to ping the endpoint

example_text = (
    "Italian EBU Member RAI has won the 65th Eurovision Song Contest with the song "
    + "Zitti e buoni performed by Måneskin. It's the 3rd win for Italy who last triumphed in 1990. "
    + "26 countries took part in the Grand Final of the world’s largest live music event, "
    + "hosted by Dutch EBU Members NPO, NOS and AVROTROS on Saturday 22 May in Rotterdam. "
    + "Måneskin wrote the winning song which finished the night with 524 points, 25 points "
    + "ahead of 2nd placed France represented by Barbara Pravi singing Voila. Switzerland’s Gjon’s Tears with Tout l’Univers finished in third place."
)

In [96]:
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda

In [97]:
# Create lambda function

function_name = "DEMO-sagemaker-lambda-step-endpoint-test"

# Lambda helper class can be used to create the Lambda function
endpoint_lambda = Lambda(
    function_name=function_name,
    execution_role_arn=lambda_role,
    script="test_comprehend_lambda.py",
    handler="test_comprehend_lambda.lambda_handler",
)

# Define Lambda Step
test_endpoint = LambdaStep(
    name="LambdaStep",
    lambda_func=endpoint_lambda,
    inputs={
        "endpoint_arn_path": step_deploy_model.properties.ProcessingOutputConfig.Outputs[
            "endpoint_arn"
        ].S3Output.S3Uri,
        "text": example_text,
    },
)

### Creating Pipeline (linking all steps)

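Now we link the steps into a single pipeline. Note that the `test_endpoint` Lambda step defined above is not included in the `steps` list or in the condition step's `if_steps`, so it will not run as part of this pipeline as written.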

In [98]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "DEMO-ComprehendPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        input_train,
        input_test,
        model_output,
    ],
    steps=[preprocess, comprehend_train_and_eval, step_cond],
    sagemaker_session=sagemaker_session,
)

In [99]:
role_arn

'arn:aws:iam::842780680566:role/service-role/AmazonSageMaker-ExecutionRole-20220817T160055'

In [100]:
# Creates pipeline or updates it
pipeline.upsert(role_arn=role_arn)

ResourceLimitExceeded: An error occurred (ResourceLimitExceeded) when calling the CreatePipeline operation: Resource limits for this account have been exceeded. Please contact Customer Support for assistance.
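
The `ResourceLimitExceeded` error above is raised by the `CreatePipeline` call, which suggests an account-level limit was reached before the pipeline could be created. Once the upsert succeeds, a run could be started and inspected roughly as follows. This is a sketch under assumptions: `run_pipeline` is a hypothetical wrapper, and it expects an object that behaves like the SageMaker SDK `Pipeline` (`upsert`, `start`) whose execution offers `wait` and `list_steps`.

```python
def run_pipeline(pipeline, role_arn, parameters=None):
    """Create or update the pipeline, run it once, and summarise step statuses.

    parameters is an optional dict that overrides pipeline parameter defaults,
    for example {"TrainData": "s3://<bucket>/<key>"}.
    """
    pipeline.upsert(role_arn=role_arn)
    execution = pipeline.start(parameters=parameters)

    # Blocks until the execution finishes; the default waiting window may be
    # too short for a long Comprehend training job.
    execution.wait()

    return [(step["StepName"], step["StepStatus"]) for step in execution.list_steps()]
```

In the notebook this would be called as `run_pipeline(pipeline, role_arn)`, optionally with a `parameters` dict keyed by the parameter names defined earlier (`TrainData`, `TestData`, `ModelOutput`).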