# SageMaker Model Building Pipelines

## Runtime

This notebook takes approximately an half hour to run.

## Contents

1. [A SageMaker Pipeline](#A-SageMaker-Pipeline)
1. [Define Parameters to Parametrize Pipeline Execution](#Define-Parameters-to-Parametrize-Pipeline-Execution)
1. [Define a Cache](#Define-a-Cache)
1. [Define a Processing Step for Feature Engineering](#Define-a-Processing-Step-for-Feature-Engineering)
1. [Define a Training Step to Train a Model](#Define-a-Training-Step-to-Train-a-Model)
1. [Define a Model Evaluation Step to Evaluate the Trained Model](#Define-a-Model-Evaluation-Step-to-Evaluate-the-Trained-Model)
1. [Define a Register Model Step to Create a Model Package](#Define-a-Register-Model-Step-to-Create-a-Model-Package)
1. [Define a Lambda Step to deploy endpoint](#Define-a-Lambda-Step-to-deploy-endpoint)
1. [Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed](#Define-a-Fail-Step-to-Terminate-the-Pipeline-Execution-and-Mark-it-as-Failed)
1. [Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation and Register a Model in the Model Registry, Or Terminate the Execution in Failed State](#Define-a-Condition-Step-to-Check-Accuracy-and-Conditionally-Create-a-Model-and-Run-a-Batch-Transformation-and-Register-a-Model-in-the-Model-Registry,-Or-Terminate-the-Execution-in-Failed-State)
1. [Define a Pipeline of Parameters, Steps, and Conditions](#Define-a-Pipeline-of-Parameters,-Steps,-and-Conditions)
1. [Submit the pipeline to SageMaker and start execution](#Submit-the-pipeline-to-SageMaker-and-start-execution)

## A SageMaker Pipeline

생성하는 파이프라인은 사전 처리, 교육, 평가, 모델 생성 및 모델 등록의 일반적인 기계 학습(ML) 애플리케이션 패턴을 따릅니다:

![Model building pipeline](img/pipeline-full.png)

In [None]:
import boto3
import sagemaker

nick_name = 'NickName'  # 다음에서 실습할 리소스명을 변경해주세요.

# 여러명 동시 작업을 위해 다음 변수 커스터마이징
pipeline_name = f"{nick_name}-Realtime-Pipeline" # 다음에서 실습할 파이프라인명을 변경해주세요.
model_package_group_name = f"{nick_name}-PackageGroupName" # 모델 레지스트리에 등록될 package group name을 변경해주세요.

sagemaker_session = sagemaker.session.Session()
region = sagemaker_session.boto_region_name
role = sagemaker.get_execution_role()
# default_bucket = sagemaker_session.default_bucket()
default_bucket = 'sagemake-pipeline-workshop'

이제 기본 버킷에 데이터를 업로드합니다. 'input_data_uri'에 대한 자체 데이터 세트 경로를 입력할 수도 있습니다.

In [None]:
!mkdir -p data

In [None]:
local_path = "data/abalone-dataset.csv"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-sample-files").download_file(
    "datasets/tabular/uci_abalone/abalone.csv", local_path
)

base_uri = f"s3://{default_bucket}/abalone"
input_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(input_data_uri)

모델 생성 후 일괄 변환을 위한 두 번째 데이터 세트를 다운로드합니다. 적절하게 `batch_data_uri`에 대한 자체 데이터 세트 경로를 입력할 수도 있습니다.

In [None]:
local_path = "data/abalone-dataset-batch"

s3 = boto3.resource("s3")
s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file(
    "dataset/abalone-dataset-batch", local_path
)

base_uri = f"s3://{default_bucket}/abalone"
batch_data_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_path,
    desired_s3_uri=base_uri,
)
print(batch_data_uri)

## Define Parameters to Parametrize Pipeline Execution

파이프라인을 매개변수화하는데 사용할 수 있는 파이프라인 매개변수를 정의합니다. 매개변수를 사용하면 파이프라인 정의를 수정하지 않고도 사용자 지정 파이프라인 실행 및 일정을 사용할 수 있습니다.

The supported parameter types include:

* `ParameterString` - represents a `str` Python type
* `ParameterInteger` - represents an `int` Python type
* `ParameterFloat` - represents a `float` Python type

이러한 매개변수는 파이프라인 실행 시 재정의할 수 있는 기본값 제공을 지원합니다. 지정된 기본값은 매개변수 유형과 동일한 타입이어야 합니다.

The parameters defined in this workflow include:

* `processing_instance_type` - 처리 작업의 `ml.*` 인스턴스 유형.
* `processing_instance_count` - 처리 작업의 인스턴스 수입니다.
* `instance_type` - 학습 작업의 `ml.*` 인스턴스 유형입니다.
* `model_approval_status` - CI/CD 목적으로 훈련된 모델에 등록하기 위한 승인 상태입니다(기본값은 "Approved").
* `input_data` - 입력 데이터의 S3 버킷 URI 위치입니다.
* `batch_data` - 배치 데이터의 S3 버킷 URI 위치입니다.
* `mse_threshold` - 모델의 정확도를 확인하는 데 사용되는 MSE(평균 제곱 오차) 임계값입니다.

In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)


processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType", default_value="ml.m5.large"
)
instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.large")
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="Approved"
)
input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)
batch_data = ParameterString(
    name="BatchData",
    default_value=batch_data_uri,
)
mse_threshold = ParameterFloat(name="MseThreshold", default_value=6.0)

## Define a Cache
- 참고: 캐싱 파이프라인 단계:  [Caching Pipeline Steps](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/pipelines-caching.html)

In [None]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, 
                           expire_after="7d")

![Define Parameters](img/pipeline-1.png)

## Define a Processing Step for Feature Engineering

먼저 처리 단계에서 지정한 전처리 스크립트를 개발합니다.

이 노트북 셀은 전처리 스크립트가 포함된 `preprocessing_abalone.py` 파일을 작성합니다. 스크립트를 업데이트하고 이 셀을 다시 실행하여 덮어쓸 수 있습니다. 전처리 스크립트는 `scikit-learn`을 사용하여 다음을 수행합니다.

* 누락된 성별 카테고리 데이터를 채우고 훈련에 적합하도록 인코딩합니다.
* 성별 및 고리 숫자 데이터를 제외한 모든 숫자 필드를 확장하고 정규화합니다.
* 데이터를 훈련, 검증 및 테스트 데이터 세트로 분할합니다.

처리 단계는 입력 데이터에 대한 스크립트를 실행합니다. 교육 단계에서는 사전 처리된 교육 기능과 레이블을 사용하여 모델을 교육합니다. 평가 단계에서는 훈련된 모델과 사전 처리된 테스트 기능 및 레이블을 사용하여 모델을 평가합니다.

In [None]:
!mkdir -p abalone

In [None]:
%%writefile abalone/preprocessing.py
import argparse
import os
import requests
import tempfile

import numpy as np
import pandas as pd

from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder


# Since we get a headerless CSV file, we specify the column names here.
feature_columns_names = [
    "sex",
    "length",
    "diameter",
    "height",
    "whole_weight",
    "shucked_weight",
    "viscera_weight",
    "shell_weight",
]
label_column = "rings"

feature_columns_dtype = {
    "sex": str,
    "length": np.float64,
    "diameter": np.float64,
    "height": np.float64,
    "whole_weight": np.float64,
    "shucked_weight": np.float64,
    "viscera_weight": np.float64,
    "shell_weight": np.float64,
}
label_column_dtype = {"rings": np.float64}


def merge_two_dicts(x, y):
    z = x.copy()
    z.update(y)
    return z


if __name__ == "__main__":
    base_dir = "/opt/ml/processing"

    df = pd.read_csv(
        f"{base_dir}/input/abalone-dataset.csv",
        header=None,
        names=feature_columns_names + [label_column],
        dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype),
    )
    numeric_features = list(feature_columns_names)
    numeric_features.remove("sex")
    numeric_transformer = Pipeline(
        steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]
    )

    categorical_features = ["sex"]
    categorical_transformer = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    preprocess = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features),
        ]
    )

    y = df.pop("rings")
    X_pre = preprocess.fit_transform(df)
    y_pre = y.to_numpy().reshape(len(y), 1)

    X = np.concatenate((y_pre, X_pre), axis=1)

    np.random.shuffle(X)
    train, validation, test = np.split(X, [int(0.7 * len(X)), int(0.85 * len(X))])

    pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False)
    pd.DataFrame(validation).to_csv(
        f"{base_dir}/validation/validation.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)

다음으로 `SKLearnProcessor` 프로세서의 인스턴스를 만들고 `ProcessingStep`에서 사용합니다.

또한 이 노트북 전체에서 사용할 'framework_version'을 지정합니다.

프로세서 인스턴스에서 사용하는 'processing_instance_type' 및 'processing_instance_count' 매개변수에 유의하세요.

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor


framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-abalone-process",
    role=role,
)

마지막으로 프로세서 인스턴스를 사용하여 입력 및 출력 채널, 파이프라인이 파이프라인 실행을 호출할 때 실행되는 코드와 함께 `ProcessingStep`을 구성합니다. 이는 Python SDK에서 프로세서 인스턴스의 'run()' 메서드와 유사합니다.

`ProcessingStep`에 전달된 `input_data` 매개변수는 단계에서 사용되는 입력 데이터입니다. 이 입력 데이터는 프로세서 인스턴스가 실행될 때 사용됩니다.

또한 처리 작업에 대한 출력 구성에 지정된 `"train_data"` 및 `"test_data"` 이름의 채널에 유의하십시오. '속성' 단계는 후속 단계에서 사용할 수 있으며 실행 시 런타임 값으로 확인할 수 있습니다. 특히, 이 사용법은 훈련 단계를 정의할 때 호출됩니다.

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep


step_process = ProcessingStep(
    name="AbaloneProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test"),
    ],
    code="abalone/preprocessing.py",
    cache_config = cache_config, # 캐시 정의
)

![Define a Processing Step for Feature Engineering](img/pipeline-2.png)

## Define a Training Step to Train a Model

이 섹션에서는 Amazon SageMaker의 [XGBoost Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html)을 사용하여 이 데이터 세트를 교육합니다. XGBoost 알고리즘 및 입력 데이터 세트에 대한 Estimator를 구성합니다. 일반적인 훈련 스크립트는 입력 채널에서 데이터를 로드하고, 하이퍼파라미터로 훈련을 구성하고, 모델을 훈련하고, 나중에 호스팅할 수 있도록 모델을 `model_dir`에 저장합니다.

훈련의 모델이 저장되는 모델 경로도 지정됩니다.

'instance_type' 매개변수는 파이프라인의 여러 위치에서 사용될 수 있습니다. 이 경우 'instance_type'이 estimator로 전달됩니다.

In [None]:
from sagemaker.estimator import Estimator


model_path = f"s3://{default_bucket}/AbaloneTrain"
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region=region,
    version="1.0-1",
    py_version="py3",
    instance_type=instance_type,
)
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type=instance_type,
    instance_count=1,
    output_path=model_path,
    role=role,
    disable_profiler=True      
)
xgb_train.set_hyperparameters(
    objective="reg:linear",
    num_round=50,
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
)

마지막으로 estimator 인스턴스를 사용하여 `TrainingStep` 입력과 파이프라인이 파이프라인 실행을 호출할 때 실행되는 코드에서 입력으로 사용된 이전 `ProcessingStep`의 `properties`와 `TrainingStep`을 구성합니다. 이것은 Python SDK의 estimator의 'fit' 방법과 유사합니다.

`"train_data"` 출력 채널의 `S3Uri`를 `TrainingStep`에 전달합니다. 또한 파이프라인에서 모델 평가를 위해 다른 `"test_data"` 출력 채널을 사용합니다. 파이프라인 단계의 'properties' 속성은 설명 호출에 대한 해당 응답의 개체 모델과 일치합니다. 이러한 속성은 자리 표시자 값으로 참조될 수 있으며 런타임에 확인됩니다. 예를 들어 `ProcessingStep` `properties` 속성은 [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) 응답 객체의 객체 모델과 일치합니다.

In [None]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="AbaloneTrain",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
            content_type="text/csv",
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv",
        ),
    },
    cache_config = cache_config, # 캐시 정의
)

![Define a Training Step to Train a Model](img/pipeline-3.png)

## Define a Model Evaluation Step to Evaluate the Trained Model

먼저 모델 평가를 수행하는 처리 단계에서 지정된 평가 스크립트를 개발합니다.

파이프라인 실행 후 분석을 위해 결과 'evaluation.json'을 검사할 수 있습니다.

평가 스크립트는 `xgboost`를 사용하여 다음을 수행합니다.

* 모델을 로드합니다.
* 테스트 데이터를 읽습니다.
* 테스트 데이터에 대한 예측을 발행합니다.
* 정확도 및 ROC 곡선을 포함하는 분류 보고서를 작성합니다.
* 평가 보고서를 평가 디렉토리에 저장합니다.

In [None]:
%%writefile abalone/evaluation.py
import json
import pathlib
import pickle
import tarfile

import joblib
import numpy as np
import pandas as pd
import xgboost

from sklearn.metrics import mean_squared_error


if __name__ == "__main__":
    model_path = f"/opt/ml/processing/model/model.tar.gz"
    with tarfile.open(model_path) as tar:
        tar.extractall(path=".")

    model = pickle.load(open("xgboost-model", "rb"))

    test_path = "/opt/ml/processing/test/test.csv"
    df = pd.read_csv(test_path, header=None)

    y_test = df.iloc[:, 0].to_numpy()
    df.drop(df.columns[0], axis=1, inplace=True)

    X_test = xgboost.DMatrix(df.values)

    predictions = model.predict(X_test)

    mse = mean_squared_error(y_test, predictions)
    std = np.std(y_test - predictions)
    report_dict = {
        "regression_metrics": {
            "mse": {"value": mse, "standard_deviation": std},
        },
    }

    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

다음으로 `ScriptProcessor` 프로세서의 인스턴스를 만들고 `ProcessingStep`에서 사용합니다.

프로세서에 전달된 `processing_instance_type` 매개변수에 유의하십시오.

In [None]:
from sagemaker.processing import ScriptProcessor


script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name="script-abalone-eval",
    role=role,
)

프로세서 인스턴스를 사용하여 입력 및 출력 채널과 파이프라인이 파이프라인 실행을 호출할 때 실행되는 코드와 함께 `ProcessingStep`을 구성합니다. 이는 Python SDK에서 프로세서 인스턴스의 'run' 메서드와 유사합니다.

특히, `step_train` `properties`의 `S3ModelArtifacts`와 `step_process` `properties`의 `"test_data"` output channel의 `S3Uri`이 입력으로 넘어갑니다. `TrainingStep`과 `ProcessingStep` `properties` 어트리뷰트는 각각 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html)의 object model과 [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html) response objects로 매칭됩니다.

In [None]:
from sagemaker.workflow.properties import PropertyFile


evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)
step_eval = ProcessingStep(
    name="AbaloneEval",
    processor=script_eval,
    inputs=[
        ProcessingInput(
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        ProcessingInput(
            source=step_process.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"),
    ],
    code="abalone/evaluation.py",
    cache_config = cache_config, # 캐시 정의
    property_files=[evaluation_report],
)

![Define a Model Evaluation Step to Evaluate the Trained Model](img/pipeline-4.png)

## Define a Register Model Step to Create a Model Package

훈련 단계에서 지정된 estimator 인스턴스를 사용하여 `RegisterModel`의 인스턴스를 구성합니다. 파이프라인에서 'RegisterModel'을 실행한 결과는 모델 패키지입니다. 모델 패키지는 추론에 필요한 모든 구성 요소를 패키징하는 재사용 가능한 모델 아티팩트의 추상화입니다. 기본적으로 optional 모델 가중치 위치와 함께 사용할 추론 이미지를 정의하는 추론 사양으로 구성됩니다.

모델 패키지 그룹은 모델 패키지의 모음입니다. 특정 ML 비즈니스 문제에 대해 모델 패키지 그룹을 생성할 수 있으며 여기에 모델 패키지의 새 버전을 추가할 수 있습니다. 일반적으로 고객은 모든 SageMaker 파이프라인 실행에 대해 모델 패키지 버전을 그룹에 추가할 수 있도록 SageMaker 파이프라인용 ModelPackageGroup을 생성해야 합니다.

'RegisterModel'의 구성은 Python SDK의 estimator 인스턴스의 'register' 메서드와 유사합니다.

특히 `TrainingStep`, `step_train` 속성에서 `S3ModelArtifacts`를 전달합니다. `TrainingStep` `properties` 속성은 [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html) 응답 객체의 객체 모델과 일치합니다.

이 노트북에 제공된 특정 모델 패키지 그룹 이름은 SageMaker 프로젝트와 함께 모델 레지스트리 및 CI/CD 작업에서 사용할 수 있습니다.

In [None]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.step_collections import RegisterModel


model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)
step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

![Define a Lambda Step and Create Model/Create Endpoint config/Create Endpoint](img/pipeline-5.png)

## Define a Lambda Step to deploy endpoint

이 섹션에서는 다음 단계를 안내합니다:

* Lambda Step을 활용하여 Sagemaker model/endpoint configuration/endpoint를 생성합니다.

In [None]:
%%writefile lambda_deployer.py

"""
This Lambda function creates an Endpoint Configuration and deploys a model to an Endpoint. 
The name of the model to deploy is provided via the `event` argument
"""

import json
import boto3


def lambda_handler(event, context):
    """ """
    sm_client = boto3.client("sagemaker")

    # The name of the model created in the Pipeline CreateModelStep
    model_name = event["model_name"]
    model_package_arn = event["model_package_arn"]
    endpoint_config_name = event["endpoint_config_name"]
    endpoint_name = event["endpoint_name"]
    role = event["role"]
    
    container = {"ModelPackageName": model_package_arn}

    create_model_respose = sm_client.create_model(ModelName=model_name, 
                                                  ExecutionRoleArn=role, 
                                                  Containers=[container] )

    create_endpoint_config_response = sm_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[
            {
                "InstanceType": "ml.m5.xlarge",
                "InitialVariantWeight": 1,
                "InitialInstanceCount": 1,
                "ModelName": model_name,
                "VariantName": "AllTraffic",
            }
        ]
    )

    try:
        create_endpoint_response = sm_client.create_endpoint(EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)
    except Exception as e:
        print(e)
        print("update endpoint!!")
        
        sm_client.update_endpoint(EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)
        
    return {
        "statusCode": 200,
        "body": json.dumps("Created Endpoint!")
    }

In [None]:
# 최초 한번만 실행

import time
from sagemaker.lambda_helper import Lambda

# Use the current time to define unique names for the resources created
current_time = time.strftime("%m-%d-%H-%M-%S", time.localtime())

deploy_model_name = f'{nick_name}-abalone-model' + current_time
endpoint_config_name = f"{nick_name}-abalone-endpoint-config" + current_time
endpoint_name = f'{nick_name}-abalone-endpoint'
function_name = f"{nick_name}-sagemaker-abalone-lambda-step"

# Lambda helper class can be used to create the Lambda function
func = Lambda(
    function_name=function_name,
    execution_role_arn=sagemaker.get_execution_role(),
    script="lambda_deployer.py",
    handler="lambda_deployer.lambda_handler",
    timeout=600,
    memory_size=3008
)

lambda_create_res = func.create()

In [None]:
func_from_exist_lambda = Lambda(
    function_arn=lambda_create_res['FunctionArn']
)


In [None]:
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)

# The dictionary retured by the Lambda function is captured by LambdaOutput, each key in the dictionary corresponds to a
# LambdaOutput

output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)

# The inputs provided to the Lambda function can be retrieved via the `event` object within the `lambda_handler` function
# in the Lambda
step_deploy_lambda = LambdaStep(
    name="AbaloneDeploy",
    lambda_func=func_from_exist_lambda,
    inputs={
        "model_name": deploy_model_name,
        "endpoint_config_name": endpoint_config_name,
        "endpoint_name": endpoint_name,
        "model_package_arn": step_register.steps[0].properties.ModelPackageArn,
        "role": role,
    },
    cache_config=cache_config,
    outputs=[output_param_1, output_param_2])

## Define a Fail Step to Terminate the Pipeline Execution and Mark it as Failed

이 섹션에서는 다음 단계를 안내합니다:

* 실행 실패의 원인을 나타내는 사용자 정의 오류 메시지와 함께 'FailStep'을 정의합니다.
* 보다 유익한 오류 메시지를 작성하기 위해 동적 `mse_threshold` 매개변수가 있는 정적 텍스트 문자열을 추가하는 `Join` 기능으로 `FailStep` 오류 메시지를 입력합니다.

In [None]:
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join

step_fail = FailStep(
    name="AbaloneMSEFail",
    error_message=Join(on=" ", values=["Execution failed due to MSE >", mse_threshold]),
)

![Define a Fail Step to Terminate the Execution in Failed State](img/pipeline-8.png)

## Define a Condition Step to Check Accuracy and Conditionally Create a Model and Run a Batch Transformation and Register a Model in the Model Registry, Or Terminate the Execution in Failed State

이 단계에서는 평가 단계 'step_eval'에 의해 결정된 모델의 정확도가 지정된 값을 초과한 경우에만 모델을 등록합니다. 그렇지 않으면 파이프라인 실행이 실패하고 종료됩니다. 'ConditionStep'을 사용하면 파이프라인이 단계 속성의 조건에 따라 파이프라인 DAG에서 조건부 실행을 지원할 수 있습니다.

다음 섹션에서는 다음을 수행합니다:

* 평가 단계 'step_eval'의 출력에서 ​​찾은 정확도 값에 'ConditionLessThanOrEqualTo'를 정의합니다.
* `ConditionStep`의 조건 목록에 있는 조건을 사용합니다.
* `CreateModelStep` 및 `TransformStep` 단계와 `RegisterModel` 단계 컬렉션을 `ConditionStep`의 `if_steps`에 전달합니다. 이는 조건이 `True`로 평가되는 경우에만 실행됩니다.
* 조건이 'False'로 평가되는 경우에만 실행되는 'ConditionStep'의 'else_steps'에 'FailStep' 단계를 전달합니다.

In [None]:
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet


cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value",
    ),
    right=mse_threshold,
)

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_deploy_lambda],
    else_steps=[step_fail],
)

![Define a Condition Step to Check Accuracy and Conditionally Execute Steps](img/pipeline-6.png)

## Define a Pipeline of Parameters, Steps, and Conditions

이 섹션에서는 실행할 수 있도록 단계를 파이프라인으로 결합합니다.

파이프라인에는 '이름', '매개변수', '단계'가 필요합니다. 이름은 `(account, region)` 쌍 내에서 고유해야 합니다.

메모:

* 정의에 사용된 모든 매개변수가 있어야 합니다.
* 파이프라인으로 전달된 단계는 실행 순서대로 나열할 필요가 없습니다. SageMaker 파이프라인 서비스는 실행을 완료하기 위한 단계로 데이터 종속성 DAG를 확인합니다.
* 단계는 파이프라인 단계 목록과 모든 조건 단계 if/else 목록에서 고유해야 합니다.

In [None]:
from sagemaker.workflow.pipeline import Pipeline


pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type,
        processing_instance_count,
        instance_type,
        model_approval_status,
        input_data,
        batch_data,
        mse_threshold,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
)

![Define a Pipeline of Parameters, Steps, and Conditions](img/pipeline-7.png)

### (Optional) Examining the pipeline definition

파이프라인 정의의 JSON을 검사하여 파이프라인이 잘 정의되어 있고 매개변수와 단계 속성이 올바르게 해석되는지 확인할 수 있습니다.

In [None]:
import json


definition = json.loads(pipeline.definition())
definition

## Submit the pipeline to SageMaker and start execution

파이프라인 정의를 파이프라인 서비스에 제출하십시오. 파이프라인 서비스는 전달된 역할을 사용하여 단계에서 정의된 모든 작업을 생성합니다.

In [None]:
pipeline.upsert(role_arn=role)

파이프라인을 시작하고 모든 기본 매개변수를 accept합니다.

In [None]:
execution = pipeline.start()

## Pipeline Operations: Examining and Waiting for Pipeline Execution

파이프라인 실행을 describe 합니다.

In [None]:
execution.describe()

실행이 완료될 때까지 기다리십시오.

In [None]:
execution.wait()

실행 단계를 나열하십시오. 단계 실행기 서비스에서 해결한 파이프라인의 단계입니다.

In [None]:
execution.list_steps()

### Invoke the endpoint

파이프라인이 완료된 후 결과 모델 테스트를 합니다.

In [None]:
# pipeline definition 에서 test 데이터 uri를 찾아보세요.

In [None]:
!aws s3 cp s3://sagemaker-ap-northeast-2-387402383014/AbaloneProcess-94160e00688e34972e58a18a33ec6342/output/test/test.csv .

In [None]:
!head -n 3 test.csv

In [None]:
# Payload for inference.
payload = "-99,-0.4496398881842701,-0.38175978348421347,-0.22754502585380668,-0.624562975084181,-0.7135715659196552,-0.43424377639795386,-0.5304474339068213,0.0,0.0,1.0" 

In [None]:
import boto3

client = boto3.client('sagemaker-runtime')

endpoint_name = 'abalone-endpoint'                                     # Your endpoint name.
content_type = "text/csv"                                        # The MIME type of the input data in the request body.
accept = "text/csv"                                              # The desired MIME type of the inference in the response.
response = client.invoke_endpoint(
    EndpointName=endpoint_name, 
    ContentType=content_type,
    Accept=accept,
    Body=payload
    )

print(response)   

In [None]:
response['Body'].read()

## Clearning Resources

* lambda 삭제
* !!! model / endpoint config / endpoint 삭제 (실행하지 말아주세요, 일괄 삭제하겠습니다)
* pipeline 삭제

In [None]:
from sagemaker.lambda_helper import Lambda
func_from_exist_lambda = Lambda(
    function_arn='arn:aws:lambda:ap-northeast-2:387402383014:function:sagemaker-abalone-lambda-step'
)

In [None]:
func_from_exist_lambda

In [None]:
# lambda 삭제
func_from_exist_lambda.delete()

In [None]:
# model / endpoint config / endpoint 삭제
# sm_client = boto3.client("sagemaker")

In [None]:
## model이 있는지 확인
# response = sm_client.list_models(
#     NameContains=f'{nick_name}-abalone-model',
# )
# model_names = [model_info['ModelName'] for model_info in response['Models']]

In [None]:
# model_names

In [None]:
## 모델들 삭제
# for model_name in model_names:
#     sm_client.delete_model(ModelName=model_name)

In [None]:
## endpoint config 있는지 확인
# response = sm_client.list_endpoint_configs(
#     NameContains=f'{nick_name}-abalone-endpoint-config',
# )

# ep_config_names = [ep_config_info['EndpointConfigName'] for ep_config_info in response['EndpointConfigs']]

In [None]:
# ep_config_names

In [None]:
## endpoint_configs 삭제
# for ep_config_name in ep_config_names:
#     sm_client.delete_endpoint_config(EndpointConfigName=ep_config_name)

In [None]:
## endpoints 있는지 확인
# response = sm_client.list_endpoints(
#     NameContains=f'{nick_name}-abalone-endpoint',
# )

# ep_names = [ep_info['EndpointName'] for ep_info in response['Endpoints']]

In [None]:
# ep_names

In [None]:
## endpoints 삭제
# for ep_name in ep_names:
#     sm_client.delete_endpoint(EndpointName=ep_name)

In [None]:
## pipeline삭제

import boto3
sm_client = boto3.client('sagemaker')

response = sm_client.describe_pipeline(
            PipelineName= pipeline_name
        )   
response

In [None]:
sm_client.delete_pipeline(PipelineName=pipeline_name)