Skip to content

Commit a4f8e7e

Browse files
committed
block: rename BufHandle to TempBuffer, use single pool
Using two separate temp buffer pools for compressed and uncompressed data seems unnecessary and likely leads to more memory usage. Switch to a single temp buffer pool (which simplifies the code) and rename `BufHandle` to `TempBuffer`.
1 parent 842c8ef commit a4f8e7e

File tree

3 files changed

+42
-80
lines changed

3 files changed

+42
-80
lines changed

sstable/blob/blob.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ type FileWriter struct {
116116

117117
type compressedBlock struct {
118118
pb block.PhysicalBlock
119-
bh *block.BufHandle
119+
bh *block.TempBuffer
120120
off uint64
121121
}
122122

sstable/block/compression.go

Lines changed: 40 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package block
66

77
import (
8-
"math/rand/v2"
98
"slices"
109
"sync"
1110

@@ -255,11 +254,11 @@ func (b *PhysicalBlock) WriteTo(w objstorage.Writable) (n int, err error) {
255254

256255
// CompressAndChecksumBufHandle is like CompressAndChecksum, but it will
257256
// retrieve a buffer for the compressed block from a sync.Pool and return a
258-
// *BufHandle that the caller can use to release the buffer back to the pool.
257+
// *TempBuffer that the caller can use to release the buffer back to the pool.
259258
func CompressAndChecksumBufHandle(
260259
blockData []byte, compression Compression, checksummer *Checksummer,
261-
) (PhysicalBlock, *BufHandle) {
262-
compressedBuf := compressedBuffers.Get()
260+
) (PhysicalBlock, *TempBuffer) {
261+
compressedBuf := NewTempBuffer()
263262
pb := CompressAndChecksum(&compressedBuf.b, blockData, compression, checksummer)
264263
return pb, compressedBuf
265264
}
@@ -306,15 +305,15 @@ func CompressAndChecksumWithCompressor(
306305
// slices used in construction of the uncompressed block and the compressed
307306
// physical block.
308307
type Buffer struct {
309-
h *BufHandle
308+
h *TempBuffer
310309
compression Compression
311310
checksummer Checksummer
312311
}
313312

314313
// Init configures the BlockBuffer with the specified compression and checksum
315314
// type.
316315
func (b *Buffer) Init(compression Compression, checksumType ChecksumType) {
317-
b.h = uncompressedBuffers.Get()
316+
b.h = NewTempBuffer()
318317
b.h.b = b.h.b[:0]
319318
b.compression = compression
320319
b.checksummer.Type = checksumType
@@ -366,15 +365,15 @@ func (b *Buffer) Resize(length int) {
366365

367366
// CompressAndChecksum compresses and checksums the block data, returning a
368367
// PhysicalBlock that is owned by the caller. The returned PhysicalBlock's
369-
// memory is backed by the returned BufHandle. If non-nil, the returned
370-
// BufHandle may be Released once the caller is done with the physical block to
368+
// memory is backed by the returned TempBuffer. If non-nil, the returned
369+
// TempBuffer may be Released once the caller is done with the physical block to
371370
// recycle the block's underlying memory.
372371
//
373372
// When CompressAndChecksum returns, the callee has been reset and is ready to
374373
// be reused.
375-
func (b *Buffer) CompressAndChecksum() (PhysicalBlock, *BufHandle) {
374+
func (b *Buffer) CompressAndChecksum() (PhysicalBlock, *TempBuffer) {
376375
// Grab a buffer to use as the destination for compression.
377-
compressedBuf := compressedBuffers.Get()
376+
compressedBuf := NewTempBuffer()
378377
pb := CompressAndChecksum(&compressedBuf.b, b.h.b, b.compression, &b.checksummer)
379378
b.h.b = b.h.b[:0]
380379
return pb, compressedBuf
@@ -394,83 +393,46 @@ func (b *Buffer) Release() {
394393
}
395394
}
396395

397-
// BufHandle is a handle to a buffer that can be released to a pool for reuse.
398-
type BufHandle struct {
399-
pool *bufferSyncPool
400-
b []byte
396+
// TempBuffer is a buffer that is used temporarily and is released back to a
397+
// pool for reuse.
398+
type TempBuffer struct {
399+
b []byte
401400
}
402401

403-
// Release releases the buffer back to the pool for reuse.
404-
func (h *BufHandle) Release() {
405-
if invariants.Enabled && (h.pool == nil) != (h.b == nil) {
406-
panic(errors.AssertionFailedf("pool (%t) and buffer (%t) nilness disagree", h.pool == nil, h.b == nil))
407-
}
408-
if invariants.Enabled && (h.pool != nil && h.pool.Max == 0) {
409-
panic(errors.AssertionFailedf("pool has no maximum size"))
402+
// NewTempBuffer returns a TempBuffer from the pool. TempBuffer.b will have zero
403+
// length and arbitrary capacity.
404+
func NewTempBuffer() *TempBuffer {
405+
tb := tempBufferPool.Get().(*TempBuffer)
406+
if invariants.Enabled && len(tb.b) > 0 {
407+
panic("NewTempBuffer length not 0")
410408
}
409+
return tb
410+
}
411+
412+
// Release releases the buffer back to the pool for reuse.
413+
func (tb *TempBuffer) Release() {
411414
// Note we avoid releasing buffers that are larger than the configured
412415
// maximum to the pool. This avoids holding on to occasional large buffers
413-
// necessary for, for example, singlular large values.
414-
if h.b != nil && len(h.b) < h.pool.Max {
415-
if invariants.Sometimes(50) {
416-
// Set the bytes to a random value. Cap the number of bytes being
417-
// randomized to prevent test timeouts.
418-
l := min(cap(h.b), 1000)
419-
h.b = h.b[:l:l]
420-
for j := range h.b {
421-
h.b[j] = byte(rand.Uint32())
416+
// necessary for e.g. singular large values.
417+
if tb.b != nil && len(tb.b) < tempBufferMaxReusedSize {
418+
if invariants.Sometimes(20) {
419+
// Mangle the buffer data.
420+
for i := range tb.b {
421+
tb.b[i] = 0xCC
422422
}
423423
}
424-
h.pool.Put(h)
424+
tb.b = tb.b[:0]
425+
tempBufferPool.Put(tb)
425426
}
426427
}
427428

428-
var (
429-
// uncompressedBuffers is a pool of buffers that were used to store
430-
// uncompressed block data. These buffers should be sized right around the
431-
// configured block size. If multiple Pebble engines are running in the same
432-
// process, they all share a pool and the size of the buffers may vary.
433-
uncompressedBuffers = bufferSyncPool{Max: 256 << 10, Default: 4 << 10}
434-
// compressedBuffers is a pool of buffers that were used to store compressed
435-
// block data. These buffers will vary significantly in size depending on
436-
// the compressibility of the data.
437-
compressedBuffers = bufferSyncPool{Max: 128 << 10, Default: 4 << 10}
438-
)
439-
440-
// bufferSyncPool is a pool of block buffers for memory re-use.
441-
type bufferSyncPool struct {
442-
Max int
443-
Default int
444-
pool sync.Pool
429+
// tempBufferPool is a pool of buffers that are used to temporarily hold either
430+
// compressed or uncompressed block data.
431+
var tempBufferPool = sync.Pool{
432+
New: func() any {
433+
return &TempBuffer{b: make([]byte, 0, tempBufferInitialSize)}
434+
},
445435
}
446436

447-
// Put returns a buffer to the pool. While the buffer is in the pool, its pool
448-
// member is zeroed. This is used to validate invariants around double use of a
449-
// buffer.
450-
func (p *bufferSyncPool) Put(bh *BufHandle) {
451-
if bh.pool != p {
452-
panic(errors.AssertionFailedf("buffer has pool %v; trying to return it to pool %v", bh.pool, p))
453-
}
454-
bh.pool = nil
455-
p.pool.Put(bh)
456-
}
457-
458-
// Get retrieves a new buf from the pool, or allocates one of the configured
459-
// default size if the pool is empty.
460-
func (p *bufferSyncPool) Get() *BufHandle {
461-
v := p.pool.Get()
462-
if v != nil {
463-
bh := v.(*BufHandle)
464-
if bh.pool != nil {
465-
panic(errors.AssertionFailedf("buffer has a pool; was it inserted into a pool twice?"))
466-
}
467-
// Set the pool so we know where to return the buffer to.
468-
bh.pool = p
469-
return bh
470-
}
471-
if invariants.Enabled && p.Default == 0 {
472-
// Guard against accidentally forgetting to initialize a buffer sync pool.
473-
panic(errors.AssertionFailedf("buffer pool has no default size"))
474-
}
475-
return &BufHandle{b: make([]byte, 0, p.Default), pool: p}
476-
}
437+
const tempBufferInitialSize = 32 * 1024
438+
const tempBufferMaxReusedSize = 256 * 1024

sstable/valblk/writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Writer struct {
3030

3131
type bufferedValueBlock struct {
3232
block block.PhysicalBlock
33-
bufHandle *block.BufHandle
33+
bufHandle *block.TempBuffer
3434
handle block.Handle
3535
}
3636

0 commit comments

Comments
 (0)