Skip to content

Commit

Permalink
Merge branch 'discovery-dial-timeout' into discovery_metrics
Browse files Browse the repository at this point in the history
# Conflicts:
#	share/availability/discovery/discovery.go
  • Loading branch information
walldiss committed May 3, 2023
2 parents 18b4eb6 + 8e20342 commit 2ebbf3c
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 141 deletions.
1 change: 0 additions & 1 deletion nodebuilder/tests/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,5 @@ func TestRestartNodeDiscovery(t *testing.T) {

func setTimeInterval(cfg *nodebuilder.Config, interval time.Duration) {
cfg.P2P.RoutingTableRefreshPeriod = interval
cfg.Share.Discovery.DiscoveryInterval = interval
cfg.Share.Discovery.AdvertiseInterval = interval
}
8 changes: 3 additions & 5 deletions share/availability/discovery/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
)

// gcInterval is a default period after which disconnected peers will be removed from cache
const (
// gcInterval is a default period after which disconnected peers will be removed from cache
gcInterval = time.Minute
// connectTimeout is the timeout used for dialing peers and discovering peer addresses.
connectTimeout = time.Minute * 2
Expand Down Expand Up @@ -85,12 +85,10 @@ func (b *backoffConnector) HasBackoff(p peer.ID) bool {
b.cacheLk.Lock()
cache, ok := b.cacheData[p]
b.cacheLk.Unlock()
if ok && time.Now().Before(cache.nexttry) {
return true
}
return false
return ok && time.Now().Before(cache.nexttry)
}

// GC is a perpetual GCing loop.
func (b *backoffConnector) GC(ctx context.Context) {
ticker := time.NewTicker(gcInterval)
for {
Expand Down
176 changes: 79 additions & 97 deletions share/availability/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package discovery

import (
"context"
"sync"
"sync/atomic"
"time"

logging "github.com/ipfs/go-log/v2"
Expand All @@ -25,6 +23,8 @@ const (
// events in libp2p
eventbusBufSize = 32

// findPeersStuckWarnDelay is the duration after which findPeers will log an error message to
// notify that it is stuck.
findPeersStuckWarnDelay = time.Minute
)

Expand All @@ -36,21 +36,22 @@ var waitF = func(ttl time.Duration) time.Duration {
type Parameters struct {
// PeersLimit defines the soft limit of FNs to connect to via discovery.
// Set 0 to disable.
PeersLimit int
// DiscoveryInterval is an interval between discovery sessions.
// Set -1 to disable.
DiscoveryInterval time.Duration
PeersLimit uint
// AdvertiseInterval is a interval between advertising sessions.
// Set -1 to disable.
// NOTE: only full and bridge can advertise themselves.
AdvertiseInterval time.Duration
// discoveryRetryTimeout is an interval between discovery attempts
// when we discovered lower than PeersLimit peers.
// Set -1 to disable.
discoveryRetryTimeout time.Duration
}

func DefaultParameters() Parameters {
return Parameters{
PeersLimit: 5,
DiscoveryInterval: time.Minute,
AdvertiseInterval: time.Hour * 8,
PeersLimit: 5,
discoveryRetryTimeout: time.Second,
AdvertiseInterval: time.Hour * 8,
}
}

Expand All @@ -69,14 +70,13 @@ type Discovery struct {
connector *backoffConnector
onUpdatedPeers OnUpdatedPeers

connectingLk sync.Mutex
connecting map[peer.ID]context.CancelFunc
triggerDisc chan struct{}

metrics *metrics
cancel context.CancelFunc
}

type OnUpdatedPeers func(peerIOnUpdatedPeersD peer.ID, isAdded bool)
type OnUpdatedPeers func(peerID peer.ID, isAdded bool)

// NewDiscovery constructs a new discovery.
func NewDiscovery(
Expand All @@ -91,7 +91,7 @@ func NewDiscovery(
disc: d,
connector: newBackoffConnector(h, defaultBackoffFactory),
onUpdatedPeers: func(peer.ID, bool) {},
connecting: make(map[peer.ID]context.CancelFunc),
triggerDisc: make(chan struct{}),
}
}

Expand All @@ -116,9 +116,16 @@ func (d *Discovery) WithOnPeersUpdate(f OnUpdatedPeers) {
}
}

func (d *Discovery) triggerDiscovery() {
select {
case d.triggerDisc <- struct{}{}:
default:
}
}

// handlePeersFound receives peers and tries to establish a connection with them.
// Peer will be added to PeerCache if connection succeeds.
func (d *Discovery) handlePeerFound(ctx context.Context, cancelFind context.CancelFunc, peer peer.AddrInfo) bool {
func (d *Discovery) handlePeerFound(ctx context.Context, peer peer.AddrInfo) bool {
log := log.With("peer", peer.ID)
switch {
case peer.ID == d.host.ID():
Expand All @@ -139,38 +146,28 @@ func (d *Discovery) handlePeerFound(ctx context.Context, cancelFind context.Canc
return false
}

d.connectingLk.Lock()
if _, ok := d.connecting[peer.ID]; ok {
d.connectingLk.Unlock()
d.metrics.observeHandlePeer(handlePeerConnInProgress)
log.Debug("skip handle: connecting to the peer in another routine")
return false
}
d.connecting[peer.ID] = cancelFind
d.connectingLk.Unlock()

switch d.host.Network().Connectedness(peer.ID) {
case network.Connected:
if added := d.addPeerToSet(peer.ID); !added {
d.metrics.observeHandlePeer(handlePeerInSet)
return added
}
d.metrics.observeHandlePeer(handlePeerConnected)
// we still have to backoff the connected peer
d.connector.Backoff(peer.ID)
d.connector.Backoff(peer.ID) // we still have to backoff the connected peer
case network.NotConnected:
err := d.connector.Connect(ctx, peer)
if err != nil {
d.connectingLk.Lock()
delete(d.connecting, peer.ID)
d.connectingLk.Unlock()
d.metrics.observeHandlePeer(handlePeerConnErr)
log.Debugw("unable to connect", "err", err)
return false
}
d.metrics.observeHandlePeer(handlePeerConnect)
log.Debug("started connecting to the peer")
default:
panic("unknown connectedness")
}

if !d.set.Add(peer.ID) {
d.metrics.observeHandlePeer(handlePeerInSet)
log.Debug("peer is already in discovery set")
return false
}
d.onUpdatedPeers(peer.ID, true)
d.metrics.observeHandlePeer(handlePeerConnect)
log.Debug("added peer to set")

// tag to protect peer from being killed by ConnManager
// NOTE: This is does not protect from remote killing the connection.
Expand All @@ -184,8 +181,8 @@ func (d *Discovery) handlePeerFound(ctx context.Context, cancelFind context.Canc
// It starts peer discovery every 30 seconds until peer cache reaches peersLimit.
// Discovery is restarted if any previously connected peers disconnect.
func (d *Discovery) ensurePeers(ctx context.Context) {
if d.params.PeersLimit == 0 || d.params.DiscoveryInterval == -1 {
log.Warn("peers limit is set to 0 and/or discovery interval is set to -1. Skipping discovery...")
if d.params.PeersLimit == 0 {
log.Warn("peers limit is set to 0. Skipping discovery...")
return
}
// subscribe on EventBus in order to catch disconnected peers and restart
Expand All @@ -208,11 +205,11 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
return
case e, ok := <-sub.Out():
if !ok {
log.Error("connection subscription was closed unexpectedly")
return
}
evnt := e.(event.EvtPeerConnectednessChanged)
switch evnt.Connectedness {
case network.NotConnected:

if evnt := e.(event.EvtPeerConnectednessChanged); evnt.Connectedness == network.NotConnected {
if !d.set.Contains(evnt.Peer) {
continue
}
Expand All @@ -223,76 +220,53 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
d.onUpdatedPeers(evnt.Peer, false)
log.Debugw("removed peer from the peer set",
"peer", evnt.Peer, "status", evnt.Connectedness.String())
case network.Connected:
d.addPeerToSet(evnt.Peer)

if d.set.Size() < d.set.Limit() {
d.triggerDiscovery()
}
}
}
}
}()
go d.connector.GC(ctx)

t := time.NewTicker(d.params.DiscoveryInterval)
t := time.NewTicker(d.params.discoveryRetryTimeout)
defer t.Stop()
for {
d.findPeers(ctx)
if d.set.Size() < d.set.Limit() {
// rerun discovery is amount of peers didn't reach the limit
continue
}

t.Reset(d.params.DiscoveryInterval)
// drain all previous ticks from channel
drainChannel(t.C)
select {
case <-t.C:
found := d.findPeers(ctx)
if !found {
// rerun discovery if amount of peers didn't reach the limit
continue
}
case <-ctx.Done():
return
}
}
}

func (d *Discovery) addPeerToSet(peerID peer.ID) bool {
d.connectingLk.Lock()
cancelFind, ok := d.connecting[peerID]
d.connectingLk.Unlock()
if !ok {
return false
}

if !d.set.Add(peerID) {
log.Debug("skip: peer is already in discovery set")
return false
}

// and notify our subscribers
d.onUpdatedPeers(peerID, true)
log.Debugw("added peer to set", "id", peerID)

// first do Add and only after check the limit
// so that peer set represents the actual number of connections we made
// which can go slightly over peersLimit
if d.set.Size() >= d.set.Limit() {
log.Infow("soft peer limit reached", "count", d.set.Size())
cancelFind()
select {
case <-d.triggerDisc:
case <-ctx.Done():
return
}
}

d.connectingLk.Lock()
delete(d.connecting, peerID)
d.connectingLk.Unlock()
return true
}

func (d *Discovery) findPeers(ctx context.Context) {
if d.set.Size() >= d.set.Limit() {
log.Debugw("reached soft peer limit, skipping discovery", "size", d.set.Size())
return
func (d *Discovery) findPeers(ctx context.Context) bool {
size := d.set.Size()
want := d.set.Limit() - size
if want == 0 {
log.Debugw("reached soft peer limit, skipping discovery", "size", size)
return true
}
log.Infow("below soft peer limit, discovering peers", "remaining", d.set.Limit()-d.set.Size())
log.Infow("discovering peers", "want", want)

// we use errgroup as it obeys the context
// we use errgroup as it provide limits
var wg errgroup.Group
// limit to minimize chances of overreaching the limit
wg.SetLimit(d.set.Limit())
wg.SetLimit(int(d.set.Limit()))
defer wg.Wait() //nolint:errcheck

// stop discovery when we are done
Expand All @@ -302,12 +276,11 @@ func (d *Discovery) findPeers(ctx context.Context) {
peers, err := d.disc.FindPeers(findCtx, topic)
if err != nil {
log.Error("unable to start discovery", "err", err)
return
return false
}

ticker := time.NewTicker(findPeersStuckWarnDelay)
defer ticker.Stop()
var amount atomic.Int32
for {
ticker.Reset(findPeersStuckWarnDelay)
// drain all previous ticks from channel
Expand All @@ -316,15 +289,15 @@ func (d *Discovery) findPeers(ctx context.Context) {
case <-findCtx.Done():
d.metrics.observeFindPeers(ctx.Err() != nil, findCtx != nil)
log.Debugw("found enough peers", "amount", d.set.Size())
return
return true
case <-ticker.C:
log.Warn("wasn't able to find new peers for long time")
continue
case p, ok := <-peers:
if !ok {
d.metrics.observeFindPeers(ctx.Err() != nil, findCtx != nil)
log.Debugw("discovery channel closed", "find_is_canceled", findCtx.Err() != nil)
return
return d.set.Size() >= d.set.Limit()
}

peer := p
Expand All @@ -333,12 +306,21 @@ func (d *Discovery) findPeers(ctx context.Context) {
log.Debug("find has been canceled, skip peer")
return nil
}
// pass the cancel so that we cancel FindPeers when we connected to enough peers
// we don't pass findCtx so that we don't cancel outgoing connections
if d.handlePeerFound(ctx, findCancel, peer) {
amount.Add(1)
log.Debugw("found peer", "peer", peer.ID, "found_amount", amount.Load())

// we don't pass findCtx so that we don't cancel in progress connections
// that are likely to be valuable
if !d.handlePeerFound(ctx, peer) {
return nil
}

size := d.set.Size()
log.Debugw("found peer", "peer", peer.ID, "found_amount", size)
if size < d.set.Limit() {
return nil
}

log.Infow("discovered wanted peers", "amount", size)
findCancel()
return nil
})
}
Expand Down
Loading

0 comments on commit 2ebbf3c

Please sign in to comment.