Don't load series multiple times when streaming chunks from store-gateways and only one batch is needed #8039

Merged · 45 commits · May 23, 2024

Changes from all commits
897d3b6
Fix outdated comment
charleskorn Mar 19, 2024
254b1db
Add initial support for resetting `loadingSeriesChunkRefsSetIterator`
charleskorn Mar 19, 2024
4f67084
Add important TODO
charleskorn Mar 19, 2024
3fa6e3f
add reset funcs in preparation but don't use them yet
zenador Mar 19, 2024
05e6274
reuse the iterator, this builds but fails tests with EOF
zenador Mar 19, 2024
a15a25e
try changing strategy up front
zenador Mar 19, 2024
17c8360
Correctly reset merging and deduplicating iterators
charleskorn Mar 27, 2024
2473ca1
Avoid counting series and chunks twice in limiting iterator
charleskorn Mar 27, 2024
4f4d20b
Don't allow releasing series chunk refs set that will be reused
charleskorn Apr 3, 2024
1542ebe
Restore previous test setup
charleskorn May 3, 2024
09ddd2e
Fix typo in variable name
charleskorn May 3, 2024
fb1322b
Restore more previous test setup
charleskorn May 3, 2024
e1eadbd
Restore use of noChunkRefs where it should be used, and remove outdat…
charleskorn May 3, 2024
676349d
Fix linting warning.
charleskorn May 3, 2024
c3fb275
Introduce seriesIteratorStrategy flag for chunks streaming
charleskorn May 3, 2024
8b81435
Refactor test to exercise case where chunks streaming is both enabled…
charleskorn May 3, 2024
4471f33
Return reused series set to pool
charleskorn May 13, 2024
afebea2
Fix typo
charleskorn May 13, 2024
5915c19
Only load chunk refs when they're needed.
charleskorn May 13, 2024
f2dccc7
Rename variable to make intent clearer
charleskorn May 13, 2024
103615e
Add changelog entry
charleskorn May 13, 2024
73def47
Merge branch 'main' into charleskorn/store-gateway-double-work-fix
charleskorn May 13, 2024
9f40abc
Rename `withoutNoChunkRefs` to `withChunkRefs`
charleskorn May 14, 2024
3c8e50f
Remove redundant `if`
charleskorn May 14, 2024
4f07071
Add iterator type that wraps an iterator factory to manage the transi…
charleskorn May 15, 2024
cc40e1c
Remove `Reset` methods
charleskorn May 15, 2024
61290ed
Remove `makeReleasable`
charleskorn May 15, 2024
5f0342c
Unpick more unnecessary changes
charleskorn May 15, 2024
b24982d
Unpick whitespace change
charleskorn May 15, 2024
814d627
Extract method
charleskorn May 15, 2024
77ac30b
Refactor existing code to use `chunksStreamingCachingSeriesChunkRefsS…
charleskorn May 15, 2024
43115a9
Fix issue where streaming chunks does not work if multiple batches of…
charleskorn May 15, 2024
3c5eefe
Rename methods to better reflect their purpose
charleskorn May 15, 2024
93b6d9f
Move `streamingSeriesIterators` to `series_refs_streaming.go`
charleskorn May 20, 2024
c92b9a3
Remove unnecessary comments
charleskorn May 20, 2024
6ca5bc9
Remove unnecessary test helper function
charleskorn May 20, 2024
17bb03c
Introduce `seriesChunkRefsIteratorWrapper` interface instead of passi…
charleskorn May 20, 2024
81b1975
Rename type
charleskorn May 20, 2024
60f3084
Make a copy of the postings for each phase.
charleskorn May 20, 2024
c0fb6e9
Clarify comment
charleskorn May 20, 2024
e3d16b1
Use concrete type
charleskorn May 20, 2024
57c4b6b
Merge branch 'main' into charleskorn/store-gateway-double-work-fix
charleskorn May 20, 2024
e711d49
Don't make a copy of the postings unnecessarily.
charleskorn May 20, 2024
b2f7f15
Fix linting issues
charleskorn May 20, 2024
1af591f
Remove unused method
charleskorn May 20, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@
* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.index-header.lazy-loading-concurrency-queue-timeout`. When set, loads of index-headers at the store-gateway's index-header lazy load gate will not wait longer than that to execute. If a load reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimir-store-consistency-check-failed`. #8138
* [ENHANCEMENT] Ingester: Optimize querying with regexp matchers. #8106
* [ENHANCEMENT] Distributor: Introduce `-distributor.max-request-pool-buffer-size` to allow configuring the maximum size of the request pool buffers. #8082
* [ENHANCEMENT] Store-gateway: improve performance when streaming chunks to queriers is enabled (`-querier.prefer-streaming-chunks-from-store-gateways=true`) and the query selects fewer than `-blocks-storage.bucket-store.batch-series-size` series (defaults to 5000 series). #8039
* [ENHANCEMENT] Ingester: active series are now updated along with owned series. They decrease when series change ownership between ingesters. This helps provide a more accurate total of active series when ingesters are added. This is only enabled when `-ingester.track-ingester-owned-series` or `-ingester.use-ingester-owned-series-for-limits` are enabled. #8084
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
102 changes: 42 additions & 60 deletions pkg/storegateway/bucket.go
@@ -606,10 +606,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
defer done()

var (
// If we are streaming the series labels and chunks separately, we don't need to fetch the postings
// twice. So we use these slices to re-use them. Each reuse[i] corresponds to a single block.
reuse []*reusedPostingsAndMatchers
resHints = &hintspb.SeriesResponseHints{}
streamingIterators *streamingSeriesIterators
resHints = &hintspb.SeriesResponseHints{}
)
for _, b := range blocks {
resHints.AddQueriedBlock(b.meta.ULID)
@@ -633,7 +631,7 @@
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
)

seriesSet, reuse, err = s.streamingSeriesForBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
seriesSet, streamingIterators, err = s.createIteratorForChunksStreamingLabelsPhase(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
if err != nil {
return err
}
@@ -661,15 +659,11 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor

start := time.Now()
if req.StreamingChunksBatchSize > 0 {
var seriesChunkIt iterator[seriesChunksSet]
seriesChunkIt, err = s.streamingChunksSetForBlocks(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, reuse)
if err != nil {
return err
}
seriesChunkIt := s.createIteratorForChunksStreamingChunksPhase(ctx, readers, stats, chunksLimiter, seriesLimiter, streamingIterators)
err = s.sendStreamingChunks(req, srv, seriesChunkIt, stats, streamingSeriesCount)
} else {
var seriesSet storepb.SeriesSet
seriesSet, err = s.nonStreamingSeriesSetForBlocks(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
seriesSet, err = s.createIteratorForNonChunksStreamingRequest(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
if err != nil {
return err
}
@@ -1027,24 +1021,24 @@ func chunksSize(chks []storepb.AggrChunk) (size int) {
return size
}

// nonStreamingSeriesSetForBlocks is used when the streaming feature is not enabled.
func (s *BucketStore) nonStreamingSeriesSetForBlocks(
// createIteratorForNonChunksStreamingRequest is used when the streaming feature is not enabled.
func (s *BucketStore) createIteratorForNonChunksStreamingRequest(
ctx context.Context,
req *storepb.SeriesRequest,
blocks []*bucketBlock,
indexReaders map[ulid.ULID]*bucketIndexReader,
chunkReaders *bucketChunkReaders,
shardSelector *sharding.ShardSelector,
matchers []*labels.Matcher,
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
stats *safeQueryStats,
) (storepb.SeriesSet, error) {
strategy := defaultStrategy
if req.SkipChunks {
strategy = noChunkRefs
}
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, nil, strategy)
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, strategy, nil)
if err != nil {
return nil, err
}
@@ -1059,56 +1053,45 @@ func (s *BucketStore) nonStreamingSeriesSetForBlocks(
return set, nil
}

// streamingSeriesForBlocks is used when the streaming feature is enabled.
// createIteratorForChunksStreamingLabelsPhase is used when the streaming feature is enabled.
// It returns a series set that only contains the series labels without any chunks information.
// The returned postings (series refs) and matchers should be re-used when getting chunks to save on computation.
func (s *BucketStore) streamingSeriesForBlocks(
// The streamingSeriesIterators should be re-used when getting chunks to save on computation.
func (s *BucketStore) createIteratorForChunksStreamingLabelsPhase(
ctx context.Context,
req *storepb.SeriesRequest,
blocks []*bucketBlock,
indexReaders map[ulid.ULID]*bucketIndexReader,
shardSelector *sharding.ShardSelector,
matchers []*labels.Matcher,
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
stats *safeQueryStats,
) (storepb.SeriesSet, []*reusedPostingsAndMatchers, error) {
var (
reuse = make([]*reusedPostingsAndMatchers, len(blocks))
strategy = noChunkRefs | overlapMintMaxt
)
for i := range reuse {
reuse[i] = &reusedPostingsAndMatchers{}
}
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, reuse, strategy)
) (storepb.SeriesSet, *streamingSeriesIterators, error) {
streamingIterators := newStreamingSeriesIterators()
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, overlapMintMaxt, streamingIterators)
if err != nil {
return nil, nil, err
}
return newSeriesSetWithoutChunks(ctx, it, stats), reuse, nil

return newSeriesSetWithoutChunks(ctx, it, stats), streamingIterators, nil
}

// streamingChunksSetForBlocks is used when the streaming feature is enabled.
// It returns an iterator to go over the chunks for the series returned in the streamingSeriesForBlocks call.
// It is recommended to pass the reusePostings and reusePendingMatches returned by the streamingSeriesForBlocks call.
func (s *BucketStore) streamingChunksSetForBlocks(
// createIteratorForChunksStreamingChunksPhase is used when the streaming feature is enabled.
// It returns an iterator to go over the chunks for the series returned in the createIteratorForChunksStreamingLabelsPhase call.
// It is required to pass the iterators returned by the createIteratorForChunksStreamingLabelsPhase call for reuse.
func (s *BucketStore) createIteratorForChunksStreamingChunksPhase(
ctx context.Context,
req *storepb.SeriesRequest,
blocks []*bucketBlock,
indexReaders map[ulid.ULID]*bucketIndexReader,
chunkReaders *bucketChunkReaders,
shardSelector *sharding.ShardSelector,
matchers []*labels.Matcher,
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
stats *safeQueryStats,
reuse []*reusedPostingsAndMatchers, // Should come from streamingSeriesForBlocks.
) (iterator[seriesChunksSet], error) {
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, reuse, defaultStrategy)
if err != nil {
return nil, err
}
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
iterators *streamingSeriesIterators,
) iterator[seriesChunksSet] {
preparedIterators := iterators.prepareForChunksStreamingPhase()
it := s.getSeriesIteratorFromPerBlockIterators(preparedIterators, chunksLimiter, seriesLimiter)
scsi := newChunksPreloadingIterator(ctx, s.logger, s.userID, *chunkReaders, it, s.maxSeriesPerBatch, stats)
return scsi, nil

return scsi
}

func (s *BucketStore) getSeriesIteratorFromBlocks(
@@ -1121,8 +1104,8 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
stats *safeQueryStats,
reuse []*reusedPostingsAndMatchers, // Used if not empty. If not empty, len(reuse) must be len(blocks).
strategy seriesIteratorStrategy,
streamingIterators *streamingSeriesIterators,
) (iterator[seriesChunkRefsSet], error) {
var (
mtx = sync.Mutex{}
@@ -1131,9 +1114,8 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
begin = time.Now()
blocksQueriedByBlockMeta = make(map[blockQueriedMeta]int)
)
for i, b := range blocks {
for _, b := range blocks {
b := b
i := i

// Keep track of queried blocks.
indexr := indexReaders[b.meta.ULID]
@@ -1144,10 +1126,6 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
if shardSelector != nil {
blockSeriesHashCache = s.seriesHashCache.GetBlockCache(b.meta.ULID.String())
}
var r *reusedPostingsAndMatchers
if len(reuse) > 0 {
r = reuse[i]
}
g.Go(func() error {
part, err := openBlockSeriesChunkRefsSetsIterator(
ctx,
@@ -1162,8 +1140,8 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
strategy,
req.MinTime, req.MaxTime,
stats,
r,
s.logger,
streamingIterators,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -1192,13 +1170,17 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
stats.streamingSeriesExpandPostingsDuration += time.Since(begin)
})

mergedIterator := mergedSeriesChunkRefsSetIterators(s.maxSeriesPerBatch, batches...)
return s.getSeriesIteratorFromPerBlockIterators(batches, chunksLimiter, seriesLimiter), nil
}

func (s *BucketStore) getSeriesIteratorFromPerBlockIterators(perBlockIterators []iterator[seriesChunkRefsSet], chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter) iterator[seriesChunkRefsSet] {
mergedIterator := mergedSeriesChunkRefsSetIterators(s.maxSeriesPerBatch, perBlockIterators...)

// Apply limits after the merging, so that if the same series is part of multiple blocks it just gets
// counted once towards the limit.
mergedIterator = newLimitingSeriesChunkRefsSetIterator(mergedIterator, chunksLimiter, seriesLimiter)

return mergedIterator, nil
return mergedIterator
}

func (s *BucketStore) recordSeriesCallResult(safeStats *safeQueryStats) {
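Note: the comment in `getSeriesIteratorFromPerBlockIterators` above ("Apply limits after the merging...") is the key invariant of this extraction. A minimal, self-contained sketch of why the ordering matters, using toy types rather than Mimir's actual batched iterators:

```go
package main

import "fmt"

// series is a toy stand-in for a series' labels; the real iterators work on
// batched, pooled seriesChunkRefsSet values, which this sketch ignores.
type series struct{ labels string }

// mergeDedup merges per-block series lists, keeping one copy of any series
// that appears in several blocks, as the merging iterator does.
func mergeDedup(blocks ...[]series) []series {
	seen := make(map[string]bool)
	var out []series
	for _, block := range blocks {
		for _, s := range block {
			if !seen[s.labels] {
				seen[s.labels] = true
				out = append(out, s)
			}
		}
	}
	return out
}

// applyLimit fails once more than maxSeries series have been observed.
func applyLimit(in []series, maxSeries int) ([]series, error) {
	if len(in) > maxSeries {
		return nil, fmt.Errorf("series limit exceeded: %d > %d", len(in), maxSeries)
	}
	return in, nil
}

func main() {
	block1 := []series{{`{job="a"}`}, {`{job="b"}`}}
	block2 := []series{{`{job="b"}`}, {`{job="c"}`}} // {job="b"} is in both blocks

	// Limiting after merging: the overlapping series counts once, so a
	// limit of 3 passes even though 4 series were read across the blocks.
	// Counting per block before the merge would charge duplicates twice.
	merged := mergeDedup(block1, block2)
	if _, err := applyLimit(merged, 3); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("3 unique series within the limit of 3")
}
```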
@@ -1467,8 +1449,8 @@ func blockLabelNames(ctx context.Context, indexr *bucketIndexReader, matchers []
noChunkRefs,
minTime, maxTime,
stats,
nil,
logger,
nil,
)
if err != nil {
return nil, errors.Wrap(err, "fetch series")
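The bucket.go changes above reshape chunks streaming into two explicit phases: `createIteratorForChunksStreamingLabelsPhase` builds the per-block iterators (with chunk refs disabled) and records them in a `streamingSeriesIterators` value, and `createIteratorForChunksStreamingChunksPhase` re-prepares those same iterators instead of re-running expanded postings. The type itself lives in `series_refs_streaming.go`, which this diff excerpt doesn't show, so the following is only a hedged sketch of the pattern; `twoPhaseIterator` and its fields are invented, while `wrapIterator` and `prepareForChunksStreamingPhase` echo the method names visible in the diff:

```go
package main

import "fmt"

// twoPhaseIterator stands in for Mimir's per-block series iterator: it can
// run once yielding only labels, then again yielding chunk refs as well.
type twoPhaseIterator struct {
	block      string
	withChunks bool
}

func (it *twoPhaseIterator) next() string {
	if it.withChunks {
		return "labels + chunk refs from " + it.block
	}
	return "labels only from " + it.block
}

// seriesIterators plays the role streamingSeriesIterators appears to play:
// it remembers every iterator created during the labels phase so the chunks
// phase can re-arm them instead of re-expanding postings.
type seriesIterators struct{ all []*twoPhaseIterator }

// wrapIterator registers a per-block iterator for later reuse
// (the labels phase runs with chunk refs disabled).
func (s *seriesIterators) wrapIterator(block string) *twoPhaseIterator {
	it := &twoPhaseIterator{block: block}
	s.all = append(s.all, it)
	return it
}

// prepareForChunksStreamingPhase switches the saved iterators to also load
// chunk refs, mirroring the strategy flip to withChunkRefs().
func (s *seriesIterators) prepareForChunksStreamingPhase() []*twoPhaseIterator {
	for _, it := range s.all {
		it.withChunks = true
	}
	return s.all
}

func main() {
	iterators := &seriesIterators{}

	// Phase 1: stream series labels to the querier.
	fmt.Println(iterators.wrapIterator("block-1").next())
	fmt.Println(iterators.wrapIterator("block-2").next())

	// Phase 2: reuse the same iterators to stream the chunks.
	for _, it := range iterators.prepareForChunksStreamingPhase() {
		fmt.Println(it.next())
	}
}
```

This is also why the chunks-phase function no longer returns an error: all the fallible work (expanding postings) happened in the labels phase.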
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_chunk_reader_test.go
@@ -44,8 +44,8 @@ func TestBucketChunkReader_refetchChunks(t *testing.T) {
block.meta.MinTime,
block.meta.MaxTime,
newSafeQueryStats(),
nil,
log.NewNopLogger(),
nil,
)
require.NoError(t, err)

102 changes: 52 additions & 50 deletions pkg/storegateway/series_refs.go
@@ -139,6 +139,13 @@ func (b seriesChunkRefsSet) release() {
seriesChunkRefsSetPool.Put(&reuse)
}

// makeUnreleasable returns a new seriesChunkRefsSet that cannot be released on a subsequent call to release.
//
// This is useful for scenarios where a set will be used multiple times and so it is not safe for consumers to release it.
func (b seriesChunkRefsSet) makeUnreleasable() seriesChunkRefsSet {
return seriesChunkRefsSet{b.series, false}
}

// seriesChunkRefs holds a series with a list of chunk references.
type seriesChunkRefs struct {
lset labels.Labels
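`makeUnreleasable` matters because these sets are pooled: `release()` hands the backing slice back to `seriesChunkRefsSetPool`, and a set the streaming flow will read twice must not be recycled by a consumer's defensive release. A minimal sketch of the flag pattern with a `sync.Pool`, assuming simplified field names (the struct literal in the diff suggests a series slice plus a releasable boolean):

```go
package main

import (
	"fmt"
	"sync"
)

var slicePool = sync.Pool{
	New: func() any { s := make([]string, 0, 8); return &s },
}

// refsSet mimics seriesChunkRefsSet: a pooled slice plus a flag saying
// whether release() may return the slice to the pool.
type refsSet struct {
	series     []string
	releasable bool
}

func newRefsSet() refsSet {
	return refsSet{series: (*slicePool.Get().(*[]string))[:0], releasable: true}
}

// release returns the backing slice to the pool, but only when permitted.
func (s refsSet) release() {
	if !s.releasable {
		return // someone still holds this set; keep the slice alive
	}
	reuse := s.series[:0]
	slicePool.Put(&reuse)
}

// makeUnreleasable returns a copy whose release() is a no-op, so a consumer
// that releases defensively cannot recycle memory that is read again later.
func (s refsSet) makeUnreleasable() refsSet {
	return refsSet{series: s.series, releasable: false}
}

func main() {
	set := newRefsSet()
	set.series = append(set.series, `{job="a"}`)

	shared := set.makeUnreleasable()
	shared.release() // no-op: the data stays valid for the second phase
	fmt.Println("still readable:", shared.series[0])

	set.release() // the owner releases once no phase needs the set
}
```

Keeping the flag on the value rather than at the call site means the no-release decision travels with the set wherever it is passed.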
@@ -690,47 +697,58 @@ func openBlockSeriesChunkRefsSetsIterator(
ctx context.Context,
batchSize int,
tenantID string,
indexr *bucketIndexReader, // Index reader for block.
indexr *bucketIndexReader,
indexCache indexcache.IndexCache,
blockMeta *block.Meta,
matchers []*labels.Matcher, // Series matchers.
shard *sharding.ShardSelector, // Shard selector.
matchers []*labels.Matcher,
shard *sharding.ShardSelector,
seriesHasher seriesHasher,
strategy seriesIteratorStrategy,
minTime, maxTime int64, // Series must have data in this time range to be returned (ignored if skipChunks=true).
minTime, maxTime int64,
stats *safeQueryStats,
reuse *reusedPostingsAndMatchers, // If this is not nil, these posting and matchers are used as it is without fetching new ones.
logger log.Logger,
streamingIterators *streamingSeriesIterators,
) (iterator[seriesChunkRefsSet], error) {
if batchSize <= 0 {
return nil, errors.New("set size must be a positive number")
}

var (
ps []storage.SeriesRef
pendingMatchers []*labels.Matcher
fetchPostings = true
)
if reuse != nil {
fetchPostings = !reuse.isSet()
ps = reuse.ps
pendingMatchers = reuse.matchers
ps, pendingMatchers, err := indexr.ExpandedPostings(ctx, matchers, stats)
if err != nil {
return nil, errors.Wrap(err, "expanded matching postings")
}
if fetchPostings {
var err error
ps, pendingMatchers, err = indexr.ExpandedPostings(ctx, matchers, stats)
if err != nil {
return nil, errors.Wrap(err, "expanded matching postings")
}
if reuse != nil {
reuse.set(ps, pendingMatchers)
}

iteratorFactory := func(strategy seriesIteratorStrategy, psi *postingsSetsIterator) iterator[seriesChunkRefsSet] {
return openBlockSeriesChunkRefsSetsIteratorFromPostings(ctx, tenantID, indexr, indexCache, blockMeta, shard, seriesHasher, strategy, minTime, maxTime, stats, psi, pendingMatchers, logger)
}

if streamingIterators == nil {
psi := newPostingsSetsIterator(ps, batchSize)
return iteratorFactory(strategy, psi), nil
}

return streamingIterators.wrapIterator(strategy, ps, batchSize, iteratorFactory), nil
}

func openBlockSeriesChunkRefsSetsIteratorFromPostings(
ctx context.Context,
tenantID string,
indexr *bucketIndexReader,
indexCache indexcache.IndexCache,
blockMeta *block.Meta,
shard *sharding.ShardSelector,
seriesHasher seriesHasher,
strategy seriesIteratorStrategy,
minTime, maxTime int64,
stats *safeQueryStats,
postingsSetsIterator *postingsSetsIterator,
pendingMatchers []*labels.Matcher,
logger log.Logger,
) iterator[seriesChunkRefsSet] {
var it iterator[seriesChunkRefsSet]
it = newLoadingSeriesChunkRefsSetIterator(
ctx,
newPostingsSetsIterator(ps, batchSize),
postingsSetsIterator,
indexr,
indexCache,
stats,
@@ -743,36 +761,12 @@ func openBlockSeriesChunkRefsSetsIterator(
tenantID,
logger,
)

if len(pendingMatchers) > 0 {
it = newFilteringSeriesChunkRefsSetIterator(pendingMatchers, it, stats)
}

return it, nil
}

// reusedPostingsAndMatchers is used to share the postings and matchers across function calls for re-use
// in case of streaming series. We have it as a separate struct so that we can give a safe way
// to use it by making a copy where required. You can use it to put items only once.
type reusedPostingsAndMatchers struct {
ps []storage.SeriesRef
matchers []*labels.Matcher
filled bool
}

func (p *reusedPostingsAndMatchers) set(ps []storage.SeriesRef, matchers []*labels.Matcher) {
if p.filled {
// We already have something here.
return
}
// Postings list can be modified later, so we make a copy here.
p.ps = make([]storage.SeriesRef, len(ps))
copy(p.ps, ps)
p.matchers = matchers
p.filled = true
}

func (p *reusedPostingsAndMatchers) isSet() bool {
return p.filled
return it
}

// seriesIteratorStrategy defines the strategy to use when loading the series and their chunk refs.
@@ -812,6 +806,14 @@ func (s seriesIteratorStrategy) isNoChunkRefsAndOverlapMintMaxt() bool {
return s.isNoChunkRefs() && s.isOverlapMintMaxt()
}

func (s seriesIteratorStrategy) withNoChunkRefs() seriesIteratorStrategy {
return s | noChunkRefs
}

func (s seriesIteratorStrategy) withChunkRefs() seriesIteratorStrategy {
return s & ^noChunkRefs
}

func newLoadingSeriesChunkRefsSetIterator(
ctx context.Context,
postingsSetIterator *postingsSetsIterator,
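
The new `withNoChunkRefs` and `withChunkRefs` helpers set and clear a single bit in `seriesIteratorStrategy`, which is what lets the same per-block iterator flip from the labels phase (no chunk refs) to the chunks phase. A standalone sketch of the bitmask arithmetic, with invented flag values (the real constants are defined elsewhere in series_refs.go):

```go
package main

import "fmt"

type strategy uint8

const (
	noChunkRefs     strategy = 1 << iota // skip loading chunk refs
	overlapMintMaxt                      // only series overlapping [mint, maxt]

	defaultStrategy strategy = 0
)

func (s strategy) withNoChunkRefs() strategy { return s | noChunkRefs }
func (s strategy) withChunkRefs() strategy   { return s &^ noChunkRefs }
func (s strategy) isNoChunkRefs() bool       { return s&noChunkRefs != 0 }

func main() {
	// Labels phase: start from the chunks-streaming strategy and drop refs.
	labelsPhase := (defaultStrategy | overlapMintMaxt).withNoChunkRefs()
	fmt.Println("labels phase skips chunk refs:", labelsPhase.isNoChunkRefs()) // true

	// Chunks phase: the same strategy with the bit cleared again.
	chunksPhase := labelsPhase.withChunkRefs()
	fmt.Println("chunks phase skips chunk refs:", chunksPhase.isNoChunkRefs()) // false
}
```

Note that `s & ^noChunkRefs` in the diff is equivalent to Go's combined AND NOT operator `s &^ noChunkRefs` used in the sketch.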