fix(share/discovery)!: revamp Discovery (#2117)
Closes #2107 

There was a discrepancy between the number of peers in the peer manager's full-node pool and the limitedSet inside discovery. The node could get into a state where it had no more full nodes to sample from, because peers in the limitedSet were blocking on the connection attempt and were never handed off to the OnPeersUpdate callback.

To fix this, we (@Wondertan, @walldiss, and I):
* Set a context deadline for the call to Connect with a peer. Previously, it would never time out, because RoutedHost blocks until the context is canceled even though there is a peer dial timeout.
* Only add peers to the limitedSet after we have successfully connected (see the sketch after this list). This turns the previous peerLimit into a "soft" limit: in-progress connections can no longer clog spots in the limitedSet, but more peers than the limit may end up in the set, since already connected peers are not thrown away.
* Removed the timer reset upon peer disconnection. Peer disconnections happen often, and the reset caused a significant delay before the next call to FindPeers.
* Added logs, cleaned up various comments
* Backoff
    * Adds a RemoveBackoff method to the backoffConnector, to remove canceled connections from the cache
    * Adds a Backoff method that adds or extends a peer's backoff delay
* Cleans up limited set
* Added tests for Discovery
* Handle the case where a discovered peer is already connected
* Introduces importable Discovery params
* Revisits constants
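
A minimal sketch of the "connect first, then add" flow from the second bullet above. Names such as `limitedSet`, `handleDiscovered`, and the 2-minute dial deadline are illustrative assumptions, not the actual celestia-node code:

```go
package example

import (
	"context"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
)

// limitedSet is a simplified stand-in for discovery's limited peer set.
// The limit is "soft": it only gates whether we keep dialing new peers,
// and a successfully connected peer is always kept.
type limitedSet struct {
	mu    sync.Mutex
	limit uint
	peers map[peer.ID]struct{}
}

func newLimitedSet(limit uint) *limitedSet {
	return &limitedSet{limit: limit, peers: make(map[peer.ID]struct{})}
}

func (s *limitedSet) add(p peer.ID) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.peers[p] = struct{}{}
}

func (s *limitedSet) isFull() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return uint(len(s.peers)) >= s.limit
}

// handleDiscovered dials a discovered peer with a deadline and claims a slot
// in the set only after the connection succeeds, so slow or stuck dials can
// no longer occupy slots and starve the pool of full nodes to sample from.
func handleDiscovered(ctx context.Context, h host.Host, set *limitedSet, p peer.AddrInfo) error {
	if set.isFull() {
		return nil // soft limit reached: stop adding, but keep existing peers
	}

	ctx, cancel := context.WithTimeout(ctx, 2*time.Minute) // bounded dial, so Connect cannot block forever
	defer cancel()

	if err := h.Connect(ctx, p); err != nil {
		return err // a failed or canceled dial never consumed a slot
	}

	set.add(p.ID) // add only after a successful connection
	return nil
}
```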

Co-authored-by: Wondertan <hlibwondertan@gmail.com>
Co-authored-by: Vlad <vlad@celestia.org>
Co-authored-by: Vlad <13818348+walldiss@users.noreply.github.com>
4 people committed May 3, 2023
1 parent 2055f22 commit 1329f8b
Showing 12 changed files with 515 additions and 247 deletions.
25 changes: 4 additions & 21 deletions nodebuilder/share/config.go
@@ -1,27 +1,13 @@
package share

import (
"errors"
"fmt"
"time"

disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
)

var (
ErrNegativeInterval = errors.New("interval must be positive")
)

type Config struct {
// PeersLimit defines how many peers will be added during discovery.
PeersLimit uint
// DiscoveryInterval is an interval between discovery sessions.
DiscoveryInterval time.Duration
// AdvertiseInterval is a interval between advertising sessions.
// NOTE: only full and bridge can advertise themselves.
AdvertiseInterval time.Duration
// UseShareExchange is a flag toggling the usage of shrex protocols for blocksync.
// NOTE: This config variable only has an effect on full and bridge nodes.
UseShareExchange bool
@@ -31,25 +17,22 @@ type Config struct {
ShrExNDParams *shrexnd.Parameters
// PeerManagerParams sets peer-manager configuration parameters
PeerManagerParams peers.Parameters
// Discovery sets peer discovery configuration parameters.
Discovery disc.Parameters
}

func DefaultConfig() Config {
return Config{
PeersLimit: 5,
DiscoveryInterval: time.Second * 30,
AdvertiseInterval: time.Second * 30,
UseShareExchange: true,
ShrExEDSParams: shrexeds.DefaultParameters(),
ShrExNDParams: shrexnd.DefaultParameters(),
PeerManagerParams: peers.DefaultParameters(),
Discovery: disc.DefaultParameters(),
}
}

// Validate performs basic validation of the config.
func (cfg *Config) Validate() error {
if cfg.DiscoveryInterval <= 0 || cfg.AdvertiseInterval <= 0 {
return fmt.Errorf("nodebuilder/share: %s", ErrNegativeInterval)
}
if err := cfg.ShrExNDParams.Validate(); err != nil {
return err
}
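For downstream users of this config, the discovery knobs now live under the new Discovery field. A sketch of tuning them, assuming only the two Parameters fields that this diff exercises (PeersLimit and AdvertiseInterval); any other fields are not assumed here:

```go
package example

import (
	"time"

	"github.com/celestiaorg/celestia-node/nodebuilder/share"
)

// tunedShareConfig shows the new field paths: what used to be
// cfg.PeersLimit / cfg.AdvertiseInterval now lives under cfg.Discovery.
func tunedShareConfig() share.Config {
	cfg := share.DefaultConfig() // Discovery starts from disc.DefaultParameters()

	cfg.Discovery.PeersLimit = 10                      // soft limit on peers added via discovery
	cfg.Discovery.AdvertiseInterval = 10 * time.Minute // only full and bridge nodes advertise
	return cfg
}
```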
4 changes: 1 addition & 3 deletions nodebuilder/share/constructors.go
@@ -29,9 +29,7 @@ func discovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discove
return disc.NewDiscovery(
h,
routingdisc.NewRoutingDiscovery(r),
cfg.PeersLimit,
cfg.DiscoveryInterval,
cfg.AdvertiseInterval,
cfg.Discovery,
)
}
}
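This is the breaking part of the change for external callers: NewDiscovery no longer takes the three positional knobs. A sketch of the updated call, assuming only the signature visible in this hunk:

```go
package example

import (
	disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/routing"
	routingdisc "github.com/libp2p/go-libp2p/p2p/discovery/routing"
)

// newDiscovery mirrors the updated constructor: peersLimit, discoveryInterval,
// and advertiseInterval are replaced by a single disc.Parameters value.
func newDiscovery(h host.Host, r routing.ContentRouting) *disc.Discovery {
	return disc.NewDiscovery(h, routingdisc.NewRoutingDiscovery(r), disc.DefaultParameters())
}
```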
9 changes: 4 additions & 5 deletions nodebuilder/tests/p2p_test.go
@@ -171,7 +171,7 @@ func TestRestartNodeDiscovery(t *testing.T) {
const fullNodes = 2

setTimeInterval(cfg, defaultTimeInterval)
cfg.Share.PeersLimit = fullNodes
cfg.Share.Discovery.PeersLimit = fullNodes
bridge := sw.NewNodeWithConfig(node.Bridge, cfg)

ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)
@@ -184,7 +184,7 @@ func TestRestartNodeDiscovery(t *testing.T) {
nodes := make([]*nodebuilder.Node, fullNodes)
cfg = nodebuilder.DefaultConfig(node.Full)
setTimeInterval(cfg, defaultTimeInterval)
cfg.Share.PeersLimit = fullNodes
cfg.Share.Discovery.PeersLimit = fullNodes
nodesConfig := nodebuilder.WithBootstrappers([]peer.AddrInfo{*bridgeAddr})
for index := 0; index < fullNodes; index++ {
nodes[index] = sw.NewNodeWithConfig(node.Full, cfg, nodesConfig)
@@ -201,7 +201,7 @@ func TestRestartNodeDiscovery(t *testing.T) {
// create one more node with disabled discovery
cfg = nodebuilder.DefaultConfig(node.Full)
setTimeInterval(cfg, defaultTimeInterval)
cfg.Share.PeersLimit = 0
cfg.Share.Discovery.PeersLimit = 0
node := sw.NewNodeWithConfig(node.Full, cfg, nodesConfig)
connectSub, err := nodes[0].Host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
require.NoError(t, err)
@@ -215,6 +215,5 @@ func TestRestartNodeDiscovery(t *testing.T) {

func setTimeInterval(cfg *nodebuilder.Config, interval time.Duration) {
cfg.P2P.RoutingTableRefreshPeriod = interval
cfg.Share.DiscoveryInterval = interval
cfg.Share.AdvertiseInterval = interval
cfg.Share.Discovery.AdvertiseInterval = interval
}
75 changes: 43 additions & 32 deletions share/availability/discovery/backoff.go
@@ -2,7 +2,7 @@ package discovery

import (
"context"
"fmt"
"errors"
"sync"
"time"

@@ -11,10 +11,17 @@ import (
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
)

// gcInterval is a default period after which disconnected peers will be removed from cache
const gcInterval = time.Hour
const (
// gcInterval is a default period after which disconnected peers will be removed from cache
gcInterval = time.Minute
// connectTimeout is the timeout used for dialing peers and discovering peer addresses.
connectTimeout = time.Minute * 2
)

var defaultBackoffFactory = backoff.NewFixedBackoff(time.Hour)
var (
defaultBackoffFactory = backoff.NewFixedBackoff(time.Minute * 10)
errBackoffNotEnded = errors.New("share/discovery: backoff period has not ended")
)

// backoffConnector wraps a libp2p.Host to establish a connection with peers
// with adding a delay for the next connection attempt.
@@ -23,7 +30,7 @@ type backoffConnector struct {
backoff backoff.BackoffFactory

cacheLk sync.Mutex
cacheData map[peer.ID]*backoffData
cacheData map[peer.ID]backoffData
}

// backoffData stores time when next connection attempt with the remote peer.
@@ -36,48 +43,52 @@ func newBackoffConnector(h host.Host, factory backoff.BackoffFactory) *backoffCo
return &backoffConnector{
h: h,
backoff: factory,
cacheData: make(map[peer.ID]*backoffData),
cacheData: make(map[peer.ID]backoffData),
}
}

// Connect puts peer to the backoffCache and tries to establish a connection with it.
func (b *backoffConnector) Connect(ctx context.Context, p peer.AddrInfo) error {
// we should lock the mutex before calling connectionData and not inside because otherwise it could
// be modified from another goroutine as it returns a pointer
b.cacheLk.Lock()
cache := b.connectionData(p.ID)
if time.Now().Before(cache.nexttry) {
b.cacheLk.Unlock()
return fmt.Errorf("share/discovery: backoff period has not ended for peer=%s", p.ID.String())
if b.HasBackoff(p.ID) {
return errBackoffNotEnded
}
cache.nexttry = time.Now().Add(cache.backoff.Delay())
b.cacheLk.Unlock()
return b.h.Connect(ctx, p)

ctx, cancel := context.WithTimeout(ctx, connectTimeout)
defer cancel()

err := b.h.Connect(ctx, p)
// we don't want to add backoff when the context is canceled.
if !errors.Is(err, context.Canceled) {
b.Backoff(p.ID)
}
return err
}

// connectionData returns backoffData from the map if it was stored, otherwise it will instantiate
// a new one.
func (b *backoffConnector) connectionData(p peer.ID) *backoffData {
cache, ok := b.cacheData[p]
// Backoff adds or extends backoff delay for the peer.
func (b *backoffConnector) Backoff(p peer.ID) {
b.cacheLk.Lock()
defer b.cacheLk.Unlock()

data, ok := b.cacheData[p]
if !ok {
cache = &backoffData{}
cache.backoff = b.backoff()
b.cacheData[p] = cache
data = backoffData{}
data.backoff = b.backoff()
b.cacheData[p] = data
}
return cache

data.nexttry = time.Now().Add(data.backoff.Delay())
b.cacheData[p] = data
}

// RestartBackoff resets delay time between attempts and adds a delay for the next connection
// attempt to remote peer. It will mostly be called when host receives a notification that remote
// peer was disconnected.
func (b *backoffConnector) RestartBackoff(p peer.ID) {
// HasBackoff checks if peer is in backoff.
func (b *backoffConnector) HasBackoff(p peer.ID) bool {
b.cacheLk.Lock()
defer b.cacheLk.Unlock()
cache := b.connectionData(p)
cache.backoff.Reset()
cache.nexttry = time.Now().Add(cache.backoff.Delay())
cache, ok := b.cacheData[p]
b.cacheLk.Unlock()
return ok && time.Now().Before(cache.nexttry)
}

// GC is a perpetual GCing loop.
func (b *backoffConnector) GC(ctx context.Context) {
ticker := time.NewTicker(gcInterval)
for {
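The RemoveBackoff method mentioned in the description is not visible in the hunks above. A hypothetical sketch of what it could look like, reusing only the backoffConnector fields shown in this file (cacheLk, cacheData); the real method may differ:

```go
// RemoveBackoff drops the peer's entry from the backoff cache, e.g. when a
// connection attempt was canceled and the peer should not stay penalized.
// Hypothetical sketch, not the actual implementation from this commit.
func (b *backoffConnector) RemoveBackoff(p peer.ID) {
	b.cacheLk.Lock()
	defer b.cacheLk.Unlock()
	delete(b.cacheData, p)
}
```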
2 changes: 1 addition & 1 deletion share/availability/discovery/backoff_test.go
@@ -42,6 +42,6 @@ func TestBackoff_ResetBackoffPeriod(t *testing.T) {
info := host.InfoFromHost(m.Hosts()[1])
require.NoError(t, b.Connect(ctx, *info))
nexttry := b.cacheData[info.ID].nexttry
b.RestartBackoff(info.ID)
b.Backoff(info.ID)
require.True(t, b.cacheData[info.ID].nexttry.After(nexttry))
}
