Skip to content

Commit 269db0a

Browse files
committed
sstable: add Compressor and Decompressor interfaces
Add Compressor and Decompressor interfaces, and refactor the existing compression and decompression schemes to implement them.
1 parent eb88357 commit 269db0a

File tree

10 files changed

+251
-171
lines changed

10 files changed

+251
-171
lines changed

sstable/block/block.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -537,13 +537,13 @@ func (r *Reader) doRead(
537537
decompressed = compressed
538538
} else {
539539
// Decode the length of the decompressed value.
540-
decodedLen, prefixLen, err := DecompressedLen(typ, compressed.BlockData())
540+
decodedLen, err := DecompressedLen(typ, compressed.BlockData())
541541
if err != nil {
542542
compressed.Release()
543543
return Value{}, err
544544
}
545545
decompressed = Alloc(decodedLen, env.BufferPool)
546-
err = DecompressInto(typ, compressed.BlockData()[prefixLen:], decompressed.BlockData())
546+
err = DecompressInto(typ, compressed.BlockData(), decompressed.BlockData())
547547
compressed.Release()
548548
if err != nil {
549549
decompressed.Release()

sstable/block/compression.go

Lines changed: 8 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
package block
66

77
import (
8-
"encoding/binary"
98
"math/rand/v2"
109
"slices"
1110
"sync"
@@ -15,7 +14,6 @@ import (
1514
"github.com/cockroachdb/pebble/internal/bytealloc"
1615
"github.com/cockroachdb/pebble/internal/invariants"
1716
"github.com/cockroachdb/pebble/objstorage"
18-
"github.com/golang/snappy"
1917
)
2018

2119
// Compression is the per-block compression algorithm to use.
@@ -114,55 +112,21 @@ func (i CompressionIndicator) String() string {
114112

115113
// DecompressedLen returns the length of the provided block once decompressed,
116114
// allowing the caller to allocate a buffer exactly sized to the decompressed
117-
// payload. For some compression algorithms, the payload is prefixed with a
118-
// varint encoding the length of the decompressed block. In such cases, a
119-
// non-zero prefixLength is returned indicating the length of this prefix.
120-
func DecompressedLen(
121-
algo CompressionIndicator, b []byte,
122-
) (decompressedLen int, prefixLength int, err error) {
123-
switch algo {
124-
case NoCompressionIndicator:
125-
return len(b), 0, nil
126-
case SnappyCompressionIndicator:
127-
l, err := snappy.DecodedLen(b)
128-
return l, 0, err
129-
case ZstdCompressionIndicator:
130-
// This will also be used by zlib, bzip2 and lz4 to retrieve the decodedLen
131-
// if we implement these algorithms in the future.
132-
decodedLenU64, varIntLen := binary.Uvarint(b)
133-
if varIntLen <= 0 {
134-
return 0, 0, base.CorruptionErrorf("pebble/table: compression block has invalid length")
135-
}
136-
return int(decodedLenU64), varIntLen, nil
137-
default:
138-
return 0, 0, base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(algo))
139-
}
115+
// payload.
116+
func DecompressedLen(algo CompressionIndicator, b []byte) (decompressedLen int, err error) {
117+
decompressor := GetDecompressor(algo)
118+
return decompressor.DecompressedLen(b)
140119
}
141120

142121
// DecompressInto decompresses compressed into buf. The buf slice must have the
143122
// exact size as the decompressed value. Callers may use DecompressedLen to
144123
// determine the correct size.
145124
func DecompressInto(algo CompressionIndicator, compressed []byte, buf []byte) error {
146-
var result []byte
147-
var err error
148-
switch algo {
149-
case NoCompressionIndicator:
150-
result = buf[:len(compressed)]
151-
copy(result, compressed)
152-
case SnappyCompressionIndicator:
153-
result, err = snappy.Decode(buf, compressed)
154-
case ZstdCompressionIndicator:
155-
result, err = decodeZstd(buf, compressed)
156-
default:
157-
return base.CorruptionErrorf("pebble/table: unknown block compression: %d", errors.Safe(algo))
158-
}
125+
decompressor := GetDecompressor(algo)
126+
err := decompressor.DecompressInto(buf, compressed)
159127
if err != nil {
160128
return base.MarkCorruptionError(err)
161129
}
162-
if len(result) != len(buf) || (len(result) > 0 && &result[0] != &buf[0]) {
163-
return base.CorruptionErrorf("pebble/table: decompressed into unexpected buffer: %p != %p",
164-
errors.Safe(result), errors.Safe(buf))
165-
}
166130
return nil
167131
}
168132

@@ -249,7 +213,8 @@ func CompressAndChecksum(
249213
// least 12.5%.
250214
algo := NoCompressionIndicator
251215
if compression != NoCompression {
252-
algo, buf = compress(compression, blockData, buf)
216+
compressor := GetCompressor(compression)
217+
algo, buf = compressor.Compress(buf, blockData)
253218
if len(buf) >= len(blockData)-len(blockData)/8 {
254219
algo = NoCompressionIndicator
255220
}
@@ -270,30 +235,6 @@ func CompressAndChecksum(
270235
return pb
271236
}
272237

273-
// compress compresses a sstable block, using dstBuf as the desired destination.
274-
//
275-
// The result is aliased to dstBuf if that buffer had enough capacity, otherwise
276-
// it is a newly-allocated buffer.
277-
func compress(
278-
compression Compression, b []byte, dstBuf []byte,
279-
) (indicator CompressionIndicator, compressed []byte) {
280-
switch compression {
281-
case SnappyCompression:
282-
// snappy relies on the length of the buffer, and not the capacity to
283-
// determine if it needs to make an allocation.
284-
dstBuf = dstBuf[:cap(dstBuf):cap(dstBuf)]
285-
return SnappyCompressionIndicator, snappy.Encode(dstBuf, b)
286-
case ZstdCompression:
287-
if len(dstBuf) < binary.MaxVarintLen64 {
288-
dstBuf = append(dstBuf, make([]byte, binary.MaxVarintLen64-len(dstBuf))...)
289-
}
290-
varIntLen := binary.PutUvarint(dstBuf, uint64(len(b)))
291-
return ZstdCompressionIndicator, encodeZstd(dstBuf, varIntLen, b)
292-
default:
293-
panic("unreachable")
294-
}
295-
}
296-
297238
// A Buffer is a buffer for encoding a block. The caller mutates the buffer to
298239
// construct the uncompressed block, and calls CompressAndChecksum to produce
299240
// the physical, possibly-compressed PhysicalBlock. A Buffer recycles byte

sstable/block/compression_cgo.go

Lines changed: 0 additions & 55 deletions
This file was deleted.

sstable/block/compression_nocgo.go

Lines changed: 0 additions & 39 deletions
This file was deleted.

sstable/block/compression_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ func TestCompressionRoundtrip(t *testing.T) {
3535
// Create a randomly-sized buffer to house the compressed output. If it's
3636
// not sufficient, Compress should allocate one that is.
3737
compressedBuf := make([]byte, 1+rng.IntN(1<<10 /* 1 KiB */))
38-
39-
btyp, compressed := compress(compression, payload, compressedBuf)
38+
compressor := GetCompressor(compression)
39+
btyp, compressed := compressor.Compress(compressedBuf, payload)
4040
v, err := decompress(btyp, compressed)
4141
require.NoError(t, err)
4242
got := payload
@@ -78,12 +78,12 @@ func decompress(algo CompressionIndicator, b []byte) (*cache.Value, error) {
7878
if algo == NoCompressionIndicator {
7979
return nil, nil
8080
}
81+
8182
// first obtain the decoded length.
82-
decodedLen, prefixLen, err := DecompressedLen(algo, b)
83+
decodedLen, err := DecompressedLen(algo, b)
8384
if err != nil {
8485
return nil, err
8586
}
86-
b = b[prefixLen:]
8787
// Allocate sufficient space from the cache.
8888
decoded := cache.Alloc(decodedLen)
8989
decodedBuf := decoded.RawBuffer()

sstable/block/compressor.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package block
2+
3+
import (
4+
"encoding/binary"
5+
6+
"github.com/cockroachdb/errors"
7+
"github.com/cockroachdb/pebble/internal/base"
8+
"github.com/golang/snappy"
9+
)
10+
11+
type Compressor interface {
12+
Compress(dst, src []byte) (CompressionIndicator, []byte)
13+
}
14+
15+
type noopCompressor struct{}
16+
type snappyCompressor struct{}
17+
type zstdCompressor struct{}
18+
19+
var _ Compressor = noopCompressor{}
20+
var _ Compressor = snappyCompressor{}
21+
var _ Compressor = zstdCompressor{}
22+
23+
func (noopCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
24+
panic("NoCompressionCompressor.Compress() should not be called.")
25+
}
26+
27+
func (snappyCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
28+
dst = dst[:cap(dst):cap(dst)]
29+
return SnappyCompressionIndicator, snappy.Encode(dst, src)
30+
}
31+
32+
func GetCompressor(c Compression) Compressor {
33+
switch c {
34+
case NoCompression:
35+
return noopCompressor{}
36+
case SnappyCompression:
37+
return snappyCompressor{}
38+
case ZstdCompression:
39+
return zstdCompressor{}
40+
default:
41+
panic("Invalid compression type.")
42+
}
43+
}
44+
// Decompressor reverses the work of a Compressor for a single block. The
// intended usage is to call DecompressedLen first, allocate a buffer of that
// size, and then call DecompressInto with it.
type Decompressor interface {
	// DecompressedLen reports how long the block b will be once decompressed,
	// so that the caller can allocate a buffer of exactly that size.
	DecompressedLen(b []byte) (decompressedLen int, err error)

	// DecompressInto writes the decompressed form of compressed into buf.
	// buf must be exactly as long as the decompressed value; use
	// DecompressedLen to obtain that length.
	DecompressInto(buf, compressed []byte) error
}
56+
57+
type noopDecompressor struct{}
58+
type snappyDecompressor struct{}
59+
type zstdDecompressor struct{}
60+
61+
var _ Decompressor = noopDecompressor{}
62+
var _ Decompressor = snappyDecompressor{}
63+
var _ Decompressor = zstdDecompressor{}
64+
65+
func (noopDecompressor) DecompressInto(dst, src []byte) error {
66+
dst = dst[:len(src)]
67+
copy(dst, src)
68+
return nil
69+
}
70+
71+
func (noopDecompressor) DecompressedLen(b []byte) (decompressedLen int, err error) {
72+
return len(b), nil
73+
}
74+
75+
func (snappyDecompressor) DecompressInto(buf, compressed []byte) error {
76+
result, err := snappy.Decode(buf, compressed)
77+
if err != nil {
78+
return err
79+
}
80+
if len(result) != len(buf) || (len(result) > 0 && &result[0] != &buf[0]) {
81+
return base.CorruptionErrorf("pebble/table: decompressed into unexpected buffer: %p != %p",
82+
errors.Safe(result), errors.Safe(buf))
83+
}
84+
return nil
85+
}
86+
87+
func (snappyDecompressor) DecompressedLen(b []byte) (decompressedLen int, err error) {
88+
l, err := snappy.DecodedLen(b)
89+
return l, err
90+
}
91+
92+
func (zstdDecompressor) DecompressedLen(b []byte) (decompressedLen int, err error) {
93+
// This will also be used by zlib, bzip2 and lz4 to retrieve the decodedLen
94+
// if we implement these algorithms in the future.
95+
decodedLenU64, varIntLen := binary.Uvarint(b)
96+
if varIntLen <= 0 {
97+
return 0, base.CorruptionErrorf("pebble/table: compression block has invalid length")
98+
}
99+
return int(decodedLenU64), nil
100+
}
101+
102+
func GetDecompressor(c CompressionIndicator) Decompressor {
103+
switch c {
104+
case NoCompressionIndicator:
105+
return noopDecompressor{}
106+
case SnappyCompressionIndicator:
107+
return snappyDecompressor{}
108+
case ZstdCompressionIndicator:
109+
return zstdDecompressor{}
110+
default:
111+
panic("Invalid compression type.")
112+
}
113+
}

0 commit comments

Comments
 (0)