forked from TencentBlueKing/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
process.go
109 lines (93 loc) · 2.63 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// +build darwin freebsd linux windows
package process
import (
"fmt"
"runtime"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
"github.com/elastic/beats/metricbeat/module/system"
"github.com/elastic/gosigar/cgroup"
"github.com/pkg/errors"
)
var debugf = logp.MakeDebug("system-process")
func init() {
if err := mb.Registry.AddMetricSet("system", "process", New, parse.EmptyHostParser); err != nil {
panic(err)
}
}
// MetricSet that fetches process metrics.
type MetricSet struct {
mb.BaseMetricSet
stats *ProcStats
cgroup *cgroup.Reader
}
// New creates and returns a new MetricSet.
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := struct {
Procs []string `config:"processes"`
Cgroups *bool `config:"process.cgroups.enabled"`
EnvWhitelist []string `config:"process.env.whitelist"`
}{
Procs: []string{".*"}, // collect all processes by default
}
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}
m := &MetricSet{
BaseMetricSet: base,
stats: &ProcStats{
Procs: config.Procs,
EnvWhitelist: config.EnvWhitelist,
},
}
err := m.stats.InitProcStats()
if err != nil {
return nil, err
}
if runtime.GOOS == "linux" {
systemModule, ok := base.Module().(*system.Module)
if !ok {
return nil, fmt.Errorf("unexpected module type")
}
if config.Cgroups == nil || *config.Cgroups {
debugf("process cgroup data collection is enabled, using hostfs='%v'", systemModule.HostFS)
m.cgroup, err = cgroup.NewReader(systemModule.HostFS, true)
if err != nil {
if err == cgroup.ErrCgroupsMissing {
logp.Warn("cgroup data collection will be disabled: %v", err)
} else {
return nil, errors.Wrap(err, "error initializing cgroup reader")
}
}
}
}
return m, nil
}
// Fetch fetches metrics for all processes. It iterates over each PID and
// collects process metadata, CPU metrics, and memory metrics.
func (m *MetricSet) Fetch() ([]common.MapStr, error) {
procs, err := m.stats.GetProcStats()
if err != nil {
return nil, errors.Wrap(err, "process stats")
}
if m.cgroup != nil {
for _, proc := range procs {
pid, ok := proc["pid"].(int)
if !ok {
debugf("error converting pid to int for proc %+v", proc)
continue
}
stats, err := m.cgroup.GetStatsForProcess(pid)
if err != nil {
debugf("error getting cgroups stats for pid=%d, %v", pid, err)
continue
}
if statsMap := cgroupStatsToMap(stats); statsMap != nil {
proc["cgroup"] = statsMap
}
}
}
return procs, err
}