In [None]:
# 必要なライブラリのインポート
import sys
import os
sys.path.append('../../')

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

# 自作モジュールのインポート
from src.data_fetcher import DataFetcher
from src.technical_indicators import TechnicalIndicators
from src.utils import DataPreprocessor, ModelUtils

# Transformerモデル用のライブラリ
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LayerNormalization, Dropout
from tensorflow.keras.layers import MultiHeadAttention, GlobalAveragePooling1D
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split


In [None]:
# データ取得
# 注意: 実際のAPIキーは別途設定してください
# from google.colab import userdata  # Google Colab使用時
# api_key = userdata.get('KuCoin_API_KEY')
# secret = userdata.get('KuCoin_API_SECRET')
# password = userdata.get('KuCoin_API_PASSPHRAS')

# テスト用のダミーデータフェッチャー（実際は上記のAPIキーを使用）
fetcher = DataFetcher('kucoin')

# データ取得設定
symbol = 'SOL/USDT'
timeframe = '1d'
limit = 8760 * 2  # 2年分

print(f"データ取得開始: {symbol}")
# data = fetcher.fetch_ohlcv_data(symbol, timeframe, limit)
# print(f"取得完了: {len(data)}件")

# 実際のAPIキーが設定されていない場合のダミーデータ
np.random.seed(42)
dates = pd.date_range(start='2022-01-01', periods=1000, freq='D')
data = pd.DataFrame({
    'Open': np.random.randn(1000).cumsum() + 100,
    'High': np.random.randn(1000).cumsum() + 102,
    'Low': np.random.randn(1000).cumsum() + 98,
    'Close': np.random.randn(1000).cumsum() + 100,
    'Volume': np.random.randn(1000).cumsum() + 1000000
}, index=dates)

# 価格の整合性を確保
for i in range(len(data)):
    data.iloc[i, 1] = max(data.iloc[i, [0, 1, 2, 3]])  # High
    data.iloc[i, 2] = min(data.iloc[i, [0, 1, 2, 3]])  # Low

print(f"データ準備完了: {len(data)}件")
data.head()


In [None]:
# テクニカル指標の追加
print("テクニカル指標を計算中...")
df = TechnicalIndicators.add_all_indicators(data)
print(f"テクニカル指標追加完了: {df.shape}")
df.head()


In [None]:
# Transformerモデルの定義
def create_transformer_model(input_dim, num_heads=8, ff_dim=32, num_layers=2, dropout_rate=0.1):
    """
    Transformerモデルを作成
    
    Args:
        input_dim: 入力次元数
        num_heads: マルチヘッドアテンションのヘッド数
        ff_dim: フィードフォワード層の次元数
        num_layers: Transformerレイヤーの数
        dropout_rate: ドロップアウト率
    
    Returns:
        Kerasモデル
    """
    inputs = Input(shape=(1, input_dim))
    
    # Transformerブロック
    x = inputs
    for _ in range(num_layers):
        # Multi-Head Attention
        attention_output = MultiHeadAttention(
            num_heads=num_heads, 
            key_dim=input_dim // num_heads
        )(x, x)
        attention_output = Dropout(dropout_rate)(attention_output)
        x = LayerNormalization(epsilon=1e-6)(x + attention_output)
        
        # Feed Forward
        ffn_output = Dense(ff_dim, activation="relu")(x)
        ffn_output = Dense(input_dim)(ffn_output)
        ffn_output = Dropout(dropout_rate)(ffn_output)
        x = LayerNormalization(epsilon=1e-6)(x + ffn_output)
    
    # Global Average Pooling
    x = GlobalAveragePooling1D()(x)
    
    # 出力層
    x = Dense(64, activation="relu")(x)
    x = Dropout(dropout_rate)(x)
    outputs = Dense(1, activation="sigmoid")(x)
    
    model = Model(inputs=inputs, outputs=outputs)
    return model


In [None]:
# データの前処理
preprocessor = DataPreprocessor(scaler_type='standard')
X_scaled, y = preprocessor.fit_transform(df, target_column='Close')

# 最後の行は予測対象なので除外
X_scaled = X_scaled[:-1]
y = y[:-1]

print(f"特徴量数: {X_scaled.shape[1]}")
print(f"データ数: {X_scaled.shape[0]}")
print(f"ターゲット分布: {np.bincount(y)}")

# 学習・テストデータの分割
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.2, random_state=42, stratify=y
)

# Transformerモデル用にデータを3次元に変換
X_train = X_train.reshape(X_train.shape[0], 1, X_train.shape[1])
X_test = X_test.reshape(X_test.shape[0], 1, X_test.shape[1])

print(f"訓練データ: {X_train.shape}")
print(f"テストデータ: {X_test.shape}")


In [None]:
# モデルの作成とコンパイル
model = create_transformer_model(input_dim=X_train.shape[2])

model.compile(
    optimizer=Adam(learning_rate=0.001),
    loss='binary_crossentropy',
    metrics=['accuracy']
)

# モデルの概要を表示
model.summary()


In [None]:
# モデルの訓練
print("モデル訓練を開始...")
history = model.fit(
    X_train, y_train,
    epochs=50,
    batch_size=32,
    validation_data=(X_test, y_test),
    verbose=1
)

# 訓練結果の可視化
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
plt.show()


In [None]:
# モデルの保存
model_save_path = "../../models/transformer_model.h5"
scaler_save_path = "../../models/scaler.pkl"

ModelUtils.save_model(model, model_save_path)
preprocessor.save_scaler(scaler_save_path)

print("モデルとスケーラーを保存しました")

# テストデータでの評価
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print(f"テスト精度: {test_accuracy:.4f}")
print(f"テスト損失: {test_loss:.4f}")

# 予測結果の確認
y_pred = model.predict(X_test)
y_pred_binary = (y_pred > 0.5).astype(int)

from sklearn.metrics import classification_report, confusion_matrix
print("\n分類レポート:")
print(classification_report(y_test, y_pred_binary))
print("\n混同行列:")
print(confusion_matrix(y_test, y_pred_binary))
