# Orchestrating Jobs, Model Registration, and Continuous Deployment with Amazon SageMaker

Amazon SageMaker는 머신 러닝 애플리케이션 개발자와 머신 러닝 운영 엔지니어에게 SageMaker 작업을 오케스트레이션하고 재현 가능한 머신 러닝 파이프라인을 작성하며, 짧은 지연 시간으로 실시간으로 추론을 위한 사용자 정의 구축 모델을 배포하거나 배치 변환을 통해 오프라인으로 추론하고 아티팩트의 계보를 추적할 수 있는 기능을 제공합니다. 프로덕션 워크플로우를 배포 및 모니터링하고, 모델 아티팩트를 배포하고, 간단한 인터페이스를 통해 아티팩트 계보를 추적하면서 머신 러닝 애플리케이션 개발을 위한 안전 및 모범 사례 패러다임을 준수하는 건전한 운영 관행을 확립할 수 있습니다.

SageMaker Pipelines service는 선언적 Json 사양인 SageMaker Machine Learning Pipeline Domain Specific Language(DSL)를 지원합니다. 이 DSL은 파이프라인 매개변수와 SageMaker 작업 단계의 방향성 비순환 그래프(DAG)를 정의합니다. SageMaker Python 소프트웨어 개발자 키트(SDK)는 엔지니어와 과학자 모두에게 이미 익숙한 구성을 사용하여 파이프라인 DSL 생성을 간소화합니다.


SageMaker Model Registry는 학습된 모델을 저장, 버전 관리 및 관리하는 곳입니다. 데이터 과학자와 머신 러닝 엔지니어는 단일 Model Registry에서 모델 버전을 비교하고, 배포할 모델을 승인하고, 서로 다른 AWS 계정에서 모델을 배포할 수 있습니다. SageMaker를 통해 고객은 머신 러닝 운영 모범 사례를 따르고 올바르게 시작할 수 있습니다. 고객은 단 한 번의 API 호출로 전체 ML Ops end-to-end 시스템을 구축할 수 있습니다.

## SageMaker Pipelines

Amazon SageMaker Pipelines은 다음과 같은 활동을 지원합니다:

* Pipelines - 단계 및 조건의 방향성 비순환 그래프로 SageMaker 작업 및 리소스 생성을 오케스트레이션합니다.
* Processing Job steps - feature engineering, data validation, model evaluation와 model interpretation 등과 같은 데이터 처리 워크로드를 실행하기 위한 SageMaker의 간소화된 관리 환경입니다.
* Training Job steps -학습 데이터 세트의 예제를 제시하여 예측을 수행하도록 모델을 가르치는 반복적인 프로세스입니다.
* Conditional step execution - Pipeline 에서 브랜치의 조건부 실행을 제공합니다.
* Registering Models - Model Registry에 model package 리소스를 생성하여 Amazon SageMaker에서 배포 가능한 모델을 만드는 데 사용할 수 있습니다.
* Creating Model steps - transform 단계에서 사용하거나 나중에 endpoint로 게시할 모델을 만듭니다.
* Parameterized Pipeline executions - 제공된 매개변수에 따라 파이프라인 실행을 변경할 수 있습니다.
* Transform Job steps - 데이터 세트에서 학습 또는 추론을 방해하는 노이즈나 편향을 제거하고, 대규모 데이터 세트에서 추론을 얻고, 영구적인 endpoint가 필요하지 않은 경우 추론을 실행하기 위해 데이터 세트를 전처리하는 batch transform 입니다.
* Pipelines - 단계 및 조건의 방향성 비순환 그래프로 SageMaker 작업 및 리소스 생성을 오케스트레이션합니다.

### A SageMaker Pipeline

우리가 만드는 파이프라인은 모델의 품질이 충분한 경우 pre-processing, training, evaluation, and conditional model registration 및 게시의 일반적인 머신 러닝 애플리케이션 패턴을 따릅니다.

![A typical ML Application pipeline](img/pipeline-full.png)

### Getting some constants

로컬 실행 환경에서 몇 가지 constants를 가져옵니다.

In [34]:
import os

import boto3
import sagemaker
import sagemaker.session

from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)
from sagemaker.processing import (
    ProcessingInput,
    ProcessingOutput,
    ScriptProcessor,
)
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
)
from sagemaker.workflow.functions import (
    JsonGet,
)
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.workflow.model_step import ModelStep
from sagemaker.model import Model
from sagemaker.workflow.pipeline_context import PipelineSession

In [35]:
from pathlib import Path

BASE_DIR= Path.cwd()
BASE_DIR

PosixPath('/home/sagemaker-user/simple-sagemaker/lab_4_new_pipeline')

In [36]:
def get_sagemaker_client(region):
     """Gets the sagemaker client.

        Args:
            region: the aws region to start the session
            default_bucket: the bucket to use for storing the artifacts

        Returns:
            `sagemaker.session.Session instance
        """
     boto_session = boto3.Session(region_name=region)
     sagemaker_client = boto_session.client("sagemaker")
     return sagemaker_client


def get_session(region, default_bucket):
    """Gets the sagemaker session based on the region.

    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        `sagemaker.session.Session instance
    """

    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    runtime_client = boto_session.client("sagemaker-runtime")
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        sagemaker_runtime_client=runtime_client,
        default_bucket=default_bucket,
    )

def get_pipeline_session(region, default_bucket):
    """Gets the pipeline session based on the region.

    Args:
        region: the aws region to start the session
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        PipelineSession instance
    """

    boto_session = boto3.Session(region_name=region)
    sagemaker_client = boto_session.client("sagemaker")

    return PipelineSession(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        default_bucket=default_bucket,
    )

def get_pipeline_custom_tags(new_tags, region, sagemaker_project_name=None):
    try:
        sm_client = get_sagemaker_client(region)
        response = sm_client.describe_project(ProjectName=sagemaker_project_name)
        sagemaker_project_arn = response["ProjectArn"]
        response = sm_client.list_tags(
            ResourceArn=sagemaker_project_arn)
        project_tags = response["Tags"]
        for project_tag in project_tags:
            new_tags.append(project_tag)
    except Exception as e:
        print(f"Error getting project tags: {e}")
    return new_tags

In [37]:
region = boto3.session.Session().region_name
default_bucket = sagemaker.Session().default_bucket()

In [38]:
def get_pipeline(
    region,
    sagemaker_project_name=None,
    role=None,
    default_bucket=None,
    model_package_group_name="AbalonePackageGroup",
    pipeline_name="AbalonePipeline",
    base_job_prefix="Abalone",
    processing_instance_type="ml.m5.xlarge",
    training_instance_type="ml.m5.xlarge",
):
    """Gets a SageMaker ML Pipeline instance working with on abalone data.

    Args:
        region: AWS region to create and run the pipeline.
        role: IAM role to create and run steps and pipeline.
        default_bucket: the bucket to use for storing the artifacts

    Returns:
        an instance of a pipeline
    """
    sagemaker_session = get_session(region, default_bucket)
    if role is None:
        role = sagemaker.session.get_execution_role(sagemaker_session)

    pipeline_session = get_pipeline_session(region, default_bucket)

    # parameters for pipeline execution
    processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
    model_approval_status = ParameterString(
        name="ModelApprovalStatus", default_value="PendingManualApproval"
    )
    input_data = ParameterString(
        name="InputDataUrl",
        default_value=f"s3://sagemaker-servicecatalog-seedcode-{region}/dataset/abalone-dataset.csv",
    )

    # processing step for feature engineering
    sklearn_processor = SKLearnProcessor(
        framework_version="0.23-1",
        instance_type=processing_instance_type,
        instance_count=processing_instance_count,
        base_job_name=f"{base_job_prefix}/sklearn-abalone-preprocess",
        sagemaker_session=pipeline_session,
        role=role,
    )
    step_args = sklearn_processor.run(
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
            ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
        ],
        code=os.path.join(BASE_DIR, "preprocess.py"),
        arguments=["--input-data", input_data],
    )
    step_process = ProcessingStep(
        name="PreprocessAbaloneData",
        step_args=step_args,
    )

    # training step for generating model artifacts
    model_path = f"s3://{sagemaker_session.default_bucket()}/{base_job_prefix}/AbaloneTrain"
    image_uri = sagemaker.image_uris.retrieve(
        framework="xgboost",
        region=region,
        version="1.0-1",
        py_version="py3",
        instance_type=training_instance_type,
    )
    xgb_train = Estimator(
        image_uri=image_uri,
        instance_type=training_instance_type,
        instance_count=1,
        output_path=model_path,
        base_job_name=f"{base_job_prefix}/abalone-train",
        sagemaker_session=pipeline_session,
        role=role,
    )
    xgb_train.set_hyperparameters(
        objective="reg:linear",
        num_round=50,
        max_depth=5,
        eta=0.2,
        gamma=4,
        min_child_weight=6,
        subsample=0.7,
        silent=0,
    )
    step_args = xgb_train.fit(
        inputs={
            "train": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "train"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
            "validation": TrainingInput(
                s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                    "validation"
                ].S3Output.S3Uri,
                content_type="text/csv",
            ),
        },
    )
    step_train = TrainingStep(
        name="TrainAbaloneModel",
        step_args=step_args,
    )

    # processing step for evaluation
    script_eval = ScriptProcessor(
        image_uri=image_uri,
        command=["python3"],
        instance_type=processing_instance_type,
        instance_count=1,
        base_job_name=f"{base_job_prefix}/script-abalone-eval",
        sagemaker_session=pipeline_session,
        role=role,
    )
    step_args = script_eval.run(
        inputs=[
            ProcessingInput(
                source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
                destination="/opt/ml/processing/model",
            ),
            ProcessingInput(
                source=step_process.properties.ProcessingOutputConfig.Outputs[
                    "test"
                ].S3Output.S3Uri,
                destination="/opt/ml/processing/test",
            ),
        ],
        outputs=[
            ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
        ],
        code=os.path.join(BASE_DIR, "evaluate.py"),
    )
    evaluation_report = PropertyFile(
        name="AbaloneEvaluationReport",
        output_name="evaluation",
        path="evaluation.json",
    )
    step_eval = ProcessingStep(
        name="EvaluateAbaloneModel",
        step_args=step_args,
        property_files=[evaluation_report],
    )

    # register model step that will be conditionally executed
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri="{}/evaluation.json".format(
                step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
            ),
            content_type="application/json"
        )
    )
    model = Model(
        image_uri=image_uri,
        model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        sagemaker_session=pipeline_session,
        role=role,
    )
    step_args = model.register(
        content_types=["text/csv"],
        response_types=["text/csv"],
        inference_instances=["ml.t2.medium", "ml.m5.large"],
        transform_instances=["ml.m5.large"],
        model_package_group_name=model_package_group_name,
        approval_status=model_approval_status,
        model_metrics=model_metrics,
    )
    step_register = ModelStep(
        name="RegisterAbaloneModel",
        step_args=step_args,
    )

    # condition step for evaluating model quality and branching execution
    cond_lte = ConditionLessThanOrEqualTo(
        left=JsonGet(
            step_name=step_eval.name,
            property_file=evaluation_report,
            json_path="regression_metrics.mse.value"
        ),
        right=6.0,
    )
    step_cond = ConditionStep(
        name="CheckMSEAbaloneEvaluation",
        conditions=[cond_lte],
        if_steps=[step_register],
        else_steps=[],
    )

    # pipeline instance
    pipeline = Pipeline(
        name=pipeline_name,
        parameters=[
            processing_instance_type,
            processing_instance_count,
            training_instance_type,
            model_approval_status,
            input_data,
        ],
        steps=[step_process, step_train, step_eval, step_cond],
        sagemaker_session=pipeline_session,
    )
    return pipeline

### Get the pipeline instance

여기서 pipeline 모듈에서 pipeline 인스턴스를 가져와서 작업할 수 있습니다.

In [46]:
region = boto3.Session().region_name
role = sagemaker.get_execution_role()
default_bucket = sagemaker.session.Session().default_bucket()

# Change these to reflect your project/business name or if you want to separate ModelPackageGroup/Pipeline from the rest of your team
model_package_group_name = f"AbaloneModelPackageGroup-Example"
pipeline_name = f"AbalonePipeline-Example"

In [47]:
pipeline = get_pipeline(
    region=region,
    role=role,
    default_bucket=default_bucket,
    model_package_group_name=model_package_group_name,
    pipeline_name=pipeline_name,
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


### Submit the pipeline to SageMaker and start execution

workflow 서비스에 pipeline 정의를 제출해 보겠습니다. 전달된 role은 workflow 서비스에서 단계에 정의된 모든 작업을 만드는 데 사용됩니다.

In [48]:
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:us-west-2:283963181880:pipeline/AbalonePipeline-Example',
 'ResponseMetadata': {'RequestId': 'acd61f6f-0e2b-4e1d-8a04-eb723b3ea12a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'acd61f6f-0e2b-4e1d-8a04-eb723b3ea12a',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '91',
   'date': 'Wed, 11 Sep 2024 03:22:55 GMT'},
  'RetryAttempts': 0}}

모든 기본 parameters를 수락하여 pipeline을 시작합니다.
pipeline을 시작할 때 이러한 pipeline parameters에 값을 전달할 수도 있으며, 이에 대해서는 나중에 다룰 예정입니다. 

In [49]:
execution = pipeline.start()

### Pipeline Operations: examining and waiting for pipeline execution

이제 실행 인스턴스를 설명하고 실행 단계를 나열하여 실행에 대해 자세히 알아보세요.

In [50]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:283963181880:pipeline/AbalonePipeline-Example',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-2:283963181880:pipeline/AbalonePipeline-Example/execution/87hgr1yb338x',
 'PipelineExecutionDisplayName': 'execution-1726024978490',
 'PipelineExecutionStatus': 'Executing',
 'PipelineExperimentConfig': {'ExperimentName': 'abalonepipeline-example',
  'TrialName': '87hgr1yb338x'},
 'CreationTime': datetime.datetime(2024, 9, 11, 3, 22, 58, 418000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 9, 11, 3, 22, 58, 418000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-west-2:283963181880:user-profile/d-c3uxums6sl0n/SageMakerUser',
  'UserProfileName': 'SageMakerUser',
  'DomainId': 'd-c3uxums6sl0n',
  'IamIdentity': {'Arn': 'arn:aws:sts::283963181880:assumed-role/sagemaker-immersion-day-SageMakerExecutionRole-C2pcH34njvfb/SageMaker',
   'PrincipalId': 'AROAUEHMGJM4MDKPAODPB:SageMaker'}},
 'LastModifiedBy'

실행 시 `wait()`을 호출하여 실행을 기다릴 수 있습니다:

In [51]:
execution.wait()

KeyboardInterrupt: 

실행 단계를 나열하여 상태와 아티팩트를 확인할 수 있습니다:

In [54]:
execution.list_steps()

[{'StepName': 'RegisterAbaloneModel-RegisterModel',
  'StartTime': datetime.datetime(2024, 9, 11, 3, 30, 25, 545000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 9, 11, 3, 30, 26, 411000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-west-2:283963181880:model-package/AbaloneModelPackageGroup-Example/2'}},
  'AttemptCount': 1},
 {'StepName': 'CheckMSEAbaloneEvaluation',
  'StartTime': datetime.datetime(2024, 9, 11, 3, 30, 24, 793000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 9, 11, 3, 30, 25, 47000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Condition': {'Outcome': 'True'}},
  'AttemptCount': 1},
 {'StepName': 'EvaluateAbaloneModel',
  'StartTime': datetime.datetime(2024, 9, 11, 3, 27, 51, 178000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2024, 9, 11, 3, 30, 23, 750000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagem

### Parameterized Executions

다른 파이프라인 매개변수를 지정하여 파이프라인의 추가 실행을 실행할 수 있습니다. 매개변수 인수는 이름이 매개변수 이름이고 값이 기본값의 오버라이드로 사용할 기본값인 dictionary입니다.

특히 모델의 성능에 따라 다른 파이프라인 실행을 시작하되 이번에는 컴퓨팅에 최적화된 인스턴스 유형에서 시작하고 모델 승인 상태를 자동으로 “Approved”으로 설정할 수 있습니다. 즉, `RegisterModel` 단계에서 생성된 모델 패키지 버전은 자동으로 CI/CD 파이프라인을 통해 배포할 수 있는 준비가 완료됩니다(예: SageMaker Projects).

In [None]:
# execution = pipeline.start(
#     parameters=dict(
#         ProcessingInstanceType="ml.c5.xlarge",
#         ModelApprovalStatus="Approved",
#     )
# )

In [None]:
# execution.wait()

In [None]:
# execution.list_steps()

## TEST

In [73]:
import json
import pandas as pd

In [74]:
sagemaker_client = boto3.client('sagemaker')

In [75]:
response = sagemaker_client.list_endpoints(StatusEquals='InService')

In [76]:
for endpoint_name in response['Endpoints']:
    print(f"endpoint_name : {endpoint_name['EndpointName']}")
    endpoint_name = endpoint_name['EndpointName']

endpoint_name : modelpackagegroup-240911-staging


In [94]:
%%writefile test.csv
5.0,-2.6981682974308594,-2.6996542509753234,-2.0208574277242835,-1.6187907159862003,-1.5539024835643052,-1.5747429841474172,-1.644065138582414,0.0,0.0,1.0
10.0,-0.7827552080726535,-0.5833158241356144,-0.7057616663526005,-0.8621579121407661,-0.8645157253893376,-0.9086914468217306,-0.7459863444891941,0.0,0.0,1.0
15.0,0.6746243164390247,0.6260204197727908,0.9679965753931773,0.6358118841516099,0.07268980206421295,0.9115452887464132,1.050171243697246,0.0,0.0,1.0
13.0,0.34150899655064076,0.27329734863283944,1.087550735517876,0.3115406825035666,-0.13682970287131657,-0.4068717954119667,1.0860943954609747,0.0,0.0,1.0

Overwriting test.csv


In [97]:
test_path = "test.csv"
test_df = pd.read_csv(test_path, header=None)
y_test = test_df.iloc[:, 0].to_numpy()
test_df.drop(test_df.columns[0], axis=1, inplace=True)

In [107]:
num = 4

In [108]:
import io
from io import StringIO
csv_file = io.StringIO()
test_df[num-1:num].to_csv(csv_file, sep=",", header=False, index=False)
payload = csv_file.getvalue()

response = runtime_client.invoke_endpoint(
    EndpointName=endpoint_name, 
    ContentType='text/csv',
    Accept='application/json',
    Body=payload
)

predict_value = json.loads(response['Body'].read().decode())
print(f"Ground_truth : {y_test[num-1]} , Predict_value : {predict_value}")

Ground_truth : 13.0 , Predict_value : 14.752860069274902
