-
Notifications
You must be signed in to change notification settings - Fork 1
/
msgpack.go
173 lines (139 loc) · 3.44 KB
/
msgpack.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Package msgpack provides a msgpack codec for encoding and decoding values, with optional S2 compression of the encoded bytes.
package msgpack
import (
"fmt"
"github.com/klauspost/compress/s2"
"github.com/vmihailenco/msgpack/v5"
"github.com/beihai0xff/pudding/pkg/log"
)
// compressionThreshold is the minimum input length, in bytes, at which
// S2Compress actually compresses; shorter inputs are stored verbatim.
const compressionThreshold = 64

// Marker values written as the trailing byte of a payload to record how the
// preceding bytes were encoded.
const (
	// noCompression marks a payload whose bytes were stored as-is.
	noCompression = 0x0
	// s2Compression marks a payload whose bytes were encoded with s2.
	s2Compression = 0x1
)
// defaultPack is the package-wide codec: msgpack marshalling combined with
// the S2 compress/decompress helpers. It backs the package-level Encode and
// Decode functions and is the template that New copies.
var defaultPack = &MsgPack{
	compress:   S2Compress,
	decompress: S2Decompress,
	marshal:    msgpack.Marshal,
	unmarshal:  msgpack.Unmarshal,
}
// MarshalFunc serializes a value into bytes.
type MarshalFunc func(any) ([]byte, error)

// UnmarshalFunc deserializes bytes into the value passed as second argument.
type UnmarshalFunc func([]byte, any) error

// CompressFunc transforms serialized bytes into their stored form.
type CompressFunc func(data []byte) []byte

// DecompressFunc reverses a CompressFunc, reporting an error when the input
// cannot be restored.
type DecompressFunc func(b []byte) ([]byte, error)

// OptionFunc customizes a MsgPack; see New.
type OptionFunc func(*MsgPack)
// MsgPack is a codec that pairs a marshal/unmarshal implementation with a
// compress/decompress implementation. All four functions are invoked without
// nil checks by Encode and Decode.
type MsgPack struct {
	// Serialization pair.
	marshal   MarshalFunc
	unmarshal UnmarshalFunc

	// Compression pair, applied after marshal and before unmarshal.
	compress   CompressFunc
	decompress DecompressFunc
}
// New creates a new MsgPack. It starts from a copy of the package default
// codec and applies each option, in order, to that copy, so the shared
// default is never modified. Nil options are skipped.
func New(opts ...OptionFunc) *MsgPack {
	pack := *defaultPack
	for _, opt := range opts {
		if opt == nil {
			continue
		}
		// Apply to the local copy, not to defaultPack: mutating the global
		// would change the package-level Encode/Decode and leave the
		// returned codec unconfigured.
		opt(&pack)
	}
	return &pack
}
/*
Functional Options Pattern
*/
// WithMarshalFunc returns an option that replaces both the marshal function
// (fnm) and the unmarshal function (fnu) of a MsgPack. The arguments are
// stored as given; no nil check is performed.
func WithMarshalFunc(fnm MarshalFunc, fnu UnmarshalFunc) OptionFunc {
	apply := func(target *MsgPack) {
		target.marshal, target.unmarshal = fnm, fnu
	}
	return apply
}
// WithCompressFunc returns an option that replaces both the compress function
// (fnc) and the decompress function (fnd) of a MsgPack. The arguments are
// stored as given; no nil check is performed.
func WithCompressFunc(fnc CompressFunc, fnd DecompressFunc) OptionFunc {
	apply := func(target *MsgPack) {
		target.compress, target.decompress = fnc, fnd
	}
	return apply
}
/*
Encoding and decoding entry points.
*/
// Encode encodes item using the package default codec; it is shorthand for
// calling the Encode method on that codec.
func Encode(item any) ([]byte, error) {
	codec := defaultPack
	return codec.Encode(item)
}
// Decode decodes b into value using the package default codec; it is
// shorthand for calling the Decode method on that codec.
func Decode(b []byte, value any) error {
	codec := defaultPack
	return codec.Decode(b, value)
}
// Encode turns item into bytes.
//
// Three kinds of input bypass the codec entirely: a nil item yields
// (nil, nil), a []byte is returned unchanged, and a string is converted to
// its bytes. Every other value is passed to the configured marshal function
// and the result is run through the configured compress function.
//
// NOTE(review): the []byte and string shortcuts skip compress, whereas Decode
// always calls decompress on non-empty input — confirm callers do not expect
// those values to round-trip through Decode.
func (p *MsgPack) Encode(item any) ([]byte, error) {
	if item == nil {
		return nil, nil
	}
	if raw, ok := item.([]byte); ok {
		return raw, nil
	}
	if text, ok := item.(string); ok {
		return []byte(text), nil
	}

	packed, err := p.marshal(item)
	if err != nil {
		return nil, err
	}
	return p.compress(packed), nil
}
// Decode restores value from b.
//
// Empty input is a no-op: nil is returned and value is left untouched.
// Otherwise b is passed through the configured decompress function and the
// result is handed to the configured unmarshal function. A decompress failure
// is logged and returned as-is.
func (p *MsgPack) Decode(b []byte, value any) error {
	if len(b) == 0 {
		return nil
	}

	raw, err := p.decompress(b)
	if err != nil {
		log.Errorf("Decompress failed: %v", err)
		return err
	}
	return p.unmarshal(raw, value)
}
/*
Compression and decompression helpers.
*/
// S2Compress returns data in its stored form: a payload followed by a single
// trailing byte that records how the payload was produced.
//
// Inputs of at least compressionThreshold bytes are encoded with s2 and
// tagged s2Compression. Shorter inputs (including empty or nil ones) are
// copied verbatim and tagged noCompression. The result is always a newly
// allocated slice.
func S2Compress(data []byte) []byte {
	if len(data) >= compressionThreshold {
		// Size the scratch buffer for the worst case plus the marker byte so
		// the append below does not need to grow it.
		scratch := make([]byte, s2.MaxEncodedLen(len(data))+1)
		encoded := s2.Encode(scratch, data)
		return append(encoded, s2Compression)
	}

	// Too small to be worth compressing: copy and tag as uncompressed.
	out := make([]byte, len(data), len(data)+1)
	copy(out, data)
	return append(out, noCompression)
}
// S2Decompress reverses S2Compress. The last byte of b selects the method:
// noCompression returns the preceding bytes as a sub-slice of b, and
// s2Compression decodes the preceding bytes with s2 into a new slice.
//
// An error is returned when b is empty (there is no marker byte to read),
// when the marker is not a known method, or when s2 decoding fails. The
// returned slice is nil whenever the error is non-nil.
func S2Decompress(b []byte) ([]byte, error) {
	// Guard the marker lookup: indexing b[len(b)-1] on empty input panics.
	if len(b) == 0 {
		return nil, fmt.Errorf("empty input: missing compression method byte")
	}

	payload := b[:len(b)-1]
	switch c := b[len(b)-1]; c {
	case noCompression:
		return payload, nil
	case s2Compression:
		decoded, err := s2.Decode(nil, payload)
		if err != nil {
			return nil, err
		}
		return decoded, nil
	default:
		return nil, fmt.Errorf("unknown compression method: %x", c)
	}
}