_API Reference: https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#steps_

In [1]:
!pip install sagemaker transformers==4.44.2 --quiet

In [None]:
import sagemaker
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import TrainingStep, ProcessingStep, CreateModelStep, CacheConfig
from sagemaker.workflow.parameters import ParameterString
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.model_step import ModelStep
from sagemaker import get_execution_role
from sagemaker.estimator import Estimator
import boto3

sess = sagemaker.Session()

sagemaker_session_bucket=None
if sagemaker_session_bucket is None and sess is not None:
    # set to default bucket if a bucket name is not given
    sagemaker_session_bucket = sess.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client('iam')
    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']

sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")

## 取得 Training Datasets

In [None]:
import json
import boto3

# 初始化 S3 客戶端，來源區域是 ap-northeast-1，目標區域是 us-west-2
s3_source = boto3.client('s3', region_name="us-west-2")
s3_target = boto3.client('s3', region_name="us-west-2")

def parse_s3_uri(uri):
    parts = uri.replace("s3://", "").split("/")
    bucket = parts[0]
    key = "/".join(parts[1:])
    return bucket, key

def copy_s3_object(source_uri, target_bucket):
    source_bucket, source_key = parse_s3_uri(source_uri)
    try:
        # 從來源 bucket 下載檔案
        response = s3_source.get_object(Bucket=source_bucket, Key=source_key)
        file_content = response['Body'].read()

        # 將檔案上傳到目標 bucket
        s3_target.put_object(Bucket=target_bucket, Key=source_key, Body=file_content)
        print(f"Copied {source_key} to {target_bucket}")
    except Exception as e:
        print(f"Error copying {source_key}: {str(e)}")

# s3 URI
base_uri = 's3://aws-educate-09-28-sagemaker-workshop-oregon/datasets/phi-3/'
train_uri = base_uri + 'train_dataset.json'
test_uri = base_uri + 'test_dataset.json'

# 你的目標 S3 bucket
target_bucket = sess.default_bucket()


# 複製 train 和 test 資料到新的 S3 bucket
copy_s3_object(train_uri, target_bucket)
copy_s3_object(test_uri, target_bucket)

datasets_s3_uri = "s3://" + target_bucket + "/datasets/phi-3/"


## Pipeline Parameters

In [24]:
# 定義參數
training_datasets_s3_uri = ParameterString(name="TrainingDatasetesS3Uri", default_value=datasets_s3_uri)
model_package_group_name = "Demo-SageMaker-Pipeline-Group"

# 其他配置
cache_config = CacheConfig(enable_caching=True, expire_after="30d")

## TraningStep

In [None]:
from sagemaker.huggingface import HuggingFace
from transformers import AutoTokenizer

model_id = "microsoft/Phi-3.5-mini-instruct"



# hyperparameters, which are passed into the training job
hyperparameters ={
  'model_id': model_id,                             # pre-trained model
  'dataset_path': '/opt/ml/input/data/training',    # path where sagemaker will save training dataset
  'num_train_epochs': 3,                            # number of training epochs
  'per_device_train_batch_size': 1,                 # batch size for training
  'gradient_accumulation_steps': 2,                 # Number of updates steps to accumulate 
  'gradient_checkpointing': True,                   # save memory but slower backward pass
  'fp16': True ,
  'learning_rate': 2e-4,                            # learning rate
  'max_grad_norm': 0.3,                             # Maximum norm (for gradient clipping)
  'warmup_ratio': 0.03,                             # warmup ratio
  "lr_scheduler_type":"constant",                   # learning rate scheduler
  'save_strategy': "epoch",                         # save strategy for checkpoints
  "logging_steps": 10,                              # log every x steps
  'merge_adapters': True,                           # wether to merge LoRA into the model (needs more memory)
  'use_flash_attn': True,                           # Whether to use Flash Attention
  'output_dir': '/tmp/run',                         # output directory, where to save assets during training
}

# define Training Job Name 
job_name = f'huggingface-qlora-{hyperparameters["model_id"].replace("/","-").replace(".","-")}'


huggingface_estimator = HuggingFace(
    entry_point          = 'run_qlora.py',    # train script
    source_dir           = '../scripts',      # directory which includes all the files needed for training
    instance_type        = 'ml.p3.2xlarge',   # instances type used for the training job
    instance_count       = 1,                 # the number of instances used for training
    max_run              = 2*24*60*60,        # maximum runtime in seconds (days * hours * minutes * seconds)
    base_job_name        = job_name,          # the name of the training job
    role                 = role,              # Iam role used in training job to access AWS ressources, e.g. S3
    volume_size          = 300,               # the size of the EBS volume in GB
    transformers_version = '4.36',            # the transformers version used in the training job
    pytorch_version      = '2.1',             # the pytorch_version version used in the training job
    py_version           = 'py310',           # the python version used in the training job
    hyperparameters      =  hyperparameters,  # the hyperparameters passed to the training job
    environment          = { "HUGGINGFACE_HUB_CACHE": "/tmp/.cache" }, # set env variable to cache models in /tmp
)


# 定義訓練步驟
train_step = TrainingStep(
    name="TrainModel",
    estimator=huggingface_estimator,
    inputs={
        'training': TrainingInput(training_datasets_s3_uri, content_type="application/json")
    },
    cache_config=cache_config
)


## Register Model Step
Documentation: https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-register-model

In [None]:
from sagemaker.huggingface import HuggingFaceModel
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.huggingface import get_huggingface_llm_image_uri

# retrieve the llm image uri
llm_image = get_huggingface_llm_image_uri(
  "huggingface",
  session=sess,
)

config = {
    'HF_MODEL_ID': "/opt/ml/model", # path to where sagemaker stores the model
    'SM_NUM_GPUS': json.dumps(1), # Number of GPU used per replica
    'MAX_INPUT_LENGTH': json.dumps(1024), # Max length of input text
    'MAX_TOTAL_TOKENS': json.dumps(2048), # Max length of the generation (including input text)
}

model = HuggingFaceModel( # https://sagemaker.readthedocs.io/en/stable/frameworks/huggingface/sagemaker.huggingface.html#hugging-face-model
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts, # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html
    role=role,
    image_uri=llm_image,
    sagemaker_session=sess,
    env=config
)

register_step = RegisterModel(
    name="DemoSageMakerPipelineModel",
    model=model,
    content_types=["application/json"],
    response_types=["application/json"],
    inference_instances=["ml.g4dn.xlarge", "ml.g5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status="Approved",
)

## Lambda Step for Deploying model endpoint
documents: 
- https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-lambda
- https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker/client/create_model.html

In [None]:
%%writefile lambda_deployer.py

import json

import boto3


def lambda_handler(event, context):
    """Lambda function to deploy a model to an Endpoint using boto3 and create an API Gateway for SageMaker"""

    # 使用 event 中的參數
    model_package_arn = event["model_package_arn"]
    model_name = event["model_name"]
    endpoint_config_name = event["endpoint_config_name"]
    endpoint_name = event["endpoint_name"]
    role = event["role"]
    apigateway_role = event["apigateway_role"]

    sm_client = boto3.client("sagemaker")
    apigateway_client = boto3.client("apigateway")

    # 創建 SageMaker 模型
    create_model_response = sm_client.create_model(
        ModelName=model_name,
        PrimaryContainer={"ModelPackageName": model_package_arn},
        ExecutionRoleArn=role,
    )

    # 創建端點配置
    create_endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "InstanceType": "ml.g5.xlarge",
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "ModelName": model_name,
                "VariantName": "AllTraffic",
            }
        ],
    )

    # 創建端點
    create_endpoint_response = sm_client.create_endpoint(
        EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name
    )

    # 創建 API Gateway
    api_name = f"{model_name}-api"
    create_api_response = apigateway_client.create_rest_api(
        name=api_name,
        description=f"API Gateway for SageMaker endpoint {endpoint_name}",
        endpointConfiguration={"types": ["REGIONAL"]},
    )

    # 取得 API Gateway 的根資源 ID
    api_id = create_api_response["id"]
    root_id = apigateway_client.get_resources(restApiId=api_id)["items"][0]["id"]

    # 創建 POST 方法並設定
    apigateway_client.put_method(
        restApiId=api_id,
        resourceId=root_id,
        httpMethod="POST",
        authorizationType="NONE",
    )

    # 設置 SageMaker Runtime 與 API Gateway 的整合
    apigateway_client.put_integration(
        restApiId=api_id,
        resourceId=root_id,
        httpMethod="POST",
        type="AWS",  # 使用 AWS Service Integration
        integrationHttpMethod="POST",
        uri=f"arn:aws:apigateway:{boto3.Session().region_name}:runtime.sagemaker:path/endpoints/{endpoint_name}/invocations",
        credentials=apigateway_role,  # 指定具有 SageMaker InvokeEndpoint 權限的角色
    )

    # 設置 Integration Response，以便 API Gateway 正確處理 SageMaker 回應
    apigateway_client.put_integration_response(
        restApiId=api_id,
        resourceId=root_id,
        httpMethod="POST",
        statusCode="200",
        responseTemplates={"application/json": "$input.body"},
    )

    # 創建方法回應，確保有適當的狀態碼
    apigateway_client.put_method_response(
        restApiId=api_id,
        resourceId=root_id,
        httpMethod="POST",
        statusCode="200",
        responseModels={"application/json": "Empty"},
    )

    # 部署 API Gateway
    apigateway_client.create_deployment(
        restApiId=api_id,
        stageName="dev",
    )

    # 返回 API Gateway 的 URL
    api_url = (
        f"https://{api_id}.execute-api.{boto3.Session().region_name}.amazonaws.com/dev"
    )

    return {
        "statusCode": 200,
        "body": json.dumps(
            {
                "message": f"Created Endpoint {endpoint_name} and API Gateway {api_name}!",
                "api_url": api_url,
            }
        ),
    }


In [61]:
import boto3
import json

iam = boto3.client('iam')

def attach_policy_if_missing(role_name, policy_arn):
    """檢查並附加策略到角色，若策略已存在則跳過"""
    try:
        # 獲取附加到角色的現有策略
        attached_policies = iam.list_attached_role_policies(RoleName=role_name)['AttachedPolicies']
        attached_policy_arns = [policy['PolicyArn'] for policy in attached_policies]

        # 如果策略不在已附加列表中，則附加
        if policy_arn not in attached_policy_arns:
            iam.attach_role_policy(RoleName=role_name, PolicyArn=policy_arn)
            print(f"Attached policy {policy_arn} to role {role_name}")
        else:
            print(f"Policy {policy_arn} already attached to role {role_name}")

    except Exception as e:
        print(f"Error attaching policy {policy_arn} to role {role_name}: {e}")


def create_lambda_role(role_name, apigateway_role_arn):
    try:
        # 創建 IAM 角色
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "lambda.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }),
            Description='Role for Lambda to interact with SageMaker and API Gateway'
        )

        role_arn = response['Role']['Arn']

    except iam.exceptions.EntityAlreadyExistsException:
        print(f'Using existing role: {role_name}')
        response = iam.get_role(RoleName=role_name)
        role_arn = response['Role']['Arn']

    # 無論角色是否已存在，都會檢查並附加所需的策略
    attach_policy_if_missing(role_name, 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole')
    attach_policy_if_missing(role_name, 'arn:aws:iam::aws:policy/AmazonSageMakerFullAccess')
    attach_policy_if_missing(role_name, 'arn:aws:iam::aws:policy/AmazonAPIGatewayAdministrator')

    # 在 Lambda 角色中直接附加 iam:PassRole 權限，允許傳遞 API Gateway 的角色
    policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": apigateway_role_arn
            }
        ]
    }

    iam.put_role_policy(
        RoleName=role_name,
        PolicyName="PassRolePolicy",
        PolicyDocument=json.dumps(policy_document)
    )

    print(f"Attached iam:PassRole policy to {role_name} for {apigateway_role_arn}")

    return role_arn


def create_apigateway_role(role_name):
    try:
        # 創建 API Gateway 用的 IAM 角色
        response = iam.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps({
                "Version": "2012-10-17",
                "Statement": [
                    {
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "apigateway.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }),
            Description='Role for API Gateway to invoke SageMaker Endpoints'
        )

        role_arn = response['Role']['Arn']

    except iam.exceptions.EntityAlreadyExistsException:
        print(f'Using existing role: {role_name}')
        response = iam.get_role(RoleName=role_name)
        role_arn = response['Role']['Arn']

    # 附加允許 API Gateway 調用 SageMaker 的策略
    attach_policy_if_missing(role_name, 'arn:aws:iam::aws:policy/AmazonSageMakerFullAccess')

    return role_arn


In [None]:
import time
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
apigateway_role = create_apigateway_role("apigateway-role")
lambda_role = create_lambda_role("lambda-deployment-role", apigateway_role_arn=apigateway_role)


# Use the current time to define unique names for the resources created
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())

model_name = "demo-sagemaker-pipeline-model" + current_time
endpoint_name = "demo-sagemaker-pipeline-endpoint-" + current_time
endpoint_config_name = "demo-sagemaker-pipeline-endpoint-config" + current_time
function_name = "demo-sagemaker-pipeline-lambda-step" + current_time

# Lambda helper class can be used to create the Lambda function
func = Lambda(
    function_name=function_name,
    execution_role_arn=lambda_role,
    script="lambda_deployer.py",
    handler="lambda_deployer.lambda_handler",
    timeout=600,
    memory_size=10240,
)

# The dictionary retured by the Lambda function is captured by LambdaOutput, each key in the dictionary corresponds to a
# LambdaOutput

output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)

# The inputs provided to the Lambda function can be retrieved via the `event` object within the `lambda_handler` function
# in the Lambda
lambda_deploy_step = LambdaStep(
    name="LambdaStepDeployModelEndpoint",
    lambda_func=func,
    inputs={
        "model_package_arn": register_step.properties.ModelPackageArn,
        "model_name": model_name,
        "endpoint_config_name": endpoint_config_name,
        "endpoint_name": endpoint_name,
        "role": role,
        "apigateway_role": apigateway_role
    },
    outputs=[output_param_1, output_param_2])

## Pipeline Definition

In [None]:
# 定義 Pipeline
pipeline = Pipeline(
    name="TrainingAndRegisterAndDeployPipeline",
    parameters=[training_datasets_s3_uri],
    steps=[train_step, register_step, lambda_deploy_step]
)

# 更新 SageMaker Pipeline (不存在則創建)
pipeline.upsert(role_arn=role)


In [31]:
# # 取得執行狀態
# execution.describe()

# # 等待 Pipeline 執行完成
# execution.wait()


## Manually Deploy a Model from the Registry
Domumentation: https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry-deploy.html#model-registry-deploy-smsdk

In [None]:
from sagemaker import ModelPackage # https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.ModelPackage
from time import gmtime, strftime

model_package_arn = "arn:aws:sagemaker:us-west-2:097724924093:model-package/Demo-SageMaker-Pipeline-Group/3"
model = ModelPackage(role=role, # https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.ModelPackage
                     model_package_arn=model_package_arn,
                     sagemaker_session=sess)
model.deploy(initial_instance_count=1, instance_type='ml.g5.xlarge', container_startup_health_check_timeout=1000) # https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.Model.deploy

## Manually Deploy a Model from S3 (uncompressed)

In [None]:
from sagemaker.huggingface import HuggingFaceModel
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.huggingface import get_huggingface_llm_image_uri

# Model s3 prefix(folder) uri  
model_s3_prefix_uri = "s3://sagemaker-us-west-2-097724924093/huggingface-qlora-microsoft-Phi-3-5-min-2024-09-11-04-03-17-319/output/model/"

# retrieve the llm image uri
llm_image = get_huggingface_llm_image_uri(
  "huggingface",
  session=sess,
)

config = {
    'HF_MODEL_ID': "/opt/ml/model", # path to where sagemaker stores the model
    'SM_NUM_GPUS': json.dumps(1), # Number of GPU used per replica
    'MAX_INPUT_LENGTH': json.dumps(1024), # Max length of input text
    'MAX_TOTAL_TOKENS': json.dumps(2048), # Max length of the generation (including input text)
}

model = HuggingFaceModel( # https://sagemaker.readthedocs.io/en/stable/frameworks/huggingface/sagemaker.huggingface.html#hugging-face-model
    model_data={'S3DataSource':{'S3Uri': model_s3_prefix_uri,'S3DataType': 'S3Prefix','CompressionType': 'None'}}, # We use a dict, for more details, please refer to these two documents: https://sagemaker.readthedocs.io/en/stable/api/inference/model.html, https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_S3ModelDataSource.html
    role=role,
    image_uri=llm_image,
    sagemaker_session=sess,
    env=config
)


model.deploy(initial_instance_count=1, instance_type='ml.g5.2xlarge', container_startup_health_check_timeout=1000) # https://sagemaker.readthedocs.io/en/stable/api/inference/model.html#sagemaker.model.Model.deploy