In [1]:
# Parameters
kms_key = "arn:aws:kms:us-west-2:000000000000:1234abcd-12ab-34cd-56ef-1234567890ab"

## Orchestrate Jobs to Train & Evaluate Models with SageMaker Pipelines (Beta Version)

Amazon SageMaker Pipelines offers machine learning (ML) application developers and operations engineers the ability to orchestrate SageMaker jobs and author reproducible ML pipelines. It also enables them to deploy custom-built models for low-latency, real-time inference, run offline inference with Batch Transform, and track the lineage of artifacts. Through a simple interface, they can institute sound operational practices for deploying and monitoring production workflows, deploying model artifacts, and tracking artifact lineage, while adhering to safety and best-practice paradigms for ML application development.

The SageMaker Pipelines service supports a SageMaker Pipeline domain-specific language (DSL), which is a declarative JSON specification. This DSL defines a directed acyclic graph (DAG) of pipeline parameters and SageMaker job steps. The SageMaker Python Software Development Kit (SDK) streamlines the generation of the pipeline DSL using constructs that engineers and scientists are already familiar with.

## Caching Improvements Beta

This notebook uses a beta SageMaker Python SDK package that improves the S3 path organization of code, input, and output artifacts. In addition, your dev account has been allowlisted, so the updated default cache key attributes are reflected when you execute the following pipeline steps.

## Runtime

This notebook takes approximately 15 min to run.

## Contents

1. [SageMaker Pipelines](#SageMaker-Pipelines)
1. [Notebook Overview](#Notebook-Overview)
1. [A SageMaker Pipeline](#A-SageMaker-Pipeline)
1. [Dataset](#Dataset)
1. [Define Parameters to Parametrize Pipeline Execution](#Define-Parameters-to-Parametrize-Pipeline-Execution)
1. [Define a Processing Step for Feature Engineering](#Define-a-Processing-Step-for-Feature-Engineering)
1. [Define a Training Step to Train a Model](#Define-a-Training-Step-to-Train-a-Model)
1. [Define a Model Evaluation Step to Evaluate the Trained Model](#Define-a-Model-Evaluation-Step-to-Evaluate-the-Trained-Model)
1. [Define a Pipeline of Parameters and Steps](#Define-a-Pipeline-of-Parameters-and-Steps)
1. [Submit the pipeline to SageMaker and start execution](#Submit-the-pipeline-to-SageMaker-and-start-execution)
1. [Pipeline Operations: Examining and Waiting for Pipeline Execution](#Pipeline-Operations:-Examining-and-Waiting-for-Pipeline-Execution)
    1. [Examining the Evaluation](#Examining-the-Evaluation)
    1. [Lineage](#Lineage)
    1. [Note for Beta Users](#Note-for-Beta-Users)

## SageMaker Pipelines

SageMaker Pipelines supports the following activities, which are demonstrated in this notebook:

* Pipelines - A DAG of steps and conditions to orchestrate SageMaker jobs and resource creation.
* Processing job steps - A simplified, managed experience on SageMaker to run data processing workloads, such as feature engineering, data validation, model evaluation, and model interpretation.
* Training job steps - An iterative process that teaches a model to make predictions by presenting examples from a training dataset.
* Parametrized Pipeline executions - Enables variation in pipeline executions according to specified parameters.

## Notebook Overview

This notebook shows how to:

* Define a set of Pipeline parameters that can be used to parametrize a SageMaker Pipeline.
* Define a Processing step that cleans the input data, performs feature engineering, and splits the data into training, validation, and test datasets.
* Define a Training step that trains a model on the preprocessed train data set.
* Define a Processing step that evaluates the trained model's performance on the test dataset.
* Define and create a Pipeline definition in a DAG, with the defined parameters and steps.
* Start a Pipeline execution and wait for execution to complete.
* Download the model evaluation report from the S3 bucket for examination.
* Start a second Pipeline execution.

## A SageMaker Pipeline

The pipeline that you create follows a shortened version of a typical machine learning (ML) application pattern:  preprocessing, training, and evaluation.

## Dataset

The dataset you use is the [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone) [1].  The aim for this task is to determine the age of an abalone snail from its physical measurements. At the core, this is a regression problem.

The dataset contains several features: length (the longest shell measurement), diameter (the diameter perpendicular to length), height (the height with meat in the shell), whole_weight (the weight of whole abalone), shucked_weight (the weight of meat), viscera_weight (the gut weight after bleeding), shell_weight (the weight after being dried), sex ('M', 'F', 'I' where 'I' is Infant), and rings (integer).

The number of rings turns out to be a good approximation for age (age is rings + 1.5). However, obtaining this number requires cutting the shell through the cone, staining the section, and counting the number of rings through a microscope, which is a time-consuming task. The other physical measurements are easier to determine. You use the dataset to build a predictive model of the variable rings from these other physical measurements.
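As a small worked example of the relationship above, the following sketch converts a ring count into an approximate age. The function name `rings_to_age` is a hypothetical helper introduced here for illustration and is not part of the notebook's scripts.

```python
def rings_to_age(rings: int) -> float:
    """Approximate the age in years from the ring count (age = rings + 1.5)."""
    return rings + 1.5


# An abalone with 10 rings is estimated to be about 11.5 years old.
print(rings_to_age(10))
```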

> [1] Dua, D. and Graff, C. (2019). [UCI Machine Learning Repository](http://archive.ics.uci.edu/ml). Irvine, CA: University of California, School of Information and Computer Science.

## Install the Beta Package

Run the following cell to install the beta Python SDK package.

In [2]:
import sys

!{sys.executable} -m pip install "./sagemaker-beta.tar.gz"

Processing ./sagemaker-beta.tar.gz
  Preparing metadata (setup.py) ... done
Using legacy 'setup.py install' for sagemaker, since package 'wheel' is not installed.
Installing collected packages: sagemaker
  Attempting uninstall: sagemaker
    Found existing installation: sagemaker 2.109.1.dev0
    Uninstalling sagemaker-2.109.1.dev0:
      Successfully uninstalled sagemaker-2.109.1.dev0
  Running setup.py install for sagemaker ... done
Successfully installed sagemaker-2.109.1.dev0

[notice] A new release of pip available: 22.2.1 -> 22.2.2
[notice] To update, run: pip install --upgrade pip


## Define Constants

Before you upload the data to an S3 bucket, gather some constants you can use later in this notebook.

In [3]:
import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import CacheConfig

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()  # Or a literal role ARN you've created in your account
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = "AbaloneModelPackageGroupName"
cache_config = CacheConfig(
    enable_caching=True,
    expire_after="T12H"
)

## Define Parameters to Parametrize Pipeline Execution

Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

* `ParameterString` - represents a `str` Python type
* `ParameterInteger` - represents an `int` Python type
* `ParameterFloat` - represents a `float` Python type

These parameters support providing a default value, which can be overridden on pipeline execution. The default value specified should be an instance of the type of the parameter.

The parameters defined in this workflow include:

* `processing_instance_count` - The instance count of the processing job.
* `instance_type` - The `ml.*` instance type of the training job.
* `model_approval_status` - The approval status to register with the trained model for CI/CD purposes ("PendingManualApproval" is the default).
* `mse_threshold` - The Mean Squared Error (MSE) threshold used to verify the accuracy of a model.
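
To make the default-and-override behavior concrete, here is a minimal sketch in plain Python. It assumes parameter entries shaped like the `Parameters` section of the pipeline definition JSON; `resolve_parameters` and `PYTHON_TYPES` are hypothetical names introduced for illustration, and the checks are not the service's actual validation logic.

```python
from typing import Any, Dict, List

# Parameter entries in the shape used by the pipeline definition JSON.
PARAMETERS: List[Dict[str, Any]] = [
    {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1},
    {"Name": "TrainingInstanceType", "Type": "String", "DefaultValue": "ml.m5.xlarge"},
    {"Name": "ModelApprovalStatus", "Type": "String", "DefaultValue": "PendingManualApproval"},
    {"Name": "MseThreshold", "Type": "Float", "DefaultValue": 6.0},
]

# Assumed mapping from definition type names to Python types.
PYTHON_TYPES = {"Integer": int, "String": str, "Float": float}


def resolve_parameters(
    definitions: List[Dict[str, Any]], overrides: Dict[str, Any]
) -> Dict[str, Any]:
    """Return the value each parameter takes: the override if given, else the default."""
    known = {spec["Name"]: spec for spec in definitions}
    unknown = set(overrides) - set(known)
    if unknown:
        raise KeyError(f"Unknown parameters: {sorted(unknown)}")
    resolved = {}
    for name, spec in known.items():
        value = overrides.get(name, spec["DefaultValue"])
        expected = PYTHON_TYPES[spec["Type"]]
        if not isinstance(value, expected):
            raise TypeError(f"{name} expects {expected.__name__}, got {type(value).__name__}")
        resolved[name] = value
    return resolved


# Override one parameter and keep the defaults for the rest.
print(resolve_parameters(PARAMETERS, {"ProcessingInstanceCount": 2}))
```

With the SDK, overrides are supplied when an execution is started, for example `pipeline.start(parameters={"ProcessingInstanceCount": 2})`.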

In [4]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

## Define a Processing Step for Feature Engineering

First, develop a preprocessing script that is specified in the Processing step.

The file `preprocessing.py` in `artifacts/code/process` contains the preprocessing script. You can update the script and save the file to overwrite it. The preprocessing script uses `scikit-learn` to do the following:

* Fill in missing sex category data and encode it so that it is suitable for training.
* Scale and normalize all numerical fields except sex and rings.
* Split the data into training, validation, and test datasets.
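
The splitting step in the list above can be sketched in plain Python as follows. This is only an illustration: the actual script uses `scikit-learn`, and the function name `split_rows`, the 70/15/15 fractions, and the fixed seed are assumptions rather than values taken from `preprocessing.py`.

```python
import random
from typing import List, Sequence, Tuple


def split_rows(
    rows: Sequence,
    train_frac: float = 0.7,
    validation_frac: float = 0.15,
    seed: int = 0,
) -> Tuple[List, List, List]:
    """Shuffle the rows and cut them into train, validation, and test lists."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)  # seeded so the split is repeatable
    n_train = round(len(shuffled) * train_frac)
    n_validation = round(len(shuffled) * validation_frac)
    train = shuffled[:n_train]
    validation = shuffled[n_train:n_train + n_validation]
    test = shuffled[n_train + n_validation:]  # whatever remains
    return train, validation, test


train_rows, validation_rows, test_rows = split_rows(range(100))
print(len(train_rows), len(validation_rows), len(test_rows))
```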

The Processing step executes the script on the input data. The Training step uses the preprocessed training features and labels to train a model. The Evaluation step uses the trained model and preprocessed test features and labels to evaluate the model.

Next, create an instance of a `SKLearnProcessor` processor and use it in the `ProcessingStep`.

You also specify the `framework_version` to use throughout this notebook.

Note the `processing_instance_count` parameter used by the processor instance.

In [5]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)

Finally, take the output of the processor's `run` method and pass it as the `step_args` of the `ProcessingStep`. Because the processor was created with `pipeline_session` as its `sagemaker_session`, calling `.run()` does not launch the processing job; it returns the arguments needed to run the job as a step in the pipeline.

Note the `"train"`, `"validation"`, and `"test"` named outputs specified in the output configuration for the processing job. Step `properties` can be used in subsequent steps and resolve to their runtime values at execution. This usage is called out when you define the training step.

In [6]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.dataset_definition.inputs import S3Input

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(
            source="artifacts/data/abalone-dataset.csv",
            input_name="abalone-dataset",
            s3_input=S3Input(
                local_path="/opt/ml/processing/input",
                s3_uri="artifacts/data/abalone-dataset.csv",
                s3_data_type="S3Prefix",
                s3_input_mode="File",
                s3_data_distribution_type="FullyReplicated",
                s3_compression_type="None",
            )
        )
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="artifacts/code/process/preprocessing.py",
)

step_process = ProcessingStep(name="AbaloneProcess", step_args=processor_args, cache_config=cache_config)



## Define a Training Step to Train a Model

In this section, use Amazon SageMaker's [XGBoost Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) to train on this dataset. Configure an Estimator for the XGBoost algorithm and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to `model_dir` so that it can be hosted later.

The model path where the models from training are saved is also specified.

Note the `instance_type` parameter may be used in multiple places in the pipeline. In this case, the `instance_type` is passed into the estimator.

In [7]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

model_path = f"s3://{default_bucket}/AbaloneTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

Finally, use the output of the estimator's `.fit()` method as the `step_args` of the `TrainingStep`. Because the estimator was created with `pipeline_session` as its `sagemaker_session`, calling `.fit()` does not launch the training job; it returns the arguments needed to run the job as a step in the pipeline.

The `TrainingInput` objects passed to `.fit()` reference the `S3Uri` of the `"train"` and `"validation"` output channels of the processing step. The `"test"` output channel is used later for model evaluation in the pipeline. The `properties` attribute of a Pipeline step matches the object model of the corresponding response of a describe call. These properties can be referenced as placeholder values and are resolved at runtime. For example, the `ProcessingStep` `properties` attribute matches the object model of the [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response object.
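
The idea of a placeholder that is resolved at runtime can be illustrated with a small sketch. It assumes the `{"Get": "..."}` placeholder form that appears in the pipeline definition JSON and handles only simple dotted paths; `resolve_placeholders` is a hypothetical helper and not how the service itself resolves references.

```python
from typing import Any, Dict


def resolve_placeholders(node: Any, runtime: Dict[str, Any]) -> Any:
    """Recursively replace {"Get": "A.B"} nodes with runtime["A"]["B"]."""
    if isinstance(node, dict):
        if set(node) == {"Get"}:
            value: Any = runtime
            for key in node["Get"].split("."):  # simple dotted paths only
                value = value[key]
            return value
        return {key: resolve_placeholders(child, runtime) for key, child in node.items()}
    if isinstance(node, list):
        return [resolve_placeholders(item, runtime) for item in node]
    return node  # literals are returned unchanged


# A fragment shaped like the processing step's cluster configuration.
cluster_config = {
    "InstanceType": "ml.m5.xlarge",
    "InstanceCount": {"Get": "Parameters.ProcessingInstanceCount"},
    "VolumeSizeInGB": 30,
}
runtime_values = {"Parameters": {"ProcessingInstanceCount": 1}}

resolved_config = resolve_placeholders(cluster_config, runtime_values)
print(resolved_config)
```

Until the values are resolved, the definition only records where each value will come from, which is why step outputs can be wired together before any job has run.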

In [8]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="AbaloneTrain",
    step_args=train_args,
    cache_config=cache_config
)

## Define a Model Evaluation Step to Evaluate the Trained Model

First, develop an evaluation script that is specified in a Processing step that performs the model evaluation.

After pipeline execution, you can examine the resulting `evaluation.json` for analysis.

The evaluation script `evaluation.py` in `artifacts/code/process` uses `xgboost` to do the following:

* Load the model.
* Read the test data.
* Issue predictions against the test data.
* Build an evaluation report for the regression model (for example, the mean squared error).
* Save the evaluation report to the evaluation directory.

Next, create an instance of a `ScriptProcessor` processor and use it in the `ProcessingStep`.

In [9]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-abalone-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="artifacts/code/process/evaluation.py",
)

Use the arguments returned by the processor's `.run()` method to construct a `ProcessingStep`. These arguments carry the input and output channels and the code that runs when the pipeline is executed.

Specifically, the `S3ModelArtifacts` from the `step_train` `properties` and the `S3Uri` of the `"test"` output channel of the `step_process` `properties` are passed as inputs. The `TrainingStep` and `ProcessingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) and [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response objects, respectively.

In [10]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="AbaloneEval",
    step_args=eval_args,
    property_files=[evaluation_report],
    cache_config=cache_config
)

## Define a Pipeline of Parameters and Steps

In this section, combine the steps into a Pipeline so it can be executed.

A pipeline requires a `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair.

Note:

* All the parameters used in the definitions must be present.
* Steps passed into the pipeline do not have to be listed in the order of execution. The SageMaker Pipelines service resolves the execution order from the data dependencies between steps.
* Steps must be unique across the pipeline step list and all condition step if/else lists.
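
To illustrate how an execution order can be derived from data dependencies regardless of listing order, here is a minimal sketch using the standard library's `graphlib` (Python 3.9 or later). The dependency map is written by hand from the steps in this notebook; it shows the general technique, not the service's internal algorithm.

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
# The steps are deliberately listed out of execution order.
dependencies = {
    "AbaloneEval": {"AbaloneTrain", "AbaloneProcess"},
    "AbaloneTrain": {"AbaloneProcess"},
    "AbaloneProcess": set(),
}

# static_order() yields each step only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)  # ['AbaloneProcess', 'AbaloneTrain', 'AbaloneEval']
```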

In [11]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = "AbaloneBetaPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        mse_threshold,
    ],
    steps=[step_process, step_train, step_eval],
)

### (Optional) Examining the pipeline definition

The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and the parameters and step properties resolve correctly.
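
One check you might run on the definition is that every parameter referenced by a step is also declared. The sketch below does this on a hand-written fragment shaped like the definition JSON; `find_parameter_references`, `undeclared_parameters`, and `sample_definition` are hypothetical names used only for illustration.

```python
from typing import Any, Dict, Set

PREFIX = "Parameters."


def find_parameter_references(node: Any) -> Set[str]:
    """Collect parameter names referenced through {"Get": "Parameters.<name>"} nodes."""
    found: Set[str] = set()
    if isinstance(node, dict):
        reference = node.get("Get")
        if len(node) == 1 and isinstance(reference, str) and reference.startswith(PREFIX):
            found.add(reference[len(PREFIX):])
        for child in node.values():
            found |= find_parameter_references(child)
    elif isinstance(node, list):
        for item in node:
            found |= find_parameter_references(item)
    return found


def undeclared_parameters(definition: Dict[str, Any]) -> Set[str]:
    """Return parameter names used in the steps but missing from "Parameters"."""
    declared = {spec["Name"] for spec in definition.get("Parameters", [])}
    return find_parameter_references(definition.get("Steps", [])) - declared


# A trimmed, hand-written fragment; a real definition contains many more fields.
sample_definition = {
    "Parameters": [
        {"Name": "ProcessingInstanceCount", "Type": "Integer", "DefaultValue": 1},
    ],
    "Steps": [
        {
            "Name": "AbaloneProcess",
            "Type": "Processing",
            "Arguments": {
                "ProcessingResources": {
                    "ClusterConfig": {
                        "InstanceType": "ml.m5.xlarge",
                        "InstanceCount": {"Get": "Parameters.ProcessingInstanceCount"},
                        "VolumeSizeInGB": 30,
                    }
                }
            },
        }
    ],
}

print(undeclared_parameters(sample_definition))  # an empty set means nothing is missing
```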

In [12]:
import json


definition = json.loads(pipeline.definition())
definition


Job Name:  sklearn-abalone-process-2022-10-10-14-45-57-613
Inputs:  [{'InputName': 'abalone-dataset', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-577286953245/AbaloneBetaPipeline/AbaloneProcess/input/abalone-dataset/abalone-dataset.csv', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-577286953245/AbaloneBetaPipeline/code/a55d50a0c87783b887401dc1ff1d9bf9/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/', 'sagemaker-us-west-2-577286953245', 'AbaloneBetaPipeline', <sagemaker.workflow.execution_variables.ExecutionVaria

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'MseThreshold', 'Type': 'Float', 'DefaultValue': 6.0}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'AbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '246618743249.dkr.ecr.us-west-2.amazonaws.com/sagemaker-scikit-learn:0.23-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/prepro

## Submit the pipeline to SageMaker and start execution

Submit the pipeline definition to the Pipeline service. The Pipeline service uses the role that is passed in to create all the jobs defined in the steps.

In [13]:
pipeline.upsert(role_arn=role)


Job Name:  sklearn-abalone-process-2022-10-10-14-46-34-011
Inputs:  [{'InputName': 'abalone-dataset', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-577286953245/AbaloneBetaPipeline/AbaloneProcess/input/abalone-dataset/abalone-dataset.csv', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-us-west-2-577286953245/AbaloneBetaPipeline/code/a55d50a0c87783b887401dc1ff1d9bf9/preprocessing.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'train', 'AppManaged': False, 'S3Output': {'S3Uri': Join(on='/', values=['s3:/', 'sagemaker-us-west-2-577286953245', 'AbaloneBetaPipeline', <sagemaker.workflow.execution_variables.ExecutionVaria

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:577286953245:pipeline/abalonebetapipeline',
 'ResponseMetadata': {'RequestId': '45abfcc6-4a70-468e-a349-434685b35793',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '45abfcc6-4a70-468e-a349-434685b35793',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '87',
   'date': 'Mon, 10 Oct 2022 14:46:35 GMT'},
  'RetryAttempts': 0}}

Start the pipeline and accept all the default parameters.

In [14]:
execution = pipeline.start()

## Pipeline Operations: Examining and Waiting for Pipeline Execution

Describe the pipeline execution.

In [15]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:577286953245:pipeline/abalonebetapipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-2:577286953245:pipeline/abalonebetapipeline/execution/a1u2vszc97go',
 'PipelineExecutionDisplayName': 'execution-1665413199062',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'abalonebetapipeline',
  'TrialName': 'a1u2vszc97go'},
 'CreationTime': datetime.datetime(2022, 10, 10, 7, 46, 38, 992000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 10, 10, 7, 46, 38, 992000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': '88201694-fc82-41eb-8310-9ee0ad717a54',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '88201694-fc82-41eb-8310-9ee0ad717a54',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '498',
   'date': 'Mon, 10 Oct 2022 14:46:42 GMT'},
  'RetryAttempts': 0}}

Wait for the execution to complete.

In [16]:
execution.wait()

List the steps in the execution. These are the steps in the pipeline that have been resolved by the step executor service.

In [17]:
execution.list_steps()

[{'StepName': 'AbaloneEval',
  'StartTime': datetime.datetime(2022, 10, 10, 7, 53, 40, 358000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 10, 10, 7, 57, 51, 211000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:577286953245:processing-job/pipelines-a1u2vszc97go-abaloneeval-xk7l6pznzl'}}},
 {'StepName': 'AbaloneTrain',
  'StartTime': datetime.datetime(2022, 10, 10, 7, 50, 56, 491000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 10, 10, 7, 53, 39, 133000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:577286953245:training-job/pipelines-a1u2vszc97go-abalonetrain-wdmciuulzv'}}},
 {'StepName': 'AbaloneProcess',
  'StartTime': datetime.datetime(2022, 10, 10, 7, 46, 39, 740000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 10, 10, 7, 50, 55, 811000, tzinfo=tzlocal()),
  'StepStatu
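
The step records returned by `list_steps()` carry start and end times, so the duration of each step can be computed directly. The following sketch uses two records copied from the output above, with the time zone information dropped for brevity; `step_durations` is a hypothetical helper.

```python
from datetime import datetime
from typing import Any, Dict, List


def step_durations(steps: List[Dict[str, Any]]) -> Dict[str, float]:
    """Map each finished step's name to its duration in seconds."""
    return {
        step["StepName"]: (step["EndTime"] - step["StartTime"]).total_seconds()
        for step in steps
        if "EndTime" in step  # steps that are still running have no end time yet
    }


sample_steps = [
    {
        "StepName": "AbaloneEval",
        "StartTime": datetime(2022, 10, 10, 7, 53, 40, 358000),
        "EndTime": datetime(2022, 10, 10, 7, 57, 51, 211000),
        "StepStatus": "Succeeded",
    },
    {
        "StepName": "AbaloneTrain",
        "StartTime": datetime(2022, 10, 10, 7, 50, 56, 491000),
        "EndTime": datetime(2022, 10, 10, 7, 53, 39, 133000),
        "StepStatus": "Succeeded",
    },
]

durations = step_durations(sample_steps)
for name, seconds in durations.items():
    print(f"{name}: {seconds:.1f} s")
```

Comparing durations between executions is one simple way to spot steps that were served from the cache instead of being re-run.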

### Examining the Evaluation

Examine the resulting model evaluation after the pipeline completes by downloading the resulting `evaluation.json` file from S3 and looking at the report.
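
A minimal sketch of reading the report once its contents are available as text follows. The layout of `evaluation.json` is not shown in this notebook, so the `regression_metrics.mse.value` structure and the sample numbers below are assumptions made for illustration, and `mse_from_report` and `passes_threshold` are hypothetical helpers. On a real run, the text could be read with `sagemaker.s3.S3Downloader.read_file` from the S3 URI of the `evaluation` output.

```python
import json


def mse_from_report(report_text: str) -> float:
    """Extract the mean squared error from an evaluation report (assumed layout)."""
    report = json.loads(report_text)
    return float(report["regression_metrics"]["mse"]["value"])


def passes_threshold(mse: float, threshold: float) -> bool:
    """A model passes when its MSE does not exceed the threshold."""
    return mse <= threshold


# Made-up sample content standing in for the downloaded evaluation.json.
sample_report = '{"regression_metrics": {"mse": {"value": 4.9, "standard_deviation": 2.2}}}'

mse = mse_from_report(sample_report)
print(mse, passes_threshold(mse, threshold=6.0))  # 6.0 is the MseThreshold default
```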

### Lineage

Review the lineage of the artifacts generated by the pipeline.

In [18]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

{'StepName': 'AbaloneProcess', 'StartTime': datetime.datetime(2022, 10, 10, 7, 46, 39, 740000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 10, 10, 7, 50, 55, 811000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:577286953245:processing-job/pipelines-a1u2vszc97go-abaloneprocess-wix9lzoylg'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...87783b887401dc1ff1d9bf9/preprocessing.py,Input,DataSet,ContributedTo,artifact
1,s3://...nput/abalone-dataset/abalone-dataset.csv,Input,DataSet,ContributedTo,artifact
2,24661...om/sagemaker-scikit-learn:0.23-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://.../a1u2vszc97go/AbaloneProcess/output/test,Output,DataSet,Produced,artifact
4,s3://...szc97go/AbaloneProcess/output/validation,Output,DataSet,Produced,artifact
5,s3://...a1u2vszc97go/AbaloneProcess/output/train,Output,DataSet,Produced,artifact


{'StepName': 'AbaloneTrain', 'StartTime': datetime.datetime(2022, 10, 10, 7, 50, 56, 491000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 10, 10, 7, 53, 39, 133000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:577286953245:training-job/pipelines-a1u2vszc97go-abalonetrain-wdmciuulzv'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...szc97go/AbaloneProcess/output/validation,Input,DataSet,ContributedTo,artifact
1,s3://...a1u2vszc97go/AbaloneProcess/output/train,Input,DataSet,ContributedTo,artifact
2,24661...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...loneTrain-WDMCIuULzV/output/model.tar.gz,Output,Model,Produced,artifact


{'StepName': 'AbaloneEval', 'StartTime': datetime.datetime(2022, 10, 10, 7, 53, 40, 358000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 10, 10, 7, 57, 51, 211000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:577286953245:processing-job/pipelines-a1u2vszc97go-abaloneeval-xk7l6pznzl'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...50c93b74404394e09ce2edc51e/evaluation.py,Input,DataSet,ContributedTo,artifact
1,s3://.../a1u2vszc97go/AbaloneProcess/output/test,Input,DataSet,ContributedTo,artifact
2,s3://...loneTrain-WDMCIuULzV/output/model.tar.gz,Input,Model,ContributedTo,artifact
3,24661...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
4,s3://...u2vszc97go/AbaloneEval/output/evaluation,Output,DataSet,Produced,artifact


## Note for Beta Users

At this point, the steps in the pipeline have executed and new cache keys have been created. To observe the caching mechanism, re-execute the pipeline with the same step arguments. Then, go back and update different step arguments to experience the new cache hit and cache miss behavior. You can also update the cache config expire time.
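
As a conceptual model of cache hits, misses, and expiry, consider the following sketch. Which attributes the service actually includes in its cache key is not described in this notebook, so keying on the step name and arguments is an assumption, and `cache_key` and `StepCache` are hypothetical names.

```python
import hashlib
import json
from datetime import datetime, timedelta
from typing import Any, Dict, Optional, Tuple


def cache_key(step_name: str, arguments: Dict[str, Any]) -> str:
    """Hash the step name and its arguments into a stable key (assumed attributes)."""
    payload = json.dumps({"step": step_name, "arguments": arguments}, sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


class StepCache:
    """Stores step results and treats entries older than expire_after as misses."""

    def __init__(self, expire_after: timedelta) -> None:
        self.expire_after = expire_after
        self._entries: Dict[str, Tuple[datetime, Any]] = {}

    def store(self, key: str, result: Any, now: datetime) -> None:
        self._entries[key] = (now, result)

    def lookup(self, key: str, now: datetime) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None  # miss: these arguments were never seen
        created_at, result = entry
        if now - created_at > self.expire_after:
            return None  # miss: the entry has expired
        return result  # hit


# Twelve hours, the reading of expire_after="T12H" assumed here.
cache = StepCache(expire_after=timedelta(hours=12))
first_run = datetime(2022, 10, 10, 7, 46)
arguments = {"num_round": 50, "max_depth": 5}
cache.store(cache_key("AbaloneTrain", arguments), "model-v1", now=first_run)

one_hour_later = first_run + timedelta(hours=1)
same_args = cache.lookup(cache_key("AbaloneTrain", arguments), now=one_hour_later)
changed_args = cache.lookup(
    cache_key("AbaloneTrain", {**arguments, "max_depth": 6}), now=one_hour_later
)
after_expiry = cache.lookup(
    cache_key("AbaloneTrain", arguments), now=first_run + timedelta(hours=13)
)
print(same_args, changed_args, after_expiry)
```

In this model, re-running with identical arguments within the expiry window is a hit, while changing any argument or waiting past the expiry time is a miss.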

In [20]:
new_execution = pipeline.start()

In [21]:
new_execution.wait()

In [22]:
new_execution.list_steps()

[{'StepName': 'AbaloneEval',
  'StartTime': datetime.datetime(2022, 10, 10, 8, 21, 29, 971000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 10, 10, 8, 25, 36, 926000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:577286953245:processing-job/pipelines-dgrx1ym1u94p-abaloneeval-uywxs6xyxw'}}},
 {'StepName': 'AbaloneTrain',
  'StartTime': datetime.datetime(2022, 10, 10, 8, 18, 29, 465000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 10, 10, 8, 21, 29, 130000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:577286953245:training-job/pipelines-dgrx1ym1u94p-abalonetrain-kc10ttcpoh'}}},
 {'StepName': 'AbaloneProcess',
  'StartTime': datetime.datetime(2022, 10, 10, 8, 14, 16, 931000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 10, 10, 8, 18, 29, 42000, tzinfo=tzlocal()),
  'StepStatus