Skip to content

Commit

Permalink
store-gateway: fix stats recording in filtering...Iterator (#4830)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
  • Loading branch information
dimitarvdimitrov committed Apr 25, 2023
1 parent 658d73d commit 29a3fcc
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 12 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* [CHANGE] Store-gateway: change expanded postings and postings index cache key format. These caches will be invalidated when rolling out the new Mimir version. #4770
* [ENHANCEMENT] Add per-tenant limit `-validation.max-native-histogram-buckets` to be able to ignore native histogram samples that have too many buckets. #4765
* [ENHANCEMENT] Store-gateway: reduce memory usage in some LabelValues calls. #4789
* [ENHANCEMENT] Store-gateway: add a `stage` label to the metric `cortex_bucket_store_series_data_touched`. This label now applies to `data_type="chunks"` and `data_type="series"`. The `stage` label has 2 values: `processed` - the number of series that parsed - and `returned` - the number of series selected from the processed bytes to satisfy the query. #4797
* [ENHANCEMENT] Store-gateway: add a `stage` label to the metric `cortex_bucket_store_series_data_touched`. This label now applies to `data_type="chunks"` and `data_type="series"`. The `stage` label has 2 values: `processed` - the number of series that parsed - and `returned` - the number of series selected from the processed bytes to satisfy the query. #4797 #4830
* [ENHANCEMENT] Distributor: make `__meta_tenant_id` label available in relabeling rules configured via `metric_relabel_configs`. #4725
* [BUGFIX] Metadata API: Mimir will now return an empty object when no metadata is available, matching Prometheus. #4782
* [BUGFIX] Store-gateway: add collision detection on expanded postings and individual postings cache keys. #4770
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1304,7 +1304,7 @@ func labelValuesFromSeries(ctx context.Context, labelName string, seriesPerBatch
b.logger,
)
if len(pendingMatchers) > 0 {
iterator = &filteringSeriesChunkRefsSetIterator{from: iterator, matchers: pendingMatchers}
iterator = newFilteringSeriesChunkRefsSetIterator(pendingMatchers, iterator, stats)
}
iterator = seriesStreamingFetchRefsDurationIterator(iterator, stats)
seriesSet := newSeriesSetWithoutChunks(ctx, iterator, stats)
Expand Down
41 changes: 32 additions & 9 deletions pkg/storegateway/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ func (c cacheNotExpectingToStoreLabelNames) StoreLabelNames(userID string, block
}

func TestBlockLabelValues(t *testing.T) {
const series = 500
const series = 100_000

newTestBucketBlock := prepareTestBlockWithBinaryReader(test.NewTB(t), appendTestSeries(series))

Expand Down Expand Up @@ -454,14 +454,8 @@ func TestBlockLabelValues(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []string{"bar"}, values)

// we remove the indexHeaderReader to ensure that results come from a cache
// if this panics, then we know that it's trying to read actual values
b.indexHeaderReader = &interceptedIndexReader{
Reader: b.indexHeaderReader,
onLabelNamesCalled: func() error { return context.DeadlineExceeded },
onLabelValuesCalled: func(string) error { return context.DeadlineExceeded },
onLabelValuesOffsetsCalled: func(string) error { return context.DeadlineExceeded },
}
// we break the indexHeaderReader to ensure that results come from a cache
b.indexHeaderReader = deadlineExceededIndexHeader()

values, err = blockLabelValues(context.Background(), b, selectAllStrategy{}, 5000, "j", pFooMatchers, log.NewNopLogger(), newSafeQueryStats())
require.NoError(t, err)
Expand All @@ -470,6 +464,27 @@ func TestBlockLabelValues(t *testing.T) {
require.NoError(t, err)
require.Equal(t, []string{"bar"}, values)
})

t.Run("happy case cached with weak matchers", func(t *testing.T) {
b := newTestBucketBlock()
b.indexCache = newInMemoryIndexCache(t)

matchers := []*labels.Matcher{
labels.MustNewMatcher(labels.MatchEqual, "p", "foo"),
labels.MustNewMatcher(labels.MatchRegexp, "i", "1234.+"),
labels.MustNewMatcher(labels.MatchRegexp, "j", ".+"), // this is too weak and doesn't bring much value, it should be shortcut
}
values, err := blockLabelValues(context.Background(), b, worstCaseFetchedDataStrategy{1.0}, 5000, "j", matchers, log.NewNopLogger(), newSafeQueryStats())
require.NoError(t, err)
require.Equal(t, []string{"foo"}, values)

// we break the indexHeaderReader to ensure that results come from a cache
b.indexHeaderReader = deadlineExceededIndexHeader()

values, err = blockLabelValues(context.Background(), b, worstCaseFetchedDataStrategy{1.0}, 5000, "j", matchers, log.NewNopLogger(), newSafeQueryStats())
require.NoError(t, err)
require.Equal(t, []string{"foo"}, values)
})
}

type cacheNotExpectingToStoreLabelValues struct {
Expand Down Expand Up @@ -886,6 +901,14 @@ func (iir *interceptedIndexReader) LabelValuesOffsets(name string, prefix string
return iir.Reader.LabelValuesOffsets(name, prefix, filter)
}

func deadlineExceededIndexHeader() *interceptedIndexReader {
return &interceptedIndexReader{
onLabelNamesCalled: func() error { return context.DeadlineExceeded },
onLabelValuesCalled: func(string) error { return context.DeadlineExceeded },
onLabelValuesOffsetsCalled: func(string) error { return context.DeadlineExceeded },
}
}

type contextNotifyingOnDoneWaiting struct {
context.Context
once sync.Once
Expand Down
10 changes: 9 additions & 1 deletion pkg/storegateway/series_refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ func openBlockSeriesChunkRefsSetsIterator(
logger,
)
if len(pendingMatchers) > 0 {
iterator = &filteringSeriesChunkRefsSetIterator{stats: stats, from: iterator, matchers: pendingMatchers}
iterator = newFilteringSeriesChunkRefsSetIterator(pendingMatchers, iterator, stats)
}

return seriesStreamingFetchRefsDurationIterator(iterator, stats), nil
Expand Down Expand Up @@ -1046,6 +1046,14 @@ type filteringSeriesChunkRefsSetIterator struct {
current seriesChunkRefsSet
}

func newFilteringSeriesChunkRefsSetIterator(matchers []*labels.Matcher, from seriesChunkRefsSetIterator, stats *safeQueryStats) *filteringSeriesChunkRefsSetIterator {
return &filteringSeriesChunkRefsSetIterator{
stats: stats,
from: from,
matchers: matchers,
}
}

func (m *filteringSeriesChunkRefsSetIterator) Next() bool {
if !m.from.Next() {
return false
Expand Down

0 comments on commit 29a3fcc

Please sign in to comment.