Don't load series multiple times when streaming chunks from store-gateways and only one batch is needed #8039

Merged · 45 commits · May 23, 2024
Changes from 33 commits

Commits
897d3b6
Fix outdated comment
charleskorn Mar 19, 2024
254b1db
Add initial support for resetting `loadingSeriesChunkRefsSetIterator`
charleskorn Mar 19, 2024
4f67084
Add important TODO
charleskorn Mar 19, 2024
3fa6e3f
add reset funcs in preparation but don't use them yet
zenador Mar 19, 2024
05e6274
reuse the iterator, this builds but fails tests with EOF
zenador Mar 19, 2024
a15a25e
try changing strategy up front
zenador Mar 19, 2024
17c8360
Correctly reset merging and deduplicating iterators
charleskorn Mar 27, 2024
2473ca1
Avoid counting series and chunks twice in limiting iterator
charleskorn Mar 27, 2024
4f4d20b
Don't allow releasing series chunk refs set that will be reused
charleskorn Apr 3, 2024
1542ebe
Restore previous test setup
charleskorn May 3, 2024
09ddd2e
Fix typo in variable name
charleskorn May 3, 2024
fb1322b
Restore more previous test setup
charleskorn May 3, 2024
e1eadbd
Restore use of noChunkRefs where it should be used, and remove outdat…
charleskorn May 3, 2024
676349d
Fix linting warning.
charleskorn May 3, 2024
c3fb275
Introduce seriesIteratorStrategy flag for chunks streaming
charleskorn May 3, 2024
8b81435
Refactor test to exercise case where chunks streaming is both enabled…
charleskorn May 3, 2024
4471f33
Return reused series set to pool
charleskorn May 13, 2024
afebea2
Fix typo
charleskorn May 13, 2024
5915c19
Only load chunk refs when they're needed.
charleskorn May 13, 2024
f2dccc7
Rename variable to make intent clearer
charleskorn May 13, 2024
103615e
Add changelog entry
charleskorn May 13, 2024
73def47
Merge branch 'main' into charleskorn/store-gateway-double-work-fix
charleskorn May 13, 2024
9f40abc
Rename `withoutNoChunkRefs` to `withChunkRefs`
charleskorn May 14, 2024
3c8e50f
Remove redundant `if`
charleskorn May 14, 2024
4f07071
Add iterator type that wraps an iterator factory to manage the transi…
charleskorn May 15, 2024
cc40e1c
Remove `Reset` methods
charleskorn May 15, 2024
61290ed
Remove `makeReleasable`
charleskorn May 15, 2024
5f0342c
Unpick more unnecessary changes
charleskorn May 15, 2024
b24982d
Unpick whitespace change
charleskorn May 15, 2024
814d627
Extract method
charleskorn May 15, 2024
77ac30b
Refactor existing code to use `chunksStreamingCachingSeriesChunkRefsS…
charleskorn May 15, 2024
43115a9
Fix issue where streaming chunks does not work if multiple batches of…
charleskorn May 15, 2024
3c5eefe
Rename methods to better reflect their purpose
charleskorn May 15, 2024
93b6d9f
Move `streamingSeriesIterators` to `series_refs_streaming.go`
charleskorn May 20, 2024
c92b9a3
Remove unnecessary comments
charleskorn May 20, 2024
6ca5bc9
Remove unnecessary test helper function
charleskorn May 20, 2024
17bb03c
Introduce `seriesChunkRefsIteratorWrapper` interface instead of passi…
charleskorn May 20, 2024
81b1975
Rename type
charleskorn May 20, 2024
60f3084
Make a copy of the postings for each phase.
charleskorn May 20, 2024
c0fb6e9
Clarify comment
charleskorn May 20, 2024
e3d16b1
Use concrete type
charleskorn May 20, 2024
57c4b6b
Merge branch 'main' into charleskorn/store-gateway-double-work-fix
charleskorn May 20, 2024
e711d49
Don't make a copy of the postings unnecessarily.
charleskorn May 20, 2024
b2f7f15
Fix linting issues
charleskorn May 20, 2024
1af591f
Remove unused method
charleskorn May 20, 2024
Files changed
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -35,6 +35,7 @@
* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.max-concurrent-queue-timeout`. When set, queries at the store-gateway's query gate will not wait longer than that to execute. If a query reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimir-store-consistency-check-failed`. #7777
* [ENHANCEMENT] Ingester: Optimize querying with regexp matchers. #8106
* [ENHANCEMENT] Distributor: Introduce `-distributor.max-request-pool-buffer-size` to allow configuring the maximum size of the request pool buffers. #8082
* [ENHANCEMENT] Store-gateway: improve performance when streaming chunks to queriers is enabled (`-querier.prefer-streaming-chunks-from-store-gateways=true`) and the query selects fewer than `-blocks-storage.bucket-store.batch-series-size` series (defaults to 5000 series). #8039
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
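The mechanism behind this changelog entry, in outline: when chunks streaming is enabled, the store-gateway serves a request in two phases — it first sends all matching series labels, then streams the chunks for those same series. Previously, both phases resolved postings and loaded series chunk refs from scratch, doing the expensive work twice. This PR instead caches the per-block iterators created during the labels phase and, when all series fit in a single batch, replays that cached batch in the chunks phase rather than re-reading it. Below is a minimal sketch of the caching idea using simplified, hypothetical types — not the actual Mimir implementation (the real one is `chunksStreamingCachingSeriesChunkRefsSetIterator`, wired up in the `pkg/storegateway/bucket.go` diff that follows):

```go
package main

import "fmt"

type seriesSet []string // hypothetical stand-in for a batch of series chunk refs

// cachingIterator sketches the caching idea: if the labels phase produced a
// single batch, keep it and replay it in the chunks phase; otherwise fall back
// to re-creating the underlying iterator and reading the refs a second time.
type cachingIterator struct {
	factory func() []seriesSet // re-creates the expensive underlying iterator
	cached  []seriesSet        // non-nil only if the labels phase yielded one batch
	current []seriesSet
	pos     int
}

func (it *cachingIterator) runLabelsPhase() []seriesSet {
	sets := it.factory()
	if len(sets) == 1 {
		it.cached = sets // cheap to retain: just one batch
	}
	return sets
}

func (it *cachingIterator) prepareForChunksStreamingPhase() {
	if it.cached != nil {
		it.current = it.cached // replay: no second read of the series refs
	} else {
		it.current = it.factory() // many batches: retaining them all would defeat batching
	}
	it.pos = 0
}

func (it *cachingIterator) next() (seriesSet, bool) {
	if it.pos >= len(it.current) {
		return nil, false
	}
	s := it.current[it.pos]
	it.pos++
	return s, true
}

func main() {
	factoryCalls := 0
	it := &cachingIterator{factory: func() []seriesSet {
		factoryCalls++
		return []seriesSet{{"series_a", "series_b"}} // a single batch
	}}

	it.runLabelsPhase()
	it.prepareForChunksStreamingPhase()
	for s, ok := it.next(); ok; s, ok = it.next() {
		fmt.Println(s)
	}
	fmt.Println("factory calls:", factoryCalls) // 1 — the refs were not loaded twice
}
```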
126 changes: 70 additions & 56 deletions pkg/storegateway/bucket.go
@@ -606,10 +606,8 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, srv storegatewaypb.Stor
defer done()

var (
// If we are streaming the series labels and chunks separately, we don't need to fetch the postings
// twice. So we use these slices to re-use them. Each reuse[i] corresponds to a single block.
reuse []*reusedPostingsAndMatchers
resHints = &hintspb.SeriesResponseHints{}
streamingIterators *streamingSeriesIterators
resHints = &hintspb.SeriesResponseHints{}
)
for _, b := range blocks {
resHints.AddQueriedBlock(b.meta.ULID)
@@ -633,7 +631,7 @@
seriesLimiter = s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))
)

seriesSet, reuse, err = s.streamingSeriesForBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
seriesSet, streamingIterators, err = s.createIteratorForChunksStreamingLabelsPhase(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
if err != nil {
return err
}
@@ -661,15 +659,11 @@

start := time.Now()
if req.StreamingChunksBatchSize > 0 {
var seriesChunkIt iterator[seriesChunksSet]
seriesChunkIt, err = s.streamingChunksSetForBlocks(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, reuse)
if err != nil {
return err
}
seriesChunkIt := s.createIteratorForChunksStreamingChunksPhase(ctx, readers, stats, chunksLimiter, seriesLimiter, streamingIterators)
err = s.sendStreamingChunks(req, srv, seriesChunkIt, stats, streamingSeriesCount)
} else {
var seriesSet storepb.SeriesSet
seriesSet, err = s.nonStreamingSeriesSetForBlocks(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
seriesSet, err = s.createIteratorForNonChunksStreamingRequest(ctx, req, blocks, indexReaders, readers, shardSelector, matchers, chunksLimiter, seriesLimiter, stats)
if err != nil {
return err
}
@@ -1027,8 +1021,8 @@ func chunksSize(chks []storepb.AggrChunk) (size int) {
return size
}

// nonStreamingSeriesSetForBlocks is used when the streaming feature is not enabled.
func (s *BucketStore) nonStreamingSeriesSetForBlocks(
// createIteratorForNonChunksStreamingRequest is used when the streaming feature is not enabled.
func (s *BucketStore) createIteratorForNonChunksStreamingRequest(
ctx context.Context,
req *storepb.SeriesRequest,
blocks []*bucketBlock,
@@ -1044,7 +1038,7 @@ func (s *BucketStore) nonStreamingSeriesSetForBlocks(
if req.SkipChunks {
strategy = noChunkRefs
}
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, nil, strategy)
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, strategy, nil)
if err != nil {
return nil, err
}
@@ -1059,10 +1053,10 @@
return set, nil
}

// streamingSeriesForBlocks is used when the streaming feature is enabled.
// createIteratorForChunksStreamingLabelsPhase is used when the streaming feature is enabled.
// It returns a series set that only contains the series labels without any chunks information.
// The returned postings (series ref) and matches should be re-used when getting chunks to save on computation.
func (s *BucketStore) streamingSeriesForBlocks(
// The streamingSeriesIterators should be re-used when getting chunks to save on computation.
func (s *BucketStore) createIteratorForChunksStreamingLabelsPhase(
ctx context.Context,
req *storepb.SeriesRequest,
blocks []*bucketBlock,
@@ -1072,43 +1066,64 @@
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
stats *safeQueryStats,
) (storepb.SeriesSet, []*reusedPostingsAndMatchers, error) {
var (
reuse = make([]*reusedPostingsAndMatchers, len(blocks))
strategy = noChunkRefs | overlapMintMaxt
)
for i := range reuse {
reuse[i] = &reusedPostingsAndMatchers{}
}
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, reuse, strategy)
) (storepb.SeriesSet, *streamingSeriesIterators, error) {
streamingIterators := newStreamingSeriesIterators()
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, overlapMintMaxt, streamingIterators.iteratorWrapper)
if err != nil {
return nil, nil, err
}
return newSeriesSetWithoutChunks(ctx, it, stats), reuse, nil

return newSeriesSetWithoutChunks(ctx, it, stats), streamingIterators, nil
}

type streamingSeriesIterators struct {
iterators []*chunksStreamingCachingSeriesChunkRefsSetIterator
mtx *sync.RWMutex
}

func newStreamingSeriesIterators() *streamingSeriesIterators {
return &streamingSeriesIterators{
mtx: &sync.RWMutex{},
}
}

func (i *streamingSeriesIterators) iteratorWrapper(strategy seriesIteratorStrategy, postingsSetsIterator *postingsSetsIterator, factory iteratorFactory) iterator[seriesChunkRefsSet] {
it := newChunksStreamingCachingSeriesChunkRefsSetIterator(strategy, postingsSetsIterator, factory)

i.mtx.Lock()
i.iterators = append(i.iterators, it)
i.mtx.Unlock()

return it
}

func (i *streamingSeriesIterators) prepareForChunksStreamingPhase() []iterator[seriesChunkRefsSet] {
prepared := make([]iterator[seriesChunkRefsSet], 0, len(i.iterators))

for _, it := range i.iterators {
it.PrepareForChunksStreamingPhase()
prepared = append(prepared, it)
}

return prepared
}

// streamingChunksSetForBlocks is used when streaming feature is enabled.
// It returns an iterator to go over the chunks for the series returned in the streamingSeriesForBlocks call.
// It is recommended to pass the reusePostings and reusePendingMatches returned by the streamingSeriesForBlocks call.
func (s *BucketStore) streamingChunksSetForBlocks(
// createIteratorForChunksStreamingChunksPhase is used when streaming feature is enabled.
// It returns an iterator to go over the chunks for the series returned in the createIteratorForChunksStreamingLabelsPhase call.
// It is required to pass the iterators returned by the createIteratorForChunksStreamingLabelsPhase call for reuse.
func (s *BucketStore) createIteratorForChunksStreamingChunksPhase(
ctx context.Context,
req *storepb.SeriesRequest,
blocks []*bucketBlock,
indexReaders map[ulid.ULID]*bucketIndexReader,
chunkReaders *bucketChunkReaders,
shardSelector *sharding.ShardSelector,
matchers []*labels.Matcher,
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
stats *safeQueryStats,
reuse []*reusedPostingsAndMatchers, // Should come from streamingSeriesForBlocks.
) (iterator[seriesChunksSet], error) {
it, err := s.getSeriesIteratorFromBlocks(ctx, req, blocks, indexReaders, shardSelector, matchers, chunksLimiter, seriesLimiter, stats, reuse, defaultStrategy)
if err != nil {
return nil, err
}
chunksLimiter ChunksLimiter,
seriesLimiter SeriesLimiter,
iterators *streamingSeriesIterators,
) iterator[seriesChunksSet] {
preparedIterators := iterators.prepareForChunksStreamingPhase()
it := s.getSeriesIteratorFromPerBlockIterators(preparedIterators, chunksLimiter, seriesLimiter)
scsi := newChunksPreloadingIterator(ctx, s.logger, s.userID, *chunkReaders, it, s.maxSeriesPerBatch, stats)
return scsi, nil

return scsi
}

func (s *BucketStore) getSeriesIteratorFromBlocks(
@@ -1121,8 +1136,8 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
chunksLimiter ChunksLimiter, // Rate limiter for loading chunks.
seriesLimiter SeriesLimiter, // Rate limiter for loading series.
stats *safeQueryStats,
reuse []*reusedPostingsAndMatchers, // Used if not empty. If not empty, len(reuse) must be len(blocks).
strategy seriesIteratorStrategy,
wrapper func(strategy seriesIteratorStrategy, postingsSetsIterator *postingsSetsIterator, factory iteratorFactory) iterator[seriesChunkRefsSet],
) (iterator[seriesChunkRefsSet], error) {
var (
mtx = sync.Mutex{}
@@ -1131,9 +1146,8 @@
begin = time.Now()
blocksQueriedByBlockMeta = make(map[blockQueriedMeta]int)
)
for i, b := range blocks {
for _, b := range blocks {
b := b
i := i

// Keep track of queried blocks.
indexr := indexReaders[b.meta.ULID]
@@ -1144,10 +1158,6 @@
if shardSelector != nil {
blockSeriesHashCache = s.seriesHashCache.GetBlockCache(b.meta.ULID.String())
}
var r *reusedPostingsAndMatchers
if len(reuse) > 0 {
r = reuse[i]
}
g.Go(func() error {
part, err := openBlockSeriesChunkRefsSetsIterator(
ctx,
@@ -1162,8 +1172,8 @@
strategy,
req.MinTime, req.MaxTime,
stats,
r,
s.logger,
wrapper,
)
if err != nil {
return errors.Wrapf(err, "fetch series for block %s", b.meta.ULID)
@@ -1192,13 +1202,17 @@
stats.streamingSeriesExpandPostingsDuration += time.Since(begin)
})

mergedIterator := mergedSeriesChunkRefsSetIterators(s.maxSeriesPerBatch, batches...)
return s.getSeriesIteratorFromPerBlockIterators(batches, chunksLimiter, seriesLimiter), nil
}

func (s *BucketStore) getSeriesIteratorFromPerBlockIterators(perBlockIterators []iterator[seriesChunkRefsSet], chunksLimiter ChunksLimiter, seriesLimiter SeriesLimiter) iterator[seriesChunkRefsSet] {
mergedIterator := mergedSeriesChunkRefsSetIterators(s.maxSeriesPerBatch, perBlockIterators...)

// Apply limits after the merging, so that if the same series is part of multiple blocks it just gets
// counted once towards the limit.
mergedIterator = newLimitingSeriesChunkRefsSetIterator(mergedIterator, chunksLimiter, seriesLimiter)

return mergedIterator, nil
return mergedIterator
}

func (s *BucketStore) recordSeriesCallResult(safeStats *safeQueryStats) {
@@ -1467,8 +1481,8 @@ func blockLabelNames(ctx context.Context, indexr *bucketIndexReader, matchers []
noChunkRefs,
minTime, maxTime,
stats,
nil,
logger,
nil,
)
if err != nil {
return nil, errors.Wrap(err, "fetch series")
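One deliberate detail in the refactored `getSeriesIteratorFromPerBlockIterators` above: limits are applied after merging the per-block iterators, so a series that appears in several blocks counts only once toward the series limit. A toy illustration of why that ordering matters, with simplified stand-in functions rather than the actual Mimir limiter API:

```go
package main

import "fmt"

// mergeDedup stands in for mergedSeriesChunkRefsSetIterators: it combines
// per-block series lists, deduplicating series present in more than one block.
func mergeDedup(blocks ...[]string) []string {
	seen := map[string]bool{}
	var out []string
	for _, b := range blocks {
		for _, s := range b {
			if !seen[s] {
				seen[s] = true
				out = append(out, s)
			}
		}
	}
	return out
}

// limitSeries stands in for the limiting iterator: it rejects result sets
// larger than the configured maximum.
func limitSeries(series []string, max int) error {
	if len(series) > max {
		return fmt.Errorf("would fetch %d series, limit is %d", len(series), max)
	}
	return nil
}

func main() {
	block1 := []string{"a", "b"}
	block2 := []string{"b", "c"} // "b" appears in both blocks

	// Limit applied after merging: 3 unique series, within a limit of 3.
	merged := mergeDedup(block1, block2)
	fmt.Println(limitSeries(merged, 3)) // <nil>

	// Limiting each block before merging would count "b" twice (2+2=4 > 3)
	// and reject a query whose true result is within the limit.
}
```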
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_chunk_reader_test.go
@@ -44,8 +44,8 @@ func TestBucketChunkReader_refetchChunks(t *testing.T) {
block.meta.MinTime,
block.meta.MaxTime,
newSafeQueryStats(),
nil,
log.NewNopLogger(),
nil,
)
require.NoError(t, err)
