-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
pipeline.go
136 lines (117 loc) · 5.26 KB
/
pipeline.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
//nolint:revive // TODO(AML) Fix revive linter
package pipeline
import (
"context"
"fmt"
"github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface"
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/logs/client"
"github.com/DataDog/datadog-agent/pkg/logs/client/http"
"github.com/DataDog/datadog-agent/pkg/logs/client/tcp"
"github.com/DataDog/datadog-agent/pkg/logs/diagnostic"
"github.com/DataDog/datadog-agent/pkg/logs/message"
"github.com/DataDog/datadog-agent/pkg/logs/processor"
"github.com/DataDog/datadog-agent/pkg/logs/sender"
"github.com/DataDog/datadog-agent/pkg/logs/status/statusinterface"
)
// Pipeline processes and sends messages to the backend
type Pipeline struct {
	// InputChan receives the messages to be processed; producers write here.
	InputChan chan *message.Message
	// flushChan signals the strategy to flush any buffered payloads (see Flush).
	flushChan chan struct{}
	// processor applies processing rules and encodes messages before batching.
	processor *processor.Processor
	// strategy batches (HTTP/serverless) or streams (TCP) encoded messages into payloads.
	strategy sender.Strategy
	// sender ships payloads to the configured destinations.
	sender *sender.Sender
}
// NewPipeline returns a new Pipeline wired as processor -> strategy -> sender.
//
// The encoder is chosen from the endpoint configuration: JSON for serverless
// and HTTP, protobuf when UseProto is set, raw otherwise. The returned
// pipeline is not started; call Start to launch its components.
func NewPipeline(outputChan chan *message.Payload,
	processingRules []*config.ProcessingRule,
	endpoints *config.Endpoints,
	destinationsContext *client.DestinationsContext,
	diagnosticMessageReceiver diagnostic.MessageReceiver,
	serverless bool,
	pipelineID int,
	status statusinterface.Status,
	hostname hostnameinterface.Component,
	cfg pkgconfigmodel.Reader) *Pipeline {
	mainDestinations := getDestinations(endpoints, destinationsContext, pipelineID, serverless, status, cfg)
	strategyInput := make(chan *message.Message, config.ChanSize)
	senderInput := make(chan *message.Payload, 1) // Only buffer 1 message since payloads can be large
	flushChan := make(chan struct{})
	var encoder processor.Encoder
	if serverless {
		encoder = processor.JSONServerlessEncoder
	} else if endpoints.UseHTTP {
		encoder = processor.JSONEncoder
	} else if endpoints.UseProto {
		encoder = processor.ProtoEncoder
	} else {
		encoder = processor.RawEncoder
	}
	strategy := getStrategy(strategyInput, senderInput, flushChan, endpoints, serverless, pipelineID)
	logsSender := sender.NewSender(cfg, senderInput, outputChan, mainDestinations, config.DestinationPayloadChanSize)
	inputChan := make(chan *message.Message, config.ChanSize)
	// Name the local msgProcessor (not "processor") so it does not shadow the
	// imported processor package.
	msgProcessor := processor.New(inputChan, strategyInput, processingRules, encoder, diagnosticMessageReceiver, hostname, pipelineID)
	return &Pipeline{
		InputChan: inputChan,
		flushChan: flushChan,
		processor: msgProcessor,
		strategy:  strategy,
		sender:    logsSender,
	}
}
// Start launches the pipeline
func (p *Pipeline) Start() {
	// Start downstream-first (sender, then strategy, then processor) so each
	// stage has a running consumer before messages can reach it.
	p.sender.Start()
	p.strategy.Start()
	p.processor.Start()
}
// Stop stops the pipeline
func (p *Pipeline) Stop() {
	// Stop upstream-first (processor, then strategy, then sender) — the
	// reverse of Start — so in-flight messages can drain through each stage.
	p.processor.Stop()
	p.strategy.Stop()
	p.sender.Stop()
}
// Flush flushes synchronously the processor and sender managed by this pipeline.
func (p *Pipeline) Flush(ctx context.Context) {
	// Signal the strategy to flush any partially-filled batch, then flush the
	// processor so buffered messages are pushed into the sender.
	p.flushChan <- struct{}{}
	p.processor.Flush(ctx) // flush messages in the processor into the sender
}
// getDestinations builds the reliable and unreliable destination sets for one
// pipeline. HTTP endpoints get per-endpoint telemetry names; otherwise TCP
// destinations are created. Retries are disabled for serverless and for all
// unreliable endpoints.
func getDestinations(endpoints *config.Endpoints, destinationsContext *client.DestinationsContext, pipelineID int, serverless bool, status statusinterface.Status, cfg pkgconfigmodel.Reader) *client.Destinations {
	reliableDests := []client.Destination{}
	unreliableDests := []client.Destination{}

	if endpoints.UseHTTP {
		for idx, endpoint := range endpoints.GetReliableEndpoints() {
			name := fmt.Sprintf("logs_%d_reliable_%d", pipelineID, idx)
			reliableDests = append(reliableDests, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, !serverless, name, cfg))
		}
		for idx, endpoint := range endpoints.GetUnReliableEndpoints() {
			name := fmt.Sprintf("logs_%d_unreliable_%d", pipelineID, idx)
			unreliableDests = append(unreliableDests, http.NewDestination(endpoint, http.JSONContentType, destinationsContext, endpoints.BatchMaxConcurrentSend, false, name, cfg))
		}
		return client.NewDestinations(reliableDests, unreliableDests)
	}

	for _, endpoint := range endpoints.GetReliableEndpoints() {
		reliableDests = append(reliableDests, tcp.NewDestination(endpoint, endpoints.UseProto, destinationsContext, !serverless, status))
	}
	for _, endpoint := range endpoints.GetUnReliableEndpoints() {
		unreliableDests = append(unreliableDests, tcp.NewDestination(endpoint, endpoints.UseProto, destinationsContext, false, status))
	}
	return client.NewDestinations(reliableDests, unreliableDests)
}
//nolint:revive // TODO(AML) Fix revive linter
// getStrategy picks the payload strategy: a stream strategy for plain TCP
// pipelines, or a batch strategy (with optional gzip compression of the main
// endpoint) for HTTP and serverless pipelines.
func getStrategy(inputChan chan *message.Message, outputChan chan *message.Payload, flushChan chan struct{}, endpoints *config.Endpoints, serverless bool, pipelineID int) sender.Strategy {
	// TCP, non-serverless: no batching, stream messages through as-is.
	if !endpoints.UseHTTP && !serverless {
		return sender.NewStreamStrategy(inputChan, outputChan, sender.IdentityContentType)
	}
	contentEncoding := sender.IdentityContentType
	if endpoints.Main.UseCompression {
		contentEncoding = sender.NewGzipContentEncoding(endpoints.Main.CompressionLevel)
	}
	return sender.NewBatchStrategy(inputChan, outputChan, flushChan, sender.ArraySerializer, endpoints.BatchWait, endpoints.BatchMaxSize, endpoints.BatchMaxContentSize, "logs", contentEncoding)
}