# ML SageMaker Pipeline Example to train and deploy a BERT-Based text classifier

### Introduction

This notebook demonstrates the following:
* Define and run a pipeline using a directed acyclic graph (DAG) with specific pipeline parameters and model hyper-parameters
* Define a processing step that cleans, balances, transforms, and splits our dataset into train, validation, and test datasets
* Define a training step that trains a model using the train and validation datasets
* Define a processing step that evaluates the trained model's performance on the test dataset
* Define a register model step that creates a model package from the trained model
* Define a conditional step that checks the model's performance and conditionally registers the model for deployment
* Works with the kernel "Python 3 (Data Science)"

### Table of Contents

- [1. Configure the dataset and processing step](#KM-1.)
  - [1.1. Configure S3 path for raw input data](#KM-1.1.)
  - [1.2. Configure processing step](#KM-1.2.)
    - [Step 1](#KM-step1)
    - [Step 2](#KM-step2)
- [2. Configure training step](#KM-2.)
  - [2.1. Define parameters](#KM-2.1.)
  - [2.2. Configure hyper-parameters](#KM-2.2.)
  - [2.3. Configure model-evaluation metrics](#KM-2.3.)
  - [2.4. Configure the `PyTorchEstimator`](#KM-2.4.)
  - [2.5. Setup pipeline step caching](#KM-2.5.)
  - [2.6. Configure the `TrainingStep`](#KM-2.6.)
    - [Step 3](#KM-step3)
- [3. Configure model-evaluation step](#KM-3.)
- [4. Configure and register model step](#KM-4.)
  - [4.1. Configure the model for deployment](#KM-4.1.)
  - [4.2. Register the model for deployment](#KM-4.2.)
    - [Step 4](#KM-step4)
- [5. Create model for deployment step](#KM-5.)
    - [Step 5](#KM-step5)
    - [Step 6](#KM-step6)
- [6. Check accuracy condition step](#KM-6.)
- [7. Create pipeline](#KM-7.)
  - [7.1. Define a pipeline of parameters, steps, and conditions](#KM-7.1.)
  - [7.2. Start Pipeline](#KM-7.2.)
  - [7.3. Wait for pipeline execution](#KM-7.3.)
  - [7.4. Describe completed pipeline](#KM-7.4.)
  - [7.5. Wait for the pipeline to complete](#KM-7.5.)
- [8. Evaluate the model](#KM-8.)
  - [8.1. Describe evaluation metrics](#KM-8.1.)
    - [Step 7](#KM-step7)
  - [8.2. Review the evaluation report](#KM-8.2.)
  - [8.3. List pipeline artifacts](#KM-8.3.)
    - [Step 8](#KM-step8)
- [9. Deploy and test the model](#KM-9.)
  - [9.1. Approve trained model](#KM-9.1.)
  - [9.2. Deploy model](#KM-9.2.)
  - [9.3. Create endpoint from registry](#KM-9.3.)
  - [9.4. Test model](#KM-9.4.)
  - [9.5. SageMaker Studio extensions](#KM-9.5.)

**Terminology**

This notebook focuses on the following features of Amazon SageMaker Pipelines:

* **Pipelines** - a directed acyclic graph (DAG) of steps and conditions to orchestrate SageMaker jobs and resource creation
* **Processing job steps** - a simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model explainability
* **Training job steps** - an iterative process that teaches a model to make predictions on new data by presenting examples from a training dataset
* **Conditional step execution** - provides conditional execution of branches in a pipeline
* **Registering models** - register a model in a model registry to create a deployable model package in Amazon SageMaker
* **Parameterized pipeline executions** - allows pipeline executions to vary by supplied parameters
* **Model endpoint** - hosts the model as a REST endpoint to serve predictions from new data

**BERT Pipeline**

The pipeline that will be created follows a typical machine learning application pattern of pre-processing, training, evaluation, and model registration.

In the processing step, we perform feature engineering to transform the `review_body` text into BERT embeddings using the pre-trained BERT model and split the dataset into train, validation and test files. The transformed dataset is stored in a feature store. To optimize for Tensorflow training, the transformed dataset files are saved using the TFRecord format in Amazon S3.

In the training step, we fine-tune the BERT model on the customer reviews dataset and add a new classification layer to predict the `sentiment` for a given `review_body`.

In the evaluation step, we take the trained model and a test dataset as input, and produce a JSON file containing classification evaluation metrics.

In the condition step, we register the trained model if the accuracy of the model, as determined by our evaluation step, meets or exceeds a given threshold value.

![](./images/bert_sagemaker_pipeline.png)



First, install the required modules.

In [147]:
# please ignore warning messages during the installation
!pip install --disable-pip-version-check -q sagemaker==2.29.0



In [148]:
from botocore.exceptions import ClientError

import os
import json
import logging

import boto3
import pandas as pd
import sagemaker

sess = sagemaker.Session()
bucket = sess.default_bucket()         # default S3 bucket for this session
role = sagemaker.get_execution_role()  # IAM role that SageMaker jobs will assume
region = boto3.Session().region_name
region = boto3.Session().region_name

Set up the pipeline name.

In [149]:
import time
timestamp = int(time.time())

pipeline_name = 'BERT-pipeline-{}'.format(timestamp)
role

'arn:aws:iam::704911750285:role/service-role/AmazonSageMaker-ExecutionRole-20200806T193572'

<a name='KM-1.'></a>
# 1. Configure the dataset and processing step

<a name='KM-1.1.'></a>
### 1.1. Configure S3 path for raw input data

The raw dataset is stored in S3. Start by specifying its S3 location:

In [150]:
raw_input_data_s3_uri = 's3://kaushal-mlops/data/raw/' #indicate the source of the raw file
print(raw_input_data_s3_uri)

s3://kaushal-mlops/data/raw/


List the objects under this S3 prefix. Apart from a zero-byte placeholder object for the folder itself, there is just one file:

In [151]:
!aws s3 ls $raw_input_data_s3_uri

2021-07-13 05:42:00          0 
2021-07-13 05:42:53    8457214 womens_clothing_ecommerce_reviews.csv


<a name='KM-1.2.'></a>
### 1.2. Configure processing step


For the pipeline workflow you will need to create workflow parameters of a specific type: integer, string, or float.

In [152]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

Now set the parameters for the processing step.

In [153]:
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.c5.2xlarge"
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)

train_split_percentage = ParameterFloat(
    name="TrainSplitPercentage",
    default_value=0.90,
)

validation_split_percentage = ParameterFloat(
    name="ValidationSplitPercentage",
    default_value=0.05,
)

test_split_percentage = ParameterFloat(
    name="TestSplitPercentage",
    default_value=0.05,
)

balance_dataset = ParameterString(
    name="BalanceDataset",
    default_value="True",
)

max_seq_length = ParameterInteger(
    name="MaxSeqLength",
    default_value=128,
)

feature_store_offline_prefix = ParameterString(
    name="FeatureStoreOfflinePrefix",
    default_value="reviews-feature-store-" + str(timestamp),
)

feature_group_name = ParameterString(
    name="FeatureGroupName",
    default_value="reviews-feature-group-" + str(timestamp)
)

input_data = ParameterString(
    name="InputData",
    default_value=raw_input_data_s3_uri,
)
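The split-percentage and `BalanceDataset` parameters above feed `src/prepare_data.py`, which this notebook does not show. To give a rough idea of what balancing and splitting could look like, here is a minimal standard-library sketch. It assumes each record is a dict with a hypothetical `sentiment` key, that "balancing" means downsampling every class to the size of the smallest class, and that the split is a seeded shuffle followed by slicing; the real script may do this differently.

```python
import random
from collections import defaultdict


def balance_by_label(rows, label_key="sentiment"):
    """Downsample every class to the size of the smallest class (assumed strategy)."""
    by_label = defaultdict(list)
    for row in rows:
        by_label[row[label_key]].append(row)
    smallest = min(len(group) for group in by_label.values())
    balanced = []
    for label in sorted(by_label):
        balanced.extend(by_label[label][:smallest])
    return balanced


def split_rows(rows, train_pct, validation_pct, test_pct, seed=42):
    """Shuffle with a fixed seed, then slice into train/validation/test lists."""
    if abs(train_pct + validation_pct + test_pct - 1.0) > 1e-9:
        raise ValueError("split percentages must sum to 1.0")
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    n_train = int(len(shuffled) * train_pct)
    n_validation = int(len(shuffled) * validation_pct)
    train = shuffled[:n_train]
    validation = shuffled[n_train:n_train + n_validation]
    test = shuffled[n_train + n_validation:]
    return train, validation, test


# Usage on 100 toy records with three sentiment classes (-1, 0, 1).
records = [{"review_body": f"review {i}", "sentiment": i % 3 - 1} for i in range(100)]
print(len(balance_by_label(records)))  # 99 (33 per class)
train, validation, test = split_rows(records, 0.90, 0.05, 0.05)
print(len(train), len(validation), len(test))  # 90 5 5
```

Any rows left over by integer truncation end up in the test split, so no record is dropped.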

Set up a scikit-learn-based processor, passing in the SageMaker execution role, the processing instance type, and the instance count.

In [154]:
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={'AWS_DEFAULT_REGION': region},                             
)

Now use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that runs when the pipeline executes this step. For those familiar with the existing Python SDK, this is very similar to a processor instance's `run` method.

Note the `"sentiment-train"`, `"sentiment-validation"` and `"sentiment-test"` named channels specified in the output configuration for the processing job. Step `Properties` such as these can be used in subsequent steps and resolve to their runtime values during execution. We use them this way when defining the training step.

In [155]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processing_inputs=[
    ProcessingInput(
        input_name='raw-input-data',
        source=input_data,
        destination='/opt/ml/processing/input/data/',  #EC2 instance
        s3_data_distribution_type='ShardedByS3Key'
    )
]

processing_outputs=[
    ProcessingOutput(output_name='sentiment-train',
                     source='/opt/ml/processing/output/sentiment/train',
                     s3_upload_mode='EndOfJob'),
    ProcessingOutput(output_name='sentiment-validation',
                     source='/opt/ml/processing/output/sentiment/validation',
                     s3_upload_mode='EndOfJob'),
    ProcessingOutput(output_name='sentiment-test',
                     source='/opt/ml/processing/output/sentiment/test',
                     s3_upload_mode='EndOfJob')
]        

processing_step = ProcessingStep(
    name='Processing',
    code='src/prepare_data.py',
    processor=processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    # Note: each argument below is built from the parameter's default_value, so it is
    # fixed when the pipeline is defined. Overriding these parameters when starting an
    # execution does not change the values passed to prepare_data.py.
    job_arguments=['--train-split-percentage', str(train_split_percentage.default_value),
                   '--validation-split-percentage', str(validation_split_percentage.default_value),
                   '--test-split-percentage', str(test_split_percentage.default_value),
                   '--balance-dataset', str(balance_dataset.default_value),
                   '--max-seq-length', str(max_seq_length.default_value),
                   '--feature-store-offline-prefix', str(feature_store_offline_prefix.default_value),
                   '--feature-group-name', str(feature_group_name.default_value)
                  ]
)

print(processing_step)

ProcessingStep(name='Processing', step_type=<StepTypeEnum.PROCESSING: 'Processing'>)


Now you can access the properties of the processing job as an object through `processing_step.properties`. To print out and explore its attributes, use the object's `__dict__` attribute.

In [156]:
# prints out the list of the processing job properties
print(json.dumps(
    processing_step.properties.__dict__,
    indent=4, sort_keys=True, default=str
))

{
    "AppSpecification": "<sagemaker.workflow.properties.Properties object at 0x7f972b7624d0>",
    "AutoMLJobArn": "<sagemaker.workflow.properties.Properties object at 0x7f972b762b10>",
    "CreationTime": "<sagemaker.workflow.properties.Properties object at 0x7f972b762a90>",
    "Environment": "<sagemaker.workflow.properties.Properties object at 0x7f972b7625d0>",
    "ExitMessage": "<sagemaker.workflow.properties.Properties object at 0x7f972b762950>",
    "ExperimentConfig": "<sagemaker.workflow.properties.Properties object at 0x7f972b7627d0>",
    "FailureReason": "<sagemaker.workflow.properties.Properties object at 0x7f972b762990>",
    "LastModifiedTime": "<sagemaker.workflow.properties.Properties object at 0x7f972b762a50>",
    "MonitoringScheduleArn": "<sagemaker.workflow.properties.Properties object at 0x7f972b762ad0>",
    "NetworkConfig": "<sagemaker.workflow.properties.Properties object at 0x7f972b762610>",
    "ProcessingEndTime": "<sagemaker.workflow.properties.Properties

Pull the channel `sentiment-train` from the output configuration of the processing job. Print out the attributes of the resulting object:

In [157]:
print(json.dumps(
    processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-train'].__dict__, 
    indent=4, sort_keys=True, default=str
))

{
    "AppManaged": "<sagemaker.workflow.properties.Properties object at 0x7f972b779250>",
    "FeatureStoreOutput": "<sagemaker.workflow.properties.Properties object at 0x7f972b7791d0>",
    "OutputName": "<sagemaker.workflow.properties.Properties object at 0x7f972b779090>",
    "S3Output": "<sagemaker.workflow.properties.Properties object at 0x7f972b7790d0>",
    "_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train']",
    "_shape_name": "ProcessingOutput"
}


Now you can pull and print out attributes of the S3 output path related to the `sentiment-train` output channel:

In [158]:
print(json.dumps(
    processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri.__dict__,
    indent=4, sort_keys=True, default=str
))

{
    "__str__": "S3Uri",
    "_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-train'].S3Output.S3Uri",
    "_shape_name": "S3Uri"
}
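The `_path` values above are placeholders rather than data: the S3 URI is only known once the processing job has run. To make that idea concrete, here is a toy resolver that walks such a path through a dictionary shaped like a `DescribeProcessingJob` response. It is an illustration only, not how SageMaker resolves properties. In particular, selecting a list entry by name (`Outputs['sentiment-train']`) is simplified to "the first entry with a top-level value equal to that name", and the `example-bucket` URIs are made up.

```python
import re

# Matches `Name` or `.Name`, `['some-name']`, or `[0]` at the current position.
_TOKEN_RE = re.compile(r"\.?([A-Za-z_][A-Za-z0-9_]*)|\['([^']+)'\]|\[(\d+)\]")


def resolve_property_path(path, step_responses):
    """Walk a path such as "Steps.Processing.X.Y" through nested dicts and lists."""
    prefix = "Steps."
    if not path.startswith(prefix):
        raise ValueError(f"expected a path starting with {prefix!r}: {path!r}")
    rest, pos, value = path[len(prefix):], 0, step_responses
    while pos < len(rest):
        match = _TOKEN_RE.match(rest, pos)
        if match is None:
            raise ValueError(f"cannot parse {rest[pos:]!r}")
        attribute, name, index = match.groups()
        if attribute is not None:
            value = value[attribute]   # dict field
        elif index is not None:
            value = value[int(index)]  # list position
        else:
            # Simplification: pick the first list entry that carries this name.
            matches = [item for item in value if name in item.values()]
            if not matches:
                raise KeyError(name)
            value = matches[0]
        pos = match.end()
    return value


# Hypothetical runtime response for the 'Processing' step.
responses = {
    "Processing": {
        "ProcessingOutputConfig": {
            "Outputs": [
                {"OutputName": "sentiment-train", "S3Output": {"S3Uri": "s3://example-bucket/train"}},
                {"OutputName": "sentiment-test", "S3Output": {"S3Uri": "s3://example-bucket/test"}},
            ]
        }
    }
}
path = "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri"
print(resolve_property_path(path, responses))  # s3://example-bucket/test
```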


<a name='KM-step1'></a>
### Step 1

Pull and print out attributes of the S3 output path object related to the `sentiment-test` output channel.

In [159]:
print(json.dumps(
    processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri.__dict__, 
    indent=4, sort_keys=True, default=str
))

{
    "__str__": "S3Uri",
    "_path": "Steps.Processing.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri",
    "_shape_name": "S3Uri"
}


These objects can be passed into the next steps of the workflow. You can also inspect the arguments of the processing step through its `arguments` property, which returns a dictionary. Review the keys of this dictionary:

In [160]:
processing_step.arguments.keys()

dict_keys(['ProcessingResources', 'AppSpecification', 'RoleArn', 'ProcessingInputs', 'ProcessingOutputConfig', 'Environment'])

Pull and review processing inputs from the arguments of the processing step:

In [161]:
processing_step.arguments['ProcessingInputs']

[{'InputName': 'raw-input-data',
  'AppManaged': False,
  'S3Input': {'S3Uri': ParameterString(name='InputData', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://kaushal-mlops/data/raw/'),
   'LocalPath': '/opt/ml/processing/input/data/',
   'S3DataType': 'S3Prefix',
   'S3InputMode': 'File',
   'S3DataDistributionType': 'ShardedByS3Key',
   'S3CompressionType': 'None'}},
 {'InputName': 'code',
  'AppManaged': False,
  'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-704911750285/sagemaker-scikit-learn-2021-07-13-17-19-31-240/input/code/prepare_data.py',
   'LocalPath': '/opt/ml/processing/input/code',
   'S3DataType': 'S3Prefix',
   'S3InputMode': 'File',
   'S3DataDistributionType': 'FullyReplicated',
   'S3CompressionType': 'None'}}]

<a name='KM-step2'></a>
### Step 2

Pull and review configuration of the processing outputs from the arguments of the processing step.

In [162]:
processing_step.arguments['ProcessingOutputConfig'] 

{'Outputs': [{'OutputName': 'sentiment-train',
   'AppManaged': False,
   'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-704911750285/sagemaker-scikit-learn-2021-07-13-17-19-30-829/output/sentiment-train',
    'LocalPath': '/opt/ml/processing/output/sentiment/train',
    'S3UploadMode': 'EndOfJob'}},
  {'OutputName': 'sentiment-validation',
   'AppManaged': False,
   'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-704911750285/sagemaker-scikit-learn-2021-07-13-17-19-30-829/output/sentiment-validation',
    'LocalPath': '/opt/ml/processing/output/sentiment/validation',
    'S3UploadMode': 'EndOfJob'}},
  {'OutputName': 'sentiment-test',
   'AppManaged': False,
   'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-704911750285/sagemaker-scikit-learn-2021-07-13-17-19-30-829/output/sentiment-test',
    'LocalPath': '/opt/ml/processing/output/sentiment/test',
    'S3UploadMode': 'EndOfJob'}}]}

<a name='KM-2.'></a>
# 2. Configure training step

<a name='KM-2.1.'></a>
### 2.1. Define parameters

Set up the training parameters for the workflow.

In [163]:
freeze_bert_layer = ParameterString(
    name="FreezeBertLayer",
    default_value="False",
)

epochs = ParameterInteger(
    name="Epochs",
    default_value=3
)
    
learning_rate = ParameterFloat(
    name="LearningRate",
    default_value=0.00001
) 
    
train_batch_size = ParameterInteger(
    name="TrainBatchSize",
    default_value=64
)

train_steps_per_epoch = ParameterInteger(
    name="TrainStepsPerEpoch",
    default_value=50
)

validation_batch_size = ParameterInteger(
    name="ValidationBatchSize",
    default_value=64
)

validation_steps_per_epoch = ParameterInteger(
    name="ValidationStepsPerEpoch",
    default_value=50
)

seed = ParameterInteger(
    name="Seed",
    default_value=42
)

run_validation = ParameterString(
    name="RunValidation",
    default_value="True",
)

train_instance_count = ParameterInteger(
    name="TrainInstanceCount",
    default_value=1
)

train_instance_type = ParameterString(
    name="TrainInstanceType",
    default_value="ml.c5.9xlarge"
)

train_volume_size = ParameterInteger(
    name="TrainVolumeSize",
    default_value=256
) 

input_mode = ParameterString(
    name="InputMode",
    default_value="File",
)

<a name='KM-2.2.'></a>
### 2.2. Configure hyper-parameters

Set up the dictionary that will be passed into the estimator's `hyperparameters` argument.

In [164]:
hyperparameters={
    'max_seq_length': max_seq_length,
    'freeze_bert_layer': freeze_bert_layer,
    'epochs': epochs,
    'learning_rate': learning_rate,
    'train_batch_size': train_batch_size,
    'train_steps_per_epoch': train_steps_per_epoch,
    'validation_batch_size': validation_batch_size,
    'validation_steps_per_epoch': validation_steps_per_epoch,
    'seed': seed,
    'run_validation': run_validation
}
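In script mode, SageMaker framework containers pass hyperparameters to the entry-point script as command-line arguments (for example `--epochs 3`), so a training script typically reads them with `argparse`. `src/train.py` is not shown in this notebook, so the sketch below is only an assumption about how it could parse them. String parameters such as `FreezeBertLayer="False"` arrive as text, and `bool("False")` is `True` in Python, so the sketch uses an explicit converter. `SM_MODEL_DIR` is an environment variable set inside SageMaker training containers; the fallback path is used when running elsewhere.

```python
import argparse
import os


def str2bool(value):
    """Convert 'True'/'False'-style strings to bool (bool('False') would be True)."""
    lowered = value.strip().lower()
    if lowered in ("true", "1", "yes"):
        return True
    if lowered in ("false", "0", "no"):
        return False
    raise argparse.ArgumentTypeError(f"expected a boolean string, got {value!r}")


def parse_args(argv=None):
    parser = argparse.ArgumentParser()
    # Names mirror the keys of the `hyperparameters` dictionary above.
    parser.add_argument("--max_seq_length", type=int, default=128)
    parser.add_argument("--freeze_bert_layer", type=str2bool, default=False)
    parser.add_argument("--epochs", type=int, default=3)
    parser.add_argument("--learning_rate", type=float, default=0.00001)
    parser.add_argument("--train_batch_size", type=int, default=64)
    parser.add_argument("--train_steps_per_epoch", type=int, default=50)
    parser.add_argument("--validation_batch_size", type=int, default=64)
    parser.add_argument("--validation_steps_per_epoch", type=int, default=50)
    parser.add_argument("--seed", type=int, default=42)
    parser.add_argument("--run_validation", type=str2bool, default=True)
    # Directory where the trained model is saved so it can be hosted later.
    parser.add_argument("--model_dir", default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
    return parser.parse_args(argv)


args = parse_args(["--epochs", "3", "--freeze_bert_layer", "False", "--learning_rate", "1e-05"])
print(args.epochs, args.freeze_bert_layer, args.learning_rate)  # 3 False 1e-05
```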

<a name='KM-2.3.'></a>
### 2.3. Configure model-evaluation metrics

Choose loss and accuracy as the evaluation metrics.

In [165]:
metric_definitions = [
     {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9.]+)'},
     {'Name': 'validation:accuracy', 'Regex': 'val_acc: ([0-9.]+)'},
]

For example, this sample log line...
```
[step: 100] val_loss: 0.55 - val_acc: 74.64%
```
...will produce the following metrics in CloudWatch:

`validation:loss` =  0.55

`validation:accuracy` = 74.64
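To sanity-check a metric regex before launching a training job, you can apply it to a sample log line with Python's `re` module. This only approximates what SageMaker does when it scans the job's logs; it is meant as a quick local check of the patterns defined above.

```python
import re

metric_definitions = [
    {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9.]+)'},
    {'Name': 'validation:accuracy', 'Regex': 'val_acc: ([0-9.]+)'},
]


def extract_metrics(log_line, definitions):
    """Apply each metric regex to one log line and return the captured values."""
    metrics = {}
    for definition in definitions:
        match = re.search(definition['Regex'], log_line)
        if match:
            metrics[definition['Name']] = float(match.group(1))
    return metrics


print(extract_metrics('[step: 100] val_loss: 0.55 - val_acc: 74.64%', metric_definitions))
# {'validation:loss': 0.55, 'validation:accuracy': 74.64}
```

The character class `[0-9.]` stops at the `%` sign, which is why the accuracy is captured as `74.64` rather than `74.64%`.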


<a name='KM-2.4.'></a>
### 2.4. Configure the `PyTorchEstimator`

Configure an estimator and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to `model_dir` so that it can be hosted later.

In [166]:
from sagemaker.pytorch import PyTorch as PyTorchEstimator

estimator = PyTorchEstimator(
    entry_point='train.py',
    source_dir='src',
    role=role,
    instance_count=train_instance_count,
    instance_type=train_instance_type,
    volume_size=train_volume_size,
    py_version='py3',
    framework_version='1.6.0',
    hyperparameters=hyperparameters,
    metric_definitions=metric_definitions,
    input_mode=input_mode
)

<a name='KM-2.5.'></a>
### 2.5. Setup pipeline step caching
Step signature caching allows SageMaker Pipelines, before executing a step, to look for a previous execution of that step that was called with the same arguments. If such an execution is found, it is a cache hit: instead of recomputing the step, Pipelines propagates the values from that previous execution.

The cache expiration period (`expire_after`) is specified as an [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601#Durations) duration, which can contain year, month, week, day, hour, and minute values.

More details on SageMaker Pipeline step caching can be found [here](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html).
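Conceptually, step caching behaves like a lookup table keyed on a step's arguments, with entries that expire. The toy class below illustrates that idea in plain Python; it is not SageMaker's implementation, and `ToyStepCache` and its methods are hypothetical. Timestamps are passed in explicitly so the example is deterministic.

```python
import hashlib
import json
import time


class ToyStepCache:
    """Illustrative step-signature cache (hypothetical, not the SageMaker service)."""

    def __init__(self, expire_after_seconds):
        self.expire_after_seconds = expire_after_seconds
        self._entries = {}  # signature -> (timestamp, result)

    @staticmethod
    def signature(arguments):
        # Serialize the arguments deterministically, then hash them.
        payload = json.dumps(arguments, sort_keys=True)
        return hashlib.sha256(payload.encode('utf-8')).hexdigest()

    def run(self, arguments, compute, now=None):
        """Return (result, cache_hit); call `compute()` only on a cache miss."""
        now = time.time() if now is None else now
        key = self.signature(arguments)
        entry = self._entries.get(key)
        if entry is not None and now - entry[0] <= self.expire_after_seconds:
            return entry[1], True
        result = compute()
        self._entries[key] = (now, result)
        return result, False


cache = ToyStepCache(expire_after_seconds=3600)  # comparable to expire_after="PT1H"
step_args = {'epochs': 3, 'learning_rate': 1e-05}
print(cache.run(step_args, lambda: 'model-v1', now=0))     # ('model-v1', False)
print(cache.run(step_args, lambda: 'model-v2', now=1800))  # ('model-v1', True)
print(cache.run(step_args, lambda: 'model-v3', now=7200))  # ('model-v3', False)
```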

In [167]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="PT1H") # PT1H represents `one hour`
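To see what an `expire_after` value such as `PT1H` means in concrete terms, the sketch below converts a subset of ISO 8601 durations to `datetime.timedelta`. It deliberately supports only weeks, days, hours, minutes, and seconds: years and months have no fixed length, so they are rejected rather than approximated. `parse_duration` is a hypothetical helper, not part of the SageMaker SDK.

```python
import re
from datetime import timedelta

# P[n]W[n]D[T[n]H[n]M[n]S]; years and months are intentionally unsupported.
_DURATION_RE = re.compile(
    r'P(?:(?P<weeks>\d+)W)?(?:(?P<days>\d+)D)?'
    r'(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?'
)


def parse_duration(value):
    """Convert a duration such as 'PT1H' or 'P1DT12H' to a timedelta."""
    match = _DURATION_RE.fullmatch(value)
    if match is None or value.endswith('T'):
        raise ValueError(f'unsupported ISO 8601 duration: {value!r}')
    parts = {unit: int(amount) for unit, amount in match.groupdict().items() if amount}
    if not parts:
        raise ValueError(f'duration has no components: {value!r}')
    return timedelta(**parts)


print(parse_duration('PT1H'))     # 1:00:00
print(parse_duration('P1DT12H'))  # 1 day, 12:00:00
```

Note that `M` means minutes only after the `T` separator; before it, ISO 8601 uses `M` for months, which this sketch rejects.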

<a name='KM-2.6.'></a>
### 2.6. Configure the `TrainingStep`

Now configure the `TrainingStep`, using the outputs of the processing step as its inputs:

In [168]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name='Train',
    estimator=estimator,
    inputs={
        'train': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                'sentiment-train'
            ].S3Output.S3Uri,
            content_type='text/csv'
        ),
        'validation': TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[
                'sentiment-validation'
            ].S3Output.S3Uri,
            content_type='text/csv'
        )
    },
    cache_config=cache_config
)

print(training_step)

TrainingStep(name='Train', step_type=<StepTypeEnum.TRAINING: 'Training'>)


<a name='KM-step3'></a>
### Step 3

Use the `__dict__` attribute to print out the attributes of the training step properties, and briefly review the result. The attributes match the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object.

In [169]:
training_step.properties.__dict__

{'_path': 'Steps.Train',
 '_shape_name': 'DescribeTrainingJobResponse',
 'TrainingJobName': <sagemaker.workflow.properties.Properties at 0x7f9728e767d0>,
 'TrainingJobArn': <sagemaker.workflow.properties.Properties at 0x7f9728efbe90>,
 'TuningJobArn': <sagemaker.workflow.properties.Properties at 0x7f9728efbb90>,
 'LabelingJobArn': <sagemaker.workflow.properties.Properties at 0x7f9728efbad0>,
 'AutoMLJobArn': <sagemaker.workflow.properties.Properties at 0x7f9728efbc90>,
 'ModelArtifacts': <sagemaker.workflow.properties.Properties at 0x7f9728efb950>,
 'TrainingJobStatus': <sagemaker.workflow.properties.Properties at 0x7f9728efbcd0>,
 'SecondaryStatus': <sagemaker.workflow.properties.Properties at 0x7f9728efb510>,
 'FailureReason': <sagemaker.workflow.properties.Properties at 0x7f9728efba90>,
 'HyperParameters': <sagemaker.workflow.properties.Properties at 0x7f9728efbe10>,
 'AlgorithmSpecification': <sagemaker.workflow.properties.Properties at 0x7f9728efbb10>,
 'RoleArn': <sagemaker.workf

<a name='KM-3.'></a>
# 3. Configure model-evaluation step

First, develop an evaluation script that will be specified in the model evaluation processing step. The evaluation script uses the trained model and the test dataset to produce a JSON file with classification evaluation metrics such as accuracy.

After pipeline execution, you will examine the resulting `evaluation.json` for analysis.

The evaluation script performs the following steps:
* loads in the model
* reads in the test data
* runs predictions on the test data
* builds a classification report, including accuracy
* saves the evaluation report to the evaluation directory
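The `JsonGet` lookup in the condition step reads `metrics.accuracy.value` from `evaluation.json`, so the evaluation script has to write a report with that nesting. `src/evaluate_model_metrics.py` is not shown here; the sketch below only illustrates the expected shape of the report, computing plain accuracy from hypothetical label lists. A real script would first load the model and run predictions. The default output directory matches the `source` of the `metrics` processing output configured in this section.

```python
import json
import os


def build_evaluation_report(y_true, y_pred):
    """Compute accuracy and nest it as metrics.accuracy.value."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError('y_true and y_pred must be non-empty and the same length')
    correct = sum(1 for actual, predicted in zip(y_true, y_pred) if actual == predicted)
    return {'metrics': {'accuracy': {'value': correct / len(y_true)}}}


def save_evaluation_report(report, output_dir='/opt/ml/processing/output/metrics'):
    """Write evaluation.json into the directory mapped to the 'metrics' output."""
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, 'evaluation.json')
    with open(path, 'w') as f:
        json.dump(report, f)
    return path


report = build_evaluation_report([1, 0, -1, 1], [1, 0, 1, 1])
print(json.dumps(report))  # {"metrics": {"accuracy": {"value": 0.75}}}
```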

Create an instance of the `SKLearnProcessor` to run our evaluation script as a scikit-learn-based SageMaker processing job.

In [170]:
from sagemaker.sklearn.processing import SKLearnProcessor

evaluation_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={'AWS_DEFAULT_REGION': region},
    max_runtime_in_seconds=7200
)

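Set up the output `PropertyFile`. It points to `evaluation.json` in the `metrics` output of the evaluation step, so that later steps can read values from that file.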

In [171]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(
    name='EvaluationReport',
    output_name='metrics',
    path='evaluation.json'
)

Use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that runs when the pipeline executes this step. This is very similar to a processor instance's `run` method.

In [172]:
from sagemaker.processing import ProcessingInput, ProcessingOutput

evaluation_step = ProcessingStep(
    name='EvaluateModel',
    processor=evaluation_processor,
    code='src/evaluate_model_metrics.py',
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination='/opt/ml/processing/input/model'
        ),
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs['sentiment-test'].S3Output.S3Uri,
            destination='/opt/ml/processing/input/data'
        )
    ],
    outputs=[
        ProcessingOutput(output_name='metrics', 
                         s3_upload_mode='EndOfJob',
                         source='/opt/ml/processing/output/metrics/'),
    ],
    job_arguments=[
        '--max-seq-length', str(max_seq_length.default_value),
    ],
    property_files=[evaluation_report],
)

<a name='KM-4.'></a>
# 4. Configure and register model step

<a name='KM-4.1.'></a>
### 4.1. Configure the model for deployment

Use the estimator instance that was used for the training step to construct an instance of `RegisterModel`. The result of executing `RegisterModel` in a pipeline is a model package. A model package is a reusable model artifacts abstraction that packages all ingredients necessary for inference. Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.

A model package group is a collection of model packages. You can create a model package group for a specific ML business problem, and you can keep adding versions/model packages into it. Typically, customers are expected to create a ModelPackageGroup for a SageMaker workflow pipeline so that they can keep adding versions/model packages to the group for every workflow pipeline run.

The construction of `RegisterModel` is very similar to an estimator instance's `register` method, for those familiar with the existing Python SDK.

In particular, we will pass in the `S3ModelArtifacts` from the `training_step` properties.

Note that here we provide a specific model package group name, which will be used in the Model Registry and in the Continuous Integration/Continuous Deployment (CI/CD) work later on. Let's set up the variables.

In [173]:
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

deploy_instance_type = ParameterString(
    name="DeployInstanceType",
    default_value="ml.m5.large"
)

deploy_instance_count = ParameterInteger(
    name="DeployInstanceCount",
    default_value=1
)

In [174]:
model_package_group_name = f"BERT-Reviews-{timestamp}"

print(model_package_group_name)

BERT-Reviews-1626196769


Configure the `ModelMetrics` to be stored as metadata.

In [175]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

print(model_metrics)

<sagemaker.model_metrics.ModelMetrics object at 0x7f9729061390>


Define the container image to use for inference.

In [176]:
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="1.6.0",
    py_version="py36",
    instance_type=deploy_instance_type,
    image_scope="inference"
)
print(inference_image_uri)    # Amazon ECR URI of the PyTorch inference image

763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py36
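The returned URI follows the usual Amazon ECR layout of registry, repository, and tag. Splitting it apart, as in the small helper below, makes it easy to confirm that a CPU image was selected for the `ml.m5.large` deployment instance. `parse_ecr_image_uri` is a hypothetical convenience, and it assumes the tag has the `<framework version>-<processor>-<python version>` shape seen in the output above.

```python
def parse_ecr_image_uri(uri):
    """Split '<account>.dkr.ecr.<region>.amazonaws.com/<repo>:<tag>' into parts."""
    registry, _, repository_and_tag = uri.partition('/')
    repository, _, tag = repository_and_tag.partition(':')
    account, _, rest = registry.partition('.dkr.ecr.')
    region = rest.split('.amazonaws.com')[0]
    # Assumed tag layout, e.g. '1.6.0-cpu-py36'.
    framework_version, processor, python_version = tag.split('-')
    return {
        'account': account,
        'region': region,
        'repository': repository,
        'framework_version': framework_version,
        'processor': processor,
        'python_version': python_version,
    }


parts = parse_ecr_image_uri(
    '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:1.6.0-cpu-py36'
)
print(parts['processor'])  # cpu
```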


<a name='KM-4.2.'></a>
### 4.2. Register the model for deployment

<a name='KM-step4'></a>
### Step 4

Configure the register model step, passing in the image URI from above.

In [177]:
from sagemaker.workflow.step_collections import RegisterModel

register_step = RegisterModel(
    name="RegisterModel",
    estimator=estimator,
    image_uri=inference_image_uri, 
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/jsonlines"],
    response_types=["application/jsonlines"],
    inference_instances=[deploy_instance_type],
    transform_instances=[deploy_instance_type], # batch transform is not used in this lab
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)

<a name='KM-5.'></a>
# 5. Create model for deployment step

<a name='KM-step5'></a>
### Step 5

Configure the model for deployment by passing the same inference image into the `image_uri` argument of the `Model` class.

In [178]:
from sagemaker.model import Model

model_name = 'bert-model-{}'.format(timestamp)

model = Model(
    name=model_name,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role,
)

Now configure the input for the create model step:

In [179]:
from sagemaker.inputs import CreateModelInput

create_inputs = CreateModelInput(
    instance_type=deploy_instance_type, 
)

<a name='KM-step6'></a>
### Step 6

Configure the create model step for the workflow. Pass the model and the model input configuration defined above into the corresponding arguments of `CreateModelStep`.

In [180]:
from sagemaker.workflow.steps import CreateModelStep

create_step = CreateModelStep(
    name="CreateModel",
    model=model,           # the Model object defined above
    inputs=create_inputs,  # the CreateModelInput defined above
)

<a name='KM-6.'></a>
# 6. Check accuracy condition step

Finally, you want to register this model only if its accuracy, as determined by the evaluation step `evaluation_step`, meets or exceeds a threshold. A `ConditionStep` allows pipelines to support conditional execution in the pipeline DAG based on conditions of step properties.

Below, you will:

* define a minimum accuracy value as a parameter
* define a `ConditionGreaterThanOrEqualTo` on the accuracy value found in the output of the evaluation step, `evaluation_step`
* use the condition in the list of conditions in a `ConditionStep`
* pass the `RegisterModel` step collection and the `CreateModelStep` into the `if_steps` of the `ConditionStep`

In [181]:
min_accuracy_value = ParameterFloat(
    name="MinAccuracyValue",
    default_value=0.33 # roughly the accuracy of random guessing among three classes
)

In [182]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)

minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=evaluation_step,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=min_accuracy_value # minimum accuracy threshold
)

minimum_accuracy_condition_step = ConditionStep(
    name="AccuracyCondition",
    conditions=[minimum_accuracy_condition],
    if_steps=[register_step, create_step], # accuracy met or exceeded the minimum: register the model and create it for deployment
    else_steps=[], # accuracy fell below the minimum: the model will not be registered
)
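To make the branching concrete, here is a plain-Python sketch of the decision the condition step makes; it is not SageMaker code. The helper `choose_branch` is hypothetical, and it assumes the evaluation report has the `{"metrics": {"accuracy": {"value": ...}}}` shape that the `json_path` above points at. The sample accuracy is the value reported later in section 8.2.

```python
import json

def choose_branch(report_json, threshold, if_steps, else_steps):
    """Return the steps that would run, mimicking a >= condition on accuracy.

    Hypothetical helper: reads metrics.accuracy.value from the report and
    compares it with the threshold, like the AccuracyCondition step.
    """
    accuracy = json.loads(report_json)["metrics"]["accuracy"]["value"]
    # ConditionGreaterThanOrEqualTo holds when left >= right
    return if_steps if accuracy >= threshold else else_steps

report = '{"metrics": {"accuracy": {"value": 0.7313915857605178}}}'
print(choose_branch(report, 0.33, ["RegisterModel", "CreateModel"], []))
# prints ['RegisterModel', 'CreateModel']
```

Because the comparison is `>=`, an accuracy exactly equal to `MinAccuracyValue` still takes the `if_steps` branch.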

<a name='KM-7.'></a>
# 7. Create pipeline

<a name='KM-7.1.'></a>
### 7.1. Define a pipeline of parameters, steps, and conditions

Let's tie it all up into a workflow pipeline so you can execute it, and even schedule it.

A pipeline requires a `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair, so we append a timestamp to the name to reduce the chance of a name conflict.
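The pipeline ARN printed later in this section ends in `bert-pipeline-1626196769`, which suggests `pipeline_name` was built from an integer Unix timestamp. A minimal sketch, assuming that convention; `unique_pipeline_name` is a hypothetical helper, not part of the SageMaker SDK.

```python
import time

def unique_pipeline_name(prefix, timestamp=None):
    """Append a Unix timestamp in seconds to a prefix, e.g. 'bert-pipeline-1626196769'."""
    if timestamp is None:
        timestamp = int(time.time())
    return "{}-{}".format(prefix, timestamp)

print(unique_pipeline_name("bert-pipeline", 1626196769))  # bert-pipeline-1626196769
```

A seconds-resolution timestamp only reduces collisions: two pipelines created with the same prefix within the same second would still clash.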

Note:

* All the parameters used in the step definitions must be listed in `parameters`.
* Steps passed into the pipeline need not be in execution order. The SageMaker workflow service resolves the _data dependency_ DAG as the steps in the execution complete.
* Each step must appear in exactly one place: either the pipeline's step list or the if/else list of a single condition step.
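The ordering idea behind the second note can be sketched with Kahn's topological sort. The dependency map below is our reading of the data flow in this notebook (for example, `Train` consumes the `Processing` outputs, and `EvaluateModel` uses both the test split and the trained model); `execution_order` is a hypothetical helper, and the service's real scheduler may differ, for instance by running independent steps in parallel.

```python
from collections import deque

def execution_order(dependencies):
    """Kahn's algorithm: order steps so each runs after the steps it depends on.

    `dependencies` maps a step name to the set of step names whose outputs it uses.
    Every dependency must itself be a key. The input order of the dict does not matter.
    """
    indegree = {step: len(deps) for step, deps in dependencies.items()}
    dependents = {step: [] for step in dependencies}
    for step, deps in dependencies.items():
        for dep in deps:
            dependents[dep].append(step)
    ready = deque(sorted(s for s, d in indegree.items() if d == 0))
    order = []
    while ready:
        step = ready.popleft()
        order.append(step)
        for child in sorted(dependents[step]):
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)
    if len(order) != len(dependencies):
        raise ValueError("dependency cycle detected")
    return order

# Listed deliberately out of order; the order is recovered from the data flow.
deps = {
    "AccuracyCondition": {"EvaluateModel"},
    "EvaluateModel": {"Processing", "Train"},
    "Train": {"Processing"},
    "Processing": set(),
}
print(execution_order(deps))
# ['Processing', 'Train', 'EvaluateModel', 'AccuracyCondition']
```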

In [183]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        processing_instance_count,
        processing_instance_type,
        max_seq_length,
        balance_dataset,
        train_split_percentage,
        validation_split_percentage,
        test_split_percentage,
        feature_store_offline_prefix,
        feature_group_name,
        epochs,
        learning_rate,
        train_batch_size,
        train_steps_per_epoch,
        validation_batch_size,
        validation_steps_per_epoch,
        freeze_bert_layer,
        seed,
        train_instance_count,
        train_instance_type,
        train_volume_size,        
        input_mode,
        run_validation,
        min_accuracy_value,
        model_approval_status,
        deploy_instance_type,
        deploy_instance_count
    ],
    steps=[processing_step, training_step, evaluation_step, minimum_accuracy_condition_step],
    sagemaker_session=sess,
)

Let's examine the pipeline definition, a JSON document that follows the SageMaker Pipelines DSL specification.

By examining the definition, you are also confirming that the pipeline was well-defined, and that the parameters and step properties resolve correctly.
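One quick self-check on the parsed `definition` is to confirm that every parameter referenced by a step is also declared. A minimal sketch, assuming the definition references a parameter as `{"Get": "Parameters.<Name>"}`; the helpers and the abbreviated `toy` definition are hypothetical and hand-written for illustration.

```python
def referenced_parameters(node):
    """Collect names referenced as {"Get": "Parameters.<Name>"} anywhere in a definition."""
    found = set()
    if isinstance(node, dict):
        get = node.get("Get")
        if isinstance(get, str) and get.startswith("Parameters."):
            found.add(get[len("Parameters."):])
        for value in node.values():
            found |= referenced_parameters(value)
    elif isinstance(node, list):
        for item in node:
            found |= referenced_parameters(item)
    return found

def undeclared_parameters(definition):
    """Return parameter names referenced by steps but missing from 'Parameters'."""
    declared = {p["Name"] for p in definition.get("Parameters", [])}
    return referenced_parameters(definition.get("Steps", [])) - declared

# Hand-written, abbreviated definition: ProcessingInstanceType is used but not declared.
toy = {
    "Parameters": [{"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1}],
    "Steps": [{
        "Name": "Processing",
        "Arguments": {"ProcessingResources": {"ClusterConfig": {
            "InstanceCount": {"Get": "Parameters.ProcessingInstanceCount"},
            "InstanceType": {"Get": "Parameters.ProcessingInstanceType"},
        }}},
    }],
}
print(undeclared_parameters(toy))  # {'ProcessingInstanceType'}
```

Calling `undeclared_parameters(definition)` on the real definition above would be expected to return an empty set.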

In [184]:
import json
from pprint import pprint

definition = json.loads(pipeline.definition())

pprint(definition)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Metadata': {},
 'Parameters': [{'DefaultValue': 's3://kaushal-mlops/data/raw/',
                 'Name': 'InputData',
                 'Type': 'String'},
                {'DefaultValue': 1,
                 'Name': 'ProcessingInstanceCount',
                 'Type': 'Integer'},
                {'DefaultValue': 'ml.c5.2xlarge',
                 'Name': 'ProcessingInstanceType',
                 'Type': 'String'},
                {'DefaultValue': 128,
                 'Name': 'MaxSeqLength',
                 'Type': 'Integer'},
                {'DefaultValue': 'True',
                 'Name': 'BalanceDataset',
                 'Type': 'String'},
                {'DefaultValue': 0.9,
                 'Name': 'TrainSplitPercentage',
                 'Type': 'Float'},
                {'DefaultValue': 0.05,
                 'Name': 'ValidationSplitPercentage',
                 'Type': 'Float'},
                {'DefaultValue': 0.05,
                 'Name': 'TestSplitPercentage',
         

_Ignore the `WARNING` below_

Create pipeline using the `create` method and then print the Amazon Resource Name (ARN) of it.

In [185]:
response = pipeline.create(role_arn=role)

pipeline_arn = response["PipelineArn"]
print(pipeline_arn)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


arn:aws:sagemaker:us-east-1:704911750285:pipeline/bert-pipeline-1626196769


_Ignore the `WARNING` ^^ above ^^_

<a name='KM-7.2.'></a>
### 7.2. Start Pipeline
Now start an execution of the pipeline with the Amazon SageMaker Pipelines service. The service uses the role passed to `create()` to create all the jobs defined in the steps. You will start the pipeline with the parameter values passed to the `start()` function.
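Any parameter you do not pass to `start()` keeps the default it was declared with. A minimal sketch of that merge in plain Python, using defaults taken from the definition printed above and from `MinAccuracyValue`; `resolve_parameters` is hypothetical, and its rejection of undeclared names is a choice made in this sketch to catch typos, not a claim about how the service behaves.

```python
def resolve_parameters(defaults, overrides):
    """Merge start() overrides over declared defaults (a sketch, not the service logic)."""
    unknown = set(overrides) - set(defaults)
    if unknown:
        # This sketch rejects undeclared names so that typos surface early.
        raise KeyError("undeclared parameters: {}".format(sorted(unknown)))
    resolved = dict(defaults)
    resolved.update(overrides)
    return resolved

defaults = {"ProcessingInstanceCount": 1, "MaxSeqLength": 128, "MinAccuracyValue": 0.33}
print(resolve_parameters(defaults, {"MinAccuracyValue": 0.01}))
# {'ProcessingInstanceCount': 1, 'MaxSeqLength': 128, 'MinAccuracyValue': 0.01}
```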

In [186]:
execution = pipeline.start(
    parameters=dict(
        InputData=raw_input_data_s3_uri,
        ProcessingInstanceCount=1,
        ProcessingInstanceType='ml.c5.2xlarge',
        MaxSeqLength=128,
        BalanceDataset='True',
        TrainSplitPercentage=0.9,
        ValidationSplitPercentage=0.05,
        TestSplitPercentage=0.05,
        FeatureStoreOfflinePrefix='reviews-feature-store-'+str(timestamp),
        FeatureGroupName='reviews-feature-group-'+str(timestamp),
        Epochs=3,
        LearningRate=0.000012,
        TrainBatchSize=64,
        TrainStepsPerEpoch=50,
        ValidationBatchSize=64,
        ValidationStepsPerEpoch=64,
        FreezeBertLayer='False',
        Seed=42,         
        TrainInstanceCount=1,
        TrainInstanceType='ml.c5.9xlarge',
        TrainVolumeSize=256,
        InputMode='File',
        RunValidation='True',
        MinAccuracyValue=0.01,
        ModelApprovalStatus='PendingManualApproval', 
        DeployInstanceType='ml.m5.large',
        DeployInstanceCount=1 
    )
)

print(execution.arn)

arn:aws:sagemaker:us-east-1:704911750285:pipeline/bert-pipeline-1626196769/execution/sheppysdxasf


<a name='KM-7.3.'></a>
### 7.3. Wait for pipeline execution

Now you can describe the execution and list its steps to find out more about it.

In [187]:
from pprint import pprint

execution_run = execution.describe()
pprint(execution_run)

{'CreatedBy': {'DomainId': 'd-imx3yawcgs5a',
               'UserProfileArn': 'arn:aws:sagemaker:us-east-1:704911750285:user-profile/d-imx3yawcgs5a/default-1625860915913',
               'UserProfileName': 'default-1625860915913'},
 'CreationTime': datetime.datetime(2021, 7, 13, 17, 19, 34, 800000, tzinfo=tzlocal()),
 'LastModifiedBy': {'DomainId': 'd-imx3yawcgs5a',
                    'UserProfileArn': 'arn:aws:sagemaker:us-east-1:704911750285:user-profile/d-imx3yawcgs5a/default-1625860915913',
                    'UserProfileName': 'default-1625860915913'},
 'LastModifiedTime': datetime.datetime(2021, 7, 13, 17, 19, 34, 800000, tzinfo=tzlocal()),
 'PipelineArn': 'arn:aws:sagemaker:us-east-1:704911750285:pipeline/bert-pipeline-1626196769',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:704911750285:pipeline/bert-pipeline-1626196769/execution/sheppysdxasf',
 'PipelineExecutionDisplayName': 'execution-1626196774870',
 'PipelineExecutionStatus': 'Executing',
 'ResponseMetadata': {

Print the execution display name and its ARN:

In [188]:
execution_run_name = execution_run['PipelineExecutionDisplayName']
print(execution_run_name)

execution-1626196774870


In [189]:
pipeline_execution_arn = execution_run['PipelineExecutionArn']
print(pipeline_execution_arn)

arn:aws:sagemaker:us-east-1:704911750285:pipeline/bert-pipeline-1626196769/execution/sheppysdxasf


<a name='KM-7.4.'></a>
### 7.4. Describe completed pipeline

Wait for the first step to start running and print the information about it:

In [190]:
import time

time.sleep(30)

execution.list_steps()

[{'StepName': 'Processing',
  'StartTime': datetime.datetime(2021, 7, 13, 17, 19, 35, 373000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:704911750285:processing-job/pipelines-sheppysdxasf-processing-wgf8ygcap5'}}}]

<a name='KM-7.5.'></a>
### 7.5. Wait for the pipeline to complete

To get information about the pipeline execution, we can use the low-level SageMaker service client from a boto3 session. It is also useful for other operations later in this notebook.

In the code below we will be observing the pipeline execution summary and waiting for the execution status to change from `Executing` to `Succeeded`.

### _This cell will take approximately 30-45 minutes to run._

In [191]:
%%time

import time
from pprint import pprint

import boto3

sm = boto3.Session().client(service_name='sagemaker', region_name=region)

executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
print(pipeline_execution_status)

# Poll every 30 seconds until the execution leaves the `Executing` state.
# Sleeping on every iteration (not only after an error) avoids throttling the API.
while pipeline_execution_status == 'Executing':
    print('Please wait...')
    time.sleep(30)
    try:
        executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)['PipelineExecutionSummaries']
        pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
    except Exception as e:
        # Transient errors (for example, throttling): keep polling.
        print('Polling error: {}'.format(e))

pprint(executions_response)

Executing
Please wait...
[{'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:704911750285:pipeline/bert-pipeline-1626196769/execution/sheppysdxasf',
  'PipelineExecutionDisplayName': 'execution-1626196774870',
  'PipelineExecutionStatus': 'Succeeded',
  'StartTime': datetime.datetime(2021, 7, 13, 17, 19, 34, 800000, tzinfo=tzlocal())}]

_Wait for the pipeline ^^ above ^^ to complete._
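The polling loop above can be factored into a reusable helper with a timeout. A minimal sketch; `wait_for_status` is hypothetical, and it treats `Succeeded`, `Failed`, and `Stopped` as final (the other documented execution statuses are `Executing` and `Stopping`). The `sleep` and `clock` arguments are injectable so the helper can be exercised without waiting.

```python
import time

# PipelineExecutionStatus values after which the execution no longer changes
TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}

def wait_for_status(fetch_status, interval=30.0, timeout=3600.0,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until it returns a terminal status or the timeout expires."""
    deadline = clock() + timeout
    status = fetch_status()
    while status not in TERMINAL_STATUSES:
        if clock() >= deadline:
            raise TimeoutError("still {} after {} seconds".format(status, timeout))
        sleep(interval)  # back off between every call to avoid throttling
        status = fetch_status()
    return status

# Offline usage with a fake status source and a no-op sleep
statuses = iter(["Executing", "Executing", "Succeeded"])
print(wait_for_status(lambda: next(statuses), sleep=lambda s: None))  # Succeeded
```

In the notebook, `fetch_status` could be `lambda: sm.describe_pipeline_execution(PipelineExecutionArn=pipeline_execution_arn)['PipelineExecutionStatus']`, which tracks one specific execution instead of relying on the first entry returned by `list_pipeline_executions`.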

Print the final status and the ARN of the pipeline execution:

In [192]:
pipeline_execution_status = executions_response[0]['PipelineExecutionStatus']
print(pipeline_execution_status)

Succeeded


In [193]:
pipeline_execution_arn = executions_response[0]['PipelineExecutionArn']
print(pipeline_execution_arn)

arn:aws:sagemaker:us-east-1:704911750285:pipeline/bert-pipeline-1626196769/execution/sheppysdxasf


<a name='KM-8.'></a>
# 8. Evaluate the model

<a name='KM-8.1.'></a>
### 8.1. Describe evaluation metrics

Examine the model evaluation after the pipeline completes. As a first example of locating step outputs, find the S3 output path of the `Processing` step; the same approach is then used to download the `evaluation.json` report from S3.

In [194]:
processing_job_name = None

# find the name of the processing job run by the `Processing` step
for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'Processing':
        processing_job_name=execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]

# get the description of the processing job
describe_transform_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)

# get the output S3 path
transform_output_s3_uri = describe_transform_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Transform output {}'.format(transform_output_s3_uri))

Transform output s3://sagemaker-us-east-1-704911750285/sagemaker-scikit-learn-2021-07-13-17-19-30-829/output/sentiment-train


In [195]:
# list the files in the resulting output S3 path
!aws s3 ls --recursive $transform_output_s3_uri

2021-07-13 17:32:45    4884058 sagemaker-scikit-learn-2021-07-13-17-19-30-829/output/sentiment-train/part-algo-1-womens_clothing_ecommerce_reviews.tsv


<a name='KM-step7'></a>
### Step 7

Pull the name of the model-evaluation processing job and then get the S3 path of the evaluation metrics, which contains the evaluation report. Find the execution step named `EvaluateModel`, following the example above.
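The lookup pattern used in these cells, which finds a step by name and takes the last path segment of its job ARN, can be isolated into a small helper. A minimal sketch; `job_name_for_step` is hypothetical, and the `steps` entries are abbreviated copies of the `list_steps()` output shown later in section 8.3. Unlike the notebook loops, which iterate in reverse and keep the last match, it returns the first match; with unique step names both give the same result.

```python
def job_name_for_step(steps, step_name, metadata_key):
    """Return the job name (last ARN path segment) of the step named step_name.

    `steps` has the shape returned by execution.list_steps(); metadata_key is
    'ProcessingJob' or 'TrainingJob'. Returns None if no step matches.
    """
    for step in steps:
        if step["StepName"] == step_name:
            arn = step["Metadata"][metadata_key]["Arn"]
            return arn.split("/")[-1]
    return None

steps = [
    {"StepName": "EvaluateModel",
     "Metadata": {"ProcessingJob": {"Arn": "arn:aws:sagemaker:us-east-1:704911750285:processing-job/pipelines-sheppysdxasf-evaluatemodel-sasfjnzcbe"}}},
    {"StepName": "Train",
     "Metadata": {"TrainingJob": {"Arn": "arn:aws:sagemaker:us-east-1:704911750285:training-job/pipelines-sheppysdxasf-train-gb8vwckom0"}}},
]
print(job_name_for_step(steps, "EvaluateModel", "ProcessingJob"))
# pipelines-sheppysdxasf-evaluatemodel-sasfjnzcbe
```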

In [196]:
processing_job_name = None

for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'EvaluateModel':  # the evaluation step's name
        processing_job_name = execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]

describe_evaluation_processing_job_response = sm.describe_processing_job(ProcessingJobName=processing_job_name)

evaluation_metrics_s3_uri = describe_evaluation_processing_job_response['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']
print('Evaluation output {}'.format(evaluation_metrics_s3_uri))

Evaluation output s3://sagemaker-us-east-1-704911750285/sagemaker-scikit-learn-2021-07-13-17-19-32-402/output/metrics


<a name='KM-8.2.'></a>
### 8.2. Review the evaluation report

Download the evaluation report and print the accuracy.
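The report is a small nested JSON document, and the `metrics.accuracy.value` path given to `JsonGet` in the condition step selects the accuracy from it. A minimal sketch of following such a dotted path in plain Python; `get_by_path` is hypothetical and handles only dictionary keys (full JSONPath also supports array indexing and more). The sample report matches the output printed by the cell that follows.

```python
import json

def get_by_path(document, dotted_path):
    """Follow a dotted key path such as 'metrics.accuracy.value' through nested dicts."""
    node = document
    for key in dotted_path.split("."):
        node = node[key]
    return node

report = json.loads('{"metrics": {"accuracy": {"value": 0.7313915857605178}}}')
print(get_by_path(report, "metrics.accuracy.value"))  # 0.7313915857605178
```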

In [197]:
from pprint import pprint

evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
    evaluation_metrics_s3_uri
))

pprint(json.loads(evaluation_json))

{'metrics': {'accuracy': {'value': 0.7313915857605178}}}


<a name='KM-8.3.'></a>
### 8.3. List pipeline artifacts

<a name='KM-step8'></a>
### Step 8

Print the ARN and job name of the training job from the execution step named `Train`, following the example above.

In [198]:
training_job_arn=None

for execution_step in execution.list_steps():
    if execution_step['StepName'] == 'Train':  # the training step's name
        training_job_arn = execution_step['Metadata']['TrainingJob']['Arn']
        pprint(execution_step)
        break
print('Training job ARN: {}'.format(training_job_arn))

training_job_name = training_job_arn.split('/')[-1]
print('Training job Name: {}'.format(training_job_name))

{'EndTime': datetime.datetime(2021, 7, 13, 17, 50, 35, 423000, tzinfo=tzlocal()),
 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:704911750285:training-job/pipelines-sheppysdxasf-train-gb8vwckom0'}},
 'StartTime': datetime.datetime(2021, 7, 13, 17, 32, 53, 3000, tzinfo=tzlocal()),
 'StepName': 'Train',
 'StepStatus': 'Succeeded'}
Training job ARN: arn:aws:sagemaker:us-east-1:704911750285:training-job/pipelines-sheppysdxasf-train-gb8vwckom0
Training job Name: pipelines-sheppysdxasf-train-gb8vwckom0


Using a similar approach, we can find and display the lineage artifacts of each pipeline step.

In [199]:
processing_job_name=None
training_job_name=None

In [200]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(execution.list_steps()):
    pprint(execution_step)
    if execution_step['StepName'] == 'Processing':
        processing_job_name=execution_step['Metadata']['ProcessingJob']['Arn'].split('/')[-1]
        print('Processing job name: {}'.format(processing_job_name))
        display(viz.show(processing_job_name=processing_job_name))
    elif execution_step['StepName'] == 'Train':
        training_job_name=execution_step['Metadata']['TrainingJob']['Arn'].split('/')[-1]
        print('Training job name: {}'.format(training_job_name))
        display(viz.show(training_job_name=training_job_name))
    else:
        display(viz.show(pipeline_execution_step=execution_step))
        time.sleep(5)

{'EndTime': datetime.datetime(2021, 7, 13, 17, 32, 52, 606000, tzinfo=tzlocal()),
 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:704911750285:processing-job/pipelines-sheppysdxasf-processing-wgf8ygcap5'}},
 'StartTime': datetime.datetime(2021, 7, 13, 17, 19, 35, 373000, tzinfo=tzlocal()),
 'StepName': 'Processing',
 'StepStatus': 'Succeeded'}
Processing job name: pipelines-sheppysdxasf-processing-wgf8ygcap5


|   | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...-17-19-34-005/input/code/prepare_data.py | Input | DataSet | ContributedTo | artifact |
| 1 | s3://kaushal-mlops/data/raw/ | Input | DataSet | ContributedTo | artifact |
| 2 | 68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3 | Input | Image | ContributedTo | artifact |
| 3 | s3://...07-13-17-19-30-829/output/sentiment-test | Output | DataSet | Produced | artifact |
| 4 | s3://...17-19-30-829/output/sentiment-validation | Output | DataSet | Produced | artifact |
| 5 | s3://...7-13-17-19-30-829/output/sentiment-train | Output | DataSet | Produced | artifact |


{'EndTime': datetime.datetime(2021, 7, 13, 17, 50, 35, 423000, tzinfo=tzlocal()),
 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:704911750285:training-job/pipelines-sheppysdxasf-train-gb8vwckom0'}},
 'StartTime': datetime.datetime(2021, 7, 13, 17, 32, 53, 3000, tzinfo=tzlocal()),
 'StepName': 'Train',
 'StepStatus': 'Succeeded'}
Training job name: pipelines-sheppysdxasf-train-gb8vwckom0


|   | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...17-19-30-829/output/sentiment-validation | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...7-13-17-19-30-829/output/sentiment-train | Input | DataSet | ContributedTo | artifact |
| 2 | 76310...onaws.com/pytorch-training:1.6.0-cpu-py3 | Input | Image | ContributedTo | artifact |
| 3 | s3://...asf-Train-gB8vwCkoM0/output/model.tar.gz | Output | Model | Produced | artifact |


{'EndTime': datetime.datetime(2021, 7, 13, 17, 58, 58, 286000, tzinfo=tzlocal()),
 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:704911750285:processing-job/pipelines-sheppysdxasf-evaluatemodel-sasfjnzcbe'}},
 'StartTime': datetime.datetime(2021, 7, 13, 17, 50, 36, 139000, tzinfo=tzlocal()),
 'StepName': 'EvaluateModel',
 'StepStatus': 'Succeeded'}


|   | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...232/input/code/evaluate_model_metrics.py | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...07-13-17-19-30-829/output/sentiment-test | Input | DataSet | ContributedTo | artifact |
| 2 | s3://...asf-Train-gB8vwCkoM0/output/model.tar.gz | Input | Model | ContributedTo | artifact |
| 3 | 68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3 | Input | Image | ContributedTo | artifact |
| 4 | s3://...n-2021-07-13-17-19-32-402/output/metrics | Output | DataSet | Produced | artifact |


{'EndTime': datetime.datetime(2021, 7, 13, 17, 58, 59, 8000, tzinfo=tzlocal()),
 'Metadata': {'Condition': {'Outcome': 'True'}},
 'StartTime': datetime.datetime(2021, 7, 13, 17, 58, 58, 700000, tzinfo=tzlocal()),
 'StepName': 'AccuracyCondition',
 'StepStatus': 'Succeeded'}


None

{'EndTime': datetime.datetime(2021, 7, 13, 17, 59, 0, 120000, tzinfo=tzlocal()),
 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:704911750285:model-package/bert-reviews-1626196769/1'}},
 'StartTime': datetime.datetime(2021, 7, 13, 17, 58, 59, 196000, tzinfo=tzlocal()),
 'StepName': 'RegisterModel',
 'StepStatus': 'Succeeded'}


|   | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...asf-Train-gB8vwCkoM0/output/model.tar.gz | Input | Model | ContributedTo | artifact |
| 1 | 76310...aws.com/pytorch-inference:1.6.0-cpu-py36 | Input | Image | ContributedTo | artifact |
| 2 | bert-reviews-1626196769-1-PendingManualApprova... | Input | Approval | ContributedTo | action |
| 3 | BERT-Reviews-1626196769-1626199139-aws-model-p... | Output | ModelGroup | AssociatedWith | context |


{'EndTime': datetime.datetime(2021, 7, 13, 17, 59, 0, 327000, tzinfo=tzlocal()),
 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:704911750285:model/pipelines-sheppysdxasf-createmodel-y5kllzxevq'}},
 'StartTime': datetime.datetime(2021, 7, 13, 17, 58, 59, 239000, tzinfo=tzlocal()),
 'StepName': 'CreateModel',
 'StepStatus': 'Succeeded'}


None

<a name='KM-9.'></a>
# 9. Deploy and test the model

<a name='KM-9.1.'></a>
### 9.1. Approve trained model

The pipeline created a model package version within the specified model package group, with an approval status of `PendingManualApproval`. This requires a separate step to manually approve the model before deploying it to production.

You can approve the model using the SageMaker Studio UI or programmatically as shown below.

Get the model package ARN.

In [201]:
for execution_step in execution.list_steps():
    if execution_step['StepName'] == 'RegisterModel':
        model_package_arn = execution_step['Metadata']['RegisterModel']['Arn']
        break
print(model_package_arn)

arn:aws:sagemaker:us-east-1:704911750285:model-package/bert-reviews-1626196769/1


Update the model package with the `Approved` status to prepare for deployment.

The model must be `Approved` before it can be deployed.
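The approval status accepts only a small set of values (`Approved`, `Rejected`, `PendingManualApproval`), so it can help to validate it before calling the API. A minimal sketch; `approval_request` is a hypothetical helper that only builds the keyword arguments used by the `update_model_package` call in the next cell.

```python
# Allowed values of ModelApprovalStatus for a model package version
APPROVAL_STATUSES = {"Approved", "Rejected", "PendingManualApproval"}

def approval_request(model_package_arn, status="Approved"):
    """Build keyword arguments for sm.update_model_package(), validating the status."""
    if status not in APPROVAL_STATUSES:
        raise ValueError("unsupported approval status: {}".format(status))
    return {"ModelPackageArn": model_package_arn, "ModelApprovalStatus": status}

request = approval_request("arn:aws:sagemaker:us-east-1:704911750285:model-package/bert-reviews-1626196769/1")
print(request["ModelApprovalStatus"])  # Approved
# In the notebook this would be passed on as: sm.update_model_package(**request)
```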

In [202]:
model_package_update_response = sm.update_model_package(
    ModelPackageArn=model_package_arn,
    ModelApprovalStatus="Approved",
)

pprint(model_package_update_response)

{'ModelPackageArn': 'arn:aws:sagemaker:us-east-1:704911750285:model-package/bert-reviews-1626196769/1',
 'ResponseMetadata': {'HTTPHeaders': {'content-length': '102',
                                      'content-type': 'application/x-amz-json-1.1',
                                      'date': 'Tue, 13 Jul 2021 18:05:00 GMT',
                                      'x-amzn-requestid': '1266d58e-240e-4d9f-ac7a-d46afcdc584c'},
                      'HTTPStatusCode': 200,
                      'RequestId': '1266d58e-240e-4d9f-ac7a-d46afcdc584c',
                      'RetryAttempts': 0}}


<a name='KM-9.2.'></a>
### 9.2. Deploy model

Get the model ARN and the model name from it.

In [203]:
for execution_step in execution.list_steps():
    print(execution_step['StepName'])
    if execution_step['StepName'] == 'CreateModel':
        model_arn = execution_step['Metadata']['Model']['Arn']
        break
print(model_arn)

model_name = model_arn.split('/')[-1]
print(model_name)

CreateModel
arn:aws:sagemaker:us-east-1:704911750285:model/pipelines-sheppysdxasf-createmodel-y5kllzxevq
pipelines-sheppysdxasf-createmodel-y5kllzxevq


<a name='KM-9.3.'></a>
### 9.3. Create endpoint from registry

Configure the endpoint.

In [204]:
endpoint_config_name = 'bert-model-epc-{}'.format(timestamp)
print(endpoint_config_name)

create_endpoint_config_response = sm.create_endpoint_config(
    EndpointConfigName = endpoint_config_name,
    ProductionVariants=[{
        'InstanceType':'ml.m5.xlarge',
        'InitialVariantWeight':1,
        'InitialInstanceCount':1,
        'ModelName': model_name,
        'VariantName':'AllTraffic'}])

bert-model-epc-1626196769


Create the endpoint.

In [205]:
pipeline_endpoint_name = 'bert-model-ep-{}'.format(timestamp)
print("EndpointName={}".format(pipeline_endpoint_name))

create_endpoint_response = sm.create_endpoint(
    EndpointName=pipeline_endpoint_name,
    EndpointConfigName=endpoint_config_name)
print(create_endpoint_response['EndpointArn'])

EndpointName=bert-model-ep-1626196769
arn:aws:sagemaker:us-east-1:704911750285:endpoint/bert-model-ep-1626196769


In [206]:
from IPython.display import display, HTML

display(HTML('<b>Review <a target="blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/endpoints/{}">SageMaker REST Endpoint</a></b>'.format(region, pipeline_endpoint_name)))

Wait until the endpoint is deployed.

### _This cell will take approximately 5-10 minutes to run._

In [207]:
%%time

import time

waiter = sm.get_waiter('endpoint_in_service')

while True:
    try:
        print('Waiting for endpoint to be in `InService`...')
        waiter.wait(EndpointName=pipeline_endpoint_name)
        print('Endpoint deployed.')
        break
    except Exception:
        # The waiter raises when it gives up or sees a failure; inspect the status.
        print('Waiting for endpoint...')
        endpoint_status = sm.describe_endpoint(EndpointName=pipeline_endpoint_name)['EndpointStatus']
        print('Endpoint status: {}'.format(endpoint_status))
        if endpoint_status == 'Failed':
            print('Endpoint deployment failed.')
            break
        time.sleep(30)

Waiting for endpoint to be in `InService`...
Endpoint deployed.
CPU times: user 195 ms, sys: 20.6 ms, total: 216 ms
Wall time: 7min 1s


_Wait until the endpoint ^^ above ^^ is deployed._

<a name='KM-9.4.'></a>
### 9.4. Test model

In [208]:
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONLinesSerializer
from sagemaker.deserializers import JSONLinesDeserializer

class SentimentPredictor(Predictor):
    def __init__(self, endpoint_name, sagemaker_session):
        super().__init__(endpoint_name, 
                         sagemaker_session=sagemaker_session, 
                         serializer=JSONLinesSerializer(),
                         deserializer=JSONLinesDeserializer())

Predict the `sentiment` with `review_body` samples and review the result:

In [209]:
inputs = [
    {"features": ["I love this product!"]},
    {"features": ["OK, but not great."]},
    {"features": ["This is not the right product."]},
]

predictor = SentimentPredictor(endpoint_name=pipeline_endpoint_name,
                               sagemaker_session=sess)

predicted_classes = predictor.predict(inputs)

for predicted_class in predicted_classes:
    print("Predicted class {} with probability {}".format(predicted_class['predicted_label'], predicted_class['probability']))

Predicted class 1 with probability 0.9310359954833984
Predicted class 0 with probability 0.4926156997680664
Predicted class -1 with probability 0.7770135402679443
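`JSONLinesSerializer` and `JSONLinesDeserializer` exchange newline-delimited JSON (one JSON document per line) with the endpoint. The sketch below approximates that wire format with the standard `json` module; `to_json_lines` and `from_json_lines` are hypothetical helpers for illustration, not the SDK's implementation, and the exact bytes the SDK sends may differ.

```python
import json

def to_json_lines(records):
    """Serialize each record as one JSON document per line (JSON Lines)."""
    return "\n".join(json.dumps(record) for record in records)

def from_json_lines(text):
    """Parse newline-delimited JSON back into a list, skipping blank lines."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]

inputs = [
    {"features": ["I love this product!"]},
    {"features": ["OK, but not great."]},
]
payload = to_json_lines(inputs)
print(payload)
# {"features": ["I love this product!"]}
# {"features": ["OK, but not great."]}
```

Each response line decodes to one dictionary with `predicted_label` and `probability`, which is what the loop above prints.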


<a name='KM-9.5.'></a>
### 9.5. SageMaker Studio extensions

SageMaker Studio provides a rich set of features for visually inspecting SageMaker resources, including pipelines, training jobs, and endpoints. Take some time to explore them by opening the facet shown in the following image.

![](images/sm_studio_extensions_pipelines.png)

This completes the deployment of an end-to-end pipeline with BERT and SageMaker Pipelines!