In [None]:
from ray.rllib.env.multi_agent_env import MultiAgentEnv
import gymnasium as gym
import numpy as np
from ray.tune.registry import register_env
from common.envUtils import *


class MultiAgentReachEnv(MultiAgentEnv):
    """Two-agent RLlib wrapper around a single reach environment.

    The wrapped environment (``env_core``) is stepped with one flat action
    built by concatenating the action of ``agent_1`` followed by the action
    of ``agent_2``. Each agent receives its own view of the core observation
    (``obs_type='pos'`` for ``agent_1``, ``obs_type='rot'`` for ``agent_2``)
    and its own reward (the per-agent error reward plus the core reward).

    Args:
        config: Optional mapping of environment settings. Only the
            ``"log_dir"`` key is read; it is forwarded to
            ``make_reach_her_env_masac``. ``None`` is treated as an empty
            mapping.
    """

    def __init__(self, config=None):
        super().__init__()
        # `config` defaults to None, so `.get` must not be called on it directly.
        config = config if config is not None else {}
        self.env_core = make_reach_her_env_masac(log_dir=config.get("log_dir"))

        # Agent ids. `agents` gets its own list so that changing the set of
        # currently active agents can never alter `possible_agents`.
        self.possible_agents = ['agent_1', 'agent_2']
        self.agents = list(self.possible_agents)

        # Per-agent observation and action spaces.
        # self.observation_space = self.env_core.unwrapped.observation_spec
        self.observation_spaces = {
            "agent_1": gym.spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32),
            "agent_2": gym.spaces.Box(low=-np.inf, high=np.inf, shape=(8,), dtype=np.float32),
        }
        # self.action_space = self.env_core.unwrapped.action_spec
        self.action_spaces = {
            "agent_1": gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32),
            "agent_2": gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32),
        }

    def get_action_space(self, agent_id):
        """Return the action space registered for ``agent_id``."""
        return self.action_spaces[agent_id]

    def get_observation_space(self, agent_id):
        """Return the observation space registered for ``agent_id``."""
        return self.observation_spaces[agent_id]

    def reset(self, *, seed=None, options=None):
        """Reset the core environment and return initial observations and infos.

        Returns:
            A ``(observations, infos)`` tuple, both keyed by agent id. The
            observations are all-zero arrays shaped like each agent's
            observation space; every agent's info is ``{'success': False}``.
        """
        # NOTE(review): `seed` and `options` are not forwarded, and whatever
        # `env_core.reset()` returns is discarded in favour of zero
        # observations - confirm this is intended.
        self.env_core.reset()
        observations = {
            agent_id: np.zeros(space.shape, dtype=space.dtype)
            for agent_id, space in self.observation_spaces.items()
        }
        # Infos are keyed by agent id, like every other per-agent dict
        # returned by a multi-agent environment.
        infos = {agent_id: {'success': False} for agent_id in observations}
        return observations, infos

    def step(self, action_dict):
        """Advance the core environment by one step using both agents' actions.

        Args:
            action_dict: Mapping with the keys ``"agent_1"`` and ``"agent_2"``,
                each holding that agent's action array.

        Returns:
            ``(observations, rewards, terminateds, truncateds, infos)``, each
            keyed by agent id; ``terminateds`` and ``truncateds`` additionally
            carry the ``"__all__"`` key.
        """
        # The core environment takes one flat action: agent_1 first, then agent_2.
        action = np.concatenate([
            action_dict["agent_1"],
            action_dict["agent_2"],
        ])
        observation, reward, terminated, truncated, info = self.env_core.step(action)

        observations = {
            "agent_1": self.env_core._process_obs_her_masac(obs=observation, obs_type='pos'),
            "agent_2": self.env_core._process_obs_her_masac(obs=observation, obs_type='rot'),
        }

        # Only the first two returned reward terms are used here.
        pos_err_reward, rot_err_reward, _, _, _ = self.env_core._get_reward_masac()
        rewards = {
            "agent_1": pos_err_reward + reward,
            "agent_2": rot_err_reward + reward,
        }
        self.env_core.unwrapped.write_tensorboard("Reward/PosAgentReward", rewards["agent_1"])
        self.env_core.unwrapped.write_tensorboard("Reward/RotAgentReward", rewards["agent_2"])

        # Both agents share the core environment's termination/truncation flags.
        terminateds = {
            "agent_1": terminated, "agent_2": terminated,
        }
        truncateds = {
            "agent_1": truncated, "agent_2": truncated,
        }
        terminateds["__all__"] = all(terminateds.values())
        truncateds["__all__"] = any(truncateds.values())

        # The same info object is handed to both agents.
        infos = {
            "agent_1": info, "agent_2": info,
        }

        return observations, rewards, terminateds, truncateds, infos
    
# Register the environment class under the name "MultiReach-v0" so it can be
# referenced by that string.
register_env("MultiReach-v0", MultiAgentReachEnv)


In [None]:
from datetime import datetime
import gymnasium as gym
from ray.rllib.algorithms.sac import SACConfig
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.core.rl_module.rl_module import RLModuleSpec
import numpy as np

# Experiment naming: the task prefix plus a timestamp taken when this cell runs.
TASK="MultiAgentReach_"
experiment_name = TASK + datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# Absolute, machine-specific output directory for this experiment.
LOGDIR=f"/home/ey/rl/src/rlreach2/rlreach/ray/db/ray_results/{experiment_name}"

def policy_mapping_fn(agent_id, *args, **kwargs):
    """Return the policy id that controls ``agent_id``.

    ``agent_1`` maps to ``policy_1`` and ``agent_2`` maps to ``policy_2``.
    Extra positional and keyword arguments are accepted and ignored.

    Raises:
        ValueError: If ``agent_id`` is neither of the two known agents.
    """
    known_pairs = (("agent_1", "policy_1"), ("agent_2", "policy_2"))
    for known_agent, policy_id in known_pairs:
        if agent_id == known_agent:
            return policy_id
    raise ValueError(f"Unknown agent_id: {agent_id}")


# SAC configuration for the registered "MultiReach-v0" environment: one policy
# and one RLModule spec per agent, with a 6-dim observation for policy_1 and an
# 8-dim observation for policy_2.
config = (
    SACConfig()
    .environment(
        env="MultiReach-v0",
        env_config={"log_dir": LOGDIR},        
    )
    .multi_agent(
        # Each entry is (policy class, observation space, action space, config overrides).
        policies={
            "policy_1": (None, gym.spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32), gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32), {}),
            "policy_2": (None, gym.spaces.Box(low=-np.inf, high=np.inf, shape=(8,), dtype=np.float32), gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32), {}),
        },
        policy_mapping_fn=policy_mapping_fn,
        policies_to_train=["policy_1", "policy_2",]
    )
    .rl_module(
        # The same spaces are repeated here for the per-policy RLModule specs.
        rl_module_spec=MultiRLModuleSpec(
            rl_module_specs={
                "policy_1": RLModuleSpec(
                    observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32),
                    action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32),
                ),
                "policy_2": RLModuleSpec(
                    observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(8,), dtype=np.float32),
                    action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32),
                ),
            }
        )
    )
    .training(
        initial_alpha=0.2,
        actor_lr=1e-4,
        critic_lr=1e-4,
        alpha_lr=1e-4,
        target_entropy="auto",
        n_step=1,
        tau=0.005,
        train_batch_size=128,
        target_network_update_freq=1,
        replay_buffer_config={
            "type": "MultiAgentEpisodeReplayBuffer",
            "capacity": 1000000,
            # NOTE(review): duplicates `num_steps_sampled_before_learning_starts`
            # below - confirm which of the two the installed Ray version honours.
            "learning_starts": 1000,
        },
        num_steps_sampled_before_learning_starts=1000,
        # NOTE(review): confirm that `model=` is still the supported way to set
        # the network sizes when RLModule specs are used.
        model={
            "fcnet_hiddens": [512, 512],
            "fcnet_activation": "relu",
            "post_fcnet_hiddens": [],
            "post_fcnet_activation": None,
            "post_fcnet_weights_initializer": "orthogonal_",
            "post_fcnet_weights_initializer_config": {"gain": 0.01},
        },
    )
    # NOTE(review): `num_cpus_per_worker` / `num_learner_workers` look like
    # older-API argument names - confirm against the installed Ray version.
    .resources(
        num_gpus=0.25,      # or another fraction, depending on the machine configuration
        num_cpus_per_worker=1,
        num_learner_workers=1,
    )
    .framework("torch")
    .reporting(
        metrics_num_episodes_for_smoothing=5,
        min_sample_timesteps_per_iteration=1000,
    )
    .evaluation(
        evaluation_interval=1,
        evaluation_num_env_runners=1,
        evaluation_config={"seed": 42},
    )
    .env_runners(
        num_env_runners=6,             # number of env-runner processes
        num_envs_per_env_runner=1,     # number of environments per env runner
        # gym_env_vectorize_mode="ASYNC"
    )
)


from ray import train, tune, air
# Run the SAC config through Ray Tune. Results and checkpoints go under LOGDIR;
# a checkpoint is requested every 10 iterations and once more at the end.
tuner = tune.Tuner(
    trainable=config.algo_class,
    param_space=config,
    run_config=train.RunConfig(
        name="multi_agent_reach",
        storage_path=LOGDIR,
        log_to_file=True,
        checkpoint_config=air.CheckpointConfig(
            checkpoint_frequency=10,
            checkpoint_at_end=True,
        ),
        # Stop criterion on the mean evaluation episode return.
        stop={"evaluation/env_runners/episode_return_mean": 18000.0}
    )
)

# Blocks until the run stops or fails.
results = tuner.fit()

In [None]:

# spec1 = RLModuleSpec(
#     module_class=rl_module.__class__,
#     observation_space=rl_module.observation_space,
#     action_space=rl_module.action_space,
#     model_config={"twin_q": True},
#     load_state_path=checkpoint_path 
# )

from ray.rllib.policy.policy import Policy
from ray.rllib.core.rl_module.rl_module import RLModule

# The path below points at `learner_group/learner/rl_module/default_policy`
# inside an algorithm checkpoint. That directory is an RLModule checkpoint, not
# a Policy checkpoint, so it has to be restored with `RLModule.from_checkpoint`.
# The variable name is kept so existing notebook cells keep working.
checkpoint_path = "/home/ey/rl/src/rlreach2/rlreach/ray/db/ray_results/Reach_2025-08-19_20-10-32/reach/SAC_ReachEnvHERGym_81d54_00000_0_2025-08-19_20-10-33/checkpoint_000155/learner_group/learner/rl_module/default_policy"
my_restored_policy = RLModule.from_checkpoint(checkpoint_path)

In [3]:
from ray.rllib.env.multi_agent_env import MultiAgentEnv
import gymnasium as gym
import numpy as np
from ray.tune.registry import register_env
from common.envUtils import *
import random


class MultiAgentReachEnv(MultiAgentEnv):
    """Two-agent RLlib wrapper around a single reach environment.

    The wrapped environment (``env_core``) is stepped with one flat action
    built by concatenating the action of ``agent_1`` followed by the action
    of ``agent_2``. Each agent receives its own view of the core observation
    (``obs_type='pos'`` for ``agent_1``, ``obs_type='rot'`` for ``agent_2``).
    Each agent's reward is its own error reward plus the shared pose-error
    reward, action penalty and velocity penalty; the core environment's own
    reward is only logged.

    Args:
        config: Optional mapping of environment settings. Only the
            ``"log_dir"`` key is read; it is forwarded to
            ``make_reach_her_env_masac``. ``None`` is treated as an empty
            mapping.
    """

    def __init__(self, config=None):
        super().__init__()
        # `config` defaults to None, so `.get` must not be called on it directly.
        config = config if config is not None else {}
        self.env_core = make_reach_her_env_masac(log_dir=config.get("log_dir"))
        # self.her_ratio=0.05

        # Agent ids. `agents` gets its own list so that changing the set of
        # currently active agents can never alter `possible_agents`.
        self.possible_agents = ['agent_1', 'agent_2']
        self.agents = list(self.possible_agents)

        # Per-agent observation and action spaces.
        # self.observation_space = self.env_core.unwrapped.observation_spec
        self.observation_spaces = {
            "agent_1": gym.spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32),
            "agent_2": gym.spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32),
        }
        # self.action_space = self.env_core.unwrapped.action_spec
        self.action_spaces = {
            "agent_1": gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32),
            "agent_2": gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32),
        }

    def get_action_space(self, agent_id):
        """Return the action space registered for ``agent_id``."""
        return self.action_spaces[agent_id]

    def get_observation_space(self, agent_id):
        """Return the observation space registered for ``agent_id``."""
        return self.observation_spaces[agent_id]

    def reset(self, *, seed=None, options=None):
        """Reset the core environment and return initial observations and infos.

        Returns:
            A ``(observations, infos)`` tuple, both keyed by agent id. The
            observations are all-zero arrays shaped like each agent's
            observation space; every agent's info is ``{'success': False}``.
        """
        # NOTE(review): `seed` and `options` are not forwarded, and whatever
        # `env_core.reset()` returns is discarded in favour of zero
        # observations - confirm this is intended.
        self.env_core.reset()
        observations = {
            agent_id: np.zeros(space.shape, dtype=space.dtype)
            for agent_id, space in self.observation_spaces.items()
        }
        # Infos are keyed by agent id, like every other per-agent dict
        # returned by a multi-agent environment.
        infos = {agent_id: {'success': False} for agent_id in observations}
        return observations, infos

    def step(self, action_dict):
        """Advance the core environment by one step using both agents' actions.

        Args:
            action_dict: Mapping with the keys ``"agent_1"`` and ``"agent_2"``,
                each holding that agent's action array.

        Returns:
            ``(observations, rewards, terminateds, truncateds, infos)``, each
            keyed by agent id; ``terminateds`` and ``truncateds`` additionally
            carry the ``"__all__"`` key.
        """
        # The core environment takes one flat action: agent_1 first, then agent_2.
        action = np.concatenate([
            action_dict["agent_1"],
            action_dict["agent_2"],
        ])
        observation, reward, terminated, truncated, info = self.env_core.step(action)
        pos_err_reward, rot_err_reward, pose_err_reward, action_penalty, vel_penalty = self.env_core._get_reward_masac()

        # Disabled experiment: occasionally emit hindsight-processed
        # observations with a fixed reward instead of the regular ones.
        # if (random.random() < self.her_ratio) and (pos_err_reward + rot_err_reward < 0.0):
        #     observations = {
        #         "agent_1": self.env_core._process_obs_her_masac(obs=observation, obs_type='pos'),
        #         "agent_2": self.env_core._process_obs_her_masac(obs=observation, obs_type='rot'),
        #     }
        #     rewards = {
        #         "agent_1": 5.0,
        #         "agent_2": 5.0,
        #     }
        # else:
        observations = {
            "agent_1": self.env_core._process_obs_masac(obs=observation, obs_type='pos'),
            "agent_2": self.env_core._process_obs_masac(obs=observation, obs_type='rot'),
        }
        # The pose-error reward and both penalties are shared by the two agents.
        shared_terms = pose_err_reward + action_penalty + vel_penalty
        rewards = {
            "agent_1": pos_err_reward + shared_terms,
            "agent_2": rot_err_reward + shared_terms,
        }

        self.env_core.unwrapped.write_tensorboard("Reward/EnvOriginReward", reward)
        self.env_core.unwrapped.write_tensorboard("Reward/PosProcessReward", pos_err_reward)
        self.env_core.unwrapped.write_tensorboard("Reward/RotProcessReward", rot_err_reward)
        self.env_core.unwrapped.write_tensorboard("Reward/PosAgentReward", rewards["agent_1"])
        self.env_core.unwrapped.write_tensorboard("Reward/RotAgentReward", rewards["agent_2"])

        # Both agents share the core environment's termination/truncation flags.
        terminateds = {
            "agent_1": terminated, "agent_2": terminated,
        }
        truncateds = {
            "agent_1": truncated, "agent_2": truncated,
        }
        terminateds["__all__"] = all(terminateds.values())
        truncateds["__all__"] = any(truncateds.values())

        # The same info object is handed to both agents.
        infos = {
            "agent_1": info, "agent_2": info,
        }

        return observations, rewards, terminateds, truncateds, infos
    
# Register the environment class under the name "MultiReach-v0" so it can be
# referenced by that string.
register_env("MultiReach-v0", MultiAgentReachEnv)



In [2]:
from datetime import datetime
import gymnasium as gym
from ray.rllib.algorithms.sac import SACConfig
from ray.rllib.core.rl_module.multi_rl_module import MultiRLModuleSpec
from ray.rllib.core.rl_module.rl_module import RLModuleSpec
import numpy as np
from pathlib import Path

# Experiment naming: the task prefix plus a timestamp taken when this cell runs.
TASK="MultiAgentReach_"
experiment_name = TASK + datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
# Absolute, machine-specific output directory for this experiment.
LOGDIR=f"/home/ey/rl/src/rlreach2/rlreach/ray/db/ray_results/{experiment_name}"

def policy_mapping_fn(agent_id, *args, **kwargs):
    """Map an agent id to the id of the policy that acts for it.

    ``agent_1`` is served by ``policy_1`` and ``agent_2`` by ``policy_2``;
    any additional arguments are accepted and ignored.

    Raises:
        ValueError: If ``agent_id`` is not one of the two known agents.
    """
    if agent_id == "agent_1":
        return "policy_1"
    if agent_id == "agent_2":
        return "policy_2"
    raise ValueError(f"Unknown agent_id: {agent_id}")

# SAC configuration for the registered "MultiReach-v0" environment: one policy
# and one RLModule spec per agent, both with 6-dim observations and 3-dim
# actions.
config = (
    SACConfig()
    .environment(
        env="MultiReach-v0",
        env_config={"log_dir": LOGDIR},        
    )
    .multi_agent(
        # Each entry is (policy class, observation space, action space, config overrides).
        policies={
            "policy_1": (None, gym.spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32), gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32), {}),
            "policy_2": (None, gym.spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32), gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32), {}),
        },
        policy_mapping_fn=policy_mapping_fn,
        policies_to_train=["policy_1", "policy_2",]
    )
    .rl_module(
        # The same spaces are repeated here for the per-policy RLModule specs.
        rl_module_spec=MultiRLModuleSpec(
            rl_module_specs={
                "policy_1": RLModuleSpec(
                    observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32),
                    action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32),
                ),
                "policy_2": RLModuleSpec(
                    observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(6,), dtype=np.float32),
                    action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32),
                ),
            }
        )
    )
    .training(
        twin_q=True,
        initial_alpha=0.2,
        actor_lr=1e-4,
        critic_lr=1e-4,
        alpha_lr=1e-4,
        target_entropy="auto",
        n_step=1,
        tau=0.005,
        train_batch_size=128,
        target_network_update_freq=1,
        replay_buffer_config={
            "type": "MultiAgentEpisodeReplayBuffer",
            "capacity": 1000000,
            # NOTE(review): "learning_starts" duplicates
            # `num_steps_sampled_before_learning_starts` below, and
            # "replay_batch_size" (200) differs from `train_batch_size` (128) -
            # confirm which of these the installed Ray version honours.
            "learning_starts": 1000,
            "replay_batch_size": 200,
        },
        num_steps_sampled_before_learning_starts=1000,
        # NOTE(review): confirm that `model=` is still the supported way to set
        # the network sizes when RLModule specs are used.
        model={
            "fcnet_hiddens": [512, 512],
            "fcnet_activation": "relu",
            "post_fcnet_hiddens": [],
            "post_fcnet_activation": None,
            "post_fcnet_weights_initializer": "orthogonal_",
            "post_fcnet_weights_initializer_config": {"gain": 0.01},
        },
    )
    # Resource settings are currently disabled.
    # .resources(
    #     num_cpus=10,
    #     num_gpus=0.25,      # or another fraction, depending on the machine configuration
    #     num_cpus_per_worker=5,
    #     num_learner_workers=1,
    # )
    .framework("torch")
    .reporting(
        metrics_num_episodes_for_smoothing=5,
        min_sample_timesteps_per_iteration=1000,
    )
    .evaluation(
        evaluation_interval=1,
        evaluation_num_env_runners=1,
        evaluation_duration=2,
        evaluation_config={"seed": 42},
    )
    .env_runners(
        # The `_on_sample_end` callback in this cell only touches episodes of
        # exactly this length.
        rollout_fragment_length=200,
    )
    # Parallel sampling settings are currently disabled.
    # .env_runners(
    #     num_env_runners=6,             # number of env-runner processes
    #     num_envs_per_env_runner=1,     # number of environments per env runner
    #     # gym_env_vectorize_mode="ASYNC"
    # )
)

from ray import train, tune, air

def _on_sample_end(env_runner, metrics_logger, samples, **kwargs):
    if not hasattr(_on_sample_end, "count"):
        _on_sample_end.count = 0  # 初始化静态变量
    agent_1_episode = samples[0].agent_episodes["agent_1"]
    agent_2_episode = samples[0].agent_episodes["agent_2"]
    # print(f"samples len:{len(samples)}")
    if len(agent_1_episode) == 200:
        if (random.random() < 0.1):
            err = {
                "pos_init": calculate_pos_error(
                    np.array(agent_1_episode.observations[1][:3]), 
                    np.array(agent_1_episode.observations[1][-3:])
                ),
                "rot_init": calculate_rot_error(
                    np.array(agent_2_episode.observations[1][:3]), 
                    np.array(agent_2_episode.observations[1][-3:]),
                    angle_unit='radians'
                ),
            }
        # pos
            for i in range(len(agent_1_episode.observations)):
                agent_1_episode.observations[i][:3] = agent_1_episode.observations[-1][-3:]
                err["pos"] = calculate_pos_error(
                    np.array(agent_1_episode.observations[i][:3]), 
                    np.array(agent_1_episode.observations[i][-3:])
                )
                agent_1_episode.rewards[i-1] = ReachEnv.cal_pos_reward(error=err)
        # rot
            # for i in range(len(agent_2_episode.observations))[1:]:
                agent_2_episode.observations[i][:3] = agent_2_episode.observations[-1][-3:]
                err["rot"] = calculate_rot_error(
                    np.array(agent_2_episode.observations[i][:3]), 
                    np.array(agent_2_episode.observations[i][-3:]),
                    angle_unit='radians'
                )
                agent_2_episode.rewards[i-1] = ReachEnv.cal_rot_reward(error=err)
    # for i in range(len(ep1.observations)):
    #     if ep1.observations[i][0] == 0 and  ep1.observations[i][1] == 0:
    #         print(_on_sample_end.count)
    #         _on_sample_end.count = 0
    #     else:
    #         _on_sample_end.count += 1
    #     print(f"ep1 obs:{ep1.observations[i]}")
    # for i in range(len(ep1.rewards)):
    #     print(f"ep1 r:{ep1.rewards[i]}")
    # for i in range(len(ep1.actions)):
    #     print(f"ep1 act:{ep1.actions[i]}")

    
# Attach the relabeling callback so it runs at the end of every sampling round.
config.callbacks(on_sample_end=_on_sample_end)

# Run the SAC config through Ray Tune. Results and checkpoints go under LOGDIR;
# a checkpoint is requested every 10 iterations and once more at the end.
tuner = tune.Tuner(
    trainable=config.algo_class,
    param_space=config,
    run_config=train.RunConfig(
        name="multi_agent_reach",
        storage_path=LOGDIR,
        log_to_file=True,
        checkpoint_config=air.CheckpointConfig(
            checkpoint_frequency=10,
            checkpoint_at_end=True,
        ),
        # Stop criterion on the mean evaluation episode return.
        stop={"evaluation/env_runners/episode_return_mean": 1800000.0},
        # callbacks=[MyCheckpointCallback()],  # attach the custom checkpoint callback
    )
)

# Blocks until the run stops or fails.
results = tuner.fit()



0,1
Current time:,2025-09-21 21:22:17
Running for:,00:07:31.67
Memory:,12.6/15.3 GiB

Trial name,# failures,error file
SAC_MultiReach-v0_f179e_00000,1,/tmp/ray/session_2025-09-21_21-14-43_999374_380144/artifacts/2025-09-21_21-14-45/multi_agent_reach/driver_artifacts/SAC_MultiReach-v0_f179e_00000_0_2025-09-21_21-14-45/error.txt

Trial name,status,loc,iter,total time (s),num_training_step_ca lls_per_iteration,num_env_steps_sample d_lifetime
SAC_MultiReach-v0_f179e_00000,ERROR,127.0.1.1:381322,12,219.276,5,12000


[36m(pid=381322)[0m E0000 00:00:1758460486.522497  381322 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
[36m(pid=381322)[0m E0000 00:00:1758460486.526133  381322 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
[36m(pid=381322)[0m W0000 00:00:1758460486.535692  381322 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
[36m(pid=381322)[0m W0000 00:00:1758460486.535713  381322 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
[36m(pid=381322)[0m W0000 00:00:1758460486.535714  381322 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
[36m(pid=38

[36m(SAC pid=381322)[0m [chatbus_1] 共享内存不存在，创建成功


[36m(SAC pid=381322)[0m   gym.logger.warn(
[36m(SAC pid=381322)[0m   gym.logger.warn(
[36m(SAC pid=381322)[0m [2025-09-21 21:14:52,647 E 381322 381322] core_worker.cc:2246: Actor with class name: 'MultiAgentEnvRunner' and ID: 'b92df5a1c0f93636bf18cf2c01000000' has constructor arguments in the object store and max_restarts > 0. If the arguments in the object store go out of scope or are lost, the actor restart will fail. See https://github.com/ray-project/ray/issues/53727 for more details.
[36m(pid=381428)[0m E0000 00:00:1758460493.361554  381428 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
[36m(pid=381428)[0m E0000 00:00:1758460493.365102  381428 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
[36m(pid=381428)[0m W0000 00:00:1758460493.374602  381428 computation_placer.cc:177] computation plac

[36m(MultiAgentEnvRunner pid=381428)[0m [chatbus_5] 共享内存不存在，创建成功


[36m(MultiAgentEnvRunner pid=381428)[0m   gym.logger.warn(
[36m(MultiAgentEnvRunner pid=381428)[0m   gym.logger.warn(
[36m(SAC pid=381322)[0m Install gputil for GPU system monitoring.
[36m(SAC(env=MultiReach-v0; env-runners=0; learners=0; multi-agent=True) pid=381322)[0m   gym.logger.warn("Casting input x to numpy array.")
[36m(MultiAgentEnvRunner pid=381428)[0m   gym.logger.warn("Casting input x to numpy array.")
[36m(MultiAgentEnvRunner pid=381428)[0m   gym.logger.warn("Casting input x to numpy array.")
[36m(MultiAgentEnvRunner pid=381428)[0m   gym.logger.warn("Casting input x to numpy array.")
[36m(SAC(env=MultiReach-v0; env-runners=0; learners=0; multi-agent=True) pid=381322)[0m Checkpoint successfully created at: Checkpoint(filesystem=local, path=/home/ey/rl/src/rlreach2/rlreach/ray/db/ray_results/MultiAgentReach_2025-09-21_21-14-43/multi_agent_reach/SAC_MultiReach-v0_f179e_00000_0_2025-09-21_21-14-45/checkpoint_000000)
2025-09-21 21:22:17,282	ERROR tune_controller