-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
probe.go
224 lines (187 loc) · 6.49 KB
/
probe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package main
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"sync/atomic"
"time"
"github.com/DataDog/datadog-agent/pkg/process/statsd"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/ebpf"
"github.com/DataDog/datadog-agent/pkg/ebpf/encoding"
"github.com/DataDog/datadog-agent/pkg/process/config"
"github.com/DataDog/datadog-agent/pkg/process/net"
)
var (
	// ErrSysprobeUnsupported is the unsupported error prefix, for error-class matching from callers
	ErrSysprobeUnsupported = errors.New("system-probe unsupported")

	// inactivityLogDuration is the delay after which Run logs a warning if no
	// /connections request has been served yet.
	inactivityLogDuration = 10 * time.Minute
)
// SystemProbe maintains and starts the underlying network connection collection process as well as
// exposes these connections over HTTP (via UDS)
type SystemProbe struct {
// cfg is the agent configuration the probe was created from; Run reads the debug port from it.
cfg *config.AgentConfig
// tracer supplies the connections and debug statistics served by the HTTP handlers in Run.
tracer *ebpf.Tracer
// conn provides the listener that Run serves HTTP on. Note that net here is the project's
// pkg/process/net package, not the standard library net package.
conn net.Conn
// tcpQueueLengthTracer backs the /check/tcp_queue_length endpoint. It is left nil when the
// "TCP queue length" check is disabled, in which case the endpoint answers with a 500.
tcpQueueLengthTracer *ebpf.TCPQueueLengthTracer
}
// CreateSystemProbe creates a SystemProbe as well as its UDS socket after confirming that the OS supports BPF-based
// system probe.
//
// The returned error wraps ErrSysprobeUnsupported when the OS/kernel check fails, so callers can
// match it with errors.Is (the message text still starts with the sentinel's message). Errors from
// creating the tracer or the listener are returned as-is. A failure to start the optional TCP queue
// length tracer is only logged: the probe is still created, without that tracer.
func CreateSystemProbe(cfg *config.AgentConfig) (*SystemProbe, error) {
	// Checking whether the current OS + kernel version is supported by the tracer
	if supported, msg := ebpf.IsTracerSupportedByOS(cfg.ExcludedBPFLinuxVersions); !supported {
		return nil, fmt.Errorf("%w: %s", ErrSysprobeUnsupported, msg)
	}

	log.Infof("Creating tracer for: %s", filepath.Base(os.Args[0]))

	t, err := ebpf.NewTracer(config.SysProbeConfigFromConfig(cfg))
	if err != nil {
		return nil, err
	}

	var tqlt *ebpf.TCPQueueLengthTracer
	if cfg.CheckIsEnabled("TCP queue length") {
		log.Infof("Starting the TCP queue length tracer")
		tqlt, err = ebpf.NewTCPQueueLengthTracer()
		if err != nil {
			log.Errorf("unable to start the TCP queue length tracer: %v", err)
			// Never keep a value that came back alongside an error: the
			// /check/tcp_queue_length handler relies on a nil check to detect this case.
			tqlt = nil
		}
	} else {
		log.Infof("TCP queue length tracer disabled")
	}

	// Setting up the unix socket
	conn, err := net.NewListener(cfg)
	if err != nil {
		// The tracer is already running at this point; stop it so it does not
		// outlive the failed construction.
		t.Stop()
		return nil, err
	}

	return &SystemProbe{
		tracer:               t,
		tcpQueueLengthTracer: tqlt,
		cfg:                  cfg,
		conn:                 conn,
	}, nil
}
// Run makes available the HTTP endpoint for network collection.
//
// It registers the system-probe handlers on a dedicated mux and serves them on the probe's
// listener, blocking until http.Serve returns. Before serving it also starts:
//   - an optional debug server on localhost exposing http.DefaultServeMux, when
//     cfg.SystemProbeDebugPort is positive;
//   - a goroutine emitting a heartbeat gauge every 15 seconds (this function never stops it);
//   - a one-shot timer that warns if no /connections request was served within
//     inactivityLogDuration.
func (nt *SystemProbe) Run() {
	// if a debug port is specified, we expose the default handler to that port
	if nt.cfg.SystemProbeDebugPort > 0 {
		debugAddr := fmt.Sprintf("localhost:%d", nt.cfg.SystemProbeDebugPort)
		go func() {
			// ListenAndServe always returns a non-nil error; surface it instead of
			// silently losing the debug endpoint (e.g. when the port is already in use).
			if err := http.ListenAndServe(debugAddr, http.DefaultServeMux); err != nil {
				log.Errorf("unable to serve the debug endpoint on %s: %s", debugAddr, err)
			}
		}()
	}

	// runCounter counts served /connections requests; it is shared between the handler
	// and the inactivity timer, hence the atomic accesses.
	var runCounter uint64

	// We don't want the endpoint for the system-probe output to be mixed with pprof and expvar
	// We can only do this by creating a new HTTP Mux that does not have these endpoints handled
	httpMux := http.NewServeMux()

	// Empty handler: answering with the default 200 status is all /status does.
	httpMux.HandleFunc("/status", func(w http.ResponseWriter, req *http.Request) {})

	httpMux.HandleFunc("/connections", func(w http.ResponseWriter, req *http.Request) {
		start := time.Now()
		id := getClientID(req)
		cs, err := nt.tracer.GetActiveConnections(id)
		if err != nil {
			log.Errorf("unable to retrieve connections: %s", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		// The response encoding is picked from the request's Accept header.
		contentType := req.Header.Get("Accept")
		marshaler := encoding.GetMarshaler(contentType)
		writeConnections(w, marshaler, cs)

		count := atomic.AddUint64(&runCounter, 1)
		logRequests(id, count, len(cs.Conns), start)
	})

	httpMux.HandleFunc("/debug/net_maps", func(w http.ResponseWriter, req *http.Request) {
		cs, err := nt.tracer.DebugNetworkMaps()
		if err != nil {
			log.Errorf("unable to retrieve network maps: %s", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		contentType := req.Header.Get("Accept")
		marshaler := encoding.GetMarshaler(contentType)
		writeConnections(w, marshaler, cs)
	})

	httpMux.HandleFunc("/debug/net_state", func(w http.ResponseWriter, req *http.Request) {
		stats, err := nt.tracer.DebugNetworkState(getClientID(req))
		if err != nil {
			log.Errorf("unable to retrieve tracer stats: %s", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		writeAsJSON(w, stats)
	})

	httpMux.HandleFunc("/debug/stats", func(w http.ResponseWriter, req *http.Request) {
		stats, err := nt.tracer.GetStats()
		if err != nil {
			log.Errorf("unable to retrieve tracer stats: %s", err)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		writeAsJSON(w, stats)
	})

	httpMux.HandleFunc("/check/tcp_queue_length", func(w http.ResponseWriter, req *http.Request) {
		// The tracer is nil when the check is disabled or failed to start.
		if nt.tcpQueueLengthTracer == nil {
			log.Errorf("TCP queue length tracer was not properly initialized")
			w.WriteHeader(http.StatusInternalServerError)
			return
		}
		stats := nt.tcpQueueLengthTracer.GetAndFlush()

		writeAsJSON(w, stats)
	})

	// Heartbeat: periodically report that the system-probe is alive, tagged with its build info.
	go func() {
		tags := []string{
			fmt.Sprintf("version:%s", Version),
			fmt.Sprintf("revision:%s", GitCommit),
		}
		heartbeat := time.NewTicker(15 * time.Second)
		for range heartbeat.C {
			statsd.Client.Gauge("datadog.system_probe.agent", 1, tags, 1)
		}
	}()

	// Convenience logging if nothing has made any requests to the system-probe in some time, let's log something.
	// This should be helpful for customers + support to debug the underlying issue.
	time.AfterFunc(inactivityLogDuration, func() {
		if run := atomic.LoadUint64(&runCounter); run == 0 {
			log.Warnf("%v since the agent started without activity, the process-agent may not be configured correctly and/or running", inactivityLogDuration)
		}
	})

	// Serve always returns a non-nil error once it stops accepting connections, including
	// when the listener is closed on purpose, so report it at warning level rather than drop it.
	if err := http.Serve(nt.conn.GetListener(), httpMux); err != nil {
		log.Warnf("system-probe HTTP server stopped: %s", err)
	}
}
// logRequests logs one served /connections request: the client ID, the running request
// count, the number of connections returned and the time elapsed since start.
// Requests whose count is at most 5 or a multiple of 20 are logged at info level; every
// other request is logged at debug level.
func logRequests(client string, count uint64, connectionsCount int, start time.Time) {
	const format = "Got request on /connections?client_id=%s (count: %d): retrieved %d connections in %s"

	elapsed := time.Since(start)
	if count <= 5 || count%20 == 0 {
		log.Infof(format, client, count, connectionsCount, elapsed)
		return
	}
	log.Debugf(format, client, count, connectionsCount, elapsed)
}
// getClientID returns the value of the request's client_id query parameter, falling back to
// ebpf.DEBUGCLIENT when the parameter is absent or empty.
func getClientID(req *http.Request) string {
	if id := req.URL.Query().Get("client_id"); id != "" {
		return id
	}
	return ebpf.DEBUGCLIENT
}
// writeConnections serializes cs with marshaler and writes it to w, setting the response
// Content-type to the marshaler's content type.
// A marshaling failure is logged and answered with a 500. A failure while writing the body is
// logged as a warning; the status line has already been sent by then, so nothing else can be
// reported to the client.
func writeConnections(w http.ResponseWriter, marshaler encoding.Marshaler, cs *ebpf.Connections) {
	buf, err := marshaler.Marshal(cs)
	if err != nil {
		log.Errorf("unable to marshall connections with type %s: %s", marshaler.ContentType(), err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-type", marshaler.ContentType())
	if _, err := w.Write(buf); err != nil {
		// Skip the trace line below: it would report bytes that were not fully sent.
		log.Warnf("unable to write connections response: %s", err)
		return
	}
	log.Tracef("/connections: %d connections, %d bytes", len(cs.Conns), len(buf))
}
// writeAsJSON encodes data as JSON and writes it to w with an application/json Content-Type.
// A marshaling failure is logged and answered with a 500. A failure while writing the body is
// logged as a warning, since the response has already been started at that point.
func writeAsJSON(w http.ResponseWriter, data interface{}) {
	buf, err := json.Marshal(data)
	if err != nil {
		log.Errorf("unable to marshall data into JSON: %s", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	// Declare the payload type explicitly instead of leaving it to net/http's content sniffing.
	w.Header().Set("Content-Type", "application/json")
	if _, err := w.Write(buf); err != nil {
		log.Warnf("unable to write JSON response: %s", err)
	}
}
// Close will stop all system probe activities
// It calls Stop on the probe's conn first and then on its tracer.
// NOTE(review): tcpQueueLengthTracer is not touched here — confirm whether it needs its own
// shutdown call when it was started.
func (nt *SystemProbe) Close() {
nt.conn.Stop()
nt.tracer.Stop()
}