Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* [ENHANCEMENT] Instrument Ingester CPU profile with source for read APIs. #7494
* [ENHANCEMENT] Ingester: Convert expanded postings cache from FIFO to LRU eviction to retain frequently-queried entries under memory pressure. #7510
* [ENHANCEMENT] Querier: Detach series label and chunk data from gRPC unmarshal buffers in store-gateway streaming path, allowing the Go GC to reclaim receive buffers. #7519
* [ENHANCEMENT] Distributor: Add `WrappedHistogram` with configurable size limit (`-validation.max-native-histogram-size-bytes`) to cap native histogram protobuf size before unmarshalling. #7570
* [BUGFIX] Querier: Fix queryWithRetry and labelsWithRetry returning (nil, nil) on cancelled context by propagating ctx.Err(). #7370
* [BUGFIX] Metrics Helper: Fix non-deterministic bucket order in merged histograms by sorting buckets after map iteration, matching Prometheus client library behavior. #7380
* [BUGFIX] Distributor: Return HTTP 401 Unauthorized when tenant ID resolution fails in the Prometheus Remote Write 2.0 path. #7389
Expand Down
4 changes: 2 additions & 2 deletions integration/remote_write_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ func TestIngest_CreatedTimestampFallback(t *testing.T) {
TimeSeriesV2: &cortexpb.TimeSeriesV2{
LabelsRefs: []uint32{1, 2},
CreatedTimestamp: e2e.TimeToMilliseconds(startTs),
Histograms: []cortexpb.Histogram{h},
Histograms: []cortexpb.WrappedHistogram{cortexpb.WrapHistogram(h)},
},
},
},
Expand Down Expand Up @@ -778,7 +778,7 @@ func TestIngest_StartAndCreatedTimestampIgnoredWhenDisabled(t *testing.T) {
TimeSeriesV2: &cortexpb.TimeSeriesV2{
LabelsRefs: []uint32{1, 2},
CreatedTimestamp: e2e.TimeToMilliseconds(startTs),
Histograms: []cortexpb.Histogram{h},
Histograms: []cortexpb.WrappedHistogram{cortexpb.WrapHistogram(h)},
},
}},
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cortexpb/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

// ToWriteRequest converts matched slices of Labels, Samples, Metadata and Histograms into a WriteRequest proto.
// It gets timeseries from the pool, so ReuseSlice() should be called when done.
func ToWriteRequest(lbls []labels.Labels, samples []Sample, metadata []*MetricMetadata, histograms []Histogram, source SourceEnum) *WriteRequest {
func ToWriteRequest(lbls []labels.Labels, samples []Sample, metadata []*MetricMetadata, histograms []WrappedHistogram, source SourceEnum) *WriteRequest {
req := &WriteRequest{
Timeseries: PreallocTimeseriesSliceFromPool(),
Metadata: metadata,
Expand All @@ -44,7 +44,7 @@ func ToWriteRequest(lbls []labels.Labels, samples []Sample, metadata []*MetricMe
return req
}

func (w *WriteRequest) AddHistogramTimeSeries(lbls []labels.Labels, histograms []Histogram) {
func (w *WriteRequest) AddHistogramTimeSeries(lbls []labels.Labels, histograms []WrappedHistogram) {
for i := range lbls {
ts := TimeseriesFromPool()
ts.Labels = append(ts.Labels, FromLabelsToLabelAdapters(lbls[i])...)
Expand Down
270 changes: 117 additions & 153 deletions pkg/cortexpb/cortex.pb.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/cortexpb/cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ message TimeSeriesV2 {
//
// Samples and histograms are sorted by timestamp (older first).
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Histogram histograms = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "WrappedHistogram"];

// exemplars represents an optional set of exemplars attached to this series' samples.
repeated ExemplarV2 exemplars = 4 [(gogoproto.nullable) = false];
Expand Down Expand Up @@ -140,7 +140,7 @@ message TimeSeries {
// Sorted by time, oldest sample first.
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false];
repeated Histogram histograms = 4 [(gogoproto.nullable) = false, (gogoproto.customtype) = "WrappedHistogram"];
}

message LabelPair {
Expand Down
50 changes: 50 additions & 0 deletions pkg/cortexpb/histograms.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package cortexpb

import (
"fmt"

"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/prompb"
)
Expand Down Expand Up @@ -176,3 +178,51 @@ func (h Histogram) BucketCount() int {
}
return count
}

// MaxWrappedHistogramSizeBytes is the maximum allowed size of a single native histogram in raw protobuf
// bytes before unmarshalling. A value of 0 disables the limit.
var maxWrappedHistogramSizeBytes = 16 * 1024

// WrappedHistogram is a Histogram with a size check on Unmarshal.
// Used only on the ingestion path, not on the query path.
type WrappedHistogram struct {
Histogram
}

// Unmarshal implements proto.Message.
func (p *WrappedHistogram) Unmarshal(dAtA []byte) error {
if maxWrappedHistogramSizeBytes > 0 && len(dAtA) > maxWrappedHistogramSizeBytes {
return fmt.Errorf("native histogram size %d bytes exceeds limit %d bytes", len(dAtA), maxWrappedHistogramSizeBytes)
}
return p.Histogram.Unmarshal(dAtA)
}

// Marshal implements proto.Marshaler.
func (p *WrappedHistogram) Marshal() (dAtA []byte, err error) {
return p.Histogram.Marshal()
}

// MarshalTo implements proto.Marshaler.
func (p *WrappedHistogram) MarshalTo(dAtA []byte) (int, error) {
return p.Histogram.MarshalTo(dAtA)
}

// MarshalToSizedBuffer implements proto.Marshaler.
func (p *WrappedHistogram) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return p.Histogram.MarshalToSizedBuffer(dAtA)
}

// Size implements proto.Sizer.
func (p *WrappedHistogram) Size() int {
return p.Histogram.Size()
}

// Equal implements proto.Equal.
func (p WrappedHistogram) Equal(other WrappedHistogram) bool {
return p.Histogram.Equal(&other.Histogram)
}

// WrapHistogram wraps a Histogram into a WrappedHistogram.
func WrapHistogram(h Histogram) WrappedHistogram {
return WrappedHistogram{Histogram: h}
}
3 changes: 2 additions & 1 deletion pkg/cortexpb/timeseries.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ var (
Labels: make([]LabelAdapter, 0, expectedLabels),
Samples: make([]Sample, 0, expectedSamplesPerSeries),
Exemplars: make([]Exemplar, 0, expectedExemplarsPerSeries),
Histograms: make([]Histogram, 0, expectedHistogramsPerSeries),
Histograms: make([]WrappedHistogram, 0, expectedHistogramsPerSeries),
}
},
}
Expand All @@ -59,6 +59,7 @@ func (PreallocConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&expectedTimeseries, "ingester-client.expected-timeseries", expectedTimeseries, "Expected number of timeseries per request, used for preallocations.")
f.IntVar(&expectedLabels, "ingester-client.expected-labels", expectedLabels, "Expected number of labels per timeseries, used for preallocations.")
f.IntVar(&expectedSamplesPerSeries, "ingester-client.expected-samples-per-series", expectedSamplesPerSeries, "Expected number of samples per timeseries, used for preallocations.")
f.IntVar(&maxWrappedHistogramSizeBytes, "validation.max-native-histogram-size-bytes", maxWrappedHistogramSizeBytes, "Maximum size in bytes of a single native histogram in raw protobuf before unmarshalling. 0 to disable.")
}

// PreallocWriteRequest is a WriteRequest which preallocs slices on Unmarshal.
Expand Down
60 changes: 60 additions & 0 deletions pkg/cortexpb/timeseries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,63 @@ func BenchmarkMarshallWriteRequest(b *testing.B) {
})
}
}

func TestWrappedHistogram_Unmarshal_SizeLimit(t *testing.T) {
t.Run("rejects oversized histogram", func(t *testing.T) {
original := maxWrappedHistogramSizeBytes
maxWrappedHistogramSizeBytes = 1024 // 1KB for test
defer func() { maxWrappedHistogramSizeBytes = original }()

// Create a histogram that exceeds 1KB when marshalled
h := Histogram{
Schema: 3,
NegativeDeltas: make([]int64, 600),
PositiveDeltas: make([]int64, 600),
}
data, err := h.Marshal()
require.NoError(t, err)
require.Greater(t, len(data), 1024) // confirm it's over limit

p := &WrappedHistogram{}
err = p.Unmarshal(data)
require.Error(t, err)
assert.Contains(t, err.Error(), "exceeds limit")
})

t.Run("allows normal histogram", func(t *testing.T) {
original := maxWrappedHistogramSizeBytes
maxWrappedHistogramSizeBytes = 16 * 1024
defer func() { maxWrappedHistogramSizeBytes = original }()

h := Histogram{
Schema: 3,
NegativeDeltas: make([]int64, 50),
PositiveDeltas: make([]int64, 50),
}
data, err := h.Marshal()
require.NoError(t, err)

p := &WrappedHistogram{}
err = p.Unmarshal(data)
require.NoError(t, err)
assert.Equal(t, int32(3), p.Schema)
assert.Len(t, p.NegativeDeltas, 50)
})

t.Run("allows when limit disabled", func(t *testing.T) {
original := maxWrappedHistogramSizeBytes
maxWrappedHistogramSizeBytes = 0 // disabled
defer func() { maxWrappedHistogramSizeBytes = original }()

h := Histogram{
NegativeDeltas: make([]int64, 5000),
PositiveDeltas: make([]int64, 5000),
}
data, err := h.Marshal()
require.NoError(t, err)

p := &WrappedHistogram{}
err = p.Unmarshal(data)
require.NoError(t, err)
})
}
2 changes: 1 addition & 1 deletion pkg/cortexpb/timeseriesv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var (
return &TimeSeriesV2{
LabelsRefs: make([]uint32, 0, expectedLabels),
Samples: make([]Sample, 0, expectedSamplesPerSeries),
Histograms: make([]Histogram, 0, expectedHistogramsPerSeries),
Histograms: make([]WrappedHistogram, 0, expectedHistogramsPerSeries),
Exemplars: make([]ExemplarV2, 0, expectedExemplarsPerSeries),
Metadata: MetadataV2{},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cortexpb/timeseriesv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func TestTimeseriesV2FromPool(t *testing.T) {
ts.LabelsRefs = []uint32{1, 2}
ts.Samples = []Sample{{Value: 1, TimestampMs: 2}}
ts.Exemplars = []ExemplarV2{{LabelsRefs: []uint32{1, 2}, Value: 1, Timestamp: 2}}
ts.Histograms = []Histogram{{}}
ts.Histograms = []WrappedHistogram{{}}
ts.CreatedTimestamp = 12345
ts.Metadata = MetadataV2{Type: 1, HelpRef: 2, UnitRef: 3}
ReuseTimeseriesV2(ts)
Expand Down
10 changes: 5 additions & 5 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,19 +687,19 @@ func (d *Distributor) validateSeries(ts cortexpb.PreallocTimeseries, userID stri
}
}

var histograms []cortexpb.Histogram
var histograms []cortexpb.WrappedHistogram
if len(ts.Histograms) > 0 {
// Only alloc when data present
histograms = make([]cortexpb.Histogram, 0, len(ts.Histograms))
histograms = make([]cortexpb.WrappedHistogram, 0, len(ts.Histograms))
for i, h := range ts.Histograms {
if err := validation.ValidateSampleTimestamp(d.validateMetrics, limits, userID, ts.Labels, h.TimestampMs); err != nil {
return emptyPreallocSeries, err
}
convertedHistogram, err := validation.ValidateNativeHistogram(d.validateMetrics, limits, userID, ts.Labels, h)
convertedHistogram, err := validation.ValidateNativeHistogram(d.validateMetrics, limits, userID, ts.Labels, h.Histogram)
if err != nil {
return emptyPreallocSeries, err
}
ts.Histograms[i] = convertedHistogram
ts.Histograms[i].Histogram = convertedHistogram
}
histograms = append(histograms, ts.Histograms...)
}
Expand Down Expand Up @@ -1026,7 +1026,7 @@ type samplesLabelSetEntry struct {
}

// countNHCB returns the number of native histograms with custom buckets schema in the given slice.
func countNHCB(histograms []cortexpb.Histogram) int {
func countNHCB(histograms []cortexpb.WrappedHistogram) int {
n := 0
for _, h := range histograms {
if histogram.IsCustomBucketsSchema(h.GetSchema()) {
Expand Down
36 changes: 18 additions & 18 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3220,12 +3220,12 @@ func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {
func mockWriteRequest(lbls []labels.Labels, value int64, timestampMs int64, histogram bool) *cortexpb.WriteRequest {
var (
samples []cortexpb.Sample
histograms []cortexpb.Histogram
histograms []cortexpb.WrappedHistogram
)
if histogram {
histograms = make([]cortexpb.Histogram, len(lbls))
histograms = make([]cortexpb.WrappedHistogram, len(lbls))
for i := range lbls {
histograms[i] = cortexpb.HistogramToHistogramProto(timestampMs, tsdbutil.GenerateTestHistogram(value))
histograms[i] = cortexpb.WrapHistogram(cortexpb.HistogramToHistogramProto(timestampMs, tsdbutil.GenerateTestHistogram(value)))
}
} else {
samples = make([]cortexpb.Sample, len(lbls))
Expand Down Expand Up @@ -3496,7 +3496,7 @@ func makeWriteRequestTimeseries(labels []cortexpb.LabelAdapter, ts, value int64,
},
}
if histogram {
t.Histograms = append(t.Histograms, cortexpb.HistogramToHistogramProto(ts, tsdbutil.GenerateTestHistogram(value)))
t.Histograms = append(t.Histograms, cortexpb.WrapHistogram(cortexpb.HistogramToHistogramProto(ts, tsdbutil.GenerateTestHistogram(value))))
} else {
t.Samples = append(t.Samples, cortexpb.Sample{
TimestampMs: ts,
Expand All @@ -3510,7 +3510,7 @@ func makeWriteRequestTimeseriesNHCB(labels []cortexpb.LabelAdapter, ts, value in
return cortexpb.PreallocTimeseries{
TimeSeries: &cortexpb.TimeSeries{
Labels: labels,
Histograms: []cortexpb.Histogram{cortexpb.HistogramToHistogramProto(ts, tsdbutil.GenerateTestCustomBucketsHistogram(value))},
Histograms: []cortexpb.WrappedHistogram{cortexpb.WrapHistogram(cortexpb.HistogramToHistogramProto(ts, tsdbutil.GenerateTestCustomBucketsHistogram(value)))},
},
}
}
Expand All @@ -3530,8 +3530,8 @@ func makeWriteRequestHA(samples int, replica, cluster string, histogram bool) *c
},
}
if histogram {
ts.Histograms = []cortexpb.Histogram{
cortexpb.HistogramToHistogramProto(int64(i), tsdbutil.GenerateTestHistogram(int64(i))),
ts.Histograms = []cortexpb.WrappedHistogram{
cortexpb.WrapHistogram(cortexpb.HistogramToHistogramProto(int64(i), tsdbutil.GenerateTestHistogram(int64(i)))),
}
} else {
ts.Samples = []cortexpb.Sample{
Expand Down Expand Up @@ -3627,8 +3627,8 @@ func makeWriteRequestHAMixedSamples(samples int, histogram bool) *cortexpb.Write
}
}
if histogram {
ts.Histograms = []cortexpb.Histogram{
cortexpb.HistogramToHistogramProto(int64(samples), tsdbutil.GenerateTestHistogram(int64(samples))),
ts.Histograms = []cortexpb.WrappedHistogram{
cortexpb.WrapHistogram(cortexpb.HistogramToHistogramProto(int64(samples), tsdbutil.GenerateTestHistogram(int64(samples)))),
}
} else {
var s = make([]cortexpb.Sample, 0)
Expand Down Expand Up @@ -4098,7 +4098,7 @@ func TestDistributorValidation(t *testing.T) {
metadata []*cortexpb.MetricMetadata
labels []labels.Labels
samples []cortexpb.Sample
histograms []cortexpb.Histogram
histograms []cortexpb.WrappedHistogram
err error
}{
// Test validation passes.
Expand All @@ -4111,8 +4111,8 @@ func TestDistributorValidation(t *testing.T) {
TimestampMs: int64(now),
Value: 1,
}},
histograms: []cortexpb.Histogram{
cortexpb.HistogramToHistogramProto(int64(now), testHistogram),
histograms: []cortexpb.WrappedHistogram{
cortexpb.WrapHistogram(cortexpb.HistogramToHistogramProto(int64(now), testHistogram)),
},
},
// Test validation fails for very old samples.
Expand Down Expand Up @@ -4200,8 +4200,8 @@ func TestDistributorValidation(t *testing.T) {
labels: []labels.Labels{
labels.FromStrings(labels.MetricName, "testmetric", "foo", "bar", "foo2", "bar2"),
},
histograms: []cortexpb.Histogram{
cortexpb.HistogramToHistogramProto(int64(now), testHistogram),
histograms: []cortexpb.WrappedHistogram{
cortexpb.WrapHistogram(cortexpb.HistogramToHistogramProto(int64(now), testHistogram)),
},
err: httpgrpc.Errorf(http.StatusBadRequest, `series has too many labels (actual: 3, limit: 2) series: 'testmetric{foo2="bar2", foo="bar"}'`),
},
Expand All @@ -4210,8 +4210,8 @@ func TestDistributorValidation(t *testing.T) {
labels: []labels.Labels{
labels.FromStrings(labels.MetricName, "testmetric", "foo", "bar"),
},
histograms: []cortexpb.Histogram{
cortexpb.HistogramToHistogramProto(int64(past), testHistogram),
histograms: []cortexpb.WrappedHistogram{
cortexpb.WrapHistogram(cortexpb.HistogramToHistogramProto(int64(past), testHistogram)),
},
err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too old: %d metric: "testmetric"`, past),
},
Expand All @@ -4220,8 +4220,8 @@ func TestDistributorValidation(t *testing.T) {
labels: []labels.Labels{
labels.FromStrings(labels.MetricName, "testmetric", "foo", "bar"),
},
histograms: []cortexpb.Histogram{
cortexpb.FloatHistogramToHistogramProto(int64(future), testFloatHistogram),
histograms: []cortexpb.WrappedHistogram{
cortexpb.WrapHistogram(cortexpb.FloatHistogramToHistogramProto(int64(future), testFloatHistogram)),
},
err: httpgrpc.Errorf(http.StatusBadRequest, `timestamp too new: %d metric: "testmetric"`, future),
},
Expand Down
6 changes: 3 additions & 3 deletions pkg/ingester/client/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func SeriesSetToQueryResponse(s storage.SeriesSet) (*QueryResponse, error) {
for s.Next() {
series := s.At()
samples := []cortexpb.Sample{}
histograms := []cortexpb.Histogram{}
histograms := []cortexpb.WrappedHistogram{}
it = series.Iterator(it)
for valType := it.Next(); valType != chunkenc.ValNone; valType = it.Next() {
switch valType {
Expand All @@ -130,10 +130,10 @@ func SeriesSetToQueryResponse(s storage.SeriesSet) (*QueryResponse, error) {
})
case chunkenc.ValHistogram:
t, v := it.AtHistogram(nil)
histograms = append(histograms, cortexpb.HistogramToHistogramProto(t, v))
histograms = append(histograms, cortexpb.WrappedHistogram{Histogram: cortexpb.HistogramToHistogramProto(t, v)})
case chunkenc.ValFloatHistogram:
t, v := it.AtFloatHistogram(nil)
histograms = append(histograms, cortexpb.FloatHistogramToHistogramProto(t, v))
histograms = append(histograms, cortexpb.WrappedHistogram{Histogram: cortexpb.FloatHistogramToHistogramProto(t, v)})
default:
panic("unhandled default case")
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -1535,9 +1535,9 @@ func (i *Ingester) Push(ctx context.Context, req *cortexpb.WriteRequest) (*corte
)

if hp.GetCountFloat() > 0 {
fh = cortexpb.FloatHistogramProtoToFloatHistogram(hp)
fh = cortexpb.FloatHistogramProtoToFloatHistogram(hp.Histogram)
} else {
h = cortexpb.HistogramProtoToHistogram(hp)
h = cortexpb.HistogramProtoToHistogram(hp.Histogram)
}

if hp.StartTimestampMs != 0 && hp.TimestampMs != 0 {
Expand Down
Loading
Loading