# Amazon SageMaker Pipelines: Introduction

Amazon SageMaker Pipelines enable the creation and management of automated workflows for machine learning models. By defining steps and organizing them logically, SageMaker Pipelines streamline the process of building, training, tuning, and deploying machine learning models at scale. These steps can encompass a wide range of tasks, from data preparation and processing to model training and deployment.

In Unit 2, we build on the foundation established in Unit 1 by reloading the components and configuration needed to work with Amazon SageMaker. We import the SageMaker library and a utility function, initialize the SageMaker session, and retrieve the AWS role ARN and S3 bucket details through the `get_secret` helper. We also retrieve the same XGBoost Docker image as in Unit 1 so the environment stays consistent.

In [None]:
import sagemaker
from utils.helpers import get_secret

session = sagemaker.Session()
role = get_secret('role_arn')
s3_bucket_uri = get_secret('s3_bucket_uri')
s3_bucket_name = get_secret('s3_bucket_name')

image_uri = sagemaker.image_uris.retrieve('xgboost',
                                          region='us-east-1',
                                          version='1.5-1')

In Unit 1, we utilized the `upload_dataset_to_s3` function to upload the Iris dataset to Amazon S3. This dataset serves as a foundational element for our machine learning tasks with Amazon SageMaker. If you haven't already executed the following lines in your environment, please do so before proceeding further in this unit:

```python
from utils.helpers import upload_dataset_to_s3
dataset_name = 'iris'
upload_dataset_to_s3(dataset_name, s3_bucket_name)
```


`PipelineSession`, from the `sagemaker.workflow.pipeline_context` module, is a specialized session for Amazon SageMaker Pipelines. It extends the standard SageMaker `Session`, but calls made through it (such as `estimator.fit()`) do not launch jobs immediately. Instead, they capture the job request so that it can be embedded in a pipeline step and run later, when the pipeline executes.

In [None]:
from sagemaker.workflow.pipeline_context import PipelineSession

pipeline_session = PipelineSession()

Passing `sagemaker_session=pipeline_session` to an `Estimator` makes it *"pipeline-aware"*: calling `fit()` on it no longer starts a training job but returns the job arguments that a `TrainingStep` uses when the pipeline executes. The rest of the following snippet is identical to Unit 1.

In [None]:
from sagemaker.inputs import TrainingInput

estimator = sagemaker.estimator.Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=f"{s3_bucket_uri}/pipelines-output",
    sagemaker_session=pipeline_session
)

estimator.set_hyperparameters(
    max_depth=5,
    objective='multi:softmax',
    num_class=3,
    num_round=10
)

s3_train = TrainingInput(
    s3_data=f's3://{s3_bucket_name}/iris_dataset/train_data.csv',
    content_type='csv'
)

s3_validate = TrainingInput(
    s3_data=f's3://{s3_bucket_name}/iris_dataset/test_data.csv',
    content_type='csv'
)

## Basic Pipeline

We'll explore a basic yet fundamental workflow within SageMaker Pipelines, starting with two primary steps: `TrainingStep` and `ModelStep`. 

![Basic pipeline](./img/basic_pipeline.png)

The objective here is straightforward but fundamental in the machine learning lifecycle:

- `TrainingStep`: This step involves training a machine learning model using the provided dataset and chosen algorithm within SageMaker. The training process adjusts the model's parameters to minimize the error in predictions, effectively learning from the training data.

- `ModelStep`: Following the successful training of the model, the next step is to register this model within SageMaker's Model Registry. **The Model Registry is a centralized repository for managing models, where each model version is tracked, cataloged, and versioned**. Automating the transition from training to registration ensures that once a model is trained, it can be easily and systematically stored, versioned, and later retrieved for deployment or further analysis.

The full list of available pipeline step types is documented [here](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html).

In [None]:
from sagemaker.workflow.steps import TrainingStep

training_step = TrainingStep(
    name='training-step',
    step_args=estimator.fit({
                'train': s3_train,
                'validation': s3_validate
    })
)

As the warning printed by the SDK indicates, defining a `TrainingStep` does not immediately start a training job, unlike calling `estimator.fit()` outside a pipeline (Unit 1). Because the estimator uses the `PipelineSession`, `fit()` only returns the job arguments, and the training job runs when the pipeline is executed.

The `Model` class is used to represent a machine learning model. Here are some of the key functionalities:

- **Model Artifacts:** it allows you to specify the location of the model artifacts. These are the output of your training job, stored in Amazon S3.

- **Inference Code:** you can define the Docker image that contains your inference code. This can be a default image provided by SageMaker for common machine learning frameworks (like TensorFlow, PyTorch, MXNet, etc.) or a custom Docker image that you have created.

- **Environment Variables:** the class allows you to set environment variables that your inference code might need for execution.

- **Execution Role:** you specify an AWS IAM role that SageMaker assumes to perform tasks on your behalf, such as accessing your model artifacts and Docker images in S3 and ECR respectively.

- **Deploy:** one of the most important functionalities is deploying your model. `deploy()` creates a real-time inference endpoint backed by one or more instances; batch predictions are handled separately through a batch transform job (for example via `Model.transformer()`).

- **Endpoint Configuration:** when deploying a model, you can specify the type and number of instances that you want to use for the endpoint, enabling you to scale your inference according to your needs.

In [None]:
from sagemaker.model import Model

model = Model(
    image_uri=image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role
)

`training_step.properties.ModelArtifacts.S3ModelArtifacts` is a property reference: a placeholder for the S3 location of the model artifacts that the training step will produce. The actual path does not exist when the pipeline is defined; SageMaker Pipelines resolves it at execution time, after the training job has written its artifacts (the trained model parameters) under the configured `output_path`. Using this reference also tells the pipeline that the model depends on the training step.
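To make the idea concrete, the sketch below mimics that resolution in plain Python; it does not use the SageMaker SDK. It assumes the reference is serialized into the pipeline definition roughly as `{'Get': 'Steps.training-step.ModelArtifacts.S3ModelArtifacts'}`. `resolve_get` and the step-output record (including its bucket and job name) are hypothetical.

```python
from typing import Any, Dict


def resolve_get(expr: Dict[str, str], step_outputs: Dict[str, Any]) -> Any:
    """Resolve a {'Get': 'Steps.<step-name>.<field>...'} placeholder.

    Hypothetical helper: it walks the recorded outputs of a finished step,
    imitating what the pipeline service does at execution time.
    """
    root, step_name, *fields = expr["Get"].split(".")
    if root != "Steps":
        raise ValueError(f"unsupported reference root: {root}")
    value = step_outputs[step_name]  # outputs of the referenced step
    for field in fields:             # e.g. ModelArtifacts -> S3ModelArtifacts
        value = value[field]
    return value


# Hypothetical record of what a finished training step produced.
step_outputs = {
    "training-step": {
        "ModelArtifacts": {
            "S3ModelArtifacts": "s3://example-bucket/pipelines-output/example-job/output/model.tar.gz"
        }
    }
}

placeholder = {"Get": "Steps.training-step.ModelArtifacts.S3ModelArtifacts"}
print(resolve_get(placeholder, step_outputs))
```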

The [`register` method](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.Model.register) registers a model as a new version in a model package group of the SageMaker Model Registry, where its versions, metadata, and lineage are tracked to support governance, auditing, and collaboration across teams. Because `model` was created with the `PipelineSession`, calling `register()` here does not register anything yet; it returns the arguments that the `ModelStep` uses when the pipeline runs.

In [None]:
from sagemaker.workflow.model_step import ModelStep

register_params = {
    'model_package_group_name': 'iris-classification-group',
    'description': 'XGBoost model for Iris classification',
    'image_uri': image_uri,
    'task': 'CLASSIFICATION'
}

register_model_step = ModelStep(
    name='register-model-step',
    step_args=model.register(**register_params)
)

### Pipeline Creation

The `Pipeline` class from `sagemaker.workflow.pipeline` assembles these steps into a single workflow that automates model training and registration. It takes the pipeline name and the list of steps that compose it.

In [None]:
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name='iris-classification-pipeline',
    steps=[training_step, register_model_step]
)
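The order of the `steps` list does not determine execution order: SageMaker Pipelines derives dependencies from data references such as `training_step.properties...` (and from an explicit `depends_on`, where given), then runs the steps whose inputs are ready. The plain-Python sketch below is a simplified model of that scheduling, not the service's actual scheduler. `execution_waves` is a hypothetical helper built on the standard library's `graphlib` (Python 3.9+), and it caps each wave at a maximum number of steps to mirror `MaxParallelExecutionSteps`.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_waves(dependencies: Dict[str, Set[str]], max_parallel: int) -> List[List[str]]:
    """Group steps into waves that could run concurrently.

    `dependencies` maps each step name to the set of steps it depends on.
    Each wave holds at most `max_parallel` steps whose dependencies are done.
    """
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises graphlib.CycleError if the steps form a cycle
    waves: List[List[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        for start in range(0, len(ready), max_parallel):
            batch = ready[start:start + max_parallel]
            waves.append(batch)
            sorter.done(*batch)  # unlocks steps that depended on this batch
    return waves


# The basic pipeline: registration consumes the training step's model artifacts.
print(execution_waves({"register-model-step": {"training-step"}}, max_parallel=5))
```

Because the register step reads `training_step.properties`, it always lands in a later wave than the training step, regardless of how the `steps` list is ordered.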

Next, we create the pipeline in SageMaker with settings for parallel execution and tagging:

- `role_arn`: the AWS Identity and Access Management (IAM) role that SageMaker assumes to perform tasks on your behalf during pipeline execution.
- `parallelism_config`: sets `MaxParallelExecutionSteps` to 5, so at most five steps run concurrently within an execution. This shortens execution time when independent steps can run side by side.
- `tags`: key-value metadata attached to the pipeline.
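Because `create()` raises an error if a pipeline with the same name already exists, re-running a notebook is often easier with `pipeline.upsert()`, which creates the pipeline or updates its definition and, in recent SDK versions, accepts the same `role_arn`, `tags`, and `parallelism_config` arguments. The sketch below assumes you want identical settings for both calls; `build_pipeline_settings` is a hypothetical helper that assembles the keyword arguments described above, including converting a plain dictionary into the `Key`/`Value` tag format.

```python
from typing import Dict


def build_pipeline_settings(role_arn: str, tags: Dict[str, str],
                            max_parallel_steps: int = 5) -> dict:
    """Assemble keyword arguments for Pipeline.create() or Pipeline.upsert()."""
    if max_parallel_steps < 1:
        raise ValueError("max_parallel_steps must be at least 1")
    return {
        "role_arn": role_arn,
        "parallelism_config": {"MaxParallelExecutionSteps": max_parallel_steps},
        # SageMaker expects tags as a list of {'Key': ..., 'Value': ...} dicts
        "tags": [{"Key": key, "Value": value} for key, value in tags.items()],
    }


settings = build_pipeline_settings(
    role_arn="arn:aws:iam::111122223333:role/ExampleSageMakerRole",  # placeholder ARN
    tags={"Project": "AWSTutorials", "Environment": "Development"},
)
print(settings["tags"])
# In the notebook: pipeline.upsert(**build_pipeline_settings(role, {...}))
```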

In [None]:
parallelism_config = {
    'MaxParallelExecutionSteps': 5
}

pipeline.create(
    role_arn=role,
    parallelism_config=parallelism_config,
    tags=[
        {'Key': 'Project', 'Value': 'AWSTutorials'},
        {'Key': 'Environment', 'Value': 'Development'}
    ]
)

To work with a pipeline that already exists in SageMaker, instantiate a new `Pipeline` object with the same name:

```python
pipeline = Pipeline('iris-classification-pipeline', sagemaker_session=session)
```

To delete a pipeline, call the `delete()` method on the pipeline object. This removes the pipeline from SageMaker, along with its definition and associated metadata.

```python
pipeline.delete()
```
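If you are not sure which pipelines exist, for example before deleting them, you can list them with the boto3 SageMaker client's `list_pipelines` call, which accepts a `PipelineNamePrefix` filter and returns results in pages linked by `NextToken`. `list_pipeline_names` is a hypothetical helper; it takes the client as an argument so it can be exercised without AWS credentials.

```python
from typing import Any, List


def list_pipeline_names(sm_client: Any, prefix: str) -> List[str]:
    """Return the names of all pipelines whose name starts with `prefix`.

    `sm_client` is expected to be boto3.client('sagemaker').
    """
    names: List[str] = []
    kwargs = {"PipelineNamePrefix": prefix}
    while True:
        page = sm_client.list_pipelines(**kwargs)
        names += [summary["PipelineName"] for summary in page.get("PipelineSummaries", [])]
        token = page.get("NextToken")
        if not token:  # no more pages
            return names
        kwargs["NextToken"] = token


# In the notebook:
# import boto3
# for name in list_pipeline_names(boto3.client('sagemaker'), 'iris'):
#     Pipeline(name, sagemaker_session=session).delete()
```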
---
To start an execution of a SageMaker pipeline, call the `start()` method on the pipeline object.

In [None]:
pipeline.start(
    execution_display_name='pipeline-execution',
    execution_description='First Pipeline Execution'
)
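`start()` returns an execution object. In the SageMaker Python SDK this object can block until the run finishes with `wait()` and report per-step results with `list_steps()`, which returns one dictionary per step with keys such as `StepName`, `StepStatus`, and, for failed steps, `FailureReason`. The helper below is a hypothetical sketch that condenses that list into a status summary; the commented lines show how it might be used in the notebook.

```python
from collections import Counter
from typing import Dict, List


def summarize_steps(steps: List[dict]) -> Dict[str, object]:
    """Count step statuses and collect failure reasons from list_steps() output."""
    counts = Counter(step.get("StepStatus", "Unknown") for step in steps)
    failures = {
        step["StepName"]: step.get("FailureReason", "")
        for step in steps
        if step.get("StepStatus") == "Failed"
    }
    return {"counts": dict(counts), "failures": failures}


# In the notebook (wait() may raise an error if the execution does not succeed):
# execution = pipeline.start(execution_display_name='pipeline-execution')
# execution.wait(delay=30, max_attempts=60)
# print(summarize_steps(execution.list_steps()))

example = [
    {"StepName": "register-model-step", "StepStatus": "Succeeded"},
    {"StepName": "training-step", "StepStatus": "Succeeded"},
]
print(summarize_steps(example))
```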

### SageMaker Studio

#### Pipeline Execution in SageMaker Studio

After initiating the pipeline execution with the start() method and the given parameters, SageMaker Studio provides a visual representation and status update of the pipeline's progress.

![](./img/pipelines_UI.png)

- Execution Overview: The 'Executions' tab in SageMaker Studio displays a list of pipeline executions. In this case, the pipeline-execution is shown with a status of 'Succeeded', indicating a successful completion.

- Execution Details: The 'Elapsed Time' shows the duration of the pipeline execution, which is '3m 40s' for the pipeline-execution. The 'Modified On' and 'Created On' timestamps provide auditability and tracking for the pipeline execution.

#### Model Registry in SageMaker Studio

The Model Registry interface in SageMaker Studio is designed to manage and track your models' versions systematically.

![](./img/model_registry_UI.png)

- Model Group Overview: The 'Models' tab showcases registered model groups.

- Model Versions: The interface displays the versions of the models within a group. For instance, 'Version 1' currently shows a 'Pending manual approval' status: the version is registered, but a human reviewer must approve it before it is considered ready for deployment.

- Tagging and Organization: The registry provides capabilities to search and tag models, which can help organize and retrieve models based on different criteria such as project, environment, or any other relevant identifiers.
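Approval can also be handled programmatically. The boto3 SageMaker client exposes `update_model_package`, whose `ModelApprovalStatus` accepts `Approved`, `Rejected`, or `PendingManualApproval`; alternatively, `Model.register()` accepts an `approval_status` argument to set the status at registration time. `set_approval_status` below is a hypothetical wrapper that validates the status before calling the API; pass it `boto3.client('sagemaker')` and the ARN of a model package version.

```python
from typing import Any

APPROVAL_STATUSES = {"Approved", "Rejected", "PendingManualApproval"}


def set_approval_status(sm_client: Any, model_package_arn: str, status: str,
                        description: str = "") -> dict:
    """Update the approval status of one model package version."""
    if status not in APPROVAL_STATUSES:
        raise ValueError(f"status must be one of {sorted(APPROVAL_STATUSES)}")
    request = {"ModelPackageArn": model_package_arn, "ModelApprovalStatus": status}
    if description:
        request["ApprovalDescription"] = description  # optional reviewer note
    return sm_client.update_model_package(**request)


# In the notebook:
# import boto3
# set_approval_status(boto3.client('sagemaker'), model_package_arn, 'Approved',
#                     'Validation error below threshold')
```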


In [None]:
import boto3
sm_client = boto3.client('sagemaker')

In [None]:
# Call to list model packages
response = sm_client.list_model_packages(ModelPackageGroupName='iris-classification-group')

# Print the list of model packages
for model_package in response['ModelPackageSummaryList']:
    print(f"Model Package Group Name: {model_package['ModelPackageGroupName']}")
    print(f"Model Package ARN: {model_package['ModelPackageArn']}")
    print(f"Model Package Version: {model_package.get('ModelPackageVersion', 'N/A')}")
    print(f"Model Package Status: {model_package['ModelPackageStatus']}")
    print("------")

model_package_arn = response['ModelPackageSummaryList'][0]['ModelPackageArn']

In [None]:
from sagemaker import ModelPackage
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer

from sagemaker.predictor import Predictor

# model_package_arn comes from the previous cell (a version in the group).
# Replace it with the ARN of the version you want to deploy if needed.

model = ModelPackage(
    role=role,
    model_package_arn=model_package_arn,
    sagemaker_session=session,
    predictor_cls=Predictor
)

endpoint_name = 'iris-endpoint'

predictor = model.deploy(
    initial_instance_count=1,
    instance_type='ml.t2.medium',
    endpoint_name=endpoint_name,
    serializer=CSVSerializer(),
    deserializer=JSONDeserializer()
)

In [None]:

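# Reattach to an existing endpoint (for example, after restarting the kernel).
endpoint_name = 'iris-endpoint'

predictor = Predictor(
    endpoint_name=endpoint_name,
    serializer=CSVSerializer()
)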

In [None]:
predictor

In [None]:
import json
import boto3

sagemaker_runtime = boto3.client('sagemaker-runtime')
endpoint_name = 'iris-endpoint'
response = sagemaker_runtime.invoke_endpoint(EndpointName=endpoint_name,
                                             ContentType='text/csv',
                                             Body="7.2, 3, 6, 1.6")

# The response body is a byte stream; decode it to read the predicted class
response['Body'].read().decode('utf-8')

In [None]:
predictor.predict([7.2, 3, 6, 1.6])

### Parameters

In [None]:
from sagemaker.workflow.parameters import (
    ParameterString,
    ParameterInteger
)

from sagemaker.workflow.functions import Join

training_dataset_path = ParameterString(
    name='Training_Set_Path',
    default_value='train_data.csv'
)

s3_train = TrainingInput(
    # Resolved at execution time to <s3_bucket_uri>/iris_dataset/<Training_Set_Path>
    s3_data=Join(
        on='/',
        values=[
            s3_bucket_uri,
            'iris_dataset',
            training_dataset_path
        ]
    ),
    content_type='csv'
)

s3_validate = TrainingInput(
    s3_data=f's3://{s3_bucket_name}/iris_dataset/test_data.csv',
    content_type='csv'
)

In [None]:
estimator = sagemaker.estimator.Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=f"{s3_bucket_uri}/pipelines-output",
    sagemaker_session=pipeline_session
)

max_depth_param = ParameterInteger(name="max_depth",
                                   default_value=5)
objective_param = ParameterString(name="objective",
                                  default_value='multi:softmax')
num_class_param = ParameterInteger(name="num_class",
                                   default_value=3)
num_round_param = ParameterInteger(name="num_round",
                                   default_value=10)

estimator.set_hyperparameters(
    max_depth=max_depth_param,
    objective=objective_param,
    num_class=num_class_param,
    num_round=num_round_param
)

In [None]:
# Training step

training_step = TrainingStep(
    name='training-step',
    step_args=estimator.fit({
                'train': s3_train,
                'validation': s3_validate
    })
)

# Register step

model = Model(
    image_uri=image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role
)

register_params = {
    'model_package_group_name': 'iris-classification-group',
    'description': 'XGBoost model for Iris classification',
    'image_uri': image_uri,
    'task': 'CLASSIFICATION'
}

register_model_step = ModelStep(
    name='register-model-step',
    step_args=model.register(**register_params)
)

pipeline = Pipeline(
    name='iris-pipeline-with-params',
    steps=[training_step, register_model_step],
    parameters=[
        max_depth_param,
        objective_param,
        num_class_param,
        num_round_param,
        training_dataset_path
    ]
)

parallelism_config = {
    'MaxParallelExecutionSteps': 5
}

pipeline.create(
    role_arn=role,
    parallelism_config=parallelism_config,
    tags=[
        {'Key': 'Project', 'Value': 'AWSTutorials'},
        {'Key': 'Environment', 'Value': 'Development'}
    ]
)

pipeline.start(
    execution_display_name='pipeline-execution',
    execution_description='First Pipeline Execution'
)

custom_parameter_values = {
    'max_depth': 1,
    'num_round': 50
}

# Start the pipeline execution with custom parameters
execution = pipeline.start(
    parameters=custom_parameter_values
)
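Parameters that are not included in `parameters=` keep the default values declared on the pipeline. The plain-Python sketch below reproduces that merge for this pipeline's parameters. `effective_parameters` is hypothetical, and rejecting undeclared names is a design choice made here to catch typos before an execution starts, not a claim about how the service validates input.

```python
from typing import Any, Dict

# Defaults declared on 'iris-pipeline-with-params' above
DEFAULTS: Dict[str, Any] = {
    "max_depth": 5,
    "objective": "multi:softmax",
    "num_class": 3,
    "num_round": 10,
    "Training_Set_Path": "train_data.csv",
}


def effective_parameters(defaults: Dict[str, Any], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Return the values an execution would use: defaults, then overrides."""
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"parameters not declared on the pipeline: {sorted(unknown)}")
    return {**defaults, **overrides}


print(effective_parameters(DEFAULTS, {"max_depth": 1, "num_round": 50}))
```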

In [None]:
# validation:accuracy


## Fail Steps

In [None]:
from sagemaker.workflow.fail_step import FailStep


estimator = sagemaker.estimator.Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=f"{s3_bucket_uri}/pipelines-output",
    sagemaker_session=pipeline_session
)

estimator.set_hyperparameters(
    max_depth=max_depth_param,
    objective=objective_param,
    num_class=num_class_param,
    num_round=num_round_param
)

estimator.set_hyperparameters(eval_metric='merror')

#estimator.hyperparameters()

# Training step

training_step = TrainingStep(
    name='training-step',
    step_args=estimator.fit({
                'train': s3_train,
                'validation': s3_validate
    })
)

# Register step

model = Model(
    image_uri=image_uri,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role
)

register_params = {
    'model_package_group_name': 'iris-classification-group',
    'description': 'XGBoost model for Iris classification',
    'image_uri': image_uri,
    'task': 'CLASSIFICATION'
}

register_model_step = ModelStep(
    name='register-model-step',
    step_args=model.register(**register_params)
)

# Fail Step

fail_step = FailStep(
    name='iris_fail',
    error_message='What a bad model!'
)

In [None]:
#https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost-tuning.html

In [None]:
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionLessThan

condition_step = ConditionStep(
    name='condition-step',
    conditions=[
        ConditionLessThan(
            left=training_step.properties.FinalMetricDataList['validation:merror'].Value,
            right=0.04
        )
    ],
    if_steps=[register_model_step],
    else_steps=[fail_step]
)

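Listing more than one condition in `conditions` combines them with a logical AND: the `if_steps` run only when every condition evaluates to true, and the `else_steps` run otherwise. For OR logic, wrap the conditions in `ConditionOr` from `sagemaker.workflow.conditions`.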
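The branching logic can be sketched in plain Python. This is an illustration, not SDK code: it assumes conditions are serialized roughly as dictionaries with a `Type` (such as `LessThan`) and `LeftValue`/`RightValue` operands, with `Or` holding a nested `Conditions` list. `evaluate` and `choose_branch` are hypothetical, and the metric values are made up.

```python
import operator
from typing import Any, Dict, List

COMPARISONS = {
    "Equals": operator.eq,
    "GreaterThan": operator.gt,
    "GreaterThanOrEqualTo": operator.ge,
    "LessThan": operator.lt,
    "LessThanOrEqualTo": operator.le,
}


def evaluate(condition: Dict[str, Any]) -> bool:
    """Evaluate one comparison, or an 'Or' over nested conditions."""
    if condition["Type"] == "Or":
        return any(evaluate(c) for c in condition["Conditions"])
    compare = COMPARISONS[condition["Type"]]
    return compare(condition["LeftValue"], condition["RightValue"])


def choose_branch(conditions: List[Dict[str, Any]]) -> str:
    """Top-level conditions are combined with AND, like ConditionStep's list."""
    return "if_steps" if all(evaluate(c) for c in conditions) else "else_steps"


merror_ok = {"Type": "LessThan", "LeftValue": 0.03, "RightValue": 0.04}
shallow_enough = {"Type": "LessThanOrEqualTo", "LeftValue": 7, "RightValue": 5}  # e.g. max_depth=7 <= 5

print(choose_branch([merror_ok]))                  # single condition
print(choose_branch([merror_ok, shallow_enough]))  # AND of two conditions
print(choose_branch([{"Type": "Or", "Conditions": [merror_ok, shallow_enough]}]))
```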

Pipeline steps cannot have duplicate names. In addition, steps referenced in a `ConditionStep` must not also be added to the pipeline's `steps` list, which is why only `training_step` and `condition_step` are passed to the pipeline below.

In [None]:
pipeline = Pipeline(
    name='iris-pipeline-with-conditions',
    steps=[training_step,
           condition_step],
    parameters=[
        max_depth_param,
        objective_param,
        num_class_param,
        num_round_param,
        training_dataset_path
    ]
)

parallelism_config = {
    'MaxParallelExecutionSteps': 5
}

pipeline.create(
    role_arn=role,
    parallelism_config=parallelism_config,
    tags=[
        {'Key': 'Project', 'Value': 'AWSTutorials'},
        {'Key': 'Environment', 'Value': 'Development'}
    ]
)

pipeline.start(
    execution_display_name='pipeline-execution',
    execution_description='First Pipeline Execution'
)

## Hyperparameter tuning

In [None]:
from sagemaker.tuner import IntegerParameter, HyperparameterTuner

hyperparameter_ranges = {
    'max_depth': IntegerParameter(1, 10, scaling_type='Auto')
}




In [None]:
estimator = sagemaker.estimator.Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=f"{s3_bucket_uri}/pipelines-output",
    sagemaker_session=session,
    hyperparameters={
        'objective': 'multi:softmax',
        'num_class': 3,
        'num_round': 10,
    }
)

In [None]:
# XGBoost learning task parameters: https://github.com/dmlc/xgboost/blob/master/doc/parameter.rst#learning-task-parameters
# Tuning the SageMaker XGBoost algorithm: https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost-tuning.html

In [None]:
hyperparameter_tuner = HyperparameterTuner(
    estimator=estimator,
    objective_metric_name='validation:merror',
    hyperparameter_ranges=hyperparameter_ranges,
    strategy='Random',
    max_jobs=10,
    max_parallel_jobs=5,
    objective_type='Minimize'
)

from sagemaker.inputs import TrainingInput

s3_train = TrainingInput(
    s3_data=f's3://{s3_bucket_name}/iris_dataset/train_data.csv',
    content_type='csv'
)

s3_validate = TrainingInput(
    s3_data=f's3://{s3_bucket_name}/iris_dataset/test_data.csv',
    content_type='csv'
)

In [None]:
hyperparameter_tuner.fit({
                'train': s3_train,
                'validation': s3_validate
    })

In [None]:
estimator = sagemaker.estimator.Estimator(
    image_uri=image_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path=f"{s3_bucket_uri}/pipelines-output",
    sagemaker_session=pipeline_session,
    hyperparameters={
        'objective': 'multi:softmax',
        'num_class': 3,
        'num_round': 10,
    }
)

hyperparameter_tuner = HyperparameterTuner(
    estimator=estimator,
    objective_metric_name='validation:merror',
    hyperparameter_ranges=hyperparameter_ranges,
    strategy='Random',
    max_jobs=10,
    max_parallel_jobs=10,
    objective_type='Minimize'
)

In [None]:
from sagemaker.workflow.steps import TuningStep

tuning_step = TuningStep(
    name='tuning-step',
    step_args=hyperparameter_tuner.fit({
                'train': s3_train,
                'validation': s3_validate
    })
)



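See the [`TuningStep.get_top_model_s3_uri` documentation](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep.get_top_model_s3_uri) for referencing the model artifacts of the best training jobs in a tuning step.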
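In the SageMaker Python SDK, `get_top_model_s3_uri` builds the artifact URI by joining `s3:/`, the bucket, an optional prefix, the training job name, and `output/model.tar.gz` with `/`, which matches the layout an estimator writes under its `output_path`. The plain-Python function below is a sketch of that composition for illustration (bucket and job names are made up); it also shows that passing `prefix` separately and folding it into the bucket argument give the same URI.

```python
def top_model_uri(bucket: str, training_job_name: str, prefix: str = "") -> str:
    """Compose s3://<bucket>[/<prefix>]/<training-job-name>/output/model.tar.gz."""
    parts = ["s3:/", bucket]  # joining "s3:/" with "/" yields the "s3://" scheme
    if prefix:
        parts.append(prefix)
    parts += [training_job_name, "output/model.tar.gz"]
    return "/".join(parts)


print(top_model_uri("example-bucket", "example-tuning-job-001", prefix="pipelines-output"))
```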

In [None]:
from sagemaker.analytics import HyperparameterTuningJobAnalytics

tuning_job_name = 'sagemaker-xgboost-240219-1358'  # replace with the name of your own tuning job


tuning_job_result = HyperparameterTuningJobAnalytics(tuning_job_name, sagemaker_session=session)

# Fetch the training job summaries
job_summaries = tuning_job_result.training_job_summaries()

# List the names of the training jobs
for summary in job_summaries:
    print(summary['TrainingJobName'])


In [None]:
from sagemaker.predictor import Predictor

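best_model = Model(
    image_uri=image_uri,
    # top_k=0 selects the best training job; its artifacts are stored under
    # s3://<bucket>/pipelines-output/<training-job-name>/output/model.tar.gz
    model_data=tuning_step.get_top_model_s3_uri(
        top_k=0, s3_bucket=s3_bucket_name, prefix='pipelines-output'
    ),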
    predictor_cls=Predictor,
    sagemaker_session=pipeline_session,
    role=role,
)

register_params = {
    'model_package_group_name': 'iris-classification-group',
    'description': 'Top performing model from Hyperparameter tuning',
    'image_uri': image_uri,
    'task': 'CLASSIFICATION'
}

register_best_model_step = ModelStep(
    name="register-best-step",
    step_args=best_model.register(**register_params),
)



In [None]:
pipeline = Pipeline(
    name='iris',
    steps=[tuning_step,
           register_best_model_step],
    parameters=[
        objective_param,
        num_class_param,
        num_round_param,
        training_dataset_path
    ]
)

parallelism_config = {
    'MaxParallelExecutionSteps': 5
}

pipeline.create(
    role_arn=role,
    parallelism_config=parallelism_config,
    tags=[
        {'Key': 'Project', 'Value': 'AWSTutorials'},
        {'Key': 'Environment', 'Value': 'Development'}
    ]
)

pipeline.start(
    execution_display_name='pipeline-execution',
    execution_description='First Pipeline Execution'
)

In [None]:
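# Pipelines created in this unit
pipeline_names = [
    'iris-classification-pipeline',
    'iris-pipeline-with-params',
    'iris-pipeline-with-conditions',
    'iris',
]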

for pipeline_name in pipeline_names:
    pipeline = Pipeline(pipeline_name, sagemaker_session=session)
    pipeline.delete()

In [None]:
tuning_job_result.dataframe()

In [None]:
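model_package_group_name = 'iris-classification-group'
# list_model_packages is paginated: collect every version ARN first, then delete
model_package_arns, kwargs = [], {'ModelPackageGroupName': model_package_group_name}
while True:
    page = sm_client.list_model_packages(**kwargs)
    model_package_arns += [p['ModelPackageArn'] for p in page['ModelPackageSummaryList']]
    if not page.get('NextToken'):
        break
    kwargs['NextToken'] = page['NextToken']

for model_package_arn in model_package_arns:
    sm_client.delete_model_package(ModelPackageName=model_package_arn)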

In [None]:
sm_client.delete_model_package_group(ModelPackageGroupName=model_package_group_name)