diff --git a/share/availability/discovery/discovery.go b/share/availability/discovery/discovery.go index d50715d957..ed01e1e022 100644 --- a/share/availability/discovery/discovery.go +++ b/share/availability/discovery/discovery.go @@ -23,7 +23,7 @@ const ( // events in libp2p eventbusBufSize = 32 - // findPeersStuckWarnDelay is the duration after which findPeers will log an error message to + // findPeersStuckWarnDelay is the duration after which discover will log an error message to // notify that it is stuck. findPeersStuckWarnDelay = time.Minute @@ -56,10 +56,6 @@ func DefaultParameters() Parameters { } } -func (p *Parameters) Validate() error { - return nil -} - // Discovery combines advertise and discover services and allows to store discovered nodes. // TODO: The code here gets horribly hairy, so we should refactor this at some point type Discovery struct { @@ -98,7 +94,15 @@ func NewDiscovery( func (d *Discovery) Start(context.Context) error { ctx, cancel := context.WithCancel(context.Background()) d.cancel = cancel - go d.ensurePeers(ctx) + + if d.params.PeersLimit == 0 { + log.Warn("peers limit is set to 0. Skipping discovery...") + return nil + } + + go d.discoveryLoop(ctx) + go d.disconnectsLoop(ctx) + go d.connector.GC(ctx) return nil } @@ -116,113 +120,51 @@ func (d *Discovery) WithOnPeersUpdate(f OnUpdatedPeers) { } } -func (d *Discovery) triggerDiscovery() { - select { - case d.triggerDisc <- struct{}{}: - default: - } +// Peers provides a list of discovered peers in the "full" topic. +// If Discovery hasn't found any peers, it blocks until at least one peer is found. +func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) { + return d.set.Peers(ctx) } -// handlePeersFound receives peers and tries to establish a connection with them. -// Peer will be added to PeerCache if connection succeeds. -func (d *Discovery) handlePeerFound(ctx context.Context, peer peer.AddrInfo) bool { - logger := log.With("peer", peer.ID) - switch { - case peer.ID == d.host.ID(): - logger.Debug("skip handle: self discovery") - return false - case len(peer.Addrs) == 0: - logger.Debug("skip handle: empty address list") - return false - case d.set.Size() >= d.set.Limit(): - logger.Debug("skip handle: enough peers found") - return false - case d.connector.HasBackoff(peer.ID): - logger.Debug("skip handle: backoff") - return false +// Advertise is a utility function that persistently advertises a service through an Advertiser. +// TODO: Start advertising only after the reachability is confirmed by AutoNAT +func (d *Discovery) Advertise(ctx context.Context) { + if d.params.AdvertiseInterval == -1 { + return } - switch d.host.Network().Connectedness(peer.ID) { - case network.Connected: - d.connector.Backoff(peer.ID) // we still have to backoff the connected peer - case network.NotConnected: - err := d.connector.Connect(ctx, peer) + timer := time.NewTimer(d.params.AdvertiseInterval) + defer timer.Stop() + for { + ttl, err := d.disc.Advertise(ctx, topic) if err != nil { - logger.Debugw("unable to connect", "err", err) - return false - } - default: - panic("unknown connectedness") - } - - if !d.set.Add(peer.ID) { - logger.Debug("peer is already in discovery set") - return false - } - d.onUpdatedPeers(peer.ID, true) - logger.Debug("added peer to set") - - // tag to protect peer from being killed by ConnManager - // NOTE: This is does not protect from remote killing the connection. - // In the future, we should design a protocol that keeps bidirectional agreement on whether - // connection should be kept or not, similar to mesh link in GossipSub. - d.host.ConnManager().Protect(peer.ID, topic) - return true -} - -// ensurePeers ensures we always have 'peerLimit' connected peers. -// It starts peer discovery every 30 seconds until peer cache reaches peersLimit. -// Discovery is restarted if any previously connected peers disconnect. -func (d *Discovery) ensurePeers(ctx context.Context) { - if d.params.PeersLimit == 0 { - log.Warn("peers limit is set to 0. Skipping discovery...") - return - } - // subscribe on EventBus in order to catch disconnected peers and restart - // the discovery. We specify a larger buffer size for the channel where - // EvtPeerConnectednessChanged events are sent (by default it is 16, we - // specify 32) to avoid any blocks on writing to the full channel. - sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize)) - if err != nil { - log.Error(err) - return - } - defer sub.Close() + log.Debugf("Error advertising %s: %s", topic, err.Error()) + if ctx.Err() != nil { + return + } - // starting to listen to subscriptions async will help us to avoid any blocking - // in the case when we will not have the needed amount of FNs and will be blocked in `FindPeers`. - go func() { - for { select { + case <-timer.C: + timer.Reset(d.params.AdvertiseInterval) + continue case <-ctx.Done(): return - case e, ok := <-sub.Out(): - if !ok { - log.Error("connection subscription was closed unexpectedly") - return - } - - if evnt := e.(event.EvtPeerConnectednessChanged); evnt.Connectedness == network.NotConnected { - if !d.set.Contains(evnt.Peer) { - continue - } - - d.host.ConnManager().Unprotect(evnt.Peer, topic) - d.connector.Backoff(evnt.Peer) - d.set.Remove(evnt.Peer) - d.onUpdatedPeers(evnt.Peer, false) - log.Debugw("removed peer from the peer set", - "peer", evnt.Peer, "status", evnt.Connectedness.String()) - - if d.set.Size() < d.set.Limit() { - d.triggerDiscovery() - } - } } } - }() - go d.connector.GC(ctx) + log.Debugf("advertised") + select { + case <-timer.C: + timer.Reset(waitF(ttl)) + case <-ctx.Done(): + return + } + } +} + +// discoveryLoop ensures we always have '~peerLimit' connected peers. +// It starts peer discovery per request and restarts the process until the soft limit reached. +func (d *Discovery) discoveryLoop(ctx context.Context) { t := time.NewTicker(d.params.discoveryRetryTimeout) defer t.Stop() for { @@ -230,7 +172,7 @@ func (d *Discovery) ensurePeers(ctx context.Context) { drainChannel(t.C) select { case <-t.C: - found := d.findPeers(ctx) + found := d.discover(ctx) if !found { // rerun discovery if amount of peers didn't reach the limit continue @@ -247,7 +189,56 @@ func (d *Discovery) ensurePeers(ctx context.Context) { } } -func (d *Discovery) findPeers(ctx context.Context) bool { +// disconnectsLoop listen for disconnect events and ensures Discovery state +// is updated. +func (d *Discovery) disconnectsLoop(ctx context.Context) { + // subscribe on EventBus in order to catch disconnected peers and restart + // the discovery. We specify a larger buffer size for the channel where + // EvtPeerConnectednessChanged events are sent (by default it is 16, we + // specify 32) to avoid any blocks on writing to the full channel. + sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize)) + if err != nil { + log.Error(err) + return + } + defer sub.Close() + + for { + select { + case <-ctx.Done(): + return + case e, ok := <-sub.Out(): + if !ok { + log.Error("connection subscription was closed unexpectedly") + return + } + + if evnt := e.(event.EvtPeerConnectednessChanged); evnt.Connectedness == network.NotConnected { + if !d.set.Contains(evnt.Peer) { + continue + } + + d.host.ConnManager().Unprotect(evnt.Peer, topic) + d.connector.Backoff(evnt.Peer) + d.set.Remove(evnt.Peer) + d.onUpdatedPeers(evnt.Peer, false) + log.Debugw("removed peer from the peer set", + "peer", evnt.Peer, "status", evnt.Connectedness.String()) + + if d.set.Size() < d.set.Limit() { + // trigger discovery + select { + case d.triggerDisc <- struct{}{}: + default: + } + } + } + } + } +} + +// discover finds new peers and reports whether it succeeded. +func (d *Discovery) discover(ctx context.Context) bool { size := d.set.Size() want := d.set.Limit() - size if want == 0 { @@ -299,7 +290,7 @@ func (d *Discovery) findPeers(ctx context.Context) bool { // we don't pass findCtx so that we don't cancel in progress connections // that are likely to be valuable - if !d.handlePeerFound(ctx, peer) { + if !d.handleDiscoveredPeer(ctx, peer) { return nil } @@ -317,46 +308,51 @@ func (d *Discovery) findPeers(ctx context.Context) bool { } } -// Advertise is a utility function that persistently advertises a service through an Advertiser. -// TODO: Start advertising only after the reachability is confirmed by AutoNAT -func (d *Discovery) Advertise(ctx context.Context) { - if d.params.AdvertiseInterval == -1 { - return +// handleDiscoveredPeer adds peer to the internal if can connect or is connected. +// Report whether it succeeded. +func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo) bool { + logger := log.With("peer", peer.ID) + switch { + case peer.ID == d.host.ID(): + logger.Debug("skip handle: self discovery") + return false + case len(peer.Addrs) == 0: + logger.Debug("skip handle: empty address list") + return false + case d.set.Size() >= d.set.Limit(): + logger.Debug("skip handle: enough peers found") + return false + case d.connector.HasBackoff(peer.ID): + logger.Debug("skip handle: backoff") + return false } - timer := time.NewTimer(d.params.AdvertiseInterval) - defer timer.Stop() - for { - ttl, err := d.disc.Advertise(ctx, topic) + switch d.host.Network().Connectedness(peer.ID) { + case network.Connected: + d.connector.Backoff(peer.ID) // we still have to backoff the connected peer + case network.NotConnected: + err := d.connector.Connect(ctx, peer) if err != nil { - log.Debugf("Error advertising %s: %s", topic, err.Error()) - if ctx.Err() != nil { - return - } - - select { - case <-timer.C: - timer.Reset(d.params.AdvertiseInterval) - continue - case <-ctx.Done(): - return - } + logger.Debugw("unable to connect", "err", err) + return false } + default: + panic("unknown connectedness") + } - log.Debugf("advertised") - select { - case <-timer.C: - timer.Reset(waitF(ttl)) - case <-ctx.Done(): - return - } + if !d.set.Add(peer.ID) { + logger.Debug("peer is already in discovery set") + return false } -} + d.onUpdatedPeers(peer.ID, true) + logger.Debug("added peer to set") -// Peers provides a list of discovered peers in the "full" topic. -// If Discovery hasn't found any peers, it blocks until at least one peer is found. -func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) { - return d.set.Peers(ctx) + // tag to protect peer from being killed by ConnManager + // NOTE: This is does not protect from remote killing the connection. + // In the future, we should design a protocol that keeps bidirectional agreement on whether + // connection should be kept or not, similar to mesh link in GossipSub. + d.host.ConnManager().Protect(peer.ID, topic) + return true } func (p Parameters) withDefaults() Parameters {