Skip to content

Commit

Permalink
Add SDK Router message handling (#316)
Browse files Browse the repository at this point in the history
Co-authored-by: Stephen Buttolph <stephen@avalabs.org>
  • Loading branch information
joshua-kim and StephenButtolph committed Sep 8, 2023
1 parent 3f5dc8a commit d4e7f3a
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 75 deletions.
96 changes: 40 additions & 56 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils"
Expand Down Expand Up @@ -87,6 +88,7 @@ type network struct {
outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler
activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests
activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests
router *p2p.Router // handles messages being sent to the generic networking SDK
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
Expand All @@ -99,11 +101,18 @@ type network struct {

// Set to true when Shutdown is called, after which all operations on this
// struct are no-ops.
//
// Invariant: Even though `closed` is an atomic, `lock` is required to be
// held when sending requests to guarantee that the network isn't closed
// during these calls. This is because closing the network cancels all
// outstanding requests, which means we must guarantee never to register a
// request that will never be fulfilled or cancelled.
closed utils.Atomic[bool]
}

func NewNetwork(appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
return &network{
router: router,
appSender: appSender,
codec: codec,
crossChainCodec: crossChainCodec,
Expand Down Expand Up @@ -172,10 +181,7 @@ func (n *network) sendAppRequest(nodeID ids.NodeID, request []byte, responseHand
log.Debug("sending request to peer", "nodeID", nodeID, "requestLen", len(request))
n.peers.TrackPeer(nodeID)

// generate requestID
requestID := n.requestIDGen
n.requestIDGen++

requestID := n.nextRequestID()
n.outstandingRequestHandlers[requestID] = responseHandler

nodeIDs := set.NewSet[ids.NodeID](1)
Expand Down Expand Up @@ -209,10 +215,7 @@ func (n *network) SendCrossChainRequest(chainID ids.ID, request []byte, handler
return nil
}

// generate requestID
requestID := n.requestIDGen
n.requestIDGen++

requestID := n.nextRequestID()
n.outstandingRequestHandlers[requestID] = handler

// Send cross chain request to [chainID].
Expand Down Expand Up @@ -272,19 +275,12 @@ func (n *network) CrossChainAppRequest(ctx context.Context, requestingChainID id
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) CrossChainAppRequestFailed(ctx context.Context, respondingChainID ids.ID, requestID uint32) error {
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

log.Debug("received CrossChainAppRequestFailed from chain", "respondingChainID", respondingChainID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received CrossChainAppRequestFailed to unknown request", "respondingChainID", respondingChainID, "requestID", requestID)
// Can happen after the network has been closed.
log.Debug("received CrossChainAppRequestFailed to unknown request", "respondingChainID", respondingChainID, "requestID", requestID)
return nil
}

Expand All @@ -299,19 +295,12 @@ func (n *network) CrossChainAppRequestFailed(ctx context.Context, respondingChai
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) CrossChainAppResponse(ctx context.Context, respondingChainID ids.ID, requestID uint32, response []byte) error {
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

log.Debug("received CrossChainAppResponse from responding chain", "respondingChainID", respondingChainID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received CrossChainAppResponse to unknown request", "respondingChainID", respondingChainID, "requestID", requestID, "responseLen", len(response))
// Can happen after the network has been closed.
log.Debug("received CrossChainAppResponse to unknown request", "respondingChainID", respondingChainID, "requestID", requestID, "responseLen", len(response))
return nil
}

Expand All @@ -335,8 +324,8 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u

var req message.Request
if _, err := n.codec.Unmarshal(request, &req); err != nil {
log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return nil
log.Debug("forwarding AppRequest to SDK router", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return n.router.AppRequest(ctx, nodeID, requestID, deadline, request)
}

bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.appStats)
Expand Down Expand Up @@ -366,21 +355,13 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u
// Error returned by this function is expected to be treated as fatal by the engine
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
log.Debug("received AppResponse from peer", "nodeID", nodeID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received AppResponse to unknown request", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return nil
log.Debug("forwarding AppResponse to SDK router", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return n.router.AppResponse(ctx, nodeID, requestID, response)
}

// We must release the slot
Expand All @@ -395,21 +376,13 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui
// - request times out before a response is provided
// error returned by this function is expected to be treated as fatal by the engine
// returns error only when the response handler returns an error
func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32) error {
n.lock.Lock()
defer n.lock.Unlock()

if n.closed.Get() {
return nil
}

func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
log.Debug("received AppRequestFailed from peer", "nodeID", nodeID, "requestID", requestID)

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received AppRequestFailed to unknown request", "nodeID", nodeID, "requestID", requestID)
return nil
log.Debug("forwarding AppRequestFailed to SDK router", "nodeID", nodeID, "requestID", requestID)
return n.router.AppRequestFailed(ctx, nodeID, requestID)
}

// We must release the slot
Expand Down Expand Up @@ -442,8 +415,11 @@ func calculateTimeUntilDeadline(deadline time.Time, stats stats.RequestHandlerSt

// markRequestFulfilled fetches the handler for [requestID] and marks the request with [requestID] as having been fulfilled.
// This is called by [AppResponse], [AppRequestFailed], [CrossChainAppResponse] and [CrossChainAppRequestFailed].
// Assumes that the write lock is held.
// Assumes that the write lock is not held.
func (n *network) markRequestFulfilled(requestID uint32) (message.ResponseHandler, bool) {
n.lock.Lock()
defer n.lock.Unlock()

handler, exists := n.outstandingRequestHandlers[requestID]
if !exists {
return nil, false
Expand All @@ -467,10 +443,6 @@ func (n *network) Gossip(gossip []byte) error {
// error returned by this function is expected to be treated as fatal by the engine
// returns error if request could not be parsed as message.Request or when the requestHandler returns an error
func (n *network) AppGossip(_ context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
if n.closed.Get() {
return nil
}

var gossipMsg message.GossipMessage
if _, err := n.codec.Unmarshal(gossipBytes, &gossipMsg); err != nil {
log.Debug("could not parse app gossip", "nodeID", nodeID, "gossipLen", len(gossipBytes), "err", err)
Expand Down Expand Up @@ -564,3 +536,15 @@ func (n *network) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {

n.peers.TrackBandwidth(nodeID, bandwidth)
}

// nextRequestID returns the request ID to use for the next outbound request
// and advances the generator.
//
// Invariant: peer/network only ever hands out even request IDs. The current
// value of [n.requestIDGen] is returned and the generator is then advanced by
// 2, so parity never changes (this presumes the generator starts from its
// zero value). This is for backwards-compatibility while the SDK router
// coexists with the legacy coreth handlers: it avoids a (very) narrow edge
// case where request IDs can overlap, resulting in a dropped timeout.
//
// The method does no locking of its own.
// NOTE(review): callers presumably hold n.lock while calling this; confirm.
func (n *network) nextRequestID() uint32 {
	id := n.requestIDGen
	n.requestIDGen = id + 2 // uint32 arithmetic wraps around, preserving parity
	return id
}

0 comments on commit d4e7f3a

Please sign in to comment.