-
Notifications
You must be signed in to change notification settings - Fork 2
refactor: async Handshake network support #434
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Note Other AI code review bot(s) detectedCodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review. 📝 WalkthroughWalkthroughReplaces the synchronous receive flow in internal/handshake/protocol/peer.go with an asynchronous, channel-driven model. Adds setupConnection that initializes per-message channels (handshakeCh, headersCh, blockCh, addrCh, proofCh), errorCh, doneCh, and starts recvLoop. Introduces recvLoop and handleMessage to decode and route incoming messages to the appropriate channels or errorCh. Adds mutexes (mu, sendMu), hasConnected flag, and a serialized sendMessage path. Reworks handshake, GetPeers, GetHeaders, GetProof, and GetBlock to wait on channels with timeouts and propagate errors via errorCh. Adds ErrorChan() accessor and a close path that cleans up channels/state. Estimated code review effort🎯 4 (Complex) | ⏱️ ~45–60 minutes
Possibly related PRs
Pre-merge checks and finishing touches✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
d2c4316 to
69d895f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
🧹 Nitpick comments (4)
internal/handshake/protocol/peer.go (4)
54-54: Remove redundant assignment.Line 54 assigns
p.conn = conn, butconnis already assigned top.connduring struct initialization on line 49.Apply this diff:
- p.conn = conn p.address = conn.RemoteAddr().String()
148-189: Consider adding graceful shutdown mechanism for recvLoop.
recvLoopruns indefinitely until an error occurs, with no way to gracefully stop the goroutine without closing the underlying connection. Consider adding a context or done channel to allow controlled shutdown, which would prevent goroutine leaks and enable cleaner testing.Example approach using a done channel:
type Peer struct { address string conn net.Conn networkMagic uint32 mu sync.Mutex hasConnected bool + doneCh chan struct{} errorCh chan errorfunc (p *Peer) recvLoop() { err := func() error { for { + select { + case <-p.doneCh: + return nil + default: + } headerBuf := make([]byte, messageHeaderLength)func (p *Peer) Close() error { p.mu.Lock() defer p.mu.Unlock() if p.conn == nil { return errors.New("connection is not established") } + close(p.doneCh) if err := p.conn.Close(); err != nil { return err }
51-51: Document rationale for channel buffer sizes.The buffer sizes for
errorCh(5) and message channels (10) appear arbitrary. Consider documenting the rationale for these sizes or making them configurable if they need to be tuned for different workloads.Also applies to: 114-118
148-189: Close message channels when recvLoop exits.The message channels (
handshakeCh,headersCh,blockCh,addrCh,proofCh) created insetupConnectionare never closed. While they'll be garbage collected, it's better practice to close them whenrecvLoopexits to signal consumers that no more messages will arrive.Apply this diff:
func (p *Peer) recvLoop() { + defer func() { + close(p.handshakeCh) + close(p.headersCh) + close(p.blockCh) + close(p.addrCh) + close(p.proofCh) + }() err := func() error {Note: Consumers would need to be updated to handle closed channels gracefully.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal/handshake/protocol/peer.go(10 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/handshake/protocol/peer.go (3)
internal/handshake/protocol/messages.go (10)
Message(51-54)UnsupportedMessageTypeError(43-45)MsgVersion(183-192)MsgVerack(291-291)MsgVerack(293-296)MsgVerack(298-301)MsgAddr(319-321)MsgHeaders(422-424)MsgBlock(460-462)MsgProof(499-503)internal/handshake/spv.go (1)
Proof(63-74)internal/handshake/block.go (1)
Block(22-25)
🔇 Additional comments (1)
internal/handshake/protocol/peer.go (1)
238-263: Good error handling in handshake.The refactored handshake properly checks
errorChin both select statements (lines 243, 254), enabling fast failure whenrecvLoopencounters errors. This is the correct pattern that should be applied to the request methods as well.
ddcf0e7 to
e1bc253
Compare
|
@coderabbitai please review this |
|
@agaffney I'll review the changes in this PR. This looks like a significant refactoring to introduce asynchronous message handling with proper concurrency control and timeout management for the Handshake network protocol. ✅ Actions performedReview triggered.
|
|
@cubic-dev-ai please review this |
@agaffney I've started the AI code review. It'll take a few minutes to complete. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2 issues found across 1 file
Prompt for AI agents (all 2 issues)
Understand the root cause of the following 2 issues and fix them.
<file name="internal/handshake/protocol/peer.go">
<violation number="1" location="internal/handshake/protocol/peer.go:102">
Close now nils `p.conn` while the recvLoop goroutine still dereferences it, so closing a peer mid-stream can panic when the loop tries to read from a nil connection.</violation>
<violation number="2" location="internal/handshake/protocol/peer.go:207">
Unhandled but valid protocol messages (e.g., MsgPing) now trigger `handleMessage`’s default error, causing recvLoop to close the connection whenever a peer sends a keepalive or other unlisted message.</violation>
</file>
Reply to cubic to teach it or ask questions. Re-run a review with @cubic-dev-ai review this PR
| if err := p.conn.Close(); err != nil { | ||
| return err | ||
| } | ||
| p.conn = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Close now nils p.conn while the recvLoop goroutine still dereferences it, so closing a peer mid-stream can panic when the loop tries to read from a nil connection.
Prompt for AI agents
Address the following comment on internal/handshake/protocol/peer.go at line 102:
<comment>Close now nils `p.conn` while the recvLoop goroutine still dereferences it, so closing a peer mid-stream can panic when the loop tries to read from a nil connection.</comment>
<file context>
@@ -51,34 +67,69 @@ func NewPeer(conn net.Conn, networkMagic uint32) (*Peer, error) {
if err := p.conn.Close(); err != nil {
return err
}
+ p.conn = nil
+ close(p.doneCh)
+ return nil
</file context>
Signed-off-by: Aurora Gaffney <aurora@blinklabs.io>
e1bc253 to
b77012e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
♻️ Duplicate comments (2)
internal/handshake/protocol/peer.go (2)
196-212: Major: Unhandled protocol messages cause disconnect.The default case on line 209 returns an error for any message type not explicitly handled. Since
recvLoopcloses the connection on anyhandleMessageerror (line 192), valid protocol messages likeMsgPing(keepalive) will disconnect the peer.Apply this diff to ignore unhandled messages instead of disconnecting:
func (p *Peer) handleMessage(msg Message) error { switch msg.(type) { case *MsgVersion, *MsgVerack: p.handshakeCh <- msg case *MsgAddr: p.addrCh <- msg case *MsgHeaders: p.headersCh <- msg case *MsgBlock: p.blockCh <- msg case *MsgProof: p.proofCh <- msg default: - return fmt.Errorf("unknown message type: %T", msg) + // Ignore unhandled message types (e.g., MsgPing keepalives) + return nil } return nil }Based on learnings.
151-156: Critical: Data race onp.connaccess.Line 155 reads
p.connwithout holdingp.mu, butClose()writesp.conn = nilon line 102 while holding the mutex. This is a data race that the Go race detector will flag.The local variable assignment prevents a nil pointer dereference but does not prevent the race itself.
Consider one of these solutions:
Solution 1: Read connection under mutex protection
func (p *Peer) recvLoop() { err := func() error { // Assign to local var to avoid nil deref panic on shutdown - conn := p.conn + p.mu.Lock() + conn := p.conn + p.mu.Unlock() + if conn == nil { + return errors.New("connection is nil") + } for {Solution 2: Use atomic.Value for p.conn
Storep.connin anatomic.Valueand use atomic loads/stores throughout.Solution 3: Rely on
doneChfor clean shutdownfunc (p *Peer) recvLoop() { err := func() error { - // Assign to local var to avoid nil deref panic on shutdown - conn := p.conn for { + select { + case <-p.doneCh: + return nil + default: + } + p.mu.Lock() + conn := p.conn + p.mu.Unlock() + if conn == nil { + return errors.New("connection closed") + } headerBuf := make([]byte, messageHeaderLength)(Though this adds overhead to every iteration.)
🧹 Nitpick comments (2)
internal/handshake/protocol/peer.go (2)
241-250: Recommend checkingdoneChfor consistency.The first select statement waits for Verack but doesn't check
doneCh, while the second select (lines 252-263) does. If the connection closes during the first wait, the handshake won't detect it until timeout (1 second).Apply this diff for consistent shutdown detection:
// Wait for Verack response select { case msg := <-p.handshakeCh: if _, ok := msg.(*MsgVerack); !ok { return fmt.Errorf("unexpected message: %T", msg) } case err := <-p.errorCh: return fmt.Errorf("handshake failed: %w", err) + case <-p.doneCh: + return errors.New("connection has shut down") case <-time.After(1 * time.Second): return errors.New("handshake timed out") }
277-288: Recommend checkingerrorChfor more informative error messages.When
recvLoopencounters an error, it sends toerrorChthen callsClose()(which closesdoneCh). This method checksdoneChand returns "connection has shut down", but doesn't checkerrorCh, so the caller doesn't get the underlying error (e.g., decode failure, network error) without separately monitoringErrorChan().Apply this diff to provide more informative errors:
// Wait for Addr response select { case msg := <-p.addrCh: msgAddr, ok := msg.(*MsgAddr) if !ok { return nil, fmt.Errorf("unexpected message: %T", msg) } return msgAddr.Peers, nil + case err := <-p.errorCh: + return nil, fmt.Errorf("receive error: %w", err) case <-p.doneCh: return nil, errors.New("connection has shut down") case <-time.After(5 * time.Second): return nil, errors.New("timed out") }Apply similar changes to
GetHeaders(lines 301-312),GetProof(lines 326-337), andGetBlock(lines 354-365).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
internal/handshake/protocol/peer.go(10 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
internal/handshake/protocol/peer.go (1)
internal/handshake/protocol/messages.go (10)
Message(51-54)UnsupportedMessageTypeError(43-45)MsgVersion(183-192)MsgVerack(291-291)MsgVerack(293-296)MsgVerack(298-301)MsgAddr(319-321)MsgHeaders(422-424)MsgBlock(460-462)MsgProof(499-503)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: Analyze (go)
🔇 Additional comments (5)
internal/handshake/protocol/peer.go (5)
35-44: LGTM: Well-designed concurrency primitives.The separation of
mu(state) andsendMu(send serialization), lifecycle channels (doneCh,errorCh), and per-message-type channels provides a solid foundation for asynchronous message handling.
53-54: LGTM: Appropriate channel initialization.Unbuffered
doneChfor synchronization and bufferederrorCh(size 5) for async error reporting are correctly sized for this use case.
112-127: LGTM: Correct initialization order.Channels are created before
recvLoopstarts, preventing nil channel panics. The handshake correctly waits on channels populated by the receive loop.
93-105: LGTM: Proper close semantics with shutdown signaling.The close logic correctly:
- Guards against double-close via the
p.conn == nilcheck- Signals shutdown via
doneChto unblock waiting operations- Keeps
errorChopen for the lifetime of the peer (per maintainer design)Note: The assignment
p.conn = nilon line 102 creates a race withrecvLoopreadingp.connon line 155, which is flagged separately.
130-149: LGTM: Serialized send path.Using a dedicated
sendMuto serialize concurrent sends is a good practice that avoids holding the main state mutex during I/O operations.
|
@cubic-dev-ai please review this |
@agaffney I've started the AI code review. It'll take a few minutes to complete. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 issue found across 1 file
Prompt for AI agents (all 1 issues)
Understand the root cause of the following 1 issues and fix them.
<file name="internal/handshake/protocol/peer.go">
<violation number="1" location="internal/handshake/protocol/peer.go:209">
Receiving valid request messages (e.g., getaddr/getheaders/getdata/getproof) now triggers handleMessage’s default error, and recvLoop closes the connection as soon as it sees those messages. This causes unnecessary disconnects from compliant peers instead of ignoring or responding to the requests.</violation>
</file>
Reply to cubic to teach it or ask questions. Re-run a review with @cubic-dev-ai review this PR
| case *MsgProof: | ||
| p.proofCh <- msg | ||
| default: | ||
| return fmt.Errorf("unknown message type: %T", msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Receiving valid request messages (e.g., getaddr/getheaders/getdata/getproof) now triggers handleMessage’s default error, and recvLoop closes the connection as soon as it sees those messages. This causes unnecessary disconnects from compliant peers instead of ignoring or responding to the requests.
Prompt for AI agents
Address the following comment on internal/handshake/protocol/peer.go at line 209:
<comment>Receiving valid request messages (e.g., getaddr/getheaders/getdata/getproof) now triggers handleMessage’s default error, and recvLoop closes the connection as soon as it sees those messages. This causes unnecessary disconnects from compliant peers instead of ignoring or responding to the requests.</comment>
<file context>
@@ -97,37 +148,67 @@ func (p *Peer) sendMessage(msgType uint8, msgPayload Message) error {
+ case *MsgProof:
+ p.proofCh <- msg
+ default:
+ return fmt.Errorf("unknown message type: %T", msg)
+ }
+ return nil
</file context>
Summary by cubic
Refactors the peer protocol to use an async receive loop with per-message channels, enabling concurrent requests with clear timeouts and better error propagation. Improves connection lifecycle safety and prevents peer reuse after disconnect.
Written for commit b77012e. Summary will update automatically on new commits.
Summary by CodeRabbit
Bug Fixes
Refactor
New Features
✏️ Tip: You can customize this high-level summary in your review settings.