Skip to content

Commit 18864be

Browse files
committed
sstable: use zstd context to reduce compression allocations
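Compress and decompress through pooled `zstd.Ctx` objects (kept in `sync.Pool`s) rather than creating a new zstd writer on every Compress call, which reduces allocations.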
1 parent 72cf350 commit 18864be

File tree

8 files changed

+108
-20
lines changed

8 files changed

+108
-20
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
module github.com/cockroachdb/pebble
22

33
require (
4-
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e
4+
github.com/DataDog/zstd v1.5.7
55
github.com/HdrHistogram/hdrhistogram-go v1.1.2
66
github.com/cespare/xxhash/v2 v2.2.0
77
github.com/cockroachdb/crlib v0.0.0-20241112164430-1264a2edc35b

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
44
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
55
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
66
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
7-
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e h1:ZIWapoIRN1VqT8GR8jAwb1Ie9GyehWjVcGh32Y2MznE=
8-
github.com/DataDog/zstd v1.5.6-0.20230824185856-869dae002e5e/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
7+
github.com/DataDog/zstd v1.5.7 h1:ybO8RBeh29qrxIhCA9E8gKY6xfONU9T6G6aP9DTKfLE=
8+
github.com/DataDog/zstd v1.5.7/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw=
99
github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20190129172621-c8b1d7a94ddf/go.mod h1:aJ4qN3TfrelA6NZ6AXsXRfmEVaYin3EDbSPJrKS8OXo=
1010
github.com/HdrHistogram/hdrhistogram-go v1.1.2 h1:5IcZpTvzydCQeHzK4Ef/D5rrSqwxob0t8PQPMybUNFM=
1111
github.com/HdrHistogram/hdrhistogram-go v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=

sstable/block/compression.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ func (i CompressionIndicator) String() string {
115115
// payload.
116116
func DecompressedLen(algo CompressionIndicator, b []byte) (decompressedLen int, err error) {
117117
decompressor := GetDecompressor(algo)
118+
defer decompressor.Close()
118119
return decompressor.DecompressedLen(b)
119120
}
120121

@@ -123,6 +124,7 @@ func DecompressedLen(algo CompressionIndicator, b []byte) (decompressedLen int,
123124
// determine the correct size.
124125
func DecompressInto(algo CompressionIndicator, compressed []byte, buf []byte) error {
125126
decompressor := GetDecompressor(algo)
127+
defer decompressor.Close()
126128
err := decompressor.DecompressInto(buf, compressed)
127129
if err != nil {
128130
return base.MarkCorruptionError(err)
@@ -214,6 +216,7 @@ func CompressAndChecksum(
214216
algo := NoCompressionIndicator
215217
if compression != NoCompression {
216218
compressor := GetCompressor(compression)
219+
defer compressor.Close()
217220
algo, buf = compressor.Compress(buf, blockData)
218221
if len(buf) >= len(blockData)-len(blockData)/8 {
219222
algo = NoCompressionIndicator

sstable/block/compression_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func TestCompressionRoundtrip(t *testing.T) {
3636
// not sufficient, Compress should allocate one that is.
3737
compressedBuf := make([]byte, 1+rng.IntN(1<<10 /* 1 KiB */))
3838
compressor := GetCompressor(compression)
39+
defer compressor.Close()
3940
btyp, compressed := compressor.Compress(compressedBuf, payload)
4041
v, err := decompress(btyp, compressed)
4142
require.NoError(t, err)

sstable/block/compressor.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,33 +10,38 @@ import (
1010

1111
type Compressor interface {
1212
Compress(dst, src []byte) (CompressionIndicator, []byte)
13+
14+
// Close must be called when the Compressor is no longer needed.
15+
// After Close is called, the Compressor must not be used again.
16+
Close()
1317
}
1418

1519
type noopCompressor struct{}
1620
type snappyCompressor struct{}
17-
type zstdCompressor struct{}
1821

1922
var _ Compressor = noopCompressor{}
2023
var _ Compressor = snappyCompressor{}
21-
var _ Compressor = zstdCompressor{}
2224

2325
func (noopCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
2426
panic("NoCompressionCompressor.Compress() should not be called.")
2527
}
28+
func (noopCompressor) Close() {}
2629

2730
func (snappyCompressor) Compress(dst, src []byte) (CompressionIndicator, []byte) {
2831
dst = dst[:cap(dst):cap(dst)]
2932
return SnappyCompressionIndicator, snappy.Encode(dst, src)
3033
}
3134

35+
func (snappyCompressor) Close() {}
36+
3237
func GetCompressor(c Compression) Compressor {
3338
switch c {
3439
case NoCompression:
3540
return noopCompressor{}
3641
case SnappyCompression:
3742
return snappyCompressor{}
3843
case ZstdCompression:
39-
return zstdCompressor{}
44+
return getZstdCompressor()
4045
default:
4146
panic("Invalid compression type.")
4247
}
@@ -52,15 +57,17 @@ type Decompressor interface {
5257
// allowing the caller to allocate a buffer exactly sized to the decompressed
5358
// payload.
5459
DecompressedLen(b []byte) (decompressedLen int, err error)
60+
61+
// Close must be called when the Decompressor is no longer needed.
62+
// After Close is called, the Decompressor must not be used again.
63+
Close()
5564
}
5665

5766
type noopDecompressor struct{}
5867
type snappyDecompressor struct{}
59-
type zstdDecompressor struct{}
6068

6169
var _ Decompressor = noopDecompressor{}
6270
var _ Decompressor = snappyDecompressor{}
63-
var _ Decompressor = zstdDecompressor{}
6471

6572
func (noopDecompressor) DecompressInto(dst, src []byte) error {
6673
dst = dst[:len(src)]
@@ -72,6 +79,8 @@ func (noopDecompressor) DecompressedLen(b []byte) (decompressedLen int, err erro
7279
return len(b), nil
7380
}
7481

82+
func (noopDecompressor) Close() {}
83+
7584
func (snappyDecompressor) DecompressInto(buf, compressed []byte) error {
7685
result, err := snappy.Decode(buf, compressed)
7786
if err != nil {
@@ -89,6 +98,8 @@ func (snappyDecompressor) DecompressedLen(b []byte) (decompressedLen int, err er
8998
return l, err
9099
}
91100

101+
func (snappyDecompressor) Close() {}
102+
92103
func (zstdDecompressor) DecompressedLen(b []byte) (decompressedLen int, err error) {
93104
// This will also be used by zlib, bzip2 and lz4 to retrieve the decodedLen
94105
// if we implement these algorithms in the future.
@@ -106,7 +117,7 @@ func GetDecompressor(c CompressionIndicator) Decompressor {
106117
case SnappyCompressionIndicator:
107118
return snappyDecompressor{}
108119
case ZstdCompressionIndicator:
109-
return zstdDecompressor{}
120+
return getZstdDecompressor()
110121
default:
111122
panic("Invalid compression type.")
112123
}

sstable/block/compressor_cgo.go

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,25 @@
33
package block
44

55
import (
6-
"bytes"
76
"encoding/binary"
7+
"sync"
88

99
"github.com/DataDog/zstd"
1010
"github.com/cockroachdb/errors"
1111
)
1212

13+
type zstdCompressor struct {
14+
ctx zstd.Ctx
15+
}
16+
17+
var _ Compressor = (*zstdCompressor)(nil)
18+
19+
var zstdCompressorPool = sync.Pool{
20+
New: func() any {
21+
return &zstdCompressor{ctx: zstd.NewCtx()}
22+
},
23+
}
24+
1325
// UseStandardZstdLib indicates whether the zstd implementation is a port of the
1426
// official one in the facebook/zstd repository.
1527
//
@@ -26,21 +38,48 @@ const UseStandardZstdLib = true
2638
// is sufficient. Compress writes the uvarint-encoded length of `b` into
2739
// `compressedBuf[:varIntLen]` itself. It returns the encoded byte
2840
// slice, including the `compressedBuf[:varIntLen]` prefix.
29-
func (zstdCompressor) Compress(compressedBuf []byte, b []byte) (CompressionIndicator, []byte) {
41+
func (z *zstdCompressor) Compress(compressedBuf []byte, b []byte) (CompressionIndicator, []byte) {
3042
if len(compressedBuf) < binary.MaxVarintLen64 {
3143
compressedBuf = append(compressedBuf, make([]byte, binary.MaxVarintLen64-len(compressedBuf))...)
3244
}
45+
46+
// Get the bound and allocate the proper amount of memory instead of relying on
47+
// DataDog/zstd to do it for us. This allows us to avoid copying data around
48+
// for the varIntLen prefix.
49+
bound := zstd.CompressBound(len(b))
50+
if cap(compressedBuf) < binary.MaxVarintLen64+bound {
51+
compressedBuf = make([]byte, binary.MaxVarintLen64, binary.MaxVarintLen64+bound)
52+
}
53+
3354
varIntLen := binary.PutUvarint(compressedBuf, uint64(len(b)))
34-
buf := bytes.NewBuffer(compressedBuf[:varIntLen])
35-
writer := zstd.NewWriterLevel(buf, 3)
36-
writer.Write(b)
37-
writer.Close()
38-
return ZstdCompressionIndicator, buf.Bytes()
55+
result, err := z.ctx.CompressLevel(compressedBuf[varIntLen:varIntLen+bound], b, 3)
56+
if err != nil {
57+
panic("Error while compressing using Zstd.")
58+
}
59+
if &result[0] != &compressedBuf[varIntLen] {
60+
panic("Allocated a new buffer despite checking CompressBound.")
61+
}
62+
63+
return ZstdCompressionIndicator, compressedBuf[:varIntLen+len(result)]
3964
}
4065

66+
func (z *zstdCompressor) Close() {
67+
zstdCompressorPool.Put(z)
68+
}
69+
70+
func getZstdCompressor() *zstdCompressor {
71+
return zstdCompressorPool.Get().(*zstdCompressor)
72+
}
73+
74+
type zstdDecompressor struct {
75+
ctx zstd.Ctx
76+
}
77+
78+
var _ Decompressor = (*zstdDecompressor)(nil)
79+
4180
// DecompressInto decompresses src with the Zstandard algorithm. The destination
42-
// buffer must already be sufficiently sized, otherwise Decompress may error.
43-
func (zstdDecompressor) DecompressInto(dst, src []byte) error {
81+
// buffer must already be sufficiently sized, otherwise DecompressInto may error.
82+
func (z *zstdDecompressor) DecompressInto(dst, src []byte) error {
4483
// The payload is prefixed with a varint encoding the length of
4584
// the decompressed block.
4685
_, prefixLen := binary.Uvarint(src)
@@ -51,9 +90,23 @@ func (zstdDecompressor) DecompressInto(dst, src []byte) error {
5190
if len(dst) == 0 {
5291
return errors.Errorf("decodeZstd: empty dst buffer")
5392
}
54-
_, err := zstd.DecompressInto(dst, src)
93+
_, err := z.ctx.DecompressInto(dst, src)
5594
if err != nil {
5695
return err
5796
}
5897
return nil
5998
}
99+
100+
func (z *zstdDecompressor) Close() {
101+
zstdDecompressorPool.Put(z)
102+
}
103+
104+
var zstdDecompressorPool = sync.Pool{
105+
New: func() any {
106+
return &zstdDecompressor{ctx: zstd.NewCtx()}
107+
},
108+
}
109+
110+
func getZstdDecompressor() *zstdDecompressor {
111+
return zstdDecompressorPool.Get().(*zstdDecompressor)
112+
}

sstable/block/compressor_nocgo.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ import (
1414
"github.com/klauspost/compress/zstd"
1515
)
1616

17+
type zstdCompressor struct{}
18+
19+
var _ Compressor = zstdCompressor{}
20+
1721
// UseStandardZstdLib indicates whether the zstd implementation is a port of the
1822
// official one in the facebook/zstd repository.
1923
//
@@ -40,8 +44,18 @@ func (zstdCompressor) Compress(compressedBuf, b []byte) (CompressionIndicator, [
4044
return ZstdCompressionIndicator, encoder.EncodeAll(b, compressedBuf[:varIntLen])
4145
}
4246

47+
func (zstdCompressor) Close() {}
48+
49+
func getZstdCompressor() zstdCompressor {
50+
return zstdCompressor{}
51+
}
52+
53+
type zstdDecompressor struct{}
54+
55+
var _ Decompressor = zstdDecompressor{}
56+
4357
// DecompressInto decompresses src with the Zstandard algorithm. The destination
44-
// buffer must already be sufficiently sized, otherwise Decompress may error.
58+
// buffer must already be sufficiently sized, otherwise DecompressInto may error.
4559
func (zstdDecompressor) DecompressInto(dst, src []byte) error {
4660
// The payload is prefixed with a varint encoding the length of
4761
// the decompressed block.
@@ -59,3 +73,9 @@ func (zstdDecompressor) DecompressInto(dst, src []byte) error {
5973
}
6074
return nil
6175
}
76+
77+
func (zstdDecompressor) Close() {}
78+
79+
func getZstdDecompressor() zstdDecompressor {
80+
return zstdDecompressor{}
81+
}
8 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)