
support Ray actor AgentLoop and move judger resource config into AgentLoop #1663

Merged
YanhuiDua merged 6 commits into InternLM:rl_design from YanhuiDua:mv_judger_cfg on Apr 14, 2026

Conversation

@YanhuiDua (Collaborator)

No description provided.

@YanhuiDua (Collaborator, Author)

@claude review

Comment thread on xtuner/v1/rl/agent_loop/agent_loop.py (Outdated)
Comment on lines 36 to 49
        if isinstance(config, JudgerConfig):
            judger_dict[key] = config.build()
        elif callable(config):
            judger_dict[key] = config
        else:
            raise ValueError(f"Invalid judger config type: {type(config)} for key {key}")
    return judger_dict
elif isinstance(self.judger_config, JudgerConfig):
    return self.judger_config.build()
elif callable(self.judger_config):
    return self.judger_config
else:
    raise ValueError(f"Invalid judger config type: {type(self.judger_config)}")


Claude: Warning — build_judger() calls JudgerConfig.build() without pg / start_bundle_idx.

JudgerConfig.build() accepts an optional PlacementGroup for resource allocation (native.py). When called without pg, it creates its own placement group per judger instance.

Previously, the trainer built a single judger and shared it between train and eval agent loop managers. Now each AgentLoopConfig.build() calls build_judger() independently, so if the same judger_config is used in both train and eval configs, two separate judger instances (and potentially two sets of Ray actors for router-type judgers) are created.

This is likely fine for judger_type="native" (no Ray actors), but could double resource consumption for "router" or "ray.actor" types. Please verify this is the intended behavior, or consider accepting an optional pre-built judger to allow sharing.
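If sharing is desired, one option is to let the config's build path accept an already-built judger. A minimal self-contained sketch of that pattern (all class and method names below are illustrative stand-ins, not this PR's actual API):

```python
class _JudgerStub:
    """Stand-in for a built judger (e.g. a set of Ray actors for router judgers)."""

class _AgentLoopConfigStub:
    def build_judger(self) -> _JudgerStub:
        # In the PR, this is where JudgerConfig.build() runs and may spawn Ray actors.
        return _JudgerStub()

    def build(self, judger: _JudgerStub | None = None) -> _JudgerStub:
        # If no pre-built judger is injected, fall back to a private one, which is
        # what each AgentLoopConfig.build() currently does. (A real build() would
        # return the agent loop holding this judger; the stub returns the judger
        # itself so the sharing is visible.)
        return judger if judger is not None else self.build_judger()

train_cfg, eval_cfg = _AgentLoopConfigStub(), _AgentLoopConfigStub()
shared = train_cfg.build_judger()
# Both loops reuse one judger instead of spawning two sets of actors.
assert train_cfg.build(judger=shared) is eval_cfg.build(judger=shared)
```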

Comment on lines 74 to +76
  assert judger_response is not None, "Reward handler did not return a response."
- # native postprocess
+ assert isinstance(judger_response, dict), (
+     f"Reward handler must return a dict, but got {type(judger_response)}."
+ )

Claude: Nit — good addition. This assertion replaces the # native postprocess comment with a concrete runtime validation that the reward handler returns a dict. Much clearer contract.

Minor: for consistency with the new TypeError in agent_loop.py:judge_sample (line 105-106), you could use TypeError here instead of AssertionError (assertions can be stripped with -O). But this is a very minor point and not blocking.
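For reference, a minimal sketch of the TypeError variant (the function wrapper is only for illustration; the real code performs these checks inline):

```python
def _validate_judger_response(judger_response):
    # Explicit exceptions instead of asserts, so the checks survive `python -O`.
    if judger_response is None:
        raise TypeError("Reward handler did not return a response.")
    if not isinstance(judger_response, dict):
        raise TypeError(f"Reward handler must return a dict, but got {type(judger_response)}.")
    return judger_response
```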

@claude

claude Bot commented Apr 10, 2026

Claude: ## Summary
Moves judger_config from the trainer level (RLColocateTrainerConfig) down to AgentLoopConfig, so each agent loop owns and builds its own judger. This is a clean architectural improvement — the judger is conceptually tied to the agent loop, not the trainer. The new type system (JudgerSpec, JudgerConfigSpec, etc.) is well-designed and the refactored judge_sample() using the Judger base class is more robust than the old isinstance(NativeJudger) or isinstance(RouterJudger) pattern.

Warning

  • [agent_loop.py:36-49] build_judger() is called independently per AgentLoopConfig, which means train and eval loops now create separate judger instances. For router-type judgers this could double Ray actor resource usage vs. the old shared-judger approach. Please confirm this is intended.
  • [agent_loop.py:93-97] Single-key dict judgers silently discard the key name — consider a debug log so users know this fallback is happening.

Nit

  • [agent_loop.py:15-16] JudgerCallable return type is Any but the runtime validates RolloutState. A tighter type alias would surface this contract at type-check time (see the sketch after this list).
  • [native.py:74-76] The new assert isinstance(judger_response, dict) is good; consider TypeError instead of AssertionError for robustness under -O.
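A possible tightening, sketched with placeholder parameter types (only the return type is the point; RolloutState is the type the runtime check expects):

```python
from typing import Callable

# Sketch only: pin the declared return type to RolloutState instead of Any so a
# type checker can enforce the same contract that judge_sample validates at runtime.
JudgerCallable = Callable[..., "RolloutState"]
```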

Verdict: COMMENT — no blocking issues, but the resource duplication for router-type judgers (warning above) deserves confirmation.

@YanhuiDua changed the title from "mv judger_config to agent_loop config" to "support Ray actor AgentLoop and move judger resource config into AgentLoop" on Apr 13, 2026
YanhuiDua and others added 3 commits April 13, 2026 20:45
@YanhuiDua (Collaborator, Author)

@claude review, and give some advice on this PR's code design

Comment on lines 55 to 59
  @dataclass(frozen=True)
  class _TaskRunner:
      task_name: str
-     agent_loop: AgentLoop
+     agent_loop: AgentLoopSpec
      produce_strategy: ProduceStrategy

Claude: Critical — same rollout_ctl attribute access issue as in producer.py.

_TaskRunner.agent_loop was widened from AgentLoop to AgentLoopSpec, but AgentLoopManager.produce_batch() (unchanged in this PR) still accesses task.agent_loop.rollout_ctl directly as a plain attribute. This will raise AttributeError when the agent loop is a RayAgentLoopProxy.

Since produce_batch is the main entry point for the training loop, this is a runtime crash waiting to happen for any ray.actor agent loop configuration.

The RouterAgentLoop path is fine (it stores rollout_ctl as an instance attribute), but the single-actor RayAgentLoopProxy path is broken.


  async def _cleanup_pending_tasks(
-     self, pending_tasks: set, agent_loop: AgentLoop, replay_buffer: ReplayBuffer, task_name: str
+     self, pending_tasks: set, agent_loop: AgentLoopSpec, replay_buffer: ReplayBuffer, task_name: str

Claude: Critical — agent_loop.rollout_ctl will crash for RayAgentLoopProxy.

The type was widened from AgentLoop to AgentLoopSpec, and _timed_generate_group (above) was correctly updated with an isinstance(agent_loop, ray.actor.ActorHandle) branch. But this method still accesses .rollout_ctl as a plain attribute on line 203.

On a RayAgentLoopProxy (Ray actor proxy), this will raise AttributeError at runtime — attribute access doesn't forward to the remote object. You'd need await agent_loop.get_rollout_ctl.remote() (using the method defined in AgentLoopActor).

This will surface whenever AgentLoopConfig(type="ray.actor", num_ray_actors=1) is used with the async produce strategy.
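A sketch of the guarded access described above, mirroring the isinstance branch already added in _timed_generate_group (get_rollout_ctl is the actor method mentioned in the comment; the helper name is invented for illustration):

```python
import ray

async def _resolve_rollout_ctl(agent_loop):
    # Attribute access does not forward to Ray actor handles, so the proxy path
    # must go through the remote method instead of reading .rollout_ctl directly.
    if isinstance(agent_loop, ray.actor.ActorHandle):
        return await agent_loop.get_rollout_ctl.remote()
    return agent_loop.rollout_ctl
```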

Comment on lines 155 to +166

def _build_worker(self, pg: PlacementGroup | None = None, bundle_idx: int = 0) -> RayJudgerProxy:
    pg_options = {"num_cpus": self.num_cpus_per_actor, "memory": self.cpu_memory_per_actor}
    if pg is None:
        # NOTE: keep consistent with the router build logic and create a PlacementGroup by default.
        from xtuner.v1.rl.utils.ray_worker import CPUResourcesConfig

        cpu_resource_cfg = CPUResourcesConfig(
            num_workers=self.num_ray_actors,
            num_cpus_per_worker=self.num_cpus_per_actor,
            cpu_memory_per_worker=self.cpu_memory_per_actor,
        )
        pg = cpu_resource_cfg.build_placement_group()
        ray.get(pg.ready())
        bundle_idx = 0

    assert len(pg.bundle_specs) > bundle_idx, "Placement group does not have enough bundles for ray actor."
    assert pg.bundle_specs[bundle_idx].get("CPU", 1) >= self.num_cpus_per_actor, (
        f"Placement group bundle {bundle_idx} does not have enough CPU resources."
    )
    assert pg.bundle_specs[bundle_idx].get("memory", 0) >= self.cpu_memory_per_actor, (
        f"Placement group bundle {bundle_idx} does not have enough memory resources."
    )
    return RayJudger.options(
        placement_group=pg,
        placement_group_bundle_index=bundle_idx,
        **pg_options,
    ).remote(

def get_num_placement_group_bundles(self) -> int:
    if self.judger_type == "native":
        return 0
    return self.num_ray_actors

def get_cpu_bundles(self) -> list[dict[str, float | int]]:
    return [
        {
            "CPU": self.num_cpus_per_actor,
            "memory": self.cpu_memory_per_actor,
        }

Claude: Warning — RouterJudger is now silently wrapped as a Ray actor, which changes error semantics.

Previously, RouterJudger was a local Python object with an asyncio lock for load balancing. This PR wraps it as a Ray actor via CPUActorLauncher.build_actor(RouterJudger, ...), which means:

  1. Exceptions inside RouterJudger.judge will now be wrapped in ray.exceptions.RayTaskError instead of propagating directly.
  2. The asyncio.Lock now coordinates within the Ray actor's event loop (not the caller's), which is fine but the locking semantics change subtly under high concurrency.
  3. The actor_num_cpus=0, actor_memory=0 means this actor has no reserved resources — it could be starved if the cluster is fully allocated.

Point 3 is especially worth considering: a zero-resource actor relies on Ray's lenient scheduling but could be deprioritized or fail to schedule in a resource-constrained cluster.
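If point 3 matters in practice, one mitigation is to reserve a nominal amount of CPU and memory for the router actor. A sketch using plain Ray options on a stand-in class (rather than the PR's CPUActorLauncher, whose exact parameters are not verified here):

```python
import ray

class RouterJudgerStub:
    """Stand-in for RouterJudger, only to show the resource options."""
    def judge(self, sample):
        return sample

# Reserving even 1 CPU and a few hundred MiB makes Ray's scheduler account for the
# actor, whereas num_cpus=0 / memory=0 relies on lenient scheduling and can be
# starved on a fully allocated cluster.
RouterActor = ray.remote(num_cpus=1, memory=512 * 1024 * 1024)(RouterJudgerStub)
# router = RouterActor.remote()  # then used like any other Ray actor
```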

@claude

claude Bot commented Apr 13, 2026

Claude: ## Review Summary

This PR introduces CPUActorLauncher as the generic Ray actor launch infrastructure, refactors AutoCPUWorkers to inherit from it, adds Ray actor support for AgentLoop, and moves judger_config from the trainer level down into TaskSpecConfig. The architectural direction is sound — the judger belongs with the agent loop, and extracting CPUActorLauncher reduces duplication.

Critical

  • rollout_ctl attribute access breaks for RayAgentLoopProxy — both producer.py:_cleanup_pending_tasks and agent_loop_manager.py:produce_batch access agent_loop.rollout_ctl as a plain attribute. After widening the type to AgentLoopSpec, this will AttributeError on Ray actor proxies (which don't transparently forward attribute access). The _timed_generate_group function was correctly updated with isinstance checks, but these two call sites were missed.

Warning

  • Unused pg_pack_strategy parameter in CPUActorLauncher.build_actor and build_actors — accepted but never referenced.
  • Asymmetric 1 GB default memory when pg is None vs. bundle-derived default when PG is provided.
  • RouterJudger now wrapped as a zero-resource Ray actor: actor_num_cpus=0, actor_memory=0 could cause scheduling issues in resource-constrained clusters.

Nit

  • build_actor_list is a one-line wrapper around build_ray_actor_list — consider removing the duplication.
  • _ACTOR_CLASS_CACHE is an unbounded class-level dict (not a real issue in practice).

Verdict

REQUEST_CHANGES — the rollout_ctl attribute access on RayAgentLoopProxy is a runtime crash in both the async produce strategy and the agent loop manager. The rest of the design is clean.

Comment thread xtuner/v1/rl/agent_loop/agent_loop.py Outdated
model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
hf_checkpoint: str
sample_params: SampleParams
type: Literal["local", "ray.actor"] = "local"
Collaborator

I think this field can be dropped. Instead, default num_ray_actors to 0 to mean that ray.actor is not used. That keeps things simpler with one fewer parameter, and configuring a value of n >= 1 means launching n actors.
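A sketch of the suggested shape (field names follow the snippet above; everything else in the real config is elided):

```python
from pydantic import BaseModel, ConfigDict

class AgentLoopConfigSketch(BaseModel):
    model_config = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
    hf_checkpoint: str
    # Instead of a separate `type` literal: 0 means run the agent loop locally,
    # n >= 1 means launch n Ray actors.
    num_ray_actors: int = 0

    @property
    def use_ray_actor(self) -> bool:
        return self.num_ray_actors > 0
```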

Comment thread xtuner/v1/rl/agent_loop/agent_loop.py Outdated
rollout_ctl=rollout_controller,
)

def build_ray_actor_list(
Collaborator

Delete this.

Comment thread xtuner/v1/rl/agent_loop/agent_loop.py Outdated
judger = self.judger
if isinstance(judger, dict):
    if len(judger) > 1:
        raise NotImplementedError("Multiple judgers require a custom AgentLoop.judge_sample implementation.")
Collaborator

This check should be moved up to the config-construction stage; it should not surface as an error only at runtime.
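One way to move the check forward is a validator on the config, so an unsupported multi-judger dict fails when the config is constructed rather than inside judge_sample. A sketch (the judger_config field name follows this PR; the surrounding class is reduced to the minimum):

```python
from typing import Any
from pydantic import BaseModel, model_validator

class _ConfigWithJudgerCheck(BaseModel):
    judger_config: Any = None

    @model_validator(mode="after")
    def _reject_multiple_judgers(self):
        # Fail at config-build time instead of raising NotImplementedError at rollout time.
        if isinstance(self.judger_config, dict) and len(self.judger_config) > 1:
            raise ValueError(
                "Multiple judgers require a custom AgentLoop.judge_sample implementation."
            )
        return self
```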

@YanhuiDua merged commit 84fd371 into InternLM:rl_design on Apr 14, 2026
2 of 6 checks passed
@YanhuiDua deleted the mv_judger_cfg branch on April 14, 2026 at 09:40
YanhuiDua added a commit that referenced this pull request Apr 27, 2026
…tLoop (#1663)

* Introduce CPUActorLauncher infrastructure

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Refactor judger and agent loop core abstractions

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Migrate configs and tests to the new judger interface

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix rollout controller access and CPU actor launcher defaults

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* mv judger_sample from agent_loop

* fix cpu pg

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>