Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(share/shrex): improve shrex logging #2101

Merged
merged 4 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ require (
go.opentelemetry.io/otel/trace v1.13.0
go.opentelemetry.io/proto/otlp v0.19.0
go.uber.org/fx v1.19.2
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.7.0
golang.org/x/sync v0.1.0
golang.org/x/text v0.8.0
Expand Down Expand Up @@ -306,7 +307,6 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/dig v1.16.1 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.8.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions share/getters/shrex.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
peer, setStatus, getErr := sg.peerManager.Peer(ctx, root.Hash())
if getErr != nil {
err = errors.Join(err, getErr)
log.Debugw("couldn't find peer",
"datahash", root.String(),
log.Debugw("eds: couldn't find peer",
"hash", root.String(),
"err", getErr,
"finished (s)", time.Since(start))
return nil, fmt.Errorf("getter/shrex: %w", err)
Expand Down Expand Up @@ -101,8 +101,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex
if !ErrorContains(err, getErr) {
err = errors.Join(err, getErr)
}
log.Debugw("request failed",
"datahash", root.String(),
log.Debugw("eds: request failed",
"hash", root.String(),
"peer", peer.String(),
"attempt", attempt,
"err", getErr,
Expand All @@ -125,8 +125,8 @@ func (sg *ShrexGetter) GetSharesByNamespace(
peer, setStatus, getErr := sg.peerManager.Peer(ctx, root.Hash())
if getErr != nil {
err = errors.Join(err, getErr)
log.Debugw("couldn't find peer",
"datahash", root.String(),
log.Debugw("nd: couldn't find peer",
"hash", root.String(),
"err", getErr,
"finished (s)", time.Since(start))
return nil, fmt.Errorf("getter/shrex: %w", err)
Expand Down Expand Up @@ -154,8 +154,8 @@ func (sg *ShrexGetter) GetSharesByNamespace(
if !ErrorContains(err, getErr) {
err = errors.Join(err, getErr)
}
log.Debugw("request failed",
"datahash", root.String(),
log.Debugw("nd: request failed",
"hash", root.String(),
"peer", peer.String(),
"attempt", attempt,
"err", getErr,
Expand Down
2 changes: 1 addition & 1 deletion share/p2p/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func RateLimitMiddleware(inner network.StreamHandler, concurrencyLimit int) netw
log.Debug("concurrency limit reached")
err := stream.Close()
if err != nil {
log.Errorw("server: closing stream", "err", err)
log.Debugw("server: closing stream", "err", err)
}
return
}
Expand Down
44 changes: 27 additions & 17 deletions share/p2p/peers/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,38 +174,49 @@ func (m *Manager) Stop(ctx context.Context) error {
func (m *Manager) Peer(
ctx context.Context, datahash share.DataHash,
) (peer.ID, DoneFunc, error) {
logger := log.With("hash", datahash.String())
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
p := m.validatedPool(datahash.String())

// first, check if a peer is available for the given datahash
peerID, ok := p.tryGet()
if ok {
// some pools could still have blacklisted peers in storage
if m.isBlacklistedPeer(peerID) {
log.Debugw("removing blacklisted peer from pool", "hash", datahash.String(),
logger.Debugw("removing blacklisted peer from pool",
"peer", peerID.String())
p.remove(peerID)
return m.Peer(ctx, datahash)
}
log.Debugw("returning shrex-sub peer", "hash", datahash.String(),
"peer", peerID.String())
logger.Debugw("get peer from shrexsub pool",
"peer", peerID.String(),
"pool_size", p.size())
return peerID, m.doneFunc(datahash, peerID, false), nil
}

// if no peer for datahash is currently available, try to use full node
// obtained from discovery
peerID, ok = m.fullNodes.tryGet()
if ok {
log.Debugw("got peer from full nodes discovery pool", "peer", peerID, "datahash", datahash.String())
logger.Debugw("got peer from full nodes pool",
"peer", peerID.String(),
"pool_size", m.fullNodes.size())
return peerID, m.doneFunc(datahash, peerID, true), nil
}

// no peers are available right now, wait for the first one
start := time.Now()
select {
case peerID = <-p.next(ctx):
log.Debugw("got peer from shrexSub pool after wait", "peer", peerID, "datahash", datahash.String())
logger.Debugw("got peer from shrexSub pool after wait",
"peer", peerID.String(),
"pool_size", p.size(),
"after (s)", time.Since(start))
return peerID, m.doneFunc(datahash, peerID, false), nil
case peerID = <-m.fullNodes.next(ctx):
log.Debugw("got peer from discovery pool after wait", "peer", peerID, "datahash", datahash.String())
logger.Debugw("got peer from full nodes pool after wait",
"peer", peerID.String(),
"pool_size", m.fullNodes.size(),
"after (s)", time.Since(start))
return peerID, m.doneFunc(datahash, peerID, true), nil
case <-ctx.Done():
return "", nil, ctx.Err()
Expand All @@ -216,7 +227,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, fromFull boo
return func(result result) {
log.Debugw("set peer status",
"peer", peerID,
"datahash", datahash.String(),
"hash", datahash.String(),
"result", result)
switch result {
case ResultSuccess:
Expand Down Expand Up @@ -258,28 +269,27 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri

// Validate will collect peer.ID into corresponding peer pool
func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult {
logger := log.With("peer", peerID, "hash", msg.DataHash.String())

// messages broadcast from self should bypass the validation with Accept
if peerID == m.host.ID() {
log.Debugw("received datahash from self", "datahash", msg.DataHash.String())
logger.Debug("received datahash from self")
return pubsub.ValidationAccept
}

// punish peer for sending invalid hash if it has misbehaved in the past
if m.isBlacklistedHash(msg.DataHash) {
log.Debugw("received blacklisted hash, reject validation", "peer", peerID, "datahash", msg.DataHash.String())
logger.Debug("received blacklisted hash, reject validation")
return pubsub.ValidationReject
}

if m.isBlacklistedPeer(peerID) {
log.Debugw("received message from blacklisted peer, reject validation",
"peer", peerID,
"datahash", msg.DataHash.String())
logger.Debug("received message from blacklisted peer, reject validation")
return pubsub.ValidationReject
}

if msg.Height == 0 {
log.Debugw("received message with 0 height", "peer", peerID)
logger.Debug("received message with 0 height")
return pubsub.ValidationReject
}

Expand All @@ -288,14 +298,14 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif
// if we allow pool creation for those headers, there is chance the pool will not be validated in
// time and will be false-positively trigger blacklisting of hash and all peers that sent msgs for
// that hash
log.Debugw("received message for past header", "peer", peerID, "datahash", msg.DataHash.String())
logger.Debug("received message for past header")
return pubsub.ValidationIgnore
}

p := m.getOrCreatePool(msg.DataHash.String())
p.headerHeight.Store(msg.Height)
p.add(peerID)
log.Debugw("got hash from shrex-sub", "peer", peerID, "datahash", msg.DataHash.String())
logger.Debug("got hash from shrex-sub")
return pubsub.ValidationIgnore
}

Expand Down Expand Up @@ -349,7 +359,7 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool {
func (m *Manager) validatedPool(hashStr string) *syncPool {
p := m.getOrCreatePool(hashStr)
if p.isValidatedDataHash.CompareAndSwap(false, true) {
log.Debugw("pool marked validated", "datahash", hashStr)
log.Debugw("pool marked validated", "hash", hashStr)
}
return p
}
Expand Down Expand Up @@ -391,7 +401,7 @@ func (m *Manager) cleanUp() []peer.ID {
continue
}
log.Debug("blacklisting datahash with all corresponding peers",
"datahash", h,
"hash", h,
"peer_list", p.peersList)
// blacklist hash
m.blacklistedHashes[h] = true
Expand Down
8 changes: 7 additions & 1 deletion share/p2p/peers/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const defaultCleanupThreshold = 2

// pool stores peers and provides methods for simple round-robin access.
type pool struct {
m sync.Mutex
m sync.RWMutex
peersList []peer.ID
statuses map[peer.ID]status
cooldown *timedQueue
Expand Down Expand Up @@ -194,3 +194,9 @@ func (p *pool) checkHasPeers() {
p.hasPeer = false
}
}

// size returns the pool's current activeCount. The value is read while
// holding the pool's read lock, so it can be called concurrently with
// other readers.
// NOTE(review): activeCount presumably excludes peers that are on cooldown —
// confirm against the rest of pool.go.
func (p *pool) size() int {
	p.m.RLock()
	count := p.activeCount
	p.m.RUnlock()
	return count
}
9 changes: 6 additions & 3 deletions share/p2p/shrexeds/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,10 @@ func (c *Client) RequestEDS(
}
}
if err != p2p.ErrNotFound {
log.Warnw("client: eds request to peer failed", "peer", peer, "hash", dataHash.String())
log.Warnw("client: eds request to peer failed",
"peer", peer,
"hash", dataHash.String(),
"err", err)
}

return nil, err
Expand All @@ -79,7 +82,7 @@ func (c *Client) doRequest(

if dl, ok := ctx.Deadline(); ok {
if err = stream.SetDeadline(dl); err != nil {
log.Debugw("error setting deadline: %s", "err", err)
log.Debugw("client: error setting deadline: %s", "err", err)
}
}

Expand All @@ -94,7 +97,7 @@ func (c *Client) doRequest(
}
err = stream.CloseWrite()
if err != nil {
log.Debugw("error closing write", "err", err)
log.Debugw("client: error closing write", "err", err)
}

// read and parse status from peer
Expand Down
39 changes: 21 additions & 18 deletions share/p2p/shrexeds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"go.uber.org/zap"

"github.com/celestiaorg/go-libp2p-messenger/serde"

Expand Down Expand Up @@ -59,12 +60,13 @@ func (s *Server) Stop(context.Context) error {
}

func (s *Server) handleStream(stream network.Stream) {
log.Debug("server: handling eds request")
logger := log.With("peer", stream.Conn().RemotePeer())
logger.Debug("server: handling eds request")

// read request from stream to get the dataHash for store lookup
req, err := s.readRequest(stream)
req, err := s.readRequest(logger, stream)
if err != nil {
log.Warnw("server: reading request from stream", "err", err)
logger.Warnw("server: reading request from stream", "err", err)
stream.Reset() //nolint:errcheck
return
}
Expand All @@ -73,10 +75,11 @@ func (s *Server) handleStream(stream network.Stream) {
hash := share.DataHash(req.Hash)
err = hash.Validate()
if err != nil {
log.Debugw("server: invalid request", "err", err)
logger.Debugw("server: invalid request", "err", err)
stream.Reset() //nolint:errcheck
return
}
logger = logger.With("hash", hash)

ctx, cancel := context.WithTimeout(s.ctx, s.params.HandleRequestTimeout)
defer cancel()
Expand All @@ -90,44 +93,44 @@ func (s *Server) handleStream(stream network.Stream) {
case errors.Is(err, eds.ErrNotFound):
status = p2p_pb.Status_NOT_FOUND
case err != nil:
log.Errorw("server: get car", "err", err)
logger.Errorw("server: get CAR", "err", err)
status = p2p_pb.Status_INTERNAL
}

// inform the client of our status
err = s.writeStatus(status, stream)
err = s.writeStatus(logger, status, stream)
if err != nil {
log.Warnw("server: writing status to stream", "err", err)
logger.Warnw("server: writing status to stream", "err", err)
stream.Reset() //nolint:errcheck
return
}
// if we cannot serve the EDS, we are already done
if status != p2p_pb.Status_OK {
err = stream.Close()
if err != nil {
log.Debugw("server: closing stream", "err", err)
logger.Debugw("server: closing stream", "err", err)
}
return
}

// start streaming the ODS to the client
err = s.writeODS(edsReader, stream)
err = s.writeODS(logger, edsReader, stream)
if err != nil {
log.Warnw("server: writing ods to stream", "hash", hash.String(), "err", err)
logger.Warnw("server: writing ods to stream", "err", err)
stream.Reset() //nolint:errcheck
return
}

err = stream.Close()
if err != nil {
log.Debugw("server: closing stream", "err", err)
logger.Debugw("server: closing stream", "err", err)
}
}

func (s *Server) readRequest(stream network.Stream) (*p2p_pb.EDSRequest, error) {
func (s *Server) readRequest(logger *zap.SugaredLogger, stream network.Stream) (*p2p_pb.EDSRequest, error) {
walldiss marked this conversation as resolved.
Show resolved Hide resolved
err := stream.SetReadDeadline(time.Now().Add(s.params.ServerReadTimeout))
if err != nil {
log.Debugw("server: set read deadline", "err", err)
logger.Debugw("server: set read deadline", "err", err)
}

req := new(p2p_pb.EDSRequest)
Expand All @@ -137,27 +140,27 @@ func (s *Server) readRequest(stream network.Stream) (*p2p_pb.EDSRequest, error)
}
err = stream.CloseRead()
if err != nil {
log.Debugw("server: closing read", "err", err)
logger.Debugw("server: closing read", "err", err)
}

return req, nil
}

func (s *Server) writeStatus(status p2p_pb.Status, stream network.Stream) error {
func (s *Server) writeStatus(logger *zap.SugaredLogger, status p2p_pb.Status, stream network.Stream) error {
err := stream.SetWriteDeadline(time.Now().Add(s.params.ServerWriteTimeout))
if err != nil {
log.Debugw("server: set write deadline", "err", err)
logger.Debugw("server: set write deadline", "err", err)
}

resp := &p2p_pb.EDSResponse{Status: status}
_, err = serde.Write(stream, resp)
return err
}

func (s *Server) writeODS(edsReader io.Reader, stream network.Stream) error {
func (s *Server) writeODS(logger *zap.SugaredLogger, edsReader io.Reader, stream network.Stream) error {
err := stream.SetWriteDeadline(time.Now().Add(s.params.ServerWriteTimeout))
if err != nil {
log.Debugw("server: set read deadline", "err", err)
logger.Debugw("server: set read deadline", "err", err)
}

odsReader, err := eds.ODSReader(edsReader)
Expand Down
4 changes: 2 additions & 2 deletions share/p2p/shrexnd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (c *Client) RequestND(
return shares, err
}
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
return nil, ctx.Err()
return nil, err
}
// some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not
// unwrap to a ctx err
Expand All @@ -69,7 +69,7 @@ func (c *Client) RequestND(
}
}
if err != p2p.ErrNotFound {
log.Warnw("client-nd: peer returned err", "peer", peer, "err", err)
log.Warnw("client-nd: peer returned err", "err", err)
}
return nil, err
}
Expand Down
Loading