-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
decoder.go
144 lines (125 loc) · 3.89 KB
/
decoder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-2020 Datadog, Inc.
package decoder
import (
"bytes"
"github.com/DataDog/datadog-agent/pkg/logs/config"
"github.com/DataDog/datadog-agent/pkg/logs/parser"
)
// defaultContentLenLimit is the default maximum size of a line, in bytes
// (256 * 1000 = 256,000, i.e. decimal kilobytes rather than 256 KiB).
// NewDecoderWithEndLineMatcher hands it both to the line handler it creates
// and to the Decoder, which cuts any line that reaches this size and sends
// the remainder as a new line.
const defaultContentLenLimit = 256 * 1000
// Input wraps a chunk of raw data that has not been split into lines yet.
type Input struct {
	// content holds the raw bytes of the chunk.
	content []byte
}

// NewInput wraps content in a freshly allocated Input. The slice is stored
// as is, without being copied, so the Input shares memory with the caller.
func NewInput(content []byte) *Input {
	in := new(Input)
	in.content = content
	return in
}
// Output is a line in its structured form, as emitted on a Decoder's
// output channel.
type Output struct {
	// Content is the payload of the line.
	Content []byte
	// Status is the status attached to the line.
	// NOTE(review): the set of valid values is not visible in this file.
	Status string
	// RawDataLen is presumably the number of raw bytes the line was built
	// from — TODO confirm against the line handlers.
	RawDataLen int
	// Timestamp is the timestamp attached to the line, kept as a string.
	Timestamp string
}

// NewOutput builds an Output from its four fields. The content slice is
// stored as is, without being copied.
func NewOutput(content []byte, status string, rawDataLen int, timestamp string) *Output {
	out := new(Output)
	out.Content, out.Status = content, status
	out.RawDataLen, out.Timestamp = rawDataLen, timestamp
	return out
}
// Decoder splits raw data into lines and passes them to a lineHandler that emits outputs.
// Chunks are received on InputChan, cut into lines at the positions reported
// by matcher (or cut forcibly once a line reaches contentLenLimit bytes) and
// handed one by one to lineHandler.
type Decoder struct {
// InputChan receives the raw chunks to decode; Stop closes it, which ends
// the decoding loop.
InputChan chan *Input
// OutputChan is the channel given to the line handler when the Decoder is
// built by NewDecoderWithEndLineMatcher; no Decoder method in this file
// writes to it directly.
OutputChan chan *Output
// matcher decides at which positions of the incoming data a line ends.
matcher EndLineMatcher
// lineBuffer accumulates the bytes of the line in progress, possibly
// across several chunks.
lineBuffer *bytes.Buffer
// lineHandler receives a copy of every completed line through Handle.
lineHandler LineHandler
// contentLenLimit is the maximum number of bytes buffered for one line
// before it is sent without waiting for an end of line.
contentLenLimit int
}
// InitializeDecoder builds a Decoder for source that ends lines according to
// a newLineMatcher; everything else is delegated to
// NewDecoderWithEndLineMatcher.
func InitializeDecoder(source *config.LogSource, parser parser.Parser) *Decoder {
	matcher := &newLineMatcher{}
	return NewDecoderWithEndLineMatcher(source, parser, matcher)
}
// NewDecoderWithEndLineMatcher builds a Decoder that ends lines according to
// matcher. It creates fresh, unbuffered input and output channels and picks
// the line handler from the processing rules of source: a multi-line handler
// when a rule of type config.MultiLine exists, a single-line handler
// otherwise. Every component uses defaultContentLenLimit as its size limit.
func NewDecoderWithEndLineMatcher(source *config.LogSource, parser parser.Parser, matcher EndLineMatcher) *Decoder {
	var (
		inputChan  = make(chan *Input)
		outputChan = make(chan *Output)
		limit      = defaultContentLenLimit
		handler    LineHandler
	)

	// The scan does not stop at the first hit: when several multi-line rules
	// are configured, the handler built from the last one is kept.
	for _, rule := range source.Config.ProcessingRules {
		if rule.Type != config.MultiLine {
			continue
		}
		handler = NewMultiLineHandler(outputChan, rule.Regex, defaultFlushTimeout, parser, limit)
	}

	// No multi-line rule: every line is handled on its own.
	if handler == nil {
		handler = NewSingleLineHandler(outputChan, parser, limit)
	}

	return New(inputChan, outputChan, handler, limit, matcher)
}
// New assembles a Decoder from its parts and gives it an empty line buffer.
// Nothing is started: call Start to begin decoding.
func New(InputChan chan *Input, OutputChan chan *Output, lineHandler LineHandler, contentLenLimit int, matcher EndLineMatcher) *Decoder {
	d := new(Decoder)
	d.InputChan, d.OutputChan = InputChan, OutputChan
	d.lineBuffer = new(bytes.Buffer)
	d.lineHandler = lineHandler
	d.contentLenLimit = contentLenLimit
	d.matcher = matcher
	return d
}
// Start starts the line handler, then launches the decoding loop in its own
// goroutine. It does not wait for that goroutine.
func (d *Decoder) Start() {
	handler := d.lineHandler
	handler.Start()
	go d.run()
}
// Stop closes InputChan. The decoding loop ends once the chunks already sent
// on the channel have been consumed, and then stops the line handler.
func (d *Decoder) Stop() {
	input := d.InputChan
	close(input)
}
// run decodes every chunk received on InputChan until the channel is closed,
// then stops the line handler. Bytes still held in the line buffer at that
// point are not sent.
func (d *Decoder) run() {
	input := d.InputChan
	for {
		chunk, open := <-input
		if !open {
			break
		}
		d.decodeIncomingData(chunk.content)
	}
	// The input is exhausted: shut the handler down.
	d.lineHandler.Stop()
}
// decodeIncomingData scans inBuf and sends a line each time the matcher
// reports an end of line, or as soon as the line in progress reaches
// contentLenLimit bytes. Bytes following the last line sent stay in
// lineBuffer and are completed by the next call.
func (d *Decoder) decodeIncomingData(inBuf []byte) {
	// start is the index in inBuf of the first byte not yet buffered.
	start := 0
	// cut is the index at which the line in progress reaches the size limit,
	// taking into account what previous calls already buffered.
	cut := d.contentLenLimit - d.lineBuffer.Len()

	for pos := range inBuf {
		switch {
		case pos == cut:
			// The line is too long: send what has been gathered so far. The
			// byte at pos is not tested against the matcher at this index; it
			// becomes the first byte of the next line.
			d.lineBuffer.Write(inBuf[start:pos])
			d.sendLine()
			start = pos
			cut = start + d.contentLenLimit
		case d.matcher.Match(d.lineBuffer.Bytes(), inBuf, start, pos):
			// End of line: send the line and skip the matching byte.
			d.lineBuffer.Write(inBuf[start:pos])
			d.sendLine()
			start = pos + 1
			cut = start + d.contentLenLimit
		}
	}

	// Keep the unfinished tail for the next chunk.
	d.lineBuffer.Write(inBuf[start:])
}
// sendLine hands the buffered line to the line handler and empties the
// buffer. The handler receives a private copy, because the slice returned by
// Bytes is only valid until the buffer is next modified and Reset keeps the
// underlying storage for reuse.
func (d *Decoder) sendLine() {
	buffered := d.lineBuffer.Bytes()
	line := make([]byte, len(buffered))
	copy(line, buffered)
	d.lineBuffer.Reset()
	d.lineHandler.Handle(line)
}