forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 4
/
parse.go
173 lines (144 loc) · 3.73 KB
/
parse.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package memcache
// Generic memcache parser types and helper functions for use by binary and text parser protocol parsers.
import (
"time"
"github.com/elastic/beats/libbeat/common/streambuf"
)
const (
codeSpace byte = ' '
codeTab = '\t'
)
type parserConfig struct {
maxValues int
maxBytesPerValue int
parseUnkown bool
}
type parser struct {
state parserState
message *message
config *parserConfig
}
type parserState uint8
const (
parseStateCommand parserState = iota
parseStateTextCommand
parseStateBinaryCommand
parseStateData
parseStateDataBinary
parseStateIncompleteData
parseStateIncompleteDataBinary
parseStateFailing
)
type parserStateFn func(parser *parser, buf *streambuf.Buffer) parseResult
type argParser func(parser *parser, hdr, buf *streambuf.Buffer) error
type parseResult struct {
err error
msg *message
}
// module init
func init() {
// link parseCommand (break compile time initialization loop check)
parseCommand = doParseCommand
}
func newParser(config *parserConfig) *parser {
var p parser
p.init(config)
return &p
}
func (p *parser) init(config *parserConfig) {
p.state = parseStateCommand
p.message = nil
p.config = config
}
func (p *parser) reset() {
debug("parser(%p) reset", p)
p.init(p.config)
}
func (p *parser) parse(buf *streambuf.Buffer, ts time.Time) (*message, error) {
if p.message == nil {
p.message = newMessage(ts)
}
res := p.dispatch(p.state, buf)
return res.msg, res.err
}
func (p *parser) dispatch(state parserState, buf *streambuf.Buffer) parseResult {
var f parserStateFn
switch state {
case parseStateCommand:
f = parseCommand
case parseStateTextCommand:
f = parseTextCommand
case parseStateBinaryCommand:
f = parseBinaryCommand
case parseStateData:
f = parseData
case parseStateIncompleteData:
f = parseData
case parseStateDataBinary:
f = parseDataBinary
case parseStateIncompleteDataBinary:
f = parseDataBinary
case parseStateFailing:
f = parseFailing
}
return f(p, buf)
}
func (p *parser) needMore() parseResult {
return parseResult{nil, nil}
}
func (p *parser) yield(nbytes int) parseResult {
p.state = parseStateCommand
msg := p.message
msg.Size = uint64(nbytes - int(msg.bytesLost))
p.message = nil
debug("yield(%p) memcache message type %v", p, msg.command.code)
return parseResult{nil, msg}
}
func (p *parser) yieldNoData(buf *streambuf.Buffer) parseResult {
return p.yield(buf.BufferConsumed())
}
func (p *parser) failing(err error) parseResult {
p.state = parseStateFailing
return parseResult{err, nil}
}
func (p *parser) contWith(buf *streambuf.Buffer, state parserState) parseResult {
p.state = state
return p.dispatch(state, buf)
}
func (p *parser) contWithShallow(
buf *streambuf.Buffer,
fn parserStateFn,
) parseResult {
return fn(p, buf)
}
func (p *parser) appendMessageData(data []byte) {
msg := p.message
if p.config.maxValues != 0 {
msg.data = memcacheData{data}
if len(msg.data.data) > p.config.maxBytesPerValue {
msg.data.data = msg.data.data[0:p.config.maxBytesPerValue]
}
msg.values = append(msg.values, msg.data)
}
msg.countValues++
}
func parseFailing(parser *parser, buf *streambuf.Buffer) parseResult {
return parser.failing(errParserCaughtInError)
}
// required to break initialization loop warning
var parseCommand parserStateFn
func doParseCommand(parser *parser, buf *streambuf.Buffer) parseResult {
// check if binary + text command and dispatch
if !buf.Avail(2) {
return parser.needMore()
}
magic := buf.Bytes()[0]
isBinary := magic == memcacheMagicRequest || magic == memcacheMagicResponse
if isBinary {
return parser.contWith(buf, parseStateBinaryCommand)
}
return parser.contWith(buf, parseStateTextCommand)
}
func argparseNoop(p *parser, h, b *streambuf.Buffer) error {
return nil
}