demultiplexer_agent.go
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package aggregator
import (
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/DataDog/datadog-agent/comp/core/log"
forwarder "github.com/DataDog/datadog-agent/comp/forwarder/defaultforwarder"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform"
orchestratorforwarder "github.com/DataDog/datadog-agent/comp/forwarder/orchestrator"
"github.com/DataDog/datadog-agent/comp/serializer/compression"
"github.com/DataDog/datadog-agent/pkg/aggregator/internal/tags"
"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
checkid "github.com/DataDog/datadog-agent/pkg/collector/check/id"
"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/config/utils"
"github.com/DataDog/datadog-agent/pkg/metrics"
"github.com/DataDog/datadog-agent/pkg/metrics/event"
"github.com/DataDog/datadog-agent/pkg/metrics/servicecheck"
"github.com/DataDog/datadog-agent/pkg/serializer"
)
// DemultiplexerWithAggregator is a Demultiplexer running an Aggregator.
// This flavor uses an AgentDemultiplexerOptions struct for startup configuration.
type DemultiplexerWithAggregator interface {
Demultiplexer
Aggregator() *BufferedAggregator
// AggregateCheckSample adds a check sample sent by a check from one of the collectors into a check sampler pipeline.
AggregateCheckSample(sample metrics.MetricSample)
Options() AgentDemultiplexerOptions
GetEventPlatformForwarder() (eventplatform.Forwarder, error)
GetEventsAndServiceChecksChannels() (chan []*event.Event, chan []*servicecheck.ServiceCheck)
DumpDogstatsdContexts(io.Writer) error
}
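
// Illustrative sketch (not part of the original source): code that holds a plain
// Demultiplexer can check at runtime whether it also runs an aggregator. The
// demux variable is assumed to come from the caller.
//
//	if withAgg, ok := demux.(DemultiplexerWithAggregator); ok {
//		eventsCh, serviceChecksCh := withAgg.GetEventsAndServiceChecksChannels()
//		_ = eventsCh
//		_ = serviceChecksCh
//	}
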
// AgentDemultiplexer is the demultiplexer implementation for the main Agent.
type AgentDemultiplexer struct {
log log.Component
m sync.Mutex
// stopChan completely stops the flushLoop of the Demultiplexer when it receives
// a message; it does nothing else.
stopChan chan struct{}
// flushChan receives a trigger to run an internal flush of all
// samplers (TimeSampler, BufferedAggregator (CheckSampler, Events, ServiceChecks))
// to the shared serializer.
flushChan chan trigger
// options are the options with which the demultiplexer has been created
options AgentDemultiplexerOptions
aggregator *BufferedAggregator
dataOutputs
senders *senders
// sharded statsd time samplers
statsd
}
// AgentDemultiplexerOptions are the options used to initialize a Demultiplexer.
type AgentDemultiplexerOptions struct {
FlushInterval time.Duration
EnableNoAggregationPipeline bool
DontStartForwarders bool // unit tests don't need the forwarders to be instantiated
UseDogstatsdContextLimiter bool
DogstatsdMaxMetricsTags int
}
// DefaultAgentDemultiplexerOptions returns the default options to initialize an AgentDemultiplexer.
func DefaultAgentDemultiplexerOptions() AgentDemultiplexerOptions {
return AgentDemultiplexerOptions{
FlushInterval: DefaultFlushInterval,
// the different agents/binaries enable it on a per-need basis
EnableNoAggregationPipeline: false,
}
}
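
// Illustrative sketch: starting from the defaults and enabling the
// no-aggregation pipeline for a binary that needs it (the values shown are
// examples, not recommendations).
//
//	opts := DefaultAgentDemultiplexerOptions()
//	opts.EnableNoAggregationPipeline = true
//	opts.FlushInterval = 10 * time.Second
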
type statsd struct {
// how many sharded statsd samplers exist.
// len(workers) would return the same result, but storing it here gives more
// explicit visibility and avoids an extra function call for every metric to
// distribute.
pipelinesCount int
workers []*timeSamplerWorker
// shared metric sample pool between the dogstatsd server & the time sampler
metricSamplePool *metrics.MetricSamplePool
// the noAggregationStreamWorker is the one dealing with metrics that don't need to
// be aggregated/sampled.
noAggStreamWorker *noAggregationStreamWorker
}
type forwarders struct {
containerLifecycle *forwarder.DefaultForwarder
}
type dataOutputs struct {
forwarders forwarders
sharedSerializer serializer.MetricSerializer
noAggSerializer serializer.MetricSerializer
}
// InitAndStartAgentDemultiplexer creates a new Demultiplexer and runs what's necessary
// in goroutines. As of today, only the embedded BufferedAggregator needs a separate goroutine.
// In the future, goroutines will be started for the event platform forwarder and/or orchestrator forwarder.
func InitAndStartAgentDemultiplexer(
log log.Component,
sharedForwarder forwarder.Forwarder,
orchestratorForwarder orchestratorforwarder.Component,
options AgentDemultiplexerOptions,
eventPlatformForwarder eventplatform.Component,
compressor compression.Component,
hostname string) *AgentDemultiplexer {
demux := initAgentDemultiplexer(log, sharedForwarder, orchestratorForwarder, options, eventPlatformForwarder, compressor, hostname)
go demux.run()
return demux
}
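
// Illustrative sketch (hypothetical wiring): the components passed here (log,
// forwarders, compressor) normally come from the agent's dependency injection;
// the names below are placeholders.
//
//	opts := DefaultAgentDemultiplexerOptions()
//	demux := InitAndStartAgentDemultiplexer(
//		logComp, sharedFwd, orchFwd, opts, epFwd, compressor, "my-hostname")
//	defer demux.Stop(true) // flush what remains on shutdown
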
func initAgentDemultiplexer(
log log.Component,
sharedForwarder forwarder.Forwarder,
orchestratorForwarder orchestratorforwarder.Component,
options AgentDemultiplexerOptions,
eventPlatformForwarder eventplatform.Component,
compressor compression.Component,
hostname string) *AgentDemultiplexer {
// prepare the multiple forwarders
// -------------------------------
if config.Datadog.GetBool("telemetry.enabled") && config.Datadog.GetBool("telemetry.dogstatsd_origin") && !config.Datadog.GetBool("aggregator_use_tags_store") {
log.Warn("DogStatsD origin telemetry is not supported when aggregator_use_tags_store is disabled.")
config.Datadog.Set("telemetry.dogstatsd_origin", false, model.SourceAgentRuntime)
}
// prepare the serializer
// ----------------------
sharedSerializer := serializer.NewSerializer(sharedForwarder, orchestratorForwarder, compressor, config.Datadog, hostname)
// prepare the embedded aggregator
// --
agg := NewBufferedAggregator(sharedSerializer, eventPlatformForwarder, hostname, options.FlushInterval)
// statsd samplers
// ---------------
bufferSize := config.Datadog.GetInt("aggregator_buffer_size")
metricSamplePool := metrics.NewMetricSamplePool(MetricSamplePoolBatchSize, utils.IsTelemetryEnabled(config.Datadog))
_, statsdPipelinesCount := GetDogStatsDWorkerAndPipelineCount()
log.Debug("the Demultiplexer will use", statsdPipelinesCount, "pipelines")
statsdWorkers := make([]*timeSamplerWorker, statsdPipelinesCount)
for i := 0; i < statsdPipelinesCount; i++ {
// the sampler
tagsStore := tags.NewStore(config.Datadog.GetBool("aggregator_use_tags_store"), fmt.Sprintf("timesampler #%d", i))
statsdSampler := NewTimeSampler(TimeSamplerID(i), bucketSize, tagsStore, agg.hostname)
// its worker (process loop + flush/serialization mechanism)
statsdWorkers[i] = newTimeSamplerWorker(statsdSampler, options.FlushInterval,
bufferSize, metricSamplePool, agg.flushAndSerializeInParallel, tagsStore)
}
var noAggWorker *noAggregationStreamWorker
var noAggSerializer serializer.MetricSerializer
if options.EnableNoAggregationPipeline {
noAggSerializer = serializer.NewSerializer(sharedForwarder, orchestratorForwarder, compressor, config.Datadog, hostname)
noAggWorker = newNoAggregationStreamWorker(
config.Datadog.GetInt("dogstatsd_no_aggregation_pipeline_batch_size"),
metricSamplePool,
noAggSerializer,
agg.flushAndSerializeInParallel,
)
}
// --
demux := &AgentDemultiplexer{
log: log,
options: options,
stopChan: make(chan struct{}),
flushChan: make(chan trigger),
// Input
aggregator: agg,
// Output
dataOutputs: dataOutputs{
forwarders: forwarders{},
sharedSerializer: sharedSerializer,
noAggSerializer: noAggSerializer,
},
senders: newSenders(agg),
// statsd time samplers
statsd: statsd{
pipelinesCount: statsdPipelinesCount,
workers: statsdWorkers,
metricSamplePool: metricSamplePool,
noAggStreamWorker: noAggWorker,
},
}
return demux
}
// Options returns options used during the demux initialization.
func (d *AgentDemultiplexer) Options() AgentDemultiplexerOptions {
return d.options
}
// AddAgentStartupTelemetry adds a startup event and count (in a DSD time sampler)
// to be sent on the next flush.
func (d *AgentDemultiplexer) AddAgentStartupTelemetry(agentVersion string) {
if agentVersion != "" {
d.AggregateSample(metrics.MetricSample{
Name: fmt.Sprintf("datadog.%s.started", d.aggregator.agentName),
Value: 1,
Tags: d.aggregator.tags(true),
Host: d.aggregator.hostname,
Mtype: metrics.CountType,
SampleRate: 1,
Timestamp: 0,
})
if d.aggregator.hostname != "" {
// Send startup event only when we have a valid hostname
d.aggregator.eventIn <- event.Event{
Text: fmt.Sprintf("Version %s", agentVersion),
SourceTypeName: "System",
Host: d.aggregator.hostname,
EventType: "Agent Startup",
}
}
}
}
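
// Illustrative sketch: typically called once, right after the demultiplexer has
// been started, with the agent's version string (the value is a placeholder).
//
//	demux.AddAgentStartupTelemetry("7.50.0")
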
// run runs all demultiplexer parts
func (d *AgentDemultiplexer) run() {
if !d.options.DontStartForwarders {
d.log.Debugf("Starting forwarders")
// container lifecycle forwarder
if d.forwarders.containerLifecycle != nil {
if err := d.forwarders.containerLifecycle.Start(); err != nil {
d.log.Errorf("error starting container lifecycle forwarder: %v", err)
}
} else {
d.log.Debug("not starting the container lifecycle forwarder")
}
d.log.Debug("Forwarders started")
}
for _, w := range d.statsd.workers {
go w.run()
}
go d.aggregator.run()
if d.noAggStreamWorker != nil {
go d.noAggStreamWorker.run()
}
d.flushLoop() // this is the blocking call
}
func (d *AgentDemultiplexer) flushLoop() {
var flushTicker <-chan time.Time
if d.options.FlushInterval > 0 {
flushTicker = time.NewTicker(d.options.FlushInterval).C
} else {
d.log.Debug("flushInterval set to 0: will never flush automatically")
}
for {
select {
// stop sequence
case <-d.stopChan:
return
// manual flush sequence
case trigger := <-d.flushChan:
d.flushToSerializer(trigger.time, trigger.waitForSerializer)
if trigger.blockChan != nil {
trigger.blockChan <- struct{}{}
}
// automatic flush sequence
case t := <-flushTicker:
d.flushToSerializer(t, false)
}
}
}
// Stop stops the demultiplexer.
// Resources are released, the instance should not be used after a call to `Stop()`.
func (d *AgentDemultiplexer) Stop(flush bool) {
timeout := config.Datadog.GetDuration("aggregator_stop_timeout") * time.Second
if d.noAggStreamWorker != nil {
d.noAggStreamWorker.stop(flush)
}
// do a complete manual flush, then stop all automatic flushes and the main loop
if flush {
trigger := trigger{
time: time.Now(),
blockChan: make(chan struct{}),
waitForSerializer: flush,
}
d.flushChan <- trigger
select {
case <-trigger.blockChan:
case <-time.After(timeout):
d.log.Errorf("flushing data on Stop() timed out")
}
}
// stops the flush loop and makes sure no automatic flushes will happen anymore
d.stopChan <- struct{}{}
d.m.Lock()
defer d.m.Unlock()
// aggregated data
for _, worker := range d.statsd.workers {
worker.stop()
}
if d.aggregator != nil {
d.aggregator.Stop()
}
d.aggregator = nil
// forwarders
if !d.options.DontStartForwarders {
if d.dataOutputs.forwarders.containerLifecycle != nil {
d.dataOutputs.forwarders.containerLifecycle.Stop()
d.dataOutputs.forwarders.containerLifecycle = nil
}
}
// misc
d.dataOutputs.sharedSerializer = nil
d.senders = nil
}
// ForceFlushToSerializer triggers the execution of a flush from all data of samplers
// and the BufferedAggregator to the serializer.
// Safe to call from multiple threads.
func (d *AgentDemultiplexer) ForceFlushToSerializer(start time.Time, waitForSerializer bool) {
trigger := trigger{
time: start,
waitForSerializer: waitForSerializer,
blockChan: make(chan struct{}),
}
d.flushChan <- trigger
<-trigger.blockChan
}
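
// Illustrative sketch: forcing a synchronous flush, e.g. from a command that
// must not exit before the serializer has received everything.
//
//	demux.ForceFlushToSerializer(time.Now(), true)
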
// flushToSerializer flushes all data from the aggregator and time samplers
// to the serializer.
//
// Best practice is that this method is *only* called by the flushLoop routine.
// It technically works if called from outside of this routine, but beware of
// deadlocks with the parallel stream series implementation.
//
// This implementation is not flushing the TimeSampler and the BufferedAggregator
// concurrently because the IterableSeries is not thread safe / does not support concurrent usage.
// If one day a better (faster?) solution is needed, we could either consider:
// - to have an implementation of SendIterableSeries listening on multiple sinks in parallel, or,
// - to have a thread-safe implementation of the underlying `util.BufferedChan`.
func (d *AgentDemultiplexer) flushToSerializer(start time.Time, waitForSerializer bool) {
d.m.Lock()
defer d.m.Unlock()
if d.aggregator == nil {
// NOTE(remy): we could consider flushing only the time samplers
return
}
logPayloads := config.Datadog.GetBool("log_payloads")
series, sketches := createIterableMetrics(d.aggregator.flushAndSerializeInParallel, d.sharedSerializer, logPayloads, false)
metrics.Serialize(
series,
sketches,
func(seriesSink metrics.SerieSink, sketchesSink metrics.SketchesSink) {
// flush DogStatsD pipelines (statsd/time samplers)
// ------------------------------------------------
for _, worker := range d.statsd.workers {
// order the flush to the time sampler, and wait, in a different routine
t := flushTrigger{
trigger: trigger{
time: start,
blockChan: make(chan struct{}),
},
sketchesSink: sketchesSink,
seriesSink: seriesSink,
}
worker.flushChan <- t
<-t.trigger.blockChan
}
// flush the aggregator (check samplers)
// -------------------------------------
if d.aggregator != nil {
t := flushTrigger{
trigger: trigger{
time: start,
blockChan: make(chan struct{}),
waitForSerializer: waitForSerializer,
},
sketchesSink: sketchesSink,
seriesSink: seriesSink,
}
d.aggregator.flushChan <- t
<-t.trigger.blockChan
}
}, func(serieSource metrics.SerieSource) {
sendIterableSeries(d.sharedSerializer, start, serieSource)
},
func(sketches metrics.SketchesSource) {
// Don't send empty sketches payloads
if sketches.WaitForValue() {
err := d.sharedSerializer.SendSketch(sketches)
sketchesCount := sketches.Count()
d.log.Debugf("Flushing %d sketches to the serializer", sketchesCount)
updateSketchTelemetry(start, sketchesCount, err)
addFlushCount("Sketches", int64(sketchesCount))
}
})
addFlushTime("MainFlushTime", int64(time.Since(start)))
aggregatorNumberOfFlush.Add(1)
}
// GetEventsAndServiceChecksChannels returns the underlying events and service checks channels.
func (d *AgentDemultiplexer) GetEventsAndServiceChecksChannels() (chan []*event.Event, chan []*servicecheck.ServiceCheck) {
return d.aggregator.GetBufferedChannels()
}
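
// Illustrative sketch: pushing a batch of events through the buffered events
// channel. The fields mirror the ones used by AddAgentStartupTelemetry above.
//
//	eventsCh, _ := demux.GetEventsAndServiceChecksChannels()
//	eventsCh <- []*event.Event{{
//		Text:           "deploy finished",
//		SourceTypeName: "System",
//		Host:           "my-hostname",
//	}}
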
// GetEventPlatformForwarder returns the underlying event platform forwarder.
func (d *AgentDemultiplexer) GetEventPlatformForwarder() (eventplatform.Forwarder, error) {
return d.aggregator.GetEventPlatformForwarder()
}
// SendSamplesWithoutAggregation buffers a batch of metric samples that already carry a timestamp. This data will be
// directly transmitted "as-is" (i.e. no aggregation, no sampling) to the serializer.
func (d *AgentDemultiplexer) SendSamplesWithoutAggregation(samples metrics.MetricSampleBatch) {
// safeguard: if for some reason we receive metrics here despite the
// no-aggregation pipeline being disabled, they are redirected to the first
// time sampler.
if !d.options.EnableNoAggregationPipeline {
d.AggregateSamples(TimeSamplerID(0), samples)
return
}
tlmProcessed.Add(float64(len(samples)), "", "late_metrics")
d.statsd.noAggStreamWorker.addSamples(samples)
}
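
// Illustrative sketch: submitting samples that already carry a timestamp and
// should skip aggregation. Field values are placeholders; GaugeType is assumed
// to be one of the available metric types.
//
//	demux.SendSamplesWithoutAggregation(metrics.MetricSampleBatch{{
//		Name:       "my.custom.metric",
//		Value:      42,
//		Mtype:      metrics.GaugeType,
//		SampleRate: 1,
//		Timestamp:  1700000000,
//	}})
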
// AggregateSamples adds a batch of MetricSample into the given DogStatsD time sampler shard.
// If you have to submit a single metric sample see `AggregateSample`.
func (d *AgentDemultiplexer) AggregateSamples(shard TimeSamplerID, samples metrics.MetricSampleBatch) {
// distribute the samples across the different statsd samplers using a channel
// (in the time sampler implementation) for latency reasons: thanks to the
// channel buffering and the fact that another goroutine processes the samples,
// control returns to the caller as soon as the samples are in the channel.
d.statsd.workers[shard].samplesChan <- samples
}
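
// Illustrative sketch: a caller that shards its own traffic could pick a
// pipeline from the configured count (the modulo scheme below is only an
// example, not necessarily the one used by the dogstatsd server).
//
//	shard := TimeSamplerID(keyHash % uint64(demux.GetDogStatsDPipelinesCount()))
//	demux.AggregateSamples(shard, batch)
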
// AggregateSample adds a MetricSample in the first DogStatsD time sampler.
func (d *AgentDemultiplexer) AggregateSample(sample metrics.MetricSample) {
batch := d.GetMetricSamplePool().GetBatch()
batch[0] = sample
d.statsd.workers[0].samplesChan <- batch[:1]
}
// AggregateCheckSample adds a check sample sent by a check from one of the collectors into a check sampler pipeline.
//
//nolint:revive // TODO(AML) Fix revive linter
func (d *AgentDemultiplexer) AggregateCheckSample(sample metrics.MetricSample) {
panic("not implemented yet.")
}
// GetDogStatsDPipelinesCount returns how many sampling pipelines are running for
// the DogStatsD samples.
func (d *AgentDemultiplexer) GetDogStatsDPipelinesCount() int {
return d.statsd.pipelinesCount
}
// Serializer returns a serializer that anyone can use. This method exists
// to keep compatibility with existing code while introducing the Demultiplexer,
// however, the plan is to remove it soon.
//
// Deprecated.
func (d *AgentDemultiplexer) Serializer() serializer.MetricSerializer {
return d.dataOutputs.sharedSerializer
}
// Aggregator returns an aggregator that anyone can use. This method exists
// to keep compatibility with existing code while introducing the Demultiplexer,
// however, the plan is to remove it soon.
//
// Deprecated.
func (d *AgentDemultiplexer) Aggregator() *BufferedAggregator {
return d.aggregator
}
// GetMetricSamplePool returns a shared resource used in the whole DogStatsD
// pipeline to re-use metric sample slices: the server gets a slice and fills it
// with samples, the rest of the pipeline processes them, and the end of the line
// (the time sampler) puts the slice back in the pool.
// The main idea is to reduce the garbage generated by slice allocations.
func (d *AgentDemultiplexer) GetMetricSamplePool() *metrics.MetricSamplePool {
return d.statsd.metricSamplePool
}
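
// Illustrative sketch of the get/fill/submit cycle described above; the slice is
// handed back to the pool by the time sampler once it has been processed.
//
//	pool := demux.GetMetricSamplePool()
//	batch := pool.GetBatch()
//	batch[0] = metrics.MetricSample{Name: "my.metric", Value: 1, Mtype: metrics.GaugeType}
//	demux.AggregateSamples(TimeSamplerID(0), batch[:1])
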
// DumpDogstatsdContexts writes the current state of the context resolver to dest.
//
// This blocks metrics processing, so dest is expected to be reasonably fast and not block for too
// long.
func (d *AgentDemultiplexer) DumpDogstatsdContexts(dest io.Writer) error {
for _, w := range d.statsd.workers {
err := w.dumpContexts(dest)
if err != nil {
return err
}
}
return nil
}
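
// Illustrative sketch: dumping the contexts into an in-memory buffer (any
// io.Writer works, e.g. a file opened by a flare command).
//
//	var buf bytes.Buffer
//	if err := demux.DumpDogstatsdContexts(&buf); err != nil {
//		// handle the error
//	}
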
// GetSender returns a sender.Sender with the passed ID, properly registered with the aggregator.
// If no error is returned here, DestroySender must be called with the same ID
// once the sender is no longer used.
func (d *AgentDemultiplexer) GetSender(id checkid.ID) (sender.Sender, error) {
d.m.Lock()
defer d.m.Unlock()
if d.senders == nil {
return nil, errors.New("demultiplexer is stopped")
}
return d.senders.GetSender(id)
}
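
// Illustrative sketch of the sender lifecycle for a check; the Gauge and Commit
// calls assume the usual sender.Sender API.
//
//	s, err := demux.GetSender(checkID)
//	if err != nil {
//		return err
//	}
//	defer demux.DestroySender(checkID)
//	s.Gauge("my.check.metric", 1.0, "", []string{"env:dev"})
//	s.Commit()
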
// SetSender registers the passed sender under the passed ID.
// This is largely for testing purposes.
func (d *AgentDemultiplexer) SetSender(s sender.Sender, id checkid.ID) error {
d.m.Lock()
defer d.m.Unlock()
if d.senders == nil {
return errors.New("demultiplexer is stopped")
}
return d.senders.SetSender(s, id)
}
// DestroySender frees up the resources used by the sender with the passed ID (by deregistering it from the aggregator).
// It should be called when no sender with this ID is used anymore.
// Any metrics from this sender that haven't been flushed yet will be lost.
func (d *AgentDemultiplexer) DestroySender(id checkid.ID) {
d.m.Lock()
defer d.m.Unlock()
if d.senders == nil {
return
}
d.senders.DestroySender(id)
}
// GetDefaultSender returns a default sender.
func (d *AgentDemultiplexer) GetDefaultSender() (sender.Sender, error) {
d.m.Lock()
defer d.m.Unlock()
if d.senders == nil {
return nil, errors.New("demultiplexer is stopped")
}
return d.senders.GetDefaultSender()
}