-
Notifications
You must be signed in to change notification settings - Fork 136
/
compressor.go
93 lines (79 loc) · 2.39 KB
/
compressor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package compress
import (
"bytes"
"compress/zlib"
"github.com/pkg/errors"
)
// Compressor is the interface implemented by every payload compressor in this
// package. It is designed around a fast, single-threaded compressor: an
// implementation may keep buffers and writers between calls for performance,
// so callers should not assume an implementation is safe for concurrent use
// unless its own documentation says so.
type Compressor interface {
// Compress takes the payload b and returns its encoded form, or a non-nil
// error if encoding fails. The output format is chosen by the implementation.
Compress(b []byte) ([]byte, error)
}
// NoOpCompressor is a pass-through Compressor: it performs no compression at
// all. It exists mainly so tests can exercise code that needs a Compressor
// without paying for, or depending on, a real encoding.
type NoOpCompressor struct{}

// Compress hands b back untouched together with a nil error. The returned
// slice is b itself (no copy is made), so it shares b's backing array.
func (*NoOpCompressor) Compress(b []byte) ([]byte, error) {
	return b, nil
}
// ZlibCompressor encodes payloads as zlib streams. Per the original author's
// note, zlib seems more CPU-efficient than newer formats such as zstd for
// KB-sized payloads.
//
// Only payloads strictly longer than minCompressSize bytes are actually
// deflated; shorter payloads are still written through a zlib writer, but one
// configured with zlib.NoCompression.
//
// A ZlibCompressor reuses a single buffer and two writers across calls and
// does no locking, so one instance must not be used by several goroutines at
// once. Create instances with NewZlibCompressor: the zero value has nil
// buffer and writers.
type ZlibCompressor struct {
// buffer is the shared destination of both writers; Compress resets it at
// the start of every call.
buffer *bytes.Buffer
// compressedWriter is the zlib.BestSpeed writer used for payloads longer
// than minCompressSize.
compressedWriter *zlib.Writer
// unCompressedWriter is the zlib.NoCompression writer used for all other
// payloads.
unCompressedWriter *zlib.Writer
// minCompressSize is the payload length, in bytes, that a payload must
// exceed before it is deflated.
minCompressSize int
}
// NewZlibCompressor builds a ZlibCompressor whose Compress method deflates
// payloads longer than minCompressSize bytes and passes all other payloads
// through a zlib writer configured with zlib.NoCompression.
//
// Both writers are bound to one shared output buffer, so the returned
// compressor is meant to be used by a single goroutine at a time. If either
// zlib writer cannot be created, the error is returned with a stack trace
// attached and the compressor is nil.
func NewZlibCompressor(minCompressSize int) (*ZlibCompressor, error) {
	out := new(bytes.Buffer)

	// Writer for payloads above the threshold: fastest deflate level.
	deflating, err := zlib.NewWriterLevel(out, zlib.BestSpeed)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	// Writer for payloads at or below the threshold: zlib framing only.
	stored, err := zlib.NewWriterLevel(out, zlib.NoCompression)
	if err != nil {
		return nil, errors.WithStack(err)
	}

	zc := &ZlibCompressor{
		minCompressSize:    minCompressSize,
		buffer:             out,
		compressedWriter:   deflating,
		unCompressedWriter: stored,
	}
	return zc, nil
}
// Compress encodes b as a complete zlib stream and returns it in a freshly
// allocated slice owned by the caller.
//
// Payloads longer than minCompressSize bytes go through the BestSpeed writer;
// all other payloads go through the NoCompression writer, so the result is a
// zlib stream in both cases.
//
// The method reuses the compressor's internal buffer and writers without any
// locking, so it must not be called concurrently on the same ZlibCompressor.
// A failure while writing or closing the stream is returned with a stack
// trace attached.
func (c *ZlibCompressor) Compress(b []byte) ([]byte, error) {
	writer := c.unCompressedWriter
	if len(b) > c.minCompressSize {
		writer = c.compressedWriter
	}

	// Discard the previous call's output and re-arm the chosen writer, which
	// was closed at the end of its last use.
	c.buffer.Reset()
	writer.Reset(c.buffer)

	if _, err := writer.Write(b); err != nil {
		return nil, errors.WithStack(err)
	}

	// Close, not Flush, is required here: Flush only pushes pending data out,
	// whereas Close also terminates the deflate stream and appends the zlib
	// checksum trailer that readers need. The Reset above makes the writer
	// usable again on the next call.
	if err := writer.Close(); err != nil {
		return nil, errors.WithStack(err)
	}

	// Copy out of the shared buffer, which the next call will overwrite.
	compressed := make([]byte, c.buffer.Len())
	copy(compressed, c.buffer.Bytes())
	return compressed, nil
}
// ThreadSafeZlibCompressor is a zlib Compressor intended to be shared between
// goroutines. It stores nothing but the compression threshold; each Compress
// call builds its own ZlibCompressor, trading extra allocations per call for
// the absence of shared mutable state.
type ThreadSafeZlibCompressor struct {
	// minCompressSize is passed to NewZlibCompressor on every Compress call.
	minCompressSize int
}

// NewThreadSafeZlibCompressor returns a ThreadSafeZlibCompressor that uses
// minCompressSize as the threshold for every ZlibCompressor it creates.
func NewThreadSafeZlibCompressor(minCompressSize int) *ThreadSafeZlibCompressor {
	c := new(ThreadSafeZlibCompressor)
	c.minCompressSize = minCompressSize
	return c
}
// Compress encodes b with a ZlibCompressor created just for this call, using
// the receiver's minCompressSize as the threshold. Because no buffer or
// writer outlives the call, concurrent calls do not share compression state.
// An error from constructing the compressor or from compressing is returned
// unchanged.
func (c *ThreadSafeZlibCompressor) Compress(b []byte) ([]byte, error) {
	zc, err := NewZlibCompressor(c.minCompressSize)
	if err != nil {
		return nil, err
	}

	out, err := zc.Compress(b)
	return out, err
}