## TensorFlow 2 Complete Project Workflow in Amazon SageMaker
### Data Preprocessing -> Code Prototyping -> Automatic Model Tuning -> Deployment
    
1. [Introduction](#Introduction)
2. [SageMaker Processing for dataset transformation](#SageMakerProcessing)
3. [Local Mode training](#LocalModeTraining)
4. [Local Mode endpoint](#LocalModeEndpoint)
5. [SageMaker hosted training](#SageMakerHostedTraining)
6. [Automatic Model Tuning](#AutomaticModelTuning)
7. [SageMaker hosted endpoint](#SageMakerHostedEndpoint)
8. [Workflow Automation with SageMaker Pipelines](#WorkflowAutomation)
    1. [Pipeline Parameters](#PipelineParameters)
    2. [Processing Step](#ProcessingStep)
    3. [Training and Model Creation Steps](#TrainingModelCreation)
    4. [Batch Scoring Step](#BatchScoringStep)
    5. [Creating and executing the pipeline](#CreatingExecutingPipeline)
9. [ML Lineage Tracking](#LineageOfPipelineArtifacts)
10. [Extensions](#Extensions)


***Prerequisites:***
- In SageMaker Studio, select the kernel **Python 3 (TensorFlow 2.3 Python 3.7 CPU Optimized)**; for a SageMaker Notebook Instance, select the kernel **conda_tensorflow2_py36**.
- If you're using SageMaker Studio, skip the Local Mode sections of this example (those sections work with a SageMaker Notebook Instance).

    
## Introduction <a class="anchor" id="Introduction">

If you are using TensorFlow 2, you can use the Amazon SageMaker prebuilt TensorFlow 2 framework container with training scripts similar to those you would use outside SageMaker. This feature is named Script Mode.  Using Script Mode and other SageMaker features, you can build a complete workflow for a TensorFlow 2 project.  This notebook presents such a workflow, including all key steps such as preprocessing data with SageMaker Processing, model training with SageMaker hosted training, model tuning with SageMaker Automatic Model Tuning, and production-ready model deployment with SageMaker hosted endpoints. 

Working through these steps in a notebook is part of the prototyping process; however, a repeatable production workflow typically is run outside notebooks. To demonstrate automating the workflow, we'll use [Amazon SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines) for workflow orchestration. Purpose-built for machine learning (ML), SageMaker Pipelines helps you automate different steps of the ML workflow including data processing, model training, and batch prediction (scoring), and apply conditions such as approvals for model quality. It also includes a model registry and model lineage tracker.   

To enable you to run this notebook within a reasonable time (typically less than an hour), this notebook's use case is a straightforward regression task:  predicting house prices based on the well-known Boston Housing dataset. This public dataset contains 13 features regarding housing stock of towns in the Boston area.  Features include average number of rooms, accessibility to radial highways, adjacency to a major river, etc.  

To begin, we'll import some necessary packages and set up directories for local training and test data.  We'll also set up a SageMaker Session to perform various operations, and specify an Amazon S3 bucket to hold input data and output.  The default bucket used here is created by SageMaker if it doesn't already exist, and named in accordance with the AWS account ID and AWS Region.  

In [1]:
import boto3
import os
import sagemaker
import tensorflow as tf

sess = sagemaker.session.Session()
bucket = sess.default_bucket() 
region = boto3.Session().region_name

data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)
train_dir = os.path.join(os.getcwd(), 'data/train')
os.makedirs(train_dir, exist_ok=True)
test_dir = os.path.join(os.getcwd(), 'data/test')
os.makedirs(test_dir, exist_ok=True)
raw_dir = os.path.join(os.getcwd(), 'data/raw')
os.makedirs(raw_dir, exist_ok=True)

## SageMaker Processing for dataset transformation <a class="anchor" id="SageMakerProcessing">

Next, we'll import the dataset and transform it with SageMaker Processing, which can be used to process terabytes of data in a SageMaker-managed cluster separate from the instance running your notebook server. In a typical SageMaker workflow, ***notebooks are only used for prototyping*** and can be run on relatively inexpensive and less powerful instances, while full-scale processing, training and model hosting tasks are run on separate, more powerful SageMaker-managed instances.  SageMaker Processing includes off-the-shelf support for Scikit-learn and Spark, as well as a Bring Your Own Container option, so it can be used with many different data transformation technologies and tasks.  Another SageMaker feature related to data processing is [SageMaker Data Wrangler](https://aws.amazon.com/sagemaker/data-wrangler/), a visual data preparation tool integrated with the SageMaker Studio UI.    

To work with SageMaker Processing, first we'll load the Boston Housing dataset, save the raw feature data and upload it to Amazon S3 so it can be accessed by SageMaker Processing.  We'll also save the labels for training and testing.

In [2]:
import numpy as np
from tensorflow.keras.datasets import boston_housing

(x_train, y_train), (x_test, y_test) = boston_housing.load_data()

np.save(os.path.join(raw_dir, 'x_train.npy'), x_train)
np.save(os.path.join(raw_dir, 'x_test.npy'), x_test)
np.save(os.path.join(raw_dir, 'y_train.npy'), y_train)
np.save(os.path.join(raw_dir, 'y_test.npy'), y_test)
s3_prefix = 'tf-2-workflow'
rawdata_s3_prefix = '{}/data/raw'.format(s3_prefix)
raw_s3 = sess.upload_data(path='./data/raw/', key_prefix=rawdata_s3_prefix)
print(raw_s3)

Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/boston_housing.npz
s3://sagemaker-us-east-1-154082585954/tf-2-workflow/data/raw


Next, simply supply an ordinary Python data preprocessing script as shown below.  For this example, we're using a SageMaker prebuilt Scikit-learn framework container, which includes many common functions for processing data.  There are few limitations on what kinds of code and operations you can run, and only a minimal API contract:  input and output data must be placed in specified directories.  If this is done, SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete.

In [3]:
%%writefile preprocessing.py

import glob
import numpy as np
import os
from sklearn.preprocessing import StandardScaler

if __name__=='__main__':
    
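    # SageMaker Processing downloads the input S3 objects into this directory
    input_files = glob.glob('{}/*.npy'.format('/opt/ml/processing/input'))
    print('\nINPUT FILE LIST: \n{}\n'.format(input_files))
    scaler = StandardScaler()
    for file in input_files:
        raw = np.load(file)
        # only transform feature files (x_*); label files (y_*) are saved as-is.
        # Note: the scaler is refit on every feature file, so test features are
        # standardized with test-set statistics, not training-set statistics.
        if 'y_' not in file:
            transformed = scaler.fit_transform(raw)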
        if 'train' in file:
            if 'y_' in file:
                output_path = os.path.join('/opt/ml/processing/train', 'y_train.npy')
                np.save(output_path, raw)
                print('SAVED LABEL TRAINING DATA FILE\n')
            else:
                output_path = os.path.join('/opt/ml/processing/train', 'x_train.npy')
                np.save(output_path, transformed)
                print('SAVED TRANSFORMED TRAINING DATA FILE\n')
        else:
            if 'y_' in file:
                output_path = os.path.join('/opt/ml/processing/test', 'y_test.npy')
                np.save(output_path, raw)
                print('SAVED LABEL TEST DATA FILE\n')
            else:
                output_path = os.path.join('/opt/ml/processing/test', 'x_test.npy')
                np.save(output_path, transformed)
                print('SAVED TRANSFORMED TEST DATA FILE\n')

Writing preprocessing.py
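The script above fits a separate `StandardScaler` on each feature file, so the test features end up standardized with their own statistics. A common alternative, assuming a single job can see both feature files (for example with the default `FullyReplicated` distribution instead of `ShardedByS3Key`), is to fit the mean and standard deviation on the training features only and reuse them for the test features. The sketch below shows that idea in plain Python on hypothetical toy data; the helper names `fit_scaler` and `transform` are illustrative, and like `StandardScaler` it uses the population standard deviation.

```python
from statistics import fmean, pstdev


def fit_scaler(rows):
    """Compute per-column mean and population std (ddof=0, as StandardScaler does)."""
    columns = list(zip(*rows))
    means = [fmean(col) for col in columns]
    # Constant columns get a scale of 1.0 so we never divide by zero.
    stds = [pstdev(col) or 1.0 for col in columns]
    return means, stds


def transform(rows, means, stds):
    """Standardize rows with statistics fitted elsewhere (e.g. on training data)."""
    return [[(x - m) / s for x, m, s in zip(row, means, stds)] for row in rows]


# Hypothetical toy feature matrices (2 features each), not the Boston Housing data.
x_train = [[1.0, 10.0], [3.0, 30.0]]
x_test = [[2.0, 50.0]]

means, stds = fit_scaler(x_train)                # fit on training features only
x_train_scaled = transform(x_train, means, stds)
x_test_scaled = transform(x_test, means, stds)   # reuse the training statistics
print(x_train_scaled, x_test_scaled)
```

With scikit-learn, the equivalent is `scaler.fit_transform(x_train)` followed by `scaler.transform(x_test)`.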


Before starting the SageMaker Processing job, we instantiate a `SKLearnProcessor` object.  This object allows you to specify the instance type to use in the job, as well as how many instances.  Although the Boston Housing dataset is quite small, we'll use two instances to showcase how easy it is to spin up a cluster for SageMaker Processing.  

In [4]:
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor1 = SKLearnProcessor(framework_version='0.23-1',
                                     role=get_execution_role(),
                                     instance_type='ml.m5.xlarge',
                                     instance_count=2)

We're now ready to run the Processing job.  To distribute the data files equally among the instances, we specify the `ShardedByS3Key` distribution type in the `ProcessingInput` object.  This ensures that if we have `n` instances, each instance receives roughly `1/n` of the files under the specified S3 prefix.  The following code cell may take around 3 minutes to run, mainly to set up the cluster.  At the end of the job, SageMaker automatically tears down the cluster.  
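To make the difference between `ShardedByS3Key` and the default `FullyReplicated` distribution concrete, the sketch below assigns the four `.npy` keys uploaded earlier to two instances. The round-robin assignment over sorted keys is an assumption made for illustration only; SageMaker decides the actual key-to-instance mapping, so which instance receives which file may differ.

```python
def distribute(keys, instance_count, mode="ShardedByS3Key"):
    """Illustrative model of how S3 objects reach each processing instance.

    FullyReplicated: every instance receives every object.
    ShardedByS3Key: each object goes to exactly one instance (round-robin here,
    which is an assumption; SageMaker's real assignment may differ).
    """
    if mode == "FullyReplicated":
        return [list(keys) for _ in range(instance_count)]
    shards = [[] for _ in range(instance_count)]
    for i, key in enumerate(sorted(keys)):
        shards[i % instance_count].append(key)
    return shards


keys = ["x_test.npy", "x_train.npy", "y_test.npy", "y_train.npy"]
print(distribute(keys, 2))
print(distribute(keys, 2, mode="FullyReplicated"))
```

Either way each file is processed by exactly one instance when sharding, which is what makes the near-linear speedup possible for stateless transformations.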

In [5]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from time import gmtime, strftime 

processing_job_name = "tf-2-workflow-{}".format(strftime("%d-%H-%M-%S", gmtime()))
output_destination = 's3://{}/{}/data'.format(bucket, s3_prefix)

sklearn_processor1.run(code='preprocessing.py',
                      job_name=processing_job_name,
                      inputs=[ProcessingInput(
                        source=raw_s3,
                        destination='/opt/ml/processing/input',
                        s3_data_distribution_type='ShardedByS3Key')],
                      outputs=[ProcessingOutput(output_name='train',
                                                destination='{}/train'.format(output_destination),
                                                source='/opt/ml/processing/train'),
                               ProcessingOutput(output_name='test',
                                                destination='{}/test'.format(output_destination),
                                                source='/opt/ml/processing/test')])

preprocessing_job_description = sklearn_processor1.jobs[-1].describe()


Job Name:  tf-2-workflow-18-21-48-08
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-154082585954/tf-2-workflow/data/raw', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'ShardedByS3Key', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-east-1-154082585954/tf-2-workflow-18-21-48-08/input/code/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-154082585954/tf-2-workflow/data/train', 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'test', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://sagemaker-us-east-1-154082585954/tf-2-w

In the log output of the SageMaker Processing job above, you should see logs in two different colors for the two instances, and each instance should have received different files.  Without the `ShardedByS3Key` distribution type, each instance would have received a copy of **all** files.  By spreading the data equally among `n` instances, you can expect a speedup of up to roughly a factor of `n` for most stateless data transformations.  After saving the job results locally, we'll move on to prototyping training and inference code with Local Mode.

In [6]:
x_train_in_s3 = '{}/train/x_train.npy'.format(output_destination)
y_train_in_s3 = '{}/train/y_train.npy'.format(output_destination)
x_test_in_s3 = '{}/test/x_test.npy'.format(output_destination)
y_test_in_s3 = '{}/test/y_test.npy'.format(output_destination)

!aws s3 cp {x_train_in_s3} ./data/train/x_train.npy
!aws s3 cp {y_train_in_s3} ./data/train/y_train.npy
!aws s3 cp {x_test_in_s3} ./data/test/x_test.npy
!aws s3 cp {y_test_in_s3} ./data/test/y_test.npy

download: s3://sagemaker-us-east-1-154082585954/tf-2-workflow/data/train/x_train.npy to data/train/x_train.npy
download: s3://sagemaker-us-east-1-154082585954/tf-2-workflow/data/train/y_train.npy to data/train/y_train.npy
download: s3://sagemaker-us-east-1-154082585954/tf-2-workflow/data/test/x_test.npy to data/test/x_test.npy
download: s3://sagemaker-us-east-1-154082585954/tf-2-workflow/data/test/y_test.npy to data/test/y_test.npy


## SageMaker hosted training <a class="anchor" id="SageMakerHostedTraining">

Once the prototyping phase of a project is complete, SageMaker hosted training is the preferred way to run training jobs, especially large-scale, distributed training.  Unlike training in a notebook environment or with SageMaker Local Mode, hosted training runs not on the notebook instance but on a separate cluster of potentially more powerful machines managed by SageMaker.  Before starting hosted training, the data must be in S3, or in an Amazon EFS or Amazon FSx for Lustre file system. We'll upload the data to S3 now and confirm that the upload succeeded.

In [7]:
s3_prefix = 'tf-2-workflow'

traindata_s3_prefix = '{}/data/train'.format(s3_prefix)
testdata_s3_prefix = '{}/data/test'.format(s3_prefix)

In [8]:
train_s3 = sess.upload_data(path='./data/train/', key_prefix=traindata_s3_prefix)
test_s3 = sess.upload_data(path='./data/test/', key_prefix=testdata_s3_prefix)

inputs = {'train':train_s3, 'test': test_s3}

print(inputs)

{'train': 's3://sagemaker-us-east-1-154082585954/tf-2-workflow/data/train', 'test': 's3://sagemaker-us-east-1-154082585954/tf-2-workflow/data/test'}


We're now ready to set up an Estimator object for SageMaker hosted training. It is similar to a SageMaker Local Mode Estimator, except that `instance_type` is set to a SageMaker ML instance type instead of `local`; in this case, a `c5` compute-optimized instance type.  The Git configuration passed into the Estimator pulls the training script from a Git repository, which ensures that the team shares the same source-controlled code instead of having different scripts scattered across team members' machines. 

Hyperparameters are passed in as a dictionary, with a larger number of epochs than would be used in a project's prototyping phase.  For hosted training initiated after prototyping is complete, you can train the model for more epochs with the expectation that training will likely proceed without code-related errors and converge to a lower validation loss.

In [9]:
from sagemaker.tensorflow import TensorFlow

train_instance_type = 'ml.c5.xlarge'
hyperparameters = {'epochs': 30, 'batch_size': 128, 'learning_rate': 0.01}

git_config = {'repo': 'https://github.com/aws-samples/amazon-sagemaker-script-mode', 
              'branch': 'master'}

hosted_estimator = TensorFlow(
                       git_config=git_config,
                       source_dir='tf-2-workflow-smpipelines/train_model',
                       entry_point='train.py',
                       instance_type=train_instance_type,
                       instance_count=1,
                       hyperparameters=hyperparameters,
                       role=sagemaker.get_execution_role(),
                       base_job_name='tf-2-workflow',
                       framework_version='2.3.1',
                       py_version='py37',
                       script_mode=True)

After starting the hosted training job with the `fit` method call below, you should observe the training converge over the larger number of epochs to a validation loss that is considerably lower than what a short prototyping run, such as a Local Mode training job, can achieve.  Can we do better than the model produced by this single hosted training job? We'll look into a way to do so in the **Automatic Model Tuning** section below. In the meantime, the hosted training job should take about 3 minutes to complete.  

In [10]:
hosted_estimator.fit(inputs)

2021-05-18 22:01:14 Starting - Starting the training job...
2021-05-18 22:01:38 Starting - Launching requested ML instancesProfilerReport-1621375273: InProgress
.........
2021-05-18 22:02:58 Starting - Preparing the instances for training...
2021-05-18 22:03:45 Downloading - Downloading input data......
2021-05-18 22:04:45 Training - Downloading the training image...
2021-05-18 22:05:01 Training - Training image download completed. Training in progress.
2021-05-18 22:05:02.880856: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:460] Initializing the SageMaker Profiler.
2021-05-18 22:05:02.889476: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:105] SageMaker Profiler is not enabled. The timeline writer thread will not be started, future recorded events will be dropped.
2021-05-18 22:05:03.210832: W tensorflow/core/profiler/internal/smprofiler_timeline.cc:460] Initializing the SageMaker Profiler.
2021-05-18 22:05:07,141 sagemaker-trai

All SageMaker training jobs produce a model saved in S3 that we can retrieve.  This is an example of the modularity of SageMaker: having trained the model in SageMaker, you can now take the model out of SageMaker and run it anywhere else.  Alternatively, you can deploy the model into a production-ready environment using SageMaker's hosted endpoints functionality, as shown in the **SageMaker hosted endpoint** section below, or in a batch job using SageMaker Batch Transform or SageMaker Processing.

Retrieving the model from S3 is very easy:  the hosted training estimator you created above stores a reference to the model's location in S3.  You simply copy the model from S3 using the estimator's `model_data` property and unzip it to inspect the contents.

In [11]:
!aws s3 cp {hosted_estimator.model_data} ./model/model.tar.gz

download: s3://sagemaker-us-east-1-154082585954/tf-2-workflow-2021-05-18-22-01-13-465/output/model.tar.gz to model/model.tar.gz


The extracted archive should include the assets TensorFlow Serving needs to load and serve the model, including a `saved_model.pb` file:  

In [12]:
!tar -xvzf ./model/model.tar.gz -C ./model

1/
1/variables/
1/variables/variables.data-00000-of-00001
1/variables/variables.index
1/saved_model.pb
1/assets/
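If you prefer to stay in Python rather than shelling out to `tar`, the standard-library `tarfile` module can list the archive and confirm that a SavedModel is present before you extract it. The sketch below is a minimal example; the helper name `find_saved_models` is hypothetical, and the demo builds a tiny stand-in archive that mimics the layout above rather than using the real `model.tar.gz`.

```python
import io
import tarfile
import tempfile
from pathlib import Path


def find_saved_models(archive_path):
    """Return the version directories (e.g. '1') that contain a saved_model.pb."""
    with tarfile.open(archive_path, "r:gz") as tar:
        names = tar.getnames()
    return sorted(
        str(Path(name).parent) for name in names if Path(name).name == "saved_model.pb"
    )


def build_demo_archive(path):
    """Write a tiny stand-in archive with placeholder file contents."""
    with tarfile.open(path, "w:gz") as tar:
        for name in ["1/saved_model.pb", "1/variables/variables.index"]:
            data = b"placeholder"
            info = tarfile.TarInfo(name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))


with tempfile.TemporaryDirectory() as tmp:
    archive = str(Path(tmp) / "model.tar.gz")
    build_demo_archive(archive)
    versions = find_saved_models(archive)
    print(versions)
```

In this notebook you would call `find_saved_models('./model/model.tar.gz')`; the numbered directory is the model version that TensorFlow Serving loads.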


## Automatic Model Tuning (optional) <a class="anchor" id="AutomaticModelTuning">

So far we have simply run individual training jobs without any real attempt to tune hyperparameters to produce a better model.  Selecting the right hyperparameter values to train your model can be difficult, and typically is very time consuming if done manually. The right combination of hyperparameters is dependent on your data and algorithm; some algorithms have many different hyperparameters that can be tweaked; some are very sensitive to the hyperparameter values selected; and most have a non-linear relationship between model fit and hyperparameter values.  SageMaker Automatic Model Tuning helps automate the hyperparameter tuning process:  it runs multiple training jobs with different hyperparameter combinations to find the set with the best model performance.

We begin by specifying the hyperparameters we wish to tune, and the range of values over which to tune each one.  We also must specify an objective metric to be optimized:  in this use case, we'd like to minimize the validation loss.
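The `learning_rate` range below uses `scaling_type="Logarithmic"`. According to the SageMaker documentation, the tuner then searches that range on a logarithmic scale, so each order of magnitude between 0.001 and 0.2 gets comparable attention instead of most candidates landing in the top decade. The sketch below contrasts uniform and log-uniform random draws in plain Python; it illustrates the scaling idea only and is not how SageMaker's Bayesian search actually proposes values.

```python
import math
import random


def sample_uniform(low, high, rng):
    return rng.uniform(low, high)


def sample_log_uniform(low, high, rng):
    # Draw uniformly in log space, then map back: every decade is equally likely.
    return math.exp(rng.uniform(math.log(low), math.log(high)))


rng = random.Random(0)  # fixed seed so the comparison is repeatable
n = 10_000
uniform_draws = [sample_uniform(0.001, 0.2, rng) for _ in range(n)]
log_draws = [sample_log_uniform(0.001, 0.2, rng) for _ in range(n)]

# Fraction of draws in the lowest decade of the range (below 0.01).
frac_uniform = sum(x < 0.01 for x in uniform_draws) / n
frac_log = sum(x < 0.01 for x in log_draws) / n
print(f"below 0.01: uniform={frac_uniform:.3f}, log-uniform={frac_log:.3f}")
```

Analytically, the uniform draw lands below 0.01 about 4.5% of the time, while the log-uniform draw does so about 43% of the time (ln 10 / ln 200).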

In [None]:
# from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner

# hyperparameter_ranges = {
#   'learning_rate': ContinuousParameter(0.001, 0.2, scaling_type="Logarithmic"),
#   'epochs': IntegerParameter(10, 50),
#   'batch_size': IntegerParameter(64, 256),
# }

# metric_definitions = [{'Name': 'loss',
#                        'Regex': ' loss: ([0-9\\.]+)'},
#                      {'Name': 'val_loss',
#                        'Regex': ' val_loss: ([0-9\\.]+)'}]

# objective_metric_name = 'val_loss'
# objective_type = 'Minimize'
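SageMaker extracts each metric by applying its `Regex` to the training job's log output and reading the capture group. The sketch below applies the same two patterns with Python's `re` module to a sample line in the format Keras prints at the end of an epoch; the line is a made-up example, not output from this notebook. Note the leading space in `' loss: '`: it keeps the `loss` pattern from matching inside `val_loss`.

```python
import re

metric_definitions = [
    {"Name": "loss", "Regex": " loss: ([0-9\\.]+)"},
    {"Name": "val_loss", "Regex": " val_loss: ([0-9\\.]+)"},
]

# A made-up Keras epoch summary line, for illustration only.
log_line = "4/4 [==============================] - 0s 9ms/step - loss: 24.5310 - val_loss: 22.1047"


def parse_metrics(line, definitions):
    """Return {metric_name: value} for every definition whose regex matches."""
    values = {}
    for definition in definitions:
        match = re.search(definition["Regex"], line)
        if match:
            values[definition["Name"]] = float(match.group(1))
    return values


print(parse_metrics(log_line, metric_definitions))
```

One caveat: `[0-9\\.]+` does not cover scientific notation, which Keras may use for very small values, so `1.2e-04` would be captured as `1.2`. For this dataset's mean-squared-error loss that is unlikely to matter, but the pattern may need widening for other models.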

Next we specify a HyperparameterTuner object that takes the above definitions as parameters.  Each tuning job must be given a budget:  a maximum number of training jobs.  A tuning job will complete after that many training jobs have been executed.  

We also can specify how much parallelism to employ: in this case up to five concurrent training jobs, so the 15-job budget runs in roughly three rounds of five.  For the default Bayesian optimization tuning strategy used here, the search is informed by the results of previously completed training jobs, so we don't run all of the jobs in parallel; instead we cap how many run at once.  There is a trade-off: more parallel jobs will finish tuning sooner, but each new job can draw on fewer completed results, which is likely to make the search less effective. 
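As a back-of-the-envelope check on the budget, the sketch below counts how many waves of jobs a given `max_jobs`/`max_parallel_jobs` setting implies, under the simplifying assumption that all training jobs take the same time and a new wave starts only when the previous one finishes. In practice the tuner may start a new job as soon as a slot frees up, so treat the result as a rough wall-clock estimate. The 3-minutes-per-job figure is an assumption taken from the hosted training job above.

```python
import math


def tuning_waves(max_jobs, max_parallel_jobs):
    """Split a tuning budget into waves of at most max_parallel_jobs jobs each."""
    if max_jobs <= 0:
        return []
    waves = math.ceil(max_jobs / max_parallel_jobs)
    sizes = [max_parallel_jobs] * (waves - 1)
    sizes.append(max_jobs - max_parallel_jobs * (waves - 1))  # last wave may be smaller
    return sizes


def estimated_minutes(max_jobs, max_parallel_jobs, minutes_per_job):
    """Rough estimate assuming equal job durations and no overlap between waves."""
    return len(tuning_waves(max_jobs, max_parallel_jobs)) * minutes_per_job


print(tuning_waves(15, 5))            # the configuration used below
print(estimated_minutes(15, 5, 3))    # assumes ~3 minutes per training job
```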

Now we can launch a hyperparameter tuning job by calling the `fit` method of the HyperparameterTuner object.  The tuning job may take around 10 minutes to finish.  While you're waiting, you can check the status of the tuning job, including metadata and results for the individual training jobs within it, in the SageMaker console under **Hyperparameter tuning jobs**.  

In [None]:
# tuner = HyperparameterTuner(hosted_estimator,
#                             objective_metric_name,
#                             hyperparameter_ranges,
#                             metric_definitions,
#                             max_jobs=15,
#                             max_parallel_jobs=5,
#                             objective_type=objective_type)

# tuning_job_name = "tf-2-workflow-{}".format(strftime("%d-%H-%M-%S", gmtime()))
# tuner.fit(inputs, job_name=tuning_job_name)
# tuner.wait()

After the tuning job is finished, we can use the `HyperparameterTuningJobAnalytics` object from the SageMaker Python SDK to list the five training jobs with the best performance. Although results vary from one tuning job to the next, the best validation loss (in the `FinalObjectiveValue` column) will likely be substantially lower than the validation loss from the hosted training job above, where the only tuning was manually increasing the number of epochs once.  

In [None]:
# tuner_metrics = sagemaker.HyperparameterTuningJobAnalytics(tuning_job_name)
# tuner_metrics.dataframe().sort_values(['FinalObjectiveValue'], ascending=True).head(5)

The total training time and the status of each training job can be checked with the following code. Because automatic early stopping is off by default, all of the training jobs should complete normally.  For a more in-depth analysis of a tuning job, see the official SageMaker sample notebook [HPO_Analyze_TuningJob_Results.ipynb](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/hyperparameter_tuning/analyze_results/HPO_Analyze_TuningJob_Results.ipynb).

In [None]:
# total_time = tuner_metrics.dataframe()['TrainingElapsedTimeSeconds'].sum() / 3600
# print("The total training time is {:.2f} hours".format(total_time))
# tuner_metrics.dataframe()['TrainingJobStatus'].value_counts()

## SageMaker hosted endpoint (optional) <a class="anchor" id="SageMakerHostedEndpoint">

Assuming the best model from the tuning job is better than the model produced by the individual hosted training job above, we could now easily deploy that model to production.  A convenient option is a SageMaker hosted endpoint, which serves real-time predictions from the trained model. (For offline predictions on large datasets, you can use either SageMaker Processing or SageMaker Batch Transform.) The endpoint retrieves the TensorFlow SavedModel created during training and deploys it within a SageMaker TensorFlow Serving container. This can all be accomplished with one line of code.  

More specifically, by calling the `deploy` method of the HyperparameterTuner object instantiated above, we can directly deploy the best model from the tuning job to a SageMaker hosted endpoint; if you skipped tuning, you can call `deploy` on the hosted training estimator instead.  Deploying to a hosted endpoint takes a few minutes longer than deploying to a prototyping tool such as a Local Mode endpoint, which is more useful for fast iteration on inference code.  This is because creating a SageMaker hosted endpoint involves launching one or more new instances and pulling the inference Docker container and the model into a production-ready environment that can serve traffic, potentially with autoscaling to handle spiky traffic patterns.

In [13]:
# # tuning_predictor = tuner.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge')  # use this if you ran hyperparameter tuning 
# tuning_predictor = hosted_estimator.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge')


We can now generate predictions from this endpoint using the `predict` method of the Predictor object we instantiated above.

In [None]:
# results = tuning_predictor.predict(x_test[:10])['predictions'] 
# flat_list = [float('%.1f'%(item)) for sublist in results for item in sublist]
# print('predictions: \t{}'.format(np.array(flat_list)))
# print('target values: \t{}'.format(y_test[:10].round(decimals=1)))
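The TensorFlow Serving container behind the endpoint works with TensorFlow Serving's REST predict format: a JSON request carrying an `instances` list and a JSON response carrying a `predictions` list with one entry per instance. That is why the code above indexes `['predictions']` and then flattens the nested lists. The sketch below builds such a request body and parses a response in plain Python; the helper names are hypothetical, the response values are placeholders rather than real model output, and an actual call would go through the SageMaker Predictor (or the `sagemaker-runtime` `invoke_endpoint` API).

```python
import json


def build_request(rows):
    """Wrap feature rows in TensorFlow Serving's REST 'instances' format."""
    return json.dumps({"instances": rows})


def parse_predictions(body, decimals=1):
    """Flatten a {'predictions': [[y0], [y1], ...]} response into rounded floats."""
    predictions = json.loads(body)["predictions"]
    return [round(float(value), decimals) for row in predictions for value in row]


# Placeholder inputs/outputs for illustration: 2 rows of 13 features -> 1 output each.
request_body = build_request([[0.0] * 13, [1.0] * 13])
response_body = '{"predictions": [[21.37], [18.04]]}'
print(parse_predictions(response_body))
```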

To avoid billing charges from stray resources, you can delete the prediction endpoint to release its associated instance(s).

In [None]:
# sess.delete_endpoint(tuning_predictor.endpoint_name)

## Workflow Automation with SageMaker Pipelines <a class="anchor" id="WorkflowAutomation">

In the previous parts of this notebook, we prototyped various steps of a TensorFlow project within the notebook itself, with some steps running on external SageMaker resources (data preprocessing, hosted training, model tuning, hosted endpoints).  Notebooks are great for prototyping, but generally are not used in production-ready machine learning pipelines.  

A very simple pipeline in SageMaker processes the dataset to get it ready for training, performs the actual training, and then uses the model to perform some form of inference such as batch prediction (scoring). We'll use SageMaker Pipelines to automate these steps, keeping the pipeline simple for now: it can easily be extended into a far more complex pipeline with conditional steps and more.

### Pipeline parameters <a class="anchor" id="PipelineParameters">

Before we begin to create the pipeline itself, we should think about how to parameterize it.  For example, we may use different instance types for different purposes, such as CPU-based types for data processing and GPU-based or more powerful types for model training.  These are all "knobs" of the pipeline that we can parameterize.  Parameterizing enables custom pipeline executions and schedules without having to modify the pipeline definition.
    
In the next cell several different pipeline parameters are specified.  These include not only package versions, but also input data location and hardware types and counts.  Many others are possible.

In [14]:
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

# package versions
sklearn_version = ParameterString(name="SKLearnVersion", default_value="0.23-1")
tensorflow_version = ParameterString(name="TensorFlowVersion", default_value="2.3.1")
python_version = ParameterString(name="PythonVersion", default_value="py37")

# raw input data
input_data = ParameterString(name="InputData", default_value=raw_s3)

# processing step parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=2)

# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.c5.2xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

# batch inference step parameters
batch_instance_type = ParameterString(name="BatchInstanceType", default_value="ml.c5.xlarge")
batch_instance_count = ParameterInteger(name="BatchInstanceCount", default_value=1)
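One subtlety is worth keeping in mind: a parameter only varies per execution where the parameter object itself is passed to a step. Values read from `.default_value` when the pipeline is defined, as the processing and training steps below do for the framework versions, become fixed strings in the pipeline definition, so an execution-time override (for example via `pipeline.start(parameters={...})`) will not change them. The toy model below is plain Python, not part of the SageMaker SDK; `ToyParameter` and `resolve` are hypothetical names used only to illustrate the difference.

```python
class ToyParameter:
    """Stand-in for a pipeline parameter: a name plus a default value."""

    def __init__(self, name, default_value):
        self.name = name
        self.default_value = default_value


def resolve(value, overrides):
    """Resolve a step argument at execution time.

    Parameter objects are looked up in the execution's overrides (falling back to
    the default); plain values captured at definition time are returned unchanged.
    """
    if isinstance(value, ToyParameter):
        return overrides.get(value.name, value.default_value)
    return value


instance_type = ToyParameter("TrainingInstanceType", "ml.c5.2xlarge")
tf_version = ToyParameter("TensorFlowVersion", "2.3.1")

# Step definition: the instance type is passed as a parameter object,
# but the framework version is read from .default_value at definition time.
step_args = {"instance_type": instance_type, "framework_version": tf_version.default_value}

overrides = {"TrainingInstanceType": "ml.c5.4xlarge", "TensorFlowVersion": "2.4.1"}
resolved = {key: resolve(value, overrides) for key, value in step_args.items()}
print(resolved)
```

The instance type follows the override, while the framework version stays at the value captured when the step was defined.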

### Processing Step <a class="anchor" id="ProcessingStep">

The first step in the pipeline will preprocess the data to prepare it for training. We create a `SKLearnProcessor` object similar to the one above, but now parameterized so we can separately track and change the job configuration as needed, for example to increase the instance type size and count to accommodate a growing dataset.

In [15]:
from sagemaker.sklearn.processing import SKLearnProcessor

role = sagemaker.get_execution_role()

sklearn_processor = SKLearnProcessor(
    framework_version=sklearn_version.default_value,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="tf-2-workflow-process",
    sagemaker_session=sess,
    role=role )

In [16]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_process = ProcessingStep(
    name="TF2Process",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input", s3_data_distribution_type='ShardedByS3Key'),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="./preprocessing.py" )

### Training and Model Creation Steps <a class="anchor" id="TrainingModelCreation">

The following code sets up a pipeline step for a training job. As mentioned above, the pipeline is parameterized to specify which SageMaker prebuilt TensorFlow 2 training container to use for the job.  Note that the image URI below is resolved from the parameter's `default_value` when the pipeline is defined, so to move to a newer TensorFlow version, update that default value and re-create the pipeline definition rather than overriding the parameter for a single execution.

In [17]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.step_collections import RegisterModel

image_uri_train = sagemaker.image_uris.retrieve(
                                        framework="tensorflow",
                                        region=region,
                                        version=tensorflow_version.default_value,
                                        py_version=python_version.default_value,
                                        instance_type=training_instance_type,
                                        image_scope="training" )

Next, we specify an `Estimator` object, and define a `TrainingStep` to insert the training job in the pipeline with inputs from the previous SageMaker Processing step.

In [18]:
import time

model_path = f"s3://{bucket}/TF2WorkflowTrain"
training_parameters = {'epochs': 44, 'batch_size': 128, 'learning_rate': 0.0125, 'for_pipeline': 'true'}

estimator = TensorFlow(
    image_uri=image_uri_train,
    git_config=git_config,
    source_dir='tf-2-workflow-smpipelines/train_model',
    entry_point='train.py',
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    role=role,
    base_job_name="tf-2-workflow-train",
    output_path=model_path,
    hyperparameters=training_parameters )

In [19]:
step_train = TrainingStep(
    name="TF2WorkflowTrain",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri
        ),
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri
        )
    },
)
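Each key of the `inputs` dictionary (`train` and `test`) becomes a training channel. Inside the training container, SageMaker makes each channel's data available under `/opt/ml/input/data/<channel>` and exposes that path through an `SM_CHANNEL_<CHANNEL>` environment variable such as `SM_CHANNEL_TRAIN`. A minimal sketch of how a training script might locate its channels (the helper `channel_dir` is hypothetical):

```python
import os


def channel_dir(name, environ=None):
    """Return the directory holding a training channel's data.

    Prefers the SM_CHANNEL_<NAME> environment variable set inside SageMaker
    training containers and falls back to the standard /opt/ml/input/data path.
    """
    environ = os.environ if environ is None else environ
    return environ.get(f"SM_CHANNEL_{name.upper()}", f"/opt/ml/input/data/{name}")


# Outside SageMaker the variable is unset, so the standard path is returned.
print(channel_dir("train", environ={}))
print(channel_dir("test", environ={"SM_CHANNEL_TEST": "/tmp/test-data"}))
```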

Next, we create a SageMaker `Model` object that wraps the trained model artifact and pairs it with a separate SageMaker prebuilt TensorFlow Serving inference container, plus a `CreateModelStep` that creates this model in SageMaker so it can be used later, for example in a SageMaker hosted endpoint.

In [20]:
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep

image_uri_inference = sagemaker.image_uris.retrieve(
                                        framework="tensorflow",
                                        region=region,
                                        version=tensorflow_version.default_value,
                                        py_version=python_version.default_value,
                                        instance_type=batch_instance_type,
                                        image_scope="inference" )
model = Model(
    image_uri=image_uri_inference,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role )

inputs_model = CreateModelInput(
    instance_type=batch_instance_type )

step_create_model = CreateModelStep(
    name="TF2WorkflowCreateModel",
    model=model,
    inputs=inputs_model )

### Batch Scoring Step <a class="anchor" id="BatchScoringStep">
    
The final step in this pipeline is offline batch scoring (inference/prediction). Its inputs are the model trained in the previous step and the test data. An ordinary Python script is all we need to perform the batch inference itself.

In [21]:
%%writefile batch-score.py

import os
import subprocess
import sys
import numpy as np
import pathlib
import tarfile

def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])

if __name__ == "__main__":
    
    install('tensorflow==2.3.1')
    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path, 'r:gz') as tar:
        tar.extractall('./model')
    import tensorflow as tf
    model = tf.keras.models.load_model('./model/1')
    test_path = "/opt/ml/processing/test/"
    x_test = np.load(os.path.join(test_path, 'x_test.npy'))
    y_test = np.load(os.path.join(test_path, 'y_test.npy'))
    scores = model.evaluate(x_test, y_test, verbose=2)
    print("\nTest MSE :", scores)
    
    output_dir = "/opt/ml/processing/batch"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    evaluation_path = f"{output_dir}/score-report.txt"
    with open(evaluation_path, 'w') as writer:
        writer.write(f"Test MSE : {scores}")

Writing batch-score.py
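`batch-score.py` assumes that the training job's `model.tar.gz` contains a SavedModel under a version directory named `1`. The sketch below, with a hypothetical `extract_saved_model` helper, checks that assumption locally against a stand-in archive before running a Processing job. It uses the `tarfile` `data` extraction filter where the running Python version provides it.

```python
import pathlib
import tarfile
import tempfile


def extract_saved_model(tar_path, dest):
    """Extract a model.tar.gz and return its SavedModel version directory ("1")."""
    with tarfile.open(tar_path, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Newer Python versions: reject absolute paths, links outside dest, etc.
            tar.extractall(dest, filter="data")
        else:
            tar.extractall(dest)
    version_dir = pathlib.Path(dest) / "1"
    if not version_dir.is_dir():
        raise FileNotFoundError(f"No version directory '1' in {tar_path}")
    return version_dir


# Build a tiny stand-in archive with the assumed layout, then extract it.
with tempfile.TemporaryDirectory() as tmp:
    src = pathlib.Path(tmp, "src", "1")
    src.mkdir(parents=True)
    (src / "saved_model.pb").write_bytes(b"placeholder")
    archive = pathlib.Path(tmp, "model.tar.gz")
    with tarfile.open(archive, "w:gz") as tar:
        tar.add(src, arcname="1")
    version_dir = extract_saved_model(archive, pathlib.Path(tmp, "model"))
    print(sorted(p.name for p in version_dir.iterdir()))
```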


SageMaker offers several ways to perform batch scoring, including SageMaker Processing and SageMaker Batch Transform. Here we use SageMaker Processing with the prebuilt scikit-learn container; the scoring script installs TensorFlow itself at startup.

In [22]:
batch_scorer = SKLearnProcessor(
                    framework_version=sklearn_version.default_value,
                    instance_type=batch_instance_type,
                    instance_count=batch_instance_count,
                    base_job_name="tf-2-workflow-batch",
                    sagemaker_session=sess,
                    role=role )

step_batch = ProcessingStep(
                    name="TF2WorkflowBatchScoring",
                    processor=batch_scorer,
                    inputs=[
                        ProcessingInput(
                            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                            destination="/opt/ml/processing/model"
                        ),
                        ProcessingInput(
                            source=step_process.properties.ProcessingOutputConfig.Outputs[
                                "test"
                            ].S3Output.S3Uri,
                            destination="/opt/ml/processing/test"
                        )
                    ],
                    outputs=[
                        ProcessingOutput(output_name="batch", source="/opt/ml/processing/batch"),
                    ],
                    code="./batch-score.py" )

### Creating and executing the pipeline <a class="anchor" id="CreatingExecutingPipeline">

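With all of the pipeline steps defined, we can now define the pipeline itself as a `Pipeline` object comprising those steps. Parallel and conditional steps are also possible. The order of the `steps` list does not determine execution order: SageMaker Pipelines infers the dependencies between steps from the properties each step references (for example, `step_train.properties.ModelArtifacts.S3ModelArtifacts`), so the model creation and batch scoring steps can run in parallel once training completes.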

In [23]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="TF2Workflow",
    parameters=[sklearn_version,
                tensorflow_version,
                python_version,
                input_data,
                processing_instance_type, 
                processing_instance_count, 
                training_instance_type, 
                training_instance_count,
                batch_instance_type,
                batch_instance_count],
    steps=[step_process, 
           step_train, 
           step_create_model,
           step_batch
          ],
    sagemaker_session=sess )
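To see why the model creation and batch scoring steps can run concurrently, the sketch below groups the four steps into waves based on the dependencies implied by their property references. It is a simplified model of the scheduling, not SageMaker's actual scheduler, and the dependency map is written out by hand from the step definitions above.

```python
# Each step maps to the steps whose outputs it consumes (hand-derived from the steps above).
dependencies = {
    "TF2Process": set(),
    "TF2WorkflowTrain": {"TF2Process"},
    "TF2WorkflowCreateModel": {"TF2WorkflowTrain"},
    "TF2WorkflowBatchScoring": {"TF2WorkflowTrain", "TF2Process"},
}


def execution_waves(deps):
    """Group steps into waves; steps within one wave do not depend on each other."""
    remaining = {step: set(parents) for step, parents in deps.items()}  # copy, don't mutate input
    waves = []
    while remaining:
        ready = sorted(step for step, parents in remaining.items() if not parents)
        if not ready:
            raise ValueError("Cycle detected in step dependencies")
        waves.append(ready)
        for step in ready:
            del remaining[step]
        for parents in remaining.values():
            parents.difference_update(ready)
    return waves


print(execution_waves(dependencies))
```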

We can inspect the pipeline definition in JSON format:

In [24]:
import json

definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'SKLearnVersion',
   'Type': 'String',
   'DefaultValue': '0.23-1'},
  {'Name': 'TensorFlowVersion', 'Type': 'String', 'DefaultValue': '2.3.1'},
  {'Name': 'PythonVersion', 'Type': 'String', 'DefaultValue': 'py37'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-154082585954/tf-2-workflow/data/raw'},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 2},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.2xlarge'},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'BatchInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.xlarge'},
  {'Name': 'BatchInstanceCount', 'Type': 'Integer', 'DefaultValue': 1}],
 'Steps': [{'Name': 'TF2Process',
   'Type': 'Processing',
   'Arguments': {'Processin
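In the JSON definition, step property references such as `step_train.properties.ModelArtifacts.S3ModelArtifacts` and pipeline parameters are serialized as `{"Get": ...}` expressions (for example, `{"Get": "Steps.TF2WorkflowTrain.ModelArtifacts.S3ModelArtifacts"}`) that are resolved only when the pipeline runs. The sketch below uses a hand-written, simplified fragment (not copied from the truncated output above) to show how such expressions can be resolved against runtime values and scanned for step dependencies; `resolve`, `referenced_steps`, and the S3 path are hypothetical.

```python
def resolve(node, context):
    """Replace {"Get": path} expressions with values from context (simplified sketch)."""
    if isinstance(node, dict):
        if set(node) == {"Get"}:
            return context[node["Get"]]
        return {key: resolve(value, context) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve(item, context) for item in node]
    return node


def referenced_steps(node):
    """Collect the names of steps whose properties a fragment refers to."""
    if isinstance(node, dict):
        if set(node) == {"Get"}:
            parts = node["Get"].split(".")
            return {parts[1]} if parts[0] == "Steps" else set()
        return set().union(*(referenced_steps(value) for value in node.values()))
    if isinstance(node, list):
        return set().union(*(referenced_steps(item) for item in node))
    return set()


# Hypothetical, simplified fragment in the style of a model-creation step's arguments.
fragment = {
    "PrimaryContainer": {
        "Image": "<TensorFlow inference image URI>",
        "ModelDataUrl": {"Get": "Steps.TF2WorkflowTrain.ModelArtifacts.S3ModelArtifacts"},
    },
}
context = {"Steps.TF2WorkflowTrain.ModelArtifacts.S3ModelArtifacts": "s3://example-bucket/TF2WorkflowTrain/model.tar.gz"}

print(referenced_steps(fragment))
print(resolve(fragment, context)["PrimaryContainer"]["ModelDataUrl"])
```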

After we upsert the definition (`upsert` creates the pipeline, or updates it if it already exists), we can start an execution with the `Pipeline` object's `start` method:

In [25]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()
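`pipeline.start` also accepts a `parameters` dictionary that overrides default values for a single execution, for example `pipeline.start(parameters={"TrainingInstanceType": "ml.c5.4xlarge"})`. The sketch below, using a hypothetical `resolve_execution_parameters` helper and a subset of the defaults shown in the definition output, illustrates how such overrides are overlaid on the defaults; rejecting unknown names is a choice of this sketch. Only parameters that steps reference as pipeline variables respond to overrides; values read through `.default_value` while building the definition are fixed.

```python
# Subset of the "Parameters" list from the pipeline definition output above.
definition_parameters = [
    {"Name": "TensorFlowVersion", "Type": "String", "DefaultValue": "2.3.1"},
    {"Name": "TrainingInstanceType", "Type": "String", "DefaultValue": "ml.c5.2xlarge"},
    {"Name": "TrainingInstanceCount", "Type": "Integer", "DefaultValue": 1},
]


def resolve_execution_parameters(parameters, overrides=None):
    """Hypothetical helper: overlay start-time overrides on the definition defaults."""
    overrides = dict(overrides or {})
    known = {p["Name"] for p in parameters}
    unknown = sorted(set(overrides) - known)
    if unknown:
        raise KeyError(f"Undefined pipeline parameters: {unknown}")
    return {p["Name"]: overrides.get(p["Name"], p["DefaultValue"]) for p in parameters}


# Analogous to pipeline.start(parameters={"TrainingInstanceType": "ml.c5.4xlarge"}).
print(resolve_execution_parameters(definition_parameters, {"TrainingInstanceType": "ml.c5.4xlarge"}))
```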

To confirm that the pipeline is running, check that `PipelineExecutionStatus` is `Executing` in the output below.

In [26]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:154082585954:pipeline/tf2workflow',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:154082585954:pipeline/tf2workflow/execution/gwiy4e3ox2m3',
 'PipelineExecutionDisplayName': 'execution-1621375772882',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2021, 5, 18, 22, 9, 32, 766000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 5, 18, 22, 9, 32, 766000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:154082585954:user-profile/d-tvjg2ugulkio/user-154082585954-1',
  'UserProfileName': 'user-154082585954-1',
  'DomainId': 'd-tvjg2ugulkio'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:154082585954:user-profile/d-tvjg2ugulkio/user-154082585954-1',
  'UserProfileName': 'user-154082585954-1',
  'DomainId': 'd-tvjg2ugulkio'},
 'ResponseMetadata': {'RequestId': '2c1ba5cf-c3e0-4fac-8203-263ddc18df1d',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x
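While the status is `Executing`, a waiter such as `execution.wait()` keeps polling the execution until `PipelineExecutionStatus` reaches a terminal state (`Succeeded`, `Failed`, or `Stopped`). In the SageMaker Python SDK, `wait` accepts `delay` and `max_attempts` arguments, so a long-running pipeline may need larger values than the defaults. A minimal polling sketch, with a hypothetical `wait_for_execution` function and an injectable `sleep` so it can be tested without waiting:

```python
import time

TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_execution(describe, delay=30.0, max_attempts=60, sleep=time.sleep):
    """Poll describe() until PipelineExecutionStatus reaches a terminal state.

    describe is any callable returning a dict shaped like execution.describe().
    """
    for _ in range(max_attempts):
        status = describe()["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        sleep(delay)
    raise TimeoutError(f"Execution still running after {max_attempts} attempts")


# Usage with a fake describe() that reports Executing twice, then Succeeded.
responses = iter(["Executing", "Executing", "Succeeded"])
print(wait_for_execution(lambda: {"PipelineExecutionStatus": next(responses)}, sleep=lambda s: None))
```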

This pipeline typically takes about 10 minutes to complete. Calling `wait()` blocks until the execution finishes; afterward, `list_steps()` returns the status of each pipeline step.

In [27]:
execution.wait()
execution.list_steps()

[{'StepName': 'TF2WorkflowCreateModel',
  'StartTime': datetime.datetime(2021, 5, 18, 22, 17, 16, 43000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 5, 18, 22, 17, 16, 958000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:154082585954:model/pipelines-gwiy4e3ox2m3-tf2workflowcreatemod-bheso5atwc'}}},
 {'StepName': 'TF2WorkflowBatchScoring',
  'StartTime': datetime.datetime(2021, 5, 18, 22, 17, 15, 992000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 5, 18, 22, 22, 6, 412000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:154082585954:processing-job/pipelines-gwiy4e3ox2m3-tf2workflowbatchscor-niimstrlmw'}}},
 {'StepName': 'TF2WorkflowTrain',
  'StartTime': datetime.datetime(2021, 5, 18, 22, 13, 48, 509000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 5, 18, 22, 17, 15, 698000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
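The `StartTime` and `EndTime` fields in the `list_steps()` output show where the time goes. The sketch below copies the three step summaries visible above (the listing is truncated, so the processing step is missing, and time zones are dropped) and computes each step's duration with a hypothetical `step_durations` helper. With these values, training took roughly 3.5 minutes and batch scoring almost 5 minutes.

```python
import datetime

# StartTime/EndTime values copied from the list_steps() output above (time zone dropped).
steps = [
    {"StepName": "TF2WorkflowCreateModel",
     "StartTime": datetime.datetime(2021, 5, 18, 22, 17, 16, 43000),
     "EndTime": datetime.datetime(2021, 5, 18, 22, 17, 16, 958000)},
    {"StepName": "TF2WorkflowBatchScoring",
     "StartTime": datetime.datetime(2021, 5, 18, 22, 17, 15, 992000),
     "EndTime": datetime.datetime(2021, 5, 18, 22, 22, 6, 412000)},
    {"StepName": "TF2WorkflowTrain",
     "StartTime": datetime.datetime(2021, 5, 18, 22, 13, 48, 509000),
     "EndTime": datetime.datetime(2021, 5, 18, 22, 17, 15, 698000)},
]


def step_durations(step_summaries):
    """Map each finished step's name to its wall-clock duration in seconds."""
    return {
        s["StepName"]: (s["EndTime"] - s["StartTime"]).total_seconds()
        for s in step_summaries
        if "StartTime" in s and "EndTime" in s
    }


for name, seconds in step_durations(steps).items():
    print(f"{name}: {seconds:.1f} s")
```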

### Check the score report

After the batch scoring job in the pipeline completes, the batch scoring report is uploaded to S3. For simplicity, this report states only the test mean squared error (MSE), but in general reports can include as much detail as desired. Reports such as these can also be formatted for use in conditional approval steps in SageMaker Pipelines. For example, the pipeline could include a condition step that allows further steps to proceed only if the MSE is below some threshold.

In [28]:
report_path = f"{step_batch.outputs[0].destination}/score-report.txt"
!aws s3 cp {report_path} ./score-report.txt && cat score-report.txt

download: s3://sagemaker-us-east-1-154082585954/tf-2-workflow-batch-2021-05-18-22-09-28-934/output/batch/score-report.txt to ./score-report.txt
Test MSE : 19.270126342773438
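To make a report usable by a condition step, it helps to write it as JSON rather than plain text. In SageMaker Pipelines this is typically done with a `PropertyFile`, a `JsonGet` lookup, and a condition such as `ConditionLessThanOrEqualTo` inside a `ConditionStep`. The sketch below imitates that flow in plain Python; the report schema, the file name `evaluation.json`, the `json_get` helper, and the threshold of 25.0 are hypothetical choices, and the MSE value is the one reported above.

```python
import json
import pathlib
import tempfile


def write_json_report(output_dir, mse):
    """Write a hypothetical JSON version of score-report.txt that a condition could read."""
    report = {"regression_metrics": {"mse": {"value": mse}}}
    path = pathlib.Path(output_dir) / "evaluation.json"
    path.write_text(json.dumps(report))
    return path


def json_get(document, dotted_path):
    """Look up a dotted path such as 'regression_metrics.mse.value' in a parsed JSON dict."""
    value = document
    for key in dotted_path.split("."):
        value = value[key]
    return value


MSE_THRESHOLD = 25.0  # hypothetical approval threshold

with tempfile.TemporaryDirectory() as tmp:
    report_path = write_json_report(tmp, 19.270126342773438)
    mse = json_get(json.loads(report_path.read_text()), "regression_metrics.mse.value")
    print("proceed" if mse <= MSE_THRESHOLD else "stop")
```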

## ML Lineage Tracking <a class="anchor" id="LineageOfPipelineArtifacts">

SageMaker ML Lineage Tracking creates and stores information about the steps of an ML workflow, from data preparation to model deployment. With this tracking information, you can reproduce workflow steps, track model and dataset lineage, and establish model governance and audit standards.

Let's now check the lineage of the model generated by the pipeline above. The lineage table identifies the resources used in training, including the timestamped train and test data sources and the specific TensorFlow 2 container version used by the training job, as well as the model artifact the job produced.

In [29]:
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'TF2WorkflowTrain':
        display(viz.show(pipeline_execution_step=execution_step))

| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...cess-2021-05-18-22-09-28-163/output/test | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...ess-2021-05-18-22-09-28-163/output/train | Input | DataSet | ContributedTo | artifact |
| 2 | 76310...s.com/tensorflow-training:2.3.1-cpu-py37 | Input | Image | ContributedTo | artifact |
| 3 | s3://...flowTrain-PFypc8UUe4/output/model.tar.gz | Output | Model | Produced | artifact |
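A common lineage question is which inputs produced a given model artifact. The sketch below transcribes the lineage table into plain Python rows (keeping the elided names exactly as displayed) and groups them by direction with a hypothetical `group_by_direction` helper. The table appears to be rendered from a pandas DataFrame returned by `viz.show`; if so, the same filter can be written as `df[df["Direction"] == "Input"]`.

```python
# Rows transcribed from the lineage table; the "..." elisions are kept as displayed.
lineage_rows = [
    {"Name/Source": "s3://...cess-2021-05-18-22-09-28-163/output/test", "Direction": "Input", "Type": "DataSet"},
    {"Name/Source": "s3://...ess-2021-05-18-22-09-28-163/output/train", "Direction": "Input", "Type": "DataSet"},
    {"Name/Source": "76310...s.com/tensorflow-training:2.3.1-cpu-py37", "Direction": "Input", "Type": "Image"},
    {"Name/Source": "s3://...flowTrain-PFypc8UUe4/output/model.tar.gz", "Direction": "Output", "Type": "Model"},
]


def group_by_direction(rows):
    """Group (type, source) pairs under their lineage direction (Input/Output)."""
    grouped = {}
    for row in rows:
        grouped.setdefault(row["Direction"], []).append((row["Type"], row["Name/Source"]))
    return grouped


for direction, entries in group_by_direction(lineage_rows).items():
    print(direction)
    for artifact_type, source in entries:
        print(f"  {artifact_type}: {source}")
```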


## Extensions <a class="anchor" id="Extensions">

We've covered a lot of content in this notebook:  SageMaker Processing for data transformation, prototyping training and inference code, Automatic Model Tuning, and SageMaker hosted training and inference.  These are central elements for most machine learning workflows in SageMaker.  Additionally, we examined how SageMaker Pipelines helps automate deep learning workflows after completion of the prototyping phase of a project.

Besides the SageMaker features explored above, many other features may be applicable to your project. For example, **SageMaker Debugger** can detect common training problems such as vanishing or exploding gradients, and **SageMaker Model Monitor** can detect problems such as data drift after a model is in production.