Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combine AppGossip and AppGossipSpecific #2836

Merged
merged 6 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
105 changes: 48 additions & 57 deletions network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/ava-labs/avalanchego/network/dialer"
"github.com/ava-labs/avalanchego/network/peer"
"github.com/ava-labs/avalanchego/network/throttling"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/networking/router"
"github.com/ava-labs/avalanchego/snow/networking/sender"
"github.com/ava-labs/avalanchego/subnets"
Expand All @@ -47,8 +48,7 @@ const (
)

var (
_ sender.ExternalSender = (*network)(nil)
_ Network = (*network)(nil)
Comment on lines -50 to -51
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Network embeds sender.ExternalSender

_ Network = (*network)(nil)

errNotValidator = errors.New("node is not a validator")
errNotTracked = errors.New("subnet is not tracked")
Expand Down Expand Up @@ -310,25 +310,40 @@ func NewNetwork(
return n, nil
}

func (n *network) Send(msg message.OutboundMessage, nodeIDs set.Set[ids.NodeID], subnetID ids.ID, allower subnets.Allower) set.Set[ids.NodeID] {
peers := n.getPeers(nodeIDs, subnetID, allower)
n.peerConfig.Metrics.MultipleSendsFailed(
msg.Op(),
nodeIDs.Len()-len(peers),
)
return n.send(msg, peers)
}

func (n *network) Gossip(
func (n *network) Send(
msg message.OutboundMessage,
config common.SendConfig,
subnetID ids.ID,
numValidatorsToSend int,
numNonValidatorsToSend int,
numPeersToSend int,
allower subnets.Allower,
) set.Set[ids.NodeID] {
peers := n.samplePeers(subnetID, numValidatorsToSend, numNonValidatorsToSend, numPeersToSend, allower)
return n.send(msg, peers)
namedPeers := n.getPeers(config.NodeIDs, subnetID, allower)
n.peerConfig.Metrics.MultipleSendsFailed(
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
msg.Op(),
config.NodeIDs.Len()-len(namedPeers),
)

var (
sampledPeers = n.samplePeers(config, subnetID, allower)
sentTo = set.NewSet[ids.NodeID](len(namedPeers) + len(sampledPeers))
now = n.peerConfig.Clock.Time()
)

// send to peer and update metrics
for _, peers := range [][]peer.Peer{namedPeers, sampledPeers} {
for _, peer := range peers {
if peer.Send(n.onCloseCtx, msg) {
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
sentTo.Add(peer.ID())

// TODO: move send fail rate calculations into the peer metrics
// record metrics for success
n.sendFailRateCalculator.Observe(0, now)
} else {
// record metrics for failure
n.sendFailRateCalculator.Observe(1, now)
}
}
}
return sentTo
}

// HealthCheck returns information about several network layer health checks.
Expand Down Expand Up @@ -696,24 +711,19 @@ func (n *network) getPeers(
}

func (n *network) samplePeers(
config common.SendConfig,
subnetID ids.ID,
numValidatorsToSample,
numNonValidatorsToSample int,
numPeersToSample int,
allower subnets.Allower,
) []peer.Peer {
// If there are fewer validators than [numValidatorsToSample], then only
// sample [numValidatorsToSample] validators.
StephenButtolph marked this conversation as resolved.
Show resolved Hide resolved
subnetValidatorsLen := n.config.Validators.Count(subnetID)
if subnetValidatorsLen < numValidatorsToSample {
numValidatorsToSample = subnetValidatorsLen
}
numValidatorsToSample := min(config.Validators, n.config.Validators.Count(subnetID))

n.peersLock.RLock()
defer n.peersLock.RUnlock()

return n.connectedPeers.Sample(
numValidatorsToSample+numNonValidatorsToSample+numPeersToSample,
numValidatorsToSample+config.NonValidators+config.Peers,
func(p peer.Peer) bool {
// Only return peers that are tracking [subnetID]
trackedSubnets := p.TrackedSubnets()
Expand All @@ -722,14 +732,20 @@ func (n *network) samplePeers(
}

peerID := p.ID()
// if the peer was already explicitly included, don't include in the
// sample
if config.NodeIDs.Contains(peerID) {
return false
}
Comment on lines +741 to +745
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only slightly counter intuitive part of the code. Technically, we could inverse this and explicitly return true here (and remove the getPeers logic). But then we'd need to do an O(numPeers) search during the normal send path rather than the O(numToSend) that we currently have.


_, isValidator := n.config.Validators.GetValidator(subnetID, peerID)
// check if the peer is allowed to connect to the subnet
if !allower.IsAllowed(peerID, isValidator) {
return false
}

if numPeersToSample > 0 {
numPeersToSample--
if config.Peers > 0 {
config.Peers--
return true
}

Expand All @@ -738,37 +754,12 @@ func (n *network) samplePeers(
return numValidatorsToSample >= 0
}

numNonValidatorsToSample--
return numNonValidatorsToSample >= 0
config.NonValidators--
return config.NonValidators >= 0
},
)
}

// send the message to the provided peers.
//
// send takes ownership of the provided message reference. So, the provided
// message should only be inspected if the reference has been externally
// increased.
func (n *network) send(msg message.OutboundMessage, peers []peer.Peer) set.Set[ids.NodeID] {
sentTo := set.NewSet[ids.NodeID](len(peers))
now := n.peerConfig.Clock.Time()

// send to peer and update metrics
for _, peer := range peers {
if peer.Send(n.onCloseCtx, msg) {
sentTo.Add(peer.ID())

// TODO: move send fail rate calculations into the peer metrics
// record metrics for success
n.sendFailRateCalculator.Observe(0, now)
} else {
// record metrics for failure
n.sendFailRateCalculator.Observe(1, now)
}
}
return sentTo
}

func (n *network) disconnectedFromConnecting(nodeID ids.NodeID) {
n.peersLock.Lock()
defer n.peersLock.Unlock()
Expand Down Expand Up @@ -1208,10 +1199,10 @@ func (n *network) runTimers() {
// pullGossipPeerLists requests validators from peers in the network
func (n *network) pullGossipPeerLists() {
peers := n.samplePeers(
common.SendConfig{
Validators: 1,
},
constants.PrimaryNetworkID,
1, // numValidatorsToSample
0, // numNonValidatorsToSample
0, // numPeersToSample
subnets.NoOpAllower,
)

Expand Down
29 changes: 18 additions & 11 deletions network/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/ava-labs/avalanchego/network/peer"
"github.com/ava-labs/avalanchego/network/throttling"
"github.com/ava-labs/avalanchego/proto/pb/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/networking/router"
"github.com/ava-labs/avalanchego/snow/networking/tracker"
"github.com/ava-labs/avalanchego/snow/uptime"
Expand Down Expand Up @@ -327,7 +328,14 @@ func TestSend(t *testing.T) {
require.NoError(err)

toSend := set.Of(nodeIDs[1])
sentTo := net0.Send(outboundGetMsg, toSend, constants.PrimaryNetworkID, subnets.NoOpAllower)
sentTo := net0.Send(
outboundGetMsg,
common.SendConfig{
NodeIDs: toSend,
},
constants.PrimaryNetworkID,
subnets.NoOpAllower,
)
require.Equal(toSend, sentTo)

inboundGetMsg := <-received
Expand All @@ -339,7 +347,7 @@ func TestSend(t *testing.T) {
wg.Wait()
}

func TestSendAndGossipWithFilter(t *testing.T) {
func TestSendWithFilter(t *testing.T) {
require := require.New(t)

received := make(chan message.InboundMessage)
Expand All @@ -366,21 +374,20 @@ func TestSendAndGossipWithFilter(t *testing.T) {

toSend := set.Of(nodeIDs...)
validNodeID := nodeIDs[1]
sentTo := net0.Send(outboundGetMsg, toSend, constants.PrimaryNetworkID, newNodeIDConnector(validNodeID))
sentTo := net0.Send(
outboundGetMsg,
common.SendConfig{
NodeIDs: toSend,
},
constants.PrimaryNetworkID,
newNodeIDConnector(validNodeID),
)
require.Len(sentTo, 1)
require.Contains(sentTo, validNodeID)

inboundGetMsg := <-received
require.Equal(message.GetOp, inboundGetMsg.Op())

// Test Gossip now
sentTo = net0.Gossip(outboundGetMsg, constants.PrimaryNetworkID, 0, 0, len(nodeIDs), newNodeIDConnector(validNodeID))
require.Len(sentTo, 1)
require.Contains(sentTo, validNodeID)

inboundGetMsg = <-received
require.Equal(message.GetOp, inboundGetMsg.Op())

for _, net := range networks {
net.StartClose()
}
Expand Down
21 changes: 2 additions & 19 deletions network/p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,29 +108,12 @@ func (c *Client) AppRequest(
// AppGossip sends a gossip message to a random set of peers.
func (c *Client) AppGossip(
ctx context.Context,
config common.SendConfig,
appGossipBytes []byte,
numValidators int,
numNonValidators int,
numPeers int,
) error {
return c.sender.SendAppGossip(
ctx,
PrefixMessage(c.handlerPrefix, appGossipBytes),
numValidators,
numNonValidators,
numPeers,
)
}

// AppGossipSpecific sends a gossip message to a predetermined set of peers.
func (c *Client) AppGossipSpecific(
ctx context.Context,
nodeIDs set.Set[ids.NodeID],
appGossipBytes []byte,
) error {
return c.sender.SendAppGossipSpecific(
ctx,
nodeIDs,
config,
PrefixMessage(c.handlerPrefix, appGossipBytes),
)
}
Expand Down
9 changes: 6 additions & 3 deletions network/p2p/gossip/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/ava-labs/avalanchego/cache"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p"
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/utils"
"github.com/ava-labs/avalanchego/utils/bloom"
"github.com/ava-labs/avalanchego/utils/buffer"
Expand Down Expand Up @@ -462,10 +463,12 @@ func (p *PushGossiper[T]) gossip(

return p.client.AppGossip(
ctx,
common.SendConfig{
Validators: gossipParams.Validators,
NonValidators: gossipParams.NonValidators,
Peers: gossipParams.Peers,
},
msgBytes,
gossipParams.Validators,
gossipParams.NonValidators,
gossipParams.Peers,
)
}

Expand Down
22 changes: 14 additions & 8 deletions network/p2p/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ func TestMessageRouting(t *testing.T) {
require.NoError(network.AddHandler(1, testHandler))
client := network.NewClient(1)

require.NoError(client.AppGossip(ctx, wantMsg, 0, 0, 1))
require.NoError(client.AppGossip(
ctx,
common.SendConfig{
Peers: 1,
},
wantMsg,
))
require.NoError(network.AppGossip(ctx, wantNodeID, <-sender.SentAppGossip))
require.True(appGossipCalled)

Expand All @@ -89,7 +95,6 @@ func TestClientPrefixesMessages(t *testing.T) {
sender := common.FakeSender{
SentAppRequest: make(chan []byte, 1),
SentAppGossip: make(chan []byte, 1),
SentAppGossipSpecific: make(chan []byte, 1),
SentCrossChainAppRequest: make(chan []byte, 1),
}

Expand Down Expand Up @@ -129,15 +134,16 @@ func TestClientPrefixesMessages(t *testing.T) {
require.Equal(handlerPrefix, gotCrossChainAppRequest[0])
require.Equal(want, gotCrossChainAppRequest[1:])

require.NoError(client.AppGossip(ctx, want, 0, 0, 1))
require.NoError(client.AppGossip(
ctx,
common.SendConfig{
Peers: 1,
},
want,
))
gotAppGossip := <-sender.SentAppGossip
require.Equal(handlerPrefix, gotAppGossip[0])
require.Equal(want, gotAppGossip[1:])

require.NoError(client.AppGossipSpecific(ctx, set.Of(ids.EmptyNodeID), want))
gotAppGossip = <-sender.SentAppGossipSpecific
require.Equal(handlerPrefix, gotAppGossip[0])
require.Equal(want, gotAppGossip[1:])
}

// Tests that the Client callback is called on a successful response
Expand Down
16 changes: 5 additions & 11 deletions proto/appsender/appsender.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ service AppSender {
rpc SendAppResponse(SendAppResponseMsg) returns (google.protobuf.Empty);
rpc SendAppError(SendAppErrorMsg) returns (google.protobuf.Empty);
rpc SendAppGossip(SendAppGossipMsg) returns (google.protobuf.Empty);
rpc SendAppGossipSpecific(SendAppGossipSpecificMsg) returns (google.protobuf.Empty);

rpc SendCrossChainAppRequest(SendCrossChainAppRequestMsg) returns (google.protobuf.Empty);
rpc SendCrossChainAppResponse(SendCrossChainAppResponseMsg) returns (google.protobuf.Empty);
Expand Down Expand Up @@ -48,18 +47,13 @@ message SendAppErrorMsg {
}

message SendAppGossipMsg {
// The message body
bytes msg = 1;
uint64 num_validators = 2;
uint64 num_non_validators = 3;
uint64 num_peers = 4;
}

message SendAppGossipSpecificMsg {
// The nodes to send this request to
// Who to send this message to
repeated bytes node_ids = 1;
uint64 validators = 2;
uint64 non_validators = 3;
uint64 peers = 4;
// The message body
bytes msg = 2;
bytes msg = 5;
}

message SendCrossChainAppRequestMsg {
Expand Down