# SageMaker Pipeline: 데이터 추출부터 API 배포까지

이 노트북은 Redshift에서 데이터를 추출하여 S3에 저장하고, 모델을 학습한 후 엔드포인트로 배포하고, Lambda 함수와 API Gateway를 설정하는 전체 SageMaker 파이프라인을 정의합니다.

In [None]:
import boto3
import sagemaker
import json
from sagemaker.workflow.parameters import ParameterString
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.inputs import TrainingInput
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.callback_step import CallbackStep
from sagemaker.workflow.pipeline import Pipeline

In [None]:
# Session and IAM role shared by every step of the pipeline.
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()

# Pipeline parameters. Each is a ParameterString, so a value can be supplied
# per execution; ``default_value`` is used otherwise.
# NOTE(review): the RedshiftClusterId default reads like a Redshift Serverless
# workgroup name, while the extraction script passes it as ClusterIdentifier
# (provisioned clusters) — confirm which one is intended.
redshift_cluster_id = ParameterString(
    name="RedshiftClusterId",
    default_value="stoa-aiml-pipeline-workgroup",
)
redshift_database = ParameterString(
    name="RedshiftDatabase",
    default_value="dev",
)
redshift_user = ParameterString(
    name="RedshiftUser",
    default_value="admin",
)
s3_bucket = ParameterString(
    name="S3Bucket",
    default_value="stoa-aiml-pipeline-bucket",
)
s3_output_path = ParameterString(
    name="S3OutputPath",
    default_value="extracted_data",
)
model_name = ParameterString(
    name="ModelName",
    default_value="customer-purchase-model",
)
endpoint_name = ParameterString(
    name="EndpointName",
    default_value="customer-purchase-endpoint",
)

In [None]:
# Source of the Redshift extraction script run by the processing step (the
# processing cell writes it to a file before handing it to SageMaker).
# The script UNLOADs the ``customers`` table to ``--s3-output-path`` as CSV with
# a header row, polls the Redshift Data API until the statement ends, then
# copies the unloaded objects into the processing job's local output directory
# so the step's ProcessingOutput hands them on to the training step.
# NOTE: the script text must never contain three consecutive double quotes;
# that would terminate this literal early.
data_extraction_script = """
import argparse
import os
import time

import boto3

# DescribeStatement statuses after which the statement will not change again.
TERMINAL_STATUSES = ('FINISHED', 'FAILED', 'ABORTED')


def wait_for_statement(client, statement_id, poll_seconds=5):
    # The Redshift Data API has no boto3 waiter, so poll DescribeStatement.
    while True:
        description = client.describe_statement(Id=statement_id)
        status = description['Status']
        if status in TERMINAL_STATUSES:
            break
        time.sleep(poll_seconds)
    if status != 'FINISHED':
        raise RuntimeError(
            'Redshift statement {} ended with status {}: {}'.format(
                statement_id, status, description.get('Error', '')
            )
        )
    return description


def split_s3_uri(uri):
    # 's3://bucket/some/prefix/' -> ('bucket', 'some/prefix/')
    without_scheme = uri[len('s3://'):] if uri.startswith('s3://') else uri
    bucket, _, prefix = without_scheme.partition('/')
    return bucket, prefix


def download_prefix(s3_uri, local_dir):
    # Copy every object under s3_uri into local_dir (flattened file names).
    bucket, prefix = split_s3_uri(s3_uri)
    s3 = boto3.client('s3')
    os.makedirs(local_dir, exist_ok=True)
    copied = 0
    for page in s3.get_paginator('list_objects_v2').paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('/'):
                continue
            relative = key[len(prefix):].lstrip('/') or os.path.basename(key)
            s3.download_file(bucket, key, os.path.join(local_dir, relative.replace('/', '_')))
            copied += 1
    return copied


def extract_data_from_redshift(cluster_id, database, user, s3_output_path,
                               local_output_dir='/opt/ml/processing/output'):
    redshift_data = boto3.client('redshift-data')

    # ALLOWOVERWRITE lets pipeline re-runs replace the previous unload; without
    # it UNLOAD fails when the target files already exist. Single quotes in the
    # path are doubled because it is embedded in a SQL string literal.
    sql = (
        "UNLOAD ('SELECT * FROM customers') "
        "TO '{}' "
        "IAM_ROLE default "
        "FORMAT CSV "
        "HEADER "
        "ALLOWOVERWRITE "
        "PARALLEL OFF;"
    ).format(s3_output_path.replace("'", "''"))

    response = redshift_data.execute_statement(
        ClusterIdentifier=cluster_id,
        Database=database,
        DbUser=user,
        Sql=sql,
    )
    wait_for_statement(redshift_data, response['Id'])
    print('Data extraction completed.')

    copied = download_prefix(s3_output_path, local_output_dir)
    print('Copied {} unloaded file(s) to {}.'.format(copied, local_output_dir))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--cluster-id', type=str, required=True)
    parser.add_argument('--database', type=str, required=True)
    parser.add_argument('--user', type=str, required=True)
    parser.add_argument('--s3-output-path', type=str, required=True)
    parser.add_argument('--local-output-dir', type=str, default='/opt/ml/processing/output')
    args = parser.parse_args()

    extract_data_from_redshift(
        args.cluster_id, args.database, args.user, args.s3_output_path, args.local_output_dir
    )
"""

In [None]:
# Data-extraction processing step.
from pathlib import Path

from sagemaker.workflow.functions import Join

# ProcessingStep's ``code`` must be a path (local file or S3 URI), not the
# script text itself, so materialise the script first.
extraction_script_path = Path("extract_data.py")
extraction_script_path.write_text(data_extraction_script, encoding="utf-8")

sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    base_job_name="redshift-data-extraction",
)

step_extract = ProcessingStep(
    name="ExtractDataFromRedshift",
    processor=sklearn_processor,
    inputs=[],
    # The script copies the unloaded CSV files into this directory, so this
    # output is what the training step consumes.
    outputs=[ProcessingOutput(output_name="extracted_data", source="/opt/ml/processing/output")],
    code=str(extraction_script_path),
    job_arguments=[
        "--cluster-id", redshift_cluster_id,
        "--database", redshift_database,
        "--user", redshift_user,
        # Pipeline parameters only get values at execution time and cannot be
        # interpolated with f-strings (str() raises TypeError); Join builds
        # "s3://<bucket>/<path>/" when the pipeline runs.
        "--s3-output-path", Join(on="/", values=["s3:/", s3_bucket, s3_output_path, ""]),
    ],
)

In [None]:
# Source of the training script (a simple example); the training cell writes it
# to ``train.py`` for the SKLearn estimator.
# It loads every file in the ``train`` channel (UNLOAD output files are not
# necessarily named ``extracted_data.csv``), fits a random-forest regressor on
# ``purchase_amount``, prints the hold-out MSE and saves ``model.joblib``.
# ``model_fn`` is defined at module level for the SageMaker scikit-learn
# serving container, which uses it to load the model for inference.
train_script = """
import argparse
import glob
import os

import joblib
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

# Identifier columns excluded from the features, and the regression target.
ID_COLUMNS = ['customer_id', 'name', 'email']
TARGET_COLUMN = 'purchase_amount'


def load_training_data(data_dir):
    # Concatenate every regular file found in the channel directory.
    paths = sorted(p for p in glob.glob(os.path.join(data_dir, '*')) if os.path.isfile(p))
    if not paths:
        raise FileNotFoundError('No training data found in ' + data_dir)
    return pd.concat([pd.read_csv(p) for p in paths], ignore_index=True)


def model_fn(model_dir):
    # Called by the scikit-learn serving container to load the trained model.
    return joblib.load(os.path.join(model_dir, 'model.joblib'))


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--train-test-split-ratio', type=float, default=0.3)
    args, _ = parser.parse_known_args()

    train_dir = os.environ.get('SM_CHANNEL_TRAIN', '/opt/ml/input/data/train')
    model_dir = os.environ.get('SM_MODEL_DIR', '/opt/ml/model')

    # Load data
    df = load_training_data(train_dir)

    # Split features and target
    X = df.drop(ID_COLUMNS + [TARGET_COLUMN], axis=1)
    y = df[TARGET_COLUMN]

    # Train/test split
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=args.train_test_split_ratio, random_state=42
    )

    # Train the model
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate the model
    y_pred = model.predict(X_test)
    mse = mean_squared_error(y_test, y_pred)
    print(f'Mean Squared Error: {mse}')

    # Save the model
    joblib.dump(model, os.path.join(model_dir, 'model.joblib'))
"""

In [None]:
# Model training step.
# The SKLearn estimator uploads ``source_dir`` and runs ``entry_point`` from it,
# so the training script has to exist on disk. Write it into a dedicated
# directory so only the training code is uploaded, not the whole notebook folder.
from pathlib import Path

training_source_dir = Path("training_code")
training_source_dir.mkdir(exist_ok=True)
(training_source_dir / "train.py").write_text(train_script, encoding="utf-8")

sklearn_estimator = SKLearn(
    entry_point="train.py",
    source_dir=str(training_source_dir),
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    framework_version="0.23-1",
    base_job_name="customer-purchase-training",
    hyperparameters={"train-test-split-ratio": 0.3},
)

# The extraction step's output feeds the ``train`` channel; referencing its
# property also makes training run after extraction.
step_train = TrainingStep(
    name="TrainCustomerPurchaseModel",
    estimator=sklearn_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=step_extract.properties.ProcessingOutputConfig.Outputs["extracted_data"].S3Output.S3Uri,
            content_type="text/csv",
        )
    },
)

In [None]:
# 모델 배포 단계 추가
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

# Deploy (or redeploy) a trained model artifact behind a SageMaker endpoint.
def create_or_update_model_and_endpoint(model_name, endpoint_name, image_uri, model_data, role, instance_type):
    """Serve ``model_data`` from ``endpoint_name``, creating the endpoint if needed.

    SageMaker models and endpoint configurations cannot be modified in place
    (the API has no UpdateModel or UpdateEndpointConfig operation). Each call
    therefore registers a fresh model and endpoint configuration, named
    ``model_name`` / ``endpoint_name`` plus a UTC timestamp suffix. It then
    either creates the endpoint or switches the existing endpoint to the new
    configuration. The call blocks until the endpoint is InService.

    All imports are local (besides ``boto3``) so this function's source can be
    shipped into a Lambda module as-is.

    Args:
        model_name: Base name for the SageMaker model.
        endpoint_name: Endpoint to create or update; also the base name of the
            endpoint configuration.
        image_uri: Inference container image URI.
        model_data: S3 URI of the trained model artifact.
        role: IAM role ARN the model runs under.
        instance_type: Instance type of the single ``AllTraffic`` variant.

    Returns:
        dict with ``model_name`` (the timestamped model actually created),
        ``endpoint_name`` and ``endpoint_config_name``.

    Raises:
        botocore.exceptions.ClientError: on API failures, including a name
            collision when called twice within the same second.
        botocore.exceptions.WaiterError: if the endpoint never reaches InService.
    """
    import time

    sagemaker_client = boto3.client('sagemaker')

    # Unique, sortable suffix; SageMaker resource names are limited to 63 chars.
    # NOTE: earlier models / endpoint configs are not deleted here.
    suffix = time.strftime('%Y%m%d-%H%M%S', time.gmtime())
    max_base_length = 63 - len(suffix) - 1
    versioned_model_name = f"{model_name[:max_base_length]}-{suffix}"
    endpoint_config_name = f"{endpoint_name[:max_base_length]}-{suffix}"

    print(f"Creating model: {versioned_model_name}")
    sagemaker_client.create_model(
        ModelName=versioned_model_name,
        PrimaryContainer={
            'Image': image_uri,
            'ModelDataUrl': model_data,
        },
        ExecutionRoleArn=role,
    )

    print(f"Creating endpoint configuration: {endpoint_config_name}")
    sagemaker_client.create_endpoint_config(
        EndpointConfigName=endpoint_config_name,
        ProductionVariants=[{
            'InstanceType': instance_type,
            'InitialInstanceCount': 1,
            'ModelName': versioned_model_name,
            'VariantName': 'AllTraffic',
        }],
    )

    # DescribeEndpoint reports a missing endpoint as a ValidationException
    # ("Could not find endpoint ..."), not as ResourceNotFound.
    try:
        sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
        endpoint_exists = True
    except sagemaker_client.exceptions.ClientError as err:
        error = err.response.get('Error', {})
        if error.get('Code') != 'ValidationException' or 'Could not find' not in error.get('Message', ''):
            raise
        endpoint_exists = False

    if endpoint_exists:
        print(f"Updating existing endpoint: {endpoint_name}")
        sagemaker_client.update_endpoint(
            EndpointName=endpoint_name,
            EndpointConfigName=endpoint_config_name,
        )
    else:
        print(f"Creating new endpoint: {endpoint_name}")
        sagemaker_client.create_endpoint(
            EndpointName=endpoint_name,
            EndpointConfigName=endpoint_config_name,
        )

    # Wait for the create/update to finish.
    waiter = sagemaker_client.get_waiter('endpoint_in_service')
    waiter.wait(EndpointName=endpoint_name)

    return {
        'model_name': versioned_model_name,
        'endpoint_name': endpoint_name,
        'endpoint_config_name': endpoint_config_name,
    }

# Model deployment step.
# The deployment runs inside AWS Lambda, which cannot see functions defined in
# this notebook. Pipeline-runtime values also cannot be string-formatted into
# the Lambda: str() of a pipeline variable raises TypeError, and the model
# artifact URI only exists after training. So:
#   * the helper's source plus a handler is written to a module file, which
#     the Lambda helper zips and uploads (``script`` must be a path);
#   * runtime values are passed as LambdaStep ``inputs`` and read from ``event``.
# NOTE(review): the helper waits for the endpoint to be InService; Lambda's
# hard 900 s limit may be shorter than a deployment — confirm.
# NOTE(review): the model is created without SAGEMAKER_PROGRAM /
# SAGEMAKER_SUBMIT_DIRECTORY, so the scikit-learn serving container may not
# find train.py's model_fn — confirm how inference code is supplied.
# NOTE(review): ``role`` must be assumable by lambda.amazonaws.com.
import inspect
from pathlib import Path

deploy_model_lambda_path = Path("deploy_model_lambda.py")
deploy_model_lambda_path.write_text(
    "import boto3\n\n\n"
    + inspect.getsource(create_or_update_model_and_endpoint)
    + '''

def lambda_handler(event, context):
    # Every argument arrives through the LambdaStep ``inputs`` mapping.
    return create_or_update_model_and_endpoint(
        event["model_name"],
        event["endpoint_name"],
        event["image_uri"],
        event["model_data"],
        event["role"],
        event["instance_type"],
    )
''',
    encoding="utf-8",
)

step_create_or_update_model = LambdaStep(
    name="CreateOrUpdateModelAndEndpoint",
    lambda_func=Lambda(
        function_name="CreateOrUpdateModelAndEndpointFunction",
        execution_role_arn=role,
        script=str(deploy_model_lambda_path),
        # The Lambda helper zips the script under its base name.
        handler="deploy_model_lambda.lambda_handler",
        timeout=900,
        runtime="python3.12",
    ),
    inputs={
        "model_name": model_name,
        "endpoint_name": endpoint_name,
        "image_uri": sklearn_estimator.training_image_uri(),
        # Resolved at execution time; also makes this step run after training.
        "model_data": step_train.properties.ModelArtifacts.S3ModelArtifacts,
        "role": role,
        "instance_type": "ml.m5.xlarge",
    },
)


In [None]:
# Source of the prediction Lambda placed behind API Gateway. It forwards the
# JSON request body to the SageMaker endpoint and returns the endpoint's
# JSON response.
# ``endpoint_name`` is a pipeline parameter whose value only exists at execution
# time, so it cannot be formatted into this source (str() of a pipeline
# variable raises TypeError). The parameter's default is baked in as a fallback;
# set the ENDPOINT_NAME environment variable on the Lambda to target another
# endpoint. Braces are doubled because the text goes through str.format.
lambda_function_code = """
import base64
import json
import os

import boto3

ENDPOINT_NAME = os.environ.get('ENDPOINT_NAME', {endpoint_name!r})

# Created once per Lambda execution environment and reused across invocations.
runtime = boto3.client('runtime.sagemaker')


def _response(status_code, payload):
    return {{
        'statusCode': status_code,
        'body': json.dumps(payload)
    }}


def lambda_handler(event, context):
    body = event.get('body') or ''
    if event.get('isBase64Encoded'):
        body = base64.b64decode(body).decode('utf-8')
    try:
        data = json.loads(body)
    except ValueError:
        return _response(400, {{'error': 'Request body must be valid JSON.'}})

    response = runtime.invoke_endpoint(
        EndpointName=ENDPOINT_NAME,
        ContentType='application/json',
        Body=json.dumps(data)
    )

    result = json.loads(response['Body'].read().decode())
    return _response(200, result)
""".format(endpoint_name=endpoint_name.default_value)

def create_or_update_lambda(function_name, role_arn, code, runtime='python3.12'):
    """Create the Lambda ``function_name`` from ``code``, or update its code.

    The Lambda API expects ``ZipFile`` to be the bytes of a .zip archive. So
    ``code`` is packaged as ``lambda_function.py`` inside an in-memory zip,
    which matches the ``lambda_function.lambda_handler`` handler. Imports
    besides ``boto3`` are local so this function's source can be shipped into
    a Lambda module as-is.

    Args:
        function_name: Name of the Lambda function to create or update.
        role_arn: Execution role ARN; only used when the function is created.
        code: Python source for the function's ``lambda_function`` module.
        runtime: Lambda runtime; only used when the function is created.
            (python3.8, the previous value, has reached end of support.)

    Returns:
        ``function_name``.
    """
    import io
    import zipfile

    lambda_client = boto3.client('lambda')

    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, 'w', zipfile.ZIP_DEFLATED) as archive:
        archive.writestr('lambda_function.py', code)
    zipped_code = buffer.getvalue()

    try:
        # Try updating an existing function first.
        lambda_client.update_function_code(
            FunctionName=function_name,
            ZipFile=zipped_code,
        )
    except lambda_client.exceptions.ResourceNotFoundException:
        # The function does not exist yet: create it.
        lambda_client.create_function(
            FunctionName=function_name,
            Runtime=runtime,
            Role=role_arn,
            Handler='lambda_function.lambda_handler',
            Code={'ZipFile': zipped_code},
        )
        # Wait until the new function can be invoked / integrated.
        lambda_client.get_waiter('function_active_v2').wait(FunctionName=function_name)
        print(f"Lambda function {function_name} created.")
    else:
        # Wait for the code update to finish before anyone relies on it.
        lambda_client.get_waiter('function_updated_v2').wait(FunctionName=function_name)
        print(f"Lambda function {function_name} updated.")

    return function_name

# Step that deploys the prediction Lambda (``CustomerPurchasePredictor``).
# The deployer runs in its own Lambda, named differently from the function it
# deploys; the original reused the predictor's name, so each pipeline upsert
# overwrote the predictor with the deployer. Its module is written to a file
# (``script`` must be a path) and contains the helper's source plus the
# predictor code, embedded with repr() so any quotes/braces stay valid Python.
# NOTE(review): ``role`` must be assumable by lambda.amazonaws.com and allow
# lambda:CreateFunction / UpdateFunctionCode and iam:PassRole.
import inspect
from pathlib import Path

deploy_predictor_lambda_path = Path("deploy_predictor_lambda.py")
deploy_predictor_lambda_path.write_text(
    "import boto3\n\n\n"
    + inspect.getsource(create_or_update_lambda)
    + "\n\nPREDICTOR_CODE = " + repr(lambda_function_code) + "\n"
    + '''

def lambda_handler(event, context):
    # Arguments arrive through the LambdaStep ``inputs`` mapping.
    function_name = create_or_update_lambda(
        event["function_name"], event["role_arn"], PREDICTOR_CODE
    )
    return {"function_name": function_name}
''',
    encoding="utf-8",
)

step_create_or_update_lambda = LambdaStep(
    name="CreateOrUpdateLambdaFunction",
    lambda_func=Lambda(
        function_name="CreateOrUpdatePredictorLambdaFunction",
        execution_role_arn=role,
        script=str(deploy_predictor_lambda_path),
        # The Lambda helper zips the script under its base name.
        handler="deploy_predictor_lambda.lambda_handler",
        runtime="python3.12",
    ),
    inputs={
        "function_name": "CustomerPurchasePredictor",
        "role_arn": role,
    },
)

In [None]:
# Create or update the REST API that exposes the prediction Lambda.
def create_or_update_api_gateway(api_name, lambda_function_name):
    """Expose ``lambda_function_name`` as ``POST /predict`` on a REST API.

    The function does the following:

    * Reuses the REST API named ``api_name`` if one exists, otherwise creates it.
    * Ensures a ``/predict`` resource whose POST method is an AWS_PROXY
      integration with the Lambda.
    * Grants API Gateway permission to invoke the Lambda.
    * Deploys the API to the ``prod`` stage.

    Re-running it reuses what already exists. It relies only on ``boto3`` so
    its source can be shipped into a Lambda module as-is.

    Args:
        api_name: Name of the REST API to find or create.
        lambda_function_name: Name of the existing Lambda to integrate.

    Returns:
        dict with ``api_id`` and the ``endpoint_url`` of ``/predict`` on ``prod``.
    """
    client = boto3.client('apigateway')
    lambda_client = boto3.client('lambda')
    region = client.meta.region_name

    # get_rest_apis / get_resources are paginated; scan every page so an
    # existing API or resource is never missed (and duplicated).
    apis = [
        api
        for page in client.get_paginator('get_rest_apis').paginate()
        for api in page.get('items', [])
    ]
    existing_api = next((api for api in apis if api['name'] == api_name), None)

    if existing_api:
        api_id = existing_api['id']
        print(f"Updating existing API: {api_id}")
    else:
        api_id = client.create_rest_api(name=api_name)['id']
        print(f"Created new API: {api_id}")

    # Find or create the /predict resource.
    resources = [
        resource
        for page in client.get_paginator('get_resources').paginate(restApiId=api_id)
        for resource in page.get('items', [])
    ]
    root_id = next(resource['id'] for resource in resources if resource['path'] == '/')
    predict_resource = next((resource for resource in resources if resource['path'] == '/predict'), None)
    if predict_resource is None:
        predict_resource = client.create_resource(restApiId=api_id, parentId=root_id, pathPart='predict')

    # POST method
    try:
        client.put_method(
            restApiId=api_id,
            resourceId=predict_resource['id'],
            httpMethod='POST',
            authorizationType='NONE',
        )
    except client.exceptions.ConflictException:
        print("POST method already exists")

    # Lambda proxy integration. Function ARN layout:
    # arn:<partition>:lambda:<region>:<account>:function:<name>
    function_arn = lambda_client.get_function(FunctionName=lambda_function_name)['Configuration']['FunctionArn']
    arn_parts = function_arn.split(':')
    partition, account_id = arn_parts[1], arn_parts[4]

    client.put_integration(
        restApiId=api_id,
        resourceId=predict_resource['id'],
        httpMethod='POST',
        type='AWS_PROXY',
        integrationHttpMethod='POST',
        uri=f"arn:{partition}:apigateway:{region}:lambda:path/2015-03-31/functions/{function_arn}/invocations",
    )

    # Without this resource-based permission API Gateway cannot invoke the
    # Lambda and requests to the route fail.
    try:
        lambda_client.add_permission(
            FunctionName=lambda_function_name,
            StatementId=f"apigateway-{api_id}-post-predict",
            Action='lambda:InvokeFunction',
            Principal='apigateway.amazonaws.com',
            SourceArn=f"arn:{partition}:execute-api:{region}:{account_id}:{api_id}/*/POST/predict",
        )
    except lambda_client.exceptions.ResourceConflictException:
        pass  # The statement was already added by a previous run.

    # Deploy the API
    client.create_deployment(restApiId=api_id, stageName='prod')

    return {
        'api_id': api_id,
        'endpoint_url': f"https://{api_id}.execute-api.{region}.amazonaws.com/prod/predict",
    }

# API Gateway deployment step.
# CallbackStep has no ``lambda_function`` argument (it waits for an SQS
# callback), so this runs as a LambdaStep like the other deployment steps. The
# helper's source plus a handler is written to a module file because the Lambda
# cannot see notebook functions and ``script`` must be a path.
# NOTE(review): ``role`` must be assumable by lambda.amazonaws.com and allow the
# apigateway:* and lambda:GetFunction / AddPermission calls the helper makes.
import inspect
from pathlib import Path

from sagemaker.workflow.lambda_step import LambdaOutput, LambdaOutputTypeEnum

deploy_api_lambda_path = Path("deploy_api_gateway_lambda.py")
deploy_api_lambda_path.write_text(
    "import boto3\n\n\n"
    + inspect.getsource(create_or_update_api_gateway)
    + '''

def lambda_handler(event, context):
    # Arguments arrive through the LambdaStep ``inputs`` mapping.
    return create_or_update_api_gateway(event["api_name"], event["lambda_function_name"])
''',
    encoding="utf-8",
)

step_create_or_update_api = LambdaStep(
    name="CreateOrUpdateAPIGateway",
    lambda_func=Lambda(
        function_name="CreateOrUpdateAPIGatewayFunction",
        execution_role_arn=role,
        script=str(deploy_api_lambda_path),
        # The Lambda helper zips the script under its base name.
        handler="deploy_api_gateway_lambda.lambda_handler",
        runtime="python3.12",
    ),
    inputs={
        "api_name": "CustomerPurchaseAPI",
        "lambda_function_name": "CustomerPurchasePredictor",
    },
    # Surface the helper's return values as step properties.
    outputs=[
        LambdaOutput(output_name="api_id", output_type=LambdaOutputTypeEnum.String),
        LambdaOutput(output_name="endpoint_url", output_type=LambdaOutputTypeEnum.String),
    ],
)

In [None]:
# Pipeline definition.
# The predictor-Lambda and API Gateway steps have no data dependency on earlier
# steps. Without explicit ordering SageMaker would run them in parallel with
# extraction, and the API step could run before its Lambda exists. Pin the
# order: endpoint deployment -> predictor Lambda -> API Gateway. The lists are
# assigned rather than appended so re-running this cell stays idempotent.
step_create_or_update_lambda.depends_on = [step_create_or_update_model.name]
step_create_or_update_api.depends_on = [step_create_or_update_lambda.name]

pipeline = Pipeline(
    name="CustomerPurchasePipeline",
    parameters=[redshift_cluster_id, redshift_database, redshift_user, s3_bucket, s3_output_path, model_name, endpoint_name],
    steps=[step_extract, step_train, step_create_or_update_model, step_create_or_update_lambda, step_create_or_update_api],
    sagemaker_session=sagemaker_session,
)

In [None]:
# Register the pipeline definition with SageMaker: create it, or update it when
# a pipeline of the same name already exists, using the notebook's role.
upsert_response = pipeline.upsert(role_arn=role)
print("Pipeline created/updated successfully.")

In [None]:
# Start a pipeline run (no parameter overrides) and report its ARN.
execution = pipeline.start()
execution_arn = execution.arn
print(f"Pipeline execution started. Execution ARN: {execution_arn}")