
feat(data-plane): add TQ fault tolerance APIs #2492

Open
pthombre wants to merge 1 commit into main from pranav/tq_fault_tolerance

Conversation


@pthombre pthombre commented May 14, 2026

Contributor

What does this PR do?

Adds the recovery/control-plane API surface that the async SingleController needs to coordinate TransferQueue from metadata only, without moving tensor payloads through the controller.

Issues

N/A

Usage

The async controller-facing methods added to DataPlaneClient are:

dp_client.ping(timeout_s=1.0)
groups = dp_client.list_metadata(partition_id="train")
ready_groups = [g for g in groups if g.committed and g.is_complete]
depth = dp_client.depth(partition_id="train")
dp_client.pop(keys=trained_keys, partition_id="train")
dp_client.evict(keys=stale_keys, partition_id="train")
caps = dp_client.get_capabilities()

Methods Added for Async Controller Support

  • ping(timeout_s) health-checks the real data-plane request path so SingleController can detect data-plane/TQ availability failures.
  • list_metadata(partition_id) -> list[DataPlaneGroupMeta] returns non-consuming rollout-group metadata. This lets SingleController inspect queued groups without advancing TQ consumer counters or fetching tensors.
  • depth(partition_id) -> int counts committed, complete groups visible to recovery. This supports rebuilding queue capacity after controller or data-plane recovery.
  • pop(keys, partition_id) removes successfully trained keys. The base implementation routes through clear_samples(), and the TQ adapter translates that to backend kv_clear.
  • evict(keys, partition_id) removes stale or abandoned keys, using the same clear_samples() path.
  • get_capabilities() -> DataPlaneCapabilities exposes backend recovery guarantees such as persistent recovery, server-side filtering, atomic batch put, and verified clear support.
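
The semantics described in this list can be illustrated with a toy in-memory store. This is only a sketch under assumptions: `GroupMeta` and `InMemoryDataPlane` are hypothetical names, not the NoOp or TQ adapters from this PR, and the definition of `is_complete` (all expected keys present) is a guess at the intended meaning.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class GroupMeta:
    """Trimmed, hypothetical stand-in for DataPlaneGroupMeta."""

    group_id: str
    keys: list[str]
    committed: bool
    expected_num_keys: int

    @property
    def is_complete(self) -> bool:
        # Assumption: complete means every expected key has been written.
        return len(self.keys) == self.expected_num_keys


class InMemoryDataPlane:
    """Toy metadata-only store; not the NoOp or TQ adapter from this PR."""

    def __init__(self) -> None:
        self._groups: dict[str, dict[str, GroupMeta]] = {}

    def add_group(self, partition_id: str, meta: GroupMeta) -> None:
        self._groups.setdefault(partition_id, {})[meta.group_id] = meta

    def list_metadata(self, partition_id: str) -> list[GroupMeta]:
        # Non-consuming: reading metadata leaves the queue untouched.
        return list(self._groups.get(partition_id, {}).values())

    def depth(self, partition_id: str) -> int:
        # Only committed, complete groups count toward depth.
        return sum(
            1 for g in self.list_metadata(partition_id) if g.committed and g.is_complete
        )

    def clear_samples(self, keys: list[str], partition_id: str) -> None:
        doomed = set(keys)
        groups = self._groups.get(partition_id, {})
        for group_id in list(groups):
            remaining = [k for k in groups[group_id].keys if k not in doomed]
            if remaining:
                groups[group_id].keys = remaining
            else:
                del groups[group_id]

    def pop(self, keys: list[str], partition_id: str) -> None:
        # Trained keys and stale keys share one removal path.
        self.clear_samples(keys, partition_id)

    def evict(self, keys: list[str], partition_id: str) -> None:
        self.clear_samples(keys, partition_id)


dp = InMemoryDataPlane()
dp.add_group("train", GroupMeta("g0", ["a", "b"], committed=True, expected_num_keys=2))
dp.add_group("train", GroupMeta("g1", ["c"], committed=False, expected_num_keys=2))
depth_before = dp.depth("train")  # only g0 is committed and complete
dp.pop(["a", "b"], "train")       # g0 trained successfully
depth_after = dp.depth("train")
dp.evict(["c"], "train")          # g1 abandoned
```

In this sketch `pop` and `evict` differ only in intent, which mirrors the description above of both routing through `clear_samples()`.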

Supporting Types and Behavior

  • Adds DataPlaneGroupMeta for control-plane-only rollout group records: group_id, keys, weight_version, created_at, committed, expected_num_keys, size_bytes, and tags.
  • Adds DataPlaneCapabilities so adapters can advertise recovery-relevant behavior.
  • Adds typed data-plane failures including DataPlaneUnavailable, DataPlaneTimeout, DataPlaneReadError, DataPlaneWriteError, DataPlaneClearError, and DataPlaneBadRequest.
  • Updates TQDataPlaneClient to translate Ray/TQ/storage errors into typed data-plane exceptions.
  • Updates the NoOp adapter and MetricsDataPlaneClient so the recovery API is testable and observable without booting Ray/TQ.
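
As a rough picture of these supporting types, the sketch below uses the field list given for `DataPlaneGroupMeta`. Everything else is an assumption for illustration: the defaults, the `DataPlaneCapabilities` field names, the `is_complete` definition, and the shared `DataPlaneError` base class are not confirmed by the PR text.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


class DataPlaneError(Exception):
    """Hypothetical common base class for the typed failures."""


class DataPlaneUnavailable(DataPlaneError):
    pass


class DataPlaneTimeout(DataPlaneError):
    pass


class DataPlaneClearError(DataPlaneError):
    pass


@dataclass(frozen=True)
class DataPlaneCapabilities:
    # Field names are assumptions derived from the guarantees listed in the PR.
    persistent_recovery: bool = False
    server_side_filtering: bool = False
    atomic_batch_put: bool = False
    verified_clear: bool = False


@dataclass
class DataPlaneGroupMeta:
    # Field names come from the PR description; defaults are assumptions.
    group_id: str
    keys: list[str]
    weight_version: int | None = None
    created_at: float | None = None
    committed: bool = False
    expected_num_keys: int | None = None
    size_bytes: int | None = None
    tags: dict[str, Any] = field(default_factory=dict)

    @property
    def is_complete(self) -> bool:
        # Assumption: complete means every expected key is present.
        return (
            self.expected_num_keys is not None
            and len(self.keys) == self.expected_num_keys
        )


meta = DataPlaneGroupMeta(group_id="g0", keys=["a", "b"], committed=True, expected_num_keys=2)
caps = DataPlaneCapabilities(verified_clear=True)
```

A shared base class, if the PR has one, would let callers catch every data-plane failure with a single `except` clause while still matching specific subclasses where the recovery action differs.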

Why This Helps Async SingleController

SingleController needs to orchestrate async rollout, training, recovery, cleanup, and backpressure while preserving the invariant that it never handles tensor payloads. These APIs give it a metadata-only boundary:

  • list_metadata() lets SC reconstruct queue state and select trainable groups.
  • depth() lets SC rebuild capacity accounting after restart or TQ recovery.
  • pop() centralizes cleanup of rows that trained successfully.
  • evict() gives SC an explicit stale-row cleanup path.
  • ping() and typed failures give SC clear recovery triggers instead of parsing generic Ray/TQ exceptions.
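
One way a controller might combine `ping()`, typed failures, and `depth()` during recovery is sketched here. SingleController is not part of this PR, so `recover_free_slots`, `FlakyClient`, `max_queue_depth`, and the retry policy are all hypothetical; the exception classes are local stand-ins for the typed failures named above.

```python
from __future__ import annotations

import time


class DataPlaneUnavailable(Exception):
    """Stand-in for the typed failure of the same name in this PR."""


class DataPlaneTimeout(Exception):
    """Stand-in for the typed failure of the same name in this PR."""


class FlakyClient:
    """Hypothetical client whose first `failures` pings raise."""

    def __init__(self, failures: int, queued: int) -> None:
        self._failures = failures
        self._queued = queued

    def ping(self, timeout_s: float | None = None) -> None:
        if self._failures > 0:
            self._failures -= 1
            raise DataPlaneUnavailable("data plane not reachable")

    def depth(self, partition_id: str) -> int:
        return self._queued


def recover_free_slots(
    client: FlakyClient,
    partition_id: str,
    max_queue_depth: int,
    attempts: int = 5,
    backoff_s: float = 0.0,
) -> int:
    """Wait for a healthy ping, then recompute free queue capacity."""
    for attempt in range(attempts):
        try:
            client.ping(timeout_s=1.0)
            break
        except (DataPlaneUnavailable, DataPlaneTimeout):
            # Typed failures are the recovery trigger; back off and retry.
            time.sleep(backoff_s * (attempt + 1))
    else:
        raise DataPlaneUnavailable(f"no healthy ping after {attempts} attempts")
    # Capacity is rebuilt from metadata only; no tensors are fetched.
    return max(0, max_queue_depth - client.depth(partition_id=partition_id))


free = recover_free_slots(FlakyClient(failures=2, queued=3), "train", max_queue_depth=8)
print(free)  # 5
```

The point of the sketch is that the controller branches on exception types rather than on message strings from Ray or TQ.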

This PR does not implement SingleController. It adds the TransferQueue/DataPlane support methods needed by that controller.

Before your PR is "Ready for review"

Pre checks:

  • Make sure you read and followed Contributor guidelines
  • Did you write any new necessary tests?
  • Did you run the unit tests and functional tests locally? Visit our Testing Guide for how to run tests
  • Did you add or update any necessary documentation? Visit our Document Development Guide for how to write, build and test the docs.

Additional Information

Testing was not run after the rebase per request. An earlier uv run pytest ... attempt did not execute tests because this workspace does not have the repo-required Python 3.13.13 interpreter available to uv.

@pthombre pthombre requested review from a team as code owners May 14, 2026 02:42

copy-pr-bot Bot commented May 14, 2026


This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@pthombre pthombre force-pushed the pranav/tq_fault_tolerance branch from 1d02615 to 430dee5 Compare May 14, 2026 02:46
@ZhiyuLi-Nvidia ZhiyuLi-Nvidia force-pushed the zhiyul/data_plane_plan branch 9 times, most recently from 14cd92d to b63c18f Compare May 24, 2026 07:44
Base automatically changed from zhiyul/data_plane_plan to main May 24, 2026 16:26
Signed-off-by: Pranav Prashant Thombre <pthombre@nvidia.com>
@pthombre pthombre force-pushed the pranav/tq_fault_tolerance branch from 430dee5 to b2cb045 Compare June 4, 2026 18:44
keys=list(sample_ids),
partition_id=partition_id,
),
DataPlaneClearError,

Contributor

Hmm, is it good practice to pass an error type into a function?

This error type points to a specific function call, and that information should already be available in the stack trace. So DataPlaneClearError (like DataPlaneReadError and DataPlaneWriteError) may not provide additional information.

Do you think it is necessary?
Alternatively, could it be made more lightweight, for example:

def _call_tq(

    raise Exception(f"{operation} failed with ...")
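
To make the trade-off concrete, here is a hedged side-by-side sketch of the two styles under discussion. The helper names `call_with_error_type` and `call_with_operation`, the `DataPlaneError` base class, and the message format are assumptions for illustration; the actual `_call_tq` signature is not shown in this thread.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


class DataPlaneError(Exception):
    """Hypothetical base class; not confirmed by the PR."""


class DataPlaneClearError(DataPlaneError):
    """Stand-in for the typed failure of the same name."""


def call_with_error_type(fn: Callable[[], T], error_type: type[DataPlaneError]) -> T:
    """Style in the diff: the caller chooses the exception class."""
    try:
        return fn()
    except Exception as exc:
        raise error_type(str(exc)) from exc


def call_with_operation(fn: Callable[[], T], operation: str) -> T:
    """Lighter style: one exception type, operation named in the message."""
    try:
        return fn()
    except Exception as exc:
        raise DataPlaneError(f"{operation} failed with {exc!r}") from exc


def failing_clear() -> None:
    raise RuntimeError("kv_clear rejected")


try:
    call_with_error_type(failing_clear, DataPlaneClearError)
except DataPlaneClearError as exc:
    typed = exc  # callers can match on the class

try:
    call_with_operation(failing_clear, "kv_clear")
except DataPlaneError as exc:
    generic = exc  # callers must inspect the message

print(type(typed).__name__)  # DataPlaneClearError
print(generic)               # kv_clear failed with RuntimeError('kv_clear rejected')
```

Both variants keep the original exception as `__cause__`, so the stack trace is preserved either way. The difference is whether a caller can select a recovery action by exception class or has to parse the message.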

Comment on lines +823 to +831
weight_version=_as_int(first_tag.get("weight_version")),
created_at=_as_float(first_tag.get("created_at")),
committed=all(_as_bool(t.get("committed", False)) for t in tags),
expected_num_keys=expected_num_keys,
size_bytes=_as_int(first_tag.get("size_bytes")),
tags=first_tag,
)
)
return groups

Contributor

I think this is meant to provide the data-plane API to the async RL algorithm.

Are we well aligned with @mehraakash's async RL PR #2700?


# ── (C) recovery/control-plane ─────────────────────────────────────

def ping(self, timeout_s: float | None = None) -> None:

@ZhiyuLi-Nvidia ZhiyuLi-Nvidia Jun 9, 2026

Contributor

Will this function be called only after recovery?

I didn't see this function used anywhere other than in the tests.

Is that because we don't have recovery / checkpoint / persistent recovery support yet?

@ZhiyuLi-Nvidia

Contributor

Thanks a lot, @pthombre, for the fault tolerance PR.

This PR adds some APIs specifically for async RL, so we should check with @mehraakash.

For the fault tolerance API, I think we are still missing persistent recovery, so some of the APIs can't be verified functionally. I'm not sure whether we can mimic that in a test first; we would then need to add persistent recovery as the next step.

For the error types, I would personally like to simplify them, but it is good to keep the ones that provide an additional error message. Otherwise the error stack would only tell us the location of the failed API call.
