From 89eb8e1f36b105ea1c090d3e612caa36a860744c Mon Sep 17 00:00:00 2001 From: Vlad Date: Wed, 19 Apr 2023 19:25:45 +0300 Subject: [PATCH 01/19] use shrexSub peers as full nodes --- share/getters/shrex.go | 10 +++++-- share/p2p/peers/manager.go | 61 +++++++++++++++++++++++++++++++++++--- share/p2p/peers/pool.go | 21 +++++++++++++ 3 files changed, 86 insertions(+), 6 deletions(-) diff --git a/share/getters/shrex.go b/share/getters/shrex.go index dc26d6ed7a..4931cd6ce2 100644 --- a/share/getters/shrex.go +++ b/share/getters/shrex.go @@ -6,6 +6,8 @@ import ( "fmt" "time" + "github.com/libp2p/go-libp2p/core/routing" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/global" "go.opentelemetry.io/otel/metric/instrument" @@ -159,7 +161,9 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex setStatus(peers.ResultCooldownPeer) case errors.Is(getErr, p2p.ErrNotFound): getErr = share.ErrNotFound - setStatus(peers.ResultCooldownPeer) + setStatus(peers.ResultRemovePeer) + case errors.Is(err, routing.ErrNotFound): + setStatus(peers.ResultRemovePeer) case errors.Is(getErr, p2p.ErrInvalidResponse): setStatus(peers.ResultBlacklistPeer) default: @@ -216,10 +220,12 @@ func (sg *ShrexGetter) GetSharesByNamespace( return nd, nil case errors.Is(getErr, context.DeadlineExceeded), errors.Is(getErr, context.Canceled): - setStatus(peers.ResultCooldownPeer) + setStatus(peers.ResultRemovePeer) case errors.Is(getErr, p2p.ErrNotFound): getErr = share.ErrNotFound setStatus(peers.ResultCooldownPeer) + case errors.Is(err, routing.ErrNotFound): + setStatus(peers.ResultRemovePeer) case errors.Is(getErr, p2p.ErrInvalidResponse): setStatus(peers.ResultBlacklistPeer) default: diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 35aa67c6e6..43a17eb59c 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -11,8 +11,11 @@ import ( logging "github.com/ipfs/go-log/v2" pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/net/conngater" libhead "github.com/celestiaorg/go-header" @@ -34,6 +37,12 @@ const ( // ResultBlacklistPeer will blacklist peer. Blacklisted peers will be disconnected and blocked from // any p2p communication in future by libp2p Gater ResultBlacklistPeer = "result_blacklist_peer" + // ResultRemovePeer will remove peer from peer manager pool + ResultRemovePeer = "result_remove_peer" + + // eventbusBufSize is the size of the buffered channel to handle + // events in libp2p + eventbusBufSize = 32 ) type result string @@ -67,6 +76,9 @@ type Manager struct { cancel context.CancelFunc done chan struct{} + + headerSubDone chan struct{} + disconnectedPeersDone chan struct{} } // DoneFunc updates internal state depending on call results. Should be called once per returned @@ -108,7 +120,7 @@ func NewManager( host: host, pools: make(map[string]*syncPool), blacklistedHashes: make(map[string]bool), - done: make(chan struct{}), + headerSubDone: make(chan struct{}), } s.fullNodes = newPool(s.params.PeerCooldown) @@ -152,6 +164,12 @@ func (m *Manager) Start(startCtx context.Context) error { return fmt.Errorf("subscribing to headersub: %w", err) } + sub, err := m.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize)) + if err != nil { + return fmt.Errorf("subscribing to libp2p events: %w", err) + } + + go m.subscribeDisconnectedPeers(ctx, sub) go m.subscribeHeader(ctx, headerSub) go m.GC(ctx) @@ -162,7 +180,7 @@ func (m *Manager) Stop(ctx context.Context) error { m.cancel() select { - case <-m.done: + case <-m.headerSubDone: return nil case <-ctx.Done(): return ctx.Err() @@ -245,6 +263,8 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS if source == sourceFullNodes { m.fullNodes.putOnCooldown(peerID) } + case ResultRemovePeer: + m.fullNodes.remove(peerID) case ResultBlacklistPeer: m.blacklistPeers(reasonMisbehave, peerID) } @@ -253,7 +273,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS // subscribeHeader takes datahash from received header and validates corresponding peer pool. func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscription[*header.ExtendedHeader]) { - defer close(m.done) + defer close(m.headerSubDone) defer headerSub.Cancel() for { @@ -274,6 +294,33 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri } } +// subscribeDisconnectedPeers subscribes to libp2p connectivity events and removes disconnected +// peers from full nodes pool +func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subscription) { + defer close(m.disconnectedPeersDone) + defer sub.Close() + for { + select { + case <-ctx.Done(): + return + case e, ok := <-sub.Out(): + if !ok { + log.Error("Subscription for connectedness events is closed.") + return + } + // listen to disconnect event to remove peer from full nodes pool + connStatus := e.(event.EvtPeerConnectednessChanged) + if connStatus.Connectedness == network.NotConnected { + peer := connStatus.Peer + if m.fullNodes.has(peer) { + log.Debugw("peer disconnected, removing from full nodes", "peer", peer) + m.fullNodes.remove(peer) + } + } + } + } +} + // Validate will collect peer.ID into corresponding peer pool func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult { logger := log.With("peer", peerID, "hash", msg.DataHash.String()) @@ -312,7 +359,11 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif p := m.getOrCreatePool(msg.DataHash.String()) p.headerHeight.Store(msg.Height) p.add(peerID) - logger.Debug("got hash from shrex-sub") + if p.isValidatedDataHash.Load() { + // add peer to full nodes pool only of datahash has been already validated + m.fullNodes.add(peerID) + } + log.Debugw("got hash from shrex-sub", "peer", peerID, "datahash", msg.DataHash.String()) return pubsub.ValidationIgnore } @@ -322,6 +373,8 @@ func (m *Manager) validatedPool(datahash string) *syncPool { log.Debugw("pool marked validated", "hash", datahash, "after (s)", time.Since(p.createdAt)) + // if pool is proven to be valid, add all collected peers to full nodes + m.fullNodes.add(p.peers()...) } return p } diff --git a/share/p2p/peers/pool.go b/share/p2p/peers/pool.go index 4eae614ca1..10811fefe4 100644 --- a/share/p2p/peers/pool.go +++ b/share/p2p/peers/pool.go @@ -139,6 +139,27 @@ func (p *pool) remove(peers ...peer.ID) { p.checkHasPeers() } +func (p *pool) has(peer peer.ID) bool { + p.m.RLock() + defer p.m.RUnlock() + + status := p.statuses[peer] + return status != removed +} + +func (p *pool) peers() []peer.ID { + p.m.RLock() + defer p.m.RUnlock() + + peers := make([]peer.ID, 0, len(p.peersList)) + for peer, status := range p.statuses { + if status != removed { + peers = append(peers, peer) + } + } + return peers +} + // cleanup will reduce memory footprint of pool. func (p *pool) cleanup() { newList := make([]peer.ID, 0, p.activeCount) From 50ee657014c398e83767a5fd7a5d992366bc70fe Mon Sep 17 00:00:00 2001 From: Vlad Date: Thu, 20 Apr 2023 13:18:19 +0300 Subject: [PATCH 02/19] lazy cleanup shrex pool if peer has dced --- share/p2p/peers/manager.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 43a17eb59c..23925f8411 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -200,9 +200,9 @@ func (m *Manager) Peer( // first, check if a peer is available for the given datahash peerID, ok := p.tryGet() if ok { - // some pools could still have blacklisted peers in storage - if m.isBlacklistedPeer(peerID) { - log.Debugw("removing blacklisted peer from pool", + // some pools could still have blacklisted or disconnected peers in storage + if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) { + log.Debugw("removing outdated peer from pool", "hash", datahash.String(), "peer", peerID.String()) p.remove(peerID) return m.Peer(ctx, datahash) @@ -221,6 +221,15 @@ func (m *Manager) Peer( start := time.Now() select { case peerID = <-p.next(ctx): + // some pools could still have blacklisted or disconnected peers in storage + if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) { + log.Debugw("removing outdated peer from pool", "hash", datahash.String(), + "peer", peerID.String()) + p.remove(peerID) + return m.Peer(ctx, datahash) + } + log.Debugw("got peer from shrexSub pool after wait", "peer", peerID, "datahash", datahash.String()) + log.Debugw("got peer from shrexSub pool after wait", "peer", peerID, "datahash", datahash.String()) return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start)) case peerID = <-m.fullNodes.next(ctx): return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start)) From 8909f079b6eab6f53ecfcddf01110897ae641fac Mon Sep 17 00:00:00 2001 From: Vlad Date: Fri, 21 Apr 2023 13:50:53 +0300 Subject: [PATCH 03/19] minor refactoring --- share/getters/shrex.go | 10 +++++---- share/p2p/peers/manager.go | 44 +++++++++++++++++++------------------- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/share/getters/shrex.go b/share/getters/shrex.go index 4931cd6ce2..81163ee38a 100644 --- a/share/getters/shrex.go +++ b/share/getters/shrex.go @@ -158,11 +158,12 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex return eds, nil case errors.Is(getErr, context.DeadlineExceeded), errors.Is(getErr, context.Canceled): - setStatus(peers.ResultCooldownPeer) + setStatus(peers.ResultRemovePeer) case errors.Is(getErr, p2p.ErrNotFound): getErr = share.ErrNotFound - setStatus(peers.ResultRemovePeer) - case errors.Is(err, routing.ErrNotFound): + setStatus(peers.ResultCooldownPeer) + case errors.Is(getErr, routing.ErrNotFound): + // peer is no longer reachable, remove it from all pools setStatus(peers.ResultRemovePeer) case errors.Is(getErr, p2p.ErrInvalidResponse): setStatus(peers.ResultBlacklistPeer) @@ -224,7 +225,8 @@ func (sg *ShrexGetter) GetSharesByNamespace( case errors.Is(getErr, p2p.ErrNotFound): getErr = share.ErrNotFound setStatus(peers.ResultCooldownPeer) - case errors.Is(err, routing.ErrNotFound): + case errors.Is(getErr, routing.ErrNotFound): + // peer is no longer reachable, remove it from all pools setStatus(peers.ResultRemovePeer) case errors.Is(getErr, p2p.ErrInvalidResponse): setStatus(peers.ResultBlacklistPeer) diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 23925f8411..837c8f7725 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -200,11 +200,7 @@ func (m *Manager) Peer( // first, check if a peer is available for the given datahash peerID, ok := p.tryGet() if ok { - // some pools could still have blacklisted or disconnected peers in storage - if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) { - log.Debugw("removing outdated peer from pool", "hash", datahash.String(), - "peer", peerID.String()) - p.remove(peerID) + if m.removeUnreachable(p, peerID) { return m.Peer(ctx, datahash) } return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0) @@ -221,11 +217,7 @@ func (m *Manager) Peer( start := time.Now() select { case peerID = <-p.next(ctx): - // some pools could still have blacklisted or disconnected peers in storage - if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) { - log.Debugw("removing outdated peer from pool", "hash", datahash.String(), - "peer", peerID.String()) - p.remove(peerID) + if m.removeUnreachable(p, peerID) { return m.Peer(ctx, datahash) } log.Debugw("got peer from shrexSub pool after wait", "peer", peerID, "datahash", datahash.String()) @@ -376,18 +368,6 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif return pubsub.ValidationIgnore } -func (m *Manager) validatedPool(datahash string) *syncPool { - p := m.getOrCreatePool(datahash) - if p.isValidatedDataHash.CompareAndSwap(false, true) { - log.Debugw("pool marked validated", - "hash", datahash, - "after (s)", time.Since(p.createdAt)) - // if pool is proven to be valid, add all collected peers to full nodes - m.fullNodes.add(p.peers()...) - } - return p -} - func (m *Manager) getOrCreatePool(datahash string) *syncPool { m.lock.Lock() defer m.lock.Unlock() @@ -438,6 +418,26 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool { return m.blacklistedHashes[hash.String()] } +func (m *Manager) validatedPool(hashStr string) *syncPool { + p := m.getOrCreatePool(hashStr) + if p.isValidatedDataHash.CompareAndSwap(false, true) { + log.Debugw("pool marked validated", "datahash", hashStr) + // if pool is proven to be valid, add all collected peers to full nodes + m.fullNodes.add(p.peers()...) + } + return p +} + +// removeUnreachable removes peer from some pool if it is blacklisted or disconnected +func (m *Manager) removeUnreachable(pool *syncPool, peerID peer.ID) bool { + if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) { + log.Debugw("removing outdated peer from pool", "peer", peerID.String()) + pool.remove(peerID) + return true + } + return false +} + func (m *Manager) GC(ctx context.Context) { ticker := time.NewTicker(m.params.GcInterval) defer ticker.Stop() From a715c6e370b7005774ac2fca66623323a8bd7199 Mon Sep 17 00:00:00 2001 From: Vlad Date: Fri, 21 Apr 2023 13:59:47 +0300 Subject: [PATCH 04/19] resolve merge conflicts --- share/p2p/peers/manager.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 837c8f7725..9575a796a3 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "go.uber.org/zap" "sync" "sync/atomic" "time" @@ -195,14 +196,19 @@ func (m *Manager) Stop(ctx context.Context) error { func (m *Manager) Peer( ctx context.Context, datahash share.DataHash, ) (peer.ID, DoneFunc, error) { + logger := log.With("hash", datahash.String()) p := m.validatedPool(datahash.String()) // first, check if a peer is available for the given datahash peerID, ok := p.tryGet() if ok { - if m.removeUnreachable(p, peerID) { + logger = logger.With("peer", peerID.String()) + if m.removeUnreachable(logger, p, peerID) { return m.Peer(ctx, datahash) } + logger.Debugw("get peer from shrexsub pool", + "peer", peerID.String(), + "pool_size", p.size()) return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0) } @@ -217,11 +223,13 @@ func (m *Manager) Peer( start := time.Now() select { case peerID = <-p.next(ctx): - if m.removeUnreachable(p, peerID) { + logger = logger.With("peer", peerID.String()) + if m.removeUnreachable(logger, p, peerID) { return m.Peer(ctx, datahash) } - log.Debugw("got peer from shrexSub pool after wait", "peer", peerID, "datahash", datahash.String()) - log.Debugw("got peer from shrexSub pool after wait", "peer", peerID, "datahash", datahash.String()) + logger.Debugw("got peer from shrexSub pool after wait", + "pool_size", p.size(), + "after (s)", time.Since(start)) return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start)) case peerID = <-m.fullNodes.next(ctx): return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start)) @@ -429,9 +437,9 @@ func (m *Manager) validatedPool(hashStr string) *syncPool { } // removeUnreachable removes peer from some pool if it is blacklisted or disconnected -func (m *Manager) removeUnreachable(pool *syncPool, peerID peer.ID) bool { +func (m *Manager) removeUnreachable(logger *zap.SugaredLogger, pool *syncPool, peerID peer.ID) bool { if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) { - log.Debugw("removing outdated peer from pool", "peer", peerID.String()) + logger.Debug("removing outdated peer from pool") pool.remove(peerID) return true } From e2a0177cdfacf272c4c3adf9db7bcd1ab9c71498 Mon Sep 17 00:00:00 2001 From: Vlad Date: Tue, 25 Apr 2023 13:42:11 +0300 Subject: [PATCH 05/19] fix --- share/p2p/peers/manager.go | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 9575a796a3..4e215bed53 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "go.uber.org/zap" "sync" "sync/atomic" "time" @@ -18,6 +17,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/net/conngater" + "go.uber.org/zap" libhead "github.com/celestiaorg/go-header" @@ -114,14 +114,15 @@ func NewManager( } s := &Manager{ - params: params, - headerSub: headerSub, - shrexSub: shrexSub, - connGater: connGater, - host: host, - pools: make(map[string]*syncPool), - blacklistedHashes: make(map[string]bool), - headerSubDone: make(chan struct{}), + params: params, + headerSub: headerSub, + shrexSub: shrexSub, + connGater: connGater, + host: host, + pools: make(map[string]*syncPool), + blacklistedHashes: make(map[string]bool), + headerSubDone: make(chan struct{}), + disconnectedPeersDone: make(chan struct{}), } s.fullNodes = newPool(s.params.PeerCooldown) From 1fd176053f77e96b2ca911ae84b39d1fb0b139cf Mon Sep 17 00:00:00 2001 From: Ryan Date: Mon, 15 May 2023 17:08:15 +0200 Subject: [PATCH 06/19] fixing rebase mistakes --- share/getters/shrex.go | 1 - share/p2p/peers/manager.go | 12 ++++-------- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/share/getters/shrex.go b/share/getters/shrex.go index 81163ee38a..9cef600e1f 100644 --- a/share/getters/shrex.go +++ b/share/getters/shrex.go @@ -7,7 +7,6 @@ import ( "time" "github.com/libp2p/go-libp2p/core/routing" - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/global" "go.opentelemetry.io/otel/metric/instrument" diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 4e215bed53..30f4eb6fd9 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -75,11 +75,9 @@ type Manager struct { metrics *metrics - cancel context.CancelFunc - done chan struct{} - headerSubDone chan struct{} disconnectedPeersDone chan struct{} + cancel context.CancelFunc } // DoneFunc updates internal state depending on call results. Should be called once per returned @@ -207,9 +205,7 @@ func (m *Manager) Peer( if m.removeUnreachable(logger, p, peerID) { return m.Peer(ctx, datahash) } - logger.Debugw("get peer from shrexsub pool", - "peer", peerID.String(), - "pool_size", p.size()) + logger.Debugw("get peer from shrexsub pool", "peer", peerID.String()) return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0) } @@ -229,7 +225,6 @@ func (m *Manager) Peer( return m.Peer(ctx, datahash) } logger.Debugw("got peer from shrexSub pool after wait", - "pool_size", p.size(), "after (s)", time.Since(start)) return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start)) case peerID = <-m.fullNodes.next(ctx): @@ -245,7 +240,8 @@ func (m *Manager) newPeer( peerID peer.ID, source peerSource, poolSize int, - waitTime time.Duration) (peer.ID, DoneFunc, error) { + waitTime time.Duration, +) (peer.ID, DoneFunc, error) { log.Debugw("got peer", "hash", datahash.String(), "peer", peerID.String(), From 27d117fc25d133af13cddfa81f4a3c1171c0f20f Mon Sep 17 00:00:00 2001 From: Ryan Date: Tue, 16 May 2023 13:13:06 +0200 Subject: [PATCH 07/19] tracking already removed peers --- share/p2p/peers/manager.go | 28 ++++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 30f4eb6fd9..d25127ec4c 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -73,6 +73,9 @@ type Manager struct { // hashes that are not in the chain blacklistedHashes map[string]bool + // peers that have previously been removed + removedPeers map[peer.ID]bool + metrics *metrics headerSubDone chan struct{} @@ -132,6 +135,10 @@ func NewManager( log.Debugw("got blacklisted peer from discovery", "peer", peerID) return } + if s.isRemovedPeer(peerID) { + log.Debugw("got previously removed peer from discovery", "peer", peerID) + return + } log.Debugw("added to full nodes", "peer", peerID) s.fullNodes.add(peerID) return @@ -364,12 +371,19 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif p := m.getOrCreatePool(msg.DataHash.String()) p.headerHeight.Store(msg.Height) + logger.Debugw("got hash from shrex-sub") + + // we want to skip adding peers to pools that have already been removed once + if m.isRemovedPeer(peerID) { + logger.Debugw("got previously removed peer from shrex-sub") + return pubsub.ValidationIgnore + } + p.add(peerID) if p.isValidatedDataHash.Load() { // add peer to full nodes pool only of datahash has been already validated m.fullNodes.add(peerID) } - log.Debugw("got hash from shrex-sub", "peer", peerID, "datahash", msg.DataHash.String()) return pubsub.ValidationIgnore } @@ -423,12 +437,22 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool { return m.blacklistedHashes[hash.String()] } +func (m *Manager) isRemovedPeer(peerID peer.ID) bool { + m.lock.Lock() + defer m.lock.Unlock() + return m.removedPeers[peerID] +} + func (m *Manager) validatedPool(hashStr string) *syncPool { p := m.getOrCreatePool(hashStr) if p.isValidatedDataHash.CompareAndSwap(false, true) { log.Debugw("pool marked validated", "datahash", hashStr) // if pool is proven to be valid, add all collected peers to full nodes - m.fullNodes.add(p.peers()...) + for _, peer := range p.peers() { + if !m.isRemovedPeer(peer) { + m.fullNodes.add(peer) + } + } } return p } From dded3cd03e88051555796e5982f09b231384e145 Mon Sep 17 00:00:00 2001 From: Ryan Date: Tue, 16 May 2023 13:41:22 +0200 Subject: [PATCH 08/19] fixing review comments --- share/getters/shrex.go | 11 ++--------- share/p2p/peers/manager.go | 38 +++++++++++++++++++++++--------------- share/p2p/peers/pool.go | 4 ++-- 3 files changed, 27 insertions(+), 26 deletions(-) diff --git a/share/getters/shrex.go b/share/getters/shrex.go index 9cef600e1f..2d4e5f554b 100644 --- a/share/getters/shrex.go +++ b/share/getters/shrex.go @@ -6,7 +6,6 @@ import ( "fmt" "time" - "github.com/libp2p/go-libp2p/core/routing" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/global" "go.opentelemetry.io/otel/metric/instrument" @@ -161,13 +160,10 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex case errors.Is(getErr, p2p.ErrNotFound): getErr = share.ErrNotFound setStatus(peers.ResultCooldownPeer) - case errors.Is(getErr, routing.ErrNotFound): - // peer is no longer reachable, remove it from all pools - setStatus(peers.ResultRemovePeer) case errors.Is(getErr, p2p.ErrInvalidResponse): setStatus(peers.ResultBlacklistPeer) default: - setStatus(peers.ResultCooldownPeer) + setStatus(peers.ResultRemovePeer) } if !ErrorContains(err, getErr) { @@ -224,13 +220,10 @@ func (sg *ShrexGetter) GetSharesByNamespace( case errors.Is(getErr, p2p.ErrNotFound): getErr = share.ErrNotFound setStatus(peers.ResultCooldownPeer) - case errors.Is(getErr, routing.ErrNotFound): - // peer is no longer reachable, remove it from all pools - setStatus(peers.ResultRemovePeer) case errors.Is(getErr, p2p.ErrInvalidResponse): setStatus(peers.ResultBlacklistPeer) default: - setStatus(peers.ResultCooldownPeer) + setStatus(peers.ResultRemovePeer) } if !ErrorContains(err, getErr) { diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index d25127ec4c..dd85ec155f 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -17,7 +17,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/p2p/host/eventbus" "github.com/libp2p/go-libp2p/p2p/net/conngater" - "go.uber.org/zap" libhead "github.com/celestiaorg/go-header" @@ -58,6 +57,7 @@ type Manager struct { // header subscription is necessary in order to Validate the inbound eds hash headerSub libhead.Subscriber[*header.ExtendedHeader] shrexSub *shrexsub.PubSub + disc *discovery.Discovery host host.Host connGater *conngater.BasicConnectionGater @@ -119,6 +119,7 @@ func NewManager( headerSub: headerSub, shrexSub: shrexSub, connGater: connGater, + disc: discovery, host: host, pools: make(map[string]*syncPool), blacklistedHashes: make(map[string]bool), @@ -189,6 +190,8 @@ func (m *Manager) Stop(ctx context.Context) error { select { case <-m.headerSubDone: return nil + case <-m.disconnectedPeersDone: + return nil case <-ctx.Done(): return ctx.Err() } @@ -202,17 +205,16 @@ func (m *Manager) Stop(ctx context.Context) error { func (m *Manager) Peer( ctx context.Context, datahash share.DataHash, ) (peer.ID, DoneFunc, error) { - logger := log.With("hash", datahash.String()) p := m.validatedPool(datahash.String()) // first, check if a peer is available for the given datahash peerID, ok := p.tryGet() if ok { - logger = logger.With("peer", peerID.String()) - if m.removeUnreachable(logger, p, peerID) { + if m.removeUnreachable(p, peerID) { return m.Peer(ctx, datahash) } - logger.Debugw("get peer from shrexsub pool", "peer", peerID.String()) + log.Debugw("get peer from shrexsub pool", + "peer", peerID.String(), "hash", datahash.String()) return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0) } @@ -227,12 +229,11 @@ func (m *Manager) Peer( start := time.Now() select { case peerID = <-p.next(ctx): - logger = logger.With("peer", peerID.String()) - if m.removeUnreachable(logger, p, peerID) { + if m.removeUnreachable(p, peerID) { return m.Peer(ctx, datahash) } - logger.Debugw("got peer from shrexSub pool after wait", - "after (s)", time.Since(start)) + log.Debugw("got peer from shrexSub pool after wait", + "after (s)", time.Since(start), "peer", peerID.String(), "hash", datahash.String()) return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start)) case peerID = <-m.fullNodes.next(ctx): return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start)) @@ -277,7 +278,9 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS m.fullNodes.putOnCooldown(peerID) } case ResultRemovePeer: - m.fullNodes.remove(peerID) + if !m.disc.Discard(peerID) { + m.fullNodes.remove(peerID) + } case ResultBlacklistPeer: m.blacklistPeers(reasonMisbehave, peerID) } @@ -327,6 +330,7 @@ func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subs peer := connStatus.Peer if m.fullNodes.has(peer) { log.Debugw("peer disconnected, removing from full nodes", "peer", peer) + // we do not call discovery.Discard here because discovery handles disconnections from its own peers itself m.fullNodes.remove(peer) } } @@ -381,7 +385,7 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif p.add(peerID) if p.isValidatedDataHash.Load() { - // add peer to full nodes pool only of datahash has been already validated + // add peer to full nodes pool only if datahash has been already validated m.fullNodes.add(peerID) } return pubsub.ValidationIgnore @@ -413,7 +417,9 @@ func (m *Manager) blacklistPeers(reason blacklistPeerReason, peerIDs ...peer.ID) return } for _, peerID := range peerIDs { - m.fullNodes.remove(peerID) + if !m.disc.Discard(peerID) { + m.fullNodes.remove(peerID) + } // add peer to the blacklist, so we can't connect to it in the future. err := m.connGater.BlockPeer(peerID) if err != nil { @@ -458,10 +464,12 @@ func (m *Manager) validatedPool(hashStr string) *syncPool { } // removeUnreachable removes peer from some pool if it is blacklisted or disconnected -func (m *Manager) removeUnreachable(logger *zap.SugaredLogger, pool *syncPool, peerID peer.ID) bool { +func (m *Manager) removeUnreachable(pool *syncPool, peerID peer.ID) bool { if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) { - logger.Debug("removing outdated peer from pool") - pool.remove(peerID) + log.Debugw("removing outdated peer from pool", "peer", peerID.String()) + if !m.disc.Discard(peerID) { + pool.remove(peerID) + } return true } return false diff --git a/share/p2p/peers/pool.go b/share/p2p/peers/pool.go index 10811fefe4..c43bbc963b 100644 --- a/share/p2p/peers/pool.go +++ b/share/p2p/peers/pool.go @@ -143,8 +143,8 @@ func (p *pool) has(peer peer.ID) bool { p.m.RLock() defer p.m.RUnlock() - status := p.statuses[peer] - return status != removed + status, ok := p.statuses[peer] + return ok && status != removed } func (p *pool) peers() []peer.ID { From c952803c307876049f432d5ba8ba893ef9f34a89 Mon Sep 17 00:00:00 2001 From: Ryan Date: Tue, 16 May 2023 14:47:13 +0200 Subject: [PATCH 09/19] fix WithOnPeersUpdate --- share/p2p/peers/manager.go | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index dd85ec155f..14a1f8f960 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -121,6 +121,7 @@ func NewManager( connGater: connGater, disc: discovery, host: host, + removedPeers: make(map[peer.ID]bool), pools: make(map[string]*syncPool), blacklistedHashes: make(map[string]bool), headerSubDone: make(chan struct{}), @@ -137,8 +138,9 @@ func NewManager( return } if s.isRemovedPeer(peerID) { - log.Debugw("got previously removed peer from discovery", "peer", peerID) - return + s.lock.Lock() + delete(s.removedPeers, peerID) + s.lock.Unlock() } log.Debugw("added to full nodes", "peer", peerID) s.fullNodes.add(peerID) @@ -146,7 +148,7 @@ func NewManager( } log.Debugw("removing peer from discovered full nodes", "peer", peerID) - s.fullNodes.remove(peerID) + s.removeFromPool(s.fullNodes, peerID) }) return s, nil @@ -278,9 +280,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS m.fullNodes.putOnCooldown(peerID) } case ResultRemovePeer: - if !m.disc.Discard(peerID) { - m.fullNodes.remove(peerID) - } + m.removeFromPool(m.fullNodes, peerID) case ResultBlacklistPeer: m.blacklistPeers(reasonMisbehave, peerID) } @@ -332,6 +332,9 @@ func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subs log.Debugw("peer disconnected, removing from full nodes", "peer", peer) // we do not call discovery.Discard here because discovery handles disconnections from its own peers itself m.fullNodes.remove(peer) + m.lock.Lock() + m.removedPeers[peer] = true + m.lock.Unlock() } } } @@ -417,9 +420,7 @@ func (m *Manager) blacklistPeers(reason blacklistPeerReason, peerIDs ...peer.ID) return } for _, peerID := range peerIDs { - if !m.disc.Discard(peerID) { - m.fullNodes.remove(peerID) - } + m.removeFromPool(m.fullNodes, peerID) // add peer to the blacklist, so we can't connect to it in the future. err := m.connGater.BlockPeer(peerID) if err != nil { @@ -467,9 +468,7 @@ func (m *Manager) validatedPool(hashStr string) *syncPool { func (m *Manager) removeUnreachable(pool *syncPool, peerID peer.ID) bool { if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) { log.Debugw("removing outdated peer from pool", "peer", peerID.String()) - if !m.disc.Discard(peerID) { - pool.remove(peerID) - } + m.removeFromPool(pool.pool, peerID) return true } return false @@ -531,6 +530,16 @@ func (m *Manager) cleanUp() []peer.ID { return blacklist } +func (m *Manager) removeFromPool(pool *pool, peerID peer.ID) { + m.lock.Lock() + m.removedPeers[peerID] = true + m.lock.Unlock() + + if !m.disc.Discard(peerID) { + pool.remove(peerID) + } +} + func (m *Manager) markPoolAsSynced(datahash string) { p := m.getOrCreatePool(datahash) if p.isSynced.CompareAndSwap(false, true) { From 7730d139c53ffb8b52773f13f66a18678c39f08d Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 17 May 2023 09:45:58 +0200 Subject: [PATCH 10/19] adding metric, closing connection to peer in discovery.Discard, fixing peer manager cooldown issue --- share/availability/discovery/discovery.go | 1 + share/p2p/peers/manager.go | 3 ++- share/p2p/peers/metrics.go | 17 +++++++++++++++-- share/p2p/shrexeds/client.go | 4 ++-- 4 files changed, 20 insertions(+), 5 deletions(-) diff --git a/share/availability/discovery/discovery.go b/share/availability/discovery/discovery.go index e8e5594e31..9a4c934880 100644 --- a/share/availability/discovery/discovery.go +++ b/share/availability/discovery/discovery.go @@ -134,6 +134,7 @@ func (d *Discovery) Discard(id peer.ID) bool { d.set.Remove(id) d.onUpdatedPeers(id, false) log.Debugw("removed peer from the peer set", "peer", id) + d.host.Network().ClosePeer(id) //nolint:errcheck if d.set.Size() < d.set.Limit() { // trigger discovery diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 14a1f8f960..ebbc19abba 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -275,9 +275,10 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS case ResultSynced: m.markPoolAsSynced(datahash.String()) case ResultCooldownPeer: - m.getOrCreatePool(datahash.String()).putOnCooldown(peerID) if source == sourceFullNodes { m.fullNodes.putOnCooldown(peerID) + } else { + m.getOrCreatePool(datahash.String()).putOnCooldown(peerID) } case ResultRemovePeer: m.removeFromPool(m.fullNodes, peerID) diff --git a/share/p2p/peers/metrics.go b/share/p2p/peers/metrics.go index ab5bfc97b2..1d5e0f2322 100644 --- a/share/p2p/peers/metrics.go +++ b/share/p2p/peers/metrics.go @@ -37,6 +37,7 @@ const ( peerStatusKey = "peer_status" peerStatusActive peerStatus = "active" peerStatusCooldown peerStatus = "cooldown" + peerStatusRemoved peerStatus = "removed" poolStatusKey = "pool_status" poolStatusCreated poolStatus = "created" @@ -73,6 +74,7 @@ type metrics struct { shrexPools asyncint64.Gauge // attributes: pool_status fullNodesPool asyncint64.Gauge // attributes: pool_status blacklistedPeersByReason sync.Map + removedPeers asyncint64.Gauge blacklistedPeers asyncint64.Gauge // attributes: blacklist_reason } @@ -113,6 +115,12 @@ func initMetrics(manager *Manager) (*metrics, error) { return nil, err } + removedPeers, err := meter.AsyncInt64().Gauge("peer_manager_removed_peers_gauge", + instrument.WithDescription("removed peers amount")) + if err != nil { + return nil, err + } + fullNodesPool, err := meter.AsyncInt64().Gauge("peer_manager_full_nodes_gauge", instrument.WithDescription("full nodes pool peers amount")) if err != nil { @@ -131,6 +139,7 @@ func initMetrics(manager *Manager) (*metrics, error) { doneResult: doneResult, validationResult: validationResult, shrexPools: shrexPools, + removedPeers: removedPeers, fullNodesPool: fullNodesPool, getPeerPoolSizeHistogram: getPeerPoolSizeHistogram, blacklistedPeers: blacklisted, @@ -153,6 +162,8 @@ func initMetrics(manager *Manager) (*metrics, error) { fullNodesPool.Observe(ctx, int64(manager.fullNodes.cooldown.len()), attribute.String(peerStatusKey, string(peerStatusCooldown))) + removedPeers.Observe(ctx, int64(len(manager.removedPeers))) + metrics.blacklistedPeersByReason.Range(func(key, value any) bool { reason := key.(blacklistPeerReason) amount := value.(int) @@ -169,8 +180,10 @@ func initMetrics(manager *Manager) (*metrics, error) { return metrics, nil } -func (m *metrics) observeGetPeer(ctx context.Context, - source peerSource, poolSize int, waitTime time.Duration) { +func (m *metrics) observeGetPeer( + ctx context.Context, + source peerSource, poolSize int, waitTime time.Duration, +) { if m == nil { return } diff --git a/share/p2p/shrexeds/client.go b/share/p2p/shrexeds/client.go index af408d130c..7c2591bcbd 100644 --- a/share/p2p/shrexeds/client.go +++ b/share/p2p/shrexeds/client.go @@ -114,9 +114,9 @@ func (c *Client) doRequest( } _, err = serde.Read(stream, resp) if err != nil { - // server closes the stream after returning a non-successful status + // server closes the stream here if we are rate limited if errors.Is(err, io.EOF) { - c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound) + c.metrics.ObserveRequests(ctx, 1, p2p.StatusRateLimited) return nil, p2p.ErrNotFound } stream.Reset() //nolint:errcheck From 631c8d1480497b414a14e29a8700e013bb60ed80 Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 17 May 2023 11:12:05 +0200 Subject: [PATCH 11/19] adding metric, closing connection to peer in discovery.Discard, fixing peer manager cooldown issue --- share/availability/discovery/discovery.go | 2 +- share/p2p/peers/manager.go | 16 +++++++--------- share/p2p/peers/metrics.go | 1 - 3 files changed, 8 insertions(+), 11 deletions(-) diff --git a/share/availability/discovery/discovery.go b/share/availability/discovery/discovery.go index 9a4c934880..c20cb04c60 100644 --- a/share/availability/discovery/discovery.go +++ b/share/availability/discovery/discovery.go @@ -133,8 +133,8 @@ func (d *Discovery) Discard(id peer.ID) bool { d.connector.Backoff(id) d.set.Remove(id) d.onUpdatedPeers(id, false) - log.Debugw("removed peer from the peer set", "peer", id) d.host.Network().ClosePeer(id) //nolint:errcheck + log.Debugw("removed peer from the peer set", "peer", id) if d.set.Size() < d.set.Limit() { // trigger discovery diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index ebbc19abba..db07c2b669 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -277,9 +277,9 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS case ResultCooldownPeer: if source == sourceFullNodes { m.fullNodes.putOnCooldown(peerID) - } else { - m.getOrCreatePool(datahash.String()).putOnCooldown(peerID) + return } + m.getOrCreatePool(datahash.String()).putOnCooldown(peerID) case ResultRemovePeer: m.removeFromPool(m.fullNodes, peerID) case ResultBlacklistPeer: @@ -381,15 +381,13 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif p.headerHeight.Store(msg.Height) logger.Debugw("got hash from shrex-sub") - // we want to skip adding peers to pools that have already been removed once - if m.isRemovedPeer(peerID) { - logger.Debugw("got previously removed peer from shrex-sub") - return pubsub.ValidationIgnore - } - p.add(peerID) if p.isValidatedDataHash.Load() { // add peer to full nodes pool only if datahash has been already validated + // if they were previously removed, give them another chance + m.lock.Lock() + delete(m.removedPeers, peerID) + m.lock.Unlock() m.fullNodes.add(peerID) } return pubsub.ValidationIgnore @@ -425,7 +423,7 @@ func (m *Manager) blacklistPeers(reason blacklistPeerReason, peerIDs ...peer.ID) // add peer to the blacklist, so we can't connect to it in the future. err := m.connGater.BlockPeer(peerID) if err != nil { - log.Warnw("failed tp block peer", "peer", peerID, "err", err) + log.Warnw("failed to block peer", "peer", peerID, "err", err) } // close connections to peer. err = m.host.Network().ClosePeer(peerID) diff --git a/share/p2p/peers/metrics.go b/share/p2p/peers/metrics.go index 1d5e0f2322..17c63d45bc 100644 --- a/share/p2p/peers/metrics.go +++ b/share/p2p/peers/metrics.go @@ -37,7 +37,6 @@ const ( peerStatusKey = "peer_status" peerStatusActive peerStatus = "active" peerStatusCooldown peerStatus = "cooldown" - peerStatusRemoved peerStatus = "removed" poolStatusKey = "pool_status" poolStatusCreated poolStatus = "created" From 45676fe134348bc0123d23133a09cddd4cbaf846 Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 17 May 2023 17:01:41 +0200 Subject: [PATCH 12/19] removing removals --- share/getters/shrex.go | 8 ++++---- share/p2p/peers/manager.go | 5 +---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/share/getters/shrex.go b/share/getters/shrex.go index 2d4e5f554b..dc26d6ed7a 100644 --- a/share/getters/shrex.go +++ b/share/getters/shrex.go @@ -156,14 +156,14 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex return eds, nil case errors.Is(getErr, context.DeadlineExceeded), errors.Is(getErr, context.Canceled): - setStatus(peers.ResultRemovePeer) + setStatus(peers.ResultCooldownPeer) case errors.Is(getErr, p2p.ErrNotFound): getErr = share.ErrNotFound setStatus(peers.ResultCooldownPeer) case errors.Is(getErr, p2p.ErrInvalidResponse): setStatus(peers.ResultBlacklistPeer) default: - setStatus(peers.ResultRemovePeer) + setStatus(peers.ResultCooldownPeer) } if !ErrorContains(err, getErr) { @@ -216,14 +216,14 @@ func (sg *ShrexGetter) GetSharesByNamespace( return nd, nil case errors.Is(getErr, context.DeadlineExceeded), errors.Is(getErr, context.Canceled): - setStatus(peers.ResultRemovePeer) + setStatus(peers.ResultCooldownPeer) case errors.Is(getErr, p2p.ErrNotFound): getErr = share.ErrNotFound setStatus(peers.ResultCooldownPeer) case errors.Is(getErr, p2p.ErrInvalidResponse): setStatus(peers.ResultBlacklistPeer) default: - setStatus(peers.ResultRemovePeer) + setStatus(peers.ResultCooldownPeer) } if !ErrorContains(err, getErr) { diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index db07c2b669..c94bf78511 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -37,8 +37,6 @@ const ( // ResultBlacklistPeer will blacklist peer. Blacklisted peers will be disconnected and blocked from // any p2p communication in future by libp2p Gater ResultBlacklistPeer = "result_blacklist_peer" - // ResultRemovePeer will remove peer from peer manager pool - ResultRemovePeer = "result_remove_peer" // eventbusBufSize is the size of the buffered channel to handle // events in libp2p @@ -280,8 +278,6 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS return } m.getOrCreatePool(datahash.String()).putOnCooldown(peerID) - case ResultRemovePeer: - m.removeFromPool(m.fullNodes, peerID) case ResultBlacklistPeer: m.blacklistPeers(reasonMisbehave, peerID) } @@ -548,6 +544,7 @@ func (m *Manager) markPoolAsSynced(datahash string) { atomic.StorePointer(old, unsafe.Pointer(newPool(time.Second))) } } + func (p *syncPool) add(peers ...peer.ID) { if !p.isSynced.Load() { p.pool.add(peers...) From 8f95e5718a35f6dcc4ef75631f66abe0a5e49ebb Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 17 May 2023 17:30:29 +0200 Subject: [PATCH 13/19] adding comments, removing final restriction --- share/p2p/peers/manager.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index c94bf78511..aece12ef48 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -71,7 +71,8 @@ type Manager struct { // hashes that are not in the chain blacklistedHashes map[string]bool - // peers that have previously been removed + // peers that have previously been removed. It is only used for metrics and is not used to + // prevent previously removed peers from being added again removedPeers map[peer.ID]bool metrics *metrics @@ -451,9 +452,14 @@ func (m *Manager) validatedPool(hashStr string) *syncPool { log.Debugw("pool marked validated", "datahash", hashStr) // if pool is proven to be valid, add all collected peers to full nodes for _, peer := range p.peers() { - if !m.isRemovedPeer(peer) { - m.fullNodes.add(peer) + // if peer was previously removed, give it another chance + if m.isRemovedPeer(peer) { + m.lock.Lock() + m.removedPeers[peer] = false + m.lock.Unlock() } + + m.fullNodes.add(peer) } } return p From 3ffd59715ff79a67dd6e5342d659b88d2b3bd909 Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 17 May 2023 18:30:37 +0200 Subject: [PATCH 14/19] cleaning up logic --- share/p2p/peers/manager.go | 42 +++++++++++--------------------------- 1 file changed, 12 insertions(+), 30 deletions(-) diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index aece12ef48..71b7e25bc5 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -136,13 +136,11 @@ func NewManager( log.Debugw("got blacklisted peer from discovery", "peer", peerID) return } - if s.isRemovedPeer(peerID) { - s.lock.Lock() - delete(s.removedPeers, peerID) - s.lock.Unlock() - } - log.Debugw("added to full nodes", "peer", peerID) + s.lock.Lock() + delete(s.removedPeers, peerID) + s.lock.Unlock() s.fullNodes.add(peerID) + log.Debugw("added to full nodes", "peer", peerID) return } @@ -328,11 +326,7 @@ func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subs peer := connStatus.Peer if m.fullNodes.has(peer) { log.Debugw("peer disconnected, removing from full nodes", "peer", peer) - // we do not call discovery.Discard here because discovery handles disconnections from its own peers itself - m.fullNodes.remove(peer) - m.lock.Lock() - m.removedPeers[peer] = true - m.lock.Unlock() + m.removeFromPool(m.fullNodes, peer) } } } @@ -378,13 +372,12 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif p.headerHeight.Store(msg.Height) logger.Debugw("got hash from shrex-sub") + m.lock.Lock() + delete(m.removedPeers, peerID) + m.lock.Unlock() p.add(peerID) if p.isValidatedDataHash.Load() { // add peer to full nodes pool only if datahash has been already validated - // if they were previously removed, give them another chance - m.lock.Lock() - delete(m.removedPeers, peerID) - m.lock.Unlock() m.fullNodes.add(peerID) } return pubsub.ValidationIgnore @@ -440,12 +433,6 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool { return m.blacklistedHashes[hash.String()] } -func (m *Manager) isRemovedPeer(peerID peer.ID) bool { - m.lock.Lock() - defer m.lock.Unlock() - return m.removedPeers[peerID] -} - func (m *Manager) validatedPool(hashStr string) *syncPool { p := m.getOrCreatePool(hashStr) if p.isValidatedDataHash.CompareAndSwap(false, true) { @@ -453,12 +440,9 @@ func (m *Manager) validatedPool(hashStr string) *syncPool { // if pool is proven to be valid, add all collected peers to full nodes for _, peer := range p.peers() { // if peer was previously removed, give it another chance - if m.isRemovedPeer(peer) { - m.lock.Lock() - m.removedPeers[peer] = false - m.lock.Unlock() - } - + m.lock.Lock() + delete(m.removedPeers, peer) + m.lock.Unlock() m.fullNodes.add(peer) } } @@ -536,9 +520,7 @@ func (m *Manager) removeFromPool(pool *pool, peerID peer.ID) { m.removedPeers[peerID] = true m.lock.Unlock() - if !m.disc.Discard(peerID) { - pool.remove(peerID) - } + pool.remove(peerID) } func (m *Manager) markPoolAsSynced(datahash string) { From 7b66b0b4525f8497bc1ac88cbc245574c5563beb Mon Sep 17 00:00:00 2001 From: Ryan Date: Fri, 19 May 2023 10:23:26 +0200 Subject: [PATCH 15/19] removing metric --- share/p2p/peers/manager.go | 35 +++++------------------------------ share/p2p/peers/metrics.go | 10 ---------- 2 files changed, 5 insertions(+), 40 deletions(-) diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 71b7e25bc5..97674bf039 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -71,10 +71,6 @@ type Manager struct { // hashes that are not in the chain blacklistedHashes map[string]bool - // peers that have previously been removed. It is only used for metrics and is not used to - // prevent previously removed peers from being added again - removedPeers map[peer.ID]bool - metrics *metrics headerSubDone chan struct{} @@ -120,7 +116,6 @@ func NewManager( connGater: connGater, disc: discovery, host: host, - removedPeers: make(map[peer.ID]bool), pools: make(map[string]*syncPool), blacklistedHashes: make(map[string]bool), headerSubDone: make(chan struct{}), @@ -136,16 +131,13 @@ func NewManager( log.Debugw("got blacklisted peer from discovery", "peer", peerID) return } - s.lock.Lock() - delete(s.removedPeers, peerID) - s.lock.Unlock() s.fullNodes.add(peerID) log.Debugw("added to full nodes", "peer", peerID) return } log.Debugw("removing peer from discovered full nodes", "peer", peerID) - s.removeFromPool(s.fullNodes, peerID) + s.fullNodes.remove(peerID) }) return s, nil @@ -326,7 +318,7 @@ func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subs peer := connStatus.Peer if m.fullNodes.has(peer) { log.Debugw("peer disconnected, removing from full nodes", "peer", peer) - m.removeFromPool(m.fullNodes, peer) + m.fullNodes.remove(peer) } } } @@ -372,9 +364,6 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif p.headerHeight.Store(msg.Height) logger.Debugw("got hash from shrex-sub") - m.lock.Lock() - delete(m.removedPeers, peerID) - m.lock.Unlock() p.add(peerID) if p.isValidatedDataHash.Load() { // add peer to full nodes pool only if datahash has been already validated @@ -409,7 +398,7 @@ func (m *Manager) blacklistPeers(reason blacklistPeerReason, peerIDs ...peer.ID) return } for _, peerID := range peerIDs { - m.removeFromPool(m.fullNodes, peerID) + m.fullNodes.remove(peerID) // add peer to the blacklist, so we can't connect to it in the future. err := m.connGater.BlockPeer(peerID) if err != nil { @@ -438,13 +427,7 @@ func (m *Manager) validatedPool(hashStr string) *syncPool { if p.isValidatedDataHash.CompareAndSwap(false, true) { log.Debugw("pool marked validated", "datahash", hashStr) // if pool is proven to be valid, add all collected peers to full nodes - for _, peer := range p.peers() { - // if peer was previously removed, give it another chance - m.lock.Lock() - delete(m.removedPeers, peer) - m.lock.Unlock() - m.fullNodes.add(peer) - } + m.fullNodes.add(p.peers()...) } return p } @@ -453,7 +436,7 @@ func (m *Manager) validatedPool(hashStr string) *syncPool { func (m *Manager) removeUnreachable(pool *syncPool, peerID peer.ID) bool { if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) { log.Debugw("removing outdated peer from pool", "peer", peerID.String()) - m.removeFromPool(pool.pool, peerID) + pool.remove(peerID) return true } return false @@ -515,14 +498,6 @@ func (m *Manager) cleanUp() []peer.ID { return blacklist } -func (m *Manager) removeFromPool(pool *pool, peerID peer.ID) { - m.lock.Lock() - m.removedPeers[peerID] = true - m.lock.Unlock() - - pool.remove(peerID) -} - func (m *Manager) markPoolAsSynced(datahash string) { p := m.getOrCreatePool(datahash) if p.isSynced.CompareAndSwap(false, true) { diff --git a/share/p2p/peers/metrics.go b/share/p2p/peers/metrics.go index 17c63d45bc..bf4d544d9f 100644 --- a/share/p2p/peers/metrics.go +++ b/share/p2p/peers/metrics.go @@ -73,7 +73,6 @@ type metrics struct { shrexPools asyncint64.Gauge // attributes: pool_status fullNodesPool asyncint64.Gauge // attributes: pool_status blacklistedPeersByReason sync.Map - removedPeers asyncint64.Gauge blacklistedPeers asyncint64.Gauge // attributes: blacklist_reason } @@ -114,12 +113,6 @@ func initMetrics(manager *Manager) (*metrics, error) { return nil, err } - removedPeers, err := meter.AsyncInt64().Gauge("peer_manager_removed_peers_gauge", - instrument.WithDescription("removed peers amount")) - if err != nil { - return nil, err - } - fullNodesPool, err := meter.AsyncInt64().Gauge("peer_manager_full_nodes_gauge", instrument.WithDescription("full nodes pool peers amount")) if err != nil { @@ -138,7 +131,6 @@ func initMetrics(manager *Manager) (*metrics, error) { doneResult: doneResult, validationResult: validationResult, shrexPools: shrexPools, - removedPeers: removedPeers, fullNodesPool: fullNodesPool, getPeerPoolSizeHistogram: getPeerPoolSizeHistogram, blacklistedPeers: blacklisted, @@ -161,8 +153,6 @@ func initMetrics(manager *Manager) (*metrics, error) { fullNodesPool.Observe(ctx, int64(manager.fullNodes.cooldown.len()), attribute.String(peerStatusKey, string(peerStatusCooldown))) - removedPeers.Observe(ctx, int64(len(manager.removedPeers))) - metrics.blacklistedPeersByReason.Range(func(key, value any) bool { reason := key.(blacklistPeerReason) amount := value.(int) From 71c956f29ba84631f679b3ad1d65cddf9a455062 Mon Sep 17 00:00:00 2001 From: Ryan Date: Fri, 19 May 2023 10:26:36 +0200 Subject: [PATCH 16/19] addressing review comments --- share/availability/discovery/discovery.go | 1 - share/p2p/peers/manager.go | 24 ++++++++++++----------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/share/availability/discovery/discovery.go b/share/availability/discovery/discovery.go index 36fa1201dc..23489c3147 100644 --- a/share/availability/discovery/discovery.go +++ b/share/availability/discovery/discovery.go @@ -134,7 +134,6 @@ func (d *Discovery) Discard(id peer.ID) bool { d.connector.Backoff(id) d.set.Remove(id) d.onUpdatedPeers(id, false) - d.host.Network().ClosePeer(id) //nolint:errcheck log.Debugw("removed peer from the peer set", "peer", id) if d.set.Size() < d.set.Limit() { diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 97674bf039..b3f88db1c1 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -178,14 +178,16 @@ func (m *Manager) Start(startCtx context.Context) error { func (m *Manager) Stop(ctx context.Context) error { m.cancel() - select { - case <-m.headerSubDone: - return nil - case <-m.disconnectedPeersDone: - return nil - case <-ctx.Done(): - return ctx.Err() + for i := 0; i < 2; i++ { + select { + case <-m.headerSubDone: + case <-m.disconnectedPeersDone: + case <-ctx.Done(): + return ctx.Err() + } } + + return nil } // Peer returns peer collected from shrex.Sub for given datahash if any available. @@ -201,7 +203,7 @@ func (m *Manager) Peer( // first, check if a peer is available for the given datahash peerID, ok := p.tryGet() if ok { - if m.removeUnreachable(p, peerID) { + if m.removeIfUnreachable(p, peerID) { return m.Peer(ctx, datahash) } log.Debugw("get peer from shrexsub pool", @@ -220,7 +222,7 @@ func (m *Manager) Peer( start := time.Now() select { case peerID = <-p.next(ctx): - if m.removeUnreachable(p, peerID) { + if m.removeIfUnreachable(p, peerID) { return m.Peer(ctx, datahash) } log.Debugw("got peer from shrexSub pool after wait", @@ -432,8 +434,8 @@ func (m *Manager) validatedPool(hashStr string) *syncPool { return p } -// removeUnreachable removes peer from some pool if it is blacklisted or disconnected -func (m *Manager) removeUnreachable(pool *syncPool, peerID peer.ID) bool { +// removeIfUnreachable removes peer from some pool if it is blacklisted or disconnected +func (m *Manager) removeIfUnreachable(pool *syncPool, peerID peer.ID) bool { if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) { log.Debugw("removing outdated peer from pool", "peer", peerID.String()) pool.remove(peerID) From f63b30c7250347f662bfd3cef882cb12977d720f Mon Sep 17 00:00:00 2001 From: Ryan Date: Fri, 19 May 2023 10:38:57 +0200 Subject: [PATCH 17/19] fixing channel close --- share/p2p/peers/manager.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index b3f88db1c1..9a0d5b0afd 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -279,6 +279,9 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS // subscribeHeader takes datahash from received header and validates corresponding peer pool. func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscription[*header.ExtendedHeader]) { + defer func() { + m.headerSubDone <- struct{}{} + }() defer close(m.headerSubDone) defer headerSub.Cancel() @@ -303,7 +306,9 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri // subscribeDisconnectedPeers subscribes to libp2p connectivity events and removes disconnected // peers from full nodes pool func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subscription) { - defer close(m.disconnectedPeersDone) + defer func() { + m.disconnectedPeersDone <- struct{}{} + }() defer sub.Close() for { select { From a4e998a61694eb27261ad6bbb921f3d2ae3558b9 Mon Sep 17 00:00:00 2001 From: Ryan Date: Fri, 19 May 2023 10:55:17 +0200 Subject: [PATCH 18/19] removing forgotten line --- share/p2p/peers/manager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 9a0d5b0afd..a85348a071 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -282,7 +282,6 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri defer func() { m.headerSubDone <- struct{}{} }() - defer close(m.headerSubDone) defer headerSub.Cancel() for { From a55bf2098b654a38375848335487545ae06f31b5 Mon Sep 17 00:00:00 2001 From: Ryan Date: Fri, 19 May 2023 14:09:14 +0200 Subject: [PATCH 19/19] review fixes --- share/p2p/peers/manager.go | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index a85348a071..46cb9123f6 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -178,13 +178,16 @@ func (m *Manager) Start(startCtx context.Context) error { func (m *Manager) Stop(ctx context.Context) error { m.cancel() - for i := 0; i < 2; i++ { - select { - case <-m.headerSubDone: - case <-m.disconnectedPeersDone: - case <-ctx.Done(): - return ctx.Err() - } + select { + case <-m.headerSubDone: + case <-ctx.Done(): + return ctx.Err() + } + + select { + case <-m.disconnectedPeersDone: + case <-ctx.Done(): + return ctx.Err() } return nil @@ -206,8 +209,6 @@ func (m *Manager) Peer( if m.removeIfUnreachable(p, peerID) { return m.Peer(ctx, datahash) } - log.Debugw("get peer from shrexsub pool", - "peer", peerID.String(), "hash", datahash.String()) return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0) } @@ -225,8 +226,6 @@ func (m *Manager) Peer( if m.removeIfUnreachable(p, peerID) { return m.Peer(ctx, datahash) } - log.Debugw("got peer from shrexSub pool after wait", - "after (s)", time.Since(start), "peer", peerID.String(), "hash", datahash.String()) return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start)) case peerID = <-m.fullNodes.next(ctx): return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start)) @@ -279,9 +278,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS // subscribeHeader takes datahash from received header and validates corresponding peer pool. func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscription[*header.ExtendedHeader]) { - defer func() { - m.headerSubDone <- struct{}{} - }() + defer close(m.headerSubDone) defer headerSub.Cancel() for { @@ -305,9 +302,7 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri // subscribeDisconnectedPeers subscribes to libp2p connectivity events and removes disconnected // peers from full nodes pool func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subscription) { - defer func() { - m.disconnectedPeersDone <- struct{}{} - }() + defer close(m.disconnectedPeersDone) defer sub.Close() for { select { @@ -315,7 +310,7 @@ func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subs return case e, ok := <-sub.Out(): if !ok { - log.Error("Subscription for connectedness events is closed.") + log.Fatal("Subscription for connectedness events is closed.") //nolint:gocritic return } // listen to disconnect event to remove peer from full nodes pool