## Workflow Creation using SageMaker Pipelines - Complete

This notebook shows how to:

1. Define a set of Pipeline parameters that can be used to parametrize a SageMaker Pipeline.
2. Define a Processing step that performs cleaning, feature engineering, and splitting the input data into train and test data sets.
3. Define a Training step that trains a model on the preprocessed train data set.
4. Define a Processing step that evaluates the trained model's performance on the test dataset.
5. Define a Create Model step that creates a model from the model artifacts produced by the training step.
6. Define a Transform step that performs batch transformation based on the model that was created.
7. Define a Register Model step that creates a model package from the estimator and model artifacts used to train the model.
8. Define a Condition step that evaluates a condition based on the output of prior steps and conditionally executes other steps.
9. Define and create a Pipeline definition in a DAG, with the defined parameters and steps.
10. Start a Pipeline execution and wait for execution to complete.
11. Download the model evaluation report from the S3 bucket for examination.
12. Start a second Pipeline execution.

![A typical ML Application pipeline](./img/pipeline-full.png)

#### Imports 

In [1]:
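from sagemaker.workflow.parameters import ParameterInteger, ParameterString
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
# JsonGet is imported from condition_step in the SDK version used here; newer SDK versions provide it in sagemaker.workflow.functions
from sagemaker.workflow.condition_step import ConditionStep, JsonGet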
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import CreateModelStep
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.steps import TransformStep
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.inputs import CreateModelInput
from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.inputs import TrainingInput
from sagemaker.estimator import Estimator
from sagemaker.model import Model
import pandas as pd
import sagemaker
import logging
import boto3
import json

##### Set up logger

In [2]:
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

##### Essentials

In [3]:
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
model_package_group_name = 'AbaloneML'

Couldn't call 'get_role' to get Role ARN from role name AmazonSageMaker-ExecutionRole-20210522T230509 to get Role path.
Assuming role was created in SageMaker AWS console, as the name contains `AmazonSageMaker-ExecutionRole`. Defaulting to Role ARN with service-role in path. If this Role ARN is incorrect, please add IAM read permissions to your role or supply the Role Arn directly.


In [4]:
print(f'Default bucket = {bucket}')

Default bucket = sagemaker-us-east-1-892313895307


#### Prepare data

The dataset is the [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone). The task is to predict the age of an abalone from its physical measurements, which makes this a regression problem. The target is the ring count (`rings`); according to the dataset description, the age in years is the number of rings plus 1.5.

In [5]:
df = pd.read_csv('./data/abalone.csv')
df.head(5)

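|   | sex | length | diameter | height | whole_weight | shucked_weight | viscera_weight | shell_weight | rings |
|---|-----|--------|----------|--------|--------------|----------------|----------------|--------------|-------|
| 0 | M | 0.455 | 0.365 | 0.095 | 0.514 | 0.2245 | 0.101 | 0.15 | 15 |
| 1 | M | 0.35 | 0.265 | 0.09 | 0.2255 | 0.0995 | 0.0485 | 0.07 | 7 |
| 2 | F | 0.53 | 0.42 | 0.135 | 0.677 | 0.2565 | 0.1415 | 0.21 | 9 |
| 3 | M | 0.44 | 0.365 | 0.125 | 0.516 | 0.2155 | 0.114 | 0.155 | 10 |
| 4 | I | 0.33 | 0.255 | 0.08 | 0.205 | 0.0895 | 0.0395 | 0.055 | 7 |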


Copy data from local to S3

In [6]:
!aws s3 cp ./data/abalone.csv s3://{bucket}/abalone/

upload: data/abalone.csv to s3://sagemaker-us-east-1-892313895307/abalone/abalone.csv


In [7]:
!aws s3 cp ./data/abalone-unlabeled s3://{bucket}/abalone/

upload: data/abalone-unlabeled to s3://sagemaker-us-east-1-892313895307/abalone/abalone-unlabeled


In [8]:
input_data_uri = f's3://{bucket}/abalone/abalone.csv'
batch_data_uri = f's3://{bucket}/abalone/abalone-unlabeled' 

### 1. Define Pipeline-level parameters 

In [9]:
processing_instance_count = ParameterInteger(name='ProcessingInstanceCount', default_value=1)
processing_instance_type = ParameterString(name='ProcessingInstanceType', default_value='ml.m5.xlarge')
training_instance_type = ParameterString(name='TrainingInstanceType', default_value='ml.m5.xlarge')
model_approval_status = ParameterString(name='ModelApprovalStatus', default_value='Approved')
input_data = ParameterString(name='InputData', default_value=input_data_uri)
batch_data = ParameterString(name='BatchData', default_value=batch_data_uri)

![Define Parameters](./img/pipeline-1.png)
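In the pipeline definition, each parameter is declared once with its default, and every place that uses it holds a `{'Get': 'Parameters.<Name>'}` reference (the pipeline definition printed later in this notebook shows this form). Values passed at start time, for example `pipeline.start(parameters={'ProcessingInstanceType': 'ml.m5.2xlarge'})`, override the defaults for that execution only. The sketch below mimics that substitution in plain Python. `resolve_parameters` is a hypothetical helper for illustration; it is not part of the SageMaker SDK and not how the service is implemented.

```python
from typing import Any, Dict


def resolve_parameters(node: Any, defaults: Dict[str, Any], overrides: Dict[str, Any]) -> Any:
    """Hypothetical helper: replace {'Get': 'Parameters.<Name>'} references with concrete values.

    Execution-time overrides take precedence over the declared defaults.
    """
    if isinstance(node, dict):
        ref = node.get('Get')
        if len(node) == 1 and isinstance(ref, str) and ref.startswith('Parameters.'):
            name = ref[len('Parameters.'):]
            return overrides.get(name, defaults[name])
        # Recurse into every value of an ordinary dictionary
        return {key: resolve_parameters(value, defaults, overrides) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_parameters(item, defaults, overrides) for item in node]
    return node


# Defaults copied from the ParameterString/ParameterInteger declarations above
defaults = {'ProcessingInstanceType': 'ml.m5.xlarge', 'ProcessingInstanceCount': 1}

# Fragment shaped like the ClusterConfig in the printed pipeline definition
cluster_config = {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
                  'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
                  'VolumeSizeInGB': 30}

print(resolve_parameters(cluster_config, defaults, overrides={}))
print(resolve_parameters(cluster_config, defaults, overrides={'ProcessingInstanceType': 'ml.m5.2xlarge'}))
```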

### 2. Feature Engineering

* Fill in missing values in the categorical `sex` column and one-hot encode it so that it is suitable for training.
* Impute and standardize all numeric feature columns (every column except `sex`); the `rings` target is left unscaled.
* Shuffle the data and split it into training (70%), validation (15%), and test (15%) datasets.
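To make the three transformations concrete, here is a dependency-free sketch of the same logic on plain Python lists. It is an illustration only: the processing job itself uses scikit-learn's `SimpleImputer`, `StandardScaler`, and `OneHotEncoder`, and the helper names below are hypothetical. Scaling uses the population standard deviation, as `StandardScaler` does, and the split cut points mirror the `np.split` call in the preprocessing script. Unlike the script, the shuffle here is seeded so the result is reproducible.

```python
import random
import statistics
from typing import List, Optional, Sequence, Tuple


def impute_median(values: Sequence[Optional[float]]) -> List[float]:
    """Replace None with the median of the observed values (like SimpleImputer(strategy='median'))."""
    med = statistics.median([v for v in values if v is not None])
    return [med if v is None else v for v in values]


def standardize(values: Sequence[float]) -> List[float]:
    """Zero mean, unit variance using the population std (ddof=0); constant columns map to 0.0."""
    mean = statistics.fmean(values)
    std = statistics.pstdev(values)
    return [(v - mean) / std if std else 0.0 for v in values]


def one_hot(values: Sequence[Optional[str]], fill_value: str = 'missing') -> Tuple[List[str], List[List[float]]]:
    """Fill missing categories with fill_value, then one-hot encode against the sorted categories."""
    filled = [fill_value if v is None else v for v in values]
    categories = sorted(set(filled))
    return categories, [[1.0 if v == c else 0.0 for c in categories] for v in filled]


def split_70_15_15(rows, seed: int = 0):
    """Shuffle rows, then cut them at 70% and 85%, like np.split(X, [int(0.7 * n), int(0.85 * n)])."""
    rows = list(rows)
    random.Random(seed).shuffle(rows)
    n = len(rows)
    cut1, cut2 = int(0.7 * n), int(0.85 * n)
    return rows[:cut1], rows[cut1:cut2], rows[cut2:]


# 4177 is the instance count listed for the UCI Abalone dataset; used here only to show the split sizes
train_rows, val_rows, test_rows = split_70_15_15(range(4177))
print(len(train_rows), len(val_rows), len(test_rows))
```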

In [10]:
%%writefile src/preprocessing.py
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
import pandas as pd
import numpy as np
import logging
import sklearn


logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

logger.info(f'Using Sklearn version: {sklearn.__version__}')


if __name__ == '__main__':
    logger.info('Sklearn Preprocessing Job [Start]')
    # SageMaker Processing places ProcessingInput data and collects ProcessingOutput data under this directory
    base_dir = '/opt/ml/processing'

    df = pd.read_csv(f'{base_dir}/input/abalone.csv')
    # Separate the regression target (ring count) from the features
    y = df.pop('rings')
    logger.info(f'Columns = {df.columns}')

    # Numeric features: every remaining column except the categorical 'sex' column
    numeric_features = list(df.columns)
    numeric_features.remove('sex')
    # Fill missing numeric values with the column median, then standardize to zero mean and unit variance
    numeric_transformer = Pipeline(steps=[('imputer', SimpleImputer(strategy='median')),
                                          ('scaler', StandardScaler())])

    # Fill missing 'sex' values with a 'missing' category, then one-hot encode
    categorical_features = ['sex']
    categorical_transformer = Pipeline(steps=[('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
                                              ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    preprocess = ColumnTransformer(transformers=[('num', numeric_transformer, numeric_features),
                                                 ('cat', categorical_transformer, categorical_features)])

    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    # Put the label in the first column: the built-in XGBoost algorithm expects headerless CSV with the target first
    X = np.concatenate((y_pre, X_pre), axis=1)

    # Shuffle rows, then split 70% train / 15% validation / 15% test
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    pd.DataFrame(train).to_csv(f'{base_dir}/train/train.csv', header=False, index=False)
    pd.DataFrame(validation).to_csv(f'{base_dir}/validation/validation.csv', header=False, index=False)
    pd.DataFrame(test).to_csv(f'{base_dir}/test/test.csv', header=False, index=False)
    logger.info('Sklearn Preprocessing Job [End]')

Overwriting src/preprocessing.py


In [11]:
framework_version = '0.23-1'

sklearn_processor = SKLearnProcessor(framework_version=framework_version, 
                                     instance_type=processing_instance_type, 
                                     instance_count=processing_instance_count, 
                                     base_job_name='sklearn-abalone-preprocess', 
                                     role=role)

In [12]:
step_process = ProcessingStep(name='AbalonePreprocess', 
                              processor=sklearn_processor, 
                              inputs=[ProcessingInput(source=input_data, destination='/opt/ml/processing/input')], 
                              outputs=[ProcessingOutput(output_name='train', source='/opt/ml/processing/train'), 
                                       ProcessingOutput(output_name='validation', source='/opt/ml/processing/validation'), 
                                       ProcessingOutput(output_name='test', source='/opt/ml/processing/test')], 
                              code='src/preprocessing.py')

![Define a Processing Step for Feature Engineering](img/pipeline-2.png)

### 3. Train a Model

In [13]:
model_path = f's3://{bucket}/model'

In [14]:
image_uri = sagemaker.image_uris.retrieve(framework='xgboost', 
                                          region=region, 
                                          version='1.0-1', 
                                          py_version='py3', 
                                          instance_type=training_instance_type)
image_uri

'683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-xgboost:1.0-1-cpu-py3'

In [15]:
xgb_train = Estimator(image_uri=image_uri, 
                      instance_type=training_instance_type, 
                      instance_count=1, 
                      output_path=model_path, 
                      role=role)

In [16]:
xgb_train.set_hyperparameters(objective='reg:squarederror', 
                              num_round=50, 
                              max_depth=5, 
                              eta=0.2, 
                              gamma=4, 
                              min_child_weight=6, 
                              subsample=0.7, 
                              silent=0)

In [17]:
step_train = TrainingStep(name='AbaloneTraining', 
                          estimator=xgb_train, 
                          inputs={'train': TrainingInput(s3_data=step_process.properties.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri, 
                                                         content_type='text/csv'), 
                                  'validation': TrainingInput(s3_data=step_process.properties.ProcessingOutputConfig.Outputs['validation'].S3Output.S3Uri, 
                                                              content_type='text/csv')})

![Define a Training Step to Train a Model](img/pipeline-3.png)
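The `step_process.properties...S3Uri` expressions above are not strings: when the notebook runs, the preprocessing outputs do not exist yet. The SDK records the attribute path and serializes it into the pipeline definition as a `{'Get': 'Steps.<StepName>.<path>'}` reference, which the service resolves at run time. That reference is also how the pipeline learns that training depends on preprocessing. The class below is a hypothetical, simplified stand-in that shows the idea of recording a path through attribute and index access; it is not the SDK's implementation.

```python
class PropertyRef:
    """Hypothetical stand-in for a step property: records the access path instead of holding a value."""

    def __init__(self, path: str):
        self._path = path

    def __getattr__(self, name: str) -> 'PropertyRef':
        # Called only for attributes not found normally: append '.<name>' to the recorded path
        return PropertyRef(f'{self._path}.{name}')

    def __getitem__(self, key: str) -> 'PropertyRef':
        # Index access appends "['<key>']", the form used for named processing outputs
        return PropertyRef(f"{self._path}['{key}']")

    def to_request(self) -> dict:
        # Serialized form placed in the pipeline definition
        return {'Get': self._path}


preprocess_properties = PropertyRef('Steps.AbalonePreprocess')
train_uri = preprocess_properties.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri
print(train_uri.to_request())
```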

### 4. Evaluate Trained Model

First, write an evaluation script. A Processing step runs this script to evaluate the trained model.

After the pipeline executes, you can examine the resulting `evaluation.json` for analysis.

The evaluation script uses `xgboost` to do the following:

* Load the model.
* Read the test data.
* Issue predictions against the test data.
* Build a regression report containing the mean squared error (MSE) and the standard deviation of the prediction errors.
* Save the evaluation report to the evaluation directory.
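The two numbers in the report are easy to check by hand. The sketch below recomputes them with the standard library on made-up values: MSE is the mean of the squared residuals, and `np.std` of the residuals (population standard deviation, `ddof=0` by default) corresponds to `statistics.pstdev`. The function name and toy values are illustrative only; the report layout matches the `report_dict` that the evaluation script writes.

```python
import json
import statistics
from typing import Sequence


def regression_report(y_true: Sequence[float], y_pred: Sequence[float]) -> dict:
    """Build a report shaped like evaluation.json from true and predicted ring counts."""
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    mse = statistics.fmean([r * r for r in residuals])
    std = statistics.pstdev(residuals)  # population std, like np.std's default ddof=0
    return {'regression_metrics': {'mse': {'value': mse, 'standard_deviation': std}}}


report = regression_report([3.0, 5.0, 7.0], [2.0, 5.0, 9.0])
print(json.dumps(report))
```

Here the residuals are 1, 0, and -2, so the MSE is (1 + 0 + 4) / 3.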

In [18]:
%%writefile src/evaluation.py
from sklearn.metrics import mean_squared_error
import numpy as np
import pandas as pd
import tarfile
import pathlib
import xgboost
import pickle
import json


if __name__ == '__main__':
    # The Processing step copies the training job's model.tar.gz into this location
    model_path = '/opt/ml/processing/model/model.tar.gz'

    with tarfile.open(model_path) as tar:
        tar.extractall(path='.')

    # The XGBoost 1.0-1 container stores the trained Booster as a pickled file named 'xgboost-model'
    with open('xgboost-model', 'rb') as f:
        model = pickle.load(f)

    test_path = '/opt/ml/processing/test/test.csv'
    df = pd.read_csv(test_path, header=None)

    # The first column holds the label (ring count); the remaining columns are features
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)
    predictions = model.predict(X_test)

    # Regression metrics: MSE and the standard deviation of the residuals (cast to float for JSON)
    mse = float(mean_squared_error(y_test, predictions))
    std = float(np.std(y_test - predictions))
    report_dict = {'regression_metrics':
                   {'mse':
                    {'value': mse, 'standard_deviation': std}
                   }
                  }

    output_dir = '/opt/ml/processing/evaluation'
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f'{output_dir}/evaluation.json'
    with open(evaluation_path, 'w') as f:
        f.write(json.dumps(report_dict))

Overwriting src/evaluation.py


In [19]:
from sagemaker.processing import ScriptProcessor

# using the same XGBoost training image from the previous step
script_eval = ScriptProcessor(image_uri=image_uri, 
                              command=['python3'], 
                              instance_type=processing_instance_type, 
                              instance_count=1, 
                              base_job_name='AbaloneEvaluate', 
                              role=role)

In [20]:
evaluation_report = PropertyFile(name='EvaluationReport', output_name='evaluation', path='evaluation.json')

step_eval = ProcessingStep(name='AbaloneEvaluate', 
                           processor=script_eval, 
                           inputs=[ProcessingInput(source=step_train.properties.ModelArtifacts.S3ModelArtifacts, 
                                                   destination='/opt/ml/processing/model'), 
                                   ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri, 
                                                   destination='/opt/ml/processing/test')], 
                           outputs=[ProcessingOutput(output_name='evaluation', source='/opt/ml/processing/evaluation')], 
                           code='src/evaluation.py', 
                           property_files=[evaluation_report])

### 5. Create a Model

In [21]:
# using the same XGBoost container image used for training and evaluation in the previous steps
model = Model(image_uri=image_uri, 
              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, 
              sagemaker_session=sagemaker_session, 
              role=role)

In [22]:
inputs = CreateModelInput(instance_type='ml.m5.large')

step_create_model = CreateModelStep(name='AbaloneCreateModel', 
                                    model=model, 
                                    inputs=inputs)

### 6. Batch Transform Step

In [23]:
transformer = Transformer(model_name=step_create_model.properties.ModelName, 
                          instance_type='ml.m5.xlarge', 
                          instance_count=1, 
                          output_path=f's3://{bucket}/AbaloneTransform')

In [24]:
step_transform = TransformStep(name='AbaloneTransform', 
                               transformer=transformer, 
                               inputs=TransformInput(data=batch_data))
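Batch transform writes one output object per input object under `output_path`, named after the input object with `.out` appended. With the defaults above, the predictions for `abalone-unlabeled` should therefore land at `s3://{bucket}/AbaloneTransform/abalone-unlabeled.out`. The helper below computes that location for a given input URI. `expected_transform_output` is a hypothetical name, and the sketch assumes a single input object rather than an S3 prefix containing several files; it uses a placeholder bucket name so it does not overwrite the notebook's `bucket` variable.

```python
import posixpath
from urllib.parse import urlparse


def expected_transform_output(output_path: str, input_uri: str) -> str:
    """Hypothetical helper: where batch transform writes results for a single input object."""
    input_key = urlparse(input_uri).path          # e.g. '/abalone/abalone-unlabeled'
    file_name = posixpath.basename(input_key)     # e.g. 'abalone-unlabeled'
    return f"{output_path.rstrip('/')}/{file_name}.out"


example_bucket = 'example-bucket'  # placeholder; the notebook uses sagemaker_session.default_bucket()
print(expected_transform_output(f's3://{example_bucket}/AbaloneTransform',
                                f's3://{example_bucket}/abalone/abalone-unlabeled'))
```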

### 7. Register Model Step to Create a Model Package

In [25]:
model_metrics = ModelMetrics(model_statistics=MetricsSource(
    s3_uri='{}/evaluation.json'.format(step_eval.arguments['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']),
    content_type='application/json'))

In [26]:
step_register = RegisterModel(name='AbaloneRegisterModel', 
                              estimator=xgb_train, 
                              model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,  
                              content_types=['text/csv'], 
                              response_types=['text/csv'], 
                              inference_instances=['ml.t2.medium', 'ml.m5.xlarge'], 
                              transform_instances=['ml.m5.xlarge'], 
                              model_package_group_name=model_package_group_name, 
                              approval_status=model_approval_status, 
                              model_metrics=model_metrics)

![Define a Create Model Step and Batch Transform to Process Data in Batch at Scale](img/pipeline-5.png)
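Because `ModelApprovalStatus` is a string parameter, a misspelled value passed at start time may only surface when the register step runs. The model registry accepts `Approved`, `Rejected`, and `PendingManualApproval`. A small client-side check such as the hypothetical `checked_approval_status` below can catch a typo before you call `pipeline.start(...)`; it is a convenience sketch, not an SDK feature.

```python
VALID_APPROVAL_STATUSES = {'Approved', 'Rejected', 'PendingManualApproval'}


def checked_approval_status(value: str) -> str:
    """Hypothetical helper: return value unchanged if it is a valid model approval status."""
    if value not in VALID_APPROVAL_STATUSES:
        raise ValueError(f'Invalid ModelApprovalStatus {value!r}; '
                         f'expected one of {sorted(VALID_APPROVAL_STATUSES)}')
    return value


# Example use before starting an execution:
# pipeline.start(parameters={'ModelApprovalStatus': checked_approval_status('PendingManualApproval')})
```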

### 8. Conditional Step

##### Define a Condition Step to Check MSE and Conditionally Create a Model, Run a Batch Transformation, and Register a Model in the Model Registry

In this step, the model is registered, created, and used for batch transformation only if its mean squared error (MSE), as reported by the evaluation step `step_eval`, is less than or equal to a specified threshold (6.0 here). A `ConditionStep` enables conditional execution in the pipeline DAG based on the properties of earlier steps.

In the following section, you:

* Define a `ConditionLessThanOrEqualTo` on the MSE value found in the output of the evaluation step, `step_eval`.
* Use the condition in the list of conditions in a `ConditionStep`.
* Pass the `CreateModelStep` and `TransformStep` steps, and the `RegisterModel` step collection, into the `if_steps` of the `ConditionStep`; these steps run only if the condition evaluates to `True`.

In [27]:
cond_lte = ConditionLessThanOrEqualTo(left=JsonGet(step=step_eval, 
                                                   property_file=evaluation_report, 
                                                   json_path='regression_metrics.mse.value'), 
                                      right=6.0)

In [28]:
step_cond = ConditionStep(name='AbaloneMSECond', 
                          conditions=[cond_lte], 
                          if_steps=[step_register, step_create_model, step_transform], 
                          else_steps=[])

![Define a Condition Step to Check MSE and Conditionally Execute Steps](img/pipeline-6.png)
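At run time, `JsonGet` reads `evaluation.json` through the `EvaluationReport` property file and follows the path `regression_metrics.mse.value`; the condition then compares that number with 6.0. The sketch below reproduces the decision locally on a report with made-up values, so you can see which branch a given MSE would take. `json_get_dotted` and `choose_branch` are hypothetical helpers; the real evaluation happens inside the SageMaker Pipelines service. Since `else_steps` is empty here, a failed condition simply means nothing further runs.

```python
import json
from functools import reduce


def json_get_dotted(document: dict, path: str):
    """Follow a dotted key path such as 'regression_metrics.mse.value' through nested dicts."""
    return reduce(lambda node, key: node[key], path.split('.'), document)


def choose_branch(report_json: str, threshold: float = 6.0) -> str:
    """Mirror ConditionLessThanOrEqualTo(left=<mse>, right=threshold)."""
    mse = json_get_dotted(json.loads(report_json), 'regression_metrics.mse.value')
    return 'if_steps' if mse <= threshold else 'else_steps'


# Made-up report values, for illustration only
sample_report = json.dumps({'regression_metrics': {'mse': {'value': 4.8, 'standard_deviation': 2.2}}})
print(choose_branch(sample_report))
```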

### 9. Define and create a Pipeline 

In [29]:
pipeline_name = 'AbalonePipeline'

pipeline = Pipeline(name=pipeline_name, 
                    parameters=[processing_instance_type, 
                                processing_instance_count, 
                                training_instance_type,
                                model_approval_status,
                                input_data, 
                                batch_data], 
                    steps=[step_process, step_train, step_eval, step_cond])

![Define a Pipeline of Parameters, Steps, and Conditions](img/pipeline-7.png)
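The order of the `steps` list does not by itself fix the execution order: SageMaker Pipelines builds the DAG from data dependencies, that is, from the step properties each step references. The sketch below writes those dependencies out by hand (read off the property references in the cells above) and uses Python's `graphlib` module (Python 3.9+) to derive one valid execution order. It illustrates the idea only and is not how the service schedules steps; `AbaloneRegisterModel` stands for the whole `RegisterModel` step collection.

```python
from graphlib import TopologicalSorter

# Step name -> names of steps it must wait for (written out by hand from the cells above)
dependencies = {
    'AbalonePreprocess': set(),
    'AbaloneTraining': {'AbalonePreprocess'},
    'AbaloneEvaluate': {'AbaloneTraining', 'AbalonePreprocess'},
    'AbaloneMSECond': {'AbaloneEvaluate'},
    # Steps inside if_steps run only after the condition evaluates to True
    'AbaloneRegisterModel': {'AbaloneMSECond', 'AbaloneTraining'},
    'AbaloneCreateModel': {'AbaloneMSECond', 'AbaloneTraining'},
    'AbaloneTransform': {'AbaloneCreateModel'},
}

execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Steps with no dependency between them, such as `AbaloneRegisterModel` and `AbaloneCreateModel`, may run in either order or in parallel.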

Examine the Pipeline definition

In [30]:
definition = json.loads(pipeline.definition())
definition

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'Approved'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-892313895307/abalone/abalone.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-892313895307/abalone/abalone-unlabeled'}],
 'Steps': [{'Name': 'AbalonePreprocess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688

### 10. Kickstart Pipeline Execution

In [31]:
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:892313895307:pipeline/abalonepipeline',
 'ResponseMetadata': {'RequestId': '4776d426-09ee-40a7-acdf-fba3acee94af',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '4776d426-09ee-40a7-acdf-fba3acee94af',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Mon, 24 May 2021 00:01:24 GMT'},
  'RetryAttempts': 0}}

In [32]:
execution = pipeline.start()

In [33]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:892313895307:pipeline/abalonepipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:892313895307:pipeline/abalonepipeline/execution/ov5id8picd4y',
 'PipelineExecutionDisplayName': 'execution-1621814485614',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2021, 5, 24, 0, 1, 25, 537000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 5, 24, 0, 1, 25, 537000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:892313895307:user-profile/d-dowart1jabkf/team-v',
  'UserProfileName': 'team-v',
  'DomainId': 'd-dowart1jabkf'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:892313895307:user-profile/d-dowart1jabkf/team-v',
  'UserProfileName': 'team-v',
  'DomainId': 'd-dowart1jabkf'},
 'ResponseMetadata': {'RequestId': 'd9981777-dbb0-4c27-ac24-3142eecf6741',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'd9981777-dbb0-4c27-ac24-314

In [34]:
execution.wait()
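`execution.wait()` blocks until the execution reaches a terminal state. If you prefer to report progress while waiting, you can poll `execution.describe()` yourself and read `PipelineExecutionStatus`, whose values are `Executing`, `Stopping`, `Stopped`, `Failed`, and `Succeeded`. The loop below is a hypothetical sketch that accepts any zero-argument describe function, so it can be tried with a stub; with a real execution you would call `poll_until_done(execution.describe)`.

```python
import time
from typing import Callable, Dict

TERMINAL_STATUSES = {'Stopped', 'Failed', 'Succeeded'}


def poll_until_done(describe: Callable[[], Dict], delay_seconds: float = 30.0, max_polls: int = 120) -> str:
    """Call describe() until PipelineExecutionStatus is terminal, then return that status."""
    for _ in range(max_polls):
        status = describe()['PipelineExecutionStatus']
        print(f'Pipeline execution status: {status}')
        if status in TERMINAL_STATUSES:
            return status
        time.sleep(delay_seconds)
    raise TimeoutError(f'Execution still not finished after {max_polls} polls')


# Stub standing in for execution.describe, for trying the loop without AWS access
fake_statuses = iter(['Executing', 'Executing', 'Succeeded'])
final_status = poll_until_done(lambda: {'PipelineExecutionStatus': next(fake_statuses)}, delay_seconds=0)
```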