From 03c97cda4174bfa3e5a3abec0620a53e099ac4e2 Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Wed, 28 Jun 2023 21:05:12 -0400 Subject: [PATCH] db: do not cache compaction block reads During compactions, avoid populating the block cache with input files' blocks. These files will soon be removed from the LSM, making it less likely any iterator will need to read these blocks. While Pebble uses a scan-resistant block cache algorithm (ClockPRO), the act of inserting the blocks into the cache increases contention on the block cache mutexes (#1997). This contention has been observed to significantly contribute to tail latencies, both for reads and for writes during memtable reservation. Additionally, although these blocks may be soon replaced with more useful blocks due to ClockPRO's scan resistance, they may be freed by a different thread inducing excessive TLB shootdowns (#2693). A compaction only requires a relatively small working set of buffers during its scan across input sstables. In this commit, we introduce a per-compaction BufferPool that is used to allocate buffers during cache misses. Buffers are reused throughout the compaction and only freed to the memory allocator when they're too small or the compaction is finished. This reduces pressure on the memory allocator and the block cache. --- compaction.go | 37 +++++- flushable.go | 2 +- ingest.go | 4 +- level_iter.go | 5 +- level_iter_test.go | 16 +-- merging_iter_test.go | 2 +- sstable/block.go | 18 ++- sstable/block_property_test.go | 2 +- sstable/buffer_pool.go | 139 ++++++++++++++++++++ sstable/buffer_pool_test.go | 78 +++++++++++ sstable/reader.go | 230 +++++++++++++++++++++------------ sstable/reader_test.go | 22 +++- sstable/table_test.go | 2 +- sstable/testdata/buffer_pool | 84 ++++++++++++ sstable/value_block.go | 20 +-- table_cache.go | 3 +- testdata/metrics | 4 +- 17 files changed, 542 insertions(+), 126 deletions(-) create mode 100644 sstable/buffer_pool.go create mode 100644 sstable/buffer_pool_test.go create mode 100644 sstable/testdata/buffer_pool diff --git a/compaction.go b/compaction.go index ff4d27783f..54b1ce4d40 100644 --- a/compaction.go +++ b/compaction.go @@ -566,6 +566,7 @@ type compaction struct { // resulting version has been installed (if successful), but the compaction // goroutine is still cleaning up (eg, deleting obsolete files). versionEditApplied bool + bufferPool sstable.BufferPool score float64 @@ -1343,7 +1344,10 @@ func (c *compaction) newInputIter( f manifest.LevelFile, _ *IterOptions, l manifest.Level, bytesIterated *uint64, ) (keyspan.FragmentIterator, error) { iter, rangeDelIter, err := newIters(context.Background(), f.FileMetadata, - &IterOptions{level: l}, internalIterOpts{bytesIterated: &c.bytesIterated}) + &IterOptions{level: l}, internalIterOpts{ + bytesIterated: &c.bytesIterated, + bufferPool: &c.bufferPool, + }) if err == nil { // TODO(peter): It is mildly wasteful to open the point iterator only to // immediately close it. One way to solve this would be to add new @@ -1428,7 +1432,10 @@ func (c *compaction) newInputIter( // to configure the levelIter at these levels to hide the obsolete points. addItersForLevel := func(level *compactionLevel, l manifest.Level) error { iters = append(iters, newLevelIter(iterOpts, c.cmp, nil /* split */, newIters, - level.files.Iter(), l, &c.bytesIterated)) + level.files.Iter(), l, internalIterOpts{ + bytesIterated: &c.bytesIterated, + bufferPool: &c.bufferPool, + })) // TODO(jackson): Use keyspan.LevelIter to avoid loading all the range // deletions into memory upfront. (See #2015, which reverted this.) // There will be no user keys that are split between sstables @@ -2746,6 +2753,32 @@ func (d *DB) runCompaction( d.mu.Unlock() defer d.mu.Lock() + // Compactions use a pool of buffers to read blocks, avoiding polluting the + // block cache with blocks that will not be read again. We initialize the + // buffer pool with a size 12. This initial size does not need to be + // accurate, because the pool will grow to accommodate the maximum number of + // blocks allocated at a given time over the course of the compaction. But + // choosing a size larger than that working set avoids any additional + // allocations to grow the size of the pool over the course of iteration. + // + // Justification for initial size 12: In a two-level compaction, at any + // given moment we'll have 2 index blocks in-use and 2 data blocks in-use. + // Additionally, when decoding a compressed block, we'll temporarily + // allocate 1 additional block to hold the compressed buffer. In the worst + // case that all input sstables have two-level index blocks (+2), value + // blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll + // additionally require 2n+4 blocks where n is the number of input sstables. + // Range deletion and range key blocks are relatively rare, and the cost of + // an additional allocation or two over the course of the compaction is + // considered to be okay. A larger initial size would cause the pool to hold + // on to more memory, even when it's not in-use because the pool will + // recycle buffers up to the current capacity of the pool. The memory use of + // a 12-buffer pool is expected to be within reason, even if all the buffers + // grow to the typical size of an index block (256 KiB) which would + // translate to 3 MiB per compaction. + c.bufferPool.Init(12) + defer c.bufferPool.Release() + iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter, snapshots) if err != nil { return nil, pendingOutputs, stats, err diff --git a/flushable.go b/flushable.go index cbd486eb79..474d410c52 100644 --- a/flushable.go +++ b/flushable.go @@ -172,7 +172,7 @@ func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator { // aren't truly levels in the lsm. Right now, the encoding only supports // L0 sublevels, and the rest of the levels in the lsm. return newLevelIter( - opts, s.cmp, s.split, s.newIters, s.slice.Iter(), manifest.Level(0), nil, + opts, s.cmp, s.split, s.newIters, s.slice.Iter(), manifest.Level(0), internalIterOpts{}, ) } diff --git a/ingest.go b/ingest.go index 183216e666..07e55eb3d7 100644 --- a/ingest.go +++ b/ingest.go @@ -729,7 +729,7 @@ func ingestTargetLevel( // Check for overlap over the keys of L0 by iterating over the sublevels. for subLevel := 0; subLevel < len(v.L0SublevelFiles); subLevel++ { iter := newLevelIter(iterOps, cmp, nil /* split */, newIters, - v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), nil) + v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), internalIterOpts{}) var rangeDelIter keyspan.FragmentIterator // Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE @@ -760,7 +760,7 @@ func ingestTargetLevel( level := baseLevel for ; level < numLevels; level++ { levelIter := newLevelIter(iterOps, cmp, nil /* split */, newIters, - v.Levels[level].Iter(), manifest.Level(level), nil) + v.Levels[level].Iter(), manifest.Level(level), internalIterOpts{}) var rangeDelIter keyspan.FragmentIterator // Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE // sets it up for the target file. diff --git a/level_iter.go b/level_iter.go index 3cf90274a0..d10ec39488 100644 --- a/level_iter.go +++ b/level_iter.go @@ -51,6 +51,7 @@ func tableNewRangeDelIter(ctx context.Context, newIters tableNewIters) keyspan.T type internalIterOpts struct { bytesIterated *uint64 + bufferPool *sstable.BufferPool stats *base.InternalIteratorStats boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter } @@ -246,11 +247,11 @@ func newLevelIter( newIters tableNewIters, files manifest.LevelIterator, level manifest.Level, - bytesIterated *uint64, + internalOpts internalIterOpts, ) *levelIter { l := &levelIter{} l.init(context.Background(), opts, cmp, split, newIters, files, level, - internalIterOpts{bytesIterated: bytesIterated}) + internalOpts) return l } diff --git a/level_iter_test.go b/level_iter_test.go index d2ec77b80b..233249ba70 100644 --- a/level_iter_test.go +++ b/level_iter_test.go @@ -88,7 +88,7 @@ func TestLevelIter(t *testing.T) { iter := newLevelIter(opts, DefaultComparer.Compare, func(a []byte) int { return len(a) }, newIters, files.Iter(), manifest.Level(level), - nil) + internalIterOpts{}) defer iter.Close() // Fake up the range deletion initialization. iter.initRangeDel(new(keyspan.FragmentIterator)) @@ -131,7 +131,7 @@ func TestLevelIter(t *testing.T) { iter := newLevelIter(opts, DefaultComparer.Compare, func(a []byte) int { return len(a) }, newIters2, files.Iter(), - manifest.Level(level), nil) + manifest.Level(level), internalIterOpts{}) iter.SeekGE([]byte(key), base.SeekGEFlagsNone) lower, upper := tableOpts.GetLowerBound(), tableOpts.GetUpperBound() return fmt.Sprintf("[%s,%s]\n", lower, upper) @@ -326,7 +326,7 @@ func TestLevelIterBoundaries(t *testing.T) { slice := manifest.NewLevelSliceKeySorted(lt.cmp.Compare, lt.metas) iter = newLevelIter(IterOptions{}, DefaultComparer.Compare, func(a []byte) int { return len(a) }, lt.newIters, slice.Iter(), - manifest.Level(level), nil) + manifest.Level(level), internalIterOpts{}) // Fake up the range deletion initialization. iter.initRangeDel(new(keyspan.FragmentIterator)) } @@ -536,7 +536,7 @@ func BenchmarkLevelIterSeekGE(b *testing.B) { iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */) return iter, nil, err } - l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil) + l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{}) rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) b.ResetTimer() @@ -578,7 +578,7 @@ func BenchmarkLevelIterSeqSeekGEWithBounds(b *testing.B) { opts.LowerBound, opts.UpperBound) return iter, nil, err } - l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil) + l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{}) // Fake up the range deletion initialization, to resemble the usage // in a mergingIter. l.initRangeDel(new(keyspan.FragmentIterator)) @@ -627,7 +627,7 @@ func BenchmarkLevelIterSeqSeekPrefixGE(b *testing.B) { func(b *testing.B) { l := newLevelIter(IterOptions{}, DefaultComparer.Compare, func(a []byte) int { return len(a) }, newIters, metas.Iter(), - manifest.Level(level), nil) + manifest.Level(level), internalIterOpts{}) // Fake up the range deletion initialization, to resemble the usage // in a mergingIter. l.initRangeDel(new(keyspan.FragmentIterator)) @@ -672,7 +672,7 @@ func BenchmarkLevelIterNext(b *testing.B) { iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */) return iter, nil, err } - l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil) + l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{}) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -706,7 +706,7 @@ func BenchmarkLevelIterPrev(b *testing.B) { iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */) return iter, nil, err } - l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil) + l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{}) b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/merging_iter_test.go b/merging_iter_test.go index 8d5c8d6088..24c3b2209d 100644 --- a/merging_iter_test.go +++ b/merging_iter_test.go @@ -629,7 +629,7 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS } l := newLevelIter(IterOptions{}, DefaultComparer.Compare, func(a []byte) int { return len(a) }, newIters, levelSlices[i].Iter(), - manifest.Level(level), nil) + manifest.Level(level), internalIterOpts{}) l.initRangeDel(&mils[level].rangeDelIter) l.initBoundaryContext(&mils[level].levelIterBoundaryContext) mils[level].iter = l diff --git a/sstable/block.go b/sstable/block.go index 53fe7991fb..cbd2e5ce6f 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -10,7 +10,6 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" - "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manual" @@ -400,10 +399,9 @@ type blockIter struct { // For a block encoded with a restart interval of 1, cached and cachedBuf // will not be used as there are no prefix compressed entries between the // restart points. - cached []blockEntry - cachedBuf []byte - cacheHandle cache.Handle - // The first user key in the block. This is used by the caller to set bounds + cached []blockEntry + cachedBuf []byte + handle bufferHandle // for block iteration for already loaded blocks. firstUserKey []byte lazyValueHandling struct { @@ -458,10 +456,10 @@ func (i *blockIter) init( // ingested. // - Foreign sstable iteration: globalSeqNum is always set. func (i *blockIter) initHandle( - cmp Compare, block cache.Handle, globalSeqNum uint64, hideObsoletePoints bool, + cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, ) error { - i.cacheHandle.Release() - i.cacheHandle = block + i.handle.Release() + i.handle = block return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints) } @@ -1515,8 +1513,8 @@ func (i *blockIter) Error() error { // Close implements internalIterator.Close, as documented in the pebble // package. func (i *blockIter) Close() error { - i.cacheHandle.Release() - i.cacheHandle = cache.Handle{} + i.handle.Release() + i.handle = bufferHandle{} i.val = nil i.lazyValue = base.LazyValue{} i.lazyValueHandling.vbr = nil diff --git a/sstable/block_property_test.go b/sstable/block_property_test.go index e7298eccb7..76ad99ea2e 100644 --- a/sstable/block_property_test.go +++ b/sstable/block_property_test.go @@ -1316,7 +1316,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { // block that bhp points to, along with its block properties. if twoLevelIndex { subiter := &blockIter{} - subIndex, err := r.readBlock(context.Background(), bhp.BlockHandle, nil, nil, nil) + subIndex, err := r.readBlock(context.Background(), bhp.BlockHandle, nil, nil, nil, nil) if err != nil { return err.Error() } diff --git a/sstable/buffer_pool.go b/sstable/buffer_pool.go new file mode 100644 index 0000000000..b12318f294 --- /dev/null +++ b/sstable/buffer_pool.go @@ -0,0 +1,139 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package sstable + +import ( + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/cache" +) + +// A bufferHandle is a handle to manually-managed memory. The handle may point +// to a block in the block cache (h.Get() != nil), or a buffer that exists +// outside the block cache allocated from a BufferPool (b.Valid()). +type bufferHandle struct { + h cache.Handle + b Buf +} + +// Get retrieves the underlying buffer referenced by the handle. +func (bh bufferHandle) Get() []byte { + if v := bh.h.Get(); v != nil { + return v + } else if bh.b.p != nil { + return bh.b.p.pool[bh.b.i].b + } + return nil +} + +// Release releases the buffer, either back to the block cache or BufferPool. +func (bh bufferHandle) Release() { + bh.h.Release() + bh.b.Release() +} + +// A BufferPool holds a pool of buffers for holding sstable blocks. An initial +// size of the pool is provided on Init, but a BufferPool will grow to meet the +// largest working set size. It'll never shrink. When a buffer is released, the +// BufferPool recycles the buffer for future allocations. +// +// A BufferPool should only be used for short-lived allocations with +// well-understood working set sizes to avoid excessive memory consumption. +// +// BufferPool is not thread-safe. +type BufferPool struct { + // pool contains all the buffers held by the pool, including buffers that + // are in-use. For every i < len(pool): pool[i].v is non-nil. + pool []allocedBuffer +} + +type allocedBuffer struct { + v *cache.Value + // b holds the current byte slice. It's backed by v, but may be a subslice + // of v's memory while the buffer is in-use [ len(b) ≤ len(v.Buf()) ]. + // + // If the buffer is not currently in-use, b is nil. When being recycled, the + // BufferPool.Alloc will reset b to be a subslice of v.Buf(). + b []byte +} + +// Init initializes the pool with an initial working set buffer size of +// `initialSize`. +func (p *BufferPool) Init(initialSize int) { + *p = BufferPool{ + pool: make([]allocedBuffer, 0, initialSize), + } +} + +// Release releases all buffers held by the pool and resets the pool to an +// uninitialized state. +func (p *BufferPool) Release() { + for i := range p.pool { + if p.pool[i].b != nil { + panic(errors.AssertionFailedf("Release called on a BufferPool with in-use buffers")) + } + cache.Free(p.pool[i].v) + } + *p = BufferPool{} +} + +// Alloc allocates a new buffer of size n. If the pool already holds a buffer at +// least as large as n, the pooled buffer is used instead. +// +// Alloc is O(MAX(N,M)) where N is the largest number of concurrently in-use +// buffers allocated and M is the initialSize passed to Init. +func (p *BufferPool) Alloc(n int) Buf { + unusableBufferIdx := -1 + for i := 0; i < len(p.pool); i++ { + if p.pool[i].b == nil { + if len(p.pool[i].v.Buf()) >= n { + p.pool[i].b = p.pool[i].v.Buf()[:n] + return Buf{p: p, i: i} + } + unusableBufferIdx = i + } + } + + // If we would need to grow the size of the pool to allocate another buffer, + // but there was a slot available occupied by a buffer that's just too + // small, replace the too-small buffer. + if len(p.pool) == cap(p.pool) && unusableBufferIdx >= 0 { + i := unusableBufferIdx + cache.Free(p.pool[i].v) + p.pool[i].v = cache.Alloc(n) + p.pool[i].b = p.pool[i].v.Buf() + return Buf{p: p, i: i} + } + + // Allocate a new buffer. + v := cache.Alloc(n) + p.pool = append(p.pool, allocedBuffer{v: v, b: v.Buf()[:n]}) + return Buf{p: p, i: len(p.pool) - 1} +} + +// A Buf holds a reference to a manually-managed, pooled byte buffer. +type Buf struct { + p *BufferPool + // i holds the index into p.pool where the buffer may be found. This scheme + // avoids needing to allocate the handle to the buffer on the heap at the + // cost of copying two words instead of one. + i int +} + +// Valid returns true if the buf holds a valid buffer. +func (b Buf) Valid() bool { + return b.p != nil +} + +// Release releases the buffer back to the pool. +func (b *Buf) Release() { + if b.p == nil { + return + } + // Clear the allocedBuffer's byte slice. This signals the allocated buffer + // is no longer in use and a future call to BufferPool.Alloc may reuse this + // buffer. + b.p.pool[b.i].b = nil + b.p = nil +} diff --git a/sstable/buffer_pool_test.go b/sstable/buffer_pool_test.go new file mode 100644 index 0000000000..66ae094772 --- /dev/null +++ b/sstable/buffer_pool_test.go @@ -0,0 +1,78 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package sstable + +import ( + "bytes" + "fmt" + "io" + "testing" + + "github.com/cockroachdb/datadriven" +) + +func writeBufferPool(w io.Writer, bp *BufferPool) { + for i := 0; i < cap(bp.pool); i++ { + if i > 0 { + fmt.Fprint(w, " ") + } + if i >= len(bp.pool) { + fmt.Fprint(w, "[ ]") + continue + } + sz := len(bp.pool[i].v.Buf()) + if bp.pool[i].b == nil { + fmt.Fprintf(w, "[%4d]", sz) + } else { + fmt.Fprintf(w, "<%4d>", sz) + } + } +} + +func TestBufferPool(t *testing.T) { + var bp BufferPool + var buf bytes.Buffer + handles := map[string]Buf{} + drainPool := func() { + for h, b := range handles { + b.Release() + delete(handles, h) + } + bp.Release() + } + defer drainPool() + datadriven.RunTest(t, "testdata/buffer_pool", func(t *testing.T, td *datadriven.TestData) string { + buf.Reset() + switch td.Cmd { + case "init": + if cap(bp.pool) > 0 { + drainPool() + } + var initialSize int + td.ScanArgs(t, "size", &initialSize) + bp.Init(initialSize) + writeBufferPool(&buf, &bp) + return buf.String() + case "alloc": + var n int + var handle string + td.ScanArgs(t, "n", &n) + td.ScanArgs(t, "handle", &handle) + handles[handle] = bp.Alloc(n) + writeBufferPool(&buf, &bp) + return buf.String() + case "release": + var handle string + td.ScanArgs(t, "handle", &handle) + b := handles[handle] + b.Release() + delete(handles, handle) + writeBufferPool(&buf, &bp) + return buf.String() + default: + return fmt.Sprintf("unrecognized command %q", td.Cmd) + } + }) +} diff --git a/sstable/reader.go b/sstable/reader.go index bd3138a932..220b8bbf19 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -224,6 +224,7 @@ type singleLevelIterator struct { err error closeHook func(i Iterator) error stats *base.InternalIteratorStats + bufferPool *BufferPool // boundsCmp and positionedUsingLatestBounds are for optimizing iteration // that uses multiple adjacent bounds. The seek after setting a new bound @@ -362,32 +363,32 @@ var rangeKeyFragmentBlockIterPool = sync.Pool{ func checkSingleLevelIterator(obj interface{}) { i := obj.(*singleLevelIterator) - if p := i.data.cacheHandle.Get(); p != nil { - fmt.Fprintf(os.Stderr, "singleLevelIterator.data.cacheHandle is not nil: %p\n", p) + if p := i.data.handle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.data.handle is not nil: %p\n", p) os.Exit(1) } - if p := i.index.cacheHandle.Get(); p != nil { - fmt.Fprintf(os.Stderr, "singleLevelIterator.index.cacheHandle is not nil: %p\n", p) + if p := i.index.handle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.index.handle is not nil: %p\n", p) os.Exit(1) } } func checkTwoLevelIterator(obj interface{}) { i := obj.(*twoLevelIterator) - if p := i.data.cacheHandle.Get(); p != nil { - fmt.Fprintf(os.Stderr, "singleLevelIterator.data.cacheHandle is not nil: %p\n", p) + if p := i.data.handle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.data.handle is not nil: %p\n", p) os.Exit(1) } - if p := i.index.cacheHandle.Get(); p != nil { - fmt.Fprintf(os.Stderr, "singleLevelIterator.index.cacheHandle is not nil: %p\n", p) + if p := i.index.handle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.index.handle is not nil: %p\n", p) os.Exit(1) } } func checkRangeKeyFragmentBlockIterator(obj interface{}) { i := obj.(*rangeKeyFragmentBlockIter) - if p := i.blockIter.cacheHandle.Get(); p != nil { - fmt.Fprintf(os.Stderr, "fragmentBlockIter.blockIter.cacheHandle is not nil: %p\n", p) + if p := i.blockIter.handle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "fragmentBlockIter.blockIter.handle is not nil: %p\n", p) os.Exit(1) } } @@ -408,6 +409,7 @@ func (i *singleLevelIterator) init( useFilter, hideObsoletePoints bool, stats *base.InternalIteratorStats, rp ReaderProvider, + bufferPool *BufferPool, ) error { if r.err != nil { return r.err @@ -430,6 +432,7 @@ func (i *singleLevelIterator) init( i.cmp = r.Compare i.stats = stats i.hideObsoletePoints = hideObsoletePoints + i.bufferPool = bufferPool err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum, false) if err != nil { // blockIter.Close releases indexH and always returns a nil error @@ -581,7 +584,7 @@ func (i *singleLevelIterator) loadBlock(dir int8) loadBlockResult { // blockIntersects } ctx := objiotracing.WithBlockType(i.ctx, objiotracing.DataBlock) - block, err := i.reader.readBlock(ctx, i.dataBH, nil /* transform */, i.dataRH, i.stats) + block, err := i.reader.readBlock(ctx, i.dataBH, nil /* transform */, i.dataRH, i.stats, i.bufferPool) if err != nil { i.err = err return loadBlockFailed @@ -600,9 +603,9 @@ func (i *singleLevelIterator) loadBlock(dir int8) loadBlockResult { // the valueBlockReader. func (i *singleLevelIterator) readBlockForVBR( ctx context.Context, h BlockHandle, stats *base.InternalIteratorStats, -) (cache.Handle, error) { +) (bufferHandle, error) { ctx = objiotracing.WithBlockType(ctx, objiotracing.ValueBlock) - return i.reader.readBlock(ctx, h, nil, i.vbRH, stats) + return i.reader.readBlock(ctx, h, nil, i.vbRH, stats, i.bufferPool) } // resolveMaybeExcluded is invoked when the block-property filterer has found @@ -967,7 +970,7 @@ func (i *singleLevelIterator) seekPrefixGE( } i.lastBloomFilterMatched = false // Check prefix bloom filter. - var dataH cache.Handle + var dataH bufferHandle dataH, i.err = i.reader.readFilter(i.ctx, i.stats) if i.err != nil { i.data.invalidate() @@ -1780,7 +1783,7 @@ func (i *twoLevelIterator) loadIndex(dir int8) loadBlockResult { // blockIntersects } ctx := objiotracing.WithBlockType(i.ctx, objiotracing.MetadataBlock) - indexBlock, err := i.reader.readBlock(ctx, bhp.BlockHandle, nil /* transform */, nil /* readHandle */, i.stats) + indexBlock, err := i.reader.readBlock(ctx, bhp.BlockHandle, nil /* transform */, nil /* readHandle */, i.stats, i.bufferPool) if err != nil { i.err = err return loadBlockFailed @@ -1866,6 +1869,7 @@ func (i *twoLevelIterator) init( useFilter, hideObsoletePoints bool, stats *base.InternalIteratorStats, rp ReaderProvider, + bufferPool *BufferPool, ) error { if r.err != nil { return r.err @@ -1889,6 +1893,7 @@ func (i *twoLevelIterator) init( i.cmp = r.Compare i.stats = stats i.hideObsoletePoints = hideObsoletePoints + i.bufferPool = bufferPool err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum, false) if err != nil { // blockIter.Close releases topLevelIndexH and always returns a nil error @@ -2130,7 +2135,7 @@ func (i *twoLevelIterator) SeekPrefixGE( flags = flags.DisableTrySeekUsingNext() } i.lastBloomFilterMatched = false - var dataH cache.Handle + var dataH bufferHandle dataH, i.err = i.reader.readFilter(i.ctx, i.stats) if i.err != nil { i.data.invalidate() @@ -2924,9 +2929,9 @@ func MakeVirtualReader(reader *Reader, meta manifest.VirtualFileMeta) VirtualRea // NewCompactionIter is the compaction iterator function for virtual readers. func (v *VirtualReader) NewCompactionIter( - bytesIterated *uint64, rp ReaderProvider, + bytesIterated *uint64, rp ReaderProvider, bufferPool *BufferPool, ) (Iterator, error) { - return v.reader.newCompactionIter(bytesIterated, rp, &v.vState) + return v.reader.newCompactionIter(bytesIterated, rp, &v.vState, bufferPool) } // NewIterWithBlockPropertyFiltersAndContextEtc wraps @@ -3152,7 +3157,7 @@ func (r *Reader) newIterWithBlockPropertyFiltersAndContext( // until the final iterator closes. if r.Properties.IndexType == twoLevelIndex { i := twoLevelIterPool.Get().(*twoLevelIterator) - err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats, rp) + err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats, rp, nil /* bufferPool */) if err != nil { return nil, err } @@ -3160,7 +3165,7 @@ func (r *Reader) newIterWithBlockPropertyFiltersAndContext( } i := singleLevelIterPool.Get().(*singleLevelIterator) - err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats, rp) + err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats, rp, nil /* bufferPool */) if err != nil { return nil, err } @@ -3180,12 +3185,14 @@ func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) { // NewCompactionIter returns an iterator similar to NewIter but it also increments // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up // after itself and returns a nil iterator. -func (r *Reader) NewCompactionIter(bytesIterated *uint64, rp ReaderProvider) (Iterator, error) { - return r.newCompactionIter(bytesIterated, rp, nil) +func (r *Reader) NewCompactionIter( + bytesIterated *uint64, rp ReaderProvider, bufferPool *BufferPool, +) (Iterator, error) { + return r.newCompactionIter(bytesIterated, rp, nil, bufferPool) } func (r *Reader) newCompactionIter( - bytesIterated *uint64, rp ReaderProvider, v *virtualState, + bytesIterated *uint64, rp ReaderProvider, v *virtualState, bufferPool *BufferPool, ) (Iterator, error) { if r.Properties.IndexType == twoLevelIndex { i := twoLevelIterPool.Get().(*twoLevelIterator) @@ -3193,7 +3200,7 @@ func (r *Reader) newCompactionIter( context.Background(), r, v, nil /* lower */, nil /* upper */, nil, false /* useFilter */, false, /* hideObsoletePoints */ - nil /* stats */, rp, + nil /* stats */, rp, bufferPool, ) if err != nil { return nil, err @@ -3208,7 +3215,7 @@ func (r *Reader) newCompactionIter( err := i.init( context.Background(), r, v, nil /* lower */, nil, /* upper */ nil, false /* useFilter */, false, /* hideObsoletePoints */ - nil /* stats */, rp, + nil /* stats */, rp, bufferPool, ) if err != nil { return nil, err @@ -3275,26 +3282,26 @@ func (i *rangeKeyFragmentBlockIter) Close() error { func (r *Reader) readIndex( ctx context.Context, stats *base.InternalIteratorStats, -) (cache.Handle, error) { +) (bufferHandle, error) { ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock) - return r.readBlock(ctx, r.indexBH, nil, nil, stats) + return r.readBlock(ctx, r.indexBH, nil, nil, stats, nil /* buffer pool */) } func (r *Reader) readFilter( ctx context.Context, stats *base.InternalIteratorStats, -) (cache.Handle, error) { +) (bufferHandle, error) { ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock) - return r.readBlock(ctx, r.filterBH, nil /* transform */, nil /* readHandle */, stats) + return r.readBlock(ctx, r.filterBH, nil /* transform */, nil /* readHandle */, stats, nil /* buffer pool */) } -func (r *Reader) readRangeDel(stats *base.InternalIteratorStats) (cache.Handle, error) { +func (r *Reader) readRangeDel(stats *base.InternalIteratorStats) (bufferHandle, error) { ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock) - return r.readBlock(ctx, r.rangeDelBH, r.rangeDelTransform, nil /* readHandle */, stats) + return r.readBlock(ctx, r.rangeDelBH, r.rangeDelTransform, nil /* readHandle */, stats, nil /* buffer pool */) } -func (r *Reader) readRangeKey(stats *base.InternalIteratorStats) (cache.Handle, error) { +func (r *Reader) readRangeKey(stats *base.InternalIteratorStats) (bufferHandle, error) { ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock) - return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats) + return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, nil /* buffer pool */) } func checkChecksum( @@ -3319,15 +3326,46 @@ func checkChecksum( return nil } -// readBlock reads and decompresses a block from disk into memory. +type cacheValueOrBuf struct { + // buf.Valid() returns true if backed by a BufferPool. + buf Buf + // v is non-nil if backed by the block cache. + v *cache.Value +} + +func (b cacheValueOrBuf) get() []byte { + if b.buf.Valid() { + return b.buf.p.pool[b.buf.i].b + } + return b.v.Buf() +} + +func (b cacheValueOrBuf) release() { + if b.buf.Valid() { + b.buf.Release() + } else { + cache.Free(b.v) + } +} + +func (b cacheValueOrBuf) truncate(n int) { + if b.buf.Valid() { + b.buf.p.pool[b.buf.i].b = b.buf.p.pool[b.buf.i].b[:n] + } else { + b.v.Truncate(n) + } +} + func (r *Reader) readBlock( ctx context.Context, bh BlockHandle, transform blockTransform, readHandle objstorage.ReadHandle, stats *base.InternalIteratorStats, -) (handle cache.Handle, _ error) { + bufferPool *BufferPool, +) (handle bufferHandle, _ error) { if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil { + // Cache hit. if readHandle != nil { readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+blockTrailerLen)) } @@ -3335,17 +3373,29 @@ func (r *Reader) readBlock( stats.BlockBytes += bh.Length stats.BlockBytesInCache += bh.Length } - return h, nil + // This block is already in the cache; return a handle to existing vlaue + // in the cache. + return bufferHandle{h: h}, nil + } + + // Cache miss. + var compressed cacheValueOrBuf + if bufferPool != nil { + compressed = cacheValueOrBuf{ + buf: bufferPool.Alloc(int(bh.Length + blockTrailerLen)), + } + } else { + compressed = cacheValueOrBuf{ + v: cache.Alloc(int(bh.Length + blockTrailerLen)), + } } - v := cache.Alloc(int(bh.Length + blockTrailerLen)) - b := v.Buf() readStartTime := time.Now() var err error if readHandle != nil { - err = readHandle.ReadAt(ctx, b, int64(bh.Offset)) + err = readHandle.ReadAt(ctx, compressed.get(), int64(bh.Offset)) } else { - err = r.readable.ReadAt(ctx, b, int64(bh.Offset)) + err = r.readable.ReadAt(ctx, compressed.get(), int64(bh.Offset)) } readDuration := time.Since(readStartTime) // TODO(sumeer): should the threshold be configurable. @@ -3358,56 +3408,74 @@ func (r *Reader) readBlock( // interface{}, unless necessary. if readDuration >= slowReadTracingThreshold && r.opts.LoggerAndTracer.IsTracingEnabled(ctx) { r.opts.LoggerAndTracer.Eventf(ctx, "reading %d bytes took %s", - bh.Length+blockTrailerLen, readDuration.String()) + int(bh.Length+blockTrailerLen), readDuration.String()) } if stats != nil { stats.BlockReadDuration += readDuration } if err != nil { - cache.Free(v) - return cache.Handle{}, err + compressed.release() + return bufferHandle{}, err } - - if err := checkChecksum(r.checksumType, b, bh, r.fileNum.FileNum()); err != nil { - cache.Free(v) - return cache.Handle{}, err + if err := checkChecksum(r.checksumType, compressed.get(), bh, r.fileNum.FileNum()); err != nil { + compressed.release() + return bufferHandle{}, err } - typ := blockType(b[bh.Length]) - b = b[:bh.Length] - v.Truncate(len(b)) + typ := blockType(compressed.get()[bh.Length]) + compressed.truncate(int(bh.Length)) + + var decompressed cacheValueOrBuf + if typ == noCompressionBlockType { + decompressed = compressed + } else { + // Decode the length of the decompressed value. + decodedLen, prefixLen, err := decompressedLen(typ, compressed.get()) + if err != nil { + compressed.release() + return bufferHandle{}, err + } - decoded, err := decompressBlock(typ, b) - if decoded != nil { - cache.Free(v) - v = decoded - b = v.Buf() - } else if err != nil { - cache.Free(v) - return cache.Handle{}, err + if bufferPool != nil { + decompressed = cacheValueOrBuf{buf: bufferPool.Alloc(decodedLen)} + } else { + decompressed = cacheValueOrBuf{v: cache.Alloc(decodedLen)} + } + if _, err := decompressInto(typ, compressed.get()[prefixLen:], decompressed.get()); err != nil { + compressed.release() + return bufferHandle{}, err + } + compressed.release() } if transform != nil { - // Transforming blocks is rare, so the extra copy of the transformed data - // is not problematic. - var err error - b, err = transform(b) + // Transforming blocks is very rare, so the extra copy of the + // transformed data is not problematic. + tmpTransformed, err := transform(decompressed.get()) if err != nil { - cache.Free(v) - return cache.Handle{}, err + decompressed.release() + return bufferHandle{}, err + } + + var transformed cacheValueOrBuf + if bufferPool != nil { + transformed = cacheValueOrBuf{buf: bufferPool.Alloc(len(tmpTransformed))} + } else { + transformed = cacheValueOrBuf{v: cache.Alloc(len(tmpTransformed))} } - newV := cache.Alloc(len(b)) - copy(newV.Buf(), b) - cache.Free(v) - v = newV + copy(transformed.get(), tmpTransformed) + decompressed.release() + decompressed = transformed } if stats != nil { stats.BlockBytes += bh.Length } - - h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, v) - return h, nil + if decompressed.buf.Valid() { + return bufferHandle{b: decompressed.buf}, nil + } + h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, decompressed.v) + return bufferHandle{h: h}, nil } func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { @@ -3455,7 +3523,7 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { func (r *Reader) readMetaindex(metaindexBH BlockHandle) error { b, err := r.readBlock( - context.Background(), metaindexBH, nil /* transform */, nil /* readHandle */, nil /* stats */) + context.Background(), metaindexBH, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */) if err != nil { return err } @@ -3498,7 +3566,7 @@ func (r *Reader) readMetaindex(metaindexBH BlockHandle) error { if bh, ok := meta[metaPropertiesName]; ok { b, err = r.readBlock( - context.Background(), bh, nil /* transform */, nil /* readHandle */, nil /* stats */) + context.Background(), bh, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */) if err != nil { return err } @@ -3603,8 +3671,8 @@ func (r *Reader) Layout() (*Layout, error) { } l.Index = append(l.Index, indexBH.BlockHandle) - subIndex, err := r.readBlock(context.Background(), - indexBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */) + subIndex, err := r.readBlock(context.Background(), indexBH.BlockHandle, + nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */) if err != nil { return nil, err } @@ -3627,7 +3695,7 @@ func (r *Reader) Layout() (*Layout, error) { } } if r.valueBIH.h.Length != 0 { - vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil) + vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil /* buffer pool */) if err != nil { return nil, err } @@ -3698,7 +3766,7 @@ func (r *Reader) ValidateBlockChecksums() error { } // Read the block, which validates the checksum. - h, err := r.readBlock(context.Background(), bh, nil, rh, nil) + h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* buffer pool */) if err != nil { return err } @@ -3760,8 +3828,8 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { if err != nil { return 0, errCorruptIndexEntry } - startIdxBlock, err := r.readBlock(context.Background(), - startIdxBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */) + startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.BlockHandle, + nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */) if err != nil { return 0, err } @@ -3782,7 +3850,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, errCorruptIndexEntry } endIdxBlock, err := r.readBlock(context.Background(), - endIdxBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */) + endIdxBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */) if err != nil { return 0, err } @@ -4059,7 +4127,7 @@ func (l *Layout) Describe( } h, err := r.readBlock( - context.Background(), b.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */) + context.Background(), b.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */) if err != nil { fmt.Fprintf(w, " [err: %s]\n", err) continue diff --git a/sstable/reader_test.go b/sstable/reader_test.go index 9509ef42ec..811f5245ae 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -183,6 +183,7 @@ func TestVirtualReader(t *testing.T) { // Set during the latest build command. var r *Reader var meta manifest.PhysicalFileMeta + var bp BufferPool // Set during the latest virtualize command. var vMeta1 manifest.VirtualFileMeta @@ -191,6 +192,7 @@ func TestVirtualReader(t *testing.T) { defer func() { if r != nil { require.NoError(t, r.Close()) + bp.Release() } }() @@ -251,6 +253,7 @@ func TestVirtualReader(t *testing.T) { switch td.Cmd { case "build": if r != nil { + bp.Release() _ = r.Close() r = nil meta.FileMetadata = nil @@ -275,6 +278,7 @@ func TestVirtualReader(t *testing.T) { if err != nil { return err.Error() } + bp.Init(5) // Create a fake filemetada using the writer meta. meta, err = createPhysicalMeta(wMeta, r) @@ -330,7 +334,7 @@ func TestVirtualReader(t *testing.T) { var rp ReaderProvider var bytesIterated uint64 - iter, err := v.NewCompactionIter(&bytesIterated, rp) + iter, err := v.NewCompactionIter(&bytesIterated, rp, &bp) if err != nil { return err.Error() } @@ -680,7 +684,7 @@ func indexLayoutString(t *testing.T, r *Reader) string { fmt.Fprintf(&buf, " %s: size %d\n", string(key.UserKey), bh.Length) if twoLevelIndex { b, err := r.readBlock( - context.Background(), bh.BlockHandle, nil, nil, nil) + context.Background(), bh.BlockHandle, nil, nil, nil, nil) require.NoError(t, err) defer b.Release() iter2, err := newBlockIter(r.Compare, b.Get()) @@ -911,7 +915,9 @@ func testBytesIteratedWithCompression( for _, numEntries := range []uint64{0, 1, maxNumEntries[i]} { r := buildTestTable(t, numEntries, blockSize, indexBlockSize, compression) var bytesIterated, prevIterated uint64 - citer, err := r.NewCompactionIter(&bytesIterated, TrivialReaderProvider{Reader: r}) + var pool BufferPool + pool.Init(5) + citer, err := r.NewCompactionIter(&bytesIterated, TrivialReaderProvider{Reader: r}, &pool) require.NoError(t, err) for key, _ := citer.First(); key != nil; key, _ = citer.Next() { @@ -930,6 +936,7 @@ func testBytesIteratedWithCompression( require.NoError(t, citer.Close()) require.NoError(t, r.Close()) + pool.Release() } } } @@ -965,7 +972,9 @@ func TestCompactionIteratorSetupForCompaction(t *testing.T) { for _, numEntries := range []uint64{0, 1, 1e5} { r := buildTestTableWithProvider(t, provider, numEntries, blockSize, indexBlockSize, DefaultCompression) var bytesIterated uint64 - citer, err := r.NewCompactionIter(&bytesIterated, TrivialReaderProvider{Reader: r}) + var pool BufferPool + pool.Init(5) + citer, err := r.NewCompactionIter(&bytesIterated, TrivialReaderProvider{Reader: r}, &pool) require.NoError(t, err) switch i := citer.(type) { case *compactionIterator: @@ -983,6 +992,7 @@ func TestCompactionIteratorSetupForCompaction(t *testing.T) { } require.NoError(t, citer.Close()) require.NoError(t, r.Close()) + pool.Release() } } } @@ -1017,7 +1027,9 @@ func TestReadaheadSetupForV3TablesWithMultipleVersions(t *testing.T) { require.NoError(t, err) defer r.Close() { - citer, err := r.NewCompactionIter(nil, TrivialReaderProvider{Reader: r}) + var pool BufferPool + pool.Init(5) + citer, err := r.NewCompactionIter(nil, TrivialReaderProvider{Reader: r}, &pool) require.NoError(t, err) defer citer.Close() i := citer.(*compactionIterator) diff --git a/sstable/table_test.go b/sstable/table_test.go index b3c5341b6a..2bf6b3cd2e 100644 --- a/sstable/table_test.go +++ b/sstable/table_test.go @@ -694,7 +694,7 @@ func TestMetaIndexEntriesSorted(t *testing.T) { r, err := newReader(f, ReaderOptions{}) require.NoError(t, err) - b, err := r.readBlock(context.Background(), r.metaIndexBH, nil, nil, nil) + b, err := r.readBlock(context.Background(), r.metaIndexBH, nil, nil, nil, nil) require.NoError(t, err) defer b.Release() diff --git a/sstable/testdata/buffer_pool b/sstable/testdata/buffer_pool new file mode 100644 index 0000000000..70161624d4 --- /dev/null +++ b/sstable/testdata/buffer_pool @@ -0,0 +1,84 @@ +# Each command prints the current state of the buffer pool. +# +# [ ] - Indicates a cell within BufferPool.pool's underlying array that's +# unused and does not hold a buffer. +# [ n] - Indicates a cell within BufferPool.pool that is not currently in use, +# but does hold a buffer of size n. +# < n> - Indicates a cell within BufferPool.pool that holds a buffer of size +# n, and that buffer is presently in-use and ineligible for reuse. + +init size=5 +---- +[ ] [ ] [ ] [ ] [ ] + +alloc n=512 handle=foo +---- +< 512> [ ] [ ] [ ] [ ] + +release handle=foo +---- +[ 512] [ ] [ ] [ ] [ ] + +# Allocating again should use the existing buffer. + +alloc n=512 handle=bar +---- +< 512> [ ] [ ] [ ] [ ] + +# Allocating again should allocate a new buffer for the next slot. + +alloc n=512 handle=bax +---- +< 512> < 512> [ ] [ ] [ ] + +release handle=bar +---- +[ 512] < 512> [ ] [ ] [ ] + +release handle=bax +---- +[ 512] [ 512] [ ] [ ] [ ] + +# Fill up the entire preallocated pool slice. + +alloc n=128 handle=bar +---- +< 512> [ 512] [ ] [ ] [ ] + +alloc n=1 handle=bax +---- +< 512> < 512> [ ] [ ] [ ] + +alloc n=1 handle=bux +---- +< 512> < 512> < 1> [ ] [ ] + +alloc n=1024 handle=foo +---- +< 512> < 512> < 1> <1024> [ ] + +alloc n=1024 handle=fax +---- +< 512> < 512> < 1> <1024> <1024> + +# Allocating one more should grow the underlying slice, and allocate a +# new appropriately sized buffer. + +alloc n=2048 handle=zed +---- +< 512> < 512> < 1> <1024> <1024> <2048> [ ] [ ] [ ] [ ] + +release handle=bux +---- +< 512> < 512> [ 1] <1024> <1024> <2048> [ ] [ ] [ ] [ ] + +alloc n=2 handle=bux +---- +< 512> < 512> [ 1] <1024> <1024> <2048> < 2> [ ] [ ] [ ] + +init size=0 +---- + +alloc n=1 handle=foo +---- +< 1> diff --git a/sstable/value_block.go b/sstable/value_block.go index fc0e7004fc..447348bf5b 100644 --- a/sstable/value_block.go +++ b/sstable/value_block.go @@ -13,7 +13,6 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" - "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing" "golang.org/x/exp/rand" @@ -728,7 +727,7 @@ func (ukb *UserKeyPrefixBound) IsEmpty() bool { type blockProviderWhenOpen interface { readBlockForVBR( ctx context.Context, h BlockHandle, stats *base.InternalIteratorStats, - ) (cache.Handle, error) + ) (bufferHandle, error) } type blockProviderWhenClosed struct { @@ -749,9 +748,12 @@ func (bpwc *blockProviderWhenClosed) close() { func (bpwc blockProviderWhenClosed) readBlockForVBR( ctx context.Context, h BlockHandle, stats *base.InternalIteratorStats, -) (cache.Handle, error) { +) (bufferHandle, error) { ctx = objiotracing.WithBlockType(ctx, objiotracing.ValueBlock) - return bpwc.r.readBlock(ctx, h, nil, nil, stats) + // TODO(jackson,sumeer): Consider whether to use a buffer pool in this case. + // The bpwc is not allowed to outlive the iterator tree, so it cannot + // outlive the buffer pool. + return bpwc.r.readBlock(ctx, h, nil, nil, stats, nil /* buffer pool */) } // ReaderProvider supports the implementation of blockProviderWhenClosed. @@ -790,16 +792,16 @@ type valueBlockReader struct { // The value blocks index is lazily retrieved the first time the reader // needs to read a value that resides in a value block. vbiBlock []byte - vbiCache cache.Handle + vbiCache bufferHandle // When sequentially iterating through all key-value pairs, the cost of // repeatedly getting a block that is already in the cache and releasing the - // cache.Handle can be ~40% of the cpu overhead. So the reader remembers the + // bufferHandle can be ~40% of the cpu overhead. So the reader remembers the // last value block it retrieved, in case there is locality of access, and // this value block can be used for the next value retrieval. valueBlockNum uint32 valueBlock []byte valueBlockPtr unsafe.Pointer - valueCache cache.Handle + valueCache bufferHandle lazyFetcher base.LazyFetcher closed bool bufToMangle []byte @@ -833,12 +835,12 @@ func (r *valueBlockReader) close() { // we were to reopen this valueBlockReader and retrieve the same // Handle.value from the cache, we don't want to accidentally unref it when // attempting to unref the old handle. - r.vbiCache = cache.Handle{} + r.vbiCache = bufferHandle{} r.valueBlock = nil r.valueBlockPtr = nil r.valueCache.Release() // See comment above. - r.valueCache = cache.Handle{} + r.valueCache = bufferHandle{} r.closed = true // rp, vbih, stats remain valid, so that LazyFetcher.ValueFetcher can be // implemented. diff --git a/table_cache.go b/table_cache.go index 67f1833720..444b9250bc 100644 --- a/table_cache.go +++ b/table_cache.go @@ -440,6 +440,7 @@ func (c *tableCacheShard) newIters( NewCompactionIter( bytesIterated *uint64, rp sstable.ReaderProvider, + bufferPool *sstable.BufferPool, ) (sstable.Iterator, error) } @@ -521,7 +522,7 @@ func (c *tableCacheShard) newIters( } if internalOpts.bytesIterated != nil { - iter, err = ic.NewCompactionIter(internalOpts.bytesIterated, rp) + iter, err = ic.NewCompactionIter(internalOpts.bytesIterated, rp, internalOpts.bufferPool) } else { iter, err = ic.NewIterWithBlockPropertyFiltersAndContextEtc( ctx, opts.GetLowerBound(), opts.GetUpperBound(), filterer, hideObsoletePoints, useFilter, diff --git a/testdata/metrics b/testdata/metrics index 4a362f27d3..a48140a1eb 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -82,7 +82,7 @@ compact 1 0 B 0 B 0 (size == esti memtbl 1 256 K zmemtbl 2 512 K ztbl 2 1.2 K - bcache 8 1.1 K 42.9% (score == hit-rate) + bcache 7 1.1 K 42.9% (score == hit-rate) tcache 2 1.3 K 66.7% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 2 @@ -116,7 +116,7 @@ compact 1 0 B 0 B 0 (size == esti memtbl 1 256 K zmemtbl 1 256 K ztbl 2 1.2 K - bcache 8 1.1 K 42.9% (score == hit-rate) + bcache 7 1.1 K 42.9% (score == hit-rate) tcache 2 1.3 K 66.7% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 2