Skip to content

Commit

Permalink
refactor(share/shrex): improve shrex logging (#2101)
Browse files Browse the repository at this point in the history
## Overview
- Previously, shrex-nd and shrex-eds error logs were indistinguishable,
which led to confusion about the source of errors.
 - Add peerID to server logs.
 - Improve some other shrex logging.
 

Resolves #2035
  • Loading branch information
walldiss committed Apr 20, 2023
1 parent a98923b commit c4e994b
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 72 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ require (
go.opentelemetry.io/otel/trace v1.13.0
go.opentelemetry.io/proto/otlp v0.19.0
go.uber.org/fx v1.19.2
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.7.0
golang.org/x/sync v0.1.0
golang.org/x/text v0.8.0
Expand Down Expand Up @@ -306,7 +307,6 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.16.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.8.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions share/getters/shrex.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
peer, setStatus, getErr := sg.peerManager.Peer(ctx, root.Hash())
if getErr != nil {
err = errors.Join(err, getErr)
log.Debugw("couldn't find peer",
"datahash", root.String(),
log.Debugw("eds: couldn't find peer",
"hash", root.String(),
"err", getErr,
"finished (s)", time.Since(start))
return nil, fmt.Errorf("getter/shrex: %w", err)
Expand Down Expand Up @@ -101,8 +101,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
if !ErrorContains(err, getErr) {
err = errors.Join(err, getErr)
}
log.Debugw("request failed",
"datahash", root.String(),
log.Debugw("eds: request failed",
"hash", root.String(),
"peer", peer.String(),
"attempt", attempt,
"err", getErr,
Expand All @@ -125,8 +125,8 @@ func (sg *ShrexGetter) GetSharesByNamespace(
peer, setStatus, getErr := sg.peerManager.Peer(ctx, root.Hash())
if getErr != nil {
err = errors.Join(err, getErr)
log.Debugw("couldn't find peer",
"datahash", root.String(),
log.Debugw("nd: couldn't find peer",
"hash", root.String(),
"err", getErr,
"finished (s)", time.Since(start))
return nil, fmt.Errorf("getter/shrex: %w", err)
Expand Down Expand Up @@ -154,8 +154,8 @@ func (sg *ShrexGetter) GetSharesByNamespace(
if !ErrorContains(err, getErr) {
err = errors.Join(err, getErr)
}
log.Debugw("request failed",
"datahash", root.String(),
log.Debugw("nd: request failed",
"hash", root.String(),
"peer", peer.String(),
"attempt", attempt,
"err", getErr,
Expand Down
2 changes: 1 addition & 1 deletion share/p2p/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func RateLimitMiddleware(inner network.StreamHandler, concurrencyLimit int) netw
log.Debug("concurrency limit reached")
err := stream.Close()
if err != nil {
log.Errorw("server: closing stream", "err", err)
log.Debugw("server: closing stream", "err", err)
}
return
}
Expand Down
44 changes: 27 additions & 17 deletions share/p2p/peers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,38 +174,49 @@ func (m *Manager) Stop(ctx context.Context) error {
func (m *Manager) Peer(
ctx context.Context, datahash share.DataHash,
) (peer.ID, DoneFunc, error) {
logger := log.With("hash", datahash.String())
p := m.validatedPool(datahash.String())

// first, check if a peer is available for the given datahash
peerID, ok := p.tryGet()
if ok {
// some pools could still have blacklisted peers in storage
if m.isBlacklistedPeer(peerID) {
log.Debugw("removing blacklisted peer from pool", "hash", datahash.String(),
logger.Debugw("removing blacklisted peer from pool",
"peer", peerID.String())
p.remove(peerID)
return m.Peer(ctx, datahash)
}
log.Debugw("returning shrex-sub peer", "hash", datahash.String(),
"peer", peerID.String())
logger.Debugw("get peer from shrexsub pool",
"peer", peerID.String(),
"pool_size", p.size())
return peerID, m.doneFunc(datahash, peerID, false), nil
}

// if no peer for datahash is currently available, try to use full node
// obtained from discovery
peerID, ok = m.fullNodes.tryGet()
if ok {
log.Debugw("got peer from full nodes discovery pool", "peer", peerID, "datahash", datahash.String())
logger.Debugw("got peer from full nodes pool",
"peer", peerID.String(),
"pool_size", m.fullNodes.size())
return peerID, m.doneFunc(datahash, peerID, true), nil
}

// no peers are available right now, wait for the first one
start := time.Now()
select {
case peerID = <-p.next(ctx):
log.Debugw("got peer from shrexSub pool after wait", "peer", peerID, "datahash", datahash.String())
logger.Debugw("got peer from shrexSub pool after wait",
"peer", peerID.String(),
"pool_size", p.size(),
"after (s)", time.Since(start))
return peerID, m.doneFunc(datahash, peerID, false), nil
case peerID = <-m.fullNodes.next(ctx):
log.Debugw("got peer from discovery pool after wait", "peer", peerID, "datahash", datahash.String())
logger.Debugw("got peer from full nodes pool after wait",
"peer", peerID.String(),
"pool_size", m.fullNodes.size(),
"after (s)", time.Since(start))
return peerID, m.doneFunc(datahash, peerID, true), nil
case <-ctx.Done():
return "", nil, ctx.Err()
Expand All @@ -216,7 +227,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, fromFull boo
return func(result result) {
log.Debugw("set peer status",
"peer", peerID,
"datahash", datahash.String(),
"hash", datahash.String(),
"result", result)
switch result {
case ResultSuccess:
Expand Down Expand Up @@ -258,28 +269,27 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri

// Validate will collect peer.ID into corresponding peer pool
func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {
logger := log.With("peer", peerID, "hash", msg.DataHash.String())

// messages broadcast from self should bypass the validation with Accept
if peerID == m.host.ID() {
log.Debugw("received datahash from self", "datahash", msg.DataHash.String())
logger.Debug("received datahash from self")
return pubsub.ValidationAccept
}

// punish peer for sending invalid hash if it has misbehaved in the past
if m.isBlacklistedHash(msg.DataHash) {
log.Debugw("received blacklisted hash, reject validation", "peer", peerID, "datahash", msg.DataHash.String())
logger.Debug("received blacklisted hash, reject validation")
return pubsub.ValidationReject
}

if m.isBlacklistedPeer(peerID) {
log.Debugw("received message from blacklisted peer, reject validation",
"peer", peerID,
"datahash", msg.DataHash.String())
logger.Debug("received message from blacklisted peer, reject validation")
return pubsub.ValidationReject
}

if msg.Height == 0 {
log.Debugw("received message with 0 height", "peer", peerID)
logger.Debug("received message with 0 height")
return pubsub.ValidationReject
}

Expand All @@ -288,14 +298,14 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
// if we allow pool creation for those headers, there is chance the pool will not be validated in
// time and will be false-positively trigger blacklisting of hash and all peers that sent msgs for
// that hash
log.Debugw("received message for past header", "peer", peerID, "datahash", msg.DataHash.String())
logger.Debug("received message for past header")
return pubsub.ValidationIgnore
}

p := m.getOrCreatePool(msg.DataHash.String())
p.headerHeight.Store(msg.Height)
p.add(peerID)
log.Debugw("got hash from shrex-sub", "peer", peerID, "datahash", msg.DataHash.String())
logger.Debug("got hash from shrex-sub")
return pubsub.ValidationIgnore
}

Expand Down Expand Up @@ -349,7 +359,7 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool {
func (m *Manager) validatedPool(hashStr string) *syncPool {
p := m.getOrCreatePool(hashStr)
if p.isValidatedDataHash.CompareAndSwap(false, true) {
log.Debugw("pool marked validated", "datahash", hashStr)
log.Debugw("pool marked validated", "hash", hashStr)
}
return p
}
Expand Down Expand Up @@ -391,7 +401,7 @@ func (m *Manager) cleanUp() []peer.ID {
continue
}
log.Debug("blacklisting datahash with all corresponding peers",
"datahash", h,
"hash", h,
"peer_list", p.peersList)
// blacklist hash
m.blacklistedHashes[h] = true
Expand Down
8 changes: 7 additions & 1 deletion share/p2p/peers/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const defaultCleanupThreshold = 2

// pool stores peers and provides methods for simple round-robin access.
type pool struct {
m sync.Mutex
m sync.RWMutex
peersList []peer.ID
statuses map[peer.ID]status
cooldown *timedQueue
Expand Down Expand Up @@ -194,3 +194,9 @@ func (p *pool) checkHasPeers() {
p.hasPeer = false
}
}

// size returns the pool's current activeCount. The value is read while
// holding the pool's read lock, so it is safe to call alongside other
// goroutines that take the same mutex.
func (p *pool) size() int {
	p.m.RLock()
	count := p.activeCount
	p.m.RUnlock()
	return count
}
9 changes: 6 additions & 3 deletions share/p2p/shrexeds/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ func (c *Client) RequestEDS(
}
}
if err != p2p.ErrNotFound {
log.Warnw("client: eds request to peer failed", "peer", peer, "hash", dataHash.String())
log.Warnw("client: eds request to peer failed",
"peer", peer,
"hash", dataHash.String(),
"err", err)
}

return nil, err
Expand All @@ -79,7 +82,7 @@ func (c *Client) doRequest(

if dl, ok := ctx.Deadline(); ok {
if err = stream.SetDeadline(dl); err != nil {
log.Debugw("error setting deadline: %s", "err", err)
log.Debugw("client: error setting deadline: %s", "err", err)
}
}

Expand All @@ -94,7 +97,7 @@ func (c *Client) doRequest(
}
err = stream.CloseWrite()
if err != nil {
log.Debugw("error closing write", "err", err)
log.Debugw("client: error closing write", "err", err)
}

// read and parse status from peer
Expand Down
39 changes: 21 additions & 18 deletions share/p2p/shrexeds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap"

"github.com/celestiaorg/go-libp2p-messenger/serde"

Expand Down Expand Up @@ -59,12 +60,13 @@ func (s *Server) Stop(context.Context) error {
}

func (s *Server) handleStream(stream network.Stream) {
log.Debug("server: handling eds request")
logger := log.With("peer", stream.Conn().RemotePeer())
logger.Debug("server: handling eds request")

// read request from stream to get the dataHash for store lookup
req, err := s.readRequest(stream)
req, err := s.readRequest(logger, stream)
if err != nil {
log.Warnw("server: reading request from stream", "err", err)
logger.Warnw("server: reading request from stream", "err", err)
stream.Reset() //nolint:errcheck
return
}
Expand All @@ -73,10 +75,11 @@ func (s *Server) handleStream(stream network.Stream) {
hash := share.DataHash(req.Hash)
err = hash.Validate()
if err != nil {
log.Debugw("server: invalid request", "err", err)
logger.Debugw("server: invalid request", "err", err)
stream.Reset() //nolint:errcheck
return
}
logger = logger.With("hash", hash)

ctx, cancel := context.WithTimeout(s.ctx, s.params.HandleRequestTimeout)
defer cancel()
Expand All @@ -90,44 +93,44 @@ func (s *Server) handleStream(stream network.Stream) {
case errors.Is(err, eds.ErrNotFound):
status = p2p_pb.Status_NOT_FOUND
case err != nil:
log.Errorw("server: get car", "err", err)
logger.Errorw("server: get CAR", "err", err)
status = p2p_pb.Status_INTERNAL
}

// inform the client of our status
err = s.writeStatus(status, stream)
err = s.writeStatus(logger, status, stream)
if err != nil {
log.Warnw("server: writing status to stream", "err", err)
logger.Warnw("server: writing status to stream", "err", err)
stream.Reset() //nolint:errcheck
return
}
// if we cannot serve the EDS, we are already done
if status != p2p_pb.Status_OK {
err = stream.Close()
if err != nil {
log.Debugw("server: closing stream", "err", err)
logger.Debugw("server: closing stream", "err", err)
}
return
}

// start streaming the ODS to the client
err = s.writeODS(edsReader, stream)
err = s.writeODS(logger, edsReader, stream)
if err != nil {
log.Warnw("server: writing ods to stream", "hash", hash.String(), "err", err)
logger.Warnw("server: writing ods to stream", "err", err)
stream.Reset() //nolint:errcheck
return
}

err = stream.Close()
if err != nil {
log.Debugw("server: closing stream", "err", err)
logger.Debugw("server: closing stream", "err", err)
}
}

func (s *Server) readRequest(stream network.Stream) (*p2p_pb.EDSRequest, error) {
func (s *Server) readRequest(logger *zap.SugaredLogger, stream network.Stream) (*p2p_pb.EDSRequest, error) {
err := stream.SetReadDeadline(time.Now().Add(s.params.ServerReadTimeout))
if err != nil {
log.Debugw("server: set read deadline", "err", err)
logger.Debugw("server: set read deadline", "err", err)
}

req := new(p2p_pb.EDSRequest)
Expand All @@ -137,27 +140,27 @@ func (s *Server) readRequest(stream network.Stream) (*p2p_pb.EDSRequest, error)
}
err = stream.CloseRead()
if err != nil {
log.Debugw("server: closing read", "err", err)
logger.Debugw("server: closing read", "err", err)
}

return req, nil
}

func (s *Server) writeStatus(status p2p_pb.Status, stream network.Stream) error {
func (s *Server) writeStatus(logger *zap.SugaredLogger, status p2p_pb.Status, stream network.Stream) error {
err := stream.SetWriteDeadline(time.Now().Add(s.params.ServerWriteTimeout))
if err != nil {
log.Debugw("server: set write deadline", "err", err)
logger.Debugw("server: set write deadline", "err", err)
}

resp := &p2p_pb.EDSResponse{Status: status}
_, err = serde.Write(stream, resp)
return err
}

func (s *Server) writeODS(edsReader io.Reader, stream network.Stream) error {
func (s *Server) writeODS(logger *zap.SugaredLogger, edsReader io.Reader, stream network.Stream) error {
err := stream.SetWriteDeadline(time.Now().Add(s.params.ServerWriteTimeout))
if err != nil {
log.Debugw("server: set read deadline", "err", err)
logger.Debugw("server: set read deadline", "err", err)
}

odsReader, err := eds.ODSReader(edsReader)
Expand Down
4 changes: 2 additions & 2 deletions share/p2p/shrexnd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (c *Client) RequestND(
return shares, err
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return nil, ctx.Err()
return nil, err
}
// some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not
// unwrap to a ctx err
Expand All @@ -69,7 +69,7 @@ func (c *Client) RequestND(
}
}
if err != p2p.ErrNotFound {
log.Warnw("client-nd: peer returned err", "peer", peer, "err", err)
log.Warnw("client-nd: peer returned err", "err", err)
}
return nil, err
}
Expand Down
Loading

0 comments on commit c4e994b

Please sign in to comment.