# Training Pipeline

## 1. Model code artifacts
Folder structure reference: [Getting started with deploying real-time models on Amazon SageMaker](https://aws.amazon.com/blogs/machine-learning/getting-started-with-deploying-real-time-models-on-amazon-sagemaker/)

```
# PyTorch
model.tar.gz/
             |- model.pth
             |- code/
                     |- inference.py
                     |- requirements.txt # only for versions 1.3.1 and higher
```
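Before registering a model, it can help to confirm that an archive follows the layout above. The following is a minimal sketch using only the standard library. `REQUIRED_MEMBERS`, `make_archive`, and `missing_members` are hypothetical names introduced for illustration, and the demo archives contain placeholder bytes rather than real weights or handler code.

```python
import io
import tarfile

# Entries implied by the tree above; the weights file name is taken from it
REQUIRED_MEMBERS = ("model.pth", "code/inference.py")


def make_archive(files: dict) -> bytes:
    """Build an in-memory .tar.gz from {member_name: bytes} (demo helper)."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        for name, data in files.items():
            info = tarfile.TarInfo(name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buffer.getvalue()


def missing_members(archive_bytes: bytes) -> list:
    """Return the required entries that are absent from the archive."""
    with tarfile.open(fileobj=io.BytesIO(archive_bytes), mode="r:gz") as tar:
        # Normalize a leading "./" that some tar invocations add
        names = {n[2:] if n.startswith("./") else n for n in tar.getnames()}
    return [m for m in REQUIRED_MEMBERS if m not in names]


# Placeholder archives: one complete, one without the code/ folder
good = make_archive({
    "model.pth": b"weights",
    "code/inference.py": b"# handler",
    "code/requirements.txt": b"",
})
bad = make_archive({"model.pth": b"weights"})

missing_in_good = missing_members(good)
missing_in_bad = missing_members(bad)
print(missing_in_good, missing_in_bad)
```

An archive that reports missing entries here would need to be repackaged before it is usable for inference.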

In [87]:
# project namespace
project_prefix = "proto"

# model prefix - depends on model version
model_group = "ncf-sample-1"

# pipeline and model props
training_pipeline_name = f'{project_prefix}-{model_group}-training'
model_package_group_name = f'{project_prefix}-{model_group}'

# code repository
model_code_dir = "src_v1"

# parameter store keys
key_repackage_lambda_arn = f'/{project_prefix}/MlOps/Lambda/Function/ModelRepackage'
key_metric_lambda_arn = f'/{project_prefix}/MlOps/Lambda/Function/ModelMetric'
key_s3_mlops_bucket_name = f'/{project_prefix}/MlOps/S3/Bucket/Name/s3MlOpsBucket'

# training pipeline manifest file
pipeline_manifest_file = "pipeline_config.json"

## 2. Training setup

In [88]:
%load_ext autoreload
%autoreload 2

import sys
import boto3
import sagemaker
import json

# Add the source folder to the import path
sys.path.append(f'./{model_code_dir}')
ssm = boto3.client('ssm')

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [174]:
# S3 Bucket
paramRes = ssm.get_parameter(Name=key_s3_mlops_bucket_name, WithDecryption=False)
bucket = paramRes["Parameter"]["Value"]

# S3 data path
data_prefix = f"data/{model_group}"
code_prefix=f'code/{model_group}'
model_prefix=f'model/{model_group}'

s3_input_data_uri = f"s3://{bucket}/{data_prefix}"
s3_model_output_uri =  f"s3://{bucket}/{model_prefix}"

In [175]:
# sagemaker
region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()

print("bucket: ", bucket)
print("role: ", role)

sm_client = boto3.client('sagemaker', region_name=region)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
bucket:  cjproto-975050344093-ap-northeast-2-mlops-bucket
role:  arn:aws:iam::975050344093:role/cjprotoDev-cjprotoExperim-SageMakerStudioSageMakerE-ovIXL1hq7LV4


In [176]:
print("s3_input_data_uri: \n", s3_input_data_uri)
print("bucket: \n", bucket)
print("project_prefix: \n", project_prefix)
print("model_prefix: \n", model_prefix)

s3_input_data_uri: 
 s3://cjproto-975050344093-ap-northeast-2-mlops-bucket/data/ncf-sample
bucket: 
 cjproto-975050344093-ap-northeast-2-mlops-bucket
project_prefix: 
 cjproto
model_prefix: 
 model/ncf-sample


## 3. Package Code

### 3.1 Compress code

In [177]:
import os

package_dir = 'code_pkg'
os.makedirs(package_dir, exist_ok=True)

code_dir = f'../{model_code_dir}'

In [178]:
%%sh -s {package_dir} {code_dir}

package_dir=$1
code_dir=$2

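cd "$package_dir" || exit 1  # move into the package folder; abort if it is missing
echo $PWD
rm -rf ./*
cp -r $code_dir/*.py  .  # copy all Python sources
cp -r $code_dir/*.txt  .  # copy all .txt files (e.g. requirements.txt)
cp -r $code_dir/*.json  .  # copy all JSON files
tar -czvf source.tar.gz * # create source.tar.gz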

/home/sagemaker-user/cjproto-mlops-repo/mlpipelines/sample_recomm_model/code_pkg
common_utils.py
config.py
data_utils.py
evaluate.py
inference.py
model.py
model_config.json
requirements.txt
train.py
train_lib.py


### 3.2 Upload code

In [179]:
source_path = os.path.join(package_dir, 'source.tar.gz')
source_artifact = sagemaker_session.upload_data(source_path, bucket, code_prefix)
print("source_artifact: \n", source_artifact)

source_artifact: 
 s3://cjproto-975050344093-ap-northeast-2-mlops-bucket/code/ncf-sample/source.tar.gz


## 4. Create SageMaker Pipeline

### 4.1 Pipeline setup

In [95]:
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
)

# Input data
s3_data_loc = ParameterString(
    name="InputData",
    default_value=s3_input_data_uri,
)

# Model approval status
model_approval_status = ParameterString(
    name="ModelApprovalStatus", default_value="PendingManualApproval"
)

# Training code location
s3_model_code = ParameterString(
    name="TrainCode",
    default_value=source_artifact,
)

# Pipeline Session
# LOCAL_MODE = True # use for local mode
LOCAL_MODE = False # use for cloud mode
if LOCAL_MODE:
    from sagemaker.workflow.pipeline_context import LocalPipelineSession
    pipeline_session = LocalPipelineSession()
    print("### --> Local Mode")
else:
    from sagemaker.workflow.pipeline_context import PipelineSession
    pipeline_session = PipelineSession()
    print("### --> Cloud Mode")  


### --> Cloud Mode


### 4.2 Cache definition
* Caching pipeline steps: [Caching Pipeline Steps](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/pipelines-caching.html)

In [96]:
from sagemaker.workflow.steps import CacheConfig

cache_config = CacheConfig(enable_caching=True, 
                           expire_after="1d")

### 4.3 Hyperparameters

In [158]:
host_hyperparameters = {'epochs': 1, 
                       'lr': 0.001,
                       'batch_size': 256,
                       'top_k' : 10,
                       'dropout' : 0.0,
                       'factor_num' : 32,
                       'num_layers' : 3,
                       'num_ng' : 4,
                       'test_num_ng' : 99,                   
                    }  
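In SageMaker script mode, the entries of the hyperparameters dict are passed to the entry point as command-line arguments (`--epochs 1 --lr 0.001 ...`). `train.py` is not shown in this notebook, so the parser below is a hypothetical sketch of how such a script might read them. `parse_hyperparameters` is an assumed name, and the defaults simply mirror the values above.

```python
import argparse


def parse_hyperparameters(argv):
    """Parse the hyperparameters the way a script-mode entry point could."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--lr", type=float, default=0.001)
    parser.add_argument("--batch_size", type=int, default=256)
    parser.add_argument("--top_k", type=int, default=10)
    parser.add_argument("--dropout", type=float, default=0.0)
    parser.add_argument("--factor_num", type=int, default=32)
    parser.add_argument("--num_layers", type=int, default=3)
    parser.add_argument("--num_ng", type=int, default=4)
    parser.add_argument("--test_num_ng", type=int, default=99)
    # parse_known_args tolerates extra arguments this sketch does not declare
    args, _unknown = parser.parse_known_args(argv)
    return args


# Simulate part of the argument list a training container would receive
args = parse_hyperparameters(["--epochs", "1", "--lr", "0.001", "--batch_size", "256"])
print(args.epochs, args.lr, args.batch_size, args.test_num_ng)
```

Declaring explicit types matters because every value arrives as a string on the command line.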

### 4.4 Metric definitions
* Metric definition reference: [Monitor and Analyze Training Jobs Using Amazon CloudWatch](https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/training-metrics.html)

In [159]:
metric_definitions=[
       {'Name': 'HR', 'Regex': 'HR=(.*?);'},
       {'Name': 'NDCG', 'Regex': 'NDCG=(.*?);'},
       {'Name': 'Loss', 'Regex': 'Loss=(.*?);'}        
    ]
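Each regex is applied to the training job's log output, and the first capture group becomes the metric value. The sketch below approximates that matching locally with Python's `re` module. The sample log line is hypothetical (the real format depends on what `train.py` prints), and `extract_metrics` is a name introduced for illustration.

```python
import re

metric_definitions = [
    {"Name": "HR", "Regex": "HR=(.*?);"},
    {"Name": "NDCG", "Regex": "NDCG=(.*?);"},
    {"Name": "Loss", "Regex": "Loss=(.*?);"},
]

# Hypothetical log line; note that every value ends with a semicolon
sample_log_line = "HR=0.651; NDCG=0.382; Loss=0.214;"


def extract_metrics(line, definitions):
    """Return {metric name: float value} for each definition that matches."""
    found = {}
    for definition in definitions:
        match = re.search(definition["Regex"], line)
        if match:
            found[definition["Name"]] = float(match.group(1))
    return found


metrics = extract_metrics(sample_log_line, metric_definitions)
print(metrics)
```

Because the patterns end in `;`, a log line that omits the trailing semicolon after a value would not match that metric.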

### 4.5 Create Estimator

In [160]:
from sagemaker.pytorch import PyTorch

estimator_output_path = s3_model_output_uri
print("estimator_output_path: \n", estimator_output_path)

base_job_name = f'{project_prefix}-{model_group}'  # model_prefix contains "/", which is not allowed in job names
instance_type = 'ml.p3.2xlarge'
instance_count = 1

# To train from the local train.py, set source_dir=model_code_dir
# To train from the tarball uploaded to S3, set source_dir=s3_model_code
host_estimator = PyTorch(
    base_job_name = base_job_name,
    entry_point="train.py",    
    source_dir=s3_model_code,    
    role=role,
    output_path = estimator_output_path,    
    framework_version='1.12.1',
    py_version='py38',
    disable_profiler = True,
    instance_count=instance_count,
    instance_type=instance_type,
    session = pipeline_session,
    hyperparameters=host_hyperparameters,
    metric_definitions = metric_definitions
)

estimator_output_path: 
 s3://cjproto-975050344093-ap-northeast-2-mlops-bucket/model/ncf-sample


The source_dir is a pipeline variable: <class 'sagemaker.workflow.parameters.ParameterString'>. During pipeline execution, the interpreted value of source_dir has to be an S3 URI and must point to a tar.gz file


### 4.6 Create TrainingStep

In [161]:
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep


step_train = TrainingStep(
    name= "NCF-Training",
    estimator=host_estimator,
    inputs={
        "train": TrainingInput(
            s3_data= s3_data_loc
        ),
        "test": TrainingInput(
            s3_data= s3_data_loc
        ),        
    },
    cache_config = cache_config, # cache configuration
)

### 4.7 Lambda Step for packaging outputs
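* When unpacked, the artifact produced by training contains only a single .pth file.
* It must be bundled together with inference.py before it can be used for inference.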
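The repackaging Lambda itself is not shown in this notebook. As a rough local illustration of the idea, the sketch below merges a training artifact with the source archive, placing the source files under `code/`. It works on in-memory bytes rather than S3 objects. `make_archive` and `repackage` are hypothetical helpers, and the file contents are placeholders.

```python
import io
import tarfile


def make_archive(files: dict) -> bytes:
    """Build an in-memory .tar.gz from {member_name: bytes} (demo helper)."""
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w:gz") as tar:
        for name, data in files.items():
            info = tarfile.TarInfo(name)
            info.size = len(data)
            tar.addfile(info, io.BytesIO(data))
    return buffer.getvalue()


def repackage(model_bytes: bytes, source_bytes: bytes) -> bytes:
    """Copy the model files to the archive root and the source files to code/."""
    out_buffer = io.BytesIO()
    with tarfile.open(fileobj=out_buffer, mode="w:gz") as out:
        with tarfile.open(fileobj=io.BytesIO(model_bytes), mode="r:gz") as src:
            for member in src.getmembers():
                if member.isfile():
                    out.addfile(member, src.extractfile(member))
        with tarfile.open(fileobj=io.BytesIO(source_bytes), mode="r:gz") as src:
            for member in src.getmembers():
                if member.isfile():
                    # Relocate each source file under code/
                    member.name = f"code/{member.name}"
                    out.addfile(member, src.extractfile(member))
    return out_buffer.getvalue()


# Placeholder inputs standing in for model.tar.gz and source.tar.gz
model_archive = make_archive({"model.pth": b"weights"})
source_archive = make_archive({"inference.py": b"# handler", "requirements.txt": b"torch"})

repackaged = repackage(model_archive, source_archive)
with tarfile.open(fileobj=io.BytesIO(repackaged), mode="r:gz") as tar:
    repackaged_names = sorted(tar.getnames())
    handler_source = tar.extractfile("code/inference.py").read()
print(repackaged_names)
```

Streaming members from one archive to another avoids extracting anything to disk, which suits a short-lived function with limited temporary storage.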

In [162]:
# Look up the Lambda function ARN
import boto3
ssm = boto3.client('ssm')
paramRes = ssm.get_parameter(Name=key_repackage_lambda_arn, WithDecryption=False)
repackage_lambda_arn = paramRes["Parameter"]["Value"]

print('Repackage Lambda : ', repackage_lambda_arn)

Repackage Lambda :  arn:aws:lambda:ap-northeast-2:975050344093:function:cjproto-training-pipeline-model-repackage


In [163]:
# Lambda function instance
from sagemaker.lambda_helper import Lambda

func_repackage_model = Lambda(function_arn=repackage_lambda_arn)



In [164]:
# Create the LambdaStep
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from datetime import datetime

currentDateAndTime = datetime.now()
currentTime = currentDateAndTime.strftime("%Y-%m-%d-%H-%M-%S")
bucket_prefix = f'repackage/{model_prefix}/{currentTime}'

output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)
output_param_3 = LambdaOutput(output_name="S3_Model_URI", output_type=LambdaOutputTypeEnum.String)

step_repackage_lambda = LambdaStep(
    name="LambdaRepackageStep",
    lambda_func=func_repackage_model,
    inputs={
        "source_path" : source_artifact,
        "model_path": step_train.properties.ModelArtifacts.S3ModelArtifacts,
        "bucket" : bucket,
        "prefix" : bucket_prefix
    },
    outputs=[output_param_1, output_param_2, output_param_3],
)

### 4.8 Lambda Step for metric outputs
* Model monitoring: https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/model-monitor.html
* Model quality metrics: https://docs.aws.amazon.com/ko_kr/sagemaker/latest/dg/model-monitor-model-quality-metrics.html
* [(notebook sample) Monitor and Analyze Training Jobs Using Metrics ](https://github.com/aws-samples/TensorFlow-in-SageMaker-workshop/blob/master/1_Monitoring_your_TensorFlow_scripts.ipynb)
* [(notebook sample) Comparing model metrics with SageMaker Pipelines and SageMaker Model Registry](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipeline-compare-model-versions/notebook.ipynb)

In [165]:
# Look up the Lambda function ARN
import boto3
ssm = boto3.client('ssm')
paramRes = ssm.get_parameter(Name=key_metric_lambda_arn, WithDecryption=False)
metric_lambda_arn = paramRes["Parameter"]["Value"]

print('Metric Lambda : ', metric_lambda_arn)

Metric Lambda :  arn:aws:lambda:ap-northeast-2:975050344093:function:cjproto-training-pipeline-model-metric


In [166]:
# Lambda function instance
from sagemaker.lambda_helper import Lambda

func_metric_model = Lambda(function_arn=metric_lambda_arn)



In [167]:
# Create the LambdaStep
from sagemaker.workflow.lambda_step import (
    LambdaStep,
    LambdaOutput,
    LambdaOutputTypeEnum,
)
from sagemaker.workflow.functions import Join

metric_output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
metric_output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)
metric_output_param_3 = LambdaOutput(output_name="S3_Metric_URI", output_type=LambdaOutputTypeEnum.String)

output_path = Join(
    on="/",
    values=[
        step_train.properties.OutputDataConfig.S3OutputPath, 
        step_train.properties.TrainingJobName,
        "output"
    ]
)

step_metric_lambda = LambdaStep(
    name="LambdaMetricStep",
    lambda_func=func_metric_model,
    inputs={
        "output_path" : output_path,
        "metric_file": "metrics.json",
        "model_package_group" : model_package_group_name
    },
    outputs=[metric_output_param_1, metric_output_param_2, metric_output_param_3],
)
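Step properties such as `S3OutputPath` and `TrainingJobName` are placeholders that are only resolved when the pipeline executes, which is why the path is assembled with `Join` instead of an f-string. The plain-Python stand-in below shows the string the `Join` would yield once resolved. `resolve_join` is a hypothetical helper, and the bucket and job name are made-up example values.

```python
def resolve_join(on, values):
    """Plain-Python stand-in for the string a Join resolves to at run time."""
    return on.join(str(value) for value in values)


# Made-up example values for the two step properties
example_s3_output_path = "s3://example-bucket/model/ncf-sample-1"
example_training_job_name = "pipelines-abc123-NCF-Training-xyz"

example_output_path = resolve_join(
    "/", [example_s3_output_path, example_training_job_name, "output"]
)
print(example_output_path)
```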

### 4.9 Model Registration Step

In [168]:
# Create the model package group if it does not exist yet
model_package_group_input_dict = {
 "ModelPackageGroupName" : model_package_group_name,
 "ModelPackageGroupDescription" : "Sample model package group"
}
response = sm_client.list_model_package_groups(NameContains=model_package_group_name)
# NameContains is a substring filter, so compare names exactly
existing_groups = [g['ModelPackageGroupName'] for g in response['ModelPackageGroupSummaryList']]
if model_package_group_name not in existing_groups:
    print("No model group exists")
    print("Create model group")    
    
    create_model_package_group_response = sm_client.create_model_package_group(**model_package_group_input_dict)
    print('ModelPackageGroup Arn : {}'.format(create_model_package_group_response['ModelPackageGroupArn']))    
else:
    print(f"{model_package_group_name} exists")

cjproto-ncf-sample exists


In [169]:
# Register the model metrics
from sagemaker.model_metrics import MetricsSource, ModelMetrics
from sagemaker.workflow.functions import Join
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri=step_metric_lambda.properties.Outputs["S3_Metric_URI"],
        content_type="application/json",
    )
)

In [170]:
# Define the model registration step
from sagemaker.workflow.model_step import ModelStep
from sagemaker.model import Model

# Default SageMaker PyTorch inference image
# Reference: https://github.com/aws/deep-learning-containers/blob/master/available_images.md
inference_image_uri = f'763104351884.dkr.ecr.{region}.amazonaws.com/pytorch-inference:1.12.1-gpu-py38-cu113-ubuntu20.04-sagemaker'

model = Model(
    image_uri=inference_image_uri,
    model_data = step_repackage_lambda.properties.Outputs["S3_Model_URI"],
    role=role,
    sagemaker_session=pipeline_session,
)


register_model_step_args = model.register(
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.g4dn.xlarge", "ml.p2.xlarge"],
    transform_instances=["ml.g4dn.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics,
)

step_model_registration = ModelStep(
   name="RegisterModel",
   step_args=register_model_step_args,
)

## 5. Pipeline Definition

In [171]:
from sagemaker.workflow.pipeline import Pipeline

# Define the pipeline
pipeline = Pipeline(
    name=training_pipeline_name,
    parameters=[
        s3_data_loc,
        model_approval_status,
        s3_model_code,
    ],
    sagemaker_session=pipeline_session,
    #steps=[step_train, step_model_registration],
    steps=[step_train, step_repackage_lambda, step_metric_lambda, step_model_registration],    
)


In [172]:
# Register (upsert) the pipeline
import json

definition = json.loads(pipeline.definition())
print(" definition : \n", definition)

pipeline.upsert(role_arn=role)


Popping out 'TrainingJobName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.
Popping out 'CertifyForMarketplace' from the pipeline definition since it will be overridden in pipeline execution time.
Popping out 'ModelPackageName' from the pipeline definition by default since it will be overridden at pipeline execution time. Please utilize the PipelineDefinitionConfig to persist this field in the pipeline definition if desired.

 definition : 
 {'Version': '2020-12-01', 'Metadata': {}, 'Parameters': [{'Name': 'InputData', 'Type': 'String', 'DefaultValue': 's3://cjproto-975050344093-ap-northeast-2-mlops-bucket/data/ncf-sample'}, {'Name': 'ModelApprovalStatus', 'Type': 'String', 'DefaultValue': 'PendingManualApproval'}, {'Name': 'TrainCode', 'Type': 'String', 'DefaultValue': 's3://cjproto-975050344093-ap-northeast-2-mlops-bucket/code/ncf-sample/source.tar.gz'}], 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'}, 'TrialName': {'Get': 'Execution.PipelineExecutionId'}}, 'Steps': [{'Name': 'NCF-Training', 'Type': 'Training', 'Arguments': {'AlgorithmSpecification': {'TrainingInputMode': 'File', 'TrainingImage': '763104351884.dkr.ecr.ap-northeast-2.amazonaws.com/pytorch-training:1.12.1-gpu-py38', 'MetricDefinitions': [{'Name': 'HR', 'Regex': 'HR=(.*?);'}, {'Name': 'NDCG', 'Regex': 'NDCG=(.*?);'}, {'Name': 'Loss', 'Regex': 'Loss=(.*?);'}], 'EnableSageMakerMetricsTimeSeries': True}, 'Outpu



{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:975050344093:pipeline/cjproto-ncf-sample-training',
 'ResponseMetadata': {'RequestId': '97275345-6fdc-4e53-a740-b4ccfdd744b1',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '97275345-6fdc-4e53-a740-b4ccfdd744b1',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '100',
   'date': 'Fri, 08 Mar 2024 02:23:14 GMT'},
  'RetryAttempts': 0}}

In [157]:
json.dumps(definition)

'{"Version": "2020-12-01", "Metadata": {}, "Parameters": [{"Name": "InputData", "Type": "String", "DefaultValue": "s3://cjproto-975050344093-ap-northeast-2-mlops-bucket/data/ncf-sample"}, {"Name": "ModelApprovalStatus", "Type": "String", "DefaultValue": "PendingManualApproval"}, {"Name": "TrainCode", "Type": "String", "DefaultValue": "s3://cjproto-975050344093-ap-northeast-2-mlops-bucket/code/ncf-sample/source.tar.gz"}], "PipelineExperimentConfig": {"ExperimentName": {"Get": "Execution.PipelineName"}, "TrialName": {"Get": "Execution.PipelineExecutionId"}}, "Steps": [{"Name": "NCF-Training", "Type": "Training", "Arguments": {"AlgorithmSpecification": {"TrainingInputMode": "File", "TrainingImage": "763104351884.dkr.ecr.ap-northeast-2.amazonaws.com/pytorch-training:1.12.1-gpu-py38", "MetricDefinitions": [{"Name": "HR", "Regex": "HR=(.*?);"}, {"Name": "NDCG", "Regex": "NDCG=(.*?);"}, {"Name": "Loss", "Regex": "Loss=(.*?);"}], "EnableSageMakerMetricsTimeSeries": true}, "OutputDataConfig": {

In [173]:
# (TEST) Start the pipeline
# execution = pipeline.start()

## 6. Export pipeline config
* Create and upload the manifest file used to run the pipeline.
* The S3 upload event for the manifest file is used to trigger the pipeline start.

In [72]:
# Export the settings needed to run the pipeline
pipeline_execution_config = {
    'PipelineName' : training_pipeline_name,
    'PipelineParameters' : [
        { 'Name': s3_data_loc.name, 'Value': s3_data_loc.default_value},
        { 'Name': s3_model_code.name, 'Value': s3_model_code.default_value},
        { 'Name': model_approval_status.name, 'Value': model_approval_status.default_value},
    ]
}

In [73]:
# Write to a file
with open(pipeline_manifest_file, 'w', encoding='utf-8') as f:
    json.dump(pipeline_execution_config, f, ensure_ascii=False, indent=4)


In [74]:
# Upload to the S3 code path
manifest_artifact = sagemaker_session.upload_data(pipeline_manifest_file, bucket, code_prefix)
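The component that reacts to the manifest upload is not part of this notebook. As a minimal sketch of how a consumer might use the file, the function below turns the manifest text into keyword arguments shaped like those of boto3's `start_pipeline_execution`. `build_start_args` and the sample values are hypothetical, and the AWS call itself is left as a comment so the example runs offline.

```python
import json


def build_start_args(manifest_text: str) -> dict:
    """Convert manifest JSON into start_pipeline_execution keyword arguments."""
    manifest = json.loads(manifest_text)
    return {
        "PipelineName": manifest["PipelineName"],
        "PipelineParameters": [
            # Parameter values are sent as strings
            {"Name": param["Name"], "Value": str(param["Value"])}
            for param in manifest["PipelineParameters"]
        ],
    }


# Sample manifest in the shape written above (values are made up)
sample_manifest = json.dumps({
    "PipelineName": "proto-ncf-sample-1-training",
    "PipelineParameters": [
        {"Name": "InputData", "Value": "s3://example-bucket/data/ncf-sample-1"},
        {"Name": "ModelApprovalStatus", "Value": "PendingManualApproval"},
    ],
})

start_args = build_start_args(sample_manifest)
print(start_args)

# In a real trigger (assumption), the arguments would be passed on like this:
# boto3.client("sagemaker").start_pipeline_execution(**start_args)
```

Keeping the manifest in the same Name/Value shape as the API request means the consumer needs no per-pipeline mapping logic.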