fix: waker registration and cross-node PUT response routing #1985
Merged: iduartgomez merged 9 commits into main from fix/network-event-loop-waker-registration-rebased on Oct 26, 2025
Conversation
When a node receives a forwarded PUT request via RequestPut and becomes the final destination (no next_target to forward to), it now properly:

1. Sends SuccessfulPut back to the sender (originating node)
2. Transitions to Finished state instead of AwaitingResponse

Previously, the handler would transition to AwaitingResponse but never send a response, causing incomplete operations that left clients waiting indefinitely.

Changes:
- Added sender field to RequestPut message to track request origin
- Split state handling: AwaitingResponse when forwarding, Finished when final
- Send SuccessfulPut to sender when node is final destination (no forwarding)

Fixes cross-node PUT response delivery bug where responses would attempt local delivery on the wrong node's SessionActor instead of routing back to the originating node.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
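A minimal sketch of the split state handling this commit describes, with hypothetical stand-ins (`PeerId`, `forward_to`, `send_successful_put`) for the real networking and operation types:

```rust
// Hypothetical types standing in for the real PUT operation machinery.
type PeerId = String;

enum PutState {
    AwaitingResponse,
    Finished,
}

struct RequestPut {
    sender: PeerId,              // new field: tracks the originating node
    next_target: Option<PeerId>, // None when this node is the final destination
}

fn forward_to(_peer: &PeerId, _msg: &RequestPut) { /* network send elided */ }
fn send_successful_put(_peer: &PeerId) { /* network send elided */ }

fn handle_request_put(msg: &RequestPut) -> PutState {
    match &msg.next_target {
        // Still hops to go: forward and wait for the downstream response.
        Some(next) => {
            forward_to(next, msg);
            PutState::AwaitingResponse
        }
        // Final destination: acknowledge the originator directly instead of
        // waiting for a downstream response that will never arrive.
        None => {
            send_successful_put(&msg.sender);
            PutState::Finished
        }
    }
}
```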
This commit fixes a race condition where the network event loop could lose waker registration, causing messages to be dropped. The issue occurred because futures were being recreated on every iteration of the event loop.

Key changes:
- Replaced function-based priority_select with a stream-based PrioritySelectStream that maintains waker registration across iterations
- Moved peer_connections FuturesUnordered into the stream to enable proper lifetime management
- Added comprehensive unit tests including stress tests with 1700+ concurrent messages

The stream stays alive across loop iterations, maintaining waker registration for all channels. This ensures the runtime can wake the task when any message arrives, preventing the lost wakeup race condition.

Fixes #1932

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
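To illustrate why the stream-based approach fixes the race: a persistent `Stream` owns its receivers, so a waker registered on `Poll::Pending` survives into the next poll, whereas futures recreated per iteration discard those registrations. A minimal sketch under assumed `tokio`/`futures` types (the real `PrioritySelectStream` multiplexes more sources, including the `peer_connections` `FuturesUnordered`):

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;
use tokio::sync::mpsc;

// Hypothetical event type; the real loop multiplexes more message kinds.
pub enum Event {
    Notification(String),
    OpExecution(String),
}

/// Lives across event-loop iterations, so the waker each `poll_recv` below
/// registers on `Poll::Pending` stays valid until a message actually arrives.
pub struct PrioritySelectStream {
    notifications: mpsc::Receiver<String>,
    op_execution: mpsc::Receiver<String>,
}

impl Stream for PrioritySelectStream {
    type Item = Event;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // Poll in fixed priority order; channel closure (`Ready(None)`) is
        // addressed in a follow-up commit.
        if let Poll::Ready(Some(msg)) = this.notifications.poll_recv(cx) {
            return Poll::Ready(Some(Event::Notification(msg)));
        }
        if let Poll::Ready(Some(msg)) = this.op_execution.poll_recv(cx) {
            return Poll::Ready(Some(Event::OpExecution(msg)));
        }
        Poll::Pending
    }
}
```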
Previously, when notification_receiver or op_execution_receiver channels closed (returned Poll::Ready(None)), the code would silently continue polling other sources instead of properly signaling a critical shutdown condition.

Changes:
- Add ChannelCloseReason::Notification and ChannelCloseReason::OpExecution
- Update handle_notification_msg to return ClosedChannel event on None
- Update handle_op_execution to return ClosedChannel event on None
- Add new variants to shutdown match arm in run_event_listener

This ensures critical channel closures trigger proper cleanup and shutdown instead of being silently ignored.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
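A minimal sketch of the closure signaling, with hypothetical `EventResult` plumbing; the variant and function names mirror the ones this commit message lists:

```rust
// Hypothetical closure-reason plumbing around the polled channel output.
pub enum ChannelCloseReason {
    Notification,
    OpExecution,
}

pub enum EventResult {
    Message(String),
    ClosedChannel(ChannelCloseReason),
}

fn handle_notification_msg(polled: Option<String>) -> EventResult {
    match polled {
        Some(msg) => EventResult::Message(msg),
        // Previously a closed channel fell through silently; now the closure
        // is surfaced so run_event_listener's shutdown arm can match on it.
        None => EventResult::ClosedChannel(ChannelCloseReason::Notification),
    }
}
```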
…ations

After adding proper channel closure handling, we discovered that ReceiverStream returns Poll::Ready(None) repeatedly after a channel closes, creating an infinite loop of closure notifications.

Solution: Track which channels have been reported as closed and skip polling them on subsequent calls to poll_next().

Changes:
- Add bool flags to track closure state for each channel
- Skip polling channels that have already been reported as closed
- Report closure only once, then treat the channel as permanently done
- Continue to check all active sources before reporting any closure

This ensures:
1. Channel closures are properly signaled to handlers
2. Closures are reported only once (not in an infinite loop)
3. Messages from active channels are delivered before reporting closures
4. All 13 priority_select tests pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
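A sketch of the once-only closure reporting, reduced to a single channel for brevity; the real stream applies the same flag-guarded pattern to each source and keeps polling the remaining active ones:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::Stream;
use tokio::sync::mpsc;

pub enum Event {
    Notification(String),
    NotificationChannelClosed,
}

pub struct SelectWithClosureTracking {
    notifications: mpsc::Receiver<String>,
    // Set once the closure has been reported; the channel is then treated as
    // permanently done and never polled again, so the repeated
    // `Poll::Ready(None)` from a closed receiver cannot loop forever.
    notifications_closed: bool,
}

impl Stream for SelectWithClosureTracking {
    type Item = Event;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        if !this.notifications_closed {
            match this.notifications.poll_recv(cx) {
                Poll::Ready(Some(msg)) => return Poll::Ready(Some(Event::Notification(msg))),
                Poll::Ready(None) => {
                    this.notifications_closed = true; // report exactly once
                    return Poll::Ready(Some(Event::NotificationChannelClosed));
                }
                Poll::Pending => {}
            }
        }
        // The real stream goes on to poll its remaining active sources here
        // before yielding.
        Poll::Pending
    }
}
```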
Refactors HandshakeHandler::wait_for_events() from a Future-based loop to a proper Stream implementation (HandshakeEventStream). This fixes the lost wakeup race condition (#1932) by maintaining state across polls.

## Changes

### Core Implementation
- Add HandshakeEventStream struct wrapping HandshakeHandler
- Implement Stream trait with poll_next() for all 5 event channels
- Port all logic from wait_for_events tokio::select! to poll-based
- Add priority-based polling: inbound > outbound > unconfirmed > pending_msg > establish_connection

### Event Handling
- Add InternalEvent::InboundConnectionAccepted variant
- Add InternalEvent::InboundConnectionRejected variant
- Add InternalEvent::TransientForward variant
- Implement handle_unconfirmed_inbound() helper
- Implement async handle_inbound_gw_join_request() for both accepted and transient connection paths

### Integration
- Update PrioritySelectStream to use generic Stream type
- Update p2p_protoc.rs to instantiate HandshakeEventStream
- Add production type alias for HandshakeEventStream

### Testing
- Add 5 comprehensive stream-based unit tests:
  - test_stream_gateway_inbound_conn_success
  - test_stream_gateway_inbound_conn_rejected
  - test_stream_peer_to_gw_outbound_conn
  - test_stream_peer_to_peer_outbound_conn_succeeded
  - test_stream_peer_to_gw_outbound_conn_rejected (complex multi-hop)
- All 13 handshake tests pass (8 original + 5 stream)
- All 13 priority_select tests pass

### Bug Fixes
- Fix missing event notification when forward_conn returns Ok(None). Previously dropped the connection without emitting an event; now properly returns InboundConnectionAccepted with op: None to signal rejection (see the sketch after this message)

## Architecture Notes

Stream implementation uses strict priority ordering (vs tokio::select!'s random racing). This provides deterministic behavior and better inbound throughput, but could cause starvation of low-priority channels under heavy load. See .claude/handshake-stream-implementation-status.md for detailed analysis and next steps.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
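A sketch of the forward_conn bug fix noted above, modeling only the Ok(...) path with hypothetical placeholder types; the real InternalEvent variants carry connection and operation state:

```rust
// Placeholder for the real forward-operation state.
struct ForwardOp;

enum InternalEvent {
    InboundConnectionAccepted { op: Option<ForwardOp> },
    InboundConnectionRejected,
    TransientForward,
}

fn on_forward_conn_ok(result: Option<ForwardOp>) -> InternalEvent {
    match result {
        Some(op) => InternalEvent::InboundConnectionAccepted { op: Some(op) },
        // Previously Ok(None) dropped the connection without emitting any
        // event; `op: None` now explicitly signals the rejection downstream.
        None => InternalEvent::InboundConnectionAccepted { op: None },
    }
}
```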
Changes the poll_next implementation to use an internal loop pattern instead of returning Poll::Pending immediately after high-priority work. This ensures all priority levels get fair polling opportunities.

The previous implementation would wake and return Pending after processing high-priority channels, starving lower priorities. Now we use 'continue' to loop back and check all priorities before returning Pending.

This fixes the fairness issue discovered during #1932 debugging where mesh connectivity would establish but connections would disappear.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
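A sketch of the internal loop pattern, with a hypothetical `poll_priority` step standing in for the per-channel polling:

```rust
use std::task::{Context, Poll};

// Hypothetical outcome of polling one priority level: an event to emit,
// internal work done with nothing to emit yet, or no activity at all.
enum Step {
    Emit(&'static str),
    Worked,
    Idle,
}

fn poll_priority(_level: usize, _cx: &mut Context<'_>) -> Step {
    Step::Idle // placeholder body for the sketch
}

fn poll_next(cx: &mut Context<'_>) -> Poll<Option<&'static str>> {
    loop {
        let mut worked = false;
        for level in 0..5 {
            match poll_priority(level, cx) {
                Step::Emit(ev) => return Poll::Ready(Some(ev)),
                // The old code effectively returned Pending right after
                // internal work on a high-priority channel, so the lower
                // levels were never reached.
                Step::Worked => worked = true,
                Step::Idle => {}
            }
        }
        // Every level has now been checked; only park if nothing moved,
        // otherwise loop back and re-check all priorities.
        if !worked {
            return Poll::Pending;
        }
    }
}
```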
Force-pushed a03478e to 7fb2351
Restore essential fixes that were lost when rebasing onto main:

- **UPDATE operation improvements**: Proper state transitions, cross-node routing, and contract existence checks
- **op_state_manager**: Prevent completed operations from being pushed back to HashMap
- **p2p_protoc**: Fix TOCTOU race conditions in connection management and prevent self-targeting in message routing

These changes fix test failures where operations were timing out or clients were disconnecting unexpectedly.

Test results: 7/8 tests passing consistently; 1 test (test_multiple_clients_subscription) has intermittent timing issues.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
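A minimal sketch of the op_state_manager guard, with a hypothetical `OpState`; the real manager keys operations by transaction and carries much richer state:

```rust
use std::collections::HashMap;

// Hypothetical operation state; the real type carries the full op machinery.
struct OpState {
    finished: bool,
}

struct OpStateManager {
    ops: HashMap<u64, OpState>,
}

impl OpStateManager {
    /// Return an operation to the map after a step, unless it has completed.
    /// Re-inserting finished operations is what previously left stale entries
    /// in the HashMap and kept clients waiting on already-done work.
    fn push_back(&mut self, id: u64, op: OpState) {
        if op.finished {
            return; // completed: drop instead of re-inserting
        }
        self.ops.insert(id, op);
    }
}
```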
Summary
This PR contains two categories of critical fixes:
1. Waker Registration and Event Loop Fixes
Resolves lost wakeup race condition in the network event loop that was causing missed notifications and operation stalls.
Changes:
- `PrioritySelectStream` combinator to properly handle waker registration across multiple futures

2. Cross-Node PUT Response Routing Fix
Fixes cross-node PUT response delivery bug where responses would attempt local delivery on the wrong node's SessionActor instead of routing back to the originating node.
Root Cause:
When a client on Node A sends a PUT request that gets forwarded to Node C:
- Node C becomes the final destination and its operation state has `upstream: None`, so the response attempts local delivery on Node C's own SessionActor instead of routing back to Node A

Changes:
- Added `sender` field to `RequestPut` message to track request origin
- When forwarding: transition to `AwaitingResponse` state (wait for downstream response)
- When final destination: send `SuccessfulPut` back to sender and set `Finished` state
- Previously: transitioned to `AwaitingResponse` but never sent a response, causing incomplete operations

Test Results
Test Plan
- `cargo check` passes

🤖 Generated with Claude Code
Co-Authored-By: Claude <noreply@anthropic.com>