streaming store-gateway: add tests for index cache with query sharding #3728

Merged
merged 6 commits on Dec 15, 2022
24 changes: 6 additions & 18 deletions pkg/storegateway/bucket.go
@@ -570,7 +570,7 @@ func blockSeries(
 	chunksPool *pool.BatchBytes, // Pool used to get memory buffers to store chunks. Required only if !skipChunks.
 	matchers []*labels.Matcher, // Series matchers.
 	shard *sharding.ShardSelector, // Shard selector.
-	seriesHashCache *hashcache.BlockSeriesHashCache, // Block-specific series hash cache (used only if shard selector is specified).
+	seriesHasher seriesHasher, // Block-specific series hash cache (used only if shard selector is specified).
 	chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
 	seriesLimiter SeriesLimiter, // Rate limiter for loading series.
 	skipChunks bool, // If true, chunks are not loaded and minTime/maxTime are ignored.
@@ -608,7 +608,7 @@ func blockSeries(
 	// not belonging to the shard.
 	var seriesCacheStats queryStats
 	if shard != nil {
-		ps, seriesCacheStats = filterPostingsByCachedShardHash(ps, shard, seriesHashCache)
+		ps = filterPostingsByCachedShardHash(ps, shard, seriesHasher, &seriesCacheStats)
 	}

 	if len(ps) == 0 {
@@ -662,20 +662,8 @@ func blockSeries(
 		}

 		// Skip the series if it doesn't belong to the shard.
-		if shard != nil {
-			hash, ok := seriesHashCache.Fetch(id)
-			seriesCacheStats.seriesHashCacheRequests++
-
-			if !ok {
-				hash = lset.Hash()
-				seriesHashCache.Store(id, hash)
-			} else {
-				seriesCacheStats.seriesHashCacheHits++
-			}
-
-			if hash%shard.ShardCount != shard.ShardIndex {
-				continue
-			}
+		if !shardOwned(shard, seriesHasher, id, lset, &seriesCacheStats) {
+			continue
 		}

 		// Check series limit after filtering out series not belonging to the requested shard (if any).
@@ -1043,7 +1031,7 @@ func (s *BucketStore) synchronousSeriesSet(
 			chunksPool,
 			matchers,
 			shardSelector,
-			blockSeriesHashCache,
+			cachedSeriesHasher{blockSeriesHashCache},
 			chunksLimiter,
 			seriesLimiter,
 			req.SkipChunks,
@@ -1129,7 +1117,7 @@ func (s *BucketStore) streamingSeriesSetForBlocks(
 			b.meta,
 			matchers,
 			shardSelector,
-			blockSeriesHashCache,
+			cachedSeriesHasher{blockSeriesHashCache},
 			chunksLimiter,
 			seriesLimiter,
 			req.SkipChunks,
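Note: the new code path delegates the per-series shard check to a shardOwned helper that is defined outside this excerpt. Below is a minimal sketch of what that helper must do, reconstructed from the call site above and the logic it replaces; the nil-selector guard and the helper's placement are assumptions, not the PR's verbatim code.

```go
// shardOwned reports whether the series identified by id belongs to the
// requested shard. Sketch only: inferred from the blockSeries call site,
// not copied from the PR.
func shardOwned(shard *sharding.ShardSelector, hasher seriesHasher, id storage.SeriesRef, lset labels.Labels, stats *queryStats) bool {
	if shard == nil {
		// No shard selector: every series is owned.
		return true
	}
	// Hash consults the cache first and falls back to hashing the label set,
	// recording cache requests and hits in stats.
	hash := hasher.Hash(id, lset, stats)
	return hash%shard.ShardCount == shard.ShardIndex
}
```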
13 changes: 3 additions & 10 deletions pkg/storegateway/bucket_index_postings.go
@@ -13,7 +13,6 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/encoding"
-	"github.com/prometheus/prometheus/tsdb/hashcache"
 	"github.com/prometheus/prometheus/tsdb/index"

 	"github.com/grafana/mimir/pkg/storage/sharding"
@@ -157,17 +156,11 @@ func (it *bigEndianPostings) length() int {
 // filterPostingsByCachedShardHash filters the input postings by the provided shard. It filters only
 // postings for which we have their series hash already in the cache; if a series is not in the cache,
 // postings will be kept in the output.
-func filterPostingsByCachedShardHash(ps []storage.SeriesRef, shard *sharding.ShardSelector, seriesHashCache *hashcache.BlockSeriesHashCache) (filteredPostings []storage.SeriesRef, stats queryStats) {
+func filterPostingsByCachedShardHash(ps []storage.SeriesRef, shard *sharding.ShardSelector, seriesHashCache seriesHasher, stats *queryStats) []storage.SeriesRef {
 	writeIdx := 0
-	stats.seriesHashCacheRequests = len(ps)
-
 	for readIdx := 0; readIdx < len(ps); readIdx++ {
 		seriesID := ps[readIdx]
-		hash, ok := seriesHashCache.Fetch(seriesID)
-		if ok {
-			stats.seriesHashCacheHits++
-		}
-
+		hash, ok := seriesHashCache.CachedHash(seriesID, stats)
 		// Keep the posting if it's not in the cache, or it's in the cache and belongs to our shard.
 		if !ok || hash%uint64(shard.ShardCount) == uint64(shard.ShardIndex) {
 			ps[writeIdx] = seriesID
@@ -182,7 +175,7 @@ func filterPostingsByCachedShardHash(ps []storage.SeriesRef, shard *sharding.Sha
 	// Shrink the size.
 	ps = ps[:writeIdx]

-	return ps, stats
+	return ps
 }

 // paddedPostings adds the v2 index padding to postings without expanding them
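The rewritten function keeps its zero-allocation property: it compacts survivors in place with separate read and write indexes and re-slices the input, which is what the AllocsPerRun test in bucket_test.go below relies on. Here is a self-contained illustration of that compaction idiom, using plain uint64 values rather than Mimir's postings; the helper name and the shard-1-of-2 predicate are invented for the example.

```go
package main

import "fmt"

// compactInPlace keeps the elements satisfying keep, reusing the backing
// array of xs exactly like filterPostingsByCachedShardHash reuses ps:
// survivors are copied toward the front and the slice is shrunk.
func compactInPlace(xs []uint64, keep func(uint64) bool) []uint64 {
	writeIdx := 0
	for readIdx := 0; readIdx < len(xs); readIdx++ {
		if keep(xs[readIdx]) {
			xs[writeIdx] = xs[readIdx]
			writeIdx++
		}
	}
	return xs[:writeIdx]
}

func main() {
	ps := []uint64{10, 11, 12, 13, 14, 15}
	// Keep entries whose "hash" (here the value itself) lands on shard 1 of 2,
	// mirroring the hash%ShardCount == ShardIndex membership check.
	owned := compactInPlace(ps, func(h uint64) bool { return h%2 == 1 })
	fmt.Println(owned) // [11 13 15]
}
```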
19 changes: 10 additions & 9 deletions pkg/storegateway/bucket_test.go
@@ -2212,7 +2212,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
 			chunkReader := blk.chunkReader(ctx)
 			chunksPool := &pool.BatchBytes{Delegate: pool.NoopBytes{}}

-			seriesSet, _, err := blockSeries(context.Background(), indexReader, chunkReader, chunksPool, matchers, shardSelector, seriesHashCache, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, log.NewNopLogger())
+			seriesSet, _, err := blockSeries(context.Background(), indexReader, chunkReader, chunksPool, matchers, shardSelector, cachedSeriesHasher{seriesHashCache}, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, log.NewNopLogger())
 			require.NoError(b, err)

 			// Ensure at least 1 series has been returned (as expected).
@@ -2312,7 +2312,7 @@ func TestBlockSeries_Cache(t *testing.T) {

 		indexr := b.indexReader()
 		for i, tc := range testCases {
-			ss, _, err := blockSeries(context.Background(), indexr, nil, nil, tc.matchers, tc.shard, shc, nil, sl, true, b.meta.MinTime, b.meta.MaxTime, log.NewNopLogger())
+			ss, _, err := blockSeries(context.Background(), indexr, nil, nil, tc.matchers, tc.shard, cachedSeriesHasher{shc}, nil, sl, true, b.meta.MinTime, b.meta.MaxTime, log.NewNopLogger())
 			require.NoError(t, err, "Unexpected error for test case %d", i)
 			lset := lsetFromSeriesSet(t, ss)
 			require.Equalf(t, tc.expectedLabelSet, lset, "Wrong label set for test case %d", i)
@@ -2322,7 +2322,7 @@ func TestBlockSeries_Cache(t *testing.T) {
 		// We break the index cache to not allow looking up series, so we know we don't look up series.
 		indexr.block.indexCache = forbiddenFetchMultiSeriesForRefsIndexCache{b.indexCache, t}
 		for i, tc := range testCases {
-			ss, _, err := blockSeries(context.Background(), indexr, nil, nil, tc.matchers, tc.shard, shc, nil, sl, true, b.meta.MinTime, b.meta.MaxTime, log.NewNopLogger())
+			ss, _, err := blockSeries(context.Background(), indexr, nil, nil, tc.matchers, tc.shard, cachedSeriesHasher{shc}, nil, sl, true, b.meta.MinTime, b.meta.MaxTime, log.NewNopLogger())
 			require.NoError(t, err, "Unexpected error for test case %d", i)
 			lset := lsetFromSeriesSet(t, ss)
 			require.Equalf(t, tc.expectedLabelSet, lset, "Wrong label set for test case %d", i)
@@ -2589,12 +2589,12 @@ func TestFilterPostingsByCachedShardHash(t *testing.T) {

 	for testName, testData := range tests {
 		t.Run(testName, func(t *testing.T) {
-			cache := hashcache.NewSeriesHashCache(1024 * 1024).GetBlockCache("test")
+			hasher := mockSeriesHasher{cached: make(map[storage.SeriesRef]uint64)}
 			for _, pair := range testData.cacheEntries {
-				cache.Store(storage.SeriesRef(pair[0]), pair[1])
+				hasher.cached[storage.SeriesRef(pair[0])] = pair[1]
 			}

-			actualPostings, _ := filterPostingsByCachedShardHash(testData.inputPostings, testData.shard, cache)
+			actualPostings := filterPostingsByCachedShardHash(testData.inputPostings, testData.shard, hasher, nil)
 			assert.Equal(t, testData.expectedPostings, actualPostings)
 		})
 	}
@@ -2609,9 +2609,10 @@ func TestFilterPostingsByCachedShardHash_NoAllocations(t *testing.T) {
 	for _, pair := range cacheEntries {
 		cache.Store(storage.SeriesRef(pair[0]), pair[1])
 	}
+	stats := &queryStats{}

 	assert.Equal(t, float64(0), testing.AllocsPerRun(1, func() {
-		filterPostingsByCachedShardHash(inputPostings, shard, cache)
+		filterPostingsByCachedShardHash(inputPostings, shard, cachedSeriesHasher{cache}, stats)
 	}))
 }

@@ -2638,7 +2639,7 @@ func BenchmarkFilterPostingsByCachedShardHash_AllPostingsShifted(b *testing.B) {
 		inputPostings = inputPostings[0:numPostings]
 		copy(inputPostings, originalPostings)

-		filterPostingsByCachedShardHash(inputPostings, shard, cache)
+		filterPostingsByCachedShardHash(inputPostings, shard, cachedSeriesHasher{cache}, nil)
 	}
 }

@@ -2657,6 +2658,6 @@ func BenchmarkFilterPostingsByCachedShardHash_NoPostingsShifted(b *testing.B) {
 	for n := 0; n < b.N; n++ {
 		// We reuse the same postings slice because we expect this test to not
 		// modify it (cache is empty).
-		filterPostingsByCachedShardHash(ps, shard, cache)
+		filterPostingsByCachedShardHash(ps, shard, cachedSeriesHasher{cache}, nil)
 	}
 }
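TestFilterPostingsByCachedShardHash now injects a mockSeriesHasher instead of a real hash cache; its definition is not part of this excerpt. A plausible shape, given the seriesHasher interface and the cached map the test fills, is sketched below — treat the method bodies as assumptions. Note that the mock can ignore its stats argument entirely, which is consistent with the test passing a nil *queryStats.

```go
// mockSeriesHasher is a sketch of the test double used above; the real
// definition lives elsewhere in bucket_test.go. Only series present in the
// cached map count as "cached".
type mockSeriesHasher struct {
	cached map[storage.SeriesRef]uint64
}

func (h mockSeriesHasher) CachedHash(seriesID storage.SeriesRef, stats *queryStats) (uint64, bool) {
	hash, isCached := h.cached[seriesID]
	return hash, isCached
}

func (h mockSeriesHasher) Hash(seriesID storage.SeriesRef, lset labels.Labels, stats *queryStats) uint64 {
	if hash, ok := h.cached[seriesID]; ok {
		return hash
	}
	return lset.Hash()
}
```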
24 changes: 13 additions & 11 deletions pkg/storegateway/series_refs.go
@@ -604,7 +604,6 @@ type loadingSeriesChunkRefsSetIterator struct {
 	ctx                 context.Context
 	postingsSetIterator *postingsSetsIterator
 	indexr              *bucketIndexReader
-	seriesHashCache     *hashcache.BlockSeriesHashCache
 	indexCache          indexcache.IndexCache
 	stats               *safeQueryStats
 	blockID             ulid.ULID
@@ -632,7 +631,7 @@ func openBlockSeriesChunkRefsSetsIterator(
 	blockMeta *metadata.Meta,
 	matchers []*labels.Matcher, // Series matchers.
 	shard *sharding.ShardSelector, // Shard selector.
-	seriesHashCache *hashcache.BlockSeriesHashCache, // Block-specific series hash cache (used only if shard selector is specified).
+	seriesHasher seriesHasher,
 	chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
 	seriesLimiter SeriesLimiter, // Rate limiter for loading series.
 	skipChunks bool, // If true chunks are not loaded and minTime/maxTime are ignored.
@@ -656,12 +655,11 @@ func openBlockSeriesChunkRefsSetsIterator(
 		newPostingsSetsIterator(ps, batchSize),
 		indexr,
 		indexCache,
-		seriesHashCache,
 		stats,
 		blockMeta,
 		matchers,
 		shard,
-		cachedSeriesHasher{cache: seriesHashCache},
+		seriesHasher,
 		skipChunks,
 		minTime,
 		maxTime,
@@ -678,7 +676,6 @@ func newLoadingSeriesChunkRefsSetIterator(
 	postingsSetIterator *postingsSetsIterator,
 	indexr *bucketIndexReader,
 	indexCache indexcache.IndexCache,
-	seriesHashCache *hashcache.BlockSeriesHashCache,
 	stats *safeQueryStats,
 	blockMeta *metadata.Meta,
 	matchers []*labels.Matcher,
@@ -704,7 +701,6 @@ func newLoadingSeriesChunkRefsSetIterator(
 		matchers:        matchers,
 		shard:           shard,
 		seriesHasher:    seriesHasher,
-		seriesHashCache: seriesHashCache,
 		skipChunks:      skipChunks,
 		minTime:         minTime,
 		maxTime:         maxTime,
@@ -744,7 +740,7 @@ func (s *loadingSeriesChunkRefsSetIterator) Next() bool {
 	// not belonging to the shard.
 	if s.shard != nil {
 		var unsafeStats queryStats
-		nextPostings, unsafeStats = filterPostingsByCachedShardHash(nextPostings, s.shard, s.seriesHashCache)
+		nextPostings = filterPostingsByCachedShardHash(nextPostings, s.shard, s.seriesHasher, &unsafeStats)
 		loadStats.merge(&unsafeStats)
 	}

@@ -902,21 +898,27 @@ func logSeriesForPostingsCacheEvent(ctx context.Context, logger log.Logger, user

 type seriesHasher interface {
 	Hash(seriesID storage.SeriesRef, lset labels.Labels, stats *queryStats) uint64
+	CachedHash(seriesID storage.SeriesRef, stats *queryStats) (uint64, bool)
 }

 type cachedSeriesHasher struct {
 	cache *hashcache.BlockSeriesHashCache
 }

-func (b cachedSeriesHasher) Hash(id storage.SeriesRef, lset labels.Labels, stats *queryStats) uint64 {
+func (b cachedSeriesHasher) CachedHash(seriesID storage.SeriesRef, stats *queryStats) (uint64, bool) {
 	stats.seriesHashCacheRequests++
+	hash, isCached := b.cache.Fetch(seriesID)
+	if isCached {
+		stats.seriesHashCacheHits++
+	}
+	return hash, isCached
+}

-	hash, ok := b.cache.Fetch(id)
+func (b cachedSeriesHasher) Hash(id storage.SeriesRef, lset labels.Labels, stats *queryStats) uint64 {
+	hash, ok := b.CachedHash(id, stats)
 	if !ok {
 		hash = lset.Hash()
 		b.cache.Store(id, hash)
-	} else {
-		stats.seriesHashCacheHits++
 	}
 	return hash
 }
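The refactor splits the hasher into a read-only CachedHash (used when bulk-filtering postings, before labels are loaded) and a read-through Hash (used once the label set is available, storing computed hashes back into the cache). A usage sketch of that split, assuming it runs inside the storegateway package since the types are unexported; the block name, series ref, and labels are invented for the example:

```go
package storegateway

import (
	"fmt"

	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/hashcache"
)

// exampleCachedSeriesHasher walks through the read-only vs read-through split.
func exampleCachedSeriesHasher() {
	cache := hashcache.NewSeriesHashCache(1024 * 1024).GetBlockCache("block-1")
	hasher := cachedSeriesHasher{cache: cache}
	stats := &queryStats{}

	ref := storage.SeriesRef(42)
	lset := labels.FromStrings("__name__", "up", "job", "prometheus")

	// 1st request: a miss, and CachedHash never writes to the cache.
	if _, ok := hasher.CachedHash(ref, stats); !ok {
		// 2nd request: Hash re-checks the cache, computes lset.Hash() on the
		// miss, and stores the result for later lookups.
		_ = hasher.Hash(ref, lset, stats)
	}

	// 3rd request: now a hit, so stats records 1 hit out of 3 requests.
	hash, ok := hasher.CachedHash(ref, stats)
	fmt.Println(hash, ok, stats.seriesHashCacheRequests, stats.seriesHashCacheHits)
}
```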