feat: Placement Group based subprocess manager - inference (3/5)#1833
praateekmahajan merged 7 commits into main
Conversation
Greptile Summary

This PR lands the Ray-placement-group-based subprocess manager as a self-contained infra library, split across subprocess_mgr.py and placement.py.

Confidence Score: 5/5. Safe to merge; all remaining findings are P2 style/defensive-coding suggestions that do not affect correctness of the happy path or the orphan-cleanup guarantees. Prior review concerns (private Ray API, missing tp_size guard, constant mutation) are resolved. The two new findings are both P2: the TOCTOU race in _stop_subprocess causes unnecessary SIGKILL escalation on clean subprocess exits but leaves no orphans, and the check_total_gpu_capacity semantic mismatch is a docstring/naming concern rather than a runtime failure. Tests cover the critical lifecycle paths end-to-end.

Files to focus on: nemo_curator/core/serve/subprocess_mgr.py for the _stop_subprocess signal-sending race; nemo_curator/backends/utils.py for the available-vs-total GPU capacity semantics.
Sequence Diagram

sequenceDiagram
participant Driver
participant placement.py
participant subprocess_mgr.py
participant RayActor as _SubprocessActor
participant Subprocess
Driver->>placement.py: plan_replica_bundle_shape(tp_size)
placement.py-->>Driver: ReplicaBundleSpec
Driver->>placement.py: build_replica_pg(spec, name)
placement.py->>Ray: placement_group lifetime=detached
placement.py->>Ray: pg.ready() timeout=180s
placement.py-->>Driver: PlacementGroup
Driver->>subprocess_mgr.py: ManagedSubprocess.spawn(label, pg, bundle_index)
subprocess_mgr.py->>RayActor: actor_cls.options(num_gpus=N, pg_strategy).remote()
subprocess_mgr.py->>RayActor: actor.initialize.remote(command, env, log_file)
RayActor->>RayActor: inject CUDA_VISIBLE_DEVICES from accelerator IDs
RayActor->>Subprocess: Popen(command, start_new_session=True)
subprocess_mgr.py->>RayActor: actor.run.remote() returns run_ref
subprocess_mgr.py-->>Driver: ManagedSubprocess(label, actor, run_ref)
Driver->>subprocess_mgr.py: proc.stop()
subprocess_mgr.py->>subprocess_mgr.py: graceful_stop_actors
subprocess_mgr.py->>RayActor: actor.stop.remote() SIGTERM group
RayActor->>Subprocess: os.killpg(pgid, SIGTERM)
alt stop drains in time
RayActor-->>subprocess_mgr.py: rc
else timeout
subprocess_mgr.py->>RayActor: actor.force_sigkill_subprocess.remote()
RayActor->>Subprocess: os.killpg(pgid, SIGKILL)
end
subprocess_mgr.py->>Ray: ray.kill(actor, no_restart=True)
oyilmaz-nvidia left a comment:
@praateekmahajan Can you give a bit more context why we need these new classes (subprocess manager for instance) and functions?
@oyilmaz-nvidia Fair question on the subprocess manager: the answer is that we use this in PRs 4/5. This PR is completely backend-agnostic (Dynamo / Serve); if we later add native SGLang (or vLLM) support, all of this will be leveraged there as well.
…rs infra library
Land the Ray-placement-group-backed subprocess manager and its
SubprocessError type as a self-contained, zero-consumer infra library.
Nothing in nemo_curator imports these modules yet -- DynamoBackend is
still the NotImplementedError placeholder from PR 2 -- so the diff is
reviewable in isolation before PR 4 wires it into the real backend.
Concepts landed by the module:
- Replica bundle-shape planner: STRICT_PACK single-bundle when the
replica fits on one node; STRICT_SPREAD N-bundle across nodes
otherwise. vLLM requires an equal per-node local_world_size so
asymmetric splits (e.g. 1+3 for TP=4) are rejected up front (see
the sketch after this list).
- CURATOR_IGNORE_RAY_HEAD_NODE=1 is translated to a per-bundle
{"ray.io/node-type": "worker"} label selector (Ray-native; matches
the Ray Serve deployment-scheduler pattern) rather than a Python-side
inventory filter. OSS Ray nodes must be launched with the label.
- Detached, named placement groups under a stable
"nemo_curator_dynamo" namespace so a reconnecting driver can find
and reap them across the server.start -> pipeline.run -> server.stop
ray.init cycles.
- ManagedSubprocess actor overrides __ray_terminate__ and registers
an atexit hook; the subprocess process-group is reaped even when
Ray hard-kills the actor. graceful_stop_actors parallelises
teardown and falls back to ray.kill on timeout.
- Post-pg.ready() bundle helpers (get_bundle_node_ip,
get_free_port_in_bundle) resolve master-addr and bind ports on the
same node the consuming actor will land on, avoiding
pre-scheduling IP lookups.
- remove_named_pgs_with_prefix as an orphan sweep keyed on the
stable PG name prefix.
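A minimal sketch of the bundle-shape decision from the first bullet, assuming a pre-discovered homogeneous gpus_per_node and simplified ReplicaBundleSpec fields (the real planner also consults _get_gpu_topology and the head-node filter):

```python
from dataclasses import dataclass


@dataclass
class ReplicaBundleSpec:
    bundles: list[dict]  # one Ray resource dict per bundle
    strategy: str        # "STRICT_PACK" or "STRICT_SPREAD"


def plan_replica_bundle_shape(tp_size: int, gpus_per_node: int) -> ReplicaBundleSpec:
    if tp_size < 1:
        raise ValueError(f"tp_size must be >= 1, got {tp_size}")
    if tp_size <= gpus_per_node:
        # Whole replica fits on one node: co-locate everything.
        return ReplicaBundleSpec([{"CPU": 1, "GPU": tp_size}], "STRICT_PACK")
    if tp_size % gpus_per_node != 0:
        # vLLM needs an identical local_world_size on every node, so
        # asymmetric splits like 1+3 for TP=4 are rejected up front.
        raise ValueError(
            f"TP={tp_size} does not split evenly across {gpus_per_node}-GPU nodes"
        )
    n_nodes = tp_size // gpus_per_node
    return ReplicaBundleSpec([{"CPU": 1, "GPU": gpus_per_node}] * n_nodes, "STRICT_SPREAD")
```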
Tests are intentionally consolidated: pure-logic cases are
parametrised, and the GPU slice collapses PG creation, bundle
IP/port discovery, actor spawn, CUDA env propagation, subprocess
env override semantics, inherited-PATH behaviour, and graceful
stop into one cohesive end-to-end test that shares a single PG.
This avoids N x (Ray actor + PG + subprocess) startup cost per
run while still covering the public-API invariants.
Signed-off-by: Praateek <praateekm@nvidia.com>
Signed-off-by: Praateek <praateekm@gmail.com>
… dedup head-node env check
Scope-narrow pass on PR 3 following the subprocess_mgr / Dynamo-infra
separation discussion:
subprocess_mgr.py is now strictly generic Ray-placement-group +
subprocess infrastructure. Dynamo-specific pieces moved out:
- NEMO_CURATOR_DYNAMO_NAMESPACE -> dynamo/constants.py
- build_infra_pg (etcd+NATS+FE) -> dynamo/infra.py
- build_worker_actor_name -> dynamo/infra.py
- engine_kwargs_to_cli_flags -> dynamo/infra.py
(also renamed from the underscore-private form because it is a
public Dynamo-layer helper now)
Shared Ray-env helper lives once, not twice: core/utils.py grows
ignore_ray_head_node(). backends/base.py now uses it instead of its
own inline parse of CURATOR_IGNORE_RAY_HEAD_NODE, and
subprocess_mgr.plan_replica_bundle_shape calls the same helper.
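For reference, a sketch of what the shared helper plausibly looks like; the accepted truthy spellings are an assumption:

```python
import os


def ignore_ray_head_node() -> bool:
    """True when CURATOR_IGNORE_RAY_HEAD_NODE is set to a truthy value."""
    return os.environ.get("CURATOR_IGNORE_RAY_HEAD_NODE", "").strip().lower() in {"1", "true", "yes"}
```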
check_total_gpu_capacity kept (Ray's PG scheduler can hang
indefinitely on pg.ready() when GPUs are oversubscribed -- a coarse
pre-check gives a cleaner error) and moved next to
get_available_cpu_gpu_resources in backends/utils.py. The new
implementation reuses that function so ignore_head_node is honoured
consistently.
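A hedged sketch of the relocated pre-check; the (cpus, gpus) return shape and the ignore_head_node kwarg of get_available_cpu_gpu_resources are assumptions:

```python
from nemo_curator.backends.utils import get_available_cpu_gpu_resources
from nemo_curator.core.utils import ignore_ray_head_node


def check_total_gpu_capacity(required_gpus: int) -> None:
    _cpus, gpus = get_available_cpu_gpu_resources(ignore_head_node=ignore_ray_head_node())
    if required_gpus > gpus:
        # Fail fast with a readable error instead of hanging in pg.ready().
        raise ValueError(f"need {required_gpus} GPUs, cluster exposes {gpus}")
```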
ManagedSubprocess grows real instance methods so callers stop
hand-threading ray.get / actor.X.remote / ray_mod into free
functions:
- proc.is_alive(), .pid(), .read_log_tail(), .wait(timeout)
- proc.stop(timeout_s=...) and classmethod stop_many(procs, ...)
graceful_stop_actors stays as the raw-actor primitive for cases
without a ManagedSubprocess handle (e.g. the reconnecting-driver
orphan sweep), but drops its ray_mod parameter and imports ray
directly -- tests / consumers no longer pass ray through.
graceful_stop_actor (single-actor free function) deleted; callers
use proc.stop() or the list primitive.
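A sketch of how those handle methods wrap the raw actor calls, assuming the actor exposes matching remote methods (exact signatures may differ):

```python
import ray


class ManagedSubprocess:
    """Sketch of the handle methods; constructor fields follow the diagram above."""

    def __init__(self, label, actor, run_ref):
        self.label, self.actor, self.run_ref = label, actor, run_ref

    def is_alive(self) -> bool:
        # Replaces hand-written ray.get(proc.actor.is_alive.remote()) at call sites.
        return ray.get(self.actor.is_alive.remote())

    def pid(self) -> int:
        return ray.get(self.actor.pid.remote())

    def read_log_tail(self, n: int = 50) -> str:
        return ray.get(self.actor.read_log_tail.remote(n))

    def wait(self, timeout: float | None = None) -> int:
        # The actor's run() resolves to the subprocess return code.
        return ray.get(self.run_ref, timeout=timeout)
```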
A TODO comment flags that _run_in_bundle / get_free_port_in_bundle /
get_bundle_node_ip (plus spawn_actor's (pg, bundle_index) tuple)
could collapse into a Bundle(pg, index) wrapper once PR 4's
DynamoBackend wiring demonstrates how often the same pair gets
threaded through. Not doing it speculatively.
Tests realigned accordingly:
- test_subprocess_mgr.py: exercises proc.wait / proc.read_log_tail
/ proc.is_alive / proc.stop instead of the boilerplate form.
- Moved test_engine_kwargs_to_cli_flags + test_build_worker_actor_name
to tests/core/serve/dynamo/test_infra.py (next to the code under test).
- Moved test_check_total_gpu_capacity to tests/backends/test_utils.py
(next to check_total_gpu_capacity's new home), uses monkey-patched
get_available_cpu_gpu_resources so it stays a pure unit test.
- Added test_ignore_ray_head_node_env_parsing in tests/core/test_utils.py.
Verified on 2 GPUs:
- 35/35 non-GPU tests pass (subprocess_mgr + dynamo/infra + core utils + backends/test_utils::TestCheckTotalGpuCapacity)
- 3/3 GPU tests pass (TestReplicaLifecycle end-to-end,
test_actor_death_surfaces_via_run_ref, test_orphan_pg_cleanup_by_prefix)
Signed-off-by: Praateek <praateekm@nvidia.com>
Signed-off-by: Praateek <praateekm@gmail.com>
…s to constants, spawn as classmethod
Three related cleanups on top of the prior PR 3 commit:
1. subprocess_mgr.py lost its placement-group half to a new
placement.py. subprocess_mgr is now strictly ManagedSubprocess +
the _SubprocessActor factory + graceful_stop_actors primitive;
placement.py owns ReplicaBundleSpec, the planner
(plan_replica_bundle_shape, _get_gpu_topology),
build_pg / build_replica_pg, the bundle-scoped discovery helpers
(get_bundle_node_ip, get_free_port_in_bundle, _run_in_bundle),
and remove_named_pgs_with_prefix. Concrete win: dynamo/infra.py
now imports build_pg from placement, which reads correctly;
previously it pulled it out of subprocess_mgr, where the module
name was misleading. The Bundle(pg, idx) wrapper TODO lives with
the bundle helpers in placement.py.
2. The module-level tunables that used to sit at the top of
subprocess_mgr moved to nemo_curator/core/serve/constants.py and
dropped the underscore prefix -- they were being imported across
files (dynamo/infra.py pulled PG_READY_TIMEOUT_S and
WORKER_NODE_LABEL), so "private" was misleading. Same file now
holds SIGTERM_WAIT_S, SIGKILL_WAIT_S,
PLACEMENT_GROUP_READY_TIMEOUT_S (renamed from PG_... for
readability), WORKER_NODE_LABEL, and NOSET_CUDA_RUNTIME_ENV,
alongside the existing DEFAULT_SERVE_PORT /
DEFAULT_SERVE_HEALTH_TIMEOUT_S.
3. spawn_actor free function is gone; spawning is now
ManagedSubprocess.spawn(label, pg, bundle_index, ...) classmethod,
so the factory lives on the type it returns. The previously
underscore-private _build_pg is now just build_pg (public; it's
imported across modules).
Tests tracked the split:
- tests/core/serve/test_placement.py (new) -- planner matrix,
head-node exclusion, ReplicaBundleSpec, orphan PG cleanup.
- tests/core/serve/test_subprocess_mgr.py narrowed to the
lifecycle coverage: TestReplicaLifecycle (end-to-end spawn +
env propagation + wait + read_log_tail + stop) and
test_actor_death_surfaces_via_run_ref.
Verified on 2 GPUs:
- 21/21 non-GPU tests pass (placement planner + dynamo/infra + test_infra + test_config + test_server, etc.)
- 3/3 GPU tests pass (TestReplicaLifecycle.test_end_to_end,
test_actor_death_surfaces_via_run_ref, test_orphan_pg_cleanup_by_prefix)
Signed-off-by: Praateek <praateekm@nvidia.com>
Signed-off-by: Praateek <praateekm@gmail.com>
…ors.py

``nemo_curator/core/serve/errors.py`` held a single 23-line class
(``SubprocessError``) with no consumers yet. Given SubprocessError is
tightly coupled to the subprocess lifecycle and only subprocess_mgr is
going to raise it, it belongs next to the code that owns it. A dedicated
errors module is premature until multiple sibling modules need to share
exception types.

Signed-off-by: Praateek <praateekm@nvidia.com>
Signed-off-by: Praateek <praateekm@gmail.com>
…calation in graceful_stop_actors
Review-driven cleanups on subprocess_mgr.py and placement.py:
Dead code removed (absent in v2 consumer too):
- ManagedSubprocess.log_file dataclass field (never read)
- _SubprocessActor.log_file() method (never called)
- _SubprocessActor.get_node_ip() method (placement.get_bundle_node_ip
does its own remote via _run_in_bundle instead)
Two-RTT -> one-RTT spawn:
- _SubprocessActor.initialize() now injects CUDA_VISIBLE_DEVICES
from ray.get_accelerator_ids() inside the actor. ManagedSubprocess.spawn
no longer needs a separate actor.get_assigned_gpus.remote() round-trip
before the actor.initialize.remote() call.
- initialize() now returns the pid directly (it used to return the echo
dict {"pid", "log_file"} whose log_file echo was only used to populate
the now-deleted ManagedSubprocess.log_file field).
File-handle safety:
- initialize() wraps Popen in try/except that closes self._log_fh on
launch failure, preventing the handle from leaking if the binary
isn't on $PATH or exec fails.
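The two changes above combine into an initialize() shaped roughly like the sketch below, assuming Ray's get_runtime_context().get_accelerator_ids() API; the real method lives in _SubprocessActor:

```python
import os
import subprocess

import ray


@ray.remote(num_gpus=1)
class _SubprocessActorSketch:
    """Sketch of the actor-side initialize(): env injection + fh guard."""

    def initialize(self, command: list[str], env: dict[str, str], log_file: str) -> int:
        # Pin the child to exactly the GPUs Ray assigned this actor, in one RTT.
        gpu_ids = ray.get_runtime_context().get_accelerator_ids().get("GPU", [])
        child_env = {**os.environ, **env}
        if gpu_ids:
            child_env["CUDA_VISIBLE_DEVICES"] = ",".join(gpu_ids)
        self._log_fh = open(log_file, "ab")
        try:
            self._proc = subprocess.Popen(
                command, env=child_env, stdout=self._log_fh,
                stderr=subprocess.STDOUT, start_new_session=True,
            )
        except Exception:
            self._log_fh.close()  # don't leak the handle when exec fails
            raise
        return self._proc.pid     # pid returned directly, no echo dict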
SIGKILL escalation in graceful_stop_actors:
- Added _SubprocessActor.force_sigkill_subprocess() -- non-blocking
os.killpg(..., SIGKILL) on the subprocess group. Escalation path
when stop() is hung.
- Bumped actor max_concurrency from 2 to 4 so force_sigkill_subprocess
doesn't queue behind a stuck stop() + run().
- graceful_stop_actors now: (1) actor.stop() with bounded wait,
(2) if stop did not drain, actor.force_sigkill_subprocess() with
short timeout, (3) ray.kill. Previously step 2 was missing -- ray.kill
bypasses __ray_terminate__ and atexit, so subprocesses orphaned
whenever the actor went hard-kill.
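A sketch of the resulting three-step teardown, assuming these actor method names and illustrative timeout values; the real graceful_stop_actors lives in subprocess_mgr.py:

```python
import ray


def graceful_stop_actors(actors, stop_timeout_s: float = 10.0, kill_timeout_s: float = 2.0) -> None:
    # (1) SIGTERM each subprocess group, bounded wait for clean drain.
    ref_to_actor = {a.stop.remote(): a for a in actors}
    _done, pending = ray.wait(
        list(ref_to_actor), num_returns=len(ref_to_actor), timeout=stop_timeout_s
    )
    if pending:
        # (2) stop() didn't drain on these actors: non-blocking SIGKILL escalation
        # on the subprocess group, scheduled on the actor's own node.
        kill_refs = [ref_to_actor[r].force_sigkill_subprocess.remote() for r in pending]
        ray.wait(kill_refs, num_returns=len(kill_refs), timeout=kill_timeout_s)
    for a in actors:
        # (3) Reap the actor itself; safe now that the subprocess group is dead.
        ray.kill(a, no_restart=True)
```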
Lazy-import cleanup in placement.py:
- Hoisted `import ray` to module top (ray is a hard dep of the package;
the defensive lazy imports were noise). Enables:
- @ray.remote decorators for _remote_get_free_port and _remote_get_node_ip
now live at module scope. Previously the wrapper functions redefined
their RemoteFunction on every call.
Narration comment trimmed:
- Dropped the speculative TODO(dynamo-refactor) block suggesting a
Bundle(pg, idx) wrapper. That was PR-narrating noise that would rot
once PR 4 lands. If the repetition turns out real in PR 4, we can
add the wrapper then -- no header comment needed to tell us to.
Dual validation kept:
- (command is None) == (python_args is None) check remains in
ManagedSubprocess.spawn for a clean caller-side ValueError. The
actor-side check in initialize is now dead in practice but
stays as defense-in-depth; it's three lines.
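The guard in miniature (the function name here is illustrative; in the real code the check is inline in spawn and initialize):

```python
def _validate_spawn_args(command: list[str] | None, python_args: list[str] | None) -> None:
    # Exactly one of command / python_args must be provided;
    # both-set and both-None trip the same equality check.
    if (command is None) == (python_args is None):
        raise ValueError("provide exactly one of `command` or `python_args`")
```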
Verified on 2 GPUs:
- 21/21 non-GPU tests pass
- 3/3 GPU tests pass (TestReplicaLifecycle end-to-end, actor-death
run_ref surfacing, orphan PG cleanup by prefix)
Signed-off-by: Praateek <praateekm@nvidia.com>
Signed-off-by: Praateek <praateekm@gmail.com>
… fix
CI fix:
- tests/gpu_test_groups.json: register the two new GPU test files
(tests/core/serve/test_placement.py, tests/core/serve/test_subprocess_mgr.py)
under the "sdg" group, next to ray_serve/test_integration.py.
Remove the private-Ray-API teardown hack:
- Deleted _SubprocessActor.__ray_terminate__ entirely. Nothing in
our teardown flow calls __ray_terminate__ (the driver goes through
graceful_stop_actors: actor.stop -> force_sigkill_subprocess ->
ray.kill), so the override was inert and the access to
_ray._private.worker.global_worker was pure private-API risk.
atexit is the actor's only self-registered reap path now --
standard Python, no Ray internals.
- Also dropped the dead _SubprocessActor.get_assigned_gpus() method
(called from spawn() before the simplify-pass two-RTT collapse,
now replaced by initialize() injecting CUDA_VISIBLE_DEVICES itself).
Review P2 (greptile): NOSET_CUDA_RUNTIME_ENV shallow-copy:
- subprocess_mgr.py spawn(): the else-branch of the runtime_env merge
was assigning `merged_runtime_env = NOSET_CUDA_RUNTIME_ENV` by
reference. Mutation in-place would poison the module-level constant.
Now shallow-copies via `{**NOSET_CUDA_RUNTIME_ENV}`, matching the
if-branch.
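The aliasing bug in miniature; the constant's contents here are an assumption, the copy-vs-reference pattern is the point:

```python
NOSET_CUDA_RUNTIME_ENV = {"env_vars": {"RAY_EXPERIMENTAL_NOSET_CUDA_VISIBLE_DEVICES": "1"}}


def merge_runtime_env(user_runtime_env: dict | None) -> dict:
    if user_runtime_env:
        return {**NOSET_CUDA_RUNTIME_ENV, **user_runtime_env}
    # Bare `return NOSET_CUDA_RUNTIME_ENV` would hand callers a reference
    # whose in-place mutation poisons the module-level constant.
    return {**NOSET_CUDA_RUNTIME_ENV}
```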
Review P2 (greptile): tp_size validation:
- placement.plan_replica_bundle_shape now rejects tp_size < 1 with a
ValueError. Previously tp_size=0 silently returned a {"CPU": 1,
"GPU": 0} bundle. New parametrised test covers 0 and -1.
Review P2 (greptile): head-node detection consistency:
- placement._get_gpu_topology previously used
ray.get_runtime_context().get_node_id() alone to identify the head,
which is only correct when the driver runs on the head node.
Now uses backends.utils.get_head_node_id() (resource-marker based
-- looks for "node:__internal_head__") with the runtime-context as
a fallback. Matches how the rest of the codebase identifies the
head and is robust to driver-off-head deployments.
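A sketch of the resource-marker detection, assuming the standard "node:__internal_head__" resource Ray sets on the head node, with the runtime-context fallback described above:

```python
import ray


def get_head_node_id() -> str:
    for node in ray.nodes():
        if node.get("Alive") and "node:__internal_head__" in node.get("Resources", {}):
            return node["NodeID"]
    # Fallback: only correct when the driver itself runs on the head node.
    return ray.get_runtime_context().get_node_id()
```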
Review P3 (claude[bot]): test coverage for engine_kwargs_to_cli_flags
dict/list branches:
- Added parametrised cases for list (served_model_name=["a", "b"])
and dict (generation_config={"temperature": 0.7}) values so the
json.dumps path isn't an untested branch.
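A sketch of the conversion those cases cover; the exact flag formatting (dash-casing, JSON for containers) is an assumption inferred from the parametrised tests:

```python
import json


def engine_kwargs_to_cli_flags(engine_kwargs: dict) -> list[str]:
    flags = []
    for key, value in engine_kwargs.items():
        flag = "--" + key.replace("_", "-")
        if isinstance(value, (dict, list)):
            flags += [flag, json.dumps(value)]  # the json.dumps branch
        else:
            flags += [flag, str(value)]         # scalar branch
    return flags


# engine_kwargs_to_cli_flags({"served_model_name": ["a", "b"]})
#   -> ["--served-model-name", '["a", "b"]']
# engine_kwargs_to_cli_flags({"generation_config": {"temperature": 0.7}})
#   -> ["--generation-config", '{"temperature": 0.7}']
```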
Verified on 2 GPUs: 25/25 non-GPU pass, 3/3 GPU pass
(TestReplicaLifecycle end-to-end -- which exercises the atexit-only
teardown path via proc.stop() -- still green).
Signed-off-by: Praateek <praateekm@nvidia.com>
Signed-off-by: Praateek <praateekm@gmail.com>
Signed-off-by: Praateek <praateekm@gmail.com>
Description
Nothing in nemo_curator/ imports these modules yet; PR 4 (DynamoBackend) will.

- serve/subprocess_mgr.py: ManagedSubprocess + _SubprocessActor factory + SubprocessError + graceful_stop_actors
- serve/placement.py: ReplicaBundleSpec, plan_replica_bundle_shape, build_pg / build_replica_pg, bundle-scoped helpers (get_bundle_node_ip, get_free_port_in_bundle), and remove_named_pgs_with_prefix for the orphan sweep
- serve/dynamo/infra.py (build_infra_pg, build_worker_actor_name, engine_kwargs_to_cli_flags) so subprocess_mgr / placement stay backend-generic
- CURATOR_IGNORE_RAY_HEAD_NODE parsing extracted into a shared nemo_curator.core.utils.ignore_ray_head_node(); nemo_curator/backends/base.py now uses the same helper instead of parsing the env var inline
- check_total_gpu_capacity moved next to get_available_cpu_gpu_resources in nemo_curator/backends/utils.py: a coarse pre-check so insufficient-GPU cases fail fast instead of hanging on pg.ready()

Design callouts
lifetime=\"detached\"and a stablename=so a reconnecting driver (server.start → pipeline.run → server.stop flow) can find and reap them acrossray.shutdown()/ray.init()cycles.remove_named_pgs_with_prefixis the orphan sweep.STRICT_PACKbundle; multi-node TP replicas useSTRICT_SPREADwith an equal per-node split. Asymmetric splits (1+3 for TP=4) are rejected up front because vLLM's distributed executor requires identicallocal_world_sizeper node.CURATOR_IGNORE_RAY_HEAD_NODE=1is translated to a per-bundle{\"ray.io/node-type\": \"worker\"}label selector (Ray-native; matches Ray Serve's deployment-scheduler pattern). Auto-satisfied on Anyscale; OSS Ray users must start worker nodes withray start --labels ray.io/node-type=worker.start_new_session=Trueso the child becomes a process-group leader._SubprocessActoroverrides__ray_terminate__and registers anatexithook so the subprocess tree is SIGTERM→SIGKILL'd on the process group whenever the actor shuts down gracefully.graceful_stop_actorsruns in three steps per actor: (1)actor.stop()with bounded wait, (2) if stop didn't drain,actor.force_sigkill_subprocess()— a non-blockingos.killpg(..., SIGKILL)scheduled on the actor's node — then (3)ray.kill. Without step 2,ray.killbypasses__ray_terminate__/atexitand orphans the subprocess tree.ManagedSubprocess.spawnis a classmethod on the data class it returns — ergonomic factory that avoids the split between a free-function spawner and the handle type. The handle exposes.stop(),.stop_many(),.is_alive(),.pid(),.read_log_tail(),.wait()so callers don't hand-writeray.get(actor.X.remote()).What's intentionally NOT in this PR
- DynamoBackend.start() / .stop(): still the NotImplementedError placeholder from feat!: Typed Serve Config + Dyamo config stub - inference (2/5) #1820
- Imports of subprocess_mgr / placement from production code (tests are the only consumer)
- The Bundle(pg, bundle_index) wrapper idea: deferred until PR 4's DynamoBackend wiring shows how often the pair repeats

Internal usage (preview of how PR 4 will consume this)
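The original snippet under this heading did not survive extraction; the sketch below is a hedged reconstruction from the sequence diagram and the public API listed above. Keyword-argument names beyond (label, pg, bundle_index) and the placeholder command are assumptions.

```python
from nemo_curator.core.serve.placement import (
    build_replica_pg,
    get_bundle_node_ip,
    get_free_port_in_bundle,
    plan_replica_bundle_shape,
)
from nemo_curator.core.serve.subprocess_mgr import ManagedSubprocess

spec = plan_replica_bundle_shape(tp_size=4)
pg = build_replica_pg(spec, name="nemo_curator_dynamo_replica_0")

# Resolve master-addr / port on the node the worker will actually land on.
master_addr = get_bundle_node_ip(pg, 0)
master_port = get_free_port_in_bundle(pg, 0)

proc = ManagedSubprocess.spawn(
    label="vllm-worker-0",
    pg=pg,
    bundle_index=0,
    command=["vllm", "serve", "..."],  # placeholder command
    env={"MASTER_ADDR": master_addr, "MASTER_PORT": str(master_port)},
)
try:
    proc.wait(timeout=None)  # or poll proc.is_alive() / proc.read_log_tail()
finally:
    proc.stop(timeout_s=30)
```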
Verification
- ruff check nemo_curator/core/serve/ tests/core/serve/
- pytest tests/core/serve/test_placement.py tests/core/serve/test_subprocess_mgr.py tests/core/serve/dynamo/test_infra.py tests/core/test_utils.py tests/backends/test_utils.py::TestCheckTotalGpuCapacity -m "not gpu": 21 passed (~28s; setup-bound on the Ray-cluster autouse fixture)
- CUDA_VISIBLE_DEVICES=0,1 pytest tests/core/serve/test_placement.py tests/core/serve/test_subprocess_mgr.py -m gpu: 3 passed (TestReplicaLifecycle end-to-end, actor-death run_ref surfacing, orphan PG cleanup by prefix)

Checklist