# Pipelines

## Setup

### Environment

* Base Python3.0
* ml.t3.medium
* 2 vCPU + 4 GiB

### Dependencies

In [2]:
!pip install sagemaker
!pip install -U scikit-learn

You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.[0m
You should consider upgrading via the '/usr/local/bin/python -m pip install --upgrade pip' command.[0m


### Imports

In [3]:
import json
import pathlib
import logging

import sagemaker
import sklearn.model_selection
from sagemaker import get_execution_role, Session, image_uris
from sagemaker.inputs import TrainingInput
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.transformer import Transformer
from sagemaker.workflow import parameters, steps, pipeline, pipeline_context
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, TransformStep
from sagemaker.workflow.model_step import ModelStep # lol

from scripts import shared_constants

### Define SageMaker session and role

In [4]:
sagemaker_session = Session()
role = get_execution_role()

In [5]:
print(sagemaker.__version__) # 2.117.0

2.117.0


### Define Constants

In [6]:
SKLEARN_FRAMEWORK_VERSION = "1.0-1"
BASE_JOB_NAME = "birds-200-pipeline"
PIPELINE_NAME = "Birds200Pipeline"
IS_LOCAL_PIPELINE = False
REGION = "us-east-1"

#### Parameter Names

In [7]:
PROC_INSTANCE_TYPE = "ProcessingInstanceType"
PROC_INSTANCE_COUNT = "ProcessingInstanceCount"
TRAIN_INSTANCE_TYPE = "TrainingInstanceType"

## Build Model Pipeline

### Parameters

#### Define processing step parameters

In [8]:
PREPROCESSING_INSTANCE_TYPE = "PreprocessingInstanceType"
PREPROCESSING_INSTANCE_COUNT = "PreprocessingInstanceCount"
preprocessing_instance_type = parameters.ParameterString(name=PREPROCESSING_INSTANCE_TYPE, default_value="ml.m5.large")
preprocessing_instance_count = parameters.ParameterInteger(name=PREPROCESSING_INSTANCE_COUNT, default_value=1)

#### Define training step parameters

In [9]:
TRAINING_INSTANCE_TYPE = "TrainingInstanceType"
training_instance_type = parameters.ParameterString(name=TRAINING_INSTANCE_TYPE, default_value="ml.g4dn.4xlarge")

#### Define model step parameters

In [10]:
MODEL_INSTANCE_TYPE = "ModelInstanceType"
model_instance_type = parameters.ParameterString(name=MODEL_INSTANCE_TYPE, default_value="ml.m5.xlarge")

#### Define transform step parameters

In [11]:
TRANSFORM_INSTANCE_TYPE = "TransformInstanceType"
TRANSFORM_INSTANCE_COUNT = "TransformInstanceCount"
transform_instance_type = parameters.ParameterString(name=TRANSFORM_INSTANCE_TYPE, default_value="ml.m5.large")
transform_instance_count = parameters.ParameterInteger(name=TRANSFORM_INSTANCE_COUNT, default_value=1)

#### Define evaluation step parameters

In [12]:
EVALUATION_INSTANCE_TYPE = "EvaluationInstanceType"
EVALUATION_INSTANCE_COUNT = "EvaluationInstanceCount"
evaluation_instance_type = parameters.ParameterString(name=EVALUATION_INSTANCE_TYPE, default_value="ml.m5.large")
evaluation_instance_count = parameters.ParameterInteger(name=EVALUATION_INSTANCE_COUNT, default_value=1)

### Helpers

In [13]:
def generate_step_name(step):
    return f"{PIPELINE_NAME}-{step}"

### Pipeline Session

Use LocalPipelineSession for initial development. Then when confident with changes, switch to PipelineSession.

In [14]:
if IS_LOCAL_PIPELINE:
    pipeline_session = sagemaker.workflow.pipeline_context.LocalPipelineSession()
else:
    pipeline_session = sagemaker.workflow.pipeline_context.PipelineSession()

### Workflow Steps

#### Processing Step

##### Define SKLearn processor

In [15]:
preprocessing_sklearn_processor = SKLearnProcessor(
    framework_version=SKLEARN_FRAMEWORK_VERSION,
    instance_type=preprocessing_instance_type,
    instance_count=preprocessing_instance_count,
    base_job_name=BASE_JOB_NAME,
    role=role,
    sagemaker_session=pipeline_session,
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


##### Define Preprocessing Step

In [16]:
processing_step = ProcessingStep(
    name=generate_step_name("Preprocessing"),
    processor=preprocessing_sklearn_processor,
    inputs=[
        ProcessingInput(
            source="./scripts",
            destination=str(shared_constants.INPUT_DIR),
            input_name="scripts",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name=str(output_name), source=str(source))
        for (output_name, source) in [
            (shared_constants.TRAIN_CHANNEL, shared_constants.TRAIN_DIR),
            (shared_constants.VALIDATION_CHANNEL, shared_constants.VALIDATION_DIR),
            (shared_constants.TEST_CHANNEL, shared_constants.TEST_DIR),
            (shared_constants.LABELS_CHANNEL, shared_constants.LABELS_DIR),
        ]
    ],
    code="./scripts/install_packages.py",
)

#### Training Step

I utilize the built-in Docker image and model for object detection. One can also use a custom model within a prebuilt image or deploy a custom image. Here are some helpful resources:
* Learn more about training with Amazon SageMaker: [link](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-training.html)
* Explore an example of using PyTorch for MNIST classification in SageMaker: [link](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-python-sdk/pytorch_mnist/pytorch_mnist.ipynb)
* Understand Docker containers in SageMaker: [link](https://docs.aws.amazon.com/sagemaker/latest/dg/docker-containers.html)

##### Get training image

In [17]:
training_image = sagemaker.image_uris.retrieve(region=REGION, framework="object-detection", version="latest")

Defaulting to the only supported framework/algorithm version: 1. Ignoring framework/algorithm version: latest.


##### Define estimator

In [18]:
estimator = sagemaker.estimator.Estimator(
    training_image,
    role,
    instance_count=1,
    instance_type=training_instance_type,
    volume_size=50,
    max_run=int(3600*1.5),
    input_mode="File",
    output_path=shared_constants.S3_OUTPUT_OBJECT_KEY,
    sagemaker_session=pipeline_session,
    base_job_name=BASE_JOB_NAME,
)

For information on object-detection hyperparameters, refer to the documentation at: https://docs.aws.amazon.com/sagemaker/latest/dg/object-detection-api-config.html.

##### Set hyperparameters

In [19]:
estimator.set_hyperparameters(
    num_classes=len(shared_constants.CLASS_IDS),
    num_training_samples=shared_constants.NUM_TRAINING_SAMPLES,
)

##### Define estimator inputs

In [20]:
estimator_inputs = {
    "train": TrainingInput(
        s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[shared_constants.TRAIN_CHANNEL].S3Output.S3Uri,
        content_type="application/x-recordio",
    ),
    "validation": TrainingInput(
        s3_data=processing_step.properties.ProcessingOutputConfig.Outputs[shared_constants.VALIDATION_CHANNEL].S3Output.S3Uri,
        content_type="application/x-recordio",
    ),
}

##### Define training step

In [21]:
training_step = TrainingStep(
    name=generate_step_name("Training"),
    estimator=estimator,
    inputs=estimator_inputs,
)

#### Model Step

##### Define model from trained estimator data

In [22]:
model = sagemaker.model.Model(
    image_uri=estimator.training_image_uri(),
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

##### Create model step

In [23]:
model_step = ModelStep(
   name=generate_step_name("Model"),
   step_args=model.create(instance_type=model_instance_type),
)



#### Batch Transform step

##### Define transformer

In [24]:
transformer = Transformer(
    model_name=model_step.properties.ModelName,
    instance_count=transform_instance_count,
    instance_type=transform_instance_type,
    sagemaker_session=pipeline_session,
)

##### Define transform step

In [25]:
transform_step = TransformStep(
    name='BatchTransform',
    transformer=transformer,
    inputs={
        'DataSource': {
            'S3DataSource': {
                'S3DataType': 'S3Prefix',
                'S3Uri': processing_step.properties.ProcessingOutputConfig.Outputs[shared_constants.TEST_CHANNEL].S3Output.S3Uri,
            }
        },
        'ContentType': 'application/x-image',
        'SplitType': 'None',
    }
)

#### Evaluation Step

##### Create the SKLearn processor

In [26]:
evaluation_sklearn_processor = sagemaker.sklearn.processing.SKLearnProcessor(
    framework_version=SKLEARN_FRAMEWORK_VERSION,
    instance_type=evaluation_instance_type,
    instance_count=evaluation_instance_count,
    base_job_name=BASE_JOB_NAME,
    role=role,
    sagemaker_session=pipeline_session,
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


##### Create the evaluation step (i.e. the processing step for evaluation)

In [27]:
evaluation_step = ProcessingStep(
    name=generate_step_name("Evaluation"),
    processor=evaluation_sklearn_processor,
    inputs=[
        ProcessingInput( # A trick that allows imports modules in scripts
            source="./scripts",
            destination=str(shared_constants.INPUT_DIR),
            input_name="scripts",
        ),
        ProcessingInput(
            source=transform_step.properties.TransformOutput.S3OutputPath,
            destination=shared_constants.TRANSFORM_DIR,
        ),
        ProcessingInput(
            source=processing_step.properties.ProcessingOutputConfig.Outputs[shared_constants.LABELS_CHANNEL].S3Output.S3Uri,
            destination=shared_constants.LABELS_DIR,
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name=shared_constants.EVALUATION_CHANNEL,
            source=shared_constants.EVALUATION_DIR,
        )
    ],
    code='scripts/evaluate.py',
)

### Create the Pipeline

In [19]:
pipeline = sagemaker.workflow.pipeline.Pipeline(
    name=PIPELINE_NAME,
    parameters=[processing_instance_type, processing_instance_count, training_instance_type],
    steps=[processing_step, training_step],
    sagemaker_session=pipeline_session,
)

#### Inspect the Pipeline Definition

In [20]:
json.loads(pipeline.definition())

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.large'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.g4dn.4xlarge'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'Birds200Pipeline-Processing',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '683313688378.dkr.ecr.us-east-1.amazonaws.com/sagemaker-scikit-learn:1.2-1-cpu-py3',
     'ContainerEntrypoint': ['python3',
      '/opt/ml/processing/input/code/install_packages.py']},
    'RoleArn': '

### Build the Pipeline

In [21]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:180797159824:pipeline/birds200pipeline',
 'ResponseMetadata': {'RequestId': '407a320c-6c16-4588-9d52-65672cd276f4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '407a320c-6c16-4588-9d52-65672cd276f4',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '84',
   'date': 'Sat, 13 May 2023 21:10:38 GMT'},
  'RetryAttempts': 0}}

In [22]:
execution = pipeline.start()

In [23]:
execution.wait()

In [24]:
execution.list_steps()

[{'StepName': 'Birds200Pipeline-Training',
  'StartTime': datetime.datetime(2023, 5, 13, 21, 16, 56, 804000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 5, 13, 21, 25, 29, 392000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:180797159824:training-job/pipelines-ell1gp7g1fih-Birds200Pipeline-Tra-WjX1uMhOZj'}}},
 {'StepName': 'Birds200Pipeline-Processing',
  'StartTime': datetime.datetime(2023, 5, 13, 21, 10, 40, 935000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 5, 13, 21, 16, 55, 787000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:180797159824:processing-job/pipelines-ell1gp7g1fih-Birds200Pipeline-Pro-nBkUKdeWy6'}}}]