feat(raft): replace unary Send with long-lived client-streaming SendStream per peer #526
Conversation
## Problem

Per-peer heartbeat channels (cap=64) fill up because the unary Send RPC blocks for one full RTT per message. At 100 heartbeats/s per peer, any RTT above ~640 ms causes the channel to overflow and messages to be dropped. Disk I/O and CPU are not the bottleneck; the RTT wait in the dispatch worker is the root cause.

## Solution

Add SendStream (client-streaming RPC) to the EtcdRaft gRPC service. stream.Send() writes to the gRPC send buffer and returns immediately — no RTT wait — so the dispatch worker can enqueue messages at line rate.

### Protocol

- New proto RPC: SendStream(stream EtcdRaftMessage) returns (EtcdRaftAck)
- Old unary Send RPC kept for backward compatibility

### Sender (GRPCTransport)

- One long-lived peerStream per peer node, opened on first dispatch
- getOrOpenStream: double-checked locking without holding t.mu during dial, avoiding lock-order inversion with t.mu (clientFor) vs streamsMu
- On stream.Send error: close stream, return error; Raft retransmits
- Backward compat: codes.Unimplemented → add to noStream map, fall back to dispatchUnary (old path); noStream cleared on UpsertPeer address change
- Close/RemovePeer/UpsertPeer: release t.mu before calling closeStream to maintain lock-order invariant (streamsMu → t.mu, never the reverse)

### Receiver (GRPCTransport)

- SendStream server handler: recv loop → handle each message → SendAndClose

### Dispatch goroutine model (Engine)

- Single multiplexing goroutine per peer (runMultiplexDispatchWorker) replaces the previous two-goroutine model; gRPC stream.Send is not goroutine-safe so a single writer is required
- Biased-select pattern: drainPriorityChannel (non-blocking heartbeat drain) before waitForChannel (blocks on either channel), ensuring heartbeats are never starved by normal log-entry traffic
- runDispatchWorker kept for backward compatibility with existing tests
📝 Walkthrough

Replaces per-peer dual dispatch goroutines with a single heartbeat-prioritized multiplexing worker and adds a long-lived client-streaming gRPC RPC (`SendStream`).
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Peer Client
    participant Transport as GRPCTransport
    participant Stream as SendStream (stream)
    participant Engine as Engine / Handler
    Client->>Transport: getOrOpenStream(peer)
    alt cached stream exists
        Transport-->>Client: return cached stream
    else open new stream
        Transport->>Stream: Dial/Open SendStream
        Stream-->>Transport: stream established
        Transport-->>Client: return stream
    end
    loop for each raft message
        Client->>Stream: Send(EtcdRaftMessage)
        Stream->>Transport: server Recv -> unmarshal
        Transport->>Engine: handle(message)
        Engine-->>Transport: ack / processed
    end
    Client->>Stream: Close (EOF)
    Stream->>Transport: send final EtcdRaftAck
    Transport->>Client: EtcdRaftAck (response)
    Transport->>Transport: cleanup stream cache / close on error
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pull request overview
This PR introduces a client-streaming Raft message RPC (SendStream) and updates the dispatch path to use a long-lived per-peer gRPC stream to avoid unary RTT blocking that can overflow heartbeat queues under high RTT.
Changes:
- Add `SendStream(stream EtcdRaftMessage) returns (EtcdRaftAck)` to the EtcdRaft gRPC service (keeping unary `Send` for backward compatibility).
- Update `GRPCTransport` to maintain a per-peer long-lived `SendStream` and fall back to unary `Send` on `codes.Unimplemented`.
- Replace the per-peer two-goroutine dispatch model with a single multiplexing dispatch worker to ensure only one goroutine calls `stream.Send()` per peer.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| proto/etcd_raft.proto | Adds the new SendStream RPC to the service definition. |
| proto/etcd_raft_grpc.pb.go | Regenerates gRPC stubs for the new client-streaming RPC and updates stream indices. |
| proto/etcd_raft.pb.go | Regenerates protobuf descriptors/indexes to include SendStream. |
| internal/raftengine/etcd/grpc_transport.go | Implements per-peer stream management, streaming dispatch, fallback to unary, and server handler for SendStream. |
| internal/raftengine/etcd/engine.go | Switches to a single per-peer multiplexing dispatch worker with heartbeat bias. |
| internal/raftengine/etcd/grpc_transport_test.go | Updates test client mocks to satisfy the expanded gRPC client interface. |
```go
func (t *GRPCTransport) dispatchRegular(ctx context.Context, msg raftpb.Message) error {
	ctx, cancel := transportContext(ctx, defaultDispatchTimeout)
	defer cancel()

	raw, err := msg.Marshal()
	if err != nil {
		return errors.WithStack(err)
	}
	client, err := t.clientFor(msg.To)

	stream, err := t.getOrOpenStream(msg.To)
	if err != nil {
		if errors.Is(err, errStreamNotSupported) {
			return t.dispatchUnary(ctx, raw, msg.To)
		}
		return err
	}

	if err := stream.Send(&pb.EtcdRaftMessage{Message: raw}); err != nil {
		t.closeStream(msg.To)
		return errors.WithStack(err)
	}
	return nil
```
New streaming dispatch behavior (getOrOpenStream + fallback to dispatchUnary on codes.Unimplemented, plus stream teardown on Send error) isn't covered by unit tests in this package. Adding focused tests for (1) using SendStream when supported, (2) caching noStream after Unimplemented and falling back to unary, and (3) closeStream invoked on Send errors would help prevent regressions in the transport layer.
```go
// runMultiplexDispatchWorker is the single per-peer goroutine that reads from
// both pd.heartbeat and pd.normal with a biased select: it drains the
// heartbeat channel completely before waiting on the normal channel, bounding
// heartbeat delay to one normal-message send time.
//
// A single writer goroutine per peer is required because gRPC stream.Send is
// not goroutine-safe. The biased-select replaces the two-goroutine model from
// runDispatchWorker while preserving heartbeat priority.
func (e *Engine) runMultiplexDispatchWorker(ctx context.Context, pd *peerQueues) {
	defer e.dispatchWG.Done()
	for {
		drained, stop := e.drainPriorityChannel(ctx, pd)
		if stop {
			return
		}
		if drained {
			continue // re-check priority before waiting
		}
		if e.waitForChannel(ctx, pd) {
			return
		}
	}
```
runMultiplexDispatchWorker introduces a new scheduling policy (biased select draining heartbeat before normal). There are existing dispatch-worker tests, but none assert this new priority/drain behavior. Adding a test that enqueues interleaved normal + heartbeat requests and verifies heartbeats are dispatched ahead of normal traffic (and/or are drained fully before blocking) would guard the intended anti-starvation behavior.
```go
streamCtx, cancel := context.WithCancel(context.Background())
stream, err := client.SendStream(streamCtx)
if err != nil {
```
getOrOpenStream creates the stream context from context.Background(), so a blocked stream.Send won't be cancelled when the per-peer dispatch ctx is cancelled (e.g., stopDispatchWorkers/removePeer). This can hang shutdown because dispatch workers may block inside transport.Dispatch while stopDispatchWorkers waits for them to exit. Consider threading a caller context into getOrOpenStream and deriving streamCtx from it (optionally stripping deadlines via context.WithoutCancel) so cancellation reliably tears down the stream and unblocks Send.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/raftengine/etcd/grpc_transport.go (1)
366-398: ⚠️ Potential issue | 🟡 Minor

`dispatchRegular`'s `ctx` is unused on the streaming path.

On the streaming path, `ctx` is neither passed to `stream.Send` nor consulted via `ctx.Err()`. The per-call `transportContext(..., defaultDispatchTimeout)` deadline is therefore only honored on the unary fallback (`dispatchUnary`). That's defensible because `stream.Send` is buffered/non-blocking in the common case, but when the gRPC send buffer is saturated `stream.Send` will block, and there is no per-message cancellation to bound that wait.

Two minor notes:

- Consider at least an early `if err := ctx.Err(); err != nil { return err }` check before `stream.Send` so a cancelled per-request context is not silently ignored.
- A slow peer that fills the gRPC send buffer will back up the single per-peer dispatch goroutine, which also serves heartbeats. The biased-select for heartbeats only avoids head-of-line blocking inside the channel — once `stream.Send` itself is blocked, heartbeats to that peer block too. Probably acceptable for this PR, but worth documenting in the worker comment.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/raftengine/etcd/grpc_transport.go` around lines 366 - 398, dispatchRegular currently ignores the incoming ctx on the streaming path so a cancelled/deadlined per-call context isn’t honored when calling stream.Send; add a pre-send cancellation check and guard the blocking Send with the per-call context/timeout similar to dispatchUnary. Specifically, in GRPCTransport.dispatchRegular (and around getOrOpenStream/stream.Send) first check if ctx.Err() != nil and return that error, and then wrap the Send so it respects a transportContext(ctx, defaultDispatchTimeout) deadline (or select on ctx.Done() before attempting Send) so the call is bounded; keep the existing fallback behavior to dispatchUnary unchanged.
🧹 Nitpick comments (1)
internal/raftengine/etcd/grpc_transport.go (1)
79-87: Lock-ordering doc vs. actual behavior.

The struct comment states "always acquire `streamsMu` before `t.mu` (never the reverse)", but in practice neither path ever holds both locks simultaneously:

- `getOrOpenStream` releases `streamsMu` before calling `clientFor` (which takes `t.mu`), then re-acquires `streamsMu`.
- `UpsertPeer`/`RemovePeer` release `t.mu` before calling `closeStream`/`clearNoStream` (which take `streamsMu`).

So the rule that actually prevents deadlock is "never hold both at once," not a particular acquisition order. Recommend rewording the comment to match — the current wording suggests `UpsertPeer` is violating an invariant when it is not.

Also applies to: 170-211
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/raftengine/etcd/grpc_transport.go` around lines 79 - 87, The comment's lock-order guidance is inaccurate: code never acquires streamsMu and t.mu simultaneously (functions like getOrOpenStream release streamsMu before calling clientFor which locks t.mu, and UpsertPeer/RemovePeer release t.mu before calling closeStream/clearNoStream which lock streamsMu), so update the struct and the similar comment at lines ~170-211 to state the real invariant: "Do not hold streamsMu and t.mu at the same time; all code paths must release one before acquiring the other" and optionally mention the concrete helpers (getOrOpenStream, clientFor, UpsertPeer, RemovePeer, closeStream, clearNoStream) as examples to clarify the pattern.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/raftengine/etcd/grpc_transport.go`:
- Around line 783-828: The code marks peers as unary-only by setting
noStream[nodeID] in getOrOpenStream when a SendStream returns
codes.Unimplemented, but noStream is only cleared by UpsertPeer on address
change so a restarted peer at the same address that now supports streams remains
stuck; update the transport lifecycle to clear noStream when the underlying
grpc.ClientConn is replaced or reset (e.g., in closePeerConnLocked or wherever
client/conn swapping happens) so that getOrOpenStream can re-probe streaming
support; specifically, add a call to clearNoStream(nodeID) (or remove the map
entry) inside closePeerConnLocked (or the code path that replaces the client
returned by clientFor) and ensure any code paths that recreate streams
(getOrOpenStream) will attempt SendStream again after the clear.
- Around line 440-457: SendStream currently returns on any error from t.handle,
tearing down the long-lived stream for transient backpressure errors like
errStepQueueFull; change SendStream to detect transient errors (e.g., compare
err from t.handle against errStepQueueFull or other enqueueStep transient
signals using errors.Is) and, in those cases, log the incident via slog (with
context from stream.Context() and the message) and continue the for loop instead
of returning; only return/wrap the error for fatal/unexpected errors from
t.handle while preserving existing behavior for io.EOF and Unmarshal failures.
---
Outside diff comments:
In `@internal/raftengine/etcd/grpc_transport.go`:
- Around line 366-398: dispatchRegular currently ignores the incoming ctx on the
streaming path so a cancelled/deadlined per-call context isn’t honored when
calling stream.Send; add a pre-send cancellation check and guard the blocking
Send with the per-call context/timeout similar to dispatchUnary. Specifically,
in GRPCTransport.dispatchRegular (and around getOrOpenStream/stream.Send) first
check if ctx.Err() != nil and return that error, and then wrap the Send so it
respects a transportContext(ctx, defaultDispatchTimeout) deadline (or select on
ctx.Done() before attempting Send) so the call is bounded; keep the existing
fallback behavior to dispatchUnary unchanged.
---
Nitpick comments:
In `@internal/raftengine/etcd/grpc_transport.go`:
- Around line 79-87: The comment's lock-order guidance is inaccurate: code never
acquires streamsMu and t.mu simultaneously (functions like getOrOpenStream
release streamsMu before calling clientFor which locks t.mu, and
UpsertPeer/RemovePeer release t.mu before calling closeStream/clearNoStream
which lock streamsMu), so update the struct and the similar comment at lines
~170-211 to state the real invariant: "Do not hold streamsMu and t.mu at the
same time; all code paths must release one before acquiring the other" and
optionally mention the concrete helpers (getOrOpenStream, clientFor, UpsertPeer,
RemovePeer, closeStream, clearNoStream) as examples to clarify the pattern.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 06ddbea4-8a37-428f-8a41-8912c409b887
⛔ Files ignored due to path filters (2)
- `proto/etcd_raft.pb.go` is excluded by `!**/*.pb.go`
- `proto/etcd_raft_grpc.pb.go` is excluded by `!**/*.pb.go`
📒 Files selected for processing (4)
- `internal/raftengine/etcd/engine.go`
- `internal/raftengine/etcd/grpc_transport.go`
- `internal/raftengine/etcd/grpc_transport_test.go`
- `proto/etcd_raft.proto`
Code Review
This pull request refactors the Raft engine's dispatching logic to use a single multiplexed goroutine per peer and introduces long-lived gRPC client-streaming for message transport. These changes improve gRPC safety and prioritize heartbeat messages through a biased selection mechanism. Feedback suggests optimizing the stream management locking mechanism by using a sync.RWMutex to reduce contention on the dispatch hot path.
```go
// streamsMu protects streams and noStream.
// Lock ordering: always acquire streamsMu before t.mu (never the reverse).
streamsMu sync.Mutex
```
The streamsMu mutex is used to protect the streams and noStream maps, which are accessed on the hot path of every Raft message dispatch in getOrOpenStream. To improve performance and reduce contention between per-peer dispatch workers, consider using a sync.RWMutex so that the fast path (checking if a stream already exists) can be performed under a read lock.
```diff
-streamsMu sync.Mutex
+streamsMu sync.RWMutex
```
References
- For frequently accessed fields, prefer efficient locking strategies (like RWMutex or atomic operations) to improve performance on hot paths.
```go
t.streamsMu.Lock()
_, skip := t.noStream[nodeID]
ps, ok := t.streams[nodeID]
t.streamsMu.Unlock()
```
Use a read lock for the fast path to allow concurrent access by multiple peer dispatch workers when the stream is already established. Ensure that if the stream is not found, the state is re-verified after acquiring the write lock to prevent race conditions.
```diff
-t.streamsMu.Lock()
-_, skip := t.noStream[nodeID]
-ps, ok := t.streams[nodeID]
-t.streamsMu.Unlock()
+t.streamsMu.RLock()
+_, skip := t.noStream[nodeID]
+ps, ok := t.streams[nodeID]
+t.streamsMu.RUnlock()
```
References
- For frequently accessed fields, prefer efficient locking strategies (like RWMutex or atomic operations) to improve performance on hot paths.
- When performing operations based on a state variable, do not rely on stale data after releasing a lock; ensure the condition still holds after re-acquiring the lock.
… doc issues

- runMultiplexDispatchWorker now defers drainPendingRequests so every buffered dispatchRequest in both priority and normal channels is closed when the per-peer context is cancelled (e.g. peer removal), not just the first one encountered (addresses Gemini review)
- Expand defaultHeartbeatBufPerPeer comment to list all Resp variants routed to the priority channel (MsgHeartbeatResp, MsgVoteResp, etc.)
- Fix design doc: heartbeat channel capacity shown as defaultHeartbeatBufPerPeer=64 rather than the incorrect MaxInflightMsg
- Fix design doc: §3.6 → §4.1 (was under the wrong heading level)
- Fix design doc: replace non-existent defaultDispatchWorkersPerPeer=2 with the accurate single-multiplexing-goroutine description
- Update status header from "proposed" to "implemented"
/gemini review
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing the previous unary RPC model to improve throughput. It introduces a multiplexing dispatch worker per peer that utilizes a biased-select approach to prioritize low-latency control messages, such as heartbeats, over log entry bursts. The GRPCTransport now manages long-lived client-streaming connections with built-in fallback to unary RPCs for backward compatibility. The implementation also includes careful management of stream lifecycles and locking strategies to prevent deadlocks during peer membership changes. I have no feedback to provide as all review comments were filtered out.
… transport

- closePeerConnLocked now returns the set of node IDs that were using the closed address; UpsertPeer and RemovePeer clear stream/noStream for all of them, so a peer that upgrades to support streaming at the same address is re-probed on the next dispatch rather than staying stuck in unary mode
- SendStream server handler now continues on errStepQueueFull instead of tearing down the stream; transient backpressure drops are logged and the long-lived stream is preserved (matches unary Send behaviour)
- dispatchRegular checks ctx.Err() before attempting stream.Send so a cancelled dispatch context is honoured on the streaming path
- streamsMu changed from sync.Mutex to sync.RWMutex; fast-path reads in getOrOpenStream use RLock to reduce contention on the dispatch hot path
- Lock-ordering comment updated from "acquire streamsMu before t.mu" to the accurate invariant: never hold both simultaneously
/gemini review
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing per-message unary RPCs with long-lived client-streaming connections to improve throughput. It introduces a multiplexing dispatch worker per peer that utilizes a biased select to prioritize heartbeats and control messages. The transport layer is updated to manage these streams with a fallback mechanism to unary RPCs for backward compatibility. A logic error was identified in the RemovePeer function where the node being removed is excluded from stream cleanup, which could lead to resource leaks.
```go
var closedNodeIDs []uint64
if peer, ok := t.peers[nodeID]; ok {
	delete(t.peers, nodeID)
	closedNodeIDs = t.closePeerConnLocked(peer.Address)
}
```
In RemovePeer, the nodeID being removed is not included in the cleanup of streams and noStream status. Since delete(t.peers, nodeID) is called before closePeerConnLocked, the latter will not find the removed node in the peer map, leaving its long-lived stream and fallback state in the transport's internal maps. This can lead to resource leaks and stale transport state if the node is later re-added.
```diff
 var closedNodeIDs []uint64
 if peer, ok := t.peers[nodeID]; ok {
 	delete(t.peers, nodeID)
-	closedNodeIDs = t.closePeerConnLocked(peer.Address)
+	closedNodeIDs = append(t.closePeerConnLocked(peer.Address), nodeID)
 }
```
References
- Do not silently ignore errors from Close() methods on resources like network connections. Log them to ensure resource leaks or other cleanup problems are visible.
- RemovePeer: collect affected nodeIDs from closePeerConnLocked BEFORE deleting from t.peers, otherwise the removed node is never found by the address iteration and its stream/noStream state is leaked (bug introduced in previous commit — identified by Gemini review)
- Resolve merge conflicts with origin/main in engine.go and design doc (took main's wording for defaultHeartbeatBufPerPeer comment; kept HEAD's accurate single-multiplexing-goroutine description in the doc)
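The ordering bug in the first bullet can be reproduced with a plain map. In this sketch, `nodesAt` stands in for the address iteration inside `closePeerConnLocked`; the node ID and address are made up.

```go
package main

import "fmt"

// nodesAt returns the node IDs whose peer entry uses addr — a stand-in for
// the address iteration inside closePeerConnLocked.
func nodesAt(peers map[uint64]string, addr string) []uint64 {
	var ids []uint64
	for id, a := range peers {
		if a == addr {
			ids = append(ids, id)
		}
	}
	return ids
}

func main() {
	// Buggy order: delete first, then look up by address — node 7 is missed,
	// so its stream/noStream state would leak.
	peers := map[uint64]string{7: "10.0.0.1:2380"}
	delete(peers, 7)
	fmt.Println(len(nodesAt(peers, "10.0.0.1:2380"))) // 0

	// Fixed order: collect before delete (or append the removed ID explicitly).
	peers = map[uint64]string{7: "10.0.0.1:2380"}
	ids := nodesAt(peers, "10.0.0.1:2380")
	delete(peers, 7)
	fmt.Println(ids) // [7]
}
```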
/gemini review
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing per-message unary RPCs with long-lived client-streaming connections to improve throughput. It introduces a multiplexing dispatch worker per peer that uses biased selection to prioritize heartbeats over normal messages. Feedback was provided regarding the lifecycle management of long-lived streams, specifically suggesting that the stream context be derived from a parent context rather than using context.Background().
```go
	return ps.stream, nil
}

streamCtx, cancel := context.WithCancel(context.Background())
```
Using context.Background() for a long-lived gRPC stream is acceptable here because the lifecycle is manually managed via the cancel function stored in peerStream. However, to ensure better integration with the transport's overall lifecycle and adhere to repository guidelines, consider if this should be derived from a context passed during transport initialization or linked to a parent context that is cancelled on Close().
References
- When designing interfaces, use context.Context for managing deadlines and cancellation. Avoid providing a separate time.Duration timeout parameter in method signatures to prevent ambiguity regarding which timeout takes precedence.
/gemini review
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing the previous per-message unary RPC model. Key changes include the introduction of a multiplexing dispatch worker that prioritizes heartbeats using a biased select, the addition of a SendStream RPC to the protobuf definition, and logic in the GRPCTransport to manage long-lived streams with a fallback to unary RPCs for peers that do not support streaming. The lifecycle of these streams is integrated into peer management and engine shutdown processes. I have no feedback to provide as all review comments were filtered out.
…riority

- TestDispatchRegularUsesStreamWhenSupported: verifies messages go through stream.Send (not unary) when the peer supports SendStream
- TestDispatchRegularFallsBackToUnaryOnUnimplemented: verifies noStream is set and unary Send is used when peer returns Unimplemented on SendStream
- TestDispatchRegularClosesStreamOnSendError: verifies the stream is evicted from the cache after stream.Send returns an error
- TestRunMultiplexDispatchWorkerPrioritizesHeartbeats: verifies the biased-select drains all heartbeats before processing normal messages
/gemini review
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing unary RPCs with long-lived client-streaming connections and introducing a multiplexing dispatch worker that prioritizes heartbeats. Feedback was provided to optimize the getOrOpenStream function by moving the SendStream call outside of the streamsMu lock to reduce potential contention on the dispatch path during network operations.
```go
t.streamsMu.Lock()
defer t.streamsMu.Unlock()

// Re-check: another goroutine or closeStream may have raced.
if _, skip = t.noStream[nodeID]; skip {
	return nil, errStreamNotSupported
}
if ps, ok = t.streams[nodeID]; ok {
	return ps.stream, nil
}

streamCtx, cancel := context.WithCancel(ctx)
stream, err := client.SendStream(streamCtx)
if err != nil {
	cancel()
	if status.Code(err) == codes.Unimplemented {
		t.noStream[nodeID] = struct{}{}
		return nil, errStreamNotSupported
	}
	return nil, errors.WithStack(err)
}
t.streams[nodeID] = &peerStream{stream: stream, cancel: cancel}
return stream, nil
```
The streamsMu lock is currently held during the client.SendStream(streamCtx) call. While SendStream (which calls NewStream internally) is generally fast, it is a network-related operation that can block if the gRPC balancer is not ready, if it's waiting for HTTP/2 settings, or if there are connection issues. Holding a global transport lock across this call can cause all dispatchers for all peers to block, even those that already have established streams, as they must acquire at least an RLock on the same mutex in the fast path.
It is better to initiate the stream outside the lock and then acquire the lock only to install it, using double-checked locking to handle potential races.
```go
streamCtx, cancel := context.WithCancel(ctx)
stream, err := client.SendStream(streamCtx)
if err != nil {
	cancel()
	if status.Code(err) == codes.Unimplemented {
		t.streamsMu.Lock()
		t.noStream[nodeID] = struct{}{}
		t.streamsMu.Unlock()
		return nil, errStreamNotSupported
	}
	return nil, errors.WithStack(err)
}
t.streamsMu.Lock()
defer t.streamsMu.Unlock()
// Re-check: another goroutine or closeStream may have raced.
if _, skip = t.noStream[nodeID]; skip {
	cancel()
	return nil, errStreamNotSupported
}
if ps, ok = t.streams[nodeID]; ok {
	cancel()
	return ps.stream, nil
}
t.streams[nodeID] = &peerStream{stream: stream, cancel: cancel}
return stream, nil
```

References
- For frequently accessed fields or hot paths, prefer strategies that reduce lock contention (like atomic operations or minimizing lock hold time) to improve performance.
…tch reads

client.SendStream is a network operation (TLS handshake + connect) that can block for hundreds of milliseconds. Holding streamsMu.Lock during that call stalls all concurrent RLock readers on the dispatch hot path.

Fix: release streamsMu before calling client.SendStream, then re-acquire to install the new stream. A final re-check after re-acquire handles the race where two goroutines both observe no stream and both connect; the loser cancels its stream and returns the winner's.
/gemini review
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing the previous unary RPC model with long-lived client-streaming connections. It introduces a multiplexing dispatch worker that uses a biased select to prioritize heartbeat messages, ensuring they are not starved by bursts of log entries. The transport layer now supports automatic fallback to unary RPCs for peers that do not yet support the streaming interface. Additionally, the PR includes robust connection management, stream lifecycle handling, and comprehensive unit tests for the new dispatching and transport logic. I have no feedback to provide.
/gemini review
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing per-message unary RPCs with long-lived client-streaming connections. Key changes include a new multiplexing dispatch worker with biased selection to prioritize heartbeats, and a fallback mechanism to unary RPCs for peers not supporting the streaming interface. Feedback suggests adding documentation to clarify the channel draining logic during shutdown and refactoring gRPC connection closure to occur outside of critical sections to prevent potential blocking of management operations.
```go
func (e *Engine) drainPendingRequests(pd *peerQueues) {
	drainCh := func(ch <-chan dispatchRequest) {
		for {
			select {
			case req, ok := <-ch:
				if !ok {
					return
				}
				if err := req.Close(); err != nil {
					slog.Error("etcd raft dispatch: failed to close pending request", "err", err)
				}
			default:
				return
			}
		}
	}
	drainCh(pd.heartbeat)
	drainCh(pd.normal)
}
```
The drainPendingRequests function uses a non-blocking select with a default branch to drain channels. While this is safe during shutdown because the engine loop has stopped enqueuing new messages, it might return prematurely if the channel is not yet closed and a producer is slightly delayed. However, given the current architecture where removePeer (which closes the channels) and enqueueDispatchMessage are both called from the single engine event loop, this is likely safe. Consider adding a comment to clarify that this is intended to be a best-effort drain of currently buffered messages upon worker exit.
```go
if err := conn.Close(); err != nil {
	slog.Warn("failed to close etcd raft peer connection", "address", address, "error", err)
}
```
Closing a gRPC connection while holding the transport's main mutex t.mu can lead to head-of-line blocking for other management operations if the Close() call blocks due to network or OS-level delays. Consider refactoring to perform the actual conn.Close() after releasing the lock. Additionally, ensure that the error returned by Close() is logged rather than silently ignored to ensure resource leaks or cleanup problems are visible.
References
- Do not silently ignore errors from Close() methods on resources like network connections. Log them to ensure resource leaks or other cleanup problems are visible.
… shutdown
- closePeerConnLocked now returns the *grpc.ClientConn instead of calling conn.Close() internally; UpsertPeer and RemovePeer close the connection after releasing t.mu so a blocking conn.Close() cannot stall concurrent management operations holding the same lock
- Add inline comment on runMultiplexDispatchWorker explaining why drainPendingRequests is deferred: ensures every buffered dispatchRequest is properly closed on worker exit regardless of the stop reason
/gemini review
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing the previous per-message unary RPC model. Key changes include the introduction of a single multiplexing dispatch worker per peer that utilizes a biased-select mechanism to prioritize heartbeats over normal messages. The transport layer now supports long-lived client-streaming connections via a new SendStream RPC, including logic for stream caching, automatic fallback to unary RPCs for incompatible peers, and robust resource cleanup during peer lifecycle events. I have no feedback to provide as the review comments were either validating the existing implementation or providing general advice without specific code improvements.
Actionable comments posted: 2
🧹 Nitpick comments (1)
internal/raftengine/etcd/engine.go (1)
2028-2053: `runDispatchWorker` is now production-dead code. After `startPeerDispatcher` switched to `runMultiplexDispatchWorker`, the only remaining caller of `runDispatchWorker` is `TestRemovePeerClosesDispatcherAndDropsSubsequentMessages` in `engine_test.go`. The doc comment here ("retained for backward compatibility with tests") acknowledges this, but keeping two dispatch models in the source risks divergence (e.g., future changes to context/error semantics only applied to one). Consider either:
- Rewriting the one test to drive `runMultiplexDispatchWorker` (same peerQueues, same close-on-remove contract) and deleting `runDispatchWorker`, or
- Gating it behind a `// test-only` build tag / file to make the dead-code status explicit.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/raftengine/etcd/engine.go` around lines 2028 - 2053, runDispatchWorker is now dead production code after startPeerDispatcher moved to runMultiplexDispatchWorker; either remove it or isolate it for tests. Update TestRemovePeerClosesDispatcherAndDropsSubsequentMessages to exercise runMultiplexDispatchWorker (create the same peerQueues and verify the close-on-remove behavior) so the test drives the current dispatch model, then delete the runDispatchWorker function and its doc comment; alternatively, if you prefer to keep it for tests only, move runDispatchWorker into a separate file with a // +build test or //go:build test build tag and update the test import so only test builds see it, ensuring the test still uses the same close-on-remove contract and context semantics.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/design/raft-grpc-streaming-transport.md`:
- Around line 24-27: The paragraph incorrectly attributes the single
multiplexing dispatch goroutine to PR `#522`; update the text so PR `#522` is only
credited with introducing per-peer dispatch channels (the split), and explicitly
state that this PR replaces the prior two-worker design (described in §2
"Current architecture" and §3.2) with a single multiplexing dispatch goroutine
per peer (Option A) that uses biased-select for heartbeat priority. Modify the
sentence mentioning "PR `#522` introduced ... a single multiplexing dispatch
goroutine per peer" to instead mention PR `#522` introduced per-peer dispatch
channels and that this PR consolidates the two dispatch workers into one
multiplexing goroutine.
In `@internal/raftengine/etcd/grpc_transport.go`:
- Around line 178-237: The logs for failed conn.Close in UpsertPeer and
RemovePeer show the wrong/empty address because the code moves oldConn.Close
outside the lock but doesn't capture the connection's address; before releasing
t.mu capture the old connection address (e.g., oldAddr := existing.Address in
UpsertPeer and oldAddr := peer.Address in RemovePeer or return it from
closePeerConnLocked), keep oldConn and oldAddr local, then use oldAddr in the
subsequent slog.Warn calls (replace the current peer.Address/"" usage) so the
logged "address" field reflects the actual connection being closed.
---
Nitpick comments:
In `@internal/raftengine/etcd/engine.go`:
- Around line 2028-2053: runDispatchWorker is now dead production code after
startPeerDispatcher moved to runMultiplexDispatchWorker; either remove it or
isolate it for tests. Update
TestRemovePeerClosesDispatcherAndDropsSubsequentMessages to exercise
runMultiplexDispatchWorker (create the same peerQueues and verify the
close-on-remove behavior) so the test drives the current dispatch model, then
delete the runDispatchWorker function and its doc comment; alternatively, if you
prefer to keep it for tests only, move runDispatchWorker into a separate file
with a // +build test or //go:build test build tag and update the test import so
only test builds see it, ensuring the test still uses the same close-on-remove
contract and context semantics.
📒 Files selected for processing (5)
- docs/design/raft-grpc-streaming-transport.md
- internal/raftengine/etcd/engine.go
- internal/raftengine/etcd/engine_test.go
- internal/raftengine/etcd/grpc_transport.go
- internal/raftengine/etcd/grpc_transport_test.go
```go
if oldConn != nil {
	if err := oldConn.Close(); err != nil {
		slog.Warn("failed to close etcd raft peer connection", "address", peer.Address, "error", err)
	}
```
UpsertPeer closes the old connection (for existing.Address), but the warning log reports the new peer.Address. This makes connection-close diagnostics misleading; log the address being closed (existing.Address) or capture it before overwriting the peer entry.
```go
if oldConn != nil {
	if err := oldConn.Close(); err != nil {
		slog.Warn("failed to close etcd raft peer connection", "address", "", "error", err)
	}
```
RemovePeer’s connection-close warning logs an empty address ("address", ""). This loses the key detail needed to debug teardown failures; capture the peer address before releasing t.mu and log that value.
```go
func (t *GRPCTransport) dispatchRegular(ctx context.Context, msg raftpb.Message) error {
	ctx, cancel := transportContext(ctx, defaultDispatchTimeout)
	defer cancel()
	if err := ctx.Err(); err != nil {
		return errors.WithStack(err)
	}

	raw, err := msg.Marshal()
	if err != nil {
		return errors.WithStack(err)
	}

	stream, err := t.getOrOpenStream(ctx, msg.To)
	if err != nil {
		if errors.Is(err, errStreamNotSupported) {
			return t.dispatchUnary(ctx, raw, msg.To)
		}
		return err
	}

	if err := stream.Send(&pb.EtcdRaftMessage{Message: raw}); err != nil {
		t.closeStream(msg.To)
		return errors.WithStack(err)
	}
	return nil
}
```
dispatchRegular no longer applies defaultDispatchTimeout when using the streaming path. Unlike unary RPCs, stream.Send can still block under HTTP/2 flow control or a stalled connection, and with no deadline this can hang the per-peer dispatch worker indefinitely (reintroducing queue buildup/drop). Consider enforcing a bounded send time (e.g., cancel/evict the stream if Send exceeds defaultDispatchTimeout) or otherwise ensuring a deadline is applied for the streaming send path as well.
```go
func (t *GRPCTransport) closeStream(nodeID uint64) {
	t.streamsMu.Lock()
	ps, ok := t.streams[nodeID]
	if ok {
		delete(t.streams, nodeID)
	}
	t.streamsMu.Unlock()
	if ok {
		ps.cancel()
	}
}
```
closeStream only cancels the stream context; it never calls CloseSend/CloseAndRecv on the client stream. Relying solely on cancellation can leave the server handler exiting via context-canceled errors and can delay client-side resource cleanup. Consider best-effort stream.CloseSend() (and/or CloseAndRecv if you want the final ack) before canceling.
- Fix slog.Warn in UpsertPeer to log the old connection address (existing.Address) instead of the new peer address; fix RemovePeer to log the captured peer address instead of an empty string literal
- Call ps.stream.CloseSend() before ps.cancel() in closeStream so the server receives io.EOF rather than a context-cancelled error
- Add comment to drainPendingRequests explaining why the non-blocking select is safe: the engine event loop (sole producer) stops enqueuing before the multiplexing worker exits
- Fix design doc §1 inconsistency: PR #522 introduced two workers per peer; the single multiplexing worker is introduced by this PR
/gemini review
Code Review
This pull request implements gRPC streaming transport for Raft messages, replacing per-message unary RPCs with long-lived client-streaming connections. Key changes include the introduction of a single multiplexing dispatch goroutine per peer that uses a biased-select mechanism to prioritize heartbeats, and the addition of a streaming path in the gRPC transport with a fallback to unary RPCs for unsupported peers. Feedback was provided regarding the closeAllStreams method, suggesting that CloseSend() should be called and its errors logged before cancelling the stream context to ensure a cleaner shutdown.
```go
func (t *GRPCTransport) closeAllStreams() {
	t.streamsMu.Lock()
	old := t.streams
	t.streams = make(map[uint64]*peerStream)
	t.noStream = make(map[uint64]struct{})
	t.streamsMu.Unlock()
	for _, ps := range old {
		ps.cancel()
	}
}
```
The closeAllStreams method cancels the context for each stream but does not call CloseSend(). While the underlying connection is closed shortly after, calling CloseSend() before cancellation is a cleaner way to signal EOF to the server. Additionally, ensure that any error returned by CloseSend() is logged rather than ignored, to maintain visibility into potential cleanup issues or resource leaks.
```go
func (t *GRPCTransport) closeAllStreams() {
	t.streamsMu.Lock()
	old := t.streams
	t.streams = make(map[uint64]*peerStream)
	t.noStream = make(map[uint64]struct{})
	t.streamsMu.Unlock()
	for _, ps := range old {
		if err := ps.stream.CloseSend(); err != nil {
			log.Printf("failed to close stream: %v", err)
		}
		ps.cancel()
	}
}
```
References
- Do not silently ignore errors from Close() methods on resources like network connections. Log them to ensure resource leaks or other cleanup problems are visible.
… comment
- Call CloseSend before cancel in closeAllStreams for consistent EOF signalling to the server (matching closeStream, which was already fixed)
- Add comment in dispatchRegular explaining why stream.Send does not need a per-send deadline: the gRPC send buffer is non-blocking under normal load (Raft bounds in-flight messages), stream ctx cancellation handles shutdown, and gRPC keepalive handles stalled TCP connections
/gemini review
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing per-message unary RPCs with long-lived client-streaming connections to improve throughput. Key changes include the introduction of a multiplexing dispatch worker with biased-select logic to prioritize heartbeats and the addition of the SendStream RPC with a fallback mechanism for older peers. Feedback was provided regarding the silent ignoring of errors during stream teardown, which should be logged to ensure visibility into potential resource leaks.
```go
if ok {
	// CloseSend signals EOF to the server before cancelling the context so
	// the server's Recv loop sees io.EOF rather than a context-cancelled error.
	_ = ps.stream.CloseSend()
```
Do not silently ignore the error from CloseSend(). Log the error to ensure that any issues during stream teardown or potential resource leaks are visible, as required by the repository rules for closing resources.
References
- Do not silently ignore errors from Close() methods on resources like network connections. Log them to ensure resource leaks or other cleanup problems are visible.
/gemini review
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing the previous unary RPC model to improve throughput. It introduces a single multiplexing dispatch goroutine per peer using a biased-select mechanism to prioritize heartbeats over normal log entries. The transport layer now manages long-lived streams with an automatic fallback to unary RPCs if a peer does not support streaming. Feedback focuses on improving the robustness of the worker shutdown process to ensure all pending requests are drained, utilizing synchronization like singleflight when opening streams to prevent redundant network operations, and enhancing observability by logging specific gRPC error codes in the server-side stream handler.
```go
func (e *Engine) drainPendingRequests(pd *peerQueues) {
	drainCh := func(ch <-chan dispatchRequest) {
		for {
			select {
			case req, ok := <-ch:
				if !ok {
					return
				}
				if err := req.Close(); err != nil {
					slog.Error("etcd raft dispatch: failed to close pending request", "err", err)
				}
			default:
				return
			}
		}
	}
	drainCh(pd.heartbeat)
	drainCh(pd.normal)
}
```
The drainPendingRequests function uses a non-blocking select with a default branch to drain the dispatch channels. While the multiplexing worker is the only consumer, if the engine's event loop (the producer) hasn't fully stopped or if there's a slight delay in message delivery, the default branch might trigger prematurely, leaving messages in the channel. Since dispatchRequest.Close() is currently a no-op, this is not critical, but for future-proofing and ensuring all buffered messages are processed (e.g., to release response futures), consider a blocking drain if the producer is guaranteed to stop.
```go
// Open the stream outside any lock: SendStream is a network operation that
// can block during TLS handshake or connect, and holding streamsMu would
// stall all concurrent reads on the dispatch hot path.
streamCtx, cancel := context.WithCancel(ctx)
stream, err := client.SendStream(streamCtx)
if err != nil {
	cancel()
	if status.Code(err) == codes.Unimplemented {
		t.streamsMu.Lock()
		t.noStream[nodeID] = struct{}{}
		t.streamsMu.Unlock()
		return nil, errStreamNotSupported
	}
	return nil, errors.WithStack(err)
}
```
Opening a gRPC stream is a network operation that can block. While it's done outside the streamsMu lock to avoid stalling other dispatchers, multiple concurrent calls to getOrOpenStream for the same nodeID could result in multiple streams being opened before the first one is installed in the map. Although the multiplexing worker model limits this to one goroutine per peer, adding a singleflight or a per-peer mutex for the dial operation would make this more robust against future changes in the worker model.
```go
func (t *GRPCTransport) SendStream(stream pb.EtcdRaft_SendStreamServer) error {
	for {
		req, err := stream.Recv()
		if err != nil {
			if errors.Is(err, io.EOF) {
				return errors.WithStack(stream.SendAndClose(&pb.EtcdRaftAck{}))
			}
			return errors.WithStack(err)
		}
		var msg raftpb.Message
		if err := msg.Unmarshal(req.Message); err != nil {
			return errors.WithStack(err)
		}
		if err := t.handle(stream.Context(), msg); err != nil {
			if errors.Is(err, errStepQueueFull) {
				slog.Warn("etcd raft SendStream: step queue full, dropping message",
					"type", msg.Type.String(),
					"from", msg.From,
					"to", msg.To,
				)
				continue
			}
			return err
		}
	}
}
```
The SendStream server handler correctly processes messages and handles backpressure by skipping errStepQueueFull. However, it doesn't explicitly handle the case where stream.Recv() returns an error that isn't io.EOF but might be transient. While closing the stream on any error is safe for Raft (which will retransmit), logging the specific gRPC status code would improve debuggability of transport issues.
/gemini review
```go
// Transient backpressure (errStepQueueFull) is logged and skipped rather than
// tearing down the stream — identical to how the unary Send handler behaves.
```
SendStream’s docstring says dropping on errStepQueueFull is “identical” to the unary Send handler, but unary Send currently returns the handler error to the caller. Either align the behaviors or adjust the comment so it reflects the actual semantics (streaming silently drops messages on backpressure).
Suggested change:
```diff
-// Transient backpressure (errStepQueueFull) is logged and skipped rather than
-// tearing down the stream — identical to how the unary Send handler behaves.
+// Transient backpressure (errStepQueueFull) is logged and the message is
+// dropped so the stream can continue; other handler errors terminate the RPC.
```
```go
if err := msg.Unmarshal(req.Message); err != nil {
	return errors.WithStack(err)
}
if err := t.handle(stream.Context(), msg); err != nil {
	if errors.Is(err, errStepQueueFull) {
		slog.Warn("etcd raft SendStream: step queue full, dropping message",
```
When errStepQueueFull occurs, this logs a Warn per dropped message. Under load this can flood logs and add CPU overhead. Consider rate-limiting/sampling these warnings (e.g., periodic counter-based logging) or lowering severity, while still keeping some visibility into sustained backpressure.
```go
	return ps.stream, nil
}
t.streamsMu.Unlock()

// Open the stream outside any lock: SendStream is a network operation that
// can block during TLS handshake or connect, and holding streamsMu would
```
The stream is opened with streamCtx derived from ctx and no explicit timeout/dial deadline. If the underlying connection establishment or HTTP/2 stream creation blocks (as noted in the comment), this can stall the per-peer dispatch goroutine indefinitely. Consider adding a bounded dial/connect timeout in the gRPC dial options (e.g., a context dialer with Timeout) and/or keepalive so stream setup and stuck sends fail within a predictable time.
```go
	delete(t.streams, nodeID)
}
t.streamsMu.Unlock()
if ok {
	// CloseSend signals EOF to the server before cancelling the context so
	// the server's Recv loop sees io.EOF rather than a context-cancelled error.
	if err := ps.stream.CloseSend(); err != nil {
```
closeStream/closeAllStreams call CloseSend and then cancel the stream context but never call CloseAndRecv to drain the final response/trailers. In gRPC-Go, not draining a client-streaming RPC can leak resources (goroutines/transport buffers) until GC. Consider calling CloseAndRecv when tearing down a stream (best-effort, with bounded wait) to ensure the RPC is fully cleaned up.
```go
// Stalled TCP connections are detected by gRPC keepalive and fail the
// stream; the stream context (derived from ctx) is also cancelled on engine
// shutdown, unblocking any in-progress Send.
```
The comment here says stalled TCP connections are detected by gRPC keepalive, but the transport dial/server options don’t appear to configure keepalive (see internal/grpc.go:21-28). Either add explicit keepalive params (and/or a dial timeout) so blocked stream.Send calls reliably fail, or update/remove this comment to avoid overstating the behavior.
Suggested change:
```diff
-// Stalled TCP connections are detected by gRPC keepalive and fail the
-// stream; the stream context (derived from ctx) is also cancelled on engine
-// shutdown, unblocking any in-progress Send.
+// The stream context is derived from ctx and is cancelled on engine
+// shutdown, which unblocks any in-progress Send. Connection liveness
+// detection depends on the underlying gRPC transport configuration.
```
```go
// streams holds one long-lived SendStream RPC per peer node ID.
// Each entry is owned by the single per-peer multiplexing dispatch goroutine.
streams map[uint64]*peerStream
// noStream records peers that returned codes.Unimplemented on SendStream;
// dispatchRegular falls back to unary Send for those peers.
noStream map[uint64]struct{}
```
This transport caches a shared per-peer stream and calls stream.Send without any per-peer serialization inside GRPCTransport. If Dispatch/dispatchRegular is ever called concurrently for the same nodeID (outside Engine’s single-writer worker), this will race because gRPC client streams are not goroutine-safe. Consider enforcing single-writer at the transport layer (e.g., a per-peer mutex in peerStream) or documenting that callers must serialize sends per peer.
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing the previous unary RPC-based transport. It introduces a single multiplexing dispatch goroutine per peer that utilizes a biased-select mechanism to prioritize heartbeats over normal messages. The transport now supports a long-lived SendStream RPC with an automatic fallback to unary Send for peers that do not yet support streaming. Feedback was provided regarding the potential for the dispatch worker to block on stream.Send during stalled TCP connections, suggesting the need for additional timeout mechanisms beyond standard gRPC keepalives.
```go
// stream.Send enqueues the message into gRPC's send buffer and returns
// immediately under normal conditions. It can block briefly under HTTP/2
// flow control, but Raft bounds in-flight messages via MaxInflightMsg, so
// the send buffer will not saturate during steady-state operation.
// Stalled TCP connections are detected by gRPC keepalive and fail the
// stream; the stream context (derived from ctx) is also cancelled on engine
// shutdown, unblocking any in-progress Send.
```
The stream.Send call lacks a per-message timeout. While Raft's MaxInflightMsg bounds the number of pending messages, a stalled TCP connection that hasn't yet been detected by gRPC keepalives could cause the single multiplexing dispatch worker to block indefinitely. This would starve heartbeats for that peer. Consider if a select with a timer or a shorter stream-level deadline is appropriate, although gRPC's Keepalive is the standard mitigation for streaming transport.
- peerStream: document the single-writer requirement (stream.Send is not goroutine-safe; a stream must be owned by exactly one goroutine — the per-peer multiplexing dispatch worker)
- SendStream doc: replace "identical to unary" with an accurate description: errStepQueueFull drops the message and keeps the stream alive; other errors close the stream and rely on Raft retransmission
- dispatchRegular: correct the keepalive claim — gRPC keepalive is not configured here; stalled TCP connections fall back to OS-level keepalive unless the caller adds grpc.WithKeepalive
/gemini review
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing per-message unary RPCs with long-lived client-streaming connections to reduce overhead and improve throughput. Key architectural changes include the introduction of a single multiplexing dispatch goroutine per peer that uses biased selection to prioritize heartbeats, and a transport layer that supports the new SendStream RPC with a fallback to unary Send for backward compatibility. Peer management logic in the GRPCTransport was also refactored to handle connection and stream lifecycles outside of primary locks to prevent blocking. Feedback was provided regarding a potential performance bottleneck in the peer connection closure logic, which currently performs an O(N) iteration over all peers.
```go
for id, peer := range t.peers {
	if peer.Address == address {
		nodeIDs = append(nodeIDs, id)
	}
}
```
Iterating over all peers in closePeerConnLocked is O(N). While Raft clusters are typically small, this could become a bottleneck for management operations in clusters with many learners or during large-scale membership changes. Consider maintaining a reverse mapping from address to node IDs if the peer count is expected to grow significantly.
Without keepalive, stalled TCP connections (e.g. silently dropped by a NAT or load balancer) are not detected until the OS-level TCP keepalive fires — typically ~2 h. Long-lived SendStream RPCs introduced in this PR make this gap much more visible than the previous per-message unary model.
- Client: ping every 10 s while a stream is active; declare the connection dead after 3 s without an ack
- Server: enforce MinTime=5 s so GOAWAY is not sent before the first ping

Stalled connections are now detected within ~13 s (PingTime + PingTimeout) across all callers of GRPCDialOptions/GRPCServerOptions (Raft transport, KV client connection cache, Hashicorp Raft transport, demo server).
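The timings described in this commit could be wired through grpc-go's `keepalive` package roughly as follows. This is a config-fragment sketch: the package name and the exact shape of the project's GRPCDialOptions/GRPCServerOptions helpers are assumptions, while `grpc.WithKeepaliveParams`, `grpc.KeepaliveEnforcementPolicy`, and the `keepalive` structs are real grpc-go API.

```go
package rpcutil // hypothetical package hosting the shared gRPC option helpers

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/keepalive"
)

// GRPCDialOptions carries the client-side keepalive: ping every 10 s while a
// stream is active, declare the connection dead 3 s after an unacknowledged ping.
func GRPCDialOptions() []grpc.DialOption {
	return []grpc.DialOption{
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:    10 * time.Second,
			Timeout: 3 * time.Second,
		}),
	}
}

// GRPCServerOptions enforces MinTime=5 s so the server does not GOAWAY clients
// that ping at the configured 10 s interval.
func GRPCServerOptions() []grpc.ServerOption {
	return []grpc.ServerOption{
		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime: 5 * time.Second,
		}),
	}
}
```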
/gemini review
```go
const GRPCMaxMessageBytes = 64 << 20

// keepalive timing constants.
// - PingTime: interval between keepalive pings while a stream is active.
```
🚫 [golangci] reported by reviewdog 🐶
File is not properly formatted (gci)
Code Review
This pull request implements a gRPC streaming transport for Raft messages, replacing the previous unary RPC model. Key changes include the introduction of a multiplexing dispatch worker per peer with biased-select to prioritize heartbeats, the addition of gRPC keepalive configurations to detect stalled connections, and a fallback mechanism to unary RPCs for peers that do not support streaming. The transport layer now manages long-lived streams with thread-safe initialization and proper cleanup during peer updates or engine shutdown. I have no feedback to provide.
Problem
Per-peer heartbeat channels (cap=64) fill up because the unary Send RPC blocks for one full RTT per message. At 100 heartbeats/s per peer, any RTT above ~640 ms causes the channel to overflow and messages to be dropped. Disk I/O and CPU are not the bottleneck; the RTT wait in the dispatch worker is the root cause.
Solution
Add SendStream (client-streaming RPC) to the EtcdRaft gRPC service. stream.Send() writes to the gRPC send buffer and returns immediately — no RTT wait — so the dispatch worker can enqueue messages at line rate.
Protocol
Sender (GRPCTransport)
Receiver (GRPCTransport)
Dispatch goroutine model (Engine)
Summary by CodeRabbit
New Features
Bug Fixes / Reliability
Documentation
Tests