forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
parser.go
263 lines (214 loc) · 5.42 KB
/
parser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
package cassandra
import (
"errors"
"time"
"github.com/elastic/beats/libbeat/common/streambuf"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/packetbeat/protos/applayer"
gocql "github.com/elastic/beats/packetbeat/protos/cassandra/internal/gocql"
)
type parser struct {
buf streambuf.Buffer
config *parserConfig
framer *gocql.Framer
message *message
onMessage func(m *message) error
}
type parserConfig struct {
maxBytes int
compressor gocql.Compressor
ignoredOps map[gocql.FrameOp]bool
}
// check whether this ops is enabled or not
func (p *parser) CheckFrameOpsIgnored() bool {
if p.config.ignoredOps != nil && len(p.config.ignoredOps) > 0 {
//default map value is false
v := p.config.ignoredOps[p.framer.Header.Op]
if v {
return true
}
}
return false
}
type message struct {
applayer.Message
// indicator for parsed message being complete or requires more messages
// (if false) to be merged to generate full message.
isComplete bool
failed bool
ignored bool
data map[string]interface{}
header map[string]interface{}
// list element use by 'transactions' for correlation
next *message
transactionTimeout time.Duration
results transactions
}
// Error code if stream exceeds max allowed size on append.
var (
errStreamTooLarge = errors.New("Stream data too large")
isDebug = false
)
func (p *parser) init(
cfg *parserConfig,
onMessage func(*message) error,
) {
*p = parser{
buf: streambuf.Buffer{},
config: cfg,
onMessage: onMessage,
}
isDebug = logp.IsDebug("cassandra")
}
func (p *parser) append(data []byte) error {
_, err := p.buf.Write(data)
if err != nil {
return err
}
if p.config.maxBytes > 0 && p.buf.Total() > p.config.maxBytes {
return errStreamTooLarge
}
return nil
}
func (p *parser) feed(ts time.Time, data []byte) error {
if err := p.append(data); err != nil {
return err
}
for p.buf.Total() > 0 {
if p.message == nil {
// allocate new message object to be used by parser with current timestamp
p.message = p.newMessage(ts)
}
msg, err := p.parse()
if err != nil {
return err
}
if msg == nil {
break // wait for more data
}
// reset buffer and message -> handle next message in buffer
p.buf.Reset()
p.message = nil
// call message handler callback
if err := p.onMessage(msg); err != nil {
return err
}
}
return nil
}
func (p *parser) newMessage(ts time.Time) *message {
return &message{
Message: applayer.Message{
Ts: ts,
},
}
}
func (p *parser) parserBody() (bool, error) {
headLen := p.framer.Header.HeadLength
bdyLen := p.framer.Header.BodyLength
if bdyLen <= 0 {
return true, nil
}
//let's wait for enough buf
debugf("bodyLength: %d", bdyLen)
if !p.buf.Avail(bdyLen) {
if isDebug {
debugf("buf not enough for body, waiting for more, return")
}
return false, nil
}
//check if the ops already ignored
if p.message.ignored {
if isDebug {
debugf("message marked to be ignored, let's do this")
}
p.buf.Collect(bdyLen)
} else {
// start to parse body
data, err := p.framer.ReadFrame()
if err != nil {
// if the frame parsed failed, should ignore the whole message
p.framer = nil
return false, err
}
// dealing with un-parsed content
frameParsedLength := p.buf.BufferConsumed()
// collect leftover
unParsedSize := bdyLen + headLen - frameParsedLength
if unParsedSize > 0 {
if !p.buf.Avail(unParsedSize) {
err := errors.New("should be enough bytes for cleanup,but not enough")
logp.Err("Finishing frame failed with: %v", err)
return false, err
}
p.buf.Collect(unParsedSize)
}
p.message.data = data
}
finalCollectedFrameLength := p.buf.BufferConsumed()
if finalCollectedFrameLength-headLen != bdyLen {
logp.Err("body_length:%d, head_length:%d, all_consumed:%d",
bdyLen, headLen, finalCollectedFrameLength)
return false, errors.New("data messed while parse frame body")
}
return true, nil
}
func (p *parser) parse() (*message, error) {
// if p.frame is nil then create a new framer, or continue to process the last message
if p.framer == nil {
if isDebug {
debugf("start new framer")
}
p.framer = gocql.NewFramer(&p.buf, p.config.compressor)
}
// check if the frame header were parsed or not
if p.framer.Header == nil {
if isDebug {
debugf("start to parse header")
}
if !p.buf.Avail(9) {
debugf("not enough head bytes, ignore")
return nil, nil
}
_, err := p.framer.ReadHeader()
if err != nil {
logp.Err("%v", err)
p.framer = nil
return nil, err
}
}
//check if the ops need to be ignored
if p.CheckFrameOpsIgnored() {
// as we already ignore the content, we now mark the result is ignored
p.message.ignored = true
if isDebug {
debugf("Ops: %s was marked to be ignored, ignoring, request:%v", p.framer.Header.Op.String(), p.framer.Header.Version.IsRequest())
}
}
msg := p.message
finished, err := p.parserBody()
if err != nil {
return nil, err
}
//ignore and wait for more data
if !finished {
return nil, nil
}
dir := applayer.NetOriginalDirection
isRequest := true
if p.framer.Header.Version.IsResponse() {
dir = applayer.NetReverseDirection
isRequest = false
}
msg.Size = uint64(p.buf.BufferConsumed())
msg.IsRequest = isRequest
msg.Direction = dir
msg.header = p.framer.Header.ToMap()
if msg.IsRequest {
p.message.results.requests.append(msg)
} else {
p.message.results.responses.append(msg)
}
p.framer = nil
return msg, nil
}