In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import torch as th
from torch import nn
from typing import Optional, Union, Any, Type, Dict, List, Tuple # Tuple を追加

from stable_baselines3 import SAC
from stable_baselines3.common.policies import BasePolicy
from stable_baselines3.common.torch_layers import (
    BaseFeaturesExtractor,
    FlattenExtractor,
    create_mlp, # MultiAgentContinuousCritic で使用
    get_actor_critic_arch, # MultiAgentSACPolicy で使用
)
from stable_baselines3.common.type_aliases import Schedule, PyTorchObs
from stable_baselines3.common.utils import get_schedule_fn # 学習率スケジュール用
# SACPolicy が内部で使用するコンポーネント
from stable_baselines3.sac.policies import SACPolicy, Actor, ContinuousCritic


# 2. カスタムクリティック
class MultiAgentContinuousCritic(ContinuousCritic):
    """
    マルチエージェント用のContinuousCritic。
    リプレイバッファからのactions (B, N, D_act) を (B*N, D_act) に変形して処理する。
    """
    def __init__(
        self,
        observation_space: spaces.Space, # Multi-agent observation space
        action_space: spaces.Space,      # Single-agent action space
        net_arch: List[int],
        features_extractor: nn.Module,   # Should be MultiAgentFlattenExtractor instance
        features_dim: int,
        activation_fn: Type[nn.Module] = nn.ReLU,
        n_critics: int = 2,
        share_features_extractor: bool = True, # SACPolicy default
        n_agents: int = 1, # 追加パラメータ
    ):
        # ContinuousCriticの__init__を呼び出すために、必要な引数を設定
        # features_extractor は BasePolicy._wrap_features_extractor でラップされる前のものを想定
        super(ContinuousCritic, self).__init__( # MROを考慮し、ContinuousCriticの親であるBasePolicyの__init__を呼び出す
             observation_space=observation_space,
             action_space=action_space, # Single-agent action space
             features_extractor=features_extractor, # ここで渡されるのは MultiAgentFlattenExtractor のはず
             normalize_images=False, # normalize_images は MlpExtractor には直接影響しない
        )
        # BasePolicyの__init__でfeatures_extractorが設定される
        # ContinuousCriticの__init__の残りの部分を手動で実行
        self.n_agents = n_agents
        self.share_features_extractor = share_features_extractor # 通常True for SAC's critic if actor shares

        # ContinuousCritic のネットワーク作成部分を模倣
        # アクションの次元は単一エージェントのもの
        action_dim = spaces.utils.flatdim(action_space)
        
        self.q_networks = []
        for i in range(n_critics):
            # features_dim は MultiAgentFlattenExtractor の出力次元 (単一エージェントのフラット化観測次元)
            q_net_input_dim = features_dim + action_dim
            q_net = create_mlp(q_net_input_dim, 1, net_arch, activation_fn)
            q_net = nn.Sequential(*q_net)
            self.add_module(f"qf{i}", q_net)
            self.q_networks.append(q_net)

    def forward(self, obs: th.Tensor, actions: th.Tensor) -> Tuple[th.Tensor, ...]:
        # obs: (B, N, D_obs) from replay buffer or direct call
        # actions: (B, N, D_act) from replay buffer
        
        # self.extract_features は BasePolicy のメソッドで、self.features_extractor を使う
        # self.features_extractor は MultiAgentFlattenExtractor のインスタンス
        # extracted_features の形状: (B * N, features_dim)
        extracted_features = self.extract_features(obs)

        # actions を (B * N, D_act_single_agent) に reshape
        batch_size = actions.shape[0]
        # self.action_space は単一エージェントの行動空間 (SACPolicyの__init__で設定)
        # 単一エージェントの行動次元を取得
        single_agent_action_dim = self.action_space.shape[0]
        reshaped_actions = actions.reshape(batch_size * self.n_agents, single_agent_action_dim)
        
        q_values = []
        for q_net in self.q_networks:
            # q_net の入力は (extracted_features と reshaped_actions の結合)
            q_input = th.cat([extracted_features, reshaped_actions], dim=1)
            q_values.append(q_net(q_input))
        return tuple(q_values)


# 3. マルチエージェント SAC ポリシー (make_critic を修正)
class CPM_SAC_Policy(SACPolicy):
    """
    マルチエージェント強化学習のための SAC (Soft Actor-Critic) ポリシー。
    MultiAgentContinuousCritic を使用する。
    """
    def __init__(
        self,
        observation_space: spaces.Box,
        action_space: spaces.Box,
        lr_schedule: Schedule,
        **kwargs,
    ):
        super().__init__(
            observation_space=observation_space,
            action_space=action_space, # アクターが直接扱う単一エージェントの行動空間
            lr_schedule=lr_schedule,
            **kwargs,
        )

    # make_critic をオーバーライドして MultiAgentContinuousCritic を使用
    def make_critic(self, features_extractor: Optional[BaseFeaturesExtractor] = None) -> ContinuousCritic:
        # critic_kwargs は SACPolicy の __init__ で準備される
        critic_kwargs = self.critic_kwargs.copy() # self.critic_kwargs を使用
        if features_extractor is not None:
             critic_kwargs["features_extractor"] = features_extractor
             # Update features_dim if features_extractor is provided
             critic_kwargs["features_dim"] = features_extractor.features_dim


        # MultiAgentContinuousCritic に n_agents を渡す
        critic_kwargs["n_agents"] = self.n_agents
        # net_arch は critic_kwargs に既に入っているはず (SACPolicy.__init__で設定)
        # action_space は critic_kwargs["action_space"] (single-agent)
        # observation_space は critic_kwargs["observation_space"] (multi-agent)
        return MultiAgentContinuousCritic(**critic_kwargs).to(self.device)


    def _predict(self, observation: PyTorchObs, deterministic: bool = False) -> th.Tensor:
        # (以前のコードからコピー)
        actions_flat = SACPolicy._predict(self, observation, deterministic=deterministic) # 基底のSACPolicy._predictを明示的に呼ぶ
        
        if isinstance(observation, dict):
            key = next(iter(observation))
            batch_size = observation[key].shape[0]
        else:
            batch_size = observation.shape[0]
        
        action_shape_per_agent = self.original_action_space.shape[1:]
        actions = actions_flat.reshape(batch_size, self.n_agents, *action_shape_per_agent)
        return actions

    def _get_constructor_parameters(self) -> dict[str, Any]:
        # (以前のコードからコピー)
        data = super()._get_constructor_parameters()
        data["action_space"] = self.original_action_space
        return data

# 4. 簡単なマルチエージェント環境
class SimpleMAEnvRevised(gym.Env):
    metadata = {"render_modes": ["human"], "render_fps": 30}

    def __init__(self, n_agents=2, state_dim_per_agent=2, act_dim_per_agent=1, episode_len=100):
        super().__init__()
        self.n_agents = n_agents
        self.state_dim_per_agent = state_dim_per_agent
        self.act_dim_per_agent = act_dim_per_agent
        self.episode_len = episode_len

        self.action_space = spaces.Box(low=-1, high=1,
                                       shape=(self.n_agents, self.act_dim_per_agent),
                                       dtype=np.float32)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf,
                                            shape=(self.n_agents, self.state_dim_per_agent),
                                            dtype=np.float32)

        self.agent_states = np.zeros((self.n_agents, self.state_dim_per_agent), dtype=np.float32)
        self.target_states = np.zeros((self.n_agents, self.state_dim_per_agent), dtype=np.float32)
        if self.state_dim_per_agent > 0: # ターゲットを固定で設定
             self.target_states[:, :self.state_dim_per_agent // 2] = 0.5 # 例: 半分は0.5
             self.target_states[:, self.state_dim_per_agent // 2:] = -0.5 # 残り半分は-0.5

        self.current_step = 0

    def _get_obs(self):
        return self.agent_states.copy()

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.current_step = 0
        self.agent_states = self.np_random.uniform(low=-1, high=1, size=(self.n_agents, self.state_dim_per_agent)).astype(np.float32)
        observation = self._get_obs()
        info = {}
        return observation, info

    def step(self, action: np.ndarray):
        # Action clipping, as policy output might be outside [-1, 1] before squashing
        # However, SAC actor uses Tanh, so it's already in [-1, 1]
        # action = np.clip(action, self.action_space.low, self.action_space.high)

        force_applied = np.zeros_like(self.agent_states)
        dim_to_apply = min(self.act_dim_per_agent, self.state_dim_per_agent)
        force_applied[:, :dim_to_apply] = action[:, :dim_to_apply]
        
        self.agent_states += force_applied * 0.1
        self.agent_states = np.clip(self.agent_states, -5, 5) # 状態が発散しないようにクリップ

        self.current_step += 1

        total_reward = 0.0
        for i in range(self.n_agents):
            distance = np.linalg.norm(self.agent_states[i] - self.target_states[i])
            total_reward -= distance # 距離が小さいほど報酬が高い
        
        reward = float(total_reward) # 全エージェントの報酬の合計 (または平均)

        terminated = self.current_step >= self.episode_len
        truncated = False
        observation = self._get_obs()
        info = {}
        return observation, reward, terminated, truncated, info

    def render(self):
        if "human" in self.metadata["render_modes"]:
            for i in range(self.n_agents):
                print(f"Agent {i}: State={np.round(self.agent_states[i],2)}, Target={np.round(self.target_states[i],2)}")
            print(f"Step: {self.current_step}, Total Reward (current step): {self.reward_for_render:.2f}") # 最後に計算された報酬を表示
            print("-" * 20)
    
    def close(self):
        pass

# 5. メインの実行ブロック
if __name__ == '__main__':
    N_AGENTS = 3
    STATE_DIM_PER_AGENT = 2
    ACTION_DIM_PER_AGENT = 1
    EPISODE_LEN = 200
    TOTAL_TIMESTEPS = 30000 # 学習ステップ数を増やす

    # 環境のインスタンス化
    env = SimpleMAEnvRevised(n_agents=N_AGENTS, 
                             state_dim_per_agent=STATE_DIM_PER_AGENT, 
                             act_dim_per_agent=ACTION_DIM_PER_AGENT,
                             episode_len=EPISODE_LEN)

    # SACモデルのインスタンス化
    # verbose=1で学習の進捗を表示
    # buffer_sizeを小さくしてメモリ使用量を抑える（デモ用）
    # learning_startsを小さくして早く学習を開始
    model = SAC(
        MultiAgentSACPolicy,
        env,
        verbose=1,
        tensorboard_log="./ma_sac_tensorboard/",
        buffer_size=10000, # リプレイバッファのサイズ
        learning_starts=100, # このステップ数から学習を開始
        batch_size=64,       # バッチサイズ
        train_freq=(1, "step"), # 1ステップごとに学習
        gradient_steps=1,       # 1ステップごとに1回勾配更新
        learning_rate=3e-4,     # 学習率 (定数またはスケジュール)
        gamma=0.99,             # 割引率
        tau=0.005,              # ソフトアップデートの係数
        device="auto"
    )

    print("学習を開始します...")
    model.learn(total_timesteps=TOTAL_TIMESTEPS, log_interval=10) # log_intervalでTensorBoardへのログ記録頻度を指定
    print("学習が完了しました。")

    # 学習済みモデルの保存
    model.save("ma_sac_model")
    print("モデルを ma_sac_model.zip として保存しました。")

    # 学習済みモデルのロード (オプション)
    # del model 
    # model = SAC.load("ma_sac_model", env=env, policy_class=MultiAgentSACPolicy)
    # print("モデルをロードしました。")

    # 学習後のポリシーで環境を数ステップ実行してみる
    print("\n学習後のエージェントの動作テスト:")
    obs, info = env.reset()
    env.reward_for_render = 0 # render用
    for i in range(EPISODE_LEN + 5):
        action, _states = model.predict(obs, deterministic=True)
        obs, reward, terminated, truncated, info = env.step(action)
        env.reward_for_render = reward # render用
        if i % 20 == 0: # 20ステップごとに描画
            env.render()
        if terminated or truncated:
            print("エピソード終了")
            obs, info = env.reset()
            env.reward_for_render = 0
            if i > EPISODE_LEN: # 1エピソード以上実行したら終了
                break
    
    env.close()
    print("デモを終了します。")

Using cuda device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


TypeError: __init__() got an unexpected keyword argument 'normalize_images'