/
pids.go
223 lines (177 loc) · 5.34 KB
/
pids.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package ebpfcommon
import (
"log/slog"
"sync"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/grafana/beyla/pkg/internal/exec"
"github.com/grafana/beyla/pkg/internal/request"
"github.com/grafana/beyla/pkg/internal/svc"
)
// PIDType tags a tracked PID with the kind of instrumentation it was
// registered for. PIDsFilter.CurrentPIDs uses it to select which entries
// to report.
type PIDType uint8

// Known PID types. Values start at 1 (iota + 1), so the zero value of
// PIDType does not match any of them.
// NOTE(review): presumably KProbes marks processes traced through kernel
// probes and Go marks processes traced through Go-specific probes — confirm
// against the callers of AllowPID.
const (
	PIDTypeKProbes PIDType = iota + 1
	PIDTypeGo
)
// activePids is an LRU cache, capped at 1024 entries, that maps a PID to the
// svc.ID computed for it by serviceInfo.
// The error returned by lru.New is discarded.
// NOTE(review): with a fixed positive size the constructor is not expected to
// fail — confirm against the golang-lru documentation.
var activePids, _ = lru.New[uint32, svc.ID](1024)
// Injectable functions (can be replaced in tests).

// readNamespace looks up the PID namespace of the given process, reading it
// from the /proc filesystem. The namespace is needed to decide whether traces
// must be matched against user-space (namespaced) or host-space PIDs.
var readNamespace = FindNamespace

// readNamespacePIDs returns the list of PIDs under which addPID registers a
// process.
// NOTE(review): presumably these are the PIDs of the process as seen from each
// nested PID namespace — confirm against FindNamespacedPids.
var readNamespacePIDs = FindNamespacedPids
// PIDInfo is the data stored by PIDsFilter for each tracked PID.
type PIDInfo struct {
	// service is the service identity copied into the spans of this PID.
	service svc.ID
	// pidType is the type the PID was registered with via AllowPID.
	pidType PIDType
}
// ServiceFilter decides which spans are forwarded, based on the PIDs that
// have been allowed or blocked, and annotates the forwarded spans with their
// service identity.
type ServiceFilter interface {
	// AllowPID starts tracking the given PID, associating it with the
	// given service and PID type.
	AllowPID(uint32, svc.ID, PIDType)
	// BlockPID stops tracking the given PID.
	BlockPID(uint32)
	// Filter returns the spans that are allowed to be forwarded, with
	// their ServiceID field set.
	Filter(inputSpans []request.Span) []request.Span
	// CurrentPIDs returns the tracked PIDs of the given type and their
	// services, grouped by namespace ID. Implementations that keep no
	// state may return nil.
	CurrentPIDs(PIDType) map[uint32]map[uint32]svc.ID
}
// PIDsFilter keeps a thread-safe registry of the PIDs whose traces are allowed
// to be forwarded. Its Filter method drops the request.Span instances whose
// PIDs are not in the allowed list.
type PIDsFilter struct {
	// log receives the error and debug messages of the filter.
	log *slog.Logger
	// current holds the tracked PIDs: the outer key is the PID namespace ID,
	// the inner key is a PID registered for that namespace.
	current map[uint32]map[uint32]PIDInfo
	// mux guards every access to current.
	mux *sync.RWMutex
}
// commonPIDsFilter is the process-wide PIDsFilter instance handed out by
// CommonPIDsFilter. It is nil until the first non-system-wide request.
var commonPIDsFilter *PIDsFilter

// commonLock serializes the lazy creation of commonPIDsFilter.
var commonLock sync.Mutex
// NewPIDsFilter returns a PIDsFilter that tracks no PIDs yet and reports its
// messages through the given logger.
func NewPIDsFilter(log *slog.Logger) *PIDsFilter {
	filter := new(PIDsFilter)
	filter.log = log
	filter.current = make(map[uint32]map[uint32]PIDInfo)
	filter.mux = new(sync.RWMutex)
	return filter
}
// CommonPIDsFilter returns the ServiceFilter to use for the current process.
// When systemWide is true, a fresh IdentityPidsFilter is returned on every
// call. Otherwise, the shared PIDsFilter instance is returned, and it is
// created on the first such call.
func CommonPIDsFilter(systemWide bool) ServiceFilter {
	commonLock.Lock()
	defer commonLock.Unlock()

	switch {
	case systemWide:
		return &IdentityPidsFilter{}
	case commonPIDsFilter == nil:
		// first non-system-wide request: build the shared instance
		commonPIDsFilter = NewPIDsFilter(slog.With("component", "ebpfCommon.CommonPIDsFilter"))
	}
	return commonPIDsFilter
}
// AllowPID registers the given PID, together with its service and PID type,
// so that its spans pass the filter. The registry is write-locked for the
// duration of the call.
func (pf *PIDsFilter) AllowPID(pid uint32, svc svc.ID, pidType PIDType) {
	guard := pf.mux
	guard.Lock()
	defer guard.Unlock()

	pf.addPID(pid, svc, pidType)
}
// BlockPID removes the given PID from the registry. The registry is
// write-locked for the duration of the call.
func (pf *PIDsFilter) BlockPID(pid uint32) {
	guard := pf.mux
	guard.Lock()
	defer guard.Unlock()

	pf.removePID(pid)
}
// CurrentPIDs returns a copy of the tracked PIDs whose type equals t, as a
// map of namespace ID to PID to service. Every tracked namespace gets an
// entry in the result, even when none of its PIDs matches t (the entry is
// then an empty map). The returned maps are not shared with the filter.
func (pf *PIDsFilter) CurrentPIDs(t PIDType) map[uint32]map[uint32]svc.ID {
	pf.mux.RLock()
	defer pf.mux.RUnlock()

	snapshot := make(map[uint32]map[uint32]svc.ID, len(pf.current))
	for nsid, tracked := range pf.current {
		services := make(map[uint32]svc.ID)
		for pid, info := range tracked {
			if info.pidType != t {
				continue
			}
			services[pid] = info.service
		}
		snapshot[nsid] = services
	}
	return snapshot
}
// Filter returns the spans of inputSpans that belong to a tracked process,
// preserving their order. A span is kept when its namespace is tracked and
// its user PID is registered in that namespace. Kept spans get their
// ServiceID set to the registered service; this is also written to the
// corresponding element of inputSpans. A debug message is logged whenever
// at least one span is dropped.
func (pf *PIDsFilter) Filter(inputSpans []request.Span) []request.Span {
	pf.mux.RLock()
	defer pf.mux.RUnlock()

	// todo: adaptive presizing as a function of the historical percentage
	// of filtered spans
	kept := make([]request.Span, 0, len(inputSpans))
	for i := range inputSpans {
		span := &inputSpans[i]

		// The namespace seen by BPF must be one that Beyla is tracking.
		tracked, ok := pf.current[span.Pid.Namespace]
		if !ok {
			continue
		}

		// Within the namespace, only the user PID is checked. The host PID is
		// not, because the number of container layers is unknown and the host
		// PID always belongs to the outermost one.
		info, ok := tracked[span.Pid.UserPID]
		if !ok {
			continue
		}

		span.ServiceID = info.service
		kept = append(kept, *span)
	}

	if len(kept) == len(inputSpans) {
		return kept
	}

	pf.log.Debug("filtered spans from processes that did not match discovery",
		"function", "PIDsFilter.Filter", "inLen", len(inputSpans), "outLen", len(kept),
		"pids", pf.current, "spans", inputSpans,
	)
	return kept
}
// addPID registers a process in the filter. It resolves the namespace of pid
// and the list of PIDs returned by readNamespacePIDs, and stores service s and
// type t under each of those PIDs in that namespace. Previous entries for the
// same PIDs are overwritten.
//
// If any of the lookups fails, the error is logged and the registry is left
// untouched. The namespace entry is only created once there is at least one
// PID to store in it, so that pf.current never holds empty namespaces.
//
// The caller must hold pf.mux for writing.
func (pf *PIDsFilter) addPID(pid uint32, s svc.ID, t PIDType) {
	nsid, err := readNamespace(int32(pid))
	if err != nil {
		pf.log.Error("Error looking up namespace for tracking PID", "pid", pid, "error", err)
		return
	}

	// Resolve the PIDs before touching pf.current: creating the namespace
	// entry first would leave an empty, never-cleaned entry behind whenever
	// this lookup fails.
	allPids, err := readNamespacePIDs(int32(pid))
	if err != nil {
		pf.log.Error("Error looking up namespaced pids", "pid", pid, "error", err)
		return
	}
	if len(allPids) == 0 {
		return
	}

	ns, nsExists := pf.current[nsid]
	if !nsExists {
		ns = make(map[uint32]PIDInfo, len(allPids))
		pf.current[nsid] = ns
	}

	info := PIDInfo{service: s, pidType: t}
	for _, p := range allPids {
		ns[p] = info
	}
}
// removePID deletes the entry stored under pid in the namespace of that
// process, and drops the namespace itself once it has no entries left. If
// the namespace cannot be resolved or is not tracked, nothing is removed.
//
// The caller must hold pf.mux for writing.
func (pf *PIDsFilter) removePID(pid uint32) {
	nsid, err := readNamespace(int32(pid))
	if err != nil {
		// this will always happen on process removal, as /proc/<pid>/ns/pid won't be found
		// the code is kept here as a placeholder for a future fix (e.g. using eBPF notifications
		// to get both the PID and the nsid)
		// TODO: fix
		pf.log.Debug("Error looking up namespace for removing PID", "pid", pid, "error", err)
		return
	}

	if tracked, found := pf.current[nsid]; found {
		delete(tracked, pid)
		if len(tracked) == 0 {
			delete(pf.current, nsid)
		}
	}
}
// IdentityPidsFilter is a ServiceFilter that does not filter anything. It is
// suitable for system-wide instrumentation, and is what CommonPIDsFilter
// returns in that mode. It holds no state.
type IdentityPidsFilter struct{}
// AllowPID is a no-op: the identity filter keeps no PID registry.
func (pf *IdentityPidsFilter) AllowPID(_ uint32, _ svc.ID, _ PIDType) {}
// BlockPID is a no-op: the identity filter keeps no PID registry.
func (pf *IdentityPidsFilter) BlockPID(_ uint32) {}
// CurrentPIDs always returns nil, whatever the requested type, as the
// identity filter tracks no PIDs.
func (pf *IdentityPidsFilter) CurrentPIDs(_ PIDType) map[uint32]map[uint32]svc.ID {
	return nil
}
// Filter lets every span through. It sets, in place, the ServiceID of each
// span to the result of serviceInfo for the span's host PID, and returns the
// same slice it received.
func (pf *IdentityPidsFilter) Filter(inputSpans []request.Span) []request.Span {
	for idx := range inputSpans {
		inputSpans[idx].ServiceID = serviceInfo(inputSpans[idx].Pid.HostPID)
	}
	return inputSpans
}
// serviceInfo returns the service identity for the given PID. Results are
// memoized in activePids: on a cache miss, the identity is built from
// commName (service name) and exec.FindProcLanguage (SDK language), then
// stored in the cache before being returned.
func serviceInfo(pid uint32) svc.ID {
	if id, found := activePids.Get(pid); found {
		return id
	}

	id := svc.ID{
		Name:        commName(pid),
		SDKLanguage: exec.FindProcLanguage(int32(pid), nil),
	}
	activePids.Add(pid, id)
	return id
}