Overview
This notebook shows how to:

* Define a set of Pipeline parameters that can be used to parametrize a SageMaker Pipeline.
* Define a Training step that fine-tunes a sentence transformer embedding model.
* Define a Processing step that evaluates the fine-tuned model.
* Define a Create Model step that creates a model from the model artifacts produced by training.
* Define a Register Model step that creates a model package from the estimator and the model artifacts of the fine-tuned model.
* Define and create a Pipeline definition in a DAG, with the defined parameters and steps.
* Start a Pipeline execution and wait for the execution to complete.

In [None]:
!pip install 'sagemaker' --upgrade -q

In [None]:
import logging
logging.getLogger().setLevel(logging.ERROR)

In [None]:
import sys

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import LocalPipelineSession, PipelineSession

pipeline_session = PipelineSession()
region = pipeline_session.boto_region_name
default_bucket = pipeline_session.default_bucket()
role = sagemaker.get_execution_role()

In [None]:
from sagemaker.workflow.parameters import ParameterString, ParameterFloat, ParameterInteger, ParameterBoolean
from sagemaker.huggingface import HuggingFaceProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role
from sagemaker.workflow.steps import CacheConfig
from sagemaker.utils import name_from_base

training_instance_count = 1
evaluation_instance_count = 1
evaluation_instance_type = "ml.m5.xlarge"
training_instance_type = "ml.g5.2xlarge"


%store -r train_s3_path
%store -r valid_s3_path
%store -r prefix
%store -r model_id

model_output_s3_loc = f"s3://{default_bucket}/data/finetuning-{model_id.replace('/', '-')}/model"
# model_eval_s3_loc = f"s3://{default_bucket}/data/finetuning-{model_id.replace('/', '-')}/modeleval"

base_model_pkg_group_name = name_from_base(model_id.replace('/', '-'))

Set up step caching with a 12-hour expiration

In [None]:
cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")  # ISO 8601 duration: 12 hours
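`CacheConfig` takes `expire_after` as an ISO 8601 duration string: a duration starts with `P`, and hours, minutes, and seconds follow a `T` designator, so 12 hours is `PT12H`. As a quick local sanity check, here is a hypothetical stdlib-only helper (not part of the SageMaker SDK) that parses a small subset of that format; years, months, weeks, and fractional values are deliberately left out.

```python
import re
from datetime import timedelta

# Subset of ISO 8601 durations handled by this sketch: PnD plus an optional TnHnMnS part.
_DURATION_RE = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def parse_duration(value: str) -> timedelta:
    """Convert a simple ISO 8601 duration such as 'PT12H' or 'P1DT6H' into a timedelta."""
    text = value.upper()
    match = _DURATION_RE.match(text)
    # Reject non-matches (e.g. a missing leading 'P') and a dangling 'T' such as 'P1DT'.
    if match is None or text.endswith("T"):
        raise ValueError(f"not a supported ISO 8601 duration: {value!r}")
    parts = {name: int(num) for name, num in match.groupdict().items() if num is not None}
    # A bare 'P' carries no components at all.
    if not parts:
        raise ValueError(f"empty ISO 8601 duration: {value!r}")
    return timedelta(**parts)
```

For example, `parse_duration("PT12H")` gives a 12-hour `timedelta`, while a string without the leading `P`, such as `"T12H"`, is rejected.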

# Define Parameters to Parametrize SageMaker Pipeline Executions
Define Pipeline parameters that you can use to parametrize the pipeline. Parameters enable custom pipeline executions and schedules without having to modify the Pipeline definition.

The supported parameter types include:

* ParameterString - represents a str Python type
* ParameterInteger - represents an int Python type
* ParameterFloat - represents a float Python type
* ParameterBoolean - represents a bool Python type
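The idea that an execution uses each parameter's default unless a value is supplied at start time can be modeled in a few lines. This is a toy sketch that assumes nothing about how SageMaker implements it: `ToyParameter` and `resolve_parameters` are hypothetical names, and rejecting unknown names and checking types are choices of this sketch rather than documented service behavior.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass(frozen=True)
class ToyParameter:
    """Hypothetical stand-in for a pipeline parameter: name, Python type, and default value."""
    name: str
    py_type: type
    default: Any


def resolve_parameters(params: List[ToyParameter], overrides: Dict[str, Any]) -> Dict[str, Any]:
    """Return the value each parameter takes for one execution: the override if given, else the default."""
    known = {p.name for p in params}
    unknown = sorted(set(overrides) - known)
    if unknown:
        # Sketch-specific choice: fail fast on names the pipeline does not declare.
        raise KeyError(f"unknown parameter(s): {unknown}")
    resolved = {}
    for p in params:
        value = overrides.get(p.name, p.default)
        # bool is a subclass of int, so exclude it explicitly for integer parameters.
        if not isinstance(value, p.py_type) or (p.py_type is int and isinstance(value, bool)):
            raise TypeError(f"{p.name} expects {p.py_type.__name__}, got {type(value).__name__}")
        resolved[p.name] = value
    return resolved


# Mirrors a few of the parameters defined in the next cell.
params = [
    ToyParameter("Epochs", int, 1),
    ToyParameter("BatchSize", int, 10),
    ToyParameter("TrainingJobInstanceType", str, "ml.g5.2xlarge"),
]
```

With the real pipeline, the corresponding override is passed as `pipeline.start(parameters={"Epochs": 3})`, leaving the pipeline definition unchanged.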

In [None]:
# Hyper-parameters
# model id
model_id_param = ParameterString(name="ModelId", default_value=model_id)
# epochs
epochs_param = ParameterInteger(name="Epochs", default_value=1)
# batch size
batch_size_param = ParameterInteger(name="BatchSize", default_value=10)
# eval steps
evaluation_steps_param = ParameterInteger(name="EvalSteps", default_value=50)

# data locations
training_dataset_s3_loc_param = ParameterString(name="TrainingDatasetS3LocParam", default_value=train_s3_path)
eval_dataset_s3_loc_param = ParameterString(name="EvalDatasetS3LocParam", default_value=valid_s3_path)
model_output_s3_loc_param = ParameterString(name="ModelOutputS3LocParam", default_value=model_output_s3_loc)

# instance types
training_job_instance_type_param = ParameterString(name="TrainingJobInstanceType", default_value="ml.g5.2xlarge")
eval_job_instance_type_param = ParameterString(name="EvaluationJobInstanceType", default_value="ml.g4dn.2xlarge")

base_model_group_name_param = ParameterString(name="BaseModelRegistryGroupName", default_value=base_model_pkg_group_name)

Training Step
In this section, we define a training step that fine-tunes an embedding model on the given dataset. We configure a HuggingFace Estimator and the input dataset. A typical training script loads data from the input channels, configures training with hyperparameters, trains a model, and saves the model to model_dir so that it can be hosted later.

We also specify output_path, the S3 location where the model artifacts from training are saved.

Note: an instance type parameter may be used in multiple places in the pipeline. In this case, training_job_instance_type_param is passed into the estimator as instance_type.
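`scripts/train.py` itself is not shown in this notebook. As a hedged sketch of the contract described above: in script mode, SageMaker passes the estimator's `hyperparameters` to the entry point as `--name value` command-line arguments, exposes each input channel through an `SM_CHANNEL_<NAME>` environment variable (here `SM_CHANNEL_TRAIN` and `SM_CHANNEL_VALID`, from the `train` and `valid` keys of the input dictionary), and uploads whatever the script writes to `SM_MODEL_DIR` as `model.tar.gz`. The `train_model` function and the `training_summary.json` file are hypothetical placeholders for the real sentence-transformers fine-tuning and model saving.

```python
import argparse
import json
import os
from pathlib import Path


def parse_args(argv=None):
    """Parse hyperparameters (passed as --name value) and SageMaker-provided paths."""
    parser = argparse.ArgumentParser()
    # Names mirror the keys of the hyperparameters dictionary passed to the estimator.
    parser.add_argument("--model_id", type=str, required=True)
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--batch_size", type=int, default=10)
    parser.add_argument("--evaluation_steps", type=int, default=50)
    # Inside the training container SageMaker sets these variables; the fallbacks
    # are the conventional container paths for the "train" and "valid" channels.
    parser.add_argument("--model_dir", default=os.environ.get("SM_MODEL_DIR", "/opt/ml/model"))
    parser.add_argument("--train_dir", default=os.environ.get("SM_CHANNEL_TRAIN", "/opt/ml/input/data/train"))
    parser.add_argument("--valid_dir", default=os.environ.get("SM_CHANNEL_VALID", "/opt/ml/input/data/valid"))
    # Ignore unexpected extra arguments instead of aborting.
    args, _unknown = parser.parse_known_args(argv)
    return args


def train_model(args):
    """Hypothetical placeholder for the actual sentence-transformers fine-tuning loop."""
    return {"base_model": args.model_id, "epochs": args.epochs, "batch_size": args.batch_size}


def main(argv=None):
    args = parse_args(argv)
    summary = train_model(args)
    model_dir = Path(args.model_dir)
    model_dir.mkdir(parents=True, exist_ok=True)
    # SageMaker archives everything under model_dir into model.tar.gz after training.
    (model_dir / "training_summary.json").write_text(json.dumps(summary))
    return model_dir
```

In an actual entry point, `main()` would be called under an `if __name__ == "__main__":` guard. Using `parse_known_args` rather than `parse_args` is a defensive choice so that unexpected extra arguments do not abort training.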

In [None]:
from sagemaker.huggingface import HuggingFace

# hyperparameters, which are passed into the training job
hyperparameters = {
    "model_id": model_id_param,                             # pre-trained model
    "epochs": epochs_param,
    "batch_size": batch_size_param,
    "evaluation_steps": evaluation_steps_param
}

# create the Estimator
huggingface_estimator = HuggingFace(
    entry_point="train.py",                                 # train script
    source_dir="scripts",                                   # directory which includes all the files needed for training
    instance_type=training_job_instance_type_param,         # instance type used for the training job
    instance_count=1,                                       # the number of instances used for training
    base_job_name=name_from_base(f"{prefix}-training-step"),  # prefix for the training job name
    role=role,                                              # IAM role used by the training job to access AWS resources, e.g. S3
    volume_size=100,                                        # the size of the EBS volume in GB
    transformers_version="4.28",                            # the transformers version used in the training job
    pytorch_version="2.0",                                  # the PyTorch version used in the training job
    py_version="py310",                                     # the python version used in the training job
    hyperparameters=hyperparameters,                        # the hyperparameters passed to the training job
    environment={"HUGGINGFACE_HUB_CACHE": "/tmp/.cache"},   # set env variable to cache models in /tmp
    sagemaker_session=pipeline_session,                     # specifies a sagemaker session object
    output_path=model_output_s3_loc_param                   # s3 location for model artifact
)

In [None]:
# define a data input dictionary with our uploaded S3 URIs; the keys become the channel names
data = {"train": training_dataset_s3_loc_param, "valid": eval_dataset_s3_loc_param}

# with a PipelineSession, fit() does not start a training job; it returns the arguments for the TrainingStep
train_args = huggingface_estimator.fit(data)

Define the Training step

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

step_train = TrainingStep(
    name="EmbeddingTrain",
    step_args=train_args,
    cache_config=cache_config
)

# Define an Evaluation Step
A processing step runs a processing job that evaluates the fine-tuned model.

In [None]:
from sagemaker.workflow.steps import ProcessingStep

# Initialize the HuggingFaceProcessor
hfp = HuggingFaceProcessor(
    role=role,
    instance_count=1,
    instance_type=eval_job_instance_type_param,
    transformers_version="4.28",
    pytorch_version="2.0",
    py_version="py310",
    base_job_name=name_from_base(f"{prefix}-evaluation-step"),
    sagemaker_session=pipeline_session
)

In [None]:
from sagemaker.workflow.properties import PropertyFile

# With a PipelineSession, run() does not start the processing job; it returns the step arguments
step_args = hfp.run(
    code='evaluation.py',
    source_dir='pipeline',
    arguments=["--base-model-id", "sentence-transformers/msmarco-bert-base-dot-v5",
               "--model-file", "model.tar.gz",
               "--test-file", "val_dataset.json"],
    inputs=[
        ProcessingInput(
            input_name="data",
            source=eval_dataset_s3_loc_param,
            destination='/opt/ml/processing/input/data/'
        ),
        ProcessingInput(
            input_name="model",
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,  # model.tar.gz from the training step
            destination="/opt/ml/processing/model"
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation"  # evaluation.json is uploaded from here
        ),
    ]
)

evaluation_report = PropertyFile(
    name=f"{prefix}-evaluation-report",
    output_name="evaluation",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="EmbeddingEvaluation",
    step_args=step_args,
    property_files=[evaluation_report],
    cache_config=cache_config
)
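`evaluation.py` is not shown either. The sketch below illustrates, under stated assumptions, the pieces the processing step wires together: the training artifact arrives at `/opt/ml/processing/model/model.tar.gz` and must be unpacked, the validation data arrives under `/opt/ml/processing/input/data/`, and whatever is written to `/opt/ml/processing/evaluation/evaluation.json` is uploaded to the `evaluation` output that the `PropertyFile` reads. The recall@1 metric (using dot-product similarity, in the spirit of the `msmarco-bert-base-dot-v5` base model), the helper names, and the report layout are all hypothetical; producing the actual query and document embeddings requires loading the model, which is omitted.

```python
import json
import tarfile
from pathlib import Path
from typing import List, Sequence


def extract_model(model_tar: Path, target_dir: Path) -> Path:
    """Unpack the training job's model.tar.gz (copied in by the "model" ProcessingInput)."""
    target_dir.mkdir(parents=True, exist_ok=True)
    with tarfile.open(model_tar, "r:gz") as tar:
        if hasattr(tarfile, "data_filter"):
            # Python versions with extraction filters can block unsafe member paths.
            tar.extractall(target_dir, filter="data")
        else:
            tar.extractall(target_dir)
    return target_dir


def dot(a: Sequence[float], b: Sequence[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def recall_at_1(query_vecs: List[Sequence[float]], doc_vecs: List[Sequence[float]]) -> float:
    """Fraction of queries whose highest dot-product document is the paired one (same index)."""
    hits = 0
    for i, q in enumerate(query_vecs):
        best = max(range(len(doc_vecs)), key=lambda j: dot(q, doc_vecs[j]))
        hits += int(best == i)
    return hits / len(query_vecs)


def write_report(metrics: dict, output_dir: Path) -> Path:
    """Write evaluation.json into the directory the "evaluation" ProcessingOutput uploads."""
    output_dir.mkdir(parents=True, exist_ok=True)
    report = output_dir / "evaluation.json"
    report.write_text(json.dumps(metrics, indent=2))
    return report
```

Inside the container, these would be called with `Path("/opt/ml/processing/model/model.tar.gz")` and `Path("/opt/ml/processing/evaluation")`.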

# Register Model
SageMaker Model Registry supports the following features and functionality:

* Catalog models for production.
* Manage model versions. 
* Associate metadata, such as training metrics, with a model.
* Manage the approval status of a model.
* Deploy models to production.
* Automate model deployment with CI/CD.

In this workshop, we are going to register the finetuned embedding model as a model package using SageMaker Model Registry. 

A model package is an abstraction of reusable model artifacts that packages all ingredients required for inference. 
Primarily, it consists of an inference specification that defines the inference image to use along with an optional model weights location.

A model package group is a collection of model packages. A model package group can be created for a specific ML business problem, and new versions of the model packages can be added to it. Typically, customers are expected to create a ModelPackageGroup for a SageMaker pipeline so that model package versions can be added to the group for every SageMaker Pipeline run.
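The relationship between a group and its versions can be pictured with a small in-memory model. This is purely conceptual: the classes below are hypothetical and are not the SageMaker Model Registry API. Each registration appends the next version to the group, starting in `PendingManualApproval` (the status used by the register step in this notebook), and approval is a separate, later action.

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Approval states a registered version can be in.
APPROVAL_STATUSES = {"PendingManualApproval", "Approved", "Rejected"}


@dataclass
class PackageVersion:
    """One registered model version (hypothetical, conceptual only)."""
    version: int
    model_data: str
    approval_status: str = "PendingManualApproval"


@dataclass
class PackageGroupSketch:
    """Conceptual in-memory model package group; not the SageMaker API."""
    name: str
    versions: List[PackageVersion] = field(default_factory=list)

    def register(self, model_data: str, approval_status: str = "PendingManualApproval") -> PackageVersion:
        # Each pipeline run that registers a model appends the next version number.
        if approval_status not in APPROVAL_STATUSES:
            raise ValueError(f"unknown approval status: {approval_status}")
        pkg = PackageVersion(len(self.versions) + 1, model_data, approval_status)
        self.versions.append(pkg)
        return pkg

    def set_status(self, version: int, status: str) -> None:
        # Versions are 1-based, so version N is stored at index N - 1.
        if status not in APPROVAL_STATUSES:
            raise ValueError(f"unknown approval status: {status}")
        self.versions[version - 1].approval_status = status

    def latest_approved(self) -> Optional[PackageVersion]:
        # Newest version whose status is Approved, or None if nothing is approved yet.
        approved = [v for v in self.versions if v.approval_status == "Approved"]
        return approved[-1] if approved else None
```

In SageMaker itself, approving a registered version is typically an API call such as the boto3 SageMaker client's `update_model_package(ModelPackageArn=..., ModelApprovalStatus="Approved")`.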

In [None]:
from sagemaker.huggingface import HuggingFaceModel
from sagemaker.huggingface import get_huggingface_llm_image_uri
import json
from sagemaker.workflow.model_step import ModelStep

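# Triton Inference Server container image (the ECR account ID for this image can differ by region; verify it for yours)
triton_image = f"785573368785.dkr.ecr.{region}.amazonaws.com/sagemaker-tritonserver:22.12-py3"

# print the ECR image URI
print(f"Triton image uri: {triton_image}")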

inference_instance_type = "ml.g5.2xlarge"
number_of_gpu = 1
health_check_timeout = 3600

# create HuggingFaceModel with the image uri
huggingface_model = HuggingFaceModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    image_uri=triton_image,
    transformers_version="4.28",
    pytorch_version="2.0",
    py_version="py310",
    model_server_workers=1,
    role=role,
    name=name_from_base(model_id.replace('/', '-')),
    sagemaker_session=pipeline_session
)

create_step_args = huggingface_model.create(instance_type=inference_instance_type)
step_create_model = ModelStep(
    name="CreateModel",
    step_args=create_step_args,
    depends_on=[step_eval]
)

# Model Metrics
To attach the model evaluation metrics to the registered model package, we use the `ModelMetrics` class. The evaluation step writes its metrics to an `evaluation.json` file in the step's output S3 location. With that information, we create a `ModelMetrics` object to include the evaluation metrics in the model package when it is registered.

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_package_group_name = f"{model_id.replace('/', '-')}-finetuned"

# point the model statistics at the evaluation.json written by the evaluation step
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

In [None]:
register_args = huggingface_model.register(
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=[
        "ml.p2.16xlarge",
        "ml.p3.16xlarge",
        "ml.g4dn.4xlarge",
        "ml.g4dn.8xlarge",
        "ml.g4dn.12xlarge",
        "ml.g4dn.16xlarge",
        "ml.g5.2xlarge",
        "ml.g5.12xlarge",
    ],
    model_package_group_name=model_package_group_name,
    customer_metadata_properties={"training-image-uri": huggingface_estimator.training_image_uri()}, #Store the training image url
    approval_status="PendingManualApproval",
    model_metrics=model_metrics
)
step_register = ModelStep(name="RegisterModel",
                          step_args=register_args,
                          depends_on=[step_eval, step_create_model])

# Define a Pipeline of Parameters and Steps 
In this section, we combine all the steps into a Pipeline so it can be executed.
A pipeline requires a name, parameters, and steps. Names must be unique within an (account, region) pair.

Note:

* All parameters referenced in the step definitions must be included in the pipeline's parameters list.
* Steps passed into the pipeline do not have to be listed in the order of execution. The SageMaker Pipelines service resolves the execution order from the data dependencies between steps and any explicit depends_on settings.
* Step names must be unique across the pipeline's step list.
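The dependency resolution described above is a topological sort. The sketch below reproduces it locally with Python's standard `graphlib` module (Python 3.9+), using the step names from this notebook and deliberately listing them out of execution order; it illustrates the idea and is not the Pipelines service's scheduler. The edges come from the data dependencies (for example, the evaluation step's input `step_train.properties.ModelArtifacts.S3ModelArtifacts`) and the explicit `depends_on` lists.

```python
from graphlib import TopologicalSorter

# Upstream steps for each step, as wired in this notebook:
# - data dependencies: a step property used as another step's input
# - explicit depends_on entries on the two ModelSteps
dependencies = {
    "RegisterModel": {"EmbeddingTrain", "EmbeddingEvaluation", "CreateModel"},
    "CreateModel": {"EmbeddingTrain", "EmbeddingEvaluation"},
    "EmbeddingEvaluation": {"EmbeddingTrain"},
    "EmbeddingTrain": set(),
}


def execution_order(deps):
    """Return a valid run order; raises graphlib.CycleError if the dependencies form a cycle."""
    return list(TopologicalSorter(deps).static_order())


print(execution_order(dependencies))
# → ['EmbeddingTrain', 'EmbeddingEvaluation', 'CreateModel', 'RegisterModel']
```

Because each step here depends on every step before it, only one order is valid; in a pipeline with independent branches, several orders would be valid and independent steps could run in parallel.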

In [None]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig

pipeline = Pipeline(
    name=f"{prefix}-pipeline",
    parameters=[
        model_id_param,
        epochs_param,
        batch_size_param,
        evaluation_steps_param,
        training_dataset_s3_loc_param,
        eval_dataset_s3_loc_param,
        model_output_s3_loc_param,
        training_job_instance_type_param,
        eval_job_instance_type_param,
        base_model_group_name_param
    ],
    steps=[step_train, step_eval, step_create_model, step_register],
    sagemaker_session=pipeline_session
)

## Examining the pipeline definition
The JSON of the pipeline definition can be examined to confirm the pipeline is well-defined and the parameters and step properties resolve correctly.
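Beyond eyeballing the JSON, a couple of small helpers can check it. This sketch assumes that parameter references appear in the definition as `{"Get": "Parameters.<Name>"}` and that the top-level `Parameters` and `Steps` lists carry `Name` (and, for steps, `Type`) keys, which matches the definitions the SDK emits as far as we know. The helper names are hypothetical.

```python
from typing import Any, Dict, Iterator, List, Set, Tuple


def iter_parameter_refs(node: Any) -> Iterator[str]:
    """Yield every parameter name referenced as {"Get": "Parameters.<Name>"} inside node."""
    if isinstance(node, dict):
        ref = node.get("Get")
        if isinstance(ref, str) and ref.startswith("Parameters."):
            yield ref.split(".", 1)[1]
        for value in node.values():
            yield from iter_parameter_refs(value)
    elif isinstance(node, list):
        for item in node:
            yield from iter_parameter_refs(item)


def undeclared_parameters(definition: Dict[str, Any]) -> Set[str]:
    """Parameters referenced by steps but missing from the top-level Parameters list."""
    declared = {p["Name"] for p in definition.get("Parameters", [])}
    return set(iter_parameter_refs(definition.get("Steps", []))) - declared


def step_summary(definition: Dict[str, Any]) -> List[Tuple[str, str]]:
    """(name, type) for each step, in the order they appear in the definition."""
    return [(s["Name"], s.get("Type", "?")) for s in definition.get("Steps", [])]
```

After running the next cell, `undeclared_parameters(definition)` should return an empty set, and `step_summary(definition)` lists each step with its type.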

In [None]:
import json

definition = json.loads(pipeline.definition())
definition

In [None]:
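pipeline.upsert(role_arn=role)
execution = pipeline.start()
# wait for the execution to complete; raise max_attempts if it runs longer than delay * max_attempts seconds
execution.wait(delay=60, max_attempts=120)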