fix: make PUT auto-subscribe atomic #1894
Code Review: PR #1894 - Make PUT auto-subscribe atomic
[AI-assisted debugging and comment]
Summary
This PR successfully implements atomic PUT+subscribe operations by making the PUT operation await the completion of auto-generated Subscribe transactions. The implementation adds proper tracking via op-manager and includes a comprehensive regression test. Overall, the code quality is good with proper error handling and timeout management.
Detailed Findings
1. Correctness of Atomic Implementation ✅
Strengths:
- The implementation correctly uses a oneshot channel to await subscribe completion
- The register_auto_subscription method properly links parent PUT transactions to child Subscribe transactions
- Multiple completion paths are properly handled:
  - LocalSubscribeComplete event (for locally available contracts)
  - Normal subscribe operation completion (via handle_op_result)
  - Transaction timeout events
- The await_tx parameter is properly threaded through start_subscription_request to distinguish atomic vs fire-and-forget subscriptions
Architecture:
The solution uses a clean design with:
- AutoSubscribeContext struct to track parent_tx, key, and notification channel
- DashMap<Transaction, AutoSubscribeContext> for thread-safe tracking
- Resolution through both success and failure paths that clean up the map entry
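To make the tracking design concrete, here is a minimal sketch of the register/resolve pairing. It is not the PR's code: it uses only the standard library, so a Mutex<HashMap> stands in for the DashMap and std::sync::mpsc stands in for the oneshot channel. The Transaction alias, the AutoSubscribeTracker type, and the method names register, resolve, parent_of and pending are all hypothetical.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

/// Stand-in for the real transaction id type (assumption for this sketch).
type Transaction = u64;

/// Mirrors the fields the review lists: parent transaction, key, notifier.
struct AutoSubscribeContext {
    parent_tx: Transaction,
    key: String,
    notify: Sender<Result<(), String>>,
}

/// Hypothetical tracker. The review describes a DashMap; a Mutex<HashMap>
/// is used here so the sketch needs no external crates.
#[derive(Default)]
struct AutoSubscribeTracker {
    links: Mutex<HashMap<Transaction, AutoSubscribeContext>>,
}

impl AutoSubscribeTracker {
    /// Link a child Subscribe transaction to its parent PUT and return the
    /// receiver the PUT waits on.
    fn register(
        &self,
        parent_tx: Transaction,
        subscribe_tx: Transaction,
        key: &str,
    ) -> Receiver<Result<(), String>> {
        let (notify, receiver) = channel();
        let ctx = AutoSubscribeContext { parent_tx, key: key.to_string(), notify };
        self.links.lock().unwrap().insert(subscribe_tx, ctx);
        receiver
    }

    /// Remove the entry and notify the waiter. Returns false when no entry
    /// exists, for example because another path already resolved it.
    fn resolve(&self, subscribe_tx: Transaction, outcome: Result<(), String>) -> bool {
        let removed = self.links.lock().unwrap().remove(&subscribe_tx);
        match removed {
            Some(ctx) => {
                // The receiver may already be gone (the PUT timed out), so a
                // failed send is deliberately ignored.
                let _ = ctx.notify.send(outcome);
                true
            }
            None => false,
        }
    }

    /// Look up which PUT and key a pending Subscribe belongs to.
    fn parent_of(&self, subscribe_tx: Transaction) -> Option<(Transaction, String)> {
        let links = self.links.lock().unwrap();
        links.get(&subscribe_tx).map(|ctx| (ctx.parent_tx, ctx.key.clone()))
    }

    /// Number of auto-subscriptions still awaiting resolution.
    fn pending(&self) -> usize {
        self.links.lock().unwrap().len()
    }
}
```

In this sketch the map entry is removed before the waiter is notified, so a resolved subscription never lingers in the map regardless of whether the receiver is still alive.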
 
2. Error Handling and Timeout Handling ✅
Strengths:
- Comprehensive error handling with proper cleanup:
  - resolve_auto_subscription_failure called on all error paths
  - Timeout handling at multiple levels (15-second operation timeout + network timeout)
  - Channel drop detection (Ok(Err(_)) case)
- The implementation properly handles the case where request_subscribe fails early
- Timeout cleanup prevents memory leaks by calling resolve_auto_subscription_failure before returning error
Considerations:
- 15-second timeout seems reasonable for most cases but could be configurable
 - The implementation properly cleans up even on timeout, preventing memory leaks
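The wait-with-cleanup behaviour described in this section can be sketched as follows. This is an illustration under assumptions, not the PR's code: it uses the blocking std::sync::mpsc::Receiver::recv_timeout instead of an async timeout around a oneshot receiver, so the result shape differs from the Ok(Err(_)) case mentioned above. The AutoSubscribeError enum, the await_auto_subscribe function, and the cleanup callback (standing in for resolve_auto_subscription_failure) are hypothetical names.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical error type for the three failure modes the review mentions.
#[derive(Debug, PartialEq)]
enum AutoSubscribeError {
    /// The subscribe operation reported a failure.
    SubscribeFailed(String),
    /// The sender was dropped without sending a result.
    ChannelDropped,
    /// No result arrived within the timeout.
    TimedOut,
}

/// Wait for the subscribe outcome. `cleanup` stands in for removing the
/// tracking entry, and runs on the paths where nobody else has resolved it.
fn await_auto_subscribe(
    receiver: Receiver<Result<(), String>>,
    timeout: Duration,
    cleanup: impl FnOnce(),
) -> Result<(), AutoSubscribeError> {
    match receiver.recv_timeout(timeout) {
        Ok(Ok(())) => Ok(()),
        Ok(Err(reason)) => Err(AutoSubscribeError::SubscribeFailed(reason)),
        Err(RecvTimeoutError::Disconnected) => {
            cleanup();
            Err(AutoSubscribeError::ChannelDropped)
        }
        Err(RecvTimeoutError::Timeout) => {
            // Clean up before returning so the entry cannot leak.
            cleanup();
            Err(AutoSubscribeError::TimedOut)
        }
    }
}
```

A caller would pass the 15-second operation timeout and a closure that removes the map entry. The point of the sketch is that the cleanup runs before the error is returned.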
 
3. Test Coverage and Quality ✅
Strengths:
- Excellent regression test test_put_auto_subscribe_allows_immediate_update:
  - Tests the exact issue #1893 scenario
  - Verifies PUT with subscribe=true blocks until subscribe completes
  - Immediately issues UPDATE and verifies notification arrives without sleeps
  - Includes verification via GET that state persisted correctly
- Test passes consistently (verified)
- Existing test test_put_with_subscribe_flag still passes (backward compatibility)
Test Quality:
- Comprehensive setup with gateway and client nodes
 - Proper use of timeouts to prevent test hangs
 - Clear assertions and error messages
 - Tests the full end-to-end flow
 
4. Race Conditions and Concurrency Issues ⚠️ 
Potential Issues Identified:
a) Double Resolution Race Condition (Medium Severity)
The resolve_auto_subscription_success/failure methods use DashMap::remove, which is safe, but there's a theoretical race where:
- Subscribe completes via LocalSubscribeComplete event
- Simultaneously, network timeout fires
- Both try to resolve the same transaction
Current State: This is handled correctly - DashMap::remove is atomic and only one caller will get Some(ctx). The second caller gets None and returns false, which is logged but otherwise ignored.
Recommendation: This is actually handled correctly by the current implementation.
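The "only one caller gets Some(ctx)" argument can be illustrated with a small race between two resolvers. This is a sketch under assumptions: a Mutex<HashMap> stands in for the DashMap, the map value is a placeholder string instead of the real context, and try_resolve and race_two_resolvers are hypothetical names.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

type Links = Mutex<HashMap<u64, &'static str>>;

/// Attempt to resolve a transaction. Removal is the single point of
/// decision: whoever gets `Some` back owns the context.
fn try_resolve(links: &Links, tx: u64) -> bool {
    let removed = links.lock().unwrap().remove(&tx);
    removed.is_some()
}

/// Race a "completion" path against a "timeout" path on the same entry and
/// report which of them won.
fn race_two_resolvers() -> (bool, bool) {
    let links: Arc<Links> = Arc::new(Mutex::new(HashMap::new()));
    links.lock().unwrap().insert(42, "auto-subscribe context");

    let for_completion = Arc::clone(&links);
    let for_timeout = Arc::clone(&links);
    let completion = thread::spawn(move || try_resolve(&for_completion, 42));
    let timeout = thread::spawn(move || try_resolve(&for_timeout, 42));

    (completion.join().unwrap(), timeout.join().unwrap())
}
```

Whichever thread runs first, exactly one of the two booleans is true. The loser sees no entry and can simply log and move on, which matches the behaviour described above.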
b) Subscribe Transaction Completes Before Registration (Low Probability)
In start_subscription_request:
    let receiver = op_manager.register_auto_subscription(parent_tx, subscribe_tx, key);
    // Gap here - subscribe could theoretically complete before registration is done
    if let Err(error) = subscribe::request_subscribe(op_manager, sub_op).await {
Analysis: This is unlikely in practice because request_subscribe is async and the subscribe transaction needs to go through the network stack. However, for locally-available contracts, the LocalSubscribeComplete event could theoretically fire very quickly.
Current State: The LocalSubscribeComplete event is sent via notify_node_event, which goes through a channel, providing a natural ordering guarantee. Registration happens before the subscribe request is initiated, so this should be safe.
Recommendation: No changes needed - the async nature provides sufficient ordering guarantees.
5. Code Quality and Maintainability ✅
Strengths:
- Clear, descriptive variable names and logging
 - Comprehensive debug and info logging at all key points
 - Good separation of concerns (op_state_manager handles tracking, operations/mod.rs handles orchestration)
 - Proper use of Rust idioms (oneshot channels, DashMap, Result types)
 - The code is well-documented with comments explaining the flow
 
Minor Observations:
- Warning log when replacing existing auto-subscription context is good defensive programming
- The let _ = ctx.notify.send(...) pattern is correct (receiver may have dropped due to timeout)
- All non-atomic subscription calls properly pass None for await_tx
6. Additional Observations
Memory Management ✅
- auto_subscribe_links is properly cleaned up on both success and failure paths
- No memory leaks detected - all insertions have corresponding removals
 - Timeout path includes cleanup before error return
 
Backward Compatibility ✅
- Non-subscribing PUTs unchanged
- Fire-and-forget subscriptions (e.g., in intermediate nodes) use None for await_tx
- Existing test still passes
 
Error Propagation ✅
- PUT operation properly returns errors from failed auto-subscriptions (lines 538-539 in put.rs)
 - This causes the PUT to fail if subscribe fails, which is correct for atomicity
 
Recommendations
Required Changes: None
The implementation is solid and production-ready.
Optional Improvements (for future consideration):
- Configurable Timeout: Consider making AUTO_SUBSCRIBE_TIMEOUT configurable via config file for different deployment scenarios.
- Metrics/Monitoring: Consider adding metrics for:
  - Auto-subscribe success/failure rates
  - Time-to-complete for auto-subscribes
  - Timeout frequency
- Documentation: Add a comment in start_subscription_request explaining the atomicity guarantee and the two modes (await vs fire-and-forget).
- Test Enhancement: Consider adding a test that verifies timeout behavior (though this may be hard to test reliably).
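For the configurable-timeout suggestion, one possible shape is a small helper that falls back to the current 15-second value when nothing valid is configured. This is only a sketch: the function name auto_subscribe_timeout, the constant DEFAULT_AUTO_SUBSCRIBE_TIMEOUT, and the idea of passing the raw setting as an optional string of seconds are assumptions, not the project's actual config API.

```rust
use std::time::Duration;

/// Default matching the 15-second operation timeout noted in the review.
const DEFAULT_AUTO_SUBSCRIBE_TIMEOUT: Duration = Duration::from_secs(15);

/// Resolve the timeout from an optional raw setting expressed in whole
/// seconds. Missing, unparsable, or zero values fall back to the default.
fn auto_subscribe_timeout(configured_secs: Option<&str>) -> Duration {
    configured_secs
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .filter(|secs| *secs > 0)
        .map(Duration::from_secs)
        .unwrap_or(DEFAULT_AUTO_SUBSCRIBE_TIMEOUT)
}
```

Rejecting zero avoids a configuration that would make every atomic PUT time out immediately. Whether that is the right policy is a design choice for the maintainers.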
 
Conclusion
This is a well-implemented solution to issue #1893. The atomic PUT+subscribe implementation is correct, properly handles errors and timeouts, includes comprehensive testing, and maintains backward compatibility. The code is production-ready.
Recommendation: APPROVE ✅
All tests pass, the implementation correctly solves the stated problem, and the code quality is high. No blocking issues identified.
Fix CI flake where ports weren't fully released by OS before nodes tried to bind, causing "Address already in use" error in CI.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
Two critical fixes for connection maintenance in PR #1894:

1. Gateway inbound connection tracking (p2p_protoc.rs:865-885)
   - Gateway wasn't calling add_connection() for inbound connections
   - This caused gateways to "forget" about joining peers immediately
   - Now properly tracks inbound connections in connections_by_location
2. Skip-list iteration bug (ring/mod.rs:350-365)
   - closest_to_location() used random sampling with choose()
   - When multiple peers at same location, could repeatedly pick skipped peer
   - Changed to iterate through all peers at each location sequentially

These fixes improve connection establishment reliability but a persistence issue remains (documented in #1896).

Related: #1894, #1896

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
This is the wrong approach; we will write an ADR/design doc in #1894 for atomic sub-transactions in general that will solve the general problem. The fix is a bit more elaborate.
…mmendation

This commit addresses two issues:

1. Re-adds the test_three_node_network_connectivity test that was previously removed. This test verifies that a network of 3 nodes (1 gateway + 2 peers) can establish a full mesh and perform operations.
2. Fixes a bug in connect.rs where the gateway wasn't properly filtering already-connected peers when recommending connections. When handling FindOptimalPeer requests, the gateway now passes the full skip_connections set to closest_to_location() instead of only skipping the joiner peer.

This fix should allow the three-node mesh to form properly by preventing the gateway from repeatedly recommending peers that the joiner is already connected to.

Related: #1894, #1889

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-authored-by: nacho.d.g <iduartgomez@users.noreply.github.com>
Two critical fixes for connection maintenance in PR #1894:

1. Gateway inbound connection tracking (p2p_protoc.rs:865-885)
   - Gateway wasn't calling add_connection() for inbound connections
   - This caused gateways to "forget" about joining peers immediately
   - Now properly tracks inbound connections in connections_by_location
2. Skip-list iteration bug (ring/mod.rs:350-365)
   - closest_to_location() used random sampling with choose()
   - When multiple peers at same location, could repeatedly pick skipped peer
   - Changed to iterate through all peers at each location sequentially

These fixes improve connection establishment reliability but a persistence issue remains (documented in #1896).

Related: #1894, #1896

🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>

# Conflicts:
#   crates/core/src/node/network_bridge/handshake.rs
#   crates/core/src/node/network_bridge/p2p_protoc.rs
#   crates/core/src/ring/mod.rs
#   crates/core/tests/connectivity.rs
Summary
- PUT + subscribe:true atomic by awaiting the generated Subscribe transaction before returning the PutResponse
Testing