# A SageMaker Workflow

The pipeline that we create follows a typical Machine Learning Application pattern of pre-processing, training, evaluation, and model registration:

![A typical ML Application pipeline](img/pipeline-full.png)

### Create SageMaker Clients and Session

First, we create a new SageMaker Session in the current AWS region. We also acquire the role arn for the session.

This role arn should be the execution role arn that you set up in the Prerequisites section of this notebook.

In [4]:
from botocore.exceptions import ClientError

import os
import sagemaker
import logging
import boto3
import sagemaker
import pandas as pd

sess = sagemaker.Session()
bucket = sess.default_bucket()
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

sm = boto3.Session().client(service_name="sagemaker", region_name=region)

# Track the Pipeline as an `Experiment`

In [5]:
import time

timestamp = int(time.time())

pipeline_name = "BERT-pipeline-{}".format(timestamp)

In [6]:
%store pipeline_name

Stored 'pipeline_name' (str)


In [7]:
from smexperiments.experiment import Experiment

pipeline_experiment = Experiment.create(
    experiment_name=pipeline_name,
    description="Climate-change-bert-pipeline",
    sagemaker_boto_client=sm,
)

pipeline_experiment_name = pipeline_experiment.experiment_name
print("Pipeline experiment name: {}".format(pipeline_experiment_name))

Pipeline experiment name: BERT-pipeline-1669943858


In [8]:
%store pipeline_experiment_name

Stored 'pipeline_experiment_name' (str)


# Create the `Trial`

In [9]:
from smexperiments.trial import Trial

pipeline_trial = Trial.create(
    trial_name="trial-{}".format(timestamp), experiment_name=pipeline_experiment_name, sagemaker_boto_client=sm
)

pipeline_trial_name = pipeline_trial.trial_name
print("Trial name: {}".format(pipeline_trial_name))

Trial name: trial-1669943858


In [10]:
%store pipeline_trial_name

Stored 'pipeline_trial_name' (str)


# Define Parameters to Parametrize Pipeline Execution

We define Workflow Parameters by which we can parametrize our Pipeline and vary the values injected and used in Pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

* `ParameterString` - representing a `str` Python type
* `ParameterInteger` - representing an `int` Python type
* `ParameterFloat` - representing a `float` Python type

These parameters support providing a default value, which can be overridden on pipeline execution. The default value specified should be an instance of the type of the parameter.

The parameters defined in this workflow below include:

* `processing_instance_type` - The `ml.*` instance type of the processing job.
* `processing_instance_count` - The instance count of the processing job. For illustrative purposes only: 1 is the only value that makes sense here.
* `train_instance_type` - The `ml.*` instance type of the training job.
* `model_approval_status` - What approval status to register the trained model with for CI/CD purposes. Defaults to "PendingManualApproval". (NOTE: not available in service yet)
* `input_data` - The URL location of the input data

# Pipeline Parameters

In [11]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# Experiment Parameters

In [12]:
%store -r pipeline_experiment_name

In [13]:
exp_name = ParameterString(
    name="ExperimentName",
    default_value=pipeline_experiment_name,
)

# Processing Step Parameters

![Define a Processing Step for Feature Engineering](img/pipeline-2.png)

![](img/prepare_dataset_bert.png)

In [15]:
# raw_input_data_s3_uri = "s3://{}/amazon-reviews-pds/tsv/".format(bucket)
%store -r raw_input_data_s3_uri

In [16]:
!aws s3 ls $raw_input_data_s3_uri

2022-11-29 16:39:14          0 
2022-11-30 03:00:28    6570564 twitter_sentiment_data.csv


In [17]:
import time

timestamp = int(time.time())

input_data = ParameterString(
    name="InputData",
    default_value=raw_input_data_s3_uri,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.t3.xlarge")

max_seq_length = ParameterInteger(
    name="MaxSeqLength",
    default_value=18,
)

balance_dataset = ParameterString(
    name="BalanceDataset",
    default_value="True",
)

train_split_percentage = ParameterFloat(
    name="TrainSplitPercentage",
    default_value=0.90,
)

validation_split_percentage = ParameterFloat(
    name="ValidationSplitPercentage",
    default_value=0.05,
)

test_split_percentage = ParameterFloat(
    name="TestSplitPercentage",
    default_value=0.05,
)

feature_store_offline_prefix = ParameterString(
    name="FeatureStoreOfflinePrefix",
    default_value="climate-feature-store-" + str(timestamp),
)

feature_group_name = ParameterString(name="FeatureGroupName", default_value="climatetweet-feature-group-" + str(timestamp))

In [None]:
!pygmentize ./preprocess-scikit-text-to-bert-feature-store.py

We create an instance of an `SKLearnProcessor` processor and we use that in our `ProcessingStep`.

We also specify the `framework_version` we will use throughout.

Note the `processing_instance_type` and `processing_instance_count` parameters that used by the processor instance.

In [18]:
from sagemaker.sklearn.processing import SKLearnProcessor

processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={"AWS_DEFAULT_REGION": region},
)

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [19]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processing_inputs = [
    ProcessingInput(
        input_name="raw-input-data",
        source=input_data,
        destination="/opt/ml/processing/input/data/",
        s3_data_distribution_type="ShardedByS3Key",
    )
]

processing_outputs = [
    ProcessingOutput(
        output_name="bert-train",
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/output/bert/train",
    ),
    ProcessingOutput(
        output_name="bert-validation",
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/output/bert/validation",
    ),
    ProcessingOutput(
        output_name="bert-test",
        s3_upload_mode="EndOfJob",
        source="/opt/ml/processing/output/bert/test",
    ),
]

processing_step = ProcessingStep(
    name="Processing",
    code="preprocess-scikit-text-to-bert-feature-store.py",
    processor=processor,
    inputs=processing_inputs,
    outputs=processing_outputs,
    job_arguments=[
        "--train-split-percentage",
        str(train_split_percentage.default_value),
        "--validation-split-percentage",
        str(validation_split_percentage.default_value),
        "--test-split-percentage",
        str(test_split_percentage.default_value),
        "--max-seq-length",
        str(max_seq_length.default_value),
        "--balance-dataset",
        str(balance_dataset.default_value),
        "--feature-store-offline-prefix",
        str(feature_store_offline_prefix.default_value),
        "--feature-group-name",
        str(feature_group_name.default_value),
    ],
)

print(processing_step)

ProcessingStep(name='Processing', step_type=<StepTypeEnum.PROCESSING: 'Processing'>)


Finally, we use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that will be executed when the pipeline invokes pipeline execution. This is very similar to a processor instance's `run` method, for those familiar with the existing Python SDK.

Note the `input_data` parameters passed into `ProcessingStep` as the input data of the step itself. This input data will be used by the processor instance when it is run.

Also, take note the `"bert-train"`, `"bert-validation"` and `"bert-test"` named channels specified in the output configuration for the processing job. Such step `Properties` can be used in subsequent steps and will resolve to their runtime values at execution. In particular, we'll call out this usage when we define our training step.

# Train Step

![Define a Training Step to Train a Model](img/pipeline-3.png)

In [21]:
train_instance_type = ParameterString(name="TrainInstanceType", default_value="ml.m5.4xlarge")

train_instance_count = ParameterInteger(name="TrainInstanceCount", default_value=1)

# Setup Training Hyper-Parameters
Note that `max_seq_length` is re-used from the processing hyper-parameters above

In [22]:
epochs = ParameterInteger(name="Epochs", default_value=1)

learning_rate = ParameterFloat(name="LearningRate", default_value=0.00001)

epsilon = ParameterFloat(name="Epsilon", default_value=0.00000001)

train_batch_size = ParameterInteger(name="TrainBatchSize", default_value=12)

validation_batch_size = ParameterInteger(name="ValidationBatchSize", default_value=12)

test_batch_size = ParameterInteger(name="TestBatchSize", default_value=12)

train_steps_per_epoch = ParameterInteger(name="TrainStepsPerEpoch", default_value=10)

validation_steps = ParameterInteger(name="ValidationSteps", default_value=10)

test_steps = ParameterInteger(name="TestSteps", default_value=10)

train_volume_size = ParameterInteger(name="TrainVolumeSize", default_value=1024)

use_xla = ParameterString(
    name="UseXLA",
    default_value="False",
)

use_amp = ParameterString(
    name="UseAMP",
    default_value="True",
)

freeze_bert_layer = ParameterString(
    name="FreezeBERTLayer",
    default_value="False",
)

enable_sagemaker_debugger = ParameterString(
    name="EnableSageMakerDebugger",
    default_value="False",
)

enable_checkpointing = ParameterString(
    name="EnableCheckpointing",
    default_value="False",
)

enable_tensorboard = ParameterString(
    name="EnableTensorboard",
    default_value="False",
)

input_mode = ParameterString(
    name="InputMode",
    default_value="File",
)

run_validation = ParameterString(
    name="RunValidation",
    default_value="True",
)

run_test = ParameterString(
    name="RunTest",
    default_value="False",
)

run_sample_predictions = ParameterString(
    name="RunSamplePredictions",
    default_value="False",
)

# Setup Metrics To Track Model Performance

In [23]:
metrics_definitions = [
    {"Name": "train:loss", "Regex": "loss: ([0-9\\.]+)"},
    {"Name": "train:accuracy", "Regex": "accuracy: ([0-9\\.]+)"},
    {"Name": "validation:loss", "Regex": "val_loss: ([0-9\\.]+)"},
    {"Name": "validation:accuracy", "Regex": "val_accuracy: ([0-9\\.]+)"},
]

In [None]:
!pygmentize src/tf_bert_reviews.py

### Setup Debugger and Profiler
Define Debugger Rules as described here:  https://docs.aws.amazon.com/sagemaker/latest/dg/debugger-built-in-rules.html

In [24]:
from sagemaker.debugger import Rule, ProfilerRule, rule_configs
from sagemaker.debugger import DebuggerHookConfig
from sagemaker.debugger import ProfilerConfig, FrameworkProfile

debugger_hook_config = DebuggerHookConfig(
    s3_output_path="s3://{}".format(bucket),
)

profiler_config = ProfilerConfig(
    system_monitor_interval_millis=500,
    framework_profile_params=FrameworkProfile(local_path="/opt/ml/output/profiler/", start_step=5, num_steps=10),
)

In [25]:
rules = [ProfilerRule.sagemaker(rule_configs.ProfilerReport())]

# Define a Training Step to Train a Model

We configure an Estimator and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to `model_dir` so that it can be hosted later.

We also specify the model path where the models from training will be saved.

Note the `train_instance_type` parameter passed may be also used and passed into other places in the pipeline. In this case, the `train_instance_type` is passed into the estimator.

In [26]:
from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    entry_point="tf_bert_reviews.py",
    source_dir="src",
    role=role,
    instance_count=train_instance_count,  # Make sure you have at least this number of input files or the ShardedByS3Key distibution strategy will fail the job due to no data available
    instance_type=train_instance_type,
    volume_size=train_volume_size,
    py_version="py37",
    framework_version="2.3.1",
    hyperparameters={
        "epochs": epochs,
        "learning_rate": learning_rate,
        "epsilon": epsilon,
        "train_batch_size": train_batch_size,
        "validation_batch_size": validation_batch_size,
        "test_batch_size": test_batch_size,
        "train_steps_per_epoch": train_steps_per_epoch,
        "validation_steps": validation_steps,
        "test_steps": test_steps,
        "use_xla": use_xla,
        "use_amp": use_amp,
        "max_seq_length": max_seq_length,
        "freeze_bert_layer": freeze_bert_layer,
        "enable_sagemaker_debugger": enable_sagemaker_debugger,
        "enable_checkpointing": enable_checkpointing,
        "enable_tensorboard": enable_tensorboard,
        "run_validation": run_validation,
        "run_test": run_test,
        "run_sample_predictions": run_sample_predictions,
    },
    input_mode=input_mode,
    metric_definitions=metrics_definitions,
    debugger_hook_config=debugger_hook_config,
    profiler_config=profiler_config,
    rules=rules,
)

### Setup Pipeline Step Caching
Cache pipeline steps for a duration of time using [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601#Durations) format.  

More details on SageMaker Pipeline step caching here:  https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html

In [27]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")

### Configure Training Step

Finally, we use the estimator instance to construct a `TrainingStep` as well as the `Properties` of the prior `ProcessingStep` used as input in the `TrainingStep` inputs and the code that will be executed when the pipeline invokes pipeline execution. This is very similar to an estimator's `fit` method, for those familiar with the existing Python SDK.

In particular, we pass in the `S3Uri` of the `"train"`, `"validation"` and `"test"` output channel to the `TrainingStep`. The `properties` attribute of a Workflow step match the object model of the corresponding response of a describe call. These properties can be referenced as placeholder values and are resolved, or filled in, at runtime. For example, the `ProcessingStep` `properties` attribute matches the object model of the [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response object.

In [28]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name="Train",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["bert-train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["bert-validation"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "test": TrainingInput(
            s3_data=processing_step.properties.ProcessingOutputConfig.Outputs["bert-test"].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
    cache_config=cache_config,
)

print(training_step)

TrainingStep(name='Train', step_type=<StepTypeEnum.TRAINING: 'Training'>)


# Evaluation Step

![Define a Model Evaluation Step to Evaluate the Trained Model](img/pipeline-4.png)

First, we develop an evaluation script that will be specified in a Processing step that will perform the model evaluation.

The evaluation script `evaluation.py` takes the trained model and the test dataset as input, and produces a JSON file containing classification evaluation metrics such as accuracy.

After pipeline execution, we will examine the resulting `evaluation.json` for analysis.

The evaluation script:

* loads in the model
* reads in the test data
* issues a bunch of predictions against the test data
* builds a classification report, including accuracy
* saves the evaluation report to the evaluation directory

Next, we create an instance of a `ScriptProcessor` processor and we use that in our `ProcessingStep`.

Note the `processing_instance_type` parameter passed into the processor.

In [29]:
from sagemaker.sklearn.processing import SKLearnProcessor

evaluation_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    env={"AWS_DEFAULT_REGION": region},
    max_runtime_in_seconds=7200,
)

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


In [None]:
!pygmentize evaluate_model_metrics.py

We use the processor instance to construct a `ProcessingStep`, along with the input and output channels and the code that will be executed when the pipeline invokes pipeline execution. This is very similar to a processor instance's `run` method, for those familiar with the existing Python SDK.

The `TrainingStep` and `ProcessingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) and  [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response objects, respectively.

In [30]:
from sagemaker.workflow.properties import PropertyFile

evaluation_report = PropertyFile(name="EvaluationReport", output_name="metrics", path="evaluation.json")

In [31]:
evaluation_step = ProcessingStep(
    name="EvaluateModel",
    processor=evaluation_processor,
    code="evaluate_model_metrics.py",
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/input/model",
        ),
        ProcessingInput(
            source=processing_step.properties.ProcessingInputs["raw-input-data"].S3Input.S3Uri,
            destination="/opt/ml/processing/input/data",
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="metrics", s3_upload_mode="EndOfJob", source="/opt/ml/processing/output/metrics/"
        ),
    ],
    job_arguments=[
        "--max-seq-length",
        str(max_seq_length.default_value),
    ],
    property_files=[evaluation_report],
)

In [32]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            evaluation_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

print(model_metrics)

<sagemaker.model_metrics.ModelMetrics object at 0x7f9365e4af50>


# Register Model Step

![](img/pipeline-5.png)

We use the estimator instance that was used for the training step to construct an instance of `RegisterModel`. The result of executing `RegisterModel` in a pipeline is a Model Package. A Model Package is a reusable model artifacts abstraction that packages all ingredients necessary for inference. Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.

A Model Package Group is a collection of Model Packages. You can create a Model Package Group for a specific ML business problem, and you can keep adding versions/model packages into it. Typically, we expect customers to create a ModelPackageGroup for a SageMaker Workflow Pipeline so that they can keep adding versions/model packages to the group for every Workflow Pipeline run.

The construction of `RegisterModel` is very similar to an estimator instance's `register` method, for those familiar with the existing Python SDK.

In particular, we pass in the `S3ModelArtifacts` from the `TrainingStep`, `step_train` properties. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object.

Of note, we provided a specific model package group name which we will use in the Model Registry and CI/CD work later on.

In [33]:
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")

deploy_instance_type = ParameterString(name="DeployInstanceType", default_value="ml.m5.4xlarge")

deploy_instance_count = ParameterInteger(name="DeployInstanceCount", default_value=1)

In [34]:
model_package_group_name = f"BERT-Reviews-{timestamp}"

print(model_package_group_name)

BERT-Reviews-1669944412


In [35]:
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    region=region,
    version="2.3.1",
    py_version="py37",
    instance_type=deploy_instance_type,
    image_scope="inference",
)
print(inference_image_uri)

INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py37.


763104351884.dkr.ecr.us-east-1.amazonaws.com/tensorflow-inference:2.3.1-cpu


In [36]:
from sagemaker.workflow.step_collections import RegisterModel

register_step = RegisterModel(
    name="RegisterModel",
    #    entry_point='inference.py', # Adds a Repack Step:  https://github.com/aws/sagemaker-python-sdk/blob/01c6ee3a9ec1831e935e86df58cf70bc92ed1bbe/src/sagemaker/workflow/_utils.py#L44
    #    source_dir='src',
    estimator=estimator,
    image_uri=inference_image_uri,  # we have to specify, by default it's using training image
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["application/jsonlines"],
    response_types=["application/jsonlines"],
    inference_instances=[deploy_instance_type],
    transform_instances=["ml.m4.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

# Create Model for Deployment Step

![](img/pipeline-5.png)


In [37]:
from sagemaker.model import Model

model_name = "bert-model-{}".format(timestamp)

model = Model(
    name=model_name,
    image_uri=inference_image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role,
)

In [38]:
from sagemaker.inputs import CreateModelInput

create_inputs = CreateModelInput(
    instance_type=deploy_instance_type,
)

In [39]:
from sagemaker.workflow.steps import CreateModelStep

create_step = CreateModelStep(
    name="CreateModel",
    model=model,
    inputs=create_inputs,
)

# Define a Condition Step to Check Accuracy and Conditionally Register Model

![](img/pipeline-6.png)

Finally, we'd like to only register this model if the accuracy of the model, as determined by our evaluation step `step_eval`, exceeded some value. A `ConditionStep` allows for pipelines to support conditional execution in the pipeline DAG based on conditions of step properties. 

Below, we:

* define a `ConditionGreaterThan` on the accuracy value found in the output of the evaluation step, `step_eval`.
* use the condition in the list of conditions in a `ConditionStep`
* pass the `RegisterModel` step collection into the `if_steps` of the `ConditionStep`

In [40]:
min_accuracy_value = ParameterFloat(name="MinAccuracyValue", default_value=0.01)

In [41]:
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)

minimum_accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step=evaluation_step,
        property_file=evaluation_report,
        json_path="metrics.accuracy.value",
    ),
    right=min_accuracy_value,  # accuracy
)

minimum_accuracy_condition_step = ConditionStep(
    name="AccuracyCondition",
    conditions=[minimum_accuracy_condition],
    if_steps=[register_step, create_step],  # success, continue with model registration
    else_steps=[],  # fail, end the pipeline
)

# Define a Pipeline of Parameters, Steps, and Conditions

Let's tie it all up into a workflow pipeline so we can execute it, and even schedule it.

A pipeline requires a `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair so we tack on the timestamp to the name.

Note:

* All the parameters used in the definitions must be present.
* Steps passed into the pipeline need not be in the order of execution. The SageMaker Workflow service will resolve the _data dependency_ DAG as steps the execution complete.
* Steps must be unique to either pipeline step list or a single condition step if/else list.

In [42]:
%store -r pipeline_name

In [43]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        input_data,
        processing_instance_count,
        processing_instance_type,
        max_seq_length,
        balance_dataset,
        train_split_percentage,
        validation_split_percentage,
        test_split_percentage,
        feature_store_offline_prefix,
        feature_group_name,
        train_instance_type,
        train_instance_count,
        epochs,
        learning_rate,
        epsilon,
        train_batch_size,
        validation_batch_size,
        test_batch_size,
        train_steps_per_epoch,
        validation_steps,
        test_steps,
        train_volume_size,
        use_xla,
        use_amp,
        freeze_bert_layer,
        enable_sagemaker_debugger,
        enable_checkpointing,
        enable_tensorboard,
        input_mode,
        run_validation,
        run_test,
        run_sample_predictions,
        min_accuracy_value,
        model_approval_status,
        deploy_instance_type,
        deploy_instance_count,
    ],
    steps=[processing_step, training_step, evaluation_step, minimum_accuracy_condition_step],
    sagemaker_session=sess,
)

Let's examine the Json of the pipeline definition that meets the SageMaker Workflow Pipeline DSL specification.

By examining the definition, we're also confirming that the pipeline was well-defined, and that the parameters and step properties resolve correctly.

In [44]:
import json
from pprint import pprint

definition = json.loads(pipeline.definition())

pprint(definition)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: latest.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


{'Metadata': {},
 'Parameters': [{'DefaultValue': 's3://sagemaker-us-east-1-531299576666/climate-tweet-data/',
                 'Name': 'InputData',
                 'Type': 'String'},
                {'DefaultValue': 1,
                 'Name': 'ProcessingInstanceCount',
                 'Type': 'Integer'},
                {'DefaultValue': 'ml.t3.xlarge',
                 'Name': 'ProcessingInstanceType',
                 'Type': 'String'},
                {'DefaultValue': 18, 'Name': 'MaxSeqLength', 'Type': 'Integer'},
                {'DefaultValue': 'True',
                 'Name': 'BalanceDataset',
                 'Type': 'String'},
                {'DefaultValue': 0.9,
                 'Name': 'TrainSplitPercentage',
                 'Type': 'Float'},
                {'DefaultValue': 0.05,
                 'Name': 'ValidationSplitPercentage',
                 'Type': 'Float'},
                {'DefaultValue': 0.05,
                 'Name': 'TestSplitPercentage',
                

### Submit the pipeline to SageMaker and start execution

Let's submit our pipeline definition to the workflow service. The role passed in will be used by the workflow service to create all the jobs defined in the steps.

In [45]:
print(pipeline_experiment_name)

BERT-pipeline-1669943858


## Ignore the `WARNING` below

In [46]:
response = pipeline.create(role_arn=role)

pipeline_arn = response["PipelineArn"]
print(pipeline_arn)



arn:aws:sagemaker:us-east-1:531299576666:pipeline/bert-pipeline-1669943858


We'll start the pipeline, accepting all the default parameters.

Values can also be passed into these pipeline parameters on starting of the pipeline, and will be covered later. 

In [48]:
execution = pipeline.start(
    # parameters=dict(
    #     InputData=raw_input_data_s3_uri,
    #     ProcessingInstanceCount=1,
    #     ProcessingInstanceType="ml.t3.xlarge",
    #     MaxSeqLength=18,
    #     BalanceDataset="True",
    #     TrainSplitPercentage=0.9,
    #     ValidationSplitPercentage=0.05,
    #     TestSplitPercentage=0.05,
    #     FeatureStoreOfflinePrefix="reviews-feature-store-" + str(timestamp),
    #     FeatureGroupName="climate-feature-group-" + str(timestamp),
    #     LearningRate=0.000012,
    #     TrainInstanceType="ml.c5.9xlarge",
    #     TrainInstanceCount=1,
    #     Epochs=1,
    #     Epsilon=0.00000001,
    #     TrainBatchSize=128,
    #     ValidationBatchSize=128,
    #     TestBatchSize=128,
    #     TrainStepsPerEpoch=50,
    #     ValidationSteps=50,
    #     TestSteps=50,
    #     TrainVolumeSize=1024,
    #     UseXLA="True",
    #     UseAMP="True",
    #     FreezeBERTLayer="False",
    #     EnableSageMakerDebugger="False",
    #     EnableCheckpointing="False",
    #     EnableTensorboard="False",
    #     InputMode="File",
    #     RunValidation="True",
    #     RunTest="False",
    #     RunSamplePredictions="False",
    #     MinAccuracyValue=0.01,
    #     ModelApprovalStatus="PendingManualApproval",
    #     DeployInstanceType="ml.m4.xlarge",
    #     DeployInstanceCount=1,
    # )
)

print(execution.arn)

arn:aws:sagemaker:us-east-1:531299576666:pipeline/bert-pipeline-1669943858/execution/yv85e13eu5mg


### Workflow Operations: examining and waiting for pipeline execution

Now we describe execution instance and list the steps in the execution to find out more about the execution.

In [49]:
from pprint import pprint

execution_run = execution.describe()
pprint(execution_run)

{'CreatedBy': {'DomainId': 'd-ubp3xlmfbyl3',
               'UserProfileArn': 'arn:aws:sagemaker:us-east-1:531299576666:user-profile/d-ubp3xlmfbyl3/default-1669642825906',
               'UserProfileName': 'default-1669642825906'},
 'CreationTime': datetime.datetime(2022, 12, 2, 2, 12, 31, 234000, tzinfo=tzlocal()),
 'LastModifiedBy': {'DomainId': 'd-ubp3xlmfbyl3',
                    'UserProfileArn': 'arn:aws:sagemaker:us-east-1:531299576666:user-profile/d-ubp3xlmfbyl3/default-1669642825906',
                    'UserProfileName': 'default-1669642825906'},
 'LastModifiedTime': datetime.datetime(2022, 12, 2, 2, 12, 31, 234000, tzinfo=tzlocal()),
 'PipelineArn': 'arn:aws:sagemaker:us-east-1:531299576666:pipeline/bert-pipeline-1669943858',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:531299576666:pipeline/bert-pipeline-1669943858/execution/yv85e13eu5mg',
 'PipelineExecutionDisplayName': 'execution-1669947151352',
 'PipelineExecutionStatus': 'Executing',
 'ResponseMetadata': {'H

# Add Execution Run as Trial to Experiments

In [50]:
execution_run_name = execution_run["PipelineExecutionDisplayName"]
print(execution_run_name)

execution-1669947151352


In [51]:
pipeline_execution_arn = execution_run["PipelineExecutionArn"]
print(pipeline_execution_arn)

arn:aws:sagemaker:us-east-1:531299576666:pipeline/bert-pipeline-1669943858/execution/yv85e13eu5mg


# List Execution Steps

In [52]:
import time

# Giving the first step time to start up
time.sleep(30)

execution.list_steps()

[{'StepName': 'Processing',
  'StartTime': datetime.datetime(2022, 12, 2, 2, 12, 31, 942000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:531299576666:processing-job/pipelines-yv85e13eu5mg-processing-4ooeenxof3'}}}]

# Wait for the Pipeline to Complete

# _Note: If this cell errors out with `WaiterError: Waiter PipelineExecutionComplete failed: Max attempts exceeded`, just re-run it and keep waiting._

In [53]:
%store -r pipeline_name

In [54]:
%%time

import time
from pprint import pprint

executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

while pipeline_execution_status == "Executing":
    try:
        executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
        pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
    #        print('Executions for our pipeline...')
    #        print(pipeline_execution_status)
    except Exception as e:
        print("Please wait...")
        time.sleep(30)

pprint(executions_response)

Executing
[{'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:531299576666:pipeline/bert-pipeline-1669943858/execution/yv85e13eu5mg',
  'PipelineExecutionDisplayName': 'execution-1669947151352',
  'PipelineExecutionStatus': 'Succeeded',
  'StartTime': datetime.datetime(2022, 12, 2, 2, 12, 31, 234000, tzinfo=tzlocal())}]
CPU times: user 31.9 s, sys: 1.34 s, total: 33.2 s
Wall time: 34min 17s


# Wait for the Pipeline ^^ Above ^^ to Complete

# _Note: If this cell errors out with `WaiterError: Waiter PipelineExecutionComplete failed: Max attempts exceeded`, just re-run it and keep waiting._

In [55]:
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

Succeeded


In [56]:
pipeline_execution_arn = executions_response[0]["PipelineExecutionArn"]
print(pipeline_execution_arn)

arn:aws:sagemaker:us-east-1:531299576666:pipeline/bert-pipeline-1669943858/execution/yv85e13eu5mg


We can list the execution steps to check out the status and artifacts:

# List Pipeline Execution Steps

In [57]:
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

Succeeded


In [58]:
from pprint import pprint

steps = sm.list_pipeline_execution_steps(PipelineExecutionArn=pipeline_execution_arn)

pprint(steps)

{'PipelineExecutionSteps': [{'AttemptCount': 0,
                             'EndTime': datetime.datetime(2022, 12, 2, 2, 48, 3, 974000, tzinfo=tzlocal()),
                             'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:531299576666:model-package/bert-reviews-1669944412/1'}},
                             'StartTime': datetime.datetime(2022, 12, 2, 2, 48, 3, 73000, tzinfo=tzlocal()),
                             'StepName': 'RegisterModel',
                             'StepStatus': 'Succeeded'},
                            {'AttemptCount': 0,
                             'EndTime': datetime.datetime(2022, 12, 2, 2, 48, 4, 299000, tzinfo=tzlocal()),
                             'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:531299576666:model/pipelines-yv85e13eu5mg-createmodel-jucrybjmz7'}},
                             'StartTime': datetime.datetime(2022, 12, 2, 2, 48, 3, 73000, tzinfo=tzlocal()),
                             'StepName': 'Cre

# List All Artifacts Generated By The Pipeline

In [59]:
processing_job_name = None
training_job_name = None

In [60]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(steps["PipelineExecutionSteps"]):
    print(execution_step)
    # We are doing this because there appears to be a bug of this LineageTableVisualizer handling the Processing Step
    if execution_step["StepName"] == "Processing":
        processing_job_name = execution_step["Metadata"]["ProcessingJob"]["Arn"].split("/")[-1]
        print(processing_job_name)
        display(viz.show(processing_job_name=processing_job_name))
    elif execution_step["StepName"] == "Train":
        training_job_name = execution_step["Metadata"]["TrainingJob"]["Arn"].split("/")[-1]
        print(training_job_name)
        display(viz.show(training_job_name=training_job_name))
    else:
        display(viz.show(pipeline_execution_step=execution_step))
        time.sleep(5)

{'StepName': 'Processing', 'StartTime': datetime.datetime(2022, 12, 2, 2, 12, 31, 942000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 12, 2, 2, 22, 7, 470000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:531299576666:processing-job/pipelines-yv85e13eu5mg-processing-4ooeenxof3'}}}
pipelines-yv85e13eu5mg-processing-4ooeenxof3


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...ess-scikit-text-to-bert-feature-store.py,Input,DataSet,ContributedTo,artifact
1,s3://...-east-1-531299576666/climate-tweet-data/,Input,DataSet,ContributedTo,artifact
2,68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3,Input,Image,ContributedTo,artifact
3,s3://...2022-12-02-02-01-06-405/output/bert-test,Output,DataSet,Produced,artifact
4,s3://...2-02-02-01-06-405/output/bert-validation,Output,DataSet,Produced,artifact
5,s3://...022-12-02-02-01-06-405/output/bert-train,Output,DataSet,Produced,artifact


{'StepName': 'Train', 'StartTime': datetime.datetime(2022, 12, 2, 2, 22, 7, 916000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 12, 2, 2, 39, 13, 655000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:531299576666:training-job/pipelines-yv85e13eu5mg-Train-pyyd9oWO7m'}}}
pipelines-yv85e13eu5mg-Train-pyyd9oWO7m


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...2022-12-02-02-01-06-405/output/bert-test,Input,DataSet,ContributedTo,artifact
1,s3://...2-02-02-01-06-405/output/bert-validation,Input,DataSet,ContributedTo,artifact
2,s3://...022-12-02-02-01-06-405/output/bert-train,Input,DataSet,ContributedTo,artifact
3,76310...s.com/tensorflow-training:2.3.1-cpu-py37,Input,Image,ContributedTo,artifact


{'StepName': 'EvaluateModel', 'StartTime': datetime.datetime(2022, 12, 2, 2, 39, 14, 611000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 12, 2, 2, 48, 1, 409000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:531299576666:processing-job/pipelines-yv85e13eu5mg-evaluatemodel-8g5duwpo2i'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...185/input/code/evaluate_model_metrics.py,Input,DataSet,ContributedTo,artifact
1,s3://...-east-1-531299576666/climate-tweet-data/,Input,DataSet,ContributedTo,artifact
2,s3://...5mg-Train-pyyd9oWO7m/output/model.tar.gz,Input,DataSet,ContributedTo,artifact
3,68331...om/sagemaker-scikit-learn:0.23-1-cpu-py3,Input,Image,ContributedTo,artifact
4,s3://...n-2022-12-02-01-47-46-902/output/metrics,Output,DataSet,Produced,artifact


{'StepName': 'AccuracyCondition', 'StartTime': datetime.datetime(2022, 12, 2, 2, 48, 2, 79000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 12, 2, 2, 48, 2, 674000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'Condition': {'Outcome': 'True'}}}


None

{'StepName': 'CreateModel', 'StartTime': datetime.datetime(2022, 12, 2, 2, 48, 3, 73000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 12, 2, 2, 48, 4, 299000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:531299576666:model/pipelines-yv85e13eu5mg-createmodel-jucrybjmz7'}}}


None

{'StepName': 'RegisterModel', 'StartTime': datetime.datetime(2022, 12, 2, 2, 48, 3, 73000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 12, 2, 2, 48, 3, 974000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:531299576666:model-package/bert-reviews-1669944412/1'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...5mg-Train-pyyd9oWO7m/output/model.tar.gz,Input,DataSet,ContributedTo,artifact
1,76310...onaws.com/tensorflow-inference:2.3.1-cpu,Input,Image,ContributedTo,artifact
2,bert-reviews-1669944412-1-PendingManualApprova...,Input,Approval,ContributedTo,action
3,BERT-Reviews-1669944412-1669949283-aws-model-p...,Output,ModelGroup,AssociatedWith,context


# Track Additional Parameters in our Experiment

In [61]:
# -aws-processing-job is the default name assigned by ProcessingJob
processing_job_tc = "{}-aws-processing-job".format(processing_job_name)
print(processing_job_tc)

pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job


In [62]:
%store -r pipeline_trial_name

In [63]:
print(pipeline_trial_name)

trial-1669943858


In [64]:
response = sm.associate_trial_component(TrialComponentName=processing_job_tc, TrialName=pipeline_trial_name)

In [65]:
# -aws-training-job is the default name assigned by TrainingJob
training_job_tc = "{}-aws-training-job".format(training_job_name)
print(training_job_tc)

pipelines-yv85e13eu5mg-Train-pyyd9oWO7m-aws-training-job


In [66]:
response = sm.associate_trial_component(TrialComponentName=training_job_tc, TrialName=pipeline_trial_name)

In [67]:
from smexperiments import tracker

processing_job_tracker = tracker.Tracker.load(trial_component_name=processing_job_tc)

In [68]:
processing_job_tracker.log_parameters(
    {
        "balance_dataset": str(balance_dataset),
    }
)

# must save after logging
processing_job_tracker.trial_component.save()

TrialComponent(sagemaker_boto_client=<botocore.client.SageMaker object at 0x7f935f322750>,trial_component_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',trial_component_arn='arn:aws:sagemaker:us-east-1:531299576666:experiment-trial-component/pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',display_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',source=TrialComponentSource(source_arn='arn:aws:sagemaker:us-east-1:531299576666:processing-job/pipelines-yv85e13eu5mg-processing-4ooeenxof3',source_type='SageMakerProcessingJob'),status=TrialComponentStatus(primary_status='Completed',message='Status: Completed, exit message: null, failure reason: null'),start_time=datetime.datetime(2022, 12, 2, 2, 16, 50, tzinfo=tzlocal()),end_time=datetime.datetime(2022, 12, 2, 2, 22, 6, tzinfo=tzlocal()),creation_time=datetime.datetime(2022, 12, 2, 2, 12, 33, 153000, tzinfo=tzlocal()),created_by={},last_modified_time=datetime.datetime(2022, 12, 2,

In [69]:
processing_job_tracker.log_parameters(
    {
        "train_split_percentage": str(train_split_percentage),
    }
)

# must save after logging
processing_job_tracker.trial_component.save()

TrialComponent(sagemaker_boto_client=<botocore.client.SageMaker object at 0x7f935f322750>,trial_component_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',trial_component_arn='arn:aws:sagemaker:us-east-1:531299576666:experiment-trial-component/pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',display_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',source=TrialComponentSource(source_arn='arn:aws:sagemaker:us-east-1:531299576666:processing-job/pipelines-yv85e13eu5mg-processing-4ooeenxof3',source_type='SageMakerProcessingJob'),status=TrialComponentStatus(primary_status='Completed',message='Status: Completed, exit message: null, failure reason: null'),start_time=datetime.datetime(2022, 12, 2, 2, 16, 50, tzinfo=tzlocal()),end_time=datetime.datetime(2022, 12, 2, 2, 22, 6, tzinfo=tzlocal()),creation_time=datetime.datetime(2022, 12, 2, 2, 12, 33, 153000, tzinfo=tzlocal()),created_by={},last_modified_time=datetime.datetime(2022, 12, 2,

In [70]:
processing_job_tracker.log_parameters(
    {
        "validation_split_percentage": str(validation_split_percentage),
    }
)

# must save after logging
processing_job_tracker.trial_component.save()

TrialComponent(sagemaker_boto_client=<botocore.client.SageMaker object at 0x7f935f322750>,trial_component_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',trial_component_arn='arn:aws:sagemaker:us-east-1:531299576666:experiment-trial-component/pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',display_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',source=TrialComponentSource(source_arn='arn:aws:sagemaker:us-east-1:531299576666:processing-job/pipelines-yv85e13eu5mg-processing-4ooeenxof3',source_type='SageMakerProcessingJob'),status=TrialComponentStatus(primary_status='Completed',message='Status: Completed, exit message: null, failure reason: null'),start_time=datetime.datetime(2022, 12, 2, 2, 16, 50, tzinfo=tzlocal()),end_time=datetime.datetime(2022, 12, 2, 2, 22, 6, tzinfo=tzlocal()),creation_time=datetime.datetime(2022, 12, 2, 2, 12, 33, 153000, tzinfo=tzlocal()),created_by={},last_modified_time=datetime.datetime(2022, 12, 2,

In [71]:
processing_job_tracker.log_parameters(
    {
        "test_split_percentage": str(test_split_percentage),
    }
)

# must save after logging
processing_job_tracker.trial_component.save()

TrialComponent(sagemaker_boto_client=<botocore.client.SageMaker object at 0x7f935f322750>,trial_component_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',trial_component_arn='arn:aws:sagemaker:us-east-1:531299576666:experiment-trial-component/pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',display_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',source=TrialComponentSource(source_arn='arn:aws:sagemaker:us-east-1:531299576666:processing-job/pipelines-yv85e13eu5mg-processing-4ooeenxof3',source_type='SageMakerProcessingJob'),status=TrialComponentStatus(primary_status='Completed',message='Status: Completed, exit message: null, failure reason: null'),start_time=datetime.datetime(2022, 12, 2, 2, 16, 50, tzinfo=tzlocal()),end_time=datetime.datetime(2022, 12, 2, 2, 22, 6, tzinfo=tzlocal()),creation_time=datetime.datetime(2022, 12, 2, 2, 12, 33, 153000, tzinfo=tzlocal()),created_by={},last_modified_time=datetime.datetime(2022, 12, 2,

In [72]:
processing_job_tracker.log_parameters(
    {
        "max_seq_length": str(max_seq_length),
    }
)

# must save after logging
processing_job_tracker.trial_component.save()

TrialComponent(sagemaker_boto_client=<botocore.client.SageMaker object at 0x7f935f322750>,trial_component_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',trial_component_arn='arn:aws:sagemaker:us-east-1:531299576666:experiment-trial-component/pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',display_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',source=TrialComponentSource(source_arn='arn:aws:sagemaker:us-east-1:531299576666:processing-job/pipelines-yv85e13eu5mg-processing-4ooeenxof3',source_type='SageMakerProcessingJob'),status=TrialComponentStatus(primary_status='Completed',message='Status: Completed, exit message: null, failure reason: null'),start_time=datetime.datetime(2022, 12, 2, 2, 16, 50, tzinfo=tzlocal()),end_time=datetime.datetime(2022, 12, 2, 2, 22, 6, tzinfo=tzlocal()),creation_time=datetime.datetime(2022, 12, 2, 2, 12, 33, 153000, tzinfo=tzlocal()),created_by={},last_modified_time=datetime.datetime(2022, 12, 2,

In [73]:
time.sleep(5)  # avoid throttling exception

processing_job_tracker.log_parameters(
    {
        "feature_store_offline_prefix": str(feature_store_offline_prefix),
    }
)

# must save after logging
processing_job_tracker.trial_component.save()

TrialComponent(sagemaker_boto_client=<botocore.client.SageMaker object at 0x7f935f322750>,trial_component_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',trial_component_arn='arn:aws:sagemaker:us-east-1:531299576666:experiment-trial-component/pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',display_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',source=TrialComponentSource(source_arn='arn:aws:sagemaker:us-east-1:531299576666:processing-job/pipelines-yv85e13eu5mg-processing-4ooeenxof3',source_type='SageMakerProcessingJob'),status=TrialComponentStatus(primary_status='Completed',message='Status: Completed, exit message: null, failure reason: null'),start_time=datetime.datetime(2022, 12, 2, 2, 16, 50, tzinfo=tzlocal()),end_time=datetime.datetime(2022, 12, 2, 2, 22, 6, tzinfo=tzlocal()),creation_time=datetime.datetime(2022, 12, 2, 2, 12, 33, 153000, tzinfo=tzlocal()),created_by={},last_modified_time=datetime.datetime(2022, 12, 2,

In [74]:
time.sleep(5)  # avoid throttling exception

processing_job_tracker.log_parameters(
    {
        "feature_group_name": str(feature_group_name),
    }
)

# must save after logging
processing_job_tracker.trial_component.save()

TrialComponent(sagemaker_boto_client=<botocore.client.SageMaker object at 0x7f935f322750>,trial_component_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',trial_component_arn='arn:aws:sagemaker:us-east-1:531299576666:experiment-trial-component/pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',display_name='pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job',source=TrialComponentSource(source_arn='arn:aws:sagemaker:us-east-1:531299576666:processing-job/pipelines-yv85e13eu5mg-processing-4ooeenxof3',source_type='SageMakerProcessingJob'),status=TrialComponentStatus(primary_status='Completed',message='Status: Completed, exit message: null, failure reason: null'),start_time=datetime.datetime(2022, 12, 2, 2, 16, 50, tzinfo=tzlocal()),end_time=datetime.datetime(2022, 12, 2, 2, 22, 6, tzinfo=tzlocal()),creation_time=datetime.datetime(2022, 12, 2, 2, 12, 33, 153000, tzinfo=tzlocal()),created_by={},last_modified_time=datetime.datetime(2022, 12, 2,

# Analyze Experiment

In [75]:
from sagemaker.analytics import ExperimentAnalytics

time.sleep(30)  # avoid throttling exception

import pandas as pd

pd.set_option("max_colwidth", 500)

experiment_analytics = ExperimentAnalytics(
    experiment_name=pipeline_experiment_name,
)

experiment_analytics_df = experiment_analytics.dataframe()
experiment_analytics_df

Unnamed: 0,TrialComponentName,DisplayName,SourceArn,AWS_DEFAULT_REGION,SageMaker.InstanceCount,SageMaker.InstanceType,SageMaker.VolumeSizeInGB,balance_dataset,feature_group_name,feature_store_offline_prefix,...,train:loss - Last,train:loss - Count,test - MediaType,test - Value,train - MediaType,train - Value,validation - MediaType,validation - Value,SageMaker.DebugHookOutput - MediaType,SageMaker.DebugHookOutput - Value
0,pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job,pipelines-yv85e13eu5mg-processing-4ooeenxof3-aws-processing-job,arn:aws:sagemaker:us-east-1:531299576666:processing-job/pipelines-yv85e13eu5mg-processing-4ooeenxof3,us-east-1,1.0,ml.t3.xlarge,30.0,True,climatetweet-feature-group-1669944412,climate-feature-store-1669944412,...,,,,,,,,,,
1,pipelines-yv85e13eu5mg-Train-pyyd9oWO7m-aws-training-job,pipelines-yv85e13eu5mg-Train-pyyd9oWO7m-aws-training-job,arn:aws:sagemaker:us-east-1:531299576666:training-job/pipelines-yv85e13eu5mg-Train-pyyd9oWO7m,,1.0,ml.m5.4xlarge,1024.0,,,,...,1.3745,7.0,text/csv,s3://sagemaker-us-east-1-531299576666/sagemaker-scikit-learn-2022-12-02-02-01-06-405/output/bert-test,text/csv,s3://sagemaker-us-east-1-531299576666/sagemaker-scikit-learn-2022-12-02-02-01-06-405/output/bert-train,text/csv,s3://sagemaker-us-east-1-531299576666/sagemaker-scikit-learn-2022-12-02-02-01-06-405/output/bert-validation,,s3://sagemaker-us-east-1-531299576666


# Analyze SageMaker Debugger Results 

In [76]:
restored_estimator = sagemaker.estimator.Estimator.attach(training_job_name)


2022-12-02 02:39:37 Starting - Preparing the instances for training
2022-12-02 02:39:37 Downloading - Downloading input data
2022-12-02 02:39:37 Training - Training image download completed. Training in progress.
2022-12-02 02:39:37 Uploading - Uploading generated training model
2022-12-02 02:39:37 Completed - Training job completed


In [77]:
from IPython.core.display import display, HTML

display(
    HTML(
        '<b>Review <a target="blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}?prefix={}/">S3 Debugger Output Data</a></b>'.format(
            bucket, restored_estimator.base_job_name
        )
    )
)

# Download SageMaker Debugger Profiling Report

In [78]:
profiler_report_s3_uri = "s3://{}/{}/rule-output/ProfilerReport/profiler-output".format(
    bucket, restored_estimator.base_job_name
)

In [79]:
!aws s3 ls $profiler_report_s3_uri/

                           PRE profiler-reports/
2022-12-02 02:39:18     344640 profiler-report.html
2022-12-02 02:39:18     188855 profiler-report.ipynb


In [80]:
!aws s3 cp --recursive $profiler_report_s3_uri ./profiler_report/

download: s3://sagemaker-us-east-1-531299576666/pipelines-yv85e13eu5mg-Train-pyyd9oWO7m/rule-output/ProfilerReport/profiler-output/profiler-report.ipynb to profiler_report/profiler-report.ipynb
download: s3://sagemaker-us-east-1-531299576666/pipelines-yv85e13eu5mg-Train-pyyd9oWO7m/rule-output/ProfilerReport/profiler-output/profiler-report.html to profiler_report/profiler-report.html
download: s3://sagemaker-us-east-1-531299576666/pipelines-yv85e13eu5mg-Train-pyyd9oWO7m/rule-output/ProfilerReport/profiler-output/profiler-reports/BatchSize.json to profiler_report/profiler-reports/BatchSize.json
download: s3://sagemaker-us-east-1-531299576666/pipelines-yv85e13eu5mg-Train-pyyd9oWO7m/rule-output/ProfilerReport/profiler-output/profiler-reports/CPUBottleneck.json to profiler_report/profiler-reports/CPUBottleneck.json
download: s3://sagemaker-us-east-1-531299576666/pipelines-yv85e13eu5mg-Train-pyyd9oWO7m/rule-output/ProfilerReport/profiler-output/profiler-reports/OverallFrameworkMetrics.json t

In [81]:
from IPython.core.display import display, HTML

display(HTML('<b>Review <a target="blank" href="./profiler_report/profiler-report.html">Profiler Report</a></b>'))

# Review the Profiling Report in SM Studio
![SageMaker Studio Extensions](img/studio_pipeline_training_debugger_assigned.png)

# Review the Pipeline in SM Studio
![SageMaker Studio Extensions](img/sm_studio_extensions.png)

# Release Resources

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>

<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

In [None]:
%%javascript

try {
    Jupyter.notebook.save_checkpoint();
    Jupyter.notebook.session.delete();
}
catch(err) {
    // NoOp
}