From 53345fc5c6421025cc9d3d77352fbbf7ac799c04 Mon Sep 17 00:00:00 2001 From: Daniel Deluiggi Date: Fri, 21 Nov 2025 09:57:15 -0800 Subject: [PATCH] ReuseSlice missing in distributor push Signed-off-by: Daniel Deluiggi --- pkg/distributor/distributor.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 2065d0eea7..5617fb7300 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -706,6 +706,14 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri // Push implements client.IngesterServer func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error) { + var validationError = true + defer func() { + if validationError { + cortexpb.ReuseSlice(req.Timeseries) + req.Free() + } + }() + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -760,9 +768,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co cluster, replica := findHALabels(limits.HAReplicaLabel, limits.HAClusterLabel, req.Timeseries[0].Labels) removeReplica, err = d.checkSample(ctx, userID, cluster, replica, limits) if err != nil { - // Ensure the request slice is reused if the series get deduped. - cortexpb.ReuseSlice(req.Timeseries) - if errors.Is(err, ha.ReplicasNotMatchError{}) { // These samples have been deduped. d.dedupedSamples.WithLabelValues(userID, cluster).Add(float64(numFloatSamples + numHistogramSamples)) @@ -773,7 +778,6 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co d.validateMetrics.DiscardedSamples.WithLabelValues(validation.TooManyHAClusters, userID).Add(float64(numFloatSamples + numHistogramSamples)) return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()) } - return nil, err } // If there wasn't an error but removeReplica is false that means we didn't find both HA labels. @@ -795,18 +799,12 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata))) if len(seriesKeys) == 0 && len(nhSeriesKeys) == 0 && len(metadataKeys) == 0 { - // Ensure the request slice is reused if there's no series or metadata passing the validation. - cortexpb.ReuseSlice(req.Timeseries) - return &cortexpb.WriteResponse{}, firstPartialErr } totalSamples := validatedFloatSamples + validatedHistogramSamples totalN := totalSamples + validatedExemplars + len(validatedMetadata) if !d.ingestionRateLimiter.AllowN(now, userID, totalN) { - // Ensure the request slice is reused if the request is rate limited. - cortexpb.ReuseSlice(req.Timeseries) - d.validateMetrics.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(totalSamples)) d.validateMetrics.DiscardedExemplars.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedExemplars)) d.validateMetrics.DiscardedMetadata.WithLabelValues(validation.RateLimited, userID).Add(float64(len(validatedMetadata))) @@ -844,6 +842,9 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co return nil, nativeHistogramErr } + //DoBatch will be responsible to call cleanup after all async ingester requests finish. + validationError = false + err = d.doBatch(ctx, req, subRing, keys, initialMetadataIndex, validatedMetadata, validatedTimeseries, userID) if err != nil { return nil, err