Skip to content

Commit aecd724

Browse files
committed
compression: add an adaptive compressor
Add AdaptiveCompressor, which estimates the size reduction of using a slower algorithm vs a faster one and chooses automatically (on a per-block basis). We will separately update the compression analyzer to run experiments with the adaptive compressors.
1 parent 0c357d6 commit aecd724

File tree

4 files changed

+224
-28
lines changed

4 files changed

+224
-28
lines changed

internal/compression/adaptive.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
package compression
6+
7+
import (
8+
"math"
9+
"math/rand/v2"
10+
"sync"
11+
12+
"github.com/cockroachdb/pebble/internal/ewma"
13+
)
14+
15+
// AdaptiveCompressor is a Compressor that automatically chooses between two
16+
// algorithms: it uses a slower but better algorithm as long as it reduces the
17+
// compressed size (compared to the faster algorithm) by a certain relative
18+
// amount. The decision is probabilistic and based on sampling a subset of
19+
// blocks.
20+
type AdaptiveCompressor struct {
21+
fast Compressor
22+
slow Compressor
23+
24+
reductionCutoff float64
25+
sampleEvery int
26+
27+
// estimator for the relative size reduction when choosing the slow algorithm.
28+
estimator ewma.Bytes
29+
rng rand.PCG
30+
31+
buf []byte
32+
}
33+
34+
// AdaptiveCompressorParams contains the parameters for an adaptive compressor.
35+
type AdaptiveCompressorParams struct {
36+
// Fast and Slow are the two compression settings the adaptive compressor
37+
// chooses between.
38+
Fast Setting
39+
Slow Setting
40+
// ReductionCutoff is the relative size reduction (when using the slow
41+
// algorithm vs the fast algorithm) below which we use the fast algorithm. For
42+
// example, if ReductionCutoff is 0.3 then we only use the slow algorithm if
43+
// it reduces the compressed size (compared to the fast algorithm) by at least
44+
// 30%.
45+
ReductionCutoff float64
46+
// SampleEvery defines the sampling frequency: the probability we sample a
47+
// block is 1.0/SampleEvery. Sampling means trying both algorithms and
48+
// recording the compression ratio.
49+
SampleEvery int
50+
// SampleHalfLife defines the half-life of the exponentially weighted moving
51+
// average. It should be a factor larger than the expected average block size.
52+
SampleHalfLife int64
53+
SamplingSeed uint64
54+
}
55+
56+
func NewAdaptiveCompressor(p AdaptiveCompressorParams) *AdaptiveCompressor {
57+
ac := adaptiveCompressorPool.Get().(*AdaptiveCompressor)
58+
ac.fast = GetCompressor(p.Fast)
59+
ac.slow = GetCompressor(p.Slow)
60+
ac.sampleEvery = p.SampleEvery
61+
ac.reductionCutoff = p.ReductionCutoff
62+
ac.estimator.Init(p.SampleHalfLife)
63+
ac.rng.Seed(p.SamplingSeed, p.SamplingSeed)
64+
return ac
65+
}
66+
67+
var _ Compressor = (*AdaptiveCompressor)(nil)
68+
69+
var adaptiveCompressorPool = sync.Pool{
70+
New: func() any { return &AdaptiveCompressor{} },
71+
}
72+
73+
func (ac *AdaptiveCompressor) Compress(dst, src []byte) ([]byte, Setting) {
74+
estimate := ac.estimator.Estimate()
75+
// TODO(radu): consider decreasing the sampling frequency if the estimate is
76+
// far from the cutoff.
77+
sampleThisBlock := math.IsNaN(estimate) || ac.rng.Uint64()%uint64(ac.sampleEvery) == 0
78+
if !sampleThisBlock {
79+
ac.estimator.NoSample(int64(len(src)))
80+
if estimate < ac.reductionCutoff {
81+
return ac.fast.Compress(dst, src)
82+
} else {
83+
return ac.slow.Compress(dst, src)
84+
}
85+
}
86+
bufFast, fastSetting := ac.fast.Compress(ac.buf[:0], src)
87+
ac.buf = bufFast[:0]
88+
dst, slowSetting := ac.slow.Compress(dst, src)
89+
reduction := 1 - float64(len(dst))/float64(len(bufFast))
90+
ac.estimator.SampledBlock(int64(len(src)), reduction)
91+
if reduction < ac.reductionCutoff {
92+
return append(dst[:0], bufFast...), fastSetting
93+
}
94+
return dst, slowSetting
95+
}
96+
97+
func (ac *AdaptiveCompressor) Close() {
98+
ac.fast.Close()
99+
ac.slow.Close()
100+
if cap(ac.buf) > 256*1024 {
101+
ac.buf = nil // Release large buffers.
102+
}
103+
adaptiveCompressorPool.Put(ac)
104+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
// Copyright 2025 The LevelDB-Go and Pebble Authors. All rights reserved. Use
2+
// of this source code is governed by a BSD-style license that can be found in
3+
// the LICENSE file.
4+
5+
package compression
6+
7+
import (
8+
"math/rand/v2"
9+
"testing"
10+
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestAdaptiveCompressorRand(t *testing.T) {
15+
// Test the adaptive compressor with random data and make sure it uses the
16+
// fast algorithm.
17+
ac := NewAdaptiveCompressor(AdaptiveCompressorParams{
18+
Fast: MinLZFastest,
19+
Slow: ZstdLevel1,
20+
ReductionCutoff: 0.2,
21+
SampleEvery: 10,
22+
SampleHalfLife: 128 * 1024,
23+
SamplingSeed: 1,
24+
})
25+
defer ac.Close()
26+
for i := 0; i < 100; i++ {
27+
data := make([]byte, 10+rand.IntN(64*1024))
28+
for j := range data {
29+
data[j] = byte(rand.Uint32())
30+
}
31+
_, setting := ac.Compress(nil, data)
32+
require.Equal(t, MinLZFastest, setting)
33+
}
34+
}
35+
36+
func TestAdaptiveCompressorCompressible(t *testing.T) {
37+
// Test the adaptive compressor with compressible data and make sure it uses
38+
// the slow algorithm.
39+
ac := NewAdaptiveCompressor(AdaptiveCompressorParams{
40+
Fast: None,
41+
Slow: ZstdLevel1,
42+
ReductionCutoff: 0.6,
43+
SampleEvery: 10,
44+
SampleHalfLife: 128 * 1024,
45+
SamplingSeed: 1,
46+
})
47+
defer ac.Close()
48+
for i := 0; i < 100; i++ {
49+
data := make([]byte, 10+rand.IntN(64*1024))
50+
for j := range data {
51+
data[j] = byte(j / 100)
52+
}
53+
_, setting := ac.Compress(nil, data)
54+
require.Equal(t, ZstdLevel1, setting)
55+
}
56+
}

sstable/block/compression.go

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,12 @@ type CompressionProfile struct {
4343
// uncompressed.
4444
MinReductionPercent uint8
4545

46-
// TODO(radu): knobs for adaptive compression go here.
46+
// AdaptiveReductionCutoffPercent (when set to a non-zero value) enables
47+
// adaptive compressors for data and value blocks which fall back to the
48+
// OtherBlocks setting. The OtherBlocks setting is used when the
49+
// DataBlocks/ValueBlocks setting cannot achieve a further data reduction of
50+
// at least AdaptiveReductionCutoffPercent%.
51+
AdaptiveReductionCutoffPercent uint8
4752
}
4853

4954
// UsesMinLZ returns true if the profile uses the MinLZ compression algorithm
@@ -72,15 +77,43 @@ var (
7277
})
7378

7479
BalancedCompression = registerCompressionProfile(CompressionProfile{
75-
Name: "Balanced",
80+
Name: "Balanced",
81+
DataBlocks: compression.ZstdLevel1,
82+
ValueBlocks: compression.ZstdLevel3,
83+
OtherBlocks: fastestCompression,
84+
AdaptiveReductionCutoffPercent: 30,
85+
MinReductionPercent: 5,
86+
})
87+
88+
GoodCompression = registerCompressionProfile(CompressionProfile{
89+
Name: "Good",
90+
DataBlocks: compression.ZstdLevel3,
91+
ValueBlocks: compression.ZstdLevel3,
92+
OtherBlocks: fastestCompression,
93+
MinReductionPercent: 5,
94+
})
95+
96+
// Adaptive compression profiles are experimental.
97+
98+
FastAdaptiveCompression = registerCompressionProfile(CompressionProfile{
99+
Name: "Fast adaptive",
100+
DataBlocks: fastestCompression,
101+
ValueBlocks: compression.ZstdLevel1,
102+
OtherBlocks: fastestCompression,
103+
AdaptiveReductionCutoffPercent: 30,
104+
MinReductionPercent: 10,
105+
})
106+
107+
BalancedAdaptiveCompression = registerCompressionProfile(CompressionProfile{
108+
Name: "Balanced adaptive",
76109
DataBlocks: compression.ZstdLevel1,
77110
ValueBlocks: compression.ZstdLevel3,
78111
OtherBlocks: fastestCompression,
79112
MinReductionPercent: 5,
80113
})
81114

82-
GoodCompression = registerCompressionProfile(CompressionProfile{
83-
Name: "Good",
115+
GoodAdaptiveCompression = registerCompressionProfile(CompressionProfile{
116+
Name: "Good adaptive",
84117
DataBlocks: compression.ZstdLevel3,
85118
ValueBlocks: compression.ZstdLevel3,
86119
OtherBlocks: fastestCompression,

sstable/block/compressor.go

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package block
22

33
import (
4+
"math/rand"
5+
46
"github.com/cockroachdb/pebble/internal/compression"
57
"github.com/cockroachdb/pebble/sstable/block/blockkind"
68
)
@@ -12,14 +14,9 @@ import (
1214
// .. = c.Compress(..)
1315
// c.Close()
1416
type Compressor struct {
15-
profile CompressionProfile
16-
dataBlocksCompressor compression.Compressor
17-
// valueBlocksCompressor is used for value blocks; It can be the same object as
18-
// dataBlocksCompressor.
17+
profile CompressionProfile
18+
dataBlocksCompressor compression.Compressor
1919
valueBlocksCompressor compression.Compressor
20-
// otherBlocksCompressor is used for blocks that are not data blocks, such as
21-
// index blocks or metadata blocks. It can be the same object as
22-
// dataBlocksCompressor.
2320
otherBlocksCompressor compression.Compressor
2421
}
2522

@@ -29,30 +26,36 @@ func MakeCompressor(profile *CompressionProfile) Compressor {
2926
c := Compressor{
3027
profile: *profile,
3128
}
32-
c.dataBlocksCompressor = compression.GetCompressor(profile.DataBlocks)
33-
if profile.ValueBlocks == profile.DataBlocks {
34-
c.valueBlocksCompressor = c.dataBlocksCompressor
35-
} else {
36-
c.valueBlocksCompressor = compression.GetCompressor(profile.ValueBlocks)
37-
}
38-
if profile.OtherBlocks == profile.DataBlocks {
39-
c.otherBlocksCompressor = c.dataBlocksCompressor
40-
} else {
41-
c.otherBlocksCompressor = compression.GetCompressor(profile.OtherBlocks)
42-
}
29+
30+
c.dataBlocksCompressor = maybeAdaptiveCompressor(profile, profile.DataBlocks)
31+
c.valueBlocksCompressor = maybeAdaptiveCompressor(profile, profile.ValueBlocks)
32+
c.otherBlocksCompressor = compression.GetCompressor(profile.OtherBlocks)
4333
return c
4434
}
4535

36+
func maybeAdaptiveCompressor(
37+
profile *CompressionProfile, setting compression.Setting,
38+
) compression.Compressor {
39+
if profile.AdaptiveReductionCutoffPercent != 0 && setting != profile.OtherBlocks {
40+
params := compression.AdaptiveCompressorParams{
41+
Slow: setting,
42+
Fast: profile.OtherBlocks,
43+
ReductionCutoff: float64(profile.AdaptiveReductionCutoffPercent) * 0.01,
44+
SampleEvery: 10,
45+
SampleHalfLife: 256 * 1024, // 256 KB
46+
SamplingSeed: rand.Uint64(),
47+
}
48+
return compression.NewAdaptiveCompressor(params)
49+
}
50+
return compression.GetCompressor(setting)
51+
}
52+
4653
// Close must be called when the Compressor is no longer needed.
4754
// After Close is called, the Compressor must not be used again.
4855
func (c *Compressor) Close() {
49-
if c.otherBlocksCompressor != c.dataBlocksCompressor {
50-
c.otherBlocksCompressor.Close()
51-
}
52-
if c.valueBlocksCompressor != c.dataBlocksCompressor {
53-
c.valueBlocksCompressor.Close()
54-
}
5556
c.dataBlocksCompressor.Close()
57+
c.valueBlocksCompressor.Close()
58+
c.otherBlocksCompressor.Close()
5659
*c = Compressor{}
5760
}
5861

0 commit comments

Comments
 (0)