From 29bb6ab78fd542eb2683ca7a680acb48cde88fee Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 04:12:36 +0900 Subject: [PATCH 01/10] perf(raft): partition etcd dispatcher by message type to isolate heartbeats Adds an opt-in 4-lane dispatcher layout (heartbeat / replication / snapshot / other) behind the ELASTICKV_RAFT_DISPATCHER_LANES feature flag. Default behavior (flag unset or "0") is byte-for-byte identical to the current 2-lane (heartbeat + normal) implementation introduced in PR #522. When enabled, each per-peer peerQueues replaces its normal channel with three new channels (replication / snapshot / other), a net gain of two channels and two goroutines per peer, so that: - MsgApp / MsgAppResp run on their own replication lane, independent of heartbeats and of MsgSnap. - MsgSnap (bulky, rare) runs on its own snapshot lane and can no longer stall subsequent MsgApps behind a multi-MiB transfer. - Heartbeat / vote / read-index traffic keeps its dedicated priority lane, so heartbeats still cannot be starved under write load. Per-peer within-type ordering (the raft invariant that matters for MsgApp) is preserved because a given peer's MsgApp stream still shares one lane and one worker. dispatchDropCount and postDispatchReport firing semantics are unchanged, and the existing drop paths have been factored to close whichever subset of lanes is actually wired up. Rollout plan: ship default-off, enable in staging with ELASTICKV_RAFT_DISPATCHER_LANES=1 and watch dispatchDropCount + dispatchErrorCount + Prometheus heartbeat-drop gauge for 24h before flipping production. The flag can be removed once the 4-lane path has soaked. 
--- internal/raftengine/etcd/engine.go | 145 +++++++++++++++---- internal/raftengine/etcd/engine_test.go | 178 ++++++++++++++++++++++++ 2 files changed, 297 insertions(+), 26 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 9e880017..f4ed9974 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "io" "log/slog" + "os" "path/filepath" "runtime/debug" "sort" @@ -65,11 +66,26 @@ const ( // upside is that a ~5 s transient pause (election-timeout scale) // no longer drops heartbeats and force the peers' lease to expire. defaultHeartbeatBufPerPeer = 512 - defaultSnapshotEvery = 10_000 - defaultSnapshotQueueSize = 1 - defaultAdminPollInterval = 10 * time.Millisecond - defaultMaxPendingConfigs = 64 - unknownLastContact = time.Duration(-1) + // defaultSnapshotLaneBufPerPeer sizes the per-peer MsgSnap lane when the + // 4-lane dispatcher mode is enabled (see ELASTICKV_RAFT_DISPATCHER_LANES). + // MsgSnap is rare and bulky; 4 is enough to absorb a retry or two without + // holding up MsgApp replication behind a multi-MiB payload. + defaultSnapshotLaneBufPerPeer = 4 + // defaultOtherLaneBufPerPeer sizes the per-peer fallback lane for message + // types not classified as heartbeat/replication/snapshot (e.g. surprise + // locally-addressed control types). Small buffer: traffic volume is tiny. + defaultOtherLaneBufPerPeer = 16 + // dispatcherLanesEnvVar toggles the 4-lane dispatcher (heartbeat / + // replication / snapshot / other). When unset or "0", the legacy + // 2-lane layout (heartbeat + normal) is used. Opt-in by design: the + // raft hot path is high blast radius and a regression here can cause + // cluster-wide elections. 
+ dispatcherLanesEnvVar = "ELASTICKV_RAFT_DISPATCHER_LANES" + defaultSnapshotEvery = 10_000 + defaultSnapshotQueueSize = 1 + defaultAdminPollInterval = 10 * time.Millisecond + defaultMaxPendingConfigs = 64 + unknownLastContact = time.Duration(-1) proposalEnvelopeVersion = byte(0x01) readContextVersion = byte(0x02) @@ -152,15 +168,19 @@ type Engine struct { dispatchReportCh chan dispatchReport peerDispatchers map[uint64]*peerQueues perPeerQueueSize int - dispatchStopCh chan struct{} - dispatchCtx context.Context - dispatchCancel context.CancelFunc - snapshotReqCh chan snapshotRequest - snapshotResCh chan snapshotResult - snapshotStopCh chan struct{} - closeCh chan struct{} - doneCh chan struct{} - startedCh chan struct{} + // dispatcherLanesEnabled toggles the 4-lane dispatcher layout. Captured + // once at Open from ELASTICKV_RAFT_DISPATCHER_LANES so the run-time code + // path is branch-free per message and does not need to re-read env vars. + dispatcherLanesEnabled bool + dispatchStopCh chan struct{} + dispatchCtx context.Context + dispatchCancel context.CancelFunc + snapshotReqCh chan snapshotRequest + snapshotResCh chan snapshotResult + snapshotStopCh chan struct{} + closeCh chan struct{} + doneCh chan struct{} + startedCh chan struct{} leaderReady chan struct{} leaderOnce sync.Once @@ -311,11 +331,23 @@ type dispatchRequest struct { // peerQueues holds separate dispatch channels per peer so that heartbeats // are never blocked behind large log-entry RPCs. +// +// Legacy 2-lane layout (default): heartbeat + normal. +// +// 4-lane layout (opt-in via ELASTICKV_RAFT_DISPATCHER_LANES=1): heartbeat + +// replication (MsgApp/MsgAppResp) + snapshot (MsgSnap) + other. Each lane +// gets its own goroutine so a bulky MsgSnap transfer cannot stall MsgApp +// replication and vice versa. Per-peer ordering within a given message type +// is preserved because a single peer's MsgApp stream all share one lane and +// one worker. 
type peerQueues struct { - normal chan dispatchRequest - heartbeat chan dispatchRequest - ctx context.Context - cancel context.CancelFunc + normal chan dispatchRequest + heartbeat chan dispatchRequest + replication chan dispatchRequest // 4-lane mode only; nil otherwise + snapshot chan dispatchRequest // 4-lane mode only; nil otherwise + other chan dispatchRequest // 4-lane mode only; nil otherwise + ctx context.Context + cancel context.CancelFunc } type preparedOpenState struct { @@ -464,6 +496,7 @@ func (e *Engine) initTransport(cfg OpenConfig) { // Size the per-peer dispatch buffer to match the Raft inflight limit so that // the channel never drops messages that Raft's flow-control would permit. e.perPeerQueueSize = cfg.MaxInflightMsg + e.dispatcherLanesEnabled = dispatcherLanesEnabledFromEnv() e.dispatchStopCh = make(chan struct{}) e.transport.SetSpoolDir(cfg.DataDir) e.transport.SetFSMSnapDir(e.fsmSnapDir) @@ -1438,10 +1471,7 @@ func (e *Engine) enqueueDispatchMessage(msg raftpb.Message) error { e.recordDroppedDispatch(msg) return nil } - ch := pd.normal - if isPriorityMsg(msg.Type) { - ch = pd.heartbeat - } + ch := e.selectDispatchLane(pd, msg.Type) // Avoid the expensive deep-clone in prepareDispatchRequest when the channel // is already full. The len/cap check is safe here because this function is // only ever called from the single engine event-loop goroutine. @@ -1474,6 +1504,38 @@ func isPriorityMsg(t raftpb.MessageType) bool { t == raftpb.MsgTimeoutNow } +// selectDispatchLane picks the per-peer channel for msgType. In the legacy +// 2-lane layout it returns pd.heartbeat for priority control traffic and +// pd.normal for everything else. In the 4-lane layout it additionally +// partitions the non-heartbeat traffic so that MsgApp/MsgAppResp and MsgSnap +// do not share a goroutine and cannot block each other. 
+func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) chan dispatchRequest { + if !e.dispatcherLanesEnabled { + if isPriorityMsg(msgType) { + return pd.heartbeat + } + return pd.normal + } + switch msgType { + case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp: + return pd.heartbeat + case raftpb.MsgApp, raftpb.MsgAppResp: + return pd.replication + case raftpb.MsgSnap: + return pd.snapshot + case raftpb.MsgVote, raftpb.MsgVoteResp, + raftpb.MsgPreVote, raftpb.MsgPreVoteResp, + raftpb.MsgReadIndex, raftpb.MsgReadIndexResp, + raftpb.MsgTimeoutNow: + // Election / read-index traffic is small and latency-sensitive but + // rare; put it on the heartbeat lane so it keeps its priority + // treatment and, like today, is never stuck behind MsgApp. + return pd.heartbeat + default: + return pd.other + } +} + func (e *Engine) applyReadySnapshot(snapshot raftpb.Snapshot) error { if etcdraft.IsEmptySnap(snapshot) { return nil @@ -2624,19 +2686,51 @@ func (e *Engine) startPeerDispatcher(nodeID uint64) { } ctx, cancel := context.WithCancel(baseCtx) pd := &peerQueues{ - normal: make(chan dispatchRequest, size), heartbeat: make(chan dispatchRequest, defaultHeartbeatBufPerPeer), ctx: ctx, cancel: cancel, } + var workers []chan dispatchRequest + if e.dispatcherLanesEnabled { + // 4-lane layout: split MsgApp/MsgAppResp (replication), MsgSnap + // (snapshot), and misc (other) onto independent goroutines so a + // bulky snapshot transfer cannot stall replication. Each channel + // still serves a single peer, so within-type ordering (the raft + // invariant we care about for MsgApp) is preserved. 
+ pd.replication = make(chan dispatchRequest, size) + pd.snapshot = make(chan dispatchRequest, defaultSnapshotLaneBufPerPeer) + pd.other = make(chan dispatchRequest, defaultOtherLaneBufPerPeer) + workers = []chan dispatchRequest{pd.heartbeat, pd.replication, pd.snapshot, pd.other} + } else { + pd.normal = make(chan dispatchRequest, size) + workers = []chan dispatchRequest{pd.normal, pd.heartbeat} + } e.peerDispatchers[nodeID] = pd - workers := []chan dispatchRequest{pd.normal, pd.heartbeat} e.dispatchWG.Add(len(workers)) for _, w := range workers { go e.runDispatchWorker(ctx, w) } } +// dispatcherLanesEnabledFromEnv returns true when the 4-lane dispatcher has +// been explicitly opted into via ELASTICKV_RAFT_DISPATCHER_LANES. Any value +// other than "" and "0" enables it so operators can use "1", "true", etc. +func dispatcherLanesEnabledFromEnv() bool { + v := os.Getenv(dispatcherLanesEnvVar) + return v != "" && v != "0" +} + +// closePeerLanes closes every non-nil dispatch channel on pd so that the +// drain loops in runDispatchWorker exit. It is safe to call with either the +// 2-lane or 4-lane layout because unused lanes are nil. +func closePeerLanes(pd *peerQueues) { + for _, ch := range []chan dispatchRequest{pd.heartbeat, pd.normal, pd.replication, pd.snapshot, pd.other} { + if ch != nil { + close(ch) + } + } +} + // runDispatchWorker drains ch until the channel is closed, the engine stops, // or the per-peer context is cancelled (e.g. by removePeer). 
The ctx.Done() // arm ensures old workers exit promptly when a peer is removed and @@ -2917,8 +3011,7 @@ func (e *Engine) removePeer(nodeID uint64) { if pd, ok := e.peerDispatchers[nodeID]; ok { delete(e.peerDispatchers, nodeID) pd.cancel() // cancel any in-flight RPC for this peer immediately - close(pd.normal) - close(pd.heartbeat) + closePeerLanes(pd) } } } diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 5522c940..180bfa05 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1635,3 +1635,181 @@ func TestErrNotLeaderMatchesRaftEngineSentinel(t *testing.T) { require.True(t, errors.Is(errors.WithStack(errLeadershipTransferNotLeader), raftengine.ErrNotLeader)) require.True(t, errors.Is(errors.WithStack(errLeadershipTransferInProgress), raftengine.ErrLeadershipTransferInProgress)) } + +// TestSelectDispatchLane_LegacyTwoLane verifies that, when the 4-lane +// dispatcher is disabled (default), messages are routed exactly as before: +// priority control traffic → heartbeat lane, everything else → normal lane. 
+func TestSelectDispatchLane_LegacyTwoLane(t *testing.T) { + t.Parallel() + engine := &Engine{dispatcherLanesEnabled: false} + pd := &peerQueues{ + normal: make(chan dispatchRequest, 1), + heartbeat: make(chan dispatchRequest, 1), + } + + cases := map[raftpb.MessageType]chan dispatchRequest{ + raftpb.MsgHeartbeat: pd.heartbeat, + raftpb.MsgHeartbeatResp: pd.heartbeat, + raftpb.MsgReadIndex: pd.heartbeat, + raftpb.MsgVote: pd.heartbeat, + raftpb.MsgTimeoutNow: pd.heartbeat, + raftpb.MsgApp: pd.normal, + raftpb.MsgAppResp: pd.normal, + raftpb.MsgSnap: pd.normal, + } + for mt, want := range cases { + got := engine.selectDispatchLane(pd, mt) + require.Equalf(t, want, got, "legacy mode routing for %s", mt) + } +} + +// TestSelectDispatchLane_FourLane verifies that, when ELASTICKV_RAFT_DISPATCHER_LANES +// is enabled, MsgApp/MsgAppResp goes to the replication lane, MsgSnap goes to +// the snapshot lane, and heartbeats/votes/read-index share the priority lane. +func TestSelectDispatchLane_FourLane(t *testing.T) { + t.Parallel() + engine := &Engine{dispatcherLanesEnabled: true} + pd := &peerQueues{ + heartbeat: make(chan dispatchRequest, 1), + replication: make(chan dispatchRequest, 1), + snapshot: make(chan dispatchRequest, 1), + other: make(chan dispatchRequest, 1), + } + + cases := map[raftpb.MessageType]chan dispatchRequest{ + raftpb.MsgHeartbeat: pd.heartbeat, + raftpb.MsgHeartbeatResp: pd.heartbeat, + raftpb.MsgVote: pd.heartbeat, + raftpb.MsgVoteResp: pd.heartbeat, + raftpb.MsgPreVote: pd.heartbeat, + raftpb.MsgPreVoteResp: pd.heartbeat, + raftpb.MsgReadIndex: pd.heartbeat, + raftpb.MsgReadIndexResp: pd.heartbeat, + raftpb.MsgTimeoutNow: pd.heartbeat, + raftpb.MsgApp: pd.replication, + raftpb.MsgAppResp: pd.replication, + raftpb.MsgSnap: pd.snapshot, + } + for mt, want := range cases { + got := engine.selectDispatchLane(pd, mt) + require.Equalf(t, want, got, "4-lane mode routing for %s", mt) + } +} + +// TestFourLaneDispatcher_SnapshotDoesNotBlockReplication 
exercises the key +// correctness invariant for the 4-lane layout: a stuck MsgSnap transfer must +// not prevent MsgApp from being dispatched, because they now run on +// independent goroutines. +func TestFourLaneDispatcher_SnapshotDoesNotBlockReplication(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + replicationDone := make(chan struct{}, 1) + snapshotBlocking := make(chan struct{}) + engine := &Engine{ + nodeID: 1, + peerDispatchers: make(map[uint64]*peerQueues), + perPeerQueueSize: 4, + dispatcherLanesEnabled: true, + dispatchStopCh: make(chan struct{}), + dispatchCtx: ctx, + dispatchCancel: cancel, + } + engine.dispatchFn = func(dctx context.Context, req dispatchRequest) error { + switch req.msg.Type { + case raftpb.MsgSnap: + // Block the snapshot lane until the test releases it. The + // replication lane should keep flowing in the meantime. + select { + case <-snapshotBlocking: + case <-dctx.Done(): + } + case raftpb.MsgApp: + replicationDone <- struct{}{} + } + return nil + } + + engine.upsertPeer(Peer{NodeID: 2, ID: "peer2", Address: "localhost:2"}) + pd, ok := engine.peerDispatchers[2] + require.True(t, ok) + require.NotNil(t, pd.replication) + require.NotNil(t, pd.snapshot) + + require.NoError(t, engine.enqueueDispatchMessage(raftpb.Message{Type: raftpb.MsgSnap, To: 2})) + require.NoError(t, engine.enqueueDispatchMessage(raftpb.Message{Type: raftpb.MsgApp, To: 2})) + + select { + case <-replicationDone: + case <-time.After(time.Second): + t.Fatal("MsgApp did not dispatch while MsgSnap was stuck — lanes are not independent") + } + + close(snapshotBlocking) + close(engine.dispatchStopCh) + engine.dispatchCancel() + engine.dispatchWG.Wait() +} + +// TestFourLaneDispatcher_RemovePeerClosesAllLanes confirms removePeer closes +// every lane (not just normal/heartbeat) so no worker goroutine leaks under +// the opt-in 4-lane layout. 
+func TestFourLaneDispatcher_RemovePeerClosesAllLanes(t *testing.T) { + stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + pd := &peerQueues{ + heartbeat: make(chan dispatchRequest, 4), + replication: make(chan dispatchRequest, 4), + snapshot: make(chan dispatchRequest, 4), + other: make(chan dispatchRequest, 4), + ctx: ctx, + cancel: cancel, + } + engine := &Engine{ + nodeID: 1, + peers: map[uint64]Peer{2: {NodeID: 2, ID: "peer2"}}, + peerDispatchers: map[uint64]*peerQueues{2: pd}, + dispatchStopCh: stopCh, + dispatcherLanesEnabled: true, + } + engine.dispatchWG.Add(4) + go engine.runDispatchWorker(ctx, pd.heartbeat) + go engine.runDispatchWorker(ctx, pd.replication) + go engine.runDispatchWorker(ctx, pd.snapshot) + go engine.runDispatchWorker(ctx, pd.other) + + engine.removePeer(2) + + done := make(chan struct{}) + go func() { + engine.dispatchWG.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("4-lane dispatch workers did not exit after peer removal") + } + + // Subsequent sends to the removed peer must be dropped without panic. + require.NoError(t, engine.enqueueDispatchMessage(raftpb.Message{Type: raftpb.MsgApp, To: 2})) +} + +// TestDispatcherLanesEnabledFromEnv pins env-var parsing so a regression in +// the feature flag can't silently flip the default. 
+func TestDispatcherLanesEnabledFromEnv(t *testing.T) { + cases := []struct { + val string + want bool + }{ + {"", false}, + {"0", false}, + {"1", true}, + {"true", true}, + {"yes", true}, + } + for _, c := range cases { + t.Setenv(dispatcherLanesEnvVar, c.val) + require.Equalf(t, c.want, dispatcherLanesEnabledFromEnv(), "env=%q", c.val) + } +} From 95370c71848910604aaab04f0d2ca8c4001239e8 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 05:46:49 +0900 Subject: [PATCH 02/10] refactor(raft): selectDispatchLane reuses isPriorityMsg Route priority control traffic through isPriorityMsg up front in both legacy and 4-lane modes, dropping the duplicated message-type list in the switch. Behaviour is identical: heartbeats, votes, read-index and timeout-now still ride pd.heartbeat, MsgApp/MsgAppResp go to pd.replication, MsgSnap to pd.snapshot, and everything else falls through to pd.other (4-lane) or pd.normal (legacy). --- internal/raftengine/etcd/engine.go | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index f4ed9974..a8660fa4 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -1510,27 +1510,20 @@ func isPriorityMsg(t raftpb.MessageType) bool { // partitions the non-heartbeat traffic so that MsgApp/MsgAppResp and MsgSnap // do not share a goroutine and cannot block each other. func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) chan dispatchRequest { + // Priority control traffic (heartbeats, votes, read-index, timeout-now) + // always rides the heartbeat lane in both layouts so it keeps its + // low-latency treatment and is never stuck behind MsgApp payloads. 
+ if isPriorityMsg(msgType) { + return pd.heartbeat + } if !e.dispatcherLanesEnabled { - if isPriorityMsg(msgType) { - return pd.heartbeat - } return pd.normal } switch msgType { - case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp: - return pd.heartbeat case raftpb.MsgApp, raftpb.MsgAppResp: return pd.replication case raftpb.MsgSnap: return pd.snapshot - case raftpb.MsgVote, raftpb.MsgVoteResp, - raftpb.MsgPreVote, raftpb.MsgPreVoteResp, - raftpb.MsgReadIndex, raftpb.MsgReadIndexResp, - raftpb.MsgTimeoutNow: - // Election / read-index traffic is small and latency-sensitive but - // rare; put it on the heartbeat lane so it keeps its priority - // treatment and, like today, is never stuck behind MsgApp. - return pd.heartbeat default: return pd.other } From 6ff5fcec2e4a328df71650fc20d34f4f8f545a98 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 14:11:34 +0900 Subject: [PATCH 03/10] fix(raft): parse ELASTICKV_RAFT_DISPATCHER_LANES via strconv.ParseBool Replace the permissive "non-empty and not 0" check in dispatcherLanesEnabledFromEnv with strconv.ParseBool so the env var accepts only the standard boolean tokens (1/t/T/TRUE/true/True and 0/f/F/FALSE/false/False). Unrecognized or empty values default to false. Addresses Gemini review on PR #577. --- internal/raftengine/etcd/engine.go | 15 +++++++++++---- internal/raftengine/etcd/engine_test.go | 4 +++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index a8660fa4..d448cc71 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -11,6 +11,7 @@ import ( "runtime/debug" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -2706,11 +2707,17 @@ func (e *Engine) startPeerDispatcher(nodeID uint64) { } // dispatcherLanesEnabledFromEnv returns true when the 4-lane dispatcher has -// been explicitly opted into via ELASTICKV_RAFT_DISPATCHER_LANES. 
Any value -// other than "" and "0" enables it so operators can use "1", "true", etc. +// been explicitly opted into via ELASTICKV_RAFT_DISPATCHER_LANES. The value +// is parsed with strconv.ParseBool, which accepts the standard tokens +// (1, t, T, TRUE, true, True enable; 0, f, F, FALSE, false, False disable). +// An empty string or any unrecognized value disables the feature. func dispatcherLanesEnabledFromEnv() bool { - v := os.Getenv(dispatcherLanesEnvVar) - return v != "" && v != "0" + v := strings.TrimSpace(os.Getenv(dispatcherLanesEnvVar)) + enabled, err := strconv.ParseBool(v) + if err != nil { + return false + } + return enabled } // closePeerLanes closes every non-nil dispatch channel on pd so that the diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 180bfa05..d2ec97c7 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1806,7 +1806,9 @@ func TestDispatcherLanesEnabledFromEnv(t *testing.T) { {"0", false}, {"1", true}, {"true", true}, - {"yes", true}, + {"TRUE", true}, + {"false", false}, + {"yes", false}, } for _, c := range cases { t.Setenv(dispatcherLanesEnvVar, c.val) From c385e3f12f2e20b77bd05224a590e5cf6d7eb584 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 16:35:36 +0900 Subject: [PATCH 04/10] test(raft): cover PreVote/VoteResp/ReadIndexResp in dispatch lane tests --- internal/raftengine/etcd/engine_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index d2ec97c7..cd668f51 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1651,7 +1651,11 @@ func TestSelectDispatchLane_LegacyTwoLane(t *testing.T) { raftpb.MsgHeartbeat: pd.heartbeat, raftpb.MsgHeartbeatResp: pd.heartbeat, raftpb.MsgReadIndex: pd.heartbeat, + raftpb.MsgReadIndexResp: pd.heartbeat, raftpb.MsgVote: 
pd.heartbeat, + raftpb.MsgVoteResp: pd.heartbeat, + raftpb.MsgPreVote: pd.heartbeat, + raftpb.MsgPreVoteResp: pd.heartbeat, raftpb.MsgTimeoutNow: pd.heartbeat, raftpb.MsgApp: pd.normal, raftpb.MsgAppResp: pd.normal, From 676f1d1cfd9f7d9127294a78f35e37e0a045a733 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 18:44:27 +0900 Subject: [PATCH 05/10] fix(raft): exhaustive switch + grammar nit --- internal/raftengine/etcd/engine.go | 4 ++-- internal/raftengine/etcd/engine_test.go | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index d448cc71..9fb83db9 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -65,7 +65,7 @@ const ( // slower than heartbeat tick issuance. Heartbeats are tiny // (< ~100 B), so 512 × numPeers is ≪ 1 MB total memory; the // upside is that a ~5 s transient pause (election-timeout scale) - // no longer drops heartbeats and force the peers' lease to expire. + // no longer drops heartbeats and forces the peers' lease to expire. defaultHeartbeatBufPerPeer = 512 // defaultSnapshotLaneBufPerPeer sizes the per-peer MsgSnap lane when the // 4-lane dispatcher mode is enabled (see ELASTICKV_RAFT_DISPATCHER_LANES). 
@@ -1520,7 +1520,7 @@ func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) if !e.dispatcherLanesEnabled { return pd.normal } - switch msgType { + switch msgType { //nolint:exhaustive // only MsgApp/MsgAppResp/MsgSnap need dedicated lanes; the rest falls through to pd.other case raftpb.MsgApp, raftpb.MsgAppResp: return pd.replication case raftpb.MsgSnap: diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index cd668f51..8720d4ae 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1720,7 +1720,7 @@ func TestFourLaneDispatcher_SnapshotDoesNotBlockReplication(t *testing.T) { dispatchCancel: cancel, } engine.dispatchFn = func(dctx context.Context, req dispatchRequest) error { - switch req.msg.Type { + switch req.msg.Type { //nolint:exhaustive // test only exercises MsgSnap and MsgApp; other types are irrelevant here case raftpb.MsgSnap: // Block the snapshot lane until the test releases it. The // replication lane should keep flowing in the meantime. @@ -1730,6 +1730,8 @@ func TestFourLaneDispatcher_SnapshotDoesNotBlockReplication(t *testing.T) { } case raftpb.MsgApp: replicationDone <- struct{}{} + default: + // Other MessageType values are not exercised by this test. } return nil } From dbe21a6d2ea391ba8eed6297137035f463dbc22e Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 20:19:00 +0900 Subject: [PATCH 06/10] refactor(raft): enumerate raftpb.MessageType in dispatch lane switch Address gemini medium review on PR #577: replace the //nolint:exhaustive on selectDispatchLane with an exhaustive switch that lists every raftpb.MessageType. Future additions to the upstream enum now fail the exhaustive linter instead of being silently routed to pd.other. 
Behaviour is preserved: MsgApp/MsgAppResp stay on the replication lane, MsgSnap stays on the snapshot lane, and all local-only / feedback messages continue to route to pd.other. MsgSnapStatus is grouped with MsgSnap for readability (it is a local feedback message and is never dispatched to peers in practice). --- internal/raftengine/etcd/engine.go | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 9fb83db9..3ae63d2a 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -1520,14 +1520,37 @@ func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) if !e.dispatcherLanesEnabled { return pd.normal } - switch msgType { //nolint:exhaustive // only MsgApp/MsgAppResp/MsgSnap need dedicated lanes; the rest falls through to pd.other + // Enumerate every raftpb.MessageType explicitly so that future additions + // to the enum surface as exhaustive-lint failures instead of being + // silently routed to pd.other. Priority control traffic is handled by + // the isPriorityMsg branch above; listing those types here would be + // unreachable, so they are grouped into the heartbeat case for + // documentation only and defensively return pd.heartbeat. + switch msgType { case raftpb.MsgApp, raftpb.MsgAppResp: return pd.replication - case raftpb.MsgSnap: + case raftpb.MsgSnap, raftpb.MsgSnapStatus: return pd.snapshot - default: + case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp, + raftpb.MsgVote, raftpb.MsgVoteResp, + raftpb.MsgPreVote, raftpb.MsgPreVoteResp, + raftpb.MsgReadIndex, raftpb.MsgReadIndexResp, + raftpb.MsgTimeoutNow: + // Unreachable: isPriorityMsg already short-circuited these above. + // Listed for exhaustiveness; keep them on the heartbeat lane so any + // future refactor that removes the short-circuit stays correct. 
+ return pd.heartbeat + case raftpb.MsgHup, raftpb.MsgBeat, raftpb.MsgProp, + raftpb.MsgUnreachable, raftpb.MsgCheckQuorum, raftpb.MsgTransferLeader, + raftpb.MsgStorageAppend, raftpb.MsgStorageAppendResp, + raftpb.MsgStorageApply, raftpb.MsgStorageApplyResp, + raftpb.MsgForgetLeader: return pd.other } + // If a new raftpb.MessageType is added upstream, the exhaustive linter + // will flag the switch above. This fallback keeps runtime behaviour + // backwards-compatible with the previous default branch. + return pd.other } func (e *Engine) applyReadySnapshot(snapshot raftpb.Snapshot) error { From 3076cfbd600ee262953d7b3ff326524ddd516a25 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 21:02:31 +0900 Subject: [PATCH 07/10] refactor(raft): drop unreachable message types from selectDispatchLane Remove cases for MsgHup, MsgBeat, MsgProp, MsgUnreachable, MsgSnapStatus, MsgCheckQuorum and MsgStorage{Append,Apply}{,Resp} from selectDispatchLane. All of these are unreachable in this code path: - MsgHup/MsgBeat/MsgUnreachable/MsgSnapStatus/MsgCheckQuorum and the MsgStorage* family are local messages per etcd/raft's IsLocalMsg table, and skipDispatchMessage already drops them before enqueueDispatchMessage calls selectDispatchLane. - MsgProp is never emitted outbound because DisableProposalForwarding is set and handleProposal rejects non-leader proposals, so no follower ever forwards a proposal to the leader. - The priority control types (MsgHeartbeat/Resp, votes, read-index, MsgTimeoutNow) are short-circuited by the isPriorityMsg branch above the switch. The switch now lists only the types that can actually reach it (MsgApp/MsgAppResp, MsgSnap, MsgTransferLeader, MsgForgetLeader) plus a fallback default for any future raftpb additions. The exhaustive linter is silenced with a targeted //nolint:exhaustive that references skipDispatchMessage and isPriorityMsg so future reviewers understand why the switch isn't exhaustive. 
--- internal/raftengine/etcd/engine.go | 43 +++++++++++++----------------- 1 file changed, 18 insertions(+), 25 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 3ae63d2a..581d411b 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -1520,36 +1520,29 @@ func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) if !e.dispatcherLanesEnabled { return pd.normal } - // Enumerate every raftpb.MessageType explicitly so that future additions - // to the enum surface as exhaustive-lint failures instead of being - // silently routed to pd.other. Priority control traffic is handled by - // the isPriorityMsg branch above; listing those types here would be - // unreachable, so they are grouped into the heartbeat case for - // documentation only and defensively return pd.heartbeat. - switch msgType { + // Only types that can actually reach this point are listed. Everything + // filtered by skipDispatchMessage (etcdraft.IsLocalMsg: MsgHup, MsgBeat, + // MsgUnreachable, MsgSnapStatus, MsgCheckQuorum, MsgStorageAppend/Resp, + // MsgStorageApply/Resp) is dropped before this switch is reached. + // MsgProp is also unreachable: DisableProposalForwarding is set and + // handleProposal rejects non-leader proposals, so no outbound MsgProp + // is ever emitted. Priority control traffic (MsgHeartbeat/Resp, votes, + // read-index, MsgTimeoutNow) is short-circuited by the isPriorityMsg + // branch above. The exhaustive lint is suppressed because exhaustively + // listing the filtered types would reintroduce the dead cases this + // refactor removed. + switch msgType { //nolint:exhaustive // filtered types handled by skipDispatchMessage + isPriorityMsg; see comment above. 
case raftpb.MsgApp, raftpb.MsgAppResp: return pd.replication - case raftpb.MsgSnap, raftpb.MsgSnapStatus: + case raftpb.MsgSnap: return pd.snapshot - case raftpb.MsgHeartbeat, raftpb.MsgHeartbeatResp, - raftpb.MsgVote, raftpb.MsgVoteResp, - raftpb.MsgPreVote, raftpb.MsgPreVoteResp, - raftpb.MsgReadIndex, raftpb.MsgReadIndexResp, - raftpb.MsgTimeoutNow: - // Unreachable: isPriorityMsg already short-circuited these above. - // Listed for exhaustiveness; keep them on the heartbeat lane so any - // future refactor that removes the short-circuit stays correct. - return pd.heartbeat - case raftpb.MsgHup, raftpb.MsgBeat, raftpb.MsgProp, - raftpb.MsgUnreachable, raftpb.MsgCheckQuorum, raftpb.MsgTransferLeader, - raftpb.MsgStorageAppend, raftpb.MsgStorageAppendResp, - raftpb.MsgStorageApply, raftpb.MsgStorageApplyResp, - raftpb.MsgForgetLeader: + case raftpb.MsgTransferLeader, raftpb.MsgForgetLeader: return pd.other } - // If a new raftpb.MessageType is added upstream, the exhaustive linter - // will flag the switch above. This fallback keeps runtime behaviour - // backwards-compatible with the previous default branch. + // Fallback for any raftpb.MessageType added upstream that slips past + // skipDispatchMessage and isPriorityMsg. Routing unknown non-priority + // traffic onto pd.other keeps runtime behaviour compatible with the + // pre-lanes default branch. return pd.other } From 7d916623f435deb1f8da6329914f4e8d8105eb26 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 21:16:07 +0900 Subject: [PATCH 08/10] fix(raft): panic on outbound MsgProp to surface config regression Adds an explicit MsgProp case in selectDispatchLane that panics with a message referencing DisableProposalForwarding. Today the case is unreachable because DisableProposalForwarding=true prevents outbound MsgProp, but the prior default fallback silently routed any future MsgProp to pd.other. 
Turning that silent misrouting into a loud failure surfaces the broken invariant if proposal forwarding is ever re-enabled upstream or in our raft.Config without also updating this switch. --- internal/raftengine/etcd/engine.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 581d411b..9d48a241 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -1538,6 +1538,13 @@ func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) return pd.snapshot case raftpb.MsgTransferLeader, raftpb.MsgForgetLeader: return pd.other + case raftpb.MsgProp: + // DisableProposalForwarding=true (see raft.Config construction in this + // file) guarantees no outbound MsgProp is ever emitted, so this case + // is unreachable today. If it does fire, proposal forwarding was + // re-enabled and we must decide the correct lane explicitly rather + // than silently lumping proposals into pd.other. + panic("selectDispatchLane: outbound MsgProp requires explicit lane once DisableProposalForwarding is disabled") } // Fallback for any raftpb.MessageType added upstream that slips past // skipDispatchMessage and isPriorityMsg. Routing unknown non-priority From 767f40fe12d7315c294afee269948ddb1f7a5c15 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 22:02:34 +0900 Subject: [PATCH 09/10] fix(raft): log + fallback on unexpected MsgProp instead of panic Panicking inside a raft engine dispatcher goroutine would crash the entire node. DisableProposalForwarding=true still means outbound MsgProp should never reach selectDispatchLane, but if it does (upstream etcd/raft or our config changed), log loudly and route to pd.other rather than taking the process down. Per Gemini review on PR #577. 
--- internal/raftengine/etcd/engine.go | 16 +++++++++++----- internal/raftengine/etcd/engine_test.go | 20 ++++++++++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 9d48a241..fd980592 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -1540,11 +1540,17 @@ func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) return pd.other case raftpb.MsgProp: // DisableProposalForwarding=true (see raft.Config construction in this - // file) guarantees no outbound MsgProp is ever emitted, so this case - // is unreachable today. If it does fire, proposal forwarding was - // re-enabled and we must decide the correct lane explicitly rather - // than silently lumping proposals into pd.other. - panic("selectDispatchLane: outbound MsgProp requires explicit lane once DisableProposalForwarding is disabled") + // file) should guarantee no outbound MsgProp is ever emitted, so this + // case is unreachable today. If it does fire, either upstream + // etcd/raft semantics or our config changed; log loudly so operators + // notice, but keep the node up by routing to the catch-all lane + // rather than panicking inside a raft engine goroutine (which would + // crash the whole process). + slog.Error("selectDispatchLane: unexpected outbound MsgProp encountered", + slog.Uint64("raft_node_id", e.nodeID), + slog.String("type", msgType.String()), + ) + return pd.other } // Fallback for any raftpb.MessageType added upstream that slips past // skipDispatchMessage and isPriorityMsg. 
Routing unknown non-priority diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 8720d4ae..a39fcca9 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1700,6 +1700,26 @@ func TestSelectDispatchLane_FourLane(t *testing.T) { } } +// TestSelectDispatchLane_MsgPropFallsBackToOther verifies that, even though +// DisableProposalForwarding=true should prevent outbound MsgProp from ever +// reaching selectDispatchLane, an unexpected MsgProp is routed to the +// catch-all lane rather than panicking. A panic in a raft engine goroutine +// would crash the whole node; log-and-fallback keeps it running. +func TestSelectDispatchLane_MsgPropFallsBackToOther(t *testing.T) { + t.Parallel() + engine := &Engine{nodeID: 1, dispatcherLanesEnabled: true} + pd := &peerQueues{ + heartbeat: make(chan dispatchRequest, 1), + replication: make(chan dispatchRequest, 1), + snapshot: make(chan dispatchRequest, 1), + other: make(chan dispatchRequest, 1), + } + require.NotPanics(t, func() { + got := engine.selectDispatchLane(pd, raftpb.MsgProp) + require.Equal(t, pd.other, got) + }) +} + // TestFourLaneDispatcher_SnapshotDoesNotBlockReplication exercises the key // correctness invariant for the 4-lane layout: a stuck MsgSnap transfer must // not prevent MsgApp from being dispatched, because they now run on From cdad4924bdc481f63eb399b5992de41f15977342 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Wed, 22 Apr 2026 22:17:23 +0900 Subject: [PATCH 10/10] refactor(raft): drop unreachable MsgProp case, rely on default fallback --- internal/raftengine/etcd/engine.go | 27 ++++++------------------- internal/raftengine/etcd/engine_test.go | 12 +++++------ 2 files changed, 12 insertions(+), 27 deletions(-) diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index fd980592..e73fe1d7 100644 --- a/internal/raftengine/etcd/engine.go +++ 
b/internal/raftengine/etcd/engine.go @@ -1524,33 +1524,18 @@ func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) // filtered by skipDispatchMessage (etcdraft.IsLocalMsg: MsgHup, MsgBeat, // MsgUnreachable, MsgSnapStatus, MsgCheckQuorum, MsgStorageAppend/Resp, // MsgStorageApply/Resp) is dropped before this switch is reached. - // MsgProp is also unreachable: DisableProposalForwarding is set and - // handleProposal rejects non-leader proposals, so no outbound MsgProp - // is ever emitted. Priority control traffic (MsgHeartbeat/Resp, votes, - // read-index, MsgTimeoutNow) is short-circuited by the isPriorityMsg - // branch above. The exhaustive lint is suppressed because exhaustively - // listing the filtered types would reintroduce the dead cases this - // refactor removed. - switch msgType { //nolint:exhaustive // filtered types handled by skipDispatchMessage + isPriorityMsg; see comment above. + // Priority control traffic (MsgHeartbeat/Resp, votes, read-index, + // MsgTimeoutNow) is short-circuited by the isPriorityMsg branch above. + // MsgProp is not listed: DisableProposalForwarding=true ensures no + // outbound MsgProp from this engine; if that assumption ever breaks the + // message falls through to the pd.other return below. + switch msgType { //nolint:exhaustive // see skipDispatchMessage / DisableProposalForwarding case raftpb.MsgApp, raftpb.MsgAppResp: return pd.replication case raftpb.MsgSnap: return pd.snapshot case raftpb.MsgTransferLeader, raftpb.MsgForgetLeader: return pd.other - case raftpb.MsgProp: - // DisableProposalForwarding=true (see raft.Config construction in this - // file) should guarantee no outbound MsgProp is ever emitted, so this - // case is unreachable today. 
If it does fire, either upstream - // etcd/raft semantics or our config changed; log loudly so operators - // notice, but keep the node up by routing to the catch-all lane - // rather than panicking inside a raft engine goroutine (which would - // crash the whole process). - slog.Error("selectDispatchLane: unexpected outbound MsgProp encountered", - slog.Uint64("raft_node_id", e.nodeID), - slog.String("type", msgType.String()), - ) - return pd.other } // Fallback for any raftpb.MessageType added upstream that slips past // skipDispatchMessage and isPriorityMsg. Routing unknown non-priority diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index a39fcca9..3e0119a4 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1700,12 +1700,12 @@ func TestSelectDispatchLane_FourLane(t *testing.T) { } } -// TestSelectDispatchLane_MsgPropFallsBackToOther verifies that, even though -// DisableProposalForwarding=true should prevent outbound MsgProp from ever -// reaching selectDispatchLane, an unexpected MsgProp is routed to the -// catch-all lane rather than panicking. A panic in a raft engine goroutine -// would crash the whole node; log-and-fallback keeps it running. -func TestSelectDispatchLane_MsgPropFallsBackToOther(t *testing.T) { +// TestSelectDispatchLane_MsgPropReachesDefaultFallback verifies that MsgProp, +// which is unreachable in practice because DisableProposalForwarding=true +// prevents outbound proposals, is routed to the catch-all lane by the default +// fallback if it ever slips through. This guards against a regression that +// would panic or misroute MsgProp inside a raft engine goroutine. +func TestSelectDispatchLane_MsgPropReachesDefaultFallback(t *testing.T) { t.Parallel() engine := &Engine{nodeID: 1, dispatcherLanesEnabled: true} pd := &peerQueues{