Skip to content

Commit 207e6cd

Browse files
committed
block: remove Buffer
The `block.Buffer` type does very little and is barely used. Use `TempBuffer` directly instead.
1 parent a4f8e7e commit 207e6cd

File tree

7 files changed

+80
-120
lines changed

7 files changed

+80
-120
lines changed

sstable/blob/blob.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ type FileWriter struct {
105105
stats FileWriterStats
106106
flushGov block.FlushGovernor
107107
checksummer block.Checksummer
108-
compression block.Compression
108+
compressor block.Compressor
109109
cpuMeasurer base.CPUMeasurer
110110
writeQueue struct {
111111
wg sync.WaitGroup
@@ -130,7 +130,7 @@ func NewFileWriter(fn base.DiskFileNum, w objstorage.Writable, opts FileWriterOp
130130
fw.flushGov = opts.FlushGovernor
131131
fw.indexEncoder.Init()
132132
fw.checksummer = block.Checksummer{Type: opts.ChecksumType}
133-
fw.compression = opts.Compression
133+
fw.compressor = block.GetCompressor(opts.Compression)
134134
fw.cpuMeasurer = opts.CpuMeasurer
135135
fw.writeQueue.ch = make(chan compressedBlock)
136136
fw.writeQueue.wg.Add(1)
@@ -212,7 +212,7 @@ func (w *FileWriter) flush() {
212212
if w.valuesEncoder.Count() == 0 {
213213
panic(errors.AssertionFailedf("no values to flush"))
214214
}
215-
pb, bh := block.CompressAndChecksumBufHandle(w.valuesEncoder.Finish(), w.compression, &w.checksummer)
215+
pb, bh := block.CompressAndChecksumToTempBuffer(w.valuesEncoder.Finish(), w.compressor, &w.checksummer)
216216
compressedLen := uint64(pb.LengthWithoutTrailer())
217217
w.stats.BlockCount++
218218
off := w.stats.FileLen

sstable/block/block.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,10 @@ type Checksummer struct {
144144
blockTypeBuf [1]byte
145145
}
146146

147+
func (c *Checksummer) Init(typ ChecksumType) {
148+
c.Type = typ
149+
}
150+
147151
// Checksum computes a checksum over the provided block and block type.
148152
func (c *Checksummer) Checksum(block []byte, blockType byte) (checksum uint32) {
149153
// Calculate the checksum.

sstable/block/compression.go

Lines changed: 42 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -252,17 +252,6 @@ func (b *PhysicalBlock) WriteTo(w objstorage.Writable) (n int, err error) {
252252
return len(b.data) + len(b.trailer), nil
253253
}
254254

255-
// CompressAndChecksumBufHandle is like CompressAndChecksum, but it will
256-
// retrieve a buffer for the compressed block from a sync.Pool and return a
257-
// *TempBuffer that the caller can use to release the buffer back to the pool.
258-
func CompressAndChecksumBufHandle(
259-
blockData []byte, compression Compression, checksummer *Checksummer,
260-
) (PhysicalBlock, *TempBuffer) {
261-
compressedBuf := NewTempBuffer()
262-
pb := CompressAndChecksum(&compressedBuf.b, blockData, compression, checksummer)
263-
return pb, compressedBuf
264-
}
265-
266255
// CompressAndChecksum compresses and checksums the provided block, returning
267256
// the compressed block and its trailer. The result is appended to the dst
268257
// argument.
@@ -299,108 +288,26 @@ func CompressAndChecksumWithCompressor(
299288
return pb
300289
}
301290

302-
// A Buffer is a buffer for encoding a block. The caller mutates the buffer to
303-
// construct the uncompressed block, and calls CompressAndChecksum to produce
304-
// the physical, possibly-compressed PhysicalBlock. A Buffer recycles byte
305-
// slices used in construction of the uncompressed block and the compressed
306-
// physical block.
307-
type Buffer struct {
308-
h *TempBuffer
309-
compression Compression
310-
checksummer Checksummer
311-
}
312-
313-
// Init configures the BlockBuffer with the specified compression and checksum
314-
// type.
315-
func (b *Buffer) Init(compression Compression, checksumType ChecksumType) {
316-
b.h = NewTempBuffer()
317-
b.h.b = b.h.b[:0]
318-
b.compression = compression
319-
b.checksummer.Type = checksumType
320-
}
321-
322-
// Checksummer returns the Checksummer for the Buffer.
323-
func (b *Buffer) Checksummer() *Checksummer {
324-
return &b.checksummer
325-
}
326-
327-
// Get returns the byte slice currently backing the Buffer.
328-
func (b *Buffer) Get() []byte {
329-
return b.h.b
330-
}
331-
332-
// Size returns the current size of the buffer.
333-
func (b *Buffer) Size() int {
334-
return len(b.h.b)
335-
}
336-
337-
// Append appends the contents of v to the buffer, returning the offset at which
338-
// it was appended, growing the buffer if necessary.
339-
func (b *Buffer) Append(v []byte) int {
340-
// We may need to grow b.Buffer to accommodate the new value. If necessary,
341-
// double the size of the buffer until it's sufficiently large.
342-
off := len(b.h.b)
343-
newLen := off + len(v)
344-
if cap(b.h.b) < newLen {
345-
size := max(2*cap(b.h.b), 1024)
346-
for size < newLen {
347-
size *= 2
348-
}
349-
b.h.b = slices.Grow(b.h.b, size-len(b.h.b))
350-
}
351-
b.h.b = b.h.b[:newLen]
352-
if n := copy(b.h.b[off:], v); n != len(v) {
353-
panic("incorrect length computation")
354-
}
355-
return off
356-
}
357-
358-
// Resize resizes the buffer to the specified length, allocating if necessary.
359-
func (b *Buffer) Resize(length int) {
360-
if length > cap(b.h.b) {
361-
b.h.b = slices.Grow(b.h.b, length-len(b.h.b))
362-
}
363-
b.h.b = b.h.b[:length]
364-
}
365-
366-
// CompressAndChecksum compresses and checksums the block data, returning a
367-
// PhysicalBlock that is owned by the caller. The returned PhysicalBlock's
368-
// memory is backed by the returned TempBuffer. If non-nil, the returned
369-
// TempBuffer may be Released once the caller is done with the physical block to
370-
// recycle the block's underlying memory.
371-
//
372-
// When CompressAndChecksum returns, the callee has been reset and is ready to
373-
// be reused.
374-
func (b *Buffer) CompressAndChecksum() (PhysicalBlock, *TempBuffer) {
291+
// CompressAndChecksumToTempBuffer compresses and checksums the provided block
292+
// into a TempBuffer. The caller should Release() the TempBuffer once it is no
293+
// longer necessary.
294+
func CompressAndChecksumToTempBuffer(
295+
blockData []byte, compressor Compressor, checksummer *Checksummer,
296+
) (PhysicalBlock, *TempBuffer) {
375297
// Grab a buffer to use as the destination for compression.
376298
compressedBuf := NewTempBuffer()
377-
pb := CompressAndChecksum(&compressedBuf.b, b.h.b, b.compression, &b.checksummer)
378-
b.h.b = b.h.b[:0]
299+
pb := CompressAndChecksumWithCompressor(&compressedBuf.b, blockData, compressor, checksummer)
379300
return pb, compressedBuf
380301
}
381302

382-
// SetCompression changes the compression algorithm used by CompressAndChecksum.
383-
func (b *Buffer) SetCompression(compression Compression) {
384-
b.compression = compression
385-
}
386-
387-
// Release may be called when a buffer will no longer be used. It releases to
388-
// pools any memory held by the Buffer so that it may be reused.
389-
func (b *Buffer) Release() {
390-
if b.h != nil {
391-
b.h.Release()
392-
b.h = nil
393-
}
394-
}
395-
396303
// TempBuffer is a buffer that is used temporarily and is released back to a
397304
// pool for reuse.
398305
type TempBuffer struct {
399306
b []byte
400307
}
401308

402-
// NewTempBuffer returns a TempBuffer from the pool. TempBuffer.b will have zero
403-
// length and arbitrary capacity.
309+
// NewTempBuffer returns a TempBuffer from the pool. The buffer will have zero
310+
// length and arbitrary capacity.
404311
func NewTempBuffer() *TempBuffer {
405312
tb := tempBufferPool.Get().(*TempBuffer)
406313
if invariants.Enabled && len(tb.b) > 0 {
@@ -409,6 +316,39 @@ func NewTempBuffer() *TempBuffer {
409316
return tb
410317
}
411318

319+
// Data returns the byte slice currently backing the TempBuffer.
320+
func (tb *TempBuffer) Data() []byte {
321+
return tb.b
322+
}
323+
324+
// Size returns the current size of the buffer.
325+
func (tb *TempBuffer) Size() int {
326+
return len(tb.b)
327+
}
328+
329+
// Append appends the contents of v to the buffer, growing the buffer if
330+
// necessary. Returns the offset at which it was appended.
331+
func (tb *TempBuffer) Append(v []byte) (startOffset int) {
332+
startOffset = len(tb.b)
333+
tb.b = append(tb.b, v...)
334+
return startOffset
335+
}
336+
337+
// Resize resizes the buffer to the specified length, allocating if necessary.
338+
// If the length is longer than the current length, the values of the new bytes
339+
// are arbitrary.
340+
func (tb *TempBuffer) Resize(length int) {
341+
if length > cap(tb.b) {
342+
tb.b = slices.Grow(tb.b, length-len(tb.b))
343+
}
344+
tb.b = tb.b[:length]
345+
}
346+
347+
// Reset is equivalent to Resize(0).
348+
func (tb *TempBuffer) Reset() {
349+
tb.b = tb.b[:0]
350+
}
351+
412352
// Release releases the buffer back to the pool for reuse.
413353
func (tb *TempBuffer) Release() {
414354
// Note we avoid releasing buffers that are larger than the configured

sstable/block/compression_test.go

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ func TestBufferRandomized(t *testing.T) {
1818
t.Logf("seed %d", seed)
1919
rng := rand.New(rand.NewPCG(0, seed))
2020

21-
var b Buffer
22-
b.Init(SnappyCompression, ChecksumTypeCRC32c)
21+
compressor := GetCompressor(SnappyCompression)
22+
defer compressor.Close()
23+
var checksummer Checksummer
24+
checksummer.Init(ChecksumTypeCRC32c)
25+
b := NewTempBuffer()
2326
defer b.Release()
2427
vbuf := make([]byte, 0, 1<<10) // 1 KiB
2528

@@ -28,7 +31,7 @@ func TestBufferRandomized(t *testing.T) {
2831
// Randomly release and reinitialize the buffer.
2932
if rng.IntN(5) == 1 {
3033
b.Release()
31-
b.Init(SnappyCompression, ChecksumTypeCRC32c)
34+
b = NewTempBuffer()
3235
}
3336

3437
aggregateSizeOfKVs := rng.IntN(4<<20-(1<<10)) + 1<<10 // [1 KiB, 4 MiB)
@@ -46,10 +49,11 @@ func TestBufferRandomized(t *testing.T) {
4649
b.Append(vbuf)
4750
size += vlen
4851
require.Equal(t, size, b.Size())
49-
s := b.Get()
52+
s := b.Data()
5053
require.Equal(t, vbuf, s[len(s)-len(vbuf):])
5154
}
52-
_, bh := b.CompressAndChecksum()
55+
_, bh := CompressAndChecksumToTempBuffer(b.Data(), compressor, &checksummer)
56+
b.Reset()
5357
bh.Release()
5458
})
5559
}

sstable/block/compressor.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ func (c *Compressor) Close() {
3838
*c = Compressor{}
3939
}
4040

41+
// NoopCompressor is a Compressor that does not compress data. It does not have
42+
// any state and can be used in parallel.
43+
var NoopCompressor = Compressor{
44+
algorithm: compression.NoCompression,
45+
compressor: compression.GetCompressor(compression.None),
46+
}
47+
4148
type Decompressor = compression.Decompressor
4249

4350
func GetDecompressor(c CompressionIndicator) Decompressor {

sstable/layout.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,7 @@ func decodeColumnarMetaIndex(
758758
}
759759

760760
// layoutWriter writes the structure of an sstable to durable storage. It
761-
// accepts serialized blocks, writes them to storage and returns a block handle
761+
// accepts serialized blocks, writes them to storage, and returns a block handle
762762
// describing the offset and length of the block.
763763
type layoutWriter struct {
764764
writable objstorage.Writable
@@ -842,7 +842,7 @@ func (w *layoutWriter) WriteIndexBlock(b []byte) (block.Handle, error) {
842842
return h, err
843843
}
844844

845-
// WriteFilterBlock finishes the provided filter, constructs a trailer and
845+
// WriteFilterBlock finishes the provided filter, constructs a trailer, and
846846
// writes the block and trailer to the writer. It automatically adds the filter
847847
// block to the file's meta index when the writer is finished.
848848
func (w *layoutWriter) WriteFilterBlock(f filterWriter) (bh block.Handle, err error) {

sstable/valblk/writer.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ type Writer struct {
1919
// Block finished callback.
2020
blockFinishedFunc func(compressedSize int)
2121

22+
compressor block.Compressor
23+
checksummer block.Checksummer
2224
// buf is the current block being written to (uncompressed).
23-
buf block.Buffer
25+
buf *block.TempBuffer
2426
// Sequence of blocks that are finished.
2527
blocks []bufferedValueBlock
2628
// Cumulative value block bytes written so far.
@@ -54,7 +56,9 @@ func NewWriter(
5456
blockFinishedFunc: blockFinishedFunc,
5557
blocks: w.blocks[:0],
5658
}
57-
w.buf.Init(compression, checksumType)
59+
w.compressor = block.GetCompressor(compression)
60+
w.checksummer.Init(checksumType)
61+
w.buf = block.NewTempBuffer()
5862
return w
5963
}
6064

@@ -87,7 +91,8 @@ func (w *Writer) Size() uint64 {
8791
}
8892

8993
func (w *Writer) compressAndFlush() {
90-
physicalBlock, bufHandle := w.buf.CompressAndChecksum()
94+
physicalBlock, bufHandle := block.CompressAndChecksumToTempBuffer(w.buf.Data(), w.compressor, &w.checksummer)
95+
w.buf.Reset()
9196
bh := block.Handle{Offset: w.totalBlockBytes, Length: uint64(physicalBlock.LengthWithoutTrailer())}
9297
w.totalBlockBytes += uint64(physicalBlock.LengthWithTrailer())
9398
// blockFinishedFunc length excludes the block trailer.
@@ -147,11 +152,10 @@ func (w *Writer) Finish(layout LayoutWriter, fileOffset uint64) (IndexHandle, Wr
147152
}
148153

149154
func (w *Writer) writeValueBlocksIndex(layout LayoutWriter, h IndexHandle) (IndexHandle, error) {
150-
w.buf.SetCompression(block.NoCompression)
151155
blockLen := h.RowWidth() * len(w.blocks)
152156
h.Handle.Length = uint64(blockLen)
153157
w.buf.Resize(blockLen)
154-
b := w.buf.Get()
158+
b := w.buf.Data()
155159
for i := range w.blocks {
156160
littleEndianPut(uint64(i), b, int(h.BlockNumByteLength))
157161
b = b[int(h.BlockNumByteLength):]
@@ -163,7 +167,7 @@ func (w *Writer) writeValueBlocksIndex(layout LayoutWriter, h IndexHandle) (Inde
163167
if len(b) != 0 {
164168
panic("incorrect length calculation")
165169
}
166-
pb, bufHandle := w.buf.CompressAndChecksum()
170+
pb, bufHandle := block.CompressAndChecksumToTempBuffer(w.buf.Data(), block.NoopCompressor, &w.checksummer)
167171
if _, err := layout.WriteValueIndexBlock(pb, h); err != nil {
168172
return IndexHandle{}, err
169173
}
@@ -179,7 +183,8 @@ func (w *Writer) Release() {
179183
w.blocks[i] = bufferedValueBlock{}
180184
}
181185
w.buf.Release()
182-
w.buf = block.Buffer{}
186+
w.buf = nil
187+
w.compressor.Close()
183188
*w = Writer{blocks: w.blocks[:0]}
184189
valueBlockWriterPool.Put(w)
185190
}
@@ -202,7 +207,7 @@ type LayoutWriter interface {
202207
// WriteValueBlock writes a pre-finished value block (with the trailer) to
203208
// the writer.
204209
WriteValueBlock(blk block.PhysicalBlock) (block.Handle, error)
205-
// WriteValueBlockIndex writes a pre-finished value block index to the
210+
// WriteValueIndexBlock writes a pre-finished value block index to the
206211
// writer.
207212
WriteValueIndexBlock(blk block.PhysicalBlock, vbih IndexHandle) (block.Handle, error)
208213
}

0 commit comments

Comments
 (0)