diff --git a/internal/raftengine/etcd/engine.go b/internal/raftengine/etcd/engine.go index 2ebc10cd..8d3e700e 100644 --- a/internal/raftengine/etcd/engine.go +++ b/internal/raftengine/etcd/engine.go @@ -6,10 +6,12 @@ import ( "encoding/binary" "io" "log/slog" + "os" "path/filepath" "runtime/debug" "sort" "strconv" + "strings" "sync" "sync/atomic" "time" @@ -64,13 +66,28 @@ const ( // slower than heartbeat tick issuance. Heartbeats are tiny // (< ~100 B), so 512 × numPeers is ≪ 1 MB total memory; the // upside is that a ~5 s transient pause (election-timeout scale) - // no longer drops heartbeats and force the peers' lease to expire. + // no longer drops heartbeats and forces the peers' lease to expire. defaultHeartbeatBufPerPeer = 512 - defaultSnapshotEvery = 10_000 - defaultSnapshotQueueSize = 1 - defaultAdminPollInterval = 10 * time.Millisecond - defaultMaxPendingConfigs = 64 - unknownLastContact = time.Duration(-1) + // defaultSnapshotLaneBufPerPeer sizes the per-peer MsgSnap lane when the + // 4-lane dispatcher mode is enabled (see ELASTICKV_RAFT_DISPATCHER_LANES). + // MsgSnap is rare and bulky; 4 is enough to absorb a retry or two without + // holding up MsgApp replication behind a multi-MiB payload. + defaultSnapshotLaneBufPerPeer = 4 + // defaultOtherLaneBufPerPeer sizes the per-peer fallback lane for message + // types not classified as heartbeat/replication/snapshot (e.g. surprise + // locally-addressed control types). Small buffer: traffic volume is tiny. + defaultOtherLaneBufPerPeer = 16 + // dispatcherLanesEnvVar toggles the 4-lane dispatcher (heartbeat / + // replication / snapshot / other). When unset or "0", the legacy + // 2-lane layout (heartbeat + normal) is used. Opt-in by design: the + // raft hot path is high blast radius and a regression here can cause + // cluster-wide elections. 
+ dispatcherLanesEnvVar = "ELASTICKV_RAFT_DISPATCHER_LANES" + defaultSnapshotEvery = 10_000 + defaultSnapshotQueueSize = 1 + defaultAdminPollInterval = 10 * time.Millisecond + defaultMaxPendingConfigs = 64 + unknownLastContact = time.Duration(-1) proposalEnvelopeVersion = byte(0x01) readContextVersion = byte(0x02) @@ -153,15 +170,19 @@ type Engine struct { dispatchReportCh chan dispatchReport peerDispatchers map[uint64]*peerQueues perPeerQueueSize int - dispatchStopCh chan struct{} - dispatchCtx context.Context - dispatchCancel context.CancelFunc - snapshotReqCh chan snapshotRequest - snapshotResCh chan snapshotResult - snapshotStopCh chan struct{} - closeCh chan struct{} - doneCh chan struct{} - startedCh chan struct{} + // dispatcherLanesEnabled toggles the 4-lane dispatcher layout. Captured + // once at Open from ELASTICKV_RAFT_DISPATCHER_LANES so the run-time code + // path is branch-free per message and does not need to re-read env vars. + dispatcherLanesEnabled bool + dispatchStopCh chan struct{} + dispatchCtx context.Context + dispatchCancel context.CancelFunc + snapshotReqCh chan snapshotRequest + snapshotResCh chan snapshotResult + snapshotStopCh chan struct{} + closeCh chan struct{} + doneCh chan struct{} + startedCh chan struct{} leaderReady chan struct{} leaderOnce sync.Once @@ -320,11 +341,23 @@ type dispatchRequest struct { // peerQueues holds separate dispatch channels per peer so that heartbeats // are never blocked behind large log-entry RPCs. +// +// Legacy 2-lane layout (default): heartbeat + normal. +// +// 4-lane layout (opt-in via ELASTICKV_RAFT_DISPATCHER_LANES=1): heartbeat + +// replication (MsgApp/MsgAppResp) + snapshot (MsgSnap) + other. Each lane +// gets its own goroutine so a bulky MsgSnap transfer cannot stall MsgApp +// replication and vice versa. Per-peer ordering within a given message type +// is preserved because a single peer's MsgApp messages all share one lane and +// one worker. 
type peerQueues struct { - normal chan dispatchRequest - heartbeat chan dispatchRequest - ctx context.Context - cancel context.CancelFunc + normal chan dispatchRequest + heartbeat chan dispatchRequest + replication chan dispatchRequest // 4-lane mode only; nil otherwise + snapshot chan dispatchRequest // 4-lane mode only; nil otherwise + other chan dispatchRequest // 4-lane mode only; nil otherwise + ctx context.Context + cancel context.CancelFunc } type preparedOpenState struct { @@ -473,6 +506,7 @@ func (e *Engine) initTransport(cfg OpenConfig) { // Size the per-peer dispatch buffer to match the Raft inflight limit so that // the channel never drops messages that Raft's flow-control would permit. e.perPeerQueueSize = cfg.MaxInflightMsg + e.dispatcherLanesEnabled = dispatcherLanesEnabledFromEnv() e.dispatchStopCh = make(chan struct{}) e.transport.SetSpoolDir(cfg.DataDir) e.transport.SetFSMSnapDir(e.fsmSnapDir) @@ -1486,10 +1520,7 @@ func (e *Engine) enqueueDispatchMessage(msg raftpb.Message) error { e.recordDroppedDispatch(msg) return nil } - ch := pd.normal - if isPriorityMsg(msg.Type) { - ch = pd.heartbeat - } + ch := e.selectDispatchLane(pd, msg.Type) // Avoid the expensive deep-clone in prepareDispatchRequest when the channel // is already full. The len/cap check is safe here because this function is // only ever called from the single engine event-loop goroutine. @@ -1522,6 +1553,45 @@ func isPriorityMsg(t raftpb.MessageType) bool { t == raftpb.MsgTimeoutNow } +// selectDispatchLane picks the per-peer channel for msgType. In the legacy +// 2-lane layout it returns pd.heartbeat for priority control traffic and +// pd.normal for everything else. In the 4-lane layout it additionally +// partitions the non-heartbeat traffic so that MsgApp/MsgAppResp and MsgSnap +// do not share a goroutine and cannot block each other. 
+func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) chan dispatchRequest { + // Priority control traffic (heartbeats, votes, read-index, timeout-now) + // always rides the heartbeat lane in both layouts so it keeps its + // low-latency treatment and is never stuck behind MsgApp payloads. + if isPriorityMsg(msgType) { + return pd.heartbeat + } + if !e.dispatcherLanesEnabled { + return pd.normal + } + // Only types that can actually reach this point are listed. Everything + // filtered by skipDispatchMessage (etcdraft.IsLocalMsg: MsgHup, MsgBeat, + // MsgUnreachable, MsgSnapStatus, MsgCheckQuorum, MsgStorageAppend/Resp, + // MsgStorageApply/Resp) is dropped before this switch is reached. + // Priority control traffic (MsgHeartbeat/Resp, votes, read-index, + // MsgTimeoutNow) is short-circuited by the isPriorityMsg branch above. + // MsgProp is not listed: DisableProposalForwarding=true ensures no + // outbound MsgProp from this engine; if that assumption ever breaks the + // message falls through to the pd.other return below. + switch msgType { //nolint:exhaustive // see skipDispatchMessage / DisableProposalForwarding + case raftpb.MsgApp, raftpb.MsgAppResp: + return pd.replication + case raftpb.MsgSnap: + return pd.snapshot + case raftpb.MsgTransferLeader, raftpb.MsgForgetLeader: + return pd.other + } + // Fallback for any raftpb.MessageType added upstream that slips past + // skipDispatchMessage and isPriorityMsg. Routing unknown non-priority + // traffic onto pd.other keeps runtime behaviour compatible with the + // pre-lanes default branch. 
+ return pd.other +} + func (e *Engine) applyReadySnapshot(snapshot raftpb.Snapshot) error { if etcdraft.IsEmptySnap(snapshot) { return nil @@ -2672,19 +2742,57 @@ func (e *Engine) startPeerDispatcher(nodeID uint64) { } ctx, cancel := context.WithCancel(baseCtx) pd := &peerQueues{ - normal: make(chan dispatchRequest, size), heartbeat: make(chan dispatchRequest, defaultHeartbeatBufPerPeer), ctx: ctx, cancel: cancel, } + var workers []chan dispatchRequest + if e.dispatcherLanesEnabled { + // 4-lane layout: split MsgApp/MsgAppResp (replication), MsgSnap + // (snapshot), and misc (other) onto independent goroutines so a + // bulky snapshot transfer cannot stall replication. Each channel + // still serves a single peer, so within-type ordering (the raft + // invariant we care about for MsgApp) is preserved. + pd.replication = make(chan dispatchRequest, size) + pd.snapshot = make(chan dispatchRequest, defaultSnapshotLaneBufPerPeer) + pd.other = make(chan dispatchRequest, defaultOtherLaneBufPerPeer) + workers = []chan dispatchRequest{pd.heartbeat, pd.replication, pd.snapshot, pd.other} + } else { + pd.normal = make(chan dispatchRequest, size) + workers = []chan dispatchRequest{pd.normal, pd.heartbeat} + } e.peerDispatchers[nodeID] = pd - workers := []chan dispatchRequest{pd.normal, pd.heartbeat} e.dispatchWG.Add(len(workers)) for _, w := range workers { go e.runDispatchWorker(ctx, w) } } +// dispatcherLanesEnabledFromEnv returns true when the 4-lane dispatcher has +// been explicitly opted into via ELASTICKV_RAFT_DISPATCHER_LANES. The value +// is parsed with strconv.ParseBool, which accepts the standard tokens +// (1, t, T, TRUE, true, True enable; 0, f, F, FALSE, false, False disable). +// An empty string or any unrecognized value disables the feature. 
+func dispatcherLanesEnabledFromEnv() bool { + v := strings.TrimSpace(os.Getenv(dispatcherLanesEnvVar)) + enabled, err := strconv.ParseBool(v) + if err != nil { + return false + } + return enabled +} + +// closePeerLanes closes every non-nil dispatch channel on pd so that the +// drain loops in runDispatchWorker exit. It is safe to call with either the +// 2-lane or 4-lane layout because unused lanes are nil. +func closePeerLanes(pd *peerQueues) { + for _, ch := range []chan dispatchRequest{pd.heartbeat, pd.normal, pd.replication, pd.snapshot, pd.other} { + if ch != nil { + close(ch) + } + } +} + // runDispatchWorker drains ch until the channel is closed, the engine stops, // or the per-peer context is cancelled (e.g. by removePeer). The ctx.Done() // arm ensures old workers exit promptly when a peer is removed and @@ -2967,8 +3075,7 @@ func (e *Engine) removePeer(nodeID uint64) { if pd, ok := e.peerDispatchers[nodeID]; ok { delete(e.peerDispatchers, nodeID) pd.cancel() // cancel any in-flight RPC for this peer immediately - close(pd.normal) - close(pd.heartbeat) + closePeerLanes(pd) } } } diff --git a/internal/raftengine/etcd/engine_test.go b/internal/raftengine/etcd/engine_test.go index 5522c940..3e0119a4 100644 --- a/internal/raftengine/etcd/engine_test.go +++ b/internal/raftengine/etcd/engine_test.go @@ -1635,3 +1635,209 @@ func TestErrNotLeaderMatchesRaftEngineSentinel(t *testing.T) { require.True(t, errors.Is(errors.WithStack(errLeadershipTransferNotLeader), raftengine.ErrNotLeader)) require.True(t, errors.Is(errors.WithStack(errLeadershipTransferInProgress), raftengine.ErrLeadershipTransferInProgress)) } + +// TestSelectDispatchLane_LegacyTwoLane verifies that, when the 4-lane +// dispatcher is disabled (default), messages are routed exactly as before: +// priority control traffic → heartbeat lane, everything else → normal lane. 
+func TestSelectDispatchLane_LegacyTwoLane(t *testing.T) { + t.Parallel() + engine := &Engine{dispatcherLanesEnabled: false} + pd := &peerQueues{ + normal: make(chan dispatchRequest, 1), + heartbeat: make(chan dispatchRequest, 1), + } + + cases := map[raftpb.MessageType]chan dispatchRequest{ + raftpb.MsgHeartbeat: pd.heartbeat, + raftpb.MsgHeartbeatResp: pd.heartbeat, + raftpb.MsgReadIndex: pd.heartbeat, + raftpb.MsgReadIndexResp: pd.heartbeat, + raftpb.MsgVote: pd.heartbeat, + raftpb.MsgVoteResp: pd.heartbeat, + raftpb.MsgPreVote: pd.heartbeat, + raftpb.MsgPreVoteResp: pd.heartbeat, + raftpb.MsgTimeoutNow: pd.heartbeat, + raftpb.MsgApp: pd.normal, + raftpb.MsgAppResp: pd.normal, + raftpb.MsgSnap: pd.normal, + } + for mt, want := range cases { + got := engine.selectDispatchLane(pd, mt) + require.Equalf(t, want, got, "legacy mode routing for %s", mt) + } +} + +// TestSelectDispatchLane_FourLane verifies that, when ELASTICKV_RAFT_DISPATCHER_LANES +// is enabled, MsgApp/MsgAppResp goes to the replication lane, MsgSnap goes to +// the snapshot lane, and heartbeats/votes/read-index share the priority lane. 
+func TestSelectDispatchLane_FourLane(t *testing.T) { + t.Parallel() + engine := &Engine{dispatcherLanesEnabled: true} + pd := &peerQueues{ + heartbeat: make(chan dispatchRequest, 1), + replication: make(chan dispatchRequest, 1), + snapshot: make(chan dispatchRequest, 1), + other: make(chan dispatchRequest, 1), + } + + cases := map[raftpb.MessageType]chan dispatchRequest{ + raftpb.MsgHeartbeat: pd.heartbeat, + raftpb.MsgHeartbeatResp: pd.heartbeat, + raftpb.MsgVote: pd.heartbeat, + raftpb.MsgVoteResp: pd.heartbeat, + raftpb.MsgPreVote: pd.heartbeat, + raftpb.MsgPreVoteResp: pd.heartbeat, + raftpb.MsgReadIndex: pd.heartbeat, + raftpb.MsgReadIndexResp: pd.heartbeat, + raftpb.MsgTimeoutNow: pd.heartbeat, + raftpb.MsgApp: pd.replication, + raftpb.MsgAppResp: pd.replication, + raftpb.MsgSnap: pd.snapshot, + } + for mt, want := range cases { + got := engine.selectDispatchLane(pd, mt) + require.Equalf(t, want, got, "4-lane mode routing for %s", mt) + } +} + +// TestSelectDispatchLane_MsgPropReachesDefaultFallback verifies that MsgProp, +// which is unreachable in practice because DisableProposalForwarding=true +// prevents outbound proposals, is routed to the catch-all lane by the default +// fallback if it ever slips through. This guards against a regression that +// would panic or misroute MsgProp inside a raft engine goroutine. 
+func TestSelectDispatchLane_MsgPropReachesDefaultFallback(t *testing.T) { + t.Parallel() + engine := &Engine{nodeID: 1, dispatcherLanesEnabled: true} + pd := &peerQueues{ + heartbeat: make(chan dispatchRequest, 1), + replication: make(chan dispatchRequest, 1), + snapshot: make(chan dispatchRequest, 1), + other: make(chan dispatchRequest, 1), + } + require.NotPanics(t, func() { + got := engine.selectDispatchLane(pd, raftpb.MsgProp) + require.Equal(t, pd.other, got) + }) +} + +// TestFourLaneDispatcher_SnapshotDoesNotBlockReplication exercises the key +// correctness invariant for the 4-lane layout: a stuck MsgSnap transfer must +// not prevent MsgApp from being dispatched, because they now run on +// independent goroutines. +func TestFourLaneDispatcher_SnapshotDoesNotBlockReplication(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + replicationDone := make(chan struct{}, 1) + snapshotBlocking := make(chan struct{}) + engine := &Engine{ + nodeID: 1, + peerDispatchers: make(map[uint64]*peerQueues), + perPeerQueueSize: 4, + dispatcherLanesEnabled: true, + dispatchStopCh: make(chan struct{}), + dispatchCtx: ctx, + dispatchCancel: cancel, + } + engine.dispatchFn = func(dctx context.Context, req dispatchRequest) error { + switch req.msg.Type { //nolint:exhaustive // test only exercises MsgSnap and MsgApp; other types are irrelevant here + case raftpb.MsgSnap: + // Block the snapshot lane until the test releases it. The + // replication lane should keep flowing in the meantime. + select { + case <-snapshotBlocking: + case <-dctx.Done(): + } + case raftpb.MsgApp: + replicationDone <- struct{}{} + default: + // Other MessageType values are not exercised by this test. 
+ } + return nil + } + + engine.upsertPeer(Peer{NodeID: 2, ID: "peer2", Address: "localhost:2"}) + pd, ok := engine.peerDispatchers[2] + require.True(t, ok) + require.NotNil(t, pd.replication) + require.NotNil(t, pd.snapshot) + + require.NoError(t, engine.enqueueDispatchMessage(raftpb.Message{Type: raftpb.MsgSnap, To: 2})) + require.NoError(t, engine.enqueueDispatchMessage(raftpb.Message{Type: raftpb.MsgApp, To: 2})) + + select { + case <-replicationDone: + case <-time.After(time.Second): + t.Fatal("MsgApp did not dispatch while MsgSnap was stuck — lanes are not independent") + } + + close(snapshotBlocking) + close(engine.dispatchStopCh) + engine.dispatchCancel() + engine.dispatchWG.Wait() +} + +// TestFourLaneDispatcher_RemovePeerClosesAllLanes confirms removePeer closes +// every lane (not just normal/heartbeat) so no worker goroutine leaks under +// the opt-in 4-lane layout. +func TestFourLaneDispatcher_RemovePeerClosesAllLanes(t *testing.T) { + stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + pd := &peerQueues{ + heartbeat: make(chan dispatchRequest, 4), + replication: make(chan dispatchRequest, 4), + snapshot: make(chan dispatchRequest, 4), + other: make(chan dispatchRequest, 4), + ctx: ctx, + cancel: cancel, + } + engine := &Engine{ + nodeID: 1, + peers: map[uint64]Peer{2: {NodeID: 2, ID: "peer2"}}, + peerDispatchers: map[uint64]*peerQueues{2: pd}, + dispatchStopCh: stopCh, + dispatcherLanesEnabled: true, + } + engine.dispatchWG.Add(4) + go engine.runDispatchWorker(ctx, pd.heartbeat) + go engine.runDispatchWorker(ctx, pd.replication) + go engine.runDispatchWorker(ctx, pd.snapshot) + go engine.runDispatchWorker(ctx, pd.other) + + engine.removePeer(2) + + done := make(chan struct{}) + go func() { + engine.dispatchWG.Wait() + close(done) + }() + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("4-lane dispatch workers did not exit after peer removal") + } + + // Subsequent sends to the removed peer must 
be dropped without panic. + require.NoError(t, engine.enqueueDispatchMessage(raftpb.Message{Type: raftpb.MsgApp, To: 2})) +} + +// TestDispatcherLanesEnabledFromEnv pins env-var parsing so a regression in +// the feature flag can't silently flip the default. +func TestDispatcherLanesEnabledFromEnv(t *testing.T) { + cases := []struct { + val string + want bool + }{ + {"", false}, + {"0", false}, + {"1", true}, + {"true", true}, + {"TRUE", true}, + {"false", false}, + {"yes", false}, + } + for _, c := range cases { + t.Setenv(dispatcherLanesEnvVar, c.val) + require.Equalf(t, c.want, dispatcherLanesEnabledFromEnv(), "env=%q", c.val) + } +}