
fix(share/discovery)!: revamp Discovery #2117

Merged — 59 commits from discovery-dial-timeout into main, May 3, 2023. Changes shown from 51 commits.

Commits
3e5abdb
prog
distractedm1nd Apr 20, 2023
e3b8595
only adding connected peers
distractedm1nd Apr 21, 2023
1f9a052
improve logic a bit
Wondertan Apr 21, 2023
f058a21
add connecting map
Wondertan Apr 21, 2023
48732ff
cleanup the connecting map
Wondertan Apr 21, 2023
3f3c287
limit connections
Wondertan Apr 21, 2023
0148dd2
dialTimeout -> connectTimeout
Wondertan Apr 21, 2023
23b590e
unlock on error
Wondertan Apr 24, 2023
9943c17
make connect timeout a constant
Wondertan Apr 24, 2023
e58c3d5
discovery parameters
Wondertan Apr 26, 2023
c717917
actually break the config, refine defaults and allow disabling discov…
Wondertan Apr 26, 2023
ab03470
fixing log and peer limit deactivation comment
distractedm1nd Apr 27, 2023
7cf7287
removing storage of ctx on Discovery
distractedm1nd Apr 27, 2023
86a8a06
notify subscribers directly after adding
distractedm1nd Apr 27, 2023
14adb1c
restarting FindPeers until limit is reached
distractedm1nd Apr 27, 2023
8b8941f
respecting ctx in findPeers loop
distractedm1nd Apr 27, 2023
c40f68e
start discovery asap
Wondertan Apr 26, 2023
4917621
case for already connected peer
Wondertan Apr 27, 2023
ca3a4be
protect rather than tag
Wondertan Apr 27, 2023
f7c413a
first working iteration for tests
Wondertan Apr 27, 2023
c5e0d00
set and backoff audit + remove retrying for FindPeers
Wondertan Apr 27, 2023
c382f16
the least flaky version of the test; 2/100 fails
Wondertan Apr 27, 2023
d96ade7
- stop wait group when enough peers found
walldiss Apr 28, 2023
2005330
log Error if Discovery is unable to find new peers for a long time
walldiss Apr 28, 2023
37a159e
rerun findPeers if previous run stopped too early
walldiss Apr 28, 2023
82eb331
- do not connect to already connected peers
walldiss Apr 28, 2023
36deab8
improve logs
walldiss Apr 28, 2023
d83f2ef
increase time
Wondertan Apr 27, 2023
c154073
another attempt to deflake the test
Wondertan Apr 28, 2023
cf20647
various fixes
Wondertan Apr 28, 2023
d139979
update backoff logic to deflake test
Wondertan Apr 28, 2023
72548fb
more fixes
Wondertan Apr 28, 2023
2869a45
decrease backoff timeout
Wondertan Apr 28, 2023
c723d5d
drain ticker and timer channel before reset
walldiss Apr 28, 2023
439580b
unflake the test by removing unnecessary stop of the timer
Wondertan Apr 30, 2023
a4d2b27
log only valid peers as found
Wondertan May 2, 2023
dd8abcf
fix shadow read for add peers to set
walldiss May 2, 2023
32050eb
another backoff cleanup
Wondertan May 2, 2023
de865a9
small fix
Wondertan May 2, 2023
0b56b79
add delay for findpeers retry
walldiss May 2, 2023
0796147
make findPeers fast retry delay a var to modify it in tests
walldiss May 2, 2023
8eed668
fix comment
Wondertan May 2, 2023
38d97e9
notify when we actually need the discovery, instead of periodic check…
Wondertan May 2, 2023
c8fb086
fix comment
Wondertan May 2, 2023
6cb5cdf
simplify the code further
Wondertan May 2, 2023
0ebf3af
rework tests to use real swarm. Mocknet has some bug regarding connec…
Wondertan May 2, 2023
3ba19e9
ensurePeers should depend on global state rather than local
Wondertan May 2, 2023
d8b85d6
int back to uint
Wondertan May 2, 2023
8e20342
apply review suggestions
walldiss May 3, 2023
acf54f2
fix config test
walldiss May 3, 2023
9e13270
Merge branch 'main' into discovery-dial-timeout
walldiss May 3, 2023
8ec8343
don't overwrite peerLimit
walldiss May 3, 2023
cc15bbf
Merge remote-tracking branch 'celestia/discovery-dial-timeout' into d…
walldiss May 3, 2023
7f05b06
fix RetryTimeout default value
walldiss May 3, 2023
0fc94c2
do not shadow logger
walldiss May 3, 2023
eb9dc51
restructure code and update/fix comments
Wondertan May 3, 2023
2f16224
review fixes
Wondertan May 3, 2023
cf38663
document rendezvous
Wondertan May 3, 2023
77a4299
Merge branch 'main' into discovery-dial-timeout
Wondertan May 3, 2023
25 changes: 4 additions & 21 deletions nodebuilder/share/config.go
@@ -1,27 +1,13 @@
package share

import (
"errors"
"fmt"
"time"

disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
"github.com/celestiaorg/celestia-node/share/p2p/peers"
"github.com/celestiaorg/celestia-node/share/p2p/shrexeds"
"github.com/celestiaorg/celestia-node/share/p2p/shrexnd"
)

var (
ErrNegativeInterval = errors.New("interval must be positive")
)

type Config struct {
// PeersLimit defines how many peers will be added during discovery.
PeersLimit uint
// DiscoveryInterval is an interval between discovery sessions.
DiscoveryInterval time.Duration
// AdvertiseInterval is an interval between advertising sessions.
// NOTE: only full and bridge can advertise themselves.
AdvertiseInterval time.Duration
// UseShareExchange is a flag toggling the usage of shrex protocols for blocksync.
// NOTE: This config variable only has an effect on full and bridge nodes.
UseShareExchange bool
@@ -31,25 +17,22 @@ type Config struct {
ShrExNDParams *shrexnd.Parameters
// PeerManagerParams sets peer-manager configuration parameters
PeerManagerParams peers.Parameters
// Discovery sets peer discovery configuration parameters.
Discovery disc.Parameters
}

func DefaultConfig() Config {
return Config{
PeersLimit: 5,
DiscoveryInterval: time.Second * 30,
AdvertiseInterval: time.Second * 30,
UseShareExchange: true,
ShrExEDSParams: shrexeds.DefaultParameters(),
ShrExNDParams: shrexnd.DefaultParameters(),
PeerManagerParams: peers.DefaultParameters(),
Discovery: disc.DefaultParameters(),
}
}

// Validate performs basic validation of the config.
func (cfg *Config) Validate() error {
if cfg.DiscoveryInterval <= 0 || cfg.AdvertiseInterval <= 0 {
return fmt.Errorf("nodebuilder/share: %s", ErrNegativeInterval)
}
if err := cfg.ShrExNDParams.Validate(); err != nil {
return err
}
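To illustrate the new layout, here is a minimal sketch of overriding the nested discovery settings. It assumes only the names visible in this diff and in the tests below (`disc.DefaultParameters`, `PeersLimit`, `AdvertiseInterval`), so treat it as illustrative rather than the canonical API:

```go
package main

import (
	"fmt"
	"time"

	disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
)

func main() {
	// DefaultParameters replaces the old flat PeersLimit/DiscoveryInterval/
	// AdvertiseInterval fields on share.Config, as shown in the diff above.
	params := disc.DefaultParameters()
	params.PeersLimit = 10                     // target more discovered peers
	params.AdvertiseInterval = 5 * time.Minute // advertise less frequently
	fmt.Printf("discovery params: %+v\n", params)
}
```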
4 changes: 1 addition & 3 deletions nodebuilder/share/constructors.go
@@ -29,9 +29,7 @@ func discovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discove
return disc.NewDiscovery(
h,
routingdisc.NewRoutingDiscovery(r),
cfg.PeersLimit,
cfg.DiscoveryInterval,
cfg.AdvertiseInterval,
cfg.Discovery,
)
}
}
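For comparison with the old call, a hedged sketch of the constructor wiring after this change; `h` and `r` are assumed to be supplied by the node builder, and `routingdisc` mirrors the import already used in this file:

```go
// Sketch: constructing Discovery directly with the new signature.
// Nothing here goes beyond the diff above.
func newShareDiscovery(h host.Host, r routing.ContentRouting) *disc.Discovery {
	return disc.NewDiscovery(
		h,
		routingdisc.NewRoutingDiscovery(r),
		disc.DefaultParameters(), // one Parameters struct replaces three loose args
	)
}
```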
9 changes: 4 additions & 5 deletions nodebuilder/tests/p2p_test.go
@@ -171,7 +171,7 @@ func TestRestartNodeDiscovery(t *testing.T) {
const fullNodes = 2

setTimeInterval(cfg, defaultTimeInterval)
cfg.Share.PeersLimit = fullNodes
cfg.Share.Discovery.PeersLimit = fullNodes
bridge := sw.NewNodeWithConfig(node.Bridge, cfg)

ctx, cancel := context.WithTimeout(context.Background(), swamp.DefaultTestTimeout)
nodes := make([]*nodebuilder.Node, fullNodes)
cfg = nodebuilder.DefaultConfig(node.Full)
setTimeInterval(cfg, defaultTimeInterval)
cfg.Share.PeersLimit = fullNodes
cfg.Share.Discovery.PeersLimit = fullNodes
nodesConfig := nodebuilder.WithBootstrappers([]peer.AddrInfo{*bridgeAddr})
for index := 0; index < fullNodes; index++ {
nodes[index] = sw.NewNodeWithConfig(node.Full, cfg, nodesConfig)
// create one more node with disabled discovery
cfg = nodebuilder.DefaultConfig(node.Full)
setTimeInterval(cfg, defaultTimeInterval)
cfg.Share.PeersLimit = 0
cfg.Share.Discovery.PeersLimit = 0
node := sw.NewNodeWithConfig(node.Full, cfg, nodesConfig)
connectSub, err := nodes[0].Host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
require.NoError(t, err)

func setTimeInterval(cfg *nodebuilder.Config, interval time.Duration) {
cfg.P2P.RoutingTableRefreshPeriod = interval
cfg.Share.DiscoveryInterval = interval
cfg.Share.AdvertiseInterval = interval
cfg.Share.Discovery.AdvertiseInterval = interval
}
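One behavioral note the test relies on: this PR makes a peers limit of zero deactivate discovery entirely (per the "peer limit deactivation" commit). A minimal sketch of the same thing in an operator config, using only field paths visible above:

```go
// Sketch: a full node that opts out of discovery.
cfg := nodebuilder.DefaultConfig(node.Full)
cfg.Share.Discovery.PeersLimit = 0 // 0 deactivates discovery
```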
75 changes: 43 additions & 32 deletions share/availability/discovery/backoff.go
@@ -2,7 +2,7 @@ package discovery

import (
"context"
"fmt"
"errors"
"sync"
"time"

@@ -11,10 +11,17 @@ import (
"github.com/libp2p/go-libp2p/p2p/discovery/backoff"
)

// gcInterval is a default period after which disconnected peers will be removed from cache
const gcInterval = time.Hour
const (
// gcInterval is a default period after which disconnected peers will be removed from cache
gcInterval = time.Minute
// connectTimeout is the timeout used for dialing peers and discovering peer addresses.
connectTimeout = time.Minute * 2
)

var defaultBackoffFactory = backoff.NewFixedBackoff(time.Hour)
var (
defaultBackoffFactory = backoff.NewFixedBackoff(time.Minute * 10)
errBackoffNotEnded = errors.New("share/discovery: backoff period has not ended")
)

// backoffConnector wraps a libp2p.Host to establish connections with peers,
// adding a delay before the next connection attempt to the same peer.
@@ -23,7 +30,7 @@ type backoffConnector struct {
backoff backoff.BackoffFactory

cacheLk sync.Mutex
cacheData map[peer.ID]*backoffData
cacheData map[peer.ID]backoffData
}

// backoffData stores the time of the next connection attempt for a remote peer.
@@ -36,48 +43,52 @@ func newBackoffConnector(h host.Host, factory backoff.BackoffFactory) *backoffCo
return &backoffConnector{
h: h,
backoff: factory,
cacheData: make(map[peer.ID]*backoffData),
cacheData: make(map[peer.ID]backoffData),
}
}

// Connect puts peer to the backoffCache and tries to establish a connection with it.
func (b *backoffConnector) Connect(ctx context.Context, p peer.AddrInfo) error {
// we should lock the mutex before calling connectionData and not inside because otherwise it could
// be modified from another goroutine as it returns a pointer
b.cacheLk.Lock()
cache := b.connectionData(p.ID)
if time.Now().Before(cache.nexttry) {
b.cacheLk.Unlock()
return fmt.Errorf("share/discovery: backoff period has not ended for peer=%s", p.ID.String())
if b.HasBackoff(p.ID) {
return errBackoffNotEnded
}
cache.nexttry = time.Now().Add(cache.backoff.Delay())
b.cacheLk.Unlock()
return b.h.Connect(ctx, p)

ctx, cancel := context.WithTimeout(ctx, connectTimeout)
defer cancel()

err := b.h.Connect(ctx, p)
// we don't want to add backoff when the context is canceled.
if !errors.Is(err, context.Canceled) {
b.Backoff(p.ID)
}
return err
}

// connectionData returns backoffData from the map if it was stored, otherwise it will instantiate
// a new one.
func (b *backoffConnector) connectionData(p peer.ID) *backoffData {
cache, ok := b.cacheData[p]
// Backoff adds or extends backoff delay for the peer.
func (b *backoffConnector) Backoff(p peer.ID) {
b.cacheLk.Lock()
defer b.cacheLk.Unlock()

data, ok := b.cacheData[p]
if !ok {
cache = &backoffData{}
cache.backoff = b.backoff()
b.cacheData[p] = cache
data = backoffData{}
data.backoff = b.backoff()
b.cacheData[p] = data
}
return cache

data.nexttry = time.Now().Add(data.backoff.Delay())
b.cacheData[p] = data
}

// RestartBackoff resets delay time between attempts and adds a delay for the next connection
// attempt to remote peer. It will mostly be called when host receives a notification that remote
// peer was disconnected.
func (b *backoffConnector) RestartBackoff(p peer.ID) {
// HasBackoff checks if peer is in backoff.
func (b *backoffConnector) HasBackoff(p peer.ID) bool {
b.cacheLk.Lock()
defer b.cacheLk.Unlock()
cache := b.connectionData(p)
cache.backoff.Reset()
cache.nexttry = time.Now().Add(cache.backoff.Delay())
cache, ok := b.cacheData[p]
b.cacheLk.Unlock()
return ok && time.Now().Before(cache.nexttry)
}

// GC is a perpetual GCing loop.
func (b *backoffConnector) GC(ctx context.Context) {
ticker := time.NewTicker(gcInterval)
for {
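Tying the pieces of backoff.go together, a minimal in-package sketch of the intended call flow; `h` and `p` are placeholders supplied by the caller, everything else comes from the diff above:

```go
// Sketch (inside package discovery): one guarded connection attempt.
func connectOnce(ctx context.Context, h host.Host, p peer.AddrInfo) {
	b := newBackoffConnector(h, defaultBackoffFactory)
	go b.GC(ctx) // prunes expired backoff entries every gcInterval

	// Connect dials with the 2-minute connectTimeout and then records a
	// backoff for the peer (unless ctx was canceled), so an immediate
	// retry short-circuits with errBackoffNotEnded.
	err := b.Connect(ctx, p)
	if errors.Is(err, errBackoffNotEnded) {
		return // attempted too recently; wait out the fixed 10-minute backoff
	}
}
```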
2 changes: 1 addition & 1 deletion share/availability/discovery/backoff_test.go
@@ -42,6 +42,6 @@ func TestBackoff_ResetBackoffPeriod(t *testing.T) {
info := host.InfoFromHost(m.Hosts()[1])
require.NoError(t, b.Connect(ctx, *info))
nexttry := b.cacheData[info.ID].nexttry
b.RestartBackoff(info.ID)
b.Backoff(info.ID)
require.True(t, b.cacheData[info.ID].nexttry.After(nexttry))
}
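As a small companion to the assertion above, the new HasBackoff accessor could be checked in the same style (a hedged suggestion, not part of the PR):

```go
// After Backoff, the peer should report as backed off until nexttry passes.
require.True(t, b.HasBackoff(info.ID))
```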