
db: do not cache compaction block reads
During compactions, avoid populating the block cache with input files' blocks.
These files will soon be removed from the LSM, making it less likely any
iterator will need to read these blocks. While Pebble uses a scan-resistant
block cache algorithm (ClockPRO), the act of inserting the blocks into the
cache increases contention on the block cache mutexes (#1997). This contention
has been observed to significantly contribute to tail latencies, both for reads
and for writes during memtable reservation. Additionally, although these blocks
may soon be replaced with more useful blocks due to ClockPRO's scan resistance,
they may be freed by a different thread, inducing excessive TLB shootdowns
(#2693).

A compaction only requires a relatively small working set of buffers during its
scan across input sstables. In this commit, we introduce a per-compaction
BufferPool that is used to allocate buffers during cache misses. Buffers are
reused throughout the compaction and only freed to the memory allocator when
they're too small or the compaction is finished. This reduces pressure on the
memory allocator and the block cache.
jbowens committed Jul 16, 2023
1 parent a89c926 commit 03c97cd
Showing 17 changed files with 542 additions and 126 deletions.
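Before the per-file diffs, a condensed sketch of the pattern this commit introduces (identifiers are taken from the diffs below; surrounding code is elided):

    // In (d *DB) runCompaction: size the per-compaction pool for the expected
    // block working set and tear it down when the compaction finishes.
    c.bufferPool.Init(12)
    defer c.bufferPool.Release()

    // Input iterators are constructed with the pool threaded through
    // internalIterOpts, so sstable block reads performed by the compaction
    // allocate from the pool instead of populating the block cache.
    iters = append(iters, newLevelIter(iterOpts, c.cmp, nil /* split */, newIters,
        level.files.Iter(), l, internalIterOpts{
            bytesIterated: &c.bytesIterated,
            bufferPool:    &c.bufferPool,
        }))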
37 changes: 35 additions & 2 deletions compaction.go
@@ -566,6 +566,7 @@ type compaction struct {
// resulting version has been installed (if successful), but the compaction
// goroutine is still cleaning up (eg, deleting obsolete files).
versionEditApplied bool
bufferPool sstable.BufferPool

score float64

@@ -1343,7 +1344,10 @@ func (c *compaction) newInputIter(
f manifest.LevelFile, _ *IterOptions, l manifest.Level, bytesIterated *uint64,
) (keyspan.FragmentIterator, error) {
iter, rangeDelIter, err := newIters(context.Background(), f.FileMetadata,
&IterOptions{level: l}, internalIterOpts{bytesIterated: &c.bytesIterated})
&IterOptions{level: l}, internalIterOpts{
bytesIterated: &c.bytesIterated,
bufferPool: &c.bufferPool,
})
if err == nil {
// TODO(peter): It is mildly wasteful to open the point iterator only to
// immediately close it. One way to solve this would be to add new
@@ -1428,7 +1432,10 @@ func (c *compaction) newInputIter(
// to configure the levelIter at these levels to hide the obsolete points.
addItersForLevel := func(level *compactionLevel, l manifest.Level) error {
iters = append(iters, newLevelIter(iterOpts, c.cmp, nil /* split */, newIters,
level.files.Iter(), l, &c.bytesIterated))
level.files.Iter(), l, internalIterOpts{
bytesIterated: &c.bytesIterated,
bufferPool: &c.bufferPool,
}))
// TODO(jackson): Use keyspan.LevelIter to avoid loading all the range
// deletions into memory upfront. (See #2015, which reverted this.)
// There will be no user keys that are split between sstables
@@ -2746,6 +2753,32 @@ func (d *DB) runCompaction(
d.mu.Unlock()
defer d.mu.Lock()

// Compactions use a pool of buffers to read blocks, avoiding polluting the
// block cache with blocks that will not be read again. We initialize the
// buffer pool with a size 12. This initial size does not need to be
// accurate, because the pool will grow to accommodate the maximum number of
// blocks allocated at a given time over the course of the compaction. But
// choosing a size larger than that working set avoids any additional
// allocations to grow the size of the pool over the course of iteration.
//
// Justification for initial size 12: In a two-level compaction, at any
// given moment we'll have 2 index blocks in-use and 2 data blocks in-use.
// Additionally, when decoding a compressed block, we'll temporarily
// allocate 1 additional block to hold the compressed buffer. In the worst
// case that all input sstables have two-level index blocks (+2), value
// blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll
// additionally require 2n+4 blocks where n is the number of input sstables.
// Range deletion and range key blocks are relatively rare, and the cost of
// an additional allocation or two over the course of the compaction is
// considered to be okay. A larger initial size would cause the pool to hold
// on to more memory, even when it's not in-use because the pool will
// recycle buffers up to the current capacity of the pool. The memory use of
// a 12-buffer pool is expected to be within reason, even if all the buffers
// grow to the typical size of an index block (256 KiB) which would
// translate to 3 MiB per compaction.
c.bufferPool.Init(12)
defer c.bufferPool.Release()

iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter, snapshots)
if err != nil {
return nil, pendingOutputs, stats, err
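To make the sizing arithmetic in the comment above concrete, a rough illustration (hypothetical helper, not part of this commit):

    package main

    import "fmt"

    // worstCaseBlocks returns the worst-case number of concurrently in-use
    // blocks for a compaction over n input sstables, per the justification
    // above: 2 index + 2 data + 1 compressed-block scratch buffer, plus 2
    // two-level index blocks, 2 value blocks, and one range-deletion and one
    // range-key block per input sstable.
    func worstCaseBlocks(n int) int { return 5 + 4 + 2*n }

    func main() {
        fmt.Println(worstCaseBlocks(2)) // a two-input compaction: 13
        // Steady-state footprint of the 12-buffer pool if every buffer grows
        // to a typical 256 KiB index block: 12 * 256 KiB = 3 MiB.
        fmt.Println(12 * 256 >> 10) // 3 (MiB)
    }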
2 changes: 1 addition & 1 deletion flushable.go
@@ -172,7 +172,7 @@ func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
// aren't truly levels in the lsm. Right now, the encoding only supports
// L0 sublevels, and the rest of the levels in the lsm.
return newLevelIter(
opts, s.cmp, s.split, s.newIters, s.slice.Iter(), manifest.Level(0), nil,
opts, s.cmp, s.split, s.newIters, s.slice.Iter(), manifest.Level(0), internalIterOpts{},
)
}

4 changes: 2 additions & 2 deletions ingest.go
@@ -729,7 +729,7 @@ func ingestTargetLevel(
// Check for overlap over the keys of L0 by iterating over the sublevels.
for subLevel := 0; subLevel < len(v.L0SublevelFiles); subLevel++ {
iter := newLevelIter(iterOps, cmp, nil /* split */, newIters,
v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), nil)
v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), internalIterOpts{})

var rangeDelIter keyspan.FragmentIterator
// Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
@@ -760,7 +760,7 @@
level := baseLevel
for ; level < numLevels; level++ {
levelIter := newLevelIter(iterOps, cmp, nil /* split */, newIters,
v.Levels[level].Iter(), manifest.Level(level), nil)
v.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
var rangeDelIter keyspan.FragmentIterator
// Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
// sets it up for the target file.
5 changes: 3 additions & 2 deletions level_iter.go
@@ -51,6 +51,7 @@ func tableNewRangeDelIter(ctx context.Context, newIters tableNewIters) keyspan.T

type internalIterOpts struct {
bytesIterated *uint64
bufferPool *sstable.BufferPool
stats *base.InternalIteratorStats
boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
}
@@ -246,11 +247,11 @@ func newLevelIter(
newIters tableNewIters,
files manifest.LevelIterator,
level manifest.Level,
bytesIterated *uint64,
internalOpts internalIterOpts,
) *levelIter {
l := &levelIter{}
l.init(context.Background(), opts, cmp, split, newIters, files, level,
internalIterOpts{bytesIterated: bytesIterated})
internalOpts)
return l
}

16 changes: 8 additions & 8 deletions level_iter_test.go
@@ -88,7 +88,7 @@ func TestLevelIter(t *testing.T) {

iter := newLevelIter(opts, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, newIters, files.Iter(), manifest.Level(level),
nil)
internalIterOpts{})
defer iter.Close()
// Fake up the range deletion initialization.
iter.initRangeDel(new(keyspan.FragmentIterator))
@@ -131,7 +131,7 @@ func TestLevelIter(t *testing.T) {

iter := newLevelIter(opts, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, newIters2, files.Iter(),
manifest.Level(level), nil)
manifest.Level(level), internalIterOpts{})
iter.SeekGE([]byte(key), base.SeekGEFlagsNone)
lower, upper := tableOpts.GetLowerBound(), tableOpts.GetUpperBound()
return fmt.Sprintf("[%s,%s]\n", lower, upper)
@@ -326,7 +326,7 @@ func TestLevelIterBoundaries(t *testing.T) {
slice := manifest.NewLevelSliceKeySorted(lt.cmp.Compare, lt.metas)
iter = newLevelIter(IterOptions{}, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, lt.newIters, slice.Iter(),
manifest.Level(level), nil)
manifest.Level(level), internalIterOpts{})
// Fake up the range deletion initialization.
iter.initRangeDel(new(keyspan.FragmentIterator))
}
@@ -536,7 +536,7 @@ func BenchmarkLevelIterSeekGE(b *testing.B) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))

b.ResetTimer()
@@ -578,7 +578,7 @@ func BenchmarkLevelIterSeqSeekGEWithBounds(b *testing.B) {
opts.LowerBound, opts.UpperBound)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})
// Fake up the range deletion initialization, to resemble the usage
// in a mergingIter.
l.initRangeDel(new(keyspan.FragmentIterator))
@@ -627,7 +627,7 @@ func BenchmarkLevelIterSeqSeekPrefixGE(b *testing.B) {
func(b *testing.B) {
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, newIters, metas.Iter(),
manifest.Level(level), nil)
manifest.Level(level), internalIterOpts{})
// Fake up the range deletion initialization, to resemble the usage
// in a mergingIter.
l.initRangeDel(new(keyspan.FragmentIterator))
@@ -672,7 +672,7 @@ func BenchmarkLevelIterNext(b *testing.B) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})

b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -706,7 +706,7 @@ func BenchmarkLevelIterPrev(b *testing.B) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})

b.ResetTimer()
for i := 0; i < b.N; i++ {
2 changes: 1 addition & 1 deletion merging_iter_test.go
@@ -629,7 +629,7 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, newIters, levelSlices[i].Iter(),
manifest.Level(level), nil)
manifest.Level(level), internalIterOpts{})
l.initRangeDel(&mils[level].rangeDelIter)
l.initBoundaryContext(&mils[level].levelIterBoundaryContext)
mils[level].iter = l
18 changes: 8 additions & 10 deletions sstable/block.go
@@ -10,7 +10,6 @@ import (

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manual"
@@ -400,10 +399,9 @@ type blockIter struct {
// For a block encoded with a restart interval of 1, cached and cachedBuf
// will not be used as there are no prefix compressed entries between the
// restart points.
- cached []blockEntry
- cachedBuf []byte
- cacheHandle cache.Handle
- // The first user key in the block. This is used by the caller to set bounds
+ cached []blockEntry
+ cachedBuf []byte
+ handle bufferHandle
// for block iteration for already loaded blocks.
firstUserKey []byte
lazyValueHandling struct {
@@ -458,10 +456,10 @@ func (i *blockIter) init(
// ingested.
// - Foreign sstable iteration: globalSeqNum is always set.
func (i *blockIter) initHandle(
cmp Compare, block cache.Handle, globalSeqNum uint64, hideObsoletePoints bool,
cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool,
) error {
i.cacheHandle.Release()
i.cacheHandle = block
i.handle.Release()
i.handle = block
return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints)
}

@@ -1515,8 +1513,8 @@ func (i *blockIter) Error() error {
// Close implements internalIterator.Close, as documented in the pebble
// package.
func (i *blockIter) Close() error {
i.cacheHandle.Release()
i.cacheHandle = cache.Handle{}
i.handle.Release()
i.handle = bufferHandle{}
i.val = nil
i.lazyValue = base.LazyValue{}
i.lazyValueHandling.vbr = nil
2 changes: 1 addition & 1 deletion sstable/block_property_test.go
@@ -1316,7 +1316,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
// block that bhp points to, along with its block properties.
if twoLevelIndex {
subiter := &blockIter{}
subIndex, err := r.readBlock(context.Background(), bhp.BlockHandle, nil, nil, nil)
subIndex, err := r.readBlock(context.Background(), bhp.BlockHandle, nil, nil, nil, nil)
if err != nil {
return err.Error()
}
139 changes: 139 additions & 0 deletions sstable/buffer_pool.go
@@ -0,0 +1,139 @@
// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use
// of this source code is governed by a BSD-style license that can be found in
// the LICENSE file.

package sstable

import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/cache"
)

// A bufferHandle is a handle to manually-managed memory. The handle may point
// to a block in the block cache (h.Get() != nil), or a buffer that exists
// outside the block cache allocated from a BufferPool (b.Valid()).
type bufferHandle struct {
h cache.Handle
b Buf
}

// Get retrieves the underlying buffer referenced by the handle.
func (bh bufferHandle) Get() []byte {
if v := bh.h.Get(); v != nil {
return v
} else if bh.b.p != nil {
return bh.b.p.pool[bh.b.i].b
}
return nil
}

// Release releases the buffer, either back to the block cache or BufferPool.
func (bh bufferHandle) Release() {
bh.h.Release()
bh.b.Release()
}

// A BufferPool holds a pool of buffers for holding sstable blocks. An initial
// size of the pool is provided on Init, but a BufferPool will grow to meet the
// largest working set size. It'll never shrink. When a buffer is released, the
// BufferPool recycles the buffer for future allocations.
//
// A BufferPool should only be used for short-lived allocations with
// well-understood working set sizes to avoid excessive memory consumption.
//
// BufferPool is not thread-safe.
type BufferPool struct {
// pool contains all the buffers held by the pool, including buffers that
// are in-use. For every i < len(pool): pool[i].v is non-nil.
pool []allocedBuffer
}

type allocedBuffer struct {
v *cache.Value
// b holds the current byte slice. It's backed by v, but may be a subslice
// of v's memory while the buffer is in-use [ len(b) ≤ len(v.Buf()) ].
//
// If the buffer is not currently in-use, b is nil. When being recycled, the
// BufferPool.Alloc will reset b to be a subslice of v.Buf().
b []byte
}

// Init initializes the pool with an initial working set buffer size of
// `initialSize`.
func (p *BufferPool) Init(initialSize int) {
*p = BufferPool{
pool: make([]allocedBuffer, 0, initialSize),
}
}

// Release releases all buffers held by the pool and resets the pool to an
// uninitialized state.
func (p *BufferPool) Release() {
for i := range p.pool {
if p.pool[i].b != nil {
panic(errors.AssertionFailedf("Release called on a BufferPool with in-use buffers"))
}
cache.Free(p.pool[i].v)
}
*p = BufferPool{}
}

// Alloc allocates a new buffer of size n. If the pool already holds a buffer at
// least as large as n, the pooled buffer is used instead.
//
// Alloc is O(MAX(N,M)) where N is the largest number of concurrently in-use
// buffers allocated and M is the initialSize passed to Init.
func (p *BufferPool) Alloc(n int) Buf {
unusableBufferIdx := -1
for i := 0; i < len(p.pool); i++ {
if p.pool[i].b == nil {
if len(p.pool[i].v.Buf()) >= n {
p.pool[i].b = p.pool[i].v.Buf()[:n]
return Buf{p: p, i: i}
}
unusableBufferIdx = i
}
}

// If we would need to grow the size of the pool to allocate another buffer,
// but there was a slot available occupied by a buffer that's just too
// small, replace the too-small buffer.
if len(p.pool) == cap(p.pool) && unusableBufferIdx >= 0 {
i := unusableBufferIdx
cache.Free(p.pool[i].v)
p.pool[i].v = cache.Alloc(n)
p.pool[i].b = p.pool[i].v.Buf()
return Buf{p: p, i: i}
}

// Allocate a new buffer.
v := cache.Alloc(n)
p.pool = append(p.pool, allocedBuffer{v: v, b: v.Buf()[:n]})
return Buf{p: p, i: len(p.pool) - 1}
}

// A Buf holds a reference to a manually-managed, pooled byte buffer.
type Buf struct {
p *BufferPool
// i holds the index into p.pool where the buffer may be found. This scheme
// avoids needing to allocate the handle to the buffer on the heap at the
// cost of copying two words instead of one.
i int
}

// Valid returns true if the buf holds a valid buffer.
func (b Buf) Valid() bool {
return b.p != nil
}

// Release releases the buffer back to the pool.
func (b *Buf) Release() {
if b.p == nil {
return
}
// Clear the allocedBuffer's byte slice. This signals the allocated buffer
// is no longer in use and a future call to BufferPool.Alloc may reuse this
// buffer.
b.p.pool[b.i].b = nil
b.p = nil
}

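A small usage sketch of the BufferPool API above (illustrative only; a Buf's bytes are reached through the pool's unexported slice, so real callers live inside the sstable package, as in block.go above):

    var pool sstable.BufferPool
    pool.Init(4) // the initial size is only a hint; the pool grows to the working set and never shrinks

    // Allocate scratch space for a block read. A free pooled buffer of at
    // least this size is reused if available; otherwise a new buffer is
    // allocated via cache.Alloc.
    buf := pool.Alloc(32 << 10)
    // ... the sstable code wraps buf in a bufferHandle and reads into it ...
    buf.Release() // marks the buffer free for reuse; its memory stays in the pool

    // Releasing the pool frees all retained buffers back to the allocator and
    // panics if any buffer is still in use.
    pool.Release()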