use shrexSub peers as full nodes
walldiss committed Apr 19, 2023
1 parent 552e811 commit b0ce602
Showing 3 changed files with 84 additions and 6 deletions.
6 changes: 6 additions & 0 deletions share/getters/shrex.go
@@ -6,6 +6,8 @@ import (
"fmt"
"time"

"github.com/libp2p/go-libp2p/core/routing"

"github.com/celestiaorg/nmt/namespace"
"github.com/celestiaorg/rsmt2d"

@@ -92,6 +94,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
case errors.Is(getErr, p2p.ErrNotFound):
getErr = share.ErrNotFound
setStatus(peers.ResultCooldownPeer)
case errors.Is(err, routing.ErrNotFound):
setStatus(peers.ResultRemovePeer)
case errors.Is(getErr, p2p.ErrInvalidResponse):
setStatus(peers.ResultBlacklistPeer)
default:
@@ -145,6 +149,8 @@ func (sg *ShrexGetter) GetSharesByNamespace(
case errors.Is(getErr, p2p.ErrNotFound):
getErr = share.ErrNotFound
setStatus(peers.ResultCooldownPeer)
case errors.Is(err, routing.ErrNotFound):
setStatus(peers.ResultRemovePeer)
case errors.Is(getErr, p2p.ErrInvalidResponse):
setStatus(peers.ResultBlacklistPeer)
default:
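
For illustration, below is a minimal, self-contained sketch of the error-classification pattern these getter changes rely on. The result constants and sentinel errors are local stand-ins, not the celestia-node API; only routing.ErrNotFound comes from libp2p, and mapping it to a remove-peer result mirrors the new cases added above.

package main

import (
	"errors"
	"fmt"

	"github.com/libp2p/go-libp2p/core/routing"
)

// Illustrative stand-ins for the peers.Result values used by the getter.
type result int

const (
	resultSuccess result = iota
	resultCooldownPeer
	resultRemovePeer
	resultBlacklistPeer
)

// Local stand-ins for the shrex p2p sentinel errors.
var (
	errNotFound        = errors.New("shrex: not found")
	errInvalidResponse = errors.New("shrex: invalid response")
)

// classify mirrors the switch in GetEDS/GetSharesByNamespace: a peer that can
// no longer be resolved via routing is removed from the pool rather than put
// on cooldown.
func classify(getErr error) result {
	switch {
	case getErr == nil:
		return resultSuccess
	case errors.Is(getErr, errNotFound):
		return resultCooldownPeer
	case errors.Is(getErr, routing.ErrNotFound):
		return resultRemovePeer
	case errors.Is(getErr, errInvalidResponse):
		return resultBlacklistPeer
	default:
		// Fallback for illustration only; the getter's real default branch
		// is not shown in this diff.
		return resultCooldownPeer
	}
}

func main() {
	fmt.Println(classify(routing.ErrNotFound) == resultRemovePeer) // prints: true
}
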
61 changes: 56 additions & 5 deletions share/p2p/peers/manager.go
@@ -11,8 +11,11 @@ import (

logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
"github.com/libp2p/go-libp2p/p2p/net/conngater"

libhead "github.com/celestiaorg/go-header"
@@ -31,9 +34,15 @@ const (
// ResultCooldownPeer will put returned peer on cooldown, meaning it won't be available by Peer
// method for some time
ResultCooldownPeer
// ResultRemovePeer will remove peer from peer manager pool
ResultRemovePeer
// ResultBlacklistPeer will blacklist peer. Blacklisted peers will be disconnected and blocked from
// any p2p communication in future by libp2p Gater
ResultBlacklistPeer

// eventbusBufSize is the size of the buffered channel to handle
// events in libp2p
eventbusBufSize = 32
)

var log = logging.Logger("shrex/peer-manager")
@@ -61,8 +70,9 @@ type Manager struct {
// hashes that are not in the chain
blacklistedHashes map[string]bool

cancel context.CancelFunc
done chan struct{}
cancel context.CancelFunc
headerSubDone chan struct{}
disconnectedPeersDone chan struct{}
}

// DoneFunc updates internal state depending on call results. Should be called once per returned
@@ -106,7 +116,7 @@ func NewManager(
host: host,
pools: make(map[string]*syncPool),
blacklistedHashes: make(map[string]bool),
done: make(chan struct{}),
headerSubDone: make(chan struct{}),
}

s.fullNodes = newPool(s.params.PeerCooldown)
@@ -149,6 +159,12 @@ func (m *Manager) Start(startCtx context.Context) error {
return fmt.Errorf("subscribing to headersub: %w", err)
}

sub, err := m.host.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(eventbusBufSize))
if err != nil {
return fmt.Errorf("subscribing to libp2p events: %w", err)
}

go m.subscribeDisconnectedPeers(ctx, sub)
go m.subscribeHeader(ctx, headerSub)
go m.GC(ctx)

@@ -159,7 +175,7 @@ func (m *Manager) Stop(ctx context.Context) error {
m.cancel()

select {
case <-m.done:
case <-m.headerSubDone:
return nil
case <-ctx.Done():
return ctx.Err()
@@ -227,6 +243,8 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, fromFull boo
if fromFull {
m.fullNodes.putOnCooldown(peerID)
}
case ResultRemovePeer:
m.fullNodes.remove(peerID)
case ResultBlacklistPeer:
m.blacklistPeers(peerID)
}
@@ -235,7 +253,7 @@

// subscribeHeader takes datahash from received header and validates corresponding peer pool.
func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscription[*header.ExtendedHeader]) {
defer close(m.done)
defer close(m.headerSubDone)
defer headerSub.Cancel()

for {
@@ -256,6 +274,33 @@
}
}

// subscribeDisconnectedPeers subscribes to libp2p connectivity events and removes disconnected
// peers from full nodes pool
func (m *Manager) subscribeDisconnectedPeers(ctx context.Context, sub event.Subscription) {
defer close(m.disconnectedPeersDone)
defer sub.Close()
for {
select {
case <-ctx.Done():
return
case e, ok := <-sub.Out():
if !ok {
log.Error("Subscription for connectedness events is closed.")
return
}
// listen to disconnect event to remove peer from full nodes pool
connStatus := e.(event.EvtPeerConnectednessChanged)
if connStatus.Connectedness == network.NotConnected {
peer := connStatus.Peer
if m.fullNodes.has(peer) {
log.Debugw("peer disconnected, removing from full nodes", "peer", peer)
m.fullNodes.remove(peer)
}
}
}
}
}

// Validate will collect peer.ID into corresponding peer pool
func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {

@@ -295,6 +340,10 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
p := m.getOrCreatePool(msg.DataHash.String())
p.headerHeight.Store(msg.Height)
p.add(peerID)
if p.isValidatedDataHash.Load() {
// add peer to full nodes pool only if datahash has already been validated
m.fullNodes.add(peerID)
}
log.Debugw("got hash from shrex-sub", "peer", peerID, "datahash", msg.DataHash.String())
return pubsub.ValidationIgnore
}
@@ -350,6 +399,8 @@ func (m *Manager) validatedPool(hashStr string) *syncPool {
p := m.getOrCreatePool(hashStr)
if p.isValidatedDataHash.CompareAndSwap(false, true) {
log.Debugw("pool marked validated", "datahash", hashStr)
// if pool is proven to be valid, add all collected peers to full nodes
m.fullNodes.add(p.peers()...)
}
return p
}
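
The manager changes above hinge on two libp2p primitives: the host's event bus and the EvtPeerConnectednessChanged event. The sketch below is a self-contained example of that subscription pattern (buffer size and the print statement are arbitrary); it is not the manager itself, but it shows the same event flow used to drop disconnected peers from the full-node pool.

package main

import (
	"context"
	"fmt"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/event"
	"github.com/libp2p/go-libp2p/core/network"
	"github.com/libp2p/go-libp2p/p2p/host/eventbus"
)

func main() {
	h, err := libp2p.New()
	if err != nil {
		panic(err)
	}
	defer h.Close()

	// Buffered subscription to connectedness changes, as in Manager.Start.
	sub, err := h.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{}, eventbus.BufSize(32))
	if err != nil {
		panic(err)
	}
	defer sub.Close()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case e, ok := <-sub.Out():
				if !ok {
					return
				}
				ev := e.(event.EvtPeerConnectednessChanged)
				if ev.Connectedness == network.NotConnected {
					// The manager reacts here with fullNodes.remove(ev.Peer).
					fmt.Println("peer disconnected:", ev.Peer)
				}
			}
		}
	}()

	// ... run the node; cancel() and sub.Close() stop the listener.
}
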
23 changes: 22 additions & 1 deletion share/p2p/peers/pool.go
@@ -12,7 +12,7 @@ const defaultCleanupThreshold = 2

// pool stores peers and provides methods for simple round-robin access.
type pool struct {
m sync.Mutex
m sync.RWMutex
peersList []peer.ID
statuses map[peer.ID]status
cooldown *timedQueue
@@ -139,6 +139,27 @@ func (p *pool) remove(peers ...peer.ID) {
p.checkHasPeers()
}

func (p *pool) has(peer peer.ID) bool {
p.m.RLock()
defer p.m.RUnlock()

status := p.statuses[peer]
return status != removed
}

func (p *pool) peers() []peer.ID {
p.m.RLock()
defer p.m.RUnlock()

peers := make([]peer.ID, 0, len(p.peersList))
for peer, status := range p.statuses {
if status != removed {
peers = append(peers, peer)
}
}
return peers
}

// cleanup will reduce memory footprint of pool.
func (p *pool) cleanup() {
newList := make([]peer.ID, 0, p.activeCount)
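
The new has and peers accessors exist so that a validated pool can be promoted wholesale into the full-node pool (see validatedPool above) and so the disconnect handler can check membership cheaply under a read lock. A rough, simplified model of that promotion step, using plain strings instead of peer.ID and a boolean map instead of the pool's status bookkeeping:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// miniPool is an illustrative stand-in for the package's pool type.
type miniPool struct {
	m       sync.RWMutex
	active  map[string]bool // true = present, false = removed
	isValid atomic.Bool     // mirrors syncPool.isValidatedDataHash
}

func newMiniPool() *miniPool { return &miniPool{active: make(map[string]bool)} }

func (p *miniPool) add(ids ...string) {
	p.m.Lock()
	defer p.m.Unlock()
	for _, id := range ids {
		p.active[id] = true
	}
}

// has and peers only read state, so a read lock suffices (hence the switch
// from sync.Mutex to sync.RWMutex in pool.go).
func (p *miniPool) has(id string) bool {
	p.m.RLock()
	defer p.m.RUnlock()
	return p.active[id]
}

func (p *miniPool) peers() []string {
	p.m.RLock()
	defer p.m.RUnlock()
	out := make([]string, 0, len(p.active))
	for id, ok := range p.active {
		if ok {
			out = append(out, id)
		}
	}
	return out
}

func main() {
	datahashPool := newMiniPool()
	fullNodes := newMiniPool()
	datahashPool.add("peerA", "peerB")

	// One-time promotion, as in validatedPool: CompareAndSwap ensures the
	// collected peers are copied into the full-node pool exactly once.
	if datahashPool.isValid.CompareAndSwap(false, true) {
		fullNodes.add(datahashPool.peers()...)
	}
	fmt.Println(fullNodes.has("peerA"), fullNodes.peers())
}
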
