-
Notifications
You must be signed in to change notification settings - Fork 0
/
messagescanner.go
97 lines (76 loc) · 1.54 KB
/
messagescanner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package encoding
import (
"sync"
)
var messagePool = sync.Pool{
New: func() any {
return &Message{}
},
}
var messageScannerPool = sync.Pool{
New: func() any {
return NewMessageScanner(nil)
},
}
func AcquireMessageScanner(buf []byte) *MessageScanner {
s := messageScannerPool.Get().(*MessageScanner)
s.buf = buf
s.lastErr = nil
return s
}
func ReleaseMessageScanner(s *MessageScanner) {
s.buf = nil
s.lastErr = nil
messageScannerPool.Put(s)
}
func AcquireMessage() *Message {
return messagePool.Get().(*Message)
}
func ReleaseMessage(m *Message) {
m.name = nil
m.KV = nil
m.kvEntryCount = 0
messagePool.Put(m)
}
type Message struct {
name []byte
kvEntryCount byte
KV *KVScanner
}
func (m *Message) NameBytes() []byte {
return m.name
}
type MessageScanner struct {
buf []byte
lastErr error
}
func NewMessageScanner(b []byte) *MessageScanner {
return &MessageScanner{buf: b}
}
func (s *MessageScanner) Error() error {
return s.lastErr
}
func (s *MessageScanner) Next(m *Message) bool {
if m.KV != nil {
// if the scanner is still existing from a previous read
// forward the current slice to the correct position
s.buf = s.buf[len(s.buf)-m.KV.RemainingBuf():]
ReleaseKVScanner(m.KV)
m.KV = nil
}
if len(s.buf) == 0 {
return false
}
nameLen, n, err := Varint(s.buf)
if err != nil {
s.lastErr = err
return false
}
s.buf = s.buf[n:]
m.name = s.buf[:nameLen]
s.buf = s.buf[nameLen:]
m.kvEntryCount = s.buf[0]
s.buf = s.buf[1:]
m.KV = AcquireKVScanner(s.buf, int(m.kvEntryCount))
return true
}