forked from DataDog/datadog-agent
/
process_rt.go
191 lines (168 loc) · 6.33 KB
/
process_rt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package checks
import (
"time"
model "github.com/DataDog/agent-payload/v5/process"
"github.com/StackVista/stackstate-agent/pkg/process/config"
"github.com/StackVista/stackstate-agent/pkg/process/net"
"github.com/StackVista/stackstate-agent/pkg/process/procutil"
"github.com/StackVista/stackstate-agent/pkg/process/util"
"github.com/StackVista/stackstate-agent/pkg/util/containers"
"github.com/StackVista/stackstate-agent/pkg/util/log"
"github.com/DataDog/gopsutil/cpu"
)
// runRealtime executes one pass of the realtime ProcessCheck and reports
// per-process and per-container statistics computed against the previous pass.
// The underlying procutil.Probe is responsible for gathering the raw stats.
//
// An empty RunResult is returned when p.lastPIDs is empty, and on the first
// pass (p.realtimeLastProcs is nil), where the collected sample is only stored
// as the baseline for the next pass. An error is returned when the host CPU
// times or the per-PID stats cannot be read.
func (p *ProcessCheck) runRealtime(cfg *config.AgentConfig, groupID int32) (*RunResult, error) {
	cpuTimes, err := cpu.Times(false)
	if err != nil {
		return nil, err
	}
	if len(cpuTimes) == 0 {
		return nil, errEmptyCPUTime
	}

	// Without any recorded PIDs there is nothing to sample yet.
	if len(p.lastPIDs) == 0 {
		return &RunResult{}, nil
	}

	var sysProbeUtil *net.RemoteSysProbeUtil
	if cfg.CheckIsEnabled(config.ProcessModuleCheckName) {
		// The Process module is enabled: the probe runs without elevated
		// permission and system-probe is asked for the remaining fields.
		procutil.WithPermission(false)(p.probe)
		pu, puErr := net.GetRemoteSystemProbeUtil()
		switch {
		case puErr == nil:
			sysProbeUtil = pu
		case p.notInitializedLogLimit.ShouldLog():
			log.Warnf("could not initialize system-probe connection in rtprocess check: %v (will only log every 10 minutes)", puErr)
		}
	} else {
		// The Process module is disabled: let the probe collect, on a
		// best-effort basis, the fields that need elevated permission.
		procutil.WithPermission(true)(p.probe)
	}

	procs, err := p.probe.StatsForPIDs(p.lastPIDs, time.Now())
	if err != nil {
		return nil, err
	}

	if sysProbeUtil != nil {
		mergeStatWithSysprobeStats(p.lastPIDs, procs, sysProbeUtil)
	}

	// The error from GetContainers is discarded; ctrList is used as returned.
	ctrList, _ := util.GetContainers()

	// On the very first pass there is no previous sample to diff against, so
	// only record the baseline and report nothing.
	if p.realtimeLastProcs == nil {
		p.realtimeLastCtrRates = util.ExtractContainerRateMetric(ctrList)
		p.realtimeLastProcs = procs
		p.realtimeLastCPUTime = cpuTimes[0]
		p.realtimeLastRun = time.Now()
		log.Debug("first run of rtprocess check - no stats to report")
		return &RunResult{}, nil
	}

	connsByPID := Connections.getLastConnectionsByPID()

	chunkedStats := fmtProcessStats(cfg, procs, p.realtimeLastProcs, ctrList, cpuTimes[0], p.realtimeLastCPUTime, p.realtimeLastRun, connsByPID)
	groupSize := len(chunkedStats)
	// Container stats are requested with the same group size as the process
	// stats, so both slices are indexed together below.
	chunkedCtrStats := fmtContainerStats(ctrList, p.realtimeLastCtrRates, p.realtimeLastRun, groupSize)

	messages := make([]model.MessageBody, 0, groupSize)
	for i, statsChunk := range chunkedStats {
		messages = append(messages, &model.CollectorRealTime{
			HostName:          cfg.HostName,
			Stats:             statsChunk,
			ContainerStats:    chunkedCtrStats[i],
			GroupId:           groupID,
			GroupSize:         int32(groupSize),
			NumCpus:           int32(len(p.sysInfo.Cpus)),
			TotalMemory:       p.sysInfo.TotalMemory,
			ContainerHostType: cfg.ContainerHostType,
		})
	}

	// Remember this pass as the baseline for the next one.
	// Note: the unfiltered procs map is stored, so processes seen for the
	// first time in this pass can be reported in the next one.
	p.realtimeLastRun = time.Now()
	p.realtimeLastProcs = procs
	p.realtimeLastCtrRates = util.ExtractContainerRateMetric(ctrList)
	p.realtimeLastCPUTime = cpuTimes[0]

	return &RunResult{RealTime: messages}, nil
}
// fmtProcessStats converts the current per-PID stats into model.ProcessStat
// entries and groups them into chunks; a chunk is closed as soon as it holds
// cfg.MaxPerMessage entries, and a trailing partial chunk is kept.
//
// Only PIDs present in both procs (current run) and lastProcs (previous run)
// are emitted. ctrList is used to map each PID to its container ID; syst2 and
// syst1 are handed to formatCPU, lastRun to formatIO, and connsByPID to
// formatNetworks. Chunk contents follow map iteration order over procs.
func fmtProcessStats(
	cfg *config.AgentConfig,
	procs, lastProcs map[int32]*procutil.Stats,
	ctrList []*containers.Container,
	syst2, syst1 cpu.TimesStat,
	lastRun time.Time,
	connsByPID map[int32][]*model.Connection,
) [][]*model.ProcessStat {
	// Reverse index: PID -> ID of the container that lists it.
	containerIDs := make(map[int32]string, len(ctrList))
	for _, ctr := range ctrList {
		for _, ctrPID := range ctr.Pids {
			containerIDs[ctrPID] = ctr.ID
		}
	}

	connIntervalSecs := int(cfg.CheckIntervals[config.ConnectionsCheckName] / time.Second)

	result := make([][]*model.ProcessStat, 0)
	current := make([]*model.ProcessStat, 0, cfg.MaxPerMessage)

	for pid, cur := range procs {
		// A process needs a previous sample to be reported, so anything that
		// was not present in the previous run is skipped.
		prev, seen := lastProcs[pid]
		if !seen {
			continue
		}

		// Use the rates carried on the stats when available; otherwise fall
		// back to formatIO with the previous I/O counters.
		var ioStat *model.IOStat
		if cur.IORateStat == nil {
			ioStat = formatIO(cur, prev.IOStat, lastRun)
		} else {
			ioStat = &model.IOStat{
				ReadRate:       float32(cur.IORateStat.ReadRate),
				WriteRate:      float32(cur.IORateStat.WriteRate),
				ReadBytesRate:  float32(cur.IORateStat.ReadBytesRate),
				WriteBytesRate: float32(cur.IORateStat.WriteBytesRate),
			}
		}

		current = append(current, &model.ProcessStat{
			Pid:                    pid,
			CreateTime:             cur.CreateTime,
			Memory:                 formatMemory(cur),
			Cpu:                    formatCPU(cur, prev, syst2, syst1),
			Nice:                   cur.Nice,
			Threads:                cur.NumThreads,
			OpenFdCount:            cur.OpenFdCount,
			ProcessState:           model.ProcessState(model.ProcessState_value[cur.Status]),
			IoStat:                 ioStat,
			VoluntaryCtxSwitches:   uint64(cur.CtxSwitches.Voluntary),
			InvoluntaryCtxSwitches: uint64(cur.CtxSwitches.Involuntary),
			ContainerId:            containerIDs[pid],
			Networks:               formatNetworks(connsByPID[pid], connIntervalSecs),
		})

		// Close the chunk once it is full and start a fresh one.
		if len(current) == cfg.MaxPerMessage {
			result = append(result, current)
			current = make([]*model.ProcessStat, 0, cfg.MaxPerMessage)
		}
	}

	// Keep whatever is left in the last, partially filled chunk.
	if len(current) > 0 {
		result = append(result, current)
	}
	return result
}
// calculateRate returns the per-second increase from prev to cur over the
// whole seconds elapsed between before and the current time.
//
// It returns 0 when before is the zero time, when no whole second has elapsed
// (or before lies in the future), when prev is 0, or when prev is greater
// than cur.
func calculateRate(cur, prev uint64, before time.Time) float32 {
	if before.IsZero() {
		return 0
	}
	if prev == 0 || cur < prev {
		return 0
	}

	// Elapsed time is measured at whole-second granularity.
	elapsedSecs := time.Now().Unix() - before.Unix()
	if elapsedSecs <= 0 {
		return 0
	}
	return float32(cur-prev) / float32(elapsedSecs)
}
// mergeStatWithSysprobeStats asks system-probe for the stats of the given
// pids and, for every entry of the stats map that system-probe reported on,
// overwrites the open file descriptor count and the I/O counters in place.
//
// If the system-probe request fails, the error is logged at debug level and
// the map is left untouched.
func mergeStatWithSysprobeStats(pids []int32, stats map[int32]*procutil.Stats, pu *net.RemoteSysProbeUtil) {
	probeStats, err := pu.GetProcStats(pids)
	if err != nil {
		log.Debugf("cannot do GetProcStats from system-probe for rtprocess check: %s", err)
		return
	}

	for pid, procStat := range stats {
		fromProbe, found := probeStats.StatsByPID[pid]
		if !found {
			// system-probe has nothing for this PID; keep the existing values.
			continue
		}
		procStat.OpenFdCount = fromProbe.OpenFDCount
		procStat.IOStat.ReadCount = fromProbe.ReadCount
		procStat.IOStat.WriteCount = fromProbe.WriteCount
		procStat.IOStat.ReadBytes = fromProbe.ReadBytes
		procStat.IOStat.WriteBytes = fromProbe.WriteBytes
	}
}