Add SDK Router message handling #316

Merged
merged 21 commits into from
Sep 8, 2023
6 changes: 4 additions & 2 deletions peer/network.go
@@ -365,15 +365,16 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u
 // If the response handler returns an error it is propagated as a fatal error.
 func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
     n.lock.Lock()
Collaborator

Since closed is atomic, could we move this lock to directly before
handler, exists := n.markRequestFulfilled(requestID)?

Or should we move it inside markRequestFulfilled instead?

Contributor Author

Yeah, that makes much more sense than what I was doing.
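
A minimal sketch of the suggestion above, in which markRequestFulfilled takes the lock itself rather than relying on the caller. The network type, the responseHandler type, and the outstandingRequests field are hypothetical stand-ins, not the code under review. The sketch also leaves out the n.closed check, which other comments in this review argue must happen under the same lock.

```go
package main

import (
	"fmt"
	"sync"
)

// responseHandler is a hypothetical stand-in for the per-request handler type.
type responseHandler func(response []byte)

// network is a simplified, hypothetical version of the type under review.
type network struct {
	lock                sync.Mutex
	outstandingRequests map[uint32]responseHandler
}

// markRequestFulfilled acquires the lock itself, so callers cannot forget to
// hold it and never hold it longer than the map update requires.
func (n *network) markRequestFulfilled(requestID uint32) (responseHandler, bool) {
	n.lock.Lock()
	defer n.lock.Unlock()

	handler, exists := n.outstandingRequests[requestID]
	if !exists {
		return nil, false
	}
	delete(n.outstandingRequests, requestID)
	return handler, true
}

func main() {
	n := &network{outstandingRequests: map[uint32]responseHandler{
		1: func(response []byte) { fmt.Printf("handled %q\n", response) },
	}}
	// The first lookup finds and removes the handler.
	if handler, ok := n.markRequestFulfilled(1); ok {
		handler([]byte("pong"))
	}
	// The second lookup finds nothing, because the request was already fulfilled.
	_, ok := n.markRequestFulfilled(1)
	fmt.Println("second lookup found handler:", ok)
}
```

The handler runs after the lock is released, which mirrors the diff's intent of not holding n.lock while a response is processed.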

-    defer n.lock.Unlock()

     if n.closed.Get() {
Contributor

This is a very subtle change, so we should be terrified to make it.

I believe we must hold the lock when checking n.closed in all of the inbound response and requestFailed flows, to avoid a potential panic from sending on a closed channel (or closing a channel twice).

Contributor Author

As discussed offline, I'll see if I can write a separate regression test for this invariant as a follow-up to this PR.

Contributor

I documented the invariant on n.closed as well 👍
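
The invariant discussed in this thread can be illustrated with a small, self-contained model. Everything here is an assumption made for illustration: the sketch uses a plain bool guarded by the mutex (the real n.closed is described above as atomic), and it models each outstanding request as a buffered channel that shutdown closes. The point is only that the closed check and the channel operation sit in the same critical section, so a concurrent shutdown cannot slip in between them.

```go
package main

import (
	"fmt"
	"sync"
)

// network is a hypothetical, simplified model: each outstanding request owns
// a channel that receives the response, and shutdown closes those channels.
type network struct {
	lock        sync.Mutex
	closed      bool // guarded by lock in this sketch
	outstanding map[uint32]chan []byte
}

// appResponse delivers a response. The closed check and the channel send both
// happen under the lock, so shutdown cannot close the channel in between.
func (n *network) appResponse(requestID uint32, response []byte) bool {
	n.lock.Lock()
	defer n.lock.Unlock()

	if n.closed {
		return false
	}
	ch, exists := n.outstanding[requestID]
	if !exists {
		return false
	}
	delete(n.outstanding, requestID)
	ch <- response // each channel has capacity 1 and is sent to once, so this never blocks
	return true
}

// shutdown sets the flag and closes every outstanding channel under the lock.
// The closed check turns a second call into a no-op instead of a double close.
func (n *network) shutdown() {
	n.lock.Lock()
	defer n.lock.Unlock()

	if n.closed {
		return
	}
	n.closed = true
	for id, ch := range n.outstanding {
		close(ch)
		delete(n.outstanding, id)
	}
}

func main() {
	n := &network{outstanding: make(map[uint32]chan []byte)}
	for id := uint32(0); id < 100; id++ {
		n.outstanding[id] = make(chan []byte, 1)
	}

	// Race inbound responses against shutdown; neither side may panic.
	var wg sync.WaitGroup
	for id := uint32(0); id < 100; id++ {
		wg.Add(1)
		go func(id uint32) {
			defer wg.Done()
			n.appResponse(id, []byte("ok"))
		}(id)
	}
	n.shutdown()
	n.shutdown()
	wg.Wait()
	fmt.Println("finished without panic")
}
```

A regression test along these lines would likely be run with the race detector enabled (go test -race), since the failure it guards against depends on goroutine interleaving.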

+        n.lock.Unlock()
         return nil
Contributor

Are we okay with dropping responses (and timeouts) that should have been sent to the SDK router after we close the network?

Contributor

If we are not okay with this, then I think we'll need to be fairly careful about what gets passed through to the router.

Contributor Author

We chatted offline and agreed it was cleanest to leave the Router code as-is and just drop the closed check on the server end. The set of outstanding requests is guaranteed to be empty after shutdown, because we empty it on Shutdown and stop sending requests once the flag is set.
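
The guarantee described in this reply can be sketched as follows. This is a rough illustration under stated assumptions, not the PR's code: the network type, sendRequest, shutdown, pending, and errNetworkClosed are all hypothetical names. It shows only the two properties named above: shutdown empties the outstanding set, and no request is registered once the flag is set.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errNetworkClosed is a hypothetical error returned once the network is shut down.
var errNetworkClosed = errors.New("network closed")

// network is a hypothetical, simplified model of the outbound request tracking.
type network struct {
	lock        sync.Mutex
	closed      bool
	outstanding map[uint32]struct{}
}

// sendRequest registers an outstanding request unless the network is closed.
func (n *network) sendRequest(requestID uint32) error {
	n.lock.Lock()
	defer n.lock.Unlock()

	if n.closed {
		return errNetworkClosed
	}
	n.outstanding[requestID] = struct{}{}
	return nil
}

// shutdown sets the flag and empties the outstanding set in one critical
// section, so the set cannot gain entries afterwards.
func (n *network) shutdown() {
	n.lock.Lock()
	defer n.lock.Unlock()

	n.closed = true
	n.outstanding = make(map[uint32]struct{})
}

// pending reports how many requests are still outstanding.
func (n *network) pending() int {
	n.lock.Lock()
	defer n.lock.Unlock()
	return len(n.outstanding)
}

func main() {
	n := &network{outstanding: make(map[uint32]struct{})}
	_ = n.sendRequest(1)
	_ = n.sendRequest(2)
	n.shutdown()
	err := n.sendRequest(3) // rejected because the flag is already set
	fmt.Println(n.pending(), err)
}
```

Under these assumptions, any response that arrives after shutdown finds no matching outstanding request, which is the property the reply relies on.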

     }

     log.Debug("received AppResponse from peer", "nodeID", nodeID, "requestID", requestID)

     handler, exists := n.markRequestFulfilled(requestID)
+    n.lock.Unlock()
     if !exists {
         log.Debug("forwarding AppResponse to SDK router", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
         return n.router.AppResponse(ctx, nodeID, requestID, response)
@@ -393,15 +394,16 @@ func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID
 // returns error only when the response handler returns an error
 func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
     n.lock.Lock()
-    defer n.lock.Unlock()

     if n.closed.Get() {
+        n.lock.Unlock()
         return nil
     }

     log.Debug("received AppRequestFailed from peer", "nodeID", nodeID, "requestID", requestID)

     handler, exists := n.markRequestFulfilled(requestID)
+    n.lock.Unlock()
     if !exists {
         log.Debug("forwarding AppRequestFailed to SDK router", "nodeID", nodeID, "requestID", requestID)
         return n.router.AppRequestFailed(ctx, nodeID, requestID)