# SageMaker 모델 빌드 파이프라인을 이용한 모델 빌드 오케스트레이션
Amazon SageMaker Model building pipeline은 머신러닝 워크플로우를 개발하는 데이터 과학자, 엔지니어들에게 SageMaker작업과 재생산가능한 머신러닝 파이프라인을 오케스트레이션하는 기능을 제공합니다. 또한 커스텀빌드된 모델을 실시간 추론환경이나 배치변환을 통한 추론 실행환경으로 배포하거나, 생성된 아티팩트의 계보(lineage)를 추적하는 기능을 제공합니다. 이 기능들을 통해 모델 아티팩트를 배포하고, 업무환경에서의 워크플로우를 배포/모니터링하고, 간단한 인터페이스를 통해 아티팩트의 계보 추적하고, 머신러닝 애플리케이션 개발의 베스트 프렉티스를 도입하여, 보다 안정적인 머신러닝 애플리케이션 운영환경을 구현할 수 있습니다.

SageMaker pipeline 서비스는 JSON 선언으로 구현된 SageMaker Pipeline DSL(Domain Specific Language, 도메인종속언어)를 지원합니다. 이 DSL은 파이프라인 파라마터와 SageMaker 작업단계의 DAG(Directed Acyclic Graph)를 정의합니다. SageMaker Python SDK를 이용하면 이 파이프라인 DSL의 생성을 보다 간편하게 할 수 있습니다.

In [1]:
LOCAL_MODE = False

# 0. 사용 코드

### 0-1. prediction-autogluon

In [2]:
%%writefile src/v2.0/prediction-autogluon.py

import argparse
import os
import requests
import tempfile
import subprocess, sys
import json

import glob
import pandas as pd
import joblib
import pickle
import tarfile
from io import StringIO, BytesIO

import logging
import logging.handlers

import time
from datetime import datetime as dt

import boto3


###############################
######### util 함수 설정 ##########
###############################
def _get_logger():
    loglevel = logging.DEBUG
    l = logging.getLogger(__name__)
    if not l.hasHandlers():
        l.setLevel(loglevel)
        logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))        
        l.handler_set = True
    return l  
logger = _get_logger()

def get_bucket_key_from_uri(uri):
    uri_aws_path = uri.split('//')[1]
    uri_bucket = uri_aws_path.rsplit('/')[0]
    uri_file_path = '/'.join(uri_aws_path.rsplit('/')[1:])
    return uri_bucket, uri_file_path

def get_secret():
    secret_name = "dev/ForecastPalmOilPrice"
    region_name = "ap-northeast-2"
    
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name,
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'DecryptionFailureException': # Secrets Manager can't decrypt the protected secret text using the provided KMS key.
            raise e
        elif e.response['Error']['Code'] == 'InternalServiceErrorException': # An error occurred on the server side.
            raise e
        elif e.response['Error']['Code'] == 'InvalidParameterException': # You provided an invalid value for a parameter.
            raise e
        elif e.response['Error']['Code'] == 'InvalidRequestException': # You provided a parameter value that is not valid for the current state of the resource.
            raise e
        elif e.response['Error']['Code'] == 'ResourceNotFoundException': # We can't find the resource that you asked for.
            raise e
    else:
        if 'SecretString' in get_secret_value_response:
            secret = get_secret_value_response['SecretString']
            return secret
        else:
            decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])
            return decoded_binary_secret
        
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('--output_dir', type = str, default = "/opt/ml/processing/output", help='예측 결과값이 저장되는 곳, Inference 결과가 저장된다.')
    parser.add_argument('--ml_algorithm_name', type=str, default = 'Autogluon')  
    parser.add_argument('--model_package_group_name', type=str, default = BUCKET_NAME_USECASE)  
    parser.add_argument('--qs_data_name', type=str, default = 'forecast result')    

    return parser.parse_args()
        
if __name__=='__main__':
    ########################################### 
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'autogluon==0.6.1'])
    from autogluon.timeseries import TimeSeriesDataFrame, TimeSeriesPredictor
    
    ############################################
    ###### Secret Manager에서 키값 가져오기  #######
    ########################################### 
    logger.info(f"\n### Loading Key value from Secret Manager")
    keychain = json.loads(get_secret())
    ACCESS_KEY_ID = keychain['AWS_ACCESS_KEY_ID']
    ACCESS_SECRET_KEY = keychain['AWS_ACCESS_SECRET_KEY']
    BUCKET_NAME_USECASE = keychain['PROJECT_BUCKET_NAME']
    DATALAKE_BUCKET_NAME = keychain['DATALAKE_BUCKET_NAME']
    S3_PATH_REUTER = keychain['S3_PATH_REUTER']
    S3_PATH_WWO = keychain['S3_PATH_WWO']
    S3_PATH_STAGE = keychain['S3_PATH_STAGE']
    S3_PATH_GOLDEN = keychain['S3_PATH_GOLDEN']
    S3_PATH_TRAIN = keychain['S3_PATH_TRAIN']
    S3_PATH_FORECAST = keychain['S3_PATH_PREDICTION']
    
    boto3_session = boto3.Session(aws_access_key_id = ACCESS_KEY_ID,
                                  aws_secret_access_key = ACCESS_SECRET_KEY,
                                  region_name = 'ap-northeast-2')
    
    s3_client = boto3_session.client('s3')
    sm_client = boto3_session.client('sagemaker')
    qs_client = boto3_session.client('quicksight')

    sts_client = boto3_session.client("sts")
    user_account_id = sts_client.get_caller_identity()["Account"]
    ######################################
    ## 커맨드 인자, Hyperparameters 처리 ##
    ######################################  
    logger.info("######### Argument Info ####################################")
    logger.info("### start training code")    
    logger.info("### Argument Info ###")
    args = parse_args()             
    logger.info(f"args.output_dir: {args.output_dir}")
    logger.info(f"args.ml_algorithm_name: {args.ml_algorithm_name}")
    logger.info(f"args.model_package_group_name: {args.model_package_group_name}")
    logger.info(f"args.qs_data_name: {args.qs_data_name}")
# prediction_output_path = f"s3://crude-palm-oil-prices-forecast/predicted-data/2023/03/19/1679292475.0/result"

    output_dir = args.output_dir
    ml_algorithm_name = args.ml_algorithm_name
    model_package_group_name = args.model_package_group_name
    qs_data_name = args.qs_data_name

    model_dir = f'{output_dir}/model'
    os.makedirs(model_dir, exist_ok=True)
    
    result_dir = f'{output_dir}/result'
    os.makedirs(result_dir, exist_ok=True)

    manifest_base_path = f'{output_dir}/manifest'
    os.makedirs(manifest_base_path, exist_ok=True)
    ##########################################################
    ###### 적합한 모델의 URI 찾고, 탑 성능 모델 이름 가져오기 ##########
    #########################################################
    logger.info("\n######### Finding suitable model uri ####################################")
    model_registry_list = sm_client.list_model_packages(ModelPackageGroupName = model_package_group_name)['ModelPackageSummaryList']
    for model in model_registry_list:
        if (model['ModelPackageGroupName'] == model_package_group_name and
            model['ModelApprovalStatus'] == 'Approved' and
            model['ModelPackageDescription'] == ml_algorithm_name):
            mr_arn = model['ModelPackageArn']
            break
    model_spec = sm_client.describe_model_package(ModelPackageName=mr_arn)
    train_data_dir = model_spec['CustomerMetadataProperties']['train_data']
    test_data_dir = model_spec['CustomerMetadataProperties']['test_data']
    
    s3_model_uri = model_spec['InferenceSpecification']['Containers'][0]['ModelDataUrl']
    champion_model = model_spec['CustomerMetadataProperties']['champion_model']

    logger.info(f"Found suitable model uri: {s3_model_uri}")
    logger.info(f"And top model name: {champion_model}")
    
    logger.info("\n#########Download suitable model file  ####################################")
    model_bucket, model_key = get_bucket_key_from_uri(s3_model_uri)  
    logger.info(f"\nmodel_bucket: {model_bucket}, model_key: {model_key}  ####################################")
    model_obj = s3_client.get_object(Bucket = model_bucket, Key = model_key)
    
    ##########################################################
    ###### 모델 압축 풀고 TimeseriesDataFrame으로 변환 ##########
    #########################################################
    logger.info("\n######### Model zip file extraction ####################################")
    with tarfile.open(fileobj=model_obj['Body'], mode='r|gz') as file:
        file.extractall(output_dir)
    logger.info(f"list in {model_dir}: {os.listdir(model_dir)}")
    logger.info("\n######### Convert df_test dataframe into TimeSeriesDataFrame  ###########")        
    df_train = pd.read_csv(os.path.join(train_data_dir, 'train_fold1.csv'))
    df_test = pd.read_csv(os.path.join(test_data_dir, 'test_fold1.csv'))
    sum_df = pd.concat([df_train, df_test]).reset_index(drop = True)
    sum_df.loc[:, "ds"] = pd.to_datetime(sum_df.loc[:, "ds"])
    
    tdf_train = TimeSeriesDataFrame.from_data_frame(
        sum_df,
        id_column="ric",
        timestamp_column="ds",
    )
    logger.info(f"sum_df sample: head(2) \n {sum_df.head(2)}")
    logger.info(f"sum_df sample: tail(2) \n {sum_df.tail(2)}")

    ################################
    ###### Prediction 시작 ##########
    ###############################
    logger.info("\n######### Start prediction  ###########")        
    loaded_trainer = pickle.load(open(f"{model_dir}/models/trainer.pkl", 'rb'))
    logger.info(f"loaded_trainer: {loaded_trainer}")
    prediction_ag_model = loaded_trainer.predict(data = tdf_train,
                                                 model = champion_model)
    logger.info(f"prediction_ag_model sample: head(2) \n {prediction_ag_model.head(2)}")
    prediction_result = prediction_ag_model.loc['FCPOc3']
    logger.info(f"prediction_ag_model sample: head(2) \n {prediction_result.head(2)}")
    prediction_result.to_csv(f'{result_dir}/prediction_result.csv')

Overwriting src/v2.0/prediction-autogluon.py


### 0-2. visualization

In [3]:
%%writefile src/v2.0/visualization.py

import argparse
import os
import requests
import tempfile
import subprocess, sys
import json

import glob
import pandas as pd
import joblib
import pickle
import tarfile
from io import StringIO, BytesIO

import logging
import logging.handlers

import time
import calendar
from datetime import datetime as dt

import boto3


###############################
######### util 함수 설정 ##########
###############################
def _get_logger():
    loglevel = logging.DEBUG
    l = logging.getLogger(__name__)
    if not l.hasHandlers():
        l.setLevel(loglevel)
        logging.getLogger().addHandler(logging.StreamHandler(sys.stdout))        
        l.handler_set = True
    return l  
logger = _get_logger()

def get_secret():
    secret_name = "dev/ForecastPalmOilPrice"
    region_name = "ap-northeast-2"
    
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name,
    )

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'DecryptionFailureException': # Secrets Manager can't decrypt the protected secret text using the provided KMS key.
            raise e
        elif e.response['Error']['Code'] == 'InternalServiceErrorException': # An error occurred on the server side.
            raise e
        elif e.response['Error']['Code'] == 'InvalidParameterException': # You provided an invalid value for a parameter.
            raise e
        elif e.response['Error']['Code'] == 'InvalidRequestException': # You provided a parameter value that is not valid for the current state of the resource.
            raise e
        elif e.response['Error']['Code'] == 'ResourceNotFoundException': # We can't find the resource that you asked for.
            raise e
    else:
        if 'SecretString' in get_secret_value_response:
            secret = get_secret_value_response['SecretString']
            return secret
        else:
            decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])
            return decoded_binary_secret
        

def register_manifest(source_path,
                      target_path,
                      s3_client,
                      BUCKET_NAME_USECASE):
    template_json = {"fileLocations": [{"URIPrefixes": []}],
                     "globalUploadSettings": {
                         "format": "CSV",
                         "delimiter": ","
                     }}
    paginator = s3_client.get_paginator('list_objects_v2')
    response_iterator = paginator.paginate(Bucket = BUCKET_NAME_USECASE,
                                           Prefix = source_path.split(BUCKET_NAME_USECASE+'/')[1]
                                          )
    for page in response_iterator:
        logger.info(f"\n#### page {page}")
        for content in page['Contents']:
            template_json['fileLocations'][0]['URIPrefixes'].append(f's3://{BUCKET_NAME_USECASE}/'+content['Key'])
    with open(f'./manifest_testing.manifest', 'w') as f:
        json.dump(template_json, f, indent=2)

    res = s3_client.upload_file('./manifest_testing.manifest',
                                BUCKET_NAME_USECASE,
                                f"{target_path.split(BUCKET_NAME_USECASE+'/')[1]}/visual_validation.manifest")
    return f"{target_path.split(BUCKET_NAME_USECASE+'/')[1]}/visual_validation.manifest"
    

def refresh_of_spice_datasets(user_account_id,
                              qs_data_name,
                              manifest_file_path,
                              BUCKET_NAME_USECASE,
                              qs_client):
    
    ds_list = qs_client.list_data_sources(AwsAccountId=user_account_id)
    datasource_ids = [summary["DataSourceId"] for summary in ds_list["DataSources"] if qs_data_name in summary["Name"]]    
    for datasource_id in datasource_ids:
        response = qs_client.update_data_source(
            AwsAccountId=user_account_id,
            DataSourceId=datasource_id,
            Name=qs_data_name,
            DataSourceParameters={
                'S3Parameters': {
                    'ManifestFileLocation': {
                        'Bucket': BUCKET_NAME_USECASE,
                        'Key':  manifest_file_path
                    },
                },
            })
        logger.info(f"datasource_id:{datasource_id} 의 manifest를 업데이트: {response}")
    
    res = qs_client.list_data_sets(AwsAccountId = user_account_id)
    datasets_ids = [summary["DataSetId"] for summary in res["DataSetSummaries"] if qs_data_name in summary["Name"]]
    ingestion_ids = []

    for dataset_id in datasets_ids:
        try:
            ingestion_id = str(calendar.timegm(time.gmtime()))
            qs_client.create_ingestion(DataSetId = dataset_id,
                                       IngestionId = ingestion_id,
                                       AwsAccountId = user_account_id)
            ingestion_ids.append(ingestion_id)
        except Exception as e:
            logger.info(e)
            pass
    for ingestion_id, dataset_id in zip(ingestion_ids, datasets_ids):
        while True:
            response = qs_client.describe_ingestion(DataSetId = dataset_id,
                                                    IngestionId = ingestion_id,
                                                    AwsAccountId = user_account_id)
            if response['Ingestion']['IngestionStatus'] in ('INITIALIZED', 'QUEUED', 'RUNNING'):
                time.sleep(5)     #change sleep time according to your dataset size
            elif response['Ingestion']['IngestionStatus'] == 'COMPLETED':
                print("refresh completed. RowsIngested {0}, RowsDropped {1}, IngestionTimeInSeconds {2}, IngestionSizeInBytes {3}".format(
                    response['Ingestion']['RowInfo']['RowsIngested'],
                    response['Ingestion']['RowInfo']['RowsDropped'],
                    response['Ingestion']['IngestionTimeInSeconds'],
                    response['Ingestion']['IngestionSizeInBytes']))
                break
            else:
                response = "refresh failed for {0}! - status {1}".format(dataset_id, response['Ingestion']['IngestionStatus'])
                break
    return response
        
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--source_path", type=str, help='prediction_data')
    parser.add_argument("--qs_data_name", type=str, default='forecast_result')
    parser.add_argument('--model_package_group_name', type=str, default = BUCKET_NAME_USECASE)  
    return parser.parse_args()
        
if __name__=='__main__':
    ############################################
    ###### Secret Manager에서 키값 가져오기  #######
    ########################################### 
    logger.info(f"\n### Loading Key value from Secret Manager")
    keychain = json.loads(get_secret())
    ACCESS_KEY_ID = keychain['AWS_ACCESS_KEY_ID']
    ACCESS_SECRET_KEY = keychain['AWS_ACCESS_SECRET_KEY']
    BUCKET_NAME_USECASE = keychain['PROJECT_BUCKET_NAME']
    DATALAKE_BUCKET_NAME = keychain['DATALAKE_BUCKET_NAME']
    S3_PATH_REUTER = keychain['S3_PATH_REUTER']
    S3_PATH_WWO = keychain['S3_PATH_WWO']
    S3_PATH_STAGE = keychain['S3_PATH_STAGE']
    S3_PATH_GOLDEN = keychain['S3_PATH_GOLDEN']
    S3_PATH_TRAIN = keychain['S3_PATH_TRAIN']
    S3_PATH_FORECAST = keychain['S3_PATH_PREDICTION']
    
    boto3_session = boto3.Session(aws_access_key_id = ACCESS_KEY_ID,
                                  aws_secret_access_key = ACCESS_SECRET_KEY,
                                  region_name = 'ap-northeast-2')
    
    s3_client = boto3_session.client('s3')
    sm_client = boto3_session.client('sagemaker')
    qs_client = boto3_session.client('quicksight')

    sts_client = boto3_session.client("sts")
    user_account_id = sts_client.get_caller_identity()["Account"]
    ######################################
    ## 커맨드 인자, Hyperparameters 처리 ##
    ######################################  
    logger.info("######### Argument Info ####################################")
    logger.info("### start training code")    
    logger.info("### Argument Info ###")
    args = parse_args()             
    logger.info(f"args.source_path: {args.source_path}")
    logger.info(f"args.qs_data_name: {args.qs_data_name}")
    logger.info(f"args.model_package_group_name: {args.model_package_group_name}")
 
    source_path = args.source_path
    qs_data_name = args.qs_data_name    
    model_package_group_name = args.model_package_group_name
    
    target_path = source_path.rsplit('/',1)[0]+'/manifest'
    logger.info(f"\n#### target_path : {target_path}")

    logger.info(f"\n#### register_manifest")
    manifest_file_path = register_manifest(source_path, 
                                           target_path,
                                           s3_client,
                                           BUCKET_NAME_USECASE)
    logger.info(f'### manifest_file_path : {manifest_file_path}')
    logger.info(f"\n#### refresh_of_spice_datasets")
    res = refresh_of_spice_datasets(user_account_id,
                                    qs_data_name,
                                    manifest_file_path,
                                    BUCKET_NAME_USECASE,
                                    qs_client)
    logger.info(f'### refresh_of_spice_datasets : {res}')

Overwriting src/v2.0/visualization.py


# 1. 환경설정

## 1.1 라이브러리 및 변수 로딩

In [4]:
import argparse
import os
import requests
import tempfile
import subprocess, sys

import pandas as pd
import numpy as np
from glob import glob
import copy
from collections import OrderedDict
from pathlib import Path
import joblib

import logging
import logging.handlers

import json
import base64
import boto3
import sagemaker
from botocore.client import Config
from botocore.exceptions import ClientError

import time
from datetime import datetime as dt
import calendar
import datetime
from pytz import timezone
from dateutil.relativedelta import *

from sagemaker.mxnet import MXNet
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker import get_execution_role
from sagemaker.image_uris import retrieve
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.inputs import TrainingInput

In [5]:
# 한국 시간
KST = dt.today() + relativedelta(hours=9)
KST_aday_before = KST - relativedelta(days=1) 
yyyy, mm, dd = str(KST_aday_before.year), str(KST_aday_before.month).zfill(2), str(KST_aday_before.day).zfill(2)
print(f"Start job time: {KST}")

Start job time: 2023-03-24 17:34:03.054723


In [6]:
# 코드 버전
code_version = '2.0'
print(f"Code version: {code_version}")

Code version: 2.0


In [7]:
def get_secret():
    secret_name = "dev/ForecastPalmOilPrice"
    region_name = "ap-northeast-2"
    
    # Create a Secrets Manager client
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name,
    )
    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        if e.response['Error']['Code'] == 'DecryptionFailureException': # Secrets Manager can't decrypt the protected secret text using the provided KMS key.
            raise e
        elif e.response['Error']['Code'] == 'InternalServiceErrorException': # An error occurred on the server side.
            raise e
        elif e.response['Error']['Code'] == 'InvalidParameterException': # You provided an invalid value for a parameter.
            raise e
        elif e.response['Error']['Code'] == 'InvalidRequestException': # You provided a parameter value that is not valid for the current state of the resource.
            raise e
        elif e.response['Error']['Code'] == 'ResourceNotFoundException': # We can't find the resource that you asked for.
            raise e
    else:
        if 'SecretString' in get_secret_value_response:
            secret = get_secret_value_response['SecretString']
            return secret
        else:
            decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])
            return decoded_binary_secret

keychain = json.loads(get_secret())
ACCESS_KEY_ID = keychain['AWS_ACCESS_KEY_ID']
ACCESS_SECRET_KEY = keychain['AWS_ACCESS_SECRET_KEY']
BUCKET_NAME_USECASE = keychain['PROJECT_BUCKET_NAME']
DATALAKE_BUCKET_NAME = keychain['DATALAKE_BUCKET_NAME']
S3_PATH_REUTER = keychain['S3_PATH_REUTER']
S3_PATH_WWO = keychain['S3_PATH_WWO']
S3_PATH_STAGE = keychain['S3_PATH_STAGE']
S3_PATH_GOLDEN = keychain['S3_PATH_GOLDEN']
S3_PATH_TRAIN = keychain['S3_PATH_TRAIN']
S3_PATH_FORECAST = keychain['S3_PATH_PREDICTION']

boto3_session = boto3.Session(aws_access_key_id = ACCESS_KEY_ID,
                              aws_secret_access_key = ACCESS_SECRET_KEY,
                              region_name = 'ap-northeast-2')

s3_client = boto3_session.client('s3')
sm_client = boto3_session.client('sagemaker')
qs_client = boto3_session.client('quicksight')

sts_client = boto3_session.client("sts")
user_account_id = sts_client.get_caller_identity()["Account"]

In [8]:
!aws s3 cp 'src/v2.0' 's3://crude-palm-oil-prices-forecast/src' --recursive --exclude ".ipynb_checkpoints*"

upload: src/v2.0/preprocessing.py to s3://crude-palm-oil-prices-forecast/src/preprocessing.py
upload: src/v2.0/train.py to s3://crude-palm-oil-prices-forecast/src/train.py
upload: src/v2.0/prediction-autogluon.py to s3://crude-palm-oil-prices-forecast/src/prediction-autogluon.py
upload: src/v2.0/model_validation.py to s3://crude-palm-oil-prices-forecast/src/model_validation.py
upload: src/v2.0/visualization.py to s3://crude-palm-oil-prices-forecast/src/visualization.py


In [9]:
prediction_code = 's3://crude-palm-oil-prices-forecast/src/prediction-autogluon.py'
visualization_code = 's3://crude-palm-oil-prices-forecast/src/visualization.py'
%store prediction_code
%store visualization_code

Stored 'prediction_code' (str)
Stored 'visualization_code' (str)


# 2. 모델 빌딩 파이프라인 의 스텝(Step) 생성

## 2.1 모델 빌딩 파이프라인 변수 생성
파이프라인에서 사용할 파이프라인 파라미터를 정의합니다. 파이프라인을 스케줄하고 실행할 때 파라미터를 이용하여 실행조건을 커스마이징할 수 있습니다. 파라미터를 이용하면 파이프라인 실행시마다 매번 파이프라인 정의를 수정하지 않아도 됩니다.

지원되는 파라미터 타입은 다음과 같습니다:

- ParameterString - 파이썬 타입에서 str
- ParameterInteger - 파이썬 타입에서 int
- ParameterFloat - 파이썬 타입에서 float
이들 파라미터를 정의할 때 디폴트 값을 지정할 수 있으며 파이프라인 실행시 재지정할 수도 있습니다. 지정하는 디폴트 값은 파라미터 타입과 일치하여야 합니다.

본 노트북에서 사용하는 파라미터는 다음과 같습니다.

- processing_instance_type - 프로세싱 작업에서 사용할 ml.* 인스턴스 타입
- processing_instance_count - 프로세싱 작업에서 사용할 인스턴스 개수
- training_instance_type - 학습작업에서 사용할 ml.* 인스턴스 타입
- model_approval_status - 학습된 모델을 CI/CD를 목적으로 등록할 때의 승인 상태 (디폴트는 "PendingManualApproval")
- input_data - 입력데이터에 대한 S3 버킷 URI
파이프라인의 각 스텝에서 사용할 변수를 파라미터 변수로서 정의 합니다.

In [10]:
from sagemaker.workflow.parameters import (ParameterInteger,
                                           ParameterString,
                                          )
###################
## 1) 모델 inference 위한 파이프라인 변수  ####################################
###################
prediction_instance_type = ParameterString(
    name = "PredctionInstanceType",
    default_value = "ml.m5.xlarge"
)
prediction_instance_count = ParameterInteger(
    name = "PredctionInstanceCount",
    default_value = 1
)
###################
## 2) 모델 시각화 등록위한 파이프라인 변수  ####################################
###################

visualization_instance_type = ParameterString(
    name = "VisualizationInstanceType",
    default_value = "ml.m5.xlarge"
)
visualization_instance_count = ParameterInteger(
    name = "VisualizationInstanceCount",
    default_value = 1
)

## 2.2 프로세서 단계 정의

In [11]:
from sagemaker import get_execution_role
from sagemaker.image_uris import retrieve
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

###################
# 1) 모델 예측 ###
##################
role = sagemaker.get_execution_role()
image_uri = retrieve(framework='mxnet',
                     region='ap-northeast-2',
                     version='1.9.0',
                     py_version='py38',
                     image_scope='training',
                     instance_type="ml.m5.xlarge")

script_processor_prediction = ScriptProcessor(
    command=['python3'],
    image_uri=image_uri,
    instance_type = prediction_instance_type,
    instance_count = prediction_instance_count,
    base_job_name = f"{BUCKET_NAME_USECASE}(Prediction)",
    role = role,
)

############################
# 2) 시각화 데이터 업데이트###
############################
skframework_version = "1.0-1"#"0.23-1"
skprocessor_visualization = SKLearnProcessor(
    framework_version = skframework_version,
    instance_type = visualization_instance_type,
    instance_count = visualization_instance_count,
    base_job_name = f"{BUCKET_NAME_USECASE}(Visualization)",
    role = role,
)

The input argument instance_type of function (sagemaker.image_uris.retrieve) is a pipeline variable (<class 'sagemaker.workflow.parameters.ParameterString'>), which is not allowed. The default_value of this Parameter object will be used to override it. Please make sure the default_value is valid.


## 2.3 파이프라인 스텝 단계 정의

In [13]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

###################
# 1) 모델 예측 ###
##################
step_prediction = ProcessingStep(
    name = f"{BUCKET_NAME_USECASE}-Prediction",
    processor = script_processor_prediction,
    inputs=[],
    outputs=[
        ProcessingOutput(output_name = "prediction_data",
                         source = "/opt/ml/processing/output/result",
                         destination = 's3://crude-palm-oil-prices-forecast/trained-model/2023/03/23/1679675822.0/prediction')
        ],
    job_arguments = ["--model_package_group_name", BUCKET_NAME_USECASE,
                     "--ml_algorithm_name", 'Autogluon'],
    code = prediction_code
)
############################
# 2) 시각화 데이터 업데이트###
############################
step_visualization = ProcessingStep(
    name = f"{BUCKET_NAME_USECASE}-Visualization",
    processor = skprocessor_visualization,
    job_arguments = ["--source_path", step_prediction.properties.ProcessingOutputConfig.Outputs["prediction_data"].S3Output.S3Uri,
                     "--qs_data_name", 'forecast result',
                     "--model_package_group_name", BUCKET_NAME_USECASE], 
    code = visualization_code,
)

# 3. 최종 파이프라인 정의 및 실행

### 3.1 최종 파이프라인 정의
1. prediction: model registry에 있는 데이터를 미래 원자재 예측
2. visualization: quicksight data source의 참조 manifest를 최신으로 업이트
     
참조하자: [MLOps Pipeline](https://aws.amazon.com/ko/blogs/machine-learning/deploy-an-mlops-solution-that-hosts-your-model-endpoints-in-aws-lambda/)

In [14]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep

pipeline = Pipeline(name = BUCKET_NAME_USECASE,
                    parameters = [
                        prediction_instance_type,        
                        prediction_instance_count,
                        visualization_instance_type,        
                        visualization_instance_count,
                        ],
                    steps=[step_prediction,
                           step_visualization
                          ]
)

### 3.2 파이프라인 정의 확인

In [15]:
import json

definition = json.loads(pipeline.definition())
definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'PredctionInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'PredctionInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'VisualizationInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'VisualizationInstanceCount',
   'Type': 'Integer',
   'DefaultValue': 1}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'crude-palm-oil-prices-forecast-Prediction',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': {'Get': 'Parameters.PredctionInstanceType'},
      'InstanceCount': {'Get': 'Parameters.PredctionInstanceCount'},
      'VolumeSizeInGB': 30}},
    'AppSpecification': {'ImageUri': '763104351884.dkr.ecr.ap-northeast-2.amazonaws.com/mxnet-training:1.9.0-cpu-py38',
     'ContainerArgum

### 3.3 파이프라인 정의를 제출하고 실행하기

In [16]:
%%time
start = time.time()
pipeline.upsert(role_arn=sagemaker.get_execution_role())
execution = pipeline.start()
execution.wait() #실행이 완료될 때까지 기다린다.
end = time.time()

WaiterError: Waiter PipelineExecutionComplete failed: Waiter encountered a terminal failure state: For expression "PipelineExecutionStatus" we matched expected path: "Failed"

In [17]:
print(f"visualization 시간 : {end - start:.1f} sec")
print(f"visualization 시간 : {((end - start)/60):.1f} min")

NameError: name 'end' is not defined

[2022년 11월 29일]
- prediction 시간 : 423.1 sec
- prediction 시간 : 7.1 min

### 3.4 Debug Model Validation 

In [18]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:108594546720:pipeline/crude-palm-oil-prices-forecast',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-northeast-2:108594546720:pipeline/crude-palm-oil-prices-forecast/execution/25mjsvejllfl',
 'PipelineExecutionDisplayName': 'execution-1679647134739',
 'PipelineExecutionStatus': 'Failed',
 'PipelineExperimentConfig': {'ExperimentName': 'crude-palm-oil-prices-forecast',
  'TrialName': '25mjsvejllfl'},
 'FailureReason': 'Step failure: One or multiple steps failed.',
 'CreationTime': datetime.datetime(2023, 3, 24, 8, 38, 54, 585000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2023, 3, 24, 8, 49, 50, 854000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': 'e131c821-030d-479e-a76f-dd64d4567822',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'e131c821-030d-479e-a76f-dd64d4567822',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '600',
   'date'

In [19]:
execution.list_steps()

[{'StepName': 'crude-palm-oil-prices-forecast-Visualization',
  'StartTime': datetime.datetime(2023, 3, 24, 8, 45, 39, 826000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 3, 24, 8, 49, 50, 559000, tzinfo=tzlocal()),
  'StepStatus': 'Failed',
  'AttemptCount': 0,
  'FailureReason': 'ClientError: AlgorithmError: See job logs for more information',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:108594546720:processing-job/pipelines-25mjsvejllfl-crude-palm-oil-price-aqjzzxvj2s'}}},
 {'StepName': 'crude-palm-oil-prices-forecast-Prediction',
  'StartTime': datetime.datetime(2023, 3, 24, 8, 38, 56, 641000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2023, 3, 24, 8, 45, 39, 343000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:108594546720:processing-job/pipelines-25mjsvejllfl-crude-palm-oil-price-poakw3jekp'}}}]

In [20]:
processing_response = execution.list_steps()[0]
processing_arn = processing_response['Metadata']['ProcessingJob']['Arn'] # index -1은 가장 처음 실행 step
processing_job_name = processing_arn.split('/')[-1] # Processing job name만 추출
processing_response = sm_client.describe_processing_job(ProcessingJobName = processing_job_name)
processing_response

{'ProcessingInputs': [{'InputName': 'code',
   'AppManaged': False,
   'S3Input': {'S3Uri': 's3://crude-palm-oil-prices-forecast/src/visualization.py',
    'LocalPath': '/opt/ml/processing/input/code',
    'S3DataType': 'S3Prefix',
    'S3InputMode': 'File',
    'S3DataDistributionType': 'FullyReplicated',
    'S3CompressionType': 'None'}}],
 'ProcessingJobName': 'pipelines-25mjsvejllfl-crude-palm-oil-price-AQjzzxvJ2s',
 'ProcessingResources': {'ClusterConfig': {'InstanceCount': 1,
   'InstanceType': 'ml.m5.xlarge',
   'VolumeSizeInGB': 30}},
 'StoppingCondition': {'MaxRuntimeInSeconds': 86400},
 'AppSpecification': {'ImageUri': '366743142698.dkr.ecr.ap-northeast-2.amazonaws.com/sagemaker-scikit-learn:1.0-1-cpu-py3',
  'ContainerEntrypoint': ['python3',
   '/opt/ml/processing/input/code/visualization.py'],
  'ContainerArguments': ['--source_path',
   's3://crude-palm-oil-prices-forecast/trained-model/2023/03/23/1679675822.0/prediction',
   '--qs_data_name',
   'forecast result',
   '--