Skip to content

Commit

Permalink
add sdk router to network
Browse files Browse the repository at this point in the history
  • Loading branch information
joshua-kim committed Sep 5, 2023
1 parent 3f5dc8a commit 91c34d2
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 39 deletions.
42 changes: 22 additions & 20 deletions peer/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

"golang.org/x/sync/semaphore"

"github.com/ava-labs/avalanchego/network/p2p"

"github.com/ethereum/go-ethereum/log"

"github.com/ava-labs/avalanchego/codec"
Expand Down Expand Up @@ -87,23 +89,25 @@ type network struct {
outstandingRequestHandlers map[uint32]message.ResponseHandler // maps avalanchego requestID => message.ResponseHandler
activeAppRequests *semaphore.Weighted // controls maximum number of active outbound requests
activeCrossChainRequests *semaphore.Weighted // controls maximum number of active outbound cross chain requests
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics
router *p2p.Router
appSender common.AppSender // avalanchego AppSender for sending messages
codec codec.Manager // Codec used for parsing messages
crossChainCodec codec.Manager // Codec used for parsing cross chain messages
appRequestHandler message.RequestHandler // maps request type => handler
crossChainRequestHandler message.CrossChainRequestHandler // maps cross chain request type => handler
gossipHandler message.GossipHandler // maps gossip type => handler
peers *peerTracker // tracking of peers & bandwidth
appStats stats.RequestHandlerStats // Provide request handler metrics
crossChainStats stats.RequestHandlerStats // Provide cross chain request handler metrics

// Set to true when Shutdown is called, after which all operations on this
// struct are no-ops.
closed utils.Atomic[bool]
}

func NewNetwork(appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
func NewNetwork(router *p2p.Router, appSender common.AppSender, codec codec.Manager, crossChainCodec codec.Manager, self ids.NodeID, maxActiveAppRequests int64, maxActiveCrossChainRequests int64) Network {
return &network{
router: router,
appSender: appSender,
codec: codec,
crossChainCodec: crossChainCodec,
Expand Down Expand Up @@ -335,8 +339,8 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u

var req message.Request
if _, err := n.codec.Unmarshal(request, &req); err != nil {
log.Debug("failed to unmarshal app request", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return nil
log.Debug("forwarding unknown AppRequest to sdk", "nodeID", nodeID, "requestID", requestID, "requestLen", len(request), "err", err)
return n.router.AppRequest(ctx, nodeID, requestID, deadline, request)
}

bufferedDeadline, err := calculateTimeUntilDeadline(deadline, n.appStats)
Expand Down Expand Up @@ -366,7 +370,7 @@ func (n *network) AppRequest(ctx context.Context, nodeID ids.NodeID, requestID u
// Error returned by this function is expected to be treated as fatal by the engine
// If [requestID] is not known, this function will emit a log and return a nil error.
// If the response handler returns an error it is propagated as a fatal error.
func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
func (n *network) AppResponse(ctx context.Context, nodeID ids.NodeID, requestID uint32, response []byte) error {
n.lock.Lock()
defer n.lock.Unlock()

Expand All @@ -378,9 +382,8 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received AppResponse to unknown request", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return nil
log.Debug("forwarding unknown AppResponse to sdk", "nodeID", nodeID, "requestID", requestID, "responseLen", len(response))
return n.router.AppResponse(ctx, nodeID, requestID, response)
}

// We must release the slot
Expand All @@ -395,7 +398,7 @@ func (n *network) AppResponse(_ context.Context, nodeID ids.NodeID, requestID ui
// - request times out before a response is provided
// error returned by this function is expected to be treated as fatal by the engine
// returns error only when the response handler returns an error
func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, requestID uint32) error {
func (n *network) AppRequestFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
n.lock.Lock()
defer n.lock.Unlock()

Expand All @@ -407,9 +410,8 @@ func (n *network) AppRequestFailed(_ context.Context, nodeID ids.NodeID, request

handler, exists := n.markRequestFulfilled(requestID)
if !exists {
// Should never happen since the engine should be managing outstanding requests
log.Error("received AppRequestFailed to unknown request", "nodeID", nodeID, "requestID", requestID)
return nil
log.Debug("forwarding unknown AppRequestFailed to sdk", "nodeID", nodeID, "requestID", requestID)
return n.router.AppRequestFailed(ctx, nodeID, requestID)
}

// We must release the slot
Expand Down
100 changes: 82 additions & 18 deletions peer/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (
"testing"
"time"

"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
ethcommon "github.com/ethereum/go-ethereum/common"

Expand Down Expand Up @@ -49,11 +51,13 @@ var (

_ message.CrossChainRequest = &ExampleCrossChainRequest{}
_ message.CrossChainRequestHandler = &testCrossChainHandler{}

_ p2p.Handler = &testSDKHandler{}
)

func TestNetworkDoesNotConnectToItself(t *testing.T) {
selfNodeID := ids.GenerateTestNodeID()
n := NewNetwork(nil, nil, nil, selfNodeID, 1, 1)
n := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), nil, nil, nil, selfNodeID, 1, 1)
assert.NoError(t, n.Connected(context.Background(), selfNodeID, defaultPeerVersion))
assert.EqualValues(t, 0, n.Size())
}
Expand Down Expand Up @@ -89,7 +93,7 @@ func TestRequestAnyRequestsRoutingAndResponse(t *testing.T) {

codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager})
client := NewNetworkClient(net)
nodeID := ids.GenerateTestNodeID()
Expand Down Expand Up @@ -164,7 +168,7 @@ func TestRequestRequestsRoutingAndResponse(t *testing.T) {

codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 16, 16)
net.SetRequestHandler(&HelloGreetingRequestHandler{codec: codecManager})
client := NewNetworkClient(net)

Expand Down Expand Up @@ -244,7 +248,7 @@ func TestAppRequestOnShutdown(t *testing.T) {

codecManager := buildCodec(t, HelloRequest{}, HelloResponse{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
client := NewNetworkClient(net)
nodeID := ids.GenerateTestNodeID()
require.NoError(t, net.Connected(context.Background(), nodeID, defaultPeerVersion))
Expand Down Expand Up @@ -293,7 +297,7 @@ func TestRequestMinVersion(t *testing.T) {
}

// passing nil as codec works because the net.AppRequest is never called
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 16)
client := NewNetworkClient(net)
requestMessage := TestMessage{Message: "this is a request"}
requestBytes, err := message.RequestToBytes(codecManager, requestMessage)
Expand Down Expand Up @@ -356,7 +360,7 @@ func TestOnRequestHonoursDeadline(t *testing.T) {
processingDuration: 500 * time.Millisecond,
}

net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net.SetRequestHandler(requestHandler)
nodeID := ids.GenerateTestNodeID()

Expand Down Expand Up @@ -396,7 +400,7 @@ func TestGossip(t *testing.T) {
}

gossipHandler := &testGossipHandler{}
clientNetwork = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork.SetGossipHandler(gossipHandler)

assert.NoError(t, clientNetwork.Connected(context.Background(), nodeID, defaultPeerVersion))
Expand All @@ -423,7 +427,7 @@ func TestHandleInvalidMessages(t *testing.T) {
requestID := uint32(1)
sender := testAppSender{}

clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{})
clientNetwork.SetRequestHandler(&testRequestHandler{})

Expand Down Expand Up @@ -457,12 +461,11 @@ func TestHandleInvalidMessages(t *testing.T) {
assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), garbageResponse))
assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), emptyResponse))
assert.NoError(t, clientNetwork.AppRequest(context.Background(), nodeID, requestID, time.Now().Add(time.Second), nilResponse))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, gossipMsg))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, requestMessage))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, garbageResponse))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, emptyResponse))
assert.NoError(t, clientNetwork.AppResponse(context.Background(), nodeID, requestID, nilResponse))
assert.NoError(t, clientNetwork.AppRequestFailed(context.Background(), nodeID, requestID))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, gossipMsg))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, requestMessage))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, garbageResponse))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, emptyResponse))
assert.ErrorIs(t, p2p.ErrUnrequestedResponse, clientNetwork.AppResponse(context.Background(), nodeID, requestID, nilResponse))
}

func TestNetworkPropagatesRequestHandlerError(t *testing.T) {
Expand All @@ -473,7 +476,7 @@ func TestNetworkPropagatesRequestHandlerError(t *testing.T) {
requestID := uint32(1)
sender := testAppSender{}

clientNetwork := NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork := NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
clientNetwork.SetGossipHandler(message.NoopMempoolGossipHandler{})
clientNetwork.SetRequestHandler(&testRequestHandler{err: errors.New("fail")}) // Return an error from the request handler

Expand Down Expand Up @@ -513,7 +516,7 @@ func TestCrossChainAppRequest(t *testing.T) {
},
}

net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager})
client := NewNetworkClient(net)

Expand Down Expand Up @@ -568,7 +571,7 @@ func TestCrossChainRequestRequestsRoutingAndResponse(t *testing.T) {

codecManager := buildCodec(t, TestMessage{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net.SetCrossChainRequestHandler(&testCrossChainHandler{codec: crossChainCodecManager})
client := NewNetworkClient(net)

Expand Down Expand Up @@ -628,7 +631,7 @@ func TestCrossChainRequestOnShutdown(t *testing.T) {
}
codecManager := buildCodec(t, TestMessage{})
crossChainCodecManager := buildCodec(t, ExampleCrossChainRequest{}, ExampleCrossChainResponse{})
net = NewNetwork(sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
net = NewNetwork(p2p.NewRouter(logging.NoLog{}, nil), sender, codecManager, crossChainCodecManager, ids.EmptyNodeID, 1, 1)
client := NewNetworkClient(net)

exampleCrossChainRequest := ExampleCrossChainRequest{
Expand All @@ -649,6 +652,48 @@ func TestCrossChainRequestOnShutdown(t *testing.T) {
require.True(t, called)
}

func TestSDKRouting(t *testing.T) {
require := require.New(t)
sender := &testAppSender{
sendAppRequestFn: func(s set.Set[ids.NodeID], u uint32, bytes []byte) error {
return nil
},
sendAppResponseFn: func(id ids.NodeID, u uint32, bytes []byte) error {
return nil
},
}
protocol := 0
handler := &testSDKHandler{}
router := p2p.NewRouter(logging.NoLog{}, sender)
_, err := router.RegisterAppProtocol(uint64(protocol), handler)
require.NoError(err)

networkCodec := codec.NewManager(0)
crossChainCodec := codec.NewManager(0)

network := NewNetwork(
router,
nil,
networkCodec,
crossChainCodec,
ids.EmptyNodeID,
1,
1,
)

nodeID := ids.GenerateTestNodeID()
foobar := append([]byte{byte(protocol)}, []byte("foobar")...)
err = network.AppRequest(context.Background(), nodeID, 0, time.Time{}, foobar)
require.NoError(err)
require.True(handler.appRequested)

err = network.AppResponse(context.Background(), ids.GenerateTestNodeID(), 0, foobar)
require.ErrorIs(err, p2p.ErrUnrequestedResponse)

err = network.AppRequestFailed(context.Background(), nodeID, 0)
require.ErrorIs(err, p2p.ErrUnrequestedResponse)
}

func buildCodec(t *testing.T, types ...interface{}) codec.Manager {
codecManager := codec.NewDefaultManager()
c := linearcodec.NewDefault()
Expand Down Expand Up @@ -850,3 +895,22 @@ type testCrossChainHandler struct {
func (t *testCrossChainHandler) HandleCrossChainRequest(ctx context.Context, requestingChainID ids.ID, requestID uint32, exampleRequest message.CrossChainRequest) ([]byte, error) {
return t.codec.Marshal(message.Version, ExampleCrossChainResponse{Response: "this is an example response"})
}

type testSDKHandler struct {
appRequested bool
}

func (t *testSDKHandler) AppGossip(ctx context.Context, nodeID ids.NodeID, gossipBytes []byte) error {
// TODO implement me
panic("implement me")
}

func (t *testSDKHandler) AppRequest(ctx context.Context, nodeID ids.NodeID, deadline time.Time, requestBytes []byte) ([]byte, error) {
t.appRequested = true
return nil, nil
}

func (t *testSDKHandler) CrossChainAppRequest(ctx context.Context, chainID ids.ID, deadline time.Time, requestBytes []byte) ([]byte, error) {
// TODO implement me
panic("implement me")
}
6 changes: 5 additions & 1 deletion plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

avalanchegoMetrics "github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/network/p2p"

"github.com/ava-labs/coreth/consensus/dummy"
corethConstants "github.com/ava-labs/coreth/constants"
Expand Down Expand Up @@ -276,6 +277,8 @@ type VM struct {
client peer.NetworkClient
networkCodec codec.Manager

router *p2p.Router

// Metrics
multiGatherer avalanchegoMetrics.MultiGatherer

Expand Down Expand Up @@ -506,8 +509,9 @@ func (vm *VM) Initialize(
}

// initialize peer network
vm.router = p2p.NewRouter(vm.ctx.Log, appSender)
vm.networkCodec = message.Codec
vm.Network = peer.NewNetwork(appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests)
vm.Network = peer.NewNetwork(vm.router, appSender, vm.networkCodec, message.CrossChainCodec, chainCtx.NodeID, vm.config.MaxOutboundActiveRequests, vm.config.MaxOutboundActiveCrossChainRequests)
vm.client = peer.NewNetworkClient(vm.Network)

if err := vm.initializeChain(lastAcceptedHash); err != nil {
Expand Down

0 comments on commit 91c34d2

Please sign in to comment.