Pass a single struct to calculate and cache hashes of series
Signed-off-by: Dimitar Dimitrov <dimitar.dimitrov@grafana.com>
dimitarvdimitrov committed Dec 15, 2022
1 parent 4497d3e commit 7453753
Showing 5 changed files with 62 additions and 74 deletions.
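The change threads a single `seriesHasher` value through the series-loading path instead of passing a raw `*hashcache.BlockSeriesHashCache` alongside a separate hasher. The `bucket.go` hunks below call a `shardOwned` helper whose definition is not part of the shown diff; here is a minimal sketch of what it plausibly looks like inside `pkg/storegateway`, assuming it reuses the package-local `seriesHasher` and `queryStats` types and treats a nil shard selector as "owns everything" (which the removed `if shard != nil` guard implies):

// Sketch only: shardOwned is called from blockSeries in this commit but is
// not defined in the hunks shown here. Assumed shape, in pkg/storegateway:
func shardOwned(shard *sharding.ShardSelector, hasher seriesHasher, id storage.SeriesRef, lset labels.Labels, stats *queryStats) bool {
    if shard == nil {
        // No shard selector: every series is owned (matches the removed
        // `if shard != nil { ... }` guard in blockSeries).
        return true
    }
    // Hash computes via lset.Hash() and caches on a miss (see series_refs.go below).
    hash := hasher.Hash(id, lset, stats)
    return hash%shard.ShardCount == shard.ShardIndex
}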
23 changes: 5 additions & 18 deletions pkg/storegateway/bucket.go
@@ -570,7 +570,7 @@ func blockSeries(
     chunksPool *pool.BatchBytes, // Pool used to get memory buffers to store chunks. Required only if !skipChunks.
     matchers []*labels.Matcher, // Series matchers.
     shard *sharding.ShardSelector, // Shard selector.
-    seriesHashCache *hashcache.BlockSeriesHashCache, // Block-specific series hash cache (used only if shard selector is specified).
+    seriesHasher seriesHasher, // Block-specific series hash cache (used only if shard selector is specified).
     chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
     seriesLimiter SeriesLimiter, // Rate limiter for loading series.
     skipChunks bool, // If true, chunks are not loaded and minTime/maxTime are ignored.
@@ -608,7 +608,7 @@ func blockSeries(
     // not belonging to the shard.
     var seriesCacheStats queryStats
     if shard != nil {
-        ps, seriesCacheStats = filterPostingsByCachedShardHash(ps, shard, seriesHashCache)
+        ps = filterPostingsByCachedShardHash(ps, shard, seriesHasher, &seriesCacheStats)
     }

     if len(ps) == 0 {
@@ -662,20 +662,8 @@ func blockSeries(
        }

        // Skip the series if it doesn't belong to the shard.
-       if shard != nil {
-           hash, ok := seriesHashCache.Fetch(id)
-           seriesCacheStats.seriesHashCacheRequests++
-
-           if !ok {
-               hash = lset.Hash()
-               seriesHashCache.Store(id, hash)
-           } else {
-               seriesCacheStats.seriesHashCacheHits++
-           }
-
-           if hash%shard.ShardCount != shard.ShardIndex {
-               continue
-           }
+       if !shardOwned(shard, seriesHasher, id, lset, &seriesCacheStats) {
+           continue
        }

        // Check series limit after filtering out series not belonging to the requested shard (if any).
@@ -1043,7 +1031,7 @@ func (s *BucketStore) synchronousSeriesSet(
        chunksPool,
        matchers,
        shardSelector,
-       blockSeriesHashCache,
+       cachedSeriesHasher{blockSeriesHashCache},
        chunksLimiter,
        seriesLimiter,
        req.SkipChunks,
@@ -1129,7 +1117,6 @@ func (s *BucketStore) streamingSeriesSetForBlocks(
            b.meta,
            matchers,
            shardSelector,
-           blockSeriesHashCache,
            cachedSeriesHasher{blockSeriesHashCache},
            chunksLimiter,
            seriesLimiter,
12 changes: 3 additions & 9 deletions pkg/storegateway/bucket_index_postings.go
@@ -156,17 +156,11 @@ func (it *bigEndianPostings) length() int {
 // filterPostingsByCachedShardHash filters the input postings by the provided shard. It filters only
 // postings for which we have their series hash already in the cache; if a series is not in the cache,
 // postings will be kept in the output.
-func filterPostingsByCachedShardHash(ps []storage.SeriesRef, shard *sharding.ShardSelector, seriesHashCache seriesHashCache) (filteredPostings []storage.SeriesRef, stats queryStats) {
+func filterPostingsByCachedShardHash(ps []storage.SeriesRef, shard *sharding.ShardSelector, seriesHashCache seriesHasher, stats *queryStats) []storage.SeriesRef {
     writeIdx := 0
-    stats.seriesHashCacheRequests = len(ps)
-
     for readIdx := 0; readIdx < len(ps); readIdx++ {
         seriesID := ps[readIdx]
-        hash, ok := seriesHashCache.Fetch(seriesID)
-        if ok {
-            stats.seriesHashCacheHits++
-        }
-
+        hash, ok := seriesHashCache.CachedHash(seriesID, stats)
         // Keep the posting if it's not in the cache, or it's in the cache and belongs to our shard.
         if !ok || hash%uint64(shard.ShardCount) == uint64(shard.ShardIndex) {
             ps[writeIdx] = seriesID
@@ -181,7 +175,7 @@ func filterPostingsByCachedShardHash(ps []storage.SeriesRef, shard *sharding.Sha
     // Shrink the size.
     ps = ps[:writeIdx]

-    return ps, stats
+    return ps
 }

 // paddedPostings adds the v2 index padding to postings without expanding them
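The rewrite keeps the function's zero-allocation, in-place compaction: postings are scanned with a read index while survivors are written back into the same slice at a write index, and the slice is shrunk at the end (the property pinned down by TestFilterPostingsByCachedShardHash_NoAllocations below). A self-contained sketch of the same pattern, with hypothetical names, for illustration only:

package main

import "fmt"

// filterInPlace keeps only the elements of xs accepted by keep, reusing the
// backing array instead of allocating a new slice.
func filterInPlace(xs []uint64, keep func(uint64) bool) []uint64 {
    writeIdx := 0
    for readIdx := 0; readIdx < len(xs); readIdx++ {
        if keep(xs[readIdx]) {
            xs[writeIdx] = xs[readIdx]
            writeIdx++
        }
    }
    return xs[:writeIdx] // shrink to the surviving prefix
}

func main() {
    ps := []uint64{11, 12, 13, 14}
    // Keep what a 2-way shard with index 0 would own.
    fmt.Println(filterInPlace(ps, func(h uint64) bool { return h%2 == 0 })) // [12 14]
}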
19 changes: 10 additions & 9 deletions pkg/storegateway/bucket_test.go
@@ -2212,7 +2212,7 @@ func benchmarkBlockSeriesWithConcurrency(b *testing.B, concurrency int, blockMet
            chunkReader := blk.chunkReader(ctx)
            chunksPool := &pool.BatchBytes{Delegate: pool.NoopBytes{}}

-           seriesSet, _, err := blockSeries(context.Background(), indexReader, chunkReader, chunksPool, matchers, shardSelector, seriesHashCache, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, log.NewNopLogger())
+           seriesSet, _, err := blockSeries(context.Background(), indexReader, chunkReader, chunksPool, matchers, shardSelector, cachedSeriesHasher{seriesHashCache}, chunksLimiter, seriesLimiter, req.SkipChunks, req.MinTime, req.MaxTime, log.NewNopLogger())
            require.NoError(b, err)

            // Ensure at least 1 series has been returned (as expected).
@@ -2312,7 +2312,7 @@ func TestBlockSeries_Cache(t *testing.T) {

        indexr := b.indexReader()
        for i, tc := range testCases {
-           ss, _, err := blockSeries(context.Background(), indexr, nil, nil, tc.matchers, tc.shard, shc, nil, sl, true, b.meta.MinTime, b.meta.MaxTime, log.NewNopLogger())
+           ss, _, err := blockSeries(context.Background(), indexr, nil, nil, tc.matchers, tc.shard, cachedSeriesHasher{shc}, nil, sl, true, b.meta.MinTime, b.meta.MaxTime, log.NewNopLogger())
            require.NoError(t, err, "Unexpected error for test case %d", i)
            lset := lsetFromSeriesSet(t, ss)
            require.Equalf(t, tc.expectedLabelSet, lset, "Wrong label set for test case %d", i)
@@ -2322,7 +2322,7 @@ func TestBlockSeries_Cache(t *testing.T) {
        // We break the index cache to not allow looking up series, so we know we don't look up series.
        indexr.block.indexCache = forbiddenFetchMultiSeriesForRefsIndexCache{b.indexCache, t}
        for i, tc := range testCases {
-           ss, _, err := blockSeries(context.Background(), indexr, nil, nil, tc.matchers, tc.shard, shc, nil, sl, true, b.meta.MinTime, b.meta.MaxTime, log.NewNopLogger())
+           ss, _, err := blockSeries(context.Background(), indexr, nil, nil, tc.matchers, tc.shard, cachedSeriesHasher{shc}, nil, sl, true, b.meta.MinTime, b.meta.MaxTime, log.NewNopLogger())
            require.NoError(t, err, "Unexpected error for test case %d", i)
            lset := lsetFromSeriesSet(t, ss)
            require.Equalf(t, tc.expectedLabelSet, lset, "Wrong label set for test case %d", i)
@@ -2589,12 +2589,12 @@ func TestFilterPostingsByCachedShardHash(t *testing.T) {

    for testName, testData := range tests {
        t.Run(testName, func(t *testing.T) {
-           cache := hashcache.NewSeriesHashCache(1024 * 1024).GetBlockCache("test")
+           hasher := mockSeriesHasher{cached: make(map[storage.SeriesRef]uint64)}
            for _, pair := range testData.cacheEntries {
-               cache.Store(storage.SeriesRef(pair[0]), pair[1])
+               hasher.cached[storage.SeriesRef(pair[0])] = pair[1]
            }

-           actualPostings, _ := filterPostingsByCachedShardHash(testData.inputPostings, testData.shard, cache)
+           actualPostings := filterPostingsByCachedShardHash(testData.inputPostings, testData.shard, hasher, nil)
            assert.Equal(t, testData.expectedPostings, actualPostings)
        })
    }
@@ -2609,9 +2609,10 @@ func TestFilterPostingsByCachedShardHash_NoAllocations(t *testing.T) {
    for _, pair := range cacheEntries {
        cache.Store(storage.SeriesRef(pair[0]), pair[1])
    }
+   stats := &queryStats{}

    assert.Equal(t, float64(0), testing.AllocsPerRun(1, func() {
-       filterPostingsByCachedShardHash(inputPostings, shard, cache)
+       filterPostingsByCachedShardHash(inputPostings, shard, cachedSeriesHasher{cache}, stats)
    }))
 }

@@ -2638,7 +2639,7 @@ func BenchmarkFilterPostingsByCachedShardHash_AllPostingsShifted(b *testing.B) {
        inputPostings = inputPostings[0:numPostings]
        copy(inputPostings, originalPostings)

-       filterPostingsByCachedShardHash(inputPostings, shard, cache)
+       filterPostingsByCachedShardHash(inputPostings, shard, cachedSeriesHasher{cache}, nil)
    }
 }

@@ -2657,6 +2658,6 @@ func BenchmarkFilterPostingsByCachedShardHash_NoPostingsShifted(b *testing.B) {
    for n := 0; n < b.N; n++ {
        // We reuse the same postings slice because we expect this test to not
        // modify it (cache is empty).
-       filterPostingsByCachedShardHash(ps, shard, cache)
+       filterPostingsByCachedShardHash(ps, shard, cachedSeriesHasher{cache}, nil)
    }
 }
27 changes: 10 additions & 17 deletions pkg/storegateway/series_refs.go
@@ -604,7 +604,6 @@ type loadingSeriesChunkRefsSetIterator struct {
    ctx context.Context
    postingsSetIterator *postingsSetsIterator
    indexr *bucketIndexReader
-   seriesHashCache seriesHashCache
    indexCache indexcache.IndexCache
    stats *safeQueryStats
    blockID ulid.ULID
@@ -632,7 +631,6 @@ func openBlockSeriesChunkRefsSetsIterator(
    blockMeta *metadata.Meta,
    matchers []*labels.Matcher, // Series matchers.
    shard *sharding.ShardSelector, // Shard selector.
-   seriesHashCache *hashcache.BlockSeriesHashCache, // Block-specific series hash cache (used only if shard selector is specified).
    seriesHasher seriesHasher,
    chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
    seriesLimiter SeriesLimiter, // Rate limiter for loading series.
@@ -657,7 +655,6 @@ func openBlockSeriesChunkRefsSetsIterator(
        newPostingsSetsIterator(ps, batchSize),
        indexr,
        indexCache,
-       seriesHashCache,
        stats,
        blockMeta,
        matchers,
@@ -679,7 +676,6 @@ func newLoadingSeriesChunkRefsSetIterator(
    postingsSetIterator *postingsSetsIterator,
    indexr *bucketIndexReader,
    indexCache indexcache.IndexCache,
-   seriesHashCache *hashcache.BlockSeriesHashCache,
    stats *safeQueryStats,
    blockMeta *metadata.Meta,
    matchers []*labels.Matcher,
@@ -705,7 +701,6 @@ func newLoadingSeriesChunkRefsSetIterator(
        matchers: matchers,
        shard: shard,
        seriesHasher: seriesHasher,
-       seriesHashCache: seriesHashCache,
        skipChunks: skipChunks,
        minTime: minTime,
        maxTime: maxTime,
@@ -745,7 +740,7 @@ func (s *loadingSeriesChunkRefsSetIterator) Next() bool {
    // not belonging to the shard.
    if s.shard != nil {
        var unsafeStats queryStats
-       nextPostings, unsafeStats = filterPostingsByCachedShardHash(nextPostings, s.shard, s.seriesHashCache)
+       nextPostings = filterPostingsByCachedShardHash(nextPostings, s.shard, s.seriesHasher, &unsafeStats)
        loadStats.merge(&unsafeStats)
    }
@@ -903,29 +898,27 @@ func logSeriesForPostingsCacheEvent(ctx context.Context, logger log.Logger, user

 type seriesHasher interface {
    Hash(seriesID storage.SeriesRef, lset labels.Labels, stats *queryStats) uint64
-}
-
-type seriesHashCache interface {
-   Fetch(seriesID storage.SeriesRef) (uint64, bool)
+   CachedHash(seriesID storage.SeriesRef, stats *queryStats) (uint64, bool)
 }

 type cachedSeriesHasher struct {
    cache *hashcache.BlockSeriesHashCache
 }

-func (b cachedSeriesHasher) Fetch(seriesID storage.SeriesRef) (uint64, bool) {
-   return b.cache.Fetch(seriesID)
+func (b cachedSeriesHasher) CachedHash(seriesID storage.SeriesRef, stats *queryStats) (uint64, bool) {
+   stats.seriesHashCacheRequests++
+   hash, isCached := b.cache.Fetch(seriesID)
+   if isCached {
+       stats.seriesHashCacheHits++
+   }
+   return hash, isCached
 }

 func (b cachedSeriesHasher) Hash(id storage.SeriesRef, lset labels.Labels, stats *queryStats) uint64 {
-   stats.seriesHashCacheRequests++
-
-   hash, ok := b.cache.Fetch(id)
+   hash, ok := b.CachedHash(id, stats)
    if !ok {
        hash = lset.Hash()
        b.cache.Store(id, hash)
-   } else {
-       stats.seriesHashCacheHits++
    }
    return hash
 }
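With the interface consolidated here, one wrapped cache travels down the call stack and exposes two entry points: CachedHash is a read-only probe (used for cheap pre-filtering of postings), while Hash is the full path that computes via lset.Hash() and stores on a miss. An illustrative snippet, assuming the package-local types above; exampleHasherUsage is a hypothetical name, not part of the commit:

// Illustrative only; not part of the commit.
func exampleHasherUsage(cache *hashcache.BlockSeriesHashCache, ref storage.SeriesRef, lset labels.Labels) {
    hasher := cachedSeriesHasher{cache: cache}
    stats := &queryStats{}

    // Read-only probe: bumps seriesHashCacheRequests (and hits when cached),
    // but never computes a hash.
    if hash, ok := hasher.CachedHash(ref, stats); ok {
        _ = hash
    }

    // Full path: falls back to lset.Hash() and stores the result on a miss.
    _ = hasher.Hash(ref, lset, stats)
}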
55 changes: 34 additions & 21 deletions pkg/storegateway/series_refs_test.go
@@ -1062,7 +1062,7 @@ func TestLoadingSeriesChunkRefsSetIterator(t *testing.T) {
    testCases := map[string]struct {
        shard *sharding.ShardSelector
        matchers []*labels.Matcher
-       seriesHasher mockSeriesHasher
+       seriesHasher seriesHasher
        skipChunks bool
        minT, maxT int64
        batchSize int
@@ -1134,7 +1134,15 @@ func TestLoadingSeriesChunkRefsSetIterator(t *testing.T) {
            },
        },
        "returns no batches when no series are owned by shard": {
-           shard: &sharding.ShardSelector{ShardIndex: 1, ShardCount: 2},
+           shard:        &sharding.ShardSelector{ShardIndex: 1, ShardCount: 2},
+           seriesHasher: mockSeriesHasher{
+               hashes: map[string]uint64{
+                   `{l1="v1"}`: 0,
+                   `{l1="v2"}`: 0,
+                   `{l1="v3"}`: 0,
+                   `{l1="v4"}`: 0,
+               },
+           },
            minT: 0,
            maxT: 40,
            batchSize: 2,
@@ -1187,17 +1195,20 @@ func TestLoadingSeriesChunkRefsSetIterator(t *testing.T) {
                postings,
                testCase.batchSize,
            )
+           hasher := testCase.seriesHasher
+           if hasher == nil {
+               hasher = cachedSeriesHasher{hashcache.NewSeriesHashCache(100).GetBlockCache("")}
+           }
            loadingIterator := newLoadingSeriesChunkRefsSetIterator(
                context.Background(),
                postingsIterator,
                indexr,
                noopCache{},
-               hashcache.NewSeriesHashCache(1).GetBlockCache(""),
                newSafeQueryStats(),
                block.meta,
                testCase.matchers,
                testCase.shard,
-               testCase.seriesHasher,
+               hasher,
                testCase.skipChunks,
                testCase.minT,
                testCase.maxT,
@@ -1208,7 +1219,7 @@ func TestLoadingSeriesChunkRefsSetIterator(t *testing.T) {
            // Tests
            sets := readAllSeriesChunkRefsSet(loadingIterator)
            assert.NoError(t, loadingIterator.Err())
-           if !assert.Len(t, sets, len(testCase.expectedSets)) {
+           if !assert.Len(t, sets, len(testCase.expectedSets), testName) {
                return
            }

@@ -1408,7 +1419,6 @@ func TestOpenBlockSeriesChunkRefsSetsIterator(t *testing.T) {
                block.meta,
                []*labels.Matcher{testCase.matcher},
                nil,
-               hashCache,
                cachedSeriesHasher{hashCache},
                &limiter{limit: testCase.chunksLimit},
                &limiter{limit: testCase.seriesLimit},
@@ -1634,8 +1644,10 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_SeriesCaching(t *testing.T) {
            b.indexCache = newInMemoryIndexCache(t)

            // Run 1 with a cold cache
-           seriesHashCache := newSeriesHashCacheWithHashes(b.meta.ULID, testCase.cachedSeriesHashesWithColdCache)
-           seriesHasher := mockSeriesHasher{hashes: mockedSeriesHashes}
+           seriesHasher := mockSeriesHasher{
+               hashes: mockedSeriesHashes,
+               cached: testCase.cachedSeriesHashesWithColdCache,
+           }

            statsColdCache := newSafeQueryStats()
            indexReader := b.indexReader()
@@ -1648,12 +1660,12 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_SeriesCaching(t *testing.T) {
                b.meta,
                testCase.matchers,
                testCase.shard,
-               seriesHashCache,
                seriesHasher,
                &limiter{limit: 1000},
                &limiter{limit: 1000},
                true,
-               b.meta.MinTime, b.meta.MaxTime,
+               b.meta.MinTime,
+               b.meta.MaxTime,
                statsColdCache,
                NewBucketStoreMetrics(nil),
                log.NewNopLogger(),
@@ -1666,7 +1678,10 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_SeriesCaching(t *testing.T) {
            assert.Equal(t, testCase.expectedSeriesReadFromBlockWithColdCache, statsColdCache.export().seriesFetched)

            // Run 2 with a warm cache
-           seriesHashCache = newSeriesHashCacheWithHashes(b.meta.ULID, testCase.cachedSeriesHashesWithWarmCache)
+           seriesHasher = mockSeriesHasher{
+               hashes: mockedSeriesHashes,
+               cached: testCase.cachedSeriesHashesWithWarmCache,
+           }

            statsWarnCache := newSafeQueryStats()
            ss, err = openBlockSeriesChunkRefsSetsIterator(
@@ -1678,12 +1693,12 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_SeriesCaching(t *testing.T) {
                b.meta,
                testCase.matchers,
                testCase.shard,
-               seriesHashCache,
                seriesHasher,
                &limiter{limit: 1000},
                &limiter{limit: 1000},
                true,
-               b.meta.MinTime, b.meta.MaxTime,
+               b.meta.MinTime,
+               b.meta.MaxTime,
                statsWarnCache,
                NewBucketStoreMetrics(nil),
                log.NewNopLogger(),
@@ -1699,14 +1714,6 @@ func TestOpenBlockSeriesChunkRefsSetsIterator_SeriesCaching(t *testing.T) {
    }
 }

-func newSeriesHashCacheWithHashes(blockID ulid.ULID, postings map[storage.SeriesRef]uint64) *hashcache.BlockSeriesHashCache {
-   cache := hashcache.NewSeriesHashCache(10000).GetBlockCache(blockID.String())
-   for p, h := range postings {
-       cache.Store(p, h)
-   }
-   return cache
-}
-
 type forbiddenFetchMultiSeriesForRefsIndexCache struct {
    indexcache.IndexCache

@@ -1776,9 +1783,15 @@ func TestPostingsSetsIterator(t *testing.T) {
 }

 type mockSeriesHasher struct {
+   cached map[storage.SeriesRef]uint64
    hashes map[string]uint64
 }

+func (a mockSeriesHasher) CachedHash(seriesID storage.SeriesRef, stats *queryStats) (uint64, bool) {
+   hash, isCached := a.cached[seriesID]
+   return hash, isCached
+}
+
 func (a mockSeriesHasher) Hash(seriesID storage.SeriesRef, lset labels.Labels, stats *queryStats) uint64 {
    return a.hashes[lset.String()]
 }