From aed2baaabae22dd31fef58a8a60a693ee50580d8 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 12 Jun 2023 04:22:01 -0700 Subject: [PATCH] Expose estimated chunk and series size as configurable options (#6426) * expose estimated chunk and series size as configurable options Signed-off-by: Ben Ye * fix lint Signed-off-by: Ben Ye * fix test Signed-off-by: Ben Ye * fix test Signed-off-by: Ben Ye --------- Signed-off-by: Ben Ye --- cmd/thanos/store.go | 14 +++++++++++ pkg/store/bucket.go | 54 ++++++++++++++++++++++++++++++++-------- pkg/store/bucket_test.go | 44 +++++++++++++++++--------------- 3 files changed, 82 insertions(+), 30 deletions(-) diff --git a/cmd/thanos/store.go b/cmd/thanos/store.go index b5b86bcab18..187395a3d7c 100644 --- a/cmd/thanos/store.go +++ b/cmd/thanos/store.go @@ -63,6 +63,8 @@ type storeConfig struct { httpConfig httpConfig indexCacheSizeBytes units.Base2Bytes chunkPoolSize units.Base2Bytes + estimatedMaxSeriesSize uint64 + estimatedMaxChunkSize uint64 seriesBatchSize int storeRateLimits store.SeriesSelectLimits maxDownloadedBytes units.Base2Bytes @@ -139,6 +141,12 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("debug.series-batch-size", "The batch size when fetching series from TSDB blocks. Setting the number too high can lead to slower retrieval, while setting it too low can lead to throttling caused by too many calls made to object storage."). Hidden().Default(strconv.Itoa(store.SeriesBatchSize)).IntVar(&sc.seriesBatchSize) + cmd.Flag("debug.estimated-max-series-size", "Estimated max series size. Setting a value might result in over fetching data while a small value might result in data refetch. Default value is 64KB."). + Hidden().Default(strconv.Itoa(store.EstimatedMaxSeriesSize)).Uint64Var(&sc.estimatedMaxSeriesSize) + + cmd.Flag("debug.estimated-max-chunk-size", "Estimated max chunk size. Setting a value might result in over fetching data while a small value might result in data refetch. Default value is 16KiB."). + Hidden().Default(strconv.Itoa(store.EstimatedMaxChunkSize)).Uint64Var(&sc.estimatedMaxChunkSize) + sc.filterConf = &store.FilterConfig{} cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y."). @@ -358,6 +366,12 @@ func runStore( store.WithFilterConfig(conf.filterConf), store.WithChunkHashCalculation(true), store.WithSeriesBatchSize(conf.seriesBatchSize), + store.WithBlockEstimatedMaxSeriesFunc(func(_ metadata.Meta) uint64 { + return conf.estimatedMaxSeriesSize + }), + store.WithBlockEstimatedMaxChunkFunc(func(_ metadata.Meta) uint64 { + return conf.estimatedMaxChunkSize + }), } if conf.debugLogging { diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 09be03cbc02..6b9ec676eeb 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -74,8 +74,8 @@ const ( // Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf. MaxSamplesPerChunk = 120 // EstimatedMaxChunkSize is average max of chunk size. This can be exceeded though in very rare (valid) cases. - EstimatedMaxChunkSize = 16000 - maxSeriesSize = 64 * 1024 + EstimatedMaxChunkSize = 16000 + EstimatedMaxSeriesSize = 64 * 1024 // Relatively large in order to reduce memory waste, yet small enough to avoid excessive allocations. chunkBytesPoolMinSize = 64 * 1024 // 64 KiB chunkBytesPoolMaxSize = 64 * 1024 * 1024 // 64 MiB @@ -255,7 +255,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics { }, []string{"reason"}) m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{ Name: "thanos_bucket_store_series_refetches_total", - Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize), + Help: "Total number of cases where configured estimated series bytes was not enough was to fetch series from index, resulting in refetch.", }) m.cachedPostingsCompressions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ @@ -325,6 +325,8 @@ type FilterConfig struct { MinTime, MaxTime model.TimeOrDurationValue } +type BlockEstimator func(meta metadata.Meta) uint64 + // BucketStore implements the store API backed by a bucket. It loads all index // files to local disk. // @@ -378,6 +380,9 @@ type BucketStore struct { enableSeriesResponseHints bool enableChunkHashCalculation bool + + blockEstimatedMaxSeriesFunc BlockEstimator + blockEstimatedMaxChunkFunc BlockEstimator } func (s *BucketStore) validate() error { @@ -463,6 +468,18 @@ func WithSeriesBatchSize(seriesBatchSize int) BucketStoreOption { } } +func WithBlockEstimatedMaxSeriesFunc(f BlockEstimator) BucketStoreOption { + return func(s *BucketStore) { + s.blockEstimatedMaxSeriesFunc = f + } +} + +func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption { + return func(s *BucketStore) { + s.blockEstimatedMaxChunkFunc = f + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. func NewBucketStore( @@ -710,6 +727,8 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er s.chunkPool, indexHeaderReader, s.partitioner, + s.blockEstimatedMaxSeriesFunc, + s.blockEstimatedMaxChunkFunc, ) if err != nil { return errors.Wrap(err, "new bucket block") @@ -2005,6 +2024,9 @@ type bucketBlock struct { // Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using // request hints' BlockMatchers. relabelLabels labels.Labels + + estimatedMaxChunkSize int + estimatedMaxSeriesSize int } func newBucketBlock( @@ -2018,7 +2040,17 @@ func newBucketBlock( chunkPool pool.Bytes, indexHeadReader indexheader.Reader, p Partitioner, + maxSeriesSizeFunc BlockEstimator, + maxChunkSizeFunc BlockEstimator, ) (b *bucketBlock, err error) { + maxSeriesSize := EstimatedMaxSeriesSize + if maxSeriesSizeFunc != nil { + maxSeriesSize = int(maxSeriesSizeFunc(*meta)) + } + maxChunkSize := EstimatedMaxChunkSize + if maxChunkSizeFunc != nil { + maxChunkSize = int(maxChunkSizeFunc(*meta)) + } b = &bucketBlock{ logger: logger, metrics: metrics, @@ -2036,6 +2068,8 @@ func newBucketBlock( Name: block.BlockIDLabel, Value: meta.ULID.String(), }), + estimatedMaxSeriesSize: maxSeriesSize, + estimatedMaxChunkSize: maxChunkSize, } sort.Sort(b.extLset) sort.Sort(b.relabelLabels) @@ -2654,7 +2688,7 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser } parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) { - return uint64(ids[i]), uint64(ids[i] + maxSeriesSize) + return uint64(ids[i]), uint64(ids[i]) + uint64(r.block.estimatedMaxSeriesSize) }) g, ctx := errgroup.WithContext(ctx) @@ -2705,7 +2739,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series // Inefficient, but should be rare. r.block.metrics.seriesRefetches.Inc() - level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize) + level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize) // Fetch plus to get the size of next one if exists. return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter) @@ -2942,7 +2976,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ return pIdxs[i].offset < pIdxs[j].offset }) parts := r.block.partitioner.Partition(len(pIdxs), func(i int) (start, end uint64) { - return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + EstimatedMaxChunkSize + return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + uint64(r.block.estimatedMaxChunkSize) }) for _, p := range parts { @@ -2978,7 +3012,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a return errors.Wrap(err, "get range reader") } defer runutil.CloseWithLogOnErr(r.block.logger, reader, "readChunkRange close range reader") - bufReader := bufio.NewReaderSize(reader, EstimatedMaxChunkSize) + bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize) locked := true r.mtx.Lock() @@ -3004,11 +3038,11 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a n int ) - bufPooled, err := r.block.chunkPool.Get(EstimatedMaxChunkSize) + bufPooled, err := r.block.chunkPool.Get(r.block.estimatedMaxChunkSize) if err == nil { buf = *bufPooled } else { - buf = make([]byte, EstimatedMaxChunkSize) + buf = make([]byte, r.block.estimatedMaxChunkSize) } defer r.block.chunkPool.Put(&buf) @@ -3024,7 +3058,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a // Presume chunk length to be reasonably large for common use cases. // However, declaration for EstimatedMaxChunkSize warns us some chunks could be larger in some rare cases. // This is handled further down below. - chunkLen = EstimatedMaxChunkSize + chunkLen = r.block.estimatedMaxChunkSize if i+1 < len(pIdxs) { if diff = pIdxs[i+1].offset - pIdx.offset; int(diff) < chunkLen { chunkLen = int(diff) diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go index f5d24c9d37c..0a12a4b329b 100644 --- a/pkg/store/bucket_test.go +++ b/pkg/store/bucket_test.go @@ -212,7 +212,7 @@ func TestBucketFilterExtLabelsMatchers(t *testing.T) { }, }, } - b, _ := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil) + b, _ := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil) ms := []*labels.Matcher{ {Type: labels.MatchNotEqual, Name: "a", Value: "b"}, } @@ -260,7 +260,7 @@ func TestBucketBlock_matchLabels(t *testing.T) { }, } - b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil) + b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil) testutil.Ok(t, err) cases := []struct { @@ -1515,14 +1515,16 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()), metadata.NoneFunc)) b1 = &bucketBlock{ - indexCache: indexCache, - logger: logger, - metrics: newBucketStoreMetrics(nil), - bkt: bkt, - meta: meta, - partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), - chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, - chunkPool: chunkPool, + indexCache: indexCache, + logger: logger, + metrics: newBucketStoreMetrics(nil), + bkt: bkt, + meta: meta, + partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), + chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, + chunkPool: chunkPool, + estimatedMaxSeriesSize: EstimatedMaxSeriesSize, + estimatedMaxChunkSize: EstimatedMaxChunkSize, } b1.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b1.meta.ULID, DefaultPostingOffsetInMemorySampling) testutil.Ok(t, err) @@ -1554,14 +1556,16 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) { testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()), metadata.NoneFunc)) b2 = &bucketBlock{ - indexCache: indexCache, - logger: logger, - metrics: newBucketStoreMetrics(nil), - bkt: bkt, - meta: meta, - partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), - chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, - chunkPool: chunkPool, + indexCache: indexCache, + logger: logger, + metrics: newBucketStoreMetrics(nil), + bkt: bkt, + meta: meta, + partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize), + chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")}, + chunkPool: chunkPool, + estimatedMaxSeriesSize: EstimatedMaxSeriesSize, + estimatedMaxChunkSize: EstimatedMaxChunkSize, } b2.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b2.meta.ULID, DefaultPostingOffsetInMemorySampling) testutil.Ok(t, err) @@ -2547,7 +2551,7 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) { testutil.Ok(b, err) // Create a bucket block with only the dependencies we need for the benchmark. - blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, nil, chunkPool, nil, nil) + blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, nil, chunkPool, nil, nil, nil, nil) testutil.Ok(b, err) b.ResetTimer() @@ -2637,7 +2641,7 @@ func prepareBucket(b testing.TB, resolutionLevel compact.ResolutionLevel, sample testutil.Ok(b, err) // Create a bucket block with only the dependencies we need for the benchmark. - blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner) + blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner, nil, nil) testutil.Ok(b, err) return blk, blockMeta }