From c4e994b03fe2a451df2f0df099c24d09ce76aec6 Mon Sep 17 00:00:00 2001 From: Vlad <13818348+walldiss@users.noreply.github.com> Date: Thu, 20 Apr 2023 21:03:49 +0800 Subject: [PATCH] refactor(share/shrex): improve shrex logging (#2101) ## Overview - Previously, shrex-nd and shrex-eds error logs were indistinguishable, which led to confusion about the source of errors. - Add peerID to server logs. - Improve some other shrex logging. Resolves https://github.com/celestiaorg/celestia-node/issues/2035 --- go.mod | 2 +- share/getters/shrex.go | 16 ++++++------- share/p2p/middleware.go | 2 +- share/p2p/peers/manager.go | 44 +++++++++++++++++++++-------------- share/p2p/peers/pool.go | 8 ++++++- share/p2p/shrexeds/client.go | 9 +++++--- share/p2p/shrexeds/server.go | 39 ++++++++++++++++--------------- share/p2p/shrexnd/client.go | 4 ++-- share/p2p/shrexnd/server.go | 45 +++++++++++++++++++----------------- 9 files changed, 97 insertions(+), 72 deletions(-) diff --git a/go.mod b/go.mod index 8a05b6dd72..15557baa63 100644 --- a/go.mod +++ b/go.mod @@ -71,6 +71,7 @@ require ( go.opentelemetry.io/otel/trace v1.13.0 go.opentelemetry.io/proto/otlp v0.19.0 go.uber.org/fx v1.19.2 + go.uber.org/zap v1.24.0 golang.org/x/crypto v0.7.0 golang.org/x/sync v0.1.0 golang.org/x/text v0.8.0 @@ -306,7 +307,6 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/dig v1.16.1 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.24.0 // indirect golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 // indirect golang.org/x/mod v0.8.0 // indirect golang.org/x/net v0.8.0 // indirect diff --git a/share/getters/shrex.go b/share/getters/shrex.go index 95f06d821e..72626f406a 100644 --- a/share/getters/shrex.go +++ b/share/getters/shrex.go @@ -72,8 +72,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex peer, setStatus, getErr := sg.peerManager.Peer(ctx, root.Hash()) if getErr != nil { err = errors.Join(err, getErr) - log.Debugw("couldn't find peer", - 
"datahash", root.String(), + log.Debugw("eds: couldn't find peer", + "hash", root.String(), "err", getErr, "finished (s)", time.Since(start)) return nil, fmt.Errorf("getter/shrex: %w", err) @@ -101,8 +101,8 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex if !ErrorContains(err, getErr) { err = errors.Join(err, getErr) } - log.Debugw("request failed", - "datahash", root.String(), + log.Debugw("eds: request failed", + "hash", root.String(), "peer", peer.String(), "attempt", attempt, "err", getErr, @@ -125,8 +125,8 @@ func (sg *ShrexGetter) GetSharesByNamespace( peer, setStatus, getErr := sg.peerManager.Peer(ctx, root.Hash()) if getErr != nil { err = errors.Join(err, getErr) - log.Debugw("couldn't find peer", - "datahash", root.String(), + log.Debugw("nd: couldn't find peer", + "hash", root.String(), "err", getErr, "finished (s)", time.Since(start)) return nil, fmt.Errorf("getter/shrex: %w", err) @@ -154,8 +154,8 @@ func (sg *ShrexGetter) GetSharesByNamespace( if !ErrorContains(err, getErr) { err = errors.Join(err, getErr) } - log.Debugw("request failed", - "datahash", root.String(), + log.Debugw("nd: request failed", + "hash", root.String(), "peer", peer.String(), "attempt", attempt, "err", getErr, diff --git a/share/p2p/middleware.go b/share/p2p/middleware.go index 355ea7c468..25d733f43b 100644 --- a/share/p2p/middleware.go +++ b/share/p2p/middleware.go @@ -20,7 +20,7 @@ func RateLimitMiddleware(inner network.StreamHandler, concurrencyLimit int) netw log.Debug("concurrency limit reached") err := stream.Close() if err != nil { - log.Errorw("server: closing stream", "err", err) + log.Debugw("server: closing stream", "err", err) } return } diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index c18e91bdb1..71774a9211 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -174,6 +174,7 @@ func (m *Manager) Stop(ctx context.Context) error { func (m *Manager) Peer( ctx context.Context, datahash 
share.DataHash, ) (peer.ID, DoneFunc, error) { + logger := log.With("hash", datahash.String()) p := m.validatedPool(datahash.String()) // first, check if a peer is available for the given datahash @@ -181,13 +182,14 @@ func (m *Manager) Peer( if ok { // some pools could still have blacklisted peers in storage if m.isBlacklistedPeer(peerID) { - log.Debugw("removing blacklisted peer from pool", "hash", datahash.String(), + logger.Debugw("removing blacklisted peer from pool", "peer", peerID.String()) p.remove(peerID) return m.Peer(ctx, datahash) } - log.Debugw("returning shrex-sub peer", "hash", datahash.String(), - "peer", peerID.String()) + logger.Debugw("get peer from shrexsub pool", + "peer", peerID.String(), + "pool_size", p.size()) return peerID, m.doneFunc(datahash, peerID, false), nil } @@ -195,17 +197,26 @@ func (m *Manager) Peer( // obtained from discovery peerID, ok = m.fullNodes.tryGet() if ok { - log.Debugw("got peer from full nodes discovery pool", "peer", peerID, "datahash", datahash.String()) + logger.Debugw("got peer from full nodes pool", + "peer", peerID.String(), + "pool_size", m.fullNodes.size()) return peerID, m.doneFunc(datahash, peerID, true), nil } // no peers are available right now, wait for the first one + start := time.Now() select { case peerID = <-p.next(ctx): - log.Debugw("got peer from shrexSub pool after wait", "peer", peerID, "datahash", datahash.String()) + logger.Debugw("got peer from shrexSub pool after wait", + "peer", peerID.String(), + "pool_size", p.size(), + "after (s)", time.Since(start)) return peerID, m.doneFunc(datahash, peerID, false), nil case peerID = <-m.fullNodes.next(ctx): - log.Debugw("got peer from discovery pool after wait", "peer", peerID, "datahash", datahash.String()) + logger.Debugw("got peer from full nodes pool after wait", + "peer", peerID.String(), + "pool_size", m.fullNodes.size(), + "after (s)", time.Since(start)) return peerID, m.doneFunc(datahash, peerID, true), nil case <-ctx.Done(): return "", nil, 
ctx.Err() @@ -216,7 +227,7 @@ func (m *Manager) doneFunc(datahash share.DataHash, peerID peer.ID, fromFull boo return func(result result) { log.Debugw("set peer status", "peer", peerID, - "datahash", datahash.String(), + "hash", datahash.String(), "result", result) switch result { case ResultSuccess: @@ -258,28 +269,27 @@ func (m *Manager) subscribeHeader(ctx context.Context, headerSub libhead.Subscri // Validate will collect peer.ID into corresponding peer pool func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notification) pubsub.ValidationResult { + logger := log.With("peer", peerID, "hash", msg.DataHash.String()) // messages broadcast from self should bypass the validation with Accept if peerID == m.host.ID() { - log.Debugw("received datahash from self", "datahash", msg.DataHash.String()) + logger.Debug("received datahash from self") return pubsub.ValidationAccept } // punish peer for sending invalid hash if it has misbehaved in the past if m.isBlacklistedHash(msg.DataHash) { - log.Debugw("received blacklisted hash, reject validation", "peer", peerID, "datahash", msg.DataHash.String()) + logger.Debug("received blacklisted hash, reject validation") return pubsub.ValidationReject } if m.isBlacklistedPeer(peerID) { - log.Debugw("received message from blacklisted peer, reject validation", - "peer", peerID, - "datahash", msg.DataHash.String()) + logger.Debug("received message from blacklisted peer, reject validation") return pubsub.ValidationReject } if msg.Height == 0 { - log.Debugw("received message with 0 height", "peer", peerID) + logger.Debug("received message with 0 height") return pubsub.ValidationReject } @@ -288,14 +298,14 @@ func (m *Manager) Validate(_ context.Context, peerID peer.ID, msg shrexsub.Notif // if we allow pool creation for those headers, there is chance the pool will not be validated in // time and will be false-positively trigger blacklisting of hash and all peers that sent msgs for // that hash - 
log.Debugw("received message for past header", "peer", peerID, "datahash", msg.DataHash.String()) + logger.Debug("received message for past header") return pubsub.ValidationIgnore } p := m.getOrCreatePool(msg.DataHash.String()) p.headerHeight.Store(msg.Height) p.add(peerID) - log.Debugw("got hash from shrex-sub", "peer", peerID, "datahash", msg.DataHash.String()) + logger.Debug("got hash from shrex-sub") return pubsub.ValidationIgnore } @@ -349,7 +359,7 @@ func (m *Manager) isBlacklistedHash(hash share.DataHash) bool { func (m *Manager) validatedPool(hashStr string) *syncPool { p := m.getOrCreatePool(hashStr) if p.isValidatedDataHash.CompareAndSwap(false, true) { - log.Debugw("pool marked validated", "datahash", hashStr) + log.Debugw("pool marked validated", "hash", hashStr) } return p } @@ -391,7 +401,7 @@ func (m *Manager) cleanUp() []peer.ID { continue } log.Debug("blacklisting datahash with all corresponding peers", - "datahash", h, + "hash", h, "peer_list", p.peersList) // blacklist hash m.blacklistedHashes[h] = true diff --git a/share/p2p/peers/pool.go b/share/p2p/peers/pool.go index a31c4345b7..bcf425a475 100644 --- a/share/p2p/peers/pool.go +++ b/share/p2p/peers/pool.go @@ -12,7 +12,7 @@ const defaultCleanupThreshold = 2 // pool stores peers and provides methods for simple round-robin access. 
type pool struct { - m sync.Mutex + m sync.RWMutex peersList []peer.ID statuses map[peer.ID]status cooldown *timedQueue @@ -194,3 +194,9 @@ func (p *pool) checkHasPeers() { p.hasPeer = false } } + +func (p *pool) size() int { + p.m.RLock() + defer p.m.RUnlock() + return p.activeCount +} diff --git a/share/p2p/shrexeds/client.go b/share/p2p/shrexeds/client.go index 4a68d689dd..330fc2c026 100644 --- a/share/p2p/shrexeds/client.go +++ b/share/p2p/shrexeds/client.go @@ -61,7 +61,10 @@ func (c *Client) RequestEDS( } } if err != p2p.ErrNotFound { - log.Warnw("client: eds request to peer failed", "peer", peer, "hash", dataHash.String()) + log.Warnw("client: eds request to peer failed", + "peer", peer, + "hash", dataHash.String(), + "err", err) } return nil, err @@ -79,7 +82,7 @@ func (c *Client) doRequest( if dl, ok := ctx.Deadline(); ok { if err = stream.SetDeadline(dl); err != nil { - log.Debugw("error setting deadline: %s", "err", err) + log.Debugw("client: error setting deadline: %s", "err", err) } } @@ -94,7 +97,7 @@ func (c *Client) doRequest( } err = stream.CloseWrite() if err != nil { - log.Debugw("error closing write", "err", err) + log.Debugw("client: error closing write", "err", err) } // read and parse status from peer diff --git a/share/p2p/shrexeds/server.go b/share/p2p/shrexeds/server.go index f7e68bb271..8b0b674b88 100644 --- a/share/p2p/shrexeds/server.go +++ b/share/p2p/shrexeds/server.go @@ -10,6 +10,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" + "go.uber.org/zap" "github.com/celestiaorg/go-libp2p-messenger/serde" @@ -59,12 +60,13 @@ func (s *Server) Stop(context.Context) error { } func (s *Server) handleStream(stream network.Stream) { - log.Debug("server: handling eds request") + logger := log.With("peer", stream.Conn().RemotePeer()) + logger.Debug("server: handling eds request") // read request from stream to get the dataHash for store lookup - req, err := 
s.readRequest(stream) + req, err := s.readRequest(logger, stream) if err != nil { - log.Warnw("server: reading request from stream", "err", err) + logger.Warnw("server: reading request from stream", "err", err) stream.Reset() //nolint:errcheck return } @@ -73,10 +75,11 @@ func (s *Server) handleStream(stream network.Stream) { hash := share.DataHash(req.Hash) err = hash.Validate() if err != nil { - log.Debugw("server: invalid request", "err", err) + logger.Debugw("server: invalid request", "err", err) stream.Reset() //nolint:errcheck return } + logger = logger.With("hash", hash) ctx, cancel := context.WithTimeout(s.ctx, s.params.HandleRequestTimeout) defer cancel() @@ -90,14 +93,14 @@ func (s *Server) handleStream(stream network.Stream) { case errors.Is(err, eds.ErrNotFound): status = p2p_pb.Status_NOT_FOUND case err != nil: - log.Errorw("server: get car", "err", err) + logger.Errorw("server: get CAR", "err", err) status = p2p_pb.Status_INTERNAL } // inform the client of our status - err = s.writeStatus(status, stream) + err = s.writeStatus(logger, status, stream) if err != nil { - log.Warnw("server: writing status to stream", "err", err) + logger.Warnw("server: writing status to stream", "err", err) stream.Reset() //nolint:errcheck return } @@ -105,29 +108,29 @@ func (s *Server) handleStream(stream network.Stream) { if status != p2p_pb.Status_OK { err = stream.Close() if err != nil { - log.Debugw("server: closing stream", "err", err) + logger.Debugw("server: closing stream", "err", err) } return } // start streaming the ODS to the client - err = s.writeODS(edsReader, stream) + err = s.writeODS(logger, edsReader, stream) if err != nil { - log.Warnw("server: writing ods to stream", "hash", hash.String(), "err", err) + logger.Warnw("server: writing ods to stream", "err", err) stream.Reset() //nolint:errcheck return } err = stream.Close() if err != nil { - log.Debugw("server: closing stream", "err", err) + logger.Debugw("server: closing stream", "err", err) } } -func 
(s *Server) readRequest(stream network.Stream) (*p2p_pb.EDSRequest, error) { +func (s *Server) readRequest(logger *zap.SugaredLogger, stream network.Stream) (*p2p_pb.EDSRequest, error) { err := stream.SetReadDeadline(time.Now().Add(s.params.ServerReadTimeout)) if err != nil { - log.Debugw("server: set read deadline", "err", err) + logger.Debugw("server: set read deadline", "err", err) } req := new(p2p_pb.EDSRequest) @@ -137,16 +140,16 @@ func (s *Server) readRequest(stream network.Stream) (*p2p_pb.EDSRequest, error) } err = stream.CloseRead() if err != nil { - log.Debugw("server: closing read", "err", err) + logger.Debugw("server: closing read", "err", err) } return req, nil } -func (s *Server) writeStatus(status p2p_pb.Status, stream network.Stream) error { +func (s *Server) writeStatus(logger *zap.SugaredLogger, status p2p_pb.Status, stream network.Stream) error { err := stream.SetWriteDeadline(time.Now().Add(s.params.ServerWriteTimeout)) if err != nil { - log.Debugw("server: set write deadline", "err", err) + logger.Debugw("server: set write deadline", "err", err) } resp := &p2p_pb.EDSResponse{Status: status} @@ -154,10 +157,10 @@ func (s *Server) writeStatus(status p2p_pb.Status, stream network.Stream) error return err } -func (s *Server) writeODS(edsReader io.Reader, stream network.Stream) error { +func (s *Server) writeODS(logger *zap.SugaredLogger, edsReader io.Reader, stream network.Stream) error { err := stream.SetWriteDeadline(time.Now().Add(s.params.ServerWriteTimeout)) if err != nil { - log.Debugw("server: set read deadline", "err", err) + logger.Debugw("server: set read deadline", "err", err) } odsReader, err := eds.ODSReader(edsReader) diff --git a/share/p2p/shrexnd/client.go b/share/p2p/shrexnd/client.go index 553c0ffdd9..78af04e61b 100644 --- a/share/p2p/shrexnd/client.go +++ b/share/p2p/shrexnd/client.go @@ -58,7 +58,7 @@ func (c *Client) RequestND( return shares, err } if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) 
{ - return nil, ctx.Err() + return nil, err } // some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not // unwrap to a ctx err @@ -69,7 +69,7 @@ func (c *Client) RequestND( } } if err != p2p.ErrNotFound { - log.Warnw("client-nd: peer returned err", "peer", peer, "err", err) + log.Warnw("client-nd: peer returned err", "err", err) } return nil, err } diff --git a/share/p2p/shrexnd/server.go b/share/p2p/shrexnd/server.go index 5e37dec3e0..cb71ad3053 100644 --- a/share/p2p/shrexnd/server.go +++ b/share/p2p/shrexnd/server.go @@ -10,6 +10,7 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" "github.com/minio/sha256-simd" + "go.uber.org/zap" "github.com/celestiaorg/go-libp2p-messenger/serde" @@ -70,30 +71,32 @@ func (srv *Server) Stop(context.Context) error { } func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stream) { - log.Debug("server: handling nd request") + logger := log.With("peer", stream.Conn().RemotePeer()) + logger.Debug("server: handling nd request") err := stream.SetReadDeadline(time.Now().Add(srv.params.ServerReadTimeout)) if err != nil { - log.Debugw("server: setting read deadline", "err", err) + logger.Debugw("server: setting read deadline", "err", err) } var req pb.GetSharesByNamespaceRequest _, err = serde.Read(stream, &req) if err != nil { - log.Warnw("server: reading request", "err", err) + logger.Warnw("server: reading request", "err", err) stream.Reset() //nolint:errcheck return } - log.Debugw("server: new request", "namespaceId", string(req.NamespaceId), "roothash", string(req.RootHash)) + logger = logger.With("namespaceId", string(req.NamespaceId), "hash", string(req.RootHash)) + logger.Debugw("server: new request") err = stream.CloseRead() if err != nil { - log.Debugw("server: closing read side of the stream", "err", err) + logger.Debugw("server: closing read side of the stream", "err", err) } err = validateRequest(req) if err != nil { - 
log.Debugw("server: invalid request", "err", err) + logger.Debugw("server: invalid request", "err", err) stream.Reset() //nolint:errcheck return } @@ -104,27 +107,27 @@ func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stre dah, err := srv.store.GetDAH(ctx, req.RootHash) if err != nil { if errors.Is(err, eds.ErrNotFound) { - srv.respondNotFoundError(stream) + srv.respondNotFoundError(logger, stream) return } - log.Errorw("server: retrieving DAH for datahash", "err", err) - srv.respondInternalError(stream) + logger.Errorw("server: retrieving DAH", "err", err) + srv.respondInternalError(logger, stream) return } shares, err := srv.getter.GetSharesByNamespace(ctx, dah, req.NamespaceId) if errors.Is(err, share.ErrNotFound) { - srv.respondNotFoundError(stream) + srv.respondNotFoundError(logger, stream) return } if err != nil { - log.Errorw("server: retrieving shares", "err", err) - srv.respondInternalError(stream) + logger.Errorw("server: retrieving shares", "err", err) + srv.respondInternalError(logger, stream) return } resp := namespacedSharesToResponse(shares) - srv.respond(stream, resp) + srv.respond(logger, stream, resp) } // validateRequest checks correctness of the request @@ -140,19 +143,19 @@ func validateRequest(req pb.GetSharesByNamespaceRequest) error { } // respondNotFoundError sends internal error response to client -func (srv *Server) respondNotFoundError(stream network.Stream) { +func (srv *Server) respondNotFoundError(logger *zap.SugaredLogger, stream network.Stream) { resp := &pb.GetSharesByNamespaceResponse{ Status: pb.StatusCode_NOT_FOUND, } - srv.respond(stream, resp) + srv.respond(logger, stream, resp) } // respondInternalError sends internal error response to client -func (srv *Server) respondInternalError(stream network.Stream) { +func (srv *Server) respondInternalError(logger *zap.SugaredLogger, stream network.Stream) { resp := &pb.GetSharesByNamespaceResponse{ Status: pb.StatusCode_INTERNAL, } - srv.respond(stream, 
resp) + srv.respond(logger, stream, resp) } // namespacedSharesToResponse encodes shares into proto and sends it to client with OK status code @@ -179,20 +182,20 @@ func namespacedSharesToResponse(shares share.NamespacedShares) *pb.GetSharesByNa } } -func (srv *Server) respond(stream network.Stream, resp *pb.GetSharesByNamespaceResponse) { +func (srv *Server) respond(logger *zap.SugaredLogger, stream network.Stream, resp *pb.GetSharesByNamespaceResponse) { err := stream.SetWriteDeadline(time.Now().Add(srv.params.ServerWriteTimeout)) if err != nil { - log.Debugw("server: seting write deadline", "err", err) + logger.Debugw("server: setting write deadline", "err", err) } _, err = serde.Write(stream, resp) if err != nil { - log.Warnw("server: writing response", "err", err) + logger.Warnw("server: writing response", "err", err) stream.Reset() //nolint:errcheck return } if err = stream.Close(); err != nil { - log.Debugw("server: closing stream", "err", err) + logger.Debugw("server: closing stream", "err", err) } }