diff --git a/share/getters/shrex.go b/share/getters/shrex.go
index 95f06d821e..3d99cb808f 100644
--- a/share/getters/shrex.go
+++ b/share/getters/shrex.go
@@ -6,6 +6,8 @@ import (
 	"fmt"
 	"time"
 
+	"github.com/libp2p/go-libp2p/core/routing"
+
 	"github.com/celestiaorg/nmt/namespace"
 	"github.com/celestiaorg/rsmt2d"
 
@@ -92,6 +94,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
 		case errors.Is(getErr, p2p.ErrNotFound):
 			getErr = share.ErrNotFound
 			setStatus(peers.ResultCooldownPeer)
+		case errors.Is(getErr, routing.ErrNotFound):
+			setStatus(peers.ResultRemovePeer)
 		case errors.Is(getErr, p2p.ErrInvalidResponse):
 			setStatus(peers.ResultBlacklistPeer)
 		default:
@@ -145,6 +149,8 @@ func (sg *ShrexGetter) GetSharesByNamespace(
 		case errors.Is(getErr, p2p.ErrNotFound):
 			getErr = share.ErrNotFound
 			setStatus(peers.ResultCooldownPeer)
+		case errors.Is(getErr, routing.ErrNotFound):
+			setStatus(peers.ResultRemovePeer)
 		case errors.Is(getErr, p2p.ErrInvalidResponse):
 			setStatus(peers.ResultBlacklistPeer)
 		default:
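The getter changes map a third failure class onto the peer manager: routing.ErrNotFound means the peer's addresses could not be resolved at all, so the peer is removed from the pool rather than merely put on cooldown. Because errors.Is unwraps, the check still fires when the routing error is wrapped by the transport, which is also why the switch must inspect the error of the current attempt (getErr) rather than an accumulated multi-error. Below is a minimal, self-contained sketch of that classification pattern; the sentinel errors, the result constants, and the classify helper are illustrative stand-ins, not celestia-node code.

package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-ins for the peer-manager results used in the diff.
type result int

const (
	ResultCooldownPeer result = iota
	ResultRemovePeer
	ResultBlacklistPeer
)

// Sentinel errors standing in for p2p.ErrNotFound, routing.ErrNotFound and
// p2p.ErrInvalidResponse.
var (
	errNotFound        = errors.New("not found")
	errRoutingNotFound = errors.New("routing: not found")
	errInvalidResponse = errors.New("invalid response")
)

// classify mirrors the switch in GetEDS/GetSharesByNamespace: it inspects the
// error returned by a single request attempt and decides what to do with the
// peer that served it.
func classify(getErr error) result {
	switch {
	case errors.Is(getErr, errNotFound):
		return ResultCooldownPeer // peer is fine, it just doesn't have the data yet
	case errors.Is(getErr, errRoutingNotFound):
		return ResultRemovePeer // peer can't even be dialed, drop it from the pool
	case errors.Is(getErr, errInvalidResponse):
		return ResultBlacklistPeer // peer misbehaved, block it
	default:
		return ResultCooldownPeer
	}
}

func main() {
	// errors.Is unwraps, so a wrapped routing error is still classified correctly.
	err := fmt.Errorf("requesting eds: %w", errRoutingNotFound)
	fmt.Println(classify(err) == ResultRemovePeer) // true
}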
diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go
index c18e91bdb1..e3cd367956 100644
--- a/share/p2p/peers/manager.go
+++ b/share/p2p/peers/manager.go
@@ -11,8 +11,11 @@ import (
 
 	logging "github.com/ipfs/go-log/v2"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	"github.com/libp2p/go-libp2p/core/event"
 	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
 	"github.com/libp2p/go-libp2p/p2p/net/conngater"
 
 	libhead "github.com/celestiaorg/go-header"
@@ -31,9 +34,15 @@ const (
 	// ResultCooldownPeer will put returned peer on cooldown, meaning it won't be available by Peer
 	// method for some time
 	ResultCooldownPeer
+	// ResultRemovePeer will remove peer from peer manager pool
+	ResultRemovePeer
 	// ResultBlacklistPeer will blacklist peer. Blacklisted peers will be disconnected and blocked from
 	// any p2p communication in future by libp2p Gater
 	ResultBlacklistPeer
+
+	// eventbusBufSize is the size of the buffered channel to handle
+	// events in libp2p
+	eventbusBufSize = 32
 )
 
 var log = logging.Logger("shrex/peer-manager")
@@ -61,8 +70,9 @@ type Manager struct {
 	// hashes that are not in the chain
 	blacklistedHashes map[string]bool
 
-	cancel context.CancelFunc
-	done   chan struct{}
+	cancel                context.CancelFunc
+	headerSubDone         chan struct{}
+	disconnectedPeersDone chan struct{}
 }
 
 // DoneFunc updates internal state depending on call results. Should be called once per returned
@@ -106,7 +116,8 @@ func NewManager(
 		host:              host,
 		pools:             make(map[string]*syncPool),
 		blacklistedHashes: make(map[string]bool),
-		done:              make(chan struct{}),
+		headerSubDone:         make(chan struct{}),
+		disconnectedPeersDone: make(chan struct{}),
 	}
 
 	s.fullNodes = newPool(s.params.PeerCooldown)
@@ -149,6 +159,12 @@ func (m *Manager) Start(startCtx context.Context) error {
 		return fmt.Errorf("subscribing to headersub: %w", err)
 	}
 
+	sub, err := m.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize))
+	if err != nil {
+		return fmt.Errorf("subscribing to libp2p events: %w", err)
+	}
+
+	go m.subscribeDisconnectedPeers(ctx, sub)
 	go m.subscribeHeader(ctx, headerSub)
 	go m.GC(ctx)
 
@@ -159,7 +175,7 @@ func (m *Manager) Stop(ctx context.Context) error {
 	m.cancel()
 
 	select {
-	case <-m.done:
+	case <-m.headerSubDone:
 		return nil
 	case <-ctx.Done():
 		return ctx.Err()
@@ -227,6 +243,8 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, fromFull boo
 		if fromFull {
 			m.fullNodes.putOnCooldown(peerID)
 		}
+	case ResultRemovePeer:
+		m.fullNodes.remove(peerID)
 	case ResultBlacklistPeer:
 		m.blacklistPeers(peerID)
 	}
@@ -235,7 +253,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, fromFull boo
 
 // subscribeHeader takes datahash from received header and validates corresponding peer pool.
 func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscription[*header.ExtendedHeader]) {
-	defer close(m.done)
+	defer close(m.headerSubDone)
 	defer headerSub.Cancel()
 
 	for {
@@ -256,6 +274,33 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
 	}
 }
 
+// subscribeDisconnectedPeers subscribes to libp2p connectivity events and removes disconnected
+// peers from the full nodes pool.
+func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subscription) {
+	defer close(m.disconnectedPeersDone)
+	defer sub.Close()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case e, ok := <-sub.Out():
+			if !ok {
+				log.Error("Subscription for connectedness events is closed.")
+				return
+			}
+			// listen to disconnect event to remove peer from full nodes pool
+			connStatus := e.(event.EvtPeerConnectednessChanged)
+			if connStatus.Connectedness == network.NotConnected {
+				peer := connStatus.Peer
+				if m.fullNodes.has(peer) {
+					log.Debugw("peer disconnected, removing from full nodes", "peer", peer)
+					m.fullNodes.remove(peer)
+				}
+			}
+		}
+	}
+}
+
 // Validate will collect peer.ID into corresponding peer pool
 func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {
@@ -295,6 +340,10 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
 	p := m.getOrCreatePool(msg.DataHash.String())
 	p.headerHeight.Store(msg.Height)
 	p.add(peerID)
+	if p.isValidatedDataHash.Load() {
+		// add peer to full nodes pool only if the datahash has already been validated
+		m.fullNodes.add(peerID)
+	}
 	log.Debugw("got hash from shrex-sub", "peer", peerID, "datahash", msg.DataHash.String())
 	return pubsub.ValidationIgnore
 }
@@ -350,6 +399,8 @@ func (m *Manager) validatedPool(hashStr string) *syncPool {
 	p := m.getOrCreatePool(hashStr)
 	if p.isValidatedDataHash.CompareAndSwap(false, true) {
 		log.Debugw("pool marked validated", "datahash", hashStr)
+		// if pool is proven to be valid, add all collected peers to full nodes
+		m.fullNodes.add(p.peers()...)
 	}
 	return p
 }
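The core addition in manager.go is the event-bus subscription in Start: the manager now listens for libp2p EvtPeerConnectednessChanged events and drops peers from the full-nodes pool once they report NotConnected. The standalone sketch below shows the same subscribe-and-drain pattern in isolation, assuming a recent go-libp2p; the watchDisconnects helper, the one-second timeout, and the buffer size of 32 are illustrative choices, while the event type, the eventbus.BufSize option, and the network.NotConnected check come from the diff.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
)

// watchDisconnects subscribes to connectedness events on the host's event bus
// and invokes onDisconnect for every peer that transitions to NotConnected.
// It returns once ctx is cancelled or the subscription is closed.
func watchDisconnects(ctx context.Context, h host.Host, onDisconnect func(peer.ID)) error {
	sub, err := h.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(32))
	if err != nil {
		return fmt.Errorf("subscribing to libp2p events: %w", err)
	}
	defer sub.Close()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case e, ok := <-sub.Out():
			if !ok {
				return nil
			}
			ev := e.(event.EvtPeerConnectednessChanged)
			if ev.Connectedness == network.NotConnected {
				onDisconnect(ev.Peer)
			}
		}
	}
}

func main() {
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	_ = watchDisconnects(ctx, h, func(p peer.ID) {
		// In the manager this is where the peer is dropped from the full-nodes pool.
		fmt.Println("peer disconnected:", p)
	})
}

In the manager the same loop runs as a goroutine started from Start; exiting on context cancellation and closing the subscription with defer sub.Close() mirrors subscribeDisconnectedPeers, and the buffered subscription keeps a slow consumer from stalling the host's event bus.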
diff --git a/share/p2p/peers/pool.go b/share/p2p/peers/pool.go
index a31c4345b7..32e2a174df 100644
--- a/share/p2p/peers/pool.go
+++ b/share/p2p/peers/pool.go
@@ -12,7 +12,7 @@ const defaultCleanupThreshold = 2
 
 // pool stores peers and provides methods for simple round-robin access.
 type pool struct {
-	m         sync.Mutex
+	m         sync.RWMutex
 	peersList []peer.ID
 	statuses  map[peer.ID]status
 	cooldown  *timedQueue
@@ -139,6 +139,27 @@ func (p *pool) remove(peers ...peer.ID) {
 	p.checkHasPeers()
 }
 
+func (p *pool) has(peer peer.ID) bool {
+	p.m.RLock()
+	defer p.m.RUnlock()
+
+	status, ok := p.statuses[peer]
+	return ok && status != removed
+}
+
+func (p *pool) peers() []peer.ID {
+	p.m.RLock()
+	defer p.m.RUnlock()
+
+	peers := make([]peer.ID, 0, len(p.peersList))
+	for peer, status := range p.statuses {
+		if status != removed {
+			peers = append(peers, peer)
+		}
+	}
+	return peers
+}
+
 // cleanup will reduce memory footprint of pool.
 func (p *pool) cleanup() {
 	newList := make([]peer.ID, 0, p.activeCount)
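pool switches from sync.Mutex to sync.RWMutex because the new has and peers accessors are read-only and are now called from the disconnect path and from validatedPool. The toy pool below (only a statuses map, not the real round-robin structure) sketches the same locking split: readers share the lock, writers take it exclusively.

package main

import (
	"fmt"
	"sync"

	"github.com/libp2p/go-libp2p/core/peer"
)

type status int

const (
	active status = iota
	removed
)

// pool is a deliberately minimal stand-in: just peer statuses behind an RWMutex.
type pool struct {
	m        sync.RWMutex
	statuses map[peer.ID]status
}

func newPool() *pool {
	return &pool{statuses: make(map[peer.ID]status)}
}

func (p *pool) add(id peer.ID) {
	p.m.Lock()
	defer p.m.Unlock()
	p.statuses[id] = active
}

func (p *pool) remove(id peer.ID) {
	p.m.Lock()
	defer p.m.Unlock()
	p.statuses[id] = removed
}

// has reports whether the peer is known and not removed; read-only, so RLock.
func (p *pool) has(id peer.ID) bool {
	p.m.RLock()
	defer p.m.RUnlock()
	st, ok := p.statuses[id]
	return ok && st != removed
}

// peers returns a snapshot of all non-removed peers; read-only, so RLock.
func (p *pool) peers() []peer.ID {
	p.m.RLock()
	defer p.m.RUnlock()
	out := make([]peer.ID, 0, len(p.statuses))
	for id, st := range p.statuses {
		if st != removed {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	pl := newPool()
	pl.add(peer.ID("a"))
	pl.add(peer.ID("b"))
	pl.remove(peer.ID("a"))
	fmt.Println(pl.has(peer.ID("a")), pl.has(peer.ID("b"))) // false true
	fmt.Println(len(pl.peers()))                            // 1
}

Snapshotting with peers() under a read lock is what lets validatedPool promote every collected peer via m.fullNodes.add(p.peers()...) without holding one pool's lock while taking another's.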