# Amazon SageMaker Pipelines를 사용하여 모델 훈련 및 평가를 위한 작업 오케스트레이션

Amazon SageMaker Pipelines는 머신러닝(ML) 애플리케이션 개발자와 운영 엔지니어에게 SageMaker 작업을 조정하고 재현 가능한 ML 파이프라인을 작성할 수 있는 기능을 제공합니다. 또한 실시간으로 낮은 지연 시간으로 추론을 위해 맞춤형 모델을 배포하고, Batch Transform을 사용하여 오프라인 추론을 실행하며, 아티팩트의 계보를 추적할 수 있도록 합니다. 사용자는 생산 워크플로우 배포 및 모니터링, 모델 아티팩트 배포, 아티팩트 계보 추적 등을 단순한 인터페이스를 통해 수행하며, ML 애플리케이션 개발을 위한 안전성과 최선의 실천 원칙을 준수할 수 있습니다.

SageMaker Pipelines 서비스는 SageMaker Pipeline 도메인 특정 언어(DSL:domain specific language)를 지원하며, 이는 선언적 JSON 사양입니다. 이 DSL은 파이프라인 매개변수와 SageMaker 작업 단계를 정의하는 유향 비순환 그래프(DAG)를 정의합니다. SageMaker Python 소프트웨어 개발 키트(SDK)는 엔지니어와 과학자들이 이미 익숙한 구조를 사용하여 파이프라인 DSL 생성을 간소화합니다.

## Runtime

이 노트북은 약 40분이 소요됩니다.

## Contents

1. [SageMaker Pipelines](#SageMaker-Pipelines)
1. [Notebook Overview](#Notebook-Overview)
1. [A SageMaker Pipeline](#A-SageMaker-Pipeline)
1. [Dataset](#Dataset)
1. [Define Parameters to Parametrize Pipeline Execution](#Define-Parameters-to-Parametrize-Pipeline-Execution)
1. [Define a Processing Step for Feature Engineering](#Define-a-Processing-Step-for-Feature-Engineering)
1. [Define a Training Step to Train a Model](#Define-a-Training-Step-to-Train-a-Model)
1. [Define a Model Evaluation Step to Evaluate the Trained Model](#Define-a-Model-Evaluation-Step-to-Evaluate-the-Trained-Model)
1. [Define a Create Model Step to Create a Model](#Define-a-Create-Model-Step-to-Create-a-Model)
1. [Define a Transform Step to Perform Batch Transformation](#Define-a-Transform-Step-to-Perform-Batch-Transformation)
1. [Define a Register Model Step to Create a Model Package](#Define-a-Register-Model-Step-to-Create-a-Model-Package)
1. [Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed](#Define-a-Fail-Step-to-Terminate-the-Pipeline-Execution-and-Mark-it-as-Failed)
1. [Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation and Register a Model in the Model Registry, Or Terminate the Execution in Failed State](#Define-a-Condition-Step-to-Check-Accuracy-and-Conditionally-Create-a-Model-and-Run-a-Batch-Transformation-and-Register-a-Model-in-the-Model-Registry,-Or-Terminate-the-Execution-in-Failed-State)
1. [Define a Pipeline of Parameters, Steps, and Conditions](#Define-a-Pipeline-of-Parameters,-Steps,-and-Conditions)
1. [Submit the pipeline to SageMaker and start execution](#Submit-the-pipeline-to-SageMaker-and-start-execution)
1. [Pipeline Operations: Examining and Waiting for Pipeline Execution]

## SageMaker Pipelines

SageMaker Pipelines는 다음 활동을 지원하며, 이 노트북에서 시연됩니다:

* Pipelines - SageMaker 작업 및 리소스 생성을 조정하기 위한 단계와 조건의 DAG(방향성 비순환 그래프)
* 처리 작업 단계 - SageMaker에서 데이터 처리 작업(예: Feature Engineering, 데이터 검증, 모델 평가, 모델 해석 등)을 실행하기 위한 간소화되고 관리되는 경험
* 훈련 작업 단계 - 훈련 데이터셋에서 예시를 제시하여 모델이 예측을 수행하도록 가르치는 반복적 프로세스
* 조건부 실행 단계 - 파이프라인 내 분기 실행을 조건부로 제공하는 단계
* 모델 등록 단계 - Model Registry에 모델 패키지 리소스를 생성하여 Amazon SageMaker에서 배포 가능한 모델을 생성하는 데 사용할 수 있는 단계
* 모델 생성 단계 - 변환 단계에서 사용하거나 나중에 엔드포인트로 게시하기 위해 모델을 생성하는 단계
* 변환 작업 단계 - 데이터셋에서 훈련이나 추론에 방해가 되는 노이즈나 편향을 제거하거나, 대규모 데이터셋에서 추론을 얻거나, 지속적 엔드포인트가 필요하지 않을 때 추론을 실행하는 배치 변환
* 실패 단계 - 파이프라인 실행을 중단하고 파이프라인 실행을 실패로 표시하는 단계
* 매개변수화 파이프라인 실행 - 지정된 매개변수에 따라 파이프라인 실행의 변형을 허용하는 단계 

## 노트북 개요

이 노트북에서는 다음과 같은 방법을 보여줍니다:

* SageMaker 파이프라인을 매개변수화하기 위해 사용할 수 있는 파이프라인 매개변수 집합을 정의합니다.
* 입력 데이터를 정제, 피처엔지니어링, 훈련 및 테스트 데이터 세트로 분할하는 처리 단계를 정의합니다.
* 사전 처리된 훈련 데이터 세트를 사용하여 모델을 훈련하는 훈련 단계를 정의합니다.
* 테스트 데이터 세트에서 훈련된 모델의 성능을 평가하는 처리 단계를 정의합니다.
* 훈련에 사용된 모델 아티팩트에서 모델을 생성하는 모델 생성 단계를 정의합니다.
* 생성된 모델을 기반으로 배치 변환을 수행하는 변환 단계를 정의합니다.
* 모델 훈련에 사용된 에스티메이터와 모델 아티팩트에서 모델 패키지를 생성하는 모델 등록 단계를 정의합니다.
* 이전 단계의 출력을 기반으로 조건을 측정하고 조건에 따라 다른 단계를 실행하는 조건부 단계를 정의합니다.
* 실행 실패의 원인을 표시하는 사용자 정의 오류 메시지와 함께 실패 단계를 정의합니다.
* 정의된 매개변수와 단계를 포함하여 DAG에서 파이프라인 정의를 정의하고 생성합니다.
* 파이프라인 실행을 시작하고 실행이 완료될 때까지 기다립니다.
* S3 버킷에서 모델 평가 보고서를 다운로드하여 검토합니다.
* 두 번째 파이프라인 실행을 시작합니다.

## A SageMaker Pipeline

생성하는 파이프라인은 전처리, 훈련, 평가, 모델 생성, 배치 변환, 모델 등록의 일반적인 머신러닝(ML) 애플리케이션 패턴을 따릅니다:

![A typical ML Application pipeline](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/main/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-full.png)

## Dataset

사용하는 데이터셋은 [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone) [1]입니다. 이 작업의 목적은 아발론 달팽이의 물리적 측정값을 기반으로 그 연령을 결정하는 것입니다. 근본적으로 이는 회귀 문제입니다.

데이터셋에는 다음과 같은 특징이 포함되어 있습니다: 길이(가장 긴 껍질 측정값), 직경(길이에 수직인 직경), 높이(껍질 내 육류 포함 높이), 전체 무게(전체 아발론 무게), 껍질 제거 후 무게(육류 무게), 내장 무게(출혈 후 내장 무게), 건조 후 껍질 무게, 성별(‘M’, ‘F’, 'I'에서 'I'는 유아를 의미함), 고리 수(정수).

고리 수는 연령의 좋은 근사치로 나타납니다(연령은 고리 수 + 1.5입니다). 그러나 이 수치를 얻기 위해서는 껍질을 원뿔 모양으로 절단하고, 절편을 염색한 후 현미경을 통해 고리 수를 세는 과정이 필요하며, 이는 시간이 많이 소요되는 작업입니다. 반면 다른 물리적 측정값은 더 쉽게 확인할 수 있습니다. 이 데이터셋을 사용하여 다른 물리적 측정값을 통해 고리 수 변수의 예측 모델을 구축합니다.

데이터를 S3 버킷에 업로드하기 전에 SageMaker Python SDK를 설치하고 이 노트북에서 나중에 사용할 수 있는 일부 상수를 수집합니다.

> [1] Dua, D. and Graff, C. (2019). [UCI Machine Learning Repository](http://archive.ics.uci.edu/ml). Irvine, CA: University of California, School of Information and Computer Science.

In [None]:
import sys

!{sys.executable} -m pip install "sagemaker>=2.99.0"

import boto3
import sagemaker
from sagemaker.workflow.pipeline_context import PipelineSession

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
pipeline_session = PipelineSession()

model_package_group_name = f"AbaloneModelPackageGroupName"

In [None]:
print(f"Using Execution Role: {role}")
print(f"Region: {region}")

이제 데이터를 기본 버킷에 업로드하세요. `input_data_uri`에 자신의 데이터 세트를 적절히 선택할 수 있습니다.

In [None]:
!mkdir -p data

In [None]:
# Lookup Sagemaker Unified Studio project attributes using Project() class:
from sagemaker_studio import Project
project = Project()

print(project.id)
print(project.name)
print(project.domain_id)
print(project.project_status)
print(project.domain_unit_id)
print(project.project_profile_id)
print(project.user_id)

In [None]:
# Lookup Sagemaker Unified Studio Project S3 locations:

default_bucket = sagemaker_session.default_bucket()
default_prefix = sagemaker_session.default_bucket_prefix
print(f'SMUS default bucket: {default_bucket}')
print(f'SMUS default prefix: {default_prefix}')

smus_base_path = f"s3://{default_bucket}/{default_prefix}/"
print(smus_base_path)

In [None]:
# Download data file from Public S3 bucket

local_path = "data/abalone-dataset.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-sample-files").download_file(
    "datasets/tabular/uci_abalone/abalone.csv", local_path
)

In [None]:
# Add Sagemaker Unified Studio default prefix to path

base_uri = f"s3://{default_bucket}/{default_prefix}/abalone"

# Upload file to S3
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

모델 생성 후 배치 변환을 위해 두 번째 데이터셋을 다운로드합니다. `batch_data_uri`에 적합한 경우 자체 데이터셋을 선택할 수 있습니다.

In [None]:
local_path = "data/abalone-dataset-batch"

s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch", local_path
)

In [None]:
# Add Sagemaker Unified Studio default prefix to path

# Upload file to S3
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

## Define Parameters to Parametrize Pipeline Execution

파이프라인 매개변수를 정의하여 파이프라인을 매개변수화할 수 있습니다. 매개변수는 파이프라인 정의를 수정하지 않고도 맞춤형 파이프라인 실행 및 일정을 설정할 수 있도록 합니다.

지원되는 매개변수 유형은 다음과 같습니다:

* `ParameterString` - Python의 `str` 유형을 나타냅니다
* `ParameterInteger` - Python의 `int` 유형을 나타냅니다
* `ParameterFloat` - Python의 `float` 유형을 나타냅니다

이 매개변수는 기본값을 제공할 수 있으며, 파이프라인 실행 시 이 기본값을 덮어쓸 수 있습니다. 지정된 기본값은 해당 매개변수의 유형의 인스턴스여야 합니다

이 워크플로우에서 정의된 매개변수에는 다음과 같은 것이 포함됩니다:

* `processing_instance_count` - 처리 작업의 인스턴스 수
* `instance_type` - 훈련 작업의 `ml.*` 인스턴스 유형
* `model_approval_status` - CI/CD 목적으로 훈련된 모델에 등록할 승인 상태(“PendingManualApproval”이 기본값입니다
* `input_data` - 입력 데이터의 S3 버킷 URI 위치
* `batch_data` - 배치 데이터의 S3 버킷 URI 위치
* `mse_threshold` - 모델의 정확도를 검증하기 위해 사용되는 평균 제곱 오차(MSE) 임계값

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

![Define Parameters](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/main/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-1.png)

## Define a Processing Step for Feature Engineering

먼저, Processing 단계에서 지정된 사전 처리 스크립트를 개발합니다.

이 노트북 셀은 사전 처리 스크립트를 포함하는 파일 `preprocessing_abalone.py`를 작성합니다. 스크립트를 업데이트한 후 이 셀을 다시 실행하여 덮어쓸 수 있습니다. 사전 처리 스크립트는 `scikit-learn`을 사용하여 다음과 같은 작업을 수행합니다:

* 성별 카테고리 데이터의 누락된 값을 채우고 훈련에 적합하도록 인코딩합니다.
* 성별 및 고리 수치 데이터를 제외한 모든 수치 필드를 스케일링 및 정규화합니다.
* 데이터를 훈련, 검증, 테스트 데이터셋으로 분할합니다.

처리 단계는 입력 데이터에 스크립트를 실행합니다. 훈련 단계는 사전 처리된 훈련 특징 및 라벨을 사용하여 모델을 훈련합니다. 평가 단계는 훈련된 모델과 사전 처리된 테스트 특징 및 라벨을 사용하여 모델을 평가합니다.

In [None]:
!mkdir -p code

In [None]:
%%writefile code/preprocessing.py
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Since we get a headerless CSV file, we specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64,
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
    )

    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

다음으로 `SKLearnProcessor` 프로세서의 인스턴스를 생성하고 이를 `ProcessingStep`에서 사용합니다.

이 노트북 전체에서 사용할 `framework_version`을 지정합니다.

프로세서 인스턴스가 사용하는 `processing_instance_count` 매개변수에 주의하세요.

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type="ml.m5.xlarge",
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
    sagemaker_session=pipeline_session,
)

마지막으로, 프로세서의 `run` 메서드의 출력을 `ProcessingStep`의 인수로 전달합니다. `pipeline_session`을 `sagemaker_session`에 전달하면, `.run()`을 호출해도 처리 작업이 실행되지 않고, 파이프라인의 단계로 작업을 실행하기 위해 필요한 인수를 반환합니다.

처리 작업의 출력 구성에서 지정된 `“train_data”` 및 `“test_data”`라는 이름의 채널에 주의하세요. 단계 `Properties`는 후속 단계에서 사용할 수 있으며, 실행 시점에 해당 런타임 값으로 해결됩니다. 특히 이 사용 방법은 훈련 단계를 정의할 때 명시적으로 언급됩니다.

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

processor_args = sklearn_processor.run(
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="code/preprocessing.py",
)

step_process = ProcessingStep(name="AbaloneProcess", step_args=processor_args)

![Define a Processing Step for Feature Engineering](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/main/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-2.png)

## 모델을 훈련하기 위해 훈련 단계를 정의합니다.

이 섹션에서는 Amazon SageMaker의 [XGBoost 알고리즘](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html)을 사용하여 이 데이터셋으로 모델을 훈련합니다. XGBoost 알고리즘과 입력 데이터셋을 위한 Estimator를 구성합니다. 일반적인 훈련 스크립트는 입력 채널에서 데이터를 로드하고 하이퍼파라미터로 훈련을 구성한 후 모델을 훈련시키고, 나중에 호스팅할 수 있도록 모델을 `model_dir`에 저장합니다.

훈련에서 생성된 모델이 저장되는 모델 경로도 지정됩니다.

참고: `instance_type` 매개변수는 파이프라인의 여러 곳에서 사용될 수 있습니다. 이 경우 `instance_type`은 Estimator에 전달됩니다.

In [None]:
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

model_path = f"s3://{default_bucket}/{default_prefix}/AbaloneTrain"

image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    sagemaker_session=pipeline_session,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

train_args = xgb_train.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    }
)

Finally, we use the output of the estimator's `.fit()` method as arguments to the `TrainingStep`. By passing the `pipeline_session` to the `sagemaker_session`, calling `.fit()` does not launch the training job, it returns the arguments needed to run the job as a step in the pipeline.

Pass in the `S3Uri` of the `"train_data"` output channel to the `.fit()` method. Also, use the other `"test_data"` output channel for model evaluation in the pipeline. The `properties` attribute of a Pipeline step matches the object model of the corresponding response of a describe call. These properties can be referenced as placeholder values and are resolved at runtime. For example, the `ProcessingStep` `properties` attribute matches the object model of the [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response object.

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="AbaloneTrain",
    step_args=train_args,
)

![Define a Training Step to Train a Model](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/main/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-3.png)

## 모델 평가 단계를 정의하여 훈련된 모델을 평가합니다

먼저, 모델 평가를 수행하는 Processing 단계에 지정된 평가 스크립트를 작성합니다.

파이프라인 실행 후, 분석을 위해 생성된 `evaluation.json` 파일을 확인할 수 있습니다.

평가 스크립트는 `xgboost`를 사용하여 다음과 같은 작업을 수행합니다:

* 모델을 로드합니다.
* 테스트 데이터를 읽습니다.
* 테스트 데이터에 대한 예측을 수행합니다.
* 정확도와 ROC 곡선을 포함한 분류 보고서를 생성합니다.
* 평가 보고서를 평가 디렉토리에 저장합니다.

In [None]:
%%writefile code/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)

    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

다음으로 `ScriptProcessor` 프로세서의 인스턴스를 생성하고 이를 `ProcessingStep`에서 사용합니다.

In [None]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="script-abalone-eval",
    role=role,
    sagemaker_session=pipeline_session,
)

eval_args = script_eval.run(
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="code/evaluation.py",
)

`.run()` 메서드에서 반환된 프로세서 인수를 사용하여 `ProcessingStep`을 생성하고, 입력 및 출력 채널과 파이프라인이 파이프라인 실행을 호출할 때 실행될 코드를 포함합니다.

구체적으로, `step_train` `properties`의 `S3ModelArtifacts`와 `step_process` `properties`의 `“test_data”` 출력 채널의 `S3Uri`가 입력으로 전달됩니다. `TrainingStep` 및 `ProcessingStep`의 `properties` 속성은 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) 및 [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) 응답 객체의 객체 모델과 각각 일치합니다.

In [None]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="AbaloneEval",
    step_args=eval_args,
    property_files=[evaluation_report],
)

![Define a Model Evaluation Step to Evaluate the Trained Model](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/main/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-4.png)

## 모델 생성 단계 정의하여 모델 생성

예제 모델을 사용하여 배치 변환을 수행하려면 SageMaker 모델을 생성합니다.

구체적으로, `TrainingStep`의 `step_train` 속성에서 `S3ModelArtifacts`를 전달합니다. `TrainingStep`의 `properties` 속성은 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) 응답 객체의 객체 모델과 일치합니다.

In [None]:
from sagemaker.model import Model

model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)

`ModelStep`을 정의하려면 `model.create()`의 반환 값을 단계 인수로 제공하십시오.

In [None]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep

step_create_model = ModelStep(
    name="AbaloneCreateModel",
    step_args=model.create(instance_type="ml.m5.large", accelerator_type="ml.eia1.medium"),
)

## 배치 변환을 수행하기 위해 변환 단계를 정의합니다.

모델 인스턴스가 정의되었으므로 적절한 모델 유형, 컴퓨팅 인스턴스 유형 및 원하는 출력 S3 URI를 사용하여 `Transformer` 인스턴스를 생성합니다.

구체적으로, `CreateModelStep`의 `step_create_model` 속성에서 `ModelName`을 전달합니다. `CreateModelStep`의 `properties` 속성은 [DescribeModel](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeModel.html) 응답 객체의 객체 모델과 일치합니다.

In [None]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/{default_prefix}/AbaloneTransform",
)

transformer 인스턴스와 이전에 정의된 `batch_data` 파이프라인 매개변수와 함께 `TransformInput`을 전달합니다.

In [None]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data)
)

## 모델 패키지를 생성하기 위해 등록 모델 단계를 정의합니다

모델 패키지는 추론에 필요한 모든 구성 요소를 포함하는 재사용 가능한 모델 아티팩트의 추상화입니다. 주로 추론에 사용할 추론 이미지를 정의하는 추론 사양과 선택적 모델 가중치 위치로 구성됩니다.

모델 패키지 그룹은 모델 패키지의 집합입니다. 특정 머신러닝 비즈니스 문제에 대해 모델 패키지 그룹을 생성할 수 있으며, 새로운 버전의 모델 패키지를 해당 그룹에 추가할 수 있습니다. 일반적으로 고객은 SageMaker 파이프라인을 위해 ModelPackageGroup을 생성하여 각 SageMaker 파이프라인 실행 시 모델 패키지 버전을 그룹에 추가할 수 있도록 합니다.

모델 레지스트리에 모델을 등록하려면 이전 단계에서 생성한 모델을 사용합니다
```
model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=pipeline_session,
    role=role,
)
```
그리고 모델 등록에 필요한 모든 매개변수를 전달하여 `.register()` 함수를 호출합니다.

`.register()` 호출의 출력을 가져와서 `ModelStep`의 단계 인수로 전달합니다.

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)
step_register = ModelStep(name="AbaloneRegisterModel", step_args=register_args)

![Define a Create Model Step and Batch Transform to Process Data in Batch at Scale](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/main/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-5.png)


## 파이프라인 실행을 종료하고 실패로 표시하기 위해 Fail Step 정의

이 섹션에서는 다음 단계를 안내합니다:

* 사용자 정의 오류 메시지를 포함한 `FailStep`을 정의하여 실행 실패의 원인을 표시합니다.
* `Join` 함수를 사용하여 `FailStep` 오류 메시지에 동적 `mse_threshold` 매개변수와 정적 텍스트 문자열을 결합하여 더 자세한 오류 메시지를 생성합니다.

In [None]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="AbaloneMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to MSE >", mse_threshold]),
)

![Define a Fail Step to Terminate the Execution in Failed State](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/main/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-8.png)

## 조건 단계 정의: 정확도 확인 및 조건에 따라 모델 생성, 배치 변환 실행, 모델 레지스트리에 모델 등록 또는 실패 상태에서 실행 종료

이 단계에서 모델은 평가 단계 `step_eval`을 통해 결정된 모델의 정확도가 지정된 값을 초과한 경우에만 등록됩니다. 그렇지 않으면 파이프라인 실행이 실패하고 종료됩니다. `ConditionStep`은 단계 속성의 조건에 따라 파이프라인 DAG에서 조건부 실행을 지원하도록 파이프라인을 구성할 수 있습니다.

다음 섹션에서 다음과 같이 수행합니다:

* 평가 단계 `step_eval`의 출력에서 발견된 정확도 값에 대해 `ConditionLessThanOrEqualTo`를 정의합니다.
* 조건을 `ConditionStep`의 조건 목록에 사용합니다.
* `CreateModelStep` 및 `TransformStep` 단계와 `RegisterModel` 단계 컬렉션을 `ConditionStep`의 `if_steps`에 전달합니다. 이 단계들은 조건이 `True`로 평가될 경우에만 실행됩니다.
* `FailStep` 단계를 `ConditionStep`의 `else_steps`에 전달합니다. 이 단계는 조건이 `False`로 평가될 경우에만 실행됩니다.

In [None]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=mse_threshold,
)

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[step_fail],
)

![Define a Condition Step to Check Accuracy and Conditionally Execute Steps](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/main/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-6.png)

## 매개변수, 단계, 및 조건의 파이프라인 정의

이 섹션에서는 단계를 파이프라인으로 결합하여 실행할 수 있도록 합니다.

파이프라인에는 `name`, `parameters`, 및 `steps`가 필요합니다. name은 `(account, region)` 쌍 내에서 고유해야 합니다.

참고:

* 정의에 사용된 모든 매개변수는 반드시 포함되어야 합니다.
* 파이프라인에 전달된 단계는 실행 순서대로 나열될 필요가 없습니다. SageMaker Pipeline 서비스는 데이터 의존성 DAG를 단계로 해결하여 실행을 완료합니다.
* 단계는 파이프라인 단계 목록 및 모든 조건 단계의 if/else 목록 전체에서 고유해야 합니다.

In [None]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"AbalonePipeline-v1"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        mse_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

![Define a Pipeline of Parameters, Steps, and Conditions](https://raw.githubusercontent.com/aws/amazon-sagemaker-examples/main/sagemaker-pipelines/tabular/abalone_build_train_deploy/img/pipeline-7.png)

## 파이프라인을 SageMaker에 제출하고 실행을 시작합니다

파이프라인 정의를 파이프라인 서비스에 제출합니다. 파이프라인 서비스는 전달된 역할을 사용하여 단계에 정의된 모든 작업을 생성합니다.

In [None]:
pipeline.upsert(role_arn=role)

# Create and Add Tags to Pipeline
#pipeline.upsert(role_arn=role, tags = [tag_dz_env, tag_dz_project, tag_dz_domain, tag_dz_scope])

파이프라인을 시작하고 모든 기본 매개변수를 적용합니다.

In [None]:
execution = pipeline.start()

## Pipeline Operations: Examining and Waiting for Pipeline Execution

파이프라인 실행을 설명합니다.

In [None]:
execution.describe()

실행이 완료될 때까지 기다리십시오. 

In [None]:
execution.wait()

행 단계 목록을 표시합니다. 이는 단계 실행 서비스에 의해 해결된 파이프라인의 단계입니다.

In [None]:
execution.list_steps()

### 평가 검토

파이프라인이 완료된 후 생성된 모델 평가 결과를 검토합니다. S3에서 생성된 `evaluation.json` 파일을 다운로드하고 보고서를 출력합니다.

In [None]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file(
    "{}/evaluation.json".format(
        step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
    )
)
pprint(json.loads(evaluation_json))