# SageMaker Pipeline
--------------

ML workflow의 각 단계를 수동으로 수행했고, 이제는 모델 학습과 모델 registry에 등록하는 과정에 대한 파이프라인을 만듭니다. [Amazon SageMaker 모델 구축 파이프라인](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/pipelines.html)

<p align="center">
<center><img src="./img/mdp_how_it_works.png" height="250" width="850" alt=""><center>
<br><br>
</p>

## 1. import 패키지 설정

In [1]:
import joblib
import matplotlib.pyplot as plt
import sagemaker
# import splitfolders

import datetime
import glob
import os
import time
import warnings

from smexperiments.experiment import Experiment
from smexperiments.trial import Trial

# import wget
# import tarfile
import shutil

import boto3
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torchvision

# from tqdm import tqdm
from time import strftime
from PIL import Image
from torch.utils.data import Dataset
from torchvision import datasets, transforms

from sagemaker import get_execution_role
from sagemaker.pytorch import PyTorch

warnings.filterwarnings('ignore')
%config InlineBackend.figure_format = 'retina'

## 2. Experiments 관리

Amazon SageMaker에는 실험을 관리할 수 있는 [SageMaker Experiments](https://aws.amazon.com/ko/blogs/aws/amazon-sagemaker-experiments-organize-track-and-compare-your-machine-learning-trainings/) 서비스가 있습니다. 반복적인 실험에 대해 로깅을 남기기 위한 실험 이름 (create_experiment)과 trial (create_trial) 이름을 설정하는 함수입니다. <br> 이러한 메타 정보를 이용하여 향후 ML의 실험 관리가 용이해 질 수 있습니다.

In [2]:
def create_experiment(experiment_name):
    try:
        sm_experiment = Experiment.load(experiment_name)
    except:
        sm_experiment = Experiment.create(experiment_name=experiment_name,
                                          tags=[
                                              {
                                                  'Key': 'modelname',
                                                  'Value': 'informer'
                                              },
                                          ])

In [3]:
def create_trial(experiment_name, set_param, i_type, i_cnt, spot):
    create_date = strftime("%m%d-%H%M%s")
    
    algo = 'dp'
    
    spot = 's' if spot else 'd'
    i_tag = 'test'
    if i_type == 'ml.p3.16xlarge':
        i_tag = 'p3'
    elif i_type == 'ml.p3dn.24xlarge':
        i_tag = 'p3dn'
    elif i_type == 'ml.p4d.24xlarge':
        i_tag = 'p4d'    
        
    trial = "-".join([i_tag,str(i_cnt),algo, spot])
       
    sm_trial = Trial.create(trial_name=f'{experiment_name}-{trial}-{create_date}',
                            experiment_name=experiment_name)

    job_name = f'{sm_trial.trial_name}'
    return job_name

## 3. 데이터 저장소와 학습 script 위치 설정
SageMaker에는 학습에 사용할 데이터 위치와 학습 코드의 위치를 설정합니다. 편의를 위해 default_bucket을 사용했으나, 실제 활용 시에는 이미 생성한 bucket을 활용하는 것도 가능합니다. 

In [4]:
prefix = 'ETDataset'

sess = boto3.Session()
sagemaker_session = sagemaker.Session()
sm = sess.client('sagemaker')
default_bucket = sagemaker_session.default_bucket()

s3_data_path = f's3://{default_bucket}/{prefix}'
source_dir = 'Informer2020'
%store default_bucket

Stored 'default_bucket' (str)


## 4. 실험 설정

학습 시 사용한 소스코드와 output 정보를 저장할 위치를 선정합니다. 이 값은 필수로 설정하지 않아도 됩니다.

In [5]:
code_location = f's3://{default_bucket}/sm_codes'
output_path = f's3://{default_bucket}/poc_informer/output' 
checkpoint_s3_bucket = f's3://{default_bucket}/checkpoints'

실험에서 표준 출력으로 보여지는 metrics 값을 정규 표현식을 이용하여 SageMaker에서 값을 capture할 수 있습니다. 이 값은 필수로 설정하지 않아도 됩니다.

In [6]:
metric_definitions = [
    {'Name': 'Epoch', 'Regex': 'Epoch: ([-+]?[0-9]*[.]?[0-9]+([eE][-+]?[0-9]+)?),'},
    {'Name': 'train_loss', 'Regex': 'Train Loss: ([-+]?[0-9]*[.]?[0-9]+([eE][-+]?[0-9]+)?),'},
    {'Name': 'valid_loss', 'Regex': 'Valid Loss: ([-+]?[0-9]*[.]?[0-9]+([eE][-+]?[0-9]+)?),'},
    {'Name': 'test_loss', 'Regex': 'Test Loss: ([-+]?[0-9]*[.]?[0-9]+([eE][-+]?[0-9]+)?),'},
]

다양한 실험 조건을 테스트하기 위해 hyperparameters로 argument 값들을 노트북에서 설정할 수 있으며, 이 값은 학습 스크립트에서 argument인 변수로 받아서 활용이 가능합니다.

In [7]:
hyperparameters = {
        'model' : 'informer', # model of experiment, options: [informer, informerstack, informerlight(TBD)]
        'data' : 'ETTh1', # data
        'root_path' : 'ETT-small/', # root path of data file
        'data_path' : 'ETTh1.csv', # data file
        'features' : 'M', # forecasting task, options:[M, S, MS]; M:multivariate predict multivariate, S:univariate predict univariate, MS:multivariate predict univariate
        'target' : 'OT', # target feature in S or MS task
        'freq' : 'h', # freq for time features encoding, options:[s:secondly, t:minutely, h:hourly, d:daily, b:business days, w:weekly, m:monthly], you can also use more detailed freq like 15min or 3h
        'checkpoints' : 'informer_checkpoints', # location of model checkpoints

        'seq_len' : 96, # input sequence length of Informer encoder
        'label_len' : 48, # start token length of Informer decoder
        'pred_len' : 24, # prediction sequence length
        # Informer decoder input: concat[start token series(label_len), zero padding series(pred_len)]

        'enc_in' : 7, # encoder input size
        'dec_in' : 7, # decoder input size
        'c_out' : 7, # output size
        'factor' : 5, # probsparse attn factor
        'd_model' : 512, # dimension of model
        'n_heads' : 8, # num of heads
        'e_layers' : 2, # num of encoder layers
        'd_layers' : 1, # num of decoder layers
        'd_ff' : 2048, # dimension of fcn in model
        'dropout' : 0.05, # dropout
        'attn' : 'prob', # attention used in encoder, options:[prob, full]
        'embed' : 'timeF', # time features encoding, options:[timeF, fixed, learned]
        'activation' : 'gelu', # activation
        'distil' : True, # whether to use distilling in encoder
        'output_attention' : False, # whether to output attention in ecoder
        'mix' : True,
        'padding' : 0,
        'freq' : 'h',
        'do_predict' : True,
        'batch_size' : 32,
        'learning_rate' : 0.0001,
        'loss' : 'mse',
        'lradj' : 'type1',
        'use_amp' : False, # whether to use automatic mixed precision training

        'num_workers' : 0,
        'itr' : 1,
        'train_epochs' : 1,  ## Training epochs
        'patience' : 3,
        'des' : 'exp',
        'use_multi_gpu' : True
    }

experiment_name = 'informer-poc-exp1'
# instance_type = 'ml.p3.16xlarge'  # 'ml.p3.16xlarge', 'ml.p3dn.24xlarge', 'ml.p4d.24xlarge', 'local_gpu'
instance_type = 'ml.c5.4xlarge'
instance_count = 1
do_spot_training = True
max_wait = None
max_run = 3*60*60


분산학습과 spot 학습을 사용할지를 선정할 수 있습니다. <br>
분산학습의 경우 [SageMaker data parallel library](https://docs.aws.amazon.com/sagemaker/latest/dg/data-parallel.html)를 사용하고자 할 경우 distribution을 아래와 같이 설정한 후 사용할 수 있습니다. (학습 스크립트 일부 수정 필요) <br>
[spot 학습](https://docs.aws.amazon.com/sagemaker/latest/dg/model-managed-spot-training.html)을 사용하고자 할 경우 학습 파라미터에 spot 파라미터를 True로 변경한 다음, 자원이 없을 때 대기하는 시간인 max_wait (초)를 설정해야 합니다.

In [8]:
image_uri = None
train_job_name = 'sagemaker'


train_job_name = 'informer-dist'
distribution = {}

if instance_type in ['ml.p3.16xlarge', 'ml.p3dn.24xlarge', 'ml.p4d.24xlarge', 'local_gpu']:
    distribution["smdistributed"]={ 
                        "dataparallel": {
                            "enabled": True
                        }
                }
else:
    distribution = None

if do_spot_training:
    max_wait = max_run

print("train_job_name : {} \ntrain_instance_type : {} \ntrain_instance_count : {} \nimage_uri : {} \ndistribution : {}".format(train_job_name, instance_type, instance_count, image_uri, distribution))    

train_job_name : informer-dist 
train_instance_type : ml.c5.4xlarge 
train_instance_count : 1 
image_uri : None 
distribution : None


## 5. Pipeline parameters

[SageMaker Pipeline](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html)의 중요한 기능은 미리 단계를 정의한 다음, 파이프라인의 재정의 없이도 parameters를 실행 중인 단계에서 변경할 수 있다는 것입니다. parameters를 사용하여 이 작업을 수행할 수 있습니다. <br>
ParameterInteger, ParameterFloat, ParameterString를 사용할 수 있으며, 이후 `pipeline.start(parameters=parameters)`를 호출할 때 수정할 수 있는 값을 미리 정의합니다. 특정 parameters만으로 이러한 방식의 정의가 가능합니다.

In [9]:
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import CreateModelStep

from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.properties import PropertyFile
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.model import Model
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.parameters import ParameterInteger, ParameterFloat, ParameterString
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep, JsonGet

In [10]:
train_instance_param = ParameterString(
    name="TrainingInstance",
    default_value="ml.p3.16xlarge",
)

train_count_param = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

## 6. 학습을 위한 Estimator 선언

AWS 서비스 활용 시 role (역할) 설정은 매우 중요합니다. 이 노트북에서 사용하는 role은 노트북과 training job을 실행할 때 사용하는 role이며, role을 이용하여 다양한 AWS 서비스에 대한 접근 권한을 설정할 수 있습니다.

In [11]:
role = get_execution_role()
role

'arn:aws:iam::653762495386:role/TeamRole'

In [12]:
# all input configurations, parameters, and metrics specified in estimator 
# definition are automatically tracked
estimator = PyTorch(
    entry_point='main_informer.py',
    source_dir=source_dir,
    role=role,
    sagemaker_session=sagemaker_session,
    framework_version='1.8.1',
    py_version='py36',
    instance_count=train_count_param,    ## Parameter 값으로 변경
    instance_type=train_instance_param,  ## Parameter 값으로 변경
    volume_size=256,
    code_location = code_location,
    output_path=output_path,
    hyperparameters=hyperparameters,
    distribution=distribution,
    metric_definitions=metric_definitions,
    max_run=max_run,
    checkpoint_s3_uri=checkpoint_s3_bucket,
    use_spot_instances=do_spot_training,  # spot instance 활용
    max_wait=max_wait,
    base_job_name=f"informer-train",
)

## 7. Training 단계 선언

[training 단계](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html)를 사용하여 모델을 학습하는 training job을 생성합니다.<br>
training 단계에는 estimator, training과 validation 데이터 입력 등이 필요합니다. <br>

[caching](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/pipelines-caching.html) 를 사용하면 SageMaker 파이프라인이 단계를 실행하기 전에 동일한 인수를 사용하여 호출된 단계의 이전 실행을 찾으려고 시도합니다. 파이프라인은 인수가 가리키는 데이터 또는 코드가 변경되었는지 여부를 확인하지 않습니다. 

In [13]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, 
                           expire_after="7d")

In [14]:
training_step = TrainingStep(
    name="InformerTrain",
    estimator=estimator,
    inputs={
        "training": sagemaker.inputs.TrainingInput(
            s3_data=s3_data_path
        )
    },
    cache_config=cache_config
)

## 8. Processing 단계 - output에서 압축풀어 test_report.json 가져오기

[processing 단계](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep)는 데이터 처리를 위해 수행되는 processing job을 생성합니다. <br>
processing 단계는 processor, processing 코드를 정의하는 python 스크립트, processing을 위한 output, job 관련 arguments 등으로 구성됩니다.


In [15]:
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    instance_type="ml.c4.xlarge",
    instance_count=1,
    base_job_name=f"GeneratingReport",  # choose any name
    sagemaker_session=sagemaker_session,
    role=role,
)

INFO:sagemaker.image_uris:Same images used for training and inference. Defaulting to image scope: inference.
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3


앞선 Train 단계에서의 model 산출물을 postprocessing의 input으로 추가합니다.

In [16]:
model_input = ProcessingInput(
                        source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
                        destination="/opt/ml/processing/model",
                    )

In [17]:
test_report = PropertyFile(
    name="TestReport",
    output_name="result",
    path="test_report.json",
)

In [18]:
postprocessing_step = ProcessingStep(
    name="PostProcessingforInformer",  # choose any name
    processor=sklearn_processor,
    inputs=[model_input],
    outputs=[
        ProcessingOutput(output_name="result", source="/opt/ml/processing/result")
    ],
    code=os.path.join(source_dir, "postprocess.py"),
    property_files=[test_report],
    cache_config=cache_config
)

## 9. Register Model

[register model 단계](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_collections.RegisterModel)를 사용하여 sagemaker.model.Model 또는 sagemaker.pipeline.PipelineModel을 SageMaker의 model registry에 등록합니다. <br>
PipelineModel은 inference pipeline을 나타내며, inference 요청을 처리하는 container들의 순서를 구성합니다. <br>
register model 단계에서는 등록된 모델의 metrics를 json 구조로 통합하여 등록할 수 있으며, 모델 승인에 대한 방법을 정의할 수 있습니다.

In [19]:
model_package_group_name = "ts-prediction-informer"

In [20]:
# Register model step that will be conditionally executed
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/test_report.json".format(
            postprocessing_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"],
        ),
        content_type="application/json",
    )
)

In [21]:
register_step = RegisterModel(
    name="InformerRegisterModel",
    estimator=estimator,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

## 10. Condition 단계

[condition 단계](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.condition_step.ConditionStep)를 사용하여 단계 properties의 조건을 평가하여 파이프라인에서 다음에 수행할 작업을 진행할지 여부를 판단합니다. <br>

condition 단계에는 condition 목록, condition이 참으로 평가될 경우 실행할 단계 목록, condition이 거짓으로 평가될 경우 실행할 단계 목록 등이 필요합니다. <br>

`[제한사항]`
- SageMaker 파이프라인은 nested condition 단계의 사용을 지원하지 않습니다. 즉, condition 단계를 다른 조건 단계의 입력으로 전달할 수 없습니다.
- condition 단계는 두 개의 분기에서 동일 단계를 사용할 수 없습니다. 즉, 두 개의 분기에서 동일 단계의 기능이 필요한 경우 단계를 복제하고 다른 이름을 지정합니다.

In [22]:
# Condition step for evaluating model quality and branching execution
cond_lte = ConditionLessThanOrEqualTo(  # You can change the condition here
    left=JsonGet(
        step=postprocessing_step,
        property_file=test_report,
        json_path="regression_metrics.mse.value",  # This should follow the structure of your report_dict defined in the postprocess.py file.
    ),
    right=1.0,  # You can change the threshold here
)
cond_step = ConditionStep(
    name="TestMSECond",
    conditions=[cond_lte],
    if_steps=[register_step],
    else_steps=[],
)

See: https://sagemaker.readthedocs.io/en/stable/v2.html for details.


## 11. Pipeline 수행

지금까지 선언한 Step (단계)를 모두 통합합니다. step의 순서는 DAG을 고려하여 자동으로 정의가 됩니다.

In [23]:
pipeline = Pipeline(
    name="ts-prediction-informer-pipeline",
    parameters=[train_instance_param, train_count_param, model_approval_status],
    steps=[
        training_step,
        postprocessing_step,
        cond_step
    ],
)

SageMaker pipeline 서비스에 정의된 pipeline를 제출하게 됩니다. 기존 정의된 동일한 이름의 pipeline이 있는 경우 덮어쓰기가 됩니다.

In [24]:
pipeline.upsert(role_arn=role)

INFO:sagemaker.image_uris:Defaulting to the only supported framework/algorithm version: latest.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.


{'PipelineArn': 'arn:aws:sagemaker:us-west-2:653762495386:pipeline/ts-prediction-informer-pipeline',
 'ResponseMetadata': {'RequestId': 'dcd6bc37-bb30-435d-b539-6773a09487f6',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'dcd6bc37-bb30-435d-b539-6773a09487f6',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '99',
   'date': 'Wed, 06 Oct 2021 03:20:21 GMT'},
  'RetryAttempts': 0}}

In [25]:
# Special pipeline parameters can be defined or changed here
parameters = {"TrainingInstance": "ml.c5.4xlarge"}

In [26]:
start_response = pipeline.start(parameters=parameters)

Pipeline의 진행 사항을 모니터링할 수 있습니다. wait() 함수는 종료가 될 때까지 대기하기 됩니다.

In [27]:
start_response.wait()
start_response.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-west-2:653762495386:pipeline/ts-prediction-informer-pipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-west-2:653762495386:pipeline/ts-prediction-informer-pipeline/execution/sptq0lo5lqul',
 'PipelineExecutionDisplayName': 'execution-1633490423507',
 'PipelineExecutionStatus': 'Succeeded',
 'PipelineExperimentConfig': {'ExperimentName': 'ts-prediction-informer-pipeline',
  'TrialName': 'sptq0lo5lqul'},
 'CreationTime': datetime.datetime(2021, 10, 6, 3, 20, 23, 442000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 10, 6, 3, 37, 3, 375000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': '6ad5c3ce-4ce6-469b-97b7-2a78ca3814fe',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '6ad5c3ce-4ce6-469b-97b7-2a78ca3814fe',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '534',
   'date': 'Wed, 06 Oct 2021 03:37:28 GMT'},
  'RetryAttempts': 0}}

<br>
각 step 별로 진행사항을 파악할 수 있습니다.

In [28]:
start_response.list_steps()

[{'StepName': 'InformerRegisterModel',
  'StartTime': datetime.datetime(2021, 10, 6, 3, 37, 1, 720000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 10, 6, 3, 37, 2, 666000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-west-2:653762495386:model-package/ts-prediction-informer/1'}}},
 {'StepName': 'TestMSECond',
  'StartTime': datetime.datetime(2021, 10, 6, 3, 37, 0, 687000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 10, 6, 3, 37, 1, 313000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Condition': {'Outcome': 'True'}}},
 {'StepName': 'PostProcessingforInformer',
  'StartTime': datetime.datetime(2021, 10, 6, 3, 31, 37, 571000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 10, 6, 3, 36, 59, 805000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-west-2:653762495386:processing-job/pipelines-sptq0lo5lqul-postprocess

## 12. Model 등록 실행

아래 함수를 이용하여 현재 pending 중인 모델을 확인할 수 있습니다.

In [29]:
pending_model = sm.list_model_packages(
    ModelPackageGroupName=model_package_group_name,
    ModelApprovalStatus='PendingManualApproval',
    SortBy='Name',
    SortOrder='Descending'
)
pending_model

{'ModelPackageSummaryList': [{'ModelPackageGroupName': 'ts-prediction-informer',
   'ModelPackageVersion': 1,
   'ModelPackageArn': 'arn:aws:sagemaker:us-west-2:653762495386:model-package/ts-prediction-informer/1',
   'CreationTime': datetime.datetime(2021, 10, 6, 3, 37, 2, 559000, tzinfo=tzlocal()),
   'ModelPackageStatus': 'Completed',
   'ModelApprovalStatus': 'PendingManualApproval'}],
 'ResponseMetadata': {'RequestId': '5052c14d-a723-47ca-96bf-4b136d0343a3',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '5052c14d-a723-47ca-96bf-4b136d0343a3',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '315',
   'date': 'Wed, 06 Oct 2021 03:37:28 GMT'},
  'RetryAttempts': 0}}

<br>
아래 명령어를 이용하여 실제 승인을 수행하게 됩니다. 아래 예시는 가장 최근 등록된 버전의 모델을 가져와서 승인하도록 만들었습니다.

In [30]:
model_package_update_input_dict = {
    "ModelPackageArn" : pending_model['ModelPackageSummaryList'][0]['ModelPackageArn'],
    "ModelApprovalStatus" : "Approved"
}
model_package_update_response = sm.update_model_package(**model_package_update_input_dict)

In [31]:
model_package_update_response

{'ModelPackageArn': 'arn:aws:sagemaker:us-west-2:653762495386:model-package/ts-prediction-informer/1',
 'ResponseMetadata': {'RequestId': '0a34e0b0-0d15-4c63-bd1c-71813188a45a',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '0a34e0b0-0d15-4c63-bd1c-71813188a45a',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '101',
   'date': 'Wed, 06 Oct 2021 03:37:28 GMT'},
  'RetryAttempts': 0}}

승인된 모델 중에서 가장 최신 버전의 모델을 검색합니다.

In [32]:
approved_model = sm.list_model_packages(
    ModelPackageGroupName=model_package_group_name,
    ModelApprovalStatus='Approved',
    SortBy='Name',
    SortOrder='Descending'
)

In [33]:
sm.describe_model_package(ModelPackageName=approved_model['ModelPackageSummaryList'][0]['ModelPackageArn'])

{'ModelPackageGroupName': 'ts-prediction-informer',
 'ModelPackageVersion': 1,
 'ModelPackageArn': 'arn:aws:sagemaker:us-west-2:653762495386:model-package/ts-prediction-informer/1',
 'CreationTime': datetime.datetime(2021, 10, 6, 3, 37, 2, 559000, tzinfo=tzlocal()),
 'InferenceSpecification': {'Containers': [{'Image': '763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference:1.8.1-gpu-py36',
    'ImageDigest': 'sha256:3533e731f4dd08310eaf467d84a7f443d965609f41fb1870c467b16de854f70d',
    'ModelDataUrl': 's3://sagemaker-us-west-2-653762495386/poc_informer/output/pipelines-sptq0lo5lqul-InformerTrain-ZhxTyT0cYM/output/model.tar.gz'}],
  'SupportedTransformInstanceTypes': ['ml.m5.xlarge'],
  'SupportedRealtimeInferenceInstanceTypes': ['ml.m5.xlarge'],
  'SupportedContentTypes': ['text/csv'],
  'SupportedResponseMIMETypes': ['text/csv']},
 'ModelPackageStatus': 'Completed',
 'ModelPackageStatusDetails': {'ValidationStatuses': [],
  'ImageScanStatuses': []},
 'CertifyForMarketplace': F

## 13. SageMaker Model 생성
승인된 모델 정보를 이용하여 들어오는 데이터에 대한 전/후처리가 포함된 predictor.py 파일을 포함하여 SageMaker 모델을 생성합니다.
predictor.py의 전처리는 전체 데이터셋 (CSV)에서 학습에 활용하지 않은 테스트용 데이터셋을 짤라 Ground Truth 값을 제거하고 모델 입력 형태로 변환하는 작업을 합니다. 후처리는 예측된 결과와 Ground Truth 값을 다시 붙여서 결과 CSV로 S3에 저장하도록 구현하였습니다. 


향후 활용시에는 각각의 데이터셋과 모델 환경에 맞게 predictor.py를 구현하여 활용하시면 됩니다.

In [37]:
approved_model['ModelPackageSummaryList'][0]['ModelPackageArn']

'arn:aws:sagemaker:us-west-2:653762495386:model-package/ts-prediction-informer/1'

In [38]:
container_list = [
    {
        "ModelPackageName": approved_model['ModelPackageSummaryList'][0]['ModelPackageArn'], 
        "Environment": {"SAGEMAKER_PROGRAM": "predictor.py"}
    }
]

try:
    sm.delete_model(ModelName=model_package_group_name)
except:
    pass

create_model_response = sm.create_model(
    ModelName=model_package_group_name,
    ExecutionRoleArn=role,
    Containers=container_list
)
print("Model arn : {}".format(create_model_response["ModelArn"]))

Model arn : arn:aws:sagemaker:us-west-2:653762495386:model/ts-prediction-informer


## 14. SageMaker batch transform job 수행
앞에서 생성한 model을 이용하여 데이터셋 (CSV)의 위치가 있는 S3 URI와 입력되는 데이터의 형태, 결과 파일이 저장되는 S3 URI, 이 작업을 수행하는 인스턴스 타입과 개수를 지정하여 예측을 수행합니다.

In [41]:
from time import strftime

transform_jobname=model_package_group_name+"-"+strftime("%m%d-%H%M%s")

response = sm.create_transform_job(
  TransformJobName=transform_jobname,
  ModelName=model_package_group_name,
  MaxConcurrentTransforms=1,
  TransformInput={
      'DataSource': {
          'S3DataSource': {
              'S3DataType': 'S3Prefix',
              'S3Uri': f's3://{default_bucket}/ETDataset/ETT-small/ETTh1.csv'
          }
      },
      'ContentType' : 'text/csv',
      'SplitType': 'Line'
  },
  TransformOutput={
      'S3OutputPath': f"s3://{default_bucket}/batch_result",
      'AssembleWith': 'Line',
  },
  TransformResources={
      'InstanceType': 'ml.m5.xlarge',
      'InstanceCount': 1
  },
  Environment={
      'default_bucket': default_bucket
  },
)

In [43]:
sm.describe_transform_job(TransformJobName=transform_jobname)

{'TransformJobName': 'ts-prediction-informer-1006-04111633493474',
 'TransformJobArn': 'arn:aws:sagemaker:us-west-2:653762495386:transform-job/ts-prediction-informer-1006-04111633493474',
 'TransformJobStatus': 'Completed',
 'ModelName': 'ts-prediction-informer',
 'MaxConcurrentTransforms': 1,
 'Environment': {'default_bucket': 'sagemaker-us-west-2-653762495386'},
 'TransformInput': {'DataSource': {'S3DataSource': {'S3DataType': 'S3Prefix',
    'S3Uri': 's3://sagemaker-us-west-2-653762495386/ETDataset/ETT-small/ETTh1.csv'}},
  'ContentType': 'text/csv',
  'CompressionType': 'None',
  'SplitType': 'Line'},
 'TransformOutput': {'S3OutputPath': 's3://sagemaker-us-west-2-653762495386/batch_result',
  'AssembleWith': 'Line',
  'KmsKeyId': ''},
 'TransformResources': {'InstanceType': 'ml.m5.xlarge', 'InstanceCount': 1},
 'CreationTime': datetime.datetime(2021, 10, 6, 4, 11, 14, 731000, tzinfo=tzlocal()),
 'TransformStartTime': datetime.datetime(2021, 10, 6, 4, 14, 54, 868000, tzinfo=tzlocal(