/
process_discovery_check.go
172 lines (140 loc) · 5.7 KB
/
process_discovery_check.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.
package checks
import (
"fmt"
"time"
ddconfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/util/flavor"
"github.com/DataDog/datadog-agent/pkg/util/log"
model "github.com/DataDog/agent-payload/v5/process"
"github.com/DataDog/datadog-agent/pkg/process/procutil"
)
// NewProcessDiscoveryCheck builds a ProcessDiscoveryCheck around the given
// configuration reader. The returned check carries a default data scrubber and
// a user lookup probe created from the same configuration; the remaining
// fields are filled in later by Init.
func NewProcessDiscoveryCheck(config ddconfig.Reader) *ProcessDiscoveryCheck {
	check := new(ProcessDiscoveryCheck)
	check.config = config
	check.scrubber = procutil.NewDefaultDataScrubber()
	check.userProbe = NewLookupIDProbe(config)
	return check
}
// ProcessDiscoveryCheck is a check that gathers basic process metadata.
// It uses its own ProcessDiscovery payload.
// The goal of this check is to collect information about possible integrations that may be enabled by the end user.
//
// Instances are created with NewProcessDiscoveryCheck; Init must be called
// before Run.
type ProcessDiscoveryCheck struct {
	// config is the configuration reader supplied to the constructor. It is
	// consulted by IsEnabled and handed to the helpers called from Init.
	config ddconfig.Reader
	// probe lists the processes inspected by Run. It is created in Init.
	probe procutil.Probe
	// scrubber rewrites process command lines before they are put in a payload.
	scrubber *procutil.DataScrubber
	// userProbe is passed to formatUser when building each process entry.
	userProbe *LookupIdProbe
	// info is the host information handed to Init; Run reads the host name and
	// system info from it.
	info *HostInfo
	// initCalled is set by Init and checked by Run, which fails when it is false.
	initCalled bool
	// maxBatchSize is the chunk size used by Run when splitting the process
	// list into messages. It is read from the configuration in Init.
	maxBatchSize int
}
// Init prepares the ProcessDiscoveryCheck for use: it records the host info,
// marks the check as initialized, configures the scrubber, creates the process
// probe with a permission option taken from syscfg.ProcessModuleEnabled, and
// reads the maximum batch size from the configuration.
//
// The third argument is ignored. Init always returns nil. It is a runtime
// error to call Run without first having called Init.
func (d *ProcessDiscoveryCheck) Init(syscfg *SysProbeConfig, info *HostInfo, _ bool) error {
	d.info, d.initCalled = info, true
	initScrubber(d.config, d.scrubber)

	permissionOpt := procutil.WithPermission(syscfg.ProcessModuleEnabled)
	d.probe = newProcessProbe(d.config, permissionOpt)

	d.maxBatchSize = getMaxBatchSize(d.config)
	return nil
}
// IsEnabled reports whether the check is enabled by configuration.
//
// It returns false when process checks are configured to run in the core agent
// while the current flavor is the process agent, when process collection is
// enabled, or when running on ECS Fargate. Otherwise the result is the value
// of "process_config.process_discovery.enabled".
func (d *ProcessDiscoveryCheck) IsEnabled() bool {
	switch {
	case d.config.GetBool("process_config.run_in_core_agent.enabled") && flavor.GetFlavor() == flavor.ProcessAgent:
		return false
	case d.config.GetBool("process_config.process_collection.enabled"):
		// The Process and Process Discovery checks are mutually exclusive.
		return false
	case ddconfig.IsECSFargate():
		log.Debug("Process discovery is not supported on ECS Fargate")
		return false
	default:
		return d.config.GetBool("process_config.process_discovery.enabled")
	}
}
// SupportsRunOptions reports whether the check supports RunOptions; this check
// never does, so the result is always false.
func (d *ProcessDiscoveryCheck) SupportsRunOptions() bool { return false }
// Name reports the identifier of the ProcessDiscoveryCheck, which is the
// package-level DiscoveryCheckName value.
func (d *ProcessDiscoveryCheck) Name() string {
	return DiscoveryCheckName
}
// Realtime says whether this check should be run in real time; it always
// answers false.
func (d *ProcessDiscoveryCheck) Realtime() bool {
	return false
}
// ShouldSaveLastRun says whether the output of the most recent run should be
// kept for use in flares; it always answers true.
func (d *ProcessDiscoveryCheck) ShouldSaveLastRun() bool {
	return true
}
// Run collects process metadata and packages it into CollectorProcDiscovery
// messages to be sent.
//
// The process list is fetched from the probe without stats, converted to
// ProcessDiscovery entries, and split into chunks according to maxBatchSize.
// One message is produced per chunk; all messages share the group ID obtained
// from a single call to nextGroupID, the same host description, and a group
// size equal to the number of chunks. The RunOptions argument is ignored.
//
// An error is returned when Init has not been called or when the probe fails.
func (d *ProcessDiscoveryCheck) Run(nextGroupID func() int32, _ *RunOptions) (RunResult, error) {
	if !d.initCalled {
		return nil, fmt.Errorf("ProcessDiscoveryCheck.Run called before Init")
	}

	// Only metadata is needed, so stats collection is switched off.
	pidMap, err := d.probe.ProcessesByPID(time.Now(), false)
	if err != nil {
		return nil, err
	}

	hostInfo := &model.Host{
		Name:        d.info.HostName,
		NumCpus:     calculateNumCores(d.info.SystemInfo),
		TotalMemory: d.info.SystemInfo.TotalMemory,
	}

	discoveries := pidMapToProcDiscoveries(pidMap, d.userProbe, d.scrubber)
	batches := chunkProcessDiscoveries(discoveries, d.maxBatchSize)
	messages := make([]model.MessageBody, 0, len(batches))

	groupID := nextGroupID()
	groupSize := int32(len(batches))
	for _, batch := range batches {
		messages = append(messages, &model.CollectorProcDiscovery{
			HostName:           d.info.HostName,
			GroupId:            groupID,
			GroupSize:          groupSize,
			ProcessDiscoveries: batch,
			Host:               hostInfo,
		})
	}

	return StandardRunResult(messages), nil
}
// Cleanup is the hook for freeing resources held by the ProcessDiscoveryCheck
// before the agent exits. It currently does nothing.
func (d *ProcessDiscoveryCheck) Cleanup() {
}
// pidMapToProcDiscoveries converts every process in pidMap into a
// ProcessDiscovery entry and returns the entries as a slice with one element
// per map entry. The slice order follows map iteration and is therefore not
// stable between calls.
//
// As a side effect, each process has its Cmdline overwritten with the scrubbed
// command line before the entry is built.
func pidMapToProcDiscoveries(pidMap map[int32]*procutil.Process, userProbe *LookupIdProbe, scrubber *procutil.DataScrubber) []*model.ProcessDiscovery {
	discoveries := make([]*model.ProcessDiscovery, len(pidMap))
	next := 0
	for _, p := range pidMap {
		p.Cmdline = scrubber.ScrubProcessCommand(p)
		entry := &model.ProcessDiscovery{
			Pid:        p.Pid,
			NsPid:      p.NsPid,
			Command:    formatCommand(p),
			User:       formatUser(p, userProbe),
			CreateTime: p.Stats.CreateTime,
		}
		discoveries[next] = entry
		next++
	}
	return discoveries
}
// chunkProcessDiscoveries splits procs into consecutive chunks of at most size
// elements and returns the list of chunks. Each chunk is a sub-slice of procs,
// so the chunks share the input's backing array instead of copying it.
//
// An empty input yields an empty, non-nil list. A non-positive size is treated
// as "no batching" and everything is returned in a single chunk; without that
// guard a size of 0 panics with an integer divide by zero and a negative size
// panics when allocating the result with a negative capacity.
//
// The original author noted that this duplicates another chunkProcesses
// function and that generics would let the two be merged.
func chunkProcessDiscoveries(procs []*model.ProcessDiscovery, size int) [][]*model.ProcessDiscovery {
	total := len(procs)
	if total == 0 {
		// Handled first so the division below never sees size == total == 0.
		return make([][]*model.ProcessDiscovery, 0)
	}
	if size <= 0 {
		size = total
	}

	chunkCount := total / size
	if total%size != 0 {
		// The last chunk is partial.
		chunkCount++
	}

	chunks := make([][]*model.ProcessDiscovery, 0, chunkCount)
	for start := 0; start < total; start += size {
		end := start + size
		if end > total {
			end = total
		}
		chunks = append(chunks, procs[start:end])
	}
	return chunks
}
// calculateNumCores adds up the Cores value of every entry in info.Cpus and
// returns the total. It is needed to calculate the correct normalized cpu
// metric value.
// On linux, the cpu array contains an entry per logical core.
// On windows, the cpu array contains an entry per physical core, with correct logical core counts.
func calculateNumCores(info *model.SystemInfo) (numCores int32) {
	cpus := info.Cpus
	for i := range cpus {
		numCores += cpus[i].Cores
	}
	return numCores
}