In [None]:
import os
import sys
import logging

import boto3
import numpy as np
import pandas as pd
import sagemaker
from sagemaker.pytorch import PyTorch
from botocore.exceptions import ClientError

# A SageMaker Workflow

The pipeline that we create follows a typical Machine Learning Application pattern of pre-processing, training, evaluation, and model registration:

![A typical ML Application pipeline](img/pipeline-full.png)

### Create SageMaker Clients and Session

First, we create a new SageMaker Session in the current AWS region. We also acquire the role ARN for the session.

This role ARN should be the execution role ARN that you set up in the Prerequisites section of this notebook.

In [None]:
session = sagemaker.Session()
bucket = session.default_bucket()
role = sagemaker.get_execution_role()
# Derive the region from the session instead of hard-coding it. This way the default
# bucket, the SageMaker client below and the ECR / image URIs built later all refer
# to the same region. To target another region, configure it on the session
# (e.g. AWS_DEFAULT_REGION) rather than here.
region = session.boto_region_name
sm = boto3.Session().client(service_name="sagemaker", region_name=region)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {bucket}")
print(f"sagemaker session region: {region}")

## BYO Docker

In [None]:
import boto3

# Assemble the ECR URI of the custom processing image in this account and region.
account_id = boto3.client("sts").get_caller_identity().get("Account")
ecr_repository = "sagemaker-pytorch-processing-container"
tag = ":latest"
uri_suffix = "amazonaws.com"

processing_repository_uri = "/".join(
    [f"{account_id}.dkr.ecr.{region}.{uri_suffix}", ecr_repository + tag]
)

### Define Parameters to Parametrize Pipeline Execution

Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

* `ParameterString` - represents a `str` Python type
* `ParameterInteger` - represents an `int` Python type
* `ParameterFloat` - represents a `float` Python type

These parameters support providing a default value, which can be overridden on pipeline execution. The default value specified should be an instance of the type of the parameter.

The parameters defined in this workflow include:

* `processing_instance_type` - The `ml.*` instance type of the processing job.
* `processing_instance_count` - The instance count of the processing job.
* `training_instance_type` - The `ml.*` instance type of the training job.
* `model_approval_status` - The approval status to register the trained model with, for CI/CD purposes (default: "PendingManualApproval").
* `input_data` - The S3 bucket URI location of the input data
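* `batch_data` - The S3 URI of the batch data (note: no `batch_data` pipeline parameter is defined below; the batch input location is kept in the plain string `batch_input_uri`).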

# Pipeline Parameters

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

In [None]:
from sagemaker.workflow.steps import CacheConfig

# Steps that pass this config reuse a previous identical run for up to
# one hour (ISO 8601 duration "PT1H").
cache_config = CacheConfig(
    expire_after="PT1H",
    enable_caching=True,
)

# Processing Step Parameters

![Define a Processing Step for Feature Engineering](img/pipeline-2.png)

In [None]:
# S3 location of the raw IMDb CSV, and of the CSV used as batch input.
raw_input_uri = "s3://{}/imdb/processing/raw/small/raw.csv".format(bucket)
batch_input_uri = "s3://{}/imdb/batch/input/small/test.csv".format(bucket)

In [None]:
!aws s3 ls $raw_input_uri

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)


# Execution-time knobs of the pipeline; each default can be overridden when an
# execution is started.
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.t3.medium")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="PendingManualApproval")
# Raw CSV consumed by the processing step.
input_data = ParameterString(name="InputData", default_value=raw_input_uri)

### Define a Processing Step for Feature Engineering

In [None]:
from sagemaker.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

# Runs Python scripts inside the custom processing image from ECR. Instance type
# and count come from pipeline parameters. The evaluation steps reuse this processor too.
script_processor = ScriptProcessor(
    base_job_name="imdb-preprocess",
    image_uri=processing_repository_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    role=role,
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


# Feature engineering: read the raw CSV given by `input_data` and emit "train" and
# "test" outputs, which the training and evaluation steps consume.
step_process = ProcessingStep(
    name="ImdbProcess",
    processor=script_processor,
    code="source/preprocessing.py",
    job_arguments=[
        "--train-test-split-ratio", "0.2",
        "--model_name", "distilbert-base-uncased",
    ],
    inputs=[
        ProcessingInput(destination="/opt/ml/processing/input", source=input_data),
    ],
    outputs=[
        ProcessingOutput(source=f"/opt/ml/processing/{split}", output_name=split)
        for split in ("train", "test")
    ],
    cache_config=cache_config,
)

print(step_process)

# Setup Training Hyper-Parameters

![Define a Training Step to Train a Model](img/pipeline-3.png)

# Setup Metrics To Track Model Performance

In [None]:
# Regex for one logged number, captured in full as group 1: "0.6931", "1.0", "3",
# "4.5e-05". The decimal point is escaped, so it no longer matches any character.
# The optional exponent is part of group 1, so values printed in scientific
# notation (e.g. a learning rate of 4.5e-05) are not truncated to "4.5".
# Raw strings avoid the invalid "\-" escape sequences of the plain-string form.
METRIC_NUMBER_PATTERN = r"([0-9]+(\.[0-9]+)?(e[-+]?[0-9]+)?)"

# (CloudWatch metric name, key as it appears in the training log dict).
metric_definitions = [
    {"Name": metric_name, "Regex": f"'{log_key}': {METRIC_NUMBER_PATTERN},?"}
    for metric_name, log_key in [
        ("training:loss", "loss"),
        ("validation:loss", "eval_loss"),
        ("validation:accuracy", "eval_accuracy"),
        ("validation:f1", "eval_f1"),
        ("validation:precision", "eval_precision"),
        ("validation:recall", "eval_recall"),
        ("validation:runtime", "eval_runtime"),
        ("validation:samples_per_second", "eval_samples_per_second"),
        ("learning_rate", "learning_rate"),
        ("epoch", "epoch"),
    ]
]

# Define a Training Step to Train a Model

We configure an Estimator and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves a model to `model_dir` so that it can be hosted later.

We also specify the model path where the models from training will be saved.

Note that the `training_instance_type` parameter may also be passed to other places in the pipeline. Here, `training_instance_type` is passed to both estimators.

In [None]:
from sagemaker.huggingface import HuggingFace
from sagemaker.pytorch import PyTorch

# S3 prefixes where each training job writes its model artifacts.
model_path_tf = f"s3://{bucket}/imdb/model_train/tf"
model_path_svm = f"s3://{bucket}/imdb/model_train/svm"

hyperparameters = dict(
    epochs=1,
    train_batch_size=32,
    model_name="distilbert-base-uncased",
)

# Settings shared by both estimators.
_shared_estimator_kwargs = dict(
    source_dir="source",
    role=role,
    framework_version="1.6",
    instance_type=training_instance_type,
    instance_count=1,
)

# Only this estimator receives hyperparameters and metric regexes.
# NOTE(review): this estimator uses py_version="py36" while the SVM one uses "py3";
# confirm which value the installed SDK accepts for framework_version="1.6".
estimator_tf = PyTorch(
    entry_point="train_tf.py",
    py_version="py36",
    hyperparameters=hyperparameters,
    output_path=model_path_tf,
    metric_definitions=metric_definitions,
    **_shared_estimator_kwargs,
)

estimator_svm = PyTorch(
    entry_point="train_svm.py",
    py_version="py3",
    output_path=model_path_svm,
    **_shared_estimator_kwargs,
)

### Setup Pipeline Step Caching
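Cache pipeline steps for a duration of time using [ISO 8601](https://en.wikipedia.org/wiki/ISO_8601#Durations) format. The `cache_config` created earlier (`expire_after="PT1H"`, i.e. one hour) is passed to the training steps below.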

More details on SageMaker Pipeline step caching here:  https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# The "train" and "test" channels are fed from the processing step's outputs of the
# same name. This data dependency orders the training step after the processing step.
step_train_tf = TrainingStep(
    name="ImdbTrainTF",
    estimator=estimator_tf,
    inputs={
        channel: TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[channel].S3Output.S3Uri,
            content_type="text/csv",
        )
        for channel in ("train", "test")
    },
    cache_config=cache_config,
)
print(step_train_tf)

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


# The SVM job only receives the "train" split produced by the processing step.
step_train_svm = TrainingStep(
    cache_config=cache_config,
    name="ImdbTrainSVM",
    estimator=estimator_svm,
    inputs=dict(
        train=TrainingInput(
            content_type="text/csv",
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
        ),
    ),
)
print(step_train_svm)

## Model Evaluation

![Define a Model Evaluation Step to Evaluate the Trained Model](img/pipeline-4.png)

In [None]:
from sagemaker.workflow.properties import PropertyFile


# evaluation.json in the step's "evaluation" output, registered as a property file.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Evaluates the model trained by `step_train_tf` on the "test" split.
step_eval_tf = ProcessingStep(
    name="ImdbEvalTF",
    processor=script_processor,
    code="source/evaluation_tf.py",
    inputs=[
        # trained model artifacts
        ProcessingInput(
            destination="/opt/ml/processing/model",
            source=step_train_tf.properties.ModelArtifacts.S3ModelArtifacts,
        ),
        # test split written by the processing step
        ProcessingInput(
            destination="/opt/ml/processing/test",
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        ),
    ],
    outputs=[ProcessingOutput(source="/opt/ml/processing/evaluation", output_name="evaluation")],
    property_files=[evaluation_report],
)

print(step_eval_tf)

In [None]:
from sagemaker.workflow.properties import PropertyFile


# NOTE: this rebinds `evaluation_report` to a new PropertyFile with the same name,
# attached to the SVM evaluation step below.
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json",
)

# Evaluates the model trained by `step_train_svm` on the "test" split.
step_eval_svm = ProcessingStep(
    name="ImdbEvalSVM",
    processor=script_processor,
    code="source/evaluation_svm.py",
    inputs=[
        # trained model artifacts
        ProcessingInput(
            destination="/opt/ml/processing/model",
            source=step_train_svm.properties.ModelArtifacts.S3ModelArtifacts,
        ),
        # test split written by the processing step
        ProcessingInput(
            destination="/opt/ml/processing/test",
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
        ),
    ],
    outputs=[ProcessingOutput(source="/opt/ml/processing/evaluation", output_name="evaluation")],
    property_files=[evaluation_report],
)

print(step_eval_svm)

# Register Model Step

![](img/pipeline-5.png)

We use the estimator instance that was used for the training step to construct an instance of `RegisterModel`. The result of executing `RegisterModel` in a pipeline is a Model Package. A Model Package is a reusable model artifacts abstraction that packages all ingredients necessary for inference. Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.

A Model Package Group is a collection of Model Packages. You can create a Model Package Group for a specific ML business problem, and you can keep adding versions/model packages into it. Typically, we expect customers to create a ModelPackageGroup for a SageMaker Workflow Pipeline so that they can keep adding versions/model packages to the group for every Workflow Pipeline run.

The construction of `RegisterModel` is very similar to an estimator instance's `register` method, for those familiar with the existing Python SDK.

In particular, we pass in the `S3ModelArtifacts` from the properties of the `TrainingStep`s `step_train_tf` and `step_train_svm`. The `TrainingStep` `properties` attribute matches the object model of the [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) response object.

Of note, we provided a specific model package group name which we will use in the Model Registry and CI/CD work later on.

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics


def _evaluation_metrics(eval_step):
    """Build ModelMetrics pointing at ``evaluation.json`` under the step's first output URI."""
    evaluation_prefix = eval_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    return ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=f"{evaluation_prefix}/evaluation.json",
            content_type="application/json",
        )
    )


model_metrics_tf = _evaluation_metrics(step_eval_tf)
model_metrics_svm = _evaluation_metrics(step_eval_svm)

In [None]:
# `model_approval_status` is already defined with the other pipeline parameters.
# Re-creating it here would silently replace that object, so this cell only defines
# the deployment parameters.
deploy_instance_type = ParameterString(name="DeployInstanceType", default_value="ml.t2.medium")

deploy_instance_count = ParameterInteger(name="DeployInstanceCount", default_value=1)

In [None]:
# Model Registry group that both RegisterModel steps add their model versions to.
model_package_group_name = "Imdb-ModelGroup"
print(model_package_group_name)

In [None]:
# `image_uris.retrieve` runs now, in the notebook, and needs a concrete instance type
# string to pick the image variant. The `deploy_instance_type` pipeline parameter is
# only resolved at execution time, so its default value is used here.
# NOTE(review): if DeployInstanceType is overridden with a GPU instance at execution
# time, this CPU/GPU image choice will not follow it.
inference_image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=region,
    version="1.6.0",
    py_version="py36",
    instance_type=deploy_instance_type.default_value,
    image_scope="inference",
)
print(inference_image_uri)

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel


def _register_model_step(name, estimator, training_step, model_metrics, entry_point):
    """Return a RegisterModel step that adds one trained model to the package group.

    The two models differ only in step name, estimator, model artifacts, metrics
    and inference script; every other setting is shared.
    """
    return RegisterModel(
        name=name,
        estimator=estimator,
        model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
        # Must be given explicitly: by default the training image would be used.
        image_uri=inference_image_uri,
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=[deploy_instance_type],
        transform_instances=["ml.m5.large"],
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
        entry_point=entry_point,
        source_dir="source",
    )


step_register_tf = _register_model_step(
    "ImdbRegisterModelTF", estimator_tf, step_train_tf, model_metrics_tf, "inference_tf.py"
)
step_register_svm = _register_model_step(
    "ImdbRegisterModelSVM", estimator_svm, step_train_svm, model_metrics_svm, "inference_svm.py"
)

### Define a Pipeline of Parameters, Steps, and Conditions

In this section, combine the steps into a Pipeline so it can be executed.

A pipeline requires a `name`, `parameters`, and `steps`. Names must be unique within an `(account, region)` pair.

Note:

* All of the parameters used in the definitions must be present.
* Steps passed into the pipeline do not have to be listed in the order of execution. The SageMaker Pipeline service resolves the _data dependency_ DAG as steps for the execution to complete.
* Steps must be unique across the pipeline step list and all condition step if/else lists.

In [None]:
from sagemaker.workflow.pipeline import Pipeline


# Every parameter used by the steps is declared. The order of `steps` does not matter,
# because execution order is derived from the data dependencies between steps.
pipeline = Pipeline(
    name="Imdb-2in1-Pipeline",
    steps=[
        step_process,
        step_train_tf,
        step_train_svm,
        step_eval_tf,
        step_eval_svm,
        step_register_tf,
        step_register_svm,
    ],
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        deploy_instance_count,
        deploy_instance_type,
    ],
)

#### (Optional) Examining the pipeline definition

The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and the parameters and step properties resolve correctly.

Note: this does not seem to work when the pipeline uses a `FrameworkModel`.

In [None]:
import json


# Parse the JSON definition string into a dict so the notebook renders it as a nested structure.
definition = json.loads(pipeline.definition())
definition

### Submit the pipeline to SageMaker and start execution

Submit the pipeline definition to the Pipeline service. The role passed in will be used by the Pipeline service to create all the jobs defined in the steps.

## Ignore the `WARNING` below

In [None]:
# Create or update (upsert) the pipeline; `role` is used to create the step jobs.
response = pipeline.upsert(role_arn=role)
pipeline_arn = response["PipelineArn"]
print(pipeline_arn)

Start the pipeline and accept all of the default parameters.

In [None]:
execution = pipeline.start()

### Workflow Operations: examining and waiting for pipeline execution

Now we describe execution instance and list the steps in the execution to find out more about the execution.

In [None]:
from pprint import pprint

# Snapshot of the execution's current description (display name, ARN, status, ...).
execution_run = execution.describe()
pprint(execution_run)

# Add Execution Run as Trial to Experiments

In [None]:
# Display name of this pipeline execution.
execution_run_name = execution_run["PipelineExecutionDisplayName"]
print(execution_run_name)

In [None]:
# ARN identifying this pipeline execution.
pipeline_execution_arn = execution_run["PipelineExecutionArn"]
print(pipeline_execution_arn)

# List Execution Steps

In [None]:
import time

# Wait 30 seconds so the first step has started before listing the steps.
time.sleep(30)
execution.list_steps()

# Wait for the Pipeline to Complete

# _Note: The cell below polls the pipeline's executions until the execution is no longer `Executing`; this can take a while. If it is interrupted, just re-run it and keep waiting._

In [None]:
%store -r pipeline_name

In [None]:
%%time

import time
from pprint import pprint

executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

while pipeline_execution_status == "Executing":
    try:
        executions_response = sm.list_pipeline_executions(PipelineName=pipeline_name)["PipelineExecutionSummaries"]
        pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
    #        print('Executions for our pipeline...')
    #        print(pipeline_execution_status)
    except Exception as e:
        print("Please wait...")
        time.sleep(30)

pprint(executions_response)

# Wait for the Pipeline ^^ Above ^^ to Complete

# _Note: The cells below only read the status captured by the polling cell above; re-run that cell if the pipeline is still `Executing`._

In [None]:
# Status of the first execution summary returned by the polling cell.
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

In [None]:
# ARN of that same execution, used to list its steps.
pipeline_execution_arn = executions_response[0]["PipelineExecutionArn"]
print(pipeline_execution_arn)

We can list the execution steps to check out the status and artifacts:

# List Pipeline Execution Steps

In [None]:
# Same status check as before, repeated right before listing the steps.
pipeline_execution_status = executions_response[0]["PipelineExecutionStatus"]
print(pipeline_execution_status)

In [None]:
from pprint import pprint

# Steps of the selected execution, including their status and job metadata.
steps = sm.list_pipeline_execution_steps(PipelineExecutionArn=pipeline_execution_arn)
pprint(steps)

# List All Artifacts Generated By The Pipeline

In [None]:
# Placeholders, filled in by the lineage cell that follows.
processing_job_name = training_job_name = None

In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer

viz = LineageTableVisualizer(sagemaker.session.Session())

# Match on the names this notebook gives its steps (`step_process` -> "ImdbProcess",
# `step_train_tf` -> "ImdbTrainTF"). Generic names such as "Processing"/"Train" never
# occur, and without a match `processing_job_name` / `training_job_name` stay None for
# the experiment-tracking cells below. Only the TF training job is tracked by name;
# other steps fall through to the generic lineage view.
for execution_step in reversed(steps["PipelineExecutionSteps"]):
    print(execution_step)
    step_name = execution_step["StepName"]
    # We are doing this because there appears to be a bug of this LineageTableVisualizer handling the Processing Step
    if step_name == step_process.name:
        processing_job_name = execution_step["Metadata"]["ProcessingJob"]["Arn"].split("/")[-1]
        print(processing_job_name)
        display(viz.show(processing_job_name=processing_job_name))
    elif step_name == step_train_tf.name:
        training_job_name = execution_step["Metadata"]["TrainingJob"]["Arn"].split("/")[-1]
        print(training_job_name)
        display(viz.show(training_job_name=training_job_name))
    else:
        display(viz.show(pipeline_execution_step=execution_step))
        time.sleep(5)

# Track Additional Parameters in our Experiment

In [None]:
# -aws-processing-job is the default name assigned by ProcessingJob
# `processing_job_name` is set by the lineage cell; if it is still None, the name
# below would silently become "None-aws-processing-job".
if processing_job_name is None:
    raise ValueError(
        "processing_job_name is not set: no processing step was matched in the lineage cell"
    )
processing_job_tc = "{}-aws-processing-job".format(processing_job_name)
print(processing_job_tc)

In [None]:
%store -r pipeline_trial_name

In [None]:
print(pipeline_trial_name)

In [None]:
response = sm.associate_trial_component(TrialComponentName=processing_job_tc, TrialName=pipeline_trial_name)

In [None]:
# -aws-training-job is the default name assigned by TrainingJob
# `training_job_name` is set by the lineage cell; if it is still None, the name
# below would silently become "None-aws-training-job".
if training_job_name is None:
    raise ValueError(
        "training_job_name is not set: no training step was matched in the lineage cell"
    )
training_job_tc = "{}-aws-training-job".format(training_job_name)
print(training_job_tc)

In [None]:
response = sm.associate_trial_component(TrialComponentName=training_job_tc, TrialName=pipeline_trial_name)

In [None]:
from smexperiments import tracker

# Tracker bound to the processing job's trial component; the cells below log extra parameters through it.
processing_job_tracker = tracker.Tracker.load(trial_component_name=processing_job_tc)

In [None]:
# NOTE(review): `balance_dataset` does not appear to be defined anywhere in this
# notebook, so this cell raises NameError as written. TODO: define it or drop the cell.
processing_job_tracker.log_parameters({"balance_dataset": str(balance_dataset)})
# Logged parameters must be saved to the trial component.
processing_job_tracker.trial_component.save()

In [None]:
# NOTE(review): `train_split_percentage` does not appear to be defined anywhere in this
# notebook (preprocessing uses "--train-test-split-ratio 0.2"). TODO: define it or drop the cell.
processing_job_tracker.log_parameters({"train_split_percentage": str(train_split_percentage)})
# Logged parameters must be saved to the trial component.
processing_job_tracker.trial_component.save()

In [None]:
# NOTE(review): `validation_split_percentage` does not appear to be defined anywhere in
# this notebook, so this cell raises NameError as written. TODO: define it or drop the cell.
processing_job_tracker.log_parameters({"validation_split_percentage": str(validation_split_percentage)})
# Logged parameters must be saved to the trial component.
processing_job_tracker.trial_component.save()

In [None]:
# NOTE(review): `test_split_percentage` does not appear to be defined anywhere in this
# notebook, so this cell raises NameError as written. TODO: define it or drop the cell.
processing_job_tracker.log_parameters({"test_split_percentage": str(test_split_percentage)})
# Logged parameters must be saved to the trial component.
processing_job_tracker.trial_component.save()

In [None]:
# NOTE(review): `max_seq_length` does not appear to be defined anywhere in this
# notebook, so this cell raises NameError as written. TODO: define it or drop the cell.
processing_job_tracker.log_parameters({"max_seq_length": str(max_seq_length)})
# Logged parameters must be saved to the trial component.
processing_job_tracker.trial_component.save()

In [None]:
time.sleep(5)  # avoid throttling exception

# NOTE(review): `feature_store_offline_prefix` does not appear to be defined anywhere in
# this notebook, so this cell raises NameError as written. TODO: define it or drop the cell.
processing_job_tracker.log_parameters({"feature_store_offline_prefix": str(feature_store_offline_prefix)})
# Logged parameters must be saved to the trial component.
processing_job_tracker.trial_component.save()

In [None]:
time.sleep(5)  # avoid throttling exception

# NOTE(review): `feature_group_name` does not appear to be defined anywhere in this
# notebook, so this cell raises NameError as written. TODO: define it or drop the cell.
processing_job_tracker.log_parameters({"feature_group_name": str(feature_group_name)})
# Logged parameters must be saved to the trial component.
processing_job_tracker.trial_component.save()

# Analyze Experiment

In [None]:
from sagemaker.analytics import ExperimentAnalytics
import pandas as pd

# Show long cell contents without truncation.
pd.set_option("max_colwidth", 500)

time.sleep(30)  # avoid throttling exception

# NOTE(review): `pipeline_experiment_name` does not appear to be defined anywhere in this
# notebook; it was presumably restored from another notebook. Confirm before running.
experiment_analytics = ExperimentAnalytics(experiment_name=pipeline_experiment_name)

experiment_analytics_df = experiment_analytics.dataframe()
experiment_analytics_df

In [None]:
import logging

# The error path below logs through `logger`, which was otherwise never created.
logger = logging.getLogger(__name__)


def _list_approved_packages(**extra_kwargs):
    """List approved packages of the group, newest first (``extra_kwargs`` e.g. NextToken)."""
    return sm.list_model_packages(
        ModelPackageGroupName=model_package_group_name,
        ModelApprovalStatus="Approved",
        SortBy="CreationTime",
        # Explicit newest-first order, so index 0 below is the latest approved package.
        SortOrder="Descending",
        MaxResults=100,
        **extra_kwargs,
    )


response = _list_approved_packages()
approved_packages = response["ModelPackageSummaryList"]

# Fetch more pages only while nothing approved has been found yet.
while not approved_packages and "NextToken" in response:
    response = _list_approved_packages(NextToken=response["NextToken"])
    approved_packages.extend(response["ModelPackageSummaryList"])

# Fail loudly if the group has no approved package at all.
if not approved_packages:
    error_message = (
        f"No approved ModelPackage found for ModelPackageGroup: {model_package_group_name}"
    )
    logger.error(error_message)
    raise Exception(error_message)

# ARN of the most recently created approved model package.
model_package_arn = approved_packages[0]["ModelPackageArn"]

In [None]:
sm.describe_model_package(ModelPackageName=model_package_arn)

In [None]:
from sagemaker import ModelPackage

In [None]:
# Deployable handle on the approved model package selected above.
model_pkg = ModelPackage(model_package_arn=model_package_arn, role=role)
# Reuse the batch input location defined with the other S3 URIs instead of repeating the path.
test_input_path = batch_input_uri

In [None]:
# Batch transform on one ml.m5.large instance, CSV responses, SingleRecord strategy.
# NOTE(review): an earlier comment suggested that ml.m5.2xlarge was needed even for
# a ~200-line CSV; confirm that ml.m5.large is sufficient.
transformer = model_pkg.transformer(
    instance_count=1,
    instance_type="ml.m5.large",
    accept="text/csv",
    strategy="SingleRecord",
)

In [None]:
transformer.transform(test_input_path, content_type='text/csv', split_type='Line')