
[CELEBORN-2217] Use a separate thread to handle RpcEndpointVerifier messages #3554

Closed
TheodoreLx wants to merge 3 commits into apache:main from TheodoreLx:verifier-first

@TheodoreLx
Contributor

What changes were proposed in this pull request?

This PR introduces an EndpointVerifierMessageLoop dedicated to handling RpcEndpointVerifier messages and runs it on its own thread.

Why are the changes needed?

RpcEndpointVerifier.CheckExistence is a high-priority message that needs to be processed first. In the original model, if the LifecycleManager's RPC message queue accumulates a large backlog, RpcEndpointVerifier.CheckExistence messages cannot be processed immediately, leading to numerous ShuffleClient initialization failures and causing task failures.
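The idea described above can be illustrated with a minimal sketch. This is not Celeborn's dispatcher code: `TwoLaneDispatcher`, `VerifierPing`, and `SlowWork` are hypothetical names, and the sketch only assumes that verifier messages get their own queue and thread so a backlog in the general queue cannot delay them.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}

// Hypothetical message types for illustration only.
sealed trait DemoMsg
case object VerifierPing extends DemoMsg                       // stands in for CheckExistence
final case class SlowWork(gate: CountDownLatch) extends DemoMsg // stands in for a backlog

final class TwoLaneDispatcher {
  private val generalQueue = new LinkedBlockingQueue[DemoMsg]()
  private val verifierQueue = new LinkedBlockingQueue[DemoMsg]()

  // Counted down once a verifier message has been handled.
  val verified = new CountDownLatch(1)

  // Starts a daemon thread that drains one queue until interrupted.
  private def startLoop(queue: LinkedBlockingQueue[DemoMsg], name: String): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        try {
          while (true) {
            queue.take() match {
              case VerifierPing   => verified.countDown()
              case SlowWork(gate) => gate.await() // blocks this loop only
            }
          }
        } catch {
          case _: InterruptedException => () // exit quietly on stop()
        }
      }
    }, name)
    t.setDaemon(true)
    t.start()
    t
  }

  private val threads = Seq(
    startLoop(generalQueue, "general-loop"),
    startLoop(verifierQueue, "verifier-loop"))

  // Route verifier messages to their own lane; everything else shares one.
  def post(msg: DemoMsg): Unit = msg match {
    case VerifierPing => verifierQueue.put(msg)
    case other        => generalQueue.put(other)
  }

  def stop(): Unit = threads.foreach(_.interrupt())
}
```

In this sketch, posting a `SlowWork` message stalls only the general loop, so a `VerifierPing` posted afterwards is still handled right away. With a single shared queue it would wait behind the stalled message.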

Does this PR resolve a correctness bug?

No

Does this PR introduce any user-facing change?

No

How was this patch tested?

Cluster Test

buildConf("celeborn.rpc.RpcEndpointVerifier.separate.enabled")
.categories("network")
.version("0.7.0")
.doc("dispatcher will process RpcEndpointVerifier's request separately")
Member

Suggested change
- .doc("dispatcher will process RpcEndpointVerifier's request separately")
+ .doc("Whether to enable dispatcher process RpcEndpointVerifier's request separately.")

}

/** Message loop used for dispatching messages. */
private class EndpointVerifierMessageLoop extends Runnable {
Member

Could you add a test case to verify this?

@FMX
Contributor

FMX commented Dec 9, 2025

This PR looks interesting to me. Can you share some test results for it? For example, did the number of RPC lookup timeout exceptions change?

@TheodoreLx
Contributor Author

[Image: driver log screenshot]

This is a screenshot of the driver logs from a previous task. An RPC message is logged here when the sum of its queueTime and processTime exceeds 1 second.

In this image, many RPC messages were waiting in the queue, so the CheckExistence message was queued for 12 seconds before being processed. In some large jobs the wait could exceed 30 seconds, which is exactly the rpcLookupTimeout period, so the lookup times out and the task fails.

After adopting this pull request, we no longer see "slow rpc detected" entries for CheckExistence in the driver logs, meaning the sum of queueTime and processTime has dropped below 1 second.
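The slow-RPC check described in this comment can be sketched roughly as follows. The names `RpcTiming` and `SlowRpcCheck` and the exact log format are assumptions made for illustration, not Celeborn's actual classes or output. Only the rule itself comes from the comment: log a message when queueTime plus processTime exceeds 1 second.

```scala
// Hypothetical timing record; timestamps are in nanoseconds.
final case class RpcTiming(name: String, enqueuedNs: Long, dequeuedNs: Long, finishedNs: Long) {
  // Time spent waiting in the queue before a loop picked the message up.
  def queueTimeMs: Long = (dequeuedNs - enqueuedNs) / 1000000L
  // Time spent handling the message once dequeued.
  def processTimeMs: Long = (finishedNs - dequeuedNs) / 1000000L
}

object SlowRpcCheck {
  // 1 second, the threshold mentioned in the comment above.
  val thresholdMs: Long = 1000L

  // Returns a log line only when queueTime + processTime exceeds the threshold.
  def report(t: RpcTiming): Option[String] = {
    val totalMs = t.queueTimeMs + t.processTimeMs
    if (totalMs > thresholdMs) {
      Some(s"slow rpc detected: ${t.name} queueTime=${t.queueTimeMs}ms processTime=${t.processTimeMs}ms")
    } else {
      None
    }
  }
}
```

Under this rule, a message that waits 12 seconds in the queue is reported even if processing it takes only a few milliseconds, which matches the case shown in the screenshot.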

Contributor

@FMX FMX left a comment

LGTM

Contributor

@RexXiong RexXiong left a comment

LGTM

@RexXiong RexXiong closed this in 5789b4e Dec 14, 2025
@RexXiong
Contributor

Thanks, merged to main (v0.7.0).


4 participants