diff --git a/nodebuilder/share/config.go b/nodebuilder/share/config.go index 789b54dd75..179035e21d 100644 --- a/nodebuilder/share/config.go +++ b/nodebuilder/share/config.go @@ -1,27 +1,13 @@ package share import ( - "errors" - "fmt" - "time" - + disc "github.com/celestiaorg/celestia-node/share/availability/discovery" "github.com/celestiaorg/celestia-node/share/p2p/peers" "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" ) -var ( - ErrNegativeInterval = errors.New("interval must be positive") -) - type Config struct { - // PeersLimit defines how many peers will be added during discovery. - PeersLimit uint - // DiscoveryInterval is an interval between discovery sessions. - DiscoveryInterval time.Duration - // AdvertiseInterval is a interval between advertising sessions. - // NOTE: only full and bridge can advertise themselves. - AdvertiseInterval time.Duration // UseShareExchange is a flag toggling the usage of shrex protocols for blocksync. // NOTE: This config variable only has an effect on full and bridge nodes. UseShareExchange bool @@ -31,25 +17,22 @@ type Config struct { ShrExNDParams *shrexnd.Parameters // PeerManagerParams sets peer-manager configuration parameters PeerManagerParams peers.Parameters + // Discovery sets peer discovery configuration parameters. + Discovery disc.Parameters } func DefaultConfig() Config { return Config{ - PeersLimit: 5, - DiscoveryInterval: time.Second * 30, - AdvertiseInterval: time.Second * 30, UseShareExchange: true, ShrExEDSParams: shrexeds.DefaultParameters(), ShrExNDParams: shrexnd.DefaultParameters(), PeerManagerParams: peers.DefaultParameters(), + Discovery: disc.DefaultParameters(), } } // Validate performs basic validation of the config. 
func (cfg *Config) Validate() error { - if cfg.DiscoveryInterval <= 0 || cfg.AdvertiseInterval <= 0 { - return fmt.Errorf("nodebuilder/share: %s", ErrNegativeInterval) - } if err := cfg.ShrExNDParams.Validate(); err != nil { return err } diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index a8d45f6a40..59189243a5 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -29,9 +29,7 @@ func discovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discove return disc.NewDiscovery( h, routingdisc.NewRoutingDiscovery(r), - cfg.PeersLimit, - cfg.DiscoveryInterval, - cfg.AdvertiseInterval, + cfg.Discovery, ) } } diff --git a/nodebuilder/tests/p2p_test.go b/nodebuilder/tests/p2p_test.go index 95a33b11e8..39c3c985a2 100644 --- a/nodebuilder/tests/p2p_test.go +++ b/nodebuilder/tests/p2p_test.go @@ -171,7 +171,7 @@ func TestRestartNodeDiscovery(t *testing.T) { const fullNodes = 2 setTimeInterval(cfg, defaultTimeInterval) - cfg.Share.PeersLimit = fullNodes + cfg.Share.Discovery.PeersLimit = fullNodes bridge := sw.NewNodeWithConfig(node.Bridge, cfg) ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout) @@ -184,7 +184,7 @@ func TestRestartNodeDiscovery(t *testing.T) { nodes := make([]*nodebuilder.Node, fullNodes) cfg = nodebuilder.DefaultConfig(node.Full) setTimeInterval(cfg, defaultTimeInterval) - cfg.Share.PeersLimit = fullNodes + cfg.Share.Discovery.PeersLimit = fullNodes nodesConfig := nodebuilder.WithBootstrappers([]peer.AddrInfo{*bridgeAddr}) for index := 0; index < fullNodes; index++ { nodes[index] = sw.NewNodeWithConfig(node.Full, cfg, nodesConfig) @@ -201,7 +201,7 @@ func TestRestartNodeDiscovery(t *testing.T) { // create one more node with disabled discovery cfg = nodebuilder.DefaultConfig(node.Full) setTimeInterval(cfg, defaultTimeInterval) - cfg.Share.PeersLimit = 0 + cfg.Share.Discovery.PeersLimit = 0 node := sw.NewNodeWithConfig(node.Full, cfg, nodesConfig) connectSub, err := nodes[0].Host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}) require.NoError(t, err) @@ -215,6 +215,5 @@ func TestRestartNodeDiscovery(t *testing.T) { func setTimeInterval(cfg *nodebuilder.Config, interval time.Duration) { cfg.P2P.RoutingTableRefreshPeriod = interval - cfg.Share.DiscoveryInterval = interval - cfg.Share.AdvertiseInterval = interval + cfg.Share.Discovery.AdvertiseInterval = interval } diff --git a/share/availability/discovery/backoff.go b/share/availability/discovery/backoff.go index e76449f2c6..5cd20e1497 100644 --- a/share/availability/discovery/backoff.go +++ b/share/availability/discovery/backoff.go @@ -2,7 +2,7 @@ package discovery import ( "context" - "fmt" + "errors" "sync" "time" @@ -11,10 +11,17 @@ import ( "github.com/libp2p/go-libp2p/p2p/discovery/backoff" ) -// gcInterval is a default period after which disconnected peers will be removed from cache -const gcInterval = time.Hour +const ( + // gcInterval is a default period after which disconnected peers will be removed from cache + gcInterval = time.Minute + // connectTimeout is the timeout used for dialing peers and discovering peer addresses. 
+ connectTimeout = time.Minute * 2 +) -var defaultBackoffFactory = backoff.NewFixedBackoff(time.Hour) +var ( + defaultBackoffFactory = backoff.NewFixedBackoff(time.Minute * 10) + errBackoffNotEnded = errors.New("share/discovery: backoff period has not ended") +) // backoffConnector wraps a libp2p.Host to establish a connection with peers // with adding a delay for the next connection attempt. @@ -23,7 +30,7 @@ type backoffConnector struct { backoff backoff.BackoffFactory cacheLk sync.Mutex - cacheData map[peer.ID]*backoffData + cacheData map[peer.ID]backoffData } // backoffData stores time when next connection attempt with the remote peer. @@ -36,48 +43,52 @@ func newBackoffConnector(h host.Host, factory backoff.BackoffFactory) *backoffCo return &backoffConnector{ h: h, backoff: factory, - cacheData: make(map[peer.ID]*backoffData), + cacheData: make(map[peer.ID]backoffData), } } // Connect puts peer to the backoffCache and tries to establish a connection with it. func (b *backoffConnector) Connect(ctx context.Context, p peer.AddrInfo) error { - // we should lock the mutex before calling connectionData and not inside because otherwise it could - // be modified from another goroutine as it returns a pointer - b.cacheLk.Lock() - cache := b.connectionData(p.ID) - if time.Now().Before(cache.nexttry) { - b.cacheLk.Unlock() - return fmt.Errorf("share/discovery: backoff period has not ended for peer=%s", p.ID.String()) + if b.HasBackoff(p.ID) { + return errBackoffNotEnded } - cache.nexttry = time.Now().Add(cache.backoff.Delay()) - b.cacheLk.Unlock() - return b.h.Connect(ctx, p) + + ctx, cancel := context.WithTimeout(ctx, connectTimeout) + defer cancel() + + err := b.h.Connect(ctx, p) + // we don't want to add backoff when the context is canceled. + if !errors.Is(err, context.Canceled) { + b.Backoff(p.ID) + } + return err } -// connectionData returns backoffData from the map if it was stored, otherwise it will instantiate -// a new one. -func (b *backoffConnector) connectionData(p peer.ID) *backoffData { - cache, ok := b.cacheData[p] +// Backoff adds or extends backoff delay for the peer. +func (b *backoffConnector) Backoff(p peer.ID) { + b.cacheLk.Lock() + defer b.cacheLk.Unlock() + + data, ok := b.cacheData[p] if !ok { - cache = &backoffData{} - cache.backoff = b.backoff() - b.cacheData[p] = cache + data = backoffData{} + data.backoff = b.backoff() + b.cacheData[p] = data } - return cache + + data.nexttry = time.Now().Add(data.backoff.Delay()) + b.cacheData[p] = data } -// RestartBackoff resets delay time between attempts and adds a delay for the next connection -// attempt to remote peer. It will mostly be called when host receives a notification that remote -// peer was disconnected. -func (b *backoffConnector) RestartBackoff(p peer.ID) { +// HasBackoff checks if peer is in backoff. +func (b *backoffConnector) HasBackoff(p peer.ID) bool { b.cacheLk.Lock() - defer b.cacheLk.Unlock() - cache := b.connectionData(p) - cache.backoff.Reset() - cache.nexttry = time.Now().Add(cache.backoff.Delay()) + cache, ok := b.cacheData[p] + b.cacheLk.Unlock() + return ok && time.Now().Before(cache.nexttry) } +// GC is a perpetual GCing loop. 
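+// On every gcInterval tick it clears cached backoff entries for peers that have disconnected, keeping the cache bounded.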
func (b *backoffConnector) GC(ctx context.Context) { ticker := time.NewTicker(gcInterval) for { diff --git a/share/availability/discovery/backoff_test.go b/share/availability/discovery/backoff_test.go index 95e84fbb8c..24814ed199 100644 --- a/share/availability/discovery/backoff_test.go +++ b/share/availability/discovery/backoff_test.go @@ -42,6 +42,6 @@ func TestBackoff_ResetBackoffPeriod(t *testing.T) { info := host.InfoFromHost(m.Hosts()[1]) require.NoError(t, b.Connect(ctx, *info)) nexttry := b.cacheData[info.ID].nexttry - b.RestartBackoff(info.ID) + b.Backoff(info.ID) require.True(t, b.cacheData[info.ID].nexttry.After(nexttry)) } diff --git a/share/availability/discovery/discovery.go b/share/availability/discovery/discovery.go index 47733a9281..0a2deb7858 100644 --- a/share/availability/discovery/discovery.go +++ b/share/availability/discovery/discovery.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "fmt" "time" logging "github.com/ipfs/go-log/v2" @@ -11,20 +12,27 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/host/eventbus" + "golang.org/x/sync/errgroup" ) var log = logging.Logger("share/discovery") const ( - // peerWeight is a weight of discovered peers. - // peerWeight is a number that will be assigned to all discovered full nodes, - // so ConnManager will not break a connection with them. - peerWeight = 1000 - topic = "full" + // rendezvousPoint is the namespace where peers advertise and discover each other. + rendezvousPoint = "full" // eventbusBufSize is the size of the buffered channel to handle - // events in libp2p - eventbusBufSize = 32 + // events in libp2p. We specify a larger buffer size for the channel + // to avoid overflowing and blocking subscription during disconnection bursts. + // (by default it is 16) + eventbusBufSize = 64 + + // findPeersStuckWarnDelay is the duration after which discover will log an error message to + // notify that it is stuck. + findPeersStuckWarnDelay = time.Minute + + // defaultRetryTimeout defines time interval between discovery attempts. + defaultRetryTimeout = time.Second ) // waitF calculates time to restart announcing. @@ -32,21 +40,51 @@ var waitF = func(ttl time.Duration) time.Duration { return 7 * ttl / 8 } +type Parameters struct { + // PeersLimit defines the soft limit of FNs to connect to via discovery. + // Set 0 to disable. + PeersLimit uint + // AdvertiseInterval is a interval between advertising sessions. + // Set -1 to disable. + // NOTE: only full and bridge can advertise themselves. + AdvertiseInterval time.Duration + // discoveryRetryTimeout is an interval between discovery attempts + // when we discovered lower than PeersLimit peers. + // Set -1 to disable. + discoveryRetryTimeout time.Duration +} + +func (p Parameters) withDefaults() Parameters { + def := DefaultParameters() + if p.AdvertiseInterval == 0 { + p.AdvertiseInterval = def.AdvertiseInterval + } + if p.discoveryRetryTimeout == 0 { + p.discoveryRetryTimeout = defaultRetryTimeout + } + return p +} + +func DefaultParameters() Parameters { + return Parameters{ + PeersLimit: 5, + AdvertiseInterval: time.Hour * 8, + } +} + // Discovery combines advertise and discover services and allows to store discovered nodes. 
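+// Start subscribes to connectedness events and launches the discovery, disconnect-handling and backoff-GC loops;
+// Advertise runs separately (full and bridge nodes only), and Peers blocks until at least one peer is found.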
+// TODO: The code here gets horribly hairy, so we should refactor this at some point type Discovery struct { - set *limitedSet - host host.Host - disc discovery.Discovery - connector *backoffConnector - // peersLimit is max amount of peers that will be discovered during a discovery session. - peersLimit uint - // discInterval is an interval between discovery sessions. - discoveryInterval time.Duration - // advertiseInterval is an interval between advertising sessions. - advertiseInterval time.Duration - // onUpdatedPeers will be called on peer set changes + params Parameters + + set *limitedSet + host host.Host + disc discovery.Discovery + connector *backoffConnector onUpdatedPeers OnUpdatedPeers + triggerDisc chan struct{} + cancel context.CancelFunc } @@ -56,19 +94,16 @@ type OnUpdatedPeers func(peerID peer.ID, isAdded bool) func NewDiscovery( h host.Host, d discovery.Discovery, - peersLimit uint, - discInterval, - advertiseInterval time.Duration, + params Parameters, ) *Discovery { return &Discovery{ - set: newLimitedSet(peersLimit), - host: h, - disc: d, - connector: newBackoffConnector(h, defaultBackoffFactory), - peersLimit: peersLimit, - discoveryInterval: discInterval, - advertiseInterval: advertiseInterval, - onUpdatedPeers: func(peer.ID, bool) {}, + params: params.withDefaults(), + set: newLimitedSet(params.PeersLimit), + host: h, + disc: d, + connector: newBackoffConnector(h, defaultBackoffFactory), + onUpdatedPeers: func(peer.ID, bool) {}, + triggerDisc: make(chan struct{}), } } @@ -76,7 +111,19 @@ func (d *Discovery) Start(context.Context) error { ctx, cancel := context.WithCancel(context.Background()) d.cancel = cancel - go d.ensurePeers(ctx) + if d.params.PeersLimit == 0 { + log.Warn("peers limit is set to 0. Skipping discovery...") + return nil + } + + sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize)) + if err != nil { + return fmt.Errorf("subscribing for connection events: %w", err) + } + + go d.discoveryLoop(ctx) + go d.disconnectsLoop(ctx, sub) + go d.connector.GC(ctx) return nil } @@ -94,144 +141,238 @@ func (d *Discovery) WithOnPeersUpdate(f OnUpdatedPeers) { } } -// handlePeersFound receives peers and tries to establish a connection with them. -// Peer will be added to PeerCache if connection succeeds. -func (d *Discovery) handlePeerFound(ctx context.Context, topic string, peer peer.AddrInfo) { - if peer.ID == d.host.ID() || len(peer.Addrs) == 0 || d.set.Contains(peer.ID) { - return - } - err := d.set.TryAdd(peer.ID) - if err != nil { - log.Debug(err) - return - } - - err = d.connector.Connect(ctx, peer) - if err != nil { - log.Debug(err) - d.set.Remove(peer.ID) - return - } - - d.onUpdatedPeers(peer.ID, true) - log.Debugw("added peer to set", "id", peer.ID) - // add tag to protect peer of being killed by ConnManager - d.host.ConnManager().TagPeer(peer.ID, topic, peerWeight) +// Peers provides a list of discovered peers in the "full" topic. +// If Discovery hasn't found any peers, it blocks until at least one peer is found. +func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) { + return d.set.Peers(ctx) } -// ensurePeers ensures we always have 'peerLimit' connected peers. -// It starts peer discovery every 30 seconds until peer cache reaches peersLimit. -// Discovery is restarted if any previously connected peers disconnect. -func (d *Discovery) ensurePeers(ctx context.Context) { - if d.peersLimit == 0 { - log.Warn("peers limit is set to 0. 
Skipping discovery...") - return - } - // subscribe on EventBus in order to catch disconnected peers and restart - // the discovery. We specify a larger buffer size for the channel where - // EvtPeerConnectednessChanged events are sent (by default it is 16, we - // specify 32) to avoid any blocks on writing to the full channel. - sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize)) - if err != nil { - log.Error(err) +// Advertise is a utility function that persistently advertises a service through an Advertiser. +// TODO: Start advertising only after the reachability is confirmed by AutoNAT +func (d *Discovery) Advertise(ctx context.Context) { + if d.params.AdvertiseInterval == -1 { return } - go d.connector.GC(ctx) - t := time.NewTicker(d.discoveryInterval) - defer func() { - t.Stop() - if err = sub.Close(); err != nil { - log.Error(err) - } - }() + timer := time.NewTimer(d.params.AdvertiseInterval) + defer timer.Stop() + for { + ttl, err := d.disc.Advertise(ctx, rendezvousPoint) + if err != nil { + log.Debugf("Error advertising %s: %s", rendezvousPoint, err.Error()) + if ctx.Err() != nil { + return + } - // starting to listen to subscriptions async will help us to avoid any blocking - // in the case when we will not have the needed amount of FNs and will be blocked in `FindPeers`. - go func() { - for { select { + case <-timer.C: + timer.Reset(d.params.AdvertiseInterval) + continue case <-ctx.Done(): - log.Debug("Context canceled. Finish listening for connectedness events.") return - case e, ok := <-sub.Out(): - if !ok { - log.Debug("Subscription for connectedness events is closed.") - return - } - // listen to disconnect event to remove peer from set and reset backoff time - // reset timer in order to restart the discovery, once stored peer is disconnected - connStatus := e.(event.EvtPeerConnectednessChanged) - if connStatus.Connectedness == network.NotConnected { - if d.set.Contains(connStatus.Peer) { - log.Debugw("removing the peer from the peer set", - "peer", connStatus.Peer, "status", connStatus.Connectedness.String()) - d.connector.RestartBackoff(connStatus.Peer) - d.set.Remove(connStatus.Peer) - d.onUpdatedPeers(connStatus.Peer, false) - d.host.ConnManager().UntagPeer(connStatus.Peer, topic) - t.Reset(d.discoveryInterval) - } - } } } - }() - for { + log.Debugf("advertised") select { + case <-timer.C: + timer.Reset(waitF(ttl)) case <-ctx.Done(): - log.Info("Context canceled. Finishing peer discovery") return + } + } +} + +// discoveryLoop ensures we always have '~peerLimit' connected peers. +// It starts peer discovery per request and restarts the process until the soft limit reached. +func (d *Discovery) discoveryLoop(ctx context.Context) { + t := time.NewTicker(d.params.discoveryRetryTimeout) + defer t.Stop() + for { + // drain all previous ticks from channel + drainChannel(t.C) + select { case <-t.C: - if uint(d.set.Size()) == d.peersLimit { - // stop ticker if we have reached the limit - t.Stop() - continue - } - peers, err := d.disc.FindPeers(ctx, topic) - if err != nil { - log.Error(err) + found := d.discover(ctx) + if !found { + // rerun discovery if amount of peers didn't reach the limit continue } - for p := range peers { - go d.handlePeerFound(ctx, topic, p) - } + case <-ctx.Done(): + return + } + + select { + case <-d.triggerDisc: + case <-ctx.Done(): + return } } } -// Advertise is a utility function that persistently advertises a service through an Advertiser. 
-func (d *Discovery) Advertise(ctx context.Context) { - timer := time.NewTimer(d.advertiseInterval) - defer timer.Stop() +// disconnectsLoop listen for disconnect events and ensures Discovery state +// is updated. +func (d *Discovery) disconnectsLoop(ctx context.Context, sub event.Subscription) { + defer sub.Close() + for { - ttl, err := d.disc.Advertise(ctx, topic) - if err != nil { - log.Debugf("Error advertising %s: %s", topic, err.Error()) - if ctx.Err() != nil { + select { + case <-ctx.Done(): + return + case e, ok := <-sub.Out(): + if !ok { + log.Error("connection subscription was closed unexpectedly") return } - select { - case <-timer.C: - timer.Reset(d.advertiseInterval) - continue - case <-ctx.Done(): - return + if evnt := e.(event.EvtPeerConnectednessChanged); evnt.Connectedness == network.NotConnected { + if !d.set.Contains(evnt.Peer) { + continue + } + + d.host.ConnManager().Unprotect(evnt.Peer, rendezvousPoint) + d.connector.Backoff(evnt.Peer) + d.set.Remove(evnt.Peer) + d.onUpdatedPeers(evnt.Peer, false) + log.Debugw("removed peer from the peer set", + "peer", evnt.Peer, "status", evnt.Connectedness.String()) + + if d.set.Size() < d.set.Limit() { + // trigger discovery + select { + case d.triggerDisc <- struct{}{}: + default: + } + } } } + } +} + +// discover finds new peers and reports whether it succeeded. +func (d *Discovery) discover(ctx context.Context) bool { + size := d.set.Size() + want := d.set.Limit() - size + if want == 0 { + log.Debugw("reached soft peer limit, skipping discovery", "size", size) + return true + } + log.Infow("discovering peers", "want", want) + + // we use errgroup as it provide limits + var wg errgroup.Group + // limit to minimize chances of overreaching the limit + wg.SetLimit(int(d.set.Limit())) + defer wg.Wait() //nolint:errcheck + + // stop discovery when we are done + findCtx, findCancel := context.WithCancel(ctx) + defer findCancel() + + peers, err := d.disc.FindPeers(findCtx, rendezvousPoint) + if err != nil { + log.Error("unable to start discovery", "err", err) + return false + } + ticker := time.NewTicker(findPeersStuckWarnDelay) + defer ticker.Stop() + for { + ticker.Reset(findPeersStuckWarnDelay) + // drain all previous ticks from channel + drainChannel(ticker.C) select { - case <-timer.C: - timer.Reset(waitF(ttl)) - case <-ctx.Done(): - return + case <-findCtx.Done(): + return true + case <-ticker.C: + log.Warn("wasn't able to find new peers for long time") + continue + case p, ok := <-peers: + if !ok { + log.Debugw("discovery channel closed", "find_is_canceled", findCtx.Err() != nil) + return d.set.Size() >= d.set.Limit() + } + + peer := p + wg.Go(func() error { + if findCtx.Err() != nil { + log.Debug("find has been canceled, skip peer") + return nil + } + + // we don't pass findCtx so that we don't cancel in progress connections + // that are likely to be valuable + if !d.handleDiscoveredPeer(ctx, peer) { + return nil + } + + size := d.set.Size() + log.Debugw("found peer", "peer", peer.ID, "found_amount", size) + if size < d.set.Limit() { + return nil + } + + log.Infow("discovered wanted peers", "amount", size) + findCancel() + return nil + }) } } } -// Peers provides a list of discovered peers in the "full" topic. -// If Discovery hasn't found any peers, it blocks until at least one peer is found. -func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) { - return d.set.Peers(ctx) +// handleDiscoveredPeer adds peer to the internal if can connect or is connected. +// Report whether it succeeded. 
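+// On success the peer is added to the limited set and protected in the ConnManager under the rendezvousPoint tag.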
+func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo) bool { + logger := log.With("peer", peer.ID) + switch { + case peer.ID == d.host.ID(): + logger.Debug("skip handle: self discovery") + return false + case len(peer.Addrs) == 0: + logger.Debug("skip handle: empty address list") + return false + case d.set.Size() >= d.set.Limit(): + logger.Debug("skip handle: enough peers found") + return false + case d.connector.HasBackoff(peer.ID): + logger.Debug("skip handle: backoff") + return false + } + + switch d.host.Network().Connectedness(peer.ID) { + case network.Connected: + d.connector.Backoff(peer.ID) // we still have to backoff the connected peer + case network.NotConnected: + err := d.connector.Connect(ctx, peer) + if err != nil { + logger.Debugw("unable to connect", "err", err) + return false + } + default: + panic("unknown connectedness") + } + + if !d.set.Add(peer.ID) { + logger.Debug("peer is already in discovery set") + return false + } + d.onUpdatedPeers(peer.ID, true) + logger.Debug("added peer to set") + + // tag to protect peer from being killed by ConnManager + // NOTE: This is does not protect from remote killing the connection. + // In the future, we should design a protocol that keeps bidirectional agreement on whether + // connection should be kept or not, similar to mesh link in GossipSub. + d.host.ConnManager().Protect(peer.ID, rendezvousPoint) + return true +} + +func drainChannel(c <-chan time.Time) { + for { + select { + case <-c: + default: + return + } + } } diff --git a/share/availability/discovery/discovery_test.go b/share/availability/discovery/discovery_test.go new file mode 100644 index 0000000000..fd88a98586 --- /dev/null +++ b/share/availability/discovery/discovery_test.go @@ -0,0 +1,136 @@ +package discovery + +import ( + "context" + "testing" + "time" + + dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p/core/discovery" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/discovery/routing" + basic "github.com/libp2p/go-libp2p/p2p/host/basic" + swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDiscovery(t *testing.T) { + const nodes = 10 // higher number brings higher coverage + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + t.Cleanup(cancel) + + tn := newTestnet(ctx, t) + + peerA := tn.discovery(Parameters{ + PeersLimit: nodes, + discoveryRetryTimeout: time.Millisecond * 100, + AdvertiseInterval: -1, // we don't want to be found but only find + }) + + type peerUpdate struct { + peerID peer.ID + isAdded bool + } + updateCh := make(chan peerUpdate) + peerA.WithOnPeersUpdate(func(peerID peer.ID, isAdded bool) { + updateCh <- peerUpdate{peerID: peerID, isAdded: isAdded} + }) + + discs := make([]*Discovery, nodes) + for i := range discs { + discs[i] = tn.discovery(Parameters{ + PeersLimit: 0, + discoveryRetryTimeout: -1, + AdvertiseInterval: time.Millisecond * 100, + }) + + select { + case res := <-updateCh: + require.Equal(t, discs[i].host.ID(), res.peerID) + require.True(t, res.isAdded) + case <-ctx.Done(): + t.Fatal("did not discover peer in time") + } + } + + assert.EqualValues(t, nodes, peerA.set.Size()) + + for _, disc := range discs { + peerID := disc.host.ID() + err := peerA.host.Network().ClosePeer(peerID) + require.NoError(t, err) + + select { + case res := <-updateCh: + require.Equal(t, peerID, res.peerID) 
+ require.False(t, res.isAdded) + case <-ctx.Done(): + t.Fatal("did not disconnect from peer in time") + } + } + + assert.EqualValues(t, 0, peerA.set.Size()) +} + +type testnet struct { + ctx context.Context + T *testing.T + + bootstrapper peer.AddrInfo +} + +func newTestnet(ctx context.Context, t *testing.T) *testnet { + swarm := swarmt.GenSwarm(t, swarmt.OptDisableTCP) + hst, err := basic.NewHost(swarm, &basic.HostOpts{}) + require.NoError(t, err) + hst.Start() + + _, err = dht.New(ctx, hst, + dht.Mode(dht.ModeServer), + dht.BootstrapPeers(), + dht.ProtocolPrefix("/test"), + ) + require.NoError(t, err) + + return &testnet{ctx: ctx, T: t, bootstrapper: *host.InfoFromHost(hst)} +} + +func (t *testnet) discovery(params Parameters) *Discovery { + hst, routingDisc := t.peer() + disc := NewDiscovery(hst, routingDisc, params) + err := disc.Start(t.ctx) + require.NoError(t.T, err) + t.T.Cleanup(func() { + err := disc.Stop(t.ctx) + require.NoError(t.T, err) + }) + + go disc.Advertise(t.ctx) + return disc +} + +func (t *testnet) peer() (host.Host, discovery.Discovery) { + swarm := swarmt.GenSwarm(t.T, swarmt.OptDisableTCP) + hst, err := basic.NewHost(swarm, &basic.HostOpts{}) + require.NoError(t.T, err) + hst.Start() + + err = hst.Connect(t.ctx, t.bootstrapper) + require.NoError(t.T, err) + + dht, err := dht.New(t.ctx, hst, + dht.Mode(dht.ModeServer), + dht.ProtocolPrefix("/test"), + // needed to reduce connections to peers on DHT level + dht.BucketSize(1), + ) + require.NoError(t.T, err) + + err = dht.Bootstrap(t.ctx) + require.NoError(t.T, err) + + return hst, routing.NewRoutingDiscovery(dht) +} diff --git a/share/availability/discovery/set.go b/share/availability/discovery/set.go index a7b330fadd..37b3851bdf 100644 --- a/share/availability/discovery/set.go +++ b/share/availability/discovery/set.go @@ -2,7 +2,6 @@ package discovery import ( "context" - "errors" "sync" "github.com/libp2p/go-libp2p/core/peer" @@ -34,36 +33,34 @@ func (ps *limitedSet) Contains(p peer.ID) bool { return ok } -func (ps *limitedSet) Size() int { +func (ps *limitedSet) Limit() uint { + return ps.limit +} + +func (ps *limitedSet) Size() uint { ps.lk.RLock() defer ps.lk.RUnlock() - return len(ps.ps) + return uint(len(ps.ps)) } -// TryAdd attempts to add the given peer into the set. -// This operation will fail if the number of peers in the set is equal to size. -func (ps *limitedSet) TryAdd(p peer.ID) error { +// Add attempts to add the given peer into the set. +func (ps *limitedSet) Add(p peer.ID) (added bool) { ps.lk.Lock() - defer ps.lk.Unlock() if _, ok := ps.ps[p]; ok { - return errors.New("share: discovery: peer already added") + return false } - if len(ps.ps) >= int(ps.limit) { - return errors.New("share: discovery: peers limit reached") - } - ps.ps[p] = struct{}{} -LOOP: + ps.lk.Unlock() + for { // peer will be pushed to the channel only when somebody is reading from it. // this is done to handle case when Peers() was called on empty set. select { case ps.waitPeer <- p: default: - break LOOP + return true } } - return nil } func (ps *limitedSet) Remove(id peer.ID) { @@ -76,16 +73,16 @@ func (ps *limitedSet) Remove(id peer.ID) { // Peers returns all discovered peers from the set. 
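+// If the set is currently empty, it blocks until a peer is added or the context is done.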
func (ps *limitedSet) Peers(ctx context.Context) ([]peer.ID, error) { - ps.lk.Lock() + ps.lk.RLock() if len(ps.ps) > 0 { out := make([]peer.ID, 0, len(ps.ps)) for p := range ps.ps { out = append(out, p) } - ps.lk.Unlock() + ps.lk.RUnlock() return out, nil } - ps.lk.Unlock() + ps.lk.RUnlock() // block until a new peer will be discovered select { diff --git a/share/availability/discovery/set_test.go b/share/availability/discovery/set_test.go index b4c6a8fb49..d5113a2291 100644 --- a/share/availability/discovery/set_test.go +++ b/share/availability/discovery/set_test.go @@ -15,29 +15,17 @@ func TestSet_TryAdd(t *testing.T) { require.NoError(t, err) set := newLimitedSet(1) - require.NoError(t, set.TryAdd(h.ID())) + set.Add(h.ID()) require.True(t, set.Contains(h.ID())) } -func TestSet_TryAddFails(t *testing.T) { - m := mocknet.New() - h1, err := m.GenPeer() - require.NoError(t, err) - h2, err := m.GenPeer() - require.NoError(t, err) - - set := newLimitedSet(1) - require.NoError(t, set.TryAdd(h1.ID())) - require.Error(t, set.TryAdd(h2.ID())) -} - func TestSet_Remove(t *testing.T) { m := mocknet.New() h, err := m.GenPeer() require.NoError(t, err) set := newLimitedSet(1) - require.NoError(t, set.TryAdd(h.ID())) + set.Add(h.ID()) set.Remove(h.ID()) require.False(t, set.Contains(h.ID())) } @@ -50,8 +38,8 @@ func TestSet_Peers(t *testing.T) { require.NoError(t, err) set := newLimitedSet(2) - require.NoError(t, set.TryAdd(h1.ID())) - require.NoError(t, set.TryAdd(h2.ID())) + set.Add(h1.ID()) + set.Add(h2.ID()) ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) t.Cleanup(cancel) @@ -71,7 +59,7 @@ func TestSet_WaitPeers(t *testing.T) { set := newLimitedSet(2) go func() { time.Sleep(time.Millisecond * 500) - set.TryAdd(h1.ID()) //nolint:errcheck + set.Add(h1.ID()) }() ctx, cancel := context.WithTimeout(context.Background(), time.Second*2) @@ -91,9 +79,9 @@ func TestSet_Size(t *testing.T) { require.NoError(t, err) set := newLimitedSet(2) - require.NoError(t, set.TryAdd(h1.ID())) - require.NoError(t, set.TryAdd(h2.ID())) - require.Equal(t, 2, set.Size()) + set.Add(h1.ID()) + set.Add(h2.ID()) + require.EqualValues(t, 2, set.Size()) set.Remove(h2.ID()) - require.Equal(t, 1, set.Size()) + require.EqualValues(t, 1, set.Size()) } diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go index ca5d0f10c5..df4561a8eb 100644 --- a/share/availability/full/testing.go +++ b/share/availability/full/testing.go @@ -37,6 +37,10 @@ func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { } func TestAvailability(getter share.Getter) *ShareAvailability { - disc := discovery.NewDiscovery(nil, routing.NewRoutingDiscovery(routinghelpers.Null{}), 0, time.Second, time.Second) + params := discovery.DefaultParameters() + params.AdvertiseInterval = time.Second + params.PeersLimit = 10 + disc := discovery.NewDiscovery(nil, + routing.NewRoutingDiscovery(routinghelpers.Null{}), params) return NewShareAvailability(nil, getter, disc) } diff --git a/share/getters/shrex_test.go b/share/getters/shrex_test.go index 7578195639..b93a40d488 100644 --- a/share/getters/shrex_test.go +++ b/share/getters/shrex_test.go @@ -158,7 +158,11 @@ func testManager(ctx context.Context, host host.Host, headerSub libhead.Subscrib } disc := discovery.NewDiscovery(nil, - routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), 0, time.Second, time.Second) + routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), discovery.Parameters{ + PeersLimit: 10, + AdvertiseInterval: time.Second, + }, + ) 
connGater, err := conngater.NewBasicConnectionGater(ds_sync.MutexWrap(datastore.NewMapDatastore())) if err != nil { return nil, err diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go index 05c1f5b33e..804dd4b673 100644 --- a/share/p2p/peers/manager_test.go +++ b/share/p2p/peers/manager_test.go @@ -365,7 +365,7 @@ func TestIntegration(t *testing.T) { t.Run("get peer from discovery", func(t *testing.T) { nw, err := mocknet.FullMeshConnected(3) require.NoError(t, err) - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) t.Cleanup(cancel) // set up bootstrapper @@ -393,9 +393,11 @@ func TestIntegration(t *testing.T) { bnDisc := discovery.NewDiscovery( nw.Hosts()[0], routingdisc.NewRoutingDiscovery(router1), - 10, - time.Second, - time.Second) + discovery.Parameters{ + PeersLimit: 0, + AdvertiseInterval: time.Second, + }, + ) // set up full node / receiver node fnHost := nw.Hosts()[0] @@ -404,9 +406,10 @@ func TestIntegration(t *testing.T) { fnDisc := discovery.NewDiscovery( nw.Hosts()[1], routingdisc.NewRoutingDiscovery(router2), - 10, - time.Second, - time.Second, + discovery.Parameters{ + PeersLimit: 10, + AdvertiseInterval: time.Second, + }, ) err = fnDisc.Start(ctx) require.NoError(t, err) @@ -458,8 +461,12 @@ func testManager(ctx context.Context, headerSub libhead.Subscriber[*header.Exten if err != nil { return nil, err } + disc := discovery.NewDiscovery(nil, - routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), 0, time.Second, time.Second) + routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), discovery.Parameters{ + PeersLimit: 0, + AdvertiseInterval: time.Second, + }) connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) if err != nil { return nil, err
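A minimal usage sketch of the reworked Discovery API follows. The helper below (runDiscovery) and its host/DHT arguments are illustrative assumptions, not part of this diff; the real wiring lives in nodebuilder/share/constructors.go.

// Sketch only: wiring Discovery with the new Parameters-based constructor.
package example

import (
	"context"

	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"

	disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
)

func runDiscovery(ctx context.Context, h host.Host, r *dht.IpfsDHT) ([]peer.ID, error) {
	// Start from the defaults (PeersLimit: 5, AdvertiseInterval: 8h) and adjust as needed.
	params := disc.DefaultParameters()
	params.PeersLimit = 10

	d := disc.NewDiscovery(h, routingdisc.NewRoutingDiscovery(r), params)

	// Get notified whenever a peer enters or leaves the discovery set,
	// e.g. to feed the shrex peer manager as nodebuilder does.
	d.WithOnPeersUpdate(func(id peer.ID, isAdded bool) {})

	if err := d.Start(ctx); err != nil {
		return nil, err
	}

	// Only full and bridge nodes advertise themselves; setting
	// AdvertiseInterval to -1 disables advertising entirely.
	go d.Advertise(ctx)

	// Blocks until at least one peer has been discovered.
	return d.Peers(ctx)
}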