# Create and orchestrate NLP workflow using SageMaker Pipelines 

## Contents

## 1. Setup

In [1]:
%%capture

!pip install --upgrade sagemaker

### Imports 

In [2]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.huggingface import HuggingFace
from sagemaker.inputs import TrainingInput
from time import gmtime, strftime
from pprint import pprint
import pandas as pd
import sagemaker
import logging
import boto3
import os

In [3]:
logger = logging.getLogger('__name__')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

In [4]:
logger.info(f'Using SageMaker: {sagemaker.__version__}')

Using SageMaker: 2.49.0


In [5]:
session = sagemaker.Session()
bucket = session.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

current_timestamp = strftime('%m-%d-%H-%M', gmtime())
pipeline_name = f'nlp-pipeline-{current_timestamp}'

In [6]:
logger.info(f'Bucket name = {bucket}')
logger.info(f'Role = {role}')
logger.info(f'Region = {region}')

Bucket name = sagemaker-us-east-1-892313895307
Role = arn:aws:iam::892313895307:role/service-role/AmazonSageMaker-ExecutionRole-20210714T091788
Region = us-east-1


In [7]:
s3_client = boto3.client('s3', region_name=region)

## Define pipeline parameters 

In [8]:
training_instance_count = ParameterInteger(name='TrainingInstanceCount', default_value=1)
training_instance_type = ParameterString(name='TrainingInstanceType', default_value='ml.p3.2xlarge')
trained_model_s3_uri = ParameterString(name='TrainedModelS3Uri', default_value=f's3://{bucket}/pipeline/model')

## Define training step

In [9]:
hyperparameters={'epochs': 1,
                 'train_batch_size': 16,
                 'model_name':'distilbert-base-uncased',
                 'model_s3': trained_model_s3_uri.default_value,
                 'output_dir':'/opt/ml/checkpoints'}

In [10]:
huggingface_estimator = HuggingFace(entry_point='train.py',
                            source_dir='./src',
                            instance_type=training_instance_type.default_value,
                            instance_count=training_instance_count.default_value,
                            role=role,
                            transformers_version='4.6',
                            tensorflow_version='2.4',
                            py_version='py37',  
                            disable_profiler=True,
                            debugger_hook_config=False,
                            #model_dir=trained_model_s3_uri.default_value,
                            #output_dir=trained_model_s3_uri.default_value,
                            #output_path=trained_model_s3_uri.default_value,
                            checkpoint_s3_uri=trained_model_s3_uri.default_value,
                            hyperparameters=hyperparameters)

In [11]:
training_step = TrainingStep(
    name='train',
    estimator=huggingface_estimator
)

training_step

TrainingStep(name='train', step_type=<StepTypeEnum.TRAINING: 'Training'>, depends_on=None)

In [12]:
training_step.properties.ModelArtifacts.S3ModelArtifacts.__dict__

{'_path': 'Steps.train.ModelArtifacts.S3ModelArtifacts',
 '_shape_names': ['S3Uri'],
 '__str__': 'S3Uri'}

## Define processing step for model deployment

In [13]:
endpoint_name = f'hf-clf-{current_timestamp}'
deploy_model_script_uri = f's3://{bucket}/pipeline/code/deploy.py'

In [14]:
s3_client.upload_file(Filename='./src/deploy.py', Bucket=bucket, Key='pipeline/code/deploy.py')

In [15]:
deploy_model_processor = SKLearnProcessor(framework_version='0.23-1', 
                                          role=role, 
                                          instance_type='ml.t3.medium', 
                                          instance_count=1, 
                                          base_job_name='deploy-processing-job', 
                                          sagemaker_session=session)

In [16]:
type(deployment_instance_type.default_value)

str

In [17]:
deploy_step = ProcessingStep(name='deploy', 
                             processor=deploy_model_processor, 
                             job_arguments=['--model_name', endpoint_name, # reuse endpoint name 
                                            '--region', region, 
                                            '--deployment_instance_type', 'ml.m5.4xlarge', 
                                            '--deployment_instance_count', '1', 
                                            '--model_s3_path', trained_model_s3_uri.default_value, 
                                            '--endpoint_name', endpoint_name], 
                             code=deploy_model_script_uri)

In [18]:
deploy_step.__dict__

{'name': 'deploy',
 'step_type': <StepTypeEnum.PROCESSING: 'Processing'>,
 'depends_on': None,
 'processor': <sagemaker.sklearn.processing.SKLearnProcessor at 0x7f8051e049d0>,
 'inputs': None,
 'outputs': None,
 'job_arguments': ['--model_name',
  'hf-clf-07-18-19-42',
  '--region',
  'us-east-1',
  '--model_s3_path',
  's3://sagemaker-us-east-1-892313895307/pipeline/model',
  '--endpoint_name',
  'hf-clf-07-18-19-42'],
 'code': 's3://sagemaker-us-east-1-892313895307/pipeline/code/deploy.py',
 'property_files': None,
 '_properties': <sagemaker.workflow.properties.Properties at 0x7f805021c490>,
 'cache_config': None}

## Create Pipeline

In [19]:
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        deployment_instance_type,
        deployment_instance_count, 
        trained_model_s3_uri],
    steps=[deploy_step],
    sagemaker_session=session)

In [20]:
pipeline.__dict__

{'name': 'nlp-pipeline-07-18-19-42',
 'parameters': [ParameterString(name='TrainingInstanceType', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='ml.m5.4xlarge'),
  ParameterInteger(name='TrainingInstanceCount', parameter_type=<ParameterTypeEnum.INTEGER: 'Integer'>, default_value=1),
  ParameterString(name='TrainedModelS3Uri', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-us-east-1-892313895307/pipeline/model')],
 'pipeline_experiment_config': <sagemaker.workflow.pipeline_experiment_config.PipelineExperimentConfig at 0x7f80526466d0>,
 'steps': [ProcessingStep(name='deploy', step_type=<StepTypeEnum.PROCESSING: 'Processing'>, depends_on=None)],
 'sagemaker_session': <sagemaker.session.Session at 0x7f8051e12150>}

In [21]:
response = pipeline.create(role_arn=role)
response

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:892313895307:pipeline/nlp-pipeline-07-18-19-42',
 'ResponseMetadata': {'RequestId': 'ed177357-4994-44ee-8c50-36be6e84ee2b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'ed177357-4994-44ee-8c50-36be6e84ee2b',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '92',
   'date': 'Sun, 18 Jul 2021 19:42:46 GMT'},
  'RetryAttempts': 0}}

In [22]:
execution = pipeline.start()

In [23]:
execution.arn

'arn:aws:sagemaker:us-east-1:892313895307:pipeline/nlp-pipeline-07-18-19-42/execution/pylvnqmgavot'

In [24]:
status = execution.describe()
pprint(status)

{'CreatedBy': {'DomainId': 'd-dowart1jabkf',
               'UserProfileArn': 'arn:aws:sagemaker:us-east-1:892313895307:user-profile/d-dowart1jabkf/ts-zd-e2e',
               'UserProfileName': 'ts-zd-e2e'},
 'CreationTime': datetime.datetime(2021, 7, 18, 19, 42, 46, 910000, tzinfo=tzlocal()),
 'LastModifiedBy': {'DomainId': 'd-dowart1jabkf',
                    'UserProfileArn': 'arn:aws:sagemaker:us-east-1:892313895307:user-profile/d-dowart1jabkf/ts-zd-e2e',
                    'UserProfileName': 'ts-zd-e2e'},
 'LastModifiedTime': datetime.datetime(2021, 7, 18, 19, 42, 46, 910000, tzinfo=tzlocal()),
 'PipelineArn': 'arn:aws:sagemaker:us-east-1:892313895307:pipeline/nlp-pipeline-07-18-19-42',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:892313895307:pipeline/nlp-pipeline-07-18-19-42/execution/pylvnqmgavot',
 'PipelineExecutionDisplayName': 'execution-1626637366986',
 'PipelineExecutionStatus': 'Executing',
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '721',
       