Skip to content

Commit 4b9290a

Browse files
committed
block: rework physical block code
We change `PhysicalBlock` to always be backed by a `TempBuffer`; this simplifies the code that creates blocks. The trailer is now also stored directly in this buffer, so we no longer have to issue two separate `Write` calls. Note that we were already using a `TempBuffer` in the most important write paths (the columnar sstable writer and the blob value writer). We add a `PhysicalBlockMaker` type that encapsulates the compressor and the checksummer; its single `Make()` method replaces all of the `CompressAndChecksum`/`CopyAndChecksum` variants.
1 parent f60436a commit 4b9290a

File tree

12 files changed

+322
-407
lines changed

12 files changed

+322
-407
lines changed

sstable/blob/blob.go

Lines changed: 15 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -102,13 +102,12 @@ type FileWriter struct {
102102
// index block encoding the offsets at which each block is written.
103103
// Additionally, when rewriting a blob file, the index block's virtualBlocks
104104
// column is also populated to remap blockIDs to the physical block indexes.
105-
indexEncoder indexBlockEncoder
106-
stats FileWriterStats
107-
flushGov block.FlushGovernor
108-
checksummer block.Checksummer
109-
compressor block.Compressor
110-
cpuMeasurer base.CPUMeasurer
111-
writeQueue struct {
105+
indexEncoder indexBlockEncoder
106+
stats FileWriterStats
107+
flushGov block.FlushGovernor
108+
physBlockMaker block.PhysicalBlockMaker
109+
cpuMeasurer base.CPUMeasurer
110+
writeQueue struct {
112111
wg sync.WaitGroup
113112
ch chan compressedBlock
114113
err error
@@ -117,7 +116,6 @@ type FileWriter struct {
117116

118117
type compressedBlock struct {
119118
pb block.PhysicalBlock
120-
bh *block.TempBuffer
121119
off uint64
122120
}
123121

@@ -130,8 +128,7 @@ func NewFileWriter(fn base.DiskFileNum, w objstorage.Writable, opts FileWriterOp
130128
fw.valuesEncoder.Init()
131129
fw.flushGov = opts.FlushGovernor
132130
fw.indexEncoder.Init()
133-
fw.checksummer = block.Checksummer{Type: opts.ChecksumType}
134-
fw.compressor = block.MakeCompressor(opts.Compression)
131+
fw.physBlockMaker.Init(opts.Compression, opts.ChecksumType)
135132
fw.cpuMeasurer = opts.CpuMeasurer
136133
fw.writeQueue.ch = make(chan compressedBlock)
137134
fw.writeQueue.wg.Add(1)
@@ -215,12 +212,12 @@ func (w *FileWriter) flush() {
215212
if w.valuesEncoder.Count() == 0 {
216213
panic(errors.AssertionFailedf("no values to flush"))
217214
}
218-
pb, bh := block.CompressAndChecksumToTempBuffer(w.valuesEncoder.Finish(), blockkind.BlobValue, &w.compressor, &w.checksummer)
215+
pb := w.physBlockMaker.Make(w.valuesEncoder.Finish(), blockkind.BlobValue, block.NoFlags)
219216
compressedLen := uint64(pb.LengthWithoutTrailer())
220217
w.stats.BlockCount++
221218
off := w.stats.FileLen
222219
w.stats.FileLen += compressedLen + block.TrailerLen
223-
w.writeQueue.ch <- compressedBlock{pb: pb, bh: bh, off: off}
220+
w.writeQueue.ch <- compressedBlock{pb: pb, off: off}
224221
w.valuesEncoder.Reset()
225222
}
226223

@@ -244,9 +241,8 @@ func (w *FileWriter) drainWriteQueue() {
244241
Offset: cb.off,
245242
Length: uint64(cb.pb.LengthWithoutTrailer()),
246243
})
247-
// We're done with the buffer associated with this physical block.
248-
// Release it back to its pool.
249-
cb.bh.Release()
244+
// We're done with this physical block.
245+
cb.pb.Release()
250246
}
251247
}
252248

@@ -284,14 +280,13 @@ func (w *FileWriter) Close() (FileWriterStats, error) {
284280
var indexBlockHandle block.Handle
285281
{
286282
indexBlock := w.indexEncoder.Finish()
287-
var compressedBuf []byte
288-
pb := block.CopyAndChecksum(&compressedBuf, indexBlock, blockkind.Metadata, &w.compressor, &w.checksummer)
283+
pb := w.physBlockMaker.Make(indexBlock, blockkind.Metadata, block.DontCompress)
284+
defer pb.Release()
289285
if _, w.err = pb.WriteTo(w.w); w.err != nil {
290-
err = w.err
291286
if w.w != nil {
292287
w.w.Abort()
293288
}
294-
return FileWriterStats{}, err
289+
return FileWriterStats{}, w.err
295290
}
296291
indexBlockHandle.Offset = stats.FileLen
297292
indexBlockHandle.Length = uint64(pb.LengthWithoutTrailer())
@@ -301,7 +296,7 @@ func (w *FileWriter) Close() (FileWriterStats, error) {
301296
// Write the footer.
302297
footer := fileFooter{
303298
format: FileFormatV1,
304-
checksum: w.checksummer.Type,
299+
checksum: w.physBlockMaker.Checksummer.Type,
305300
indexHandle: indexBlockHandle,
306301
originalFileNum: w.fileNum,
307302
}

sstable/block/block.go

Lines changed: 0 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -101,20 +101,6 @@ func DecodeHandleWithProperties(src []byte) (HandleWithProperties, error) {
101101
}, nil
102102
}
103103

104-
// TrailerLen is the length of the trailer at the end of a block.
105-
const TrailerLen = 5
106-
107-
// Trailer is the trailer at the end of a block, encoding the block type
108-
// (compression) and a checksum.
109-
type Trailer = [TrailerLen]byte
110-
111-
// MakeTrailer constructs a trailer from a block type and a checksum.
112-
func MakeTrailer(blockType byte, checksum uint32) (t Trailer) {
113-
t[0] = blockType
114-
binary.LittleEndian.PutUint32(t[1:5], checksum)
115-
return t
116-
}
117-
118104
// ChecksumType specifies the checksum used for blocks.
119105
type ChecksumType byte
120106

sstable/block/compression.go

Lines changed: 0 additions & 205 deletions
Original file line number | Diff line number | Diff line change
@@ -6,16 +6,11 @@ package block
66

77
import (
88
"runtime"
9-
"slices"
109
"strings"
11-
"sync"
1210

1311
"github.com/cockroachdb/errors"
1412
"github.com/cockroachdb/pebble/internal/base"
15-
"github.com/cockroachdb/pebble/internal/bytealloc"
1613
"github.com/cockroachdb/pebble/internal/compression"
17-
"github.com/cockroachdb/pebble/internal/invariants"
18-
"github.com/cockroachdb/pebble/objstorage"
1914
)
2015

2116
// CompressionProfile contains the parameters for compressing blocks in an
@@ -243,203 +238,3 @@ func DecompressInto(ci CompressionIndicator, compressed []byte, buf []byte) erro
243238
}
244239
return nil
245240
}
246-
247-
// PhysicalBlock represents a block (possibly compressed) as it is stored
248-
// physically on disk, including its trailer.
249-
type PhysicalBlock struct {
250-
// data contains the possibly compressed block data.
251-
data []byte
252-
trailer Trailer
253-
}
254-
255-
// NewPhysicalBlock returns a new PhysicalBlock with the provided block
256-
// data. The trailer is set from the last TrailerLen bytes of the
257-
// block. The data could be compressed.
258-
func NewPhysicalBlock(data []byte) PhysicalBlock {
259-
trailer := Trailer(data[len(data)-TrailerLen:])
260-
data = data[:len(data)-TrailerLen]
261-
return PhysicalBlock{data: data, trailer: trailer}
262-
}
263-
264-
// LengthWithTrailer returns the length of the data block, including the trailer.
265-
func (b *PhysicalBlock) LengthWithTrailer() int {
266-
return len(b.data) + TrailerLen
267-
}
268-
269-
// LengthWithoutTrailer returns the length of the data block, excluding the trailer.
270-
func (b *PhysicalBlock) LengthWithoutTrailer() int {
271-
return len(b.data)
272-
}
273-
274-
// CloneWithByteAlloc returns a deep copy of the block, using the provided
275-
// bytealloc.A to allocate memory for the new copy.
276-
func (b *PhysicalBlock) CloneWithByteAlloc(a *bytealloc.A) PhysicalBlock {
277-
var data []byte
278-
*a, data = (*a).Alloc(len(b.data))
279-
copy(data, b.data)
280-
return PhysicalBlock{
281-
data: data,
282-
trailer: b.trailer,
283-
}
284-
}
285-
286-
// Clone returns a deep copy of the block.
287-
func (b PhysicalBlock) Clone() PhysicalBlock {
288-
data := make([]byte, len(b.data))
289-
copy(data, b.data)
290-
return PhysicalBlock{data: data, trailer: b.trailer}
291-
}
292-
293-
// WriteTo writes the block (including its trailer) to the provided Writable. If
294-
// err == nil, n is the number of bytes successfully written to the Writable.
295-
//
296-
// WriteTo might mangle the block data.
297-
func (b *PhysicalBlock) WriteTo(w objstorage.Writable) (n int, err error) {
298-
if err := w.Write(b.data); err != nil {
299-
return 0, err
300-
}
301-
if err := w.Write(b.trailer[:]); err != nil {
302-
return 0, err
303-
}
304-
305-
// WriteTo is allowed to mangle the data. Mangle it ourselves some of the time
306-
// in invariant builds to catch callers that don't handle this.
307-
if invariants.Enabled && invariants.Sometimes(1) {
308-
invariants.Mangle(b.data)
309-
}
310-
return len(b.data) + len(b.trailer), nil
311-
}
312-
313-
// CompressAndChecksum compresses and checksums the provided block, returning
314-
// the compressed block and its trailer. The result is appended to the dst
315-
// argument.
316-
func CompressAndChecksum(
317-
dst *[]byte, blockData []byte, blockKind Kind, compressor *Compressor, checksummer *Checksummer,
318-
) PhysicalBlock {
319-
buf := (*dst)[:0]
320-
ci, buf := compressor.Compress(buf, blockData, blockKind)
321-
*dst = buf
322-
323-
// Calculate the checksum.
324-
pb := PhysicalBlock{data: buf}
325-
checksum := checksummer.Checksum(buf, byte(ci))
326-
pb.trailer = MakeTrailer(byte(ci), checksum)
327-
return pb
328-
}
329-
330-
// CopyAndChecksum copies the provided block (without compressing it) and
331-
// checksums it, returning the physical block. The result is appended to the dst
332-
// argument.
333-
//
334-
// Note that we still need to provide a Compressor so we can inform it of the
335-
// uncompressed block (for statistics).
336-
func CopyAndChecksum(
337-
dst *[]byte, blockData []byte, blockKind Kind, compressor *Compressor, checksummer *Checksummer,
338-
) PhysicalBlock {
339-
buf := *dst
340-
buf = append(buf[:0], blockData...)
341-
*dst = buf
342-
343-
// Calculate the checksum.
344-
pb := PhysicalBlock{data: buf}
345-
checksum := checksummer.Checksum(buf, byte(NoCompressionIndicator))
346-
pb.trailer = MakeTrailer(byte(NoCompressionIndicator), checksum)
347-
compressor.UncompressedBlock(len(blockData), blockKind)
348-
return pb
349-
}
350-
351-
// CompressAndChecksumToTempBuffer compresses and checksums the provided block
352-
// into a TempBuffer. The caller should Release() the TempBuffer once it is no
353-
// longer necessary.
354-
func CompressAndChecksumToTempBuffer(
355-
blockData []byte, blockKind Kind, compressor *Compressor, checksummer *Checksummer,
356-
) (PhysicalBlock, *TempBuffer) {
357-
// Grab a buffer to use as the destination for compression.
358-
compressedBuf := NewTempBuffer()
359-
pb := CompressAndChecksum(&compressedBuf.b, blockData, blockKind, compressor, checksummer)
360-
return pb, compressedBuf
361-
}
362-
363-
// CopyAndChecksumToTempBuffer copies (without compressing) and checksums
364-
// the provided block into a TempBuffer. The caller should Release() the
365-
// TempBuffer once it is no longer necessary.
366-
func CopyAndChecksumToTempBuffer(
367-
blockData []byte, blockKind Kind, compressor *Compressor, checksummer *Checksummer,
368-
) (PhysicalBlock, *TempBuffer) {
369-
// Grab a buffer to use as the destination for compression.
370-
compressedBuf := NewTempBuffer()
371-
pb := CopyAndChecksum(&compressedBuf.b, blockData, blockKind, compressor, checksummer)
372-
return pb, compressedBuf
373-
}
374-
375-
// TempBuffer is a buffer that is used temporarily and is released back to a
376-
// pool for reuse.
377-
type TempBuffer struct {
378-
b []byte
379-
}
380-
381-
// NewTempBuffer returns a TempBuffer from the pool. The buffer will have zero
382-
// size and length and arbitrary capacity.
383-
func NewTempBuffer() *TempBuffer {
384-
tb := tempBufferPool.Get().(*TempBuffer)
385-
if invariants.Enabled && len(tb.b) > 0 {
386-
panic("NewTempBuffer length not 0")
387-
}
388-
return tb
389-
}
390-
391-
// Data returns the byte slice currently backing the Buffer.
392-
func (tb *TempBuffer) Data() []byte {
393-
return tb.b
394-
}
395-
396-
// Size returns the current size of the buffer.
397-
func (tb *TempBuffer) Size() int {
398-
return len(tb.b)
399-
}
400-
401-
// Append appends the contents of v to the buffer, growing the buffer if
402-
// necessary. Returns the offset at which it was appended.
403-
func (tb *TempBuffer) Append(v []byte) (startOffset int) {
404-
startOffset = len(tb.b)
405-
tb.b = append(tb.b, v...)
406-
return startOffset
407-
}
408-
409-
// Resize resizes the buffer to the specified length, allocating if necessary.
410-
// If the length is longer than the current length, the values of the new bytes
411-
// are arbitrary.
412-
func (tb *TempBuffer) Resize(length int) {
413-
if length > cap(tb.b) {
414-
tb.b = slices.Grow(tb.b, length-len(tb.b))
415-
}
416-
tb.b = tb.b[:length]
417-
}
418-
419-
// Reset is equivalent to Resize(0).
420-
func (tb *TempBuffer) Reset() {
421-
tb.b = tb.b[:0]
422-
}
423-
424-
// Release releases the buffer back to the pool for reuse.
425-
func (tb *TempBuffer) Release() {
426-
// Note we avoid releasing buffers that are larger than the configured
427-
// maximum to the pool. This avoids holding on to occasional large buffers
428-
// necessary for e.g. singular large values.
429-
if tb.b != nil && len(tb.b) < tempBufferMaxReusedSize {
430-
invariants.MaybeMangle(tb.b)
431-
tb.b = tb.b[:0]
432-
tempBufferPool.Put(tb)
433-
}
434-
}
435-
436-
// tempBufferPool is a pool of buffers that are used to temporarily hold either
437-
// compressed or uncompressed block data.
438-
var tempBufferPool = sync.Pool{
439-
New: func() any {
440-
return &TempBuffer{b: make([]byte, 0, tempBufferInitialSize)}
441-
},
442-
}
443-
444-
const tempBufferInitialSize = 32 * 1024
445-
const tempBufferMaxReusedSize = 256 * 1024

sstable/block/compression_test.go

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -19,10 +19,9 @@ func TestBufferRandomized(t *testing.T) {
1919
t.Logf("seed %d", seed)
2020
rng := rand.New(rand.NewPCG(0, seed))
2121

22-
compressor := MakeCompressor(SnappyCompression)
23-
defer compressor.Close()
24-
var checksummer Checksummer
25-
checksummer.Init(ChecksumTypeCRC32c)
22+
var physBlockMaker PhysicalBlockMaker
23+
physBlockMaker.Init(SnappyCompression, ChecksumTypeCRC32c)
24+
defer physBlockMaker.Close()
2625
b := NewTempBuffer()
2726
defer b.Release()
2827
vbuf := make([]byte, 0, 1<<10) // 1 KiB
@@ -53,7 +52,7 @@ func TestBufferRandomized(t *testing.T) {
5352
s := b.Data()
5453
require.Equal(t, vbuf, s[len(s)-len(vbuf):])
5554
}
56-
_, bh := CompressAndChecksumToTempBuffer(b.Data(), blockkind.SSTableData, &compressor, &checksummer)
55+
bh := physBlockMaker.Make(b.Data(), blockkind.SSTableData, NoFlags)
5756
b.Reset()
5857
bh.Release()
5958
})

0 commit comments

Comments
 (0)