# SageMaker Pipeline to process data, train, evaluate, and conditionally run a Batch Transform job

## End-to-End pipeline in SageMaker for a Keras TensorFlow Text Classifier

In the last module of this workshop we will create a repeatable production workflow which is typically run outside notebooks. To demonstrate automating the workflow, we'll use [Amazon SageMaker Pipelines](https://aws.amazon.com/sagemaker/pipelines) for workflow orchestration. Purpose-built for machine learning (ML), SageMaker Pipelines helps you automate different steps of the ML workflow including data processing, model training, and batch prediction (scoring), and apply conditions such as approvals for model quality. It also includes a model registry and model lineage tracker.

## Workflow Automation with SageMaker Pipelines <a class="anchor" id="WorkflowAutomation">

In another notebook, we prototyped various steps of a TensorFlow project within the notebook itself, with the steps being run locally. Notebooks are great for prototyping, but generally are not used in production-ready machine learning pipelines.

A very simple pipeline in SageMaker includes processing the dataset to get it ready for training, performing the actual training, and then using the model to perform some form of inference such as batch prediction (scoring). We'll use SageMaker Pipelines to automate these steps, keeping the pipeline simple for now: it can easily be extended into a far more complex pipeline.

## Install and load the libraries

In [2]:
!pip install --upgrade sagemaker=='2.91.1'

Collecting sagemaker==2.91.1
  Using cached sagemaker-2.91.1-py2.py3-none-any.whl
Collecting attrs==20.3.0
  Using cached attrs-20.3.0-py2.py3-none-any.whl (49 kB)
Installing collected packages: attrs, sagemaker
  Attempting uninstall: attrs
    Found existing installation: attrs 21.2.0
    Uninstalling attrs-21.2.0:
      Successfully uninstalled attrs-21.2.0
  Attempting uninstall: sagemaker
    Found existing installation: sagemaker 2.70.0
    Uninstalling sagemaker-2.70.0:
      Successfully uninstalled sagemaker-2.70.0
Successfully installed attrs-20.3.0 sagemaker-2.91.1
You should consider upgrading via the '/usr/local/bin/python3.8 -m pip install --upgrade pip' command.


### Setting global variables and SageMaker session parameters

In [3]:
import os
import time
import boto3
import sagemaker

sess = sagemaker.session.Session()
bucket = sess.default_bucket() 
role = sagemaker.get_execution_role()
region = boto3.Session().region_name

# Set the dataset folder
raw_s3 = f"s3://{bucket}/keras-text-classification/data/raw"    # sess.upload_data(path='./data/raw/', key_prefix=rawdata_s3_prefix)
# Set the dataset for batch predictions
batch_s3 = f"s3://{bucket}/keras-text-classification/data/batch"

In [4]:
print('Bucket:',bucket)
print('Region:',region)
print('Folder:',raw_s3)
print('Batch Folder:',batch_s3)
print('Role:', role)

Bucket: sagemaker-us-east-1-223817798831
Region: us-east-1
Folder: s3://sagemaker-us-east-1-223817798831/keras-text-classification/data/raw
Batch Folder: s3://sagemaker-us-east-1-223817798831/keras-text-classification/data/batch
Role: arn:aws:iam::223817798831:role/service-role/AmazonSageMaker-ExecutionRole-20200708T194212


Upload a dataset for batch transformation after model creation. You can select your own dataset for the batch_data_uri as is appropriate.

In [5]:
####### In this notebook we are not using this dataset  ###############################
local_path = "data/batch"
# Upload the dataset for batch transformation
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=batch_s3,
)
print(batch_data_uri)

s3://sagemaker-us-east-1-223817798831/keras-text-classification/data/batch


### Pipeline parameters <a class="anchor" id="PipelineParameters">

Before we begin to create the pipeline itself, we should think about how to parameterize it.  For example, we may use different instance types for different purposes, such as CPU-based types for data processing and GPU-based or more powerful types for model training.  These are all "knobs" of the pipeline that we can parameterize.  Parameterizing enables custom pipeline executions and schedules without having to modify the pipeline definition.

In [6]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# raw input data
#input_data = ParameterString(name="InputData", default_value=raw_s3)

# processing step parameters
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.t3.xlarge")  # alternatives: "ml.m5.xlarge" or "ml.c5.xlarge"
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)

# training step parameters
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.c5.xlarge")
training_instance_count = ParameterInteger(name="TrainingInstanceCount", default_value=1)

# batch inference step parameters
batch_instance_type = ParameterString(name="BatchInstanceType", default_value="ml.c5.xlarge")
batch_instance_count = ParameterInteger(name="BatchInstanceCount", default_value=1)
# Inference step parameters
endpoint_instance_type = ParameterString(name="EndpointInstanceType", default_value="ml.m5.large")

# Model approval parameter
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="Approved"
)
# Accuracy Threshold
acc_threshold = ParameterFloat(name="AccThreshold", default_value=0.8)
# dataset for batch transformation
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)

# training step parameters
training_epochs = ParameterString(name="TrainingEpochs", default_value="5")
max_vocab = ParameterString(name="MaxVocab", default_value="10000")
max_length = ParameterString(name="MaxLength", default_value="250")

# Project Name
project_name = 'keras-text-classification'
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())

### Caching Configuration

This notebook demonstrates how to take advantage of pipeline step caching. With step caching, SageMaker tracks the arguments used for each step execution and re-uses previous, successful executions when the call signatures match. SageMaker only tracks arguments important for the output of the step, so pipeline steps are optimized for cache hits and unnecessary step executions are avoided.

Executing the step without changing its configurations, inputs, or outputs can be a waste. Thus, we can enable caching for pipeline steps. When caching is enabled, an expiration time (in ISO8601 duration string format) needs to be supplied. The expiration time indicates how old a previous execution can be to be considered for reuse.
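
The reuse rule described above can be pictured with a small local sketch. This is only an illustration of the idea, not SageMaker's implementation: `parse_duration` and `is_cache_hit` are hypothetical helpers, and the parser handles only simple day/hour/minute durations such as `P7d` or `T12H`.

```python
import re
from datetime import datetime, timedelta, timezone

# Simplified ISO 8601 duration pattern: optional days, then optional T + hours/minutes.
_DURATION = re.compile(r"^P?(?:(\d+)D)?(?:T(?:(\d+)H)?(?:(\d+)M)?)?$", re.IGNORECASE)


def parse_duration(text):
    """Convert a simple duration string such as 'P7d' into a timedelta (hypothetical helper)."""
    match = _DURATION.match(text.strip())
    if not match or not any(match.groups()):
        raise ValueError(f"Unsupported duration: {text!r}")
    days, hours, minutes = (int(group) if group else 0 for group in match.groups())
    return timedelta(days=days, hours=hours, minutes=minutes)


def is_cache_hit(previous_args, current_args, finished_at, expire_after, now):
    """A previous run is reusable if the arguments match and it is not older than expire_after."""
    same_signature = previous_args == current_args
    still_fresh = (now - finished_at) <= parse_duration(expire_after)
    return same_signature and still_fresh


now = datetime(2022, 11, 9, tzinfo=timezone.utc)
step_args = {"code": "scripts/preprocess.py", "datapath": "aclImdb_v1"}
print(is_cache_hit(step_args, dict(step_args), now - timedelta(days=3), "P7d", now))
```

With a 7-day expiry, a matching run from 3 days ago qualifies for reuse, while one from 8 days ago, or one with different arguments, does not.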

In [7]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(
    enable_caching=True,
    expire_after="P7d" # 7-day
)

### Processing Step <a class="anchor" id="ProcessingStep">

The first step in the pipeline will preprocess the data to prepare it for training. We create a `SKLearnProcessor` object, parameterized so we can separately track and change the job configuration as needed, for example to increase the instance type size and count to accommodate a growing dataset.
    
Previously we created a preprocessing script file, which contains the preprocessing tasks:

- Download the data files from a URL
- Create the train, validation and test datasets

Then we create an instance of a `SKLearnProcessor` and use it in our `ProcessingStep`. We also specify the `framework_version` to use throughout this notebook.
Note the `processing_instance_type` and `processing_instance_count` parameters used by the processor instance.
    

In [8]:
import boto3
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor

framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="ktc-workflow-process",
    sagemaker_session=sess,
    role=role,
    max_runtime_in_seconds=1800
)

Setting the parameters of the data processing stage:

In [9]:
url= "https://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz"
datapath = "aclImdb_v1"

In [10]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_process = ProcessingStep(
    name="KTCProcess",
    processor=sklearn_processor,
    inputs=None, #[ProcessingInput(source=input_data, destination="/opt/ml/processing/input", s3_data_distribution_type='ShardedByS3Key'),],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", s3_upload_mode='EndOfJob'),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation",s3_upload_mode='EndOfJob'),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", s3_upload_mode='EndOfJob'),
    ],
    code="scripts/preprocess.py",
    job_arguments=['--url', url,                   
                   '--datapath', datapath,
                   #'--feature-group-name', str(feature_group_name.default_value)
                  ],
    cache_config=cache_config

)
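
The step above passes `--url` and `--datapath` to `scripts/preprocess.py`, which is not shown in this notebook. As a rough sketch of how such a script might read those arguments and split the records, consider the following. The function names, the 80/10/10 split, and the fixed seed are assumptions made for illustration, not the contents of the actual script.

```python
import argparse
import random


def parse_args(argv=None):
    """Read the two job arguments that the ProcessingStep passes on the command line."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--url", required=True, help="Location of the dataset archive")
    parser.add_argument("--datapath", default="aclImdb_v1", help="Folder name after extraction")
    return parser.parse_args(argv)


def split_dataset(records, val_fraction=0.1, test_fraction=0.1, seed=42):
    """Shuffle the records and cut them into train, validation and test lists (assumed fractions)."""
    shuffled = list(records)
    random.Random(seed).shuffle(shuffled)
    n_test = int(len(shuffled) * test_fraction)
    n_val = int(len(shuffled) * val_fraction)
    test = shuffled[:n_test]
    validation = shuffled[n_test:n_test + n_val]
    train = shuffled[n_test + n_val:]
    return train, validation, test


args = parse_args(["--url", "https://example.com/data.tar.gz", "--datapath", "aclImdb_v1"])
train, validation, test = split_dataset(range(100))
print(args.datapath, len(train), len(validation), len(test))
```

In a real processing job, each split would then be written under `/opt/ml/processing/train`, `/opt/ml/processing/validation` and `/opt/ml/processing/test`, the paths declared in the `ProcessingOutput` entries.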

## Define a Training Step to Train a Keras model
### Model building and Training Step <a class="anchor" id="TrainingModelCreation">

The following code sets up a pipeline step for a training job. We start by specifying which SageMaker prebuilt TensorFlow 2 training container to use for the job.

In [11]:
from sagemaker.tensorflow import TensorFlow
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.step_collections import RegisterModel

tensorflow_version = '2.6.3'
python_version = 'py38'

image_uri_train = sagemaker.image_uris.retrieve(
                                        framework="tensorflow",
                                        region=region,
                                        version=tensorflow_version,
                                        py_version=python_version,
                                        instance_type=training_instance_type,
                                        image_scope="training"
                                       )

Next, we specify an `Estimator` object, and define a `TrainingStep` to insert the training job in the pipeline with inputs from the previous SageMaker Processing step. We should use it to find the best model.

Configure an estimator for a TensorFlow model and the output path. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves the model to `model_dir`, from which SageMaker uploads it to the `output_path` defined below so that it can be hosted later.

Note the training_instance_type and training_instance_count parameters are used to train in the pipeline.

In [12]:
# Set the path to the trained model
model_path = f"s3://{bucket}/keras-text-classification/model"
training_parameters = {'epochs': training_epochs, 'max-vocab': max_vocab, 'max-length': max_length}

estimator = TensorFlow(
    image_uri=image_uri_train,
    source_dir='scripts',
    entry_point='train.py',
    instance_type=training_instance_type,
    instance_count=training_instance_count,
    role=role,
    base_job_name="keras-classifier-train",
    output_path=model_path,
    hyperparameters=training_parameters,
    disable_profiler= True,
)
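
In script mode, SageMaker passes the `hyperparameters` dictionary to the entry point as command-line arguments and exposes the input channels through `SM_CHANNEL_*` environment variables. The `train.py` used here is not shown, so the following is only a sketch of how such a script might read its inputs; the argument defaults and the local fallback paths are assumptions.

```python
import argparse
import os


def parse_args(argv=None):
    """Parse hyperparameters and channel locations the way a script-mode entry point might."""
    parser = argparse.ArgumentParser()
    # Hyperparameters arrive as strings on the command line, so convert them to int here.
    parser.add_argument("--epochs", type=int, default=5)
    parser.add_argument("--max-vocab", type=int, default=10000)
    parser.add_argument("--max-length", type=int, default=250)
    # Channel folders are provided by SageMaker; the fallbacks are for local runs (assumed).
    parser.add_argument("--train", default=os.environ.get("SM_CHANNEL_TRAIN", "data/train"))
    parser.add_argument("--validation", default=os.environ.get("SM_CHANNEL_VALIDATION", "data/validation"))
    parser.add_argument("--test", default=os.environ.get("SM_CHANNEL_TEST", "data/test"))
    # Ignore any additional arguments that the container may append.
    args, _unknown = parser.parse_known_args(argv)
    return args


args = parse_args(["--epochs", "3", "--max-vocab", "5000", "--max-length", "100"])
print(args.epochs, args.max_vocab, args.max_length)
```

Because the values are converted inside the script, the `ParameterString` pipeline parameters for epochs, vocabulary size and sequence length can stay strings in the pipeline definition.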

In [13]:
step_train = TrainingStep(
    name="KTCWorkflowTrain",
    estimator=estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri
        ),
        
        "test": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri
        )
    },
    cache_config=cache_config,
)

## Define a Model Evaluation Step to Evaluate the Trained Model

We have developed an evaluation script that is specified in a Processing step that performs the model evaluation. After pipeline execution, you can examine the resulting evaluation.json for analysis.

The evaluation script uses the trained model to do the following:

- Load the model.
- Read the test dataset.
- Launch predictions against the test data.
- Calculate the accuracy on the test data.
- Save the result to the evaluation directory as a json file.
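
The last two steps of that list can be sketched as follows. The JSON layout mirrors the `classification_metrics.accuracy.value` path that the pipeline reads later; the helper names `accuracy` and `write_report` are hypothetical, and the real `evaluation.py` (which also loads the Keras model and runs the predictions) is not reproduced here.

```python
import json
import tempfile
from pathlib import Path


def accuracy(y_true, y_pred):
    """Fraction of predictions that match the labels."""
    if len(y_true) != len(y_pred) or not y_true:
        raise ValueError("y_true and y_pred must be non-empty and of equal length")
    correct = sum(1 for truth, pred in zip(y_true, y_pred) if truth == pred)
    return correct / len(y_true)


def write_report(acc, output_dir):
    """Write evaluation.json in the nested layout expected by the condition check."""
    report = {"classification_metrics": {"accuracy": {"value": acc}}}
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    path = out / "evaluation.json"
    path.write_text(json.dumps(report))
    return path


# Local demonstration in a temporary folder; in the processing job the target
# would be /opt/ml/processing/evaluation.
with tempfile.TemporaryDirectory() as tmp:
    report_path = write_report(accuracy([1, 0, 1, 1], [1, 0, 0, 1]), tmp)
    print(report_path.read_text())
```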

Next, create an instance of a `SKLearnProcessor` and use it in the ProcessingStep.

Use the processor instance to construct a ProcessingStep, along with the input and output channels and the code that will be executed when the pipeline runs the step. Specifically, the S3ModelArtifacts from the step_train properties and the S3Uri of the "test" output channel of the step_process properties are passed as inputs. The `properties` attributes of the TrainingStep and ProcessingStep match the object models of the DescribeTrainingJob and DescribeProcessingJob response objects, respectively.

In [14]:
from sagemaker.workflow.properties import PropertyFile

# Create the processor
evaluation_scorer = SKLearnProcessor(
                    framework_version=framework_version,
                    instance_type=processing_instance_type,
                    instance_count=processing_instance_count,
                    base_job_name="KTC-workflow-evaluation",
                    sagemaker_session=sess,
                    role=role )

# Create a PropertyFile
# A PropertyFile is used to be able to reference outputs from a processing step, for instance to use in a condition step.
# For more information, visit https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# Create the Evaluation Step
step_evaluation = ProcessingStep(
                    name="KTCWorkflowEvaluation",
                    processor=evaluation_scorer,
                    inputs=[
                        ProcessingInput(
                            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                            destination="/opt/ml/processing/model"
                        ),
                        ProcessingInput(
                            source=step_process.properties.ProcessingOutputConfig.Outputs[
                                "test"
                            ].S3Output.S3Uri,
                            destination="/opt/ml/processing/test"
                        )
                    ],
                    outputs=[
                        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
                    ],
                    code="./scripts/evaluation.py",
                    property_files=[evaluation_report],
                    cache_config=cache_config,
)

## Define a Register Model Step 

A model package is an abstraction of reusable model artifacts that packages all ingredients required for inference. Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.

A model package group is a collection of model packages. A model package group can be created for a specific ML business problem, and new versions of the model packages can be added to it. Typically, customers are expected to create a ModelPackageGroup for a SageMaker pipeline so that model package versions can be added to the group for every SageMaker Pipeline run.

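To register a model in the Model Registry, we take the estimator and the model artifact produced by the training step, attach the evaluation metrics, and add the result to the model package group.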

In [15]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel

model_package_group_name = "KTCModelPackageGroupName"

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_evaluation.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

step_register = RegisterModel(
    name="KTCRegisterModel",
    estimator=estimator,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.c5.xlarge"], #"ml.t2.xlarge"
    transform_instances=["ml.c5.xlarge"],
    model_package_group_name=model_package_group_name,
    #approval_status=model_approval_status,
    model_metrics=model_metrics,
)
#step_register = ModelStep(name="KTCRegisterModel", step_args=register_args)


The class RegisterModel has been renamed in sagemaker>=2.
See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


## Define a Create Model Step to Create a Model

In order to perform predictions using the model, we create a SageMaker model. Specifically, pass in the S3ModelArtifacts from the properties of the TrainingStep, step_train. The TrainingStep properties attribute matches the object model of the DescribeTrainingJob response object.

As another step, we create a SageMaker `Model` object to wrap the model artifact, and associate it with a separate SageMaker prebuilt TensorFlow Serving inference container to potentially use later.

In [16]:
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep
#from sagemaker.workflow.model_step import ModelStep

# Get the right image URI for a tensorflow model
image_uri_inference = sagemaker.image_uris.retrieve(
                                        framework="tensorflow",
                                        region=region,
                                        version=tensorflow_version,
                                        py_version=python_version,
                                        instance_type=batch_instance_type,
                                        image_scope="inference"
                                       )
# Create the model
model = Model(
    image_uri=image_uri_inference,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sess,
    role=role,
)

inputs_model = CreateModelInput(
    instance_type=batch_instance_type
)

step_create_model = CreateModelStep(
    name="KTCWorkflowCreateModel",
    model=model,
    inputs=inputs_model,
)


## TO BE REMOVED IN FUTURE RELEASES
## Define a Transform Step to Perform Batch Transformation

Now that a model instance is defined, create a Transformer instance with the appropriate model type, compute instance type, and desired output S3 URI.

Specifically, pass in the ModelName from the CreateModelStep, step_create_model properties. The CreateModelStep properties attribute matches the object model of the DescribeModel response object.

Pass in the transformer instance and the TransformInput with the batch_data pipeline parameter defined earlier.


In [17]:
from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type=batch_instance_type,
    instance_count=batch_instance_count,
    output_path=f"s3://{bucket}/KTCWorkflowTransform",
)

step_transform = TransformStep(
    name="KTCWorkflowTransform", transformer=transformer, inputs=TransformInput(data=batch_data),
    #cache_config=cache_config,
)

## Define a Lambda Step to Deploy the Model to a SageMaker Endpoint

When defining the LambdaStep, the SageMaker Lambda helper class provides helper functions for creating the Lambda function. Users can either use the lambda_func argument to provide the function ARN to an already deployed Lambda function OR use the Lambda class to create a Lambda function by providing a script, function name and role for the Lambda function.

When passing inputs to the Lambda, the inputs argument can be used and within the Lambda function's handler, the event argument can be used to retrieve the inputs.

The dictionary response from the Lambda function is parsed through the LambdaOutput objects provided to the outputs argument. The output_name in LambdaOutput corresponds to the dictionary key in the Lambda's return dictionary.
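
The handler in `scripts/deploy_model_lambda.py` is not shown in this notebook. A minimal sketch of what such a handler might look like is given below, assuming it reads the four inputs passed by the LambdaStep from `event` and calls the boto3 SageMaker client. The optional `sm_client` argument, the `AllTraffic` variant name and the single-instance count are assumptions added for illustration and local testing.

```python
import json


def lambda_handler(event, context, sm_client=None):
    """Create an endpoint configuration and an endpoint from the LambdaStep inputs."""
    if sm_client is None:
        # Imported lazily so the sketch can be exercised locally with a stand-in client.
        import boto3
        sm_client = boto3.client("sagemaker")

    sm_client.create_endpoint_config(
        EndpointConfigName=event["endpoint_config_name"],
        ProductionVariants=[
            {
                "VariantName": "AllTraffic",  # assumed variant name
                "ModelName": event["model_name"],
                "InitialInstanceCount": 1,  # assumed instance count
                "InstanceType": event["endpoint_instance_type"],
            }
        ],
    )
    sm_client.create_endpoint(
        EndpointName=event["endpoint_name"],
        EndpointConfigName=event["endpoint_config_name"],
    )
    # The keys of this dictionary are what LambdaOutput names would refer to.
    return {"statusCode": 200, "body": json.dumps("Endpoint Created Successfully")}
```

The returned keys `statusCode` and `body` correspond to the output parameters that appear for the Lambda step in the execution listing later in this notebook.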

**IAM Role**

The Lambda function needs an IAM role that will allow it to deploy a SageMaker Endpoint. The role ARN must be provided in the LambdaStep.

A helper function in iam_helper.py is available to create the Lambda function role. Please note that the role uses the AWS managed policy AmazonSageMakerFullAccess. This should be replaced with an IAM policy that grants least privilege, as per AWS IAM best practices.

In [17]:
from iam_helper import create_sagemaker_lambda_role
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

lambda_role = create_sagemaker_lambda_role("deploy-model-lambda-role")

endpoint_config_name = project_name + "-endpoint-config"
endpoint_name = project_name + "-endpoint-" + current_time

deploy_model_lambda_function_name = "sagemaker-deploy-model-lambda-" + current_time

deploy_model_lambda_function = Lambda(
    function_name=deploy_model_lambda_function_name,
    execution_role_arn=lambda_role,
    script="./scripts/deploy_model_lambda.py",
    handler="deploy_model_lambda.lambda_handler",
)

step_deploy_model_lambda = LambdaStep(
    name="KTCWorkflowModelEndpoint",
    lambda_func=deploy_model_lambda_function,
    inputs={
        "model_name": step_create_model.properties.ModelName,
        "endpoint_config_name": endpoint_config_name,
        "endpoint_name": endpoint_name,
        "endpoint_instance_type": endpoint_instance_type,
    },
)

Waiting 30 seconds for the IAM role to propagate


## Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed

Define a FailStep with a customized error message that indicates the cause of the execution failure.
Build the FailStep error message with a Join function, which appends the dynamic acc_threshold parameter to a static text string to produce a more informative error message.

In [18]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="KTCWorkflowACCFail",
    error_message=Join(on=" ", values=["Execution failed due to ACC <", acc_threshold]),
)

## Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation Or Terminate the Execution in Failed State

In this step, the model is registered, created, and deployed only if its accuracy, as determined by the evaluation step `step_evaluation`, exceeds a specified value. Otherwise, the pipeline execution fails and terminates. A ConditionStep enables conditional execution in the pipeline DAG based on the conditions of the step properties.

In the following section, you:

- Define a `ConditionGreaterThan` on the accuracy value found in the output of the evaluation step, `step_evaluation`.
- Use the condition in the list of conditions in a `ConditionStep`.
- Pass the `RegisterModel` step collection, the `CreateModelStep`, and the `LambdaStep` that deploys the endpoint into the `if_steps` of the `ConditionStep`; these are executed only if the condition evaluates to `True`. The `TransformStep` is left commented out.
- Pass the `FailStep` into the `else_steps` of the `ConditionStep`; it is executed only if the condition evaluates to `False`.

In [19]:
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


# The condition passes when the reported accuracy is strictly greater than the threshold
cond_gt = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_evaluation.name,
        property_file=evaluation_report,
        json_path="classification_metrics.accuracy.value",
    ),
    right=acc_threshold,
)

step_cond = ConditionStep(
    name="KTCWorkflowACCCond",
    conditions=[cond_gt],
    if_steps=[step_register, step_create_model, step_deploy_model_lambda], #, step_transform
    else_steps=[step_fail],
)
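
To make the branch logic concrete, the check can be mimicked locally on an evaluation report. This is a simplified stand-in, not the SDK's `JsonGet`: the hypothetical `json_get` helper below only follows dot-separated keys, and `should_deploy` is an illustrative name.

```python
def json_get(document, json_path):
    """Follow a dot-separated key path through nested dictionaries."""
    value = document
    for key in json_path.split("."):
        value = value[key]
    return value


def should_deploy(report, threshold, json_path="classification_metrics.accuracy.value"):
    """Mirror the greater-than condition: True selects the if-branch, False the else-branch."""
    return json_get(report, json_path) > threshold


report = {"classification_metrics": {"accuracy": {"value": 0.872160017490387}}}
print(should_deploy(report, 0.8))
print(should_deploy(report, 0.9))
```

With the default `AccThreshold` of 0.8, a report like this one takes the if-branch; raising the threshold to 0.9 would route the execution to the FailStep.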

## Define a Pipeline of Parameters, Steps, and Conditions
In this section, combine the steps into a Pipeline so it can be executed.

A pipeline requires a name, parameters, and steps. Names must be unique within an (account, region) pair.

Note:

- All the parameters used in the definitions must be present.
- Steps passed into the pipeline do not have to be listed in the order of execution. The SageMaker Pipelines service resolves the data dependency DAG to determine the order in which steps run.
- Steps must be unique across the pipeline step list and all condition step if/else lists.
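
As a rough illustration of how an execution order can be derived from data dependencies alone, the sketch below sorts the four top-level steps of this pipeline by what each one consumes. The dependency map is read off the step definitions in this notebook; the `execution_order` function is a hypothetical helper and not how the service is implemented.

```python
def execution_order(depends_on):
    """Return steps in an order where every step comes after the steps it depends on."""
    remaining = {step: set(deps) for step, deps in depends_on.items()}
    order = []
    while remaining:
        # Steps with no unmet dependencies are ready; those in one round could run in parallel.
        ready = sorted(step for step, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("Cyclic dependency between steps")
        for step in ready:
            order.append(step)
            del remaining[step]
        for deps in remaining.values():
            deps.difference_update(ready)
    return order


# Listed deliberately out of order: the result depends only on the dependencies.
dependencies = {
    "KTCWorkflowACCCond": {"KTCWorkflowEvaluation"},
    "KTCWorkflowEvaluation": {"KTCWorkflowTrain", "KTCProcess"},
    "KTCWorkflowTrain": {"KTCProcess"},
    "KTCProcess": set(),
}
print(execution_order(dependencies))
```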


With all of the pipeline steps now defined, we can define the pipeline itself as a `Pipeline` object comprising a series of those steps.  Parallel and conditional steps also are possible.

In [20]:
from sagemaker.workflow.pipeline import Pipeline

pipeline_name = "KTCWorkflow"

pipeline = Pipeline(
    name=pipeline_name,
    parameters=[#input_data,
                processing_instance_type, 
                processing_instance_count, 
                training_instance_type, 
                training_instance_count,
                batch_instance_type,
                batch_instance_count,
                endpoint_instance_type,
                batch_data,
                #model_approval_status,
                training_epochs,
                max_vocab,
                max_length,
                acc_threshold],
    steps=[step_process, 
           step_train, 
           step_evaluation,
           step_cond
          ],
    sagemaker_session=sess
)

We can inspect the pipeline definition in JSON format:

In [21]:
import json

definition = json.loads(pipeline.definition())
definition

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.t3.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.xlarge'},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'BatchInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.xlarge'},
  {'Name': 'BatchInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'EndpointInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-223817798831/keras-text-classification/data/batch'},
  {'Name': 'TrainingEpochs', 'Type': 'String', 'DefaultValue': '5'},
  {'Name': 'MaxVocab', 'Type': 'String', 'DefaultValue': '10000'},
  {'Name': 'MaxLength', 'Type': 'String', 'DefaultValue': '250'},
  {'Name

After upserting its definition, we can start the pipeline with the `Pipeline` object's `start` method:

In [22]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
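
Because the pipeline is parameterized, an execution can also be started with different values without touching the definition, by passing a `parameters` dictionary to `start`. The wrapper below is a small sketch that first checks the override names against the parameters declared in this notebook; `start_with_overrides` and `DECLARED_PARAMETERS` are hypothetical names introduced here.

```python
# Names of the parameters registered on the pipeline in this notebook.
DECLARED_PARAMETERS = {
    "ProcessingInstanceType", "ProcessingInstanceCount",
    "TrainingInstanceType", "TrainingInstanceCount",
    "BatchInstanceType", "BatchInstanceCount",
    "EndpointInstanceType", "BatchData",
    "TrainingEpochs", "MaxVocab", "MaxLength", "AccThreshold",
}


def start_with_overrides(pipeline, overrides):
    """Reject misspelled parameter names, then start the execution with the overrides."""
    unknown = sorted(set(overrides) - DECLARED_PARAMETERS)
    if unknown:
        raise ValueError(f"Unknown pipeline parameters: {unknown}")
    return pipeline.start(parameters=overrides)
```

For example, `start_with_overrides(pipeline, {"AccThreshold": 0.85, "TrainingEpochs": "10"})` would request a stricter accuracy gate and a longer training run for that execution only.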


We can now confirm that the pipeline is executing.  In the log output below, confirm that `PipelineExecutionStatus` is `Executing`.

In [23]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:223817798831:pipeline/ktcworkflow',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:223817798831:pipeline/ktcworkflow/execution/kel3avcgfvih',
 'PipelineExecutionDisplayName': 'execution-1667992835096',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'ktcworkflow',
  'TrialName': 'kel3avcgfvih'},
 'CreationTime': datetime.datetime(2022, 11, 9, 11, 20, 35, 9000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 11, 9, 11, 20, 35, 9000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:223817798831:user-profile/d-apj1jigbprcn/sagemakeruser',
  'UserProfileName': 'sagemakeruser',
  'DomainId': 'd-apj1jigbprcn'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:223817798831:user-profile/d-apj1jigbprcn/sagemakeruser',
  'UserProfileName': 'sagemakeruser',
  'DomainId': 'd-apj1jigbprcn'},
 'ResponseMetadata': {'RequestId': '1f3fa859-708f-4

#### Review the Pipeline

After the pipeline started executing, you can view the pipeline run. 

To view it, choose the SageMaker Components and registries button.
On the Components and registries drop-down, select Pipelines.


Click the `KTCWorkflow` pipeline, and then double click on the execution.


Now you can see the pipeline executing. Click on `KTCProcess` step to see additional details.


On this specific step, you'll be able to see the output, logs and information.


Typically this pipeline should take about 15-18 minutes to complete.  We can wait for completion by invoking `wait()`. After execution is complete, we can list the status of the pipeline steps.

In [21]:
execution.wait()

In [24]:
execution.list_steps()

[{'StepName': 'KTCWorkflowModelEndpoint',
  'StartTime': datetime.datetime(2022, 11, 9, 11, 35, 3, 644000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 11, 9, 11, 35, 5, 719000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'Lambda': {'Arn': 'arn:aws:lambda:us-east-1:223817798831:function:sagemaker-deploy-model-lambda-11-09-11-06-45',
    'OutputParameters': [{'Name': 'body',
      'Value': '"Endpoint Created Successfully"'},
     {'Name': 'statusCode', 'Value': '200.0'}]}}},
 {'StepName': 'KTCRegisterModel',
  'StartTime': datetime.datetime(2022, 11, 9, 11, 35, 1, 944000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 11, 9, 11, 35, 2, 896000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:223817798831:model-package/ktcmodelpackagegroupname/8'}}},
 {'StepName': 'KTCWorkflowCreateModel',
  'StartTime': datetime.datetime(2022, 11, 9, 11, 35

### Check the score report

After the evaluation step in the pipeline is complete, the test scoring report is uploaded to S3.  For simplicity, this report states only the test accuracy, but in general reports can include as much detail as desired.  These reports can be used in conditional approval steps in SageMaker Pipelines.

In [26]:
import json
from pprint import pprint

import sagemaker

# Read the report that the evaluation step wrote to its first S3 output location.
evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_evaluation.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))

{'classification_metrics': {'accuracy': {'value': 0.872160017490387}}}
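
To illustrate how a report like this could drive a conditional step, here is a minimal sketch of the comparison such a condition performs, written in plain Python so it runs without AWS access. The function name `accuracy_passes` and the 0.8 threshold are assumptions chosen for illustration; they are not taken from the pipeline definition.

```python
import json


def accuracy_passes(report_text, threshold):
    """Return True if the accuracy stored in the report meets the threshold."""
    report = json.loads(report_text)
    # Same nesting as the evaluation.json printed above.
    accuracy = report["classification_metrics"]["accuracy"]["value"]
    return accuracy >= threshold


# Report content copied from the output above.
report_text = '{"classification_metrics": {"accuracy": {"value": 0.872160017490387}}}'

# The 0.8 threshold is a hypothetical value.
print(accuracy_passes(report_text, threshold=0.8))
```

The JSON path used here, `classification_metrics.accuracy.value`, is the same path a pipeline condition would need to read from the report.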


## ML Lineage Tracking <a class="anchor" id="LineageOfPipelineArtifacts">

SageMaker ML Lineage Tracking creates and stores information about the steps of an ML workflow, from data preparation to model deployment. With the tracking information you can reproduce the workflow steps, track model and dataset lineage, and establish model governance and audit standards.

Let's now check out the lineage of the model generated by the pipeline above.  The lineage table identifies the resources used in training, including the timestamped train and test data sources, and the specific version of the TensorFlow 2 container in use during the training job.  

In [27]:
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

for execution_step in reversed(execution.list_steps()):
    if execution_step['StepName'] == 'KTCWorkflowTrain':
        display(viz.show(pipeline_execution_step=execution_step))

| | Name/Source | Direction | Type | Association Type | Lineage Type |
|---|---|---|---|---|---|
| 0 | s3://...96c71c9090994788a5036ca51347/output/test | Input | DataSet | ContributedTo | artifact |
| 1 | s3://...9090994788a5036ca51347/output/validation | Input | DataSet | ContributedTo | artifact |
| 2 | s3://...6c71c9090994788a5036ca51347/output/train | Input | DataSet | ContributedTo | artifact |
| 3 | 76310...s.com/tensorflow-training:2.6.3-cpu-py38 | Input | Image | ContributedTo | artifact |
| 4 | s3://...flowTrain-dx8nGDrjOZ/output/model.tar.gz | Output | Model | Produced | artifact |


## Clean up (optional)

**Stop / Close the Endpoint**

You should delete the endpoint before you close the notebook if you don't need to keep the endpoint running for serving real-time predictions.

In [29]:
print('Endpoint: ',endpoint_name)

Endpoint:  keras-text-classification-endpoint-11-09-11-06-45


In [30]:
import boto3

client = boto3.client("sagemaker")
client.delete_endpoint(EndpointName=endpoint_name)

{'ResponseMetadata': {'RequestId': '61fa3769-544b-416f-9542-453078ae423b',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '61fa3769-544b-416f-9542-453078ae423b',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Wed, 09 Nov 2022 11:40:48 GMT'},
  'RetryAttempts': 0}}
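
Deleting an endpoint leaves its endpoint configuration in place. A minimal sketch of a helper that removes both is given here as an alternative to the cell above. `delete_endpoint_and_config` is a hypothetical name, and the sketch assumes the configuration can be deleted as soon as the endpoint deletion request has been accepted.

```python
def delete_endpoint_and_config(sm_client, endpoint_name):
    """Delete an endpoint and the endpoint configuration it was created from."""
    # Look up the configuration name first: it cannot be read from the
    # endpoint once the endpoint has been deleted.
    description = sm_client.describe_endpoint(EndpointName=endpoint_name)
    config_name = description["EndpointConfigName"]

    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    return config_name


# Usage in the notebook (requires AWS credentials):
# import boto3
# delete_endpoint_and_config(boto3.client("sagemaker"), endpoint_name)
```

The model resource behind the endpoint is not touched by this helper and would need to be deleted separately if it is no longer needed.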

Delete the model package group (with all of its registered model versions) and the pipeline to keep the Studio environment tidy.

In [31]:
import time  # used below to pause between delete requests


def delete_model_package_group(sm_client, package_group_name):
    try:
        model_versions = sm_client.list_model_packages(ModelPackageGroupName=package_group_name)

    except Exception as e:
        print("{} \n".format(e))
        return

    for model_version in model_versions["ModelPackageSummaryList"]:
        try:
            sm_client.delete_model_package(ModelPackageName=model_version["ModelPackageArn"])
        except Exception as e:
            print("{} \n".format(e))
        time.sleep(0.5)  # Ensure requests aren't throttled

    try:
        sm_client.delete_model_package_group(ModelPackageGroupName=package_group_name)
        print("{} model package group deleted".format(package_group_name))
    except Exception as e:
        print("{} \n".format(e))
    return


def delete_sagemaker_pipeline(sm_client, pipeline_name):
    try:
        sm_client.delete_pipeline(
            PipelineName=pipeline_name,
        )
        print("{} pipeline deleted".format(pipeline_name))
    except Exception as e:
        print("{} \n".format(e))
        return

In [32]:
delete_model_package_group(client, model_package_group_name)
delete_sagemaker_pipeline(client, pipeline_name)

KTCModelPackageGroupName model package group deleted
KTCWorkflow pipeline deleted
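
`list_model_packages` returns one page of results per call, so a group with many versions may need several calls before it can be emptied. The following sketch follows the `NextToken` field of each response to collect every version. The generator name `iter_model_package_arns` is hypothetical; the request and response fields are the ones already used by `delete_model_package_group` above, plus `NextToken`.

```python
def iter_model_package_arns(sm_client, package_group_name):
    """Yield the ARN of every model package version in the group, page by page."""
    kwargs = {"ModelPackageGroupName": package_group_name}
    while True:
        page = sm_client.list_model_packages(**kwargs)
        for summary in page["ModelPackageSummaryList"]:
            yield summary["ModelPackageArn"]
        token = page.get("NextToken")
        if not token:
            break  # no further pages
        kwargs["NextToken"] = token


# Usage in the notebook (requires AWS credentials):
# arns = list(iter_model_package_arns(client, model_package_group_name))
```

Collecting the ARNs into a list before deleting anything avoids changing the group while it is still being paged through.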


Finally, delete the Lambda function that the pipeline used to deploy the endpoint.

In [33]:
deploy_model_lambda_function.delete()

{'ResponseMetadata': {'RequestId': '58135353-588a-41b9-987d-c3d52fd2708f',
  'HTTPStatusCode': 204,
  'HTTPHeaders': {'date': 'Wed, 09 Nov 2022 11:44:21 GMT',
   'content-type': 'application/json',
   'connection': 'keep-alive',
   'x-amzn-requestid': '58135353-588a-41b9-987d-c3d52fd2708f'},
  'RetryAttempts': 0}}