Multi-Agent and Hierarchical

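For example, in a traffic scenario there may be several "car" and "traffic light" agents in the environment, all acting at the same time.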

Defining a multi-agent environment

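The mental model for multi-agent in RLlib:

Environment returns: your environment is a subclass of MultiAgentEnv. It returns dictionaries that map agent IDs (for example strings; the environment can choose these IDs arbitrarily) to each agent's observation, reward, and done flag.

Defining policies: you define some policies up front that are available when training starts (you can also add new policies dynamically during training). These policies define how agents act in the environment.

Agent-to-policy mapping: you define a function that maps each agent ID produced by the environment to any available policy ID. This function determines which policy computes the actions for that particular agent.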
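
The three pieces above can be sketched without RLlib at all. The following is a minimal pure-Python illustration, assuming a hypothetical `ToyTrafficEnv` (not an RLlib class) and hypothetical policy IDs "car" and "traffic_light". It only mirrors the dict-per-agent convention and the agent-to-policy mapping, not RLlib's actual training loop.

```python
import random


class ToyTrafficEnv:
    """Hypothetical stand-in for a MultiAgentEnv subclass (not RLlib code)."""

    def __init__(self, num_cars=2, num_traffic_lights=1):
        self.agent_ids = [f"car_{i + 1}" for i in range(num_cars)] + [
            f"traffic_light_{i + 1}" for i in range(num_traffic_lights)
        ]
        self.t = 0

    def reset(self):
        self.t = 0
        # One observation per agent, keyed by agent ID.
        return {aid: [0.0] for aid in self.agent_ids}

    def step(self, actions):
        self.t += 1
        obs = {aid: [float(self.t)] for aid in actions}
        rewards = {aid: 1.0 for aid in actions}
        dones = {aid: False for aid in actions}
        dones["__all__"] = self.t >= 3  # episode-level done flag
        return obs, rewards, dones, {}


def policy_mapping_fn(agent_id):
    # Map every agent ID the env produces to one of the predefined policy IDs.
    return "traffic_light" if agent_id.startswith("traffic_light_") else "car"


# Policies are defined up front; here each one is just a function obs -> action.
policies = {
    "car": lambda obs: random.choice([0, 1]),
    "traffic_light": lambda obs: 0,
}

env = ToyTrafficEnv()
obs = env.reset()
dones = {"__all__": False}
while not dones["__all__"]:
    # Look up each agent's policy through the mapping function, then act.
    actions = {aid: policies[policy_mapping_fn(aid)](o) for aid, o in obs.items()}
    obs, rewards, dones, infos = env.step(actions)
```

Several agents can share one policy ID (both cars use "car" here), which is why the mapping function is kept separate from the list of policies.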

In [None]:
""" 
All agents move simultaneously
"""

# Env, in which all agents (whose IDs are entirely determined by the env
# itself via the returned multi-agent obs/reward/dones-dicts) step
# simultaneously.
env = MultiAgentTrafficEnv(num_cars=2, num_traffic_lights=1)

# Observations are a dict mapping agent names to their obs. Only those
# agents' names that require actions in the next call to `step()` should
# be present in the returned observation dict (here: all, as we always step
# simultaneously).
print(env.reset())
# ... {
# ...   "car_1": [[...]],
# ...   "car_2": [[...]],
# ...   "traffic_light_1": [[...]],
# ... }

# In the following call to `step`, actions should be provided for each
# agent that returned an observation before:
new_obs, rewards, dones, infos = env.step(
    actions={"car_1": ..., "car_2": ..., "traffic_light_1": ...})

# Similarly, new_obs, rewards, dones, etc. also become dicts.
print(rewards)
# ... {"car_1": 3, "car_2": -1, "traffic_light_1": 0}

# Individual agents can exit early; the entire episode is done when
# dones["__all__"] = True.
print(dones)
# ... {"car_2": True, "__all__": False}

In [None]:
"""  
Agents move one after another
"""
# Env, in which two agents step in sequence (turn-based game).
# The env is in charge of the produced agent ID. Our env here produces
# agent IDs: "player1" and "player2".
env = TicTacToe()

# Observations are a dict mapping agent names to their obs. Only those
# agents' names that require actions in the next call to `step()` should
# be present in the returned observation dict (here: one agent at a time).
print(env.reset())
# ... {
# ...   "player1": [[...]],
# ... }

# In the following call to `step`, only those agents' actions should be
# provided that were present in the returned obs dict:
new_obs, rewards, dones, infos = env.step(actions={"player1": ...})

# Similarly, new_obs, rewards, dones, etc. also become dicts.
# Note that only in the `rewards` dict, any agent may be listed (even those that have
# not(!) acted in the `step()` call). Rewards for individual agents will be added
# up to the point where a new action for that agent is needed. This way, you may
# implement a turn-based 2-player game, in which player-2's reward is published
# in the `rewards` dict immediately after player-1 has acted.
print(rewards)
# ... {"player1": 0, "player2": 0}

# Individual agents can exit early; the entire episode is done when
# dones["__all__"] = True.
print(dones)
# ... {"player1": False, "__all__": False}

# In the next step, it's player2's turn. Therefore, `new_obs` only contains
# this agent's ID:
print(new_obs)
# ... {
# ...   "player2": [[...]]
# ... }
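
The turn-based convention can be sketched in plain Python. This is a minimal illustration, assuming a hypothetical `ToyTurnBasedEnv` (not RLlib's `TicTacToe`) with made-up rewards. It shows that only the next agent to act appears in the obs dict, while the rewards dict may list both players and is accumulated per agent.

```python
class ToyTurnBasedEnv:
    """Hypothetical two-player env (not RLlib code) that alternates turns."""

    def __init__(self, max_moves=3):
        self.players = ["player1", "player2"]
        self.max_moves = max_moves
        self.moves = 0

    def reset(self):
        self.moves = 0
        # Only the agent that must act next appears in the obs dict.
        return {"player1": [0]}

    def step(self, actions):
        (acting,) = actions.keys()  # exactly one agent acts per step
        other = self.players[1 - self.players.index(acting)]
        self.moves += 1
        done = self.moves >= self.max_moves
        obs = {} if done else {other: [self.moves]}
        # Rewards may list both players, even the one that did not act.
        rewards = {acting: 1.0, other: -1.0}
        dones = {acting: done, "__all__": done}
        return obs, rewards, dones, {}


env = ToyTurnBasedEnv()
obs = env.reset()
turn_order = []
totals = {"player1": 0.0, "player2": 0.0}
dones = {"__all__": False}
while not dones["__all__"]:
    (agent_id,) = obs.keys()
    turn_order.append(agent_id)
    obs, rewards, dones, infos = env.step({agent_id: 0})
    for aid, r in rewards.items():
        totals[aid] += r  # sum rewards, including those received while waiting

print(turn_order)
print(totals)
```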


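In [None]:
"""
The "car1" policy uses the default policy class (policy_class=None), infers its observation and action spaces from the environment, and overrides one setting through the config {"gamma": 0.85}.
The "car2" policy also uses the default policy class, but its observation and action spaces are given explicitly by car_obs_space and car_act_space, and it overrides {"gamma": 0.99}.
The "traffic_light" policy likewise uses the default policy class, but has its own observation and action spaces, given by tl_obs_space and tl_act_space.
The policy_mapping_fn function defines the mapping from agent IDs to policies: an agent ID that starts with "traffic_light_" is mapped to the traffic_light policy; any other agent is assigned car1 or car2 at random.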
"""

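import random

from ray.rllib.policy.policy import PolicySpec

# `pg` (RLlib's policy-gradient module), `car_obs_space`, `car_act_space`,
# `tl_obs_space`, and `tl_act_space` are assumed to be defined elsewhere.
algo = pg.PGAgent(env="my_multiagent_env", config={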
    "multiagent": {
        "policies": {
            # Use the PolicySpec namedtuple to specify an individual policy:
            "car1": PolicySpec(
                policy_class=None,  # infer automatically from Algorithm
                observation_space=None,  # infer automatically from env
                action_space=None,  # infer automatically from env
                config={"gamma": 0.85},  # use main config plus <- this override here
                ),  # alternatively, simply do: `PolicySpec(config={"gamma": 0.85})`

            # Deprecated way: Tuple specifying class, obs-/action-spaces,
            # config-overrides for each policy as a tuple.
            # If class is None -> Uses Algorithm's default policy class.
            "car2": (None, car_obs_space, car_act_space, {"gamma": 0.99}),

            # New way: Use PolicySpec() with keywords: `policy_class`,
            # `observation_space`, `action_space`, `config`.
            "traffic_light": PolicySpec(
                observation_space=tl_obs_space,  # special obs space for lights?
                action_space=tl_act_space,  # special action space for lights?
                ),
        },
        "policy_mapping_fn":
            lambda agent_id, episode, worker, **kwargs: # <- this is the mapping function
                "traffic_light"  # Traffic lights are always controlled by this policy
                if agent_id.startswith("traffic_light_")
                else random.choice(["car1", "car2"])  # Randomly choose from car policies
    },
})

while True:
    print(algo.train())
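
The mapping function in the config above can be tried on its own. The following is a small sketch, assuming the agent IDs "car_1", "car_2", and "traffic_light_1" from the earlier traffic example. The `episode` and `worker` arguments are left as unused placeholders, and the seed is there only to make the illustration repeatable.

```python
import random
from collections import Counter


def policy_mapping_fn(agent_id, episode=None, worker=None, **kwargs):
    # Traffic lights always use the "traffic_light" policy; every car is
    # assigned one of the two car policies at random.
    if agent_id.startswith("traffic_light_"):
        return "traffic_light"
    return random.choice(["car1", "car2"])


random.seed(0)  # seeded only so the illustration is repeatable
agent_ids = ["car_1", "car_2", "traffic_light_1"]
assignments = {aid: policy_mapping_fn(aid) for aid in agent_ids}
print(assignments)

# Over many calls, the same car lands on both car policies.
counts = Counter(policy_mapping_fn("car_1") for _ in range(1000))
print(counts)
```

Because the choice is random, a given car is not tied to a single policy.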

In [None]:
# Example for a mapping function that maps agent IDs "player1" and "player2" to either
# "random_policy" or "learning_policy", making sure that in each episode, both policies
# are always playing each other.
def policy_mapping_fn(agent_id, episode, worker, **kwargs):
    agent_idx = int(agent_id[-1]) - 1  # 0 (player1) or 1 (player2)
    # agent_id = "player[1|2]" -> policy depends on episode ID
    # This way, we make sure that both policies sometimes play player1
    # (start player) and sometimes player2 (player to move 2nd).
    return "learning_policy" if episode.episode_id % 2 == agent_idx else "random_policy"

algo = pg.PGAgent(env="two_player_game", config={
    "multiagent": {
        "policies": {
            "learning_policy": PolicySpec(),  # <- use default class & infer obs-/act-spaces from env.
            "random_policy": PolicySpec(policy_class=RandomPolicy),  # infer obs-/act-spaces from env.
        },
        # Example for a mapping function that maps agent IDs "player1" and "player2" to either
        # "random_policy" or "learning_policy", making sure that in each episode, both policies
        # are always playing each other.
        "policy_mapping_fn": policy_mapping_fn,
        # Specify a (fixed) list (or set) of policy IDs that should be updated.
        "policies_to_train": ["learning_policy"],

        # Alternatively, you can provide a callable that returns True or False, when provided
        # with a policy ID and an (optional) SampleBatch:

        # "policies_to_train": lambda pid, batch: ... (<- return True or False)

        # This allows you to more flexibly update (or not) policies, based on
        # who they played with in the episode (or other information that can be
        # found in the given batch, e.g. rewards).
    },
})
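
The alternation logic can be checked in isolation. This is a minimal sketch, assuming the agent index is computed as the trailing digit minus one so that it takes the values 0 and 1, and using a `SimpleNamespace` as a hypothetical stand-in for RLlib's episode object. The callable form of `policies_to_train` is shown with an illustrative body.

```python
from types import SimpleNamespace


def policy_mapping_fn(agent_id, episode, worker=None, **kwargs):
    # Assumption: agent IDs end in "1" or "2"; subtract 1 to get index 0 or 1.
    agent_idx = int(agent_id[-1]) - 1
    return "learning_policy" if episode.episode_id % 2 == agent_idx else "random_policy"


def policies_to_train(policy_id, batch=None):
    # Illustrative callable variant: only the learning policy is updated.
    return policy_id == "learning_policy"


matchups = {}
for episode_id in range(4):
    episode = SimpleNamespace(episode_id=episode_id)  # hypothetical episode stub
    matchups[episode_id] = {
        aid: policy_mapping_fn(aid, episode) for aid in ("player1", "player2")
    }
    print(episode_id, matchups[episode_id])
```

In every episode the two players receive different policies, and the learning policy alternates between moving first and moving second.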


In [None]:
"""Simple example of setting up an agent-to-module mapping function.

How to run this script
----------------------
`python [script file name].py --enable-new-api-stack --num-agents=2`

Control the number of agents and policies (RLModules) via --num-agents and
--num-policies.

For debugging, use the following additional command line options
`--no-tune --num-env-runners=0`
which should allow you to set breakpoints anywhere in the RLlib code and
have the execution stop there for inspection and debugging.

For logging to your WandB account, use:
`--wandb-key=[your WandB API key] --wandb-project=[some project name]
--wandb-run-name=[optional: WandB run name (within the defined project)]`
"""

from ray.rllib.examples.envs.classes.multi_agent import MultiAgentCartPole
from ray.rllib.utils.test_utils import (
    add_rllib_example_script_args,
    run_rllib_example_script_experiment,
)
from ray.tune.registry import get_trainable_cls, register_env

parser = add_rllib_example_script_args(
    default_iters=200,
    default_timesteps=100000,
    default_reward=600.0,
)
# TODO (sven): This arg is currently ignored (hard-set to 2).
parser.add_argument("--num-policies", type=int, default=2)


if __name__ == "__main__":
    args = parser.parse_args()

    # Register our environment with tune.
    if args.num_agents > 0:
        register_env(
            "env",
            lambda _: MultiAgentCartPole(config={"num_agents": args.num_agents}),
        )

    base_config = (
        get_trainable_cls(args.algo)
        .get_default_config()
        .environment("env" if args.num_agents > 0 else "CartPole-v1")
        .env_runners(
            # TODO (sven): MAEnvRunner does not support vectorized envs yet
            #  due to gym's env checkers and incompatibility with RLlib's
            #  MultiAgentEnv API.
            num_envs_per_env_runner=1
            if args.num_agents > 0
            else 20,
        )
    )

    # Add a simple multi-agent setup.
    if args.num_agents > 0:
        base_config.multi_agent(
            policies={f"p{i}" for i in range(args.num_agents)},
            policy_mapping_fn=lambda aid, *a, **kw: f"p{aid}",
        )

    run_rllib_example_script_experiment(base_config, args)
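
The multi-agent setup at the end of the script reduces to a one-to-one mapping from agents to policies. Here is a tiny sketch of that mapping on its own, assuming the environment uses integer agent IDs 0..num_agents-1 (an assumption about `MultiAgentCartPole`, not something stated above).

```python
num_agents = 3  # stands in for the --num-agents command line option

# One policy (RLModule) ID per agent, as in the script above.
policies = {f"p{i}" for i in range(num_agents)}
policy_mapping_fn = lambda aid, *a, **kw: f"p{aid}"

# Assumption: the env uses integer agent IDs 0..num_agents-1.
mapping = {aid: policy_mapping_fn(aid) for aid in range(num_agents)}
print(mapping)
```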

PettingZoo Multi-Agent Environments

A PettingZoo environment can be converted into an RLlib MultiAgentEnv.

In [None]:
from ray.tune.registry import register_env
# import the pettingzoo environment
from pettingzoo.butterfly import prison_v3
# import rllib pettingzoo interface
from ray.rllib.env import PettingZooEnv
# define how to make the environment. This way takes an optional environment config, num_floors
"""
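lambda: in Python, the lambda keyword creates an anonymous function, i.e. a function without a name.
config: the parameter of the lambda; it receives the configuration passed when the environment is created, usually a dict.
prison_v3.env(...): calls env on the imported prison_v3 module to create an environment instance.
num_floors=config.get("num_floors", 4): reads the value stored under the "num_floors" key of the config and falls back to 4 if the key is absent, so the number of floors can be set from the config.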

"""
env_creator = lambda config: prison_v3.env(num_floors=config.get("num_floors", 4))
# register that way to make the environment under an rllib name
register_env('prison', lambda config: PettingZooEnv(env_creator(config)))
# now you can use `prison` as an environment
# you can pass arguments to the environment creator with the env_config option in the config
config['env_config'] = {"num_floors": 5}
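
The `config.get` fallback can be seen without installing PettingZoo. The sketch below assumes a hypothetical `make_prison_stub` function in place of `prison_v3.env`; it only records the argument it was called with.

```python
def make_prison_stub(num_floors):
    # Hypothetical stand-in for prison_v3.env(); just records its argument.
    return {"num_floors": num_floors}


env_creator = lambda config: make_prison_stub(num_floors=config.get("num_floors", 4))

default_env = env_creator({})                # key missing: falls back to 4
custom_env = env_creator({"num_floors": 5})  # key present: value is used
print(default_env, custom_env)
```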

Agent grouping

In [None]:
def with_agent_groups(
    self,
    groups: Dict[str, List[AgentID]],
    obs_space: gym.Space = None,
    act_space: gym.Space = None,
) -> "MultiAgentEnv":
    """Convenience method for grouping together agents in this env.

    An agent group is a list of agent IDs that are mapped to a single
    logical agent. All agents of the group must act at the same time in the
    environment. The grouped agent exposes Tuple action and observation
    spaces that are the concatenated action and obs spaces of the
    individual agents.

    The rewards of all the agents in a group are summed. The individual
    agent rewards are available under the "individual_rewards" key of the
    group info return.

    Agent grouping is required to leverage algorithms such as Q-Mix.

    Args:
        groups: Mapping from group id to a list of the agent ids
            of group members. If an agent id is not present in any group
            value, it will be left ungrouped. The group id becomes a new agent ID
            in the final environment.
        obs_space: Optional observation space for the grouped
            env. Must be a tuple space. If not provided, will infer this to be a
            Tuple of n individual agents spaces (n=num agents in a group).
        act_space: Optional action space for the grouped env.
            Must be a tuple space. If not provided, will infer this to be a Tuple
            of n individual agents spaces (n=num agents in a group).

    .. testcode::
        :skipif: True

        from ray.rllib.env.multi_agent_env import MultiAgentEnv
        class MyMultiAgentEnv(MultiAgentEnv):
            # define your env here
            ...
        env = MyMultiAgentEnv(...)
        grouped_env = env.with_agent_groups({
            "group1": ["agent1", "agent2", "agent3"],
            "group2": ["agent4", "agent5"],
        })

    """

    from ray.rllib.env.wrappers.group_agents_wrapper import \
        GroupAgentsWrapper
    return GroupAgentsWrapper(self, groups, obs_space, act_space)
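
The grouping semantics described in the docstring can be illustrated with plain dicts. This is a minimal sketch, assuming a hypothetical helper `group_step_outputs`; it is not the `GroupAgentsWrapper` implementation and only mirrors the documented behavior: tuple observations, summed rewards, per-agent rewards under "individual_rewards", and ungrouped agents left untouched.

```python
def group_step_outputs(groups, obs, rewards):
    """Hypothetical helper mimicking the grouping semantics described above."""
    grouped_obs, grouped_rewards, grouped_infos = {}, {}, {}
    grouped_ids = {aid for members in groups.values() for aid in members}
    for group_id, members in groups.items():
        # Observations of the members are combined into one tuple.
        grouped_obs[group_id] = tuple(obs[aid] for aid in members)
        # Rewards are summed; the per-agent values are kept in the info dict.
        individual = [rewards[aid] for aid in members]
        grouped_rewards[group_id] = sum(individual)
        grouped_infos[group_id] = {"individual_rewards": individual}
    # Agents that belong to no group are passed through unchanged.
    for aid in obs:
        if aid not in grouped_ids:
            grouped_obs[aid] = obs[aid]
            grouped_rewards[aid] = rewards[aid]
    return grouped_obs, grouped_rewards, grouped_infos


groups = {"group1": ["agent1", "agent2"]}
obs = {"agent1": [0.1], "agent2": [0.2], "agent3": [0.3]}
rewards = {"agent1": 1.0, "agent2": 2.0, "agent3": -1.0}
g_obs, g_rew, g_info = group_step_outputs(groups, obs, rewards)
print(g_obs, g_rew, g_info)
```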


Hierarchical

In [None]:
"multiagent": {
    "policies": {
        "top_level": (custom_policy or None, ...),
        "mid_level": (custom_policy or None, ...),
        "low_level": (custom_policy or None, ...),
    },
    "policy_mapping_fn":
        lambda agent_id:
            "low_level" if agent_id.startswith("low_level_") else
            "mid_level" if agent_id.startswith("mid_level_") else "top_level",
    "policies_to_train": ["top_level"],
},
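
The prefix-based mapping used for the hierarchy can be written as a regular function and checked directly. In this sketch the agent IDs are hypothetical examples; any ID without a "low_level_" or "mid_level_" prefix falls through to the top-level policy.

```python
def policy_mapping_fn(agent_id, *args, **kwargs):
    # Route agents to a policy based on the prefix of their ID.
    if agent_id.startswith("low_level_"):
        return "low_level"
    if agent_id.startswith("mid_level_"):
        return "mid_level"
    return "top_level"


# Hypothetical agent IDs produced by a hierarchical env.
agent_ids = ["top_level_0", "mid_level_0", "mid_level_1", "low_level_0"]
assignments = {aid: policy_mapping_fn(aid) for aid in agent_ids}
print(assignments)
```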