Skip to content

Commit 056234f

Browse files
committed
pebble: replace Compression with CompressionProfile
1 parent 38d4022 commit 056234f

20 files changed

+107
-138
lines changed

checkpoint_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ func TestCheckpointManyFiles(t *testing.T) {
434434
// Disable compression to speed up the test.
435435
opts.EnsureDefaults()
436436
for i := range opts.Levels {
437-
opts.Levels[i].Compression = func() Compression { return NoCompression }
437+
opts.Levels[i].Compression = func() *CompressionProfile { return NoCompression }
438438
}
439439

440440
d, err := Open("", opts)

internal/manifest/table_metadata.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1168,8 +1168,9 @@ type TableStats struct {
11681168
RangeDeletionsBytesEstimate uint64
11691169
// Total size of value blocks and value index block.
11701170
ValueBlocksSize uint64
1171-
// CompressionType is the compression type of the table.
1172-
CompressionType block.Compression
1171+
// CompressionType is the compression profile used for the table (or nil if
1172+
// the profile name is not recognized).
1173+
CompressionType *block.CompressionProfile
11731174
// TombstoneDenseBlocksRatio is the ratio of data blocks in this table that
11741175
// fulfills at least one of the following:
11751176
// 1. The block contains at least options.Experimental.NumDeletionsThreshold

metamorphic/options.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -797,14 +797,15 @@ func RandomOptions(
797797
lopts.FilterPolicy = newTestingFilterPolicy(1 << rng.IntN(5))
798798
}
799799

800-
// We use either no compression, snappy compression or zstd compression.
801-
switch rng.IntN(3) {
800+
switch rng.IntN(4) {
802801
case 0:
803-
lopts.Compression = func() block.Compression { return pebble.NoCompression }
802+
lopts.Compression = func() *block.CompressionProfile { return pebble.NoCompression }
804803
case 1:
805-
lopts.Compression = func() block.Compression { return pebble.ZstdCompression }
804+
lopts.Compression = func() *block.CompressionProfile { return pebble.ZstdCompression }
805+
case 2:
806+
lopts.Compression = func() *block.CompressionProfile { return pebble.SnappyCompression }
806807
default:
807-
lopts.Compression = func() block.Compression { return pebble.SnappyCompression }
808+
lopts.Compression = func() *block.CompressionProfile { return pebble.MinLZCompression }
808809
}
809810
opts.Levels[0] = lopts
810811

options.go

Lines changed: 12 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,10 @@ const (
4141
defaultLevelMultiplier = 10
4242
)
4343

44-
// Compression exports the base.Compression type.
45-
type Compression = block.Compression
44+
type CompressionProfile = block.CompressionProfile
4645

4746
// Exported Compression constants.
48-
const (
47+
var (
4948
DefaultCompression = block.DefaultCompression
5049
NoCompression = block.NoCompression
5150
SnappyCompression = block.SnappyCompression
@@ -417,9 +416,9 @@ type LevelOptions struct {
417416

418417
// Compression defines the per-block compression to use.
419418
//
420-
// The default value is DefaultCompression (which uses Snappy) for L0, or the
421-
// function from the previous level for all other levels.
422-
Compression func() Compression
419+
// The default value is Snappy for L0, or the function from the previous level
420+
// for all other levels.
421+
Compression func() *CompressionProfile
423422

424423
// FilterPolicy defines a filter algorithm (such as a Bloom filter) that can
425424
// reduce disk reads for Get calls.
@@ -467,7 +466,7 @@ func (o *LevelOptions) EnsureL0Defaults() {
467466
o.BlockSizeThreshold = base.DefaultBlockSizeThreshold
468467
}
469468
if o.Compression == nil {
470-
o.Compression = func() Compression { return DefaultCompression }
469+
o.Compression = func() *CompressionProfile { return SnappyCompression }
471470
}
472471
if o.FilterPolicy == nil {
473472
o.FilterPolicy = NoFilterPolicy
@@ -1614,7 +1613,7 @@ func (o *Options) String() string {
16141613
fmt.Fprintf(&buf, " block_restart_interval=%d\n", l.BlockRestartInterval)
16151614
fmt.Fprintf(&buf, " block_size=%d\n", l.BlockSize)
16161615
fmt.Fprintf(&buf, " block_size_threshold=%d\n", l.BlockSizeThreshold)
1617-
fmt.Fprintf(&buf, " compression=%s\n", resolveDefaultCompression(l.Compression()))
1616+
fmt.Fprintf(&buf, " compression=%s\n", l.Compression().Name)
16181617
fmt.Fprintf(&buf, " filter_policy=%s\n", l.FilterPolicy.Name())
16191618
fmt.Fprintf(&buf, " filter_type=%s\n", l.FilterType)
16201619
fmt.Fprintf(&buf, " index_block_size=%d\n", l.IndexBlockSize)
@@ -2084,20 +2083,11 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
20842083
case "block_size_threshold":
20852084
l.BlockSizeThreshold, err = strconv.Atoi(value)
20862085
case "compression":
2087-
switch value {
2088-
case "Default":
2089-
l.Compression = func() Compression { return DefaultCompression }
2090-
case "NoCompression":
2091-
l.Compression = func() Compression { return NoCompression }
2092-
case "Snappy":
2093-
l.Compression = func() Compression { return SnappyCompression }
2094-
case "ZSTD":
2095-
l.Compression = func() Compression { return ZstdCompression }
2096-
case "MinLZ":
2097-
l.Compression = func() Compression { return MinLZCompression }
2098-
default:
2086+
profile := block.CompressionProfileByName(value)
2087+
if profile == nil {
20992088
return errors.Errorf("pebble: unknown compression: %q", errors.Safe(value))
21002089
}
2090+
l.Compression = func() *CompressionProfile { return profile }
21012091
case "filter_policy":
21022092
if hooks != nil && hooks.NewFilterPolicy != nil {
21032093
l.FilterPolicy, err = hooks.NewFilterPolicy(value)
@@ -2335,7 +2325,7 @@ func (o *Options) MakeWriterOptions(level int, format sstable.TableFormat) sstab
23352325
writerOpts.BlockRestartInterval = levelOpts.BlockRestartInterval
23362326
writerOpts.BlockSize = levelOpts.BlockSize
23372327
writerOpts.BlockSizeThreshold = levelOpts.BlockSizeThreshold
2338-
writerOpts.Compression = resolveDefaultCompression(levelOpts.Compression())
2328+
writerOpts.Compression = levelOpts.Compression()
23392329
writerOpts.FilterPolicy = levelOpts.FilterPolicy
23402330
writerOpts.FilterType = levelOpts.FilterType
23412331
writerOpts.IndexBlockSize = levelOpts.IndexBlockSize
@@ -2357,7 +2347,7 @@ func (o *Options) MakeWriterOptions(level int, format sstable.TableFormat) sstab
23572347
func (o *Options) MakeBlobWriterOptions(level int) blob.FileWriterOptions {
23582348
lo := o.Levels[level]
23592349
return blob.FileWriterOptions{
2360-
Compression: resolveDefaultCompression(lo.Compression()),
2350+
Compression: lo.Compression(),
23612351
ChecksumType: block.ChecksumTypeCRC32c,
23622352
FlushGovernor: block.MakeFlushGovernor(
23632353
lo.BlockSize,
@@ -2368,13 +2358,6 @@ func (o *Options) MakeBlobWriterOptions(level int) blob.FileWriterOptions {
23682358
}
23692359
}
23702360

2371-
func resolveDefaultCompression(c Compression) Compression {
2372-
if c <= DefaultCompression || c >= block.NCompression {
2373-
c = SnappyCompression
2374-
}
2375-
return c
2376-
}
2377-
23782361
func (o *Options) MakeObjStorageProviderSettings(dirname string) objstorageprovider.Settings {
23792362
s := objstorageprovider.Settings{
23802363
Logger: o.Logger,

sstable/blob/blob.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,15 @@ const (
4949

5050
// FileWriterOptions are used to configure the FileWriter.
5151
type FileWriterOptions struct {
52-
Compression block.Compression
52+
Compression *block.CompressionProfile
5353
ChecksumType block.ChecksumType
5454
FlushGovernor block.FlushGovernor
5555
// Only CPUMeasurer.MeasureCPUBlobFileSecondary is used.
5656
CpuMeasurer base.CPUMeasurer
5757
}
5858

5959
func (o *FileWriterOptions) ensureDefaults() {
60-
if o.Compression <= block.DefaultCompression || o.Compression >= block.NCompression {
60+
if o.Compression == nil {
6161
o.Compression = block.SnappyCompression
6262
}
6363
if o.ChecksumType == block.ChecksumTypeNone {
@@ -131,7 +131,7 @@ func NewFileWriter(fn base.DiskFileNum, w objstorage.Writable, opts FileWriterOp
131131
fw.flushGov = opts.FlushGovernor
132132
fw.indexEncoder.Init()
133133
fw.checksummer = block.Checksummer{Type: opts.ChecksumType}
134-
fw.compressor = block.MakeCompressor(opts.Compression.ToProfile())
134+
fw.compressor = block.MakeCompressor(opts.Compression)
135135
fw.cpuMeasurer = opts.CpuMeasurer
136136
fw.writeQueue.ch = make(chan compressedBlock)
137137
fw.writeQueue.wg.Add(1)

sstable/blob/blob_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,15 @@ func scanFileWriterOptions(t *testing.T, td *datadriven.TestData) FileWriterOpti
8585
var (
8686
targetBlockSize int = 128
8787
blockSizeThreshold int = 90
88-
compression = block.NoCompression
8988
)
9089
td.MaybeScanArgs(t, "target-block-size", &targetBlockSize)
9190
td.MaybeScanArgs(t, "block-size-threshold", &blockSizeThreshold)
91+
var compression *block.CompressionProfile
9292
if cmdArg, ok := td.Arg("compression"); ok {
93-
compression = block.CompressionFromString(cmdArg.SingleVal(t))
93+
compression = block.CompressionProfileByName(cmdArg.SingleVal(t))
94+
if compression == nil {
95+
t.Fatalf("unknown compression %q", cmdArg.SingleVal(t))
96+
}
9497
}
9598
return FileWriterOptions{
9699
Compression: compression,

sstable/block/compression.go

Lines changed: 36 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package block
66

77
import (
88
"slices"
9+
"strings"
910
"sync"
1011

1112
"github.com/cockroachdb/errors"
@@ -16,71 +17,6 @@ import (
1617
"github.com/cockroachdb/pebble/objstorage"
1718
)
1819

19-
// Compression is the per-block compression algorithm to use.
20-
type Compression int
21-
22-
// The available compression types.
23-
const (
24-
DefaultCompression Compression = iota
25-
NoCompression
26-
SnappyCompression
27-
ZstdCompression
28-
// MinLZCompression is only supported with table formats v6+. Older formats
29-
// fall back to snappy.
30-
MinLZCompression
31-
NCompression
32-
)
33-
34-
var profiles = [...]CompressionProfile{
35-
DefaultCompression: SimpleCompressionProfile(DefaultCompression.String(), compression.Snappy),
36-
NoCompression: SimpleCompressionProfile(NoCompression.String(), compression.None),
37-
SnappyCompression: SimpleCompressionProfile(SnappyCompression.String(), compression.Snappy),
38-
ZstdCompression: SimpleCompressionProfile(ZstdCompression.String(), compression.ZstdLevel3),
39-
MinLZCompression: SimpleCompressionProfile(MinLZCompression.String(), compression.MinLZFastest),
40-
}
41-
42-
func (c Compression) ToProfile() *CompressionProfile {
43-
return &profiles[c]
44-
}
45-
46-
// String implements fmt.Stringer, returning a human-readable name for the
47-
// compression algorithm.
48-
func (c Compression) String() string {
49-
switch c {
50-
case DefaultCompression:
51-
return "Default"
52-
case NoCompression:
53-
return "NoCompression"
54-
case SnappyCompression:
55-
return "Snappy"
56-
case ZstdCompression:
57-
return "ZSTD"
58-
case MinLZCompression:
59-
return "MinLZ"
60-
default:
61-
return "Unknown"
62-
}
63-
}
64-
65-
// CompressionFromString returns an sstable.Compression from its
66-
// string representation. Inverse of c.String() above.
67-
func CompressionFromString(s string) Compression {
68-
switch s {
69-
case "Default":
70-
return DefaultCompression
71-
case "NoCompression":
72-
return NoCompression
73-
case "Snappy":
74-
return SnappyCompression
75-
case "ZSTD":
76-
return ZstdCompression
77-
case "MinLZ":
78-
return MinLZCompression
79-
default:
80-
return DefaultCompression
81-
}
82-
}
83-
8420
// CompressionProfile contains the parameters for compressing blocks in an
8521
// sstable or blob file.
8622
//
@@ -107,17 +43,49 @@ type CompressionProfile struct {
10743
// TODO(radu): knobs for adaptive compression go here.
10844
}
10945

110-
// SimpleCompressionProfile returns a CompressionProfile that uses the same
46+
var (
47+
NoCompression = simpleCompressionProfile("NoCompression", compression.None)
48+
SnappyCompression = simpleCompressionProfile("Snappy", compression.Snappy)
49+
ZstdCompression = simpleCompressionProfile("ZSTD", compression.ZstdLevel3)
50+
MinLZCompression = simpleCompressionProfile("MinLZ", compression.MinLZFastest)
51+
52+
DefaultCompression = SnappyCompression
53+
)
54+
55+
// simpleCompressionProfile returns a CompressionProfile that uses the same
11156
// compression setting for all blocks and which uses the uncompressed block if
11257
// compression reduces it by less than 12%. This is similar to older Pebble
11358
// versions which used Compression.
114-
func SimpleCompressionProfile(name string, setting compression.Setting) CompressionProfile {
115-
return CompressionProfile{
59+
//
60+
// It should only be used during global initialization.
61+
func simpleCompressionProfile(name string, setting compression.Setting) *CompressionProfile {
62+
p := &CompressionProfile{
11663
Name: name,
11764
DataBlocks: setting,
11865
OtherBlocks: setting,
11966
MinReductionPercent: 12,
12067
}
68+
registerCompressionProfile(p)
69+
return p
70+
}
71+
72+
// CompressionProfileByName returns the built-in compression profile with the
73+
// given name, or nil if there is no such profile. It is case-insensitive.
74+
//
75+
// The caller must gracefully handle the nil return case as an unknown
76+
// (user-defined or deprecated) profile.
77+
func CompressionProfileByName(name string) *CompressionProfile {
78+
return compressionProfileMap[strings.ToLower(name)]
79+
}
80+
81+
var compressionProfileMap = make(map[string]*CompressionProfile)
82+
83+
func registerCompressionProfile(p *CompressionProfile) {
84+
key := strings.ToLower(p.Name)
85+
if _, ok := compressionProfileMap[key]; ok {
86+
panic(errors.AssertionFailedf("duplicate compression profile: %s", p.Name))
87+
}
88+
compressionProfileMap[key] = p
12189
}
12290

12391
// CompressionIndicator is the byte stored physically within the block.Trailer

sstable/block/compression_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func TestBufferRandomized(t *testing.T) {
1919
t.Logf("seed %d", seed)
2020
rng := rand.New(rand.NewPCG(0, seed))
2121

22-
compressor := MakeCompressor(SnappyCompression.ToProfile())
22+
compressor := MakeCompressor(SnappyCompression)
2323
defer compressor.Close()
2424
var checksummer Checksummer
2525
checksummer.Init(ChecksumTypeCRC32c)

sstable/block/compressor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (c *Compressor) Compress(dst, src []byte, kind Kind) (CompressionIndicator,
7373
// any state and can be used in parallel.
7474
var NoopCompressor = &noopCompressor
7575

76-
var noopCompressor = MakeCompressor(NoCompression.ToProfile())
76+
var noopCompressor = MakeCompressor(NoCompression)
7777

7878
type Decompressor = compression.Decompressor
7979

sstable/colblk_writer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ func newColumnarWriter(
186186
w.props.PropertyCollectorNames = buf.String()
187187

188188
w.props.ComparerName = o.Comparer.Name
189-
w.props.CompressionName = o.Compression.String()
189+
w.props.CompressionName = o.Compression.Name
190190
w.props.KeySchemaName = o.KeySchema.Name
191191
w.props.MergerName = o.MergerName
192192

@@ -195,7 +195,7 @@ func newColumnarWriter(
195195
w.cpuMeasurer = cpuMeasurer
196196
go w.drainWriteQueue()
197197

198-
w.compressor = block.MakeCompressor(w.opts.Compression.ToProfile())
198+
w.compressor = block.MakeCompressor(w.opts.Compression)
199199
return w
200200
}
201201

0 commit comments

Comments
 (0)