Skip to content

Commit b99e8be

Browse files
committed
block: introduce CompressionProfile
CompressionProfile is intended as a replacement for Compression; it allows using different compressors for data vs other blocks, and it allows tuning of the minimum reduction percentage (previously hardcoded to 12.5%).
1 parent a2a8803 commit b99e8be

File tree

12 files changed

+210
-148
lines changed

12 files changed

+210
-148
lines changed

sstable/blob/blob.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func NewFileWriter(fn base.DiskFileNum, w objstorage.Writable, opts FileWriterOp
131131
fw.flushGov = opts.FlushGovernor
132132
fw.indexEncoder.Init()
133133
fw.checksummer = block.Checksummer{Type: opts.ChecksumType}
134-
fw.compressor = block.MakeCompressor(opts.Compression)
134+
fw.compressor = block.MakeCompressor(opts.Compression.ToProfile())
135135
fw.cpuMeasurer = opts.CpuMeasurer
136136
fw.writeQueue.ch = make(chan compressedBlock)
137137
fw.writeQueue.wg.Add(1)
@@ -213,7 +213,7 @@ func (w *FileWriter) flush() {
213213
if w.valuesEncoder.Count() == 0 {
214214
panic(errors.AssertionFailedf("no values to flush"))
215215
}
216-
pb, bh := block.CompressAndChecksumToTempBuffer(w.valuesEncoder.Finish(), &w.compressor, &w.checksummer)
216+
pb, bh := block.CompressAndChecksumToTempBuffer(w.valuesEncoder.Finish(), blockkind.BlobValue, &w.compressor, &w.checksummer)
217217
compressedLen := uint64(pb.LengthWithoutTrailer())
218218
w.stats.BlockCount++
219219
off := w.stats.FileLen
@@ -283,7 +283,7 @@ func (w *FileWriter) Close() (FileWriterStats, error) {
283283
{
284284
indexBlock := w.indexEncoder.Finish()
285285
var compressedBuf []byte
286-
pb := block.CompressAndChecksum(&compressedBuf, indexBlock, block.NoopCompressor, &w.checksummer)
286+
pb := block.CompressAndChecksum(&compressedBuf, indexBlock, blockkind.Metadata, block.NoopCompressor, &w.checksummer)
287287
if _, w.err = pb.WriteTo(w.w); w.err != nil {
288288
err = w.err
289289
if w.w != nil {

sstable/block/compression.go

Lines changed: 51 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,16 @@ const (
3131
NCompression
3232
)
3333

34-
var setting = [...]compression.Setting{
35-
DefaultCompression: compression.Snappy,
36-
NoCompression: compression.None,
37-
SnappyCompression: compression.Snappy,
38-
ZstdCompression: compression.ZstdLevel3,
39-
MinLZCompression: compression.MinLZFastest,
34+
var profiles = [...]CompressionProfile{
35+
DefaultCompression: SimpleCompressionProfile(DefaultCompression.String(), compression.Snappy),
36+
NoCompression: SimpleCompressionProfile(NoCompression.String(), compression.None),
37+
SnappyCompression: SimpleCompressionProfile(SnappyCompression.String(), compression.Snappy),
38+
ZstdCompression: SimpleCompressionProfile(ZstdCompression.String(), compression.ZstdLevel3),
39+
MinLZCompression: SimpleCompressionProfile(MinLZCompression.String(), compression.MinLZFastest),
4040
}
4141

42-
func (c Compression) setting() compression.Setting {
43-
return setting[c]
42+
func (c Compression) ToProfile() *CompressionProfile {
43+
return &profiles[c]
4444
}
4545

4646
// String implements fmt.Stringer, returning a human-readable name for the
@@ -81,6 +81,45 @@ func CompressionFromString(s string) Compression {
8181
}
8282
}
8383

84+
// CompressionProfile contains the parameters for compressing blocks in an
85+
// sstable or blob file.
86+
//
87+
// CompressionProfile is a more advanced successor to Compression.
88+
type CompressionProfile struct {
89+
Name string
90+
91+
// DataBlocks applies to sstable data and value blocks, as well as blob file
92+
// value blocks. OtherBlocks applies to all other blocks (such as index,
93+
// filter, metadata blocks).
94+
//
95+
// Some blocks (like rangedel) never use compression; this is at the
96+
// discretion of the sstable or blob file writer.
97+
//
98+
// Note that MinLZ is only supported with table formats v6+. Older formats
99+
// fall back to Snappy.
100+
DataBlocks compression.Setting
101+
OtherBlocks compression.Setting
102+
103+
// Blocks that are reduced by less than this percentage are stored
104+
// uncompressed.
105+
MinReductionPercent uint8
106+
107+
// TODO(radu): knobs for adaptive compression go here.
108+
}
109+
110+
// SimpleCompressionProfile returns a CompressionProfile that uses the same
111+
// compression setting for all blocks and which uses the uncompressed block if
112+
// compression reduces it by less than 12%. This is similar to older Pebble
113+
// versions, which used Compression with a hardcoded 12.5% threshold.
114+
func SimpleCompressionProfile(name string, setting compression.Setting) CompressionProfile {
115+
return CompressionProfile{
116+
Name: name,
117+
DataBlocks: setting,
118+
OtherBlocks: setting,
119+
MinReductionPercent: 12,
120+
}
121+
}
122+
84123
// CompressionIndicator is the byte stored physically within the block.Trailer
85124
// to indicate the compression type.
86125
//
@@ -260,17 +299,10 @@ func (b *PhysicalBlock) WriteTo(w objstorage.Writable) (n int, err error) {
260299
// the compressed payload is discarded and the original, uncompressed block data
261300
// is used to avoid unnecessary decompression overhead at read time.
262301
func CompressAndChecksum(
263-
dst *[]byte, blockData []byte, compressor *Compressor, checksummer *Checksummer,
302+
dst *[]byte, blockData []byte, blockKind Kind, compressor *Compressor, checksummer *Checksummer,
264303
) PhysicalBlock {
265304
buf := (*dst)[:0]
266-
// Compress the buffer, discarding the result if the improvement isn't at
267-
// least 12.5%.
268-
ci, buf := compressor.Compress(buf, blockData)
269-
if len(buf) >= len(blockData)-len(blockData)/8 && ci != NoCompressionIndicator {
270-
ci = NoCompressionIndicator
271-
buf = append(buf[:0], blockData...)
272-
}
273-
305+
ci, buf := compressor.Compress(buf, blockData, blockKind)
274306
*dst = buf
275307

276308
// Calculate the checksum.
@@ -284,11 +316,11 @@ func CompressAndChecksum(
284316
// into a TempBuffer. The caller should Release() the TempBuffer once it is no
285317
// longer necessary.
286318
func CompressAndChecksumToTempBuffer(
287-
blockData []byte, compressor *Compressor, checksummer *Checksummer,
319+
blockData []byte, blockKind Kind, compressor *Compressor, checksummer *Checksummer,
288320
) (PhysicalBlock, *TempBuffer) {
289321
// Grab a buffer to use as the destination for compression.
290322
compressedBuf := NewTempBuffer()
291-
pb := CompressAndChecksum(&compressedBuf.b, blockData, compressor, checksummer)
323+
pb := CompressAndChecksum(&compressedBuf.b, blockData, blockKind, compressor, checksummer)
292324
return pb, compressedBuf
293325
}
294326

sstable/block/compression_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"testing"
1111
"time"
1212

13+
"github.com/cockroachdb/pebble/sstable/block/blockkind"
1314
"github.com/stretchr/testify/require"
1415
)
1516

@@ -18,7 +19,7 @@ func TestBufferRandomized(t *testing.T) {
1819
t.Logf("seed %d", seed)
1920
rng := rand.New(rand.NewPCG(0, seed))
2021

21-
compressor := MakeCompressor(SnappyCompression)
22+
compressor := MakeCompressor(SnappyCompression.ToProfile())
2223
defer compressor.Close()
2324
var checksummer Checksummer
2425
checksummer.Init(ChecksumTypeCRC32c)
@@ -52,7 +53,7 @@ func TestBufferRandomized(t *testing.T) {
5253
s := b.Data()
5354
require.Equal(t, vbuf, s[len(s)-len(vbuf):])
5455
}
55-
_, bh := CompressAndChecksumToTempBuffer(b.Data(), &compressor, &checksummer)
56+
_, bh := CompressAndChecksumToTempBuffer(b.Data(), blockkind.SSTableData, &compressor, &checksummer)
5657
b.Reset()
5758
bh.Release()
5859
})

sstable/block/compressor.go

Lines changed: 54 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,49 +1,79 @@
11
package block
22

3-
import "github.com/cockroachdb/pebble/internal/compression"
3+
import (
4+
"github.com/cockroachdb/pebble/internal/compression"
5+
"github.com/cockroachdb/pebble/sstable/block/blockkind"
6+
)
47

58
// Compressor is used to compress blocks. Typical usage:
69
//
7-
// c := GetCompressor(compression)
10+
// c := MakeCompressor(profile)
811
// .. = c.Compress(..)
912
// .. = c.Compress(..)
1013
// c.Close()
1114
type Compressor struct {
12-
algorithm compression.Algorithm
13-
compressor compression.Compressor
15+
profile CompressionProfile
16+
dataBlocksCompressor compression.Compressor
17+
// otherBlocksCompressor is used for blocks that are not data blocks, such as
18+
// index blocks or metadata blocks. It can be the same object as
19+
// dataBlocksCompressor.
20+
otherBlocksCompressor compression.Compressor
1421
}
1522

16-
// MakeCompressor returns a Compressor that applies the given compression. Close
17-
// must be called when it is no longer needed.
18-
func MakeCompressor(c Compression) Compressor {
19-
s := c.setting()
20-
return Compressor{
21-
algorithm: s.Algorithm,
22-
compressor: compression.GetCompressor(s),
23+
// MakeCompressor returns a Compressor that applies the given compression
24+
// profile. Close must be called when the compressor is no longer needed.
25+
func MakeCompressor(profile *CompressionProfile) Compressor {
26+
c := Compressor{
27+
profile: *profile,
2328
}
24-
}
25-
26-
// Compress a block, appending the compressed data to dst[:0].
27-
//
28-
// In addition to the buffer, returns the algorithm that was used.
29-
func (c *Compressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
30-
ci := compressionIndicatorFromAlgorithm(c.algorithm)
31-
return ci, c.compressor.Compress(dst, src)
29+
c.dataBlocksCompressor = compression.GetCompressor(profile.DataBlocks)
30+
c.otherBlocksCompressor = c.dataBlocksCompressor
31+
if profile.OtherBlocks != profile.DataBlocks {
32+
c.otherBlocksCompressor = compression.GetCompressor(profile.OtherBlocks)
33+
}
34+
return c
3235
}
3336

3437
// Close must be called when the Compressor is no longer needed.
3538
// After Close is called, the Compressor must not be used again.
3639
func (c *Compressor) Close() {
37-
c.compressor.Close()
40+
if c.otherBlocksCompressor != c.dataBlocksCompressor {
41+
c.otherBlocksCompressor.Close()
42+
}
43+
c.dataBlocksCompressor.Close()
3844
*c = Compressor{}
3945
}
4046

47+
// Compress a block, appending the compressed data to dst[:0].
48+
//
49+
// In addition to the buffer, returns the CompressionIndicator for the data as stored (NoCompressionIndicator if the block was left uncompressed).
50+
func (c *Compressor) Compress(dst, src []byte, kind Kind) (CompressionIndicator, []byte) {
51+
setting := c.profile.DataBlocks
52+
compressor := c.dataBlocksCompressor
53+
if kind != blockkind.SSTableData && kind != blockkind.SSTableValue && kind != blockkind.BlobValue {
54+
setting = c.profile.OtherBlocks
55+
compressor = c.otherBlocksCompressor
56+
}
57+
out := compressor.Compress(dst, src)
58+
59+
// Return the original data uncompressed if the reduction is less than the
60+
// minimum, i.e.:
61+
//
62+
// after * 100
63+
// ----------- > 100 - MinReductionPercent
64+
// before
65+
if setting.Algorithm != compression.NoCompression &&
66+
int64(len(out))*100 > int64(len(src))*int64(100-c.profile.MinReductionPercent) {
67+
return NoCompressionIndicator, append(out[:0], src...)
68+
}
69+
return compressionIndicatorFromAlgorithm(setting.Algorithm), out
70+
}
71+
4172
// NoopCompressor is a Compressor that does not compress data. It does not have
4273
// any state and can be used in parallel.
43-
var NoopCompressor = &Compressor{
44-
algorithm: compression.NoCompression,
45-
compressor: compression.GetCompressor(compression.None),
46-
}
74+
var NoopCompressor = &noopCompressor
75+
76+
var noopCompressor = MakeCompressor(NoCompression.ToProfile())
4777

4878
type Decompressor = compression.Decompressor
4979

sstable/colblk_writer.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cockroachdb/pebble/objstorage"
2222
"github.com/cockroachdb/pebble/sstable/blob"
2323
"github.com/cockroachdb/pebble/sstable/block"
24+
"github.com/cockroachdb/pebble/sstable/block/blockkind"
2425
"github.com/cockroachdb/pebble/sstable/colblk"
2526
"github.com/cockroachdb/pebble/sstable/rowblk"
2627
"github.com/cockroachdb/pebble/sstable/valblk"
@@ -148,7 +149,7 @@ func newColumnarWriter(
148149
if !o.DisableValueBlocks {
149150
w.valueBlock = valblk.NewWriter(
150151
block.MakeFlushGovernor(o.BlockSize, o.BlockSizeThreshold, o.SizeClassAwareThreshold, o.AllocatorSizeClasses),
151-
w.opts.Compression, w.opts.Checksum, func(compressedSize int) {})
152+
&w.compressor, w.opts.Checksum, func(compressedSize int) {})
152153
}
153154
if o.FilterPolicy != base.NoFilterPolicy {
154155
switch o.FilterType {
@@ -194,7 +195,7 @@ func newColumnarWriter(
194195
w.cpuMeasurer = cpuMeasurer
195196
go w.drainWriteQueue()
196197

197-
w.compressor = block.MakeCompressor(w.opts.Compression)
198+
w.compressor = block.MakeCompressor(w.opts.Compression.ToProfile())
198199
return w
199200
}
200201

@@ -723,6 +724,7 @@ func (w *RawColumnWriter) enqueueDataBlock(
723724
cb.physical = block.CompressAndChecksum(
724725
&cb.blockBuf.dataBuf,
725726
serializedBlock,
727+
blockkind.SSTableData,
726728
&w.compressor,
727729
&cb.blockBuf.checksummer,
728730
)
@@ -1250,6 +1252,7 @@ func (w *RawColumnWriter) addDataBlock(b, sep []byte, bhp block.HandleWithProper
12501252
cb.physical = block.CompressAndChecksum(
12511253
&cb.blockBuf.dataBuf,
12521254
b,
1255+
blockkind.SSTableData,
12531256
&w.compressor,
12541257
&cb.blockBuf.checksummer,
12551258
)

sstable/layout.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,7 @@ func makeLayoutWriter(w objstorage.Writable, opts WriterOptions) layoutWriter {
796796
writable: w,
797797
cacheOpts: opts.internal.CacheOpts,
798798
tableFormat: opts.TableFormat,
799-
compressor: block.MakeCompressor(opts.Compression),
799+
compressor: block.MakeCompressor(opts.Compression.ToProfile()),
800800
checksumType: opts.Checksum,
801801
buf: blockBuf{
802802
checksummer: block.Checksummer{Type: opts.Checksum},
@@ -822,7 +822,7 @@ func (w *layoutWriter) Abort() {
822822
// WriteDataBlock constructs a trailer for the provided data block and writes
823823
// the block and trailer to the writer. It returns the block's handle.
824824
func (w *layoutWriter) WriteDataBlock(b []byte, buf *blockBuf) (block.Handle, error) {
825-
return w.writeBlock(b, &w.compressor, buf)
825+
return w.writeBlock(b, blockkind.SSTableData, &w.compressor, buf)
826826
}
827827

828828
// WritePrecompressedDataBlock writes a pre-compressed data block and its
@@ -837,7 +837,7 @@ func (w *layoutWriter) WritePrecompressedDataBlock(blk block.PhysicalBlock) (blo
837837
// the last-written index block's handle and adds it to the file's meta index
838838
// when the writer is finished.
839839
func (w *layoutWriter) WriteIndexBlock(b []byte) (block.Handle, error) {
840-
h, err := w.writeBlock(b, &w.compressor, &w.buf)
840+
h, err := w.writeBlock(b, blockkind.SSTableIndex, &w.compressor, &w.buf)
841841
if err == nil {
842842
w.lastIndexBlockHandle = h
843843
}
@@ -852,7 +852,7 @@ func (w *layoutWriter) WriteFilterBlock(f filterWriter) (bh block.Handle, err er
852852
if err != nil {
853853
return block.Handle{}, err
854854
}
855-
return w.writeNamedBlock(b, block.NoopCompressor, f.metaName())
855+
return w.writeNamedBlock(b, blockkind.Filter, block.NoopCompressor, f.metaName())
856856
}
857857

858858
// WritePropertiesBlock constructs a trailer for the provided properties block
@@ -867,28 +867,28 @@ func (w *layoutWriter) WritePropertiesBlock(b []byte) (block.Handle, error) {
867867
if w.tableFormat < TableFormatPebblev7 {
868868
compressor = block.NoopCompressor
869869
}
870-
return w.writeNamedBlock(b, compressor, metaPropertiesName)
870+
return w.writeNamedBlock(b, blockkind.Metadata, compressor, metaPropertiesName)
871871
}
872872

873873
// WriteRangeKeyBlock constructs a trailer for the provided range key block and
874874
// writes the block and trailer to the writer. It automatically adds the range
875875
// key block to the file's meta index when the writer is finished.
876876
func (w *layoutWriter) WriteRangeKeyBlock(b []byte) (block.Handle, error) {
877-
return w.writeNamedBlock(b, block.NoopCompressor, metaRangeKeyName)
877+
return w.writeNamedBlock(b, blockkind.RangeKey, block.NoopCompressor, metaRangeKeyName)
878878
}
879879

880880
// WriteRangeDeletionBlock constructs a trailer for the provided range deletion
881881
// block and writes the block and trailer to the writer. It automatically adds
882882
// the range deletion block to the file's meta index when the writer is
883883
// finished.
884884
func (w *layoutWriter) WriteRangeDeletionBlock(b []byte) (block.Handle, error) {
885-
return w.writeNamedBlock(b, block.NoopCompressor, metaRangeDelV2Name)
885+
return w.writeNamedBlock(b, blockkind.RangeDel, block.NoopCompressor, metaRangeDelV2Name)
886886
}
887887

888888
func (w *layoutWriter) writeNamedBlock(
889-
b []byte, compressor *block.Compressor, name string,
889+
b []byte, kind block.Kind, compressor *block.Compressor, name string,
890890
) (bh block.Handle, err error) {
891-
bh, err = w.writeBlock(b, compressor, &w.buf)
891+
bh, err = w.writeBlock(b, kind, compressor, &w.buf)
892892
if err == nil {
893893
w.recordToMetaindex(name, bh)
894894
}
@@ -917,9 +917,9 @@ func (w *layoutWriter) WriteValueIndexBlock(
917917

918918
// writeBlock compresses, checksums, and writes out a block.
919919
func (w *layoutWriter) writeBlock(
920-
b []byte, compressor *block.Compressor, buf *blockBuf,
920+
b []byte, kind block.Kind, compressor *block.Compressor, buf *blockBuf,
921921
) (block.Handle, error) {
922-
pb := block.CompressAndChecksum(&buf.dataBuf, b, compressor, &buf.checksummer)
922+
pb := block.CompressAndChecksum(&buf.dataBuf, b, kind, compressor, &buf.checksummer)
923923
h, err := w.writePrecompressedBlock(pb)
924924
return h, err
925925
}
@@ -1003,7 +1003,7 @@ func (w *layoutWriter) Finish() (size uint64, err error) {
10031003
}
10041004
b = bw.Finish()
10051005
}
1006-
metaIndexHandle, err := w.writeBlock(b, block.NoopCompressor, &w.buf)
1006+
metaIndexHandle, err := w.writeBlock(b, blockkind.Metadata, block.NoopCompressor, &w.buf)
10071007
if err != nil {
10081008
return 0, err
10091009
}

0 commit comments

Comments
 (0)