// Package pipeline combines all publisher functionality (processors, queue,
// outputs) to create instances of complete publisher pipelines that beats
// can connect to in order to publish events.
package pipeline

import (
	"errors"
	"sync"
	"time"

	"github.com/elastic/beats/libbeat/beat"
	"github.com/elastic/beats/libbeat/common"
	"github.com/elastic/beats/libbeat/common/atomic"
	"github.com/elastic/beats/libbeat/logp"
	"github.com/elastic/beats/libbeat/monitoring"
	"github.com/elastic/beats/libbeat/outputs"
	"github.com/elastic/beats/libbeat/processors"
	"github.com/elastic/beats/libbeat/publisher"
	"github.com/elastic/beats/libbeat/publisher/queue"
)

// Pipeline implementation providing all beats publisher functionality.
// The pipeline consists of clients, processors, a central queue, an output
// controller and the actual outputs.
// The queue implementing the queue.Queue interface is the most central entity
// of the pipeline, providing support for pushing, batching and pulling events.
// The pipeline adds different ACKing strategies and wait close support on top
// of the queue. For handling ACKs, the pipeline keeps track of filtered-out
// events, so they can be ACKed to the client in the correct order.
// The output controller configures a (potentially reloadable) set of load
// balanced output clients. Events will be pulled from the queue and pushed to
// the output clients using a shared work queue for the active outputs.Group.
// Processors in the pipeline are executed in the client's goroutine, before
// entering the queue. No filtering/processing will occur on the output side.
type Pipeline struct {
	beatInfo beat.Info

	logger *logp.Logger

	queue    queue.Queue
	output   *outputController
	observer observer

	eventer pipelineEventer

	// wait close support
	waitCloseMode    WaitCloseMode
	waitCloseTimeout time.Duration
	waitCloser       *waitCloser

	// pipeline ack
	ackMode    pipelineACKMode
	ackActive  atomic.Bool
	ackDone    chan struct{}
	ackBuilder ackBuilder
	eventSema  *sema

	processors pipelineProcessors
}
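
// Event flow, informally (a restatement of the architecture described in the
// doc comment above, not additional behavior; names are the ones used in
// this package):
//
//	client.Publish(event)
//	  -> client processor pipeline (annotations, filters; may drop the event)
//	  -> queue producer (central queue.Queue)
//	  -> output controller work queue
//	  -> outputs.Group clients
//	  <- ACKs flow back through the queue to the acker and waitCloser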

type pipelineProcessors struct {
	// The pipeline's processor settings, used to construct each
	// client's complete processor pipeline on connect.
	beatsMeta common.MapStr
	fields    common.MapStr
	tags      []string

	processors beat.Processor

	disabled   bool // disabled is set if outputs have been disabled via CLI
	alwaysCopy bool
}

// Settings is used to pass additional settings to a newly created pipeline instance.
type Settings struct {
	// WaitClose sets the maximum duration to block when clients or the
	// pipeline itself are closed. When and how WaitClose is applied depends
	// on WaitCloseMode.
	WaitClose time.Duration

	WaitCloseMode WaitCloseMode

	Annotations Annotations
	Processors  *processors.Processors

	Disabled bool
}

// Annotations configures additional metadata to be added to every single
// event being published. The metadata will be added before executing the
// configured processors, so all processors configured with the pipeline or
// client will see the same, complete event.
type Annotations struct {
	Beat  common.MapStr
	Event common.EventMetadata
}
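
// Illustrative only: Annotations as a beat might configure them. The concrete
// keys and values below are assumptions made for the sketch, not fixed API.
//
//	annotations := Annotations{
//		Beat:  common.MapStr{"name": "examplebeat", "version": "6.0.0"},
//		Event: common.EventMetadata{Tags: []string{"production"}},
//	}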

// WaitCloseMode enumerates the possible behaviors of WaitClose in a pipeline.
type WaitCloseMode uint8

const (
	// NoWaitOnClose disables wait close in the pipeline. Clients can still
	// selectively enable WaitClose when connecting to the pipeline.
	NoWaitOnClose WaitCloseMode = iota

	// WaitOnPipelineClose applies WaitClose to the pipeline itself, waiting
	// for outputs to ACK any outstanding events. This is independent of
	// clients asking for ACK and/or WaitClose. Clients can still optionally
	// configure WaitClose themselves.
	WaitOnPipelineClose

	// WaitOnClientClose applies the WaitClose timeout to each client
	// connecting to the pipeline. Clients are still allowed to overwrite
	// WaitClose with a timeout > 0s.
	WaitOnClientClose
)
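
// Illustrative only: selecting a close behavior via Settings. With
// WaitOnPipelineClose, Pipeline.Close blocks for up to WaitClose while
// pending events are being ACKed.
//
//	settings := Settings{
//		WaitClose:     5 * time.Second,
//		WaitCloseMode: WaitOnPipelineClose,
//	}

// pipelineEventer implements the queue's eventer callbacks, forwarding queue
// ACKs to the observer, the waitCloser, and the optional global ACK handler.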
type pipelineEventer struct {
	mutex      sync.Mutex
	modifyable bool

	observer  queueObserver
	waitClose *waitCloser
	cb        *pipelineEventCB
}
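
// waitCloser keeps track of the number of in-flight events, so Close can
// wait (with a timeout) for outstanding events to be ACKed before shutdown.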
type waitCloser struct {
	// keep track of total number of active events (minus dropped by processors)
	events sync.WaitGroup
}
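
// queueFactory creates the pipeline's queue from the pipeline eventer, which
// receives the queue's ACK callbacks.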
type queueFactory func(queue.Eventer) (queue.Queue, error)

// New creates a new Pipeline instance from a queue instance and a set of
// outputs. The new pipeline will take ownership of queue and outputs. On
// Close, the queue and outputs will be closed.
func New(
	beat beat.Info,
	metrics *monitoring.Registry,
	queueFactory queueFactory,
	out outputs.Group,
	settings Settings,
) (*Pipeline, error) {
	var err error

	log := defaultLogger
	annotations := settings.Annotations
	processors := settings.Processors
	disabledOutput := settings.Disabled
	p := &Pipeline{
		beatInfo:         beat,
		logger:           log,
		observer:         nilObserver,
		waitCloseMode:    settings.WaitCloseMode,
		waitCloseTimeout: settings.WaitClose,
		processors:       makePipelineProcessors(annotations, processors, disabledOutput),
	}
	p.ackBuilder = &pipelineEmptyACK{p}
	p.ackActive = atomic.MakeBool(true)

	if metrics != nil {
		p.observer = newMetricsObserver(metrics)
	}
	p.eventer.observer = p.observer
	p.eventer.modifyable = true

	if settings.WaitCloseMode == WaitOnPipelineClose && settings.WaitClose > 0 {
		p.waitCloser = &waitCloser{}

		// waitCloser decrements counter on queue ACK (not per client)
		p.eventer.waitClose = p.waitCloser
	}

	p.queue, err = queueFactory(&p.eventer)
	if err != nil {
		return nil, err
	}

	p.eventSema = newSema(p.queue.BufferConfig().Events)

	p.output = newOutputController(log, p.observer, p.queue)
	p.output.Set(out)

	return p, nil
}
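
// Illustrative only: a minimal sketch of wiring a pipeline together. The
// queue factory body is a placeholder (newMemQueue is hypothetical), and
// info/outGroup stand in for values a real beat builds from configuration.
//
//	factory := func(e queue.Eventer) (queue.Queue, error) {
//		return newMemQueue(e), nil // hypothetical constructor
//	}
//	p, err := New(info, monitoring.Default, factory, outGroup, Settings{
//		WaitClose:     time.Second,
//		WaitCloseMode: WaitOnPipelineClose,
//	})
//	if err != nil {
//		return err
//	}
//	defer p.Close()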

// SetACKHandler sets a global ACK handler on all events published to the pipeline.
// SetACKHandler must be called before any connection is made.
func (p *Pipeline) SetACKHandler(handler beat.PipelineACKHandler) error {
	p.eventer.mutex.Lock()
	defer p.eventer.mutex.Unlock()

	if !p.eventer.modifyable {
		return errors.New("can not set ack handler on already active pipeline")
	}

	// TODO: check only one type being configured
	cb, err := newPipelineEventCB(handler)
	if err != nil {
		return err
	}

	if cb == nil {
		p.ackBuilder = &pipelineEmptyACK{p}
		p.eventer.cb = nil
		return nil
	}

	p.eventer.cb = cb
	if cb.mode == countACKMode {
		p.ackBuilder = &pipelineCountACK{
			pipeline: p,
			cb:       cb.onCounts,
		}
	} else {
		p.ackBuilder = &pipelineEventsACK{
			pipeline: p,
			cb:       cb.onEvents,
		}
	}

	return nil
}
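
// Illustrative only: registering a count-based global ACK handler before the
// first client connects. This assumes beat.PipelineACKHandler exposes an
// ACKCount callback; consult the beat package for the exact fields.
//
//	err := p.SetACKHandler(beat.PipelineACKHandler{
//		ACKCount: func(n int) {
//			logp.Debug("pipeline", "%v events ACKed", n)
//		},
//	})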

// Close stops the pipeline, outputs and queue.
// If WaitClose with WaitOnPipelineClose mode is configured, Close will block
// for a duration of WaitClose, if there are still active events in the pipeline.
// Note: clients must be closed before calling Close.
func (p *Pipeline) Close() error {
	log := p.logger

	log.Debug("close pipeline")

	if p.waitCloser != nil {
		ch := make(chan struct{})
		go func() {
			p.waitCloser.wait()
			ch <- struct{}{}
		}()

		select {
		case <-ch:
			// all events have been ACKed
		case <-time.After(p.waitCloseTimeout):
			// timeout -> close pipeline with pending events
		}
	}

	// TODO: close/disconnect still active clients

	// close output before shutting down queue
	p.output.Close()

	// shutdown queue
	err := p.queue.Close()
	if err != nil {
		log.Error("pipeline queue shutdown error: ", err)
	}

	p.observer.cleanup()
	return nil
}

// Connect creates a new client with default settings.
func (p *Pipeline) Connect() (beat.Client, error) {
	return p.ConnectWith(beat.ClientConfig{})
}

// ConnectWith creates a new Client for publishing events to the pipeline.
// The client behavior on close and ACK handling can be configured by setting
// the appropriate fields in the passed ClientConfig.
func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
	var (
		canDrop      bool
		dropOnCancel bool
		eventFlags   publisher.EventFlags
	)

	err := validateClientConfig(&cfg)
	if err != nil {
		return nil, err
	}

	p.eventer.mutex.Lock()
	p.eventer.modifyable = false
	p.eventer.mutex.Unlock()

	switch cfg.PublishMode {
	case beat.GuaranteedSend:
		eventFlags = publisher.GuaranteedSend
		dropOnCancel = true
	case beat.DropIfFull:
		canDrop = true
	}

	waitClose := cfg.WaitClose
	reportEvents := p.waitCloser != nil

	switch p.waitCloseMode {
	case NoWaitOnClose:

	case WaitOnClientClose:
		if waitClose <= 0 {
			waitClose = p.waitCloseTimeout
		}
	}

	processors := newProcessorPipeline(p.beatInfo, p.processors, cfg)
	acker := p.makeACKer(processors != nil, &cfg, waitClose)
	producerCfg := queue.ProducerConfig{
		// Cancel events from queue if acker is configured
		// and no pipeline-wide ACK handler is registered.
		DropOnCancel: dropOnCancel && acker != nil && p.eventer.cb == nil,
	}

	if reportEvents || cfg.Events != nil {
		producerCfg.OnDrop = func(event beat.Event) {
			if cfg.Events != nil {
				cfg.Events.DroppedOnPublish(event)
			}
			if reportEvents {
				p.waitCloser.dec(1)
			}
		}
	}

	if acker != nil {
		producerCfg.ACK = acker.ackEvents
	} else {
		acker = nilACKer
	}

	producer := p.queue.Producer(producerCfg)
	client := &client{
		pipeline:     p,
		isOpen:       atomic.MakeBool(true),
		eventer:      cfg.Events,
		processors:   processors,
		producer:     producer,
		acker:        acker,
		eventFlags:   eventFlags,
		canDrop:      canDrop,
		reportEvents: reportEvents,
	}

	p.observer.clientConnected()
	return client, nil
}
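
// Illustrative only: connecting with guaranteed delivery and a per-client
// wait-close timeout, then publishing a single event. Only PublishMode and
// WaitClose are set; all other ClientConfig fields keep their defaults.
//
//	client, err := p.ConnectWith(beat.ClientConfig{
//		PublishMode: beat.GuaranteedSend,
//		WaitClose:   2 * time.Second,
//	})
//	if err != nil {
//		return err
//	}
//	defer client.Close()
//
//	client.Publish(beat.Event{
//		Timestamp: time.Now(),
//		Fields:    common.MapStr{"message": "hello"},
//	})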

func (e *pipelineEventer) OnACK(n int) {
	e.observer.queueACKed(n)

	if wc := e.waitClose; wc != nil {
		wc.dec(n)
	}
	if e.cb != nil {
		e.cb.reportQueueACK(n)
	}
}

func (e *waitCloser) inc() {
	e.events.Add(1)
}

func (e *waitCloser) dec(n int) {
	for i := 0; i < n; i++ {
		e.events.Done()
	}
}

func (e *waitCloser) wait() {
	e.events.Wait()
}

func makePipelineProcessors(
	annotations Annotations,
	processors *processors.Processors,
	disabled bool,
) pipelineProcessors {
	p := pipelineProcessors{
		disabled: disabled,
	}

	hasProcessors := processors != nil && len(processors.List) > 0
	if hasProcessors {
		tmp := &program{title: "global"}
		for _, p := range processors.List {
			tmp.add(p)
		}
		p.processors = tmp
	}

	if meta := annotations.Beat; meta != nil {
		p.beatsMeta = common.MapStr{"beat": meta}
	}

	if em := annotations.Event; len(em.Fields) > 0 {
		fields := common.MapStr{}
		common.MergeFields(fields, em.Fields.Clone(), em.FieldsUnderRoot)
		p.fields = fields
	}

	if t := annotations.Event.Tags; len(t) > 0 {
		p.tags = t
	}

	return p
}