# [모듈 8.5] 전체 모델 빌딩 파이프라인 개발 (SageMaker Model Building Pipeline 모든 스텝)

이 노트북은 아래와 같은 목차로 진행 됩니다. 전체를 모두 실행시에 완료 시간은 **약 30분** 소요 됩니다.

- 0. SageMaker Model Building Pipeline 개요
- 1. SageMaker Model Building Pipeline 모든 스텝 실행 및 실행
    - (0) 기본 세이지 메이커 정보 및 기본 변수 로딩
    - (1) 모델 빌딩 파이프라인 변수 생성
    - (2) 전처리 스텝 단계 정의    
    - (3) 모델 학습을 위한 학습단계 정의 
    - (4) 세이지 메이커 모델 생성 스텝 생성    
    - (5) 실시간 엔드 포인트 배포 스텝 생성    
    - (6) 모델 빌딩 파이프라인 정의 및 실행
    
- [참고] 위의 각 스텝에 대한 상세 사항은 이전 노트북의 각 스텝을 참조 하세요.    
    
---
### 노트북 커널
- 이 워크샵은 노트북 커널이 `conda_python3` 를 사용합니다. 다른 커널일 경우 변경 해주세요.
---



# 0.SageMaker Model Building Pipeline 개요

Amazon SageMaker 모델 구축 파이프라인은 직접 SageMaker 통합을 활용하는 머신 러닝 파이프라인을 구축하기 위한 도구입니다. 이러한 통합으로 인해 많은 단계 생성 및 관리를 처리하는 도구를 사용하여 파이프라인을 생성하고 오케스트레이션용 SageMaker Projects를 설정할 수 있습니다. SageMaker 파이프라인은 다른 파이프라인에 비해 다음과 같은 이점을 제공합니다

- 상세 사항은 개발자 가이드 참조 하세요. --> [Amazon SageMaker 모델 구축 파이프라인](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/pipelines.html)

![mdp_how_it_works.png](img/mdp_how_it_works.png)

---
# 1. SageMaker Model Building Pipeline 모든 스텝 실행 및 실행

## (0) 기본 세이지 메이커 정보 및 기본 변수 로딩

In [1]:
import boto3
import sagemaker
import pandas as pd

region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()

%store -r 

## (1) 모델 빌딩 파이프라인 변수 생성

파이프라인에 인자로 넘길 변수는 아래 크게 3가지 종류가 있습니다.
- 프로세싱 스텝을 위한 인스턴스 타입 및 인스턴스 수
    - 데이터 전처리 스텝 및 실시간 앤드 포인트 스텝에 사용 됨.
- 훈련 스텝을 위한 인스턴스 타입 및 인스턴스 수     
- 원본 데이터 세트에 대한 S3 주소
    - 데이터 전처리 스텝에서 사용 됩니다.


In [2]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)

training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.m5.xlarge"
)

training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)


input_data = ParameterString(
    name="InputData",
    default_value=input_data_uri,
)


## (2) 전처리 스텝 단계 정의

크게 아래와 같은 순서로 정의 합니다.
- 프로세싱 오브젝트 정의 (SKLearnProcessor)
- 프로세싱 스텝 정의
    - 일력 데이터 세트
        - source: S3 경로 (input_data_uri)
        - destination: 도커 컨테이너의 내부 폴더 위치
    - 출력 위치
        - 훈련 전처리 데이터 결과 위치
        - 테스트 전처리 데이터 결과 위치
    - 프로세싱 코드
    - 프로세싱 코드에 넘길 인자 


In [3]:
from sagemaker.sklearn.processing import SKLearnProcessor

split_rate = 0.2
framework_version = "0.23-1"

sklearn_processor = SKLearnProcessor(
    framework_version=framework_version,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="sklearn-fraud-process",
    role=role,
)
print("input_data: \n", input_data)

input_data: 
 s3://sagemaker-ap-northeast-2-057716757052/sagemaker-pipeline-step-by-step/input


In [4]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
    
step_process = ProcessingStep(
    name="FraudScratchProcess",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(source=input_data, destination='/opt/ml/processing/input'),        
         ],
    outputs=[ProcessingOutput(output_name="train",
                              source='/opt/ml/processing/output/train'),
             ProcessingOutput(output_name="test",
                              source='/opt/ml/processing/output/test')],
    job_arguments=["--split_rate", f"{split_rate}"],        
    code= 'src/preprocessing.py',
)


## (3) 모델 학습을 위한 학습단계 정의 

학습 스텝을 정의하기 위해서는 크게 아래와 같은 과정이 있습니다.
- XGBoost Estimator 정의
- 학습 스텝 정의
    - 아래와 같은 중요한 인자가 필요 합니다.
        - Estimator (위에서 정의한 것 사용)
        - 훈련을 위한 입력 데이터 위치


### 기본 훈련 변수 및 하이퍼파라미터 설정

In [5]:
from sagemaker.xgboost.estimator import XGBoost

bucket = sagemaker_session.default_bucket()
prefix = 'fraud2train'

estimator_output_path = f's3://{bucket}/{prefix}/training_jobs'

hyperparameters = {
       "scale_pos_weight" : "29",        
        "max_depth": "3",
        "eta": "0.2",
        "objective": "binary:logistic",
        "num_round": "100",
}

In [6]:
xgb_train = XGBoost(
    entry_point = "xgboost_starter_script.py",
    source_dir = "src",
    output_path = estimator_output_path,
    code_location = estimator_output_path,
    hyperparameters = hyperparameters,
    role = role,
    instance_count = training_instance_count,
    instance_type = training_instance_type,
    framework_version = "1.0-1")

훈련의 입력이 이전 전처리의 결과가 제공됩니다.
- `step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri`

In [7]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name="FraudScratchTrain",
    estimator=xgb_train,
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
    },
)

## (4) 세이지 메이커 모델 생성 스텝 생성

모델 생성 스텝에는 크게 아래 두가지가 필요 합니다.
- 세이지 메이커 모델 생성 
    - Model() 생성시에 아래 두 파리미터의 입력이 이전 스텝의 결과가 제공됩니다.
        - image_uri= step_train.properties.AlgorithmSpecification.TrainingImage,
        - model_data= step_train.properties.ModelArtifacts.S3ModelArtifacts,
- 모델 스텝 정의
    - 인자로서 아래 두가지가 필요 합니다.
        - Model 오브젝트
        - 인스턴스 타입으로서 입력



In [8]:
from sagemaker.model import Model
    
model = Model(
    image_uri= step_train.properties.AlgorithmSpecification.TrainingImage,
    model_data= step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=sagemaker_session,
    role=role,
)

In [9]:
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.steps import CreateModelStep


inputs = CreateModelInput(
    instance_type="ml.m5.large",
)
step_create_model = CreateModelStep(
    name="FraudScratchModel",
    model=model,
    inputs=inputs,
)

## (5) 실시간 엔드 포인트 배포 스텝 생성

앤드포인트를 생성하기 위해서는 프로세싱 스텝을 통해서 합니다. 프레세싱 스텝에 앤드포인트 생성에 필요한 코드(스크립트)를 작성하여 프로세싱 스텝에서 실행하여 생성하게 합니다. 크게 아래와 같은 과정으로 합니다.

- 앤드포인트 생성 코드를 S3 에 업로드
- SKLearnProcessor 오브젝트 생성
- ProcessingStep 정의 (중요한 인자는 아래와 같습니다.)
    - processor (SKLearnProcessor 오브젝트 제공)
    - 코드에 전달할 커맨드 인자
        - endpoint config 생성시에, 이전 단계의 모델 결과를 제공합니다.
        - "--model_name", step_create_model.properties.ModelName,     
    - 앤드포인트 생성 코드



In [10]:
from datetime import datetime
suffix = datetime.now().microsecond

In [11]:

local_deploy_code_path = 'src/deploy_model.py'
s3_deploy_code_path = f"s3://{default_bucket}/{project_prefix}/code"
s3_deploy_code_uri = sagemaker.s3.S3Uploader.upload(
    local_path=local_deploy_code_path, 
    desired_s3_uri=s3_deploy_code_path,
)
print("s3_deploy_code_uri: ", s3_deploy_code_uri)

all_pipeline_endpoint_name = 'all-pipeline-endpoint-' + str(suffix)
endpoint_instance_type = "ml.m5.xlarge"

s3_deploy_code_uri:  s3://sagemaker-ap-northeast-2-057716757052/sagemaker-pipeline-step-by-step/code/deploy_model.py


In [12]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.workflow.steps import ProcessingStep

deploy_model_processor = SKLearnProcessor(
    framework_version='0.23-1',
    role= role,
    instance_type="ml.t3.medium",
    instance_count=1,
    base_job_name='fraud-scratch-deploy-model',
    sagemaker_session=sagemaker_session)


step_deploy = ProcessingStep(
    name='DeployModel',
    processor=deploy_model_processor,
    job_arguments=[
        "--model_name", step_create_model.properties.ModelName, 
        "--region", region,
        "--endpoint_instance_type", endpoint_instance_type,
        "--endpoint_name", all_pipeline_endpoint_name
    ],
    code=s3_deploy_code_uri)

## (6) 모델 빌딩 파이프라인 정의 및 실행
위에서 정의한 아래의 4개의 스텝으로 파이프라인 정의를 합니다.

- 위에서 정의한 파라미터를 제공
- 실행할 스텝 기술
    - steps=[step_process, step_train, step_create_model, step_deploy],
- 아래는 약 20분 정도 소요 됩니다.

In [13]:
from sagemaker.workflow.pipeline import Pipeline

project_prefix = 'sagemaker-pipeline-step-by-step'

pipeline_name = project_prefix
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_type, 
        processing_instance_count,
        training_instance_type,        
        training_instance_count,                
        input_data,
    ],
    steps=[step_process, step_train, step_create_model, step_deploy],
)

In [14]:
import json

definition = json.loads(pipeline.definition())
# definition

#### 파이프라인을 SageMaker에 제출하고 실행하기 

파이프라인 정의를 SageMaker Pipelines 서비스에 제출하여 파이프라인을 생성하거나 파이프라인이 이미 존재하면 파이프라인 정의를 업데이트합니다. 함께 전달되는 역할(role)을 이용하여 AWS에서 파이프라인을 생성하고 작업의 각 단계를 실행할 것입니다.   

In [15]:
pipeline.upsert(role_arn=role)

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:pipeline/sagemaker-pipeline-step-by-step',
 'ResponseMetadata': {'RequestId': 'f60552f3-5bdf-4dfb-ac17-02505fe647e5',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': 'f60552f3-5bdf-4dfb-ac17-02505fe647e5',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '104',
   'date': 'Tue, 03 Aug 2021 08:56:10 GMT'},
  'RetryAttempts': 0}}

디폴트값을 이용하여 파이프라인을 샐행합니다. 

In [16]:
execution = pipeline.start()

### 파이프라인 운영: 파이프라인 대기 및 실행상태 확인

워크플로우의 실행상황을 살펴봅니다. 

In [17]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:pipeline/sagemaker-pipeline-step-by-step',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:pipeline/sagemaker-pipeline-step-by-step/execution/zee5tw1ruau9',
 'PipelineExecutionDisplayName': 'execution-1627980970972',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2021, 8, 3, 8, 56, 10, 890000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2021, 8, 3, 8, 56, 10, 890000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': '63a330d6-5b01-47c0-bfae-5aee77a4a13c',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '63a330d6-5b01-47c0-bfae-5aee77a4a13c',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '435',
   'date': 'Tue, 03 Aug 2021 08:56:10 GMT'},
  'RetryAttempts': 0}}

In [18]:
execution.wait()

실행이 완료될 때까지 기다립니다.

실행된 단계들을 리스트업합니다. 파이프라인의 단계실행 서비스에 의해 시작되거나 완료된 단계를 보여줍니다.

In [19]:
execution.list_steps()

[{'StepName': 'DeployModel',
  'StartTime': datetime.datetime(2021, 8, 3, 9, 3, 28, 400000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 8, 3, 9, 20, 24, 203000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:processing-job/pipelines-zee5tw1ruau9-deploymodel-vrtjfofjsv'}}},
 {'StepName': 'FraudScratchModel',
  'StartTime': datetime.datetime(2021, 8, 3, 9, 3, 27, 410000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 8, 3, 9, 3, 28, 230000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:ap-northeast-2:057716757052:model/pipelines-zee5tw1ruau9-fraudscratchmodel-vsv9snuiln'}}},
 {'StepName': 'FraudScratchTrain',
  'StartTime': datetime.datetime(2021, 8, 3, 9, 0, 30, 610000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2021, 8, 3, 9, 3, 26, 970000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TrainingJob': {'

### [옵션] SageMaker Studio에서 확인하기
- 아래의 그림 처럼 SageMaker Studio에 로긴후에 따라하시면, SageMaker Studio 에서도 실행 내역을 확인할 수 있습니다.
- [알림] 이번 실습은 SageMaker Studio 와 병행하여 진행하는 것을 권장 드립니다. 하지만, SageMaker Studio 에서 다커 컨테이너를 실행하는(로컬 모드) 부분은 현재 기능이 없기에 SageMaker Notebook Instance를 사용해서 실습했습니다. 다커 컨테이너의 실행 부분을 생략한다면 SageMaker Studio에서 SageMaker Building Pipeline 의 작업을 권장 드립니다. 이유는 가시적으로 워크플로를 확인하고, GUI 에서 제공하는 여러 링크들이 작업을 하는데에 효과적이기 때문입니다.
- SageMaker Studio 개발자 가이드 --> [SageMaker Studio](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/studio.html)

![all_pipeline.png](img/all_pipeline.png)



#### 아티펙트 경로 추출

In [20]:
def get_proc_artifact(execution, client, kind=0):
    '''
    kind: 0 --> train
    kind: 2 --> test
    '''
    response = execution.list_steps()

    proc_arn = response[-1]['Metadata']['ProcessingJob']['Arn'] # index -1은 가장 처음 실행 step
    #proc_arn = response[-1]['Metadata']
    # print("proc_arn: ", proc_arn)
    proc_job_name = proc_arn.split('/')[-1]
    print("proc_job_name: ", proc_job_name)
    
    response = client.describe_processing_job(ProcessingJobName = proc_job_name)
    test_preprocessed_file = response['ProcessingOutputConfig']['Outputs'][kind]['S3Output']['S3Uri'] # index 1: test 파일    
    print("test_preprocessed_file: \n ", test_preprocessed_file)
    
    return test_preprocessed_file

import boto3
client = boto3.client("sagemaker")

test_preproc_dir_artifact = get_proc_artifact(execution, client, kind=1 )

#print("test_preproc__dir_artifact: ", test_preproc_dir_artifact)



proc_job_name:  pipelines-zee5tw1ruau9-fraudscratchprocess-kbxtc07h6i
test_preprocessed_file: 
  s3://sagemaker-ap-northeast-2-057716757052/sklearn-fraud-process-2021-08-03-08-56-09-377/output/test


## 변수 저장

In [21]:
%store test_preproc_dir_artifact
%store all_pipeline_endpoint_name

Stored 'test_preproc_dir_artifact' (str)
Stored 'all_pipeline_endpoint_name' (str)
