# Orchestrating Jobs with Amazon SageMaker Model Building Pipelines

- Amazon SageMaker Model Building Pipelines lets machine learning (ML) application developers and operations engineers orchestrate SageMaker jobs and author reproducible ML pipelines.
- It also enables them to deploy custom-built models for real-time, low-latency inference, run offline inference with Batch Transform, and track the lineage of artifacts.
- They can institute sound operational practices for deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage through a simple interface, following safety and best-practice paradigms for ML application development.

## SageMaker Pipelines

SageMaker Pipelines supports the following activities, which are demonstrated in this notebook:

* Pipelines - A DAG of steps and conditions to orchestrate SageMaker jobs and resource creation.
* Processing job steps - A simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation.
* Training job steps - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.
* Conditional execution steps - A step that provides conditional execution of branches in a pipeline.
* Register model steps - A step that creates a model package resource in the Model Registry that can be used to create deployable models in Amazon SageMaker.
* Create model steps - A step that creates a model for use in transform steps or later publication as an endpoint.
* Transform job steps - A batch transform to preprocess datasets to remove noise or bias that interferes with training or inference from a dataset, get inferences from large datasets, and run inference when a persistent endpoint is not needed.
* Fail steps - A step that stops a pipeline execution and marks the pipeline execution as failed.
* Parametrized Pipeline executions - Enables variation in pipeline executions according to specified parameters.

## Notebook Overview

This notebook shows how to:
- Define a set of Pipeline parameters that can be used to parametrize a SageMaker Pipeline.
- Define a Processing step that performs cleaning, feature engineering, and splitting the input data into train and test data sets.
- Define a Training step that trains a model on the preprocessed train data set.
- Define a Processing step that evaluates the trained model's performance on the test dataset.
- Define a Create Model step that creates a model from the model artifacts used in training.
- Define a Transform step that performs batch transformation based on the model that was created.
- Define a Register Model step that creates a model package from the estimator and model artifacts used to train the model.
- Define a Conditional step that measures a condition based on output from prior steps and conditionally executes other steps.
- Define a Fail step with a customized error message indicating the cause of the execution failure.
- Define and create a Pipeline definition in a DAG, with the defined parameters and steps.
- Start a Pipeline execution and wait for execution to complete.
- Download the model evaluation report from the S3 bucket for examination.
- Start a second Pipeline execution.

## A SageMaker Pipeline

- The pipeline that you create follows a typical machine learning (ML) application pattern of preprocessing, training, evaluation, model creation, batch transformation, and model registration.

## Dataset

The dataset you use is the [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone) [1].  The aim for this task is to determine the age of an abalone from its physical measurements. At the core, this is a regression problem.

The dataset contains several features: length (the longest shell measurement), diameter (the diameter perpendicular to length), height (the height with meat in the shell), whole_weight (the weight of whole abalone), shucked_weight (the weight of meat), viscera_weight (the gut weight after bleeding), shell_weight (the weight after being dried), sex ('M', 'F', 'I' where 'I' is Infant), and rings (integer).

The number of rings turns out to be a good approximation for age (age is rings + 1.5). However, obtaining this number requires cutting the shell through the cone, staining the section, and counting the rings under a microscope, which is time-consuming. The other physical measurements are easier to obtain, so you use the dataset to build a model that predicts the variable rings from them.
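The rings-to-age relation stated above can be written down directly. The sketch below is illustrative only; the helper name `age_from_rings` is an assumption and does not appear in the notebook.

```python
def age_from_rings(rings: float) -> float:
    """Approximate abalone age in years from the ring count (age = rings + 1.5)."""
    return rings + 1.5


# Ring counts of the kind found in the dataset's `rings` column.
for rings in (7, 9, 15):
    print(f"{rings} rings -> about {age_from_rings(rings)} years")
```

Because age is a fixed offset from the ring count, predicting `rings` is equivalent to predicting age, which is why the model targets `rings` directly.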

Before you upload the data to an S3 bucket, upgrade the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/) to the latest version and gather some constants you can use later in this notebook.

[1] Dua, D. and Graff, C. (2019). [UCI Machine Learning Repository](http://archive.ics.uci.edu/ml). Irvine, CA: University of California, School of Information and Computer Science.

In [2]:
# !pip install --upgrade sagemaker


In [1]:
import boto3
import sagemaker


sagemaker_session = sagemaker.session.Session()

region = sagemaker_session.boto_region_name
print("region name: ", region)

role = sagemaker.get_execution_role()
print("sagemaker role: ", role)

default_bucket = sagemaker_session.default_bucket()
print("default bucket: ", default_bucket)

model_package_group_name = "AbaloneModelPackageGroupName"

region name:  ap-northeast-2
sagemaker role:  arn:aws:iam::988889742134:role/service-role/AmazonSageMaker-ExecutionRole-20220315T092490
default bucket:  sagemaker-ap-northeast-2-988889742134


Now, upload the data into the default bucket.
You can substitute your own dataset for `input_data_uri` as appropriate.

In [2]:
!mkdir -p data

In [3]:
# Download abalone.csv to a local path
local_path = "data/abalone-dataset.csv"

s3_resource = boto3.resource("s3")

s3_resource.Bucket("sagemaker-sample-files").download_file(
    "datasets/tabular/uci_abalone/abalone.csv",
    local_path
)

In [4]:
# Upload abalone.csv to s3 default bucket
base_uri = f"s3://{default_bucket}/abalone"

input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri
)
print(input_data_uri)

s3://sagemaker-ap-northeast-2-988889742134/abalone/abalone-dataset.csv


Download a second dataset for batch transformation after model creation.
You can substitute your own dataset for `batch_data_uri` as appropriate.

In [5]:
# Download abalone-dataset-batch to a local path
local_path = "data/abalone-dataset-batch"

s3_resource = boto3.resource("s3")

s3_resource.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch",
    local_path
)

In [6]:
# Upload abalone-dataset-batch to the default S3 bucket
base_uri = f"s3://{default_bucket}/abalone"

batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)

print(batch_data_uri)

s3://sagemaker-ap-northeast-2-988889742134/abalone/abalone-dataset-batch


Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

* `ParameterString` - represents a `str` Python type
* `ParameterInteger` - represents an `int` Python type
* `ParameterFloat` - represents a `float` Python type

These parameters support providing a default value, which can be overridden on pipeline execution. The default value specified should be an instance of the type of the parameter.

The parameters defined in this workflow include:

* `processing_instance_type` - The `ml.*` instance type of the processing job.
* `processing_instance_count` - The instance count of the processing job.
* `training_instance_type` - The `ml.*` instance type of the training job.
* `model_approval_status` - The approval status to register with the trained model for CI/CD purposes ("PendingManualApproval" is the default).
* `input_data` - The S3 bucket URI location of the input data.
* `batch_data` - The S3 bucket URI location of the batch data.
* `mse_threshold` - The Mean Squared Error (MSE) threshold used to verify the accuracy of a model.
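The default-and-override behavior described above can be pictured as a dictionary merge with a type check. The following is a plain-Python sketch, not SDK code: `resolve_parameters` is a hypothetical helper, and the defaults simply mirror the values defined in this notebook (the S3 URI parameters are left out because they depend on the account's default bucket).

```python
# Defaults mirroring the parameter definitions in this notebook.
DEFAULTS = {
    "ProcessingInstanceCount": 1,
    "ProcessingInstanceType": "ml.m5.xlarge",
    "TrainingInstanceType": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
    "MseThreshold": 6.0,
}


def resolve_parameters(defaults: dict, overrides: dict) -> dict:
    """Hypothetical helper: apply per-execution overrides on top of defaults."""
    resolved = dict(defaults)
    for name, value in overrides.items():
        if name not in defaults:
            raise KeyError(f"unknown pipeline parameter: {name}")
        expected_type = type(defaults[name])
        # An override must have the same Python type as the default it replaces.
        if not isinstance(value, expected_type):
            raise TypeError(
                f"{name} expects {expected_type.__name__}, got {type(value).__name__}"
            )
        resolved[name] = value
    return resolved


resolved = resolve_parameters(
    DEFAULTS, {"MseThreshold": 3.0, "ProcessingInstanceCount": 2}
)
print(resolved["MseThreshold"], resolved["ProcessingInstanceType"])
```

With the SageMaker Python SDK, overrides of this kind are supplied as a dictionary keyed by parameter name through the `parameters` argument of `pipeline.start`; parameters that are not overridden keep their defaults.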

In [7]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat
)


processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)

processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)

training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri
)

batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri
)

mse_threshold = ParameterFloat(
    name="MseThreshold",
    default_value=6.0
)

### Define a Processing Step for Feature Engineering

First, develop a preprocessing script that is specified in the Processing step.

This notebook cell writes a file `abalone/preprocessing.py`, which contains the preprocessing script. You can update the script and rerun this cell to overwrite it. The preprocessing script uses `scikit-learn` to do the following:

* Fill in missing sex category data and encode it so that it is suitable for training.
* Impute and standardize all numerical feature columns (every column except `sex` and the `rings` label).
* Split the data into training, validation, and test datasets.

The Processing step executes the script on the input data. The Training step uses the preprocessed training features and labels to train a model. The Evaluation step uses the trained model and preprocessed test features and labels to evaluate the model.
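To make the scaling and encoding steps concrete, here is a plain-Python stand-in for what `StandardScaler` and `OneHotEncoder` compute. It is a sketch under simplifying assumptions: imputation is omitted, the helper names `standardize` and `one_hot` are hypothetical, and the sample values are the first three `length` and `sex` entries of the raw dataset.

```python
from statistics import mean, pstdev


def standardize(values):
    """Rescale values to zero mean and unit variance (population standard deviation)."""
    mu = mean(values)
    sigma = pstdev(values)
    return [(v - mu) / sigma for v in values]


def one_hot(value, categories):
    """Encode value as a 0/1 vector over the given, ordered categories."""
    return [1.0 if value == category else 0.0 for category in categories]


# First three `length` and `sex` values of the raw abalone dataset.
lengths = [0.455, 0.35, 0.53]
sexes = ["M", "M", "F"]
categories = sorted({"M", "F", "I"})  # alphabetical order: F, I, M

scaled = standardize(lengths)
encoded = [one_hot(s, categories) for s in sexes]
print(scaled)
print(encoded)
```

The real script fits these transforms on the full dataset, so the scaled values it writes differ from the three-row toy output here.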

In [8]:
!mkdir -p abalone

### Raw abalone-dataset

In [9]:
import numpy as np
import pandas as pd


local_dataset_dir = "data/abalone-dataset.csv"

feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64,
}
label_column_dtype = {"rings": np.float64}

def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z

df = pd.read_csv(
    local_dataset_dir,
    header=None,
    names=feature_columns_names + [label_column],
    dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)
)

In [10]:
df.head()

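|   | sex | length | diameter | height | whole_weight | shucked_weight | viscera_weight | shell_weight | rings |
|---|-----|--------|----------|--------|--------------|----------------|----------------|--------------|-------|
| 0 | M | 0.455 | 0.365 | 0.095 | 0.514 | 0.2245 | 0.101 | 0.15 | 15.0 |
| 1 | M | 0.35 | 0.265 | 0.09 | 0.2255 | 0.0995 | 0.0485 | 0.07 | 7.0 |
| 2 | F | 0.53 | 0.42 | 0.135 | 0.677 | 0.2565 | 0.1415 | 0.21 | 9.0 |
| 3 | M | 0.44 | 0.365 | 0.125 | 0.516 | 0.2155 | 0.114 | 0.155 | 10.0 |
| 4 | I | 0.33 | 0.255 | 0.08 | 0.205 | 0.0895 | 0.0395 | 0.055 | 7.0 |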


In [11]:
df.shape

(4177, 9)

### Preprocessed abalone-dataset

In [12]:
%%writefile abalone/preprocessing.py
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Since we get a headerless CSV file we specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64,
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
    )

    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Overwriting abalone/preprocessing.py


Next, create an instance of `SKLearnProcessor` and use it in the `ProcessingStep`.

You also specify the `framework_version` to use throughout this notebook.

Note the `processing_instance_type` and `processing_instance_count` parameters used by the processor instance.

In [13]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role
)

Finally, use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that runs when the pipeline executes this step. This is similar to a processor instance's `run` method in the Python SDK.

Note that the `input_data` parameter passed into `ProcessingStep` is the input data for the step. The processor instance uses this input data when it runs.

Also, note the `"train"`, `"validation"`, and `"test"` named channels specified in the output configuration for the processing job. Step `Properties` can be used in subsequent steps and resolve to their runtime values at execution. Specifically, this usage is called out when you define the training step.

In [14]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            source=input_data,
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train"
        ),
        ProcessingOutput(
            output_name="validation",
            source="/opt/ml/processing/validation"
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test"
        )
    ],
    code="abalone/preprocessing.py"
)

### Temporarily Define a Pipeline with Only the Processing Step

In [27]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "AbalonePipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        input_data
    ],
    steps=[step_process]
)

In [28]:
import json


definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ap-northeast-2-988889742134/abalone/abalone-dataset.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/c
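In the definition output above, pipeline parameters appear as `{'Get': 'Parameters.<Name>'}` placeholders rather than concrete values, which is what allows one definition to serve many executions. The toy resolver below illustrates the idea only: `resolve_placeholders` is a hypothetical helper, and the actual substitution is performed by the SageMaker service at execution time, not by code like this.

```python
def resolve_placeholders(node, parameters):
    """Hypothetical helper: replace {'Get': 'Parameters.<Name>'} with concrete values."""
    if isinstance(node, dict):
        reference = node.get("Get")
        is_parameter_reference = (
            set(node) == {"Get"}
            and isinstance(reference, str)
            and reference.startswith("Parameters.")
        )
        if is_parameter_reference:
            return parameters[reference.split(".", 1)[1]]
        return {key: resolve_placeholders(value, parameters) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_placeholders(item, parameters) for item in node]
    return node


# Fragment copied from the pipeline definition output.
cluster_config = {
    "ClusterConfig": {
        "InstanceType": {"Get": "Parameters.ProcessingInstanceType"},
        "InstanceCount": {"Get": "Parameters.ProcessingInstanceCount"},
        "VolumeSizeInGB": 30,
    }
}
parameters = {"ProcessingInstanceType": "ml.m5.xlarge", "ProcessingInstanceCount": 1}

resolved_config = resolve_placeholders(cluster_config, parameters)
print(resolved_config)
```

Placeholders that do not start with `Parameters.`, such as `Execution.PipelineName`, are left untouched by this sketch.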

In [29]:
# Create the pipeline in SageMaker, or update it if it already exists

pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:pipeline/abalonepipeline',
 'ResponseMetadata': {'RequestId': '81f51753-6388-4619-b4ff-19ee05b07fc9',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '81f51753-6388-4619-b4ff-19ee05b07fc9',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '88',
   'date': 'Mon, 18 Apr 2022 04:45:23 GMT'},
  'RetryAttempts': 0}}

In [30]:
execution = pipeline.start()

In [31]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:pipeline/abalonepipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:pipeline/abalonepipeline/execution/9xr4ptrf69u6',
 'PipelineExecutionDisplayName': 'execution-1650257172532',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'abalonepipeline',
  'TrialName': '9xr4ptrf69u6'},
 'CreationTime': datetime.datetime(2022, 4, 18, 4, 46, 12, 471000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 4, 18, 4, 46, 12, 471000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:user-profile/d-acqzeujeoeou/jamie',
  'UserProfileName': 'jamie',
  'DomainId': 'd-acqzeujeoeou'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:user-profile/d-acqzeujeoeou/jamie',
  'UserProfileName': 'jamie',
  'DomainId': 'd-acqzeujeoeou'},
 'ResponseMetadata': {'RequestId': 'fc542f27-3cbf

In [32]:
execution.wait()

In [33]:
execution.list_steps()

[{'StepName': 'AbaloneProcess',
  'StartTime': datetime.datetime(2022, 4, 18, 4, 46, 13, 333000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 4, 18, 4, 50, 21, 99000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:processing-job/pipelines-9xr4ptrf69u6-abaloneprocess-k87wweumao'}}}]

### Extraction of Preprocessed train/validation data

In [52]:
def get_preprocessed_data(execution, client, kind=0):
    """
    Return the S3 key prefix of one output of the processing step.

    :parameters
    execution: the pipeline execution object returned by pipeline.start()
    client: boto3 SageMaker client
    kind: index into the processing outputs (0 -> train, 1 -> validation, 2 -> test)
    """
    # Look the processing step up by name; it is not guaranteed to be the
    # first entry returned by list_steps().
    processing_step = next(
        step for step in execution.list_steps() if step["StepName"] == "AbaloneProcess"
    )
    processing_job_arn = processing_step["Metadata"]["ProcessingJob"]["Arn"]
    processing_job_name = processing_job_arn.split("/")[-1]

    processing_job_info = client.describe_processing_job(
        ProcessingJobName=processing_job_name
    )
    output_s3_uri = processing_job_info["ProcessingOutputConfig"]["Outputs"][kind]["S3Output"]["S3Uri"]

    # Drop the "s3://<bucket>/" prefix and keep only the object key prefix.
    return "/".join(output_s3_uri.split("/")[3:])

In [53]:
import boto3


sagemaker_client = boto3.client("sagemaker")

train_preprocessed_data_artifact = get_preprocessed_data(execution, sagemaker_client, kind=0)
print("train preprocessed data artifact s3 uri: ", train_preprocessed_data_artifact)

validation_preprocessed_data_artifact = get_preprocessed_data(execution, sagemaker_client, kind=1)
print("validation preprocessed data artifact s3 uri: ", validation_preprocessed_data_artifact)

train preprocessed data artifact s3 uri:  AbaloneProcess-e6ff493b4b79fe2e2f2146b46607be87/output/train
validation preprocessed data artifact s3 uri:  AbaloneProcess-e6ff493b4b79fe2e2f2146b46607be87/output/validation
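The helper above strips the bucket from an S3 URI by splitting on `/`. An alternative sketch using the standard library's URL parser is shown below; `split_s3_uri` is a hypothetical name, and the sample URI is the input data location printed earlier in this notebook.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/key/prefix' into (bucket, key_prefix)."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {s3_uri}")
    return parsed.netloc, parsed.path.lstrip("/")


bucket, key = split_s3_uri(
    "s3://sagemaker-ap-northeast-2-988889742134/abalone/abalone-dataset.csv"
)
print(bucket)
print(key)
```

Returning the bucket alongside the key avoids assuming that the artifact lives in the default bucket when it is downloaded later.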


In [43]:
!mkdir -p preprocessed_data

In [54]:
# Download preprocessed train data to local path
local_path = "preprocessed_data/preprocessed_abalone_train_dataset.csv"

s3_resource = boto3.resource("s3")
s3_resource.Bucket(default_bucket).download_file(
    f"{train_preprocessed_data_artifact}/train.csv",
    local_path
)

In [55]:
# Download preprocessed validation data to local path
local_path = "preprocessed_data/preprocessed_abalone_validation_dataset.csv"

s3_resource = boto3.resource("s3")
s3_resource.Bucket(default_bucket).download_file(
    f"{validation_preprocessed_data_artifact}/validation.csv",
    local_path
)

In [56]:
preprocessed_train_df = pd.read_csv(
    "preprocessed_data/preprocessed_abalone_train_dataset.csv",
    header=None
)

In [58]:
preprocessed_train_df.head()

|   | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 |
|---|---|---|---|---|---|---|---|---|---|---|----|
| 0 | 10.0 | -1.948659 | -1.843041 | -1.423087 | -1.426063 | -1.378176 | -1.410511 | -1.392603 | 0.0 | 0.0 | 1.0 |
| 1 | 9.0 | 1.049379 | 1.079522 | -0.347099 | 0.71739 | 0.739548 | 0.934355 | 0.637055 | 0.0 | 0.0 | 1.0 |
| 2 | 15.0 | -0.491279 | -0.583316 | 0.011563 | -0.516473 | -0.711319 | -0.274574 | -0.350832 | 0.0 | 0.0 | 1.0 |
| 3 | 8.0 | -0.241443 | -0.331371 | -0.466653 | -0.437954 | -0.332832 | -0.40231 | -0.559186 | 0.0 | 1.0 | 0.0 |
| 4 | 8.0 | -0.074885 | -0.079426 | 0.011563 | -0.228911 | -0.044461 | -0.192458 | -0.433455 | 0.0 | 1.0 | 0.0 |


In [59]:
preprocessed_train_df.shape

(2923, 11)

In [62]:
preprocessed_validation_df = pd.read_csv(
    "preprocessed_data/preprocessed_abalone_validation_dataset.csv",
    header=None
)

In [63]:
preprocessed_validation_df.head()

|   | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 |
|---|---|---|---|---|---|---|---|---|---|---|----|
| 0 | 8.0 | 0.25823 | 0.172519 | -0.227545 | -0.368613 | -0.341843 | -0.434244 | -0.203547 | 0.0 | 1.0 | 0.0 |
| 1 | 13.0 | 1.174297 | 1.633801 | 1.087551 | 1.413859 | 1.51905 | 1.112273 | 1.193864 | 1.0 | 0.0 | 0.0 |
| 2 | 10.0 | -0.44964 | -0.482538 | -0.227545 | -0.727555 | -0.799182 | -0.6076 | -0.638217 | 1.0 | 0.0 | 0.0 |
| 3 | 4.0 | -2.989644 | -2.951599 | -2.977291 | -1.660599 | -1.594455 | -1.620363 | -1.679988 | 0.0 | 1.0 | 0.0 |
| 4 | 13.0 | -0.074885 | 0.12213 | -0.107991 | -0.240128 | -0.100783 | -0.33388 | -0.243062 | 0.0 | 1.0 | 0.0 |


In [64]:
preprocessed_validation_df.shape

(627, 11)
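The shapes reported above follow from the `np.split` boundaries in the preprocessing script (70% and 85% of the rows) and from the columns the transformers emit. The arithmetic can be checked with the short sketch below; `split_sizes` is a hypothetical helper written for this check.

```python
def split_sizes(n_rows, train_fraction=0.7, validation_end=0.85):
    """Return (train, validation, test) row counts for the split boundaries in the script."""
    train_end = int(train_fraction * n_rows)
    validation_stop = int(validation_end * n_rows)
    return train_end, validation_stop - train_end, n_rows - validation_stop


# 4177 rows in the raw dataset, as reported by df.shape.
print(split_sizes(4177))  # (2923, 627, 627)

# One label column, seven scaled numeric features, three one-hot columns for sex.
n_columns = 1 + 7 + 3
print(n_columns)  # 11
```

The label sits in column 0 because the script concatenates `y_pre` ahead of the transformed features, which is the headerless CSV layout used for the training and validation channels.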

### Define a Training Step to Train a Model

In this section, use Amazon SageMaker's [XGBoost Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) to train on this dataset. Configure an Estimator for the XGBoost algorithm and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to `model_dir` so that it can be hosted later.

The model path where the models from training will be saved is also specified.

Note the `training_instance_type` parameter may be used in multiple places in the pipeline. In this case, the `training_instance_type` is passed into the estimator.

In [15]:
from sagemaker.estimator import Estimator


model_path = f"s3://{default_bucket}/AbaloneTrain"

# xgboost image uri
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type
)

# xgboost estimator
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role
)

# xgboost hyperparameters
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0
)

In [16]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="AbaloneTrain",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation"].S3Output.S3Uri,
            content_type="text/csv"
        )
    }
)

### Temporarily Define a Pipeline with the Processing and Training Steps

In [91]:
pipeline_name = "AbalonePipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        input_data
    ],
    steps=[step_process, step_train]
)

In [92]:
import json


definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ap-northeast-2-988889742134/abalone/abalone-dataset.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-scikit-lear

In [93]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:pipeline/abalonepipeline',
 'ResponseMetadata': {'RequestId': 'dc12d555-c58e-48c7-b5ed-2946105cbd82',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'dc12d555-c58e-48c7-b5ed-2946105cbd82',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '88',
   'date': 'Mon, 18 Apr 2022 06:17:55 GMT'},
  'RetryAttempts': 0}}

In [94]:
execution = pipeline.start()

In [95]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:pipeline/abalonepipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:pipeline/abalonepipeline/execution/lpygdhjk3rou',
 'PipelineExecutionDisplayName': 'execution-1650262731759',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'abalonepipeline',
  'TrialName': 'lpygdhjk3rou'},
 'CreationTime': datetime.datetime(2022, 4, 18, 6, 18, 51, 690000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 4, 18, 6, 18, 51, 690000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:user-profile/d-acqzeujeoeou/jamie',
  'UserProfileName': 'jamie',
  'DomainId': 'd-acqzeujeoeou'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:user-profile/d-acqzeujeoeou/jamie',
  'UserProfileName': 'jamie',
  'DomainId': 'd-acqzeujeoeou'},
 'ResponseMetadata': {'RequestId': '10c86df4-2194

In [96]:
execution.wait()

In [97]:
execution.list_steps()

[{'StepName': 'AbaloneTrain',
  'StartTime': datetime.datetime(2022, 4, 18, 6, 23, 25, 478000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 4, 18, 6, 26, 13, 281000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:training-job/pipelines-lpygdhjk3rou-abalonetrain-jh9gjyyi7x'}}},
 {'StepName': 'AbaloneProcess',
  'StartTime': datetime.datetime(2022, 4, 18, 6, 18, 52, 854000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 4, 18, 6, 23, 24, 922000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:processing-job/pipelines-lpygdhjk3rou-abaloneprocess-vkkp01bhr2'}}}]
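Note that the training step is listed before the processing step in the output above, so picking a step by list position is fragile. A small sketch of indexing the steps by name and deriving the job name from the ARN follows; the `steps` list is a trimmed copy of the output (timestamps omitted), and `job_name_from_arn` is a hypothetical helper.

```python
# Trimmed copy of the list_steps() output (timestamps omitted).
steps = [
    {
        "StepName": "AbaloneTrain",
        "StepStatus": "Succeeded",
        "Metadata": {
            "TrainingJob": {
                "Arn": "arn:aws:sagemaker:ap-northeast-2:988889742134:training-job/pipelines-lpygdhjk3rou-abalonetrain-jh9gjyyi7x"
            }
        },
    },
    {
        "StepName": "AbaloneProcess",
        "StepStatus": "Succeeded",
        "Metadata": {
            "ProcessingJob": {
                "Arn": "arn:aws:sagemaker:ap-northeast-2:988889742134:processing-job/pipelines-lpygdhjk3rou-abaloneprocess-vkkp01bhr2"
            }
        },
    },
]

# Index by step name so lookups do not depend on the order of the list.
steps_by_name = {step["StepName"]: step for step in steps}


def job_name_from_arn(arn):
    """The job name is the part of the ARN after the final '/'."""
    return arn.rsplit("/", 1)[-1]


train_job_name = job_name_from_arn(
    steps_by_name["AbaloneTrain"]["Metadata"]["TrainingJob"]["Arn"]
)
process_job_name = job_name_from_arn(
    steps_by_name["AbaloneProcess"]["Metadata"]["ProcessingJob"]["Arn"]
)
print(train_job_name)
print(process_job_name)
```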

### Extraction of trained model artifact

In [107]:
def get_train_artifact(execution, client):
    """
    Return the S3 key of the model artifact produced by the training step.

    :parameters
    execution: the pipeline execution object returned by pipeline.start()
    client: boto3 SageMaker client
    """
    # Look the training step up by name; it is not guaranteed to be the
    # first entry returned by list_steps().
    train_step = next(
        step for step in execution.list_steps() if step["StepName"] == "AbaloneTrain"
    )
    training_job_arn = train_step["Metadata"]["TrainingJob"]["Arn"]
    training_job_name = training_job_arn.split("/")[-1]

    training_job_description = client.describe_training_job(
        TrainingJobName=training_job_name
    )
    model_artifacts_s3_uri = training_job_description["ModelArtifacts"]["S3ModelArtifacts"]

    # Drop the "s3://<bucket>/" prefix and keep only the object key.
    return "/".join(model_artifacts_s3_uri.split("/")[3:])

In [108]:
import boto3


sagemaker_client = boto3.client("sagemaker")

train_step_pipeline_model_artifacts_s3_prefix = get_train_artifact(execution, sagemaker_client)
print(train_step_pipeline_model_artifacts_s3_prefix)

AbaloneTrain/pipelines-lpygdhjk3rou-AbaloneTrain-Jh9gJyyI7X/output/model.tar.gz


### Define a Model Evaluation Step to Evaluate the Trained Model

First, develop an evaluation script that is specified in a Processing step that performs the model evaluation.

After pipeline execution, you can examine the resulting `evaluation.json` for analysis.

The evaluation script uses `xgboost` to do the following:

* Load the model.
* Read the test data.
* Issue predictions against the test data.
* Build a regression report containing the mean squared error (MSE) and the standard deviation of the prediction errors.
* Save the evaluation report to the evaluation directory.
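The metrics in the report can be illustrated without the model. The sketch below computes the same two quantities in plain Python on made-up labels and predictions; `regression_report` is a hypothetical helper, and the numbers are for illustration only, not results from this pipeline.

```python
import json
from statistics import mean, pstdev


def regression_report(y_true, y_pred):
    """Build a report with the MSE and the standard deviation of the residuals."""
    residuals = [t - p for t, p in zip(y_true, y_pred)]
    mse = mean([r * r for r in residuals])
    std = pstdev(residuals)  # population standard deviation, like np.std's default
    return {"regression_metrics": {"mse": {"value": mse, "standard_deviation": std}}}


# Made-up labels and predictions, for illustration only.
report = regression_report([10.0, 8.0, 15.0], [9.0, 8.0, 13.0])
print(json.dumps(report, indent=2))
```

The nested layout matches the `report_dict` written to `evaluation.json` by the evaluation script, so a later step can read the MSE at `regression_metrics.mse.value`.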

In [17]:
%%writefile abalone/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
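    model_path = "/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    # The archive is expected to contain the pickled booster saved as "xgboost-model".
    with open("xgboost-model", "rb") as model_file:
        model = pickle.load(model_file)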

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)

    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting abalone/evaluation.py
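To make the two reported numbers concrete, here is a small dependency-free sketch of the same metrics. It assumes `mean_squared_error` is the mean of the squared residuals and that `np.std` uses its default population form (`ddof=0`). The function name `regression_report` and the toy labels are illustrative only.

```python
import math


def regression_report(y_true, y_pred):
    """Build a report with the same layout as evaluation.json."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must be non-empty and equally long")

    residuals = [t - p for t, p in zip(y_true, y_pred)]
    n = len(residuals)

    # Mean squared error: average of the squared residuals.
    mse = sum(r * r for r in residuals) / n

    # Population standard deviation of the residuals (ddof=0).
    mean_residual = sum(residuals) / n
    std = math.sqrt(sum((r - mean_residual) ** 2 for r in residuals) / n)

    return {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }


# Toy values, not abalone data.
report = regression_report([3.0, 5.0, 7.0], [2.0, 5.0, 9.0])
print(report)
```

Note that the `standard_deviation` field describes the spread of the residuals, not the uncertainty of the MSE estimate itself.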


Next, create an instance of a `ScriptProcessor` processor and use it in the `ProcessingStep`.

Note the `processing_instance_type` parameter passed into the processor.

In [18]:
from sagemaker.processing import ScriptProcessor


# Define the processor that runs the evaluation script in the container given by image_uri
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-abalone-eval",
    role=role,
)

Use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that runs when the pipeline executes the step. This is similar to a processor instance's `run` method in the Python SDK.

Specifically, the `S3ModelArtifacts` from the `step_train` `properties` and the `S3Uri` of the `"test_data"` output channel of the `step_process` `properties` are passed as inputs. The `TrainingStep` and `ProcessingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) and [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response objects, respectively.

#### Create a PropertyFile

A `PropertyFile` makes values from a processing step's output file available to later steps, for instance in a condition step. Here it points at `evaluation.json` in the `evaluation` output of the evaluation step.

In [19]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)

In [20]:
step_eval = ProcessingStep(
    name="AbaloneEval",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation"
        ),
    ],
    code="abalone/evaluation.py",
    property_files=[evaluation_report],
)

### Temporarily Define a Pipeline up to the Evaluation Step

In [116]:
pipeline_name = "AbalonePipeline"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        input_data
    ],
    steps=[step_process, step_train, step_eval]
)

In [117]:
import json


definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ap-northeast-2-988889742134/abalone/abalone-dataset.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-scikit-lear

In [118]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:pipeline/abalonepipeline',
 'ResponseMetadata': {'RequestId': '6cce9591-110d-4aa3-861b-6d64b9ec2a12',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6cce9591-110d-4aa3-861b-6d64b9ec2a12',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '88',
   'date': 'Mon, 18 Apr 2022 07:16:17 GMT'},
  'RetryAttempts': 0}}

In [119]:
execution = pipeline.start()

In [120]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:pipeline/abalonepipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:pipeline/abalonepipeline/execution/pqx7b9rfgrc5',
 'PipelineExecutionDisplayName': 'execution-1650266197240',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'abalonepipeline',
  'TrialName': 'pqx7b9rfgrc5'},
 'CreationTime': datetime.datetime(2022, 4, 18, 7, 16, 37, 171000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 4, 18, 7, 16, 37, 171000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:user-profile/d-acqzeujeoeou/jamie',
  'UserProfileName': 'jamie',
  'DomainId': 'd-acqzeujeoeou'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:user-profile/d-acqzeujeoeou/jamie',
  'UserProfileName': 'jamie',
  'DomainId': 'd-acqzeujeoeou'},
 'ResponseMetadata': {'RequestId': '25d9f419-14c2

In [121]:
execution.wait()

In [122]:
execution.list_steps()

[{'StepName': 'AbaloneEval',
  'StartTime': datetime.datetime(2022, 4, 18, 7, 23, 55, 322000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 4, 18, 7, 28, 1, 805000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:processing-job/pipelines-pqx7b9rfgrc5-abaloneeval-5sgvmslkdx'}}},
 {'StepName': 'AbaloneTrain',
  'StartTime': datetime.datetime(2022, 4, 18, 7, 20, 54, 203000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 4, 18, 7, 23, 54, 346000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:training-job/pipelines-pqx7b9rfgrc5-abalonetrain-a3u2mvezsv'}}},
 {'StepName': 'AbaloneProcess',
  'StartTime': datetime.datetime(2022, 4, 18, 7, 16, 38, 688000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 4, 18, 7, 20, 48, 759000, tzinfo=tzlocal()),
  'StepSt

### Extraction of the evaluation report

In [137]:
def get_eval_artifact(execution, client):
    """
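    Return the S3 key prefix of the evaluation output (the URI without "s3://<bucket>/").
    Assumes the evaluation step is the first entry returned by execution.list_steps().

    :param execution: pipeline execution object returned by pipeline.start()
    :param client: boto3 SageMaker client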
    """
    
    pipeline_step_list = execution.list_steps()
    
    eval_step_pipeline_job_name = pipeline_step_list[0]["Metadata"]["ProcessingJob"]["Arn"].split("/")[-1]
    
    eval_step_pipeline_description = client.describe_processing_job(
        ProcessingJobName=eval_step_pipeline_job_name
    )
    
    eval_step_pipeline_artifact_s3_uri = eval_step_pipeline_description["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    
    eval_step_pipeline_artifact_s3_uri_split = eval_step_pipeline_artifact_s3_uri.split("/")
    
    eval_step_pipeline_artifact_s3_prefix = "/".join(eval_step_pipeline_artifact_s3_uri_split[3:])
    
    return eval_step_pipeline_artifact_s3_prefix

In [142]:
import boto3


sagemaker_client = boto3.client("sagemaker")

eval_step_pipeline_artifact_report = get_eval_artifact(execution, sagemaker_client)

In [139]:
!mkdir -p evaluation

In [146]:
# Download the evaluation report to a local path
local_path = "evaluation/evaluation_report.json"

s3_resource = boto3.resource("s3")
s3_resource.Bucket(default_bucket).download_file(
    f"{eval_step_pipeline_artifact_report}/evaluation.json",
    local_path
)
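The helpers above obtain the object key by splitting the URI on `/` and dropping the first three parts. A slightly more explicit sketch uses the standard library's `urlparse`, which also returns the bucket so it does not have to be assumed equal to `default_bucket`. `split_s3_uri` is a hypothetical helper and the URI below is a placeholder.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/some/key' into ('bucket', 'some/key')."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_uri!r}")
    # netloc is the bucket; the path carries a leading '/' that is not part of the key.
    return parsed.netloc, parsed.path.lstrip("/")


# Placeholder URI with the same layout as a processing output location.
bucket, prefix = split_s3_uri("s3://example-bucket/AbaloneEval/output/evaluation")
print(bucket, prefix)
```

The returned pair maps directly onto `s3_resource.Bucket(bucket).download_file(f"{prefix}/evaluation.json", local_path)`.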

In [148]:
import json


with open("evaluation/evaluation_report.json", "r") as f:
    json_data = json.load(f)

print(json.dumps(json_data))

{"regression_metrics": {"mse": {"value": 4.868807195586715, "standard_deviation": 2.204898278682266}}}


### Define a Create Model Step to Create a Model

In order to perform batch transformation using the example model, create a SageMaker model. 

Specifically, pass in the `S3ModelArtifacts` from the `TrainingStep`, `step_train` properties. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object.

In [21]:
from sagemaker.model import Model


model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

Supply the model inputs, `instance_type` and `accelerator_type`, for creating the SageMaker model, and then define the `CreateModelStep`, passing in the inputs and the model instance defined earlier.

In [22]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep


inputs = CreateModelInput(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium",
)

step_create_model = CreateModelStep(
    name="AbaloneCreateModel",
    model=model,
    inputs=inputs,
)

### Define a Transform Step to Perform Batch Transformation

Now that a model instance is defined, create a `Transformer` instance with the appropriate model name, compute instance type, and desired output S3 URI.

Specifically, pass in the `ModelName` from the `CreateModelStep`, `step_create_model` properties. The `CreateModelStep` `properties` attribute matches the object model of the [DescribeModel](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeModel.html) response object.

In [23]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/AbaloneTransform",
)

Pass in the transformer instance and the `TransformInput` with the `batch_data` pipeline parameter defined earlier.

In [24]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data)
)

### Define a Register Model Step to Create a Model Package

- Use the estimator instance specified in the training step to construct an instance of RegisterModel.
- The result of executing RegisterModel in a pipeline is a model package.
- A model package is an abstraction of reusable model artifacts that packages all ingredients required for inference.
- Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.

- A model package group is a collection of model packages.
- A model package group can be created for a specific ML business problem, and new versions of the model packages can be added to it.
- Typically, customers are expected to create a ModelPackageGroup for a SageMaker pipeline so that model package versions can be added to the group for every SageMaker Pipeline run.

The construction of `RegisterModel` is similar to an estimator instance's `register` method in the Python SDK.

Specifically, pass in the `S3ModelArtifacts` from the `TrainingStep`, `step_train` properties. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object.

Note that the specific model package group name provided in this notebook can be used in the model registry and CI/CD work with SageMaker Projects.

In [25]:
print("Model Package Group name: ", model_package_group_name)

Model Package Group name:  AbaloneModelPackageGroupName


In [26]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)

### Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed

This section walks you through the following steps:

* Define a `FailStep` with a customized error message, which indicates the cause of the execution failure.
* Build the `FailStep` error message with a `Join` function, which concatenates a static text string with the dynamic `mse_threshold` parameter to produce a more informative error message.

In [27]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join


step_fail = FailStep(
    name="AbaloneMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to MSE > ", mse_threshold])
)
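`Join` is resolved by the pipeline service at run time, so the final message cannot be printed from the notebook. As a rough local emulation, assuming `Join` simply concatenates the rendered values with the `on` separator, plain Python gives an idea of the resulting text. `emulate_join` is a hypothetical stand-in, not an SDK function, and `6.0` is the default of `mse_threshold` shown elsewhere in this notebook.

```python
def emulate_join(on, values):
    """Concatenate the string form of each value, separated by `on`."""
    return on.join(str(value) for value in values)


# Same arguments as the FailStep above, with the threshold default substituted.
message = emulate_join(" ", ["Execution failed due to MSE > ", 6.0])
print(repr(message))
```

Under this assumption the message contains two spaces before the number, because the static text already ends with a space and `on=" "` adds another. Dropping the trailing space from the static string, or using `on=""`, would avoid that.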

### Define a Condition Step to Check the MSE and Conditionally Create a Model, Run a Batch Transformation, and Register a Model in the Model Registry, or Terminate the Execution in a Failed State

In this step, the model is registered only if its mean squared error (MSE), as determined by the evaluation step `step_eval`, is less than or equal to the `mse_threshold` parameter. Otherwise, the pipeline execution fails and terminates. A `ConditionStep` enables pipelines to support conditional execution in the pipeline DAG based on the conditions of the step properties.

In the following section, you:

* Define a `ConditionLessThanOrEqualTo` on the MSE value found in the output of the evaluation step, `step_eval`.
* Use the condition in the list of conditions in a `ConditionStep`.
* Pass the `CreateModelStep` and `TransformStep` steps, and the `RegisterModel` step collection into the `if_steps` of the `ConditionStep`, which are only executed if the condition evaluates to `True`.
* Pass the `FailStep` step into the `else_steps` of the `ConditionStep`, which is only executed if the condition evaluates to `False`.

In [168]:
import json


with open("evaluation/evaluation_report.json", "r") as f:
    json_data = json.load(f)

condition_left = json_data["regression_metrics"]["mse"]["value"]
print("condition left: ", condition_left)

print("condition right: ", mse_threshold.default_value)

condition left:  4.868807195586715
condition right:  6.0


In [28]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=mse_threshold,
)

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)
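The condition is evaluated by the pipeline service, but its logic can be traced locally. The sketch below resolves the dotted path `regression_metrics.mse.value` against the report printed earlier in this notebook and compares it with the `mse_threshold` default of `6.0`. `json_get` is a hypothetical helper that handles only dot-separated keys, which is an assumption made for illustration rather than a description of everything `JsonGet` supports.

```python
import json
from functools import reduce


def json_get(document, json_path):
    """Follow a dot-separated path of keys through nested dictionaries."""
    return reduce(lambda node, key: node[key], json_path.split("."), document)


# Report contents as printed by the earlier cell that loads evaluation_report.json.
report = json.loads(
    '{"regression_metrics": {"mse": {"value": 4.868807195586715, '
    '"standard_deviation": 2.204898278682266}}}'
)

mse = json_get(report, "regression_metrics.mse.value")
mse_threshold_default = 6.0

# ConditionLessThanOrEqualTo(left=mse, right=threshold) selects the branch.
branch = "if_steps" if mse <= mse_threshold_default else "else_steps"
print(mse, branch)
```

For this report the comparison holds, so the `if_steps` branch (register, create model, transform) would run. Raising the MSE above the threshold would route the execution to the `FailStep` instead.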

### Define a Pipeline of Parameters, Steps, and Conditions

In this section, combine the steps into a Pipeline so it can be executed.

A pipeline requires a `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair.

Note:

* All the parameters used in the definitions must be present.
* Steps passed into the pipeline do not have to be listed in the order of execution. The SageMaker Pipelines service resolves the execution order from the _data dependency_ DAG of the steps.
* Steps must be unique across the pipeline step list and all condition step if/else lists.
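The idea that execution order follows from data dependencies rather than list order can be illustrated with a topological sort. The sketch below uses the standard library's `graphlib` (Python 3.9 or later). The dependency map is written by hand from the property references used in this notebook, and it is only an illustration, not how the service is implemented.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
# Deliberately listed out of execution order.
dependencies = {
    "AbaloneMSECond": {"AbaloneEval"},
    "AbaloneEval": {"AbaloneTrain", "AbaloneProcess"},
    "AbaloneTrain": {"AbaloneProcess"},
    "AbaloneProcess": set(),
}

# static_order() yields every step after all of its dependencies.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because each step here depends on the one before it, only one order is valid: process, train, evaluate, then the condition.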

In [29]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        mse_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

In [30]:
import json


definition = json.loads(pipeline.definition())
definition

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ap-northeast-2-988889742134/abalone/abalone-dataset.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-ap-northeast-2-988889742134/abalone/abalone-dataset-batch'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 6.0}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingR

### Submit the pipeline to SageMaker and start execution

Submit the pipeline definition to the Pipeline service. The role passed in will be used by the Pipeline service to create all the jobs defined in the steps.

In [31]:
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:pipeline/abalonepipeline',
 'ResponseMetadata': {'RequestId': '5af14ab3-24a6-4920-bdeb-79775adf5a50',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '5af14ab3-24a6-4920-bdeb-79775adf5a50',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '88',
   'date': 'Mon, 18 Apr 2022 09:12:10 GMT'},
  'RetryAttempts': 0}}

Start the pipeline and accept all the default parameters.

In [32]:
execution = pipeline.start()

### Pipeline Operations: Examining and Waiting for Pipeline Execution

Describe the pipeline execution.

In [33]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:pipeline/abalonepipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:pipeline/abalonepipeline/execution/deu6n0phd4e0',
 'PipelineExecutionDisplayName': 'execution-1650273183256',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'abalonepipeline',
  'TrialName': 'deu6n0phd4e0'},
 'CreationTime': datetime.datetime(2022, 4, 18, 9, 13, 3, 196000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 4, 18, 9, 13, 3, 196000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:user-profile/d-acqzeujeoeou/jamie',
  'UserProfileName': 'jamie',
  'DomainId': 'd-acqzeujeoeou'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:user-profile/d-acqzeujeoeou/jamie',
  'UserProfileName': 'jamie',
  'DomainId': 'd-acqzeujeoeou'},
 'ResponseMetadata': {'RequestId': 'd7d6ee38-88ea-4

In [34]:
execution.wait()
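`execution.wait()` blocks until the execution reaches a final state. When the status should be inspected while waiting, a polling loop is one option. The sketch below is a generic loop, assuming that `Succeeded`, `Failed`, and `Stopped` are the terminal values of `PipelineExecutionStatus`. `wait_for_execution` is a hypothetical helper, its default delay and attempt count are arbitrary, and the demonstration uses canned responses instead of a real execution.

```python
import time

TERMINAL_STATUSES = {"Succeeded", "Failed", "Stopped"}


def wait_for_execution(describe, delay_seconds=30, max_attempts=60, sleep=time.sleep):
    """Poll describe() until the execution reaches a terminal status."""
    for _ in range(max_attempts):
        status = describe()["PipelineExecutionStatus"]
        if status in TERMINAL_STATUSES:
            return status
        sleep(delay_seconds)
    raise TimeoutError("Pipeline execution did not finish in time")


# Canned responses standing in for successive execution.describe() calls.
fake_responses = iter(
    [
        {"PipelineExecutionStatus": "Executing"},
        {"PipelineExecutionStatus": "Executing"},
        {"PipelineExecutionStatus": "Succeeded"},
    ]
)

final_status = wait_for_execution(
    lambda: next(fake_responses),
    delay_seconds=0,
    sleep=lambda seconds: None,  # skip real sleeping in the demonstration
)
print(final_status)
```

With a real execution the call would be `wait_for_execution(execution.describe)`.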

### Examining the Evaluation

Examine the resulting model evaluation after the pipeline completes.

Download the resulting `evaluation.json` file from S3 and print the report.

In [35]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)

pprint(json.loads(evaluation_json))

{'regression_metrics': {'mse': {'standard_deviation': 2.239582466868699,
                                'value': 5.015788087840755}}}


### Lineage

Review the lineage of the artifacts generated by the pipeline.

In [38]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

{'StepName': 'AbaloneProcess', 'StartTime': datetime.datetime(2022, 4, 18, 9, 13, 4, 204000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 4, 18, 9, 17, 11, 139000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:processing-job/pipelines-deu6n0phd4e0-abaloneprocess-tihde7qc77'}}}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...46b46607be87/input/code/preprocessing.py | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...988889742134/abalone/abalone-dataset.csv | Input | DataSet | ContributedTo | artifact |
| 2 | 36674...om/sagemaker-scikit-learn:0.23-1-cpu-py3 | Input | Image | ContributedTo | artifact |
| 3 | s3://...493b4b79fe2e2f2146b46607be87/output/test | Output | DataSet | Produced | artifact |
| 4 | s3://...79fe2e2f2146b46607be87/output/validation | Output | DataSet | Produced | artifact |
| 5 | s3://...93b4b79fe2e2f2146b46607be87/output/train | Output | DataSet | Produced | artifact |


{'StepName': 'AbaloneTrain', 'StartTime': datetime.datetime(2022, 4, 18, 9, 17, 11, 655000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 4, 18, 9, 20, 0, 324000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:training-job/pipelines-deu6n0phd4e0-abalonetrain-fkvzceuo0g'}}}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...79fe2e2f2146b46607be87/output/validation | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...93b4b79fe2e2f2146b46607be87/output/train | Input | DataSet | ContributedTo | artifact |
| 2 | 36674...naws.com/sagemaker-xgboost:1.0-1-cpu-py3 | Input | Image | ContributedTo | artifact |
| 3 | s3://...loneTrain-fkVZCEUO0G/output/model.tar.gz | Output | Model | Produced | artifact |


{'StepName': 'AbaloneEval', 'StartTime': datetime.datetime(2022, 4, 18, 9, 20, 1, 222000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 4, 18, 9, 24, 16, 819000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:processing-job/pipelines-deu6n0phd4e0-abaloneeval-ua1ouzfpff'}}}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...394e09ce2edc51e/input/code/evaluation.py | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...493b4b79fe2e2f2146b46607be87/output/test | Input | DataSet | ContributedTo | artifact |
| 2 | s3://...loneTrain-fkVZCEUO0G/output/model.tar.gz | Input | Model | ContributedTo | artifact |
| 3 | 36674...naws.com/sagemaker-xgboost:1.0-1-cpu-py3 | Input | Image | ContributedTo | artifact |
| 4 | s3://...3b74404394e09ce2edc51e/output/evaluation | Output | DataSet | Produced | artifact |


{'StepName': 'AbaloneMSECond', 'StartTime': datetime.datetime(2022, 4, 18, 9, 24, 17, 662000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 4, 18, 9, 24, 17, 982000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'Condition': {'Outcome': 'True'}}}


None

{'StepName': 'AbaloneRegisterModel', 'StartTime': datetime.datetime(2022, 4, 18, 9, 24, 18, 618000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 4, 18, 9, 24, 19, 778000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:model-package/abalonemodelpackagegroupname/1'}}}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...loneTrain-fkVZCEUO0G/output/model.tar.gz | Input | Model | ContributedTo | artifact |
| 1 | 36674...naws.com/sagemaker-xgboost:1.0-1-cpu-py3 | Input | Image | ContributedTo | artifact |
| 2 | abalonemodelpackagegroupname-1-PendingManualAp... | Input | Approval | ContributedTo | action |
| 3 | AbaloneModelPackageGroupName-1650273859-aws-mo... | Output | ModelGroup | AssociatedWith | context |


{'StepName': 'AbaloneCreateModel', 'StartTime': datetime.datetime(2022, 4, 18, 9, 24, 18, 618000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 4, 18, 9, 24, 19, 851000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:model/pipelines-deu6n0phd4e0-abalonecreatemodel-vepa4pmukt'}}}


None

{'StepName': 'AbaloneTransform', 'StartTime': datetime.datetime(2022, 4, 18, 9, 24, 20, 552000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 4, 18, 9, 28, 45, 482000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:988889742134:transform-job/pipelines-deu6n0phd4e0-abalonetransform-fpovffrntf'}}}


| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...loneTrain-fkVZCEUO0G/output/model.tar.gz | Input | Model | ContributedTo | artifact |
| 1 | 36674...naws.com/sagemaker-xgboost:1.0-1-cpu-py3 | Input | Image | ContributedTo | artifact |
| 2 | s3://...8889742134/abalone/abalone-dataset-batch | Input | DataSet | ContributedTo | artifact |
| 3 | s3://...ortheast-2-988889742134/AbaloneTransform | Output | DataSet | Produced | artifact |


### Parametrized Executions

You can run additional executions of the pipeline and specify different pipeline parameters. The `parameters` argument is a dictionary that maps parameter names to the values that override their defaults.

Based on the performance of the model, you might want to kick off another pipeline execution on a compute-optimized instance type and set the model approval status to "Approved" automatically. This means that the model package version generated by the `RegisterModel` step is automatically ready for deployment through CI/CD pipelines, such as with SageMaker Projects.

In [39]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ModelApprovalStatus="Approved",
    )
)

In [None]:
execution.wait()
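The override behaviour can be pictured as a dictionary merge in which any parameter not named in `parameters` keeps its default. The sketch below works on plain dictionaries, using defaults copied from the pipeline definition printed earlier. `resolve_parameters` is a hypothetical helper, and rejecting unknown names is a choice made in this sketch, not a statement about how the service validates input.

```python
def resolve_parameters(defaults, overrides):
    """Return the effective parameters for one execution."""
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise ValueError(f"Unknown pipeline parameters: {sorted(unknown)}")
    # Later keys win, so overrides replace the matching defaults.
    return {**defaults, **overrides}


# Defaults as listed under 'Parameters' in the pipeline definition (S3 URIs omitted).
defaults = {
    "ProcessingInstanceType": "ml.m5.xlarge",
    "ProcessingInstanceCount": 1,
    "TrainingInstanceType": "ml.m5.xlarge",
    "ModelApprovalStatus": "PendingManualApproval",
    "MseThreshold": 6.0,
}

overrides = {
    "ProcessingInstanceType": "ml.c5.xlarge",
    "ModelApprovalStatus": "Approved",
}

resolved = resolve_parameters(defaults, overrides)
print(resolved)
```

In this run only the processing instance type and the approval status change. The training instance type and the MSE threshold stay at their defaults.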