Skip to content

Commit

Permalink
Reduce newSeriesChunkRefsSet() allocations by introducing a memory pool
Browse files Browse the repository at this point in the history
Fixed the optimization for the case where series are not duplicated among sets
Added TestMergedSeriesChunkRefsSet_Concurrency

Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
dimitarvdimitrov authored and pracucci committed Dec 12, 2022
1 parent f028204 commit a5ec2c2
Show file tree
Hide file tree
Showing 3 changed files with 305 additions and 25 deletions.
4 changes: 4 additions & 0 deletions pkg/storegateway/series_chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ func (c *loadingSeriesChunksSetIterator) Next() bool {
}

nextUnloaded := c.from.At()

// This data structure doesn't retain the seriesChunkRefsSet so it can be released once done.
defer nextUnloaded.release()

entries := make([]seriesEntry, nextUnloaded.len())
c.chunkReaders.reset()
for i, s := range nextUnloaded.series {
Expand Down
144 changes: 125 additions & 19 deletions pkg/storegateway/series_refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package storegateway

import (
"context"
"sync"

"github.com/oklog/ulid"
"github.com/pkg/errors"
Expand All @@ -17,10 +18,22 @@ import (
"github.com/grafana/mimir/pkg/storegateway/storepb"
)

// seriesChunkRefsSetPool recycles the slices backing releasable seriesChunkRefsSet
// values. Entries are stored as pointers to slices (*[]seriesChunkRefs).
//
// The New function is intentionally left unset: Get() returns nil when the pool is
// empty, so that the caller can preallocate the slice with the right size.
var seriesChunkRefsSetPool = sync.Pool{}

// seriesChunkRefsSetIterator is the interface implemented by an iterator returning a sequence of seriesChunkRefsSet.
type seriesChunkRefsSetIterator interface {
// Next advances the iterator and reports whether a set is available to be read with At().
Next() bool

// At returns the current seriesChunkRefsSet. The caller should (but is not required to) invoke
// seriesChunkRefsSet.release() on the returned set once it's guaranteed that the set will not
// be used anymore.
At() seriesChunkRefsSet

// Err returns the error encountered while iterating, if any.
Err() error
}

Expand All @@ -35,18 +48,52 @@ type seriesChunkRefsIterator interface {
type seriesChunkRefsSet struct {
// series sorted by labels.
series []seriesChunkRefs

// releasable holds whether the series slice (but not its content) can be released to a memory pool.
// It is set by newSeriesChunkRefsSet() and checked by release(), which has no effect when it is false.
releasable bool
}

func newSeriesChunkRefsSet(capacity int) seriesChunkRefsSet {
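// newSeriesChunkRefsSet creates a new seriesChunkRefsSet with the given capacity.
// If releasable is true, then a subsequent call to release() will put the internal
// series slice into a memory pool for reuse. A releasable set may be backed by a slice
// taken from that pool, in which case its capacity can differ from the requested one.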
func newSeriesChunkRefsSet(capacity int, releasable bool) seriesChunkRefsSet {
var prealloc []seriesChunkRefs

// If it's releasable then we try to reuse a slice from the pool.
if releasable {
if reused := seriesChunkRefsSetPool.Get(); reused != nil {
prealloc = *(reused.(*[]seriesChunkRefs))
}
}

if prealloc == nil {
prealloc = make([]seriesChunkRefs, 0, capacity)
}

return seriesChunkRefsSet{
series: make([]seriesChunkRefs, 0, capacity),
series: prealloc,
releasable: releasable,
}
}

// len returns the number of series currently held by the set.
func (b seriesChunkRefsSet) len() int {
	count := len(b.series)
	return count
}

// release puts the internal series slice (but not its content) back to the memory pool,
// so that a following newSeriesChunkRefsSet() call can reuse it. This function call has
// no effect if the set has no series slice or was created to be not releasable.
//
// NOTE: seriesChunkRefsSet is passed around by copy, and this method has a value receiver,
// so release() cannot clear the slice reference held by the caller (or by any other copy
// of the set). After calling release() the caller must not use the set anymore, and
// release() must not be called again on another copy of the same set, otherwise the same
// backing array would be handed out by the pool more than once.
func (b seriesChunkRefsSet) release() {
	if b.series == nil || !b.releasable {
		return
	}

	// Keep the capacity but drop the length. The pool stores pointers to slices, so the
	// truncated slice header is held in a local variable whose address is put in the pool.
	reuse := b.series[:0]
	seriesChunkRefsSetPool.Put(&reuse)
}

// seriesChunkRefs holds a series with a list of chunk references.
type seriesChunkRefs struct {
lset labels.Labels
Expand Down Expand Up @@ -95,11 +142,22 @@ func newSeriesChunkRefsIterator(set seriesChunkRefsSet) *seriesChunkRefsIterator

// reset points the iterator at the provided set and rewinds it before the first
// element: you must call Next() after reset() to move to the first element.
//
// Only the internal state is replaced. release() is NOT invoked on the
// seriesChunkRefsSet previously held by the iterator.
func (c *seriesChunkRefsIteratorImpl) reset(set seriesChunkRefsSet) {
	c.currentOffset = -1
	c.set = set
}

// resetIteratorAndReleasePreviousSet behaves like reset(), but it first releases the
// seriesChunkRefsSet currently held by the iterator. Invoke this function only if
// nothing other than this iterator is holding a reference to the previous set.
func (c *seriesChunkRefsIteratorImpl) resetIteratorAndReleasePreviousSet(set seriesChunkRefsSet) {
	previous := c.set
	previous.release()

	c.reset(set)
}

func (c *seriesChunkRefsIteratorImpl) Next() bool {
c.currentOffset++
return !c.Done()
Expand Down Expand Up @@ -144,10 +202,15 @@ func (c flattenedSeriesChunkRefsIterator) Next() bool {
// The current iterator has no more elements. We check if there's another
// iterator to fetch and then iterate on.
if !c.from.Next() {
// We can safely release the previous set because no one retained it except the
// iterator itself (which we're going to reset).
c.iterator.resetIteratorAndReleasePreviousSet(seriesChunkRefsSet{})
return false
}

c.iterator.reset(c.from.At())
// We can safely release the previous set because no one retained it except the
// iterator itself (which we're going to reset).
c.iterator.resetIteratorAndReleasePreviousSet(c.from.At())

// We've replaced the current iterator, so can recursively call Next()
// to check if there's any item in the new iterator and further advance it if not.
Expand All @@ -169,19 +232,19 @@ func (emptySeriesChunkRefsSetIterator) Next() bool { return false }
// At always returns an empty (zero value) seriesChunkRefsSet.
func (emptySeriesChunkRefsSetIterator) At() seriesChunkRefsSet {
	return seriesChunkRefsSet{}
}
// Err always returns nil.
func (emptySeriesChunkRefsSetIterator) Err() error {
	return nil
}

func mergedSeriesChunkRefsSetIterators(mergedSize int, all ...seriesChunkRefsSetIterator) seriesChunkRefsSetIterator {
func mergedSeriesChunkRefsSetIterators(mergedBatchSize int, all ...seriesChunkRefsSetIterator) seriesChunkRefsSetIterator {
switch len(all) {
case 0:
return emptySeriesChunkRefsSetIterator{}
case 1:
return newDeduplicatingSeriesChunkRefsSetIterator(mergedSize, all[0])
return newDeduplicatingSeriesChunkRefsSetIterator(mergedBatchSize, all[0])
}
h := len(all) / 2

return newMergedSeriesChunkRefsSet(
mergedSize,
mergedSeriesChunkRefsSetIterators(mergedSize, all[:h]...),
mergedSeriesChunkRefsSetIterators(mergedSize, all[h:]...),
mergedBatchSize,
mergedSeriesChunkRefsSetIterators(mergedBatchSize, all[:h]...),
mergedSeriesChunkRefsSetIterators(mergedBatchSize, all[h:]...),
)
}

Expand All @@ -191,13 +254,15 @@ type mergedSeriesChunkRefsSet struct {
a, b seriesChunkRefsSetIterator
aAt, bAt *seriesChunkRefsIteratorImpl
current seriesChunkRefsSet
done bool
}

func newMergedSeriesChunkRefsSet(mergedBatchSize int, a, b seriesChunkRefsSetIterator) *mergedSeriesChunkRefsSet {
return &mergedSeriesChunkRefsSet{
batchSize: mergedBatchSize,
a: a,
b: b,
done: false,
// start iterator on an empty set. It will be reset with a non-empty set when Next() is called
aAt: newSeriesChunkRefsIterator(seriesChunkRefsSet{}),
bAt: newSeriesChunkRefsIterator(seriesChunkRefsSet{}),
Expand All @@ -213,17 +278,31 @@ func (s *mergedSeriesChunkRefsSet) Err() error {
return nil
}

// At returns the set stored by the latest call to Next(). It's an empty set once
// Next() has stopped on an error or on the end of the iteration.
func (s *mergedSeriesChunkRefsSet) At() seriesChunkRefsSet {
	current := s.current
	return current
}

func (s *mergedSeriesChunkRefsSet) Next() bool {
next := newSeriesChunkRefsSet(s.batchSize)
if s.done {
return false
}

// This can be released by the caller because mergedSeriesChunkRefsSet doesn't retain it
// after Next() is called again.
next := newSeriesChunkRefsSet(s.batchSize, true)

for i := 0; i < s.batchSize; i++ {
if err := s.ensureItemAvailableToRead(s.aAt, s.a); err != nil {
// Stop iterating on first error encountered.
s.current = seriesChunkRefsSet{}
s.done = true
return false
}

if err := s.ensureItemAvailableToRead(s.bAt, s.b); err != nil {
// Stop iterating on first error encountered.
s.current = seriesChunkRefsSet{}
s.done = true
return false
}

Expand All @@ -234,8 +313,17 @@ func (s *mergedSeriesChunkRefsSet) Next() bool {
next.series = append(next.series, nextSeries)
}

// We have reached the end of the iterator and next set is empty, so we can
// directly release it.
if next.len() == 0 {
next.release()
s.current = seriesChunkRefsSet{}
s.done = true
return false
}

s.current = next
return s.current.len() > 0
return true
}

// ensureItemAvailableToRead ensures curr has an item available to read, unless we reached the
Expand All @@ -245,14 +333,18 @@ func (s *mergedSeriesChunkRefsSet) ensureItemAvailableToRead(curr *seriesChunkRe
// Ensure curr has an item available, otherwise fetch the next set.
for curr.Done() {
if set.Next() {
curr.reset(set.At())
// We can release the previous set because it hasn't been retained by anyone else.
curr.resetIteratorAndReleasePreviousSet(set.At())

// Advance the iterator to the first element. If the iterator is empty,
// it will be detected and handled by the outer for loop.
curr.Next()
continue
}

// Release the previous set because it won't be accessed anymore.
curr.resetIteratorAndReleasePreviousSet(seriesChunkRefsSet{})

if set.Err() != nil {
// Stop iterating on first error encountered.
return set.Err()
Expand Down Expand Up @@ -333,10 +425,6 @@ Outer:
return toReturn, true
}

func (s *mergedSeriesChunkRefsSet) At() seriesChunkRefsSet {
return s.current
}

type seriesChunkRefsSeriesSet struct {
from seriesChunkRefsSetIterator

Expand All @@ -362,11 +450,18 @@ func (s *seriesChunkRefsSeriesSet) Next() bool {
// The current iterator has no more elements. We check if there's another
// iterator to fetch and then iterate on.
if !s.from.Next() {
// We can safely release the previous set because no one retained it except the
// iterator itself (which we're going to reset).
s.currentIterator.resetIteratorAndReleasePreviousSet(seriesChunkRefsSet{})

return false
}

next := s.from.At()
s.currentIterator.reset(next)

// We can safely release the previous set because no one retained it except the
// iterator itself (which we're going to reset).
s.currentIterator.resetIteratorAndReleasePreviousSet(next)

// We've replaced the current iterator, so can recursively call Next()
// to check if there's any item in the new iterator and further advance it if not.
Expand Down Expand Up @@ -417,7 +512,10 @@ func (s *deduplicatingSeriesChunkRefsSetIterator) Next() bool {
firstSeries = *s.peek
s.peek = nil
}
nextSet := newSeriesChunkRefsSet(s.batchSize)

// This can be released by the caller because deduplicatingSeriesChunkRefsSetIterator doesn't retain it
// after Next() is called again.
nextSet := newSeriesChunkRefsSet(s.batchSize, true)
nextSet.series = append(nextSet.series, firstSeries)

var nextSeries seriesChunkRefs
Expand Down Expand Up @@ -615,7 +713,9 @@ func (s *loadingSeriesChunkRefsSetIterator) Next() bool {
loadStats := &queryStats{}
defer s.stats.merge(loadStats)

nextSet := newSeriesChunkRefsSet(len(nextPostings))
// This can be released by the caller because loadingSeriesChunkRefsSetIterator doesn't retain it
// after Next() is called again.
nextSet := newSeriesChunkRefsSet(len(nextPostings), true)

for _, id := range nextPostings {
lset, chks, err := s.loadSeriesForTime(id, loadedSeries, loadStats)
Expand All @@ -637,9 +737,15 @@ func (s *loadingSeriesChunkRefsSetIterator) Next() bool {
chunks: chks,
})
}

if nextSet.len() == 0 {
return s.Next() // we didn't find any suitable series in this set, try with the next one
// The next set we attempted to build is empty, so we can directly release it.
nextSet.release()

// Try with the next set of postings.
return s.Next()
}

s.currentSet = nextSet
return true
}
Expand Down
Loading

0 comments on commit a5ec2c2

Please sign in to comment.