Skip to content

Commit 4d38942

Browse files
committed
sstable: add Minlz Compression
1 parent 2913606 commit 4d38942

File tree

10 files changed

+93
-7
lines changed

10 files changed

+93
-7
lines changed

db.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2071,6 +2071,7 @@ func (d *DB) Metrics() *Metrics {
20712071
metrics.Table.CompressedCountUnknown += int64(compressionTypes.unknown)
20722072
metrics.Table.CompressedCountSnappy += int64(compressionTypes.snappy)
20732073
metrics.Table.CompressedCountZstd += int64(compressionTypes.zstd)
2074+
metrics.Table.CompressedCountMinlz += int64(compressionTypes.minlz)
20742075
metrics.Table.CompressedCountNone += int64(compressionTypes.none)
20752076
}
20762077

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,9 @@ require (
1414
github.com/ghemawat/stream v0.0.0-20171120220530-696b145b53b9
1515
github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e
1616
github.com/guptarohit/asciigraph v0.5.5
17-
github.com/klauspost/compress v1.16.7
17+
github.com/klauspost/compress v1.17.11
1818
github.com/kr/pretty v0.3.1
19+
github.com/minio/minlz v1.0.0
1920
github.com/olekukonko/tablewriter v0.0.5
2021
github.com/pkg/errors v0.9.1
2122
github.com/pmezard/go-difflib v1.0.0

go.sum

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E
127127
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
128128
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
129129
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
130-
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
131-
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
130+
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
131+
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
132132
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
133133
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
134134
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
@@ -145,6 +145,8 @@ github.com/mattn/go-sqlite3 v1.14.5/go.mod h1:WVKg1VTActs4Qso6iwGbiFih2UIHo0ENGw
145145
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
146146
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
147147
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
148+
github.com/minio/minlz v1.0.0 h1:Kj7aJZ1//LlTP1DM8Jm7lNKvvJS2m74gyyXXn3+uJWQ=
149+
github.com/minio/minlz v1.0.0/go.mod h1:qT0aEB35q79LLornSzeDH75LBf3aH1MV+jB5w9Wasec=
148150
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
149151
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
150152
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=

metrics.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,6 +290,8 @@ type Metrics struct {
290290
CompressedCountSnappy int64
291291
// The number of sstables that are compressed with zstd.
292292
CompressedCountZstd int64
293+
// The number of sstables that are compressed with minlz.
294+
CompressedCountMinlz int64
293295
// The number of sstables that are uncompressed.
294296
CompressedCountNone int64
295297

@@ -635,6 +637,9 @@ func (m *Metrics) SafeFormat(w redact.SafePrinter, _ rune) {
635637
if count := m.Table.CompressedCountZstd; count > 0 {
636638
w.Printf(" zstd: %d", redact.Safe(count))
637639
}
640+
if count := m.Table.CompressedCountMinlz; count > 0 {
641+
w.Printf(" minlz: %d", redact.Safe(count))
642+
}
638643
if count := m.Table.CompressedCountNone; count > 0 {
639644
w.Printf(" none: %d", redact.Safe(count))
640645
}

options.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ const (
4848
NoCompression = block.NoCompression
4949
SnappyCompression = block.SnappyCompression
5050
ZstdCompression = block.ZstdCompression
51+
MinlzCompression = block.MinlzCompression
5152
)
5253

5354
// FilterType exports the base.FilterType type.
@@ -1945,6 +1946,8 @@ func (o *Options) Parse(s string, hooks *ParseHooks) error {
19451946
l.Compression = func() Compression { return SnappyCompression }
19461947
case "ZSTD":
19471948
l.Compression = func() Compression { return ZstdCompression }
1949+
case "Minlz":
1950+
l.Compression = func() Compression { return MinlzCompression }
19481951
default:
19491952
return errors.Errorf("pebble: unknown compression: %q", errors.Safe(value))
19501953
}

sstable/block/compression.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ const (
2525
NoCompression
2626
SnappyCompression
2727
ZstdCompression
28+
MinlzCompression
2829
NCompression
2930
)
3031

@@ -40,6 +41,8 @@ func (c Compression) String() string {
4041
return "Snappy"
4142
case ZstdCompression:
4243
return "ZSTD"
44+
case MinlzCompression:
45+
return "Minlz"
4346
default:
4447
return "Unknown"
4548
}
@@ -57,6 +60,8 @@ func CompressionFromString(s string) Compression {
5760
return SnappyCompression
5861
case "ZSTD":
5962
return ZstdCompression
63+
case "Minlz":
64+
return MinlzCompression
6065
default:
6166
return DefaultCompression
6267
}
@@ -84,6 +89,7 @@ const (
8489
Lz4hcCompressionIndicator CompressionIndicator = 5
8590
XpressCompressionIndicator CompressionIndicator = 6
8691
ZstdCompressionIndicator CompressionIndicator = 7
92+
MinlzCompressionIndicator CompressionIndicator = 8
8793
)
8894

8995
// String implements fmt.Stringer.
@@ -105,6 +111,8 @@ func (i CompressionIndicator) String() string {
105111
return "xpress"
106112
case 7:
107113
return "zstd"
114+
case 8:
115+
return "minlz"
108116
default:
109117
panic(errors.Newf("sstable: unknown block type: %d", i))
110118
}

sstable/block/compression_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,17 @@
55
package block
66

77
import (
8+
"bytes"
89
"encoding/binary"
910
"fmt"
1011
"math/rand/v2"
1112
"testing"
1213
"time"
1314

1415
"github.com/cockroachdb/crlib/testutils/leaktest"
16+
"github.com/cockroachdb/errors"
1517
"github.com/cockroachdb/pebble/internal/cache"
18+
"github.com/minio/minlz"
1619
"github.com/stretchr/testify/require"
1720
)
1821

@@ -136,3 +139,24 @@ func TestBufferRandomized(t *testing.T) {
136139
})
137140
}
138141
}
142+
143+
func TestMinlzEncodingLimit(t *testing.T) {
144+
// Tests that Minlz compression has a strict limit of minlz.MaxBlockSize: 8<<20 (8MiB)
145+
_, err := minlz.Encode([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize-1), minlz.LevelFastest)
146+
require.NoError(t, err)
147+
_, err = minlz.Encode([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize), minlz.LevelFastest)
148+
require.NoError(t, err)
149+
_, err = minlz.Encode([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize+1), minlz.LevelFastest)
150+
if !errors.Is(err, minlz.ErrTooLarge) {
151+
require.Fail(t, "Expected minlz.ErrTooLarge Error")
152+
}
153+
154+
c := GetCompressor(MinlzCompression)
155+
defer c.Close()
156+
algo, _ := c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize-1))
157+
require.Equal(t, algo, MinlzCompressionIndicator)
158+
algo, _ = c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize))
159+
require.Equal(t, algo, MinlzCompressionIndicator)
160+
algo, _ = c.Compress([]byte{}, bytes.Repeat([]byte{0}, minlz.MaxBlockSize+1))
161+
require.Equal(t, algo, SnappyCompressionIndicator)
162+
}

sstable/block/compressor.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"github.com/cockroachdb/errors"
77
"github.com/cockroachdb/pebble/internal/base"
88
"github.com/golang/snappy"
9+
"github.com/minio/minlz"
910
)
1011

1112
type Compressor interface {
@@ -18,9 +19,11 @@ type Compressor interface {
1819

1920
type noopCompressor struct{}
2021
type snappyCompressor struct{}
22+
type minlzCompressor struct{}
2123

2224
var _ Compressor = noopCompressor{}
2325
var _ Compressor = snappyCompressor{}
26+
var _ Compressor = minlzCompressor{}
2427

2528
func (noopCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
2629
panic("NoCompressionCompressor.Compress() should not be called.")
@@ -34,6 +37,21 @@ func (snappyCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte)
3437

3538
func (snappyCompressor) Close() {}
3639

40+
func (minlzCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
41+
// Minlz cannot encode blocks greater than 8MB. Fall back to Snappy in those cases.
42+
if len(src) > minlz.MaxBlockSize {
43+
return (snappyCompressor{}).Compress(dst, src)
44+
}
45+
46+
compressed, err := minlz.Encode(dst, src, minlz.LevelFastest)
47+
if err != nil {
48+
panic(errors.Wrap(err, "minlz compression"))
49+
}
50+
return MinlzCompressionIndicator, compressed
51+
}
52+
53+
// Close is a no-op: minlzCompressor is an empty struct with nothing to release.
func (minlzCompressor) Close() {}
54+
3755
func GetCompressor(c Compression) Compressor {
3856
switch c {
3957
case NoCompression:
@@ -42,6 +60,8 @@ func GetCompressor(c Compression) Compressor {
4260
return snappyCompressor{}
4361
case ZstdCompression:
4462
return getZstdCompressor()
63+
case MinlzCompression:
64+
return minlzCompressor{}
4565
default:
4666
panic("Invalid compression type.")
4767
}
@@ -65,9 +85,11 @@ type Decompressor interface {
6585

6686
type noopDecompressor struct{}
6787
type snappyDecompressor struct{}
88+
type minlzDecompressor struct{}
6889

6990
var _ Decompressor = noopDecompressor{}
7091
var _ Decompressor = snappyDecompressor{}
92+
var _ Decompressor = minlzDecompressor{}
7193

7294
func (noopDecompressor) DecompressInto(dst, src []byte) error {
7395
dst = dst[:len(src)]
@@ -94,8 +116,7 @@ func (snappyDecompressor) DecompressInto(buf, compressed []byte) error {
94116
}
95117

96118
func (snappyDecompressor) DecompressedLen(b []byte) (decompressedLen int, err error) {
97-
l, err := snappy.DecodedLen(b)
98-
return l, err
119+
return snappy.DecodedLen(b)
99120
}
100121

101122
func (snappyDecompressor) Close() {}
@@ -110,6 +131,22 @@ func (zstdDecompressor) DecompressedLen(b []byte) (decompressedLen int, err erro
110131
return int(decodedLenU64), nil
111132
}
112133

134+
func (minlzDecompressor) DecompressInto(buf, compressed []byte) error {
135+
result, err := minlz.Decode(buf, compressed)
136+
if len(result) != len(buf) || (len(result) > 0 && &result[0] != &buf[0]) {
137+
return base.CorruptionErrorf("pebble/table: decompressed into unexpected buffer: %p != %p",
138+
errors.Safe(result), errors.Safe(buf))
139+
}
140+
return err
141+
}
142+
143+
func (minlzDecompressor) DecompressedLen(b []byte) (decompressedLen int, err error) {
144+
l, err := minlz.DecodedLen(b)
145+
return l, err
146+
}
147+
148+
// Close is a no-op: minlzDecompressor is an empty struct with nothing to release.
func (minlzDecompressor) Close() {}
149+
113150
func GetDecompressor(c CompressionIndicator) Decompressor {
114151
switch c {
115152
case NoCompressionIndicator:
@@ -118,6 +155,8 @@ func GetDecompressor(c CompressionIndicator) Decompressor {
118155
return snappyDecompressor{}
119156
case ZstdCompressionIndicator:
120157
return getZstdDecompressor()
158+
case MinlzCompressionIndicator:
159+
return minlzDecompressor{}
121160
default:
122161
panic("Invalid compression type.")
123162
}

sstable/writer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1240,7 +1240,7 @@ func runWriterBench(b *testing.B, keys [][]byte, comparer *base.Comparer, format
12401240
b.Run(fmt.Sprintf("block=%s", humanize.Bytes.Int64(int64(bs))), func(b *testing.B) {
12411241
for _, filter := range []bool{true, false} {
12421242
b.Run(fmt.Sprintf("filter=%t", filter), func(b *testing.B) {
1243-
for _, comp := range []block.Compression{block.NoCompression, block.SnappyCompression, block.ZstdCompression} {
1243+
for _, comp := range []block.Compression{block.NoCompression, block.SnappyCompression, block.ZstdCompression, block.MinlzCompression} {
12441244
b.Run(fmt.Sprintf("compression=%s", comp), func(b *testing.B) {
12451245
opts := WriterOptions{
12461246
BlockRestartInterval: 16,

table_stats.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1034,7 +1034,7 @@ var compressionTypeAnnotator = manifest.Annotator[compressionTypes]{
10341034
type compressionTypeAggregator struct{}
10351035

10361036
type compressionTypes struct {
1037-
snappy, zstd, none, unknown uint64
1037+
snappy, zstd, minlz, none, unknown uint64
10381038
}
10391039

10401040
func (a compressionTypeAggregator) Zero(dst *compressionTypes) *compressionTypes {
@@ -1053,6 +1053,8 @@ func (a compressionTypeAggregator) Accumulate(
10531053
dst.snappy++
10541054
case ZstdCompression:
10551055
dst.zstd++
1056+
case MinlzCompression:
1057+
dst.minlz++
10561058
case NoCompression:
10571059
dst.none++
10581060
default:
@@ -1066,6 +1068,7 @@ func (a compressionTypeAggregator) Merge(
10661068
) *compressionTypes {
10671069
dst.snappy += src.snappy
10681070
dst.zstd += src.zstd
1071+
dst.minlz += src.minlz
10691072
dst.none += src.none
10701073
dst.unknown += src.unknown
10711074
return dst

0 commit comments

Comments
 (0)