diff --git a/nodebuilder/tests/p2p_test.go b/nodebuilder/tests/p2p_test.go
index 497b3b76a5..39c3c985a2 100644
--- a/nodebuilder/tests/p2p_test.go
+++ b/nodebuilder/tests/p2p_test.go
@@ -215,6 +215,5 @@ func TestRestartNodeDiscovery(t *testing.T) {
 
 func setTimeInterval(cfg *nodebuilder.Config, interval time.Duration) {
 	cfg.P2P.RoutingTableRefreshPeriod = interval
-	cfg.Share.Discovery.DiscoveryInterval = interval
 	cfg.Share.Discovery.AdvertiseInterval = interval
 }
diff --git a/share/availability/discovery/backoff.go b/share/availability/discovery/backoff.go
index 8ce00d07b8..5cd20e1497 100644
--- a/share/availability/discovery/backoff.go
+++ b/share/availability/discovery/backoff.go
@@ -11,8 +11,8 @@ import (
 	"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
 )
 
-// gcInterval is a default period after which disconnected peers will be removed from cache
 const (
+	// gcInterval is a default period after which disconnected peers will be removed from cache
 	gcInterval = time.Minute
 	// connectTimeout is the timeout used for dialing peers and discovering peer addresses.
 	connectTimeout = time.Minute * 2
@@ -85,12 +85,10 @@ func (b *backoffConnector) HasBackoff(p peer.ID) bool {
 	b.cacheLk.Lock()
 	cache, ok := b.cacheData[p]
 	b.cacheLk.Unlock()
-	if ok && time.Now().Before(cache.nexttry) {
-		return true
-	}
-	return false
+	return ok && time.Now().Before(cache.nexttry)
 }
 
+// GC is a perpetual GCing loop.
 func (b *backoffConnector) GC(ctx context.Context) {
 	ticker := time.NewTicker(gcInterval)
 	for {
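The backoff changes above are small, but the ticker-driven GC loop they document is worth seeing in one piece. Below is a minimal, self-contained sketch of that pattern, assuming illustrative stand-in types: `entry` and `cache` are not the actual types from `backoff.go`, just shapes that mirror `cacheData` and its `nexttry` deadline.

```go
package main

import (
	"context"
	"sync"
	"time"
)

// entry mirrors the shape the connector caches per peer: the peer stays
// "in backoff" until its nexttry deadline passes (cf. HasBackoff above).
type entry struct {
	nexttry time.Time
}

// cache is a mutex-guarded map, like backoffConnector's cacheData.
type cache struct {
	mu   sync.Mutex
	data map[string]entry
}

// gc runs a perpetual cleanup loop in the style of backoffConnector.GC:
// on every tick it drops entries whose backoff deadline already passed.
func (c *cache) gc(ctx context.Context, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			now := time.Now()
			c.mu.Lock()
			for id, e := range c.data {
				if now.After(e.nexttry) {
					delete(c.data, id)
				}
			}
			c.mu.Unlock()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	c := &cache{data: map[string]entry{"peer": {nexttry: time.Now()}}}
	c.gc(ctx, 10*time.Millisecond) // blocks until ctx expires
}
```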
diff --git a/share/availability/discovery/discovery.go b/share/availability/discovery/discovery.go
index 2b4a86f857..4879b85d8e 100644
--- a/share/availability/discovery/discovery.go
+++ b/share/availability/discovery/discovery.go
@@ -2,8 +2,6 @@ package discovery
 
 import (
 	"context"
-	"sync"
-	"sync/atomic"
 	"time"
 
 	logging "github.com/ipfs/go-log/v2"
@@ -25,6 +23,8 @@ const (
 	// events in libp2p
 	eventbusBufSize = 32
 
+	// findPeersStuckWarnDelay is the duration after which findPeers will log a warning to
+	// notify that it is stuck.
 	findPeersStuckWarnDelay = time.Minute
 )
 
@@ -36,21 +36,22 @@ var waitF = func(ttl time.Duration) time.Duration {
 type Parameters struct {
 	// PeersLimit defines the soft limit of FNs to connect to via discovery.
 	// Set 0 to disable.
-	PeersLimit int
-	// DiscoveryInterval is an interval between discovery sessions.
-	// Set -1 to disable.
-	DiscoveryInterval time.Duration
+	PeersLimit uint
 	// AdvertiseInterval is an interval between advertising sessions.
 	// Set -1 to disable.
 	// NOTE: only full and bridge can advertise themselves.
 	AdvertiseInterval time.Duration
+	// discoveryRetryTimeout is an interval between discovery attempts
+	// when fewer than PeersLimit peers were discovered.
+	// Set -1 to disable.
+	discoveryRetryTimeout time.Duration
 }
 
 func DefaultParameters() Parameters {
 	return Parameters{
-		PeersLimit:        5,
-		DiscoveryInterval: time.Minute,
-		AdvertiseInterval: time.Hour * 8,
+		PeersLimit:            5,
+		discoveryRetryTimeout: time.Second,
+		AdvertiseInterval:     time.Hour * 8,
 	}
 }
 
@@ -69,14 +70,13 @@ type Discovery struct {
 	connector      *backoffConnector
 	onUpdatedPeers OnUpdatedPeers
 
-	connectingLk sync.Mutex
-	connecting   map[peer.ID]context.CancelFunc
+	triggerDisc chan struct{}
 
 	metrics *metrics
 
 	cancel context.CancelFunc
 }
 
-type OnUpdatedPeers func(peerIOnUpdatedPeersD peer.ID, isAdded bool)
+type OnUpdatedPeers func(peerID peer.ID, isAdded bool)
 
 // NewDiscovery constructs a new discovery.
 func NewDiscovery(
@@ -91,7 +91,7 @@ func NewDiscovery(
 		disc:           d,
 		connector:      newBackoffConnector(h, defaultBackoffFactory),
 		onUpdatedPeers: func(peer.ID, bool) {},
-		connecting:     make(map[peer.ID]context.CancelFunc),
+		triggerDisc:    make(chan struct{}),
 	}
 }
 
@@ -116,9 +116,16 @@ func (d *Discovery) WithOnPeersUpdate(f OnUpdatedPeers) {
 	}
 }
 
+func (d *Discovery) triggerDiscovery() {
+	select {
+	case d.triggerDisc <- struct{}{}:
+	default:
+	}
+}
+
 // handlePeersFound receives peers and tries to establish a connection with them.
 // Peer will be added to PeerCache if connection succeeds.
-func (d *Discovery) handlePeerFound(ctx context.Context, cancelFind context.CancelFunc, peer peer.AddrInfo) bool {
+func (d *Discovery) handlePeerFound(ctx context.Context, peer peer.AddrInfo) bool {
 	log := log.With("peer", peer.ID)
 	switch {
 	case peer.ID == d.host.ID():
@@ -139,38 +146,28 @@ func (d *Discovery) handlePeerFound(ctx context.Context, cancelFind context.Canc
 		return false
 	}
 
-	d.connectingLk.Lock()
-	if _, ok := d.connecting[peer.ID]; ok {
-		d.connectingLk.Unlock()
-		d.metrics.observeHandlePeer(handlePeerConnInProgress)
-		log.Debug("skip handle: connecting to the peer in another routine")
-		return false
-	}
-	d.connecting[peer.ID] = cancelFind
-	d.connectingLk.Unlock()
-
 	switch d.host.Network().Connectedness(peer.ID) {
 	case network.Connected:
-		if added := d.addPeerToSet(peer.ID); !added {
-			d.metrics.observeHandlePeer(handlePeerInSet)
-			return added
-		}
-		d.metrics.observeHandlePeer(handlePeerConnected)
-		// we still have to backoff the connected peer
-		d.connector.Backoff(peer.ID)
+		d.connector.Backoff(peer.ID) // we still have to backoff the connected peer
 	case network.NotConnected:
 		err := d.connector.Connect(ctx, peer)
 		if err != nil {
-			d.connectingLk.Lock()
-			delete(d.connecting, peer.ID)
-			d.connectingLk.Unlock()
 			d.metrics.observeHandlePeer(handlePeerConnErr)
 			log.Debugw("unable to connect", "err", err)
 			return false
 		}
-		d.metrics.observeHandlePeer(handlePeerConnect)
-		log.Debug("started connecting to the peer")
+	default:
+		panic("unknown connectedness")
+	}
+
+	if !d.set.Add(peer.ID) {
+		d.metrics.observeHandlePeer(handlePeerInSet)
+		log.Debug("peer is already in discovery set")
+		return false
 	}
+	d.onUpdatedPeers(peer.ID, true)
+	d.metrics.observeHandlePeer(handlePeerConnect)
+	log.Debug("added peer to set")
 
 	// tag to protect peer from being killed by ConnManager
 	// NOTE: This is does not protect from remote killing the connection.
@@ -184,8 +181,8 @@ func (d *Discovery) handlePeerFound(ctx context.Context, cancelFind context.Canc
 // It starts peer discovery every 30 seconds until peer cache reaches peersLimit.
 // Discovery is restarted if any previously connected peers disconnect.
 func (d *Discovery) ensurePeers(ctx context.Context) {
-	if d.params.PeersLimit == 0 || d.params.DiscoveryInterval == -1 {
-		log.Warn("peers limit is set to 0 and/or discovery interval is set to -1. Skipping discovery...")
+	if d.params.PeersLimit == 0 {
+		log.Warn("peers limit is set to 0. Skipping discovery...")
 		return
 	}
 	// subscribe on EventBus in order to catch disconnected peers and restart
@@ -208,11 +205,11 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
 				return
 			case e, ok := <-sub.Out():
 				if !ok {
+					log.Error("connection subscription was closed unexpectedly")
 					return
 				}
-				evnt := e.(event.EvtPeerConnectednessChanged)
-				switch evnt.Connectedness {
-				case network.NotConnected:
+
+				if evnt := e.(event.EvtPeerConnectednessChanged); evnt.Connectedness == network.NotConnected {
 					if !d.set.Contains(evnt.Peer) {
 						continue
 					}
@@ -223,76 +220,53 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
 					d.onUpdatedPeers(evnt.Peer, false)
 					log.Debugw("removed peer from the peer set", "peer", evnt.Peer, "status", evnt.Connectedness.String())
-				case network.Connected:
-					d.addPeerToSet(evnt.Peer)
+
+					if d.set.Size() < d.set.Limit() {
+						d.triggerDiscovery()
+					}
 				}
 			}
 		}
 	}()
 	go d.connector.GC(ctx)
 
-	t := time.NewTicker(d.params.DiscoveryInterval)
+	t := time.NewTicker(d.params.discoveryRetryTimeout)
 	defer t.Stop()
 	for {
-		d.findPeers(ctx)
-		if d.set.Size() < d.set.Limit() {
-			// rerun discovery is amount of peers didn't reach the limit
-			continue
-		}
-
-		t.Reset(d.params.DiscoveryInterval)
 		// drain all previous ticks from channel
 		drainChannel(t.C)
 		select {
 		case <-t.C:
+			found := d.findPeers(ctx)
+			if !found {
+				// rerun discovery if amount of peers didn't reach the limit
+				continue
+			}
 		case <-ctx.Done():
 			return
 		}
-	}
-}
-
-func (d *Discovery) addPeerToSet(peerID peer.ID) bool {
-	d.connectingLk.Lock()
-	cancelFind, ok := d.connecting[peerID]
-	d.connectingLk.Unlock()
-	if !ok {
-		return false
-	}
-
-	if !d.set.Add(peerID) {
-		log.Debug("skip: peer is already in discovery set")
-		return false
-	}
-
-	// and notify our subscribers
-	d.onUpdatedPeers(peerID, true)
-	log.Debugw("added peer to set", "id", peerID)
-	// first do Add and only after check the limit
-	// so that peer set represents the actual number of connections we made
-	// which can go slightly over peersLimit
-	if d.set.Size() >= d.set.Limit() {
-		log.Infow("soft peer limit reached", "count", d.set.Size())
-		cancelFind()
+
+		select {
+		case <-d.triggerDisc:
+		case <-ctx.Done():
+			return
+		}
 	}
-
-	d.connectingLk.Lock()
-	delete(d.connecting, peerID)
-	d.connectingLk.Unlock()
-	return true
 }
 
-func (d *Discovery) findPeers(ctx context.Context) {
-	if d.set.Size() >= d.set.Limit() {
-		log.Debugw("reached soft peer limit, skipping discovery", "size", d.set.Size())
-		return
+func (d *Discovery) findPeers(ctx context.Context) bool {
+	size := d.set.Size()
+	want := d.set.Limit() - size
+	if want == 0 {
+		log.Debugw("reached soft peer limit, skipping discovery", "size", size)
+		return true
 	}
-	log.Infow("below soft peer limit, discovering peers", "remaining", d.set.Limit()-d.set.Size())
+	log.Infow("discovering peers", "want", want)
 
-	// we use errgroup as it obeys the context
+	// we use errgroup as it provides limits
 	var wg errgroup.Group
 	// limit to minimize chances of overreaching the limit
-	wg.SetLimit(d.set.Limit())
+	wg.SetLimit(int(d.set.Limit()))
 	defer wg.Wait() //nolint:errcheck
 
 	// stop discovery when we are done
@@ -302,12 +276,11 @@ func (d *Discovery) findPeers(ctx context.Context) {
 	peers, err := d.disc.FindPeers(findCtx, topic)
 	if err != nil {
 		log.Error("unable to start discovery", "err", err)
-		return
+		return false
 	}
 
 	ticker := time.NewTicker(findPeersStuckWarnDelay)
 	defer ticker.Stop()
-	var amount atomic.Int32
 	for {
 		ticker.Reset(findPeersStuckWarnDelay)
 		// drain all previous ticks from channel
@@ -316,7 +289,7 @@ func (d *Discovery) findPeers(ctx context.Context) {
 		case <-findCtx.Done():
 			d.metrics.observeFindPeers(ctx.Err() != nil, findCtx != nil)
 			log.Debugw("found enough peers", "amount", d.set.Size())
-			return
+			return true
 		case <-ticker.C:
 			log.Warn("wasn't able to find new peers for long time")
 			continue
@@ -324,7 +297,7 @@ func (d *Discovery) findPeers(ctx context.Context) {
 			if !ok {
 				d.metrics.observeFindPeers(ctx.Err() != nil, findCtx != nil)
 				log.Debugw("discovery channel closed", "find_is_canceled", findCtx.Err() != nil)
-				return
+				return d.set.Size() >= d.set.Limit()
 			}
 
 			peer := p
@@ -333,12 +306,21 @@ func (d *Discovery) findPeers(ctx context.Context) {
 				log.Debug("find has been canceled, skip peer")
 				return nil
 			}
-			// pass the cancel so that we cancel FindPeers when we connected to enough peers
-			// we don't pass findCtx so that we don't cancel outgoing connections
-			if d.handlePeerFound(ctx, findCancel, peer) {
-				amount.Add(1)
-				log.Debugw("found peer", "peer", peer.ID, "found_amount", amount.Load())
+
+			// we don't pass findCtx so that we don't cancel in-progress connections
+			// that are likely to be valuable
+			if !d.handlePeerFound(ctx, peer) {
+				return nil
 			}
+
+			size := d.set.Size()
+			log.Debugw("found peer", "peer", peer.ID, "found_amount", size)
+			if size < d.set.Limit() {
+				return nil
+			}
+
+			log.Infow("discovered wanted peers", "amount", size)
+			findCancel()
 			return nil
 		})
 	}
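The heart of this rework replaces the `connecting` map and its per-peer cancel bookkeeping with a single `triggerDisc` channel: a disconnect that drops the set below the limit calls `triggerDiscovery()`, whose non-blocking send coalesces any number of triggers into at most one pending discovery round. Below is a hypothetical, runnable distillation of that loop shape; the names and timings are illustrative, and the real `ensurePeers` additionally `continue`s back to the retry ticker whenever `findPeers` comes up short.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// trigger performs a non-blocking send, in the spirit of
// Discovery.triggerDiscovery: if nobody is waiting for a signal,
// the trigger is simply dropped rather than queued.
func trigger(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

// drain empties any pending ticks before the loop re-arms, mirroring the
// drainChannel helper the diff calls (the helper itself is not shown there).
func drain(c <-chan time.Time) {
	for {
		select {
		case <-c:
		default:
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	triggerCh := make(chan struct{})
	t := time.NewTicker(100 * time.Millisecond) // stands in for discoveryRetryTimeout
	defer t.Stop()

	go func() {
		// e.g. a disconnect dropped the peer set below its limit
		time.Sleep(150 * time.Millisecond)
		trigger(triggerCh)
	}()

	for {
		drain(t.C)
		select {
		case <-t.C:
			fmt.Println("retry tick: run findPeers")
		case <-ctx.Done():
			return
		}
		// block until something asks for another round
		select {
		case <-triggerCh:
			fmt.Println("triggered: re-run discovery immediately")
		case <-ctx.Done():
			return
		}
	}
}
```

The design payoff is that discovery no longer polls on a fixed `DiscoveryInterval`; it runs once, then sleeps until an event proves another round is needed.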
diff --git a/share/availability/discovery/discovery_test.go b/share/availability/discovery/discovery_test.go
index cf20ff6c53..fd88a98586 100644
--- a/share/availability/discovery/discovery_test.go
+++ b/share/availability/discovery/discovery_test.go
@@ -10,13 +10,14 @@ import (
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/p2p/discovery/routing"
-	mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
+	basic "github.com/libp2p/go-libp2p/p2p/host/basic"
+	swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
 func TestDiscovery(t *testing.T) {
-	const nodes = 30 // higher number brings higher coverage
+	const nodes = 10 // higher number brings higher coverage
 
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
 	t.Cleanup(cancel)
@@ -24,9 +25,9 @@ func TestDiscovery(t *testing.T) {
 	tn := newTestnet(ctx, t)
 
 	peerA := tn.discovery(Parameters{
-		PeersLimit:        nodes,
-		DiscoveryInterval: time.Millisecond * 100,
-		AdvertiseInterval: -1,
+		PeersLimit:            nodes,
+		discoveryRetryTimeout: time.Millisecond * 100,
+		AdvertiseInterval:     -1, // we don't want to be found but only find
 	})
 
 	type peerUpdate struct {
@@ -41,9 +42,9 @@ func TestDiscovery(t *testing.T) {
 	discs := make([]*Discovery, nodes)
 	for i := range discs {
 		discs[i] = tn.discovery(Parameters{
-			PeersLimit:        0,
-			DiscoveryInterval: -1,
-			AdvertiseInterval: time.Millisecond * 100,
+			PeersLimit:            0,
+			discoveryRetryTimeout: -1,
+			AdvertiseInterval:     time.Millisecond * 100,
 		})
 
 		select {
@@ -55,7 +56,7 @@ func TestDiscovery(t *testing.T) {
 		}
 	}
 
-	assert.Equal(t, nodes, peerA.set.Size())
+	assert.EqualValues(t, nodes, peerA.set.Size())
 
 	for _, disc := range discs {
 		peerID := disc.host.ID()
@@ -71,21 +72,21 @@ func TestDiscovery(t *testing.T) {
 		}
 	}
 
-	assert.Equal(t, 0, peerA.set.Size())
+	assert.EqualValues(t, 0, peerA.set.Size())
 }
 
 type testnet struct {
 	ctx context.Context
 	T   *testing.T
 
-	net          mocknet.Mocknet
-	bootstrapper peer.ID
+	bootstrapper peer.AddrInfo
 }
 
 func newTestnet(ctx context.Context, t *testing.T) *testnet {
-	net := mocknet.New()
-	hst, err := net.GenPeer()
+	swarm := swarmt.GenSwarm(t, swarmt.OptDisableTCP)
+	hst, err := basic.NewHost(swarm, &basic.HostOpts{})
 	require.NoError(t, err)
+	hst.Start()
 
 	_, err = dht.New(ctx, hst,
 		dht.Mode(dht.ModeServer),
@@ -94,7 +95,7 @@ func newTestnet(ctx context.Context, t *testing.T) *testnet {
 	)
 	require.NoError(t, err)
 
-	return &testnet{ctx: ctx, T: t, net: net, bootstrapper: hst.ID()}
+	return &testnet{ctx: ctx, T: t, bootstrapper: *host.InfoFromHost(hst)}
 }
 
 func (t *testnet) discovery(params Parameters) *Discovery {
@@ -112,13 +113,12 @@ func (t *testnet) discovery(params Parameters) *Discovery {
 }
 
 func (t *testnet) peer() (host.Host, discovery.Discovery) {
-	hst, err := t.net.GenPeer()
+	swarm := swarmt.GenSwarm(t.T, swarmt.OptDisableTCP)
+	hst, err := basic.NewHost(swarm, &basic.HostOpts{})
 	require.NoError(t.T, err)
+	hst.Start()
 
-	err = t.net.LinkAll()
-	require.NoError(t.T, err)
-
-	_, err = t.net.ConnectPeers(hst.ID(), t.bootstrapper)
+	err = hst.Connect(t.ctx, t.bootstrapper)
 	require.NoError(t.T, err)
 
 	dht, err := dht.New(t.ctx, hst,
diff --git a/share/availability/discovery/set.go b/share/availability/discovery/set.go
index 8adb9a7b60..37b3851bdf 100644
--- a/share/availability/discovery/set.go
+++ b/share/availability/discovery/set.go
@@ -13,12 +13,12 @@ type limitedSet struct {
 	lk sync.RWMutex
 	ps map[peer.ID]struct{}
 
-	limit    int
+	limit    uint
 	waitPeer chan peer.ID
 }
 
 // newLimitedSet constructs a set with the maximum peers amount.
-func newLimitedSet(limit int) *limitedSet {
+func newLimitedSet(limit uint) *limitedSet {
 	ps := new(limitedSet)
 	ps.ps = make(map[peer.ID]struct{})
 	ps.limit = limit
@@ -33,14 +33,14 @@ func (ps *limitedSet) Contains(p peer.ID) bool {
 	return ok
 }
 
-func (ps *limitedSet) Limit() int {
+func (ps *limitedSet) Limit() uint {
 	return ps.limit
 }
 
-func (ps *limitedSet) Size() int {
+func (ps *limitedSet) Size() uint {
 	ps.lk.RLock()
 	defer ps.lk.RUnlock()
-	return len(ps.ps)
+	return uint(len(ps.ps))
 }
 
 // Add attempts to add the given peer into the set.
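One knock-on effect of `limitedSet` moving from `int` to `uint` shows up in the test hunks on both sides of this point: testify's `Equal`/`require.Equal` compare values through `interface{}`, so an untyped constant like `2` (which defaults to `int`) never equals `uint(2)`, and the assertions switch to `EqualValues`, which converts to a common type before comparing. A tiny sketch of the difference:

```go
package main

import "fmt"

func main() {
	var size uint = 2

	// Equal-style check: interface values with different dynamic types
	// (uint vs int) are never equal, even when the numbers match.
	fmt.Println(interface{}(size) == interface{}(2)) // false

	// EqualValues-style check: convert to a common type first.
	fmt.Println(uint64(size) == uint64(2)) // true
}
```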
diff --git a/share/availability/discovery/set_test.go b/share/availability/discovery/set_test.go
index 1507509732..d5113a2291 100644
--- a/share/availability/discovery/set_test.go
+++ b/share/availability/discovery/set_test.go
@@ -81,7 +81,7 @@ func TestSet_Size(t *testing.T) {
 	set := newLimitedSet(2)
 	set.Add(h1.ID())
 	set.Add(h2.ID())
-	require.Equal(t, 2, set.Size())
+	require.EqualValues(t, 2, set.Size())
 	set.Remove(h2.ID())
-	require.Equal(t, 1, set.Size())
+	require.EqualValues(t, 1, set.Size())
 }
diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go
index e1f62f3c62..df4561a8eb 100644
--- a/share/availability/full/testing.go
+++ b/share/availability/full/testing.go
@@ -37,12 +37,10 @@ func Node(dn *availability_test.TestDagNet) *availability_test.TestNode {
 }
 
 func TestAvailability(getter share.Getter) *ShareAvailability {
-	disc := discovery.NewDiscovery(nil, routing.NewRoutingDiscovery(
-		routinghelpers.Null{}), discovery.Parameters{
-		PeersLimit:        10,
-		DiscoveryInterval: time.Second,
-		AdvertiseInterval: time.Second,
-	},
-	)
+	params := discovery.DefaultParameters()
+	params.AdvertiseInterval = time.Second
+	params.PeersLimit = 10
+	disc := discovery.NewDiscovery(nil,
+		routing.NewRoutingDiscovery(routinghelpers.Null{}), params)
 	return NewShareAvailability(nil, getter, disc)
 }
diff --git a/share/getters/shrex_test.go b/share/getters/shrex_test.go
index 65192ded8e..b93a40d488 100644
--- a/share/getters/shrex_test.go
+++ b/share/getters/shrex_test.go
@@ -160,7 +160,6 @@ func testManager(ctx context.Context, host host.Host, headerSub libhead.Subscrib
 	disc := discovery.NewDiscovery(nil,
 		routingdisc.NewRoutingDiscovery(routinghelpers.Null{}),
 		discovery.Parameters{
 			PeersLimit:        10,
-			DiscoveryInterval: time.Second,
 			AdvertiseInterval: time.Second,
 		},
 	)
diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go
index eecf2f3ac4..804dd4b673 100644
--- a/share/p2p/peers/manager_test.go
+++ b/share/p2p/peers/manager_test.go
@@ -395,7 +395,6 @@ func TestIntegration(t *testing.T) {
 		routingdisc.NewRoutingDiscovery(router1),
 		discovery.Parameters{
 			PeersLimit:        0,
-			DiscoveryInterval: time.Second,
 			AdvertiseInterval: time.Second,
 		},
 	)
@@ -409,7 +408,6 @@ func TestIntegration(t *testing.T) {
 		routingdisc.NewRoutingDiscovery(router2),
 		discovery.Parameters{
 			PeersLimit:        10,
-			DiscoveryInterval: time.Second,
 			AdvertiseInterval: time.Second,
 		},
 	)
@@ -467,7 +465,6 @@ func testManager(ctx context.Context, headerSub libhead.Subscriber[*header.Exten
 	disc := discovery.NewDiscovery(nil,
 		routingdisc.NewRoutingDiscovery(routinghelpers.Null{}),
 		discovery.Parameters{
 			PeersLimit:        0,
-			DiscoveryInterval: time.Second,
 			AdvertiseInterval: time.Second,
 		})
 	connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
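With `DiscoveryInterval` gone and `discoveryRetryTimeout` unexported, callers outside the package can no longer spell out every field; the `full/testing.go` hunk shows the intended pattern of starting from `DefaultParameters()` and overriding only the public knobs. A standalone sketch of that shape, assuming a local copy of the struct for illustration rather than an import of the real package:

```go
package main

import (
	"fmt"
	"time"
)

// Parameters mirrors the reworked struct from the diff. discoveryRetryTimeout
// is unexported, so only in-package tests (like discovery_test.go) can set it;
// external callers inherit the default.
type Parameters struct {
	PeersLimit            uint
	AdvertiseInterval     time.Duration
	discoveryRetryTimeout time.Duration
}

func DefaultParameters() Parameters {
	return Parameters{
		PeersLimit:            5,
		discoveryRetryTimeout: time.Second,
		AdvertiseInterval:     time.Hour * 8,
	}
}

func main() {
	// Override only the exported knobs, keeping the internal retry default,
	// as TestAvailability in share/availability/full/testing.go now does.
	params := DefaultParameters()
	params.PeersLimit = 10
	params.AdvertiseInterval = time.Second
	fmt.Printf("%+v\n", params)
}
```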