Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce newSeriesChunkRefsSet() introducing a memory pool #3666

Merged
merged 7 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 184 additions & 12 deletions pkg/storegateway/bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"runtime"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -942,33 +943,35 @@ func benchmarkExpandedPostings(
}
}

func TestBucketSeries(t *testing.T) {
// TestBucket_Series drives benchBucketSeries through runSeriesInterestingCases with
// chunk loading enabled (skipChunk = false) and a single request ratio of 1.
func TestBucket_Series(t *testing.T) {
	// Callback receiving the samples-per-series and series counts picked by
	// runSeriesInterestingCases.
	runCase := func(t test.TB, samplesPerSeries, series int) {
		benchBucketSeries(t, false, samplesPerSeries, series, 1)
	}

	runSeriesInterestingCases(test.NewTB(t), 10000, 10000, runCase)
}

func TestBucketSkipChunksSeries(t *testing.T) {
// TestBucket_Series_WithSkipChunks drives benchBucketSeries through
// runSeriesInterestingCases with skipChunk = true and a single request ratio of 1.
func TestBucket_Series_WithSkipChunks(t *testing.T) {
	// Callback receiving the samples-per-series and series counts picked by
	// runSeriesInterestingCases.
	runCase := func(t test.TB, samplesPerSeries, series int) {
		benchBucketSeries(t, true, samplesPerSeries, series, 1)
	}

	runSeriesInterestingCases(test.NewTB(t), 10000, 10000, runCase)
}

func BenchmarkBucketSeries(b *testing.B) {
// BenchmarkBucket_Series benchmarks benchBucketSeries through runSeriesInterestingCases
// with chunk loading enabled (skipChunk = false), issuing requests at three different
// ratios for every case.
func BenchmarkBucket_Series(b *testing.B) {
	// Callback receiving the samples-per-series and series counts picked by
	// runSeriesInterestingCases.
	runCase := func(t test.TB, samplesPerSeries, series int) {
		benchBucketSeries(t, false, samplesPerSeries, series, 1/100e6, 1/10e4, 1)
	}

	// 10e6 samples = ~1736 days with 15s scrape
	runSeriesInterestingCases(test.NewTB(b), 10e6, 10e5, runCase)
}

func BenchmarkBucketSkipChunksSeries(b *testing.B) {
func BenchmarkBucket_Series_WithSkipChunks(b *testing.B) {
tb := test.NewTB(b)
// 10e6 samples = ~1736 days with 15s scrape
runSeriesInterestingCases(tb, 10e6, 10e5, func(t test.TB, samplesPerSeries, series int) {
benchBucketSeries(t, true, samplesPerSeries, series, 1/100e6, 1/10e4, 1)
// Only send requests with a 100% ratio because in Mimir we look up series at block boundaries
// when skip chunks = true.
benchBucketSeries(t, true, samplesPerSeries, series, 1)
})
}

Expand Down Expand Up @@ -1040,7 +1043,7 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries

runTestWithStore := func(t test.TB, st *BucketStore) {
if !t.IsBenchmark() {
st.chunkPool = &mockedPool{parent: st.chunkPool}
st.chunkPool = &trackedBytesPool{parent: st.chunkPool}
}

assert.NoError(t, st.SyncBlocks(context.Background()))
Expand Down Expand Up @@ -1080,8 +1083,8 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
if !t.IsBenchmark() {
if !skipChunk {
// TODO(bwplotka): This is wrong negative for large number of samples (1mln). Investigate.
assert.Equal(t, 0, int(st.chunkPool.(*mockedPool).balance.Load()))
st.chunkPool.(*mockedPool).gets.Store(0)
assert.Equal(t, 0, int(st.chunkPool.(*trackedBytesPool).balance.Load()))
st.chunkPool.(*trackedBytesPool).gets.Store(0)
}

for _, b := range st.blocks {
Expand Down Expand Up @@ -1119,16 +1122,163 @@ func benchBucketSeries(t test.TB, skipChunk bool, samplesPerSeries, totalSeries
runTestWithStore(t, st)
})
}
}

// TestBucket_Series_Concurrency uploads a few generated blocks to a filesystem bucket and
// then queries all their series from several concurrent workers, for different streaming
// batch sizes. For each batch size it checks that every response matches the generated
// series, that all chunk bytes have been released to the chunks pool, and that the
// seriesChunkRefsSet pool has been used and fully given back.
func TestBucket_Series_Concurrency(t *testing.T) {
	const (
		numWorkers           = 10
		numRequestsPerWorker = 100
		numBlocks            = 4
		numSeriesPerBlock    = 100
		numSamplesPerSeries  = 200
	)

	var (
		ctx              = context.Background()
		logger           = log.NewNopLogger()
		expectedSeries   []*storepb.Series
		expectedBlockIDs []string
		random           = rand.New(rand.NewSource(120))
		tmpDir           = t.TempDir()
	)

	test.VerifyNoLeak(t)

	// Create a filesystem-based bucket. Abort on failure: everything below needs the bucket.
	bucket, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt"))
	require.NoError(t, err)
	defer func() { assert.NoError(t, bucket.Close()) }()
	instrumentedBucket := objstore.WithNoopInstr(bucket)

	// Generate some blocks.
	t.Log("generating test blocks")
	blockDir := filepath.Join(tmpDir, "tmp")
	for b := 0; b < numBlocks; b++ {
		head, blockSeries := createHeadWithSeries(t, b, headGenOptions{
			TSDBDir:          filepath.Join(tmpDir, strconv.Itoa(b)),
			SamplesPerSeries: numSamplesPerSeries,
			Series:           numSeriesPerBlock,
			PrependLabels:    labels.FromStrings(labels.MetricName, "test_metric", "zzz_block_id", strconv.Itoa(b)),
			Random:           random,
		})

		blockID := createBlockFromHead(t, blockDir, head)
		assert.NoError(t, head.Close())

		expectedSeries = append(expectedSeries, blockSeries...)
		expectedBlockIDs = append(expectedBlockIDs, blockID.String())

		require.NoError(t, mimir_tsdb.UploadBlock(ctx, logger, bucket, filepath.Join(blockDir, blockID.String()), nil))
	}
	t.Log("generated test blocks")

	// Prepare a request to query all series.
	hints := &hintspb.SeriesRequestHints{
		BlockMatchers: []storepb.LabelMatcher{
			{
				Type:  storepb.LabelMatcher_RE,
				Name:  block.BlockIDLabel,
				Value: strings.Join(expectedBlockIDs, "|"),
			},
		},
	}

	marshalledHints, err := types.MarshalAny(hints)
	require.NoError(t, err)

	req := &storepb.SeriesRequest{
		MinTime: math.MinInt64,
		MaxTime: math.MaxInt64,
		Matchers: []storepb.LabelMatcher{
			{Type: storepb.LabelMatcher_EQ, Name: labels.MetricName, Value: "test_metric"},
		},
		Hints: marshalledHints,
	}

	// runRequest issues one Series() request and compares the response with expectedSeries.
	// It runs inside worker goroutines, so failures are reported with assert (t.Errorf)
	// rather than require: t.FailNow must only be called from the goroutine running the test.
	// It returns false as soon as a check fails, so the caller can stop issuing requests.
	runRequest := func(t *testing.T, store *BucketStore) bool {
		srv := newBucketStoreSeriesServer(ctx)
		if !assert.NoError(t, store.Series(req, srv)) {
			return false
		}
		if !assert.Equal(t, 0, len(srv.Warnings), "%v", srv.Warnings) {
			return false
		}
		// The per-series comparison below indexes srv.SeriesSet, so the lengths must match.
		if !assert.Equal(t, len(expectedSeries), len(srv.SeriesSet)) {
			return false
		}

		// Huge responses can produce unreadable diffs - make it more human readable.
		for j := range expectedSeries {
			if !assert.Equal(t, expectedSeries[j].Labels, srv.SeriesSet[j].Labels, "series labels mismatch at position %d", j) {
				return false
			}
			if !assert.Equal(t, expectedSeries[j].Chunks, srv.SeriesSet[j].Chunks, "series chunks mismatch at position %d", j) {
				return false
			}
		}

		return true
	}

	// Run the test with different batch sizes.
	for _, batchSize := range []int{len(expectedSeries) / 100, len(expectedSeries) * 2} {
		t.Run(fmt.Sprintf("batch size: %d", batchSize), func(t *testing.T) {
			// Reset the memory pool tracker.
			seriesChunkRefsSetPool.(*trackedPool).reset()

			metaFetcher, err := block.NewRawMetaFetcher(logger, instrumentedBucket)
			require.NoError(t, err)

			chunkPool, err := pool.NewBucketedBytes(chunkBytesPoolMinSize, chunkBytesPoolMaxSize, 2, 1e9) // 1GB.
			require.NoError(t, err)
			trackedChunkPool := &trackedBytesPool{parent: chunkPool}

			// Create the bucket store.
			store, err := NewBucketStore(
				"test-user",
				instrumentedBucket,
				metaFetcher,
				tmpDir,
				NewChunksLimiterFactory(0),
				NewSeriesLimiterFactory(0),
				newGapBasedPartitioner(mimir_tsdb.DefaultPartitionerMaxGapSize, nil),
				1,
				mimir_tsdb.DefaultPostingOffsetInMemorySampling,
				indexheader.Config{},
				false, // Lazy index-header loading disabled.
				0,
				hashcache.NewSeriesHashCache(1024*1024),
				NewBucketStoreMetrics(nil),
				WithLogger(logger),
				WithChunkPool(trackedChunkPool),
				WithStreamingSeriesPerBatch(batchSize),
			)
			require.NoError(t, err)
			require.NoError(t, store.SyncBlocks(ctx))

			// Run workers.
			wg := sync.WaitGroup{}
			wg.Add(numWorkers)

			for c := 0; c < numWorkers; c++ {
				go func() {
					defer wg.Done()

					for r := 0; r < numRequestsPerWorker; r++ {
						// Stop this worker at the first failed request.
						if !runRequest(t, store) {
							return
						}
					}
				}()
			}

			// Wait until all workers are done.
			wg.Wait()

			// Ensure all chunks have been released to the pool.
			require.Equal(t, 0, int(trackedChunkPool.balance.Load()))

			// Ensure the seriesChunkRefsSet memory pool has been used and all slices pulled
			// from the pool have been put back.
			assert.Greater(t, seriesChunkRefsSetPool.(*trackedPool).gets.Load(), int64(0))
			assert.Equal(t, int64(0), seriesChunkRefsSetPool.(*trackedPool).balance.Load())
		})
	}
}

type mockedPool struct {
// trackedBytesPool wraps a pool.Bytes and keeps counters about its usage, so that tests
// can assert that everything taken from the pool has been released.
type trackedBytesPool struct {
// parent is the wrapped pool that calls are delegated to.
parent pool.Bytes
// balance is decreased by the capacity of each slice passed to Put; tests expect it to be
// zero once all chunks have been released (presumably Get increases it by the same amount).
balance atomic.Uint64
// gets is reset to zero by tests between runs; presumably it counts Get calls.
gets atomic.Uint64
}

func (m *mockedPool) Get(sz int) (*[]byte, error) {
func (m *trackedBytesPool) Get(sz int) (*[]byte, error) {
b, err := m.parent.Get(sz)
if err != nil {
return nil, err
Expand All @@ -1138,11 +1288,33 @@ func (m *mockedPool) Get(sz int) (*[]byte, error) {
return b, nil
}

func (m *mockedPool) Put(b *[]byte) {
// Put subtracts the capacity of the returned slice from the tracked balance and then
// gives the slice back to the parent pool.
func (m *trackedBytesPool) Put(b *[]byte) {
	released := uint64(cap(*b))
	m.balance.Sub(released)

	m.parent.Put(b)
}

// trackedPool wraps a pool.Interface and counts how it is used, so that tests can check
// that the pool has been used and that every value taken from it has been put back.
type trackedPool struct {
	// parent is the wrapped pool that calls are delegated to.
	parent pool.Interface
	// balance is the number of Get calls minus the number of Put calls since the last reset.
	balance atomic.Int64
	// gets is the number of Get calls since the last reset.
	gets atomic.Int64
}

// Get records the call in both counters and returns a value taken from the parent pool.
func (p *trackedPool) Get() any {
	p.gets.Inc()
	p.balance.Inc()

	return p.parent.Get()
}

// Put records that a value has been returned and hands x back to the parent pool.
func (p *trackedPool) Put(x any) {
	p.balance.Dec()

	p.parent.Put(x)
}

// reset sets both counters back to zero.
func (p *trackedPool) reset() {
	p.gets.Store(0)
	p.balance.Store(0)
}

// Regression test against: https://github.com/thanos-io/thanos/issues/2147.
func TestBucketSeries_OneBlock_InMemIndexCacheSegfault(t *testing.T) {
tmpDir := t.TempDir()
Expand Down Expand Up @@ -2190,7 +2362,7 @@ type headGenOptions struct {
}

// createHeadWithSeries returns a head filled with the given samples, plus the same series in a separate list for assertion purposes.
// Returned series list has "ext1"="1" prepended. Each series looks as follows:
// Each series looks as follows:
// {foo=bar,i=000001aaaaaaaaaabbbbbbbbbbccccccccccdddddddddd} <random value> where number indicate sample number from 0.
// Returned series are framed in the same way as remote read would frame them.
func createHeadWithSeries(t testing.TB, j int, opts headGenOptions) (*tsdb.Head, []*storepb.Series) {
Expand Down
4 changes: 4 additions & 0 deletions pkg/storegateway/series_chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ func (c *loadingSeriesChunksSetIterator) Next() bool {
}

nextUnloaded := c.from.At()

// This data structure doesn't retain the seriesChunkRefsSet so it can be released once done.
defer nextUnloaded.release()

entries := make([]seriesEntry, nextUnloaded.len())
c.chunkReaders.reset()
for i, s := range nextUnloaded.series {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/series_chunks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ func TestLoadingSeriesChunksSetIterator(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()
// Setup
bytesPool := &mockedPool{parent: pool.NoopBytes{}}
bytesPool := &trackedBytesPool{parent: pool.NoopBytes{}}
readersMap := make(map[ulid.ULID]chunkReader, len(testCase.existingBlocks))
for _, block := range testCase.existingBlocks {
readersMap[block.ulid] = newChunkReaderMockWithSeries(block.series, testCase.addLoadErr, testCase.loadErr)
Expand Down
Loading