set and backoff audit + remove retrying for FindPeers
Wondertan committed Apr 27, 2023
1 parent f7c413a commit c5e0d00
Showing 5 changed files with 62 additions and 88 deletions.
49 changes: 30 additions & 19 deletions share/availability/discovery/backoff.go
@@ -13,7 +13,7 @@ import (

// gcInterval is a default period after which disconnected peers will be removed from cache
const (
gcInterval = time.Hour
gcInterval = time.Minute
// connectTimeout is the timeout used for dialing peers and discovering peer addresses.
connectTimeout = time.Minute * 2
)
@@ -30,7 +30,7 @@ type backoffConnector struct {
backoff backoff.BackoffFactory

cacheLk sync.Mutex
cacheData map[peer.ID]*backoffData
cacheData map[peer.ID]backoffData
}

// backoffData stores time when next connection attempt with the remote peer.
@@ -43,35 +43,48 @@ func newBackoffConnector(h host.Host, factory backoff.BackoffFactory) *backoffCo
return &backoffConnector{
h: h,
backoff: factory,
cacheData: make(map[peer.ID]*backoffData),
cacheData: make(map[peer.ID]backoffData),
}
}

// Connect puts peer to the backoffCache and tries to establish a connection with it.
func (b *backoffConnector) Connect(ctx context.Context, p peer.AddrInfo) error {
// we should lock the mutex before calling connectionData and not inside because otherwise it could
// we should lock the mutex before calling backoffData and not inside because otherwise it could
// be modified from another goroutine as it returns a pointer
b.cacheLk.Lock()
cache := b.connectionData(p.ID)
cache := b.backoffData(p.ID)
if time.Now().Before(cache.nexttry) {
b.cacheLk.Unlock()
return errBackoffNotEnded
}
cache.nexttry = time.Now().Add(cache.backoff.Delay())
b.cacheLk.Unlock()

ctx, cancel := context.WithTimeout(ctx, connectTimeout)
defer cancel()

return b.h.Connect(ctx, p)
err := b.h.Connect(ctx, p)
// we don't want to add backoff when the context is canceled.
if !errors.Is(err, context.Canceled) {
b.Backoff(p.ID)
}
return err
}

// connectionData returns backoffData from the map if it was stored, otherwise it will instantiate
func (b *backoffConnector) Backoff(p peer.ID) {
data := b.backoffData(p)
data.nexttry = time.Now().Add(data.backoff.Delay())

b.cacheLk.Lock()
b.cacheData[p] = data
b.cacheLk.Unlock()
}

// backoffData returns backoffData from the map if it was stored, otherwise it will instantiate
// a new one.
func (b *backoffConnector) connectionData(p peer.ID) *backoffData {
func (b *backoffConnector) backoffData(p peer.ID) backoffData {
b.cacheLk.Lock()
defer b.cacheLk.Unlock()

cache, ok := b.cacheData[p]
if !ok {
cache = &backoffData{}
cache = backoffData{}
cache.backoff = b.backoff()
b.cacheData[p] = cache
}
@@ -86,15 +99,13 @@ func (b *backoffConnector) RemoveBackoff(p peer.ID) {
delete(b.cacheData, p)
}

// RestartBackoff resets delay time between attempts and adds a delay for the next connection
// ResetBackoff resets delay time between attempts and adds a delay for the next connection
// attempt to remote peer. It will mostly be called when host receives a notification that remote
// peer was disconnected.
func (b *backoffConnector) RestartBackoff(p peer.ID) {
b.cacheLk.Lock()
defer b.cacheLk.Unlock()
cache := b.connectionData(p)
func (b *backoffConnector) ResetBackoff(p peer.ID) {
cache := b.backoffData(p)
cache.backoff.Reset()
cache.nexttry = time.Now().Add(cache.backoff.Delay())
b.Backoff(p)
}

func (b *backoffConnector) GC(ctx context.Context) {
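For reference, a minimal standalone sketch of the pattern the backoff.go hunks above introduce: backoff state kept as values in the map, an explicit Backoff method that Connect calls after each attempt, and ResetBackoff built on top of it. The names, the fixed base delay, and the plain string peer IDs are simplifications for illustration only; the real connector uses libp2p's backoff.BackoffFactory and also skips the Backoff call when the dial context was canceled.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errBackoffNotEnded = errors.New("backoff period has not ended")

// backoffData is stored by value: readers get a copy and writers put the
// updated copy back into the map under the lock, as in the diff above.
type backoffData struct {
	nexttry time.Time
	delay   time.Duration
}

type connector struct {
	mu    sync.Mutex
	cache map[string]backoffData
}

// data returns the entry for p, creating one with a base delay if absent.
func (c *connector) data(p string) backoffData {
	c.mu.Lock()
	defer c.mu.Unlock()
	d, ok := c.cache[p]
	if !ok {
		d = backoffData{delay: time.Second}
		c.cache[p] = d
	}
	return d
}

// Backoff schedules the next allowed attempt and grows the delay.
func (c *connector) Backoff(p string) {
	d := c.data(p)
	d.nexttry = time.Now().Add(d.delay)
	d.delay *= 2
	c.mu.Lock()
	c.cache[p] = d
	c.mu.Unlock()
}

// ResetBackoff drops the accumulated delay and schedules a fresh attempt,
// mirroring the renamed ResetBackoff above (backoff.Reset followed by Backoff).
func (c *connector) ResetBackoff(p string) {
	d := c.data(p)
	d.delay = time.Second
	c.mu.Lock()
	c.cache[p] = d
	c.mu.Unlock()
	c.Backoff(p)
}

// Connect refuses to dial while p is still backed off and records a new
// backoff after the attempt (the context.Canceled exception is omitted here).
func (c *connector) Connect(p string, dial func() error) error {
	if time.Now().Before(c.data(p).nexttry) {
		return errBackoffNotEnded
	}
	err := dial()
	c.Backoff(p)
	return err
}

func main() {
	c := &connector{cache: make(map[string]backoffData)}
	fmt.Println(c.Connect("peer-1", func() error { return nil })) // <nil>
	fmt.Println(c.Connect("peer-1", func() error { return nil })) // backoff period has not ended
}
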
2 changes: 1 addition & 1 deletion share/availability/discovery/backoff_test.go
@@ -42,6 +42,6 @@ func TestBackoff_ResetBackoffPeriod(t *testing.T) {
info := host.InfoFromHost(m.Hosts()[1])
require.NoError(t, b.Connect(ctx, *info))
nexttry := b.cacheData[info.ID].nexttry
b.RestartBackoff(info.ID)
b.ResetBackoff(info.ID)
require.True(t, b.cacheData[info.ID].nexttry.After(nexttry))
}
59 changes: 19 additions & 40 deletions share/availability/discovery/discovery.go
@@ -2,7 +2,6 @@ package discovery

import (
"context"
"errors"
"sync"
"time"

@@ -12,7 +11,6 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"golang.org/x/sync/errgroup"
)
@@ -125,16 +123,14 @@ func (d *Discovery) handlePeerFound(ctx context.Context, peer peer.AddrInfo, can
}

if d.host.Network().Connectedness(peer.ID) == network.Connected {
err := d.set.Add(peer.ID)
if err != nil {
return
}
d.set.Add(peer.ID)
log.Debugw("added peer to set", "id", peer.ID)
if d.set.Size() >= d.set.Limit() {
log.Infow("soft peer limit reached", "count", d.set.Size())
cancelFind()
}
d.host.ConnManager().Protect(peer.ID, topic)
d.connector.Backoff(peer.ID)

// and notify our subscribers
d.onUpdatedPeers(peer.ID, true)
@@ -151,11 +147,6 @@

err := d.connector.Connect(ctx, peer)
if err != nil {
// we don't want to add backoff when the context is canceled.
if errors.Is(err, context.Canceled) || errors.Is(err, routing.ErrNotFound) {
d.connector.RemoveBackoff(peer.ID)
}

d.connectingLk.Lock()
delete(d.connecting, peer.ID)
d.connectingLk.Unlock()
Expand Down Expand Up @@ -209,7 +200,7 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
}

d.host.ConnManager().UntagPeer(evnt.Peer, topic)
d.connector.RestartBackoff(evnt.Peer)
d.connector.ResetBackoff(evnt.Peer)
d.set.Remove(evnt.Peer)
d.onUpdatedPeers(evnt.Peer, false)
log.Debugw("removed peer from the peer set",
@@ -223,12 +214,8 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
continue
}

err = d.set.Add(peerID)
if err != nil {
log.Debugw("failed to add peer to set", "peer", peerID, "error", err)
continue
}
log.Debugw("added peer to set", "id", peerID)
d.set.Add(peerID)
// and notify our subscribers
d.onUpdatedPeers(peerID, true)

@@ -255,7 +242,6 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
d.findPeers(ctx)

t.Reset(d.params.DiscoveryInterval)
log.Debugw("restarted")
select {
case <-t.C:
case <-ctx.Done():
@@ -276,30 +262,23 @@ func (d *Discovery) findPeers(ctx context.Context) {
// limit to minimize chances of overreaching the limit
wg.SetLimit(d.set.Limit())

for d.set.Size() < d.set.Limit() && wgCtx.Err() == nil {
log.Debugw("finding peers", "remaining", d.set.Limit()-d.set.Size())
findCtx, findCancel := context.WithCancel(wgCtx)
defer findCancel()
log.Debugw("finding peers", "remaining", d.set.Limit()-d.set.Size())
findCtx, findCancel := context.WithCancel(wgCtx)
defer findCancel()

peers, err := d.disc.FindPeers(findCtx, topic)
if err != nil {
log.Warn(err)
return
}
peers, err := d.disc.FindPeers(findCtx, topic)
if err != nil {
log.Warn(err)
return
}

for p := range peers {
peer := p
wg.Go(func() error {
// pass the cancel so that we cancel FindPeers when we connected to enough peers
d.handlePeerFound(findCtx, peer, findCancel)
return nil
})

// break the loop if we have found enough peers
if d.set.Size() >= d.set.Limit() {
break
}
}
for p := range peers {
peer := p
wg.Go(func() error {
// pass the cancel so that we cancel FindPeers when we connected to enough peers
d.handlePeerFound(findCtx, peer, findCancel)
return nil
})
}

// we expect no errors
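The discovery.go changes drop the retry loop around FindPeers: a single pass ranges over the discovered peers, the errgroup's SetLimit bounds concurrent handlers, and findCancel stops FindPeers once enough peers are connected. Below is a self-contained sketch of that shape, with a hypothetical fakeFindPeers standing in for the real discovery.FindPeers and a plain counter standing in for the peer set — an illustration of the control flow, not the package's code.

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

const peerLimit = 3

// fakeFindPeers is a hypothetical stand-in for discovery.FindPeers: it keeps
// streaming candidate peer IDs until the passed context is canceled.
func fakeFindPeers(ctx context.Context) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- fmt.Sprintf("peer-%d", i):
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

func main() {
	wg, wgCtx := errgroup.WithContext(context.Background())
	// bound concurrent connection attempts by the peer-set limit,
	// as wg.SetLimit(d.set.Limit()) does in the diff above
	wg.SetLimit(peerLimit)

	findCtx, findCancel := context.WithCancel(wgCtx)
	defer findCancel()

	var connected int32
	// single pass over the discovered peers: no outer retry loop anymore
	for p := range fakeFindPeers(findCtx) {
		peer := p
		wg.Go(func() error {
			// stand-in for handlePeerFound: pretend the dial succeeded and,
			// once enough peers are connected, cancel FindPeers
			if atomic.AddInt32(&connected, 1) >= peerLimit {
				findCancel()
			}
			fmt.Println("connected to", peer)
			return nil
		})
	}

	_ = wg.Wait() // we expect no errors; every handler returns nil
	fmt.Println("finished after", atomic.LoadInt32(&connected), "connections")
}
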
16 changes: 5 additions & 11 deletions share/availability/discovery/set.go
@@ -2,7 +2,6 @@ package discovery

import (
"context"
"errors"
"sync"

"github.com/libp2p/go-libp2p/core/peer"
@@ -45,13 +44,8 @@ func (ps *limitedSet) Size() int {
}

// Add attempts to add the given peer into the set.
// This operation will fail if the number of peers in the set is equal to size.
func (ps *limitedSet) Add(p peer.ID) error {
func (ps *limitedSet) Add(p peer.ID) {
ps.lk.Lock()
if _, ok := ps.ps[p]; ok {
ps.lk.Unlock()
return errors.New("share: discovery: peer already added")
}
ps.ps[p] = struct{}{}
ps.lk.Unlock()

@@ -61,7 +55,7 @@ func (ps *limitedSet) Remove(id peer.ID) {
select {
case ps.waitPeer <- p:
default:
return nil
return
}
}
}
@@ -76,16 +70,16 @@ func (ps *limitedSet) Remove(id peer.ID) {

// Peers returns all discovered peers from the set.
func (ps *limitedSet) Peers(ctx context.Context) ([]peer.ID, error) {
ps.lk.Lock()
ps.lk.RLock()
if len(ps.ps) > 0 {
out := make([]peer.ID, 0, len(ps.ps))
for p := range ps.ps {
out = append(out, p)
}
ps.lk.Unlock()
ps.lk.RUnlock()
return out, nil
}
ps.lk.Unlock()
ps.lk.RUnlock()

// block until a new peer will be discovered
select {
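The set.go hunks make Add idempotent (re-adding a peer is no longer an error, so the error return goes away) and let Peers take only a read lock. A compact standalone sketch of those semantics follows, with string IDs instead of peer.ID and a simplified wake-up channel; the real set's limit handling is left out.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// limitedSet is a simplified, standalone take on the set in the diff above:
// Add is idempotent and returns nothing, Peers needs only a read lock and
// blocks on waitPeer while the set is still empty.
type limitedSet struct {
	lk       sync.RWMutex
	ps       map[string]struct{}
	waitPeer chan string
}

func newLimitedSet(limit int) *limitedSet {
	return &limitedSet{
		ps:       make(map[string]struct{}),
		waitPeer: make(chan string, limit),
	}
}

func (s *limitedSet) Add(p string) {
	s.lk.Lock()
	s.ps[p] = struct{}{} // re-adding an existing peer is a no-op, not an error
	s.lk.Unlock()

	// wake up a Peers call that is waiting for the first discovered peer
	select {
	case s.waitPeer <- p:
	default:
	}
}

func (s *limitedSet) Peers(ctx context.Context) ([]string, error) {
	s.lk.RLock()
	if len(s.ps) > 0 {
		out := make([]string, 0, len(s.ps))
		for p := range s.ps {
			out = append(out, p)
		}
		s.lk.RUnlock()
		return out, nil
	}
	s.lk.RUnlock()

	// block until a new peer is discovered or the context ends
	select {
	case p := <-s.waitPeer:
		return []string{p}, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	set := newLimitedSet(2)
	go func() {
		time.Sleep(100 * time.Millisecond)
		set.Add("peer-1")
		set.Add("peer-1") // duplicate Add no longer fails
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	fmt.Println(set.Peers(ctx))
}
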
24 changes: 7 additions & 17 deletions share/availability/discovery/set_test.go
@@ -15,27 +15,17 @@ func TestSet_TryAdd(t *testing.T) {
require.NoError(t, err)

set := newLimitedSet(1)
require.NoError(t, set.Add(h.ID()))
set.Add(h.ID())
require.True(t, set.Contains(h.ID()))
}

func TestSet_TryAddFails(t *testing.T) {
m := mocknet.New()
h1, err := m.GenPeer()
require.NoError(t, err)

set := newLimitedSet(1)
require.NoError(t, set.Add(h1.ID()))
require.Error(t, set.Add(h1.ID()))
}

func TestSet_Remove(t *testing.T) {
m := mocknet.New()
h, err := m.GenPeer()
require.NoError(t, err)

set := newLimitedSet(1)
require.NoError(t, set.Add(h.ID()))
set.Add(h.ID())
set.Remove(h.ID())
require.False(t, set.Contains(h.ID()))
}
@@ -48,8 +38,8 @@ func TestSet_Peers(t *testing.T) {
require.NoError(t, err)

set := newLimitedSet(2)
require.NoError(t, set.Add(h1.ID()))
require.NoError(t, set.Add(h2.ID()))
set.Add(h1.ID())
set.Add(h2.ID())

ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
t.Cleanup(cancel)
@@ -69,7 +59,7 @@ func TestSet_WaitPeers(t *testing.T) {
set := newLimitedSet(2)
go func() {
time.Sleep(time.Millisecond * 500)
set.Add(h1.ID()) //nolint:errcheck
set.Add(h1.ID())
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
@@ -89,8 +79,8 @@ func TestSet_Size(t *testing.T) {
require.NoError(t, err)

set := newLimitedSet(2)
require.NoError(t, set.Add(h1.ID()))
require.NoError(t, set.Add(h2.ID()))
set.Add(h1.ID())
set.Add(h2.ID())
require.Equal(t, 2, set.Size())
set.Remove(h2.ID())
require.Equal(t, 1, set.Size())