notify standbys to resubscribe when forward entry missing #399
Walkthrough
Adds CcShard::NotifyStandbyOutOfSync(uint32_t) and replaces inline out-of-sync handling with calls to it when ForwardEntry is missing or a resend fails. The helper ensures the stream sender is initialized, sends a KeyObjectStandbyForwardRequest, sets the standby's seq_id to UINT64_MAX, removes the heartbeat target, and dispatches per-core subscription updates.
Sequence Diagram(s)
sequenceDiagram
autonumber
participant CcShard as CcShard
participant Stream as StreamSender
participant Standby as StandbyNode
participant Heartbeat as HeartbeatManager
participant Scheduler as PerCoreScheduler
CcShard->>CcShard: detect missing ForwardEntry or resend failure
CcShard->>Stream: ensure_initialized()
Stream-->>CcShard: initialized
CcShard->>Stream: send(KeyObjectStandbyForwardRequest(node_id, out_of_sync))
Stream->>Standby: deliver out-of-sync request
CcShard->>CcShard: set standby.seq_id = UINT64_MAX
CcShard->>Heartbeat: remove_target(node_id)
CcShard->>Scheduler: broadcast(adjust_subscriptions_for_node(node_id))
Note right of Scheduler: per-core tasks clear subscriptions for that node
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@tx_service/src/cc/cc_shard.cpp`:
- Around line 3079-3141: NotifyStandbyOutOfSync sets the node's seq id to
UINT64_MAX but leaves the entry in subscribed_standby_nodes_, causing
ForwardStandbyMessage to treat it as a send target and trigger retries; fix by
changing forwarding logic in ForwardStandbyMessage to skip entries whose seq id
(seq_id_and_term.first / the subscribed_standby_nodes_ value) equals UINT64_MAX
(or otherwise mark them as non-sendable), or alternatively remove the node from
subscribed_standby_nodes_ in NotifyStandbyOutOfSync; update the iteration/send
path in ForwardStandbyMessage to check for UINT64_MAX and not add
history/trigger retries for those nodes.
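The first option in this comment (skipping out-of-sync entries in the forwarding path) can be sketched as follows. This is a minimal illustration, not the code in ForwardStandbyMessage: `StandbyMap`, `kOutOfSync`, and `CollectForwardTargets` are hypothetical names, and the map only mirrors the shape of subscribed_standby_nodes_ as it appears in the diff (node id mapped to a sequence id and a term).

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical stand-in for subscribed_standby_nodes_:
// node id -> (forward sequence id, standby term).
using StandbyMap = std::unordered_map<uint32_t, std::pair<uint64_t, int64_t>>;

// Sentinel the PR stores in the sequence id slot of an out-of-sync standby.
constexpr uint64_t kOutOfSync = UINT64_MAX;

// Returns the node ids that should still receive forwarded messages.
// Entries marked out of sync are skipped, so a caller would neither record
// history nor trigger retries for them.
std::vector<uint32_t> CollectForwardTargets(const StandbyMap &standbys)
{
    std::vector<uint32_t> targets;
    targets.reserve(standbys.size());
    for (const auto &entry : standbys)
    {
        if (entry.second.first == kOutOfSync)
        {
            continue;  // standby has to resubscribe before it is a target again
        }
        targets.push_back(entry.first);
    }
    // Skipping entries can only shrink the target set.
    assert(targets.size() <= standbys.size());
    return targets;
}
```

The alternative named in the comment, erasing the node from the map inside NotifyStandbyOutOfSync, would make this filter unnecessary, but it changes what a later lookup of that node returns, so the choice depends on how the rest of the shard uses the entry.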
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@tx_service/src/cc/cc_shard.cpp`:
- Around line 3098-3160: NotifyStandbyOutOfSync currently updates state (setting
seq_id to UINT64_MAX and removing heartbeat targets) immediately after calling
stream_sender_->SendMessageToNode without checking the SendMessageResult; change
it to capture the return value from stream_sender_->SendMessageToNode
(SendMessageResult), and if both sent and queued_for_retry are false, log a
warning and return early so you don't mark the standby out-of-sync; only proceed
to modify seq_node_iter->second, call local_shards_.RemoveHeartbeatTargetNode,
and dispatch the unsubscribe updates to other cores after the send_result
indicates success or queued retry.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- tx_service/include/cc/cc_shard.h
- tx_service/include/cc/object_cc_map.h
- tx_service/src/cc/cc_shard.cpp
🚧 Files skipped from review as they are similar to previous changes (1)
- tx_service/include/cc/cc_shard.h
void CcShard::NotifyStandbyOutOfSync(uint32_t node_id)
{
    auto seq_node_iter = subscribed_standby_nodes_.find(node_id);
    if (seq_node_iter == subscribed_standby_nodes_.end())
    {
        return;
    }

    if (!stream_sender_)
    {
        stream_sender_ = Sharder::Instance().GetCcStreamSender();
    }
    if (!stream_sender_)
    {
        LOG(WARNING) << "Failed to notify standby " << node_id
                     << " of out of sync state because stream sender is null";
        return;
    }

    // Message not found in map - it has been evicted. Notify standby that it
    // has already fallen behind so it can resubscribe to the primary node.
    remote::CcMessage cc_msg;
    cc_msg.set_type(remote::CcMessage_MessageType::
                        CcMessage_MessageType_KeyObjectStandbyForwardRequest);
    auto req = cc_msg.mutable_key_obj_standby_forward_req();
    req->set_forward_seq_grp(core_id_);
    req->set_forward_seq_id(next_forward_sequence_id_ - 1);
    req->set_primary_leader_term(
        Sharder::Instance().LeaderTerm(Sharder::Instance().NativeNodeGroup()));
    req->set_out_of_sync(true);
    stream_sender_->SendMessageToNode(node_id, cc_msg);

    auto &seq_id_and_term = seq_node_iter->second;
    seq_id_and_term.first = UINT64_MAX;
    // Remove heartbeat target node
    local_shards_.RemoveHeartbeatTargetNode(node_id, seq_id_and_term.second);

    int64_t unsubscribe_standby_term = seq_id_and_term.second;
    for (size_t core_idx = 0; core_idx < core_cnt_; ++core_idx)
    {
        if (core_idx != core_id_)
        {
            DispatchTask(
                core_idx,
                [node_id, unsubscribe_standby_term](CcShard &ccs) -> bool
                {
                    auto subscribe_node_iter =
                        ccs.subscribed_standby_nodes_.find(node_id);
                    if (subscribe_node_iter !=
                        ccs.subscribed_standby_nodes_.end())
                    {
                        if (subscribe_node_iter->second.second <=
                            unsubscribe_standby_term)
                        {
                            subscribe_node_iter->second.first = UINT64_MAX;
                        }
                    }

                    return true;
                });
        }
    }
}
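The lambda dispatched to the other cores only overwrites an entry whose term is not newer than the term captured on the notifying core. A minimal sketch of that guard in isolation, assuming the same map shape as in the diff; `StandbyTable` and `MarkOutOfSyncIfNotNewer` are hypothetical names used only for illustration:

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <utility>

// Hypothetical per-core table: node id -> (forward sequence id, standby term).
using StandbyTable = std::unordered_map<uint32_t, std::pair<uint64_t, int64_t>>;

// Marks node_id as out of sync on one core, unless that core already holds a
// subscription with a newer term than the one being unsubscribed.
// Returns true if the entry was marked.
bool MarkOutOfSyncIfNotNewer(StandbyTable &standbys,
                             uint32_t node_id,
                             int64_t unsubscribe_term)
{
    auto it = standbys.find(node_id);
    if (it == standbys.end())
    {
        return false;  // this core has no subscription for the node
    }
    if (it->second.second > unsubscribe_term)
    {
        return false;  // a newer subscription exists; leave it alone
    }
    assert(it->second.second <= unsubscribe_term);
    it->second.first = UINT64_MAX;
    return true;
}
```

The term comparison matters because the task runs asynchronously on each core: if the standby has resubscribed with a higher term before the task executes, the stale notification should not invalidate the fresh subscription.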
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find SendMessageToNode method definition and its return type
rg -n "SendMessageToNode" -t cpp -t h -t hh -C 3
Repository: eloqdata/tx_service
Length of output: 89
🏁 Script executed:
#!/bin/bash
# Find SendMessageToNode method definition and its return type
rg -n "SendMessageToNode" -t cpp -t h -C 3
Repository: eloqdata/tx_service
Length of output: 25623
🏁 Script executed:
#!/bin/bash
# Find SendMessageResult struct definition
rg -n "struct SendMessageResult|class SendMessageResult" -t cpp -t h -A 10
Repository: eloqdata/tx_service
Length of output: 809
🏁 Script executed:
#!/bin/bash
# Get more details about the SendMessageResult and understand the implementation
rg -n "SendMessageResult" tx_service/include/remote/cc_stream_sender.h -A 30 | head -60
Repository: eloqdata/tx_service
Length of output: 2387
🏁 Script executed:
#!/bin/bash
# Check the NotifyStandbyOutOfSync context and see if SendStandbyMessageToNode could be used
rg -n "SendStandbyMessageToNode" -t cpp -t h -C 2 | head -50
Repository: eloqdata/tx_service
Length of output: 2445
🏁 Script executed:
#!/bin/bash
# Check the implementation of SendMessageToNode to see what happens on failure
cat -n tx_service/src/remote/cc_stream_sender.cpp | sed -n '158,250p'
Repository: eloqdata/tx_service
Length of output: 4286
🏁 Script executed:
#!/bin/bash
# Continue reading SendMessageToNode implementation
cat -n tx_service/src/remote/cc_stream_sender.cpp | sed -n '250,290p'
Repository: eloqdata/tx_service
Length of output: 1948
🏁 Script executed:
#!/bin/bash
# Continue reading to see what happens on non-resend failures
cat -n tx_service/src/remote/cc_stream_sender.cpp | sed -n '290,310p'
Repository: eloqdata/tx_service
Length of output: 1443
🏁 Script executed:
#!/bin/bash
# See the final return value
cat -n tx_service/src/remote/cc_stream_sender.cpp | sed -n '310,325p'
Repository: eloqdata/tx_service
Length of output: 633
🏁 Script executed:
#!/bin/bash
# Check the exact context around line 3128 in cc_shard.cpp
cat -n tx_service/src/cc/cc_shard.cpp | sed -n '3115,3165p'
Repository: eloqdata/tx_service
Length of output: 2377
Check the send result before marking standby as out-of-sync.
NotifyStandbyOutOfSync modifies state (setting seq_id to UINT64_MAX and removing heartbeat targets) without verifying that the notification was sent or queued for retry. SendMessageToNode returns a SendMessageResult struct with the fields sent and queued_for_retry. If both are false, the message was neither delivered nor queued, so the standby never learns that it must resubscribe and silently falls behind.
Check the result before modifying state:
const auto send_result = stream_sender_->SendMessageToNode(node_id, cc_msg);
if (!send_result.sent && !send_result.queued_for_retry)
{
    LOG(WARNING) << "Failed to notify standby " << node_id
                 << " of out-of-sync state; will retry later";
    return;
}
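The ordering this comment asks for (send first, mutate state only on success or queued retry) can be exercised in isolation with a small model. This is a sketch under stated assumptions rather than the project's code: `SendResultSketch` copies only the two field names the review mentions, and `ShardSketch`, `NotifyOutOfSyncSketch`, and the `send` callback are hypothetical stand-ins for CcShard, NotifyStandbyOutOfSync, and stream_sender_->SendMessageToNode.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <utility>

// Mirrors the two fields the review names on SendMessageResult; the real
// struct may carry more.
struct SendResultSketch
{
    bool sent;
    bool queued_for_retry;
};

// Hypothetical single-core stand-in for the shard state touched by the helper.
struct ShardSketch
{
    std::unordered_map<uint32_t, std::pair<uint64_t, int64_t>> standbys;
    int heartbeat_removals = 0;  // counts stand-in RemoveHeartbeatTargetNode calls
};

// Returns true if the standby was marked out of sync. The sequence id and the
// heartbeat target are only touched after the send succeeded or was queued.
bool NotifyOutOfSyncSketch(
    ShardSketch &shard,
    uint32_t node_id,
    const std::function<SendResultSketch(uint32_t)> &send)
{
    assert(static_cast<bool>(send));  // a send callback must be provided
    auto it = shard.standbys.find(node_id);
    if (it == shard.standbys.end())
    {
        return false;  // node is not subscribed, nothing to do
    }
    const SendResultSketch result = send(node_id);
    if (!result.sent && !result.queued_for_retry)
    {
        return false;  // notification lost: keep the standby state unchanged
    }
    it->second.first = UINT64_MAX;
    ++shard.heartbeat_removals;
    return true;
}
```

With a callback that returns `{false, false}` the entry keeps its sequence id and heartbeat target, so a later call can try the notification again; with `{true, false}` or `{false, true}` the entry is marked and the heartbeat target is dropped.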