feat(share/p2p/peer-manager): use shrexSub peers as full nodes #2105

Merged
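For context: the central mechanism this PR introduces is a subscription to libp2p connectedness events, which the peer manager uses to evict disconnected peers from the full-nodes pool. The standalone sketch below is not part of the diff; the host construction and the timeout are illustrative assumptions, while the subscribe-and-drain pattern mirrors what Manager.Start and subscribeDisconnectedPeers do in share/p2p/peers/manager.go further down.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
)

func main() {
	// Hypothetical standalone host; the peer manager reuses the node's existing host instead.
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Same subscription the PR adds in Manager.Start: a buffered channel (size 32 in the PR)
	// so bursts of connectivity changes do not block the event bus.
	sub, err := h.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(32))
	if err != nil {
		panic(err)
	}
	defer sub.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	for {
		select {
		case <-ctx.Done():
			return
		case e, ok := <-sub.Out():
			if !ok {
				return
			}
			// Only disconnects matter here: the manager removes such peers from its
			// full-nodes pool and records them in removedPeers for metrics.
			ev := e.(event.EvtPeerConnectednessChanged)
			if ev.Connectedness == network.NotConnected {
				fmt.Println("peer disconnected:", ev.Peer)
			}
		}
	}
}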
1 change: 1 addition & 0 deletions share/availability/discovery/discovery.go
@@ -134,6 +134,7 @@ func (d *Discovery) Discard(id peer.ID) bool {
d.connector.Backoff(id)
d.set.Remove(id)
d.onUpdatedPeers(id, false)
d.host.Network().ClosePeer(id) //nolint:errcheck
log.Debugw("removed peer from the peer set", "peer", id)

if d.set.Size() < d.set.Limit() {
176 changes: 143 additions & 33 deletions share/p2p/peers/manager.go
@@ -11,8 +11,11 @@ import (

logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/net/conngater"

libhead "github.com/celestiaorg/go-header"
@@ -34,6 +37,10 @@ const (
// ResultBlacklistPeer will blacklist peer. Blacklisted peers will be disconnected and blocked from
// any p2p communication in future by libp2p Gater
ResultBlacklistPeer = "result_blacklist_peer"

// eventbusBufSize is the size of the buffered channel to handle
// events in libp2p
eventbusBufSize = 32
)

type result string
@@ -48,6 +55,7 @@ type Manager struct {
// header subscription is necessary in order to Validate the inbound eds hash
headerSub libhead.Subscriber[*header.ExtendedHeader]
shrexSub *shrexsub.PubSub
disc *discovery.Discovery
host host.Host
connGater *conngater.BasicConnectionGater

@@ -63,10 +71,15 @@ type Manager struct {
// hashes that are not in the chain
blacklistedHashes map[string]bool

// peers that have previously been removed. It is only used for metrics and is not used to
// prevent previously removed peers from being added again
removedPeers map[peer.ID]bool

metrics *metrics

cancel context.CancelFunc
done chan struct{}
headerSubDone chan struct{}
disconnectedPeersDone chan struct{}
cancel context.CancelFunc
}

// DoneFunc updates internal state depending on call results. Should be called once per returned
@@ -101,14 +114,17 @@ func NewManager(
}

s := &Manager{
params: params,
headerSub: headerSub,
shrexSub: shrexSub,
connGater: connGater,
host: host,
pools: make(map[string]*syncPool),
blacklistedHashes: make(map[string]bool),
done: make(chan struct{}),
params: params,
headerSub: headerSub,
shrexSub: shrexSub,
connGater: connGater,
disc: discovery,
host: host,
removedPeers: make(map[peer.ID]bool),
pools: make(map[string]*syncPool),
blacklistedHashes: make(map[string]bool),
headerSubDone: make(chan struct{}),
disconnectedPeersDone: make(chan struct{}),
}

s.fullNodes = newPool(s.params.PeerCooldown)
@@ -120,13 +136,18 @@
log.Debugw("got blacklisted peer from discovery", "peer", peerID)
return
}
if s.isRemovedPeer(peerID) {
s.lock.Lock()
delete(s.removedPeers, peerID)
s.lock.Unlock()
}
log.Debugw("added to full nodes", "peer", peerID)
s.fullNodes.add(peerID)
return
}

log.Debugw("removing peer from discovered full nodes", "peer", peerID)
s.fullNodes.remove(peerID)
s.removeFromPool(s.fullNodes, peerID)
})

return s, nil
@@ -152,6 +173,12 @@ func (m *Manager) Start(startCtx context.Context) error {
return fmt.Errorf("subscribing to headersub: %w", err)
}

sub, err := m.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize))
if err != nil {
return fmt.Errorf("subscribing to libp2p events: %w", err)
}

go m.subscribeDisconnectedPeers(ctx, sub)
go m.subscribeHeader(ctx, headerSub)
go m.GC(ctx)

@@ -162,7 +189,9 @@ func (m *Manager) Stop(ctx context.Context) error {
m.cancel()

select {
case <-m.done:
case <-m.headerSubDone:
return nil
case <-m.disconnectedPeersDone:
return nil
case <-ctx.Done():
return ctx.Err()
@@ -182,13 +211,11 @@ func (m *Manager) Peer(
// first, check if a peer is available for the given datahash
peerID, ok := p.tryGet()
if ok {
// some pools could still have blacklisted peers in storage
if m.isBlacklistedPeer(peerID) {
log.Debugw("removing blacklisted peer from pool",
"peer", peerID.String())
p.remove(peerID)
if m.removeUnreachable(p, peerID) {
return m.Peer(ctx, datahash)
}
log.Debugw("get peer from shrexsub pool",
"peer", peerID.String(), "hash", datahash.String())
return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0)
}

@@ -203,6 +230,11 @@ func (m *Manager) Peer(
start := time.Now()
select {
case peerID = <-p.next(ctx):
if m.removeUnreachable(p, peerID) {
return m.Peer(ctx, datahash)
}
log.Debugw("got peer from shrexSub pool after wait",
"after (s)", time.Since(start), "peer", peerID.String(), "hash", datahash.String())
return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start))
case peerID = <-m.fullNodes.next(ctx):
return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start))
@@ -217,7 +249,8 @@ func (m *Manager) newPeer(
peerID peer.ID,
source peerSource,
poolSize int,
waitTime time.Duration) (peer.ID, DoneFunc, error) {
waitTime time.Duration,
) (peer.ID, DoneFunc, error) {
log.Debugw("got peer",
"hash", datahash.String(),
"peer", peerID.String(),
@@ -241,10 +274,11 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS
case ResultSynced:
m.markPoolAsSynced(datahash.String())
case ResultCooldownPeer:
m.getOrCreatePool(datahash.String()).putOnCooldown(peerID)
if source == sourceFullNodes {
m.fullNodes.putOnCooldown(peerID)
return
}
m.getOrCreatePool(datahash.String()).putOnCooldown(peerID)
case ResultBlacklistPeer:
m.blacklistPeers(reasonMisbehave, peerID)
}
@@ -253,7 +287,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS

// subscribeHeader takes datahash from received header and validates corresponding peer pool.
func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscription[*header.ExtendedHeader]) {
defer close(m.done)
defer close(m.headerSubDone)
defer headerSub.Cancel()

for {
@@ -274,6 +308,37 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri
}
}

// subscribeDisconnectedPeers subscribes to libp2p connectivity events and removes disconnected
// peers from full nodes pool
func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subscription) {
defer close(m.disconnectedPeersDone)
defer sub.Close()
for {
select {
case <-ctx.Done():
return
case e, ok := <-sub.Out():
if !ok {
log.Error("Subscription for connectedness events is closed.")
return
}
// listen to disconnect event to remove peer from full nodes pool
connStatus := e.(event.EvtPeerConnectednessChanged)
if connStatus.Connectedness == network.NotConnected {
peer := connStatus.Peer
if m.fullNodes.has(peer) {
log.Debugw("peer disconnected, removing from full nodes", "peer", peer)
// we do not call discovery.Discard here because discovery handles disconnections from its own peers itself
m.fullNodes.remove(peer)
m.lock.Lock()
m.removedPeers[peer] = true
m.lock.Unlock()
}
}
}
}
}

// Validate will collect peer.ID into corresponding peer pool
func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {
logger := log.With("peer", peerID, "hash", msg.DataHash.String())
@@ -311,19 +376,18 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif

p := m.getOrCreatePool(msg.DataHash.String())
p.headerHeight.Store(msg.Height)
p.add(peerID)
logger.Debug("got hash from shrex-sub")
return pubsub.ValidationIgnore
}
logger.Debugw("got hash from shrex-sub")

func (m *Manager) validatedPool(datahash string) *syncPool {
p := m.getOrCreatePool(datahash)
if p.isValidatedDataHash.CompareAndSwap(false, true) {
log.Debugw("pool marked validated",
"hash", datahash,
"after (s)", time.Since(p.createdAt))
p.add(peerID)
if p.isValidatedDataHash.Load() {
// add peer to full nodes pool only if datahash has been already validated
// if they were previously removed, give them another chance
m.lock.Lock()
delete(m.removedPeers, peerID)
m.lock.Unlock()
m.fullNodes.add(peerID)
}
return p
return pubsub.ValidationIgnore
}

func (m *Manager) getOrCreatePool(datahash string) *syncPool {
@@ -352,11 +416,11 @@ func (m *Manager) blacklistPeers(reason blacklistPeerReason, peerIDs ...peer.ID)
return
}
for _, peerID := range peerIDs {
m.fullNodes.remove(peerID)
m.removeFromPool(m.fullNodes, peerID)
// add peer to the blacklist, so we can't connect to it in the future.
err := m.connGater.BlockPeer(peerID)
if err != nil {
log.Warnw("failed tp block peer", "peer", peerID, "err", err)
log.Warnw("failed to block peer", "peer", peerID, "err", err)
}
// close connections to peer.
err = m.host.Network().ClosePeer(peerID)
@@ -376,6 +440,41 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool {
return m.blacklistedHashes[hash.String()]
}

func (m *Manager) isRemovedPeer(peerID peer.ID) bool {
m.lock.Lock()
defer m.lock.Unlock()
return m.removedPeers[peerID]
}

func (m *Manager) validatedPool(hashStr string) *syncPool {
p := m.getOrCreatePool(hashStr)
if p.isValidatedDataHash.CompareAndSwap(false, true) {
log.Debugw("pool marked validated", "datahash", hashStr)
// if pool is proven to be valid, add all collected peers to full nodes
for _, peer := range p.peers() {
// if peer was previously removed, give it another chance
if m.isRemovedPeer(peer) {
m.lock.Lock()
m.removedPeers[peer] = false
m.lock.Unlock()
}

m.fullNodes.add(peer)
}
}
return p
}

// removeUnreachable removes peer from some pool if it is blacklisted or disconnected
func (m *Manager) removeUnreachable(pool *syncPool, peerID peer.ID) bool {
if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) {
log.Debugw("removing outdated peer from pool", "peer", peerID.String())
m.removeFromPool(pool.pool, peerID)
return true
}
return false
}

func (m *Manager) GC(ctx context.Context) {
ticker := time.NewTicker(m.params.GcInterval)
defer ticker.Stop()
@@ -432,6 +531,16 @@ func (m *Manager) cleanUp() []peer.ID {
return blacklist
}

func (m *Manager) removeFromPool(pool *pool, peerID peer.ID) {
m.lock.Lock()
m.removedPeers[peerID] = true
m.lock.Unlock()

if !m.disc.Discard(peerID) {
pool.remove(peerID)
}
}

func (m *Manager) markPoolAsSynced(datahash string) {
p := m.getOrCreatePool(datahash)
if p.isSynced.CompareAndSwap(false, true) {
@@ -441,6 +550,7 @@ func (m *Manager) markPoolAsSynced(datahash string) {
atomic.StorePointer(old, unsafe.Pointer(newPool(time.Second)))
}
}

func (p *syncPool) add(peers ...peer.ID) {
if !p.isSynced.Load() {
p.pool.add(peers...)
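For context: the removedPeers field added to Manager above is plain bookkeeping, a mutex-guarded map that feeds the new removed-peers metric and never prevents a peer from being re-added. The standalone sketch below uses illustrative names (removedTracker, markRemoved, forgive) that do not exist in the codebase; it only mirrors the locking pattern shared by removeFromPool, subscribeDisconnectedPeers, Validate, and the discovery callback.

package main

import (
	"fmt"
	"sync"

	"github.com/libp2p/go-libp2p/core/peer"
)

// removedTracker mirrors Manager.removedPeers: it only records which peers
// were dropped (for metrics) and never blocks a peer from being re-added.
type removedTracker struct {
	lock    sync.Mutex
	removed map[peer.ID]bool
}

func newRemovedTracker() *removedTracker {
	return &removedTracker{removed: make(map[peer.ID]bool)}
}

// markRemoved corresponds to the bookkeeping done in removeFromPool and
// subscribeDisconnectedPeers when a peer leaves the full-nodes pool.
func (t *removedTracker) markRemoved(id peer.ID) {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.removed[id] = true
}

// forgive corresponds to delete(m.removedPeers, peerID) in Validate and the
// discovery callback: a peer that shows up again gets another chance.
func (t *removedTracker) forgive(id peer.ID) {
	t.lock.Lock()
	defer t.lock.Unlock()
	delete(t.removed, id)
}

func (t *removedTracker) isRemoved(id peer.ID) bool {
	t.lock.Lock()
	defer t.lock.Unlock()
	return t.removed[id]
}

func main() {
	tr := newRemovedTracker()
	id := peer.ID("example-peer")
	tr.markRemoved(id)
	fmt.Println(tr.isRemoved(id)) // true
	tr.forgive(id)
	fmt.Println(tr.isRemoved(id)) // false
}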
16 changes: 14 additions & 2 deletions share/p2p/peers/metrics.go
@@ -73,6 +73,7 @@ type metrics struct {
shrexPools asyncint64.Gauge // attributes: pool_status
fullNodesPool asyncint64.Gauge // attributes: pool_status
blacklistedPeersByReason sync.Map
removedPeers asyncint64.Gauge
blacklistedPeers asyncint64.Gauge // attributes: blacklist_reason
}

@@ -113,6 +114,12 @@ func initMetrics(manager *Manager) (*metrics, error) {
return nil, err
}

removedPeers, err := meter.AsyncInt64().Gauge("peer_manager_removed_peers_gauge",
instrument.WithDescription("removed peers amount"))
if err != nil {
return nil, err
}

fullNodesPool, err := meter.AsyncInt64().Gauge("peer_manager_full_nodes_gauge",
instrument.WithDescription("full nodes pool peers amount"))
if err != nil {
@@ -131,6 +138,7 @@ func initMetrics(manager *Manager) (*metrics, error) {
doneResult: doneResult,
validationResult: validationResult,
shrexPools: shrexPools,
removedPeers: removedPeers,
fullNodesPool: fullNodesPool,
getPeerPoolSizeHistogram: getPeerPoolSizeHistogram,
blacklistedPeers: blacklisted,
@@ -153,6 +161,8 @@ func initMetrics(manager *Manager) (*metrics, error) {
fullNodesPool.Observe(ctx, int64(manager.fullNodes.cooldown.len()),
attribute.String(peerStatusKey, string(peerStatusCooldown)))

removedPeers.Observe(ctx, int64(len(manager.removedPeers)))

metrics.blacklistedPeersByReason.Range(func(key, value any) bool {
reason := key.(blacklistPeerReason)
amount := value.(int)
@@ -169,8 +179,10 @@ func initMetrics(manager *Manager) (*metrics, error) {
return metrics, nil
}

func (m *metrics) observeGetPeer(ctx context.Context,
source peerSource, poolSize int, waitTime time.Duration) {
func (m *metrics) observeGetPeer(
ctx context.Context,
source peerSource, poolSize int, waitTime time.Duration,
) {
if m == nil {
return
}