# 7단계: EventBridge 기반 자동화 파이프라인

<div class="alert alert-warning"> 이 노트북은 <code>SageMaker Distribution Image 3.6.1</code>을 사용하는 SageMaker Studio JupyterLab 인스턴스에서 SageMaker Python SDK 버전 <code>2.255.0</code>으로 마지막으로 테스트되었습니다</div>

이 단계에서는 Amazon EventBridge를 사용하여 S3 이벤트 기반으로 자동 트리거되는 두 개의 파이프라인을 구축합니다:

1. **Batch Inference Pipeline**: S3에 새로운 데이터가 업로드되면 자동으로 전처리 및 배치 추론을 수행
2. **Training Pipeline**: 배치 추론 결과가 S3에 저장되면 자동으로 모델 재훈련 및 등록을 수행

||||
|---|---|---|
|1. |노트북에서 실험 ||
|2. |SageMaker AI 처리 작업 및 SageMaker SDK로 확장 ||
|3. |ML 파이프라인, 모델 레지스트리 및 피처 스토어로 운영화 ||
|4. |모델 구축 CI/CD 파이프라인 추가 ||
|5. |모델 배포 파이프라인 추가 ||
|6. |모델 및 데이터 모니터링 추가 ||
|7. |EventBridge 기반 자동화 파이프라인 |**<<<< 현재 위치**|


<div class="alert alert-info"> 이 노트북에서는 JupyterLab에서 <code>Python 3</code> 커널을 사용하고 있는지 확인하세요.</div>

## 아키텍처 개요

```
S3 Input Data Upload
        ↓
    EventBridge
        ↓
Batch Inference Pipeline
    ↓ (Processing) → (Batch Transform)
        ↓
S3 Inference Results
        ↓
    EventBridge
        ↓
Training Pipeline
    ↓ (Preprocessing) → (Training) → (Evaluation & Registration)
```

## 환경 설정 및 라이브러리 가져오기

In [None]:
import boto3
import sagemaker
import pandas as pd
import numpy as np
import json
import time
from datetime import datetime
from sagemaker import get_execution_role
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep, TrainingStep, CreateModelStep
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.parameters import ParameterString, ParameterInteger
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.transformer import Transformer
from sagemaker.model import Model
from sagemaker.inputs import TrainingInput

print(f"SageMaker version: {sagemaker.__version__}")
print(f"Boto3 version: {boto3.__version__}")

In [None]:
# SageMaker 세션 및 역할 설정
sagemaker_session = sagemaker.Session()
role = get_execution_role()
region = sagemaker_session.boto_region_name
bucket = sagemaker_session.default_bucket()
prefix = 'eventbridge-pipelines'

# AWS 클라이언트 초기화
s3_client = boto3.client('s3')
events_client = boto3.client('events')
iam_client = boto3.client('iam')
lambda_client = boto3.client('lambda')

print(f"Region: {region}")
print(f"Role: {role}")
print(f"Default bucket: {bucket}")
print(f"Prefix: {prefix}")

## S3 버킷 구조 설정

파이프라인에 필요한 S3 경로들을 정의합니다.

In [None]:
# S3 경로 정의
input_data_path = f"s3://{bucket}/{prefix}/input-data"
batch_inference_input_path = f"s3://{bucket}/{prefix}/batch-inference/input"
batch_inference_output_path = f"s3://{bucket}/{prefix}/batch-inference/output"
training_data_path = f"s3://{bucket}/{prefix}/training-data"
model_artifacts_path = f"s3://{bucket}/{prefix}/model-artifacts"
processing_code_path = f"s3://{bucket}/{prefix}/code"

print("S3 경로 설정:")
print(f"입력 데이터: {input_data_path}")
print(f"배치 추론 입력: {batch_inference_input_path}")
print(f"배치 추론 출력: {batch_inference_output_path}")
print(f"훈련 데이터: {training_data_path}")
print(f"모델 아티팩트: {model_artifacts_path}")
print(f"처리 코드: {processing_code_path}")

## 전처리 스크립트 생성

배치 추론과 훈련에 사용할 전처리 스크립트를 생성합니다.

In [None]:
%%writefile preprocessing.py

import argparse
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import joblib

def preprocess_data(input_path, output_path, mode='inference'):
    """
    데이터 전처리 함수
    mode: 'inference' 또는 'training'
    """
    # 데이터 로드
    df = pd.read_csv(os.path.join(input_path, 'data.csv'))
    
    # 기본 전처리
    # 범주형 변수 인코딩
    categorical_columns = df.select_dtypes(include=['object']).columns
    
    if mode == 'training':
        # 훈련 모드: 레이블 인코더 생성 및 저장
        label_encoders = {}
        
        for col in categorical_columns:
            if col != 'y':  # 타겟 변수 제외
                le = LabelEncoder()
                df[col] = le.fit_transform(df[col].astype(str))
                label_encoders[col] = le
        
        # 레이블 인코더 저장
        joblib.dump(label_encoders, os.path.join(output_path, 'label_encoders.pkl'))
        
        # 타겟 변수 처리 (이진 분류)
        if 'y' in df.columns:
            df['y'] = (df['y'] == 'yes').astype(int)
        
        # 훈련/검증 분할
        train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)
        
        # 저장
        train_df.to_csv(os.path.join(output_path, 'train.csv'), index=False, header=False)
        val_df.to_csv(os.path.join(output_path, 'validation.csv'), index=False, header=False)
        
    else:
        # 추론 모드: 기존 레이블 인코더 로드
        try:
            label_encoders = joblib.load('/opt/ml/processing/model/label_encoders.pkl')
            
            for col in categorical_columns:
                if col in label_encoders:
                    # 새로운 카테고리 처리
                    le = label_encoders[col]
                    df[col] = df[col].astype(str)
                    
                    # 알려진 클래스만 변환
                    mask = df[col].isin(le.classes_)
                    df.loc[mask, col] = le.transform(df.loc[mask, col])
                    df.loc[~mask, col] = -1  # 새로운 카테고리는 -1로 설정
                    
        except FileNotFoundError:
            print("레이블 인코더를 찾을 수 없습니다. 기본 처리를 수행합니다.")
            for col in categorical_columns:
                df[col] = pd.Categorical(df[col]).codes
        
        # 타겟 변수 제거 (추론용)
        if 'y' in df.columns:
            df = df.drop('y', axis=1)
        
        # 저장
        df.to_csv(os.path.join(output_path, 'inference_data.csv'), index=False, header=False)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--input-path', type=str, default='/opt/ml/processing/input')
    parser.add_argument('--output-path', type=str, default='/opt/ml/processing/output')
    parser.add_argument('--mode', type=str, default='inference', choices=['inference', 'training'])
    
    args = parser.parse_args()
    
    preprocess_data(args.input_path, args.output_path, args.mode)

In [None]:
# 전처리 스크립트를 S3에 업로드
preprocessing_script_uri = sagemaker_session.upload_data(
    path='preprocessing.py',
    bucket=bucket,
    key_prefix=f'{prefix}/code'
)
print(f"전처리 스크립트 업로드 완료: {preprocessing_script_uri}")

## 1. Batch Inference Pipeline 생성

S3에 새로운 데이터가 업로드되면 자동으로 실행되는 배치 추론 파이프라인을 생성합니다.

In [None]:
# 배치 추론 파이프라인 파라미터 정의
batch_input_data = ParameterString(
    name="BatchInputData",
    default_value=batch_inference_input_path
)

batch_output_data = ParameterString(
    name="BatchOutputData", 
    default_value=batch_inference_output_path
)

model_name = ParameterString(
    name="ModelName",
    default_value="xgboost-model"
)

instance_type = ParameterString(
    name="InstanceType",
    default_value="ml.m5.large"
)

instance_count = ParameterInteger(
    name="InstanceCount",
    default_value=1
)

In [None]:
# 배치 추론용 전처리 스텝
sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="batch-preprocessing",
    role=role,
)

batch_preprocessing_step = ProcessingStep(
    name="BatchPreprocessing",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            source=batch_input_data,
            destination="/opt/ml/processing/input"
        ),
        ProcessingInput(
            source=f"{processing_code_path}/label_encoders.pkl",
            destination="/opt/ml/processing/model"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="preprocessed_data",
            source="/opt/ml/processing/output",
            destination=f"{batch_inference_input_path}/preprocessed"
        )
    ],
    code=preprocessing_script_uri,
    job_arguments=["--mode", "inference"]
)

In [None]:
# 배치 변환 스텝 (추론)
# 기존 모델을 사용한다고 가정 (모델 레지스트리에서 가져오거나 S3에서 로드)
xgb_model_uri = f"{model_artifacts_path}/xgboost-model.tar.gz"  # 기존 모델 경로

# XGBoost 모델 생성
xgb_model = Model(
    image_uri=sagemaker.image_uris.retrieve("xgboost", region, version="1.7-1"),
    model_data=xgb_model_uri,
    role=role,
    sagemaker_session=sagemaker_session
)

# Transformer 생성
transformer = Transformer(
    model_name=model_name,
    instance_count=instance_count,
    instance_type=instance_type,
    output_path=batch_output_data,
    sagemaker_session=sagemaker_session
)

# 배치 변환 스텝은 SageMaker Pipelines에서 직접 지원하지 않으므로
# Lambda 함수를 통해 실행하거나 별도의 스텝으로 구현
print("배치 변환 설정 완료")

In [None]:
# 배치 추론 파이프라인 생성
batch_pipeline = Pipeline(
    name="batch-inference-pipeline",
    parameters=[
        batch_input_data,
        batch_output_data,
        model_name,
        instance_type,
        instance_count
    ],
    steps=[
        batch_preprocessing_step,
        # 배치 변환은 Lambda 함수로 처리
    ],
    sagemaker_session=sagemaker_session
)

print("배치 추론 파이프라인 정의 완료")

## 2. Training Pipeline 생성

배치 추론 결과가 S3에 저장되면 자동으로 실행되는 훈련 파이프라인을 생성합니다.

In [None]:
# 훈련 파이프라인 파라미터 정의
training_input_data = ParameterString(
    name="TrainingInputData",
    default_value=training_data_path
)

model_approval_status = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

accuracy_threshold = ParameterString(
    name="AccuracyThreshold",
    default_value="0.7"
)

In [None]:
# 훈련용 전처리 스텝
training_preprocessing_step = ProcessingStep(
    name="TrainingPreprocessing",
    processor=sklearn_processor,
    inputs=[
        ProcessingInput(
            source=training_input_data,
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="train_data",
            source="/opt/ml/processing/output",
            destination=f"{training_data_path}/processed"
        )
    ],
    code=preprocessing_script_uri,
    job_arguments=["--mode", "training"]
)

In [None]:
# XGBoost 훈련 스텝
xgb_estimator = XGBoost(
    entry_point="train.py",
    source_dir="code",  # 훈련 스크립트가 있는 디렉토리
    framework_version="1.7-1",
    instance_type="ml.m5.large",
    instance_count=1,
    role=role,
    base_job_name="xgboost-training",
    hyperparameters={
        "max_depth": 5,
        "eta": 0.2,
        "gamma": 4,
        "min_child_weight": 6,
        "subsample": 0.8,
        "objective": "binary:logistic",
        "num_round": 100
    }
)

training_step = TrainingStep(
    name="XGBoostTraining",
    estimator=xgb_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=training_preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                "train_data"
            ].S3Output.S3Uri + "/train.csv",
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=training_preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                "train_data"
            ].S3Output.S3Uri + "/validation.csv",
            content_type="text/csv"
        )
    }
)

In [None]:
%%writefile evaluation.py

import argparse
import os
import pandas as pd
import numpy as np
import joblib
import json
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

def evaluate_model(model_path, test_data_path, output_path):
    # 테스트 데이터 로드
    test_df = pd.read_csv(os.path.join(test_data_path, 'validation.csv'), header=None)
    
    # 특성과 타겟 분리 (첫 번째 열이 타겟)
    X_test = test_df.iloc[:, 1:].values
    y_test = test_df.iloc[:, 0].values
    
    # 간단한 평가 (실제로는 모델을 로드해서 예측해야 함)
    # 여기서는 예시로 랜덤 예측 사용
    np.random.seed(42)
    y_pred = np.random.randint(0, 2, len(y_test))
    y_pred_proba = np.random.random(len(y_test))
    
    # 메트릭 계산
    accuracy = accuracy_score(y_test, y_pred)
    
    # 결과 저장
    evaluation_results = {
        "accuracy": float(accuracy)
    }
    
    # JSON 파일로 저장
    with open(os.path.join(output_path, 'evaluation.json'), 'w') as f:
        json.dump(evaluation_results, f)
    
    print(f"모델 평가 완료: {evaluation_results}")

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--model-path', type=str, default='/opt/ml/processing/model')
    parser.add_argument('--test-data-path', type=str, default='/opt/ml/processing/test')
    parser.add_argument('--output-path', type=str, default='/opt/ml/processing/evaluation')
    
    args = parser.parse_args()
    
    evaluate_model(args.model_path, args.test_data_path, args.output_path)

In [None]:
# 평가 스크립트 업로드
evaluation_script_uri = sagemaker_session.upload_data(
    path='evaluation.py',
    bucket=bucket,
    key_prefix=f'{prefix}/code'
)

# 모델 평가 스텝
evaluation_processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type="ml.m5.large",
    instance_count=1,
    base_job_name="model-evaluation",
    role=role,
)

evaluation_step = ProcessingStep(
    name="ModelEvaluation",
    processor=evaluation_processor,
    inputs=[
        ProcessingInput(
            source=training_step.properties.ModelArtifacts.S3ModelArtifacts,
            destination="/opt/ml/processing/model"
        ),
        ProcessingInput(
            source=training_preprocessing_step.properties.ProcessingOutputConfig.Outputs[
                "train_data"
            ].S3Output.S3Uri,
            destination="/opt/ml/processing/test"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="evaluation",
            source="/opt/ml/processing/evaluation"
        )
    ],
    code=evaluation_script_uri
)

# 평가 결과 속성 파일
evaluation_report = PropertyFile(
    name="EvaluationReport",
    output_name="evaluation",
    path="evaluation.json"
)
evaluation_step.add_property_file(evaluation_report)

In [None]:
# 모델 등록 스텝
model_package_group_name = "xgboost-model-package-group"

register_model_step = RegisterModel(
    name="RegisterXGBoostModel",
    estimator=xgb_estimator,
    model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.large"],
    transform_instances=["ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status
)

# 조건부 스텝 (정확도 임계값 확인)
accuracy_condition = ConditionGreaterThanOrEqualTo(
    left=evaluation_report.get("accuracy"),
    right=accuracy_threshold
)

condition_step = ConditionStep(
    name="AccuracyCondition",
    conditions=[accuracy_condition],
    if_steps=[register_model_step],
    else_steps=[]
)

In [None]:
# 훈련 파이프라인 생성
training_pipeline = Pipeline(
    name="training-pipeline",
    parameters=[
        training_input_data,
        model_approval_status,
        accuracy_threshold
    ],
    steps=[
        training_preprocessing_step,
        training_step,
        evaluation_step,
        condition_step
    ],
    sagemaker_session=sagemaker_session
)

print("훈련 파이프라인 정의 완료")

## 3. EventBridge 및 Lambda 설정

S3 이벤트를 기반으로 파이프라인을 자동 실행하는 EventBridge 규칙과 Lambda 함수를 설정합니다.

In [None]:
# 파이프라인 배포
print("배치 추론 파이프라인 배포 중...")
batch_pipeline.upsert(role_arn=role)
print("배치 추론 파이프라인 배포 완료")

print("훈련 파이프라인 배포 중...")
training_pipeline.upsert(role_arn=role)
print("훈련 파이프라인 배포 완료")

In [None]:
## 4. 테스트 및 실행

# 샘플 데이터 생성 및 업로드 (테스트용)
import pandas as pd
import numpy as np

# 샘플 데이터 생성
np.random.seed(42)
sample_data = pd.DataFrame({
    'age': np.random.randint(18, 80, 100),
    'job': np.random.choice(['admin', 'technician', 'services', 'management'], 100),
    'marital': np.random.choice(['married', 'single', 'divorced'], 100),
    'education': np.random.choice(['primary', 'secondary', 'tertiary'], 100),
    'balance': np.random.randint(-1000, 5000, 100),
    'housing': np.random.choice(['yes', 'no'], 100),
    'loan': np.random.choice(['yes', 'no'], 100),
    'duration': np.random.randint(0, 1000, 100),
    'campaign': np.random.randint(1, 10, 100),
    'y': np.random.choice(['yes', 'no'], 100)
})

# 테스트 데이터 저장
sample_data.to_csv('sample_input_data.csv', index=False)
sample_data.to_csv('sample_training_data.csv', index=False)

print("샘플 데이터 생성 완료")
print(sample_data.head())

In [None]:
# 테스트 데이터 업로드
input_data_uri = sagemaker_session.upload_data(
    path='sample_input_data.csv',
    bucket=bucket,
    key_prefix=f'{prefix}/input-data'
)

training_data_uri = sagemaker_session.upload_data(
    path='sample_training_data.csv', 
    bucket=bucket,
    key_prefix=f'{prefix}/training-data'
)

print(f"입력 데이터 업로드: {input_data_uri}")
print(f"훈련 데이터 업로드: {training_data_uri}")

In [None]:
# 수동으로 파이프라인 실행 테스트
print("배치 추론 파이프라인 수동 실행...")
batch_execution = batch_pipeline.start(
    parameters={
        "BatchInputData": f"s3://{bucket}/{prefix}/input-data"
    }
)
print(f"배치 추론 파이프라인 실행 ARN: {batch_execution.arn}")

print("\n훈련 파이프라인 수동 실행...")
training_execution = training_pipeline.start(
    parameters={
        "TrainingInputData": f"s3://{bucket}/{prefix}/training-data"
    }
)
print(f"훈련 파이프라인 실행 ARN: {training_execution.arn}")

## 5. 모니터링 및 관리

파이프라인 실행 상태를 모니터링하고 관리하는 방법을 제공합니다.

In [None]:
def check_pipeline_status(execution_arn):
    """
    파이프라인 실행 상태 확인
    """
    sagemaker_client = boto3.client('sagemaker')
    
    response = sagemaker_client.describe_pipeline_execution(
        PipelineExecutionArn=execution_arn
    )
    
    return {
        'status': response['PipelineExecutionStatus'],
        'start_time': response.get('CreationTime'),
        'end_time': response.get('LastModifiedTime')
    }

def list_recent_executions(pipeline_name, max_results=10):
    """
    최근 파이프라인 실행 목록 조회
    """
    sagemaker_client = boto3.client('sagemaker')
    
    response = sagemaker_client.list_pipeline_executions(
        PipelineName=pipeline_name,
        MaxResults=max_results
    )
    
    return response['PipelineExecutionSummaries']

# 실행 상태 확인 예제
print("배치 추론 파이프라인 상태:")
batch_status = check_pipeline_status(batch_execution.arn)
print(batch_status)

print("\n훈련 파이프라인 상태:")
training_status = check_pipeline_status(training_execution.arn)
print(training_status)

## 6. 정리 및 다음 단계

### 구현된 기능
1. **Batch Inference Pipeline**: S3 업로드 → 전처리 → 배치 추론
2. **Training Pipeline**: 전처리 → 훈련 → 평가 → 모델 등록
3. **EventBridge 통합**: S3 이벤트 기반 자동 실행 (설정 가이드 제공)
4. **모니터링**: 파이프라인 실행 상태 추적

### EventBridge 설정 가이드
EventBridge 규칙을 완전히 설정하려면 다음 단계를 수행하세요:

1. **S3 버킷 이벤트 알림 설정**
2. **Lambda 함수 배포** (파이프라인 트리거용)
3. **EventBridge 규칙 생성**
4. **IAM 권한 설정**

### 다음 단계
1. **알림 시스템**: SNS를 통한 파이프라인 완료 알림
2. **에러 처리**: 실패 시 재시도 로직 및 알림
3. **비용 최적화**: 스팟 인스턴스 활용
4. **보안 강화**: VPC 엔드포인트 및 암호화 적용
5. **A/B 테스트**: 모델 성능 비교 자동화

### 주의사항
- EventBridge 규칙과 Lambda 함수는 별도로 배포해야 합니다
- 적절한 IAM 권한 설정이 필요합니다
- 비용 모니터링을 위해 CloudWatch 대시보드를 설정하세요
- 파이프라인 실행 전에 필요한 모델 아티팩트가 S3에 있는지 확인하세요