-
Notifications
You must be signed in to change notification settings - Fork 229
/
host.go
411 lines (351 loc) · 12.3 KB
/
host.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Apache License 2.0.
* See the file "LICENSE" for details.
*/
package host
import (
"bytes"
"fmt"
"io"
"net"
"os"
"regexp"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"github.com/jsimonetti/rtnetlink"
log "github.com/sirupsen/logrus"
"github.com/syndtr/gocapability/capability"
"go.uber.org/multierr"
"golang.org/x/sys/unix"
"github.com/elastic/otel-profiling-agent/config"
"github.com/elastic/otel-profiling-agent/libpf"
"github.com/elastic/otel-profiling-agent/libpf/pfnamespaces"
)
// Host metadata keys reported by AddMetadata.
// Changing any of these values is a customer-visible change.
const (
	KeyKernelProcVersion = "host:kernel_proc_version"
	KeyKernelVersion     = "host:kernel_version"
	KeyHostname          = "host:hostname"
	KeyMachine           = "host:machine"
	KeyIPAddress         = "host:ip"
)

// Internal host metadata keys. These are also written into the metadata map, so changing
// them is a customer-visible change as well.
const (
	// keyPrefixSysctl prefixes every sysctl key, e.g. "host:sysctl/net.core.bpf_jit_enable".
	keyPrefixSysctl = "host:sysctl/"
	// keyTags holds the validated user-specified tags.
	keyTags = "host:tags"
)
var (
	// sysctls lists the sysctls reported as host metadata (each under keyPrefixSysctl).
	// net.* sysctls must be read from the root network namespace.
	sysctls = []string{
		"net.core.bpf_jit_enable",
		"kernel.bpf_stats_enabled",
		"kernel.unprivileged_bpf_disabled",
	}

	// ValidTagRegex is the pattern every user-specified tag must match in its entirety.
	ValidTagRegex = regexp.MustCompile(`^[a-zA-Z0-9-:._]+$`)
)
// AddMetadata adds host metadata to the result map, that is common across all environments.
// The IP address and hostname (part of the returned metadata) are evaluated in the context of
// PID 1's namespaces, in order to make the information agnostic to any container solutions.
// This may not be the best thing to do in some scenarios, but still seems to be the most sensible
// default.
//
// caEndpoint is the collection agent endpoint, either "host" or "host:port". Its host part is
// used to determine the source IP address of traffic sent to it.
//
// Failures reading /proc/version, CPU information or process capabilities abort immediately.
// Failures collecting the sysctls, IP address and uname data are combined and returned after
// all of them have been attempted. In that case result may be partially populated.
func AddMetadata(caEndpoint string, result map[string]string) error {
	// Extract the host part of the endpoint: remove the port in case it is present.
	host, _, err := net.SplitHostPort(caEndpoint)
	if err != nil {
		host = caEndpoint
	}

	if validatedTags := config.ValidatedTags(); validatedTags != "" {
		result[keyTags] = validatedTags
	}

	// Get /proc/version. This is better than the information returned by `uname`, as it contains
	// the version of the compiler that compiled the kernel.
	kernelProcVersion, err := os.ReadFile("/proc/version")
	if err != nil {
		return fmt.Errorf("unable to read /proc/version: %w", err)
	}
	result[KeyKernelProcVersion] = sanitizeString(kernelProcVersion)

	info, err := readCPUInfo()
	if err != nil {
		return fmt.Errorf("unable to read CPU information: %w", err)
	}
	dedupCPUInfo(result, info)

	// The rest of the metadata needs CAP_SYS_ADMIN to be collected, so we check that first.
	// (Named sysAdmin rather than hasCapSysAdmin to avoid shadowing the function.)
	sysAdmin, err := hasCapSysAdmin()
	if err != nil {
		return err
	}

	// We need to call the `setns` syscall to extract information (network route, hostname) from
	// different namespaces.
	// However, `setns` doesn't know about goroutines, it operates on OS threads.
	// Therefore, the below code needs to take extra steps to make sure no other code (outside of
	// this function) will execute in a different namespace.
	//
	// To do this, we use `runtime.LockOSThread()`, which we call from a separate goroutine.
	// runtime.LockOSThread() ensures that the thread executing the goroutine will be terminated
	// when the goroutine exits, which makes it impossible for the entered namespaces to be used in
	// a different context than the below code.
	//
	// It would be doable without a goroutine, by saving and restoring the namespaces before calling
	// runtime.UnlockOSThread(), but error handling makes things complicated and unsafe/dangerous.
	// The below implementation is always safe to run even in the presence of errors.
	//
	// The only downside is that calling this function comes at the cost of sacrificing an OS
	// thread, which will likely force the Go runtime to launch a new thread later. This should be
	// acceptable if it doesn't happen too often.
	var wg sync.WaitGroup
	wg.Add(1)

	// Error result of the below goroutine. May contain multiple combined errors.
	// Only the goroutine writes it; it is read after wg.Wait().
	var errResult error
	go func() {
		defer wg.Done()

		if sysAdmin {
			// Before entering a different namespace, lock the current goroutine to a thread.
			// Note that we do *not* call runtime.UnlockOSThread(): this ensures the current thread
			// will exit after the goroutine finishes, which makes it impossible for other
			// goroutines to enter a different namespace.
			runtime.LockOSThread()

			// Try to enter root namespaces. If that fails, continue anyway as we might be able to
			// gather some metadata. Any errors were already logged by tryEnterRootNamespaces.
			utsFD, netFD := tryEnterRootNamespaces()
			if utsFD != -1 {
				defer unix.Close(utsFD)
			}
			if netFD != -1 {
				defer unix.Close(netFD)
			}
		} else {
			log.Warnf("No CAP_SYS_ADMIN capability, collecting metadata from " +
				"current process namespaces")
		}

		// Add sysctls to the result map.
		for _, sysctl := range sysctls {
			value, sysctlErr := getSysctl(sysctl)
			if sysctlErr != nil {
				errResult = multierr.Combine(errResult, sysctlErr)
				continue
			}
			result[keyPrefixSysctl+sysctl] = sanitizeString(value)
		}

		// Get IP address.
		if ip, ipErr := getSourceIPAddress(host); ipErr != nil {
			errResult = multierr.Combine(errResult, ipErr)
		} else {
			result[KeyIPAddress] = ip.String()
		}

		// Get uname-related metadata: hostname and kernel version.
		uname := &unix.Utsname{}
		if unameErr := unix.Uname(uname); unameErr != nil {
			errResult = multierr.Combine(errResult,
				fmt.Errorf("error calling uname: %w", unameErr))
		} else {
			result[KeyKernelVersion] = sanitizeString(uname.Release[:])
			result[KeyHostname] = sanitizeString(uname.Nodename[:])
			result[KeyMachine] = sanitizeString(uname.Machine[:])
		}
	}()
	wg.Wait()

	// multierr.Combine yields nil when no error was recorded.
	return errResult
}
// keySuffixCPUSocketID is appended, after a '/', to a CPU info key to build the key that
// lists the socket IDs associated with that key's distinct values.
const keySuffixCPUSocketID = "socketIDs"

// keySocketID returns the socket-ID metadata key for the CPU info key prefix, i.e.
// "<prefix>/socketIDs".
func keySocketID(prefix string) string {
	return prefix + "/" + keySuffixCPUSocketID
}
// dedupCPUInfo transforms cpuInfo values into a more compact form and populates result map.
// The resulting keys and values generated from a cpuInfo key K with socket IDs 0,1,2,3
// and values V, V, V1, V2 will be of the form:
//
//	"K": "V;V1;V2"
//	"K/socketIDs": "0,1;2;3"
//
// Distinct values are ordered by the lowest socket ID they occur on. The socket IDs of each
// value are listed in ascending numeric order, so the output is deterministic across runs.
//
// The character ';' is used as a separator for distinct values since is it highly unlikely
// that it will occur as part of the values themselves, most of which are numeric.
// TODO: Investigate alternative encoding schemes such as JSON.
func dedupCPUInfo(result map[string]string, info cpuInfo) {
	// socketIDLess orders decimal socket ID strings numerically. The strings are produced
	// by strconv.Itoa below, so parsing them back cannot fail and the errors are ignored.
	socketIDLess := func(a, b string) bool {
		intA, _ := strconv.Atoi(a)
		intB, _ := strconv.Atoi(b)
		return intA < intB
	}

	for key, socketValues := range info {
		// A map from CPU info values to their associated socket ids.
		uniques := map[string]libpf.Set[string]{}
		// Gather all unique values and their socket ids for this key.
		for socketID, socketValue := range socketValues {
			if _, ok := uniques[socketValue]; !ok {
				uniques[socketValue] = libpf.Set[string]{}
			}
			uniques[socketValue][strconv.Itoa(socketID)] = libpf.Void{}
		}

		// Sort the socket IDs of every distinct value numerically.
		values := libpf.MapKeysToSlice(uniques)
		sortedSIDs := make(map[string][]string, len(values))
		for _, v := range values {
			sids := uniques[v].ToSlice()
			sort.Slice(sids, func(a, b int) bool { return socketIDLess(sids[a], sids[b]) })
			sortedSIDs[v] = sids
		}

		// Go map iteration order is random, so the order of the values taken from the map
		// cannot be relied upon. Order them by their lowest socket ID so the reported
		// metadata is stable and matches the documented format. Every value has at least
		// one socket ID, and no socket ID belongs to two values, so [0] is safe and there
		// are no ties.
		sort.Slice(values, func(a, b int) bool {
			return socketIDLess(sortedSIDs[values[a]][0], sortedSIDs[values[b]][0])
		})

		// Combine the socket IDs in the same order as the values and write both to result.
		socketIDs := make([]string, 0, len(values))
		for _, v := range values {
			socketIDs = append(socketIDs, strings.Join(sortedSIDs[v], ","))
		}
		result[key] = strings.Join(values, ";")
		result[keySocketID(key)] = strings.Join(socketIDs, ";")
	}
}
// sanitizeString converts str to a string after stripping every leading and trailing 0x00
// byte. This is used, for example, on the NUL-padded fixed-size arrays filled by uname.
// Interior NUL bytes and any other whitespace are preserved.
func sanitizeString(str []byte) string {
	withoutLeading := bytes.TrimLeft(str, "\x00")
	return string(bytes.TrimRight(withoutLeading, "\x00"))
}
// addressFamily returns the netlink address family for ip. IPv4 addresses, including their
// 16-byte IPv4-in-IPv6 form, map to AF_INET. Other 16-byte addresses map to AF_INET6. Any
// other length is reported as an invalid IP address.
func addressFamily(ip net.IP) (uint8, error) {
	switch {
	case ip.To4() != nil:
		return unix.AF_INET, nil
	case len(ip) == net.IPv6len:
		return unix.AF_INET6, nil
	default:
		return 0, fmt.Errorf("invalid IP address: %v", ip)
	}
}
// getSourceIPAddress returns the source IP address for the traffic destined to the specified
// domain.
//
// The domain is resolved with net.LookupIP. For each resulting address, a route lookup is
// performed in the main routing table over netlink. The source address of the first route that
// carries one is returned. An error is returned if the netlink connection cannot be opened,
// if the domain cannot be resolved, or if no address yields a route with a source address.
// In the last case, the error describes the last failure seen.
func getSourceIPAddress(domain string) (net.IP, error) {
	conn, err := rtnetlink.Dial(nil)
	if err != nil {
		return nil, fmt.Errorf("unable to open netlink connection: %w", err)
	}
	defer conn.Close()

	dstIPs, err := net.LookupIP(domain)
	if err != nil {
		return nil, fmt.Errorf("unable to resolve %s: %w", domain, err)
	}
	if len(dstIPs) == 0 {
		return nil, fmt.Errorf("unable to resolve %s: no IP address", domain)
	}

	// Every skipped address records why it was skipped. dstIPs is non-empty, so lastErr is
	// non-nil if the loop finishes without returning.
	var lastErr error

	// We might get multiple IP addresses, check all of them as some may not be routable (like an
	// IPv6 address on an IPv4 network).
	for _, ip := range dstIPs {
		family, err := addressFamily(ip)
		if err != nil {
			// Skip this address but keep trying the remaining ones.
			lastErr = fmt.Errorf("unable to get address family for %s: %w", ip.String(), err)
			continue
		}

		req := &rtnetlink.RouteMessage{
			Family: family,
			Table:  unix.RT_TABLE_MAIN,
			Attributes: rtnetlink.RouteAttributes{
				Dst: ip,
			},
		}
		routes, err := conn.Route.Get(req)
		if err != nil {
			lastErr = fmt.Errorf("unable to get route to %s (%s): %w", domain, ip.String(), err)
			continue
		}
		if len(routes) == 0 {
			lastErr = fmt.Errorf("unable to get route to %s (%s): no route returned",
				domain, ip.String())
			continue
		}
		if len(routes) > 1 {
			// More than 1 route!
			// This doesn't look like this should ever happen (even in the presence of overlapping
			// routes with same metric, this will return a single route).
			// May be a leaky abstraction/artifact from the way the netlink API works?
			// Regardless, this seems ok to ignore, but log just in case.
			log.Warnf("Found multiple (%d) routes to %v; first 2 routes: %#v and %#v",
				len(routes), domain, routes[0], routes[1])
		}

		// Sanity-check the result, in case the source address is left uninitialized.
		srcIP := routes[0].Attributes.Src
		if len(srcIP) == 0 {
			lastErr = fmt.Errorf(
				"unable to get route to %s (%s): no source IP address", domain, ip.String())
			continue
		}

		log.Debugf("Traffic to %v is routed from %v", domain, srcIP.String())
		return srcIP, nil
	}

	return nil, fmt.Errorf("no route found to %s: %w", domain, lastErr)
}
// hasCapSysAdmin reports whether the current process has CAP_SYS_ADMIN in its effective
// capability set. An error is returned, wrapping the underlying cause, if the process
// capabilities cannot be obtained or loaded.
func hasCapSysAdmin() (bool, error) {
	caps, err := capability.NewPid2(0) // 0 == current process
	if err != nil {
		return false, fmt.Errorf("unable to get process capabilities: %w", err)
	}
	if err = caps.Load(); err != nil {
		return false, fmt.Errorf("unable to load process capabilities: %w", err)
	}
	return caps.Get(capability.EFFECTIVE, capability.CAP_SYS_ADMIN), nil
}
// getSysctl returns the value of a particular sysctl (eg: "net.core.bpf_jit_enable").
// The dotted name is mapped to its file below /proc/sys, and the file's contents are returned
// with a single trailing newline removed, if present. Open and read failures are returned as
// errors naming the path.
func getSysctl(sysctl string) ([]byte, error) {
	// "net.core.bpf_jit_enable" => /proc/sys/net/core/bpf_jit_enable
	path := "/proc/sys/" + strings.ReplaceAll(sysctl, ".", "/")

	f, err := os.Open(path)
	if err != nil {
		return nil, fmt.Errorf("unable to open %v: %v", path, err)
	}
	defer f.Close()

	data, err := io.ReadAll(f)
	if err != nil {
		return nil, fmt.Errorf("unable to read %v: %v", path, err)
	}

	// Strip at most one trailing newline. Empty contents pass through as an empty slice.
	return bytes.TrimSuffix(data, []byte{'\n'}), nil
}
// ValidateTags parses and validates user-specified tags.
// The input is split on ';' and each resulting tag must fully match ValidTagRegex.
// Tags that can't be validated are logged and dropped; the accepted ones are rejoined with ';'
// in their original order. The empty string is returned if no tags can be validated.
func ValidateTags(tags string) string {
	if len(tags) == 0 {
		return ""
	}

	accepted := make([]string, 0, strings.Count(tags, ";")+1)
	for _, candidate := range strings.Split(tags, ";") {
		if ValidTagRegex.MatchString(candidate) {
			accepted = append(accepted, candidate)
			continue
		}
		log.Warnf("Rejected user-specified tag '%s' since it doesn't match regexp '%v'",
			candidate, ValidTagRegex)
	}

	if len(accepted) == 0 {
		return ""
	}
	return strings.Join(accepted, ";")
}
// tryEnterRootNamespaces tries to enter PID 1's UTS and network namespaces. The network
// namespace is attempted first, then the UTS namespace.
// It returns the file descriptor associated to each, or -1 if the namespace cannot be entered.
// Failures are logged rather than returned, so callers can continue collecting whatever
// metadata is still available.
func tryEnterRootNamespaces() (utsFD, netFD int) {
	var err error

	if netFD, err = pfnamespaces.EnterNamespace(1, "net"); err != nil {
		log.Errorf(
			"Unable to enter root network namespace, host metadata may be incorrect: %v", err)
		netFD = -1
	}

	if utsFD, err = pfnamespaces.EnterNamespace(1, "uts"); err != nil {
		log.Errorf("Unable to enter root UTS namespace, host metadata may be incorrect: %v", err)
		utsFD = -1
	}

	return utsFD, netFD
}