Skip to content

Commit

Permalink
Expose estimated chunk and series size as configurable options (thano…
Browse files Browse the repository at this point in the history
…s-io#6426)

* expose estimated chunk and series size as configurable options

Signed-off-by: Ben Ye <benye@amazon.com>

* fix lint

Signed-off-by: Ben Ye <benye@amazon.com>

* fix test

Signed-off-by: Ben Ye <benye@amazon.com>

* fix test

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 authored and pedro-stanaka committed Jun 27, 2023
1 parent 9ad9014 commit aed2baa
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 30 deletions.
14 changes: 14 additions & 0 deletions cmd/thanos/store.go
Expand Up @@ -63,6 +63,8 @@ type storeConfig struct {
httpConfig httpConfig
indexCacheSizeBytes units.Base2Bytes
chunkPoolSize units.Base2Bytes
estimatedMaxSeriesSize uint64
estimatedMaxChunkSize uint64
seriesBatchSize int
storeRateLimits store.SeriesSelectLimits
maxDownloadedBytes units.Base2Bytes
Expand Down Expand Up @@ -139,6 +141,12 @@ func (sc *storeConfig) registerFlag(cmd extkingpin.FlagClause) {
cmd.Flag("debug.series-batch-size", "The batch size when fetching series from TSDB blocks. Setting the number too high can lead to slower retrieval, while setting it too low can lead to throttling caused by too many calls made to object storage.").
Hidden().Default(strconv.Itoa(store.SeriesBatchSize)).IntVar(&sc.seriesBatchSize)

cmd.Flag("debug.estimated-max-series-size", "Estimated max series size. Setting a value might result in over fetching data while a small value might result in data refetch. Default value is 64KB.").
Hidden().Default(strconv.Itoa(store.EstimatedMaxSeriesSize)).Uint64Var(&sc.estimatedMaxSeriesSize)

cmd.Flag("debug.estimated-max-chunk-size", "Estimated max chunk size. Setting a value might result in over fetching data while a small value might result in data refetch. Default value is 16KiB.").
Hidden().Default(strconv.Itoa(store.EstimatedMaxChunkSize)).Uint64Var(&sc.estimatedMaxChunkSize)

sc.filterConf = &store.FilterConfig{}

cmd.Flag("min-time", "Start of time range limit to serve. Thanos Store will serve only metrics, which happened later than this value. Option can be a constant time in RFC3339 format or time duration relative to current time, such as -1d or 2h45m. Valid duration units are ms, s, m, h, d, w, y.").
Expand Down Expand Up @@ -358,6 +366,12 @@ func runStore(
store.WithFilterConfig(conf.filterConf),
store.WithChunkHashCalculation(true),
store.WithSeriesBatchSize(conf.seriesBatchSize),
store.WithBlockEstimatedMaxSeriesFunc(func(_ metadata.Meta) uint64 {
return conf.estimatedMaxSeriesSize
}),
store.WithBlockEstimatedMaxChunkFunc(func(_ metadata.Meta) uint64 {
return conf.estimatedMaxChunkSize
}),
}

if conf.debugLogging {
Expand Down
54 changes: 44 additions & 10 deletions pkg/store/bucket.go
Expand Up @@ -74,8 +74,8 @@ const (
// Take a look at Figure 6 in this whitepaper http://www.vldb.org/pvldb/vol8/p1816-teller.pdf.
MaxSamplesPerChunk = 120
// EstimatedMaxChunkSize is average max of chunk size. This can be exceeded though in very rare (valid) cases.
EstimatedMaxChunkSize = 16000
maxSeriesSize = 64 * 1024
EstimatedMaxChunkSize = 16000
EstimatedMaxSeriesSize = 64 * 1024
// Relatively large in order to reduce memory waste, yet small enough to avoid excessive allocations.
chunkBytesPoolMinSize = 64 * 1024 // 64 KiB
chunkBytesPoolMaxSize = 64 * 1024 * 1024 // 64 MiB
Expand Down Expand Up @@ -255,7 +255,7 @@ func newBucketStoreMetrics(reg prometheus.Registerer) *bucketStoreMetrics {
}, []string{"reason"})
m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_bucket_store_series_refetches_total",
Help: fmt.Sprintf("Total number of cases where %v bytes was not enough was to fetch series from index, resulting in refetch.", maxSeriesSize),
Help: "Total number of cases where configured estimated series bytes was not enough was to fetch series from index, resulting in refetch.",
})

m.cachedPostingsCompressions = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Expand Down Expand Up @@ -325,6 +325,8 @@ type FilterConfig struct {
MinTime, MaxTime model.TimeOrDurationValue
}

type BlockEstimator func(meta metadata.Meta) uint64

// BucketStore implements the store API backed by a bucket. It loads all index
// files to local disk.
//
Expand Down Expand Up @@ -378,6 +380,9 @@ type BucketStore struct {
enableSeriesResponseHints bool

enableChunkHashCalculation bool

blockEstimatedMaxSeriesFunc BlockEstimator
blockEstimatedMaxChunkFunc BlockEstimator
}

func (s *BucketStore) validate() error {
Expand Down Expand Up @@ -463,6 +468,18 @@ func WithSeriesBatchSize(seriesBatchSize int) BucketStoreOption {
}
}

func WithBlockEstimatedMaxSeriesFunc(f BlockEstimator) BucketStoreOption {
return func(s *BucketStore) {
s.blockEstimatedMaxSeriesFunc = f
}
}

func WithBlockEstimatedMaxChunkFunc(f BlockEstimator) BucketStoreOption {
return func(s *BucketStore) {
s.blockEstimatedMaxChunkFunc = f
}
}

// NewBucketStore creates a new bucket backed store that implements the store API against
// an object store bucket. It is optimized to work against high latency backends.
func NewBucketStore(
Expand Down Expand Up @@ -710,6 +727,8 @@ func (s *BucketStore) addBlock(ctx context.Context, meta *metadata.Meta) (err er
s.chunkPool,
indexHeaderReader,
s.partitioner,
s.blockEstimatedMaxSeriesFunc,
s.blockEstimatedMaxChunkFunc,
)
if err != nil {
return errors.Wrap(err, "new bucket block")
Expand Down Expand Up @@ -2005,6 +2024,9 @@ type bucketBlock struct {
// Block's labels used by block-level matchers to filter blocks to query. These are used to select blocks using
// request hints' BlockMatchers.
relabelLabels labels.Labels

estimatedMaxChunkSize int
estimatedMaxSeriesSize int
}

func newBucketBlock(
Expand All @@ -2018,7 +2040,17 @@ func newBucketBlock(
chunkPool pool.Bytes,
indexHeadReader indexheader.Reader,
p Partitioner,
maxSeriesSizeFunc BlockEstimator,
maxChunkSizeFunc BlockEstimator,
) (b *bucketBlock, err error) {
maxSeriesSize := EstimatedMaxSeriesSize
if maxSeriesSizeFunc != nil {
maxSeriesSize = int(maxSeriesSizeFunc(*meta))
}
maxChunkSize := EstimatedMaxChunkSize
if maxChunkSizeFunc != nil {
maxChunkSize = int(maxChunkSizeFunc(*meta))
}
b = &bucketBlock{
logger: logger,
metrics: metrics,
Expand All @@ -2036,6 +2068,8 @@ func newBucketBlock(
Name: block.BlockIDLabel,
Value: meta.ULID.String(),
}),
estimatedMaxSeriesSize: maxSeriesSize,
estimatedMaxChunkSize: maxChunkSize,
}
sort.Sort(b.extLset)
sort.Sort(b.relabelLabels)
Expand Down Expand Up @@ -2654,7 +2688,7 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser
}

parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) {
return uint64(ids[i]), uint64(ids[i] + maxSeriesSize)
return uint64(ids[i]), uint64(ids[i]) + uint64(r.block.estimatedMaxSeriesSize)
})

g, ctx := errgroup.WithContext(ctx)
Expand Down Expand Up @@ -2705,7 +2739,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series

// Inefficient, but should be rare.
r.block.metrics.seriesRefetches.Inc()
level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", maxSeriesSize)
level.Warn(r.block.logger).Log("msg", "series size exceeded expected size; refetching", "id", id, "series length", n+int(l), "maxSeriesSize", r.block.estimatedMaxSeriesSize)

// Fetch plus to get the size of next one if exists.
return r.loadSeries(ctx, ids[i:], true, uint64(id), uint64(id)+uint64(n+int(l)+1), bytesLimiter)
Expand Down Expand Up @@ -2942,7 +2976,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
return pIdxs[i].offset < pIdxs[j].offset
})
parts := r.block.partitioner.Partition(len(pIdxs), func(i int) (start, end uint64) {
return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + EstimatedMaxChunkSize
return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + uint64(r.block.estimatedMaxChunkSize)
})

for _, p := range parts {
Expand Down Expand Up @@ -2978,7 +3012,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
return errors.Wrap(err, "get range reader")
}
defer runutil.CloseWithLogOnErr(r.block.logger, reader, "readChunkRange close range reader")
bufReader := bufio.NewReaderSize(reader, EstimatedMaxChunkSize)
bufReader := bufio.NewReaderSize(reader, r.block.estimatedMaxChunkSize)

locked := true
r.mtx.Lock()
Expand All @@ -3004,11 +3038,11 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
n int
)

bufPooled, err := r.block.chunkPool.Get(EstimatedMaxChunkSize)
bufPooled, err := r.block.chunkPool.Get(r.block.estimatedMaxChunkSize)
if err == nil {
buf = *bufPooled
} else {
buf = make([]byte, EstimatedMaxChunkSize)
buf = make([]byte, r.block.estimatedMaxChunkSize)
}
defer r.block.chunkPool.Put(&buf)

Expand All @@ -3024,7 +3058,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
// Presume chunk length to be reasonably large for common use cases.
// However, declaration for EstimatedMaxChunkSize warns us some chunks could be larger in some rare cases.
// This is handled further down below.
chunkLen = EstimatedMaxChunkSize
chunkLen = r.block.estimatedMaxChunkSize
if i+1 < len(pIdxs) {
if diff = pIdxs[i+1].offset - pIdx.offset; int(diff) < chunkLen {
chunkLen = int(diff)
Expand Down
44 changes: 24 additions & 20 deletions pkg/store/bucket_test.go
Expand Up @@ -212,7 +212,7 @@ func TestBucketFilterExtLabelsMatchers(t *testing.T) {
},
},
}
b, _ := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil)
b, _ := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil)
ms := []*labels.Matcher{
{Type: labels.MatchNotEqual, Name: "a", Value: "b"},
}
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestBucketBlock_matchLabels(t *testing.T) {
},
}

b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil)
b, err := newBucketBlock(context.Background(), log.NewNopLogger(), newBucketStoreMetrics(nil), meta, bkt, path.Join(dir, blockID.String()), nil, nil, nil, nil, nil, nil)
testutil.Ok(t, err)

cases := []struct {
Expand Down Expand Up @@ -1515,14 +1515,16 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()), metadata.NoneFunc))

b1 = &bucketBlock{
indexCache: indexCache,
logger: logger,
metrics: newBucketStoreMetrics(nil),
bkt: bkt,
meta: meta,
partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
indexCache: indexCache,
logger: logger,
metrics: newBucketStoreMetrics(nil),
bkt: bkt,
meta: meta,
partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
estimatedMaxSeriesSize: EstimatedMaxSeriesSize,
estimatedMaxChunkSize: EstimatedMaxChunkSize,
}
b1.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b1.meta.ULID, DefaultPostingOffsetInMemorySampling)
testutil.Ok(t, err)
Expand Down Expand Up @@ -1554,14 +1556,16 @@ func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
testutil.Ok(t, block.Upload(context.Background(), logger, bkt, filepath.Join(blockDir, id.String()), metadata.NoneFunc))

b2 = &bucketBlock{
indexCache: indexCache,
logger: logger,
metrics: newBucketStoreMetrics(nil),
bkt: bkt,
meta: meta,
partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
indexCache: indexCache,
logger: logger,
metrics: newBucketStoreMetrics(nil),
bkt: bkt,
meta: meta,
partitioner: NewGapBasedPartitioner(PartitionerMaxGapSize),
chunkObjs: []string{filepath.Join(id.String(), "chunks", "000001")},
chunkPool: chunkPool,
estimatedMaxSeriesSize: EstimatedMaxSeriesSize,
estimatedMaxChunkSize: EstimatedMaxChunkSize,
}
b2.indexHeaderReader, err = indexheader.NewBinaryReader(context.Background(), log.NewNopLogger(), bkt, tmpDir, b2.meta.ULID, DefaultPostingOffsetInMemorySampling)
testutil.Ok(t, err)
Expand Down Expand Up @@ -2547,7 +2551,7 @@ func BenchmarkBucketBlock_readChunkRange(b *testing.B) {
testutil.Ok(b, err)

// Create a bucket block with only the dependencies we need for the benchmark.
blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, nil, chunkPool, nil, nil)
blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, nil, chunkPool, nil, nil, nil, nil)
testutil.Ok(b, err)

b.ResetTimer()
Expand Down Expand Up @@ -2637,7 +2641,7 @@ func prepareBucket(b testing.TB, resolutionLevel compact.ResolutionLevel, sample
testutil.Ok(b, err)

// Create a bucket block with only the dependencies we need for the benchmark.
blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner)
blk, err := newBucketBlock(context.Background(), logger, newBucketStoreMetrics(nil), blockMeta, bkt, tmpDir, indexCache, chunkPool, indexHeaderReader, partitioner, nil, nil)
testutil.Ok(b, err)
return blk, blockMeta
}
Expand Down

0 comments on commit aed2baa

Please sign in to comment.