diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go
index 35aa67c6e6..46cb9123f6 100644
--- a/share/p2p/peers/manager.go
+++ b/share/p2p/peers/manager.go
@@ -11,8 +11,11 @@ import (
 	logging "github.com/ipfs/go-log/v2"
 	pubsub "github.com/libp2p/go-libp2p-pubsub"
+	"github.com/libp2p/go-libp2p/core/event"
 	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/network"
 	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
 	"github.com/libp2p/go-libp2p/p2p/net/conngater"
 
 	libhead "github.com/celestiaorg/go-header"
@@ -34,6 +37,10 @@ const (
 	// ResultBlacklistPeer will blacklist peer. Blacklisted peers will be disconnected and blocked from
 	// any p2p communication in future by libp2p Gater
 	ResultBlacklistPeer = "result_blacklist_peer"
+
+	// eventbusBufSize is the size of the buffered channel to handle
+	// events in libp2p
+	eventbusBufSize = 32
 )
 
 type result string
@@ -48,6 +55,7 @@ type Manager struct {
 	// header subscription is necessary in order to Validate the inbound eds hash
 	headerSub libhead.Subscriber[*header.ExtendedHeader]
 	shrexSub  *shrexsub.PubSub
+	disc      *discovery.Discovery
 	host      host.Host
 	connGater *conngater.BasicConnectionGater
@@ -65,8 +73,9 @@ type Manager struct {
 	metrics *metrics
 
-	cancel context.CancelFunc
-	done   chan struct{}
+	headerSubDone         chan struct{}
+	disconnectedPeersDone chan struct{}
+	cancel                context.CancelFunc
 }
 
 // DoneFunc updates internal state depending on call results. Should be called once per returned
@@ -101,14 +110,16 @@ func NewManager(
 	}
 
 	s := &Manager{
-		params:            params,
-		headerSub:         headerSub,
-		shrexSub:          shrexSub,
-		connGater:         connGater,
-		host:              host,
-		pools:             make(map[string]*syncPool),
-		blacklistedHashes: make(map[string]bool),
-		done:              make(chan struct{}),
+		params:                params,
+		headerSub:             headerSub,
+		shrexSub:              shrexSub,
+		connGater:             connGater,
+		disc:                  discovery,
+		host:                  host,
+		pools:                 make(map[string]*syncPool),
+		blacklistedHashes:     make(map[string]bool),
+		headerSubDone:         make(chan struct{}),
+		disconnectedPeersDone: make(chan struct{}),
 	}
 
 	s.fullNodes = newPool(s.params.PeerCooldown)
@@ -120,8 +131,8 @@ func NewManager(
 			log.Debugw("got blacklisted peer from discovery", "peer", peerID)
 			return
 		}
-		log.Debugw("added to full nodes", "peer", peerID)
 		s.fullNodes.add(peerID)
+		log.Debugw("added to full nodes", "peer", peerID)
 		return
 	}
@@ -152,6 +163,12 @@ func (m *Manager) Start(startCtx context.Context) error {
 		return fmt.Errorf("subscribing to headersub: %w", err)
 	}
 
+	sub, err := m.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize))
+	if err != nil {
+		return fmt.Errorf("subscribing to libp2p events: %w", err)
+	}
+
+	go m.subscribeDisconnectedPeers(ctx, sub)
 	go m.subscribeHeader(ctx, headerSub)
 	go m.GC(ctx)
@@ -162,11 +179,18 @@ func (m *Manager) Stop(ctx context.Context) error {
 	m.cancel()
 
 	select {
-	case <-m.done:
-		return nil
+	case <-m.headerSubDone:
 	case <-ctx.Done():
 		return ctx.Err()
 	}
+
+	select {
+	case <-m.disconnectedPeersDone:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+
+	return nil
 }
 
 // Peer returns peer collected from shrex.Sub for given datahash if any available.
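Note on the event-bus wiring introduced in Start above: the sketch below is not part of the patch. It is a minimal, self-contained illustration of how a libp2p host's event bus delivers EvtPeerConnectednessChanged through a buffered subscription, which is what the new subscribeDisconnectedPeers goroutine (added further down) consumes. The buffer size mirrors eventbusBufSize; the package main scaffolding, the timeout, and the printed output are illustrative assumptions only.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
)

func main() {
	// Start a throwaway libp2p host; in the manager the host is injected.
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Buffered subscription so a slow consumer does not stall the event bus emitter.
	sub, err := h.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(32))
	if err != nil {
		panic(err)
	}
	defer sub.Close()

	// Bounded lifetime for the sketch; the manager uses its own Start/Stop context instead.
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	for {
		select {
		case <-ctx.Done():
			return
		case e, ok := <-sub.Out():
			if !ok {
				return
			}
			evt := e.(event.EvtPeerConnectednessChanged)
			// A consumer like the peer manager only reacts to disconnects.
			if evt.Connectedness == network.NotConnected {
				fmt.Println("peer disconnected:", evt.Peer)
			}
		}
	}
}

Creating the subscription inside Start (rather than inside the goroutine) matches the patch: subscription failures surface synchronously as errors, while the goroutine only drains and closes the subscription.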
@@ -182,11 +206,7 @@ func (m *Manager) Peer(
 	// first, check if a peer is available for the given datahash
 	peerID, ok := p.tryGet()
 	if ok {
-		// some pools could still have blacklisted peers in storage
-		if m.isBlacklistedPeer(peerID) {
-			log.Debugw("removing blacklisted peer from pool",
-				"peer", peerID.String())
-			p.remove(peerID)
+		if m.removeIfUnreachable(p, peerID) {
 			return m.Peer(ctx, datahash)
 		}
 		return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0)
@@ -203,6 +223,9 @@ func (m *Manager) Peer(
 	start := time.Now()
 	select {
 	case peerID = <-p.next(ctx):
+		if m.removeIfUnreachable(p, peerID) {
+			return m.Peer(ctx, datahash)
+		}
 		return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start))
 	case peerID = <-m.fullNodes.next(ctx):
 		return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start))
@@ -217,7 +240,8 @@ func (m *Manager) newPeer(
 	peerID peer.ID,
 	source peerSource,
 	poolSize int,
-	waitTime time.Duration) (peer.ID, DoneFunc, error) {
+	waitTime time.Duration,
+) (peer.ID, DoneFunc, error) {
 	log.Debugw("got peer",
 		"hash", datahash.String(),
 		"peer", peerID.String(),
@@ -241,10 +265,11 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS
 		case ResultSynced:
 			m.markPoolAsSynced(datahash.String())
 		case ResultCooldownPeer:
-			m.getOrCreatePool(datahash.String()).putOnCooldown(peerID)
 			if source == sourceFullNodes {
 				m.fullNodes.putOnCooldown(peerID)
+				return
 			}
+			m.getOrCreatePool(datahash.String()).putOnCooldown(peerID)
 		case ResultBlacklistPeer:
 			m.blacklistPeers(reasonMisbehave, peerID)
 		}
@@ -253,7 +278,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS
 
 // subscribeHeader takes datahash from received header and validates corresponding peer pool.
 func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscription[*header.ExtendedHeader]) {
-	defer close(m.done)
+	defer close(m.headerSubDone)
 	defer headerSub.Cancel()
 
 	for {
@@ -274,6 +299,33 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
 	}
 }
 
+// subscribeDisconnectedPeers subscribes to libp2p connectivity events and removes disconnected
+// peers from full nodes pool
+func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subscription) {
+	defer close(m.disconnectedPeersDone)
+	defer sub.Close()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case e, ok := <-sub.Out():
+			if !ok {
+				log.Fatal("Subscription for connectedness events is closed.") //nolint:gocritic
+				return
+			}
+			// listen to disconnect event to remove peer from full nodes pool
+			connStatus := e.(event.EvtPeerConnectednessChanged)
+			if connStatus.Connectedness == network.NotConnected {
+				peer := connStatus.Peer
+				if m.fullNodes.has(peer) {
+					log.Debugw("peer disconnected, removing from full nodes", "peer", peer)
+					m.fullNodes.remove(peer)
+				}
+			}
+		}
+	}
+}
+
 // Validate will collect peer.ID into corresponding peer pool
 func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {
 	logger := log.With("peer", peerID, "hash", msg.DataHash.String())
@@ -311,19 +363,14 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
 
 	p := m.getOrCreatePool(msg.DataHash.String())
 	p.headerHeight.Store(msg.Height)
-	p.add(peerID)
-	logger.Debug("got hash from shrex-sub")
-	return pubsub.ValidationIgnore
-}
+	logger.Debugw("got hash from shrex-sub")
 
-func (m *Manager) validatedPool(datahash string) *syncPool {
-	p := m.getOrCreatePool(datahash)
-	if p.isValidatedDataHash.CompareAndSwap(false, true) {
-		log.Debugw("pool marked validated",
-			"hash", datahash,
-			"after (s)", time.Since(p.createdAt))
+	p.add(peerID)
+	if p.isValidatedDataHash.Load() {
+		// add peer to full nodes pool only if datahash has been already validated
+		m.fullNodes.add(peerID)
 	}
-	return p
+	return pubsub.ValidationIgnore
 }
 
 func (m *Manager) getOrCreatePool(datahash string) *syncPool {
@@ -356,7 +403,7 @@ func (m *Manager) blacklistPeers(reason blacklistPeerReason, peerIDs ...peer.ID)
 		// add peer to the blacklist, so we can't connect to it in the future.
 		err := m.connGater.BlockPeer(peerID)
 		if err != nil {
-			log.Warnw("failed tp block peer", "peer", peerID, "err", err)
+			log.Warnw("failed to block peer", "peer", peerID, "err", err)
 		}
 		// close connections to peer.
 		err = m.host.Network().ClosePeer(peerID)
@@ -376,6 +423,26 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool {
 	return m.blacklistedHashes[hash.String()]
 }
 
+func (m *Manager) validatedPool(hashStr string) *syncPool {
+	p := m.getOrCreatePool(hashStr)
+	if p.isValidatedDataHash.CompareAndSwap(false, true) {
+		log.Debugw("pool marked validated", "datahash", hashStr)
+		// if pool is proven to be valid, add all collected peers to full nodes
+		m.fullNodes.add(p.peers()...)
+	}
+	return p
+}
+
+// removeIfUnreachable removes peer from some pool if it is blacklisted or disconnected
+func (m *Manager) removeIfUnreachable(pool *syncPool, peerID peer.ID) bool {
+	if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) {
+		log.Debugw("removing outdated peer from pool", "peer", peerID.String())
+		pool.remove(peerID)
+		return true
+	}
+	return false
+}
+
 func (m *Manager) GC(ctx context.Context) {
 	ticker := time.NewTicker(m.params.GcInterval)
 	defer ticker.Stop()
@@ -441,6 +508,7 @@ func (m *Manager) markPoolAsSynced(datahash string) {
 		atomic.StorePointer(old, unsafe.Pointer(newPool(time.Second)))
 	}
 }
+
 func (p *syncPool) add(peers ...peer.ID) {
 	if !p.isSynced.Load() {
 		p.pool.add(peers...)
diff --git a/share/p2p/peers/metrics.go b/share/p2p/peers/metrics.go
index ab5bfc97b2..bf4d544d9f 100644
--- a/share/p2p/peers/metrics.go
+++ b/share/p2p/peers/metrics.go
@@ -169,8 +169,10 @@ func initMetrics(manager *Manager) (*metrics, error) {
 	return metrics, nil
 }
 
-func (m *metrics) observeGetPeer(ctx context.Context,
-	source peerSource, poolSize int, waitTime time.Duration) {
+func (m *metrics) observeGetPeer(
+	ctx context.Context,
+	source peerSource, poolSize int, waitTime time.Duration,
+) {
 	if m == nil {
 		return
 	}
diff --git a/share/p2p/peers/pool.go b/share/p2p/peers/pool.go
index 4eae614ca1..c43bbc963b 100644
--- a/share/p2p/peers/pool.go
+++ b/share/p2p/peers/pool.go
@@ -139,6 +139,27 @@ func (p *pool) remove(peers ...peer.ID) {
 	p.checkHasPeers()
 }
 
+func (p *pool) has(peer peer.ID) bool {
+	p.m.RLock()
+	defer p.m.RUnlock()
+
+	status, ok := p.statuses[peer]
+	return ok && status != removed
+}
+
+func (p *pool) peers() []peer.ID {
+	p.m.RLock()
+	defer p.m.RUnlock()
+
+	peers := make([]peer.ID, 0, len(p.peersList))
+	for peer, status := range p.statuses {
+		if status != removed {
+			peers = append(peers, peer)
+		}
+	}
+	return peers
+}
+
 // cleanup will reduce memory footprint of pool.
 func (p *pool) cleanup() {
 	newList := make([]peer.ID, 0, p.activeCount)
diff --git a/share/p2p/shrexeds/client.go b/share/p2p/shrexeds/client.go
index af408d130c..7c2591bcbd 100644
--- a/share/p2p/shrexeds/client.go
+++ b/share/p2p/shrexeds/client.go
@@ -114,9 +114,9 @@ func (c *Client) doRequest(
 	}
 	_, err = serde.Read(stream, resp)
 	if err != nil {
-		// server closes the stream after returning a non-successful status
+		// server closes the stream here if we are rate limited
 		if errors.Is(err, io.EOF) {
-			c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
+			c.metrics.ObserveRequests(ctx, 1, p2p.StatusRateLimited)
 			return nil, p2p.ErrNotFound
 		}
 		stream.Reset() //nolint:errcheck
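The new pool.has and pool.peers helpers in pool.go above rely on the pool's RWMutex-guarded statuses map. The standalone sketch below reproduces that read-locked membership/snapshot pattern for reference; it is not part of the patch, and the trimmed-down pool struct, the status enum, and the main function are assumptions for illustration, since the full pool type (peersList, cooldown bookkeeping, etc.) is not shown in this diff.

package main

import (
	"fmt"
	"sync"

	"github.com/libp2p/go-libp2p/core/peer"
)

type status int

const (
	active status = iota
	removed
)

// pool is a simplified stand-in for the real peers pool.
type pool struct {
	m        sync.RWMutex
	statuses map[peer.ID]status
}

// has reports whether the peer is tracked and not marked removed.
func (p *pool) has(id peer.ID) bool {
	p.m.RLock()
	defer p.m.RUnlock()

	st, ok := p.statuses[id]
	return ok && st != removed
}

// peers returns a snapshot of all peers that are not marked removed.
func (p *pool) peers() []peer.ID {
	p.m.RLock()
	defer p.m.RUnlock()

	out := make([]peer.ID, 0, len(p.statuses))
	for id, st := range p.statuses {
		if st != removed {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	p := &pool{statuses: map[peer.ID]status{
		peer.ID("peerA"): active,
		peer.ID("peerB"): removed,
	}}
	fmt.Println(p.has(peer.ID("peerA")), p.has(peer.ID("peerB"))) // true false
	fmt.Println(len(p.peers()))                                   // 1
}

Returning a copied slice rather than internal state is what lets a caller such as validatedPool pass the result straight to fullNodes.add without holding the pool's lock.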