In [1]:
from pettingzoo.sisl import waterworld_v4
import supersuit as ss
from collections import defaultdict
import gymnasium
import numpy as np

def test_environment_info():
    """测试环境信息，包括动作空间、观察空间等关键信息"""
    
    print("=== 环境信息测试 ===\n")
    
    # 准备环境参数
    agent_algos = ["PPO", "PPO", "DQN", "DQN", "A2C"] * 4
    env = waterworld_v4.env(
        render_mode=None,  # 测试时不渲染
        n_predators=5,
        n_preys=15,
        n_evaders=1,
        n_obstacles=2,
        obstacle_coord=[(0.2, 0.2), (0.8, 0.2)],
        n_poisons=20,
        agent_algorithms=agent_algos
    )
    
    # 黑死亡包装
    env = ss.black_death_v3(env)
    
    # 重置环境
    obs = env.reset(seed=42)
    
    print("1. 基本环境信息:")
    print(f"   环境名称: {env.metadata.get('name', 'waterworld_v4')}")
    print(f"   智能体数量: {len(env.agents)}")
    print(f"   智能体列表: {env.agents[:5]}...")  # 只显示前5个
    print()
    
    # 检查第一个智能体的空间信息
    first_agent = env.agents[0]
    
    print("2. 动作空间信息:")
    action_space = env.action_space(first_agent)
    print(f"   动作空间类型: {type(action_space)}")
    print(f"   动作空间: {action_space}")
    
    if isinstance(action_space, gymnasium.spaces.Discrete):
        print(f"   离散动作数量: {action_space.n}")
        print(f"   动作范围: 0 到 {action_space.n-1}")
        print("   动作含义可能为: [无动作, 上, 下, 左, 右, 左上, 右上, 左下, 右下]")
    elif isinstance(action_space, gymnasium.spaces.Box):
        print(f"   连续动作维度: {action_space.shape}")
        print(f"   动作范围: low={action_space.low}, high={action_space.high}")
        print(f"   动作类型: {action_space.dtype}")
    print()
    
    print("3. 观察空间信息:")
    observation_space = env.observation_space(first_agent)
    print(f"   观察空间类型: {type(observation_space)}")
    print(f"   观察空间: {observation_space}")
    
    if isinstance(observation_space, gymnasium.spaces.Box):
        print(f"   观察维度: {observation_space.shape}")
        print(f"   观察范围: low={observation_space.low[:5]}..., high={observation_space.high[:5]}...")
        print(f"   观察类型: {observation_space.dtype}")
    elif isinstance(observation_space, gymnasium.spaces.Dict):
        print("   字典观察空间包含:")
        for key, space in observation_space.spaces.items():
            print(f"     {key}: {space}")
    print()
    
    # 执行几步并检查实际的动作和观察
    print("4. 实际动作和观察采样:")
    step_count = 0
    for agent in env.agent_iter():
        if step_count >= 3:  # 只测试前3步
            break
            
        observation, reward, termination, truncation, info = env.last()
        
        print(f"   智能体: {agent}")
        if observation is not None:
            print(f"   观察形状: {np.array(observation).shape}")
            print(f"   观察类型: {type(observation)}")
            if isinstance(observation, np.ndarray):
                print(f"   观察范围: [{observation.min():.3f}, {observation.max():.3f}]")
        
        # 选择动作
        if termination or truncation:
            action = None
            print(f"   动作: {action} (终止)")
        else:
            action = env.action_space(agent).sample()
            print(f"   动作: {action}")
            print(f"   动作类型: {type(action)}")
            if isinstance(action, np.ndarray):
                print(f"   动作形状: {action.shape}")
        
        print(f"   奖励: {reward}")
        print()
        
        env.step(action)
        step_count += 1
    
    env.close()
    
    print("5. 转换为连续动作空间的考虑:")
    print("   当前离散动作 Discrete(9) 可能对应:")
    print("   0: 无动作 [0, 0]")
    print("   1: 上      [0, 1]") 
    print("   2: 下      [0, -1]")
    print("   3: 左      [-1, 0]")
    print("   4: 右      [1, 0]")
    print("   5: 左上    [-0.707, 0.707]")
    print("   6: 右上    [0.707, 0.707]")
    print("   7: 左下    [-0.707, -0.707]")
    print("   8: 右下    [0.707, -0.707]")
    print()
    print("   连续动作空间 Box(2) 的范围建议: [-1, 1] x [-1, 1]")
    print("   表示 [x_velocity, y_velocity] 或 [delta_x, delta_y]")

def analyze_continuous_action_requirements():
    """分析连续动作空间的要求"""
    print("\n=== 连续动作空间转换分析 ===\n")
    
    print("1. 网络架构需要修改的地方:")
    print("   - Actor网络输出层: 从 Linear(hidden, 9) 改为 Linear(hidden, 2)")
    print("   - 激活函数: 需要在输出层添加 tanh 或其他限制函数")
    print("   - 动作范围: 通常使用 [-1, 1] 然后映射到实际范围")
    print()
    
    print("2. 策略算法需要修改的地方:")
    print("   - 分布函数: 从 Categorical(logits) 改为 Normal(mu, sigma)")
    print("   - PPO: 需要处理连续动作的概率比率计算")
    print("   - A2C/NPG/TRPO: 同样需要连续动作分布")
    print("   - 探索策略: 从epsilon-greedy改为高斯噪声")
    print()
    
    print("3. 主要代码修改点:")
    print("   - dist_fn: lambda x: Categorical(logits=x) → lambda x: Normal(x[..., :2], x[..., 2:].exp())")
    print("   - 网络输出: 需要输出均值和方差 (或log_std)")
    print("   - 动作采样: 从整数采样改为连续值采样")
    print("   - 动作clipping: 确保动作在有效范围内")

if __name__ == "__main__":
    test_environment_info()
    analyze_continuous_action_requirements()

=== 环境信息测试 ===

1. 基本环境信息:
   环境名称: waterworld_v4
   智能体数量: 20
   智能体列表: ['predator_0', 'predator_1', 'predator_2', 'predator_3', 'predator_4']...

2. 动作空间信息:
   动作空间类型: <class 'gymnasium.spaces.box.Box'>
   动作空间: Box(-1.0, 1.0, (2,), float32)
   连续动作维度: (2,)
   动作范围: low=[-1. -1.], high=[1. 1.]
   动作类型: float32

3. 观察空间信息:
   观察空间类型: <class 'gymnasium.spaces.box.Box'>
   观察空间: Box(-inf, inf, (302,), float32)
   观察维度: (302,)
   观察范围: low=[-inf -inf -inf -inf -inf]..., high=[inf inf inf inf inf]...
   观察类型: float32

4. 实际动作和观察采样:
   智能体: predator_0
   观察形状: (302,)
   观察类型: <class 'numpy.ndarray'>
   观察范围: [-26.209, 10.000]
   动作: [0.20213628 0.22434653]
   动作类型: <class 'numpy.ndarray'>
   动作形状: (2,)
   奖励: 0

   智能体: predator_1
   观察形状: (302,)
   观察类型: <class 'numpy.ndarray'>
   观察范围: [-34.306, 9.000]
   动作: [-0.28497612  0.37577546]
   动作类型: <class 'numpy.ndarray'>
   动作形状: (2,)
   奖励: 0

   智能体: predator_2
   观察形状: (302,)
   观察类型: <class 'numpy.ndarray'>
   观察范围: [-22.876, 1.000]
   动