diff --git a/das/metrics.go b/das/metrics.go index 5a6262969b..fd72fe5e4e 100644 --- a/das/metrics.go +++ b/das/metrics.go @@ -149,6 +149,9 @@ func (m *metrics) observeSample( if m == nil { return } + if ctx.Err() != nil { + ctx = context.Background() + } m.sampleTime.Record(ctx, sampleTime.Seconds(), attribute.Bool(failedLabel, err != nil), attribute.Int(headerWidthLabel, len(h.DAH.RowsRoots)), @@ -169,6 +172,9 @@ func (m *metrics) observeGetHeader(ctx context.Context, d time.Duration) { if m == nil { return } + if ctx.Err() != nil { + ctx = context.Background() + } m.getHeaderTime.Record(ctx, d.Seconds()) } @@ -177,5 +183,8 @@ func (m *metrics) observeNewHead(ctx context.Context) { if m == nil { return } + if ctx.Err() != nil { + ctx = context.Background() + } m.newHead.Add(ctx, 1) } diff --git a/share/availability/discovery/discovery.go b/share/availability/discovery/discovery.go index 74ffd80188..f1db6e0797 100644 --- a/share/availability/discovery/discovery.go +++ b/share/availability/discovery/discovery.go @@ -248,11 +248,14 @@ func (d *Discovery) discover(ctx context.Context) bool { var wg errgroup.Group // limit to minimize chances of overreaching the limit wg.SetLimit(int(d.set.Limit())) - defer wg.Wait() //nolint:errcheck // stop discovery when we are done findCtx, findCancel := context.WithCancel(ctx) - defer findCancel() + defer func() { + // some workers could still be running, wait for them to finish before canceling findCtx + wg.Wait() //nolint:errcheck + findCancel() + }() peers, err := d.disc.FindPeers(findCtx, rendezvousPoint) if err != nil { diff --git a/share/getters/shrex.go b/share/getters/shrex.go index 3fd7c4846c..99a94048f0 100644 --- a/share/getters/shrex.go +++ b/share/getters/shrex.go @@ -29,7 +29,6 @@ const ( // serve getEDS request for block size 256 defaultMinRequestTimeout = time.Minute // should be >= shrexeds server write timeout defaultMinAttemptsCount = 3 - metricObservationTimeout = 100 * time.Millisecond ) var meter = 
global.MeterProvider().Meter("shrex/getter") @@ -39,22 +38,23 @@ type metrics struct { ndAttempts syncint64.Histogram } -func (m *metrics) recordEDSAttempt(attemptCount int, success bool) { +func (m *metrics) recordEDSAttempt(ctx context.Context, attemptCount int, success bool) { if m == nil { return } - ctx, cancel := context.WithTimeout(context.Background(), metricObservationTimeout) - defer cancel() + if ctx.Err() != nil { + ctx = context.Background() + } m.edsAttempts.Record(ctx, int64(attemptCount), attribute.Bool("success", success)) } -func (m *metrics) recordNDAttempt(attemptCount int, success bool) { +func (m *metrics) recordNDAttempt(ctx context.Context, attemptCount int, success bool) { if m == nil { return } - - ctx, cancel := context.WithTimeout(context.Background(), metricObservationTimeout) - defer cancel() + if ctx.Err() != nil { + ctx = context.Background() + } m.ndAttempts.Record(ctx, int64(attemptCount), attribute.Bool("success", success)) } @@ -140,7 +140,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex "hash", root.String(), "err", getErr, "finished (s)", time.Since(start)) - sg.metrics.recordEDSAttempt(attempt, false) + sg.metrics.recordEDSAttempt(ctx, attempt, false) return nil, fmt.Errorf("getter/shrex: %w", err) } @@ -151,7 +151,7 @@ func (sg *ShrexGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.Ex switch { case getErr == nil: setStatus(peers.ResultSynced) - sg.metrics.recordEDSAttempt(attempt, true) + sg.metrics.recordEDSAttempt(ctx, attempt, true) return eds, nil case errors.Is(getErr, context.DeadlineExceeded), errors.Is(getErr, context.Canceled): @@ -198,7 +198,7 @@ func (sg *ShrexGetter) GetSharesByNamespace( "hash", root.String(), "err", getErr, "finished (s)", time.Since(start)) - sg.metrics.recordNDAttempt(attempt, false) + sg.metrics.recordNDAttempt(ctx, attempt, false) return nil, fmt.Errorf("getter/shrex: %w", err) } @@ -209,7 +209,7 @@ func (sg *ShrexGetter) 
GetSharesByNamespace( switch { case getErr == nil: setStatus(peers.ResultNoop) - sg.metrics.recordNDAttempt(attempt, true) + sg.metrics.recordNDAttempt(ctx, attempt, true) return nd, nil case errors.Is(getErr, context.DeadlineExceeded), errors.Is(getErr, context.Canceled): diff --git a/share/p2p/metrics.go b/share/p2p/metrics.go index 8f3e813c8f..87c1e2eeb0 100644 --- a/share/p2p/metrics.go +++ b/share/p2p/metrics.go @@ -3,7 +3,6 @@ package p2p import ( "context" "fmt" - "time" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/global" @@ -14,8 +13,6 @@ import ( var meter = global.MeterProvider().Meter("shrex/eds") -var observationTimeout = 100 * time.Millisecond - type status string const ( @@ -32,13 +29,13 @@ type Metrics struct { // ObserveRequests increments the total number of requests sent with the given status as an // attribute. -func (m *Metrics) ObserveRequests(count int64, status status) { +func (m *Metrics) ObserveRequests(ctx context.Context, count int64, status status) { if m == nil { return } - - ctx, cancel := context.WithTimeout(context.Background(), observationTimeout) - defer cancel() + if ctx.Err() != nil { + ctx = context.Background() + } m.totalRequestCounter.Add(ctx, count, attribute.String("status", string(status))) } diff --git a/share/p2p/peers/manager.go b/share/p2p/peers/manager.go index 7a2dcc6dc3..35aa67c6e6 100644 --- a/share/p2p/peers/manager.go +++ b/share/p2p/peers/manager.go @@ -189,29 +189,30 @@ func (m *Manager) Peer( p.remove(peerID) return m.Peer(ctx, datahash) } - return m.newPeer(datahash, peerID, sourceShrexSub, p.len(), 0) + return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), 0) } // if no peer for datahash is currently available, try to use full node // obtained from discovery peerID, ok = m.fullNodes.tryGet() if ok { - return m.newPeer(datahash, peerID, sourceFullNodes, m.fullNodes.len(), 0) + return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), 0) } // no peers are 
available right now, wait for the first one start := time.Now() select { case peerID = <-p.next(ctx): - return m.newPeer(datahash, peerID, sourceShrexSub, p.len(), time.Since(start)) + return m.newPeer(ctx, datahash, peerID, sourceShrexSub, p.len(), time.Since(start)) case peerID = <-m.fullNodes.next(ctx): - return m.newPeer(datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start)) + return m.newPeer(ctx, datahash, peerID, sourceFullNodes, m.fullNodes.len(), time.Since(start)) case <-ctx.Done(): return "", nil, ctx.Err() } } func (m *Manager) newPeer( + ctx context.Context, datahash share.DataHash, peerID peer.ID, source peerSource, @@ -223,7 +224,7 @@ func (m *Manager) newPeer( "source", source, "pool_size", poolSize, "wait (s)", waitTime) - m.metrics.observeGetPeer(source, poolSize, waitTime) + m.metrics.observeGetPeer(ctx, source, poolSize, waitTime) return peerID, m.doneFunc(datahash, peerID, source), nil } diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go index cdcffef4db..b35ef70e6b 100644 --- a/share/p2p/peers/manager_test.go +++ b/share/p2p/peers/manager_test.go @@ -223,7 +223,7 @@ func TestManager(t *testing.T) { stopManager(t, manager) }) - t.Run("get peer from discovery", func(t *testing.T) { + t.Run("mark pool synced", func(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) t.Cleanup(cancel) diff --git a/share/p2p/peers/metrics.go b/share/p2p/peers/metrics.go index 412cc01996..ab5bfc97b2 100644 --- a/share/p2p/peers/metrics.go +++ b/share/p2p/peers/metrics.go @@ -18,8 +18,6 @@ import ( ) const ( - observeTimeout = 100 * time.Millisecond - isInstantKey = "is_instant" doneResultKey = "done_result" @@ -171,12 +169,14 @@ func initMetrics(manager *Manager) (*metrics, error) { return metrics, nil } -func (m *metrics) observeGetPeer(source peerSource, poolSize int, waitTime time.Duration) { +func (m *metrics) observeGetPeer(ctx context.Context, + source peerSource, poolSize int, 
waitTime time.Duration) { if m == nil { return } - ctx, cancel := context.WithTimeout(context.Background(), observeTimeout) - defer cancel() + if ctx.Err() != nil { + ctx = context.Background() + } m.getPeer.Add(ctx, 1, attribute.String(sourceKey, string(source)), attribute.Bool(isInstantKey, waitTime == 0)) @@ -196,9 +196,8 @@ func (m *metrics) observeDoneResult(source peerSource, result result) { if m == nil { return } - ctx, cancel := context.WithTimeout(context.Background(), observeTimeout) - defer cancel() + ctx := context.Background() m.doneResult.Add(ctx, 1, attribute.String(sourceKey, string(source)), attribute.String(doneResultKey, string(result))) @@ -224,10 +223,11 @@ func (m *metrics) validationObserver(validator shrexsub.ValidatorFn) shrexsub.Va resStr = "unknown" } - observeCtx, cancel := context.WithTimeout(context.Background(), observeTimeout) - defer cancel() + if ctx.Err() != nil { + ctx = context.Background() + } - m.validationResult.Add(observeCtx, 1, + m.validationResult.Add(ctx, 1, attribute.String(validationResultKey, resStr)) return res } diff --git a/share/p2p/shrexeds/client.go b/share/p2p/shrexeds/client.go index 5f3470179e..daff83a07b 100644 --- a/share/p2p/shrexeds/client.go +++ b/share/p2p/shrexeds/client.go @@ -56,7 +56,7 @@ func (c *Client) RequestEDS( } log.Debugw("client: eds request to peer failed", "peer", peer, "hash", dataHash.String(), "error", err) if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - c.metrics.ObserveRequests(1, p2p.StatusTimeout) + c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout) return nil, err } // some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not @@ -64,7 +64,7 @@ func (c *Client) RequestEDS( var ne net.Error if errors.As(err, &ne) && ne.Timeout() { if deadline, _ := ctx.Deadline(); deadline.Before(time.Now()) { - c.metrics.ObserveRequests(1, p2p.StatusTimeout) + c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout) return nil, 
context.DeadlineExceeded } } @@ -110,7 +110,7 @@ func (c *Client) doRequest( if err != nil { // server is overloaded and closed the stream if errors.Is(err, io.EOF) { - c.metrics.ObserveRequests(1, p2p.StatusRateLimited) + c.metrics.ObserveRequests(ctx, 1, p2p.StatusRateLimited) return nil, p2p.ErrNotFound } stream.Reset() //nolint:errcheck @@ -124,10 +124,10 @@ func (c *Client) doRequest( if err != nil { return nil, fmt.Errorf("failed to read eds from ods bytes: %w", err) } - c.metrics.ObserveRequests(1, p2p.StatusSuccess) + c.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess) return eds, nil case pb.Status_NOT_FOUND: - c.metrics.ObserveRequests(1, p2p.StatusNotFound) + c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound) return nil, p2p.ErrNotFound case pb.Status_INVALID: log.Debug("client: invalid request") @@ -135,7 +135,7 @@ func (c *Client) doRequest( case pb.Status_INTERNAL: fallthrough default: - c.metrics.ObserveRequests(1, p2p.StatusInternalErr) + c.metrics.ObserveRequests(ctx, 1, p2p.StatusInternalErr) return nil, p2p.ErrInvalidResponse } } diff --git a/share/p2p/shrexeds/server.go b/share/p2p/shrexeds/server.go index ac246c00c1..cecf0c5cb8 100644 --- a/share/p2p/shrexeds/server.go +++ b/share/p2p/shrexeds/server.go @@ -65,7 +65,7 @@ func (s *Server) Stop(context.Context) error { func (s *Server) observeRateLimitedRequests() { numRateLimited := s.middleware.DrainCounter() if numRateLimited > 0 { - s.metrics.ObserveRequests(numRateLimited, p2p.StatusRateLimited) + s.metrics.ObserveRequests(context.Background(), numRateLimited, p2p.StatusRateLimited) } } @@ -103,7 +103,7 @@ func (s *Server) handleStream(stream network.Stream) { status := p2p_pb.Status_OK switch { case errors.Is(err, eds.ErrNotFound): - s.metrics.ObserveRequests(1, p2p.StatusNotFound) + s.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound) status = p2p_pb.Status_NOT_FOUND case err != nil: logger.Errorw("server: get CAR", "err", err) @@ -134,7 +134,7 @@ func (s *Server) handleStream(stream 
network.Stream) { return } - s.metrics.ObserveRequests(1, p2p.StatusSuccess) + s.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess) err = stream.Close() if err != nil { logger.Debugw("server: closing stream", "err", err) diff --git a/share/p2p/shrexnd/client.go b/share/p2p/shrexnd/client.go index 47eb742aa7..b5ae66f51f 100644 --- a/share/p2p/shrexnd/client.go +++ b/share/p2p/shrexnd/client.go @@ -59,7 +59,7 @@ func (c *Client) RequestND( return shares, err } if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - c.metrics.ObserveRequests(1, p2p.StatusTimeout) + c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout) return nil, err } // some net.Errors also mean the context deadline was exceeded, but yamux/mocknet do not @@ -67,7 +67,7 @@ func (c *Client) RequestND( var ne net.Error if errors.As(err, &ne) && ne.Timeout() { if deadline, _ := ctx.Deadline(); deadline.Before(time.Now()) { - c.metrics.ObserveRequests(1, p2p.StatusTimeout) + c.metrics.ObserveRequests(ctx, 1, p2p.StatusTimeout) return nil, context.DeadlineExceeded } } @@ -112,14 +112,14 @@ func (c *Client) doRequest( if err != nil { // server is overloaded and closed the stream if errors.Is(err, io.EOF) { - c.metrics.ObserveRequests(1, p2p.StatusRateLimited) + c.metrics.ObserveRequests(ctx, 1, p2p.StatusRateLimited) return nil, p2p.ErrNotFound } stream.Reset() //nolint:errcheck return nil, fmt.Errorf("client-nd: reading response: %w", err) } - if err = c.statusToErr(resp.Status); err != nil { + if err = c.statusToErr(ctx, resp.Status); err != nil { return nil, fmt.Errorf("client-nd: response code is not OK: %w", err) } @@ -187,13 +187,13 @@ func (c *Client) setStreamDeadlines(ctx context.Context, stream network.Stream) } } -func (c *Client) statusToErr(code pb.StatusCode) error { +func (c *Client) statusToErr(ctx context.Context, code pb.StatusCode) error { switch code { case pb.StatusCode_OK: - c.metrics.ObserveRequests(1, p2p.StatusSuccess) + c.metrics.ObserveRequests(ctx, 1, 
p2p.StatusSuccess) return nil case pb.StatusCode_NOT_FOUND: - c.metrics.ObserveRequests(1, p2p.StatusNotFound) + c.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound) return p2p.ErrNotFound case pb.StatusCode_INVALID: log.Debug("client-nd: invalid request") diff --git a/share/p2p/shrexnd/server.go b/share/p2p/shrexnd/server.go index 83bbcd6b40..3355bdd759 100644 --- a/share/p2p/shrexnd/server.go +++ b/share/p2p/shrexnd/server.go @@ -77,7 +77,7 @@ func (srv *Server) Stop(context.Context) error { func (srv *Server) observeRateLimitedRequests() { numRateLimited := srv.middleware.DrainCounter() if numRateLimited > 0 { - srv.metrics.ObserveRequests(numRateLimited, p2p.StatusRateLimited) + srv.metrics.ObserveRequests(context.Background(), numRateLimited, p2p.StatusRateLimited) } } @@ -120,27 +120,27 @@ func (srv *Server) handleNamespacedData(ctx context.Context, stream network.Stre dah, err := srv.store.GetDAH(ctx, req.RootHash) if err != nil { if errors.Is(err, eds.ErrNotFound) { - srv.respondNotFoundError(logger, stream) + srv.respondNotFoundError(ctx, logger, stream) return } logger.Errorw("server: retrieving DAH", "err", err) - srv.respondInternalError(logger, stream) + srv.respondInternalError(ctx, logger, stream) return } shares, err := srv.getter.GetSharesByNamespace(ctx, dah, req.NamespaceId) if errors.Is(err, share.ErrNotFound) { - srv.respondNotFoundError(logger, stream) + srv.respondNotFoundError(ctx, logger, stream) return } if err != nil { logger.Errorw("server: retrieving shares", "err", err) - srv.respondInternalError(logger, stream) + srv.respondInternalError(ctx, logger, stream) return } resp := namespacedSharesToResponse(shares) - srv.respond(logger, stream, resp) + srv.respond(ctx, logger, stream, resp) } // validateRequest checks correctness of the request @@ -156,19 +156,21 @@ func validateRequest(req pb.GetSharesByNamespaceRequest) error { } // respondNotFoundError sends internal error response to client -func (srv *Server) 
respondNotFoundError(logger *zap.SugaredLogger, stream network.Stream) { +func (srv *Server) respondNotFoundError(ctx context.Context, + logger *zap.SugaredLogger, stream network.Stream) { resp := &pb.GetSharesByNamespaceResponse{ Status: pb.StatusCode_NOT_FOUND, } - srv.respond(logger, stream, resp) + srv.respond(ctx, logger, stream, resp) } // respondInternalError sends internal error response to client -func (srv *Server) respondInternalError(logger *zap.SugaredLogger, stream network.Stream) { +func (srv *Server) respondInternalError(ctx context.Context, + logger *zap.SugaredLogger, stream network.Stream) { resp := &pb.GetSharesByNamespaceResponse{ Status: pb.StatusCode_INTERNAL, } - srv.respond(logger, stream, resp) + srv.respond(ctx, logger, stream, resp) } // namespacedSharesToResponse encodes shares into proto and sends it to client with OK status code @@ -195,7 +197,8 @@ func namespacedSharesToResponse(shares share.NamespacedShares) *pb.GetSharesByNa } } -func (srv *Server) respond(logger *zap.SugaredLogger, stream network.Stream, resp *pb.GetSharesByNamespaceResponse) { +func (srv *Server) respond(ctx context.Context, + logger *zap.SugaredLogger, stream network.Stream, resp *pb.GetSharesByNamespaceResponse) { err := stream.SetWriteDeadline(time.Now().Add(srv.params.ServerWriteTimeout)) if err != nil { logger.Debugw("server: setting write deadline", "err", err) @@ -210,11 +213,11 @@ func (srv *Server) respond(logger *zap.SugaredLogger, stream network.Stream, res switch { case resp.Status == pb.StatusCode_OK: - srv.metrics.ObserveRequests(1, p2p.StatusSuccess) + srv.metrics.ObserveRequests(ctx, 1, p2p.StatusSuccess) case resp.Status == pb.StatusCode_NOT_FOUND: - srv.metrics.ObserveRequests(1, p2p.StatusNotFound) + srv.metrics.ObserveRequests(ctx, 1, p2p.StatusNotFound) case resp.Status == pb.StatusCode_INTERNAL: - srv.metrics.ObserveRequests(1, p2p.StatusInternalErr) + srv.metrics.ObserveRequests(ctx, 1, p2p.StatusInternalErr) } if err = stream.Close(); err 
!= nil { logger.Debugw("server: closing stream", "err", err)