# Orchestrating Jobs with Amazon SageMaker Model Building Pipelines

***본 노트북 코드는 [영문 노트북](https://github.com/aws/amazon-sagemaker-examples/blob/master/sagemaker-pipelines/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.ipynb)의 번역본으로 직역이 아닌 중간중간 설명을 덧붙였습니다.***

Amazon SageMaker 모델 구축 파이프라인은 머신 러닝 (ML) 애플리케이션 개발자 및 운영 엔지니어가 SageMaker 작업을 조율하고(orchestrate) 재현 가능한 ML 파이프라인을 작성할 수 있는 기능들을 제공합니다. 또한 짧은 지연 시간(latency)으로 실시간 추론을 위한 사용자 지정 빌드 모델을 배포하고 배치 변환(batch transform)으로 오프라인 추론을 실행하고 아티팩트의 계보(lineage)를 추적할 수 있습니다. 프로덕션 워크 플로 배포 및 모니터링, 모델 아티팩트 배포, 간단한 인터페이스를 통해 아티팩트 계보 추적, ML 애플리케이션 개발을 위한 안전 및 모범 사례 패러다임을 준수하는 데 있어 건전한 운영 관행을 도입할 수 있습니다.

SageMaker Pipelines 서비스는 선언적 JSON 사양인 SageMaker 파이프라인 DSL(Domains Specific Language)을 지원합니다. 이 DSL은 파이프라인 매개 변수 및 SageMaker 작업 단계의 DAG(Directed Acyclic Graph)를 정의합니다. SageMaker Python SDK(Software Developer Kit)를 사용하면 엔지니어와 과학자가 파이프라인 DSL 생성을 간소화할 수 있습니다.

<br>

## 1. 들어가며
---

### 1.1. SageMaker Pipelines

SageMaker Pipelines는 아래의 기능들을 지원합니다.

* 파이프라인 (Pipelines) - SageMaker 작업 및 리소스 생성을 조율하기 위한 계 및 조건의 DAG입니다.
* 처리 작업 단계 (Processing job step) - SageMaker에서 피쳐 엔지니어링, 데이터 유효성 검사, 모델 평가 및 모델 해석과 같은 데이터 처리 워크로드를 실행하는 단순화된 관리 환경입니다.
* 훈련 작업 단계 (Training job steps) - 훈련 데이터셋의 예를 제시하여 예측을 수행하도록 모델을 훈련시키는 반복적인 프로세스입니다.
* 조건부 실행 단계 (Conditional execution steps) - 파이프라인에서 분기의 조건부 실행을 제공하는 단계입니다.
* 모델 단계 등록 (Register model steps) - Amazon SageMaker에서 배포 가능한 모델을 생성하는 데 사용할 수 있는 모델 레지스트리에서 모델 패키지 리소스를 생성하는 단계입니다.
* 모델 단계 만들기 (Create model steps) - 변환 단계 또는 나중에 엔드포인트로 게시할 때 사용할 모델을 생성허는 단계입니다.
* 작업 단계 변환 (Transform job steps) - 데이터셋에서 훈련 또는 추론을 방해하는 노이즈 또는 편향(bias)을 제거하고, 대규모 데이터셋에서 추론을 가져오고, 영구 엔드포인트가 필요하지 않을 때 추론을 실행하기 위해 데이터셋을 사전 처리하는 일괄 변환입니다.
* 매개 변수화된 파이프라인 실행 (Parametrized Pipeline executions) - 지정된 매개 변수에 따라 파이프라인 실행의 변형을 활성화합니다.

[Note] SageMaker Pipelines은 캐싱을 지원합니다. 자세한 내용은 아래 가이드를 참조해 주세요.<br> 
https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html

### 1.2. 노트북 개요

이 노트북은 아래의 방법들을 보여줍니다.

* Pipeline parameters - SageMaker 파이프라인을 매개 변수화하는 데 사용할 수 있는 파이프라인 매개 변수 셋을 정의합니다.
* Processing step - 클린징, 피쳐 엔지니어링, 입력 데이터를 훈련 및 테스트 데이터셋으로 분할하는 처리 단계를 정의합니다.
* Training step - 전처리된 훈련 데이터셋에서 모델을 훈련하는 훈련 단계를 정의합니다.
* Processing step - 테스트 데이터셋에서 훈련된 모델의 성능을 평가하는 처리 단계를 정의합니다.
* Create Model step - 훈련에 사용되는 모델 아티팩트에서 모델을 생성하는 모델 생성 단계를 정의합니다.
* Transform step - 생성된 모델을 기반으로 일괄 변환을 수행하는 변환 단계를 정의합니다.
* Register Model step - 모델 훈련에 사용되는 estimator와 모델 아티팩트에서 모델 패키지를 생성하는 모델 등록 단계를 정의합니다.
* Conditional step - 이전 단계의 출력을 기반으로 조건을 측정하고 다른 단계를 조건부로 실행하는 조건부 단계를 정의합니다.
* Pipeline definition - 정의된 매개 변수 및 단계를 사용하여 DAG에서 파이프라인 정의를 정의하고 생성합니다.
* Pipeline execution - 파이프라인 실행을 시작하고 실행이 완료될 때까지 기다립니다.
* Model evaluation - 검사를 위해 S3 버켓에서 모델 평가 보고서를 다운로드합니다.
* 두 번째 파이프라인 실행을 시작합니다.

### A SageMaker Pipeline

여러분이 생성하는 파이프라인은 전처리, 훈련, 평가, 모델 생성, 일괄 변환, 모델 등록의 일반적인 머신 러닝 (ML) 애플리케이션 패턴을 따릅니다.

![A typical ML Application pipeline](img/pipeline-full.png)

### 1.3. 데이터셋 개요

본 노트북에서 사용하는 데이터셋은 [UCI Machine Learning Abalone Dataset](https://archive.ics.uci.edu/ml/datasets/abalone) [1] 으로 전복의 나이를 추정하는 회귀(regression) 문제입니다. 

데이터셋의 컬럼은 length (가장 긴 껍질 측정),diameter (지름), height (높이), whole_weight (전체 전복 무게), shucked_weight (몸통 무게), viscera_weight (내장 무게), shell_weight (껍질 무게), 성별 ('M', 'F', 'I';'I'는 새끼 전복인 경우), ring (껍질의 고리 수) 으로 구성되어 있습니다.

고리 수는 나이를 추측할 수 있는 좋은 근사치로 밝혀졌습니다 (나이 = 고리 * 1.5). 그러나 이 숫자를 얻으려면 원뿔을 통해 껍질을 자르고, 단면을 염색하고, 현미경을 통해 고리의 수를 계산해야 하는데, 이는 시간이 많이 걸리는 작업입니다. 하지만, 머신 러닝 모델로 고리 수를 예측하는 모델을 구축한다면 물리적인 시간을 절약할 수 있습니다.

[1] Dua, D. and Graff, C. (2019). [UCI Machine Learning Repository](http://archive.ics.uci.edu/ml). Irvine, CA: University of California, School of Information and Computer Science.

In [1]:
import boto3
import sagemaker


region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
model_package_group_name = f"AbaloneModelPackageGroupName"

데이터를 여러분 계정의 S3 버켓에 업로드합니다. 

In [2]:
!mkdir -p data

In [3]:
local_path = "data/abalone-dataset.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset.csv",
    local_path
)

base_uri = f"s3://{default_bucket}/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, 
    desired_s3_uri=base_uri,
)
print(input_data_uri)

s3://sagemaker-us-east-1-387793684046/abalone/abalone-dataset.csv


모델 생성 후 배치 변환을 위한 두 번째 데이터셋을 다운로드합니다.

In [4]:
local_path = "data/abalone-dataset-batch"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch",
    local_path
)

base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path, 
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

s3://sagemaker-us-east-1-387793684046/abalone/abalone-dataset-batch


In [5]:
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Specify the column names for the .csv file.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z



df = pd.read_csv(
    f"./data/abalone-dataset.csv",
    header=None, 
    names=feature_columns_names + [label_column],
    dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)
)
numeric_features = list(feature_columns_names)
numeric_features.remove("sex")
numeric_transformer = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", StandardScaler())
    ]
)

categorical_features = ["sex"]
categorical_transformer = Pipeline(
    steps=[
        ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
        ("onehot", OneHotEncoder(handle_unknown="ignore"))
    ]
)

preprocess = ColumnTransformer(
    transformers=[
        ("num", numeric_transformer, numeric_features),
        ("cat", categorical_transformer, categorical_features)
    ]
)
    
y = df.pop("rings")
X_pre = preprocess.fit_transform(df)
y_pre = y.to_numpy().reshape(len(y), 1)
    
X = np.concatenate((y_pre, X_pre), axis=1)

pd.DataFrame(X).to_csv(f"data/all.csv", header=False, index=False)

<br>

## 2. 파이프라인 정의
---

### 2.1. 파이프라인 파라메터 정의: 파이프라인 실행 매개 변수화를 위한 매개 변수 정의

파이프라인을 매개 변수화하는 데 사용할 수 있는 파이프라인 매개 변수를 정의합니다. 매개 변수를 사용하면 파이프라인 정의를 수정하지 않고도 사용자 지정 파이프라인 실행 및 일정을 설정할 수 있습니다.

지원되는 매개 변수 유형들은 다음과 같습니다.

* `ParameterString` - `str`파이썬 타입을 나타냅니다.
* `ParameterInteger` - `int` 파이썬 타입을 나타냅니다.
* `ParameterFloat` - `float` 파이썬 타입을 나타냅니다.

이러한 매개 변수들은 파이프라인 실행 시 재정의할 수 있는 기본값 제공을 지원합니다. 지정된 기본값은 매개 변수 유형의 인스턴스여야 합니다.

이 워크플로에 정의된 매개 변수들은 다음과 같습니다.

* `processing_instance_type` - 처리 job의 `ml.*` 인스턴스 타입입니다.
* `processing_instance_count` - 처리 job의 인스턴스 개수입니다.
* `training_instance_type` - 훈련 job의 `ml.*` 인스턴스 타입입니다.
* `model_approval_status` - CI/CD 목적으로 훈련된 모델을 등록하기 위한 승인 상태입니다. ("PendingManualApproval"이 기본값)
* `input_data` - 입력 데이터의 S3 버켓 URI 위치입니다.
* `batch_data` - 배치 데이터의 S3 버켓 URI 위치입니다.

In [6]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)
training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)
model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)

![Define Parameters](img/pipeline-1.png)

### 2.2. 피쳐 엔지니어링을 위한 처리 단계(Processing Step) 정의

이 섹션에서는 전처리 스크립트를 포함하는 `preprocessing_abalone.py` 파일을 작성합니다. `%%writefile` 매직 커맨드를 사용해 스크립트를 업데이트하고 이 셀을 다시 실행하여 최신 버전으로 덮어쓸 수 있습니다. 전처리 스크립트는 scikit-learn을 사용하여 다음을 수행합니다.

- 누락된 성별 카테고리 데이터를 채우고 훈련에 적합하도록 인코딩합니다.
- 성별 및 링 숫자 데이터를 제외한 모든 숫자 필드의 크기를 조정하고 정규화합니다.
- 데이터를 훈련, 검증 및 테스트 데이터셋으로 분할합니다.
- 처리 단계는 입력 데이터에서 스크립트를 실행합니다. 훈련 단계에서는 사전 처리된 훈련 피쳐 및 레이블을 사용하여 모델을 훈련합니다. 평가 단계에서는 훈련된 모델과 사전 처리된 테스트 피쳐 및 레이블을 사용하여 모델을 평가합니다.

In [7]:
!mkdir -p abalone

In [8]:
%%writefile abalone/preprocessing.py
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Since we get a headerless CSV file we specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None, 
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype)
    )
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler())
        ]
    )

    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore"))
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features)
        ]
    )
    
    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)
    
    X = np.concatenate((y_pre, X_pre), axis=1)
    
    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))])

    
    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False)
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

Writing abalone/preprocessing.py


다음으로 `SKLearnProcessor` 프로세서의 인스턴스를 만들고 `ProcessingStep`에서 사용합니다.

또한, 이 노트북 전체에서 사용할 프레임워크 버전(`framework_version`)을 지정합니다.

프로세서 인스턴스에서 사용하는 `processing_instance_type` 및 `processing_instance_count` 매개 변수를 확인합니다.

In [9]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
)

마지막으로 프로세서 인스턴스를 사용하여 입력 및 출력 채널, 파이프라인이 파이프라인 실행을 호출 할때 실행될 코드와 함께 `ProcessingStep` 을 생성합니다. 이는 Python SDK 프로세서 인스턴스의 `run` 메소드와 유사합니다.

`ProcessingStep`에 전달된 `input_data` 매개 변수는 단계에서 사용되는 입력 데이터입니다. 이 입력 데이터는 실행될 때 프로세서 인스턴스에서 사용됩니다.

또한 처리 작업의 출력 구성에 지정된 `"train_data"` 및 `"test_data"` 채널을 확인합니다. `Properties` 단계는 후속 단계에서 사용할 수 있으며, 실행 시 런타임 값을 확인할 수 있습니다. 특히 이 사용법은 훈련 단계를 정의할 때 호출됩니다.


In [10]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    

step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    inputs=[
      ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),  
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
    ],
    code="abalone/preprocessing.py",
)

![Define a Processing Step for Feature Engineering](img/pipeline-2.png)

### 2.3. 모델 훈련을 위한 훈련 단계(Training Step) 정의

이 섹션에서는 Amazon SageMaker의 [XGBoost Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) 빌트인 알고리즘을 사용하여 이 데이터셋을 훈련합니다. 기존 SageMaker와 동일하게 XGBoost 알고리즘 및 입력 데이터셋에 대한 Estimator를 구성합니다. 일반적인 훈련 스크립트는 입력 채널에서 데이터를 로드하고, 하이퍼 파라메터로 훈련을 구성하고, 모델을 훈련시키고, 나중에 엔드포인트에 호스팅할 수 있도록 모델을 `model_dir`에 저장합니다.

`training_instance_type` 매개 변수는 파이프라인의 여러 위치에서 사용될 수 있다는 점을 참조하세요. 이 경우 `training_instance_type`은 estimator로 전달됩니다.

In [11]:
from sagemaker.estimator import Estimator


model_path = f"s3://{default_bucket}/AbaloneTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=training_instance_type,
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=training_instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    silent=0
)

마지막으로 estimator 인스턴스와 이전 `ProcessingStep`의 속성을 사용하여 `TrainingStep`을 생성합니다. 이는 Python SDK의 estimator `fit` 메서드와 유사합니다.

구체적으로 `"train_data"` 출력 채널의 `S3Uri`를 `TrainingStep`으로 전달합니다. 또한, 파이프라인에서 모델 평가를 위해 다른 `"test_data"` 출력 채널을 사용합니다. 파이프라인 단계의 `properties`는 설명 호출의 해당 응답의 object 모델과 일치합니다. 이러한 속성은 placeholder 값으로 참조될 수 있으며 런타임 시 확인할 수 있습니다. 예를 들어 `ProcessingStep` `properties` 속성은 [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) 응답 object의 object 모델과 일치합니다.


In [12]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="AbaloneTrain",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    },
)

![Define a Training Step to Train a Model](img/pipeline-3.png)

### 2.4. 훈련된 모델을 평가하기 위한 모델 평가 단계(Evaluation Step) 정의

먼저 모델 평가를 수행하는 처리 단계에 지정된 평가 스크립트를 작성합니다.

파이프라인 실행 후 분석을 위해 결과 `evaluation.json`을 검사할 수 있습니다.

평가 스크립트는 `xgboost`를 사용하여 다음을 수행합니다.

* 모델 로드
* 테스트 데이터 로드
* 테스트 데이터에 대한 예측 수행
* 정확도(accuracy) 및 ROC 곡선을 포함한 분류 보고서(classification report) 작성
* 평가 보고서를 평가 디렉터리에 저장

In [13]:
%%writefile abalone/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")
    
    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)
    
    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)
    
    X_test = xgboost.DMatrix(df.values)
    
    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {
                "value": mse,
                "standard_deviation": std
            },
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    
    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Writing abalone/evaluation.py


다음으로 `ScriptProcessor` 프로세서의 인스턴스를 만들고 `ProcessingStep`에서 사용합니다.

프로세서에 전달된 `processing_instance_type` 매개 변수에 유의하세요.

In [14]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-abalone-eval",
    role=role,
)

프로세서 인스턴스를 사용하여 입력 및 출력 채널과 파이프라인이 파이프라인 실행을 호출할 때 실행될 코드와 함께 `ProcessingStep`을 생성합니다. 이는 Python SDK에서 프로세서 인스턴스의 `run` 메서드와 유사합니다.

구체적으로, `step_train` 속성의 `S3ModelArtifacts`와 `step_process` 속성의 `"test_data"` 출력 채널의 `S3Uri`가 입력으로 전달됩니다. `TrainingStep` 및 `ProcessingStep` `properties` 속성은 각각 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) 및 [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) 응답 object의 object 모델과 일치합니다.

In [15]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)
step_eval = ProcessingStep(
    name="AbaloneEval",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs[
                "test"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="abalone/evaluation.py",
    property_files=[evaluation_report],
)

![Define a Model Evaluation Step to Evaluate the Trained Model](img/pipeline-4.png)

### 2.5. 모델 생성을 위한 모델 생성 단계(Create Model Step) 정의

예제 모델을 사용하여 배치 변환을 수행하려면 SageMaker 모델을 생성해야 합니다. 

구체적으로 `TrainingStep`, `step_train` 속성에서 `S3ModelArtifacts`를 전달합니다. `TrainingStep` `properties` 속성은 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) 응답 object의 object 모델과 일치합니다.

In [16]:
from sagemaker.model import Model


model = Model(
    image_uri=image_uri,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

SageMaker 모델을 생성하기 위해 모델 입력값(`instance_type` 및 `accelerator_type`)을 제공한 다음 이전에 정의된 입력 및 모델 인스턴스를 전달하는 `CreateModelStep`을 정의합니다.

In [17]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep


inputs = CreateModelInput(
    instance_type="ml.m5.large",
    accelerator_type="ml.eia1.medium",
)
step_create_model = CreateModelStep(
    name="AbaloneCreateModel",
    model=model,
    inputs=inputs,
)

### 2.6. 배치 변환을 수행하기위한 변환 단계(Transform Step) 정의

이제 모델 인스턴스가 정의되었으므로 적절한 모델 유형, 컴퓨팅 인스턴스 유형 및 원하는 출력 S3 URI를 사용하여 Transformer 인스턴스를 생성합니다.

구체적으로 `CreateModelStep`, `step_create_model` 속성에서 `ModelName`을 전달합니다. `CreateModelStep` `properties` 속성은 [DescribeModel](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeModel.html) 응답 object의 object 모델과 일치합니다.

In [18]:
from sagemaker.transformer import Transformer


transformer = Transformer(
    model_name=step_create_model.properties.ModelName,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=f"s3://{default_bucket}/AbaloneTransform"
)

앞에서 정의한 `batch_data` 파이프라인 매개 변수를 사용하여 transformer 인스턴스와 `TransformInput`을 전달합니다.

In [19]:
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep


step_transform = TransformStep(
    name="AbaloneTransform",
    transformer=transformer,
    inputs=TransformInput(data=batch_data)
)

### 2.7. 모델 패키지 생성을 위한 모델 등록 단계(Register Model Step) 정의

훈련 단계에 지정된 estimator 인스턴스를 사용하여 `RegisterModel`의 인스턴스를 생성합니다. 파이프라인에서 `RegisterModel`을 실행한 결과는 모델 패키지입니다. 모델 패키지는 추론에 필요한 모든 요소를 패키징하는 재사용 가능한 모델 아티팩트 추상화입니다. 주로 선택적인 모델 가중치 위치와 함께 사용할 추론 이미지를 정의하는 추론 사양으로 구성됩니다.

모델 패키지 그룹은 모델 패키지의 컬렉션입니다. 특정 ML 비즈니스 문제에 대해 모델 패키지 그룹을 생성할 수 있으며 모델 패키지의 새 버전을 여기에 추가할 수 있습니다. 일반적으로 고객은 SageMaker 파이프라인을 실행할 때마다 모델 패키지 버전을 그룹에 추가할 수 있도록 SageMaker 파이프 라인에 대한 `ModelPackageGroup`을 생성해야 합니다.

`RegisterModel`의 구성은 Python SDK에 있는 estimator 인스턴스의 `register` 메서드와 유사합니다.

구체적으로 `TrainingStep`, `step_train` 속성에서 `S3ModelArtifacts`를 전달합니다. `TrainingStep`  `properties` 속성은 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) 응답 object의 object 모델과 일치합니다.

이 노트북에 제공된 특정 모델 패키지 그룹 이름은 모델 레지스트리에서 사용할 수 있으며, CI/CD는 SageMaker 프로젝트에서 작동합니다.

In [20]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics 
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)
step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

![Define a Create Model Step and Batch Transform to Process Data in Batch at Scale](img/pipeline-5.png)

### 2.8. 정확도를 확인하고 조건부로 모델을 생성하고 배치 변환을 실행하고 모델 레지스트리에 모델을 등록하기 위한 조건 단계(Condition Step) 정의

이 단계에서는 평가 단계 `step_eval에` 의해 결정된 모델의 정확도가 지정된 값을 초과하는 경우에만 모델이 등록됩니다. `ConditionStep`을 사용하면 파이프라인이 단계 속성의 조건에 따라 파이프 라인 DAG에서 조건부 실행을 지원할 수 있습니다.

아래 코드 셀에서는 다음을 수행합니다.

- 평가 단계 `step_eval`의 출력에서 찾은 정확도 값에 `ConditionLessThanOrEqualTo`를 정의합니다.
- `ConditionStep`의 조건 목록에 있는 조건을 사용합니다.
- `CreateModelStep` 및 `TransformStep` 단계와 `RegisterModel` 단계 컬렉션을 조건이 `True`로 평가되는 경우에만 실행되는 `ConditionStep`의 `if_steps`에 전달합니다.

In [21]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import (
    ConditionStep,
    JsonGet,
)


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step=step_eval,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=6.0
)

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[], 
)

![Define a Condition Step to Check Accuracy and Conditionally Execute Steps](img/pipeline-6.png)

### 2.9. 파이프라인 정의 (Parameters, Steps, Conditions로 구성)

이 섹션을 통해 앞 섹션에서 정의한 단계들을 파이프라인으로 결합하여 실행할 수 있습니다.

`Pipeline` 인스턴스 생성 시 `name`, `parameters`, 그리고 `steps`이 필요합니다. `name`은 `(account, region)` 쌍에서 고유해야 합니다.

Note:

* 정의에 사용된 모든 매개 변수가 있어야 합니다.
* 파이프라인으로 전달된 단계들은 실행 순서대로 나열할 필요가 없습니다. SageMaker Pipeline 서비스는 실행을 완료하기 위한 단계로 데이터 종속성 DAG를 해결합니다.
* 단계들은 파이프라인 단계 리스트와 모든 조건 단계 if/else 리스트에서 고유해야 합니다.

In [22]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = f"AbalonePipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

![Define a Pipeline of Parameters, Steps, and Conditions](img/pipeline-7.png)

<br>

## 3. 파이프라인 실행
---

파이프라인 정의를 생성하였으면, 이를 곧바로 SageMaker에 제출하여 파이프라인을 실행할 수 있습니다.

### 3.0. (Optional) 파이프라인 정의 검토

파이프라인 정의의 JSON을 검사하여 파이프라인이 잘 정의되어 있고 매개 변수 및 단계 속성이 올바르게 해석되는지 확인할 수 있습니다.

In [23]:
import json

definition = json.loads(pipeline.definition())
definition

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ProcessingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-387793684046/abalone/abalone-dataset.csv'},
  {'Name': 'BatchData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-387793684046/abalone/abalone-dataset-batch'}],
 'Steps': [{'Name': 'AbaloneProcess',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.ProcessingInstanceType'},
      'InstanceCount': {'Get': 'Parameters.ProcessingInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {

### 3.1. 파이프라인을 SageMaker에 제출하고 실행 시작

파이프라인 서비스에 파이프라인 정의를 제출합니다. 전달된 역할은 파이프라인 서비스에서 단계들 내에서 정의된 모든 작업들을 생성하는 데 사용됩니다.


In [24]:
pipeline.upsert(role_arn=role)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'PipelineArn': 'arn:aws:sagemaker:us-east-1:387793684046:pipeline/abalonepipeline',
 'ResponseMetadata': {'RequestId': '2bf82436-c447-41cc-aec9-c465e28d6ffd',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '2bf82436-c447-41cc-aec9-c465e28d6ffd',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Sun, 06 Dec 2020 09:01:12 GMT'},
  'RetryAttempts': 0}}

Start the pipeline and accept all of the default parameters.

In [25]:
execution = pipeline.start()

### 3.2. Pipeline Operations: 파이프라인 실행 검사 및 대기

파이프라인 실행을 확인합니다.

In [26]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:387793684046:pipeline/abalonepipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:387793684046:pipeline/abalonepipeline/execution/xfyoqflqx40c',
 'PipelineExecutionDisplayName': 'execution-1607245275804',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2020, 12, 6, 9, 1, 15, 709000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2020, 12, 6, 9, 1, 15, 709000, tzinfo=tzlocal()),
 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:387793684046:user-profile/d-qgroslwe72ll/daekeun-smstudio-1',
  'UserProfileName': 'daekeun-smstudio-1',
  'DomainId': 'd-qgroslwe72ll'},
 'LastModifiedBy': {'UserProfileArn': 'arn:aws:sagemaker:us-east-1:387793684046:user-profile/d-qgroslwe72ll/daekeun-smstudio-1',
  'UserProfileName': 'daekeun-smstudio-1',
  'DomainId': 'd-qgroslwe72ll'},
 'ResponseMetadata': {'RequestId': 'd340bc4d-a537-4e44-87e8-78b7108716d3',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {

실행이 완료될 때까지 기다리세요.

In [27]:
execution.wait()

실행 단계들을 나열합니다. step executor 서비스에서 처리된 파이프라인의 단계들입니다.

In [28]:
execution.list_steps()

[{'StepName': 'AbaloneTransform',
  'StartTime': datetime.datetime(2020, 12, 6, 9, 14, 9, 384000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2020, 12, 6, 9, 18, 52, 287000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-1:387793684046:transform-job/pipelines-xfyoqflqx40c-abalonetransform-3qdibodphd'}}},
 {'StepName': 'AbaloneRegisterModel',
  'StartTime': datetime.datetime(2020, 12, 6, 9, 14, 7, 823000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2020, 12, 6, 9, 14, 8, 793000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:387793684046:model-package/abalonemodelpackagegroupname/1'}}},
 {'StepName': 'AbaloneCreateModel',
  'StartTime': datetime.datetime(2020, 12, 6, 9, 14, 7, 779000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2020, 12, 6, 9, 14, 8, 901000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Model': {'Arn

### 3.3. 모델 평가 검토

파이프라인이 완료된 후, 결과 모델 평가를 검토합니다. S3에서 결과 `evaluation.json` 파일을 다운로드하고 보고서를 인쇄합니다.

In [29]:
from pprint import pprint


evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
))
pprint(json.loads(evaluation_json))

{'regression_metrics': {'mse': {'standard_deviation': 2.1106958936144284,
                                'value': 4.455134079123846}}}


### 3.4. Lineage

파이프라인에서 생성된 아티팩트의 계보를 검토합니다.

In [30]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(5)

{'StepName': 'AbaloneProcess', 'StartTime': datetime.datetime(2020, 12, 6, 9, 1, 16, 364000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 12, 6, 9, 6, 27, 391000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:387793684046:processing-job/pipelines-xfyoqflqx40c-abaloneprocess-zcek38fjoj'}}}


None

{'StepName': 'AbaloneTrain', 'StartTime': datetime.datetime(2020, 12, 6, 9, 6, 33, 752000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 12, 6, 9, 10, 5, 846000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:387793684046:training-job/pipelines-xfyoqflqx40c-abalonetrain-186y1or9a8'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...020-12-06-09-00-00-419/output/validation,Input,DataSet,ContributedTo,artifact
1,s3://...ess-2020-12-06-09-00-00-419/output/train,Input,DataSet,ContributedTo,artifact
2,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
3,model.tar.gz,Output,Model,Produced,artifact


{'StepName': 'AbaloneEval', 'StartTime': datetime.datetime(2020, 12, 6, 9, 10, 6, 183000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 12, 6, 9, 13, 59, 262000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:387793684046:processing-job/pipelines-xfyoqflqx40c-abaloneeval-hbaklavinv'}}}


None

{'StepName': 'AbaloneMSECond', 'StartTime': datetime.datetime(2020, 12, 6, 9, 14, 6, 427000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 12, 6, 9, 14, 7, 252000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'Condition': {'Outcome': 'True'}}}


None

{'StepName': 'AbaloneCreateModel', 'StartTime': datetime.datetime(2020, 12, 6, 9, 14, 7, 779000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 12, 6, 9, 14, 8, 901000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-1:387793684046:model/pipelines-xfyoqflqx40c-abalonecreatemodel-mx87dhjtka'}}}


None

{'StepName': 'AbaloneRegisterModel', 'StartTime': datetime.datetime(2020, 12, 6, 9, 14, 7, 823000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 12, 6, 9, 14, 8, 793000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:387793684046:model-package/abalonemodelpackagegroupname/1'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,model.tar.gz,Input,Model,ContributedTo,artifact
1,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
2,abalonemodelpackagegroupname-1-PendingManualAp...,Input,Approval,ContributedTo,action
3,AbaloneModelPackageGroupName-1607246048-aws-mo...,Output,ModelGroup,AssociatedWith,context


{'StepName': 'AbaloneTransform', 'StartTime': datetime.datetime(2020, 12, 6, 9, 14, 9, 384000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 12, 6, 9, 18, 52, 287000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-1:387793684046:transform-job/pipelines-xfyoqflqx40c-abalonetransform-3qdibodphd'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,model.tar.gz,Input,Model,ContributedTo,artifact
1,68331...naws.com/sagemaker-xgboost:1.0-1-cpu-py3,Input,Image,ContributedTo,artifact
2,s3://...7793684046/abalone/abalone-dataset-batch,Input,DataSet,ContributedTo,artifact
3,s3://...-us-east-1-387793684046/AbaloneTransform,Output,DataSet,Produced,artifact


### 3.5. Parametrized Executions: 파이프라인 실행에 대한 기본 매개 변수 오버라이드

파이프라인의 추가 실행을 구동하고 다른 파이프라인 매개 변수를 지정할 수 있습니다. `parameters` 인수는 매개 변수 이름을 포함하는 사전(dictionary)이며 기본값을 오버라이드합니다.

모델의 성능에 따라 컴퓨팅 최적화 인스턴스 유형에서 다른 파이프라인 실행을 시작하고 모델 승인 상태를 자동으로 "Approved"로 설정할 수 있습니다. 즉, `RegisterModel` 단계에서 생성된 모델 패키지 버전이 SageMaker 프로젝트와 같은 CI/CD 파이프라인을 통해 자동으로 배포할 준비가 되었음을 의미합니다.

In [31]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType="ml.c5.xlarge",
        ModelApprovalStatus="Approved",
    )
)

In [32]:
execution.wait()

In [33]:
execution.list_steps()

[{'StepName': 'AbaloneTransform',
  'StartTime': datetime.datetime(2020, 12, 6, 9, 31, 23, 201000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2020, 12, 6, 9, 35, 39, 777000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-1:387793684046:transform-job/pipelines-3h3c2lohm9su-abalonetransform-b7olxeehy7'}}},
 {'StepName': 'AbaloneRegisterModel',
  'StartTime': datetime.datetime(2020, 12, 6, 9, 31, 21, 874000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2020, 12, 6, 9, 31, 22, 916000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:387793684046:model-package/abalonemodelpackagegroupname/2'}}},
 {'StepName': 'AbaloneCreateModel',
  'StartTime': datetime.datetime(2020, 12, 6, 9, 31, 21, 871000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2020, 12, 6, 9, 31, 22, 852000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Model': 

파이프라인 실행이 완료되면 Amazon S3에서 `evaluation.json` 파일을 다운로드하여 보고서를 검토합니다.

In [34]:
evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format(
    step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
))
json.loads(evaluation_json)

{'regression_metrics': {'mse': {'value': 4.79290734413584,
   'standard_deviation': 2.18645893923111}}}

### 3.6. (Optional) 파이프라인 실행 중지 및 삭제

파이프라인 작업을 마치면 진행 중인 실행을 중지하고 파이프라인을 삭제할 수 있습니다.

In [37]:
#execution.stop()

In [36]:
pipeline.delete()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:387793684046:pipeline/abalonepipeline',
 'ResponseMetadata': {'RequestId': '6f5eb318-85fe-4c8e-bdd6-fb7be9f22876',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6f5eb318-85fe-4c8e-bdd6-fb7be9f22876',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '83',
   'date': 'Sun, 06 Dec 2020 14:08:34 GMT'},
  'RetryAttempts': 0}}