# [모듈 3.1] 고급 모델 빌딩 파이프라인 개발 

이 노트북은 아래와 같은 목차로 진행 됩니다. 전체를 모두 실행시에 완료 시간은 **약 30분** 소요 됩니다.

- 1. SageMaker 모델 빌드 파이프라인을 이용한 모델 빌드 오케스트레이션
- 2. 파이프라인 개발자 가이드
- 3. 기본 라이브러리 로딩
- 4. 모델 빌딩 파이프라인 의 스텝(Step) 생성
- 5. 파리마터, 단계, 조건을 조합하여 최종 파이프라인 정의 및 실행
- 6. 세이지 메이커 스튜디오에서 실행 확인 하기
- 7. 계보(Lineage)
    
---

# 1. SageMaker 모델 빌드 파이프라인을 이용한 모델 빌드 오케스트레이션

Amazon SageMaker Model building pipeline은 머신러닝 워크플로우를 개발하는 데이터 과학자, 엔지니어들에게 SageMaker작업과 재생산가능한 머신러닝 파이프라인을 오케스트레이션하는 기능을 제공합니다. 또한 커스텀빌드된 모델을 실시간 추론환경이나 배치변환을 통한 추론 실행환경으로 배포하거나, 생성된 아티팩트의 계보(lineage)를 추적하는 기능을 제공합니다. 이 기능들을 통해 모델 아티팩트를 배포하고, 업무환경에서의 워크플로우를 배포/모니터링하고, 간단한 인터페이스를 통해 아티팩트의 계보 추적하고, 머신러닝 애플리케이션 개발의 베스트 프렉티스를 도입하여, 보다 안정적인 머신러닝 애플리케이션 운영환경을 구현할 수 있습니다. 

SageMaker pipeline 서비스는 JSON 선언으로 구현된 SageMaker Pipeline DSL(Domain Specific Language, 도메인종속언어)를 지원합니다. 이 DSL은 파이프라인 파라마터와 SageMaker 작업단계의 DAG(Directed Acyclic Graph)를 정의합니다. SageMaker Python SDK를 이용하면 이 파이프라인 DSL의 생성을 보다 간편하게 할 수 있습니다. 






# 2. 파이프라인 개발자 가이드
## SageMaker 파이프라인 소개





SageMaker 파이프라인은 다음 기능을 지원하며 본 lab_03_pipelinie 에서 일부를 다루게 됩니다. 

* Processing job steps - 데이터처러 워크로드를 실행하기 위한 SageMaker의 관리형 기능. Feature engineering, 데이터 검증, 모델 평가, 모델 해석 등에 주로 사용됨 
* Training job steps - 학습작업. 모델에게 학습데이터셋을 이용하여 모델에게 예측을 하도록 학습시키는 작업 
* Conditional execution steps - 조건별 실행분기. 파이프라인을 분기시키는 역할.
* Register model steps - 학습이 완료된 모델패키지 리소스를 이후 배포를 위한 모델 레지스트리에 등록하기 
* Create model steps - 추론 엔드포인트 또는 배치 추론을 위한 모델의 생성 
* Transform job steps - 배치추론 작업. 배치작업을 이용하여 노이즈, bias의 제거 등 데이터셋을 전처리하고 대량데이터에 대해 추론을 실행하는 단계
* Pipelines - Workflow DAG. SageMaker 작업과 리소스 생성을 조율하는 단계와 조건을 가짐
* Parametrized Pipeline executions - 특정 파라미터에 따라 파이프라인 실행방식을 변화시키기 


- 상세한 개발자 가이드는 아래 참조 하세요.
    - [세이지 메이커 모델 빌딩 파이프라인의 개발자 가이드](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines.html)




# 3. 기본 라이브러리 로딩

세이지 메이커 관련 라이브러리를 로딩 합니다.

In [1]:
import boto3
import sagemaker
import pandas as pd
import os

sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
sm_client = boto3.client("sagemaker")


## 3.1 노트북 변수 로딩


저장된 변수를 확인 합니다.

In [2]:
%store -r s3_input_data_uri
%store -r bucket
%store -r project_prefix



기존 노트북에서 저장한 변수를 로딩 합니다.

# 4. 모델 빌딩 파이프라인 의 스텝(Step) 생성


## 4.1 모델 빌딩 파이프라인 변수 생성


본 노트북에서 사용하는 파라미터는 다음과 같습니다.

* `model_approval_status` - 학습된 모델을 CI/CD를 목적으로 등록할 때의 승인 상태 (디폴트는 "PendingManualApproval")
* `input_data` - 입력데이터에 대한 S3 버킷 URI



파이프라인의 각 스텝에서 사용할 변수를 파라미터 변수로서 정의 합니다.


In [3]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

s3_data_loc = ParameterString(
    name="InputData",
    default_value=s3_input_data_uri,
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)


## 4.2 모델 학습을 위한 학습단계 정의 



###  하이퍼파라미터 세팅

In [4]:
host_hyperparameters = {'epochs': 1, 
                       'lr': 0.001,
                       'batch_size': 256,
                       'top_k' : 10,
                       'dropout' : 0.0,
                       'factor_num' : 32,
                       'num_layers' : 3,
                       'num_ng' : 4,
                       'test_num_ng' : 99,                   
                    }  

### 훈련 메트릭을 CloudWatch 에서 보기
- 개발자 가이드
    - [Monitor and Analyze Training Jobs Using Amazon CloudWatch ](https://docs.amazonaws.cn/en_us/sagemaker/latest/dg/training-metrics.html#define-train-metrics)

In [5]:
metric_definitions=[
       {'Name': 'HR', 'Regex': 'HR=(.*?);'},
       {'Name': 'NDCG', 'Regex': 'NDCG=(.*?);'},
       {'Name': 'Loss', 'Regex': 'Loss=(.*?);'}        
    ]


###  Estimator 생성

Estimator 생성시에 인자가 필요 합니다. 주요한 인자만 보겠습니다.


In [6]:
from sagemaker.pytorch import PyTorch

estimator_output_path = f's3://{bucket}/{project_prefix}/training_jobs'
print("estimator_output_path: \n", estimator_output_path)


instance_type = 'ml.p3.2xlarge'
instance_count = 1

host_estimator = PyTorch(
    entry_point="train.py",    
    source_dir='src',    
    role=role,
    output_path = estimator_output_path,    
    framework_version='1.8.1',
    py_version='py3',
    disable_profiler = True,
    instance_count=instance_count,
    instance_type=instance_type,
    session = sagemaker.Session(), # 세이지 메이커 세션
    hyperparameters=host_hyperparameters,
    metric_definitions = metric_definitions
    
)
# host_estimator.fit(s3_inputs, 
#                    experiment_config = experiment_config, # 실험 설정 제공                   
#                    wait=False)

estimator_output_path: 
 s3://sagemaker-us-east-1-057716757052/NCFModel/training_jobs


### 모델 훈련 스탭 생성


In [8]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name= "NCF-Training",
    estimator=host_estimator,
    inputs={
        "train": TrainingInput(
            s3_data= s3_data_loc
        ),
        "test": TrainingInput(
            s3_data= s3_data_loc
        ),        
    }
)

## 4.3 모델 등록 스텝

### 모델 그룹 생성

- 참고
    - 모델 그룹 릭스팅 API:  [ListModelPackageGroups](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListModelPackageGroups.html)
    - 모델 지표 등록: [Model Quality Metrics](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/model-monitor-model-quality-metrics.html)

In [9]:
model_package_group_name = f"{project_prefix}"
model_package_group_input_dict = {
 "ModelPackageGroupName" : model_package_group_name,
 "ModelPackageGroupDescription" : "Sample model package group"
}
response = sm_client.list_model_package_groups(NameContains=model_package_group_name)
if len(response['ModelPackageGroupSummaryList']) == 0:
    print("No model group exists")
    print("Create model group")    
    
    create_model_pacakge_group_response = sm_client.create_model_package_group(**model_package_group_input_dict)
    print('ModelPackageGroup Arn : {}'.format(create_model_pacakge_group_response['ModelPackageGroupArn']))    
else:
    print(f"{model_package_group_name} exitss")

NCFModel exitss


### 모델 등록 스텝 정의

In [10]:
from sagemaker.workflow.step_collections import RegisterModel



step_register = RegisterModel(
    name= "NCF-Model-Registry",
    estimator=host_estimator,
    image_uri= step_train.properties.AlgorithmSpecification.TrainingImage,
    model_data= step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.p2.xlarge", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    # model_metrics=model_metrics, # 현재 구현 안됨
)

# 5. 파리마터, 단계, 조건을 조합하여 최종 파이프라인 정의 및 실행


이제 지금까지 생성한 단계들을 하나의 파이프라인으로 조합하고 실행하도록 하겠습니다.

파이프라인은 name, parameters, steps 속성이 필수적으로 필요합니다. 
여기서 파이프라인의 이름은 (account, region) 조합에 대하여 유일(unique))해야 합니다.
우리는 또한 여기서 Experiment 설정을 추가 하여, 실험에 등록 합니다.

주의:

- 정의에 사용한 모든 파라미터가 존재해야 합니다.
- 파이프라인으로 전달된 단계(step)들은 실행순서와는 무관합니다. SageMaker Pipeline은 단계가 실행되고 완료될 수 있도록 의존관계를를 해석합니다.

## 5.1 파이프라인 정의


위에서 정의한 아래의 4개의 스텝으로 파이프라인 정의를 합니다.
-     steps=[step_process, step_train, step_create_model, step_deploy],
- 아래는 약 20분 정도 소요 됩니다.

In [11]:
from sagemaker.workflow.pipeline import Pipeline


pipeline_name = project_prefix
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        # processing_instance_type, 
        # processing_instance_count,
        # training_instance_type,        
        # training_instance_count,                
        s3_data_loc,
        # model_eval_threshold,
        model_approval_status,        
    ],
#   steps=[step_process, step_train, step_register, step_eval, step_cond],
  steps=[step_train, step_register],
)



## 5.2 파이프라인 정의 확인
위에서 정의한 파이프라인 정의는 Json 형식으로 정의 되어 있습니다.

In [12]:
import json

definition = json.loads(pipeline.definition())
definition

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'InputData',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-057716757052/NCFModel/data'},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'NCF-Training',
   'Type': 'Training',
   'Arguments': {'AlgorithmSpecification': {'TrainingInputMode': 'File',
     'TrainingImage': '763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-training:1.8.1-gpu-py3',
     'MetricDefinitions': [{'Name': 'HR', 'Regex': 'HR=(.*?);'},
      {'Name': 'NDCG', 'Regex': 'NDCG=(.*?);'},
      {'Name': 'Loss', 'Regex': 'Loss=(.*?);'}],
     'EnableSageMakerMetricsTimeSeries': True},
    'OutputDataConfig': {'S3OutputPath': 's3://sagemaker-us-east-1-057716757052/NCFModel/training_jobs'},
    'StoppingCondition': {'MaxRu

## 5.3 파이프라인 정의를 제출하고 실행하기 

파이프라인 정의를 파이프라인 서비스에 제출합니다. 함께 전달되는 역할(role)을 이용하여 AWS에서 파이프라인을 생성하고 작업의 각 단계를 실행할 것입니다.   

In [13]:
pipeline.upsert(role_arn=role)
execution = pipeline.start()

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


In [14]:
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:057716757052:pipeline/ncfmodel',
 'PipelineExecutionArn': 'arn:aws:sagemaker:us-east-1:057716757052:pipeline/ncfmodel/execution/d3l0o5s1fuba',
 'PipelineExecutionDisplayName': 'execution-1657276623880',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2022, 7, 8, 10, 37, 3, 750000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2022, 7, 8, 10, 37, 3, 750000, tzinfo=tzlocal()),
 'CreatedBy': {},
 'LastModifiedBy': {},
 'ResponseMetadata': {'RequestId': '80c3a46a-f2cc-4ebe-a04c-1c6d9c810adb',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '80c3a46a-f2cc-4ebe-a04c-1c6d9c810adb',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '379',
   'date': 'Fri, 08 Jul 2022 10:37:04 GMT'},
  'RetryAttempts': 0}}

## 5.4 파이프라인 실행 기다리기

In [15]:
execution.wait()

실행이 완료될 때까지 기다립니다.

실행된 단계들을 리스트업합니다. 파이프라인의 단계실행 서비스에 의해 시작되거나 완료된 단계를 보여줍니다.

## 5.5 파이프라인 실행 단계 기록 보기

In [16]:
execution.list_steps()

[{'StepName': 'NCF-Model-Registry',
  'StartTime': datetime.datetime(2022, 7, 8, 10, 48, 45, 33000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 7, 8, 10, 48, 45, 804000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:model-package/ncfmodel/13'}}},
 {'StepName': 'NCF-Training',
  'StartTime': datetime.datetime(2022, 7, 8, 10, 37, 4, 703000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2022, 7, 8, 10, 48, 44, 52000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'AttemptCount': 0,
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:training-job/pipelines-d3l0o5s1fuba-ncf-training-oq4ki81nm4'}}}]

# 6. 세이지 메이커 스튜디오에서 실행 확인 하기


# 7. 계보(Lineage)

파이프라인에 의해 생성된 아티팩트의 계보를 살펴봅니다.

In [17]:
import time
from sagemaker.lineage.visualizer import LineageTableVisualizer


viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
    print(execution_step)
    display(viz.show(pipeline_execution_step=execution_step))
    time.sleep(1)

{'StepName': 'NCF-Training', 'StartTime': datetime.datetime(2022, 7, 8, 10, 37, 4, 703000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 7, 8, 10, 48, 44, 52000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:training-job/pipelines-d3l0o5s1fuba-ncf-training-oq4ki81nm4'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...ker-us-east-1-057716757052/NCFModel/data,Input,DataSet,ContributedTo,artifact
1,76310...onaws.com/pytorch-training:1.8.1-gpu-py3,Input,Image,ContributedTo,artifact
2,s3://...-Training-OQ4ki81nM4/output/model.tar.gz,Output,Model,Produced,artifact


{'StepName': 'NCF-Model-Registry', 'StartTime': datetime.datetime(2022, 7, 8, 10, 48, 45, 33000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2022, 7, 8, 10, 48, 45, 804000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'AttemptCount': 0, 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-1:057716757052:model-package/ncfmodel/13'}}}


Unnamed: 0,Name/Source,Direction,Type,Association Type,Lineage Type
0,s3://...-Training-OQ4ki81nM4/output/model.tar.gz,Input,Model,ContributedTo,artifact
1,76310...onaws.com/pytorch-training:1.8.1-gpu-py3,Input,Image,ContributedTo,artifact
2,ncfmodel-13-PendingManualApproval-1657277325-a...,Input,Approval,ContributedTo,action
3,NCFModel-1656752899-aws-model-package-group,Output,ModelGroup,AssociatedWith,context
