# 环境
单智能体设置：一个智能体生活在环境中，并执行由单个策略计算的行动。智能体到策略的映射是固定的（“default_agent” 映射到 “default_policy”）。有关此设置在多智能体情况下的泛化方式，请参阅多智能体环境。

## Farama Gymnasium
RLlib 依赖 Farama 的 Gymnasium API 作为其主要的 RL 环境接口，用于进行单智能体训练（多智能体请参见此处）。要使用 gymnasium 实现自定义逻辑并将其集成到 RLlib 配置中，请参考下面的 SimpleCorridor 示例。

In [5]:
import os
os.environ["RAY_DISABLE_IMPORT_WARNING"] = "1" 

import gymnasium as gym
from gymnasium.spaces import Discrete, Box
import numpy as np
import random
from typing import Optional

import ray
from ray import tune
from ray.tune.registry import register_env
from ray.rllib.algorithms.ppo import PPOConfig

# -------------------------------------------------------------
# 1. 环境定义（与原来完全一致）
# -------------------------------------------------------------
class SimpleCorridor(gym.Env):
    """Custom corridor env: the agent starts at position 0 and must reach the end.

    The optional ``config`` dict may contain ``"corridor_length"`` (default 7),
    the position at which the episode terminates. Action 0 steps back (never
    below 0), action 1 steps forward. Every non-terminal step yields -0.01;
    the terminal step yields a random reward drawn from [0.5, 1.5].
    """

    def __init__(self, config: Optional[dict] = None):
        settings = config if config else {}
        self.end_pos = settings.get("corridor_length", 7)
        self.cur_pos = 0
        self.action_space = Discrete(2)
        self.observation_space = Box(0.0, self.end_pos, shape=(1,), dtype=np.float32)

    def reset(self, *, seed=None, options=None):
        """Move the agent back to position 0 and return (observation, info)."""
        # Seeds the module-level `random` generator used for the final reward.
        random.seed(seed)
        self.cur_pos = 0
        start_obs = np.array([self.cur_pos], np.float32)
        return start_obs, {"env_state": "reset"}

    def step(self, action):
        """Apply `action`; return (obs, reward, terminated, truncated, info)."""
        assert action in [0, 1], action
        if action == 1:
            self.cur_pos += 1
        elif action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1

        reached_end = self.cur_pos >= self.end_pos
        if reached_end:
            step_reward = random.uniform(0.5, 1.5)
        else:
            step_reward = -0.01

        next_obs = np.array([self.cur_pos], np.float32)
        # This env never truncates and reports no extra info.
        return next_obs, step_reward, reached_end, False, {}

# -------------------------------------------------------------
# 2. Register the environment (under the string name "corridor_env")
# -------------------------------------------------------------
register_env("corridor_env", lambda cfg: SimpleCorridor(cfg))

# -------------------------------------------------------------
# 3. 训练函数封装
# -------------------------------------------------------------
def train(
    algo: str = "PPO",
    corridor_length: int = 10,
    num_iterations: int = 50,
    stop_timesteps: int = 100_000,
    framework: str = "torch",
):
    """Train an RLlib algorithm on the registered "corridor_env" through Ray Tune.

    Meant to be called directly from a notebook cell. Any running Ray instance
    is shut down before training starts, and Ray is shut down again when the
    function exits (also when training raises).

    Args:
        algo: Algorithm name, case-insensitive. Supported: "PPO" and "DQN".
        corridor_length: Forwarded to the env as ``env_config["corridor_length"]``.
        num_iterations: Value of the "training_iteration" stop criterion.
        stop_timesteps: Value of the "timesteps_total" stop criterion.
        framework: Framework name passed to ``config.framework()``.

    Returns:
        Whatever ``tune.Tuner.fit()`` returns.

    Raises:
        ValueError: If ``algo`` is not one of the supported algorithm names.
    """
    supported = ("PPO", "DQN")
    algo_name = algo.upper()
    # Validate up front: the config class and the Tune trainable name must refer
    # to the same algorithm, so unknown names are rejected instead of silently
    # pairing them with a DQN config.
    if algo_name not in supported:
        raise ValueError(f"Unsupported algo {algo!r}; expected one of {supported}")

    if algo_name == "PPO":
        config = PPOConfig()
    else:
        # Import explicitly rather than relying on `ray.rllib.algorithms.dqn`
        # being reachable as an attribute of the top-level `ray` package.
        from ray.rllib.algorithms.dqn import DQNConfig

        config = DQNConfig()
    # More algorithms can be added here as needed.

    ray.shutdown()  # Avoid starting Ray twice.
    ray.init(ignore_reinit_error=True)
    try:
        config = config.framework(framework).environment(
            "corridor_env",
            env_config={"corridor_length": corridor_length},
        )

        stop = {
            "training_iteration": num_iterations,
            "timesteps_total": stop_timesteps,
        }

        tuner = tune.Tuner(
            algo_name,
            param_space=config.to_dict(),
            run_config=tune.RunConfig(stop=stop, verbose=1),
        )
        return tuner.fit()
    finally:
        # Always release the local Ray instance, even if `fit()` raised.
        ray.shutdown()

# -------------------------------------------------------------
# 4. Start training (example arguments)
# -------------------------------------------------------------
results = train(
    algo="PPO",
    corridor_length=10,
    num_iterations=30,
    stop_timesteps=50_000,
    framework="torch",
)

# -------------------------------------------------------------
# 5. Inspect the training results
# -------------------------------------------------------------
df = results.get_dataframe()
# NOTE(review): `display` is not defined in this file — presumably the
# IPython/Jupyter builtin; confirm before running outside a notebook.
display(df.tail())

0,1
Current time:,2025-08-05 20:26:00
Running for:,00:00:10.11
Memory:,13.5/15.3 GiB

Trial name,# failures,error file
PPO_corridor_env_52b8c_00000,1,/tmp/ray/session_2025-08-05_20-25-48_528425_3440243/artifacts/2025-08-05_20-25-50/PPO_2025-08-05_20-25-50/driver_artifacts/PPO_corridor_env_52b8c_00000_0_2025-08-05_20-25-50/error.txt

Trial name,status,loc
PPO_corridor_env_52b8c_00000,ERROR,


[36m(PPO pid=3450135)[0m [2025-08-05 20:25:55,871 E 3450135 3450135] core_worker.cc:2740: Actor with class name: 'SingleAgentEnvRunner' and ID: '2030e8858c269b01fd8b382c01000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.
[36m(PPO pid=3450135)[0m [2025-08-05 20:25:55,912 E 3450135 3450135] core_worker.cc:2740: Actor with class name: 'SingleAgentEnvRunner' and ID: '693d2a6815d375afd5e75f6701000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.
2025-08-05 20:26:00,827	ERROR tune_controller.py:1331 -- Trial task failed for trial PPO_corridor_env_52b8c_00000
Traceback (most recent call last):
  File "/home/robotarm/

## 配置环境

## 通过字符串指定
默认情况下，RLlib 将字符串值解释为已注册的 gymnasium 环境名称。

In [None]:
from ray.rllib.algorithms.ppo import PPOConfig

config = (
    PPOConfig()
    # Configure the RL environment to use as a string (by name), which
    # is registered with Farama's gymnasium.
    .environment("Acrobot-v1")
)
# Build the algorithm from the config, call `train()` once, and print what it
# returns.
algo = config.build()
print(algo.train())

2025-08-05 20:34:46,764	INFO worker.py:1927 -- Started a local Ray instance.
[2025-08-05 20:34:48,023 E 3440243 3440243] core_worker.cc:2740: Actor with class name: 'SingleAgentEnvRunner' and ID: '1fe1a3a9c425500126ad373301000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.
[2025-08-05 20:34:48,075 E 3440243 3440243] core_worker.cc:2740: Actor with class name: 'SingleAgentEnvRunner' and ID: '76e5fe6a1250263565a7eb8601000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.


{'timers': {'training_iteration': 14.585700060939416, 'restore_env_runners': 2.1748011931777e-05, 'training_step': 14.585240917047486, 'env_runner_sampling_timer': 3.6546126888133585, 'learner_update_timer': 10.92649948806502, 'synch_weights': 0.0034019839949905872}, 'env_runners': {'env_to_module_connector': {'timers': {'connectors': {'numpy_to_tensor': np.float64(5.8639155159843326e-05), 'add_states_from_episodes_to_batch': np.float64(7.963415578356485e-06), 'add_time_dim_to_batch_and_zero_pad': np.float64(1.2963210141504977e-05), 'add_observations_from_episodes_to_batch': np.float64(1.5038805751915865e-05), 'batch_individual_items': np.float64(3.155113668666544e-05)}}, 'connector_pipeline_timer': np.float64(0.00023713496881496386)}, 'env_to_module_sum_episodes_length_in': np.float64(404.2802303359448), 'episode_return_min': -500.0, 'num_env_steps_sampled_lifetime': 4000.0, 'episode_duration_sec_mean': 0.8947852913988754, 'module_to_env_connector': {'connector_pipeline_timer': np.flo

[36m(SingleAgentEnvRunner pid=3451771)[0m   logger.warn(
[36m(SingleAgentEnvRunner pid=3451771)[0m   logger.warn(f"{pre} is not within the observation space.")
[36m(SingleAgentEnvRunner pid=3451771)[0m   logger.warn(
[36m(SingleAgentEnvRunner pid=3451771)[0m   logger.warn(f"{pre} is not within the observation space.")
[36m(SingleAgentEnvRunner pid=3451773)[0m   logger.warn(
[36m(SingleAgentEnvRunner pid=3451773)[0m   logger.warn(f"{pre} is not within the observation space.")
[36m(SingleAgentEnvRunner pid=3451773)[0m   logger.warn(
[36m(SingleAgentEnvRunner pid=3451773)[0m   logger.warn(f"{pre} is not within the observation space.")
[33m(raylet)[0m [2025-08-06 12:44:49,602 E 3451695 3451695] (raylet) node_manager.cc:3041: 2 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 233c7c689851320fade6364e0333fb29c9b451bd3bd34bb9bacf2d08, IP: 10.110.34.88) over the last time period. To see more information about th

## 通过 gymnasium.Env 的子类指定
如果你正在使用自定义的 gymnasium.Env 类子类，可以直接传递该类而不是注册的字符串。你的子类必须在其构造函数中接受一个 config 参数（可以默认为 None）。


In [7]:
import gymnasium as gym
import numpy as np
from ray.rllib.algorithms.ppo import PPOConfig

class MyDummyEnv(gym.Env):
    """Minimal gymnasium env that always emits the observation [1.0].

    Every step returns reward 1.0 and never terminates or truncates.
    """

    # Write the constructor and provide a single `config` arg,
    # which may be set to None by default.
    def __init__(self, config=None):
        # As per gymnasium standard, provide observation and action spaces in your
        # constructor.
        self.observation_space = gym.spaces.Box(-1.0, 1.0, (1,), np.float32)
        self.action_space = gym.spaces.Discrete(2)

    def reset(self, seed=None, options=None):
        # Return (reset) observation and info dict.
        # The observation is created as float32 so that it matches the dtype of
        # `observation_space`; a default (float64) array is not contained in a
        # float32 Box.
        return np.array([1.0], np.float32), {}

    def step(self, action):
        # Return next observation, reward, terminated, truncated, and info dict.
        return np.array([1.0], np.float32), 1.0, False, False, {}

# Pass the env class itself (instead of a registered string) to the config.
config = (
    PPOConfig()
    .environment(
        MyDummyEnv,
        env_config={},  # `config` to pass to your env class
    )
)
# Build the algorithm, call `train()` once, and print what it returns.
algo = config.build()
print(algo.train())

[2025-08-05 20:36:47,211 E 3440243 3440243] core_worker.cc:2740: Actor with class name: 'SingleAgentEnvRunner' and ID: '5dcad0efa0814cb66af79a4601000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.
[2025-08-05 20:36:47,265 E 3440243 3440243] core_worker.cc:2740: Actor with class name: 'SingleAgentEnvRunner' and ID: '7102cf8376bed608ece878d701000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.


{'timers': {'training_iteration': 14.936725181993097, 'restore_env_runners': 3.324518911540508e-05, 'training_step': 14.936223768861964, 'env_runner_sampling_timer': 3.4384264659602195, 'learner_update_timer': 11.49317091004923, 'synch_weights': 0.003826322965323925}, 'env_runners': {'env_to_module_connector': {'timers': {'connectors': {'numpy_to_tensor': np.float64(6.049784700693104e-05), 'add_states_from_episodes_to_batch': np.float64(8.169158180437569e-06), 'add_time_dim_to_batch_and_zero_pad': np.float64(1.2553371301353672e-05), 'add_observations_from_episodes_to_batch': np.float64(1.514624764926376e-05), 'batch_individual_items': np.float64(3.165490147136356e-05)}}, 'connector_pipeline_timer': np.float64(0.00024245962985280779)}, 'env_to_module_sum_episodes_length_in': np.float64(1901.0000001845106), 'num_env_steps_sampled_lifetime': 4000.0, 'module_to_env_connector': {'connector_pipeline_timer': np.float64(0.000788165452446644), 'timers': {'connectors': {'remove_single_ts_time_ra

## 通过 Tune 注册的 Lambda 指定
向配置提供环境信息的第三种选项是使用 Ray Tune 注册一个环境创建函数（或 lambda）。该创建函数必须接受一个 config 参数，并返回一个非向量化的 gymnasium.Env 实例。

In [8]:
from ray.tune.registry import register_env

def env_creator(config):
    """Build and return a `MyDummyEnv` instance from the given `config`."""
    env = MyDummyEnv(config)
    return env

# Register the creator function with Tune under the name "my_env".
register_env("my_env", env_creator)
config = (
    PPOConfig()
    .environment("my_env")  # <- Tune registered string pointing to your custom env creator.
)
# Build the algorithm, call `train()` once, and print what it returns.
algo = config.build()
print(algo.train())

[2025-08-05 20:42:47,703 E 3440243 3440243] core_worker.cc:2740: Actor with class name: 'SingleAgentEnvRunner' and ID: '34c0939bd11be68beb64254701000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.
[2025-08-05 20:42:47,770 E 3440243 3440243] core_worker.cc:2740: Actor with class name: 'SingleAgentEnvRunner' and ID: '5e09e1d961f5a14a5c03403001000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.


{'timers': {'training_iteration': 15.026597093092278, 'restore_env_runners': 2.874014899134636e-05, 'training_step': 15.02614946803078, 'env_runner_sampling_timer': 3.465104885166511, 'learner_update_timer': 11.557350561022758, 'synch_weights': 0.0030920300632715225}, 'env_runners': {'env_to_module_connector': {'timers': {'connectors': {'numpy_to_tensor': np.float64(5.9179183663791455e-05), 'add_states_from_episodes_to_batch': np.float64(7.904484803943991e-06), 'add_time_dim_to_batch_and_zero_pad': np.float64(1.2192654074200906e-05), 'add_observations_from_episodes_to_batch': np.float64(1.4849176669030723e-05), 'batch_individual_items': np.float64(3.120312120068458e-05)}}, 'connector_pipeline_timer': np.float64(0.0002350004545787404)}, 'env_to_module_sum_episodes_length_in': np.float64(1901.0000001845106), 'num_env_steps_sampled_lifetime': 4000.0, 'module_to_env_connector': {'connector_pipeline_timer': np.float64(0.0008395126778025231), 'timers': {'connectors': {'remove_single_ts_time_

在前面的示例中，env_creator 函数接受一个 config 参数。此配置主要是一个包含所需设置的字典。然而，你也可以在 config 变量中访问其他属性。例如，使用 config.worker_index 获取远程 EnvRunner 索引，或使用 config.num_workers 获取使用的 EnvRunner 总数。这种方法有助于自定义集成中的环境，并使在某些 EnvRunner 上运行的环境与在其他 EnvRunner 上运行的环境表现不同。

In [9]:
import gymnasium as gym
from gymnasium import Env
from ray.tune.registry import register_env

def choose_env_for(worker_index: int, vector_index: int) -> str:
    """Return the env ID to use for the given worker and vector indices.

    Example policy: alternate between two built-in environments. An even index
    sum selects "CartPole-v1"; anything else selects
    "MountainCarContinuous-v0".
    """
    index_sum = worker_index + vector_index
    return "MountainCarContinuous-v0" if index_sum % 2 != 0 else "CartPole-v1"

class EnvDependingOnWorkerAndVectorIndex(gym.Env):
    """Env that delegates to a gymnasium env chosen per worker/vector index.

    Args:
        config: Object exposing `worker_index` and `vector_index` attributes;
            both are passed to `choose_env_for()` to select the env ID.
    """

    def __init__(self, config):
        # Pick actual env based on worker and env indexes.
        self.env = gym.make(
            choose_env_for(config.worker_index, config.vector_index)
        )
        # Expose the wrapped env's spaces as this env's own spaces.
        self.action_space = self.env.action_space
        self.observation_space = self.env.observation_space

    def reset(self, seed=None, options=None):
        # gymnasium's `reset()` accepts `seed` and `options` as keyword-only
        # arguments, so they must be forwarded by keyword. Both default to None
        # so that a plain `reset()` call works as well.
        return self.env.reset(seed=seed, options=options)

    def step(self, action):
        # Delegate stepping to the wrapped env unchanged.
        return self.env.step(action)

# Register the index-dependent env with Tune under the name "multi_env".
register_env("multi_env", lambda config: EnvDependingOnWorkerAndVectorIndex(config))

# 多智能体环境
可以使用几种不同的策略网络来控制各种智能体。因此，环境中的每个智能体都映射到恰好一个特定的策略。这种映射由用户提供的函数决定，称为“映射函数”。请注意，如果存在映射到 M 个策略的 N 个智能体，则 N 总是大于或等于 M，允许任何策略控制多个智能体。

多智能体设置： N 个智能体位于环境中，并执行由 M 个策略网络计算出的动作。智能体到策略的映射是灵活的，由用户提供的映射函数决定。这里，agent_1 和 agent_3 都映射到 policy_1，而 agent_2 映射到 policy_2。

## RLlib 的 MultiAgentEnv API

RLlib 的 `MultiAgentEnv`（`ray.rllib.env.multi_agent_env.MultiAgentEnv`）API 紧密遵循 Farama 的 gymnasium（单智能体）环境的约定和 API，甚至继承自 gymnasium.Env。然而，自定义的 `MultiAgentEnv` 实现不是从 reset() 和 step() 返回单个观察、奖励以及终止/截断标志，而是输出字典，一个用于观察，一个用于奖励，等等。在每个这样的多智能体字典中，智能体 ID 映射到各自的单个智能体的观察/奖励/等。

from ray.rllib.env.multi_agent_env import MultiAgentEnv

class MyMultiAgentEnv(MultiAgentEnv):

    def __init__(self, config=None):
        super().__init__()
        ...

    def reset(self, *, seed=None, options=None):
        ...
        # return observation dict and infos dict.
        return {"agent_1": [obs of agent_1], "agent_2": [obs of agent_2]}, {}

    def step(self, action_dict):
        # return observation dict, rewards dict, termination/truncation dicts, and infos dict
        return {"agent_1": [obs of agent_1]}, {...}, ...


## 智能体定义

环境中的智能体数量及其 ID 完全由您的 `MultiAgentEnv`（`ray.rllib.env.multi_agent_env.MultiAgentEnv`）代码控制。您的环境决定哪些智能体在 episode 重置后开始，哪些智能体稍后进入 episode，哪些智能体提前终止 episode，以及哪些智能体留在 episode 中直到整个 episode 结束。

def __init__(self, config=None):
    super().__init__()
    ...
    # Define all agent IDs that might even show up in your episodes.
    self.possible_agents = ["agent_1", "agent_2"]
    ...


如果您的环境仅以部分智能体 ID 开始和/或在 episode 结束前终止部分智能体 ID，您还需要在整个 episode 过程中持续调整 self.agents 属性。另一方面，如果所有智能体 ID 在您的 episode 中是静态的，您可以将 self.agents 设置为与 self.possible_agents 相同，并且在您代码的其余部分不更改其值。


def __init__(self, config=None):
    super().__init__()
    ...
    # If your agents never change throughout the episode, set
    # `self.agents` to the same list as `self.possible_agents`.
    self.agents = self.possible_agents = ["agent_1", "agent_2"]
    # Otherwise, you will have to adjust `self.agents` in `reset()` and `step()` to whatever the
    # currently "alive" agents are.
    ...

## 观察空间和动作空间
接下来，您应该在构造函数中设置每个（可能）智能体 ID 的观察空间和动作空间。使用 self.observation_spaces 和 self.action_spaces 属性来定义将智能体 ID 映射到各个智能体空间的字典。例如

import gymnasium as gym
import numpy as np

...

    def __init__(self, config=None):
        super().__init__()
        ...
        self.observation_spaces = {
            "agent_1": gym.spaces.Box(-1.0, 1.0, (4,), np.float32),
            "agent_2": gym.spaces.Box(-1.0, 1.0, (3,), np.float32),
        }
        self.action_spaces = {
            "agent_1": gym.spaces.Discrete(2),
            "agent_2": gym.spaces.Box(0.0, 1.0, (1,), np.float32),
        }
        ...


如果您的 episode 包含大量智能体，其中一些共享相同的观察空间或动作空间，并且您不想创建非常大的空间字典，您还可以覆盖 get_observation_space() 和 get_action_space() 方法，并自己实现从智能体 ID 到空间的映射逻辑。例如

def get_observation_space(self, agent_id):
    """Return the observation space for `agent_id`, derived from its ID prefix.

    Raises:
        ValueError: If `agent_id` starts with neither "robot_" nor
            "decision_maker".
    """
    # Guard-clause style: return as soon as a known prefix matches.
    if agent_id.startswith("robot_"):
        return gym.spaces.Box(0, 255, (84, 84, 3), np.uint8)
    if agent_id.startswith("decision_maker"):
        return gym.spaces.Discrete(2)
    raise ValueError(f"bad agent id: {agent_id}!")


## 观察、奖励和终止字典

在自定义的 MultiAgentEnv 中，您还需要实现 reset() 和 step() 方法。与单智能体 gymnasium.Env 类似，您需要从 reset() 返回观察和信息，并从 step() 返回观察、奖励、终止/截断标志和信息。然而，这些返回值不再是单个值，而必须是字典，将智能体 ID 映射到各个智能体的对应值。

def reset(self, *, seed=None, options=None):
    """Return the per-agent initial observations and an empty info dict."""
    ...
    # One entry per agent ID, each mapping to that agent's own observation.
    initial_obs = {}
    initial_obs["agent_1"] = np.array([0.0, 1.0, 0.0, 0.0])
    initial_obs["agent_2"] = np.array([0.0, 0.0, 1.0])
    infos = {}  # <- empty info dict
    return initial_obs, infos


智能体同时行动的环境： 两个智能体在每个时间步都收到其观察结果，包括紧随 reset() 之后。请注意，每当返回的观察字典中存在该智能体的观察时，该智能体必须计算并发送一个动作到下一次 step() 调用中。

智能体轮流行动的环境： 两个智能体通过轮流行动。 agent_1 在 reset() 后收到第一个观察结果，因此必须首先计算并发送一个动作。接收到此动作后，环境返回 agent_2 的观察结果，此时 agent_2 需要行动。接收到 agent_2 的动作后，环境返回 agent_1 的下一个观察结果，依此类推。

回合顺序复杂的环境： 三个智能体以看似混乱的顺序行动。 agent_1 和 agent_3 在 reset() 后收到其初始观察结果，因此必须首先计算并发送动作。接收到这两个动作后，环境返回 agent_1 和 agent_2 的观察结果，此时它们必须同时行动。接收到 agent_1 和 agent_2 的动作后，环境返回 agent_2 和 agent_3 的观察结果，依此类推。




## 智能体同时行动的环境
智能体总是同时行动的多智能体环境的一个很好的简单示例是石头剪刀布游戏，其中两个智能体总共需要进行 N 次移动，每次都在“石头”、“剪刀”或“布”动作中选择。每次移动后，比较动作选择。石头胜剪刀，布胜石头，剪刀胜布。赢得移动的玩家获得 +1 奖励，输家获得 -1 奖励。

In [12]:
import gymnasium as gym

from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.rllib.connectors.env_to_module.flatten_observations import FlattenObservations
from ray.rllib.utils.test_utils import (
    add_rllib_example_script_args,
    run_rllib_example_script_experiment,
)
from ray.tune.registry import get_trainable_cls, register_env  # noqa

class RockPaperScissors(MultiAgentEnv):
    """Rock-paper-scissors as a two-player, simultaneous-move environment.

    "player1" and "player2" both act on every timestep; an episode lasts 10
    timesteps. Per timestep the winner gets +1, the loser -1, and a draw
    gives 0 to both.

    Each player observes the opponent's most recent action.
    """

    # Move encodings. LIZARD and SPOCK are defined here but do not appear in
    # WIN_MATRIX or in the Discrete(3) spaces below.
    ROCK = 0
    PAPER = 1
    SCISSORS = 2
    LIZARD = 3
    SPOCK = 4

    # (player1 move, player2 move) -> (player1 reward, player2 reward)
    WIN_MATRIX = {
        # Draws.
        (ROCK, ROCK): (0, 0),
        (PAPER, PAPER): (0, 0),
        (SCISSORS, SCISSORS): (0, 0),
        # player1 wins.
        (ROCK, SCISSORS): (1, -1),
        (PAPER, ROCK): (1, -1),
        (SCISSORS, PAPER): (1, -1),
        # player2 wins.
        (ROCK, PAPER): (-1, 1),
        (PAPER, SCISSORS): (-1, 1),
        (SCISSORS, ROCK): (-1, 1),
    }

    def __init__(self, config=None):
        super().__init__()

        player_ids = ["player1", "player2"]
        self.agents = self.possible_agents = player_ids

        # An observation is the last action taken by the opponent, so one and
        # the same dict serves as both the observation- and action-space map.
        shared_spaces = {
            "player1": gym.spaces.Discrete(3),
            "player2": gym.spaces.Discrete(3),
        }
        self.observation_spaces = self.action_spaces = shared_spaces
        self.last_move = None
        self.num_moves = 0

    def reset(self, *, seed=None, options=None):
        """Start a new episode; return (observations, infos)."""
        self.num_moves = 0

        # Nobody has moved yet, so the first observations carry no information;
        # use 0 for both players.
        first_obs = {"player1": 0, "player2": 0}
        return first_obs, {}  # <- empty infos dict

    def step(self, action_dict):
        """Play one round; return (obs, rewards, terminateds, truncateds, infos)."""
        self.num_moves += 1

        p1_move = action_dict["player1"]
        p2_move = action_dict["player2"]

        # Look up both rewards in the win-matrix.
        p1_reward, p2_reward = self.WIN_MATRIX[p1_move, p2_move]

        # Episode ends for all agents once 10 moves have been made.
        episode_over = self.num_moves >= 10

        return (
            # Each player sees the other player's action. Publishing both
            # players here means both are expected to act in the next `step()`
            # (simultaneous stepping).
            {"player1": p2_move, "player2": p1_move},
            {"player1": p1_reward, "player2": p2_reward},
            {"__all__": episode_over},
            # Truncateds and infos stay empty.
            {},
            {},
        )

# Script entry: configure PPO for the two-player RockPaperScissors env, build the
# algorithm, and print the result of a single `train()` call.
# NOTE(review): `PPOConfig` is not among this cell's imports — it relies on an
# earlier import in the file.
if __name__ == "__main__":
    base_config = (
        PPOConfig()
        .environment(
            RockPaperScissors,
            env_config={},
        )
        .env_runners(
            # Install a FlattenObservations connector (multi-agent mode) on the
            # env-to-module path.
            env_to_module_connector=(
                lambda env, spaces, device: FlattenObservations(multi_agent=True)
            ),
        )
        .multi_agent(
            # Define two policies.
            policies={"player1", "player2"},
            # Map agent "player1" to policy "player1" and agent "player2" to policy
            # "player2".
            policy_mapping_fn=lambda agent_id, episode, **kw: agent_id,
        )
    )

    algo = base_config.build()
    print(algo.train())

[2025-08-06 15:48:33,695 E 3440243 3440243] core_worker.cc:2740: Actor with class name: 'MultiAgentEnvRunner' and ID: '9d7e0511acc21bb39d1fa18501000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.
[2025-08-06 15:48:33,742 E 3440243 3440243] core_worker.cc:2740: Actor with class name: 'MultiAgentEnvRunner' and ID: '5fe12fc06b62e06d2a123a6201000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.
2025-08-06 15:48:48,037	ERROR actor_manager.py:873 -- Ray error (The actor 5fe12fc06b62e06d2a123a6201000000 is unavailable: The actor is temporarily unavailable: IOError: The actor was restarted. The task may or may not have bee

{'timers': {'training_iteration': 40.02873206604272, 'restore_env_runners': 2.35771294683218e-05, 'training_step': 40.028292561881244, 'env_runner_sampling_timer': 16.731119564035907, 'learner_update_timer': 23.292531518032774, 'synch_weights': 0.004503910895437002}, 'env_runners': {'agent_episode_returns_mean': {'player2': -0.16, 'player1': 0.16}, 'env_to_module_connector': {'timers': {'connectors': {'numpy_to_tensor': np.float64(9.567372640827856e-05), 'add_states_from_episodes_to_batch': np.float64(1.4088377370608972e-05), 'add_time_dim_to_batch_and_zero_pad': np.float64(2.081415253709641e-05), 'add_observations_from_episodes_to_batch': np.float64(4.183911714907247e-05), 'batch_individual_items': np.float64(4.8569046073058694e-05), 'flatten_observations': np.float64(0.00014957863420221948), 'agent_to_module_mapping': np.float64(1.0993880391283682e-05)}}, 'connector_pipeline_timer': np.float64(0.0005539883795363839)}, 'env_to_module_sum_episodes_length_in': np.float64(4.1450237882508

## 回合制环境
我们实现著名的井字棋游戏（有一点微小的变动），在 3x3 的棋盘上进行。每个玩家一次在棋盘上放置一个棋子。棋子一旦放置就不能移动。首先完成一行（水平、对角线或垂直）的玩家赢得游戏并获得 +5 奖励。输家获得 -5 奖励。为了简化实现，与原始游戏的变动是，尝试将棋子放在已占用的区域会导致棋盘完全不变，但移动的玩家会因此受到 -5 的惩罚奖励（在原始游戏中，这种移动是根本不允许发生的）。


In [None]:
import gymnasium as gym
import numpy as np

from ray.rllib.env.multi_agent_env import MultiAgentEnv


class TicTacToe(MultiAgentEnv):
    """A turn-based two-player game; complete one line on a 3x3 board to win.

    Both players observe the same 9-element float32 vector in
    Box(-1.0, 1.0, (9,)). Each index represents a distinct field on the board;
    0.0 means the field is empty, 1.0 means player1 owns the field, and -1.0
    means player2 owns the field:
    ----------
    | 0| 1| 2|
    ----------
    | 3| 4| 5|
    ----------
    | 6| 7| 8|
    ----------

    The action space is Discrete(9). An action landing on an already occupied
    field leaves the board unchanged and gives the acting player -5.0 reward;
    the turn still passes to the other player.

    Once a player completes a row, column, or diagonal, they receive +5.0
    reward and the other player receives -5.0 reward. If the board fills up
    without a winner, the episode ends without any win/loss reward.
    """

    # All index triples that form a winning line on the board.
    _WIN_LINES = (
        # Horizontal.
        (0, 1, 2),
        (3, 4, 5),
        (6, 7, 8),
        # Vertical.
        (0, 3, 6),
        (1, 4, 7),
        (2, 5, 8),
        # Diagonal.
        (0, 4, 8),
        (2, 4, 6),
    )

    def __init__(self, config=None):
        super().__init__()

        # Define the agents in the game.
        self.agents = self.possible_agents = ["player1", "player2"]

        # Each agent observes a 9D tensor, representing the 3x3 fields of the board.
        # A 0 means an empty field, a 1 represents a piece of player 1, a -1 a piece of
        # player 2.
        self.observation_spaces = {
            "player1": gym.spaces.Box(-1.0, 1.0, (9,), np.float32),
            "player2": gym.spaces.Box(-1.0, 1.0, (9,), np.float32),
        }
        # Each player has 9 actions, encoding the 9 fields each player can place a piece
        # on during their turn.
        self.action_spaces = {
            "player1": gym.spaces.Discrete(9),
            "player2": gym.spaces.Discrete(9),
        }

        self.board = None
        self.current_player = None

    def reset(self, *, seed=None, options=None):
        """Clear the board, pick the starting player, return (observations, infos)."""
        self.board = [0] * 9
        # Pick a random player to start the game.
        self.current_player = np.random.choice(["player1", "player2"])
        # Return observations dict (only with the starting player, which is the one
        # we expect to act next).
        return {
            self.current_player: np.array(self.board, np.float32),
        }, {}

    def step(self, action_dict):
        """Apply the current player's move.

        Returns:
            Tuple of (observations, rewards, terminateds, truncateds, infos).
            The observations dict contains only the player who moves next.
        """
        action = action_dict[self.current_player]

        # Create a rewards-dict (containing the rewards of the agent that just acted).
        rewards = {self.current_player: 0.0}
        # Create a terminateds-dict with the special `__all__` agent ID, indicating that
        # if True, the episode ends for all agents.
        terminateds = {"__all__": False}

        opponent = "player1" if self.current_player == "player2" else "player2"

        # Penalize trying to place a piece on an already occupied field.
        if self.board[action] != 0:
            rewards[self.current_player] -= 5.0
        # Change the board according to the (valid) action taken.
        else:
            piece = 1 if self.current_player == "player1" else -1
            self.board[action] = piece

            # After having placed a new piece, figure out whether the current player
            # won or not: a win means one line holds only this player's pieces.
            has_won = any(
                all(self.board[idx] == piece for idx in line)
                for line in self._WIN_LINES
            )
            if has_won:
                # Final reward is +5 for victory and -5 for a loss.
                rewards[self.current_player] += 5.0
                rewards[opponent] = -5.0

                # Episode is done and needs to be reset for a new game.
                terminateds["__all__"] = True

            # The board might also be full w/o any player having won/lost.
            # In this case, we simply end the episode and none of the players
            # receives a win/loss reward.
            elif 0 not in self.board:
                terminateds["__all__"] = True

        # Flip players and return an observations dict with only the next player to
        # make a move in it.
        self.current_player = opponent

        return (
            {self.current_player: np.array(self.board, np.float32)},
            rewards,
            terminateds,
            {},
            {},
        )



## 智能体分组
在多智能体强化学习中，常见的情况是存在智能体组，其中每个组都被视为一个具有元组动作和观察空间（元组中的每个项对应组中的每个单独智能体）的单智能体。

In [None]:
def with_agent_groups(
    self,
    groups: Dict[str, List[AgentID]],
    obs_space: gym.Space = None,
    act_space: gym.Space = None,
) -> "MultiAgentEnv":
    """Convenience method for grouping together agents in this env.

    An agent group is a list of agent IDs that are mapped to a single
    logical agent. All agents of the group must act at the same time in the
    environment. The grouped agent exposes Tuple action and observation
    spaces that are the concatenated action and obs spaces of the
    individual agents.

    The rewards of all the agents in a group are summed. The individual
    agent rewards are available under the "individual_rewards" key of the
    group info return.

    Agent grouping is required to leverage algorithms such as Q-Mix.

    Args:
        groups: Mapping from group id to a list of the agent ids
            of group members. If an agent id is not present in any group
            value, it will be left ungrouped. The group id becomes a new agent ID
            in the final environment.
        obs_space: Optional observation space for the grouped
            env. Must be a tuple space. If not provided, will infer this to be a
            Tuple of n individual agents spaces (n=num agents in a group).
        act_space: Optional action space for the grouped env.
            Must be a tuple space. If not provided, will infer this to be a Tuple
            of n individual agents spaces (n=num agents in a group).

    Returns:
        A `GroupAgentsWrapper` built from this env and the given groups and
        spaces.

    .. testcode::
        :skipif: True

        from ray.rllib.env.multi_agent_env import MultiAgentEnv
        class MyMultiAgentEnv(MultiAgentEnv):
            # define your env here
            ...
        env = MyMultiAgentEnv(...)
        grouped_env = env.with_agent_groups({
            "group1": ["agent1", "agent2", "agent3"],
            "group2": ["agent4", "agent5"],
        })

    """

    # NOTE(review): imported inside the function — presumably to avoid a
    # circular import; confirm.
    from ray.rllib.env.wrappers.group_agents_wrapper import \
        GroupAgentsWrapper
    return GroupAgentsWrapper(self, groups, obs_space, act_space)


## 使用 MultiAgentEnv 运行实际训练实验
如果所有智能体使用相同的算法类来训练其策略，请按如下方式配置多智能体训练：

In [14]:
from ray.rllib.algorithms.ppo import PPOConfig
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.core.rl_module.rl_module import RLModuleSpec

# NOTE(review): "my_multiagent_env" is not registered anywhere in this file, and
# `random` is not imported in this cell — both must be provided before running.
config = (
    PPOConfig()
    .environment(env="my_multiagent_env")
    .multi_agent(
        # Agents whose ID starts with "traffic_light_" use the "traffic_light"
        # module; every other agent is assigned "car1" or "car2" at random.
        policy_mapping_fn=lambda agent_id, episode, **kwargs: (
            "traffic_light" if agent_id.startswith("traffic_light_")
            else random.choice(["car1", "car2"])
        ),
        # Per-module config overrides: a different gamma for "car1" and a
        # different learning rate for "car2".
        algorithm_config_overrides_per_module={
            "car1": PPOConfig.overrides(gamma=0.85),
            "car2": PPOConfig.overrides(lr=0.00001),
        },
    )
    .rl_module(
        # One RLModuleSpec per module ID returned by the mapping function.
        rl_module_spec=MultiRLModuleSpec(rl_module_specs={
            "car1": RLModuleSpec(),
            "car2": RLModuleSpec(),
            "traffic_light": RLModuleSpec(),
        }),
    )
)

algo = config.build()
print(algo.train())

[2025-08-06 16:11:41,185 E 3440243 3440243] core_worker.cc:2740: Actor with class name: 'SingleAgentEnvRunner' and ID: '0eb248dc9805a7b7fb6aafbd01000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.
[2025-08-06 16:11:41,234 E 3440243 3440243] core_worker.cc:2740: Actor with class name: 'SingleAgentEnvRunner' and ID: 'd1ad7c3ac22d52ff2a86f56301000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.
2025-08-06 16:11:45,252	ERROR actor_manager.py:873 -- Ray error (The actor died because of an error raised in its creation task, [36mray::SingleAgentEnvRunner.__init__()[39m (pid=3451776, ip=10.110.34.88, actor_id=0eb248dc98

IndexError: list index out of range

In [None]:
from ray.rllib.core.rl_module import RLModule

def policy_mapping_fn(agent_id, episode, **kwargs):
    """Assign exactly one of the two players to "learning_policy" per episode.

    The player whose trailing-digit parity equals the parity of the episode ID
    gets "learning_policy"; the other one gets "random_policy".

    Args:
        agent_id: Agent ID ending in a digit, e.g. "player1" or "player2".
        episode: Object exposing an `id_` attribute (an int or any hashable).
        **kwargs: Ignored.

    Returns:
        "learning_policy" or "random_policy".
    """
    # Parity of the trailing digit: "player1" -> 1, "player2" -> 0. Without the
    # modulo, "player2" would yield 2 and could never equal `id_ % 2`.
    agent_parity = int(agent_id[-1]) % 2

    episode_key = episode.id_
    # `%` on a non-integer ID (e.g. a string) does not compute parity, so
    # reduce such IDs to an integer first.
    if not isinstance(episode_key, int):
        episode_key = hash(episode_key)

    return "learning_policy" if episode_key % 2 == agent_parity else "random_policy"

# `RandomRLModule` is used below but was never imported in this file.
from ray.rllib.examples.rl_modules.classes.random_rlm import RandomRLModule

# NOTE(review): "two_player_game" is not registered anywhere in this file; an
# env must be registered under that name before this cell can run.
config = (
    PPOConfig()
    .environment(env="two_player_game")
    .multi_agent(
        policy_mapping_fn=policy_mapping_fn,
        # Only the learning policy is updated; the random policy stays fixed.
        policies_to_train=["learning_policy"],
    )
    .rl_module(
        rl_module_spec=MultiRLModuleSpec(rl_module_specs={
            "learning_policy": RLModuleSpec(),
            "random_policy": RLModuleSpec(rl_module_class=RandomRLModule),
        }),
    )
)

algo = config.build()
print(algo.train())

# 分层环境
可以将分层动作模式中的任何一种实现为一个包含各种类型智能体（例如高层智能体和低层智能体）的多智能体环境。当使用正确的智能体到模块映射函数进行设置时，从 RLlib 的角度来看，该问题就变成了一个简单的、具有不同类型策略的独立多智能体问题。

In [18]:
from ray.rllib.algorithms.ppo import PPOConfig

# NOTE(review): "Pendulum-v1" is presumably a single-agent gymnasium env; the
# multi-agent settings below expect agent IDs such as "low_level_*" — confirm
# the intended (hierarchical, multi-agent) env before running.
config = (
    PPOConfig()
    .environment("Pendulum-v1")
    .multi_agent(
        policies={"top_level", "low_level"},
        # Agent IDs starting with "low_level" map to the "low_level" policy;
        # all other agents map to "top_level".
        policy_mapping_fn=(
            lambda aid, eps, **kw: "low_level" if aid.startswith("low_level") else "top_level"
        ),
        # Only the "top_level" policy is listed for training.
        policies_to_train=["top_level"],
    )
)

algo = config.build()
print(algo.train())

[2025-08-06 16:40:28,744 E 3440243 3440243] core_worker.cc:2740: Actor with class name: 'MultiAgentEnvRunner' and ID: '75ace56423b1bef47f2366bb01000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.
[2025-08-06 16:40:28,795 E 3440243 3440243] core_worker.cc:2740: Actor with class name: 'MultiAgentEnvRunner' and ID: '47414a5b2117509e5f5595e801000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.
2025-08-06 16:40:32,882	ERROR actor_manager.py:873 -- Ray error (The actor died because of an error raised in its creation task, [36mray::MultiAgentEnvRunner.__init__()[39m (pid=3932990, ip=10.110.34.88, actor_id=75ace56423b1b

IndexError: list index out of range