From 0bb9d82233ab94576cca8bfab1fbc315d42b4e0b Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 29 Apr 2024 15:10:48 -0400 Subject: [PATCH 01/13] Remove subnet filter from Peer.TrackedSubnets() --- network/metrics.go | 24 +++++++++++++----------- network/network.go | 8 ++++++-- network/peer/peer.go | 42 ++++++++++++++++++++++++++++++------------ 3 files changed, 49 insertions(+), 25 deletions(-) diff --git a/network/metrics.go b/network/metrics.go index e2a3a363b40..ac1df146ccd 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -12,11 +12,12 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/peer" "github.com/ava-labs/avalanchego/utils" - "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/set" ) type metrics struct { + trackedSubnets set.Set[ids.ID] + numTracked prometheus.Gauge numPeers prometheus.Gauge numSubnetPeers *prometheus.GaugeVec @@ -41,8 +42,9 @@ type metrics struct { peerConnectedStartTimesSum float64 } -func newMetrics(namespace string, registerer prometheus.Registerer, initialSubnetIDs set.Set[ids.ID]) (*metrics, error) { +func newMetrics(namespace string, registerer prometheus.Registerer, trackedSubnets set.Set[ids.ID]) (*metrics, error) { m := &metrics{ + trackedSubnets: trackedSubnets, numPeers: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: namespace, Name: "peers", @@ -169,11 +171,7 @@ func newMetrics(namespace string, registerer prometheus.Registerer, initialSubne ) // init subnet tracker metrics with tracked subnets - for subnetID := range initialSubnetIDs { - // no need to track primary network ID - if subnetID == constants.PrimaryNetworkID { - continue - } + for subnetID := range trackedSubnets { // initialize to 0 subnetIDStr := subnetID.String() m.numSubnetPeers.WithLabelValues(subnetIDStr).Set(0) @@ -189,8 +187,10 @@ func (m *metrics) markConnected(peer peer.Peer) { m.connected.Inc() trackedSubnets := peer.TrackedSubnets() - for subnetID := range trackedSubnets { - m.numSubnetPeers.WithLabelValues(subnetID.String()).Inc() + for subnetID := range m.trackedSubnets { + if trackedSubnets.Contains(subnetID) { + m.numSubnetPeers.WithLabelValues(subnetID.String()).Inc() + } } m.lock.Lock() @@ -206,8 +206,10 @@ func (m *metrics) markDisconnected(peer peer.Peer) { m.disconnected.Inc() trackedSubnets := peer.TrackedSubnets() - for subnetID := range trackedSubnets { - m.numSubnetPeers.WithLabelValues(subnetID.String()).Dec() + for subnetID := range m.trackedSubnets { + if trackedSubnets.Contains(subnetID) { + m.numSubnetPeers.WithLabelValues(subnetID.String()).Dec() + } } m.lock.Lock() diff --git a/network/network.go b/network/network.go index 506e5190858..1f0692b29e8 100644 --- a/network/network.go +++ b/network/network.go @@ -460,8 +460,12 @@ func (n *network) Connected(nodeID ids.NodeID) { peerVersion := peer.Version() n.router.Connected(nodeID, peerVersion, constants.PrimaryNetworkID) - for subnetID := range peer.TrackedSubnets() { - n.router.Connected(nodeID, peerVersion, subnetID) + + trackedSubnets := peer.TrackedSubnets() + for subnetID := range n.peerConfig.MySubnets { + if trackedSubnets.Contains(subnetID) { + n.router.Connected(nodeID, peerVersion, subnetID) + } } } diff --git a/network/peer/peer.go b/network/peer/peer.go index 1774d3f24f9..818a5d389a3 100644 --- a/network/peer/peer.go +++ b/network/peer/peer.go @@ -31,9 +31,14 @@ import ( "github.com/ava-labs/avalanchego/version" ) -// maxBloomSaltLen restricts the allowed size of the bloom salt to prevent -// excessively expensive bloom filter contains checks. -const maxBloomSaltLen = 32 +const ( + // maxBloomSaltLen restricts the allowed size of the bloom salt to prevent + // excessively expensive bloom filter contains checks. + maxBloomSaltLen = 32 + // maxNumTrackedSubnets limits how many subnets a peer can track to prevent + // excessive memory usage. + maxNumTrackedSubnets = 16 +) var ( errClosed = errors.New("closed") @@ -131,8 +136,8 @@ type peer struct { // version is the claimed version the peer is running that we received in // the Handshake message. version *version.Application - // trackedSubnets is the subset of subnetIDs the peer sent us in the Handshake - // message that we are also tracking. + // trackedSubnets are the subnetIDs the peer sent us in the Handshake + // message. trackedSubnets set.Set[ids.ID] // options of ACPs provided in the Handshake message. supportedACPs set.Set[uint32] @@ -263,9 +268,8 @@ func (p *peer) Info() Info { publicIPStr = p.ip.IPPort.String() } - uptimes := make(map[ids.ID]json.Uint32, p.trackedSubnets.Len()) - - for subnetID := range p.trackedSubnets { + uptimes := make(map[ids.ID]json.Uint32, p.MySubnets.Len()) + for subnetID := range p.MySubnets { uptime, exist := p.ObservedUptime(subnetID) if !exist { continue @@ -802,8 +806,12 @@ func (p *peer) getUptimes() (uint32, []*p2p.SubnetUptime) { primaryUptime = 0 } - subnetUptimes := make([]*p2p.SubnetUptime, 0, p.trackedSubnets.Len()) - for subnetID := range p.trackedSubnets { + subnetUptimes := make([]*p2p.SubnetUptime, 0, p.MySubnets.Len()) + for subnetID := range p.MySubnets { + if !p.trackedSubnets.Contains(subnetID) { + continue + } + subnetUptime, err := p.UptimeCalculator.CalculateUptimePercent(p.id, subnetID) if err != nil { p.Log.Debug("failed to get peer uptime percentage", @@ -948,6 +956,17 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { } // handle subnet IDs + if numTrackedSubnets := len(msg.TrackedSubnets); numTrackedSubnets > maxNumTrackedSubnets { + p.Log.Debug("message with invalid field", + zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.HandshakeOp), + zap.String("field", "TrackedSubnets"), + zap.Int("numTrackedSubnets", numTrackedSubnets), + ) + p.StartClose() + return + } + for _, subnetIDBytes := range msg.TrackedSubnets { subnetID, err := ids.ToID(subnetIDBytes) if err != nil { @@ -958,8 +977,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { p.StartClose() return } - // add only if we also track this subnet - if p.MySubnets.Contains(subnetID) { + if subnetID != constants.PrimaryNetworkID { p.trackedSubnets.Add(subnetID) } } From 7c3b2284b7dba6d63b7444fe041e92f8efae1318 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 29 Apr 2024 16:49:45 -0400 Subject: [PATCH 02/13] refactor test peer creation --- network/peer/peer_test.go | 253 +++++++++++++------------------------- 1 file changed, 86 insertions(+), 167 deletions(-) diff --git a/network/peer/peer_test.go b/network/peer/peer_test.go index 96d6ddb7b98..17b13c32cfc 100644 --- a/network/peer/peer_test.go +++ b/network/peer/peer_test.go @@ -39,7 +39,6 @@ type testPeer struct { type rawTestPeer struct { config *Config - conn net.Conn cert *staking.Certificate nodeID ids.NodeID inboundMsgChan <-chan message.InboundMessage @@ -60,27 +59,10 @@ func newMessageCreator(t *testing.T) message.Creator { return mc } -func makeRawTestPeers(t *testing.T, trackedSubnets set.Set[ids.ID]) (*rawTestPeer, *rawTestPeer) { +func newConfig(t *testing.T) Config { t.Helper() require := require.New(t) - conn0, conn1 := net.Pipe() - - tlsCert0, err := staking.NewTLSCert() - require.NoError(err) - cert0, err := staking.ParseCertificate(tlsCert0.Leaf.Raw) - require.NoError(err) - - tlsCert1, err := staking.NewTLSCert() - require.NoError(err) - cert1, err := staking.ParseCertificate(tlsCert1.Leaf.Raw) - require.NoError(err) - - nodeID0 := ids.NodeIDFromCert(cert0) - nodeID1 := ids.NodeIDFromCert(cert1) - - mc := newMessageCreator(t) - metrics, err := NewMetrics( logging.NoLog{}, "", @@ -96,14 +78,17 @@ func makeRawTestPeers(t *testing.T, trackedSubnets set.Set[ids.ID]) (*rawTestPee ) require.NoError(err) - sharedConfig := Config{ + return Config{ + ReadBufferSize: constants.DefaultNetworkPeerReadBufferSize, + WriteBufferSize: constants.DefaultNetworkPeerWriteBufferSize, Metrics: metrics, - MessageCreator: mc, + MessageCreator: newMessageCreator(t), Log: logging.NoLog{}, InboundMsgThrottler: throttling.NewNoInboundThrottler(), + Network: TestNetwork, + Router: nil, VersionCompatibility: version.GetCompatibility(constants.LocalID), - MySubnets: trackedSubnets, - UptimeCalculator: uptime.NoOpCalculator, + MySubnets: nil, Beacons: validators.NewManager(), Validators: validators.NewManager(), NetworkID: constants.LocalID, @@ -111,141 +96,91 @@ func makeRawTestPeers(t *testing.T, trackedSubnets set.Set[ids.ID]) (*rawTestPee PongTimeout: constants.DefaultPingPongTimeout, MaxClockDifference: time.Minute, ResourceTracker: resourceTracker, + UptimeCalculator: uptime.NoOpCalculator, + IPSigner: nil, } - peerConfig0 := sharedConfig - peerConfig1 := sharedConfig - - ip0 := ips.NewDynamicIPPort(net.IPv6loopback, 1) - tls0 := tlsCert0.PrivateKey.(crypto.Signer) - bls0, err := bls.NewSecretKey() - require.NoError(err) +} - peerConfig0.IPSigner = NewIPSigner(ip0, tls0, bls0) +func newRawTestPeer(t *testing.T, config Config) *rawTestPeer { + t.Helper() + require := require.New(t) - peerConfig0.Network = TestNetwork - inboundMsgChan0 := make(chan message.InboundMessage) - peerConfig0.Router = router.InboundHandlerFunc(func(_ context.Context, msg message.InboundMessage) { - inboundMsgChan0 <- msg - }) + tlsCert, err := staking.NewTLSCert() + require.NoError(err) + cert, err := staking.ParseCertificate(tlsCert.Leaf.Raw) + require.NoError(err) + nodeID := ids.NodeIDFromCert(cert) - ip1 := ips.NewDynamicIPPort(net.IPv6loopback, 2) - tls1 := tlsCert1.PrivateKey.(crypto.Signer) - bls1, err := bls.NewSecretKey() + ip := ips.NewDynamicIPPort(net.IPv6loopback, 1) + tls := tlsCert.PrivateKey.(crypto.Signer) + bls, err := bls.NewSecretKey() require.NoError(err) - peerConfig1.IPSigner = NewIPSigner(ip1, tls1, bls1) + config.IPSigner = NewIPSigner(ip, tls, bls) - peerConfig1.Network = TestNetwork - inboundMsgChan1 := make(chan message.InboundMessage) - peerConfig1.Router = router.InboundHandlerFunc(func(_ context.Context, msg message.InboundMessage) { - inboundMsgChan1 <- msg + inboundMsgChan := make(chan message.InboundMessage) + config.Router = router.InboundHandlerFunc(func(_ context.Context, msg message.InboundMessage) { + inboundMsgChan <- msg }) - peer0 := &rawTestPeer{ - config: &peerConfig0, - conn: conn0, - cert: cert0, - nodeID: nodeID0, - inboundMsgChan: inboundMsgChan0, + return &rawTestPeer{ + config: &config, + cert: cert, + nodeID: nodeID, + inboundMsgChan: inboundMsgChan, } - peer1 := &rawTestPeer{ - config: &peerConfig1, - conn: conn1, - cert: cert1, - nodeID: nodeID1, - inboundMsgChan: inboundMsgChan1, - } - return peer0, peer1 } -func makeTestPeers(t *testing.T, trackedSubnets set.Set[ids.ID]) (*testPeer, *testPeer) { - rawPeer0, rawPeer1 := makeRawTestPeers(t, trackedSubnets) - - peer0 := &testPeer{ - Peer: Start( - rawPeer0.config, - rawPeer0.conn, - rawPeer1.cert, - rawPeer1.nodeID, - NewThrottledMessageQueue( - rawPeer0.config.Metrics, - rawPeer1.nodeID, - logging.NoLog{}, - throttling.NewNoOutboundThrottler(), - ), - ), - inboundMsgChan: rawPeer0.inboundMsgChan, - } - peer1 := &testPeer{ +func startTestPeer(self *rawTestPeer, peer *rawTestPeer, conn net.Conn) *testPeer { + return &testPeer{ Peer: Start( - rawPeer1.config, - rawPeer1.conn, - rawPeer0.cert, - rawPeer0.nodeID, + self.config, + conn, + peer.cert, + peer.nodeID, NewThrottledMessageQueue( - rawPeer1.config.Metrics, - rawPeer0.nodeID, + self.config.Metrics, + peer.nodeID, logging.NoLog{}, throttling.NewNoOutboundThrottler(), ), ), - inboundMsgChan: rawPeer1.inboundMsgChan, + inboundMsgChan: self.inboundMsgChan, } +} + +func startTestPeers(rawPeer0 *rawTestPeer, rawPeer1 *rawTestPeer) (*testPeer, *testPeer) { + conn0, conn1 := net.Pipe() + peer0 := startTestPeer(rawPeer0, rawPeer1, conn0) + peer1 := startTestPeer(rawPeer1, rawPeer0, conn1) return peer0, peer1 } -func makeReadyTestPeers(t *testing.T, trackedSubnets set.Set[ids.ID]) (*testPeer, *testPeer) { +func awaitReady(t *testing.T, peers ...Peer) { t.Helper() require := require.New(t) - peer0, peer1 := makeTestPeers(t, trackedSubnets) - - require.NoError(peer0.AwaitReady(context.Background())) - require.True(peer0.Ready()) - - require.NoError(peer1.AwaitReady(context.Background())) - require.True(peer1.Ready()) - - return peer0, peer1 + for _, peer := range peers { + require.NoError(peer.AwaitReady(context.Background())) + require.True(peer.Ready()) + } } func TestReady(t *testing.T) { require := require.New(t) - rawPeer0, rawPeer1 := makeRawTestPeers(t, set.Set[ids.ID]{}) - peer0 := Start( - rawPeer0.config, - rawPeer0.conn, - rawPeer1.cert, - rawPeer1.nodeID, - NewThrottledMessageQueue( - rawPeer0.config.Metrics, - rawPeer1.nodeID, - logging.NoLog{}, - throttling.NewNoOutboundThrottler(), - ), - ) + config := newConfig(t) - require.False(peer0.Ready()) + rawPeer0 := newRawTestPeer(t, config) + rawPeer1 := newRawTestPeer(t, config) - peer1 := Start( - rawPeer1.config, - rawPeer1.conn, - rawPeer0.cert, - rawPeer0.nodeID, - NewThrottledMessageQueue( - rawPeer1.config.Metrics, - rawPeer0.nodeID, - logging.NoLog{}, - throttling.NewNoOutboundThrottler(), - ), - ) + conn0, conn1 := net.Pipe() - require.NoError(peer0.AwaitReady(context.Background())) - require.True(peer0.Ready()) + peer0 := startTestPeer(rawPeer0, rawPeer1, conn0) + require.False(peer0.Ready()) - require.NoError(peer1.AwaitReady(context.Background())) - require.True(peer1.Ready()) + peer1 := startTestPeer(rawPeer1, rawPeer0, conn1) + awaitReady(t, peer0, peer1) peer0.StartClose() require.NoError(peer0.AwaitClosed(context.Background())) @@ -255,10 +190,15 @@ func TestReady(t *testing.T) { func TestSend(t *testing.T) { require := require.New(t) - peer0, peer1 := makeReadyTestPeers(t, set.Set[ids.ID]{}) - mc := newMessageCreator(t) + sharedConfig := newConfig(t) - outboundGetMsg, err := mc.Get(ids.Empty, 1, time.Second, ids.Empty) + rawPeer0 := newRawTestPeer(t, sharedConfig) + rawPeer1 := newRawTestPeer(t, sharedConfig) + + peer0, peer1 := startTestPeers(rawPeer0, rawPeer1) + awaitReady(t, peer0, peer1) + + outboundGetMsg, err := sharedConfig.MessageCreator.Get(ids.Empty, 1, time.Second, ids.Empty) require.NoError(err) require.True(peer0.Send(context.Background(), outboundGetMsg)) @@ -275,9 +215,8 @@ func TestPingUptimes(t *testing.T) { trackedSubnetID := ids.GenerateTestID() untrackedSubnetID := ids.GenerateTestID() - trackedSubnets := set.Of(trackedSubnetID) - - mc := newMessageCreator(t) + sharedConfig := newConfig(t) + sharedConfig.MySubnets = set.Of(trackedSubnetID) testCases := []struct { name string @@ -288,7 +227,7 @@ func TestPingUptimes(t *testing.T) { { name: "primary network only", msg: func() message.OutboundMessage { - pingMsg, err := mc.Ping(1, nil) + pingMsg, err := sharedConfig.MessageCreator.Ping(1, nil) require.NoError(t, err) return pingMsg }(), @@ -305,7 +244,7 @@ func TestPingUptimes(t *testing.T) { { name: "primary network and subnet", msg: func() message.OutboundMessage { - pingMsg, err := mc.Ping( + pingMsg, err := sharedConfig.MessageCreator.Ping( 1, []*p2p.SubnetUptime{ { @@ -330,7 +269,7 @@ func TestPingUptimes(t *testing.T) { { name: "primary network and non tracked subnet", msg: func() message.OutboundMessage { - pingMsg, err := mc.Ping( + pingMsg, err := sharedConfig.MessageCreator.Ping( 1, []*p2p.SubnetUptime{ { @@ -352,9 +291,13 @@ func TestPingUptimes(t *testing.T) { }, } - // Note: we reuse peers across tests because makeReadyTestPeers takes awhile - // to run. - peer0, peer1 := makeReadyTestPeers(t, trackedSubnets) + // Note: we reuse peers across tests because newRawTestPeer takes awhile to + // run. + rawPeer0 := newRawTestPeer(t, sharedConfig) + rawPeer1 := newRawTestPeer(t, sharedConfig) + + peer0, peer1 := startTestPeers(rawPeer0, rawPeer1) + awaitReady(t, peer0, peer1) defer func() { peer1.StartClose() peer0.StartClose() @@ -390,7 +333,11 @@ func TestPingUptimes(t *testing.T) { func TestInvalidBLSKeyDisconnects(t *testing.T) { require := require.New(t) - rawPeer0, rawPeer1 := makeRawTestPeers(t, nil) + sharedConfig := newConfig(t) + + rawPeer0 := newRawTestPeer(t, sharedConfig) + rawPeer1 := newRawTestPeer(t, sharedConfig) + require.NoError(rawPeer0.config.Validators.AddStaker( constants.PrimaryNetworkID, rawPeer1.nodeID, @@ -408,36 +355,8 @@ func TestInvalidBLSKeyDisconnects(t *testing.T) { ids.GenerateTestID(), 1, )) - peer0 := &testPeer{ - Peer: Start( - rawPeer0.config, - rawPeer0.conn, - rawPeer1.cert, - rawPeer1.nodeID, - NewThrottledMessageQueue( - rawPeer0.config.Metrics, - rawPeer1.nodeID, - logging.NoLog{}, - throttling.NewNoOutboundThrottler(), - ), - ), - inboundMsgChan: rawPeer0.inboundMsgChan, - } - peer1 := &testPeer{ - Peer: Start( - rawPeer1.config, - rawPeer1.conn, - rawPeer0.cert, - rawPeer0.nodeID, - NewThrottledMessageQueue( - rawPeer1.config.Metrics, - rawPeer0.nodeID, - logging.NoLog{}, - throttling.NewNoOutboundThrottler(), - ), - ), - inboundMsgChan: rawPeer1.inboundMsgChan, - } + + peer0, peer1 := startTestPeers(rawPeer0, rawPeer1) // Because peer1 thinks that peer0 is using the wrong BLS key, they should // disconnect from each other. From 4a5cfaaa9671f42923e6f1f038e096bda735b9d2 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 29 Apr 2024 16:59:45 -0400 Subject: [PATCH 03/13] Add tracked subnets test --- network/peer/peer_test.go | 70 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/network/peer/peer_test.go b/network/peer/peer_test.go index 17b13c32cfc..cb09e1409f1 100644 --- a/network/peer/peer_test.go +++ b/network/peer/peer_test.go @@ -329,6 +329,76 @@ func TestPingUptimes(t *testing.T) { } } +func TestTrackedSubnets(t *testing.T) { + sharedConfig := newConfig(t) + rawPeer0 := newRawTestPeer(t, sharedConfig) + rawPeer1 := newRawTestPeer(t, sharedConfig) + + tests := []struct { + name string + trackedSubnets set.Set[ids.ID] + shouldDisconnect bool + }{ + { + name: "primary network only", + trackedSubnets: nil, + shouldDisconnect: false, + }, + { + name: "single subnet", + trackedSubnets: set.Of(ids.GenerateTestID()), + shouldDisconnect: false, + }, + { + name: "max subnets", + trackedSubnets: func() set.Set[ids.ID] { + trackedSubnets := set.NewSet[ids.ID](maxNumTrackedSubnets) + for i := 0; i < maxNumTrackedSubnets; i++ { + trackedSubnets.Add(ids.GenerateTestID()) + } + return trackedSubnets + }(), + shouldDisconnect: false, + }, + { + name: "too many subnets", + trackedSubnets: func() set.Set[ids.ID] { + trackedSubnets := set.NewSet[ids.ID](maxNumTrackedSubnets + 1) + for i := 0; i < maxNumTrackedSubnets+1; i++ { + trackedSubnets.Add(ids.GenerateTestID()) + } + return trackedSubnets + }(), + shouldDisconnect: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + require := require.New(t) + + rawPeer0.config.MySubnets = test.trackedSubnets + peer0, peer1 := startTestPeers(rawPeer0, rawPeer1) + if test.shouldDisconnect { + require.NoError(peer0.AwaitClosed(context.Background())) + require.NoError(peer1.AwaitClosed(context.Background())) + return + } + + defer func() { + peer1.StartClose() + peer0.StartClose() + require.NoError(peer0.AwaitClosed(context.Background())) + require.NoError(peer1.AwaitClosed(context.Background())) + }() + + awaitReady(t, peer0, peer1) + require.Empty(peer0.TrackedSubnets()) + require.Equal(test.trackedSubnets, peer1.TrackedSubnets()) + }) + } +} + // Test that a peer using the wrong BLS key is disconnected from. func TestInvalidBLSKeyDisconnects(t *testing.T) { require := require.New(t) From 8a15984dd2a3f0fb9a679e531cf3bc5dee2ed5c0 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 13 May 2024 13:25:39 -0400 Subject: [PATCH 04/13] nit --- network/peer/peer_test.go | 27 ++++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/network/peer/peer_test.go b/network/peer/peer_test.go index cb09e1409f1..83eb640b28d 100644 --- a/network/peer/peer_test.go +++ b/network/peer/peer_test.go @@ -231,6 +231,7 @@ func TestPingUptimes(t *testing.T) { require.NoError(t, err) return pingMsg }(), + shouldClose: false, assertFn: func(require *require.Assertions, peer *testPeer) { uptime, ok := peer.ObservedUptime(constants.PrimaryNetworkID) require.True(ok) @@ -256,6 +257,7 @@ func TestPingUptimes(t *testing.T) { require.NoError(t, err) return pingMsg }(), + shouldClose: false, assertFn: func(require *require.Assertions, peer *testPeer) { uptime, ok := peer.ObservedUptime(constants.PrimaryNetworkID) require.True(ok) @@ -288,31 +290,30 @@ func TestPingUptimes(t *testing.T) { return pingMsg }(), shouldClose: true, + assertFn: nil, }, } - // Note: we reuse peers across tests because newRawTestPeer takes awhile to - // run. + // The raw peers are generated outside of the test cases to avoid generating + // many TLS keys. rawPeer0 := newRawTestPeer(t, sharedConfig) rawPeer1 := newRawTestPeer(t, sharedConfig) - peer0, peer1 := startTestPeers(rawPeer0, rawPeer1) - awaitReady(t, peer0, peer1) - defer func() { - peer1.StartClose() - peer0.StartClose() - require.NoError(t, peer0.AwaitClosed(context.Background())) - require.NoError(t, peer1.AwaitClosed(context.Background())) - }() - for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { require := require.New(t) + peer0, peer1 := startTestPeers(rawPeer0, rawPeer1) + awaitReady(t, peer0, peer1) + defer func() { + peer1.StartClose() + peer0.StartClose() + require.NoError(peer0.AwaitClosed(context.Background())) + require.NoError(peer1.AwaitClosed(context.Background())) + }() + require.True(peer0.Send(context.Background(), tc.msg)) - // Note: shouldClose can only be `true` for the last test because - // we reuse peers across tests. if tc.shouldClose { require.NoError(peer1.AwaitClosed(context.Background())) return From 4c9fc816f0addd8c645bc10c3fc2577f32cd2571 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 13 May 2024 17:08:12 -0400 Subject: [PATCH 05/13] Standardize peer logging --- network/peer/msg_length.go | 2 - network/peer/peer.go | 316 ++++++++++++++++++++----------------- 2 files changed, 172 insertions(+), 146 deletions(-) diff --git a/network/peer/msg_length.go b/network/peer/msg_length.go index 0072734c587..00f3396672b 100644 --- a/network/peer/msg_length.go +++ b/network/peer/msg_length.go @@ -16,7 +16,6 @@ var ( errMaxMessageLengthExceeded = errors.New("maximum message length exceeded") ) -// Assumes the specified [msgLen] will never >= 1<<31. func writeMsgLen(msgLen uint32, maxMsgLen uint32) ([wrappers.IntLen]byte, error) { if msgLen > maxMsgLen { return [wrappers.IntLen]byte{}, fmt.Errorf( @@ -33,7 +32,6 @@ func writeMsgLen(msgLen uint32, maxMsgLen uint32) ([wrappers.IntLen]byte, error) return b, nil } -// Assumes the read [msgLen] will never >= 1<<31. func readMsgLen(b []byte, maxMsgLen uint32) (uint32, error) { if len(b) != wrappers.IntLen { return 0, fmt.Errorf( diff --git a/network/peer/peer.go b/network/peer/peer.go index 08b635600c7..4feda2ab55b 100644 --- a/network/peer/peer.go +++ b/network/peer/peer.go @@ -31,9 +31,18 @@ import ( "github.com/ava-labs/avalanchego/version" ) -// maxBloomSaltLen restricts the allowed size of the bloom salt to prevent -// excessively expensive bloom filter contains checks. -const maxBloomSaltLen = 32 +const ( + // maxBloomSaltLen restricts the allowed size of the bloom salt to prevent + // excessively expensive bloom filter contains checks. + maxBloomSaltLen = 32 + + disconnectingLogMessage = "disconnecting from peer" + failedToCreateMessageLogMessage = "failed to create message" + failedToSendMessageLogMessage = "failed to send message" + failedToSetDeadlineLogMessage = "failed to set connection deadline" + failedToGetUptimeLogMessage = "failed to get peer uptime percentage" + malformedMessageLogMessage = "malformed message" +) var ( errClosed = errors.New("closed") @@ -384,8 +393,9 @@ func (p *peer) readMessages() { for { // Time out and close connection if we can't read the message length if err := p.conn.SetReadDeadline(p.nextTimeout()); err != nil { - p.Log.Verbo("error setting the connection read timeout", + p.Log.Verbo(failedToSetDeadlineLogMessage, zap.Stringer("nodeID", p.id), + zap.String("direction", "read"), zap.Error(err), ) return @@ -393,7 +403,7 @@ func (p *peer) readMessages() { // Read the message length if _, err := io.ReadFull(reader, msgLenBytes); err != nil { - p.Log.Verbo("error reading message", + p.Log.Verbo("error reading message length", zap.Stringer("nodeID", p.id), zap.Error(err), ) @@ -403,7 +413,7 @@ func (p *peer) readMessages() { // Parse the message length msgLen, err := readMsgLen(msgLenBytes, constants.DefaultMaxMessageSize) if err != nil { - p.Log.Verbo("error reading message length", + p.Log.Verbo("error parsing message length", zap.Stringer("nodeID", p.id), zap.Error(err), ) @@ -437,8 +447,9 @@ func (p *peer) readMessages() { // Time out and close connection if we can't read message if err := p.conn.SetReadDeadline(p.nextTimeout()); err != nil { - p.Log.Verbo("error setting the connection read timeout", + p.Log.Verbo(failedToSetDeadlineLogMessage, zap.Stringer("nodeID", p.id), + zap.String("direction", "read"), zap.Error(err), ) onFinishedHandling() @@ -543,9 +554,9 @@ func (p *peer) writeMessages() { knownPeersSalt, ) if err != nil { - p.Log.Error("failed to create message", - zap.Stringer("messageOp", message.HandshakeOp), + p.Log.Error(failedToCreateMessageLogMessage, zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.HandshakeOp), zap.Error(err), ) return @@ -588,8 +599,9 @@ func (p *peer) writeMessage(writer io.Writer, msg message.OutboundMessage) { ) if err := p.conn.SetWriteDeadline(p.nextTimeout()); err != nil { - p.Log.Verbo("error setting write deadline", + p.Log.Verbo(failedToSetDeadlineLogMessage, zap.Stringer("nodeID", p.id), + zap.String("direction", "write"), zap.Error(err), ) return @@ -635,21 +647,23 @@ func (p *peer) sendNetworkMessages() { knownPeersFilter, knownPeersSalt := p.Config.Network.KnownPeers() msg, err := p.Config.MessageCreator.GetPeerList(knownPeersFilter, knownPeersSalt) if err != nil { - p.Log.Error("failed to create get peer list message", + p.Log.Error(failedToCreateMessageLogMessage, zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.GetPeerListOp), zap.Error(err), ) continue } if !p.Send(p.onClosingCtx, msg) { - p.Log.Debug("failed to send get peer list", + p.Log.Debug(failedToSendMessageLogMessage, zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.GetPeerListOp), ) } case <-sendPingsTicker.C: if !p.Network.AllowConnection(p.id) { - p.Log.Debug("disconnecting from peer", + p.Log.Debug(disconnectingLogMessage, zap.String("reason", "connection is no longer desired"), zap.Stringer("nodeID", p.id), ) @@ -666,15 +680,20 @@ func (p *peer) sendNetworkMessages() { primaryUptime, subnetUptimes := p.getUptimes() pingMessage, err := p.MessageCreator.Ping(primaryUptime, subnetUptimes) if err != nil { - p.Log.Error("failed to create message", - zap.Stringer("messageOp", message.PingOp), + p.Log.Error(failedToCreateMessageLogMessage, zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.PingOp), zap.Error(err), ) return } - p.Send(p.onClosingCtx, pingMessage) + if !p.Send(p.onClosingCtx, pingMessage) { + p.Log.Debug(failedToSendMessageLogMessage, + zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.PingOp), + ) + } case <-p.onClosingCtx.Done(): return } @@ -692,7 +711,7 @@ func (p *peer) sendNetworkMessages() { // callback to avoid signature verification on the P-chain accept path. func (p *peer) shouldDisconnect() bool { if err := p.VersionCompatibility.Compatible(p.version); err != nil { - p.Log.Debug("disconnecting from peer", + p.Log.Debug(disconnectingLogMessage, zap.String("reason", "version not compatible"), zap.Stringer("nodeID", p.id), zap.Stringer("peerVersion", p.version), @@ -709,7 +728,7 @@ func (p *peer) shouldDisconnect() bool { } if p.ip.BLSSignature == nil { - p.Log.Debug("disconnecting from peer", + p.Log.Debug(disconnectingLogMessage, zap.String("reason", "missing BLS signature"), zap.Stringer("nodeID", p.id), ) @@ -722,7 +741,7 @@ func (p *peer) shouldDisconnect() bool { p.ip.UnsignedIP.bytes(), ) if !validSignature { - p.Log.Debug("disconnecting from peer", + p.Log.Debug(disconnectingLogMessage, zap.String("reason", "invalid BLS signature"), zap.Stringer("nodeID", p.id), ) @@ -759,8 +778,7 @@ func (p *peer) handle(msg message.InboundMessage) { return } if !p.finishedHandshake.Get() { - p.Log.Debug( - "dropping message", + p.Log.Debug("dropping message", zap.String("reason", "handshake isn't finished"), zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", msg.Op()), @@ -774,18 +792,72 @@ func (p *peer) handle(msg message.InboundMessage) { } func (p *peer) handlePing(msg *p2p.Ping) { - p.observeUptimes(msg.Uptime, msg.SubnetUptimes) + if msg.Uptime > 100 { + p.Log.Debug(malformedMessageLogMessage, + zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.PingOp), + zap.Stringer("subnetID", constants.PrimaryNetworkID), + zap.Uint32("uptime", msg.Uptime), + ) + p.StartClose() + return + } + p.observeUptime(constants.PrimaryNetworkID, msg.Uptime) + + for _, subnetUptime := range msg.SubnetUptimes { + subnetID, err := ids.ToID(subnetUptime.SubnetId) + if err != nil { + p.Log.Debug(malformedMessageLogMessage, + zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.PingOp), + zap.String("field", "subnetID"), + zap.Error(err), + ) + p.StartClose() + return + } + + if !p.MySubnets.Contains(subnetID) { + p.Log.Debug(malformedMessageLogMessage, + zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.PingOp), + zap.Stringer("subnetID", subnetID), + zap.String("reason", "not tracking subnet"), + ) + p.StartClose() + return + } + + uptime := subnetUptime.Uptime + if uptime > 100 { + p.Log.Debug(malformedMessageLogMessage, + zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.PingOp), + zap.Stringer("subnetID", subnetID), + zap.Uint32("uptime", uptime), + ) + p.StartClose() + return + } + p.observeUptime(subnetID, uptime) + } pongMessage, err := p.MessageCreator.Pong() if err != nil { - p.Log.Error("failed to create message", - zap.Stringer("messageOp", message.PongOp), + p.Log.Error(failedToCreateMessageLogMessage, zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.PongOp), zap.Error(err), ) return } - p.Send(p.onClosingCtx, pongMessage) + + if !p.Send(p.onClosingCtx, pongMessage) { + p.Log.Debug(failedToSendMessageLogMessage, + zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.PongOp), + ) + } } func (p *peer) getUptimes() (uint32, []*p2p.SubnetUptime) { @@ -794,7 +866,7 @@ func (p *peer) getUptimes() (uint32, []*p2p.SubnetUptime) { constants.PrimaryNetworkID, ) if err != nil { - p.Log.Debug("failed to get peer primary uptime percentage", + p.Log.Debug(failedToGetUptimeLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("subnetID", constants.PrimaryNetworkID), zap.Error(err), @@ -806,7 +878,7 @@ func (p *peer) getUptimes() (uint32, []*p2p.SubnetUptime) { for subnetID := range p.trackedSubnets { subnetUptime, err := p.UptimeCalculator.CalculateUptimePercent(p.id, subnetID) if err != nil { - p.Log.Debug("failed to get peer uptime percentage", + p.Log.Debug(failedToGetUptimeLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("subnetID", subnetID), zap.Error(err), @@ -827,52 +899,6 @@ func (p *peer) getUptimes() (uint32, []*p2p.SubnetUptime) { func (*peer) handlePong(*p2p.Pong) {} -func (p *peer) observeUptimes(primaryUptime uint32, subnetUptimes []*p2p.SubnetUptime) { - if primaryUptime > 100 { - p.Log.Debug("dropping message with invalid uptime", - zap.Stringer("nodeID", p.id), - zap.Stringer("subnetID", constants.PrimaryNetworkID), - zap.Uint32("uptime", primaryUptime), - ) - p.StartClose() - return - } - p.observeUptime(constants.PrimaryNetworkID, primaryUptime) - - for _, subnetUptime := range subnetUptimes { - subnetID, err := ids.ToID(subnetUptime.SubnetId) - if err != nil { - p.Log.Debug("dropping message with invalid subnetID", - zap.Stringer("nodeID", p.id), - zap.Error(err), - ) - p.StartClose() - return - } - - if !p.MySubnets.Contains(subnetID) { - p.Log.Debug("dropping message with unexpected subnetID", - zap.Stringer("nodeID", p.id), - zap.Stringer("subnetID", subnetID), - ) - p.StartClose() - return - } - - uptime := subnetUptime.Uptime - if uptime > 100 { - p.Log.Debug("dropping message with invalid uptime", - zap.Stringer("nodeID", p.id), - zap.Stringer("subnetID", subnetID), - zap.Uint32("uptime", uptime), - ) - p.StartClose() - return - } - p.observeUptime(subnetID, uptime) - } -} - // Record that the given peer perceives our uptime for the given [subnetID] // to be [uptime]. // Assumes [uptime] is in the range [0, 100] and [subnetID] is a valid ID of a @@ -885,16 +911,20 @@ func (p *peer) observeUptime(subnetID ids.ID, uptime uint32) { func (p *peer) handleHandshake(msg *p2p.Handshake) { if p.gotHandshake.Get() { - // TODO: this should never happen, should we close the connection here? - p.Log.Verbo("dropping duplicated handshake message", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.HandshakeOp), + zap.String("reason", "already received handshake"), ) + p.StartClose() return } if msg.NetworkId != p.NetworkID { - p.Log.Debug("networkID mismatch", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.HandshakeOp), + zap.String("field", "networkID"), zap.Uint32("peerNetworkID", msg.NetworkId), zap.Uint32("ourNetworkID", p.NetworkID), ) @@ -902,27 +932,25 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { return } - myTime := p.Clock.Time() - myTimeUnix := uint64(myTime.Unix()) - clockDifference := math.Abs(float64(msg.MyTime) - float64(myTimeUnix)) + localTime := p.Clock.Time() + localUnixTime := uint64(localTime.Unix()) + clockDifference := math.Abs(float64(msg.MyTime) - float64(localUnixTime)) p.Metrics.ClockSkewCount.Inc() p.Metrics.ClockSkewSum.Add(clockDifference) if clockDifference > p.MaxClockDifference.Seconds() { + log := p.Log.Debug if _, ok := p.Beacons.GetValidator(constants.PrimaryNetworkID, p.id); ok { - p.Log.Warn("beacon reports out of sync time", - zap.Stringer("nodeID", p.id), - zap.Uint64("peerTime", msg.MyTime), - zap.Uint64("myTime", myTimeUnix), - ) - } else { - p.Log.Debug("peer reports out of sync time", - zap.Stringer("nodeID", p.id), - zap.Uint64("peerTime", msg.MyTime), - zap.Uint64("myTime", myTimeUnix), - ) + log = p.Log.Warn } + log(malformedMessageLogMessage, + zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.HandshakeOp), + zap.String("field", "myTime"), + zap.Uint64("peerTime", msg.MyTime), + zap.Uint64("localTime", localUnixTime), + ) p.StartClose() return } @@ -935,25 +963,24 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { } if p.VersionCompatibility.Version().Before(p.version) { + log := p.Log.Debug if _, ok := p.Beacons.GetValidator(constants.PrimaryNetworkID, p.id); ok { - p.Log.Info("beacon attempting to connect with newer version. You may want to update your client", - zap.Stringer("nodeID", p.id), - zap.Stringer("beaconVersion", p.version), - ) - } else { - p.Log.Debug("peer attempting to connect with newer version. You may want to update your client", - zap.Stringer("nodeID", p.id), - zap.Stringer("peerVersion", p.version), - ) + log = p.Log.Info } + log("peer attempting to connect with newer version. You may want to update your client", + zap.Stringer("nodeID", p.id), + zap.Stringer("peerVersion", p.version), + ) } // handle subnet IDs for _, subnetIDBytes := range msg.TrackedSubnets { subnetID, err := ids.ToID(subnetIDBytes) if err != nil { - p.Log.Debug("failed to parse peer's tracked subnets", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.HandshakeOp), + zap.String("field", "trackedSubnets"), zap.Error(err), ) p.StartClose() @@ -977,10 +1004,10 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { } if p.supportedACPs.Overlaps(p.objectedACPs) { - p.Log.Debug("message with invalid field", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), - zap.String("field", "ACPs"), + zap.String("field", "acps"), zap.Reflect("supportedACPs", p.supportedACPs), zap.Reflect("objectedACPs", p.objectedACPs), ) @@ -996,10 +1023,10 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { var err error knownPeers, err = bloom.Parse(msg.KnownPeers.Filter) if err != nil { - p.Log.Debug("message with invalid field", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), - zap.String("field", "KnownPeers.Filter"), + zap.String("field", "knownPeers.filter"), zap.Error(err), ) p.StartClose() @@ -1008,10 +1035,10 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { salt = msg.KnownPeers.Salt if saltLen := len(salt); saltLen > maxBloomSaltLen { - p.Log.Debug("message with invalid field", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), - zap.String("field", "KnownPeers.Salt"), + zap.String("field", "knownPeers.salt"), zap.Int("saltLen", saltLen), ) p.StartClose() @@ -1021,20 +1048,20 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { // "net.IP" type in Golang is 16-byte if ipLen := len(msg.IpAddr); ipLen != net.IPv6len { - p.Log.Debug("message with invalid field", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), - zap.String("field", "IP"), + zap.String("field", "ip"), zap.Int("ipLen", ipLen), ) p.StartClose() return } if msg.IpPort == 0 { - p.Log.Debug("message with invalid field", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), - zap.String("field", "Port"), + zap.String("field", "port"), zap.Uint32("port", msg.IpPort), ) p.StartClose() @@ -1051,25 +1078,20 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { }, TLSSignature: msg.IpNodeIdSig, } - maxTimestamp := myTime.Add(p.MaxClockDifference) + maxTimestamp := localTime.Add(p.MaxClockDifference) if err := p.ip.Verify(p.cert, maxTimestamp); err != nil { + log := p.Log.Debug if _, ok := p.Beacons.GetValidator(constants.PrimaryNetworkID, p.id); ok { - p.Log.Warn("beacon has invalid signature or is out of sync", - zap.Stringer("nodeID", p.id), - zap.String("signatureType", "tls"), - zap.Uint64("peerTime", msg.MyTime), - zap.Uint64("myTime", myTimeUnix), - zap.Error(err), - ) - } else { - p.Log.Debug("peer has invalid signature or is out of sync", - zap.Stringer("nodeID", p.id), - zap.String("signatureType", "tls"), - zap.Uint64("peerTime", msg.MyTime), - zap.Uint64("myTime", myTimeUnix), - zap.Error(err), - ) + log = p.Log.Warn } + log(malformedMessageLogMessage, + zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.HandshakeOp), + zap.String("field", "tlsSignature"), + zap.Uint64("peerTime", msg.MyTime), + zap.Uint64("localTime", localUnixTime), + zap.Error(err), + ) p.StartClose() return @@ -1079,9 +1101,10 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { if len(msg.IpBlsSig) > 0 { signature, err := bls.SignatureFromBytes(msg.IpBlsSig) if err != nil { - p.Log.Debug("peer has malformed signature", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), - zap.String("signatureType", "bls"), + zap.Stringer("messageOp", message.HandshakeOp), + zap.String("field", "blsSignature"), zap.Error(err), ) p.StartClose() @@ -1108,7 +1131,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { // acknowledged correctly. peerListMsg, err := p.Config.MessageCreator.PeerList(peerIPs, true /*=bypassThrottling*/) if err != nil { - p.Log.Error("failed to create peer list handshake message", + p.Log.Error(failedToCreateMessageLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), zap.Error(err), @@ -1119,8 +1142,9 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { if !p.Send(p.onClosingCtx, peerListMsg) { // Because throttling was marked to be bypassed with this message, // sending should only fail if the peer has started closing. - p.Log.Debug("failed to send peer list for handshake", + p.Log.Debug(failedToSendMessageLogMessage, zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.PeerListOp), zap.Error(p.onClosingCtx.Err()), ) } @@ -1128,8 +1152,10 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { func (p *peer) handleGetPeerList(msg *p2p.GetPeerList) { if !p.finishedHandshake.Get() { - p.Log.Verbo("dropping get peer list message", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.GetPeerListOp), + zap.String("reason", "not finished handshake"), ) return } @@ -1137,10 +1163,10 @@ func (p *peer) handleGetPeerList(msg *p2p.GetPeerList) { knownPeersMsg := msg.GetKnownPeers() filter, err := bloom.Parse(knownPeersMsg.GetFilter()) if err != nil { - p.Log.Debug("message with invalid field", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.GetPeerListOp), - zap.String("field", "KnownPeers.Filter"), + zap.String("field", "knownPeers.filter"), zap.Error(err), ) p.StartClose() @@ -1149,10 +1175,10 @@ func (p *peer) handleGetPeerList(msg *p2p.GetPeerList) { salt := knownPeersMsg.GetSalt() if saltLen := len(salt); saltLen > maxBloomSaltLen { - p.Log.Debug("message with invalid field", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.GetPeerListOp), - zap.String("field", "KnownPeers.Salt"), + zap.String("field", "knownPeers.salt"), zap.Int("saltLen", saltLen), ) p.StartClose() @@ -1171,16 +1197,18 @@ func (p *peer) handleGetPeerList(msg *p2p.GetPeerList) { // sending pattern. peerListMsg, err := p.Config.MessageCreator.PeerList(peerIPs, false /*=bypassThrottling*/) if err != nil { - p.Log.Error("failed to create peer list message", + p.Log.Error(failedToCreateMessageLogMessage, zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.PeerListOp), zap.Error(err), ) return } if !p.Send(p.onClosingCtx, peerListMsg) { - p.Log.Debug("failed to send peer list", + p.Log.Debug(failedToSendMessageLogMessage, zap.Stringer("nodeID", p.id), + zap.Stringer("messageOp", message.PeerListOp), ) } } @@ -1200,10 +1228,10 @@ func (p *peer) handlePeerList(msg *p2p.PeerList) { for i, claimedIPPort := range msg.ClaimedIpPorts { tlsCert, err := staking.ParseCertificate(claimedIPPort.X509Certificate) if err != nil { - p.Log.Debug("message with invalid field", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), - zap.String("field", "Cert"), + zap.String("field", "cert"), zap.Error(err), ) p.StartClose() @@ -1212,20 +1240,20 @@ func (p *peer) handlePeerList(msg *p2p.PeerList) { // "net.IP" type in Golang is 16-byte if ipLen := len(claimedIPPort.IpAddr); ipLen != net.IPv6len { - p.Log.Debug("message with invalid field", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), - zap.String("field", "IP"), + zap.String("field", "ip"), zap.Int("ipLen", ipLen), ) p.StartClose() return } if claimedIPPort.IpPort == 0 { - p.Log.Debug("message with invalid field", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), - zap.String("field", "Port"), + zap.String("field", "port"), zap.Uint32("port", claimedIPPort.IpPort), ) // TODO: After v1.11.x is activated, close the peer here. @@ -1244,7 +1272,7 @@ func (p *peer) handlePeerList(msg *p2p.PeerList) { } if err := p.Network.Track(discoveredIPs); err != nil { - p.Log.Debug("message with invalid field", + p.Log.Debug(malformedMessageLogMessage, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), zap.String("field", "claimedIP"), From 5a25bd177d7037027dd392eb7cfbc017865ecf19 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 13 May 2024 17:10:46 -0400 Subject: [PATCH 06/13] nit --- network/peer/peer.go | 96 ++++++++++++++++++++++---------------------- 1 file changed, 48 insertions(+), 48 deletions(-) diff --git a/network/peer/peer.go b/network/peer/peer.go index 4feda2ab55b..0a47978fb8e 100644 --- a/network/peer/peer.go +++ b/network/peer/peer.go @@ -36,12 +36,12 @@ const ( // excessively expensive bloom filter contains checks. maxBloomSaltLen = 32 - disconnectingLogMessage = "disconnecting from peer" - failedToCreateMessageLogMessage = "failed to create message" - failedToSendMessageLogMessage = "failed to send message" - failedToSetDeadlineLogMessage = "failed to set connection deadline" - failedToGetUptimeLogMessage = "failed to get peer uptime percentage" - malformedMessageLogMessage = "malformed message" + disconnectingLog = "disconnecting from peer" + failedToCreateMessageLog = "failed to create message" + failedToSendMessageLog = "failed to send message" + failedToSetDeadlineLog = "failed to set connection deadline" + failedToGetUptimeLog = "failed to get peer uptime percentage" + malformedMessageLog = "malformed message" ) var ( @@ -393,7 +393,7 @@ func (p *peer) readMessages() { for { // Time out and close connection if we can't read the message length if err := p.conn.SetReadDeadline(p.nextTimeout()); err != nil { - p.Log.Verbo(failedToSetDeadlineLogMessage, + p.Log.Verbo(failedToSetDeadlineLog, zap.Stringer("nodeID", p.id), zap.String("direction", "read"), zap.Error(err), @@ -447,7 +447,7 @@ func (p *peer) readMessages() { // Time out and close connection if we can't read message if err := p.conn.SetReadDeadline(p.nextTimeout()); err != nil { - p.Log.Verbo(failedToSetDeadlineLogMessage, + p.Log.Verbo(failedToSetDeadlineLog, zap.Stringer("nodeID", p.id), zap.String("direction", "read"), zap.Error(err), @@ -554,7 +554,7 @@ func (p *peer) writeMessages() { knownPeersSalt, ) if err != nil { - p.Log.Error(failedToCreateMessageLogMessage, + p.Log.Error(failedToCreateMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), zap.Error(err), @@ -599,7 +599,7 @@ func (p *peer) writeMessage(writer io.Writer, msg message.OutboundMessage) { ) if err := p.conn.SetWriteDeadline(p.nextTimeout()); err != nil { - p.Log.Verbo(failedToSetDeadlineLogMessage, + p.Log.Verbo(failedToSetDeadlineLog, zap.Stringer("nodeID", p.id), zap.String("direction", "write"), zap.Error(err), @@ -647,7 +647,7 @@ func (p *peer) sendNetworkMessages() { knownPeersFilter, knownPeersSalt := p.Config.Network.KnownPeers() msg, err := p.Config.MessageCreator.GetPeerList(knownPeersFilter, knownPeersSalt) if err != nil { - p.Log.Error(failedToCreateMessageLogMessage, + p.Log.Error(failedToCreateMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.GetPeerListOp), zap.Error(err), @@ -656,14 +656,14 @@ func (p *peer) sendNetworkMessages() { } if !p.Send(p.onClosingCtx, msg) { - p.Log.Debug(failedToSendMessageLogMessage, + p.Log.Debug(failedToSendMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.GetPeerListOp), ) } case <-sendPingsTicker.C: if !p.Network.AllowConnection(p.id) { - p.Log.Debug(disconnectingLogMessage, + p.Log.Debug(disconnectingLog, zap.String("reason", "connection is no longer desired"), zap.Stringer("nodeID", p.id), ) @@ -680,7 +680,7 @@ func (p *peer) sendNetworkMessages() { primaryUptime, subnetUptimes := p.getUptimes() pingMessage, err := p.MessageCreator.Ping(primaryUptime, subnetUptimes) if err != nil { - p.Log.Error(failedToCreateMessageLogMessage, + p.Log.Error(failedToCreateMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PingOp), zap.Error(err), @@ -689,7 +689,7 @@ func (p *peer) sendNetworkMessages() { } if !p.Send(p.onClosingCtx, pingMessage) { - p.Log.Debug(failedToSendMessageLogMessage, + p.Log.Debug(failedToSendMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PingOp), ) @@ -711,7 +711,7 @@ func (p *peer) sendNetworkMessages() { // callback to avoid signature verification on the P-chain accept path. func (p *peer) shouldDisconnect() bool { if err := p.VersionCompatibility.Compatible(p.version); err != nil { - p.Log.Debug(disconnectingLogMessage, + p.Log.Debug(disconnectingLog, zap.String("reason", "version not compatible"), zap.Stringer("nodeID", p.id), zap.Stringer("peerVersion", p.version), @@ -728,7 +728,7 @@ func (p *peer) shouldDisconnect() bool { } if p.ip.BLSSignature == nil { - p.Log.Debug(disconnectingLogMessage, + p.Log.Debug(disconnectingLog, zap.String("reason", "missing BLS signature"), zap.Stringer("nodeID", p.id), ) @@ -741,7 +741,7 @@ func (p *peer) shouldDisconnect() bool { p.ip.UnsignedIP.bytes(), ) if !validSignature { - p.Log.Debug(disconnectingLogMessage, + p.Log.Debug(disconnectingLog, zap.String("reason", "invalid BLS signature"), zap.Stringer("nodeID", p.id), ) @@ -793,7 +793,7 @@ func (p *peer) handle(msg message.InboundMessage) { func (p *peer) handlePing(msg *p2p.Ping) { if msg.Uptime > 100 { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PingOp), zap.Stringer("subnetID", constants.PrimaryNetworkID), @@ -807,7 +807,7 @@ func (p *peer) handlePing(msg *p2p.Ping) { for _, subnetUptime := range msg.SubnetUptimes { subnetID, err := ids.ToID(subnetUptime.SubnetId) if err != nil { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PingOp), zap.String("field", "subnetID"), @@ -818,7 +818,7 @@ func (p *peer) handlePing(msg *p2p.Ping) { } if !p.MySubnets.Contains(subnetID) { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PingOp), zap.Stringer("subnetID", subnetID), @@ -830,7 +830,7 @@ func (p *peer) handlePing(msg *p2p.Ping) { uptime := subnetUptime.Uptime if uptime > 100 { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PingOp), zap.Stringer("subnetID", subnetID), @@ -844,7 +844,7 @@ func (p *peer) handlePing(msg *p2p.Ping) { pongMessage, err := p.MessageCreator.Pong() if err != nil { - p.Log.Error(failedToCreateMessageLogMessage, + p.Log.Error(failedToCreateMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PongOp), zap.Error(err), @@ -853,7 +853,7 @@ func (p *peer) handlePing(msg *p2p.Ping) { } if !p.Send(p.onClosingCtx, pongMessage) { - p.Log.Debug(failedToSendMessageLogMessage, + p.Log.Debug(failedToSendMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PongOp), ) @@ -866,7 +866,7 @@ func (p *peer) getUptimes() (uint32, []*p2p.SubnetUptime) { constants.PrimaryNetworkID, ) if err != nil { - p.Log.Debug(failedToGetUptimeLogMessage, + p.Log.Debug(failedToGetUptimeLog, zap.Stringer("nodeID", p.id), zap.Stringer("subnetID", constants.PrimaryNetworkID), zap.Error(err), @@ -878,7 +878,7 @@ func (p *peer) getUptimes() (uint32, []*p2p.SubnetUptime) { for subnetID := range p.trackedSubnets { subnetUptime, err := p.UptimeCalculator.CalculateUptimePercent(p.id, subnetID) if err != nil { - p.Log.Debug(failedToGetUptimeLogMessage, + p.Log.Debug(failedToGetUptimeLog, zap.Stringer("nodeID", p.id), zap.Stringer("subnetID", subnetID), zap.Error(err), @@ -911,7 +911,7 @@ func (p *peer) observeUptime(subnetID ids.ID, uptime uint32) { func (p *peer) handleHandshake(msg *p2p.Handshake) { if p.gotHandshake.Get() { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), zap.String("reason", "already received handshake"), @@ -921,7 +921,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { } if msg.NetworkId != p.NetworkID { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), zap.String("field", "networkID"), @@ -944,7 +944,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { if _, ok := p.Beacons.GetValidator(constants.PrimaryNetworkID, p.id); ok { log = p.Log.Warn } - log(malformedMessageLogMessage, + log(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), zap.String("field", "myTime"), @@ -977,7 +977,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { for _, subnetIDBytes := range msg.TrackedSubnets { subnetID, err := ids.ToID(subnetIDBytes) if err != nil { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), zap.String("field", "trackedSubnets"), @@ -1004,7 +1004,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { } if p.supportedACPs.Overlaps(p.objectedACPs) { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), zap.String("field", "acps"), @@ -1023,7 +1023,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { var err error knownPeers, err = bloom.Parse(msg.KnownPeers.Filter) if err != nil { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), zap.String("field", "knownPeers.filter"), @@ -1035,7 +1035,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { salt = msg.KnownPeers.Salt if saltLen := len(salt); saltLen > maxBloomSaltLen { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), zap.String("field", "knownPeers.salt"), @@ -1048,7 +1048,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { // "net.IP" type in Golang is 16-byte if ipLen := len(msg.IpAddr); ipLen != net.IPv6len { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), zap.String("field", "ip"), @@ -1058,7 +1058,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { return } if msg.IpPort == 0 { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), zap.String("field", "port"), @@ -1084,7 +1084,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { if _, ok := p.Beacons.GetValidator(constants.PrimaryNetworkID, p.id); ok { log = p.Log.Warn } - log(malformedMessageLogMessage, + log(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), zap.String("field", "tlsSignature"), @@ -1101,7 +1101,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { if len(msg.IpBlsSig) > 0 { signature, err := bls.SignatureFromBytes(msg.IpBlsSig) if err != nil { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), zap.String("field", "blsSignature"), @@ -1131,7 +1131,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { // acknowledged correctly. peerListMsg, err := p.Config.MessageCreator.PeerList(peerIPs, true /*=bypassThrottling*/) if err != nil { - p.Log.Error(failedToCreateMessageLogMessage, + p.Log.Error(failedToCreateMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), zap.Error(err), @@ -1142,7 +1142,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { if !p.Send(p.onClosingCtx, peerListMsg) { // Because throttling was marked to be bypassed with this message, // sending should only fail if the peer has started closing. - p.Log.Debug(failedToSendMessageLogMessage, + p.Log.Debug(failedToSendMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), zap.Error(p.onClosingCtx.Err()), @@ -1152,7 +1152,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { func (p *peer) handleGetPeerList(msg *p2p.GetPeerList) { if !p.finishedHandshake.Get() { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.GetPeerListOp), zap.String("reason", "not finished handshake"), @@ -1163,7 +1163,7 @@ func (p *peer) handleGetPeerList(msg *p2p.GetPeerList) { knownPeersMsg := msg.GetKnownPeers() filter, err := bloom.Parse(knownPeersMsg.GetFilter()) if err != nil { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.GetPeerListOp), zap.String("field", "knownPeers.filter"), @@ -1175,7 +1175,7 @@ func (p *peer) handleGetPeerList(msg *p2p.GetPeerList) { salt := knownPeersMsg.GetSalt() if saltLen := len(salt); saltLen > maxBloomSaltLen { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.GetPeerListOp), zap.String("field", "knownPeers.salt"), @@ -1197,7 +1197,7 @@ func (p *peer) handleGetPeerList(msg *p2p.GetPeerList) { // sending pattern. peerListMsg, err := p.Config.MessageCreator.PeerList(peerIPs, false /*=bypassThrottling*/) if err != nil { - p.Log.Error(failedToCreateMessageLogMessage, + p.Log.Error(failedToCreateMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), zap.Error(err), @@ -1206,7 +1206,7 @@ func (p *peer) handleGetPeerList(msg *p2p.GetPeerList) { } if !p.Send(p.onClosingCtx, peerListMsg) { - p.Log.Debug(failedToSendMessageLogMessage, + p.Log.Debug(failedToSendMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), ) @@ -1228,7 +1228,7 @@ func (p *peer) handlePeerList(msg *p2p.PeerList) { for i, claimedIPPort := range msg.ClaimedIpPorts { tlsCert, err := staking.ParseCertificate(claimedIPPort.X509Certificate) if err != nil { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), zap.String("field", "cert"), @@ -1240,7 +1240,7 @@ func (p *peer) handlePeerList(msg *p2p.PeerList) { // "net.IP" type in Golang is 16-byte if ipLen := len(claimedIPPort.IpAddr); ipLen != net.IPv6len { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), zap.String("field", "ip"), @@ -1250,7 +1250,7 @@ func (p *peer) handlePeerList(msg *p2p.PeerList) { return } if claimedIPPort.IpPort == 0 { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), zap.String("field", "port"), @@ -1272,7 +1272,7 @@ func (p *peer) handlePeerList(msg *p2p.PeerList) { } if err := p.Network.Track(discoveredIPs); err != nil { - p.Log.Debug(malformedMessageLogMessage, + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), zap.String("field", "claimedIP"), From 66cae7aef590bdf0e4c195c303d244aff9421da3 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 13 May 2024 17:14:55 -0400 Subject: [PATCH 07/13] nit --- network/peer/peer.go | 31 +++++-------------------------- 1 file changed, 5 insertions(+), 26 deletions(-) diff --git a/network/peer/peer.go b/network/peer/peer.go index 0a47978fb8e..90d1157f322 100644 --- a/network/peer/peer.go +++ b/network/peer/peer.go @@ -38,7 +38,6 @@ const ( disconnectingLog = "disconnecting from peer" failedToCreateMessageLog = "failed to create message" - failedToSendMessageLog = "failed to send message" failedToSetDeadlineLog = "failed to set connection deadline" failedToGetUptimeLog = "failed to get peer uptime percentage" malformedMessageLog = "malformed message" @@ -655,12 +654,7 @@ func (p *peer) sendNetworkMessages() { continue } - if !p.Send(p.onClosingCtx, msg) { - p.Log.Debug(failedToSendMessageLog, - zap.Stringer("nodeID", p.id), - zap.Stringer("messageOp", message.GetPeerListOp), - ) - } + p.Send(p.onClosingCtx, msg) case <-sendPingsTicker.C: if !p.Network.AllowConnection(p.id) { p.Log.Debug(disconnectingLog, @@ -688,12 +682,7 @@ func (p *peer) sendNetworkMessages() { return } - if !p.Send(p.onClosingCtx, pingMessage) { - p.Log.Debug(failedToSendMessageLog, - zap.Stringer("nodeID", p.id), - zap.Stringer("messageOp", message.PingOp), - ) - } + p.Send(p.onClosingCtx, pingMessage) case <-p.onClosingCtx.Done(): return } @@ -852,12 +841,7 @@ func (p *peer) handlePing(msg *p2p.Ping) { return } - if !p.Send(p.onClosingCtx, pongMessage) { - p.Log.Debug(failedToSendMessageLog, - zap.Stringer("nodeID", p.id), - zap.Stringer("messageOp", message.PongOp), - ) - } + p.Send(p.onClosingCtx, pongMessage) } func (p *peer) getUptimes() (uint32, []*p2p.SubnetUptime) { @@ -1142,7 +1126,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { if !p.Send(p.onClosingCtx, peerListMsg) { // Because throttling was marked to be bypassed with this message, // sending should only fail if the peer has started closing. - p.Log.Debug(failedToSendMessageLog, + p.Log.Debug("failed to send reliable message", zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.PeerListOp), zap.Error(p.onClosingCtx.Err()), @@ -1205,12 +1189,7 @@ func (p *peer) handleGetPeerList(msg *p2p.GetPeerList) { return } - if !p.Send(p.onClosingCtx, peerListMsg) { - p.Log.Debug(failedToSendMessageLog, - zap.Stringer("nodeID", p.id), - zap.Stringer("messageOp", message.PeerListOp), - ) - } + p.Send(p.onClosingCtx, peerListMsg) } func (p *peer) handlePeerList(msg *p2p.PeerList) { From ec069b7ee29dfb7f40707e48715a15c601cca54a Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 13 May 2024 17:16:11 -0400 Subject: [PATCH 08/13] nit --- network/peer/peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/network/peer/peer.go b/network/peer/peer.go index 90d1157f322..3d1ab3e6a6a 100644 --- a/network/peer/peer.go +++ b/network/peer/peer.go @@ -768,9 +768,9 @@ func (p *peer) handle(msg message.InboundMessage) { } if !p.finishedHandshake.Get() { p.Log.Debug("dropping message", - zap.String("reason", "handshake isn't finished"), zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", msg.Op()), + zap.String("reason", "handshake isn't finished"), ) msg.OnFinishedHandling() return From f44b492f1dc054dedd56262ab434efbcde2368e7 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 13 May 2024 17:19:47 -0400 Subject: [PATCH 09/13] nit --- network/peer/peer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/network/peer/peer.go b/network/peer/peer.go index 3d1ab3e6a6a..ea721087c38 100644 --- a/network/peer/peer.go +++ b/network/peer/peer.go @@ -1120,6 +1120,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { zap.Stringer("messageOp", message.PeerListOp), zap.Error(err), ) + p.StartClose() return } @@ -1131,6 +1132,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { zap.Stringer("messageOp", message.PeerListOp), zap.Error(p.onClosingCtx.Err()), ) + p.StartClose() } } From d6e99b5cd49fcf08f23c652d7aeb1da23dc37341 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 13 May 2024 17:29:48 -0400 Subject: [PATCH 10/13] nit --- network/peer/peer.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/network/peer/peer.go b/network/peer/peer.go index ea721087c38..2b0f5788895 100644 --- a/network/peer/peer.go +++ b/network/peer/peer.go @@ -651,7 +651,7 @@ func (p *peer) sendNetworkMessages() { zap.Stringer("messageOp", message.GetPeerListOp), zap.Error(err), ) - continue + return } p.Send(p.onClosingCtx, msg) @@ -838,6 +838,7 @@ func (p *peer) handlePing(msg *p2p.Ping) { zap.Stringer("messageOp", message.PongOp), zap.Error(err), ) + p.StartClose() return } From 40bb983350a130df3a0cedf539e3e5af61a12142 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 13 May 2024 17:41:17 -0400 Subject: [PATCH 11/13] merged --- network/peer/peer.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network/peer/peer.go b/network/peer/peer.go index 9866be545ca..0e65b017708 100644 --- a/network/peer/peer.go +++ b/network/peer/peer.go @@ -966,10 +966,10 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { // handle subnet IDs if numTrackedSubnets := len(msg.TrackedSubnets); numTrackedSubnets > maxNumTrackedSubnets { - p.Log.Debug("message with invalid field", + p.Log.Debug(malformedMessageLog, zap.Stringer("nodeID", p.id), zap.Stringer("messageOp", message.HandshakeOp), - zap.String("field", "TrackedSubnets"), + zap.String("field", "trackedSubnets"), zap.Int("numTrackedSubnets", numTrackedSubnets), ) p.StartClose() From 1a08def0e6ee2da2662774306580c1b769f7e025 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 14 May 2024 15:14:23 -0400 Subject: [PATCH 12/13] nit --- network/peer/peer_test.go | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/network/peer/peer_test.go b/network/peer/peer_test.go index c10fedfda25..b0f723a90fe 100644 --- a/network/peer/peer_test.go +++ b/network/peer/peer_test.go @@ -334,6 +334,18 @@ func TestTrackedSubnets(t *testing.T) { rawPeer0 := newRawTestPeer(t, sharedConfig) rawPeer1 := newRawTestPeer(t, sharedConfig) + makeSubnetIDs := func(numSubnets int) set.Set[ids.ID] { + if numSubnets == 0 { + return nil + } + + subnetIDs := set.NewSet[ids.ID](numSubnets) + for i := 0; i < numSubnets; i++ { + subnetIDs.Add(ids.GenerateTestID()) + } + return subnetIDs + } + tests := []struct { name string trackedSubnets set.Set[ids.ID] @@ -341,34 +353,22 @@ func TestTrackedSubnets(t *testing.T) { }{ { name: "primary network only", - trackedSubnets: nil, + trackedSubnets: makeSubnetIDs(0), shouldDisconnect: false, }, { name: "single subnet", - trackedSubnets: set.Of(ids.GenerateTestID()), + trackedSubnets: makeSubnetIDs(1), shouldDisconnect: false, }, { - name: "max subnets", - trackedSubnets: func() set.Set[ids.ID] { - trackedSubnets := set.NewSet[ids.ID](maxNumTrackedSubnets) - for i := 0; i < maxNumTrackedSubnets; i++ { - trackedSubnets.Add(ids.GenerateTestID()) - } - return trackedSubnets - }(), + name: "max subnets", + trackedSubnets: makeSubnetIDs(maxNumTrackedSubnets), shouldDisconnect: false, }, { - name: "too many subnets", - trackedSubnets: func() set.Set[ids.ID] { - trackedSubnets := set.NewSet[ids.ID](maxNumTrackedSubnets + 1) - for i := 0; i < maxNumTrackedSubnets+1; i++ { - trackedSubnets.Add(ids.GenerateTestID()) - } - return trackedSubnets - }(), + name: "too many subnets", + trackedSubnets: makeSubnetIDs(maxNumTrackedSubnets + 1), shouldDisconnect: true, }, } From 4fe32d70f0bab22dc8a69ca181fea9ccff64d883 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 14 May 2024 17:57:10 -0400 Subject: [PATCH 13/13] add primary network to tracked subnets --- network/metrics.go | 7 ++++++- network/network.go | 6 ++---- network/peer/config.go | 15 ++++++++------- network/peer/peer.go | 7 +++---- network/peer/peer_test.go | 24 ++++++++++++------------ 5 files changed, 31 insertions(+), 28 deletions(-) diff --git a/network/metrics.go b/network/metrics.go index ac1df146ccd..c6b47a1360a 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -16,6 +16,7 @@ import ( ) type metrics struct { + // trackedSubnets does not include the primary network ID trackedSubnets set.Set[ids.ID] numTracked prometheus.Gauge @@ -42,7 +43,11 @@ type metrics struct { peerConnectedStartTimesSum float64 } -func newMetrics(namespace string, registerer prometheus.Registerer, trackedSubnets set.Set[ids.ID]) (*metrics, error) { +func newMetrics( + namespace string, + registerer prometheus.Registerer, + trackedSubnets set.Set[ids.ID], +) (*metrics, error) { m := &metrics{ trackedSubnets: trackedSubnets, numPeers: prometheus.NewGauge(prometheus.GaugeOpts{ diff --git a/network/network.go b/network/network.go index a8173b13cb8..9963612c016 100644 --- a/network/network.go +++ b/network/network.go @@ -698,8 +698,7 @@ func (n *network) getPeers( continue } - trackedSubnets := peer.TrackedSubnets() - if subnetID != constants.PrimaryNetworkID && !trackedSubnets.Contains(subnetID) { + if trackedSubnets := peer.TrackedSubnets(); !trackedSubnets.Contains(subnetID) { continue } @@ -735,8 +734,7 @@ func (n *network) samplePeers( numValidatorsToSample+config.NonValidators+config.Peers, func(p peer.Peer) bool { // Only return peers that are tracking [subnetID] - trackedSubnets := p.TrackedSubnets() - if subnetID != constants.PrimaryNetworkID && !trackedSubnets.Contains(subnetID) { + if trackedSubnets := p.TrackedSubnets(); !trackedSubnets.Contains(subnetID) { return false } diff --git a/network/peer/config.go b/network/peer/config.go index 3eb8319216d..8aa12820cc4 100644 --- a/network/peer/config.go +++ b/network/peer/config.go @@ -33,13 +33,14 @@ type Config struct { Network Network Router router.InboundHandler VersionCompatibility version.Compatibility - MySubnets set.Set[ids.ID] - Beacons validators.Manager - Validators validators.Manager - NetworkID uint32 - PingFrequency time.Duration - PongTimeout time.Duration - MaxClockDifference time.Duration + // MySubnets does not include the primary network ID + MySubnets set.Set[ids.ID] + Beacons validators.Manager + Validators validators.Manager + NetworkID uint32 + PingFrequency time.Duration + PongTimeout time.Duration + MaxClockDifference time.Duration SupportedACPs []uint32 ObjectedACPs []uint32 diff --git a/network/peer/peer.go b/network/peer/peer.go index 9085c39dd30..a87bca70854 100644 --- a/network/peer/peer.go +++ b/network/peer/peer.go @@ -143,7 +143,7 @@ type peer struct { // the Handshake message. version *version.Application // trackedSubnets are the subnetIDs the peer sent us in the Handshake - // message. + // message. The primary network ID is always included. trackedSubnets set.Set[ids.ID] // options of ACPs provided in the Handshake message. supportedACPs set.Set[uint32] @@ -968,6 +968,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { return } + p.trackedSubnets.Add(constants.PrimaryNetworkID) for _, subnetIDBytes := range msg.TrackedSubnets { subnetID, err := ids.ToID(subnetIDBytes) if err != nil { @@ -980,9 +981,7 @@ func (p *peer) handleHandshake(msg *p2p.Handshake) { p.StartClose() return } - if subnetID != constants.PrimaryNetworkID { - p.trackedSubnets.Add(subnetID) - } + p.trackedSubnets.Add(subnetID) } for _, acp := range msg.SupportedAcps { diff --git a/network/peer/peer_test.go b/network/peer/peer_test.go index b0f723a90fe..ffd5915aa2c 100644 --- a/network/peer/peer_test.go +++ b/network/peer/peer_test.go @@ -334,21 +334,17 @@ func TestTrackedSubnets(t *testing.T) { rawPeer0 := newRawTestPeer(t, sharedConfig) rawPeer1 := newRawTestPeer(t, sharedConfig) - makeSubnetIDs := func(numSubnets int) set.Set[ids.ID] { - if numSubnets == 0 { - return nil - } - - subnetIDs := set.NewSet[ids.ID](numSubnets) - for i := 0; i < numSubnets; i++ { - subnetIDs.Add(ids.GenerateTestID()) + makeSubnetIDs := func(numSubnets int) []ids.ID { + subnetIDs := make([]ids.ID, numSubnets) + for i := range subnetIDs { + subnetIDs[i] = ids.GenerateTestID() } return subnetIDs } tests := []struct { name string - trackedSubnets set.Set[ids.ID] + trackedSubnets []ids.ID shouldDisconnect bool }{ { @@ -377,7 +373,7 @@ func TestTrackedSubnets(t *testing.T) { t.Run(test.name, func(t *testing.T) { require := require.New(t) - rawPeer0.config.MySubnets = test.trackedSubnets + rawPeer0.config.MySubnets = set.Of(test.trackedSubnets...) peer0, peer1 := startTestPeers(rawPeer0, rawPeer1) if test.shouldDisconnect { require.NoError(peer0.AwaitClosed(context.Background())) @@ -393,8 +389,12 @@ func TestTrackedSubnets(t *testing.T) { }() awaitReady(t, peer0, peer1) - require.Empty(peer0.TrackedSubnets()) - require.Equal(test.trackedSubnets, peer1.TrackedSubnets()) + + require.Equal(set.Of(constants.PrimaryNetworkID), peer0.TrackedSubnets()) + + expectedTrackedSubnets := set.Of(test.trackedSubnets...) + expectedTrackedSubnets.Add(constants.PrimaryNetworkID) + require.Equal(expectedTrackedSubnets, peer1.TrackedSubnets()) }) } }