Skip to content

Commit

Permalink
synchronise add and remove
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Apr 21, 2023
1 parent 7a3b38f commit d3a4c1a
Showing 1 changed file with 42 additions and 30 deletions.
72 changes: 42 additions & 30 deletions share/availability/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,17 @@ type Discovery struct {
advertiseInterval time.Duration
// onUpdatedPeers will be called on peer set changes
onUpdatedPeers OnUpdatedPeers
connected chan addConnectedReq

ctx context.Context
cancel context.CancelFunc
}

type addConnectedReq struct {
peerID peer.ID
cancelFind context.CancelFunc
}

type OnUpdatedPeers func(peerID peer.ID, isAdded bool)

// NewDiscovery constructs a new discovery.
Expand All @@ -77,14 +84,13 @@ func NewDiscovery(
discoveryInterval: discInterval,
advertiseInterval: advertiseInterval,
onUpdatedPeers: func(peer.ID, bool) {},
connected: make(chan addConnectedReq, peersLimit),
}
}

func (d *Discovery) Start(context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel

go d.ensurePeers(ctx)
d.ctx, d.cancel = context.WithCancel(context.Background())
go d.ensurePeers(d.ctx)
return nil
}

Expand Down Expand Up @@ -121,29 +127,16 @@ func (d *Discovery) handlePeerFound(ctx context.Context, peer peer.AddrInfo, can
return
}

err = d.set.Add(peer.ID)
if err != nil {
log.Debugw("failed to add peer to set", "peer", peer.ID, "error", err)
return
}
log.Debugw("added peer to set", "id", peer.ID)

// check the size only after we add
// so that peer set represents the actual number of connections we made
// which can go slightly over peersLimit
if uint(d.set.Size()) >= d.peersLimit {
log.Infow("soft peer limit reached", "count", d.set.Size(), "peer", peer.ID)
cancelFind()
}

// tag to protect peer from being killed by ConnManager
// NOTE: This is does not protect from remote killing the connection.
// In the future, we should design a protocol that keeps bidirectional agreement on whether
// connection should be kept or not, similar to mesh link in GossipSub.
d.host.ConnManager().TagPeer(peer.ID, topic, peerWeight)

// and notify our subscribers
d.onUpdatedPeers(peer.ID, true)
select {
case d.connected <- addConnectedReq{peer.ID, cancelFind}:
case <-d.ctx.Done():
}
}

// ensurePeers ensures we always have 'peerLimit' connected peers.
Expand Down Expand Up @@ -189,16 +182,35 @@ func (d *Discovery) ensurePeers(ctx context.Context) {
// listen to disconnect event to remove peer from set and reset backoff time
// reset timer in order to restart the discovery, once stored peer is disconnected
connStatus := e.(event.EvtPeerConnectednessChanged)
if connStatus.Connectedness == network.NotConnected {
if d.set.Contains(connStatus.Peer) {
log.Debugw("removing peer from the peer set",
"peer", connStatus.Peer, "status", connStatus.Connectedness.String())
d.connector.RestartBackoff(connStatus.Peer)
d.set.Remove(connStatus.Peer)
d.onUpdatedPeers(connStatus.Peer, false)
d.host.ConnManager().UntagPeer(connStatus.Peer, topic)
}
if connStatus.Connectedness == network.Connected || !d.set.Contains(connStatus.Peer) {
continue
}
d.host.ConnManager().UntagPeer(connStatus.Peer, topic)
d.connector.RestartBackoff(connStatus.Peer)

d.set.Remove(connStatus.Peer)
log.Debugw("removed peer from the peer set",
"peer", connStatus.Peer, "status", connStatus.Connectedness.String())

d.onUpdatedPeers(connStatus.Peer, false)
case req := <-d.connected:
err = d.set.Add(req.peerID)
if err != nil {
log.Debugw("failed to add peer to set", "peer", req.peerID, "error", err)
return
}
log.Debugw("added peer to set", "id", req.peerID)

// first do Add and only after check the limit
// so that peer set represents the actual number of connections we made
// which can go slightly over peersLimit
if uint(d.set.Size()) >= d.peersLimit {
log.Infow("soft peer limit reached", "count", d.set.Size(), "peer", req.peerID)
req.cancelFind()
}

// and notify our subscribers
d.onUpdatedPeers(req.peerID, true)
}
}
}()
Expand Down

0 comments on commit d3a4c1a

Please sign in to comment.