# LightGBM Binary Classification + SageMaker Model Monitor v2

このノートブックでは、LightGBMを使った2値分類モデルをSageMakerにデプロイし、Model Monitorで監視する完全なワークフローを実装します。

## 主な改良点
- より堅牢なエラーハンドリング
- 詳細なログ出力
- 設定可能なパラメータ
- 自動クリーンアップ機能
- 包括的なテスト機能

## 1. 環境設定とライブラリインポート

In [None]:
# 必要なライブラリのインストール
!pip install lightgbm>=3.3.0 -q
!pip install sagemaker>=2.100.0 -q

In [None]:
import sagemaker
import boto3
import pandas as pd
import numpy as np
import lightgbm as lgb
import json
import os
import time
import logging
import tarfile
import pickle
import warnings
from datetime import datetime, timedelta
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score

# SageMaker関連
from sagemaker.session import Session
from sagemaker.model import Model
from sagemaker.predictor import Predictor
from sagemaker.serializers import JSONSerializer
from sagemaker.deserializers import JSONDeserializer
from sagemaker.image_uris import retrieve
from sagemaker.model_monitor import (
    DataCaptureConfig,
    DefaultModelMonitor,
    DatasetFormat,
    EndpointInput,
    CronExpressionGenerator
)

# Silence every warning category for the rest of the session
warnings.filterwarnings(action='ignore')

# Logging setup: INFO level, records carry timestamp, logger name and level
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

print("✅ All libraries imported successfully")

## 2. 初期設定

In [None]:
# SageMaker setup: session, execution role, and the region/bucket derived
# from the session, followed by the low-level boto3 clients.
session = sagemaker.Session()
role = sagemaker.get_execution_role()
region, bucket = session.boto_region_name, session.default_bucket()
s3_client, sm_client = boto3.client('s3'), boto3.client('sagemaker')

# Echo the resolved environment, one value per line
print(
    f"Region: {region}",
    f"Bucket: {bucket}",
    f"Role: {role}",
    sep="\n",
)

# Configuration parameters
#
# One timestamp is shared by every generated resource name so that the model,
# endpoint and monitoring schedule created by a single run carry the same
# suffix (separate time.time() calls could straddle a second boundary).
run_timestamp = int(time.time())

config = {
    # Data generation settings
    'n_samples': 3000,
    'n_features': 25,
    'n_informative': 20,
    'n_redundant': 5,
    'n_clusters_per_class': 2,
    'test_size': 0.2,
    'val_size': 0.1,
    'random_state': 42,

    # LightGBM parameters
    'lgb_params': {
        'objective': 'binary',
        'metric': 'binary_logloss',
        'boosting_type': 'gbdt',
        'num_leaves': 31,
        'learning_rate': 0.05,
        'feature_fraction': 0.9,
        'bagging_fraction': 0.8,
        'bagging_freq': 5,
        'verbose': -1,
        'random_state': 42
    },
    'num_boost_round': 100,
    'early_stopping_rounds': 10,

    # Deployment settings
    'model_name': f'lightgbm-binary-v2-{run_timestamp}',
    'endpoint_name': f'lightgbm-binary-endpoint-v2-{run_timestamp}',
    'instance_type': 'ml.m5.large',
    'initial_instance_count': 1,

    # Data Capture settings
    'enable_data_capture': True,
    'sampling_percentage': 100,
    'capture_modes': ['Input', 'Output'],

    # Model Monitor settings
    'monitor_schedule_name': f'lightgbm-monitor-v2-{run_timestamp}',
    'monitoring_instance_type': 'ml.m5.xlarge',
    # SageMaker Model Monitor schedules only accept hourly-granularity cron
    # expressions (minute field 0, every 1-24 hours); a sub-hourly expression
    # such as "every 10 minutes" is not a supported schedule. Hourly is the
    # most frequent option available.
    'monitoring_cron': 'cron(0 * ? * * *)',  # every hour
    'max_runtime_seconds': 3600,

    # S3 settings
    's3_prefix': 'lightgbm-binary-classification-v2',
    'model_artifacts_path': 'model',
    'baseline_path': 'baseline',
    'monitoring_reports_path': 'monitoring-reports',
    'data_capture_path': 'datacapture'
}

# S3 locations: every artifact lives under one shared prefix in the bucket
_s3_root = f's3://{bucket}/{config["s3_prefix"]}'
s3_model_path = f'{_s3_root}/{config["model_artifacts_path"]}'
s3_baseline_path = f'{_s3_root}/{config["baseline_path"]}'
s3_monitoring_path = f'{_s3_root}/{config["monitoring_reports_path"]}'
s3_data_capture_path = f'{_s3_root}/{config["data_capture_path"]}'

# Summary of the resolved S3 paths
print(
    "\n📁 S3 Paths:",
    f"Model: {s3_model_path}",
    f"Baseline: {s3_baseline_path}",
    f"Monitoring: {s3_monitoring_path}",
    f"Data Capture: {s3_data_capture_path}",
    sep="\n",
)

# Summary of the resource names this run will use
print(
    "\n🚀 Configuration:",
    f"Endpoint: {config['endpoint_name']}",
    f"Monitor: {config['monitor_schedule_name']}",
    sep="\n",
)

## 3. データ生成と前処理

In [None]:
def generate_data():
    """
    Build a synthetic binary-classification dataset and split it three ways.

    All sizes and the random seed are read from the module-level ``config``.
    The test set is split off first; the validation set is then taken from
    the remainder, with its fraction rescaled by ``1 - test_size`` so that it
    corresponds to ``config['val_size']`` of the full dataset. Both splits
    are stratified on the labels.

    Returns:
        Tuple ``(X_train, X_val, X_test, y_train, y_val, y_test)``.
    """
    print("📊 Generating sample data for binary classification...")

    seed = config['random_state']

    # Synthetic features and labels
    features, labels = make_classification(
        n_samples=config['n_samples'],
        n_features=config['n_features'],
        n_informative=config['n_informative'],
        n_redundant=config['n_redundant'],
        n_clusters_per_class=config['n_clusters_per_class'],
        random_state=seed,
    )

    # First split: hold out the test set
    X_rest, X_test, y_rest, y_test = train_test_split(
        features,
        labels,
        test_size=config['test_size'],
        random_state=seed,
        stratify=labels,
    )

    # Second split: validation fraction expressed relative to what is left
    val_fraction = config['val_size'] / (1 - config['test_size'])
    X_train, X_val, y_train, y_val = train_test_split(
        X_rest,
        y_rest,
        test_size=val_fraction,
        random_state=seed,
        stratify=y_rest,
    )

    report_lines = (
        "✅ Data generated successfully:",
        f"  Train: {X_train.shape[0]} samples",
        f"  Validation: {X_val.shape[0]} samples",
        f"  Test: {X_test.shape[0]} samples",
        f"  Features: {X_train.shape[1]}",
        f"  Class distribution (train): {np.bincount(y_train)}",
    )
    for line in report_lines:
        print(line)

    return X_train, X_val, X_test, y_train, y_val, y_test


# Generate the dataset splits used by the rest of the notebook
(X_train, X_val, X_test,
 y_train, y_val, y_test) = generate_data()

## 4. LightGBMモデル訓練

In [None]:
def train_model():
    """
    Fit a LightGBM booster on the module-level training split.

    Hyperparameters, the boosting-round budget and the early-stopping
    patience come from ``config``. Both the training and validation sets are
    passed as evaluation sets (named 'train' and 'val'), and evaluation
    results are logged every 10 rounds.

    Returns:
        The model object returned by ``lgb.train``.
    """
    print("🤖 Training LightGBM model...")

    # Wrap the arrays in LightGBM datasets; validation references training
    train_set = lgb.Dataset(X_train, label=y_train)
    val_set = lgb.Dataset(X_val, label=y_val, reference=train_set)

    training_callbacks = [
        lgb.early_stopping(config['early_stopping_rounds']),
        lgb.log_evaluation(period=10),
    ]

    booster = lgb.train(
        config['lgb_params'],
        train_set,
        num_boost_round=config['num_boost_round'],
        valid_sets=[train_set, val_set],
        valid_names=['train', 'val'],
        callbacks=training_callbacks,
    )

    print("✅ Model training completed successfully")
    return booster

def evaluate_model(model):
    """
    Score the model on the module-level test split and print the results.

    Probabilities are predicted at ``model.best_iteration`` and converted to
    class labels with a 0.5 threshold.

    Args:
        model: Trained model exposing ``predict`` and ``best_iteration``.

    Returns:
        dict with keys 'accuracy', 'precision', 'recall', 'f1_score', 'auc'.
    """
    # Predicted probabilities and the thresholded class labels
    positive_proba = model.predict(X_test, num_iteration=model.best_iteration)
    predicted_labels = (positive_proba > 0.5).astype(int)

    # Label-based metrics use the thresholded labels; AUC uses the raw scores
    results = {
        'accuracy': accuracy_score(y_test, predicted_labels),
        'precision': precision_score(y_test, predicted_labels),
        'recall': recall_score(y_test, predicted_labels),
        'f1_score': f1_score(y_test, predicted_labels),
        'auc': roc_auc_score(y_test, positive_proba),
    }

    print("📈 Model Performance Metrics:")
    display_order = (
        ('Accuracy', 'accuracy'),
        ('Precision', 'precision'),
        ('Recall', 'recall'),
        ('F1-Score', 'f1_score'),
        ('AUC', 'auc'),
    )
    for label, key in display_order:
        print(f"  {label}: {results[key]:.4f}")

    return results


# Train the model
model = train_model()

# Evaluate it on the held-out test split
metrics = evaluate_model(model)

## 5. モデル保存と推論スクリプト作成

In [None]:
def save_model(model):
    """
    Package the trained model and upload the archive to S3.

    The model is written into a local ``model/`` directory twice: pickled as
    ``model.pkl`` and in text form as ``model.txt``. The directory is then
    bundled into ``model.tar.gz`` (added under the archive name '.') and
    uploaded under the configured model-artifacts prefix.

    Args:
        model: Trained model; must be picklable and provide ``save_model``.

    Returns:
        The S3 URI of the uploaded ``model.tar.gz``.
    """
    print("💾 Saving model artifacts...")

    # Local staging directory for the artifacts
    os.makedirs('model', exist_ok=True)

    # Pickle is the format loaded at inference time (the original note says
    # this avoids LightGBM's "unordered_map::at" error)
    with open('model/model.pkl', mode='wb') as pkl_file:
        pickle.dump(model, pkl_file)

    # Text dump kept alongside for debugging
    model.save_model('model/model.txt')

    # Bundle the staging directory into a gzip-compressed tarball
    with tarfile.open('model.tar.gz', mode='w:gz') as archive:
        archive.add('model', arcname='.')

    # Upload to S3 under <s3_prefix>/<model_artifacts_path>
    upload_prefix = f'{config["s3_prefix"]}/{config["model_artifacts_path"]}'
    session.upload_data('model.tar.gz', bucket=bucket, key_prefix=upload_prefix)
    model_uri = f'{s3_model_path}/model.tar.gz'

    print(f"✅ Model saved to: {model_uri}")
    return model_uri

def create_inference_script():
    """
    Write the inference entry point and its requirements file.

    Creates a local ``source/`` directory containing:

    * ``inference.py`` - defines ``model_fn``, ``input_fn``, ``predict_fn``
      and ``output_fn`` for serving the pickled model over JSON.
    * ``requirements.txt`` - the packages listed for the serving environment.

    Both files are written as UTF-8 explicitly: the script contains
    non-ASCII text, and relying on the locale's default encoding could fail
    to write it or produce a file Python cannot parse as source.

    The generated ``input_fn`` promotes a 1-D array (a single flat sample)
    to a single-row 2-D array, because LightGBM's ``predict`` rejects 1-D
    input.
    """
    print("📝 Creating inference script...")

    os.makedirs('source', exist_ok=True)

    inference_code = '''import json
import pickle
import numpy as np
import lightgbm as lgb
import logging

logger = logging.getLogger(__name__)

def model_fn(model_dir):
    """モデルをロード"""
    try:
        with open(f"{model_dir}/model.pkl", "rb") as f:
            model = pickle.load(f)
        logger.info("Model loaded successfully from pickle file")
        return model
    except Exception as e:
        logger.error(f"Error loading model: {str(e)}")
        raise

def input_fn(request_body, request_content_type):
    """入力データを処理"""
    if request_content_type == "application/json":
        try:
            input_data = json.loads(request_body)
            
            # 単一サンプルの場合
            if "instances" in input_data:
                data = np.array(input_data["instances"])
            elif isinstance(input_data, list):
                data = np.array(input_data)
            else:
                data = np.array([input_data])

            # A single flat sample parses to a 1-D array; LightGBM's predict
            # requires 2-D input, so promote it to one row.
            if data.ndim == 1:
                data = data.reshape(1, -1)
                
            logger.info(f"Input data shape: {data.shape}")
            return data
        except Exception as e:
            logger.error(f"Error parsing input: {str(e)}")
            raise ValueError(f"Invalid input format: {str(e)}")
    else:
        raise ValueError(f"Unsupported content type: {request_content_type}")

def predict_fn(input_data, model):
    """予測を実行"""
    try:
        # 確率予測
        probabilities = model.predict(input_data, num_iteration=model.best_iteration)
        
        # 2値分類の場合、クラス1の確率を返す
        if len(probabilities.shape) == 1:
            predictions = probabilities
        else:
            predictions = probabilities[:, 1]
            
        # 閾値0.5でクラス予測
        classes = (predictions > 0.5).astype(int)
        
        logger.info(f"Predictions generated for {len(predictions)} samples")
        
        return {
            "predictions": classes.tolist(),
            "probabilities": predictions.tolist()
        }
    except Exception as e:
        logger.error(f"Error during prediction: {str(e)}")
        raise

def output_fn(prediction, content_type):
    """出力を処理"""
    if content_type == "application/json":
        return json.dumps(prediction)
    else:
        raise ValueError(f"Unsupported content type: {content_type}")
'''

    # Explicit UTF-8 so the non-ASCII docstrings/comments survive any locale
    with open('source/inference.py', 'w', encoding='utf-8') as f:
        f.write(inference_code)

    # Packages listed for the serving environment
    requirements = '''lightgbm>=3.3.0
numpy>=1.21.0
scikit-learn>=1.0.0
'''

    with open('source/requirements.txt', 'w', encoding='utf-8') as f:
        f.write(requirements)

    print("✅ Inference script created successfully")

# Upload the packaged model to S3 (keeping its URI), then write the local
# inference script and requirements file under source/
model_uri = save_model(model)
create_inference_script()