fix(raft): report unreachable/snapshot status from dispatch workers #535
Conversation
The etcd engine dispatch workers logged outbound-send failures and incremented a counter, but never informed the raft state machine. As a result, per-peer Progress could latch in StateReplicate or StateSnapshot with PendingSnapshot set while the transport was actually failing, and the leader would indefinitely skip sendAppend / sendSnap for that peer. A follower that later recovered would then sit forever behind the leader because only heartbeats were being exchanged (observed after a snapshot-restore crash: once the follower restarted, the leader kept PendingSnapshot from the previous failed attempt and never retried).

Route dispatch errors back into the event loop through a new dispatchReportCh. rawNode.ReportUnreachable and rawNode.ReportSnapshot are not goroutine-safe, so the dispatch worker posts a small report struct instead of calling raft directly. handleDispatchReport then runs on the engine goroutine and invokes the correct call: ReportSnapshot with SnapshotFailure for MsgSnap failures (clears PendingSnapshot and transitions Progress to Probe), ReportUnreachable for everything else (moves Progress from Replicate to Probe).

The channel post is non-blocking. Under sustained dispatch failure the event loop is the natural pressure point; losing an occasional report only delays the next retry by one heartbeat interval, which is harmless and strictly better than blocking the dispatch worker.

Also add payload-byte counters on the streaming snapshot path (streamFSMSnapshot on the sender, receiveSnapshotStream on the receiver). Recently a follower failed to install a snapshot with readRestoreEntry hitting unexpected EOF; the counters let us compare sender- and receiver-side totals to tell transport truncation apart from a serialization/format problem on the next reproduction.
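The post/handle split described above can be sketched as follows. The names `dispatchReport`, `postDispatchReport`, and `handleDispatchReport` mirror the description; the `reporter` interface and `MessageType` constants are illustrative stand-ins for `raft.RawNode` and `raftpb.MessageType`, not the project's actual types:

```go
package main

import "fmt"

// MessageType stands in for raftpb.MessageType; MsgSnap mirrors raftpb.MsgSnap.
type MessageType int

const (
	MsgApp MessageType = iota
	MsgSnap
)

// dispatchReport is the small struct a dispatch worker posts instead of
// calling raft directly (RawNode methods are not goroutine-safe).
type dispatchReport struct {
	peer    uint64
	msgType MessageType
}

// reporter abstracts the two RawNode calls the handler needs.
type reporter interface {
	ReportUnreachable(id uint64)
	ReportSnapshotFailure(id uint64)
}

type engine struct {
	dispatchReportCh chan dispatchReport
	closeCh          chan struct{}
}

// postDispatchReport never blocks: it enqueues, aborts during shutdown,
// or drops the report when the channel is saturated.
func (e *engine) postDispatchReport(r dispatchReport) bool {
	select {
	case e.dispatchReportCh <- r:
		return true
	case <-e.closeCh:
		return false // engine closing; report no longer matters
	default:
		return false // dropped; raft retries on the next heartbeat
	}
}

// handleDispatchReport runs on the engine goroutine and routes the
// failure to the right raft call.
func (e *engine) handleDispatchReport(r dispatchReport, rn reporter) {
	if r.msgType == MsgSnap {
		rn.ReportSnapshotFailure(r.peer) // clears PendingSnapshot, Progress -> Probe
	} else {
		rn.ReportUnreachable(r.peer) // Progress Replicate -> Probe
	}
}

func main() {
	e := &engine{dispatchReportCh: make(chan dispatchReport, 1), closeCh: make(chan struct{})}
	fmt.Println(e.postDispatchReport(dispatchReport{peer: 2, msgType: MsgSnap})) // true: enqueued
	fmt.Println(e.postDispatchReport(dispatchReport{peer: 3, msgType: MsgApp}))  // false: buffer full, dropped
}
```

Because the consumer runs on the single engine goroutine, the raft calls never race, and the dispatch worker is never blocked by a slow event loop.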
📝 Walkthrough

This pull request adds infrastructure for reporting transport failures back to the raft consensus layer. The changes introduce a dispatch report channel in the Engine's event loop that handles peer send failures and converts them into appropriate raft state updates (ReportSnapshot for MsgSnap failures, ReportUnreachable otherwise).
Sequence Diagram(s)

```mermaid
sequenceDiagram
    actor DW as Dispatch Worker
    participant EC as Engine Event Loop
    participant CH as dispatchReportCh
    participant RN as RawNode
    DW->>CH: postDispatchReport(error, peer, msgType) via non-blocking send
    Note over CH: Queued or dropped if full/closing
    EC->>CH: select on dispatchReportCh
    EC->>EC: handleDispatchReport()
    alt MsgSnap failure
        EC->>RN: ReportSnapshot(peer, SnapshotFailure)
    else Other failure
        EC->>RN: ReportUnreachable(peer)
    end
    Note over RN: Raft state updated
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~22 minutes
🚥 Pre-merge checks: ✅ 3 checks passed
Reviewdog golangci (gci) flagged the initial import block; the pre-existing convention in this package groups third-party imports together (see engine_test.go), so collapse the raftpb + testify split.
🧹 Nitpick comments (5)
internal/raftengine/etcd/engine.go (2)
806-821: LGTM — non-blocking contract holds.

The `select` correctly implements the three required outcomes: fast-path enqueue, abort during shutdown (via `closeCh`), and non-blocking drop when the channel is saturated. Go selects a ready case randomly when multiple are ready, but since `default` only fires when no other case is ready, the `closeCh` path always wins over drop during shutdown — which matches the intent.

One small observability note: dropped reports here do not increment a counter like `dispatchDropCount` does for dropped messages. Given raft will reconcile on the next heartbeat this is fine, but exposing a gauge/counter would make silent backpressure visible in operations.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/raftengine/etcd/engine.go` around lines 806 - 821, Add observability for dropped dispatch reports in postDispatchReport: inside the default branch where the report is dropped, increment the existing metric/counter used for dropped messages (e.g., dispatchDropCount) or create a new counter/gauge if none exists and record the drop alongside the slog.Warn. Update the function postDispatchReport to reference e.dispatchReportCh, e.closeCh and the dispatch drop metric so operations can detect backpressure without changing the non-blocking behavior.
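A minimal sketch of the suggested drop counter, under assumed names (`dropCount` standing in for the existing `dispatchDropCount` metric, `report`/`engine` simplified): only the `default` branch changes relative to the non-blocking post.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type report struct{ peer uint64 }

type engine struct {
	reportCh  chan report
	closeCh   chan struct{}
	dropCount atomic.Int64 // hypothetical counter, mirroring dispatchDropCount
}

// post keeps the non-blocking contract but counts drops so silent
// backpressure becomes visible to operators.
func (e *engine) post(r report) {
	select {
	case e.reportCh <- r:
	case <-e.closeCh:
	default:
		e.dropCount.Add(1) // report lost; raft retries on next heartbeat
	}
}

func main() {
	e := &engine{reportCh: make(chan report, 1), closeCh: make(chan struct{})}
	e.post(report{1}) // enqueued
	e.post(report{2}) // buffer full -> dropped and counted
	fmt.Println(e.dropCount.Load())
}
```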
790-804: The `e.rawNode == nil` guard is unreachable.

`rawNode` is assigned in `Open()` (line 292, after successful creation at line 261) and is never reset to `nil` — `Close()` does not touch it. `handleDispatchReport` is only invoked from `handleEvent` (line 772), the engine's main event loop, which runs only after `Open()` succeeds. The guard therefore cannot fire in production. Not a functional bug, but it adds unnecessary noise and could mask a real regression later if someone assumes the branch is exercised.

The routing logic itself is sound: `MsgSnap` → `ReportSnapshot(SnapshotFailure)`, otherwise → `ReportUnreachable`; both now run on the engine goroutine as intended. Either drop the guard or add a clear comment explaining why it exists (e.g., if it's defensive against future refactors).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/raftengine/etcd/engine.go` around lines 790 - 804, The nil-check guard in handleDispatchReport (if e.rawNode == nil { return }) is unreachable because rawNode is set in Open() and never cleared in Close(), and handleDispatchReport is only called from the engine event loop started after Open() succeeds; remove that guard to avoid dead/noisy code and potential confusion. Locate handleDispatchReport and delete the e.rawNode nil-check, or alternatively replace it with a one-line comment referencing Open(), Close(), and handleEvent to justify the defensive check (mentioning rawNode) if you prefer to keep the check.

internal/raftengine/etcd/grpc_transport.go (2)
288-301: `Close()` on `countingReadCloser` is never invoked — consider dropping the `io.Closer` half.

`sendSnapshotReaderChunks` takes `io.Reader`, and the defer at lines 252-256 already calls `rc.Close()` directly on the underlying opener handle, never on the wrapper. That leaves `countingReadCloser.Close` (and the `//nolint:wrapcheck`) as dead code. Shrinking this to a plain `io.Reader` counter both removes the dead method and the `nolint` pragma, which the project guidelines prefer to avoid.

Suggested simplification:

```diff
-type countingReadCloser struct {
-	inner io.ReadCloser
-	n     int64
-}
-
-func (c *countingReadCloser) Read(p []byte) (int, error) {
-	n, err := c.inner.Read(p)
-	c.n += int64(n)
-	return n, err //nolint:wrapcheck // preserve io.EOF sentinel identity for callers
-}
-
-func (c *countingReadCloser) Close() error {
-	return c.inner.Close() //nolint:wrapcheck // caller expects the underlying close error verbatim
-}
+// countingReader wraps an io.Reader and tallies bytes read. It intentionally
+// returns the underlying error verbatim so io.EOF sentinel identity is preserved.
+type countingReader struct {
+	inner io.Reader
+	n     int64
+}
+
+func (c *countingReader) Read(p []byte) (int, error) {
+	n, err := c.inner.Read(p)
+	c.n += int64(n)
+	return n, err //nolint:wrapcheck
+}
```

As per coding guidelines: "Avoid adding `//nolint` unless absolutely required; prefer refactoring."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/raftengine/etcd/grpc_transport.go` around lines 288 - 301, The countingReadCloser wrapper currently implements io.ReadCloser but its Close method is never called (sendSnapshotReaderChunks accepts io.Reader and the underlying opener is closed directly), so remove the Close implementation and convert countingReadCloser into a plain reader-only type: replace type countingReadCloser { inner io.ReadCloser; n int64 } with a reader-only struct holding inner io.Reader and n int64, update the Read method to read from inner io.Reader and accumulate n, remove the Close method and the //nolint:wrapcheck comment, and update any call sites creating countingReadCloser to pass an io.Reader rather than relying on Close being forwarded.
270-286: Also log `payload_bytes` on the error paths.

The PR explicitly calls out that these counters exist to distinguish transport truncation from serialization/format errors. In the current shape, both error exits (`sendSnapshotReaderChunks` failure at line 274, `CloseAndRecv` failure at line 277) return before the `slog.Info` call, so the byte count — the most useful diagnostic signal on a failed send — is dropped on the floor precisely when you need it.

Consider emitting a structured warn/error log (or reusing the success log at a different level) that includes `counter.n` on both failure exits so operators can correlate the sender's byte count against the receiver's `payload_bytes` in `receiveSnapshotStream`.

Suggested adjustment:

```diff
 counter := &countingReadCloser{inner: rc}
 if err := sendSnapshotReaderChunks(stream, header, counter, t.chunkSize()); err != nil {
+	slog.Warn("etcd raft snapshot stream send failed",
+		"index", index,
+		"to", msg.To,
+		"payload_bytes", counter.n,
+		"err", err,
+	)
 	return err
 }
 if _, err := stream.CloseAndRecv(); err != nil {
+	slog.Warn("etcd raft snapshot stream close failed",
+		"index", index,
+		"to", msg.To,
+		"payload_bytes", counter.n,
+		"err", err,
+	)
 	return errors.WithStack(err)
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/raftengine/etcd/grpc_transport.go` around lines 270 - 286, sendSnapshotReaderChunks and stream.CloseAndRecv return early without logging the countingReadCloser payload total, losing the diagnostic byte count; capture counter.n on both error paths and emit a structured slog log (e.g., slog.Warn or slog.Error) that includes "index", "to", "payload_bytes" and the returned error before returning the error. Update the error branch after sendSnapshotReaderChunks and the one after stream.CloseAndRecv to log the same fields (using countingReadCloser.counter.n), referencing countingReadCloser, sendSnapshotReaderChunks and stream.CloseAndRecv so operators can correlate sender/receiver byte counts.

internal/raftengine/etcd/dispatch_report_test.go (1)
19-88: Tests cover `postDispatchReport` paths well; consider noting the `handleDispatchReport` gap.

The three tests cleanly exercise the three select arms (deliver, drop-on-full, abort-on-close) with a 2s timeout to catch hangs, `t.Parallel()`, and `require.Equal` for payload equality. Constructing a bare `&Engine{}` with only `dispatchReportCh`/`closeCh` is safe because `postDispatchReport` touches nothing else.

One coverage gap worth acknowledging (not necessarily here): `handleDispatchReport`'s routing of `MsgSnap` vs. other message types to `rawNode.ReportSnapshot`/`rawNode.ReportUnreachable` is currently not exercised by a unit test. Given that requires wiring a real `etcdraft.RawNode`, an integration test — or a small interface seam around those two methods — would close the loop on the behavior the PR is actually fixing.

As per coding guidelines: "prefer table-driven test cases." The three scenarios share enough scaffolding (Engine construction, timeout) that a single table-driven test with a `setup` hook per case would reduce duplication, though the current shape is readable as-is.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/raftengine/etcd/dispatch_report_test.go` around lines 19 - 88, The tests exercise postDispatchReport's three select paths (deliver, drop-on-full, abort-on-close) but don't cover handleDispatchReport's routing logic; add guidance to either (a) add an integration test that wires a real etcdraft.RawNode to assert that handleDispatchReport calls rawNode.ReportSnapshot for MsgSnap and rawNode.ReportUnreachable for other message types, or (b) introduce a small interface seam (e.g. a reporter interface with ReportSnapshot/ReportUnreachable) and provide a mock to unit-test handleDispatchReport's branching; optionally consolidate the three existing tests into a single table-driven test that calls postDispatchReport for each case to reduce duplication while keeping the 2s timeout and t.Parallel usage.
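Option (b), the interface seam, could look roughly like this. The `snapshotReporter` interface, the `recorder` test double, and the `bool` flag (standing in for `raft.SnapshotStatus`) are all hypothetical names for illustration:

```go
package main

import "fmt"

// MessageType stands in for raftpb.MessageType.
type MessageType int

const (
	MsgApp MessageType = iota
	MsgSnap
)

// snapshotReporter is the suggested seam: just the two RawNode methods the
// handler needs, so tests can substitute a recorder for a real RawNode.
type snapshotReporter interface {
	ReportSnapshot(id uint64, failed bool)
	ReportUnreachable(id uint64)
}

// recorder is a test double that records which call was routed.
type recorder struct{ calls []string }

func (r *recorder) ReportSnapshot(id uint64, failed bool) {
	r.calls = append(r.calls, fmt.Sprintf("snapshot(%d,failed=%v)", id, failed))
}

func (r *recorder) ReportUnreachable(id uint64) {
	r.calls = append(r.calls, fmt.Sprintf("unreachable(%d)", id))
}

// handleDispatchReport mirrors the routing under review: MsgSnap failures
// become ReportSnapshot(SnapshotFailure), everything else ReportUnreachable.
func handleDispatchReport(rn snapshotReporter, peer uint64, msgType MessageType) {
	if msgType == MsgSnap {
		rn.ReportSnapshot(peer, true)
	} else {
		rn.ReportUnreachable(peer)
	}
}

func main() {
	rec := &recorder{}
	handleDispatchReport(rec, 7, MsgSnap)
	handleDispatchReport(rec, 7, MsgApp)
	fmt.Println(rec.calls)
}
```

Since `*etcdraft.RawNode` already satisfies such an interface structurally (modulo the status type), production code would not need to change beyond the handler's parameter type.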
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@internal/raftengine/etcd/dispatch_report_test.go`:
- Around line 19-88: The tests exercise postDispatchReport's three select paths
(deliver, drop-on-full, abort-on-close) but don't cover handleDispatchReport's
routing logic; add guidance to either (a) add an integration test that wires a
real etcdraft.RawNode to assert that handleDispatchReport calls
rawNode.ReportSnapshot for MsgSnap and rawNode.ReportUnreachable for other
message types, or (b) introduce a small interface seam (e.g. a reporter
interface with ReportSnapshot/ReportUnreachable) and provide a mock to unit-test
handleDispatchReport's branching; optionally consolidate the three existing
tests into a single table-driven test that calls postDispatchReport for each
case to reduce duplication while keeping the 2s timeout and t.Parallel usage.
In `@internal/raftengine/etcd/engine.go`:
- Around line 806-821: Add observability for dropped dispatch reports in
postDispatchReport: inside the default branch where the report is dropped,
increment the existing metric/counter used for dropped messages (e.g.,
dispatchDropCount) or create a new counter/gauge if none exists and record the
drop alongside the slog.Warn. Update the function postDispatchReport to
reference e.dispatchReportCh, e.closeCh and the dispatch drop metric so
operations can detect backpressure without changing the non-blocking behavior.
- Around line 790-804: The nil-check guard in handleDispatchReport (if e.rawNode
== nil { return }) is unreachable because rawNode is set in Open() and never
cleared in Close(), and handleDispatchReport is only called from the engine
event loop started after Open() succeeds; remove that guard to avoid dead/noisy
code and potential confusion. Locate handleDispatchReport and delete the
e.rawNode nil-check, or alternatively replace it with a one-line comment
referencing Open(), Close(), and handleEvent to justify the defensive check
(mentioning rawNode) if you prefer to keep the check.
In `@internal/raftengine/etcd/grpc_transport.go`:
- Around line 288-301: The countingReadCloser wrapper currently implements
io.ReadCloser but its Close method is never called (sendSnapshotReaderChunks
accepts io.Reader and the underlying opener is closed directly), so remove the
Close implementation and convert countingReadCloser into a plain reader-only
type: replace type countingReadCloser { inner io.ReadCloser; n int64 } with a
reader-only struct holding inner io.Reader and n int64, update the Read method
to read from inner io.Reader and accumulate n, remove the Close method and the
//nolint:wrapcheck comment, and update any call sites creating
countingReadCloser to pass an io.Reader rather than relying on Close being
forwarded.
- Around line 270-286: sendSnapshotReaderChunks and stream.CloseAndRecv return
early without logging the countingReadCloser payload total, losing the
diagnostic byte count; capture counter.n on both error paths and emit a
structured slog log (e.g., slog.Warn or slog.Error) that includes "index", "to",
"payload_bytes" and the returned error before returning the error. Update the
error branch after sendSnapshotReaderChunks and the one after
stream.CloseAndRecv to log the same fields (using countingReadCloser.counter.n),
referencing countingReadCloser, sendSnapshotReaderChunks and stream.CloseAndRecv
so operators can correlate sender/receiver byte counts.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ca3478a2-1157-43eb-be7b-2077c9686800
📒 Files selected for processing (3)
internal/raftengine/etcd/dispatch_report_test.go
internal/raftengine/etcd/engine.go
internal/raftengine/etcd/grpc_transport.go
Code Review
This pull request introduces a non-blocking mechanism for dispatch workers to report transport failures to the Raft engine, ensuring that peer progress is correctly updated when followers become unreachable. It also adds byte-counting instrumentation to snapshot transfers within the gRPC transport to improve observability and assist in diagnosing snapshot restoration failures. I have no feedback to provide.