feat(share/p2p/peer-manager): use shrexSub peers as full nodes #2105

Merged

Changes from 19 commits
145 changes: 109 additions & 36 deletions share/p2p/peers/manager.go
@@ -11,8 +11,11 @@ import (

logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/net/conngater"

libhead "github.com/celestiaorg/go-header"
@@ -34,6 +37,10 @@ const (
// ResultBlacklistPeer will blacklist peer. Blacklisted peers will be disconnected and blocked from
// any p2p communication in future by libp2p Gater
ResultBlacklistPeer = "result_blacklist_peer"

// eventbusBufSize is the size of the buffered channel to handle
// events in libp2p
eventbusBufSize = 32
)

type result string
@@ -48,6 +55,7 @@ type Manager struct {
// header subscription is necessary in order to Validate the inbound eds hash
headerSub libhead.Subscriber[*header.ExtendedHeader]
shrexSub *shrexsub.PubSub
disc *discovery.Discovery
host host.Host
connGater *conngater.BasicConnectionGater

@@ -65,8 +73,9 @@

metrics *metrics

cancel context.CancelFunc
done chan struct{}
headerSubDone chan struct{}
disconnectedPeersDone chan struct{}
cancel context.CancelFunc
}

// DoneFunc updates internal state depending on call results. Should be called once per returned
@@ -101,14 +110,16 @@ func NewManager(
}

s := &Manager{
params: params,
headerSub: headerSub,
shrexSub: shrexSub,
connGater: connGater,
host: host,
pools: make(map[string]*syncPool),
blacklistedHashes: make(map[string]bool),
done: make(chan struct{}),
params: params,
headerSub: headerSub,
shrexSub: shrexSub,
connGater: connGater,
disc: discovery,
host: host,
pools: make(map[string]*syncPool),
blacklistedHashes: make(map[string]bool),
headerSubDone: make(chan struct{}),
disconnectedPeersDone: make(chan struct{}),
}

s.fullNodes = newPool(s.params.PeerCooldown)
@@ -120,8 +131,8 @@
log.Debugw("got blacklisted peer from discovery", "peer", peerID)
return
}
log.Debugw("added to full nodes", "peer", peerID)
s.fullNodes.add(peerID)
log.Debugw("added to full nodes", "peer", peerID)
return
}

@@ -152,6 +163,12 @@ func (m *Manager) Start(startCtx context.Context) error {
return fmt.Errorf("subscribing to headersub: %w", err)
}

sub, err := m.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize))
if err != nil {
return fmt.Errorf("subscribing to libp2p events: %w", err)
}

go m.subscribeDisconnectedPeers(ctx, sub)
go m.subscribeHeader(ctx, headerSub)
go m.GC(ctx)

@@ -161,12 +178,16 @@
func (m *Manager) Stop(ctx context.Context) error {
m.cancel()

select {
case <-m.done:
return nil
case <-ctx.Done():
return ctx.Err()
for i := 0; i < 2; i++ {
select {
case <-m.headerSubDone:
case <-m.disconnectedPeersDone:
case <-ctx.Done():
return ctx.Err()
}
}

return nil
}

// Peer returns peer collected from shrex.Sub for given datahash if any available.
@@ -182,13 +203,11 @@ func (m *Manager) Peer(
// first, check if a peer is available for the given datahash
peerID, ok := p.tryGet()
if ok {
// some pools could still have blacklisted peers in storage
if m.isBlacklistedPeer(peerID) {
log.Debugw("removing blacklisted peer from pool",
"peer", peerID.String())
p.remove(peerID)
if m.removeIfUnreachable(p, peerID) {
return m.Peer(ctx, datahash)
}
log.Debugw("get peer from shrexsub pool",
"peer", peerID.String(), "hash", datahash.String())
return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0)
}

@@ -203,6 +222,11 @@ func (m *Manager) Peer(
start := time.Now()
select {
case peerID = <-p.next(ctx):
if m.removeIfUnreachable(p, peerID) {
return m.Peer(ctx, datahash)
}
log.Debugw("got peer from shrexSub pool after wait",
"after (s)", time.Since(start), "peer", peerID.String(), "hash", datahash.String())
return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start))
case peerID = <-m.fullNodes.next(ctx):
return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start))
@@ -217,7 +241,8 @@ func (m *Manager) newPeer(
peerID peer.ID,
source peerSource,
poolSize int,
waitTime time.Duration) (peer.ID, DoneFunc, error) {
waitTime time.Duration,
) (peer.ID, DoneFunc, error) {
log.Debugw("got peer",
"hash", datahash.String(),
"peer", peerID.String(),
@@ -241,10 +266,11 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS
case ResultSynced:
m.markPoolAsSynced(datahash.String())
case ResultCooldownPeer:
m.getOrCreatePool(datahash.String()).putOnCooldown(peerID)
if source == sourceFullNodes {
m.fullNodes.putOnCooldown(peerID)
return
}
m.getOrCreatePool(datahash.String()).putOnCooldown(peerID)
case ResultBlacklistPeer:
m.blacklistPeers(reasonMisbehave, peerID)
}
@@ -253,7 +279,9 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, source peerS

// subscribeHeader takes datahash from received header and validates corresponding peer pool.
func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscription[*header.ExtendedHeader]) {
defer close(m.done)
defer func() {
m.headerSubDone <- struct{}{}
}()
defer headerSub.Cancel()

for {
@@ -274,6 +302,35 @@
}
}

// subscribeDisconnectedPeers subscribes to libp2p connectivity events and removes disconnected
// peers from full nodes pool
func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subscription) {
defer func() {
m.disconnectedPeersDone <- struct{}{}
}()
defer sub.Close()
for {
select {
case <-ctx.Done():
return
case e, ok := <-sub.Out():
if !ok {
log.Error("Subscription for connectedness events is closed.")
return
}
// listen to disconnect event to remove peer from full nodes pool
connStatus := e.(event.EvtPeerConnectednessChanged)
if connStatus.Connectedness == network.NotConnected {
peer := connStatus.Peer
if m.fullNodes.has(peer) {
log.Debugw("peer disconnected, removing from full nodes", "peer", peer)
m.fullNodes.remove(peer)
}
}
}
}
}

// Validate will collect peer.ID into corresponding peer pool
func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {
logger := log.With("peer", peerID, "hash", msg.DataHash.String())
@@ -311,19 +368,14 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif

p := m.getOrCreatePool(msg.DataHash.String())
p.headerHeight.Store(msg.Height)
p.add(peerID)
logger.Debug("got hash from shrex-sub")
return pubsub.ValidationIgnore
}
logger.Debugw("got hash from shrex-sub")

func (m *Manager) validatedPool(datahash string) *syncPool {
p := m.getOrCreatePool(datahash)
if p.isValidatedDataHash.CompareAndSwap(false, true) {
log.Debugw("pool marked validated",
"hash", datahash,
"after (s)", time.Since(p.createdAt))
p.add(peerID)
if p.isValidatedDataHash.Load() {
// add peer to full nodes pool only if datahash has been already validated
m.fullNodes.add(peerID)
}
return p
return pubsub.ValidationIgnore
}

func (m *Manager) getOrCreatePool(datahash string) *syncPool {
@@ -356,7 +408,7 @@ func (m *Manager) blacklistPeers(reason blacklistPeerReason, peerIDs ...peer.ID)
// add peer to the blacklist, so we can't connect to it in the future.
err := m.connGater.BlockPeer(peerID)
if err != nil {
log.Warnw("failed tp block peer", "peer", peerID, "err", err)
log.Warnw("failed to block peer", "peer", peerID, "err", err)
}
// close connections to peer.
err = m.host.Network().ClosePeer(peerID)
@@ -376,6 +428,26 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool {
return m.blacklistedHashes[hash.String()]
}

func (m *Manager) validatedPool(hashStr string) *syncPool {
p := m.getOrCreatePool(hashStr)
if p.isValidatedDataHash.CompareAndSwap(false, true) {
log.Debugw("pool marked validated", "datahash", hashStr)
// if pool is proven to be valid, add all collected peers to full nodes
m.fullNodes.add(p.peers()...)
}
return p
}

// removeIfUnreachable removes peer from some pool if it is blacklisted or disconnected
func (m *Manager) removeIfUnreachable(pool *syncPool, peerID peer.ID) bool {
if m.isBlacklistedPeer(peerID) || !m.fullNodes.has(peerID) {
log.Debugw("removing outdated peer from pool", "peer", peerID.String())
pool.remove(peerID)
return true
}
return false
}

func (m *Manager) GC(ctx context.Context) {
ticker := time.NewTicker(m.params.GcInterval)
defer ticker.Stop()
@@ -441,6 +513,7 @@ func (m *Manager) markPoolAsSynced(datahash string) {
atomic.StorePointer(old, unsafe.Pointer(newPool(time.Second)))
}
}

func (p *syncPool) add(peers ...peer.ID) {
if !p.isSynced.Load() {
p.pool.add(peers...)
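Note on the mechanism: the manager now learns about disconnects from libp2p's event bus instead of polling the host. Below is a minimal, self-contained sketch of the same subscription pattern; it uses only the public go-libp2p API, and the watchDisconnects helper plus its onDisconnect callback are illustrative names rather than part of this PR.

package main

import (
	"context"
	"fmt"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
)

// watchDisconnects subscribes to connectedness changes on the host's event bus
// and calls onDisconnect for every peer that transitions to NotConnected.
func watchDisconnects(ctx context.Context, h host.Host, onDisconnect func(peer.ID)) error {
	sub, err := h.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(32))
	if err != nil {
		return fmt.Errorf("subscribing to libp2p events: %w", err)
	}

	go func() {
		defer sub.Close()
		for {
			select {
			case <-ctx.Done():
				return
			case e, ok := <-sub.Out():
				if !ok {
					return
				}
				evt := e.(event.EvtPeerConnectednessChanged)
				if evt.Connectedness == network.NotConnected {
					onDisconnect(evt.Peer)
				}
			}
		}
	}()
	return nil
}

func main() {
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if err := watchDisconnects(ctx, h, func(p peer.ID) {
		fmt.Println("peer disconnected, would be dropped from the full nodes pool:", p)
	}); err != nil {
		panic(err)
	}
	select {} // block; a real node would run until shutdown
}

The eventbus.BufSize option mirrors the eventbusBufSize constant introduced above, giving the subscriber extra slack when bursts of connectedness events arrive.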
6 changes: 4 additions & 2 deletions share/p2p/peers/metrics.go
@@ -169,8 +169,10 @@ func initMetrics(manager *Manager) (*metrics, error) {
return metrics, nil
}

func (m *metrics) observeGetPeer(ctx context.Context,
source peerSource, poolSize int, waitTime time.Duration) {
func (m *metrics) observeGetPeer(
ctx context.Context,
source peerSource, poolSize int, waitTime time.Duration,
) {
if m == nil {
return
}
21 changes: 21 additions & 0 deletions share/p2p/peers/pool.go
@@ -139,6 +139,27 @@ func (p *pool) remove(peers ...peer.ID) {
p.checkHasPeers()
}

func (p *pool) has(peer peer.ID) bool {
p.m.RLock()
defer p.m.RUnlock()

status, ok := p.statuses[peer]
return ok && status != removed
}

func (p *pool) peers() []peer.ID {
p.m.RLock()
defer p.m.RUnlock()

peers := make([]peer.ID, 0, len(p.peersList))
for peer, status := range p.statuses {
if status != removed {
peers = append(peers, peer)
}
}
return peers
}

// cleanup will reduce memory footprint of pool.
func (p *pool) cleanup() {
newList := make([]peer.ID, 0, p.activeCount)
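Note: the new has and peers accessors are what let the manager promote already-collected shrexSub peers into the full-nodes pool once a hash validates, and check membership when a peer disconnects. A rough in-package test sketch of their contract, assuming the pool API shown above and the testify require package already used in this repo (the peer IDs below are placeholders, not real multihash-encoded IDs):

package peers

import (
	"testing"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/stretchr/testify/require"
)

// TestPool_HasAndPeers checks that removed peers are no longer reported by
// has() and are excluded from the peers() snapshot.
func TestPool_HasAndPeers(t *testing.T) {
	p := newPool(time.Second)

	alive, gone := peer.ID("peer-alive"), peer.ID("peer-gone")
	p.add(alive, gone)
	p.remove(gone)

	require.True(t, p.has(alive))
	require.False(t, p.has(gone))
	require.Equal(t, []peer.ID{alive}, p.peers())
}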
4 changes: 2 additions & 2 deletions share/p2p/shrexeds/client.go
@@ -114,9 +114,9 @@ func (c *Client) doRequest(
}
_, err = serde.Read(stream, resp)
if err != nil {
// server closes the stream after returning a non-successful status
// server closes the stream here if we are rate limited
if errors.Is(err, io.EOF) {
c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound)
c.metrics.ObserveRequests(ctx, 1, p2p.StatusRateLimited)
return nil, p2p.ErrNotFound
}
stream.Reset() //nolint:errcheck