# 전체 모델 빌딩 파이프라인 개발 (SageMaker Model Building Pipeline 모든 스텝)

이 노트북은 아래와 같은 목차로 진행 됩니다. 전체를 모두 실행시에 완료 시간은 **약 20분** 소요 됩니다.

- 1. SageMaker 모델 빌드 파이프라인을 이용한 모델 빌드 오케스트레이션
- 2. 파이프라인 개발자 가이드
- 3. 기본 라이브러리 로딩
- 4. 모델 빌딩 파이프라인 의 스텝(Step) 생성
    - 4.1. 모델 빌딩 파이프라인 변수 생성
    - 4.2. 전처리 스텝 단계 정의
    - 4.3. 모델 학습을 위한 학습단계 정의
    - 4.4. 세이지 메이커 모델 생성 스탭 생성
    - 4.5. 실시간 엔드 포인트 배포 스텝 생성
- 5. 파리마터, 단계, 조건을 조합하여 최종 파이프라인 정의 및 실행
- 6. 세이지 메이커 스튜디오에서 실행 확인 하기
- 7. 아티펙트 경로 추출
    
---
### 노트북 커널
- 이 워크샵은 노트북 커널이 `conda_python3` 를 사용합니다. 다른 커널일 경우 변경 해주세요.
---



# 1. SageMaker 모델 빌드 파이프라인을 이용한 모델 빌드 오케스트레이션

Amazon SageMaker 모델 구축 파이프라인은 직접 SageMaker 통합을 활용하는 머신 러닝 파이프라인을 구축하기 위한 도구입니다. 

- 상세 사항은 개발자 가이드 참조 하세요. --> [Amazon SageMaker 모델 구축 파이프라인](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/pipelines.html)

# 2. 파이프라인 개발자 가이드
- 상세 사항은 개발자 가이드 참조 하세요. --> [Amazon SageMaker 모델 구축 파이프라인](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/pipelines.html)


SageMaker 파이프라인은 다음 기능을 지원하며 본 노트북에서 하나씩 다루게 됩니다. 

* Processing job steps - 데이터처러 워크로드를 실행하기 위한 SageMaker의 관리형 기능. Feature engineering, 데이터 검증, 모델 평가, 모델 해석 등에 주로 사용됨 
* Training job steps - 학습작업. 모델에게 학습데이터셋을 이용하여 모델에게 예측을 하도록 학습시키는 반복적인 작업 
* Create model steps - 추론 엔드포인트 또는 배치 추론을 위한 모델의 생성 
* Pipelines - Workflow DAG. SageMaker 작업과 리소스 생성을 조율하는 단계와 조건을 가진다. 




In [None]:
%load_ext autoreload
%autoreload 2

# 3. 기본 라이브러리 로딩 

- 세이지 메이커 관련 라이브러리를 로딩 합니다.

In [None]:
import os
import boto3
import sagemaker
from time import strftime
from utils.ssm import parameter_store

In [None]:
region = boto3.Session().region_name
pm = parameter_store(region)

prefix = pm.get_params(key="PREFIX")
bucket_name = pm.get_params(key="-".join([prefix, "BUCKET-NAME"]))
data_path = pm.get_params(key="-".join([prefix, "DATA-PATH-S3"]))

role = pm.get_params(key="-".join([prefix, "SAGEMAKER-ROLE-ARN"]))
role_arn = sagemaker.get_execution_role()
print(f"PREFIX : {prefix}")
print(f"BUCKET_NAME : {bucket_name}")
print(f"DATA_PATH : {data_path}")
print(f"ROLE : {role}")

In [None]:
local_mode = False

In [None]:
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.pipeline_context import LocalPipelineSession
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, expire_after="P1d")

if local_mode:
    # Create a `LocalPipelineSession` object so that each pipeline step will run locally
    # To run this pipeline in the cloud, you must change `LocalPipelineSession()` to `PipelineSession()`
    pipeline_session = LocalPipelineSession()
    git_config = None
else:
    pipeline_session = PipelineSession()
    
    git_config = {
        'repo': f'https://{pm.get_params(key="-".join([prefix, "CODE_REPO"]))}',
        'branch': 'main',
        'username': pm.get_params(key="-".join([prefix, "CODECOMMIT-USERNAME"]), enc=True),
        'password': pm.get_params(key="-".join([prefix, "CODECOMMIT-PWD"]), enc=True)
    }

## 3.1 노트북 변수 로딩


기존 노트북에서 저장한 변수를 로딩 합니다.

# 4. 모델 빌딩 파이프라인 의 스텝(Step) 생성


## 4.1. 모델 빌딩 파이프라인 변수 생성

파이프라인에 인자로 넘길 변수는 아래 크게 3가지 종류가 있습니다.
- 프로세싱 스텝을 위한 인스턴스 타입 및 인스턴스 수
    - 데이터 전처리 스텝 및 실시간 앤드 포인트 스텝에 사용 됨.
- 훈련 스텝을 위한 인스턴스 타입 및 인스턴스 수     
- 엔트 포인트의 인스턴스 타입
- 원본 데이터 세트에 대한 S3 주소
    - 데이터 전처리 스텝에서 사용 됩니다.


In [None]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
processing_instance_type = ParameterString(
    name="ProcessingInstanceType",
    default_value="ml.m5.xlarge"
)

training_instance_type = ParameterString(
    name="TrainingInstanceType",
    default_value="ml.p3.2xlarge"
)

training_instance_count = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=1
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)


model_name = 'distilbert-base-uncased'

## 4.2. 전처리 스텝 단계 정의

크게 아래와 같은 순서로 정의 합니다.
- 프로세싱 오브젝트 정의 (SKLearnProcessor)
- 프로세싱 스텝 정의
    - 일력 데이터 세트
        - source: S3 경로 (input_data_uri)
        - destination: 도커 컨테이너의 내부 폴더 위치
    - 출력 위치
        - 훈련 전처리 데이터 결과 위치
        - 테스트 전처리 데이터 결과 위치
    - 프로세싱 코드
    - 프로세싱 코드에 넘길 인자 


In [None]:
from sagemaker.pytorch.estimator import PyTorch
from sagemaker.processing import FrameworkProcessor

create_date = strftime("%m%d-%H%M%s")
job_name=f'preprocessing-{model_name}-{create_date}'

dataset_processor = FrameworkProcessor(
    estimator_cls=PyTorch,
    framework_version="2.0.0",
    py_version='py310',
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    role=role,
    base_job_name=job_name, # bucket에 보이는 이름 (pipeline으로 묶으면 pipeline에서 정의한 이름으로 bucket에 보임)
    sagemaker_session=pipeline_session,
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

proc_prefix = "/opt/ml/processing"
data_path = pm.get_params(key="-".join([prefix, "DATA-PATH-S3"]))

step_args = dataset_processor.run(
    #job_name="preprocessing", ## 이걸 넣어야 캐시가 작동함, 안그러면 프로세서의 base_job_name 이름뒤에 날짜 시간이 붙어서 캐시 동작 안함
    code='preprocessing.py', #소스 디렉토리 안에서 파일 path
    source_dir= "./code", #현재 파일에서 소스 디렉토리 상대경로 # add processing.py and requirements.txt here
    git_config=git_config,
    inputs=[
        ProcessingInput(
            input_name="input-data",
            source=data_path,
            destination=os.path.join(proc_prefix, "input")
        ),
    ],
    outputs=[       
        ProcessingOutput(
            output_name="output-data",
            source=os.path.join(proc_prefix, "output"),
        ),
    ],
    arguments=[
        "--proc_prefix", proc_prefix,
        "--split_rate", "0.8"
    ]
)
step_process = ProcessingStep(
    name="PreprocessBertData",
    step_args=step_args,
    cache_config=cache_config
)

## 4.3. 모델 학습을 위한 학습단계 정의 

학습 스텝을 정의하기 위해서는 크게 아래와 같은 과정이 있습니다.
- XGBoost Estimator 정의
- 학습 스텝 정의
    - 아래와 같은 중요한 인자가 필요 합니다.
        - Estimator (위에서 정의한 것 사용)
        - 훈련을 위한 입력 데이터 위치


### 기본 훈련 변수 및 하이퍼파라미터 설정

In [None]:
from sagemaker.workflow.functions import Join
from sagemaker.inputs import TrainingInput
from sagemaker.huggingface import HuggingFace
from sagemaker.workflow.steps import TrainingStep

In [None]:
output_path = os.path.join(
    "s3://{}".format(bucket_name),
    prefix,
    "training",
    "model-output"
)

code_location = os.path.join(
    "s3://{}".format(bucket_name),
    prefix,
    "training",
    "backup_codes"
)

do_spot_training = False
max_wait = None
max_run = 1*60*60  

tokenizer_name = 'distilbert-base-uncased'
label_cnt=2

In [None]:
hyperparameters = {
    'epochs': 1,
    'train_batch_size': 64,
    'model_name': model_name,
    'tokenizer_name': tokenizer_name,
    'output_dir':'/opt/ml/checkpoints',
    'label_size':label_cnt
}

In [None]:
est = HuggingFace(
    entry_point='train.py',
    source_dir='./code',
    git_config=git_config,
    instance_type="ml.p3.2xlarge",
    instance_count=training_instance_count,
    role=role,
    volume_size=256,
    code_location = code_location,
    output_path=output_path,
    transformers_version='4.28.1', 
    pytorch_version='2.0.0',
    py_version='py310',
    hyperparameters = hyperparameters,
    max_run=max_run, # expected max run in seconds
    sagemaker_session=pipeline_session, 
)

훈련의 입력이 이전 전처리의 결과가 제공됩니다.
- `step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri`

In [None]:
from time import strftime

create_date = strftime("%m%d-%H%M%s")
job_name=f'training-{model_name}-{create_date}'

step_args = est.fit(
    inputs={
        "train": TrainingInput(
            s3_data=Join(
                on='/',
                values=[step_process.properties.ProcessingOutputConfig.Outputs["output-data"].S3Output.S3Uri, "train"]
            ),
            content_type="text/csv",
        ),
        "test": TrainingInput(
            s3_data=Join(
                on='/',
                values=[step_process.properties.ProcessingOutputConfig.Outputs["output-data"].S3Output.S3Uri, "test"]
            ),
            content_type="text/csv",
        ),
    },
    wait=False,
    job_name=job_name
)
step_train = TrainingStep(
    name="TrainBertModel",
    step_args=step_args,
    cache_config=cache_config,
)

## 4.4. 세이지 메이커 모델 검증 스텝 생성

In [None]:
from sagemaker.pytorch.estimator import PyTorch
from sagemaker.processing import FrameworkProcessor

create_date = strftime("%m%d-%H%M%s")
job_name=f'evaluation-{model_name}-{create_date}'

script_eval = FrameworkProcessor(
    estimator_cls=PyTorch,
    framework_version="2.0.0",
    py_version='py310',
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    role=role,
    base_job_name=job_name, # bucket에 보이는 이름 (pipeline으로 묶으면 pipeline에서 정의한 이름으로 bucket에 보임)
    sagemaker_session=pipeline_session,
)

In [None]:
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.workflow.properties import PropertyFile

proc_prefix = "/opt/ml/processing"
data_path = os.path.join(pm.get_params(key="-".join([prefix, "PREP-DATA-PATH"])), "test")

step_args = script_eval.run(
    #job_name="preprocessing", ## 이걸 넣어야 캐시가 작동함, 안그러면 프로세서의 base_job_name 이름뒤에 날짜 시간이 붙어서 캐시 동작 안함
    code='evaluation.py', #소스 디렉토리 안에서 파일 path
    source_dir= "./code", #현재 파일에서 소스 디렉토리 상대경로 # add processing.py and requirements.txt here
    git_config=git_config,
    inputs=[
        ProcessingInput(
            input_name="input-data",
            source=data_path,
            destination=os.path.join(proc_prefix, "input")
        ),
        ProcessingInput(
            input_name="model-data",
            source=step_train.properties.ModelArtifacts.S3ModelArtifacts,
            destination=os.path.join(proc_prefix, "model")
        ),
    ],
    outputs=[       
        ProcessingOutput(
            output_name="output-data",
            source=os.path.join(proc_prefix, "output")
        ),
    ],
    arguments=[
        "--proc_prefix", proc_prefix,
    ]
)

evaluation_report = PropertyFile(
    name="BertEvaluationReport",
    output_name="output-data",
    path="evaluation.json",
)

step_eval = ProcessingStep(
    name="EvaluateBertModel",
    step_args=step_args,
    property_files=[evaluation_report],
    cache_config=cache_config,
)

## 4.5. 세이지 메이커 모델 생성 스탭 생성

In [None]:
from sagemaker.model_metrics import (
    MetricsSource,
    ModelMetrics,
)

# register model step that will be conditionally executed
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json"
    )
)

In [None]:
from sagemaker.pytorch.model import PyTorchModel
from sagemaker.model import Model
from sagemaker.image_uris import retrieve

pytorch_model = PyTorchModel(
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    source_dir="./code",
    entry_point="inference.py",
    git_config=git_config,
    role=role,
    framework_version='2.0.0',
    py_version='py310',
    sagemaker_session=pipeline_session
)

In [None]:
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep

# from sagemaker.inputs import CreateModelInput
# from sagemaker.workflow.steps import CreateModelStep

create_date = strftime("%m%d-%H%M%s")

model_package_group_name="BertPackageGroup"

step_args = pytorch_model.register(
    content_types=["text/csv"],
    response_types=["application/json"],
    inference_instances=["ml.g4dn.xlarge", "ml.m5.xlarge", "ml.c5.large"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

step_register = ModelStep(
    name="RegisterBertModel",
    step_args=step_args,
)

# condition step for evaluating model quality and branching execution
cond_lte = ConditionGreaterThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="classification_metrics.f1.value"  ### evluation.json 확인 필요 
    ),
    right=0.8,  ### evluation.json 확인 필요 
)

step_cond = ConditionStep(
    name="CheckBertEvaluation",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[],
)

# 5. 파리마터, 단계, 조건을 조합하여 최종 파이프라인 정의 및 실행


이제 지금까지 생성한 단계들을 하나의 파이프라인으로 조합하고 실행하도록 하겠습니다.

파이프라인은 name, parameters, steps 속성이 필수적으로 필요합니다. 
여기서 파이프라인의 이름은 (account, region) 조합에 대하여 유일(unique))해야 합니다.
우리는 또한 여기서 Experiment 설정을 추가 하여, 실험에 등록 합니다.

주의:

- 정의에 사용한 모든 파라미터가 존재해야 합니다.
- 파이프라인으로 전달된 단계(step)들은 실행순서와는 무관합니다. SageMaker Pipeline은 단계가 실행되고 완료될 수 있도록 의존관계를를 해석합니다.

## 5.1 파이프라인 정의
- 위에서 정의한 파라미터를 제공
- 실행할 스텝 기술
    - steps=[step_process, step_train, step_eval, step_cond],
- 아래는 약 20분 정도 소요 됩니다.

In [None]:
from sagemaker.workflow.pipeline import Pipeline

# pipeline instance
pipeline = Pipeline(
    name="BertModelBuildPipeline",
    parameters=[
        processing_instance_type,
        processing_instance_count,
        training_instance_type,
        training_instance_count,
        model_approval_status
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=pipeline_session,
)

## 5.2 파이프라인 정의 확인
위에서 정의한 파이프라인 정의는 Json 형식으로 정의 되어 있습니다.

In [None]:
import json

definition = json.loads(pipeline.definition())
definition

## 5.3 파이프라인 정의를 제출하고 실행하기 

파이프라인 정의를 파이프라인 서비스에 제출합니다. 함께 전달되는 역할(role)을 이용하여 AWS에서 파이프라인을 생성하고 작업의 각 단계를 실행할 것입니다.   

In [None]:
## start_pipeline_execution operation: Step type RegisterModel is not supported in local mode.

if local_mode:
    processing_instance_type = 'local'
    processing_instance_count = '1'
    training_instance_type = 'local_gpu'
    training_instance_count = '1'
else:
    processing_instance_type = 'ml.m5.xlarge'
    processing_instance_count = '1'
    training_instance_type = 'ml.p3.2xlarge'
    training_instance_count = '1'

In [None]:
pipeline.upsert(role_arn=role_arn)
execution = pipeline.start(
    parameters=dict(
            ProcessingInstanceType=processing_instance_type,
            ProcessingInstanceCount=processing_instance_count,
            TrainingInstanceType=training_instance_type,
            TrainingInstanceCount=training_instance_count)

)

In [None]:
execution.describe()

## 5.4 파이프라인 실행 기다리기

In [None]:
execution.wait()

실행이 완료될 때까지 기다립니다.

## 5.5 파이프라인 실행 단계 기록 보기

실행된 단계들을 리스트업합니다. 파이프라인의 단계실행 서비스에 의해 시작되거나 완료된 단계를 보여줍니다.

In [None]:
execution.list_steps()

In [None]:
pm.put_params(key="-".join([prefix, "MODEL-NAME"]), value=model_name, overwrite=True)
pm.put_params(key="-".join([prefix, "MODEL_PACKAGE_GROUP_NAME"]), value=model_package_group_name, overwrite=True)