Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(share/discovery)!: revamp Discovery #2117

Merged
merged 59 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
3e5abdb
prog
distractedm1nd Apr 20, 2023
e3b8595
only adding connected peers
distractedm1nd Apr 21, 2023
1f9a052
improve logic a bit
Wondertan Apr 21, 2023
f058a21
add connecting map
Wondertan Apr 21, 2023
48732ff
cleanup the connecting map
Wondertan Apr 21, 2023
3f3c287
limit connections
Wondertan Apr 21, 2023
0148dd2
dialTimeout -> connectTimeout
Wondertan Apr 21, 2023
23b590e
unlock on error
Wondertan Apr 24, 2023
9943c17
make connect timeout a constatnt
Wondertan Apr 24, 2023
e58c3d5
discovery parameters
Wondertan Apr 26, 2023
c717917
actually break the config, refine defaults and allow disabling discov…
Wondertan Apr 26, 2023
ab03470
fixing log and peer limit deactivation comment
distractedm1nd Apr 27, 2023
7cf7287
removing storage of ctx on Discovery
distractedm1nd Apr 27, 2023
86a8a06
notify subscribers directly after adding
distractedm1nd Apr 27, 2023
14adb1c
restarting FindPeers until limit is reached
distractedm1nd Apr 27, 2023
8b8941f
respecting ctx in findPeers loop
distractedm1nd Apr 27, 2023
c40f68e
start discovery asap
Wondertan Apr 26, 2023
4917621
case for already connected peer
Wondertan Apr 27, 2023
ca3a4be
protect rather than tag
Wondertan Apr 27, 2023
f7c413a
first working iteration for tests
Wondertan Apr 27, 2023
c5e0d00
set and backoff audit + remove retrying for FindPeers
Wondertan Apr 27, 2023
c382f16
the least flaky version of the test; 2/100 fails
Wondertan Apr 27, 2023
d96ade7
- stop wait group when enough peers found
walldiss Apr 28, 2023
2005330
log Error if Discovery is unable to find new peers for long time
walldiss Apr 28, 2023
37a159e
rerun findPeers if previous run stopped too early
walldiss Apr 28, 2023
82eb331
- do not connect to already connected peers
walldiss Apr 28, 2023
36deab8
improve logs
walldiss Apr 28, 2023
d83f2ef
increase time
Wondertan Apr 27, 2023
c154073
another attempt to deflake the test
Wondertan Apr 28, 2023
cf20647
various fixes
Wondertan Apr 28, 2023
d139979
update backoff logic to deflake test
Wondertan Apr 28, 2023
72548fb
more fixes
Wondertan Apr 28, 2023
2869a45
decrease backoff timeout
Wondertan Apr 28, 2023
c723d5d
drain ticker and timer channel before reset
walldiss Apr 28, 2023
439580b
unflake the test by removing unnecessary stop of the timer
Wondertan Apr 30, 2023
a4d2b27
log as found only valid peers
Wondertan May 2, 2023
dd8abcf
fix shadow read for add peers to set
walldiss May 2, 2023
32050eb
another backoff cleanup
Wondertan May 2, 2023
de865a9
small fix
Wondertan May 2, 2023
0b56b79
add delay for findpeers retry
walldiss May 2, 2023
0796147
make findPeers fast retry delay a var to modify it in tests
walldiss May 2, 2023
8eed668
fix comment
Wondertan May 2, 2023
38d97e9
notify when we actually need the discovery, instead of periodic check…
Wondertan May 2, 2023
c8fb086
fix comment
Wondertan May 2, 2023
6cb5cdf
simplify the code further
Wondertan May 2, 2023
0ebf3af
rework tests to use real swarm. Mocknet has some bug regarding connec…
Wondertan May 2, 2023
3ba19e9
ensurePeers should depend on global state rather than local
Wondertan May 2, 2023
d8b85d6
int back to uint
Wondertan May 2, 2023
8e20342
apply review suggestions
walldiss May 3, 2023
acf54f2
fix config test
walldiss May 3, 2023
9e13270
Merge branch 'main' into discovery-dial-timeout
walldiss May 3, 2023
8ec8343
don't overwrite peerLimit
walldiss May 3, 2023
cc15bbf
Merge remote-tracking branch 'celestia/discovery-dial-timeout' into d…
walldiss May 3, 2023
7f05b06
fix RetryTimeout default value
walldiss May 3, 2023
0fc94c2
do not shadow logger
walldiss May 3, 2023
eb9dc51
restructure code and update/fix comments
Wondertan May 3, 2023
2f16224
review fixes
Wondertan May 3, 2023
cf38663
document rendezvous
Wondertan May 3, 2023
77a4299
Merge branch 'main' into discovery-dial-timeout
Wondertan May 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion nodebuilder/tests/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,5 @@ func TestRestartNodeDiscovery(t *testing.T) {

func setTimeInterval(cfg *nodebuilder.Config, interval time.Duration) {
cfg.P2P.RoutingTableRefreshPeriod = interval
cfg.Share.Discovery.DiscoveryRetryTimeout = interval
cfg.Share.Discovery.AdvertiseInterval = interval
}
5 changes: 1 addition & 4 deletions share/availability/discovery/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,7 @@ func (b *backoffConnector) HasBackoff(p peer.ID) bool {
b.cacheLk.Lock()
cache, ok := b.cacheData[p]
b.cacheLk.Unlock()
if ok && time.Now().Before(cache.nexttry) {
return true
}
return false
return ok && time.Now().Before(cache.nexttry)
}

// GC is a perpetual GCing loop.
Expand Down
26 changes: 14 additions & 12 deletions share/availability/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ const (
// events in libp2p
eventbusBufSize = 32

// findPeersStuckWarnDelay is the duration after which findPeers will log an error message to
// notify that it is stuck.
findPeersStuckWarnDelay = time.Minute
walldiss marked this conversation as resolved.
Show resolved Hide resolved
)

Expand All @@ -35,20 +37,20 @@ type Parameters struct {
// PeersLimit defines the soft limit of FNs to connect to via discovery.
// Set 0 to disable.
PeersLimit uint
// DiscoveryRetryTimeout is an interval between discovery attempts
// when we discovered lower than PeersLimit peers.
// Set -1 to disable.
DiscoveryRetryTimeout time.Duration
// AdvertiseInterval is an interval between advertising sessions.
// Set -1 to disable.
// NOTE: only full and bridge can advertise themselves.
AdvertiseInterval time.Duration
// discoveryRetryTimeout is an interval between discovery attempts
// when we have discovered fewer than PeersLimit peers.
// Set -1 to disable.
discoveryRetryTimeout time.Duration
}

func DefaultParameters() Parameters {
return Parameters{
PeersLimit: 5,
DiscoveryRetryTimeout: time.Second * 1,
discoveryRetryTimeout: time.Second,
AdvertiseInterval: time.Hour * 8,
}
}
Expand All @@ -68,7 +70,7 @@ type Discovery struct {
connector *backoffConnector
onUpdatedPeers OnUpdatedPeers

triggerDisq chan struct{}
triggerDisc chan struct{}

cancel context.CancelFunc
}
Expand All @@ -88,7 +90,7 @@ func NewDiscovery(
disc: d,
connector: newBackoffConnector(h, defaultBackoffFactory),
onUpdatedPeers: func(peer.ID, bool) {},
triggerDisq: make(chan struct{}),
triggerDisc: make(chan struct{}),
}
}

Expand All @@ -115,7 +117,7 @@ func (d *Discovery) WithOnPeersUpdate(f OnUpdatedPeers) {

func (d *Discovery) triggerDiscovery() {
select {
case d.triggerDisq <- struct{}{}:
case d.triggerDisc <- struct{}{}:
default:
}
}
Expand Down Expand Up @@ -153,11 +155,11 @@ func (d *Discovery) handlePeerFound(ctx context.Context, peer peer.AddrInfo) boo
}

if !d.set.Add(peer.ID) {
log.Debugw("peer is already in discovery set", "peer", peer.ID)
log.Debug("peer is already in discovery set")
return false
}
d.onUpdatedPeers(peer.ID, true)
log.Debugw("added peer to set", "peer", peer.ID)
log.Debug("added peer to set")
walldiss marked this conversation as resolved.
Show resolved Hide resolved

// tag to protect peer from being killed by ConnManager
// NOTE: This does not protect from the remote peer killing the connection.
Expand Down Expand Up @@ -220,7 +222,7 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
}()
go d.connector.GC(ctx)

t := time.NewTicker(d.params.DiscoveryRetryTimeout)
t := time.NewTicker(d.params.discoveryRetryTimeout)
defer t.Stop()
for {
// drain all previous ticks from channel
Expand All @@ -237,7 +239,7 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
}

select {
case <-d.triggerDisq:
case <-d.triggerDisc:
case <-ctx.Done():
return
}
Expand Down
4 changes: 2 additions & 2 deletions share/availability/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestDiscovery(t *testing.T) {

peerA := tn.discovery(Parameters{
PeersLimit: nodes,
DiscoveryRetryTimeout: time.Millisecond * 100,
discoveryRetryTimeout: time.Millisecond * 100,
AdvertiseInterval: -1, // we don't want to be found but only find
})

Expand All @@ -43,7 +43,7 @@ func TestDiscovery(t *testing.T) {
for i := range discs {
discs[i] = tn.discovery(Parameters{
PeersLimit: 0,
DiscoveryRetryTimeout: -1,
discoveryRetryTimeout: -1,
AdvertiseInterval: time.Millisecond * 100,
})

Expand Down
12 changes: 5 additions & 7 deletions share/availability/full/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ func Node(dn *availability_test.TestDagNet) *availability_test.TestNode {
}

func TestAvailability(getter share.Getter) *ShareAvailability {
disc := discovery.NewDiscovery(nil, routing.NewRoutingDiscovery(
routinghelpers.Null{}), discovery.Parameters{
PeersLimit: 10,
DiscoveryRetryTimeout: time.Second,
AdvertiseInterval: time.Second,
},
)
params := discovery.DefaultParameters()
params.AdvertiseInterval = time.Second
params.PeersLimit = 10
disc := discovery.NewDiscovery(nil,
routing.NewRoutingDiscovery(routinghelpers.Null{}), params)
return NewShareAvailability(nil, getter, disc)
}
5 changes: 2 additions & 3 deletions share/getters/shrex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,8 @@ func testManager(ctx context.Context, host host.Host, headerSub libhead.Subscrib

disc := discovery.NewDiscovery(nil,
routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), discovery.Parameters{
PeersLimit: 10,
DiscoveryRetryTimeout: time.Second,
AdvertiseInterval: time.Second,
PeersLimit: 10,
AdvertiseInterval: time.Second,
},
)
connGater, err := conngater.NewBasicConnectionGater(ds_sync.MutexWrap(datastore.NewMapDatastore()))
Expand Down
15 changes: 6 additions & 9 deletions share/p2p/peers/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,8 @@ func TestIntegration(t *testing.T) {
nw.Hosts()[0],
routingdisc.NewRoutingDiscovery(router1),
discovery.Parameters{
PeersLimit: 0,
DiscoveryRetryTimeout: time.Second,
AdvertiseInterval: time.Second,
PeersLimit: 0,
AdvertiseInterval: time.Second,
},
)

Expand All @@ -408,9 +407,8 @@ func TestIntegration(t *testing.T) {
nw.Hosts()[1],
routingdisc.NewRoutingDiscovery(router2),
discovery.Parameters{
PeersLimit: 10,
DiscoveryRetryTimeout: time.Second,
AdvertiseInterval: time.Second,
PeersLimit: 10,
AdvertiseInterval: time.Second,
},
)
err = fnDisc.Start(ctx)
Expand Down Expand Up @@ -466,9 +464,8 @@ func testManager(ctx context.Context, headerSub libhead.Subscriber[*header.Exten

disc := discovery.NewDiscovery(nil,
routingdisc.NewRoutingDiscovery(routinghelpers.Null{}), discovery.Parameters{
PeersLimit: 0,
DiscoveryRetryTimeout: time.Second,
AdvertiseInterval: time.Second,
PeersLimit: 0,
AdvertiseInterval: time.Second,
})
connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore()))
if err != nil {
Expand Down
Loading