Commit f4e9b0c

compressionanalyzer: support adaptive compressors
Switch experiments from using `compression.Setting` to `block.CompressionProfile` and add two adaptive profiles.
1 parent aecd724

8 files changed: +153, -96 lines


sstable/block/compression.go

Lines changed: 1 addition & 1 deletion
@@ -217,7 +217,7 @@ func (i CompressionIndicator) String() string {
 	}
 }
 
-func (i CompressionIndicator) algorithm() compression.Algorithm {
+func (i CompressionIndicator) Algorithm() compression.Algorithm {
 	switch i {
 	case NoCompressionIndicator:
 		return compression.NoCompression

sstable/block/compressor.go

Lines changed: 1 addition & 1 deletion
@@ -97,5 +97,5 @@ var noopCompressor = MakeCompressor(NoCompression)
 type Decompressor = compression.Decompressor
 
 func GetDecompressor(c CompressionIndicator) Decompressor {
-	return compression.GetDecompressor(c.algorithm())
+	return compression.GetDecompressor(c.Algorithm())
 }

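Exporting `Algorithm()` (formerly the unexported `algorithm()`) is what lets code outside the type map a block's `CompressionIndicator` to its `compression.Algorithm`; `GetDecompressor` above and the analyzer below use exactly this. A minimal sketch of the pattern, mirroring `block.GetDecompressor` (`decompressorFor` is an illustrative name, not part of the commit, and would need to live inside the pebble module since `internal/compression` is an internal package):

	// decompressorFor picks the Decompressor that matches the algorithm
	// recorded by a block's compression indicator byte.
	func decompressorFor(ci block.CompressionIndicator) block.Decompressor {
		return compression.GetDecompressor(ci.Algorithm())
	}
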
sstable/compressionanalyzer/block_analyzer.go

Lines changed: 19 additions & 9 deletions
@@ -17,7 +17,7 @@ import (
 // compression algorithms on sstable and blob file blocks.
 type BlockAnalyzer struct {
 	b             Buckets
-	compressors   [numSettings]compression.Compressor
+	compressors   [numProfiles]block.Compressor
 	decompressors [compression.NumAlgorithms]compression.Decompressor
 	minLZFastest  compression.Compressor
 	buf1          []byte
@@ -26,8 +26,8 @@ type BlockAnalyzer struct {
 
 func NewBlockAnalyzer() *BlockAnalyzer {
 	a := &BlockAnalyzer{}
-	for i, s := range Settings {
-		a.compressors[i] = compression.GetCompressor(s)
+	for i, p := range Profiles {
+		a.compressors[i] = block.MakeCompressor(p)
 	}
 	for i := range a.decompressors {
 		a.decompressors[i] = compression.GetDecompressor(compression.Algorithm(i))
@@ -38,6 +38,15 @@ func NewBlockAnalyzer() *BlockAnalyzer {
 	return a
 }
 
+// ResetCompressors resets the compressors; adaptive compressors keep some
+// state, and we want to clear that state for each sstable.
+func (a *BlockAnalyzer) ResetCompressors() {
+	for i, p := range Profiles {
+		a.compressors[i].Close()
+		a.compressors[i] = block.MakeCompressor(p)
+	}
+}
+
 func (a *BlockAnalyzer) Close() {
 	for _, c := range a.compressors {
 		c.Close()
@@ -57,8 +66,8 @@ func (a *BlockAnalyzer) Block(kind block.Kind, block []byte) {
 	compressibility := MakeCompressibility(len(block), len(compressed))
 	bucket := &a.b[kind][size][compressibility]
 	bucket.UncompressedSize.Add(float64(len(block)))
-	for i := range Settings {
-		a.runExperiment(&bucket.Experiments[i], block, a.compressors[i], a.decompressors)
+	for i := range a.compressors {
+		a.runExperiment(&bucket.Experiments[i], block, kind, &a.compressors[i], a.decompressors)
 	}
 }
 
@@ -67,9 +76,10 @@ func (a *BlockAnalyzer) Buckets() *Buckets {
 }
 
 func (a *BlockAnalyzer) runExperiment(
-	pa *PerSetting,
+	pa *PerProfile,
 	block []byte,
-	compressor compression.Compressor,
+	blockKind block.Kind,
+	compressor *block.Compressor,
 	decompressors [compression.NumAlgorithms]compression.Decompressor,
 ) {
 	// buf1 will hold the compressed data; it can get a bit larger in the worst
@@ -80,14 +90,14 @@ func (a *BlockAnalyzer) runExperiment(
 	// Compress.
 	runtime.Gosched()
 	t1 := crtime.NowMono()
-	compressed, setting := compressor.Compress(a.buf1[:0], block)
+	ci, compressed := compressor.Compress(a.buf1[:0], block, blockKind)
 	compressionTime := t1.Elapsed()
 
 	// Yield the processor, reducing the chance that we get preempted during
 	// DecompressInto.
 	runtime.Gosched()
 	t2 := crtime.NowMono()
-	if err := decompressors[setting.Algorithm].DecompressInto(a.buf2, compressed); err != nil {
+	if err := decompressors[ci.Algorithm()].DecompressInto(a.buf2, compressed); err != nil {
 		panic(err)
 	}
 	decompressionTime := t2.Elapsed()

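Adaptive compressors adjust to the data they have already seen, so state left over from one file would skew measurements on the next; `ResetCompressors` gives each sstable a clean slate. A rough sketch of the intended lifecycle (`files`, `blocksOf`, and `kindOf` are hypothetical helpers standing in for real iteration code):

	a := compressionanalyzer.NewBlockAnalyzer()
	defer a.Close()
	for _, f := range files { // hypothetical sstable list
		a.ResetCompressors() // drop adaptive state from the previous file
		for _, blk := range blocksOf(f) { // hypothetical block iterator
			a.Block(kindOf(blk), blk) // blk is the raw []byte block
		}
	}
	fmt.Print(a.Buckets().String(100 /* minSamples */))
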
sstable/compressionanalyzer/buckets.go

Lines changed: 72 additions & 21 deletions
@@ -12,6 +12,7 @@ import (
 	"time"
 
 	"github.com/cockroachdb/pebble/internal/compression"
+	"github.com/cockroachdb/pebble/sstable/block"
 	"github.com/cockroachdb/pebble/sstable/block/blockkind"
 )
 
@@ -95,19 +96,69 @@ func (c Compressibility) String() string {
 	}
 }
 
-var Settings = [...]compression.Setting{
-	compression.Snappy,
-	compression.MinLZFastest,
-	compression.MinLZBalanced,
-	compression.ZstdLevel1,
-	compression.ZstdLevel3,
+var Profiles = [...]*block.CompressionProfile{
+	{
+		Name:                "Snappy",
+		DataBlocks:          compression.Snappy,
+		ValueBlocks:         compression.Snappy,
+		OtherBlocks:         compression.Snappy,
+		MinReductionPercent: 0,
+	},
+
+	{
+		Name:                "MinLZ1",
+		DataBlocks:          compression.MinLZFastest,
+		ValueBlocks:         compression.MinLZFastest,
+		OtherBlocks:         compression.MinLZFastest,
+		MinReductionPercent: 0,
+	},
+
+	{
+		Name:                "MinLZ2",
+		DataBlocks:          compression.MinLZBalanced,
+		ValueBlocks:         compression.MinLZBalanced,
+		OtherBlocks:         compression.MinLZBalanced,
+		MinReductionPercent: 0,
+	},
+
+	{
+		Name:                "Zstd1",
+		DataBlocks:          compression.ZstdLevel1,
+		ValueBlocks:         compression.ZstdLevel1,
+		OtherBlocks:         compression.ZstdLevel1,
+		MinReductionPercent: 0,
+	},
+
+	{
+		Name:                           "Auto1",
+		DataBlocks:                     compression.ZstdLevel1,
+		ValueBlocks:                    compression.ZstdLevel1,
+		OtherBlocks:                    compression.MinLZFastest,
+		AdaptiveReductionCutoffPercent: 30,
+		MinReductionPercent:            0,
+	},
+
+	{
+		Name:                "Zstd3",
+		DataBlocks:          compression.ZstdLevel3,
+		ValueBlocks:         compression.ZstdLevel3,
+		OtherBlocks:         compression.ZstdLevel3,
+		MinReductionPercent: 0,
+	},
+
+	{
+		Name:                           "Auto3",
+		DataBlocks:                     compression.ZstdLevel3,
+		ValueBlocks:                    compression.ZstdLevel3,
+		OtherBlocks:                    compression.MinLZFastest,
+		AdaptiveReductionCutoffPercent: 30,
+		MinReductionPercent:            0,
+	},
 	// Zstd levels 5+ are too slow (on the order of 15-20MB/s to compress) and
 	// don't usually offer a very large benefit in terms of size vs. level 3.
-	// compression.ZstdLevel5,
-	// compression.ZstdLevel7,
 }
 
-const numSettings = len(Settings)
+const numProfiles = len(Profiles)
 
 // Buckets holds the results of all experiments.
 type Buckets [blockkind.NumKinds][numBlockSizes][numCompressibility]Bucket
@@ -116,12 +167,12 @@ type Buckets [blockkind.NumKinds][numBlockSizes][numCompressibility]Bucket
 // compressibility.
 type Bucket struct {
 	UncompressedSize Welford
-	Experiments      [numSettings]PerSetting
+	Experiments      [numProfiles]PerProfile
 }
 
-// PerSetting holds statistics from experiments on blocks in a bucket with a
+// PerProfile holds statistics from experiments on blocks in a bucket with a
 // specific compression.Setting.
-type PerSetting struct {
+type PerProfile struct {
 	CompressionRatio WeightedWelford
 	// CPU times are in nanoseconds per uncompressed byte.
 	CompressionTime WeightedWelford
@@ -133,8 +184,8 @@ func (b *Buckets) String(minSamples int) string {
 	tw := tabwriter.NewWriter(&buf, 2, 1, 2, ' ', 0)
 
 	fmt.Fprintf(tw, "Kind\tSize Range\tTest CR\tSamples\tSize\t")
-	for _, s := range Settings {
-		fmt.Fprintf(tw, "\t%s", s.String())
+	for _, p := range Profiles {
+		fmt.Fprintf(tw, "\t%s", p.Name)
 	}
 	fmt.Fprintf(tw, "\n")
 	for k := range blockkind.All() {
@@ -190,13 +241,13 @@ func stdDevStr(mean, stddev float64) string {
 func (b *Buckets) ToCSV(minSamples int) string {
 	var buf strings.Builder
 	fmt.Fprintf(&buf, "Kind,Size Range,Test CR,Samples,Size,Size±")
-	for _, s := range Settings {
-		fmt.Fprintf(&buf, ",%s CR", s.String())
-		fmt.Fprintf(&buf, ",%s CR±", s.String())
-		fmt.Fprintf(&buf, ",%s Comp ns/b", s.String())
-		fmt.Fprintf(&buf, ",%s Comp±", s.String())
-		fmt.Fprintf(&buf, ",%s Decomp ns/b", s.String())
-		fmt.Fprintf(&buf, ",%s Decomp±", s.String())
+	for _, p := range Profiles {
+		fmt.Fprintf(&buf, ",%s CR", p.Name)
+		fmt.Fprintf(&buf, ",%s CR±", p.Name)
+		fmt.Fprintf(&buf, ",%s Comp ns/b", p.Name)
+		fmt.Fprintf(&buf, ",%s Comp±", p.Name)
+		fmt.Fprintf(&buf, ",%s Decomp ns/b", p.Name)
+		fmt.Fprintf(&buf, ",%s Decomp±", p.Name)
 	}
 	fmt.Fprintf(&buf, "\n")
 	for k := range blockkind.All() {

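The two `Auto` profiles are what "adaptive" means in this commit: Zstd for data and value blocks, `MinLZFastest` for the rest, and a 30% `AdaptiveReductionCutoffPercent`. The cutoff's exact semantics are not spelled out in this diff; the field name suggests the compressor falls back to the cheaper setting when the stronger one is not reducing blocks by at least the cutoff. A hypothetical extra experiment following the same pattern, under that assumption:

	// Assumed semantics: AdaptiveReductionCutoffPercent is the size
	// reduction below which the adaptive compressor prefers the cheaper
	// OtherBlocks setting over the DataBlocks/ValueBlocks setting.
	var autoBalanced = &block.CompressionProfile{
		Name:                           "Auto1-MinLZ2", // hypothetical
		DataBlocks:                     compression.ZstdLevel1,
		ValueBlocks:                    compression.ZstdLevel1,
		OtherBlocks:                    compression.MinLZBalanced,
		AdaptiveReductionCutoffPercent: 30,
		MinReductionPercent:            0,
	}
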
sstable/compressionanalyzer/file_analyzer.go

Lines changed: 10 additions & 5 deletions
@@ -20,7 +20,7 @@ import (
 //
 // TODO(radu): add support for blob files.
 type FileAnalyzer struct {
-	*BlockAnalyzer
+	blockAnalyzer *BlockAnalyzer
 
 	readLimiter *tokenbucket.TokenBucket
 	sstReadOpts sstable.ReaderOptions
@@ -35,22 +35,27 @@ func NewFileAnalyzer(
 		panic("sstReadOpts.CacheOpts.CacheHandle must be nil")
 	}
 	return &FileAnalyzer{
-		BlockAnalyzer: NewBlockAnalyzer(),
+		blockAnalyzer: NewBlockAnalyzer(),
 		readLimiter:   readLimiter,
 		sstReadOpts:   sstReadOpts,
 	}
 }
 
+func (fa *FileAnalyzer) Buckets() *Buckets {
+	return fa.blockAnalyzer.Buckets()
+}
+
 func (fa *FileAnalyzer) Close() {
-	if fa.BlockAnalyzer != nil {
-		fa.BlockAnalyzer.Close()
+	if fa.blockAnalyzer != nil {
+		fa.blockAnalyzer.Close()
 	}
 	*fa = FileAnalyzer{}
 }
 
 // SSTable analyzes the blocks in an sstable file and closes the readable (even
 // in error cases).
 func (fa *FileAnalyzer) SSTable(ctx context.Context, readable objstorage.Readable) error {
+	fa.blockAnalyzer.ResetCompressors()
 	r, err := sstable.NewReader(ctx, readable, fa.sstReadOpts)
 	if err != nil {
 		_ = readable.Close()
@@ -124,7 +129,7 @@ func (fa *FileAnalyzer) sstBlock(
 	if err != nil {
 		return err
 	}
-	fa.BlockAnalyzer.Block(kind, h.BlockData())
+	fa.blockAnalyzer.Block(kind, h.BlockData())
 	h.Release()
 	return nil
 }

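With the embedded `*BlockAnalyzer` turned into a private field, callers no longer see `BlockAnalyzer`'s methods directly; the new `Buckets()` accessor exposes the results, and `SSTable` now resets the adaptive compressors so each file is measured independently. A rough usage sketch, assuming the parameter order `(readLimiter, sstReadOpts)` and a `ctx` and `readable` obtained elsewhere:

	fa := compressionanalyzer.NewFileAnalyzer(nil /* readLimiter */, sstable.ReaderOptions{})
	defer fa.Close()
	if err := fa.SSTable(ctx, readable); err != nil { // readable is an objstorage.Readable
		return err
	}
	fmt.Print(fa.Buckets().ToCSV(100 /* minSamples */))
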
sstable/compressionanalyzer/file_analyzer_test.go

Lines changed: 1 addition & 2 deletions
@@ -10,7 +10,6 @@ import (
 
 	"github.com/cockroachdb/crlib/crstrings"
 	"github.com/cockroachdb/datadriven"
-	"github.com/cockroachdb/pebble/internal/compression"
 	"github.com/cockroachdb/pebble/sstable"
 	"github.com/cockroachdb/pebble/vfs"
 )
@@ -43,7 +42,7 @@ func TestFileAnalyzer(t *testing.T) {
 		for l := range bucket.Experiments {
 			// Snappy always has the same output in all configurations and on
 			// all platforms.
-			if Settings[l].Algorithm != compression.SnappyAlgorithm {
+			if Profiles[l].Name != "Snappy" {
 				bucket.Experiments[l].CompressionRatio = WeightedWelford{}
 			}
 			bucket.Experiments[l].CompressionTime = WeightedWelford{}
