forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 4
/
process.go
130 lines (113 loc) · 3.3 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// +build darwin freebsd linux windows
package process
import (
"fmt"
"runtime"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/system"
"github.com/elastic/gosigar/cgroup"
"github.com/pkg/errors"
)
var debugf = logp.MakeDebug("system-process")
func init() {
if err := mb.Registry.AddMetricSet("system", "process", New, parse.EmptyHostParser); err != nil {
panic(err)
}
}
// MetricSet that fetches process metrics.
type MetricSet struct {
mb.BaseMetricSet
stats *ProcStats
cgroup *cgroup.Reader
cacheCmdLine bool
}
// includeTopConfig is the configuration for the "top N processes
// filtering" feature
type includeTopConfig struct {
Enabled bool `config:"enabled"`
ByCPU int `config:"by_cpu"`
ByMemory int `config:"by_memory"`
}
// New creates and returns a new MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := struct {
Procs []string `config:"processes"`
Cgroups *bool `config:"process.cgroups.enabled"`
EnvWhitelist []string `config:"process.env.whitelist"`
CPUTicks bool `config:"cpu_ticks"`
CacheCmdLine bool `config:"process.cmdline.cache.enabled"`
IncludeTop includeTopConfig `config:"process.include_top_n"`
}{
Procs: []string{".*"}, // collect all processes by default
CacheCmdLine: true,
IncludeTop: includeTopConfig{
Enabled: true,
ByCPU: 0,
ByMemory: 0,
},
}
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}
m := &MetricSet{
BaseMetricSet: base,
stats: &ProcStats{
Procs: config.Procs,
EnvWhitelist: config.EnvWhitelist,
CpuTicks: config.CPUTicks,
CacheCmdLine: config.CacheCmdLine,
IncludeTop: config.IncludeTop,
},
}
err := m.stats.InitProcStats()
if err != nil {
return nil, err
}
if runtime.GOOS == "linux" {
systemModule, ok := base.Module().(*system.Module)
if !ok {
return nil, fmt.Errorf("unexpected module type")
}
if config.Cgroups == nil || *config.Cgroups {
debugf("process cgroup data collection is enabled, using hostfs='%v'", systemModule.HostFS)
m.cgroup, err = cgroup.NewReader(systemModule.HostFS, true)
if err != nil {
if err == cgroup.ErrCgroupsMissing {
logp.Warn("cgroup data collection will be disabled: %v", err)
} else {
return nil, errors.Wrap(err, "error initializing cgroup reader")
}
}
}
}
return m, nil
}
// Fetch fetches metrics for all processes. It iterates over each PID and
// collects process metadata, CPU metrics, and memory metrics.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
procs, err := m.stats.GetProcStats()
if err != nil {
return nil, errors.Wrap(err, "process stats")
}
if m.cgroup != nil {
for _, proc := range procs {
pid, ok := proc["pid"].(int)
if !ok {
debugf("error converting pid to int for proc %+v", proc)
continue
}
stats, err := m.cgroup.GetStatsForProcess(pid)
if err != nil {
debugf("error getting cgroups stats for pid=%d, %v", pid, err)
continue
}
if statsMap := cgroupStatsToMap(stats); statsMap != nil {
proc["cgroup"] = statsMap
}
}
}
return procs, err
}