Skip to content

Commit 34c1017

Browse files
committed
compression: return Setting from Compress
1 parent 77b1bbf commit 34c1017

File tree

10 files changed

+42
-37
lines changed

10 files changed

+42
-37
lines changed

internal/compression/compression.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/minio/minlz"
1111
)
1212

13-
// Algorithm identifies a compression algorithm. Some compression algorithm
13+
// Algorithm identifies a compression algorithm. Some compression algorithms
1414
// support multiple compression levels.
1515
//
1616
// Decompressing data requires only an Algorithm.
@@ -21,7 +21,8 @@ const (
2121
SnappyAlgorithm
2222
Zstd
2323
MinLZ
24-
numAlgorithms
24+
25+
NumAlgorithms
2526
)
2627

2728
// String implements fmt.Stringer, returning a human-readable name for the
@@ -72,11 +73,9 @@ var (
7273
// Compressor is an interface for compressing data. An instance is associated
7374
// with a specific Setting.
7475
type Compressor interface {
75-
// Algorithm returns the algorithm used by this Compressor.
76-
Algorithm() Algorithm
77-
7876
// Compress a block, appending the compressed data to dst[:0].
79-
Compress(dst, src []byte) []byte
77+
// Returns setting used.
78+
Compress(dst, src []byte) ([]byte, Setting)
8079

8180
// Close must be called when the Compressor is no longer needed.
8281
// After Close is called, the Compressor must not be used again.

internal/compression/compression_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ func TestCompressionRoundtrip(t *testing.T) {
3232
compressedBuf := make([]byte, 1+rng.IntN(1<<10 /* 1 KiB */))
3333
compressor := GetCompressor(s)
3434
defer compressor.Close()
35-
compressed := compressor.Compress(compressedBuf, payload)
36-
got, err := decompress(s.Algorithm, compressed)
35+
compressed, st := compressor.Compress(compressedBuf, payload)
36+
got, err := decompress(st.Algorithm, compressed)
3737
require.NoError(t, err)
3838
require.Equal(t, payload, got)
3939
})

internal/compression/minlz.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@ var _ Compressor = (*minlzCompressor)(nil)
1818

1919
func (c *minlzCompressor) Algorithm() Algorithm { return MinLZ }
2020

21-
func (c *minlzCompressor) Compress(dst, src []byte) []byte {
21+
func (c *minlzCompressor) Compress(dst, src []byte) ([]byte, Setting) {
2222
// MinLZ cannot encode blocks greater than 8MB. Fall back to Snappy in those
23-
// cases. Note that MinLZ can decode the Snappy compressed block.
23+
// cases.
2424
if len(src) > minlz.MaxBlockSize {
2525
return (snappyCompressor{}).Compress(dst, src)
2626
}
@@ -29,7 +29,7 @@ func (c *minlzCompressor) Compress(dst, src []byte) []byte {
2929
if err != nil {
3030
panic(errors.Wrap(err, "minlz compression"))
3131
}
32-
return compressed
32+
return compressed, Setting{Algorithm: MinLZ, Level: uint8(c.level)}
3333
}
3434

3535
func (c *minlzCompressor) Close() {}

internal/compression/minlz_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,20 @@ func TestMinLZLargeBlock(t *testing.T) {
2020
}
2121
c := GetCompressor(MinLZFastest)
2222
defer c.Close()
23-
compressed := c.Compress(nil, b)
24-
d := GetDecompressor(MinLZ)
23+
compressed, st := c.Compress(nil, b)
24+
25+
d := GetDecompressor(st.Algorithm)
2526
decompressed := make([]byte, len(b))
26-
defer d.Close()
27+
require.NoError(t, d.DecompressInto(decompressed, compressed))
28+
require.Equal(t, b, decompressed)
29+
d.Close()
2730

31+
// Verify that a MinLZ decompressor always works (even if Compress returned
32+
// Snappy).
33+
d = GetDecompressor(MinLZ)
34+
clear(decompressed)
2835
require.NoError(t, d.DecompressInto(decompressed, compressed))
2936
require.Equal(t, b, decompressed)
37+
d.Close()
3038
}
3139
}

internal/compression/noop.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@ type noopCompressor struct{}
88

99
var _ Compressor = noopCompressor{}
1010

11-
func (noopCompressor) Algorithm() Algorithm { return NoCompression }
12-
func (noopCompressor) Compress(dst, src []byte) []byte {
13-
return append(dst[:0], src...)
11+
func (noopCompressor) Compress(dst, src []byte) ([]byte, Setting) {
12+
return append(dst[:0], src...), None
1413
}
1514
func (noopCompressor) Close() {}
1615

internal/compression/snappy.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ var _ Compressor = snappyCompressor{}
1616

1717
func (snappyCompressor) Algorithm() Algorithm { return SnappyAlgorithm }
1818

19-
func (snappyCompressor) Compress(dst, src []byte) []byte {
19+
func (snappyCompressor) Compress(dst, src []byte) ([]byte, Setting) {
2020
dst = dst[:cap(dst):cap(dst)]
21-
return snappy.Encode(dst, src)
21+
return snappy.Encode(dst, src), Snappy
2222
}
2323

2424
func (snappyCompressor) Close() {}

internal/compression/zstd_cgo.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,7 @@ var zstdCompressorPool = sync.Pool{
3939
// relies on CGo.
4040
const UseStandardZstdLib = true
4141

42-
func (z *zstdCompressor) Algorithm() Algorithm { return Zstd }
43-
44-
func (z *zstdCompressor) Compress(compressedBuf []byte, b []byte) []byte {
42+
func (z *zstdCompressor) Compress(compressedBuf []byte, b []byte) ([]byte, Setting) {
4543
if len(compressedBuf) < binary.MaxVarintLen64 {
4644
compressedBuf = append(compressedBuf, make([]byte, binary.MaxVarintLen64-len(compressedBuf))...)
4745
}
@@ -63,7 +61,7 @@ func (z *zstdCompressor) Compress(compressedBuf []byte, b []byte) []byte {
6361
panic("Allocated a new buffer despite checking CompressBound.")
6462
}
6563

66-
return compressedBuf[:varIntLen+len(result)]
64+
return compressedBuf[:varIntLen+len(result)], Setting{Algorithm: Zstd, Level: uint8(z.level)}
6765
}
6866

6967
func (z *zstdCompressor) Close() {

internal/compression/zstd_nocgo.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,13 @@ func getZstdCompressor(level int) *zstdCompressor {
3434
// relies on CGo.
3535
const UseStandardZstdLib = false
3636

37-
func (z *zstdCompressor) Algorithm() Algorithm { return Zstd }
38-
3937
func (z *zstdCompressor) Compress(compressedBuf, b []byte) []byte {
4038
if len(compressedBuf) < binary.MaxVarintLen64 {
4139
compressedBuf = append(compressedBuf, make([]byte, binary.MaxVarintLen64-len(compressedBuf))...)
4240
}
4341
varIntLen := binary.PutUvarint(compressedBuf, uint64(len(b)))
44-
return (*zstd.Encoder)(z).EncodeAll(b, compressedBuf[:varIntLen])
42+
res := (*zstd.Encoder)(z).EncodeAll(b, compressedBuf[:varIntLen])
43+
return res, Setting{Algorithm: Zstd, Level: uint8(z.level)}
4544
}
4645

4746
func (z *zstdCompressor) Close() {

sstable/block/compressor.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,20 +70,19 @@ func (c *Compressor) Compress(dst, src []byte, kind Kind) (CompressionIndicator,
7070
compressor = c.otherBlocksCompressor
7171
}
7272

73-
out := compressor.Compress(dst, src)
73+
out, setting := compressor.Compress(dst, src)
7474

7575
// Return the original data uncompressed if the reduction is less than the
7676
// minimum, i.e.:
7777
//
7878
// after * 100
7979
// ----------- > 100 - MinReductionPercent
8080
// before
81-
algorithm := compressor.Algorithm()
82-
if algorithm != compression.NoCompression &&
81+
if setting.Algorithm != compression.NoCompression &&
8382
int64(len(out))*100 > int64(len(src))*int64(100-c.profile.MinReductionPercent) {
8483
return NoCompressionIndicator, append(out[:0], src...)
8584
}
86-
return compressionIndicatorFromAlgorithm(algorithm), out
85+
return compressionIndicatorFromAlgorithm(setting.Algorithm), out
8786
}
8887

8988
// NoopCompressor is a Compressor that does not compress data. It does not have

sstable/compressionanalyzer/block_analyzer.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
type BlockAnalyzer struct {
1919
b Buckets
2020
compressors [numSettings]compression.Compressor
21-
decompressors [numSettings]compression.Decompressor
21+
decompressors [compression.NumAlgorithms]compression.Decompressor
2222
minLZFastest compression.Compressor
2323
buf1 []byte
2424
buf2 []byte
@@ -28,7 +28,9 @@ func NewBlockAnalyzer() *BlockAnalyzer {
2828
a := &BlockAnalyzer{}
2929
for i, s := range Settings {
3030
a.compressors[i] = compression.GetCompressor(s)
31-
a.decompressors[i] = compression.GetDecompressor(s.Algorithm)
31+
}
32+
for i := range a.decompressors {
33+
a.decompressors[i] = compression.GetDecompressor(compression.Algorithm(i))
3234
}
3335
a.minLZFastest = compression.GetCompressor(compression.MinLZFastest)
3436
a.buf1 = make([]byte, 256*1024)
@@ -51,11 +53,12 @@ func (a *BlockAnalyzer) Close() {
5153
// of various compression algorithms on it.
5254
func (a *BlockAnalyzer) Block(kind block.Kind, block []byte) {
5355
size := MakeBlockSize(len(block))
54-
compressibility := MakeCompressibility(len(block), len(a.minLZFastest.Compress(a.buf1[:0], block)))
56+
compressed, _ := a.minLZFastest.Compress(a.buf1[:0], block)
57+
compressibility := MakeCompressibility(len(block), len(compressed))
5558
bucket := &a.b[kind][size][compressibility]
5659
bucket.UncompressedSize.Add(float64(len(block)))
5760
for i := range Settings {
58-
a.runExperiment(&bucket.Experiments[i], block, a.compressors[i], a.decompressors[i])
61+
a.runExperiment(&bucket.Experiments[i], block, a.compressors[i], a.decompressors)
5962
}
6063
}
6164

@@ -67,7 +70,7 @@ func (a *BlockAnalyzer) runExperiment(
6770
pa *PerSetting,
6871
block []byte,
6972
compressor compression.Compressor,
70-
decompressor compression.Decompressor,
73+
decompressors [compression.NumAlgorithms]compression.Decompressor,
7174
) {
7275
// buf1 will hold the compressed data; it can get a bit larger in the worst
7376
// case, add a bit of head
@@ -77,14 +80,14 @@ func (a *BlockAnalyzer) runExperiment(
7780
// Compress.
7881
runtime.Gosched()
7982
t1 := crtime.NowMono()
80-
compressed := compressor.Compress(a.buf1[:0], block)
83+
compressed, setting := compressor.Compress(a.buf1[:0], block)
8184
compressionTime := t1.Elapsed()
8285

8386
// Yield the processor, reducing the chance that we get preempted during
8487
// DecompressInto.
8588
runtime.Gosched()
8689
t2 := crtime.NowMono()
87-
if err := decompressor.DecompressInto(a.buf2, compressed); err != nil {
90+
if err := decompressors[setting.Algorithm].DecompressInto(a.buf2, compressed); err != nil {
8891
panic(err)
8992
}
9093
decompressionTime := t2.Elapsed()

0 commit comments

Comments
 (0)