From 6c3038a2f77677a59737c32a79e4807d68237ea5 Mon Sep 17 00:00:00 2001
From: Viacheslav Gonkivskyi
Date: Fri, 15 Mar 2024 12:36:49 +0200
Subject: [PATCH] misc(p2p/peerTracker): extend conditions for peer handling

Rework peer tracking around libp2p primitives instead of bespoke
bookkeeping:

- peer scores become integers stored as ConnManager tags under the
  protocol ID, so they are visible to libp2p's connection pruning;
- peerTracker keeps a plain set of peer IDs and reacts to
  EvtPeerConnectednessChanged, EvtPeerIdentificationCompleted and
  EvtPeerProtocolsUpdated events, tracking only peers that support
  our protocol ID;
- the disconnectedPeers cache and the gc() loop are removed;
  disconnected peers are untagged and dropped immediately;
- bootstrap() seeds the tracker from existing connections and waits
  for all connection attempts to finish.
---
 go.mod                   |  12 +++
 go.sum                   |   9 +-
 p2p/exchange.go          |   8 +-
 p2p/exchange_test.go     |  70 +++++++++----
 p2p/peer_stats.go        |  40 +++-----
 p2p/peer_stats_test.go   |   4 +-
 p2p/peer_tracker.go      | 214 +++++++++++++++++++++------------
 p2p/peer_tracker_test.go |  75 +++-----------
 p2p/session.go           |   4 +-
 p2p/session_test.go      |   4 +-
 10 files changed, 221 insertions(+), 219 deletions(-)

diff --git a/go.mod b/go.mod
index 066834a1..42cb0400 100644
--- a/go.mod
+++ b/go.mod
@@ -24,18 +24,24 @@ require (
 	github.com/benbjohnson/clock v1.3.5 // indirect
 	github.com/beorn7/perks v1.0.1 // indirect
 	github.com/cespare/xxhash/v2 v2.2.0 // indirect
+	github.com/containerd/cgroups v1.1.0 // indirect
+	github.com/coreos/go-systemd/v22 v22.5.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
 	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
+	github.com/docker/go-units v0.5.0 // indirect
+	github.com/elastic/gosigar v0.14.2 // indirect
 	github.com/flynn/noise v1.0.0 // indirect
 	github.com/francoispqt/gojay v1.2.13 // indirect
 	github.com/go-logr/logr v1.4.1 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
 	github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
+	github.com/godbus/dbus/v5 v5.1.0 // indirect
 	github.com/golang/protobuf v1.5.3 // indirect
 	github.com/google/gopacket v1.1.19 // indirect
 	github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect
 	github.com/google/uuid v1.5.0 // indirect
+	github.com/gorilla/websocket v1.5.0 // indirect
 	github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
 	github.com/huin/goupnp v1.3.0 // indirect
 	github.com/ipfs/go-cid v0.4.1 // indirect
@@ -73,6 +79,9 @@ require (
 	github.com/multiformats/go-multistream v0.5.0 // indirect
 	github.com/multiformats/go-varint v0.0.7 // indirect
 	github.com/onsi/ginkgo/v2 v2.13.0 // indirect
+	github.com/opencontainers/runtime-spec v1.1.0 // indirect
+	github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
+	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_golang v1.14.0 // indirect
 	github.com/prometheus/client_model v0.4.0 // indirect
@@ -82,7 +91,10 @@ require (
 	github.com/quic-go/qtls-go1-20 v0.3.4 // indirect
 	github.com/quic-go/quic-go v0.39.4 // indirect
 	github.com/quic-go/webtransport-go v0.6.0 // indirect
+	github.com/raulk/go-watchdog v1.3.0 // indirect
 	github.com/spaolacci/murmur3 v1.1.0 // indirect
+	go.uber.org/dig v1.17.1 // indirect
+	go.uber.org/fx v1.20.1 // indirect
 	go.uber.org/mock v0.3.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	go.uber.org/zap v1.26.0 // indirect
diff --git a/go.sum b/go.sum
index 888e6e20..dc4828dc 100644
--- a/go.sum
+++ b/go.sum
@@ -120,16 +120,17 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE
 github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE=
 github.com/containerd/cgroups v1.0.3/go.mod h1:/ofk34relqNjSGyqPrmEULrO4Sc8LJhvJmWbUCUKqj8=
 github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM=
+github.com/containerd/cgroups v1.1.0/go.mod h1:6ppBcbh/NOOUU+dMKrykgaBnK9lCIBxHqJDGwsa1mIw=
 github.com/coreos/etcd v3.3.10+incompatible/go.mod 
h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= -github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d h1:t5Wuyh53qYyg9eqn4BbnlIT+vmhyww0TatL+zT3uWgI= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= @@ -152,6 +153,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= +github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= @@ -698,6 +700,7 @@ github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/runtime-spec v1.1.0 h1:HHUyrt9mwHUjtasSbXSMvs4cyFxh+Bll4AjJ9odEGpg= +github.com/opencontainers/runtime-spec v1.1.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= @@ -779,6 +782,7 @@ github.com/quic-go/webtransport-go v0.6.0/go.mod h1:9KjU4AEBqEQidGHNDkZrb8CAa1ab github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0= github.com/raulk/go-watchdog v1.2.0/go.mod h1:lzSbAl5sh4rtI8tYHU01BWIDzgzqaQLj6RcA1i4mlqI= github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk= +github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod 
h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= @@ -896,8 +900,11 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/dig v1.17.1 h1:Tga8Lz8PcYNsWsyHMZ1Vm0OQOUaJNDyvPImgbAu9YSc= +go.uber.org/dig v1.17.1/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= go.uber.org/fx v1.20.1 h1:zVwVQGS8zYvhh9Xxcu4w1M6ESyeMzebzj2NbSayZ4Mk= +go.uber.org/fx v1.20.1/go.mod h1:iSYNbHf2y55acNCwCXKx7LbWb5WG1Bnue5RDXz1OREg= go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= diff --git a/p2p/exchange.go b/p2p/exchange.go index 13fdf43f..57a06a80 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -80,10 +80,11 @@ func NewExchange[H header.Header[H]]( } } + id := protocolID(params.networkID) ex := &Exchange[H]{ host: host, - protocolID: protocolID(params.networkID), - peerTracker: newPeerTracker(host, gater, params.pidstore, metrics), + protocolID: id, + peerTracker: newPeerTracker(host, gater, params.networkID, params.pidstore, metrics), Params: params, metrics: metrics, } @@ -98,7 +99,6 @@ func (ex *Exchange[H]) Start(ctx context.Context) error { ex.ctx, ex.cancel = context.WithCancel(context.Background()) log.Infow("client: starting client", "protocol ID", ex.protocolID) - go ex.peerTracker.gc() go ex.peerTracker.track() // bootstrap the peerTracker with trusted peers as well as previously seen @@ -172,7 +172,7 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( trace.WithAttributes(attribute.String("peerID", from.String())), ) defer newSpan.End() - + headers, err := ex.request(reqCtx, from, headerReq) if err != nil { newSpan.SetStatus(codes.Error, err.Error()) diff --git a/p2p/exchange_test.go b/p2p/exchange_test.go index 38298e0e..1724292a 100644 --- a/p2p/exchange_test.go +++ b/p2p/exchange_test.go @@ -8,12 +8,14 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" + "github.com/libp2p/go-libp2p" libhost "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" blankhost "github.com/libp2p/go-libp2p/p2p/host/blank" "github.com/libp2p/go-libp2p/p2p/net/conngater" + "github.com/libp2p/go-libp2p/p2p/net/connmgr" mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" swarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" "github.com/stretchr/testify/assert" @@ -242,7 +244,7 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) { require.NoError(t, err) servers[index].Start(context.Background()) //nolint:errcheck exchange.peerTracker.peerLk.Lock() - exchange.peerTracker.trackedPeers[hosts[index].ID()] = &peerStat{peerID: hosts[index].ID()} + exchange.peerTracker.trackedPeers[hosts[index].ID()] = struct{}{} exchange.peerTracker.peerLk.Unlock() } @@ -262,12 
+264,20 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
 
 // TestExchange_RequestHeadersFromAnotherPeer tests that the Exchange instance will request range
 // from another peer with lower score after receiving header.ErrNotFound
 func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
-	hosts := createMocknet(t, 3)
+	hosts := quicHosts(t, 3)
+
 	// create client + server(it does not have needed headers)
 	exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
+
+	serverSideStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10)
+	tmServerSideStore := &timedOutStore{timeout: time.Millisecond * 200, Store: *serverSideStore}
+
+	hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)
+	hosts[0].ConnManager().TagPeer(hosts[2].ID(), string(protocolID(networkID)), 90)
+
 	// create one more server(with more headers in the store)
 	serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
-		hosts[2], headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10),
+		hosts[2], tmServerSideStore,
 		WithNetworkID[ServerParameters](networkID),
 	)
 	require.NoError(t, err)
@@ -275,17 +285,17 @@ func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
 	t.Cleanup(func() {
 		serverSideEx.Stop(context.Background()) //nolint:errcheck
 	})
+
 	exchg.peerTracker.peerLk.Lock()
-	exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 20}
+	exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
 	exchg.peerTracker.peerLk.Unlock()
-
 	h, err := store.GetByHeight(context.Background(), 5)
 	require.NoError(t, err)
 
 	_, err = exchg.GetRangeByHeight(context.Background(), h, 8)
 	require.NoError(t, err)
 
 	// ensure that peerScore for the second peer is changed
-	newPeerScore := exchg.peerTracker.trackedPeers[hosts[2].ID()].score()
+	newPeerScore := score(t, exchg.peerTracker.host, hosts[2].ID())
 	require.NotEqual(t, 20, newPeerScore)
 }
 
@@ -464,7 +474,9 @@ func TestExchange_HandleHeaderWithDifferentChainID(t *testing.T) {
 func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
 	// create blankhost because mocknet does not support deadlines
 	swarm0 := swarm.GenSwarm(t)
-	host0 := blankhost.NewBlankHost(swarm0)
+	mngr, err := connmgr.NewConnManager(0, 50)
+	require.NoError(t, err)
+	host0 := blankhost.NewBlankHost(swarm0, blankhost.WithConnectionManager(mngr))
 	swarm1 := swarm.GenSwarm(t)
 	host1 := blankhost.NewBlankHost(swarm1)
 	swarm2 := swarm.GenSwarm(t)
@@ -495,9 +507,10 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
 	t.Cleanup(func() {
 		serverSideEx.Stop(context.Background()) //nolint:errcheck
 	})
-	prevScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
+	prevScore := score(t, exchg.host, host1.ID())
 	exchg.peerTracker.peerLk.Lock()
-	exchg.peerTracker.trackedPeers[host2.ID()] = &peerStat{peerID: host2.ID(), peerScore: 200}
+	host0.ConnManager().TagPeer(host2.ID(), string(protocolID(networkID)), 100)
+	exchg.peerTracker.trackedPeers[host2.ID()] = struct{}{}
 	exchg.peerTracker.peerLk.Unlock()
 
 	gen, err := store.GetByHeight(context.Background(), 1)
@@ -505,14 +518,14 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
 	require.NoError(t, err)
 
 	_, err = exchg.GetRangeByHeight(context.Background(), gen, 3)
 	require.NoError(t, err)
 
-	newPeerScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
+	newPeerScore := score(t, exchg.host, host1.ID())
 	assert.NotEqual(t, newPeerScore, prevScore)
 }
// TestExchange_RequestPartialRange ensures that in case of receiving a partial response
// from the server, Exchange will re-request the remaining headers from another peer
func TestExchange_RequestPartialRange(t *testing.T) {
-	hosts := createMocknet(t, 3)
+	hosts := quicHosts(t, 3)
 	exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])
 
 	// create one more server(with more headers in the store)
@@ -523,13 +536,14 @@ func TestExchange_RequestPartialRange(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	t.Cleanup(cancel)
 
+	hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)
+
 	require.NoError(t, err)
 	require.NoError(t, serverSideEx.Start(ctx))
 
 	exchg.peerTracker.peerLk.Lock()
-	prevScoreBefore1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
-	prevScoreBefore2 := 50
-	// reducing peerScore of the second server, so our exchange will request host[1] first.
-	exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 50}
+	prevScoreBefore1 := score(t, exchg.host, hosts[1].ID())
+	prevScoreBefore2 := score(t, exchg.host, hosts[2].ID())
+	exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
 	exchg.peerTracker.peerLk.Unlock()
 
 	gen, err := store.GetByHeight(context.Background(), 1)
@@ -540,8 +554,8 @@ func TestExchange_RequestPartialRange(t *testing.T) {
 	require.NoError(t, err)
 
 	exchg.peerTracker.peerLk.Lock()
-	prevScoreAfter1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
-	prevScoreAfter2 := exchg.peerTracker.trackedPeers[hosts[2].ID()].peerScore
+	prevScoreAfter1 := score(t, exchg.host, hosts[1].ID())
+	prevScoreAfter2 := score(t, exchg.host, hosts[2].ID())
 	exchg.peerTracker.peerLk.Unlock()
 
 	assert.NotEqual(t, prevScoreBefore1, prevScoreAfter1)
@@ -561,7 +575,6 @@ func createP2PExAndServer(
 	host, tpeer libhost.Host,
 ) (*Exchange[*headertest.DummyHeader], *headertest.Store[*headertest.DummyHeader]) {
 	store := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 5)
-
 	serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](tpeer, store,
 		WithNetworkID[ServerParameters](networkID),
 	)
@@ -582,7 +595,7 @@ func createP2PExAndServer(
 	time.Sleep(time.Millisecond * 100) // give peerTracker time to add a trusted peer
 
 	ex.peerTracker.peerLk.Lock()
-	ex.peerTracker.trackedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID(), peerScore: 100.0}
+	ex.peerTracker.trackedPeers[tpeer.ID()] = struct{}{}
 	ex.peerTracker.peerLk.Unlock()
 
 	t.Cleanup(func() {
@@ -595,12 +608,15 @@ func createP2PExAndServer(
 
 func quicHosts(t *testing.T, n int) []libhost.Host {
 	hosts := make([]libhost.Host, n)
+	var err error
 	for i := range hosts {
-		swrm := swarm.GenSwarm(t, swarm.OptDisableTCP)
-		hosts[i] = blankhost.NewBlankHost(swrm)
+		hosts[i], err = libp2p.New()
+		require.NoError(t, err)
 		for _, host := range hosts[:i] {
 			hosts[i].Peerstore().AddAddrs(host.ID(), host.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
 			host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Network().ListenAddresses(), peerstore.PermanentAddrTTL)
+			err = hosts[i].Connect(context.Background(), peer.AddrInfo{ID: host.ID(), Addrs: host.Addrs()})
+			require.NoError(t, err)
 		}
 	}
 
@@ -647,3 +663,15 @@ func (t *timedOutStore) Head(context.Context, ...header.HeadOption[*headertest.D
 	time.Sleep(t.timeout)
 	return nil, header.ErrNoHead
 }
+
+func (t *timedOutStore) GetRange(ctx context.Context, from, to uint64) ([]*headertest.DummyHeader, error) {
+	time.Sleep(t.timeout)
+	return t.Store.GetRange(ctx, from, to)
+}
+
+func score(t *testing.T, h libhost.Host, id peer.ID) int {
+	t.Helper()
+	info := h.ConnManager().GetTagInfo(id)
+	if info == nil {
+		// no tag recorded for this peer yet
+		return 0
+	}
+	return info.Tags[string(protocolID(networkID))]
+}
diff --git a/p2p/peer_stats.go b/p2p/peer_stats.go
index 0277fcd4..8c9257d2 100644
--- a/p2p/peer_stats.go
+++ b/p2p/peer_stats.go
@@ -14,10 +14,7 @@ type peerStat struct {
 	sync.RWMutex
 	peerID peer.ID
-	// score is the average speed per single request
-	peerScore float32
-	// pruneDeadline specifies when disconnected peer will be removed if
-	// it does not return online.
-	pruneDeadline time.Time
+	// peerScore is the average speed per single request, scaled by 100
+	// and stored as an int so it fits a ConnManager tag.
+	peerScore int
 }
 
 // updateStats recalculates peer.score by averaging the last score
@@ -26,33 +23,28 @@
 // by dividing the amount by time, so the result score will represent how many bytes
 // were retrieved in 1 millisecond. This value will then be averaged relative to the
 // previous peerScore.
-func (p *peerStat) updateStats(amount uint64, duration time.Duration) {
-	p.Lock()
-	defer p.Unlock()
+func (p *peerStat) updateStats(amount uint64, duration time.Duration) int {
+	p.Lock()
+	defer p.Unlock()
+
+	if amount == 0 && duration == 0 {
+		// decrease the peer's score by 20% when it fails a request for any reason.
+		// NOTE: due to integer division, a score below 100 is not decreased.
+		p.peerScore -= p.peerScore / 100 * 20
+		return p.peerScore
+	}
+
 	averageSpeed := float32(amount)
 	if duration != 0 {
 		averageSpeed /= float32(duration.Milliseconds())
 	}
-	if p.peerScore == 0.0 {
-		p.peerScore = averageSpeed
-		return
+	if p.peerScore == 0 {
+		p.peerScore = int(averageSpeed * 100)
+		return p.peerScore
 	}
-	p.peerScore = (p.peerScore + averageSpeed) / 2
-}
-
-// decreaseScore decreases peerScore by 20% of the peer that failed the request by any reason.
-// NOTE: decreasing peerScore in one session will not affect its position in queue in another
-// session(as we can have multiple sessions running concurrently).
-// TODO(vgonkivs): to figure out the better scoring increments/decrements
-func (p *peerStat) decreaseScore() {
-	p.Lock()
-	defer p.Unlock()
-
-	p.peerScore -= p.peerScore / 100 * 20
+	p.peerScore = (p.peerScore + int(averageSpeed*100)) / 2
+	return p.peerScore
 }
 
 // score reads a peer's latest score from the queue
-func (p *peerStat) score() float32 {
+func (p *peerStat) score() int {
 	p.RLock()
 	defer p.RUnlock()
 	return p.peerScore
@@ -123,10 +115,6 @@
 // in case if there are no peer available in current session, it blocks until
 // the peer will be pushed in.
 func (p *peerQueue) waitPop(ctx context.Context) *peerStat {
-	// TODO(vgonkivs): implement fallback solution for cases when peer queue is empty.
-	// As we discussed with @Wondertan there could be 2 possible solutions:
-	// * use libp2p.Discovery to find new peers outside peerTracker to request headers;
-	// * implement IWANT/IHAVE messaging system and start requesting ranges from the Peerstore;
 	select {
 	case <-ctx.Done():
 		return &peerStat{}
diff --git a/p2p/peer_stats_test.go b/p2p/peer_stats_test.go
index 1d2c5cfb..36fe797f 100644
--- a/p2p/peer_stats_test.go
+++ b/p2p/peer_stats_test.go
@@ -107,6 +107,6 @@ func Test_StatDecreaseScore(t *testing.T) {
 		peerScore: 100,
 	}
 	// will decrease score by 20%
-	pStats.decreaseScore()
-	require.Equal(t, pStats.score(), float32(80.0))
+	pStats.updateStats(0, 0)
+	require.Equal(t, pStats.score(), 80)
 }
diff --git a/p2p/peer_tracker.go b/p2p/peer_tracker.go
index be385c95..403e7d56 100644
--- a/p2p/peer_tracker.go
+++ b/p2p/peer_tracker.go
@@ -2,6 +2,8 @@ package p2p
 
 import (
 	"context"
+	"errors"
+	"slices"
 	"sync"
 	"time"
@@ -9,33 +11,20 @@ import (
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/network"
 	libpeer "github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/core/protocol"
 	"github.com/libp2p/go-libp2p/p2p/net/conngater"
 )
 
-// defaultScore specifies the score for newly connected peers.
-const defaultScore float32 = 1
-
-var (
-	// maxAwaitingTime specifies the duration that gives to the disconnected peer to be back online,
-	// otherwise it will be removed on the next GC cycle.
-	maxAwaitingTime = time.Hour
-	// gcCycle defines the duration after which the peerTracker starts removing peers.
-	gcCycle = time.Minute * 5
-)
-
 type peerTracker struct {
-	host      host.Host
-	connGater *conngater.BasicConnectionGater
-	metrics   *exchangeMetrics
-
-	peerLk sync.RWMutex
+	host       host.Host
+	connGater  *conngater.BasicConnectionGater
+	metrics    *exchangeMetrics
+	protocolID protocol.ID
+	peerLk     sync.RWMutex
-	// trackedPeers contains active peers that we can request to.
-	// we cache the peer once they disconnect,
-	// so we can guarantee that peerQueue will only contain active peers
-	trackedPeers map[libpeer.ID]*peerStat
-	// disconnectedPeers contains disconnected peers. In case if peer does not return
-	// online until pruneDeadline, it will be removed and its score will be lost
-	disconnectedPeers map[libpeer.ID]*peerStat
+	// trackedPeers contains only connected peers that support our protocol ID,
+	// so peerQueue is guaranteed to hold only peers we can request from.
+	trackedPeers map[libpeer.ID]struct{}
 
-	// an optional interface used to periodically dump
-	// good peers during garbage collection
+	// an optional interface used to dump
+	// good peers on shutdown
 	pidstore PeerIDStore
 
 	ctx    context.Context
 	cancel context.CancelFunc
 	// done is used to gracefully stop the peerTracker.
-	// It allows to wait until track() and gc() will be stopped.
+	// It allows waiting until track() has stopped.
 	done chan struct{}
 }
 
 func newPeerTracker(
 	h host.Host,
 	connGater *conngater.BasicConnectionGater,
+	networkID string,
 	pidstore PeerIDStore,
 	metrics *exchangeMetrics,
 ) *peerTracker {
 	ctx, cancel := context.WithCancel(context.Background())
 	return &peerTracker{
-		host:              h,
-		connGater:         connGater,
-		metrics:           metrics,
-		trackedPeers:      make(map[libpeer.ID]*peerStat),
-		disconnectedPeers: make(map[libpeer.ID]*peerStat),
-		pidstore:          pidstore,
-		ctx:               ctx,
-		cancel:            cancel,
-		done:              make(chan struct{}, 2),
+		host:         h,
+		connGater:    connGater,
+		protocolID:   protocolID(networkID),
+		metrics:      metrics,
+		trackedPeers: make(map[libpeer.ID]struct{}),
+		pidstore:     pidstore,
+		ctx:          ctx,
+		cancel:       cancel,
+		done:         make(chan struct{}),
 	}
 }
 
@@ -75,11 +65,23 @@
 // NOTE: bootstrap is intended to be used with an on-disk peerstore.Peerstore as
 // the peerTracker needs access to the previously-seen peers' AddrInfo on start.
 func (p *peerTracker) bootstrap(ctx context.Context, trusted []libpeer.ID) error {
+	// store peers that have been already connected
+	for _, c := range p.host.Network().Conns() {
+		p.connected(c.RemotePeer())
+	}
+
 	connectCtx, cancel := context.WithTimeout(context.Background(), time.Second*60)
 	defer cancel()
 
+	wg := sync.WaitGroup{}
+	wg.Add(len(trusted))
+
 	for _, trust := range trusted {
-		go p.connectToPeer(connectCtx, trust)
+		trust := trust
+		go func() {
+			defer wg.Done()
+			p.connectToPeer(connectCtx, trust)
+		}()
 	}
 
 	// short-circuit if pidstore was not provided
@@ -92,9 +94,16 @@
 		return err
 	}
 
+	wg.Add(len(prevSeen))
 	for _, peer := range prevSeen {
-		go p.connectToPeer(connectCtx, peer)
+		peer := peer
+		go func() {
+			defer wg.Done()
+			p.connectToPeer(connectCtx, peer)
+		}()
 	}
+
+	wg.Wait()
 	return nil
 }
 
@@ -105,7 +114,6 @@ func (p *peerTracker) connectToPeer(ctx context.Context, peer libpeer.ID) {
 		log.Debugw("failed to connect to peer", "id", peer.String(), "err", err)
 		return
 	}
-	log.Debugw("connected to peer", "id", peer.String())
 }
 
 func (p *peerTracker) track() {
 	defer func() {
 		p.done <- struct{}{}
 	}()
 
-	// store peers that have been already connected
-	for _, c := range p.host.Network().Conns() {
-		p.connected(c.RemotePeer())
+	connSubs, err := p.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
+	if err != nil {
+		log.Errorw("subscribing to EvtPeerConnectednessChanged", "err", err)
+		return
 	}
 
-	subs, err := p.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
+	identifySub, err := p.host.EventBus().Subscribe(&event.EvtPeerIdentificationCompleted{})
 	if err != nil {
-		log.Errorw("subscribing to EvtPeerConnectednessChanged", "err", err)
+		log.Errorw("subscribing to EvtPeerIdentificationCompleted", "err", err)
+		return
+	}
+
+	protocolSub, err := p.host.EventBus().Subscribe(&event.EvtPeerProtocolsUpdated{})
+	if err != nil {
+		log.Errorw("subscribing to EvtPeerProtocolsUpdated", "err", err)
 		return
 	}
 
 	for {
 		select {
 		case <-p.ctx.Done():
-			err = subs.Close()
+			err = errors.Join(connSubs.Close(), identifySub.Close(), protocolSub.Close())
 			if err != nil {
-				log.Errorw("closing subscription", "err", err)
+				log.Errorw("closing subscriptions", "err", err)
 			}
 			return
-		case subscription := <-subs.Out():
-			ev := subscription.(event.EvtPeerConnectednessChanged)
-			switch ev.Connectedness {
-			case network.Connected:
-				p.connected(ev.Peer)
-			case network.NotConnected:
+		case connSubscription := <-connSubs.Out():
+			ev := connSubscription.(event.EvtPeerConnectednessChanged)
+			if ev.Connectedness == network.NotConnected {
+				p.disconnected(ev.Peer)
+			}
+		case subscription := <-identifySub.Out():
+			ev := subscription.(event.EvtPeerIdentificationCompleted)
+			p.connected(ev.Peer)
+		case subscription := <-protocolSub.Out():
+			ev := subscription.(event.EvtPeerProtocolsUpdated)
+			if slices.Contains(ev.Removed, p.protocolID) {
 				p.disconnected(ev.Peer)
+				break
 			}
+			p.connected(ev.Peer)
 		}
 	}
 }
@@ -160,10 +183,23 @@ func (p *peerTracker) getPeers(max int) []libpeer.ID {
 }
 
 func (p *peerTracker) connected(pID libpeer.ID) {
+	if err := pID.Validate(); err != nil {
+		return
+	}
+
 	if p.host.ID() == pID {
 		return
 	}
 
+	// check that the peer supports our protocol ID.
+	supported, err := p.host.Peerstore().SupportsProtocols(pID, p.protocolID)
+	if err != nil {
+		return
+	}
+	if !slices.Contains(supported, p.protocolID) {
+		return
+	}
+
 	for _, c := range p.host.Network().ConnsToPeer(pID) {
 		// check if connection is short-termed and skip this peer
 		if c.Stat().Transient {
@@ -173,31 +209,30 @@ func (p *peerTracker) connected(pID libpeer.ID) {
 
 	p.peerLk.Lock()
 	defer p.peerLk.Unlock()
-
-	// additional check in p.trackedPeers should be done,
-	// because libp2p does not emit multiple Connected events per 1 peer
-	stats, ok := p.disconnectedPeers[pID]
-	if !ok {
-		stats = &peerStat{peerID: pID, peerScore: defaultScore}
-	} else {
-		delete(p.disconnectedPeers, pID)
+	if _, ok := p.trackedPeers[pID]; ok {
+		return
 	}
-	p.trackedPeers[pID] = stats
+
+	log.Debugw("connected to peer", "id", pID.String())
+	p.trackedPeers[pID] = struct{}{}
 	p.metrics.peersTracked(1)
 }
 
 func (p *peerTracker) disconnected(pID libpeer.ID) {
+	if err := pID.Validate(); err != nil {
+		return
+	}
+
 	p.peerLk.Lock()
 	defer p.peerLk.Unlock()
-	stats, ok := p.trackedPeers[pID]
-	if !ok {
+	if _, ok := p.trackedPeers[pID]; !ok {
 		return
 	}
 
-	stats.pruneDeadline = time.Now().Add(maxAwaitingTime)
-	p.disconnectedPeers[pID] = stats
 	delete(p.trackedPeers, pID)
+	p.host.ConnManager().UntagPeer(pID, string(p.protocolID))
+
 	p.metrics.peersTracked(-1)
 	p.metrics.peersDisconnected(1)
 }
 
@@ -205,44 +240,16 @@
 func (p *peerTracker) peers() []*peerStat {
 	p.peerLk.RLock()
 	defer p.peerLk.RUnlock()
-	peers := make([]*peerStat, 0, len(p.trackedPeers))
-	for _, stat := range p.trackedPeers {
-		peers = append(peers, stat)
-	}
-	return peers
-}
 
-// gc goes through connected and disconnected peers once every gcPeriod
-// and removes:
-// * disconnected peers which have been disconnected for more than maxAwaitingTime;
-// * connected peers whose scores are less than or equal than defaultScore;
-func (p *peerTracker) gc() {
-	ticker := time.NewTicker(gcCycle)
-	for {
-		select {
-		case <-p.ctx.Done():
-			p.done <- struct{}{}
-			return
-		case <-ticker.C:
-			p.cleanUpDisconnectedPeers()
-			p.dumpPeers(p.ctx)
+	peers := make([]*peerStat, 0, len(p.trackedPeers))
+	for peerID := range p.trackedPeers {
+		score := 0
+		if info := p.host.ConnManager().GetTagInfo(peerID); info != nil {
+			score = info.Tags[string(p.protocolID)]
 		}
+		peers = append(peers, &peerStat{peerID: peerID, peerScore: score})
 	}
-}
-
-func (p *peerTracker) cleanUpDisconnectedPeers() {
-	p.peerLk.Lock()
-	defer p.peerLk.Unlock()
-
-	now := time.Now()
-	var deletedDisconnectedNum int
-	for id, peer := range p.disconnectedPeers {
-		if peer.pruneDeadline.Before(now) {
-			delete(p.disconnectedPeers, id)
-			deletedDisconnectedNum++
-		}
-	}
-	p.metrics.peersDisconnected(-deletedDisconnectedNum)
+	return peers
 }
 
 // dumpPeers
stores peers to the peerTracker's PeerIDStore if @@ -275,12 +282,10 @@ func (p *peerTracker) dumpPeers(ctx context.Context) { func (p *peerTracker) stop(ctx context.Context) error { p.cancel() - for i := 0; i < cap(p.done); i++ { - select { - case <-p.done: - case <-ctx.Done(): - return ctx.Err() - } + select { + case <-p.done: + case <-ctx.Done(): + return ctx.Err() } // dump remaining tracked peers @@ -290,6 +295,10 @@ func (p *peerTracker) stop(ctx context.Context) error { // blockPeer blocks a peer on the networking level and removes it from the local cache. func (p *peerTracker) blockPeer(pID libpeer.ID, reason error) { + if err := pID.Validate(); err != nil { + return + } + // add peer to the blacklist, so we can't connect to it in the future. err := p.connGater.BlockPeer(pID) if err != nil { @@ -304,3 +313,8 @@ func (p *peerTracker) blockPeer(pID libpeer.ID, reason error) { log.Warnw("header/p2p: blocked peer", "pID", pID, "reason", reason) p.metrics.peerBlocked() } + +func (p *peerTracker) updateScore(stats *peerStat, size uint64, duration time.Duration) { + score := stats.updateStats(size, duration) + p.host.ConnManager().TagPeer(stats.peerID, string(p.protocolID), score) +} diff --git a/p2p/peer_tracker_test.go b/p2p/peer_tracker_test.go index db9ffc03..5641cb56 100644 --- a/p2p/peer_tracker_test.go +++ b/p2p/peer_tracker_test.go @@ -9,67 +9,21 @@ import ( "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/sync" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/peerstore" testpeer "github.com/libp2p/go-libp2p/core/test" "github.com/libp2p/go-libp2p/p2p/net/conngater" - mocknet "github.com/libp2p/go-libp2p/p2p/net/mock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestPeerTracker_GC(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - h := createMocknet(t, 1) - - gcCycle = time.Millisecond * 200 - - connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) - require.NoError(t, err) - - pidstore := newDummyPIDStore() - p := newPeerTracker(h[0], connGater, pidstore, nil) - - maxAwaitingTime = time.Millisecond - - peerlist := generateRandomPeerlist(t, 10) - for i := 0; i < 10; i++ { - p.trackedPeers[peerlist[i]] = &peerStat{peerID: peerlist[i], peerScore: 0.5} - } - - peerlist = generateRandomPeerlist(t, 4) - pid1 := peerlist[2] - pid2 := peerlist[3] - - p.disconnectedPeers[pid1] = &peerStat{peerID: pid1, pruneDeadline: time.Now()} - p.disconnectedPeers[pid2] = &peerStat{peerID: pid2, pruneDeadline: time.Now().Add(time.Minute * 10)} - assert.True(t, len(p.trackedPeers) > 0) - assert.True(t, len(p.disconnectedPeers) > 0) - - go p.track() - go p.gc() - - time.Sleep(time.Millisecond * 500) - - err = p.stop(ctx) - require.NoError(t, err) - - require.Len(t, p.trackedPeers, 10) - require.Nil(t, p.disconnectedPeers[pid1]) - - // ensure good peers were dumped to store - peers, err := pidstore.Load(ctx) - require.NoError(t, err) - require.Equal(t, 10, len(peers)) -} - func TestPeerTracker_BlockPeer(t *testing.T) { h := createMocknet(t, 2) connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) require.NoError(t, err) - p := newPeerTracker(h[0], connGater, nil, nil) - maxAwaitingTime = time.Millisecond + p := newPeerTracker(h[0], connGater, "private", nil, nil) p.blockPeer(h[1].ID(), errors.New("test")) require.Len(t, 
connGater.ListBlockedPeers(), 1)
 	require.True(t, connGater.ListBlockedPeers()[0] == h[1].ID())
 
@@ -82,26 +36,25 @@ func TestPeerTracker_Bootstrap(t *testing.T) {
 	connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
 	require.NoError(t, err)
 
-	// mn := createMocknet(t, 10)
-	mn, err := mocknet.FullMeshConnected(10)
-	require.NoError(t, err)
+	hosts := make([]host.Host, 10)
+
+	for i := range hosts {
+		hosts[i], err = libp2p.New()
+		require.NoError(t, err)
+		// register the protocol so identify advertises support for it
+		hosts[i].SetStreamHandler(protocolID("private"), nil)
+	}
 
 	// store peers to peerstore
 	prevSeen := make([]peer.ID, 9)
-	for i, peer := range mn.Hosts()[1:] {
+	for i, peer := range hosts[1:] {
+		hosts[0].Peerstore().AddAddrs(peer.ID(), peer.Addrs(), peerstore.PermanentAddrTTL)
 		prevSeen[i] = peer.ID()
-
-		// disconnect so they're not already connected on attempt to
-		// connect
-		err = mn.DisconnectPeers(mn.Hosts()[i].ID(), peer.ID())
-		require.NoError(t, err)
 	}
 	pidstore := newDummyPIDStore()
 	// only store 7 peers to pidstore, and use 2 as trusted
 	err = pidstore.Put(ctx, prevSeen[2:])
 	require.NoError(t, err)
-
-	tracker := newPeerTracker(mn.Hosts()[0], connGater, pidstore, nil)
+	tracker := newPeerTracker(hosts[0], connGater, "private", pidstore, nil)
 
 	go tracker.track()
diff --git a/p2p/session.go b/p2p/session.go
index 505759ff..6fc24f70 100644
--- a/p2p/session.go
+++ b/p2p/session.go
@@ -197,7 +197,7 @@ func (s *session[H]) doRequest(
 		switch err {
 		case header.ErrNotFound, errEmptyResponse:
 			logFn = log.Debugw
-			stat.decreaseScore()
+			s.peerTracker.updateScore(stat, 0, 0)
 		default:
 			s.peerTracker.blockPeer(stat.peerID, err)
 		}
@@ -233,7 +233,7 @@ func (s *session[H]) doRequest(
 	span.SetStatus(codes.Ok, "")
 
 	// update peer stats
-	stat.updateStats(size, duration)
+	s.peerTracker.updateScore(stat, size, duration)
 
 	// ensure that we received the correct amount of headers.
 	if remainingHeaders > 0 {
diff --git a/p2p/session_test.go b/p2p/session_test.go
index d7044961..7c8599f5 100644
--- a/p2p/session_test.go
+++ b/p2p/session_test.go
@@ -28,7 +28,7 @@ func Test_Validate(t *testing.T) {
 	ses := newSession(
 		context.Background(),
 		nil,
-		&peerTracker{trackedPeers: make(map[peer.ID]*peerStat)},
+		&peerTracker{trackedPeers: make(map[peer.ID]struct{})},
 		"", time.Second, nil, withValidation(head),
 	)
@@ -45,7 +45,7 @@ func Test_ValidateFails(t *testing.T) {
 	ses := newSession(
 		context.Background(),
 		nil,
-		&peerTracker{trackedPeers: make(map[peer.ID]*peerStat)},
+		&peerTracker{trackedPeers: make(map[peer.ID]struct{})},
 		"", time.Second, nil, withValidation(head),
 	)
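
The integer scoring in p2p/peer_stats.go above replaces the old float32 average. Below is a minimal, self-contained sketch of the same arithmetic (the type and function names are illustrative, not part of the patch): speed is measured in bytes per millisecond and scaled by 100 before the int conversion, a new sample is averaged with the previous score, and a failed request, signalled as updateStats(0, 0), costs 20% of the current score.

package main

import (
	"fmt"
	"time"
)

// score mirrors the updateStats arithmetic: bytes per millisecond,
// scaled by 100 and stored as an int.
type score int

func (s score) update(amount uint64, duration time.Duration) score {
	if amount == 0 && duration == 0 {
		// failed request: lose 20% of the current score.
		// Integer division means scores below 100 are not decreased.
		return s - s/100*20
	}
	speed := float32(amount)
	if duration != 0 {
		speed /= float32(duration.Milliseconds())
	}
	if s == 0 {
		return score(speed * 100) // first sample, nothing to average against
	}
	return (s + score(speed*100)) / 2 // average with the previous score
}

func main() {
	var s score
	s = s.update(1000, 10*time.Millisecond) // 100 B/ms -> 10000
	s = s.update(500, 10*time.Millisecond)  // (10000+5000)/2 -> 7500
	s = s.update(0, 0)                      // -20% -> 6000
	fmt.Println(s)                          // 6000
}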
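
Scores now live in libp2p's connection manager as integer tags keyed by the protocol ID: updateScore writes them, peers() reads them back, and disconnected() removes them. A sketch of that round trip, assuming only public go-libp2p APIs; the tag string here is a placeholder for string(p.protocolID), and a BasicConnMgr is wired in explicitly because a default host ships a no-op connection manager.

package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/p2p/net/connmgr"
)

func main() {
	// A real BasicConnMgr must be supplied; the default host's no-op
	// connection manager would silently drop tags.
	mgr, err := connmgr.NewConnManager(0, 50)
	if err != nil {
		panic(err)
	}
	h, err := libp2p.New(libp2p.ConnectionManager(mgr))
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// second host only to obtain a valid peer ID to tag
	remote, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer remote.Close()

	const tag = "/private/header-ex" // placeholder for string(p.protocolID)

	// updateScore: persist the latest score as a tag.
	h.ConnManager().TagPeer(remote.ID(), tag, 7500)

	// peers(): read the score back; GetTagInfo returns nil for unknown peers.
	if info := h.ConnManager().GetTagInfo(remote.ID()); info != nil {
		fmt.Println("score:", info.Tags[tag])
	}

	// disconnected(): drop the tag again.
	h.ConnManager().UntagPeer(remote.ID(), tag)
}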
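
track() replaces the old gc() loop with EventBus subscriptions. The sketch below is an assumed equivalent shape that subscribes to the same three event types through a single subscription and dispatches with a type switch; the printed handlers stand in for connected() and disconnected().

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/network"
)

func main() {
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// A single subscription may carry several event types; the patch uses
	// three separate subscriptions, which behaves the same way.
	sub, err := h.EventBus().Subscribe([]interface{}{
		new(event.EvtPeerConnectednessChanged),
		new(event.EvtPeerIdentificationCompleted),
		new(event.EvtPeerProtocolsUpdated),
	})
	if err != nil {
		panic(err)
	}
	defer sub.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	for {
		select {
		case <-ctx.Done():
			return
		case e := <-sub.Out():
			switch ev := e.(type) {
			case event.EvtPeerConnectednessChanged:
				if ev.Connectedness == network.NotConnected {
					fmt.Println("disconnected:", ev.Peer) // would call disconnected(ev.Peer)
				}
			case event.EvtPeerIdentificationCompleted:
				// identify finished, so supported protocols are known by now
				fmt.Println("identified:", ev.Peer) // would call connected(ev.Peer)
			case event.EvtPeerProtocolsUpdated:
				fmt.Println("protocols updated:", ev.Peer, ev.Added, ev.Removed)
			}
		}
	}
}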