diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go index 0ef19d8fba..67a30793f8 100644 --- a/nodebuilder/settings.go +++ b/nodebuilder/settings.go @@ -72,19 +72,30 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti fx.Invoke(modheader.WithMetrics), ) + samplingMetrics := fx.Options( + fx.Invoke(das.WithMetrics), + fx.Invoke(share.WithPeerManagerMetrics), + fx.Invoke(share.WithShrexClientMetrics), + fx.Invoke(share.WithShrexGetterMetrics), + ) + var opts fx.Option switch nodeType { - case node.Full, node.Light: + case node.Full: + opts = fx.Options( + baseComponents, + fx.Invoke(share.WithShrexServerMetrics), + samplingMetrics, + ) + case node.Light: opts = fx.Options( baseComponents, - fx.Invoke(das.WithMetrics), - fx.Invoke(share.WithPeerManagerMetrics), - // add more monitoring here + samplingMetrics, ) case node.Bridge: opts = fx.Options( baseComponents, - // add more monitoring here + fx.Invoke(share.WithShrexServerMetrics), ) default: panic("invalid node type") diff --git a/nodebuilder/share/config.go b/nodebuilder/share/config.go index 179035e21d..d843e78dd2 100644 --- a/nodebuilder/share/config.go +++ b/nodebuilder/share/config.go @@ -9,7 +9,6 @@ import ( type Config struct { // UseShareExchange is a flag toggling the usage of shrex protocols for blocksync. - // NOTE: This config variable only has an effect on full and bridge nodes. UseShareExchange bool // ShrExEDSParams sets shrexeds client and server configuration parameters ShrExEDSParams *shrexeds.Parameters diff --git a/nodebuilder/share/opts.go b/nodebuilder/share/opts.go index 93a9f4c4d5..dc63a4ca1e 100644 --- a/nodebuilder/share/opts.go +++ b/nodebuilder/share/opts.go @@ -1,7 +1,10 @@ package share import ( + "github.com/celestiaorg/celestia-node/share/getters" "github.com/celestiaorg/celestia-node/share/p2p/peers" + "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" + "github.com/celestiaorg/celestia-node/share/p2p/shrexnd" ) // WithPeerManagerMetrics is a utility function that is expected to be @@ -9,3 +12,25 @@ import ( func WithPeerManagerMetrics(m *peers.Manager) error { return m.WithMetrics() } + +func WithShrexClientMetrics(edsClient *shrexeds.Client, ndClient *shrexnd.Client) error { + err := edsClient.WithMetrics() + if err != nil { + return err + } + + return ndClient.WithMetrics() +} + +func WithShrexServerMetrics(edsServer *shrexeds.Server, ndServer *shrexnd.Server) error { + err := edsServer.WithMetrics() + if err != nil { + return err + } + + return ndServer.WithMetrics() +} + +func WithShrexGetterMetrics(sg *getters.ShrexGetter) error { + return sg.WithMetrics() +} diff --git a/share/getters/shrex.go b/share/getters/shrex.go index c93a138ef7..3fd7c4846c 100644 --- a/share/getters/shrex.go +++ b/share/getters/shrex.go @@ -6,6 +6,12 @@ import ( "fmt" "time" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric/unit" + "github.com/celestiaorg/nmt/namespace" "github.com/celestiaorg/rsmt2d" @@ -23,8 +29,61 @@ const ( // serve getEDS request for block size 256 defaultMinRequestTimeout = time.Minute // should be >= shrexeds server write timeout defaultMinAttemptsCount = 3 + metricObservationTimeout = 100 * time.Millisecond ) +var meter = global.MeterProvider().Meter("shrex/getter") + +type metrics struct { + edsAttempts syncint64.Histogram + ndAttempts syncint64.Histogram +} + +func (m *metrics) recordEDSAttempt(attemptCount int, success bool) { + if m == nil { + return + } + ctx, cancel := context.WithTimeout(context.Background(), metricObservationTimeout) + defer cancel() + m.edsAttempts.Record(ctx, int64(attemptCount), attribute.Bool("success", success)) +} + +func (m *metrics) recordNDAttempt(attemptCount int, success bool) { + if m == nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), metricObservationTimeout) + defer cancel() + m.ndAttempts.Record(ctx, int64(attemptCount), attribute.Bool("success", success)) +} + +func (sg *ShrexGetter) WithMetrics() error { + edsAttemptHistogram, err := meter.SyncInt64().Histogram( + "getters_shrex_eds_attempts_per_request", + instrument.WithUnit(unit.Dimensionless), + instrument.WithDescription("Number of attempts per shrex/eds request"), + ) + if err != nil { + return err + } + + ndAttemptHistogram, err := meter.SyncInt64().Histogram( + "getters_shrex_nd_attempts_per_request", + instrument.WithUnit(unit.Dimensionless), + instrument.WithDescription("Number of attempts per shrex/nd request"), + ) + if err != nil { + return err + } + + sg.metrics = &metrics{ + edsAttempts: edsAttemptHistogram, + ndAttempts: ndAttemptHistogram, + } + return nil +} + // ShrexGetter is a share.Getter that uses the shrex/eds and shrex/nd protocol to retrieve shares. type ShrexGetter struct { edsClient *shrexeds.Client @@ -37,6 +96,8 @@ type ShrexGetter struct { // minAttemptsCount will be used to split request timeout into multiple attempts. It will allow to // attempt multiple peers in scope of one request before context timeout is reached minAttemptsCount int + + metrics *metrics } func NewShrexGetter(edsClient *shrexeds.Client, ndClient *shrexnd.Client, peerManager *peers.Manager) *ShrexGetter { @@ -79,6 +140,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex "hash", root.String(), "err", getErr, "finished (s)", time.Since(start)) + sg.metrics.recordEDSAttempt(attempt, false) return nil, fmt.Errorf("getter/shrex: %w", err) } @@ -89,6 +151,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex switch { case getErr == nil: setStatus(peers.ResultSynced) + sg.metrics.recordEDSAttempt(attempt, true) return eds, nil case errors.Is(getErr, context.DeadlineExceeded), errors.Is(getErr, context.Canceled): @@ -135,6 +198,7 @@ func (sg *ShrexGetter) GetSharesByNamespace( "hash", root.String(), "err", getErr, "finished (s)", time.Since(start)) + sg.metrics.recordNDAttempt(attempt, false) return nil, fmt.Errorf("getter/shrex: %w", err) } @@ -145,6 +209,7 @@ func (sg *ShrexGetter) GetSharesByNamespace( switch { case getErr == nil: setStatus(peers.ResultNoop) + sg.metrics.recordNDAttempt(attempt, true) return nd, nil case errors.Is(getErr, context.DeadlineExceeded), errors.Is(getErr, context.Canceled): diff --git a/share/p2p/metrics.go b/share/p2p/metrics.go new file mode 100644 index 0000000000..4b94d8c8d5 --- /dev/null +++ b/share/p2p/metrics.go @@ -0,0 +1,72 @@ +package p2p + +import ( + "context" + "fmt" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric/global" + "go.opentelemetry.io/otel/metric/instrument" + "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric/unit" +) + +var meter = global.MeterProvider().Meter("shrex/eds") + +var observationTimeout = 100 * time.Millisecond + +type status string + +const ( + StatusInternalErr status = "internal_err" + StatusNotFound status = "not_found" + StatusTimeout status = "timeout" + StatusSuccess status = "success" + StatusRateLimited status = "rate_limited" +) + +type Metrics struct { + totalRequestCounter syncint64.Counter +} + +// ObserveRequests increments the total number of requests sent with the given status as an attribute. +func (m *Metrics) ObserveRequests(count int64, status status) { + if m == nil { + return + } + + ctx, cancel := context.WithTimeout(context.Background(), observationTimeout) + defer cancel() + m.totalRequestCounter.Add(ctx, count, attribute.String("status", string(status))) +} + +func InitClientMetrics(protocol string) (*Metrics, error) { + totalRequestCounter, err := meter.SyncInt64().Counter( + fmt.Sprintf("shrex_%s_client_total_requests", protocol), + instrument.WithUnit(unit.Dimensionless), + instrument.WithDescription(fmt.Sprintf("Total count of sent shrex/%s requests", protocol)), + ) + if err != nil { + return nil, err + } + + return &Metrics{ + totalRequestCounter: totalRequestCounter, + }, nil +} + +func InitServerMetrics(protocol string) (*Metrics, error) { + totalRequestCounter, err := meter.SyncInt64().Counter( + fmt.Sprintf("shrex_%s_server_total_responses", protocol), + instrument.WithUnit(unit.Dimensionless), + instrument.WithDescription(fmt.Sprintf("Total count of sent shrex/%s responses", protocol)), + ) + if err != nil { + return nil, err + } + + return &Metrics{ + totalRequestCounter: totalRequestCounter, + }, nil +} diff --git a/share/p2p/middleware.go b/share/p2p/middleware.go index 25d733f43b..df0a690af7 100644 --- a/share/p2p/middleware.go +++ b/share/p2p/middleware.go @@ -9,14 +9,33 @@ import ( var log = logging.Logger("shrex/middleware") -func RateLimitMiddleware(inner network.StreamHandler, concurrencyLimit int) network.StreamHandler { - var parallelRequests int64 - limit := int64(concurrencyLimit) +type Middleware struct { + // concurrencyLimit is the maximum number of requests that can be processed at once. + concurrencyLimit int64 + // parallelRequests is the number of requests currently being processed. + parallelRequests atomic.Int64 + // numRateLimited is the number of requests that were rate limited. + numRateLimited atomic.Int64 +} + +func NewMiddleware(concurrencyLimit int) *Middleware { + return &Middleware{ + concurrencyLimit: int64(concurrencyLimit), + } +} + +// DrainCounter returns the current value of the rate limit counter and resets it to 0. +func (m *Middleware) DrainCounter() int64 { + return m.numRateLimited.Swap(0) +} + +func (m *Middleware) RateLimitHandler(handler network.StreamHandler) network.StreamHandler { return func(stream network.Stream) { - current := atomic.AddInt64(¶llelRequests, 1) - defer atomic.AddInt64(¶llelRequests, -1) + current := m.parallelRequests.Add(1) + defer m.parallelRequests.Add(-1) - if current > limit { + if current > m.concurrencyLimit { + m.numRateLimited.Add(1) log.Debug("concurrency limit reached") err := stream.Close() if err != nil { @@ -24,6 +43,6 @@ func RateLimitMiddleware(inner network.StreamHandler, concurrencyLimit int) netw } return } - inner(stream) + handler(stream) } } diff --git a/share/p2p/shrexeds/client.go b/share/p2p/shrexeds/client.go index f6ad9d2992..dccb5ce06a 100644 --- a/share/p2p/shrexeds/client.go +++ b/share/p2p/shrexeds/client.go @@ -26,8 +26,9 @@ import ( type Client struct { params *Parameters protocolID protocol.ID + host host.Host - host host.Host + metrics *p2p.Metrics } // NewClient creates a new ShrEx/EDS client. @@ -53,7 +54,9 @@ func (c *Client) RequestEDS( if err == nil { return eds, nil } + log.Debugw("client: eds request to peer failed", "peer", peer, "hash", dataHash.String(), "error", err) if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + c.metrics.ObserveRequests(1, p2p.StatusTimeout) return nil, ctx.Err() } // some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not @@ -61,6 +64,7 @@ func (c *Client) RequestEDS( var ne net.Error if errors.As(err, &ne) && ne.Timeout() { if deadline, _ := ctx.Deadline(); deadline.Before(time.Now()) { + c.metrics.ObserveRequests(1, p2p.StatusTimeout) return nil, context.DeadlineExceeded } } @@ -106,6 +110,7 @@ func (c *Client) doRequest( if err != nil { // server is overloaded and closed the stream if errors.Is(err, io.EOF) { + c.metrics.ObserveRequests(1, p2p.StatusRateLimited) return nil, p2p.ErrNotFound } stream.Reset() //nolint:errcheck @@ -119,8 +124,10 @@ func (c *Client) doRequest( if err != nil { return nil, fmt.Errorf("failed to read eds from ods bytes: %w", err) } + c.metrics.ObserveRequests(1, p2p.StatusSuccess) return eds, nil case pb.Status_NOT_FOUND: + c.metrics.ObserveRequests(1, p2p.StatusNotFound) return nil, p2p.ErrNotFound case pb.Status_INVALID: log.Debug("client: invalid request") @@ -128,6 +135,7 @@ func (c *Client) doRequest( case pb.Status_INTERNAL: fallthrough default: + c.metrics.ObserveRequests(1, p2p.StatusInternalErr) return nil, p2p.ErrInvalidResponse } } diff --git a/share/p2p/shrexeds/exchange_test.go b/share/p2p/shrexeds/exchange_test.go index 6bd971c535..b0e11e3587 100644 --- a/share/p2p/shrexeds/exchange_test.go +++ b/share/p2p/shrexeds/exchange_test.go @@ -107,8 +107,9 @@ func TestExchange_RequestEDS(t *testing.T) { t.Fatal("timeout") } } + middleware := p2p.NewMiddleware(rateLimit) server.host.SetStreamHandler(server.protocolID, - p2p.RateLimitMiddleware(mockHandler, rateLimit)) + middleware.RateLimitHandler(mockHandler)) // take server concurrency slots with blocked requests for i := 0; i < rateLimit; i++ { diff --git a/share/p2p/shrexeds/params.go b/share/p2p/shrexeds/params.go index a6ed42bd83..795cb313ed 100644 --- a/share/p2p/shrexeds/params.go +++ b/share/p2p/shrexeds/params.go @@ -34,3 +34,21 @@ func (p *Parameters) Validate() error { return p.Parameters.Validate() } + +func (c *Client) WithMetrics() error { + metrics, err := p2p.InitClientMetrics("eds") + if err != nil { + return fmt.Errorf("shrex/eds: init Metrics: %w", err) + } + c.metrics = metrics + return nil +} + +func (s *Server) WithMetrics() error { + metrics, err := p2p.InitServerMetrics("eds") + if err != nil { + return fmt.Errorf("shrex/eds: init Metrics: %w", err) + } + s.metrics = metrics + return nil +} diff --git a/share/p2p/shrexeds/server.go b/share/p2p/shrexeds/server.go index 8b0b674b88..ac246c00c1 100644 --- a/share/p2p/shrexeds/server.go +++ b/share/p2p/shrexeds/server.go @@ -30,7 +30,9 @@ type Server struct { store *eds.Store - params *Parameters + params *Parameters + middleware *p2p.Middleware + metrics *p2p.Metrics } // NewServer creates a new ShrEx/EDS server. @@ -44,12 +46,13 @@ func NewServer(params *Parameters, host host.Host, store *eds.Store) (*Server, e store: store, protocolID: p2p.ProtocolID(params.NetworkID(), protocolString), params: params, + middleware: p2p.NewMiddleware(params.ConcurrencyLimit), }, nil } func (s *Server) Start(context.Context) error { s.ctx, s.cancel = context.WithCancel(context.Background()) - s.host.SetStreamHandler(s.protocolID, p2p.RateLimitMiddleware(s.handleStream, s.params.ConcurrencyLimit)) + s.host.SetStreamHandler(s.protocolID, s.middleware.RateLimitHandler(s.handleStream)) return nil } @@ -59,10 +62,19 @@ func (s *Server) Stop(context.Context) error { return nil } +func (s *Server) observeRateLimitedRequests() { + numRateLimited := s.middleware.DrainCounter() + if numRateLimited > 0 { + s.metrics.ObserveRequests(numRateLimited, p2p.StatusRateLimited) + } +} + func (s *Server) handleStream(stream network.Stream) { logger := log.With("peer", stream.Conn().RemotePeer()) logger.Debug("server: handling eds request") + s.observeRateLimitedRequests() + // read request from stream to get the dataHash for store lookup req, err := s.readRequest(logger, stream) if err != nil { @@ -91,6 +103,7 @@ func (s *Server) handleStream(stream network.Stream) { status := p2p_pb.Status_OK switch { case errors.Is(err, eds.ErrNotFound): + s.metrics.ObserveRequests(1, p2p.StatusNotFound) status = p2p_pb.Status_NOT_FOUND case err != nil: logger.Errorw("server: get CAR", "err", err) @@ -121,6 +134,7 @@ func (s *Server) handleStream(stream network.Stream) { return } + s.metrics.ObserveRequests(1, p2p.StatusSuccess) err = stream.Close() if err != nil { logger.Debugw("server: closing stream", "err", err) diff --git a/share/p2p/shrexnd/client.go b/share/p2p/shrexnd/client.go index 89e9e76e7d..47eb742aa7 100644 --- a/share/p2p/shrexnd/client.go +++ b/share/p2p/shrexnd/client.go @@ -29,7 +29,8 @@ type Client struct { params *Parameters protocolID protocol.ID - host host.Host + host host.Host + metrics *p2p.Metrics } // NewClient creates a new shrEx/nd client @@ -58,6 +59,7 @@ func (c *Client) RequestND( return shares, err } if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { + c.metrics.ObserveRequests(1, p2p.StatusTimeout) return nil, err } // some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not @@ -65,6 +67,7 @@ func (c *Client) RequestND( var ne net.Error if errors.As(err, &ne) && ne.Timeout() { if deadline, _ := ctx.Deadline(); deadline.Before(time.Now()) { + c.metrics.ObserveRequests(1, p2p.StatusTimeout) return nil, context.DeadlineExceeded } } @@ -109,13 +112,14 @@ func (c *Client) doRequest( if err != nil { // server is overloaded and closed the stream if errors.Is(err, io.EOF) { + c.metrics.ObserveRequests(1, p2p.StatusRateLimited) return nil, p2p.ErrNotFound } stream.Reset() //nolint:errcheck return nil, fmt.Errorf("client-nd: reading response: %w", err) } - if err = statusToErr(resp.Status); err != nil { + if err = c.statusToErr(resp.Status); err != nil { return nil, fmt.Errorf("client-nd: response code is not OK: %w", err) } @@ -183,11 +187,13 @@ func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream) } } -func statusToErr(code pb.StatusCode) error { +func (c *Client) statusToErr(code pb.StatusCode) error { switch code { case pb.StatusCode_OK: + c.metrics.ObserveRequests(1, p2p.StatusSuccess) return nil case pb.StatusCode_NOT_FOUND: + c.metrics.ObserveRequests(1, p2p.StatusNotFound) return p2p.ErrNotFound case pb.StatusCode_INVALID: log.Debug("client-nd: invalid request") diff --git a/share/p2p/shrexnd/exchange_test.go b/share/p2p/shrexnd/exchange_test.go index 8542992e0e..05bca67237 100644 --- a/share/p2p/shrexnd/exchange_test.go +++ b/share/p2p/shrexnd/exchange_test.go @@ -83,8 +83,9 @@ func TestExchange_RequestND(t *testing.T) { t.Fatal("timeout") } } + middleware := p2p.NewMiddleware(rateLimit) server.host.SetStreamHandler(server.protocolID, - p2p.RateLimitMiddleware(mockHandler, rateLimit)) + middleware.RateLimitHandler(mockHandler)) // take server concurrency slots with blocked requests for i := 0; i < rateLimit; i++ { @@ -102,17 +103,20 @@ func TestExchange_RequestND(t *testing.T) { type notFoundGetter struct{} -func (m notFoundGetter) GetShare(_ context.Context, _ *share.Root, _, _ int, +func (m notFoundGetter) GetShare( + _ context.Context, _ *share.Root, _, _ int, ) (share.Share, error) { return nil, share.ErrNotFound } -func (m notFoundGetter) GetEDS(_ context.Context, _ *share.Root, +func (m notFoundGetter) GetEDS( + _ context.Context, _ *share.Root, ) (*rsmt2d.ExtendedDataSquare, error) { return nil, share.ErrNotFound } -func (m notFoundGetter) GetSharesByNamespace(_ context.Context, _ *share.Root, _ namespace.ID, +func (m notFoundGetter) GetSharesByNamespace( + _ context.Context, _ *share.Root, _ namespace.ID, ) (share.NamespacedShares, error) { return nil, share.ErrNotFound } diff --git a/share/p2p/shrexnd/params.go b/share/p2p/shrexnd/params.go index 8f2c999dfe..1acf65ba96 100644 --- a/share/p2p/shrexnd/params.go +++ b/share/p2p/shrexnd/params.go @@ -1,6 +1,8 @@ package shrexnd import ( + "fmt" + logging "github.com/ipfs/go-log/v2" "github.com/celestiaorg/celestia-node/share/p2p" @@ -16,3 +18,21 @@ type Parameters = p2p.Parameters func DefaultParameters() *Parameters { return p2p.DefaultParameters() } + +func (c *Client) WithMetrics() error { + metrics, err := p2p.InitClientMetrics("nd") + if err != nil { + return fmt.Errorf("shrex/nd: init Metrics: %w", err) + } + c.metrics = metrics + return nil +} + +func (srv *Server) WithMetrics() error { + metrics, err := p2p.InitServerMetrics("nd") + if err != nil { + return fmt.Errorf("shrex/nd: init Metrics: %w", err) + } + srv.metrics = metrics + return nil +} diff --git a/share/p2p/shrexnd/server.go b/share/p2p/shrexnd/server.go index cb71ad3053..83bbcd6b40 100644 --- a/share/p2p/shrexnd/server.go +++ b/share/p2p/shrexnd/server.go @@ -24,14 +24,17 @@ import ( // Server implements server side of shrex/nd protocol to serve namespaced share to remote // peers. type Server struct { - params *Parameters + cancel context.CancelFunc + + host host.Host protocolID protocol.ID getter share.Getter store *eds.Store - host host.Host - cancel context.CancelFunc + params *Parameters + middleware *p2p.Middleware + metrics *p2p.Metrics } // NewServer creates new Server @@ -46,6 +49,7 @@ func NewServer(params *Parameters, host host.Host, store *eds.Store, getter shar host: host, params: params, protocolID: p2p.ProtocolID(params.NetworkID(), protocolString), + middleware: p2p.NewMiddleware(params.ConcurrencyLimit), } return srv, nil @@ -59,7 +63,7 @@ func (srv *Server) Start(context.Context) error { handler := func(s network.Stream) { srv.handleNamespacedData(ctx, s) } - srv.host.SetStreamHandler(srv.protocolID, p2p.RateLimitMiddleware(handler, srv.params.ConcurrencyLimit)) + srv.host.SetStreamHandler(srv.protocolID, srv.middleware.RateLimitHandler(handler)) return nil } @@ -70,10 +74,19 @@ func (srv *Server) Stop(context.Context) error { return nil } +func (srv *Server) observeRateLimitedRequests() { + numRateLimited := srv.middleware.DrainCounter() + if numRateLimited > 0 { + srv.metrics.ObserveRequests(numRateLimited, p2p.StatusRateLimited) + } +} + func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stream) { logger := log.With("peer", stream.Conn().RemotePeer()) logger.Debug("server: handling nd request") + srv.observeRateLimitedRequests() + err := stream.SetReadDeadline(time.Now().Add(srv.params.ServerReadTimeout)) if err != nil { logger.Debugw("server: setting read deadline", "err", err) @@ -195,6 +208,14 @@ func (srv *Server) respond(logger *zap.SugaredLogger, stream network.Stream, res return } + switch { + case resp.Status == pb.StatusCode_OK: + srv.metrics.ObserveRequests(1, p2p.StatusSuccess) + case resp.Status == pb.StatusCode_NOT_FOUND: + srv.metrics.ObserveRequests(1, p2p.StatusNotFound) + case resp.Status == pb.StatusCode_INTERNAL: + srv.metrics.ObserveRequests(1, p2p.StatusInternalErr) + } if err = stream.Close(); err != nil { logger.Debugw("server: closing stream", "err", err) }