forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 4
/
multiline.go
311 lines (263 loc) · 7.43 KB
/
multiline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
package reader
import (
"errors"
"fmt"
"time"
"github.com/elastic/beats/libbeat/common/match"
)
// Multiline is a reader combining multiple line events into one multi-line event.
//
// Lines to be combined are matched by some configurable predicate using
// regular expression.
//
// The maximum number of bytes and lines to be returned is fully configurable.
// Even if limits are reached subsequent lines are matched, until event is
// fully finished.
//
// Errors will force the multiline reader to return the currently active
// multiline event first and finally return the actual error on next call to Next.
type Multiline struct {
// reader is the source of single-line messages; NewMultiline may wrap it
// with a timeout reader.
reader Reader
// pred is called with the previous and the current line content to decide
// whether the current line belongs to the event being collected.
pred matcher
// maxBytes bounds the number of bytes stored in message.Content; the check
// is applied in addLine.
maxBytes int // bytes stored in content
// maxLines bounds the number of lines stored in message.Content; the check
// is applied in addLine.
maxLines int
// separator is inserted between two consecutive lines of one event.
separator []byte
// last holds the content of the most recently processed line and is passed
// to pred as its first argument.
last []byte
// numLines counts the lines actually appended to message.Content.
numLines int
err error // last seen error
// state is the state function executed by the next call to Next
// (readFirst, readNext or readFailed).
state func(*Multiline) (Message, error)
// message is the multi-line event currently being assembled.
message Message
}
// Defaults applied by NewMultiline when the corresponding setting is not
// present in the configuration.
const (
// Default maximum number of lines to return in one multi-line event
// (used when config.MaxLines is nil).
defaultMaxLines = 500
// Default timeout to finish a multi-line event
// (used when config.Timeout is nil).
defaultMultilineTimeout = 5 * time.Second
)
// matcher represents the predicate comparing any two lines
// to find start and end of multiline events in stream of line events.
// It receives the content of the previously seen line (last) and of the line
// just read (current); the reader appends current to the running event when
// the predicate returns true and starts a new event otherwise.
type matcher func(last, current []byte) bool
var (
// sigMultilineTimeout is the sentinel handed to NewTimeout by NewMultiline.
// readFirst and readNext compare errors from the wrapped reader against it
// and treat it as a flush signal, never passing it on to the caller.
sigMultilineTimeout = errors.New("multline timeout")
)
// NewMultiline creates a new multi-line reader combining stream of
// line events into stream of multi-line events.
//
// reader is the source of line events, separator is inserted between the
// lines joined into one event and maxBytes bounds the bytes stored per event.
// config selects the matcher type ("before" or "after"), its pattern and
// optional negation, and may override the default line limit and timeout.
//
// An error is returned if config.Match names an unknown matcher type, if the
// matcher cannot be constructed, or if the configured timeout is negative.
// A timeout greater than zero wraps reader with NewTimeout using
// sigMultilineTimeout as signal; a timeout of zero leaves reader unwrapped.
func NewMultiline(
	reader Reader,
	separator string,
	maxBytes int,
	config *MultilineConfig,
) (*Multiline, error) {
	types := map[string]func(match.Matcher) (matcher, error){
		"before": beforeMatcher,
		"after":  afterMatcher,
	}

	matcherType, ok := types[config.Match]
	if !ok {
		return nil, fmt.Errorf("unknown matcher type: %s", config.Match)
	}

	pred, err := matcherType(config.Pattern)
	if err != nil {
		return nil, err
	}

	if config.Negate {
		pred = negatedMatcher(pred)
	}

	maxLines := defaultMaxLines
	if config.MaxLines != nil {
		maxLines = *config.MaxLines
	}

	timeout := defaultMultilineTimeout
	if config.Timeout != nil {
		timeout = *config.Timeout
		if timeout < 0 {
			// Report the duration itself: config.Timeout is a pointer and
			// would be formatted as a memory address by %v.
			return nil, fmt.Errorf("timeout %v must not be negative", timeout)
		}
	}

	// Only install the timeout wrapper when a timeout is actually requested.
	if timeout > 0 {
		reader = NewTimeout(reader, sigMultilineTimeout, timeout)
	}

	mlr := &Multiline{
		reader:    reader,
		pred:      pred,
		state:     (*Multiline).readFirst,
		maxBytes:  maxBytes,
		maxLines:  maxLines,
		separator: []byte(separator),
		message:   Message{},
	}
	return mlr, nil
}
// Next returns the next multi-line event by running the reader's current
// state function (readFirst, readNext or readFailed).
func (mlr *Multiline) Next() (Message, error) {
	step := mlr.state
	return step(mlr)
}
// readFirst waits for the first non-empty line of a new multi-line event.
// Timeout signals and messages without bytes are skipped; any other error is
// returned to the caller together with the message that came with it. Once a
// line is available the buffer is reset, the line is loaded and processing
// continues in readNext.
func (mlr *Multiline) readFirst() (Message, error) {
	for {
		first, err := mlr.reader.Next()
		switch {
		case err == sigMultilineTimeout:
			// nothing buffered yet, so there is nothing to flush
			continue
		case err != nil:
			// let the next layer deal with the error
			return first, err
		case first.Bytes == 0:
			continue
		}

		// begin a fresh multiline event with this line
		mlr.clear()
		mlr.load(first)
		mlr.state = (*Multiline).readNext
		return mlr.readNext()
	}
}
// readNext keeps reading lines from the underlying reader and appends them to
// the buffered multi-line event for as long as the predicate accepts them.
// The buffered event is returned when a line is rejected by the predicate
// (that line then becomes the start of the next event), when the timeout
// signal is received while lines are buffered, or when the reader reports an
// error. In the error case the buffered event is returned first and the error
// is stored so that readFailed can return it on the following call to Next.
func (mlr *Multiline) readNext() (Message, error) {
for {
message, err := mlr.reader.Next()
if err != nil {
// handle multiline timeout signal
if err == sigMultilineTimeout {
// no lines buffered -> ignore timeout
if mlr.numLines == 0 {
continue
}
// return collected multiline event and
// empty buffer for new multiline event
msg := mlr.finalize()
mlr.resetState()
return msg, nil
}
// handle error without any bytes returned from reader
if message.Bytes == 0 {
// no lines buffered -> return error
if mlr.numLines == 0 {
return Message{}, err
}
// lines buffered, return multiline and error on next read
msg := mlr.finalize()
mlr.err = err
mlr.setState((*Multiline).readFailed)
return msg, nil
}
// handle error with some content being returned by reader and
// line matching multiline criteria or no multiline started yet
if mlr.message.Bytes == 0 || mlr.pred(mlr.last, message.Content) {
mlr.addLine(message)
// return multiline and error on next read
msg := mlr.finalize()
mlr.err = err
mlr.setState((*Multiline).readFailed)
return msg, nil
}
// no match, return current multiline and retry with current line on next
// call to readNext awaiting the error being reproduced (or resolved)
// in next call to Next
msg := mlr.finalize()
mlr.load(message)
return msg, nil
}
// if predicate does not match current multiline -> return multiline event
if mlr.message.Bytes > 0 && !mlr.pred(mlr.last, message.Content) {
// the rejected line is loaded as the first line of the next event
msg := mlr.finalize()
mlr.load(message)
return msg, nil
}
// add line to current multiline event
mlr.addLine(message)
}
}
// readFailed hands the error stored by readNext to the caller together with
// an empty message, forgets the error and switches the reader back to
// readFirst.
func (mlr *Multiline) readFailed() (Message, error) {
	var pending error
	pending, mlr.err = mlr.err, nil
	mlr.resetState()
	return Message{}, pending
}
// load starts a new event from the given message. It is recommended to run
// either clear or finalize beforehand so that no stale content is kept.
func (mlr *Multiline) load(m Message) {
	mlr.addLine(m)

	event := &mlr.message
	// the first message of an event determines the event's timestamp
	event.Ts = m.Ts
	event.AddFields(m.Fields)
}
// clear drops the buffered event: the message, the last seen line, the line
// counter and the stored error are all set back to their zero values.
func (mlr *Multiline) clear() {
	mlr.message, mlr.last = Message{}, nil
	mlr.numLines, mlr.err = 0, nil
}
// finalize returns the event collected so far and resets all buffer
// variables of the reader.
func (mlr *Multiline) finalize() Message {
	// The result is copied out by the return statement before the deferred
	// clear wipes the reader's buffer.
	defer mlr.clear()
	return mlr.message
}
// addLine adds the read content to the message.
//
// The content is only appended if neither maxBytes nor maxLines is exceeded.
// Once a limit is hit, addLine keeps accounting for the line (last seen
// content, total byte count, fields) but does not store its content. A line
// that only partially fits into the remaining byte budget is truncated.
// A non-positive maxBytes or maxLines disables the respective limit.
// Messages with m.Bytes <= 0 are ignored entirely.
func (mlr *Multiline) addLine(m Message) {
	if m.Bytes <= 0 {
		return
	}

	sz := len(mlr.message.Content)
	addSeparator := sz > 0 && len(mlr.separator) > 0
	if addSeparator {
		sz += len(mlr.separator)
	}

	// Remaining byte budget after the (optional) separator. Only meaningful
	// when a byte limit is configured.
	unlimitedBytes := mlr.maxBytes <= 0
	space := mlr.maxBytes - sz

	bytesAvailable := unlimitedBytes || space > 0
	linesAvailable := mlr.maxLines <= 0 || mlr.numLines < mlr.maxLines

	if bytesAvailable && linesAvailable {
		// Without a byte limit the whole line is stored. Checking the limit
		// flag instead of the sign of space also covers maxBytes == 0 with an
		// empty buffer, where space is 0 and the line would otherwise be
		// truncated to nothing.
		if unlimitedBytes || space > len(m.Content) {
			space = len(m.Content)
		}

		tmp := mlr.message.Content
		if addSeparator {
			tmp = append(tmp, mlr.separator...)
		}
		mlr.message.Content = append(tmp, m.Content[:space]...)
		mlr.numLines++
	}

	// Always track the line, even when its content was not stored, so that
	// matching of subsequent lines keeps working.
	mlr.last = m.Content
	mlr.message.Bytes += m.Bytes
	mlr.message.AddFields(m.Fields)
}
// resetState makes readFirst the state function run by the next call to Next.
func (mlr *Multiline) resetState() {
	mlr.state = (*Multiline).readFirst
}
// setState installs next as the state function run by the next call to Next.
func (mlr *Multiline) setState(next func(*Multiline) (Message, error)) {
	mlr.state = next
}
// matchers
// afterMatcher builds a matcher that applies the pattern to the current line.
func afterMatcher(pat match.Matcher) (matcher, error) {
	pickCurrent := func(_, current []byte) []byte {
		return current
	}
	return genPatternMatcher(pat, pickCurrent)
}
// beforeMatcher builds a matcher that applies the pattern to the previous line.
func beforeMatcher(pat match.Matcher) (matcher, error) {
	pickLast := func(last, _ []byte) []byte {
		return last
	}
	return genPatternMatcher(pat, pickLast)
}
// negatedMatcher wraps m in a matcher that returns the opposite result.
func negatedMatcher(m matcher) matcher {
	return func(last, current []byte) bool {
		if m(last, current) {
			return false
		}
		return true
	}
}
// genPatternMatcher builds a matcher that uses sel to choose one of the two
// lines it is given and reports whether pat matches the chosen line. The
// returned error is always nil.
func genPatternMatcher(
	pat match.Matcher,
	sel func(last, current []byte) []byte,
) (matcher, error) {
	return func(last, current []byte) bool {
		return pat.Match(sel(last, current))
	}, nil
}