# Orchestrate Jobs to Train and Evaluate Models with Amazon SageMaker Pipelines


◉ 본 노트북 코드와 이미지는 AWS Sagemaker Example 코드를 참고하여 작성됐습니다.


Amazon SageMaker Pipelines은 ML Application 개발자와 Ops 엔지니어들에게 ML 작업을 조율하고 재현 가능한 ML Pipeline을 작성할 수 있는 기능을 제공한다.

## 개요

1. [SageMaker Pipelines](#sagemaker-pipelines)
1. [Notebook Overview](#notebook-overview)
1. [A SageMaker Pipeline](#a-sagemaker-pipeline)
1. [데이터셋](#데이터셋)
1. [Pipeline 구성/실행을 위한 Parameter 정의](#pipeline-구성실행을-위한-parameter-정의)
1. [Processing Step 정의 (Feature Engineeirng)](#processing-step-정의-feature-engineeirng)
1. [Training Step 정의 (모델 학습)](#training-step-정의-모델-학습)
1. [Model Evaluation Step 정의 (학습된 모델을 평가하기 위한 Step 정의)](#model-evaluation-step-정의-학습된-모델을-평가하기-위한-step-정의)
1. [Model Step 정의 (모델 생성)](#model-step-정의-모델-생성)
1. [Batch Transformation을 위한 Transform step 정의](#batch-transformation을-위한-transform-step-정의)
1. [Model Package를 생성하기 위해 Register Model Step을 정의](#model-package를-생성하기-위해-register-model-step을-정의)
1. [Pipeline 실행을 종료하고 Fail을 나타내기 위한 FailStep을 정의](#pipeline-실행을-종료하고-fail을-나타내기-위한-failstep을-정의)
1. [Accuracy을 확인하는 Condition Step을 정의. step의 진행 조건에 따라 Pipeline DAG에서 조건부 실행을 지원할 수 있음.](#accuracy을-확인하는-condition-step을-정의-step의-진행-조건에-따라-pipeline-dag에서-조건부-실행을-지원할-수-있음)
1. [Pipeline을 정의](#pipeline을-정의)
1. [Pipeline 확인](#pipeline-확인)
1. [Pipeline을 sagemaker에 등록 및 실행](#pipeline을-sagemaker에-등록-및-실행)
1. [파이프라인 작업: 파이프라인 실행 검사 및 대기](#파이프라인-작업-파이프라인-실행-검사-및-대기)
   1. [Pipeline 결과 검토](#pipeline-결과-검토)
   1. [Lineage](#lineage)
   1. [Parametrized Executions](#parametrized-실행)

## Reference

- https://sagemaker-examples.readthedocs.io/en/latest/sagemaker-pipelines/tabular/abalone_build_train_deploy/sagemaker-pipelines-preprocess-train-evaluate-batch-transform_outputs.html


## SageMaker Pipelines

본 노트북에서 다룰 Sagemaker Pipeline 기능 리스트

- Pipelines - Sagemaker Job과 Resource Creation을 관리/자동화하기 위한 DAG (step, condition으로 구성)
- Processing job steps - feature engineering, data validation, model evaluation, model interpretation와 같은 Data processing workload를 실행하기 위한 DAG Step (job steps)
- Training job steps - 훈련 데이터셋을 활용하여 모델 학습과 예측을 반복하는 DAG Step (job steps)
- Conditional execution steps - Pipeline의 분기에서 조건부 실행을 제공하는 Step
- Register model steps - SageMaker' Model Registry에서 사용할 수 있는 모델 패키지 리소스를 생성하는 단계로, 이를 통해 Amazon Sagemaker에서 배포 가능한 모델을 등록하는 Step
- Create model steps - Transformation 혹은 Endpoint에 Model을 적용하기 위한 모델 생성 Step
- Transform job steps - 데이터 셋에서 훈련이나 추론에 방해되는 잡음이나 편향을 제거하거나 대용량 데이터셋에서 추론을 실행하는 Step. 지속적인 Endpoint가 필요없는 경우에 Inference를 수행하는 Batch Step
- Fail steps - Pipeline 실행을 중지하고 Pipeline 실행을 실패로 표시하는 Step
- Parametrized Pipeline executions - 지정된 Parameter에 따란 Pipeline 실행을 다양화할 수 있도록 하는 Component


## Notebook Overview

1. SageMaker Pipeline 동작을 위한 요소들을 매개변수화할 수 있는 Pipeline parameters를 정의
2. Processing Step 정의 (data cleansing, feature engineering, data set split(훈련 데이터, 테스트 데이터))
3. Training Step 정의 (전처리된 데이터셋을 활용하여 모델 학습을 진행)
4. Processing Step 정의 (학습된 모델의 성능을 평가하는 단계)
5. Create Model Step 정의 (훈련에 사용된 Model artifact를 활용하여 모델을 생성하는 단계)
6. Transform Step 정의 (생성된 모델을 기반으로 배치 작업을 수행하는 설정)
7. Register Model Step 정의 (학습된 모델을 model package화 하는 단계)
8. Conditional Step 정의 (Pipeline 이전 단계의 출력을 기반으로 조건을 확인하고, 조건에 따른 다른 단게를 조건부로 실행하는 조건 단계 정의)
9. Fail Step 정의 (실패 상황을 정의하고, customized error message 를 설정하는 단계)
10. Pipeline 생성과 정의 (앞에서 생성한 Step과 parameter를 활용하여 pipeline 생성)
11. Pipeline 실행
12. Model Evaluation 결과 확인
13. Condition 설정과 Pipeline 재실행


## A SageMaker Pipeline

본 실습에서 생성하는 Pipeline은 전처리, 학습, 평가, 모델 생성, 배치 변환 및 모델 등록과 같은 전형적인 Machine Learning Application 패턴을 따릅니다 :

![A](./img/pipeline-full.png)


## 데이터셋

1. 출처 : [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone)
2. 데이터 설명 : 전복의 외형적 수치 데이터와 전복의 나이(생존기간) 데이터가 포함되어 있습니다.
3. 데이터 분석 목적 : 전복의 외형적 수치 데이터를 기반으로 하여 전복의 나이(생존기간) 예측 / Regression(회귀) 문제.
4. 데이터 셋 구성

   | 특성                        | 설명                        |
   | --------------------------- | --------------------------- |
   | 길이(length)                | 전복의 껍질 길이            |
   | 직경(diameter)              | 전복의 직경                 |
   | 높이(height)                | 전복의 높이                 |
   | 전체 무게(whole_weight)     | 전체 전복의 무게            |
   | 분리된 무게(shucked_weight) | 전복의 무게                 |
   | 내장 무게(viscera_weight)   | 전복의 무게(피를 제외)      |
   | 건조된 무게(shell_weight)   | 건조된 뒤의 전복의 무게     |
   | 성별(sex)                   | 'M', 'F', 'I' 중 'I'는 유아 |
   | 링의 수(rings)              | 나이, 생존기간              |


In [None]:
!pip install -U sagemaker

In [None]:
import sys

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"PipelineForAbaloneRings"

데이터 업로드 진행. default bucket을 활용하며, sagemaker에서 자동으로 bucket을 생성해줌.


In [None]:
!mkdir -p data

In [None]:
local_path = "data/abalone-dataset.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-example-files-prod-{region}").download_file(
    "datasets/tabular/uci_abalone/abalone.csv", local_path
)

base_uri = f"s3://{default_bucket}/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

모델 생성 후에 Batch Transformation을 위한 Dataset 준비.


In [None]:
local_path = "data/abalone-dataset-batch"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch", local_path
)

base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

## Pipeline 구성/실행을 위한 Parameter 정의

Pipeline을 매개변수화하는데 사용할 수 있는 Pipeline Parameter(매개변수) 정의합니다. Parameter를 사용하면 Pipeline 정의를 수정하지 않고 User-defined Pipeline 실행 및 일정을 설정합니다.

![B](./img/pipeline-1.png)

지원되는 Parameter 유형은 아래와 같습니다.

- `ParameterString` - Python `str` type
- `ParameterInteger` - Python `int` type
- `ParameterFloat` - Python `float` type

이러한 매개변수는 디폴트 값이 설정되어야 하며, 파이프라인 실행 중에 재정의가 가능하다. 지정된 기본값은 매개변수 유형의 instance 여야한다.

Pipeline의 Workflow에서 정의되는 Parameter는 아래와 같습니다.

- `processing_instance_count` - Instance 갯수
- `instance_type` - Instance 유형
- `model_approval_status` - CI/CD 목적으로 훈련된 모델을 등록할 approval status
- `input_data` - 입력 데이터의 s3 bucket uri
- `batch_data` - 배치 데이터의 s3 bucket uri
- `mse_threshold` - 모델 정확도를 확인하는 데 사용되는 MSE Threshold


In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(
    name="TrainingInstanceType", default_value="ml.t3.medium")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval")

input_data = ParameterString(name="InputData", default_value=input_data_uri)
batch_data = ParameterString(name="BatchData", default_value=batch_data_uri)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

## Processing Step 정의 (Feature Engineeirng)

우선, Processing Step 정의를 위하여 Feature Engineering을 위한 전처리 Script가 개발되어야 한다.
해당 코드는 ./code/preprocessing.py 에 저장되어 있다.

Processing Step은 해당 script를 input data와 실행한다. Training Step은 전처리된 훈련 특성과 label을 사용하여 모델을 학습한다. Evaluation 단계는 훈련된 모델과 Test Data를 활용하여 모델을 평가한다.

![C](./img/pipeline-2.png)


1. SKLearnProcessor 인스턴스 생성하며, 추후 ProcessingStep에 활용한다
2. 위에서 정의한 parameter(processing_instance_count, instance_type)를 사용한다 (불필요하게 많이 하면 과금될 수 있음. 1개로 진행해도 충분)


In [None]:
# SKLearnProcessor Estimator를 사용하여 전처리 작업을 수행합니다.
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "1.2-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)

3. 위에서 생성한 sklearn_processor에 run() method와 함께 인자들을 추가하고, ProcessingStep으로 정의한다.


In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# sklearn_processor의 input, output, code를 정의합니다.
processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data,  # 이 input_data를 사용해서 전처리 작업을 수행합니다.
                        # 이 destination은 local경로이다. 여기에 input_data를 저장합니다. 그리고 이 esimator는 이 경로에서 Input data를 가져와서 사용한다.
                        destination="/opt/ml/processing/input"),
    ],

    outputs=[
        ProcessingOutput(output_name="train",
                         source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation",
                         source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocessing.py",
)

step_process = ProcessingStep(name="AbaloneProcess", step_args=processor_args)

## Training Step 정의 (모델 학습)

Amazon Sagemaker에서 지원하는 XGBoost 알고리즘을 활용하여 모델을 학습한다.

1. XGBoost 알고리즘과 input data에 대한 Estimator를 구성한다. 이때, sagemaker_session에 pipeline_session을 넘겨주면 바로 실행하지 않고, pipeline 작업 단계에 필요한 인수를 반환한다.
2. XGBoost 알고리즘에서 사용할 hyperparameter를 정의한다.
3. 1번에서 생성된 Estimator에 fit 메소드를 적용하고, 학습을 위한 parameter를 적용한다. 이때, Processing Step에서 생성된 data를 input으로 넘겨준다.

![D](./img/pipeline-3.png)


In [None]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

# Model Training with Estimator and Trainig job
model_path = f"s3://{default_bucket}/AbaloneTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            # 이전에 전처리 작업( = step_process) 을 수행한 output을 사용하여 학습을 수행합니다.
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

4. 위에서 생성된 train_arg를 TrainingStep에 인수로 전달하여, Step을 정의 완료한다.


In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="AbaloneTrain",
    step_args=train_args,
)

## Model Evaluation Step 정의 (학습된 모델을 평가하기 위한 Step 정의)

1. 모델 평가를 위한 script 개발이 필요하다. 해당 코드는 ./code/evaluation.py에 위치해있다.
2. ScriptProcessor를 생성하여, 모델 평가 script를 실행할 수 있는 Instance를 생성한다.
3. 해당 ScriptProcessor를 ProcessingStep로 정의한다.

![E](./img/pipeline-4.png)


In [None]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-abalone-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation",
                         source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluation.py",
)

4. evaluation_report를 설정하고 이를 ProcessingStep의 인자로 넣어주면, 추후에 evaluation_report의 path(evaluation.json)에서 분석이 가능하다.


In [None]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json")
step_eval = ProcessingStep(
    name="AbaloneEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

## Model Step 정의 (모델 생성)

생성된 모델을 활용해서 배치 transformation(inference 등)을 수행하기 위해 정의한다.
위의 train step에서 생성된 model artifact를 전달한다.


In [None]:
from sagemaker.model import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
step_create_model = ModelStep(
    name="AbaloneCreateModel",
    step_args=model.create(instance_type="ml.m5.large",
                           accelerator_type="ml.eia1.medium"),
)

## Batch Transformation을 위한 Transform step 정의

`CreateModelStep`의 ModelName을 `Transformer` 에 전달하여 객체를 생성한다.
그리고 해당 객체를 `TransformStep`에 전달하여 Step을 정의 완료한다.


In [None]:
from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/AbaloneTransform",
)
step_transform = TransformStep(
    name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data)
)

## Model Package를 생성하기 위해 Register Model Step을 정의

Model Package는 추론에 필요한 모든 구성 요소를 포함하는 재사용 가능한 Model Artifact의 추상화버전이다. (추론에 사용할 imagedhk Model 가중치를 정의)
Model Package Group은 여러 개의 Model Pacakge를 포함

Model을 Model Registry에 등록하기 위해 생성한 Model을 가져와 register() 진행한다.

![A](./img/pipeline-5.png)


In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="AbaloneRegisterModel", step_args=register_args)

## Pipeline 실행을 종료하고 Fail을 나타내기 위한 FailStep을 정의

특정 조건(아래에서는 mse_threshold)을 만족하지 못하면, 사용자 정의 오류 메시지와 함께 Fail 상황을 알리고 Pipeline을 멈추도록 설정
![B](img/pipeline-8.png)


In [None]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="AbaloneMSEFail",
    error_message=Join(  # Fail시에 원하는 에러 메세지를 출력할 수 있습니다.
        on=" ", values=["Execution failed due to MSE >", mse_threshold]),
)

## Accuracy을 확인하는 Condition Step을 정의. step의 진행 조건에 따라 Pipeline DAG에서 조건부 실행을 지원할 수 있음.

![D](./img/pipeline-6.png)


In [None]:
# 여러가지 조건이 있는데 ConditionLessThanOrEqualTo을 써 볼 것이다.
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(  # left값이 mse_threshold 보다 작거나 같으면 True를 반환합니다.
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=mse_threshold,
)

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    # conditions가 True 일 때, 즉 조건이 만족할 경우 실행할 step을 정의합니다.
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],  # 조건이 만족하지 않을 경우 실행할 step을 정의합니다.
)

## Pipeline을 정의

- 위에서 생성한 step, parameters, condition을 조합하여 한 개의 workflow로 정의
- Pipeline 정의시에 구성 요소를 순서대로 나열할 필요없이, DAG 기준으로 순차적으로 진행이 됨

![E](./img/pipeline-7.png)


In [None]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        mse_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    # step_eval 다음에 조건문에 따라서 어떤 step이 실행될지가 결정되기 때문에
    # step_eval다음엔 step_cond가 와야하고,
    # step_cond 에서
    #   if_steps=[step_register, step_create_model, step_transform],
    #   else_steps=[step_fail],  # 조건이 만족하지 않을 경우 실행할 step을 정의합니다.
    # 가 알아서 조건에 맞게 실행된다.
)

## Pipeline 확인

Pipeline의 define된 정보를 json 형태로 조회 가능하여 올바르게 구현했는지 여부를 검토


In [None]:
import json


definition = json.loads(pipeline.definition())
definition

## Pipeline을 sagemaker에 등록 및 실행

- 정의된 pipeline을 sagemaker에 submit 진행
- sagemaker는 해당 pipeline의 필요한 작업을 생성


In [None]:
pipeline.upsert(role_arn=role)

Pipeline 실행


In [None]:
execution = pipeline.start()

## 파이프라인 작업: 파이프라인 실행 검사 및 대기

Pipeline 정보를 조회하거나 대기 설정을 진행


In [None]:
execution.describe()

In [None]:
execution.wait()

실행되었던 pipeline list를 조회


In [None]:
execution.list_steps()

### Pipeline 결과 검토

Pipeline 이 완료되면 모델 평가를 검토한다. S3에서 생성된 evaluation.json 파일을 확인


In [None]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))

### Lineage


In [None]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

### Parametrized 실행

1. Pipeline을 추가 실행하고 다른 Pipeline의 parameter를 지정 가능.
   아래와 같이 Model Approval Status를 approval로 설정하면, RegisterModel 단계에서 생성된 Model package가 자동으로 CI/CD Pipeline을 통해 자동으로 배포 준비가 완료.


In [None]:
execution = pipeline.start(
    parameters=dict(
        ModelApprovalStatus="Approved",
    )
)
execution.wait()
execution.list_steps()

2. MSE 임계값을 조정하여 모델 정확도 기준을 높이고 싶을 경우 활용할 수 있음. faile step은 try/except로 처리하여 출력


In [None]:
execution = pipeline.start(parameters=dict(MseThreshold=3.0))
try:
    execution.wait()
except Exception as error:
    print(error)
execution.list_steps()