-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
agent.go
228 lines (192 loc) · 7.17 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2024-present Datadog, Inc.
package logsagentpipelineimpl
import (
"context"
"errors"
"fmt"
"time"
configComponent "github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/hostname/hostnameinterface"
logComponent "github.com/DataDog/datadog-agent/comp/core/log"
"github.com/DataDog/datadog-agent/comp/logs/agent/config"
"github.com/DataDog/datadog-agent/comp/otelcol/logsagentpipeline"
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
"github.com/DataDog/datadog-agent/pkg/logs/auditor"
"github.com/DataDog/datadog-agent/pkg/logs/client"
"github.com/DataDog/datadog-agent/pkg/logs/client/http"
"github.com/DataDog/datadog-agent/pkg/logs/diagnostic"
"github.com/DataDog/datadog-agent/pkg/logs/pipeline"
"github.com/DataDog/datadog-agent/pkg/status/health"
"github.com/DataDog/datadog-agent/pkg/util/optional"
"github.com/DataDog/datadog-agent/pkg/util/startstop"
"go.uber.org/fx"
"go.uber.org/zap"
)
const (
	// intakeTrackType is the track type used when building logs intake endpoints.
	intakeTrackType = "logs"

	// Log messages
	// multiLineWarning is emitted when global processing rules contain a
	// multi_line rule, which is only supported per-source, not globally.
	multiLineWarning = "multi_line processing rules are not supported as global processing rules."
)
// Dependencies specifies the list of dependencies needed to initialize the logs agent
type Dependencies struct {
	fx.In

	Lc       fx.Lifecycle             // optional fx lifecycle used to hook Start/Stop; may be nil
	Log      logComponent.Component   // logger component
	Config   configComponent.Component // agent configuration reader
	Hostname hostnameinterface.Component // hostname resolution component
}
// Agent represents the data pipeline that collects, decodes, processes and sends logs to the backend.
type Agent struct {
	log      logComponent.Component      // logger used throughout the agent lifecycle
	config   pkgconfigmodel.Reader       // configuration reader (logs_config.* keys)
	hostname hostnameinterface.Component // hostname provider passed to the pipeline

	// The fields below are populated lazily by Start/SetupPipeline, not by the constructor.
	endpoints        *config.Endpoints          // resolved intake endpoints
	auditor          auditor.Auditor            // tracks delivery offsets; end of the pipeline
	destinationsCtx  *client.DestinationsContext // shared context for all destinations
	pipelineProvider pipeline.Provider          // provides processor/sender pairs
	health           *health.Handle             // liveness handle registered as "logs-agent"
}
// NewLogsAgentComponent returns a new instance of Agent as a Component.
// The option is empty when the logs agent is disabled by configuration.
func NewLogsAgentComponent(deps Dependencies) optional.Option[logsagentpipeline.Component] {
	if agent := NewLogsAgent(deps); agent != nil {
		return optional.NewOption[logsagentpipeline.Component](agent)
	}
	return optional.NewNoneOption[logsagentpipeline.Component]()
}
// NewLogsAgent returns a new instance of Agent with the given dependencies.
// It returns nil when neither "logs_enabled" nor the deprecated "log_enabled"
// setting is true.
func NewLogsAgent(deps Dependencies) logsagentpipeline.LogsAgent {
	enabled := deps.Config.GetBool("logs_enabled")
	legacyEnabled := deps.Config.GetBool("log_enabled")

	// Disabled path first: neither setting is on.
	if !enabled && !legacyEnabled {
		deps.Log.Debug("logs-agent disabled")
		return nil
	}

	if legacyEnabled {
		deps.Log.Warn(`"log_enabled" is deprecated, use "logs_enabled" instead`)
	}

	agent := &Agent{
		log:      deps.Log,
		config:   deps.Config,
		hostname: deps.Hostname,
	}

	// Wire the agent into the fx lifecycle when one is provided.
	if deps.Lc != nil {
		deps.Lc.Append(fx.Hook{
			OnStart: agent.Start,
			OnStop:  agent.Stop,
		})
	}

	return agent
}
// Start sets up the logs agent and starts its pipelines.
// It builds the intake endpoints from configuration, initializes the pipeline
// components, and starts them in data-loss-safe order.
func (a *Agent) Start(context.Context) error {
	a.log.Debug("Starting logs-agent...")

	// setup the server config
	endpoints, err := buildEndpoints(a.config)
	if err != nil {
		// Wrap with %w (rather than errors.New of a formatted string) so the
		// underlying cause stays inspectable via errors.Is/errors.As.
		return fmt.Errorf("invalid endpoints: %w", err)
	}
	a.endpoints = endpoints

	err = a.setupAgent()
	if err != nil {
		a.log.Error("Could not start logs-agent: ", zap.Error(err))
		return err
	}

	a.startPipeline()
	a.log.Debug("logs-agent started")

	return nil
}
// setupAgent validates the global processing rules from configuration and
// initializes the pipeline. It warns (but does not fail) when a multi_line
// rule is present globally, since those are not supported as global rules.
func (a *Agent) setupAgent() error {
	// setup global processing rules
	processingRules, err := config.GlobalProcessingRules(a.config)
	if err != nil {
		// Wrap with %w (rather than errors.New of a formatted string) so the
		// underlying cause stays inspectable via errors.Is/errors.As.
		return fmt.Errorf("invalid processing rules: %w", err)
	}

	if config.HasMultiLineRule(processingRules) {
		a.log.Warn(multiLineWarning)
	}

	a.SetupPipeline(processingRules)
	return nil
}
// startPipeline starts all the elements of the data pipeline in the right
// order to prevent data loss: destinations context first, then the auditor,
// then the pipelines that feed it.
func (a *Agent) startPipeline() {
	startstop.NewStarter(a.destinationsCtx, a.auditor, a.pipelineProvider).Start()
}
// Stop stops the logs agent and all elements of the data pipeline
func (a *Agent) Stop(context.Context) error {
	a.log.Debug("Stopping logs-agent")

	// Stop in the reverse order of startPipeline: pipelines first so no new
	// messages enter, then the auditor, then the destinations context.
	stopper := startstop.NewSerialStopper(
		a.pipelineProvider,
		a.auditor,
		a.destinationsCtx,
	)

	// This will try to stop everything in order, including the potentially blocking
	// parts like the sender. After StopTimeout it will just stop the last part of the
	// pipeline, disconnecting it from the auditor, to make sure that the pipeline is
	// flushed before stopping.
	// TODO: Add this feature in the stopper.
	c := make(chan struct{})
	go func() {
		stopper.Stop()
		close(c) // signals that the graceful stop completed
	}()

	// Grace period comes from logs_config.stop_grace_period (seconds).
	timeout := time.Duration(a.config.GetInt("logs_config.stop_grace_period")) * time.Second
	select {
	case <-c:
	case <-time.After(timeout):
		a.log.Debug("Timed out when stopping logs-agent, forcing it to stop now")
		// We force all destinations to read/flush all the messages they get without
		// trying to write to the network.
		a.destinationsCtx.Stop()
		// Wait again for the stopper to complete.
		// In some situation, the stopper unfortunately never succeed to complete,
		// we've already reached the grace period, give it some more seconds and
		// then force quit.
		timeout := time.NewTimer(5 * time.Second) // second, fixed-length grace window
		select {
		case <-c:
		case <-timeout.C:
			a.log.Warn("Force close of the Logs Agent.")
		}
	}
	a.log.Debug("logs-agent stopped")
	return nil
}
// GetPipelineProvider gets the pipeline provider.
// Note: it is nil until SetupPipeline has run (i.e. before Start completes).
func (a *Agent) GetPipelineProvider() pipeline.Provider {
	return a.pipelineProvider
}
// SetupPipeline initializes the logs agent pipeline and its dependencies:
// liveness handle, auditor, destinations context, and the pipeline provider.
func (a *Agent) SetupPipeline(
	processingRules []*config.ProcessingRule,
) {
	healthHandle := health.RegisterLiveness("logs-agent")

	// setup the auditor
	// We pass the health handle to the auditor because it's the end of the pipeline and the most
	// critical part. Arguably it could also be plugged to the destination.
	registryTTL := time.Duration(a.config.GetInt("logs_config.auditor_ttl")) * time.Hour
	runPath := a.config.GetString("logs_config.run_path")
	logsAuditor := auditor.New(runPath, auditor.DefaultRegistryFilename, registryTTL, healthHandle)

	destCtx := client.NewDestinationsContext()

	// setup the pipeline provider that provides pairs of processor and sender
	provider := pipeline.NewProvider(config.NumberOfPipelines, logsAuditor, &diagnostic.NoopMessageReceiver{}, processingRules, a.endpoints, destCtx, NewStatusProvider(), a.hostname, a.config)

	a.auditor = logsAuditor
	a.destinationsCtx = destCtx
	a.pipelineProvider = provider
	a.health = healthHandle
}
// buildEndpoints builds endpoints for the logs agent.
// HTTP connectivity is probed against the main HTTP endpoint when one can be
// built; otherwise it is treated as a connectivity failure, which lets
// BuildEndpoints fall back appropriately.
func buildEndpoints(coreConfig pkgconfigmodel.Reader) (*config.Endpoints, error) {
	connectivity := config.HTTPConnectivityFailure
	httpEndpoints, err := config.BuildHTTPEndpoints(coreConfig, intakeTrackType, config.AgentJSONIntakeProtocol, config.DefaultIntakeOrigin)
	if err == nil {
		connectivity = http.CheckConnectivity(httpEndpoints.Main, coreConfig)
	}
	return config.BuildEndpoints(coreConfig, connectivity, intakeTrackType, config.AgentJSONIntakeProtocol, config.DefaultIntakeOrigin)
}