Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 134 additions & 27 deletions internal/raftengine/etcd/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"encoding/binary"
"io"
"log/slog"
"os"
"path/filepath"
"runtime/debug"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -64,13 +66,28 @@ const (
// slower than heartbeat tick issuance. Heartbeats are tiny
// (< ~100 B), so 512 × numPeers is ≪ 1 MB total memory; the
// upside is that a ~5 s transient pause (election-timeout scale)
// no longer drops heartbeats and force the peers' lease to expire.
// no longer drops heartbeats and forces the peers' lease to expire.
defaultHeartbeatBufPerPeer = 512
defaultSnapshotEvery = 10_000
defaultSnapshotQueueSize = 1
defaultAdminPollInterval = 10 * time.Millisecond
defaultMaxPendingConfigs = 64
unknownLastContact = time.Duration(-1)
// defaultSnapshotLaneBufPerPeer sizes the per-peer MsgSnap lane when the
// 4-lane dispatcher mode is enabled (see ELASTICKV_RAFT_DISPATCHER_LANES).
// MsgSnap is rare and bulky; 4 is enough to absorb a retry or two without
// holding up MsgApp replication behind a multi-MiB payload.
defaultSnapshotLaneBufPerPeer = 4
// defaultOtherLaneBufPerPeer sizes the per-peer fallback lane for message
// types not classified as heartbeat/replication/snapshot (e.g. surprise
// locally-addressed control types). Small buffer: traffic volume is tiny.
defaultOtherLaneBufPerPeer = 16
// dispatcherLanesEnvVar toggles the 4-lane dispatcher (heartbeat /
// replication / snapshot / other). When unset or "0", the legacy
// 2-lane layout (heartbeat + normal) is used. Opt-in by design: the
// raft hot path is high blast radius and a regression here can cause
// cluster-wide elections.
dispatcherLanesEnvVar = "ELASTICKV_RAFT_DISPATCHER_LANES"
defaultSnapshotEvery = 10_000
defaultSnapshotQueueSize = 1
defaultAdminPollInterval = 10 * time.Millisecond
defaultMaxPendingConfigs = 64
unknownLastContact = time.Duration(-1)

proposalEnvelopeVersion = byte(0x01)
readContextVersion = byte(0x02)
Expand Down Expand Up @@ -153,15 +170,19 @@ type Engine struct {
dispatchReportCh chan dispatchReport
peerDispatchers map[uint64]*peerQueues
perPeerQueueSize int
dispatchStopCh chan struct{}
dispatchCtx context.Context
dispatchCancel context.CancelFunc
snapshotReqCh chan snapshotRequest
snapshotResCh chan snapshotResult
snapshotStopCh chan struct{}
closeCh chan struct{}
doneCh chan struct{}
startedCh chan struct{}
// dispatcherLanesEnabled toggles the 4-lane dispatcher layout. Captured
// once at Open from ELASTICKV_RAFT_DISPATCHER_LANES so the run-time code
// path is branch-free per message and does not need to re-read env vars.
dispatcherLanesEnabled bool
dispatchStopCh chan struct{}
dispatchCtx context.Context
dispatchCancel context.CancelFunc
snapshotReqCh chan snapshotRequest
snapshotResCh chan snapshotResult
snapshotStopCh chan struct{}
closeCh chan struct{}
doneCh chan struct{}
startedCh chan struct{}

leaderReady chan struct{}
leaderOnce sync.Once
Expand Down Expand Up @@ -320,11 +341,23 @@ type dispatchRequest struct {

// peerQueues holds separate dispatch channels per peer so that heartbeats
// are never blocked behind large log-entry RPCs.
//
// Legacy 2-lane layout (default): heartbeat + normal.
//
// 4-lane layout (opt-in via ELASTICKV_RAFT_DISPATCHER_LANES=1): heartbeat +
// replication (MsgApp/MsgAppResp) + snapshot (MsgSnap) + other. Each lane
// gets its own goroutine so a bulky MsgSnap transfer cannot stall MsgApp
// replication and vice versa. Per-peer ordering within a given message type
// is preserved because all of a single peer's MsgApp messages share one lane
// and one worker.
type peerQueues struct {
normal chan dispatchRequest
heartbeat chan dispatchRequest
ctx context.Context
cancel context.CancelFunc
normal chan dispatchRequest
heartbeat chan dispatchRequest
replication chan dispatchRequest // 4-lane mode only; nil otherwise
snapshot chan dispatchRequest // 4-lane mode only; nil otherwise
other chan dispatchRequest // 4-lane mode only; nil otherwise
ctx context.Context
cancel context.CancelFunc
}

type preparedOpenState struct {
Expand Down Expand Up @@ -473,6 +506,7 @@ func (e *Engine) initTransport(cfg OpenConfig) {
// Size the per-peer dispatch buffer to match the Raft inflight limit so that
// the channel never drops messages that Raft's flow-control would permit.
e.perPeerQueueSize = cfg.MaxInflightMsg
e.dispatcherLanesEnabled = dispatcherLanesEnabledFromEnv()
e.dispatchStopCh = make(chan struct{})
e.transport.SetSpoolDir(cfg.DataDir)
e.transport.SetFSMSnapDir(e.fsmSnapDir)
Expand Down Expand Up @@ -1486,10 +1520,7 @@ func (e *Engine) enqueueDispatchMessage(msg raftpb.Message) error {
e.recordDroppedDispatch(msg)
return nil
}
ch := pd.normal
if isPriorityMsg(msg.Type) {
ch = pd.heartbeat
}
ch := e.selectDispatchLane(pd, msg.Type)
// Avoid the expensive deep-clone in prepareDispatchRequest when the channel
// is already full. The len/cap check is safe here because this function is
// only ever called from the single engine event-loop goroutine.
Expand Down Expand Up @@ -1522,6 +1553,45 @@ func isPriorityMsg(t raftpb.MessageType) bool {
t == raftpb.MsgTimeoutNow
}

// selectDispatchLane picks the per-peer channel for msgType. In the legacy
// 2-lane layout it returns pd.heartbeat for priority control traffic and
// pd.normal for everything else. In the 4-lane layout it additionally
// partitions the non-heartbeat traffic so that MsgApp/MsgAppResp and MsgSnap
// do not share a goroutine and cannot block each other.
func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) chan dispatchRequest {
	// Priority control traffic (heartbeats, votes, read-index, timeout-now)
	// always rides the heartbeat lane in both layouts so it keeps its
	// low-latency treatment and is never stuck behind MsgApp payloads.
	if isPriorityMsg(msgType) {
		return pd.heartbeat
	}
	if !e.dispatcherLanesEnabled {
		return pd.normal
	}
	// Local-only types (etcdraft.IsLocalMsg: MsgHup, MsgBeat, MsgUnreachable,
	// MsgSnapStatus, MsgCheckQuorum, MsgStorageAppend/Resp,
	// MsgStorageApply/Resp) are dropped by skipDispatchMessage before this
	// switch is reached, and MsgProp never leaves this engine because
	// DisableProposalForwarding=true. Every remaining non-priority type —
	// MsgTransferLeader, MsgForgetLeader, and any message type added upstream
	// in the future — rides pd.other via the default arm, which keeps runtime
	// behaviour compatible with the pre-lanes "normal" lane.
	switch msgType { //nolint:exhaustive // see skipDispatchMessage / DisableProposalForwarding
	case raftpb.MsgApp, raftpb.MsgAppResp:
		return pd.replication
	case raftpb.MsgSnap:
		return pd.snapshot
	default:
		return pd.other
	}
}
Comment on lines +1561 to +1593
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic in selectDispatchLane can be simplified by leveraging the existing isPriorityMsg helper. Since all priority messages (heartbeats, votes, read-index, etc.) are routed to the heartbeat lane in both legacy and 4-lane modes, checking it first allows for a cleaner implementation and avoids duplicating the list of priority message types in the switch statement.

func (e *Engine) selectDispatchLane(pd *peerQueues, msgType raftpb.MessageType) chan dispatchRequest {
	if isPriorityMsg(msgType) {
		return pd.heartbeat
	}
	if !e.dispatcherLanesEnabled {
		return pd.normal
	}
	switch msgType {
	case raftpb.MsgApp, raftpb.MsgAppResp:
		return pd.replication
	case raftpb.MsgSnap:
		return pd.snapshot
	default:
		return pd.other
	}
}


func (e *Engine) applyReadySnapshot(snapshot raftpb.Snapshot) error {
if etcdraft.IsEmptySnap(snapshot) {
return nil
Expand Down Expand Up @@ -2672,19 +2742,57 @@ func (e *Engine) startPeerDispatcher(nodeID uint64) {
}
ctx, cancel := context.WithCancel(baseCtx)
pd := &peerQueues{
normal: make(chan dispatchRequest, size),
heartbeat: make(chan dispatchRequest, defaultHeartbeatBufPerPeer),
ctx: ctx,
cancel: cancel,
}
var workers []chan dispatchRequest
if e.dispatcherLanesEnabled {
// 4-lane layout: split MsgApp/MsgAppResp (replication), MsgSnap
// (snapshot), and misc (other) onto independent goroutines so a
// bulky snapshot transfer cannot stall replication. Each channel
// still serves a single peer, so within-type ordering (the raft
// invariant we care about for MsgApp) is preserved.
pd.replication = make(chan dispatchRequest, size)
pd.snapshot = make(chan dispatchRequest, defaultSnapshotLaneBufPerPeer)
pd.other = make(chan dispatchRequest, defaultOtherLaneBufPerPeer)
workers = []chan dispatchRequest{pd.heartbeat, pd.replication, pd.snapshot, pd.other}
} else {
pd.normal = make(chan dispatchRequest, size)
workers = []chan dispatchRequest{pd.normal, pd.heartbeat}
}
e.peerDispatchers[nodeID] = pd
workers := []chan dispatchRequest{pd.normal, pd.heartbeat}
e.dispatchWG.Add(len(workers))
for _, w := range workers {
go e.runDispatchWorker(ctx, w)
}
}

// dispatcherLanesEnabledFromEnv reports whether the 4-lane dispatcher has
// been explicitly opted into via ELASTICKV_RAFT_DISPATCHER_LANES. The
// (whitespace-trimmed) value is interpreted by strconv.ParseBool, so the
// standard tokens apply: 1, t, T, TRUE, true, True enable the feature and
// 0, f, F, FALSE, false, False disable it. An empty string or any value
// ParseBool rejects leaves the feature disabled.
func dispatcherLanesEnabledFromEnv() bool {
	raw := os.Getenv(dispatcherLanesEnvVar)
	if on, err := strconv.ParseBool(strings.TrimSpace(raw)); err == nil {
		return on
	}
	return false
}
Comment on lines +2776 to +2783
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation of dispatcherLanesEnabledFromEnv is quite permissive, enabling the feature for any non-empty value other than "0". While this matches the PR description, using strconv.ParseBool would provide a more standard and predictable behavior for boolean environment variables (e.g., handling "true", "1", "false", "0" consistently). Additionally, if this dispatcher lanes feature is a prototype or limited-scope implementation, ensure that its limitations are explicitly documented and runtime validation is added to prevent misuse in a general context.

Suggested change
func dispatcherLanesEnabledFromEnv() bool {
v := os.Getenv(dispatcherLanesEnvVar)
return v != "" && v != "0"
}
func dispatcherLanesEnabledFromEnv() bool {
v := os.Getenv(dispatcherLanesEnvVar)
enabled, _ := strconv.ParseBool(v)
return enabled
}
References
  1. For prototype or limited-scope implementations, explicitly document the limitations (e.g., single-node only, blocking startup) and add runtime validation to prevent misuse in a general context.


// closePeerLanes shuts down every dispatch channel pd actually owns so the
// drain loops in runDispatchWorker can exit. Lanes a given layout never
// allocated stay nil and are skipped, which makes the helper safe for both
// the 2-lane and the 4-lane configuration.
func closePeerLanes(pd *peerQueues) {
	lanes := [...]chan dispatchRequest{pd.heartbeat, pd.normal, pd.replication, pd.snapshot, pd.other}
	for _, lane := range lanes {
		if lane == nil {
			continue
		}
		close(lane)
	}
}

// runDispatchWorker drains ch until the channel is closed, the engine stops,
// or the per-peer context is cancelled (e.g. by removePeer). The ctx.Done()
// arm ensures old workers exit promptly when a peer is removed and
Expand Down Expand Up @@ -2967,8 +3075,7 @@ func (e *Engine) removePeer(nodeID uint64) {
if pd, ok := e.peerDispatchers[nodeID]; ok {
delete(e.peerDispatchers, nodeID)
pd.cancel() // cancel any in-flight RPC for this peer immediately
close(pd.normal)
close(pd.heartbeat)
closePeerLanes(pd)
}
}
}
Expand Down
Loading
Loading