-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
processor.go
220 lines (192 loc) · 6.12 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package processor
import (
"context"
"sync"
"github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface"
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
"github.com/DataDog/datadog-agent/pkg/logs/diagnostic"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/metrics"
"github.com/DataDog/datadog-agent/pkg/logs/sds"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
// UnstructuredProcessingMetricName is the name of the metric that collects how
// many rules are used on unstructured content for tailers capable of processing
// both unstructured and structured content.
const UnstructuredProcessingMetricName = "datadog.logs_agent.tailer.unstructured_processing"
// A Processor reads messages from an inputChan, applies the processing rules
// and the SDS scanner to their content, renders and encodes them, and pushes
// them in an outputChan.
type Processor struct {
// pipelineID is the identifier given to New; it is also passed to the SDS
// scanner when it is created.
pipelineID int
// inputChan is the source of messages to process. It is closed by Stop.
inputChan chan *message.Message
// outputChan receives every message that was successfully processed and encoded.
outputChan chan *message.Message // strategy input
// ReconfigChan transports rules to use in order to reconfigure
// the processing rules of the SDS Scanner.
ReconfigChan chan sds.ReconfigureOrder
// processingRules are applied to every message, before the rules attached
// to the message's own log source.
processingRules []*config.ProcessingRule
// encoder converts a message in-place to its final format.
encoder Encoder
// done is signaled by the processing loop when it exits; Stop waits on it.
done chan struct{}
// diagnosticMessageReceiver is handed every rendered message before encoding.
diagnosticMessageReceiver diagnostic.MessageReceiver
// mu is held by Flush and while the SDS scanner is reconfigured. The
// processing loop acquires it after each message so that it pauses while a
// synchronous flush is in progress.
mu sync.Mutex
// hostname is used by GetHostname when the message carries neither a
// hostname nor Lambda information. It may be nil.
hostname hostnameinterface.Component
// sds is the SDS scanner applied on all messages; it is released by Stop.
sds *sds.Scanner // configured through RC
}
// New builds a Processor that reads from inputChan and writes to outputChan.
//
// processingRules are the rules applied to every message, encoder produces the
// final format of the messages, diagnosticMessageReceiver is notified of every
// rendered message, hostname is the fallback hostname provider and pipelineID
// identifies the pipeline this Processor belongs to (it is also used to create
// the SDS scanner). The returned Processor is not running until Start is called.
func New(inputChan, outputChan chan *message.Message, processingRules []*config.ProcessingRule, encoder Encoder,
	diagnosticMessageReceiver diagnostic.MessageReceiver, hostname hostnameinterface.Component, pipelineID int) *Processor {
	p := &Processor{
		pipelineID:                pipelineID,
		hostname:                  hostname,
		diagnosticMessageReceiver: diagnosticMessageReceiver,
		encoder:                   encoder,
		processingRules:           processingRules,
		inputChan:                 inputChan,
		outputChan:                outputChan, // strategy input
		// both channels are unbuffered: senders and receivers synchronize on them
		ReconfigChan: make(chan sds.ReconfigureOrder),
		done:         make(chan struct{}),
	}
	// one SDS scanner per pipeline
	p.sds = sds.CreateScanner(pipelineID)
	return p
}
// Start starts the Processor: the processing loop (run) is launched in its own
// goroutine and Start returns immediately. Use Stop to terminate the loop.
func (p *Processor) Start() {
go p.run()
}
// Stop shuts the Processor down. It closes inputChan, then blocks until the
// processing loop has consumed the remaining messages and exited, and finally
// releases the SDS scanner.
func (p *Processor) Stop() {
	close(p.inputChan)
	<-p.done

	// The main loop is not running anymore at this point, so it is safe to
	// delete the SDS scanner instance.
	if p.sds == nil {
		return
	}
	p.sds.Delete()
	p.sds = nil
}
// Flush synchronously processes the messages that are waiting in inputChan.
//
// The processor mutex is held for the whole call, which pauses the run loop
// once it has finished the message it is currently handling. Flush returns as
// soon as ctx is done, no message is immediately available on inputChan, or
// inputChan has been closed. It never blocks waiting for new messages.
func (p *Processor) Flush(ctx context.Context) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for {
		// Cancellation takes priority over pending messages.
		if ctx.Err() != nil {
			return
		}

		// Use a non-blocking receive rather than a len() check followed by a
		// blocking receive: the run loop may still take a message out of
		// inputChan concurrently, in which case a blocking receive would stall
		// here, holding the mutex and ignoring ctx, until another message
		// arrives (or would yield a nil message if the channel gets closed).
		select {
		case msg, ok := <-p.inputChan:
			if !ok { // channel has been closed
				return
			}
			p.processMessage(msg)
		default:
			// nothing left to flush
			return
		}
	}
}
// run is the main loop of the Processor. It processes every message received
// on inputChan and applies the reconfiguration orders received on ReconfigChan,
// until inputChan is closed. On exit it signals the done channel, which
// unblocks Stop.
func (p *Processor) run() {
	defer func() { p.done <- struct{}{} }()

	for {
		select {
		case order := <-p.ReconfigChan:
			// The scanner is reconfigured under the mutex so that it cannot
			// happen while a synchronous flush is in progress.
			p.mu.Lock()
			reconfErr := p.sds.Reconfigure(order)
			if reconfErr != nil {
				log.Errorf("Error while reconfiguring the SDS scanner: %v", reconfErr)
				order.ResponseChan <- reconfErr
			} else {
				// always answer, the sender of the order is waiting for the outcome
				order.ResponseChan <- nil
			}
			p.mu.Unlock()

		case incoming, more := <-p.inputChan:
			if !more { // channel has been closed
				return
			}
			p.processMessage(incoming)
			// Taking and immediately releasing the mutex makes the loop wait
			// here whenever a synchronous flush currently holds it.
			p.mu.Lock()
			//nolint:staticcheck
			p.mu.Unlock()
		}
	}
}
// processMessage runs one message through the whole processing chain: the
// redacting rules are applied, then the message is rendered, reported to the
// diagnostic receiver, encoded and finally pushed to outputChan. A message that
// is filtered out by the rules, or that fails to render or encode, is dropped.
func (p *Processor) processMessage(msg *message.Message) {
	metrics.LogsDecoded.Add(1)
	metrics.TlmLogsDecoded.Inc()

	// filtered out by an exclusion/inclusion rule: nothing more to do
	if !p.applyRedactingRules(msg) {
		return
	}
	metrics.LogsProcessed.Add(1)
	metrics.TlmLogsProcessed.Inc()

	// render the message
	renderedContent, renderErr := msg.Render()
	if renderErr != nil {
		log.Error("can't render the msg", renderErr)
		return
	}
	msg.SetRendered(renderedContent)

	// report this message to diagnostic receivers (e.g. `stream-logs` command)
	p.diagnosticMessageReceiver.HandleMessage(msg, renderedContent, "")

	// encode the message to its final format, it is done in-place
	host := p.GetHostname(msg)
	if encodeErr := p.encoder.Encode(msg, host); encodeErr != nil {
		log.Error("unable to encode msg ", encodeErr)
		return
	}

	p.outputChan <- msg
}
// applyRedactingRules applies the processing rules and the SDS scanner to the
// content of msg and reports whether the message should be sent.
//
// The rules of the Processor are evaluated first, followed by the rules
// configured on the log source of the message:
//   - an ExcludeAtMatch rule drops the message when its regex matches,
//   - an IncludeAtMatch rule drops the message when its regex does not match,
//   - a MaskSequences rule replaces every match with the rule placeholder.
//
// When the message is kept (true is returned), the resulting content is stored
// back in msg. When the message is dropped (false is returned), msg is left
// untouched.
func (p *Processor) applyRedactingRules(msg *message.Message) bool {
	content := msg.GetContent()

	// Use the internal scrubbing implementation of the Agent
	// ---------------------------

	// The two rule sets are walked one after the other instead of being
	// concatenated with append(p.processingRules, ...): when the backing array
	// of p.processingRules has spare capacity, append writes the source rules
	// into it, and that array can be shared with other goroutines processing
	// messages at the same time. Iterating also avoids one allocation per message.
	ruleSets := [...][]*config.ProcessingRule{
		p.processingRules,
		msg.Origin.LogSource.Config.ProcessingRules,
	}
	for _, rules := range ruleSets {
		for _, rule := range rules {
			switch rule.Type {
			case config.ExcludeAtMatch:
				// if this message matches, we ignore it
				if rule.Regex.Match(content) {
					return false
				}
			case config.IncludeAtMatch:
				// if this message doesn't match, we ignore it
				if !rule.Regex.Match(content) {
					return false
				}
			case config.MaskSequences:
				content = rule.Regex.ReplaceAll(content, rule.Placeholder)
			}
		}
	}

	// Use the SDS implementation
	// --------------------------

	// Global SDS scanner, applied on all log sources
	if p.sds.IsReady() {
		mutated, evtProcessed, err := p.sds.Scan(content, msg)
		if err != nil {
			// a scan failure is not fatal: the message is sent with the
			// content produced by the rules above
			log.Error("while using SDS to scan the log:", err)
		} else if mutated {
			content = evtProcessed
		}
	}

	msg.SetContent(content)
	return true // we want to send this message
}
// GetHostname returns the hostname to apply to the given log message.
//
// The first available value is used, in this order: the hostname carried by
// the message, the ARN of the Lambda attached to the message, the hostname
// reported by the hostname component. "unknown" is returned when there is no
// hostname component or when it fails to provide a hostname.
func (p *Processor) GetHostname(msg *message.Message) string {
	const unknownHostname = "unknown"

	switch {
	case msg.Hostname != "":
		return msg.Hostname
	case msg.Lambda != nil:
		return msg.Lambda.ARN
	case p.hostname == nil:
		return unknownHostname
	}

	if resolved, err := p.hostname.Get(context.TODO()); err == nil {
		return resolved
	}
	// this scenario is not likely to happen since
	// the agent cannot start without a hostname
	return unknownHostname
}