In [None]:
import pandas as pd
import numpy as np
import talib
import gymnasium as gym
from gymnasium import spaces
import matplotlib.pyplot as plt
import math
import os
import random

from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize

# --- 0. Parameter settings ---
# Cash balance each episode starts with.
INITIAL_BALANCE = 1_000_000
# Number of past rows included in every market observation window.
LOOKBACK_PERIOD = 60
# Training steps per trial (shortened a little for testing).
TRAINING_TIMESTEPS = 200_000
# Number of independent train/backtest trials.
NUM_TRIALS = 5

# Keyword arguments forwarded unchanged to the PPO constructor.
BEST_PARAMS = dict(
    learning_rate=0.003,
    n_steps=512,
    gamma=0.99,
    ent_coef=0.05,
    batch_size=64,
)

# --- 1. Data preparation ---
print("ステップ1: データ準備を開始...")
file_name = 'Nipponsteel_stock_data2014~2024.csv'  # input CSV file
# File rows 1 and 2 are skipped and the 'Price' column becomes the (date) index.
# NOTE(review): presumably this matches the multi-row header of the data
# export that produced the CSV - confirm against the actual file.
df = pd.read_csv(
    file_name, header=0, skiprows=[1, 2], index_col='Price', parse_dates=True
)
df.index.name = 'Date'
# Coerce every column to numeric and drop rows that contain unparseable cells.
df = df.apply(pd.to_numeric, errors='coerce').dropna()

# Technical indicators
df['MA5'] = talib.SMA(df['Close'], timeperiod=5)
df['MA25'] = talib.SMA(df['Close'], timeperiod=25)
df['RSI'] = talib.RSI(df['Close'], timeperiod=14)
df['MACD'], df['MACD_signal'], df['MACD_hist'] = talib.MACD(df['Close'], fastperiod=12, slowperiod=26, signalperiod=9)
df['BB_upper'], df['BB_middle'], df['BB_lower'] = talib.BBANDS(df['Close'], timeperiod=20, nbdevup=2, nbdevdn=2, matype=0)
df['ADX'] = talib.ADX(df['High'], df['Low'], df['Close'], timeperiod=14)
df['P_DI'] = talib.PLUS_DI(df['High'], df['Low'], df['Close'], timeperiod=14)
df['M_DI'] = talib.MINUS_DI(df['High'], df['Low'], df['Close'], timeperiod=14)

# --- Step 1.5: feature engineering ---
# Each checker is 1 when its indicator pair is bullish and 0 otherwise.
df['Checker1'] = (df['MA5'] > df['MA25']).astype(int)
df['Checker2'] = (df['MACD'] > df['MACD_signal']).astype(int)
df['Checker3'] = (df['P_DI'] > df['M_DI']).astype(int)

# Trend score.  The checkers are mapped to signed votes (+1 bullish, -1
# bearish) before summing, so the score lies in {-3, -1, 1, 3}.  Summing the
# raw 0/1 flags could never go negative, which made the "<= -2" downtrend
# branch below unreachable.
trend_votes = df[['Checker1', 'Checker2', 'Checker3']] * 2 - 1
df['Trend_detector'] = trend_votes.sum(axis=1)
df['Trend_signal'] = np.where(df['Trend_detector'] >= 2, 1,  # uptrend: all three bullish
                             np.where(df['Trend_detector'] <= -2, -1,  # downtrend: all three bearish
                                      0))  # range: indicators disagree

# Drop NaNs once, after every feature has been computed.  Rows where an
# indicator is still NaN (and whose checker therefore compared as False) are
# removed here because the indicator columns themselves are NaN.
df.dropna(inplace=True)

# Feature list fed to the agent (includes Trend_signal)
features_to_use = ['Close', 'MA5', 'MA25', 'RSI', 'MACD', 'MACD_signal', 'MACD_hist', 'BB_upper', 'BB_middle', 'BB_lower', 'Trend_signal']
# Select the features
df_features = df[features_to_use]

# Chronological split: first 78% of the rows for training, the rest for testing
training_data_len = int(len(df_features) * 0.78)
train_df = df_features[:training_data_len]
test_df = df_features[training_data_len:]

print("ステップ1: データ準備完了")
print("カスタム指標の追加完了")
print(df_features.tail())  # sanity check
print("--------------------------\n")




# --- 2. カスタム環境定義 ---
# (StockTradingEnvクラスの定義は変更なし)
class StockTradingEnv(gym.Env):
    """Single-stock trading environment driven by a price/indicator DataFrame.

    The action is one float in [-1, 1]: a positive value spends that fraction
    of the cash balance on shares, a negative value sells that fraction of the
    held shares, and zero does nothing.  Trades execute at the ``Close`` of
    the row at ``current_step``.

    Observations are a dict with two entries:
      * ``market_data``  - the ``lookback`` rows that precede ``current_step``
        (every column of ``df``), as float32.
      * ``agent_status`` - ``[shares_held, balance]`` as float32.
    """

    def __init__(self, df):
        """Store the data and declare the action/observation spaces.

        Args:
            df: DataFrame of market features; must contain a ``Close`` column,
                which is used as the execution price.
        """
        super().__init__()
        self.df = df
        self.lookback = LOOKBACK_PERIOD
        self.initial_balance = INITIAL_BALANCE
        self.num_market_features = len(df.columns)
        self.action_space = spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float32)
        self.observation_space = spaces.Dict({
            "market_data": spaces.Box(low=-np.inf, high=np.inf, shape=(self.lookback, self.num_market_features), dtype=np.float32),
            "agent_status": spaces.Box(low=0, high=np.inf, shape=(2,), dtype=np.float32)
        })

    def reset(self, seed=None, options=None):
        """Start a new episode with the full cash balance and no shares.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self.balance = self.initial_balance
        self.shares_held = 0
        self.average_purchase_price = 0
        # The first ``lookback`` rows are only ever used as observation history.
        self.current_step = self.lookback
        self.total_asset = self.balance
        self.prev_total_asset = self.total_asset
        return self._get_observation(), {}

    def step(self, action):
        """Execute one trade decision and advance one row.

        Args:
            action: Indexable whose first element is the trade fraction; it is
                clipped to [-1, 1] before use.

        Returns:
            ``(obs, reward, terminated, truncated, info)``.  ``reward`` is the
            change in total asset value plus shaping terms: -10 for a buy
            order that buys zero shares, +10% of the realised profit on a
            profitable sale, and -100 for a sell order that sells zero shares.
            ``info["total_asset"]`` is valued at the price the trade was
            executed at.
        """
        self.prev_total_asset = self.total_asset
        current_price = self.df.iloc[self.current_step]["Close"]
        # Clamp to the declared action range: an unclipped value below -1
        # would sell more shares than are held and leave a negative position.
        # Converting to a Python float keeps the arithmetic below from
        # inheriting the dtype of the action array.
        action = float(np.clip(action[0], -1.0, 1.0))

        num_shares_to_buy = 0
        num_shares_to_sell = 0
        avg_price_at_sale_time = 0

        if action > 0:
            amount_to_invest = self.balance * action
            num_shares_to_buy = int(amount_to_invest / current_price)
            if num_shares_to_buy * current_price > self.balance:  # never spend more than the balance
                num_shares_to_buy = int(self.balance / current_price)

            if num_shares_to_buy > 0:
                actual_cost = num_shares_to_buy * current_price
                # Update the volume-weighted average purchase price.
                total_investment_before = self.average_purchase_price * self.shares_held
                self.shares_held += num_shares_to_buy
                self.average_purchase_price = (total_investment_before + actual_cost) / self.shares_held
                self.balance -= actual_cost

        elif action < 0:
            num_shares_to_sell = int(self.shares_held * abs(action))
            if num_shares_to_sell > 0:
                # Remember the cost basis before it may be reset below.
                avg_price_at_sale_time = self.average_purchase_price
                self.balance += num_shares_to_sell * current_price
                self.shares_held -= num_shares_to_sell
                if self.shares_held == 0:
                    self.average_purchase_price = 0

        self.current_step += 1
        self.total_asset = self.balance + (self.shares_held * current_price)
        reward = self.total_asset - self.prev_total_asset

        # Reward shaping
        if action > 0 and num_shares_to_buy == 0:
            reward -= 10
        elif action < 0:
            if num_shares_to_sell > 0 and current_price > avg_price_at_sale_time and avg_price_at_sale_time > 0:
                profit = (current_price - avg_price_at_sale_time) * num_shares_to_sell
                reward += profit * 0.1
            elif num_shares_to_sell == 0:
                reward -= 100

        # Terminate once current_step has reached the last row of the data.
        terminated = self.current_step >= len(self.df) - 1
        truncated = False
        obs = self._get_observation()
        info = {"total_asset": self.total_asset}
        return obs, reward, terminated, truncated, info

    def _get_observation(self):
        """Build the observation dict for the current step."""
        # Window of the ``lookback`` rows strictly before current_step.
        market_data = self.df.iloc[self.current_step - self.lookback:self.current_step].values
        agent_status = np.array([self.shares_held, self.balance], dtype=np.float32)
        return {"market_data": market_data.astype(np.float32), "agent_status": agent_status}

# --- 3. バックテスト関数 ---
def run_backtest(model, env):
    """Run one deterministic episode and return the asset curve.

    The episode is stepped on the underlying environment (``env.envs[0]``)
    so the un-normalised ``info['total_asset']`` values can be recorded.
    Because that bypasses the vectorised wrapper, each observation is passed
    through ``env.normalize_obs`` (when the wrapper provides it) before it is
    given to the policy; otherwise a model trained on normalised observations
    would be fed raw values.

    Args:
        model: Object with ``predict(obs, deterministic=...)`` returning
            ``(action, state)``.
        env: Vectorised environment exposing ``envs``; if it also exposes
            ``normalize_obs`` (as ``VecNormalize`` does), that is applied to
            every observation.

    Returns:
        ``pd.Series`` of total asset value indexed by the data's index labels,
        starting with the initial balance at row ``lookback - 1``.
    """
    unwrapped_env = env.envs[0]
    # Plain vectorised envs have no normalisation step; fall back to raw obs.
    normalize_obs = getattr(env, "normalize_obs", None)
    obs, info = unwrapped_env.reset()

    asset_history_dict = {unwrapped_env.df.index[unwrapped_env.lookback-1]: unwrapped_env.initial_balance}

    for _ in range(len(unwrapped_env.df) - unwrapped_env.lookback):
        policy_obs = obs if normalize_obs is None else normalize_obs(obs)
        action, _states = model.predict(policy_obs, deterministic=True)
        obs, reward, terminated, truncated, info = unwrapped_env.step(action)
        # step() has already advanced current_step, so the trade happened on
        # the previous row.
        current_date = unwrapped_env.df.index[unwrapped_env.current_step - 1]
        asset_history_dict[current_date] = info['total_asset']
        if terminated or truncated:
            break
    return pd.Series(asset_history_dict)

# --- 4. Run multiple trials ---
# Each trial trains a fresh PPO agent with a random seed on the training
# split and backtests it on the test split.
all_trial_results = []  # one asset-history Series per trial
models = []  # trained model of each trial, in trial order

for i in range(NUM_TRIALS):
    print(f"\n--- トライアル {i + 1}/{NUM_TRIALS} を開始 ---")

    # Build the training environment and wrap it with normalisation.
    train_env_raw = StockTradingEnv(df=train_df)
    train_env = VecNormalize(DummyVecEnv([lambda: train_env_raw]))

    model = PPO(
        "MultiInputPolicy",
        train_env,
        verbose=1,
        seed=random.randint(0, 1000000),
        **BEST_PARAMS
    )

    model.learn(total_timesteps=TRAINING_TIMESTEPS)
    # Keep the trained model so the best trial's agent can be retrieved later
    # (the list was previously declared but never filled).
    models.append(model)
    print(f"--- トライアル {i + 1} 学習完了 ---")

    # Evaluation
    test_env_raw = StockTradingEnv(df=test_df)

    # Normalising wrapper for the test data; statistics are frozen
    # (training=False) and rewards are left unscaled.
    test_env = VecNormalize(
        DummyVecEnv([lambda: test_env_raw]),
        training=False,
        norm_reward=False
    )

    # Reuse the running statistics gathered during training.
    test_env.obs_rms = train_env.obs_rms
    test_env.ret_rms = train_env.ret_rms

    agent_performance = run_backtest(model, test_env)
    all_trial_results.append(agent_performance)

    print(f"--- トライアル {i + 1} 完了 | 最終資産: {agent_performance.iloc[-1]:,.0f} 円 ---")

    # Per-trial chart.  The Buy & Hold benchmark invests the initial balance
    # on the row where the agent's asset history starts (lookback - 1), not on
    # the first test row, so both curves cover the same period.
    benchmark_close = test_env_raw.df['Close'].iloc[test_env_raw.lookback - 1:]
    buy_hold_performance = test_env_raw.initial_balance / benchmark_close.iloc[0] * benchmark_close
    plt.figure(figsize=(16,8))
    plt.title(f'Backtest Result - Trial {i + 1}')
    plt.plot(agent_performance, label=f"AI Agent (Trial {i+1})")
    plt.plot(buy_hold_performance, label="Buy & Hold")
    plt.legend()
    plt.grid(True)
    plt.show()

# --- 5. Final evaluation ---
# Summarise the final asset value of every trial and plot all asset curves
# against a Buy & Hold benchmark.
print("\n\n--- 全トライアル結果の集計 ---")
final_assets = [result.iloc[-1] for result in all_trial_results]
print(f"試行回数: {NUM_TRIALS} 回")
print(f"平均資産: {np.mean(final_assets):,.0f} 円")
print(f"最高資産: {np.max(final_assets):,.0f} 円")
print(f"最低資産: {np.min(final_assets):,.0f} 円")

# Index of the trial with the highest final asset value
best_trial_index = np.argmax(final_assets)
best_performance_series = all_trial_results[best_trial_index]
print(f"\n最も成績の良かったトライアル: {best_trial_index + 1}")

# Combined chart
plt.figure(figsize=(16,8))
plt.title('Final Backtest: All Trials vs. Buy & Hold')
# Print every trial's final asset value
for i, asset in enumerate(final_assets):
    print(f"トライアル {i + 1}: {asset:,.0f} 円")
# Plot the non-best trials in a faint colour
for i, result in enumerate(all_trial_results):
    if i == best_trial_index:
        continue
    plt.plot(result, color='gray', alpha=0.3)

# Plot the best trial prominently
plt.plot(best_performance_series, color='blue', linewidth=2.5, label=f"AI Agent (Best Trial: {best_trial_index+1})")
# Buy & Hold benchmark.  It invests the initial balance on the row where the
# agents' asset histories start (lookback - 1) rather than on the first test
# row, so the benchmark and the agents cover the same period.
final_test_env_raw = StockTradingEnv(df=test_df)
final_benchmark_close = final_test_env_raw.df['Close'].iloc[final_test_env_raw.lookback - 1:]
final_buy_hold_performance = final_test_env_raw.initial_balance / final_benchmark_close.iloc[0] * final_benchmark_close
plt.plot(final_buy_hold_performance, color='orange', linewidth=2.5, label="Buy & Hold")

plt.legend()
plt.grid(True)
plt.show()

ModuleNotFoundError: No module named 'talib'