This is an unofficial code implementation. Paper link: https://arxiv.org/abs/2603.02604.
All experiments from the paper “Heterogenous Agent Collaborative Reinforcement Learning” can be reproduced with this repo.
Heterogenous Agent Collaborative Policy Optimization (HACPO) is an RLVR framework designed to facilitate the collaborative training of multiple heterogeneous agents on a common task.
This repo uses the same environment as verl, so you can find detailed setup instructions at https://verl.readthedocs.io/en/latest/start/install.html.
bash ./recipe/hacpo/run_qwen3-1.7b_qwen3-4b.sh
# key parameter
# aux_model.enable: whether to use multiple heterogenous agents
# aux_model.path: the path to the other heterogenous agent
# algorithm.adv_estimator: use mapo
# actor_rollout_ref.actor.policy_loss.loss_mode: use mapo_clip
# actor_rollout_ref.actor.alpha: exponent applied to the clipped importance sampling ratio (exponential importance sampling)
# actor_rollout_ref.actor.accuracy_window_size: the size of the window of batches used to estimate the capability of each agent
# actor_rollout_ref.actor.aux_clip_ratio_low: the lower clipping bound for responses from other agents
# actor_rollout_ref.actor.aux_clip_ratio_step: the step size used in stepwise clipping

verl/trainer/ppo/ray_trainer.py implements simultaneous training for both the Actor and Aux models. It manages the Aux model through a separate Worker Group, supports configuring distinct tokenizers for each model, and processes dedicated data with the aux_ prefix (such as aux_input_ids) in the data pipeline, enabling efficient parallel training of the two models within the Ray framework.
# verl/trainer/ppo/ray_trainer.py
# Resource of two models
# If aux_model is enabled, spawn it as a dedicated WorkerGroup (separate process)
if self.use_aux_model and ("aux_model" in class_dict):
main_class_dict = {k: v for k, v in class_dict.items() if k != "aux_model"}
# ... (spawn main worker group) ...
aux_only_dict = {"aux_model": class_dict["aux_model"]}
worker_dict_cls_aux = create_colocated_worker_cls(class_dict=aux_only_dict)
wg_dict_aux = self.ray_worker_group_cls(
resource_pool=resource_pool,
ray_cls_with_init=worker_dict_cls_aux,
**wg_kwargs,
)
spawn_wg_aux = wg_dict_aux.spawn(prefix_set=aux_only_dict.keys())
all_wg.update(spawn_wg_aux)
# Train of two models
# if auxiliary model is enabled, also generate rollouts for auxiliary model
if self.use_aux_model:
with marked_timer("gen_aux", timing_raw, color="purple"):
if not self.async_rollout_mode:
aux_gen_batch_output = self.aux_model_wg.generate_sequences(aux_gen_batch)
# mark this is from auxiliary model in the output
gen_batch_output.batch["model_source"] = torch.zeros(
gen_batch_output.batch.batch_size[0], dtype=torch.long
)
aux_gen_batch_output.batch["model_source"] = torch.ones(
aux_gen_batch_output.batch.batch_size[0], dtype=torch.long
)
# ... (merging batches) ...
batch = DataProto.concat([batch, aux_batch])
# ... (compute old_log_prob for each model) ...
old_log_prob_main = self.actor_rollout_wg.compute_log_prob(main_batch)
old_log_prob_aux = self.aux_model_wg.compute_log_prob(aux_batch)
# ... (update actor) ...
# use_aux_model update
print(f"Updating actor")
'''
all batch's input_ids, responses, response_mask, attention_mask, position_ids are from chat_template and tokenizer of actor
'''
aux_mask = batch.batch["model_source"] == 1
swap(batch, mask=aux_mask)
actor_output = self.actor_rollout_wg.update_actor(batch)
# ... (update aux model) ...
# reverse the model_source to get the auxiliary model data
print(f"Updating aux model")
'''
all batch's input_ids, responses, response_mask, attention_mask, position_ids are from chat_template and tokenizer of aux
'''
swap(batch)
batch.batch["model_source"] = 1 - batch.batch["model_source"]
# recompute the advantage, the same method but different in group baseline, kl_penalty
# ...
aux_output = self.aux_model_wg.update_actor(batch)

verl/trainer/ppo/core_algos.py introduces loss calculation logic that supports dual models (e.g., mapo_clip). It uses model_source to distinguish the model origin, computing the standard PPO loss for the Actor and a performance-weighted loss for the Aux model, thereby achieving joint optimization of both models in a single update step.
# verl/trainer/ppo/core_algos.py
# advantage estimator
@register_adv_est(AdvantageEstimator.MAPO)
def compute_mapo_outcome_advantage(...):
# ...
# Update id2score with weighted aux model scores
for i in range(bsz):
if model_source[i] == 0: # main model
id2score[index[i]].append(scores[i])
id2main_score[index[i]].append(scores[i])
elif model_source[i] == 1: # aux model
id2score[index[i]].append(scores[i] * aux_model_performance_reciprocal)
# Recompute mean and std using the weighted scores
for idx in id2score:
# ...
id2mean[idx] = torch.sum(scores_tensor * weights_tensor) / torch.sum(weights_tensor)
# ...
# loss function
@register_policy_loss("mapo_clip")
def compute_policy_loss_mapo_clip(...):
# ...
# Separate main and aux model data
main_mask = model_source == 0
aux_mask = model_source == 1
# ... (compute main model loss) ...
# Compute loss for aux model (with alpha and performance weighting)
if aux_response_mask.numel() > 0:
aux_advantages = advantages[aux_mask]
aux_seq_importance_ratio = seq_importance_ratio[aux_mask]
aux_performance_values = performance[aux_mask].clamp(min=0.1, max=10.0)
aux_seq_importance_ratio_clip = torch.clamp(aux_seq_importance_ratio, min=aux_clip_ratio_low + aux_clip_ratio_step * batch_idx, max=1.0)
# ...
pg_losses[aux_mask] = -aux_advantages * aux_seq_importance_ratio_clip * aux_performance_values.unsqueeze(-1) *(aux_seq_importance_ratio_clip.detach() ** config.alpha)