## 파이프라이닝

##### 전처리 프로세싱 스크립트 (preprocess.py)

In [1]:
import os
os.makedirs('script', exist_ok=True)

In [2]:
%%writefile script/preprocess.py
import argparse
import os
import numpy as np
import pandas as pd
import pickle
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.preprocessing import LabelEncoder

def preprocess_data(input_data_path, output_train_path, output_val_path, asset_path, n_components, test_size):
    # 데이터 읽기
    original_data = pd.read_csv(input_data_path)

    # 특성과 타겟 분리
    X = original_data.iloc[:, 1:]
    y = original_data.iloc[:, 0]
    
    # 결측치 처리
    X = X.replace('?', np.nan)
    
    # 타겟 변수 인코딩
    if y.dtype == 'object':
        y = y.map({
            '<=50K': 0,
            '<=50K.': 0,
            '>50K': 1,
            '>50K.': 1
        })
    else:
        print("타겟 변수가 이미 숫자형입니다.")

    # 범주형 변수와 수치형 변수 구분
    categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()
    numeric_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()

    # 범주형 변수에 'Unknown' 카테고리 추가 및 결측치 처리
    for feature in categorical_features: 
        X[feature] = X[feature].astype('category')
        X[feature] = X[feature].cat.add_categories('Unknown')
        X[feature] = X[feature].fillna('Unknown')

    # 수치형 특성의 결측치는 중앙값으로 대체
    for feature in numeric_features:
        X[feature] = X[feature].fillna(X[feature].median())
    
    # 훈련 - 검증 데이터셋 분할
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size = 0.3, random_state = 2024)

    # 데이터 스케일링
    scaler = StandardScaler()
    numeric_cols = X_train.select_dtypes(include=['int64', 'float64']).columns
    X_train[numeric_cols] = scaler.fit_transform(X_train[numeric_cols])
    X_val[numeric_cols] = scaler.transform(X_val[numeric_cols])
    
    # 레이블 인코딩
    encoders = {}
    for col in categorical_features:
        encoder = LabelEncoder()
        X_train[col] = encoder.fit_transform(X_train[col])
        encoders[col] = encoder
        X_val[col] = encoder.transform(X_val[col])
    

    # PCA 수행
    pca = PCA(n_components=n_components)
    X_train_pca = pd.DataFrame(pca.fit_transform(X_train), index=X_train.index, columns=[f'PC{i}' for i in range(1, pca.n_components_+1)])
    X_val_pca = pd.DataFrame(pca.transform(X_val), index=X_val.index, columns=[f'PC{i}' for i in range(1, pca.n_components_+1)])

    
    print(f"훈련데이터 차원축소 : {X_train.shape} -> {X_train_pca.shape}")
    print(f"검증데이터 차원축소 : {X_val.shape} -> {X_val_pca.shape}")
    
    # 레이블 데이터 추가
    train_data = pd.concat([y_train, X_train_pca], axis=1)
    val_data = pd.concat([y_val, X_val_pca], axis=1)

    # 전처리된 데이터 저장
    train_data.to_csv(output_train_path, index=False)
    val_data.to_csv(output_val_path, index=False)

    # 인코더와 스케일러 저장
    with open(os.path.join(asset_path, 'encoder.pkl'), 'wb') as f:
        pickle.dump(encoders, f)
    with open(os.path.join(asset_path, 'scaler.pkl'), 'wb') as f:
        pickle.dump(scaler, f)
    with open(os.path.join(asset_path, 'pca.pkl'), 'wb') as f:
        pickle.dump(pca, f)
    
    print("전처리 완료 및 데이터 저장 완료")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--n-components', type=float, default=0.9)
    parser.add_argument('--test-size', type=float, default=0.2)
    
    args, _ = parser.parse_known_args()
    
    input_data_path = '/opt/ml/processing/input/original_data.csv'
    output_train_path = '/opt/ml/processing/output/train/train_data.csv'
    output_val_path = '/opt/ml/processing/output/validation/val_data.csv'
    asset_path = '/opt/ml/processing/output/asset'
    
    os.makedirs(os.path.dirname(output_train_path), exist_ok=True)
    os.makedirs(os.path.dirname(output_val_path), exist_ok=True)
    os.makedirs(asset_path, exist_ok=True)
    
    preprocess_data(input_data_path, output_train_path, output_val_path, asset_path, args.n_components, args.test_size)

Overwriting script/preprocess.py


##### 모델 훈련 스크립트 (train.py)

In [3]:
%%writefile script/train.py
import argparse
import os
import pandas as pd
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import xgboost as xgb
import pickle as pkl
from glob import glob

def main():
    parser = argparse.ArgumentParser()
    # SageMaker 특정 인자 설정 (기본값은 환경 변수에서 가져옴)
    parser.add_argument('--output_data_dir', type=str, default=os.environ.get('SM_OUTPUT_DATA_DIR'))
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--validation', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))
    parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    # 하이퍼파라미터 설정
    parser.add_argument('--max-depth', type=int, default=3)
    parser.add_argument('--learning-rate', type=float, default=0.1)
    parser.add_argument('--reg-alpha', type=float, default=0)
    parser.add_argument('--reg-lambda', type=float, default=1)
    parser.add_argument('--subsample', type=float, default=1)
    parser.add_argument('--colsample-bytree', type=float, default=1)
    parser.add_argument('--num-round', type=int, default=200)
    parser.add_argument('--early-stopping-rounds', type=int, default=10)
    parser.add_argument('--objective', type=str, default='binary:logistic')
    parser.add_argument('--eval-metric', type=str, default='auc')
    args, _ = parser.parse_known_args()

    # 데이터 로드
    
    # CSV 파일 목록 가져오기
    train_files = glob(args.train + "/*.csv")
    train_data = pd.concat([pd.read_csv(file) for file in train_files], ignore_index=True)
    val_files = glob(args.validation + "/*.csv")
    val_data = pd.concat([pd.read_csv(file) for file in val_files], ignore_index=True)

    # 특성과 타겟 분리
    X_train = train_data.iloc[:, 1:]
    y_train = train_data.iloc[:, 0]
    X_val = val_data.iloc[:, 1:]
    y_val = val_data.iloc[:, 0]
    
    d_train = xgb.DMatrix(X_train, label=y_train)
    d_val = xgb.DMatrix(X_val, label=y_val)

    # XGBoost 모델 생성 및 훈련
    watchlist = [(d_train, '훈련'), (d_val, '검증')]
    
    params = {
        'max_depth': args.max_depth,
        'learning_rate': args.learning_rate,
        'reg_alpha': args.reg_alpha,
        'reg_lambda': args.reg_lambda,
        'subsample': args.subsample,
        'colsample_bytree': args.colsample_bytree,
        'objective': args.objective,
        'eval_metric': args.eval_metric,
    }
    xgb_model = xgb.train(params, d_train, args.num_round, watchlist, early_stopping_rounds=args.early_stopping_rounds, verbose_eval=10)
       
    # 검증 데이터로 성능 평가
    y_pred = xgb_model.predict(d_val)
    y_pred_binary = (y_pred > 0.5).astype(int)

    accuracy = accuracy_score(y_val, y_pred_binary)
    precision = precision_score(y_val, y_pred_binary)
    recall = recall_score(y_val, y_pred_binary)
    f1 = f1_score(y_val, y_pred_binary)

    print(f'검증 정확도: {accuracy:.4f}')
    print(f'검증 정밀도: {precision:.4f}')
    print(f'검증 재현율: {recall:.4f}')
    print(f'검증 F1 점수: {f1:.4f}')

    # 모델 저장
    model_path = os.path.join(args.model_dir, 'xgboost-model')
    pkl.dump(xgb_model, open(model_path, 'wb'))

if __name__ == '__main__':
    main()

Overwriting script/train.py


##### 모델 평가 스크립트 (evaluate.py)

In [4]:
%%writefile script/evaluate.py
import json
import pathlib
import pickle as pkl
import tarfile
import os
import boto3

import numpy as np
import pandas as pd
import xgboost as xgb

from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score, f1_score

def preprocess_test_data(test_data, assets):
    """입력 데이터를 전처리합니다."""
    scaler, encoders, pca = assets

    y = test_data['income']
    X = test_data.drop(columns=['income'])

    # 결측치 처리
    X = X.replace('?', np.nan)
    
    # 타겟 변수 인코딩
    if y.dtype == 'object':
        y = y.map({
            '<=50K': 0,
            '<=50K.': 0,
            '>50K': 1,
            '>50K.': 1
        })
    else:
        print("타겟 변수가 이미 숫자형입니다.")

    # 범주형 변수와 수치형 변수 구분
    categorical_features = X.select_dtypes(include=['object', 'category']).columns.tolist()
    numeric_features = X.select_dtypes(include=['int64', 'float64']).columns.tolist()

    # 범주형 변수에 'Unknown' 카테고리 추가 및 결측치 처리
    for feature in categorical_features: 
        X[feature] = X[feature].astype('category')
        X[feature] = X[feature].cat.add_categories('Unknown')
        X[feature] = X[feature].fillna('Unknown')

    # 수치형 특성의 결측치는 중앙값으로 대체
    for feature in numeric_features:
        X[feature] = X[feature].fillna(X[feature].median())
        
    
    # 범주형 컬럼 레이블 인코딩
    for feature in encoders.keys() :
        le = encoders[feature]
        X[feature] = X[feature].astype(str)
        # 인코더 업데이트
        unique_values = np.unique(X[feature])
        le.classes_ = np.unique(np.concatenate([le.classes_, unique_values]))
        # 변환 처리
        X[feature] = le.transform(X[feature])

    # 스케일링
    X[numeric_features] = scaler.transform(X[numeric_features])

    # PCA 차원축소
    X_pca = pca.transform(X)
    X_pca = pd.DataFrame(X_pca, columns=[f'PC{i}' for i in range(1, pca.n_components_ + 1)])
    
    return X_pca, y

if __name__ == "__main__":
    # 모델 파일 로드
    model_path = '/opt/ml/processing/model/model.tar.gz'
    with tarfile.open(model_path) as tar:
        tar.extractall(path='.')
    
    xgb_model = pkl.load(open('xgboost-model', 'rb'))
    
    # S3에서 asset 파일을 로컬로 복사
    s3 = boto3.client('s3')
    bucket_name = 'dante-sagemaker' # 본인의 버킷명으로 반드시 수정하세요!
    project_name = 'income-prediction'
    
    # 자산 파일 로드
    scaler_key = f'{project_name}/asset/scaler.pkl'
    encoder_key = f'{project_name}/asset/encoder.pkl'
    pca_key = f'{project_name}/asset/pca.pkl'
    
    scaler_obj = s3.get_object(Bucket=bucket_name, Key=scaler_key)
    encoder_obj = s3.get_object(Bucket=bucket_name, Key=encoder_key)
    pca_obj = s3.get_object(Bucket=bucket_name, Key=pca_key)
    
    scaler = pkl.loads(scaler_obj['Body'].read())
    encoders = pkl.loads(encoder_obj['Body'].read())
    pca = pkl.loads(pca_obj['Body'].read())
    
    # 추론 데이터 로드
    test_data = pd.read_csv('/opt/ml/processing/test/test.csv')
    
    # 추론 데이터 전처리
    X, y_true = preprocess_test_data(test_data, (scaler, encoders, pca))
    
    # 추론 데이터 예측
    dmatrix = xgb.DMatrix(X)
    y_pred = xgb_model.predict(dmatrix)
    y_pred_binary = (y_pred > 0.5).astype(int)
    
    # 평가 데이터 저장
    report_dict = {
        'classification_metrics': {
            'roc_auc': roc_auc_score(y_true, y_pred_binary),
            'accuracy': accuracy_score(y_true, y_pred_binary),
            'precision': precision_score(y_true, y_pred_binary),
            'recall': recall_score(y_true, y_pred_binary),
            'f1': f1_score(y_true, y_pred_binary)
        }
    }
    
    output_dir = "/opt/ml/processing/evaluation"
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)

    evaluation_path = f"{output_dir}/evaluation.json"
    with open(evaluation_path, "w") as f:
        f.write(json.dumps(report_dict))

Overwriting script/evaluate.py


##### 모델 추론 스크립트 (inference.py)

In [5]:
%%writefile script/inference.py
import os
import json
import pickle as pkl
import numpy as np
import xgboost as xgb
import pandas as pd
import io
import boto3

def model_fn(model_dir):
    """XGBoost 모델과 필요한 자산을 `model_dir`에서 로드합니다."""
    model_file = 'xgboost-model'
    booster = pkl.load(open(os.path.join(model_dir, model_file), 'rb'))
    
    # S3에서 asset 파일을 로컬로 복사
    s3 = boto3.client('s3')
    bucket_name = 'dante-sagemaker' # 본인의 버킷명으로 반드시 수정하세요!
    project_name = 'income-prediction'
    scaler_key = f'{project_name}/asset/scaler.pkl'
    encoder_key = f'{project_name}/asset/encoder.pkl'
    pca_key = f'{project_name}/asset/pca.pkl'
    
    scaler_obj = s3.get_object(Bucket=bucket_name, Key=scaler_key)
    encoder_obj = s3.get_object(Bucket=bucket_name, Key=encoder_key)
    pca_obj = s3.get_object(Bucket=bucket_name, Key=pca_key)
    
    scaler = pkl.loads(scaler_obj['Body'].read())
    encoders = pkl.loads(encoder_obj['Body'].read())
    pca = pkl.loads(pca_obj['Body'].read())
    
    original_feature_columns = ['age', 'workclass', 'fnlwgt', 'education', 'education-num',
       'marital-status', 'occupation', 'relationship', 'race', 'sex',
       'capitalgain', 'capitalloss', 'hoursperweek', 'native-country']
    
    numeric_columns = ['age', 'fnlwgt', 'education-num', 'capitalgain', 'capitalloss', 'hoursperweek']
    
    return booster, (scaler, encoders, pca, original_feature_columns, numeric_columns)

def input_fn(request_body, request_content_type):
    """입력 데이터 페이로드를 파싱합니다."""
    if request_content_type != "text/csv":
        raise ValueError(f"지원되지 않는 컨텐츠 타입입니다: {request_content_type}")
    df = pd.read_csv(io.StringIO(request_body), header=None)
    return df.values

def predict_fn(input_data, model):
    """로드된 모델로 예측을 수행합니다."""
    booster, (scaler, encoders, pca, original_feature_columns, numeric_columns) = model
    prep_input_data = preprocess_input_data(input_data, (scaler, encoders, pca, original_feature_columns, numeric_columns))
    dmatrix = xgb.DMatrix(prep_input_data)
    return booster.predict(dmatrix)

def output_fn(prediction, accept):
    """예측 출력을 포맷팅합니다."""
    if accept != "text/csv":
        raise ValueError(f"지원되지 않는 accept 타입입니다: {accept}")
    return ','.join(map(str, prediction))

def preprocess_input_data(input_data, assets):
    """입력 데이터를 전처리합니다."""
    scaler, encoders, pca, original_feature_columns, numeric_columns = assets
    X = pd.DataFrame(input_data, columns=original_feature_columns)
    X[X == '?'] = np.nan

    # 범주형 변수에 'Unknown' 카테고리 추가 및 결측치 처리
    for feature in (set(original_feature_columns) - set(numeric_columns)): 
        X[feature] = X[feature].astype('category')
        X[feature] = X[feature].cat.add_categories('Unknown')
        X[feature] = X[feature].fillna('Unknown')

    # 수치형 특성의 결측치는 중앙값으로 대체
    for feature in set(numeric_columns):
        X[feature] = pd.to_numeric(X[feature], errors='coerce')
        X[feature] = X[feature].fillna(X[feature].median())
    
    X[numeric_columns] = X[numeric_columns].astype('float64')
    X[numeric_columns] = scaler.transform(X[numeric_columns])
    
    # 범주형 컬럼 레이블 인코딩
    for feature in encoders.keys() :
        le = encoders[feature]
        X[feature] = X[feature].astype(str)
        # 인코더 업데이트
        unique_values = np.unique(X[feature])
        le.classes_ = np.unique(np.concatenate([le.classes_, unique_values]))
        # 변환 처리
        X[feature] = le.transform(X[feature])
        
    # NaN 및 무한대 값 처리
    X = X.replace([np.inf, -np.inf], np.nan)
    X = X.fillna(X.mean())
    
    # PCA 변환 수행
    X_pca = pd.DataFrame(pca.transform(X), columns=[f'PC{i}' for i in range(1, pca.n_components_ + 1)])
    return X_pca

Overwriting script/inference.py


##### 유틸리티 라이브러리 로드

In [6]:
import warnings
warnings.filterwarnings('ignore')
import pandas as pd
import awswrangler as wr
import os
from dotenv import load_dotenv
load_dotenv("./../.env") # 환경변수 파일의 상대적 위치를 확인하세요!

True

##### 파이프라인 세션 생성

In [7]:
from sagemaker.workflow.pipeline_context import PipelineSession
import sagemaker
import boto3

boto3_session = boto3.Session(profile_name='awstutor') # boto3 세션 생성
sagemaker_session = sagemaker.Session(boto_session=boto3_session) # SageMaker 세션 생성
pipeline_session = PipelineSession(boto_session=boto3_session) # 파이프라인 세션 생성
role = os.getenv('SAGEMAKER_EXECUTION_ROLE_ARN') # 환경변수에서 역할 가져오기

sagemaker.config INFO - Not applying SDK defaults from location: /Library/Application Support/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /Users/dante/Library/Application Support/sagemaker/config.yaml


##### 데이터 경로 설정 / 데이터 업로드

In [8]:
bucket_name = 'dante-sagemaker' # 본인의 버킷명으로 반드시 수정하세요!
project_name = 'income-prediction'
script_path = f's3://{bucket_name}/{project_name}/script'
original_data_path = f's3://{bucket_name}/{project_name}/original_data'
training_path = f's3://{bucket_name}/{project_name}/train'
validation_path = f's3://{bucket_name}/{project_name}/val'
test_path = f's3://{bucket_name}/{project_name}/test'
evaluation_path = f's3://{bucket_name}/{project_name}/evaluation'
model_path = f's3://{bucket_name}/{project_name}/model'
asset_path = f's3://{bucket_name}/{project_name}/asset'
batch_input_folder = f's3://{bucket_name}/{project_name}/batch/input'
batch_output_folder = f's3://{bucket_name}/{project_name}/batch/output'

In [9]:
# 기존 데이터 삭제
wr.s3.delete_objects(f"s3://{bucket_name}/{project_name}/", boto3_session=boto3_session)

In [10]:
# 원본 데이터 업로드
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split

# Adult 데이터셋 로드
adult = fetch_openml(name='adult', version=1, as_frame=True)
X = adult.data
y = adult.target
y.name = 'income'

X_tmp, X_test, y_tmp, y_test = train_test_split(X, y, test_size = 0.1, random_state = 2024)
X_train, X_val, y_train, y_val = train_test_split(X_tmp, y_tmp, test_size = 0.3, random_state = 2024)
original_data = pd.concat([y_tmp, X_tmp], axis=1)
wr.s3.to_csv(original_data, os.path.join(original_data_path, 'original_data.csv'), index=False, boto3_session=boto3_session)

{'paths': ['s3://dante-sagemaker/income-prediction/original_data/original_data.csv'],
 'partitions_values': {}}

In [11]:
# 스크립트 업로드
wr.s3.upload(local_file='script/preprocess.py', path=os.path.join(script_path, 'preprocess.py'), boto3_session=boto3_session)
wr.s3.upload(local_file='script/train.py', path=os.path.join(script_path, 'train.py'), boto3_session=boto3_session)
wr.s3.upload(local_file='script/evaluate.py', path=os.path.join(script_path, 'evaluate.py'), boto3_session=boto3_session)
wr.s3.upload(local_file='script/inference.py', path=os.path.join(script_path, 'inference.py'), boto3_session=boto3_session)

In [12]:
# 테스트 데이터 업로드
wr.s3.to_csv(pd.concat([y_test, X_test], axis=1), path=os.path.join(test_path, 'test.csv'), index=False, boto3_session=boto3_session) # 평가 조건용
wr.s3.to_csv(X_test, path=os.path.join(batch_input_folder, 'test.csv'), index=False, boto3_session=boto3_session) # 배치 추론용

{'paths': ['s3://dante-sagemaker/income-prediction/batch/input/test.csv'],
 'partitions_values': {}}

##### 전처리 스텝 정의

In [13]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

# SKLearnProcessor 인스턴스 생성
# 데이터 전처리를 위한 SKLearn 프로세서를 설정합니다.
sklearn_processor = SKLearnProcessor(
    framework_version='0.23-1',  # SKLearn 프레임워크 버전
    role=role,  # 실행 역할
    instance_type='ml.m5.xlarge',  # 사용할 인스턴스 유형
    instance_count=1,  # 인스턴스 수
    base_job_name=f'{project_name}-preprocessing',  # 기본 작업 이름
    sagemaker_session=pipeline_session  # SageMaker 세션
)

# 프로세싱 작업 실행
# 전처리 스크립트를 실행하고 입력/출력을 정의합니다.
processor_args = sklearn_processor.run(
    code=os.path.join(script_path, 'preprocess.py'),  # 실행할 전처리 스크립트 경로
    inputs=[
        ProcessingInput(
            source=os.path.join(original_data_path, 'original_data.csv'),  # 입력 데이터 소스
            destination='/opt/ml/processing/input'  # 컨테이너 내 입력 데이터 위치
        )
    ],
    outputs=[
        ProcessingOutput(output_name='train', source='/opt/ml/processing/output/train', destination=training_path),  # 훈련 데이터 출력
        ProcessingOutput(output_name='validation', source='/opt/ml/processing/output/validation', destination=validation_path),  # 검증 데이터 출력
        ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test/", destination=test_path),  # 테스트 데이터 출력
        ProcessingOutput(output_name='asset', source='/opt/ml/processing/output/asset', destination=asset_path)  # 기타 자산 출력
    ],
    arguments=['--n-components', '0.9', '--test-size', '0.2']  # 전처리 스크립트에 전달할 인자
)

# 전처리 단계를 파이프라인 스텝으로 정의
preprocess_step = ProcessingStep(name="PreprocessingStep", step_args=processor_args)


##### 모델 훈련 스텝 정의

In [14]:
from sagemaker.xgboost import XGBoost
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.utils import name_from_base

In [15]:
# 하이퍼파라미터 설정
# XGBoost 모델의 하이퍼파라미터를 정의합니다.
hyperparams = {
    "max_depth": "5",  # 트리의 최대 깊이
    "eta": "0.2",  # 학습률
    "gamma": "4",  # 트리의 리프 노드를 추가적으로 나누기 위한 최소 손실 감소
    "min_child_weight": "6",  # 자식 노드에 필요한 최소 가중치 합
    "subsample": "0.7",  # 각 트리마다 사용할 훈련 데이터의 샘플링 비율
    "objective": "binary:logistic",  # 이진 분류를 위한 목적 함수
    "num_round": "200",  # 부스팅 라운드 수
    "early_stopping_rounds": "10",  # 조기 종료를 위한 라운드 수
    "eval_metric": "logloss",  # 평가 지표
}


In [16]:
# XGBoost 모델 객체 생성
xgb_model = XGBoost(
    role=role,  # IAM 역할
    entry_point="script/train.py",  # 훈련 스크립트 경로
    framework_version="1.7-1",  # XGBoost 프레임워크 버전
    output_path=model_path,  # 모델 출력 경로
    sagemaker_session=pipeline_session,  # SageMaker 세션
    instance_count=1,  # 사용할 인스턴스 수
    instance_type='ml.m5.xlarge',  # 인스턴스 유형
    base_job_name=f"{project_name}-xgboost",  # 기본 작업 이름
    max_run=60*60,  # 최대 실행 시간 (1시간)
    max_wait=60*60,  # 최대 대기 시간 (1시간)
    use_spot_instances=True,  # 스팟 인스턴스 사용
    hyperparameters=hyperparams,  # 하이퍼파라미터
    code_location=script_path  # 스크립트 위치를 명시적으로 지정
)


In [17]:
# XGBoost 모델 훈련 실행
train_args = xgb_model.fit(
    inputs={
        "train": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs['train'].S3Output.S3Uri, 
            content_type='text/csv'
        ),
        "validation": TrainingInput(
            s3_data=preprocess_step.properties.ProcessingOutputConfig.Outputs['validation'].S3Output.S3Uri, 
            content_type='text/csv'
        ),        
    },
    wait=True,
    logs=True,
    job_name=name_from_base(f"{project_name}-xgboost-training"),
)

# 훈련 단계 정의
train_step = TrainingStep(
    name='TrainingStep',  # 훈련 단계의 이름
    step_args=train_args,  # 훈련 인자
    cache_config=False  # 캐시 설정 비활성화
)

##### 모델 평가 스텝 정의

In [18]:
# SageMaker 처리 및 워크플로우 관련 모듈 임포트
from sagemaker.processing import ScriptProcessor
from sagemaker.workflow.properties import PropertyFile

# XGBoost 이미지 URI 검색
image_uri = sagemaker.image_uris.retrieve(
    framework="xgboost",
    region='ap-northeast-2',
    version="1.7-1",
    py_version="py3",
    instance_type="ml.m5.xlarge",
)

# 모델 평가를 위한 ScriptProcessor 객체 생성
script_eval = ScriptProcessor(
    image_uri=image_uri,
    command=["python3"],
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name=f"{project_name}-evaluation",
    role=role,
    sagemaker_session=pipeline_session,
)

# 평가 스크립트 실행을 위한 인자 설정
eval_args = script_eval.run(
    inputs=[
        # 훈련된 모델 아티팩트 입력
        ProcessingInput(
            source=train_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model",
        ),
        # 테스트 데이터셋 입력
        ProcessingInput(
            source=preprocess_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
            destination="/opt/ml/processing/test",
        ),
    ],
    outputs=[
        # 평가 결과 출력 설정
        ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", destination=evaluation_path),
    ],
    code="script/evaluate.py",  # 평가 스크립트 경로
)

# 평가 보고서 파일 속성 정의
evaluation_report = PropertyFile(
    name="EvaluationReport", output_name="evaluation", path="evaluation.json"
)

# 평가 단계 정의
eval_step = ProcessingStep(
    name="EvaluationStep",
    step_args=eval_args,
    property_files=[evaluation_report],
)

##### 추론 모델 생성 스텝 정의

In [19]:
from sagemaker.xgboost.model import XGBoostModel
from sagemaker.workflow.model_step import ModelStep

In [20]:
# XGBoost 모델 객체 생성
xgb_inf_model = XGBoostModel(
    model_data=train_step.properties.ModelArtifacts.S3ModelArtifacts,
    role=role,
    entry_point="script/inference.py",
    code_location=script_path,
    framework_version="1.7-1",
    sagemaker_session=pipeline_session,
)

In [21]:
model_args = xgb_inf_model.create(instance_type="ml.m5.xlarge")  # XGBoost 모델 생성 인자 설정 (인스턴스 타입: ml.m5.xlarge)

In [22]:
# XGBoost 모델 생성 단계 정의
create_model_step = ModelStep(
   name="XGBCreation",  # 단계 이름 설정
   step_args=model_args,
)

모델 등록 스텝 정의

In [23]:
from sagemaker.model_metrics import MetricsSource, ModelMetrics

# 모델 그룹 이름 정의
model_group_name = "AdultIncomePredictionXGBModelPackage"

# 모델 메트릭스 객체 생성
# 평가 단계에서 생성된 evaluation.json 파일의 S3 URI를 사용하여 모델 통계 설정
model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            eval_step.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

# XGBoost 추론 모델 등록을 위한 인자 설정
register_args = xgb_inf_model.register(
    content_types=["text/csv"],  # 입력 데이터 타입
    response_types=["text/csv"],  # 출력 데이터 타입
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],  # 추론에 사용할 인스턴스 유형
    transform_instances=["ml.m5.xlarge"],  # 배치 변환에 사용할 인스턴스 유형
    model_package_group_name=model_group_name,  # 모델 그룹 이름
    approval_status='PendingManualApproval',  # 모델 승인 상태 (수동 승인 대기)
    model_metrics=model_metrics,  # 모델 메트릭스
)

# 모델 등록 단계 정의
register_step = ModelStep(name="XGBRegistration", step_args=register_args)

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.t2.medium.


##### 배치 변환 스텝 정의

In [24]:
# SageMaker Transformer와 TransformStep을 가져옵니다
from sagemaker.transformer import Transformer
from sagemaker.workflow.steps import TransformStep

# 배치 변환을 위한 Transformer 객체를 생성합니다
transformer = Transformer(
    model_name = create_model_step.properties.ModelName,  # 생성된 모델의 이름을 사용합니다
    instance_count = 1,  # 변환에 사용할 인스턴스 수
    instance_type = 'ml.m4.xlarge',  # 변환에 사용할 인스턴스 유형
    strategy = 'MultiRecord',  # 여러 레코드를 한 번에 처리하는 전략
    assemble_with = 'Line',  # 출력을 줄 단위로 조립합니다
    output_path = batch_output_folder,  # 변환 결과의 출력 경로
    base_transform_job_name=f'{project_name}-inference-batch',  # 변환 작업의 기본 이름
    sagemaker_session=pipeline_session,  # SageMaker 세션
    accept = 'text/csv'  # 출력 데이터 형식
)

# 배치 변환을 위한 인자 설정
tranaformer_args = transformer.transform(
    data=batch_input_folder,  # 배치 입력 데이터 위치
    content_type='text/csv'  # 입력 데이터 형식
)

# 배치 변환 단계 정의
transforming_step = TransformStep(
    name="TransformStep",  # 단계 이름
    step_args=tranaformer_args,  # 변환 인자
)

##### 실패 스텝 정의

In [25]:
# SageMaker 워크플로우 라이브러리에서 필요한 클래스들을 가져옵니다
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterFloat

# ROAUC 임계값을 파라미터로 정의합니다
roauc_threshold = ParameterFloat(name="ROAUCThreshold", default_value=0.65) 

# 실패 단계를 정의합니다
fail_step = FailStep(
    name="FailStep",  # 실패 단계의 이름
    error_message=Join(on=" ", values=["목표 모델생성에 실패했습니다. AUROC <", roauc_threshold]),  # 오류 메시지 설정
)

##### 조건 스텝 정의

In [26]:
# SageMaker 워크플로우 라이브러리에서 필요한 클래스들을 가져옵니다
from sagemaker.workflow.conditions import ConditionGreaterThan
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

# ROC AUC 점수가 임계값보다 큰지 확인하는 조건을 정의합니다
cond_gt = ConditionGreaterThan(
    left=JsonGet(
        step_name=eval_step.name,
        property_file=evaluation_report,
        json_path="classification_metrics.roc_auc",
    ),
    right=roauc_threshold,
)

# 조건에 따라 다른 단계를 실행하는 조건 단계를 정의합니다
condition_step = ConditionStep(
    name="ConditionStep",  # 조건 단계의 이름
    conditions=[cond_gt],  # 평가할 조건
    if_steps=[create_model_step, register_step, transforming_step],  # 조건이 참일 때 실행할 단계들
    else_steps=[fail_step],  # 조건이 거짓일 때 실행할 단계
)

##### 파이프라인 정의 및 실행

In [27]:
# SageMaker 워크플로우 파이프라인을 위한 라이브러리 임포트
from sagemaker.workflow.pipeline import Pipeline

# 파이프라인 이름 설정
pipeline_name = "AdultIncomePredictionPipeline"

# 처리 및 훈련에 사용할 인스턴스 유형 및 개수 설정
processing_instance_type = 'ml.m5.xlarge'
processing_instance_count = 1
training_instance_type = 'ml.m5.xlarge'

# 모델 승인 상태 설정
model_approval_status = 'PendingManualApproval'

# 입력 데이터 및 배치 데이터 경로 설정
input_data = original_data_path
batch_data = batch_input_folder

# 파이프라인 객체 생성
pipeline = Pipeline(
    name=pipeline_name,  # 파이프라인 이름
    parameters=[  # 파이프라인 파라미터 설정
        processing_instance_count,
        training_instance_type,
        model_approval_status,
        input_data,
        batch_data,
        roauc_threshold
    ],
    sagemaker_session=pipeline_session,  # 파이프라인 세션
    steps=[preprocess_step, train_step, eval_step, condition_step],  # 파이프라인 단계 설정
)

In [28]:
# 파이프라인을 생성하거나 업데이트합니다.
# role_arn 파라미터는 파이프라인 실행에 필요한 IAM 역할을 지정합니다.
pipeline.upsert(role_arn=role)



{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:905418381372:pipeline/AdultIncomePredictionPipeline',
 'ResponseMetadata': {'RequestId': '95cd1ab7-84ca-48c1-9687-267f5bd78ac4',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '95cd1ab7-84ca-48c1-9687-267f5bd78ac4',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '102',
   'date': 'Mon, 21 Oct 2024 15:35:08 GMT'},
  'RetryAttempts': 0}}

In [29]:
# 파이프라인 실행 시작
execution = pipeline.start()

In [30]:
# 파이프라인 실행 상태 및 세부 정보를 조회합니다.
execution.describe()

{'PipelineArn': 'arn:aws:sagemaker:ap-northeast-2:905418381372:pipeline/AdultIncomePredictionPipeline',
 'PipelineExecutionArn': 'arn:aws:sagemaker:ap-northeast-2:905418381372:pipeline/AdultIncomePredictionPipeline/execution/o0kkgc6z2fnk',
 'PipelineExecutionDisplayName': 'execution-1729524908760',
 'PipelineExecutionStatus': 'Executing',
 'CreationTime': datetime.datetime(2024, 10, 22, 0, 35, 8, 696000, tzinfo=tzlocal()),
 'LastModifiedTime': datetime.datetime(2024, 10, 22, 0, 35, 8, 696000, tzinfo=tzlocal()),
 'CreatedBy': {'IamIdentity': {'Arn': 'arn:aws:iam::905418381372:user/tutor_sdk',
   'PrincipalId': 'AIDA5FTZEDA6GVWMFH7I2'}},
 'LastModifiedBy': {'IamIdentity': {'Arn': 'arn:aws:iam::905418381372:user/tutor_sdk',
   'PrincipalId': 'AIDA5FTZEDA6GVWMFH7I2'}},
 'ResponseMetadata': {'RequestId': '11c6d65a-0a02-40bb-a2b9-a282107676ec',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '11c6d65a-0a02-40bb-a2b9-a282107676ec',
   'content-type': 'application/x-amz-json-1.1

In [31]:
# 파이프라인 실행이 완료될 때까지 대기합니다.
# 이 함수는 파이프라인 실행이 끝날 때까지 프로그램의 실행을 일시 중지시킵니다.
execution.wait()