restructure code and update/fix comments
Wondertan committed May 3, 2023
1 parent 0fc94c2 commit eb9dc51
Showing 1 changed file with 133 additions and 137 deletions.
270 changes: 133 additions & 137 deletions share/availability/discovery/discovery.go
@@ -23,7 +23,7 @@ const (
	// events in libp2p
	eventbusBufSize = 32

-	// findPeersStuckWarnDelay is the duration after which findPeers will log an error message to
+	// findPeersStuckWarnDelay is the duration after which discover will log an error message to
	// notify that it is stuck.
	findPeersStuckWarnDelay = time.Minute

@@ -56,10 +56,6 @@ func DefaultParameters() Parameters {
	}
}

-func (p *Parameters) Validate() error {
-	return nil
-}
-
// Discovery combines advertise and discover services and allows to store discovered nodes.
// TODO: The code here gets horribly hairy, so we should refactor this at some point
type Discovery struct {
@@ -98,7 +94,15 @@ func NewDiscovery(
func (d *Discovery) Start(context.Context) error {
	ctx, cancel := context.WithCancel(context.Background())
	d.cancel = cancel
-	go d.ensurePeers(ctx)
+
+	if d.params.PeersLimit == 0 {
+		log.Warn("peers limit is set to 0. Skipping discovery...")
+		return nil
+	}
+
+	go d.discoveryLoop(ctx)
+	go d.disconnectsLoop(ctx)
+	go d.connector.GC(ctx)
	return nil
}
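Note on the pattern above: Start deliberately ignores the context it receives and derives its own from context.Background(), so the spawned loops outlive the caller's start deadline and are stopped only through the stored cancel. A stand-alone sketch of that lifecycle shape (names here are illustrative, not from the repo):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type svc struct{ cancel context.CancelFunc }

func (s *svc) Start(context.Context) error {
	// The caller's context is ignored on purpose: it only bounds startup,
	// not the lifetime of the background loops.
	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel
	go s.loop(ctx, "discovery")
	go s.loop(ctx, "disconnects")
	return nil
}

func (s *svc) Stop(context.Context) error {
	s.cancel() // one cancel stops every loop started above
	return nil
}

func (s *svc) loop(ctx context.Context, name string) {
	<-ctx.Done()
	fmt.Println(name, "loop stopped")
}

func main() {
	s := new(svc)
	_ = s.Start(context.Background())
	_ = s.Stop(context.Background())
	time.Sleep(50 * time.Millisecond) // give the loops a moment to print
}
```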

@@ -116,121 +120,59 @@ func (d *Discovery) WithOnPeersUpdate(f OnUpdatedPeers) {
	}
}

-func (d *Discovery) triggerDiscovery() {
-	select {
-	case d.triggerDisc <- struct{}{}:
-	default:
-	}
-}
-
+// Peers provides a list of discovered peers in the "full" topic.
+// If Discovery hasn't found any peers, it blocks until at least one peer is found.
+func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) {
+	return d.set.Peers(ctx)
+}
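Because Peers blocks until the set is non-empty, callers normally bound the wait with a context. A hypothetical caller sketch (the small interface stands in for *Discovery so no import path has to be assumed):

```go
package example

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
)

// peerser abstracts the Peers method shown above, so this sketch doesn't
// guess the discovery package's import path.
type peerser interface {
	Peers(context.Context) ([]peer.ID, error)
}

// firstPeers bounds the otherwise-blocking Peers call with a deadline.
func firstPeers(d peerser) ([]peer.ID, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return d.Peers(ctx) // returns once at least one peer is found or the deadline hits
}
```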

-// handlePeersFound receives peers and tries to establish a connection with them.
-// Peer will be added to PeerCache if connection succeeds.
-func (d *Discovery) handlePeerFound(ctx context.Context, peer peer.AddrInfo) bool {
-	logger := log.With("peer", peer.ID)
-	switch {
-	case peer.ID == d.host.ID():
-		logger.Debug("skip handle: self discovery")
-		return false
-	case len(peer.Addrs) == 0:
-		logger.Debug("skip handle: empty address list")
-		return false
-	case d.set.Size() >= d.set.Limit():
-		logger.Debug("skip handle: enough peers found")
-		return false
-	case d.connector.HasBackoff(peer.ID):
-		logger.Debug("skip handle: backoff")
-		return false
-	}
-
-	switch d.host.Network().Connectedness(peer.ID) {
-	case network.Connected:
-		d.connector.Backoff(peer.ID) // we still have to backoff the connected peer
-	case network.NotConnected:
-		err := d.connector.Connect(ctx, peer)
-		if err != nil {
-			logger.Debugw("unable to connect", "err", err)
-			return false
-		}
-	default:
-		panic("unknown connectedness")
-	}
-
-	if !d.set.Add(peer.ID) {
-		logger.Debug("peer is already in discovery set")
-		return false
-	}
-	d.onUpdatedPeers(peer.ID, true)
-	logger.Debug("added peer to set")
-
-	// tag to protect peer from being killed by ConnManager
-	// NOTE: This does not protect from the remote killing the connection.
-	// In the future, we should design a protocol that keeps bidirectional agreement on whether
-	// the connection should be kept or not, similar to the mesh link in GossipSub.
-	d.host.ConnManager().Protect(peer.ID, topic)
-	return true
-}
-
-// ensurePeers ensures we always have 'peerLimit' connected peers.
-// It starts peer discovery every 30 seconds until peer cache reaches peersLimit.
-// Discovery is restarted if any previously connected peers disconnect.
-func (d *Discovery) ensurePeers(ctx context.Context) {
-	if d.params.PeersLimit == 0 {
-		log.Warn("peers limit is set to 0. Skipping discovery...")
-		return
-	}
-	// subscribe on EventBus in order to catch disconnected peers and restart
-	// the discovery. We specify a larger buffer size for the channel where
-	// EvtPeerConnectednessChanged events are sent (by default it is 16, we
-	// specify 32) to avoid any blocks on writing to the full channel.
-	sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize))
-	if err != nil {
-		log.Error(err)
-		return
-	}
-	defer sub.Close()
-
-	// starting to listen to subscriptions async will help us to avoid any blocking
-	// in the case when we will not have the needed amount of FNs and will be blocked in `FindPeers`.
-	go func() {
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case e, ok := <-sub.Out():
-				if !ok {
-					log.Error("connection subscription was closed unexpectedly")
-					return
-				}
-
-				if evnt := e.(event.EvtPeerConnectednessChanged); evnt.Connectedness == network.NotConnected {
-					if !d.set.Contains(evnt.Peer) {
-						continue
-					}
-
-					d.host.ConnManager().Unprotect(evnt.Peer, topic)
-					d.connector.Backoff(evnt.Peer)
-					d.set.Remove(evnt.Peer)
-					d.onUpdatedPeers(evnt.Peer, false)
-					log.Debugw("removed peer from the peer set",
-						"peer", evnt.Peer, "status", evnt.Connectedness.String())
-
-					if d.set.Size() < d.set.Limit() {
-						d.triggerDiscovery()
-					}
-				}
-			}
-		}
-	}()
-	go d.connector.GC(ctx)
+// Advertise is a utility function that persistently advertises a service through an Advertiser.
+// TODO: Start advertising only after the reachability is confirmed by AutoNAT
+func (d *Discovery) Advertise(ctx context.Context) {
+	if d.params.AdvertiseInterval == -1 {
+		return
+	}
+
+	timer := time.NewTimer(d.params.AdvertiseInterval)
+	defer timer.Stop()
+	for {
+		ttl, err := d.disc.Advertise(ctx, topic)
+		if err != nil {
+			log.Debugf("Error advertising %s: %s", topic, err.Error())
+			if ctx.Err() != nil {
+				return
+			}
+
+			select {
+			case <-timer.C:
+				timer.Reset(d.params.AdvertiseInterval)
+				continue
+			case <-ctx.Done():
+				return
+			}
+		}
+
+		log.Debugf("advertised")
+		select {
+		case <-timer.C:
+			timer.Reset(waitF(ttl))
+		case <-ctx.Done():
+			return
+		}
+	}
+}
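Advertise re-arms its timer with waitF(ttl), a helper defined outside this diff. Presumably it returns a duration somewhat below the TTL so the advertisement is refreshed before the record expires; a sketch under that assumption:

```go
package example

import "time"

// waitF is an assumed stand-in for the helper referenced above, which is not
// shown in this diff: wake up before the advertised record's TTL lapses so
// the entry never goes stale.
func waitF(ttl time.Duration) time.Duration {
	return 7 * ttl / 8 // hypothetical fraction: refresh at ~87% of the TTL
}
```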

+// discoveryLoop ensures we always have '~peerLimit' connected peers.
+// It starts peer discovery per request and restarts the process until the soft limit is reached.
+func (d *Discovery) discoveryLoop(ctx context.Context) {
	t := time.NewTicker(d.params.discoveryRetryTimeout)
	defer t.Stop()
	for {
		// drain all previous ticks from the channel
		drainChannel(t.C)
		select {
		case <-t.C:
-			found := d.findPeers(ctx)
+			found := d.discover(ctx)
			if !found {
				// rerun discovery if the number of peers hasn't reached the limit
				continue
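discoveryLoop relies on a drainChannel helper that is also outside this diff; a minimal implementation consistent with its use here (non-blockingly discarding stale ticks so the following select only fires on a fresh one) could be:

```go
package example

import "time"

// drainChannel is an assumed stand-in for the helper used above: it empties
// any buffered ticks without blocking, so the caller's select waits for a
// tick that arrives after draining.
func drainChannel(c <-chan time.Time) {
	for {
		select {
		case <-c:
		default:
			return
		}
	}
}
```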
@@ -247,7 +189,56 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
	}
}

-func (d *Discovery) findPeers(ctx context.Context) bool {
+// disconnectsLoop listens for disconnect events and ensures that the
+// Discovery state is updated accordingly.
+func (d *Discovery) disconnectsLoop(ctx context.Context) {
+	// subscribe on EventBus in order to catch disconnected peers and restart
+	// the discovery. We specify a larger buffer size for the channel where
+	// EvtPeerConnectednessChanged events are sent (by default it is 16, we
+	// specify 32) to avoid any blocks on writing to the full channel.
+	sub, err := d.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize))
+	if err != nil {
+		log.Error(err)
+		return
+	}
+	defer sub.Close()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case e, ok := <-sub.Out():
+			if !ok {
+				log.Error("connection subscription was closed unexpectedly")
+				return
+			}
+
+			if evnt := e.(event.EvtPeerConnectednessChanged); evnt.Connectedness == network.NotConnected {
+				if !d.set.Contains(evnt.Peer) {
+					continue
+				}
+
+				d.host.ConnManager().Unprotect(evnt.Peer, topic)
+				d.connector.Backoff(evnt.Peer)
+				d.set.Remove(evnt.Peer)
+				d.onUpdatedPeers(evnt.Peer, false)
+				log.Debugw("removed peer from the peer set",
+					"peer", evnt.Peer, "status", evnt.Connectedness.String())
+
+				if d.set.Size() < d.set.Limit() {
+					// trigger discovery
+					select {
+					case d.triggerDisc <- struct{}{}:
+					default:
+					}
+				}
+			}
+		}
+	}
+}
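The select with a default branch at the end of disconnectsLoop is a non-blocking, coalescing notification: any burst of disconnects collapses into at most one pending wake-up for discoveryLoop. A self-contained illustration of the pattern (assuming, as the usage suggests, that triggerDisc is buffered with capacity 1):

```go
package main

import "fmt"

func main() {
	trigger := make(chan struct{}, 1) // capacity 1: at most one pending wake-up

	notify := func() {
		select {
		case trigger <- struct{}{}: // armed a wake-up
		default: // one is already pending; drop this notification
		}
	}

	// Three rapid notifications coalesce into a single pending signal.
	notify()
	notify()
	notify()

	<-trigger
	fmt.Println("woke up once")

	select {
	case <-trigger:
		fmt.Println("unexpected second wake-up")
	default:
		fmt.Println("no extra wake-ups queued")
	}
}
```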

+// discover finds new peers and reports whether it succeeded.
+func (d *Discovery) discover(ctx context.Context) bool {
	size := d.set.Size()
	want := d.set.Limit() - size
	if want == 0 {
@@ -299,7 +290,7 @@ func (d *Discovery) findPeers(ctx context.Context) bool {

	// we don't pass findCtx so that we don't cancel in-progress connections
	// that are likely to be valuable
-	if !d.handlePeerFound(ctx, peer) {
+	if !d.handleDiscoveredPeer(ctx, peer) {
		return nil
	}

@@ -317,46 +308,51 @@ func (d *Discovery) findPeers(ctx context.Context) bool {
	}
}

-// Advertise is a utility function that persistently advertises a service through an Advertiser.
-// TODO: Start advertising only after the reachability is confirmed by AutoNAT
-func (d *Discovery) Advertise(ctx context.Context) {
-	if d.params.AdvertiseInterval == -1 {
-		return
-	}
-
-	timer := time.NewTimer(d.params.AdvertiseInterval)
-	defer timer.Stop()
-	for {
-		ttl, err := d.disc.Advertise(ctx, topic)
-		if err != nil {
-			log.Debugf("Error advertising %s: %s", topic, err.Error())
-			if ctx.Err() != nil {
-				return
-			}
-
-			select {
-			case <-timer.C:
-				timer.Reset(d.params.AdvertiseInterval)
-				continue
-			case <-ctx.Done():
-				return
-			}
-		}
-
-		log.Debugf("advertised")
-		select {
-		case <-timer.C:
-			timer.Reset(waitF(ttl))
-		case <-ctx.Done():
-			return
-		}
-	}
-}
-
-// Peers provides a list of discovered peers in the "full" topic.
-// If Discovery hasn't found any peers, it blocks until at least one peer is found.
-func (d *Discovery) Peers(ctx context.Context) ([]peer.ID, error) {
-	return d.set.Peers(ctx)
-}
+// handleDiscoveredPeer adds the peer to the internal peer set if it is
+// connected or can be connected to. It reports whether handling succeeded.
+func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo) bool {
+	logger := log.With("peer", peer.ID)
+	switch {
+	case peer.ID == d.host.ID():
+		logger.Debug("skip handle: self discovery")
+		return false
+	case len(peer.Addrs) == 0:
+		logger.Debug("skip handle: empty address list")
+		return false
+	case d.set.Size() >= d.set.Limit():
+		logger.Debug("skip handle: enough peers found")
+		return false
+	case d.connector.HasBackoff(peer.ID):
+		logger.Debug("skip handle: backoff")
+		return false
+	}
+
+	switch d.host.Network().Connectedness(peer.ID) {
+	case network.Connected:
+		d.connector.Backoff(peer.ID) // we still have to backoff the connected peer
+	case network.NotConnected:
+		err := d.connector.Connect(ctx, peer)
+		if err != nil {
+			logger.Debugw("unable to connect", "err", err)
+			return false
+		}
+	default:
+		panic("unknown connectedness")
+	}
+
+	if !d.set.Add(peer.ID) {
+		logger.Debug("peer is already in discovery set")
+		return false
+	}
+	d.onUpdatedPeers(peer.ID, true)
+	logger.Debug("added peer to set")
+
+	// tag to protect peer from being killed by ConnManager
+	// NOTE: This does not protect from the remote killing the connection.
+	// In the future, we should design a protocol that keeps bidirectional agreement on whether
+	// the connection should be kept or not, similar to the mesh link in GossipSub.
+	d.host.ConnManager().Protect(peer.ID, topic)
+	return true
+}
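Protect/Unprotect above use libp2p's connection-manager tagging: a protected peer is exempt from connection pruning until every tag is removed. A minimal stand-alone sketch (the zero peer.ID and the "full" tag are just for illustration; a host built without an explicit connection manager gets a no-op implementation):

```go
package main

import (
	"fmt"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"
)

func main() {
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	var pid peer.ID   // in Discovery this is the discovered peer's ID
	const tag = "full" // the topic string doubles as the protection tag

	h.ConnManager().Protect(pid, tag) // the ConnManager won't prune protected conns
	defer h.ConnManager().Unprotect(pid, tag)

	// Prints false with the default no-op manager; true once a real
	// connection manager is configured on the host.
	fmt.Println("protected:", h.ConnManager().IsProtected(pid, tag))
}
```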

func (p Parameters) withDefaults() Parameters {
