# モジュールインポート

In [1]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.session import Session
from sagemaker.workflow.parameters import (
    ParameterString,
    ParameterInteger,
)
from sagemaker.workflow.steps import (
    ProcessingStep,
    TrainingStep,
)
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.sklearn.estimator import SKLearn
from sagemaker.sklearn.model import SKLearnModel
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.model_step import ModelStep
from sagemaker.model import Model
from sagemaker.serverless import ServerlessInferenceConfig
from sagemaker.workflow.properties import PropertyFile
import os
import pandas as pd
import numpy as np

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


# 各種設定

## セッションの設定

In [2]:
# セッションとロールの設定
sagemaker_session = sagemaker.session.Session()
role = get_execution_role()
region = sagemaker_session.boto_region_name
default_bucket = sagemaker_session.default_bucket()

print(sagemaker_session)
print(role)
print(region)
print(default_bucket)

<sagemaker.session.Session object at 0x7eff5c367820>
arn:aws:iam::706711397653:role/service-role/AmazonSageMaker-ExecutionRole-20240825T162290
ap-northeast-1
sagemaker-ap-northeast-1-706711397653


## パラメータの設定
- パイプラインで使用するパラメータを定義。

In [3]:
### パラメータの定義
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.xlarge")
model_approval_status = ParameterString(name="ModelApprovalStatus", default_value="Approved")

### S3 key
input_data_uri = f's3://{default_bucket}/lightgbm-pipeline/input/data.csv'
output_data_uri = f's3://{default_bucket}/lightgbm-pipeline/output'

# サンプルデータの作成
- サンプルデータを作成し、S3にアップロードする

In [4]:
from sklearn.datasets import make_classification

X, y = make_classification(
    n_samples=1000, n_features=20, n_informative=15, n_redundant=5, n_classes=2, random_state=42
)
df = pd.DataFrame(X)
df['target'] = y

# データをS3にアップロード
os.makedirs('data', exist_ok=True)
df.to_csv('data/data.csv', index=False)
sagemaker_session.upload_data(path='data/data.csv', 
                              bucket=default_bucket, 
                              key_prefix='lightgbm-pipeline/input')

's3://sagemaker-ap-northeast-1-706711397653/lightgbm-pipeline/input/data.csv'

# パイプライン定義

## 前処理STEP

### スクリプト定義
- データを訓練データと検証データに分割する前処理スクリプトを作成します。

In [30]:
# 前処理スクリプトの作成
processing_script = """
import argparse
import os
import pandas as pd
import boto3
import logging
from sklearn.model_selection import train_test_split
from io import BytesIO
from io import StringIO

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

if __name__ == "__main__":
    logger.info("前処理を開始します。")
    parser = argparse.ArgumentParser()
    parser.add_argument("--input-bucket", type=str)
    parser.add_argument("--input-key", type=str)
    parser.add_argument("--output-train-bucket", type=str)
    parser.add_argument("--output-train-key", type=str)
    parser.add_argument("--output-validation-bucket", type=str)
    parser.add_argument("--output-validation-key", type=str)
    args = parser.parse_args()

    logger.info("S3からデータを読み込んでいます。")
    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=args.input_bucket, Key=args.input_key)
    df = pd.read_csv(BytesIO(response['Body'].read()))

    logger.info(f"入力データの形状: {df.shape}")

    logger.info("データを訓練セットと検証セットに分割します。")
    train_df, val_df = train_test_split(df, test_size=0.2, random_state=42)

    logger.info("訓練データをS3に保存します。")
    csv_buffer = StringIO()
    train_df.to_csv(csv_buffer, index=False)
    s3.put_object(Bucket=args.output_train_bucket, Key=args.output_train_key, Body=csv_buffer.getvalue().encode('utf-8'))

    logger.info("検証データをS3に保存します。")
    csv_buffer = StringIO()
    val_df.to_csv(csv_buffer, index=False)
    s3.put_object(Bucket=args.output_validation_bucket, Key=args.output_validation_key, Body=csv_buffer.getvalue().encode('utf-8'))

    logger.info("前処理が完了しました。")

"""

with open('preprocessing.py', 'w') as f:
    f.write(processing_script)

### 前処理stepの定義
- 前処理ステップをパイプラインに組み込みます。

In [31]:
# 前処理ステップの定義
script_processor = ScriptProcessor(
    image_uri=sagemaker.image_uris.retrieve(framework='sklearn',
                                            region=region,
                                            version='0.23-1'),
    command=['python3'],
    instance_type=processing_instance_type,
    instance_count=1,
    base_job_name='lightgbm-preprocessing',
    role=role,
    sagemaker_session=sagemaker_session
)

processing_step = ProcessingStep(
    name='Preprocessing',
    processor=script_processor,
    inputs=[],  # S3から直接読み込むため、inputsは不要
    outputs=[],  # S3に直接保存するため、outputsは不要
    code='preprocessing.py',
    job_arguments=[
        "--input-bucket", default_bucket,
        "--input-key", "lightgbm-pipeline/input/data.csv",
        "--output-train-bucket", default_bucket,
        "--output-train-key", "lightgbm-pipeline/output/train/train.csv",
        "--output-validation-bucket", default_bucket,
        "--output-validation-key", "lightgbm-pipeline/output/validation/validation.csv"
    ]
)

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.


## 訓練STEP

### スクリプト定義

In [32]:
# 学習スクリプトの作成（S3から読み込み、S3に保存）
training_script = """
import argparse
import os
import subprocess
import sys

# 必要なパッケージをインストール
subprocess.check_call([sys.executable, "-m", "pip", "install", "lightgbm", "optuna", "boto3"])

import pandas as pd
import lightgbm as lgb
import optuna
import boto3
from sklearn.metrics import roc_auc_score
from io import BytesIO

def objective(trial, X_train, y_train, X_val, y_val):
    params = {
        'objective': 'binary',
        'metric': 'auc',
        'verbosity': -1,
        'boosting_type': 'gbdt',
        'num_leaves': trial.suggest_int('num_leaves', 20, 150),
        'max_depth': trial.suggest_int('max_depth', 3, 12),
        'learning_rate': trial.suggest_loguniform('learning_rate', 1e-4, 1e-1),
        'feature_fraction': trial.suggest_uniform('feature_fraction', 0.6, 1.0),
        'bagging_fraction': trial.suggest_uniform('bagging_fraction', 0.6, 1.0),
        'bagging_freq': trial.suggest_int('bagging_freq', 1, 7),
        'reg_alpha': trial.suggest_loguniform('reg_alpha', 1e-4, 10.0),
        'reg_lambda': trial.suggest_loguniform('reg_lambda', 1e-4, 10.0),
    }

    train_dataset = lgb.Dataset(X_train, label=y_train)
    val_dataset = lgb.Dataset(X_val, label=y_val)

    model = lgb.train(params, train_dataset, valid_sets=[val_dataset], early_stopping_rounds=10, verbose_eval=False)
    y_pred = model.predict(X_val)
    auc = roc_auc_score(y_val, y_pred)
    return auc

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--train-bucket", type=str)
    parser.add_argument("--train-key", type=str)
    parser.add_argument("--validation-bucket", type=str)
    parser.add_argument("--validation-key", type=str)
    parser.add_argument("--model-bucket", type=str)
    parser.add_argument("--model-key", type=str)
    parser.add_argument("--n-trials", type=int, default=20)
    args = parser.parse_args()

    s3 = boto3.client('s3')

    print("訓練データをS3から読み込んでいます。")
    response = s3.get_object(Bucket=args.train_bucket, Key=args.train_key)
    train_df = pd.read_csv(BytesIO(response['Body'].read()))

    print("検証データをS3から読み込んでいます。")
    response = s3.get_object(Bucket=args.validation_bucket, Key=args.validation_key)
    val_df = pd.read_csv(BytesIO(response['Body'].read()))

    y_train = train_df.pop('target')
    X_train = train_df
    y_val = val_df.pop('target')
    X_val = val_df

    # Optunaによるハイパーパラメータ最適化
    study = optuna.create_study(direction='maximize')
    study.optimize(lambda trial: objective(trial, X_train, y_train, X_val, y_val), n_trials=args.n_trials)

    print('Best trial:')
    trial = study.best_trial
    print('  AUC: {}'.format(trial.value))
    print('  Params: ')
    for key, value in trial.params.items():
        print('    {}: {}'.format(key, value))

    # 最適なハイパーパラメータでモデルを再学習
    best_params = trial.params
    best_params['objective'] = 'binary'
    best_params['metric'] = 'auc'
    best_params['verbosity'] = -1
    best_params['boosting_type'] = 'gbdt'

    train_dataset = lgb.Dataset(X_train, label=y_train)
    val_dataset = lgb.Dataset(X_val, label=y_val)

    model = lgb.train(best_params, train_dataset, valid_sets=[val_dataset], early_stopping_rounds=10)

    # モデルをS3に保存
    import tempfile
    with tempfile.TemporaryDirectory() as tmpdir:
        model_path = os.path.join(tmpdir, "model.txt")
        model.save_model(model_path)
        with open(model_path, 'rb') as f:
            s3.put_object(Bucket=args.model_bucket, Key=args.model_key, Body=f)
"""

with open('train.py', 'w') as f:
    f.write(training_script)

### 訓練stepの定義

In [39]:
# Estimatorの定義
sklearn_estimator = SKLearn(
    entry_point='train.py',
    role=role,
    instance_type=training_instance_type,
    instance_count=1,
    framework_version='0.23-1',
    base_job_name='lightgbm-training',
    sagemaker_session=sagemaker_session,
    hyperparameters={
        'n-trials': 50,
        'train-bucket': default_bucket,
        'train-key': 'lightgbm-pipeline/output/train/train.csv',
        'validation-bucket': default_bucket,
        'validation-key': 'lightgbm-pipeline/output/validation/validation.csv',
        'model-bucket': default_bucket,
        'model-key': 'lightgbm-pipeline/output/model/model.txt'
    },
)

# TrainingStepの定義
training_step = TrainingStep(
    name='Training',
    estimator=sklearn_estimator,
    inputs={
        'train': sagemaker.inputs.TrainingInput(
            s3_data=f's3://{default_bucket}/lightgbm-pipeline/output/train/train.csv',
            content_type='text/csv'
        ),
    },
)



## パイプライン定義

### パイプラインの作成

In [40]:
pipeline = Pipeline(
    name='LightGBM-Pipeline-Optuna',
    parameters=[
        processing_instance_type,
        training_instance_type,
        model_approval_status,
    ],
    steps=[processing_step,
          training_step,
          ],
    sagemaker_session=sagemaker_session,
)

### 実行

In [41]:
# パイプラインの作成と実行
pipeline.upsert(role_arn=role)
execution = pipeline.start()



In [43]:
execution.list_steps()

[{'StepName': 'Preprocessing',
  'StartTime': datetime.datetime(2024, 9, 21, 6, 56, 43, 408000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-1:706711397653:processing-job/pipelines-heuntu5mx8dg-Preprocessing-7jFxMsrkZz'}},
  'AttemptCount': 1},
 {'StepName': 'Training',
  'StartTime': datetime.datetime(2024, 9, 21, 6, 56, 43, 408000, tzinfo=tzlocal()),
  'StepStatus': 'Executing',
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:ap-northeast-1:706711397653:training-job/pipelines-heuntu5mx8dg-Training-f05wpx93HZ'}},
  'AttemptCount': 1}]