Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SDK Router message handling #316

Merged
merged 21 commits into from
Sep 8, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 32 additions & 28 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils"
Expand Down Expand Up @@ -87,23 +88,25 @@ type network struct {
outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler
activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests
activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics
router *p2p.Router
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics

// Set to true when Shutdown is called, after which all operations on this
// struct are no-ops.
closed utils.Atomic[bool]
}

func NewNetwork(appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
return &network{
router: router,
appSender: appSender,
codec: codec,
crossChainCodec: crossChainCodec,
Expand Down Expand Up @@ -172,10 +175,7 @@ func (n *network) sendAppRequest(nodeID ids.NodeID, request []byte, responseHand
log.Debug("sending request to peer", "nodeID", nodeID, "requestLen", len(request))
n.peers.TrackPeer(nodeID)

// generate requestID
requestID := n.requestIDGen
n.requestIDGen++

requestID := n.nextRequestID()
n.outstandingRequestHandlers[requestID] = responseHandler

nodeIDs := set.NewSet[ids.NodeID](1)
Expand Down Expand Up @@ -209,10 +209,7 @@ func (n *network) SendCrossChainRequest(chainID ids.ID, request []byte, handler
return nil
}

// generate requestID
requestID := n.requestIDGen
n.requestIDGen++

requestID := n.nextRequestID()
n.outstandingRequestHandlers[requestID] = handler

// Send cross chain request to [chainID].
Expand Down Expand Up @@ -335,8 +332,8 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u

var req message.Request
if _, err := n.codec.Unmarshal(request, &req); err != nil {
log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return nil
log.Debug("forwarding unregistered AppRequest", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
return n.router.AppRequest(ctx, nodeID, requestID, deadline, request)
}

bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.appStats)
Expand Down Expand Up @@ -366,7 +363,7 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u
// Error returned by this function is expected to be treated as fatal by the engine
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
n.lock.Lock()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since closed is atomic, could we move this lock to directly before
handler, exists := n.markRequestFulfilled(requestID)?

Should we move it to inside markRequestFulfilled instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah that makes much more sense than what I was currently doing.

defer n.lock.Unlock()
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -378,9 +375,8 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received AppResponse to unknown request", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return nil
log.Debug("forwarding unregistered AppResponse", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return n.router.AppResponse(ctx, nodeID, requestID, response)
}

// We must release the slot
Expand All @@ -395,7 +391,7 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui
// - request times out before a response is provided
// error returned by this function is expected to be treated as fatal by the engine
// returns error only when the response handler returns an error
func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32) error {
func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
n.lock.Lock()
defer n.lock.Unlock()
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved

Expand All @@ -407,9 +403,8 @@ func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, request

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received AppRequestFailed to unknown request", "nodeID", nodeID, "requestID", requestID)
return nil
log.Debug("forwarding unregistered AppRequestFailed", "nodeID", nodeID, "requestID", requestID)
return n.router.AppRequestFailed(ctx, nodeID, requestID)
}

// We must release the slot
Expand Down Expand Up @@ -564,3 +559,12 @@ func (n *network) TrackBandwidth(nodeID ids.NodeID, bandwidth float64) {

n.peers.TrackBandwidth(nodeID, bandwidth)
}

// invariant: peer/network must use explicitly even request ids.
joshua-kim marked this conversation as resolved.
Show resolved Hide resolved
// for this reason, [n.requestID] is initialized as zero and incremented by 2.
func (n *network) nextRequestID() uint32 {
next := n.requestIDGen
n.requestIDGen += 2

return next
}
100 changes: 82 additions & 18 deletions peer/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"testing"
"time"

"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
ethcommon "github.com/ethereum/go-ethereum/common"

Expand Down Expand Up @@ -49,11 +51,13 @@ var (

_ message.CrossChainRequest = &ExampleCrossChainRequest{}
_ message.CrossChainRequestHandler = &testCrossChainHandler{}

_ p2p.Handler = &testSDKHandler{}
)

func TestNetworkDoesNotConnectToItself(t *testing.T) {
selfNodeID := ids.GenerateTestNodeID()
n := NewNetwork(nil, nil, nil, selfNodeID, 1, 1)
n := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), nil, nil, nil, selfNodeID, 1, 1)
assert.NoError(t, n.Connected(context.Background(), selfNodeID, defaultPeerVersion))
assert.EqualValues(t, 0, n.Size())
}
Expand Down Expand Up @@ -89,7 +93,7 @@ func TestRequestAnyRequestsRoutingAndResponse(t *testing.T) {

codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager})
client := NewNetworkClient(net)
nodeID := ids.GenerateTestNodeID()
Expand Down Expand Up @@ -164,7 +168,7 @@ func TestRequestRequestsRoutingAndResponse(t *testing.T) {

codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager})
client := NewNetworkClient(net)

Expand Down Expand Up @@ -244,7 +248,7 @@ func TestAppRequestOnShutdown(t *testing.T) {

codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
client := NewNetworkClient(net)
nodeID := ids.GenerateTestNodeID()
require.NoError(t, net.Connected(context.Background(), nodeID, defaultPeerVersion))
Expand Down Expand Up @@ -293,7 +297,7 @@ func TestRequestMinVersion(t *testing.T) {
}

// passing nil as codec works because the net.AppRequest is never called
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16)
client := NewNetworkClient(net)
requestMessage := TestMessage{Message: "this is a request"}
requestBytes, err := message.RequestToBytes(codecManager, requestMessage)
Expand Down Expand Up @@ -356,7 +360,7 @@ func TestOnRequestHonoursDeadline(t *testing.T) {
processingDuration: 500 * time.Millisecond,
}

net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net.SetRequestHandler(requestHandler)
nodeID := ids.GenerateTestNodeID()

Expand Down Expand Up @@ -396,7 +400,7 @@ func TestGossip(t *testing.T) {
}

gossipHandler := &testGossipHandler{}
clientNetwork = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork.SetGossipHandler(gossipHandler)

assert.NoError(t, clientNetwork.Connected(context.Background(), nodeID, defaultPeerVersion))
Expand All @@ -423,7 +427,7 @@ func TestHandleInvalidMessages(t *testing.T) {
requestID := uint32(1)
sender := testAppSender{}

clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{})
clientNetwork.SetRequestHandler(&testRequestHandler{})

Expand Down Expand Up @@ -457,12 +461,11 @@ func TestHandleInvalidMessages(t *testing.T) {
assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), garbageResponse))
assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), emptyResponse))
assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), nilResponse))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, gossipMsg))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, requestMessage))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, garbageResponse))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, emptyResponse))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, nilResponse))
assert.NoError(t, clientNetwork.AppRequestFailed(context.Background(), nodeID, requestID))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, gossipMsg))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, requestMessage))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, garbageResponse))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, emptyResponse))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, nilResponse))
Comment on lines +464 to +468
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test was written to ensure that an invalid message NEVER triggers an unintentional fatal error, so it seems a bit weird to change it in this way.

Copy link
Contributor Author

@joshua-kim joshua-kim Sep 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I was unsure if it made more sense to just remove these test cases or just test for the sdk error. I guess now the property that we have is that invalid messages are always forwarded into the router, so we can either check for this error, get rid these tests, or write a Router interface that p2p.Router implements but maybe that's overkill.

}

func TestNetworkPropagatesRequestHandlerError(t *testing.T) {
Expand All @@ -473,7 +476,7 @@ func TestNetworkPropagatesRequestHandlerError(t *testing.T) {
requestID := uint32(1)
sender := testAppSender{}

clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{})
clientNetwork.SetRequestHandler(&testRequestHandler{err: errors.New("fail")}) // Return an error from the request handler

Expand Down Expand Up @@ -513,7 +516,7 @@ func TestCrossChainAppRequest(t *testing.T) {
},
}

net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager})
client := NewNetworkClient(net)

Expand Down Expand Up @@ -568,7 +571,7 @@ func TestCrossChainRequestRequestsRoutingAndResponse(t *testing.T) {

codecManager := buildCodec(t, TestMessage{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager})
client := NewNetworkClient(net)

Expand Down Expand Up @@ -628,7 +631,7 @@ func TestCrossChainRequestOnShutdown(t *testing.T) {
}
codecManager := buildCodec(t, TestMessage{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
client := NewNetworkClient(net)

exampleCrossChainRequest := ExampleCrossChainRequest{
Expand All @@ -649,6 +652,48 @@ func TestCrossChainRequestOnShutdown(t *testing.T) {
require.True(t, called)
}

func TestSDKRouting(t *testing.T) {
require := require.New(t)
sender := &testAppSender{
sendAppRequestFn: func(s set.Set[ids.NodeID], u uint32, bytes []byte) error {
return nil
},
sendAppResponseFn: func(id ids.NodeID, u uint32, bytes []byte) error {
return nil
},
}
protocol := 0
handler := &testSDKHandler{}
router := p2p.NewRouter(logging.NoLog{}, sender)
_, err := router.RegisterAppProtocol(uint64(protocol), handler)
require.NoError(err)

networkCodec := codec.NewManager(0)
crossChainCodec := codec.NewManager(0)

network := NewNetwork(
router,
nil,
networkCodec,
crossChainCodec,
ids.EmptyNodeID,
1,
1,
)

nodeID := ids.GenerateTestNodeID()
foobar := append([]byte{byte(protocol)}, []byte("foobar")...)
err = network.AppRequest(context.Background(), nodeID, 0, time.Time{}, foobar)
require.NoError(err)
require.True(handler.appRequested)

err = network.AppResponse(context.Background(), ids.GenerateTestNodeID(), 0, foobar)
require.ErrorIs(err, p2p.ErrUnrequestedResponse)

err = network.AppRequestFailed(context.Background(), nodeID, 0)
require.ErrorIs(err, p2p.ErrUnrequestedResponse)
}

func buildCodec(t *testing.T, types ...interface{}) codec.Manager {
codecManager := codec.NewDefaultManager()
c := linearcodec.NewDefault()
Expand Down Expand Up @@ -850,3 +895,22 @@ type testCrossChainHandler struct {
func (t *testCrossChainHandler) HandleCrossChainRequest(ctx context.Context, requestingChainID ids.ID, requestID uint32, exampleRequest message.CrossChainRequest) ([]byte, error) {
return t.codec.Marshal(message.Version, ExampleCrossChainResponse{Response: "this is an example response"})
}

type testSDKHandler struct {
appRequested bool
}

func (t *testSDKHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
// TODO implement me
panic("implement me")
}

func (t *testSDKHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
t.appRequested = true
return nil, nil
}

func (t *testSDKHandler) CrossChainAppRequest(ctx context.Context, chainID ids.ID, deadline time.Time, requestBytes []byte) ([]byte, error) {
// TODO implement me
panic("implement me")
}
6 changes: 5 additions & 1 deletion plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

avalanchegoMetrics "github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/network/p2p"

"github.com/ava-labs/coreth/consensus/dummy"
corethConstants "github.com/ava-labs/coreth/constants"
Expand Down Expand Up @@ -276,6 +277,8 @@ type VM struct {
client peer.NetworkClient
networkCodec codec.Manager

router *p2p.Router

// Metrics
multiGatherer avalanchegoMetrics.MultiGatherer

Expand Down Expand Up @@ -506,8 +509,9 @@ func (vm *VM) Initialize(
}

// initialize peer network
vm.router = p2p.NewRouter(vm.ctx.Log, appSender)
vm.networkCodec = message.Codec
vm.Network = peer.NewNetwork(appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests)
vm.Network = peer.NewNetwork(vm.router, appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests)
vm.client = peer.NewNetworkClient(vm.Network)

if err := vm.initializeChain(lastAcceptedHash); err != nil {
Expand Down