Reduce memory allocations for seriesChunksSet.series and its chunks slices

Signed-off-by: Marco Pracucci <marco@pracucci.com>
pracucci committed Dec 16, 2022
1 parent 523e71d commit 59d459e
Showing 8 changed files with 648 additions and 93 deletions.
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket.go
@@ -1156,7 +1156,7 @@ func (s *BucketStore) streamingSeriesSetForBlocks(
mergedBatches := mergedSeriesChunkRefsSetIterators(s.maxSeriesPerBatch, batches...)
var set storepb.SeriesSet
if chunkReaders != nil {
set = newSeriesSetWithChunks(ctx, *chunkReaders, chunksPool, mergedBatches, stats, s.metrics.iteratorLoadDurations)
set = newSeriesSetWithChunks(ctx, *chunkReaders, chunksPool, mergedBatches, s.maxSeriesPerBatch, stats, s.metrics.iteratorLoadDurations)
} else {
set = newSeriesSetWithoutChunks(ctx, mergedBatches)
}
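The change above threads s.maxSeriesPerBatch into newSeriesSetWithChunks so that the chunk-loading iterator can size its pooled slices by the configured batch size rather than by each batch's actual length. The standalone sketch below (illustrative names only, not Mimir code) shows why a uniform request size matters when slices come from a sync.Pool: a pooled slice only helps if its capacity covers the next request, so asking for the same capacity every time maximises reuse.

package main

import (
	"fmt"
	"sync"
)

// slicePool intentionally has no New function: Get returns nil when the pool is
// empty, and the caller allocates a slice with the capacity it actually needs.
var slicePool = sync.Pool{}

// getSlice returns a slice with capacity >= capacity, reusing a pooled slice when possible.
func getSlice(capacity int) []int {
	if reused, ok := slicePool.Get().(*[]int); ok && cap(*reused) >= capacity {
		return (*reused)[:0]
	}
	return make([]int, 0, capacity)
}

// putSlice resets the slice and returns it to the pool for later reuse.
func putSlice(s []int) {
	reuse := s[:0]
	slicePool.Put(&reuse)
}

func main() {
	const batchSize = 5000 // stand-in for the store-gateway's configured series batch size

	for batch := 0; batch < 3; batch++ {
		// Every batch requests the same capacity, even if it ends up holding fewer
		// elements, so the pooled slice stays reusable for the next batch.
		s := getSlice(batchSize)
		s = append(s, batch)
		fmt.Println(len(s), cap(s))
		putSlice(s)
	}
}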
46 changes: 19 additions & 27 deletions pkg/storegateway/bucket_test.go
@@ -1045,6 +1045,12 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
runTestWithStore := func(t test.TB, st *BucketStore) {
if !t.IsBenchmark() {
st.chunkPool = &trackedBytesPool{parent: st.chunkPool}

// Reset the memory pool tracker (only if streaming store-gateway is enabled).
if st.maxSeriesPerBatch > 0 {
seriesEntrySlicePool.(*pool.TrackedPool).Reset()
seriesChunksSlicePool.(*pool.TrackedPool).Reset()
}
}

assert.NoError(t, st.SyncBlocks(context.Background()))
Expand Down Expand Up @@ -1083,9 +1089,17 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries

if !t.IsBenchmark() {
if !skipChunk {
// TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate.
assert.Equal(t, 0, int(st.chunkPool.(*trackedBytesPool).balance.Load()))
assert.Zero(t, st.chunkPool.(*trackedBytesPool).balance.Load())
st.chunkPool.(*trackedBytesPool).gets.Store(0)

// Only if streaming store-gateway is enabled.
if st.maxSeriesPerBatch > 0 {
assert.Zero(t, seriesEntrySlicePool.(*pool.TrackedPool).Balance.Load())
assert.Zero(t, seriesChunksSlicePool.(*pool.TrackedPool).Balance.Load())

assert.Greater(t, int(seriesEntrySlicePool.(*pool.TrackedPool).Gets.Load()), 0)
assert.Greater(t, int(seriesChunksSlicePool.(*pool.TrackedPool).Gets.Load()), 0)
}
}

for _, b := range st.blocks {
@@ -1213,7 +1227,7 @@ func TestBucket_Series_Concurrency(t *testing.T) {
for _, batchSize := range []int{len(expectedSeries) / 100, len(expectedSeries) * 2} {
t.Run(fmt.Sprintf("batch size: %d", batchSize), func(t *testing.T) {
// Reset the memory pool tracker.
seriesChunkRefsSetPool.(*trackedPool).reset()
seriesChunkRefsSetPool.(*pool.TrackedPool).Reset()

metaFetcher, err := block.NewRawMetaFetcher(logger, instrumentedBucket)
assert.NoError(t, err)
@@ -1267,8 +1281,8 @@ func TestBucket_Series_Concurrency(t *testing.T) {

// Ensure the seriesChunkRefsSet memory pool has been used and all slices pulled from
// pool have put back.
assert.Greater(t, seriesChunkRefsSetPool.(*trackedPool).gets.Load(), int64(0))
assert.Equal(t, int64(0), seriesChunkRefsSetPool.(*trackedPool).balance.Load())
assert.Greater(t, seriesChunkRefsSetPool.(*pool.TrackedPool).Gets.Load(), int64(0))
assert.Equal(t, int64(0), seriesChunkRefsSetPool.(*pool.TrackedPool).Balance.Load())
})
}
}
@@ -1294,28 +1308,6 @@ func (m *trackedBytesPool) Put(b *[]byte) {
m.parent.Put(b)
}

type trackedPool struct {
parent pool.Interface
balance atomic.Int64
gets atomic.Int64
}

func (p *trackedPool) Get() any {
p.balance.Inc()
p.gets.Inc()
return p.parent.Get()
}

func (p *trackedPool) Put(x any) {
p.balance.Dec()
p.parent.Put(x)
}

func (p *trackedPool) reset() {
p.balance.Store(0)
p.gets.Store(0)
}

// Regression test against: https://github.com/thanos-io/thanos/issues/2147.
func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
tmpDir := t.TempDir()
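The local trackedPool helper deleted above is replaced by a shared pool.TrackedPool, whose Reset(), Gets and Balance members are what the updated assertions use. The following sketch restates the same leak-check idea with illustrative names; it is modelled on the deleted helper and is not the pkg/util/pool API.

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

// trackedPool wraps a pool and counts outstanding objects: gets counts how often
// the pool was used, balance must return to zero once every Get has a matching Put.
type trackedPool struct {
	parent  *sync.Pool
	balance atomic.Int64
	gets    atomic.Int64
}

func (p *trackedPool) Get() any {
	p.balance.Inc()
	p.gets.Inc()
	return p.parent.Get()
}

func (p *trackedPool) Put(x any) {
	p.balance.Dec()
	p.parent.Put(x)
}

func main() {
	p := &trackedPool{parent: &sync.Pool{New: func() any { b := make([]byte, 0, 16); return &b }}}

	buf := p.Get()
	p.Put(buf)

	// A leak-free run used the pool at least once and ended with a zero balance.
	fmt.Println(p.gets.Load() > 0, p.balance.Load() == 0)
}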
178 changes: 150 additions & 28 deletions pkg/storegateway/series_chunks.go
@@ -4,46 +4,148 @@ package storegateway

import (
"context"
"sync"
"time"

"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/mimir/pkg/storegateway/storepb"
util_math "github.com/grafana/mimir/pkg/util/math"
"github.com/grafana/mimir/pkg/util/pool"
)

// Mimir compacts blocks up to 24h. Assuming a 5s scrape interval as worst case scenario,
// and 120 samples per chunk, there could be 86400 * (1 / 5) * (1 / 120) = 144 chunks for
a series in the biggest block. Using a slab size of 1000 looks like a good trade-off to support
// high frequency scraping without wasting too much memory in case of queries hitting a low
// number of chunks (across series).
const seriesChunksSlabSize = 1000

var (
seriesEntrySlicePool = pool.Interface(&sync.Pool{
// Intentionally return nil if the pool is empty, so that the caller can preallocate
// the slice with the right size.
New: nil,
})

seriesChunksSlicePool = pool.Interface(&sync.Pool{
// Intentionally return nil if the pool is empty, so that the caller can preallocate
// the slice with the right size.
New: nil,
})
)

// seriesChunksSetIterator is the interface implemented by an iterator returning a sequence of seriesChunksSet.
type seriesChunksSetIterator interface {
Next() bool

// At returns the current seriesChunksSet. The caller should (but is not required to) invoke seriesChunksSet.release()
// on the returned set once it's guaranteed it will not be used anymore.
At() seriesChunksSet

Err() error
}

// seriesChunksSet holds a set of series, each with its own chunks.
type seriesChunksSet struct {
series []seriesEntry
series []seriesEntry
seriesReleasable bool

// It gets lazy initialized (only if required).
seriesChunksPool *pool.SlabPool[storepb.AggrChunk]

// chunksReleaser releases the memory used to allocate series chunks.
chunksReleaser chunksReleaser
}

// newSeriesChunksSet creates a new seriesChunksSet. This function GUARANTEES that the series
// slice is pre-allocated with a capacity of at least seriesCapacity.
//
// If seriesReleasable is true, then a subsequent call to release() will put the internal
// series slice back into a memory pool for reuse.
func newSeriesChunksSet(seriesCapacity int, seriesReleasable bool) seriesChunksSet {
var prealloc []seriesEntry

// If it's releasable then we try to reuse a slice from the pool.
if seriesReleasable {
if reused := seriesEntrySlicePool.Get(); reused != nil {
prealloc = *(reused.(*[]seriesEntry))

// The capacity MUST be guaranteed. If it's smaller, then we discard it and the slice
// will be reallocated.
if cap(prealloc) < seriesCapacity {
prealloc = nil
}
}
}

if prealloc == nil {
prealloc = make([]seriesEntry, 0, seriesCapacity)
}

return seriesChunksSet{
series: prealloc,
seriesReleasable: seriesReleasable,
}
}

type chunksReleaser interface {
// Release the memory used to allocate series chunks.
Release()
}

// release returns the internal series and chunks slices to a memory pool, and calls chunksReleaser.Release().
// The series and chunks slices won't be returned to a memory pool if the seriesChunksSet was created as not releasable.
//
// This function is not idempotent. Calling it twice would introduce subtle bugs.
func (b *seriesChunksSet) release() {
if b.chunksReleaser != nil {
b.chunksReleaser.Release()
b.chunksReleaser = nil
}

b.series = nil
if b.seriesReleasable {
// Reset series and chunk entries, before putting back to the pool.
for i := range b.series {
for c := range b.series[i].chks {
b.series[i].chks[c].MinTime = 0
b.series[i].chks[c].MaxTime = 0
b.series[i].chks[c].Raw = nil
}

b.series[i].lset = nil
b.series[i].refs = nil
b.series[i].chks = nil
}

if b.seriesChunksPool != nil {
b.seriesChunksPool.Release()
}

reuse := b.series[:0]
seriesEntrySlicePool.Put(&reuse)
}
}

// newSeriesAggrChunkSlice returns a []storepb.AggrChunk guaranteed to have length and capacity
// equal to the provided size. The returned slice may be picked from a memory pool and then released
// back once release() gets invoked.
func (b *seriesChunksSet) newSeriesAggrChunkSlice(size int) []storepb.AggrChunk {
if !b.seriesReleasable {
return make([]storepb.AggrChunk, size)
}

// Lazily initialise the pool.
if b.seriesChunksPool == nil {
b.seriesChunksPool = pool.NewSlabPool[storepb.AggrChunk](seriesChunksSlicePool, seriesChunksSlabSize)
}

return b.seriesChunksPool.Get(size)
}

func (b seriesChunksSet) len() int {
func (b *seriesChunksSet) len() int {
return len(b.series)
}
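The code above introduces the pooled lifecycle for seriesChunksSet: newSeriesChunksSet reuses a slice from seriesEntrySlicePool only when its capacity is large enough, and release() zeroes every entry before putting the slice back so that pooled memory does not keep a finished request's labels and chunk payloads alive. A minimal standalone sketch of that reset-before-Put pattern (illustrative types, not the Mimir implementation):

package main

import "sync"

// entry is a stand-in for seriesEntry: it holds references that must not be kept
// alive by the pool after the request that produced them is done.
type entry struct {
	payload []byte
}

var entrySlicePool = sync.Pool{}

// releaseEntries zeroes every entry before putting the slice back, so pooled memory
// does not pin the payloads of a finished request.
func releaseEntries(entries []entry) {
	for i := range entries {
		entries[i] = entry{}
	}

	reuse := entries[:0]
	entrySlicePool.Put(&reuse)
}

func main() {
	entries := make([]entry, 2, 8)
	entries[0].payload = []byte("chunk data")
	entries[1].payload = []byte("more chunk data")

	releaseEntries(entries)
}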

@@ -60,9 +162,9 @@ func newSeriesChunksSeriesSet(from seriesChunksSetIterator) storepb.SeriesSet {
}
}

func newSeriesSetWithChunks(ctx context.Context, chunkReaders bucketChunkReaders, chunksPool pool.Bytes, batches seriesChunkRefsSetIterator, stats *safeQueryStats, iteratorLoadDurations *prometheus.HistogramVec) storepb.SeriesSet {
func newSeriesSetWithChunks(ctx context.Context, chunkReaders bucketChunkReaders, chunksPool pool.Bytes, refsIterator seriesChunkRefsSetIterator, refsIteratorBatchSize int, stats *safeQueryStats, iteratorLoadDurations *prometheus.HistogramVec) storepb.SeriesSet {
var iterator seriesChunksSetIterator
iterator = newLoadingSeriesChunksSetIterator(chunkReaders, chunksPool, batches, stats)
iterator = newLoadingSeriesChunksSetIterator(chunkReaders, chunksPool, refsIterator, refsIteratorBatchSize, stats)
iterator = newDurationMeasuringIterator[seriesChunksSet](iterator, iteratorLoadDurations.WithLabelValues("chunks_load"))
iterator = newPreloadingSetIterator[seriesChunksSet](ctx, 1, iterator)
// We are measuring the time we wait for a preloaded batch. In an ideal world this is 0 because there's always a preloaded batch waiting.
Expand All @@ -78,11 +180,15 @@ func newSeriesSetWithChunks(ctx context.Context, chunkReaders bucketChunkReaders
func (b *seriesChunksSeriesSet) Next() bool {
b.currOffset++
if b.currOffset >= b.currSet.len() {
// The current set won't be accessed anymore because the iterator is moving to the next one,
// so we can release it.
b.currSet.release()

if !b.from.Next() {
b.currSet.release()
b.currSet = seriesChunksSet{}
return false
}
b.currSet.release()

b.currSet = b.from.At()
b.currOffset = 0
}
@@ -174,25 +280,27 @@ func (p *preloadingSetIterator[Set]) Err() error {
}

type loadingSeriesChunksSetIterator struct {
chunkReaders bucketChunkReaders
from seriesChunkRefsSetIterator
chunksPool pool.Bytes
stats *safeQueryStats
chunkReaders bucketChunkReaders
from seriesChunkRefsSetIterator
fromBatchSize int
chunksPool pool.Bytes
stats *safeQueryStats

current seriesChunksSet
err error
}

func newLoadingSeriesChunksSetIterator(chunkReaders bucketChunkReaders, chunksPool pool.Bytes, from seriesChunkRefsSetIterator, stats *safeQueryStats) *loadingSeriesChunksSetIterator {
func newLoadingSeriesChunksSetIterator(chunkReaders bucketChunkReaders, chunksPool pool.Bytes, from seriesChunkRefsSetIterator, fromBatchSize int, stats *safeQueryStats) *loadingSeriesChunksSetIterator {
return &loadingSeriesChunksSetIterator{
chunkReaders: chunkReaders,
from: from,
chunksPool: chunksPool,
stats: stats,
chunkReaders: chunkReaders,
from: from,
fromBatchSize: fromBatchSize,
chunksPool: chunksPool,
stats: stats,
}
}

func (c *loadingSeriesChunksSetIterator) Next() bool {
func (c *loadingSeriesChunksSetIterator) Next() (retHasNext bool) {
if c.err != nil {
return false
}
Expand All @@ -207,15 +315,30 @@ func (c *loadingSeriesChunksSetIterator) Next() bool {
// This data structure doesn't retain the seriesChunkRefsSet so it can be released once done.
defer nextUnloaded.release()

entries := make([]seriesEntry, nextUnloaded.len())
// Pre-allocate the series slice using the expected batch size even if nextUnloaded has fewer elements,
// so that there's a higher chance the slice will be reused once released.
nextSet := newSeriesChunksSet(util_math.Max(c.fromBatchSize, nextUnloaded.len()), true)

// Release the set if an error occurred.
defer func() {
if !retHasNext && c.err != nil {
nextSet.release()
}
}()

// The series slice is guaranteed to have at least the requested capacity,
// so we can safely expand it.
nextSet.series = nextSet.series[:nextUnloaded.len()]

c.chunkReaders.reset()

for i, s := range nextUnloaded.series {
entries[i].lset = s.lset
entries[i].chks = make([]storepb.AggrChunk, len(s.chunks))
nextSet.series[i].lset = s.lset
nextSet.series[i].chks = nextSet.newSeriesAggrChunkSlice(len(s.chunks))

for j, chunk := range s.chunks {
entries[i].chks[j].MinTime = chunk.minTime
entries[i].chks[j].MaxTime = chunk.maxTime
nextSet.series[i].chks[j].MinTime = chunk.minTime
nextSet.series[i].chks[j].MaxTime = chunk.maxTime

err := c.chunkReaders.addLoad(chunk.blockID, chunk.ref, i, j)
if err != nil {
@@ -228,15 +351,14 @@ func (c *loadingSeriesChunksSetIterator) Next() bool {
// Create a batched memory pool that can be released all at once.
chunksPool := &pool.BatchBytes{Delegate: c.chunksPool}

err := c.chunkReaders.load(entries, chunksPool, c.stats)
err := c.chunkReaders.load(nextSet.series, chunksPool, c.stats)
if err != nil {
c.err = errors.Wrap(err, "loading chunks")
return false
}
c.current = seriesChunksSet{
series: entries,
chunksReleaser: chunksPool,
}

nextSet.chunksReleaser = chunksPool
c.current = nextSet
return true
}
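newSeriesAggrChunkSlice above hands out each series' []storepb.AggrChunk from a pool.SlabPool with a slab size of 1000 chunks: many small per-series slices are carved out of a few large slabs, and everything is released in one call when the seriesChunksSet is released. The generic sketch below illustrates the slab idea only and is not Mimir's pool.SlabPool implementation.

package main

import "fmt"

// slabPool carves small sub-slices out of large backing slabs so that per-series
// chunk slices don't each hit the allocator, and everything is freed in one release.
type slabPool[T any] struct {
	slabSize int
	slabs    [][]T
}

// get returns a zeroed sub-slice of length size from the current slab, starting a
// new slab when the current one doesn't have enough free room.
func (p *slabPool[T]) get(size int) []T {
	if n := len(p.slabs); n == 0 || len(p.slabs[n-1])+size > cap(p.slabs[n-1]) {
		slabCap := p.slabSize
		if size > slabCap {
			slabCap = size
		}
		p.slabs = append(p.slabs, make([]T, 0, slabCap))
	}

	last := len(p.slabs) - 1
	start := len(p.slabs[last])
	p.slabs[last] = p.slabs[last][:start+size]
	return p.slabs[last][start : start+size]
}

// release drops every slab at once; a real implementation would return the backing
// slices to a shared memory pool here instead.
func (p *slabPool[T]) release() {
	p.slabs = nil
}

func main() {
	pool := &slabPool[int]{slabSize: 1000}

	a := pool.get(3) // both sub-slices come from the same 1000-element slab
	b := pool.get(5)
	fmt.Println(len(a), len(b), len(pool.slabs))

	pool.release()
}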

