Skip to content

Commit

Permalink
filebeat/input/{tcp,udp}: add support for IPv6 interface metrics (#35123)
Browse files Browse the repository at this point in the history

Also fix base for parsing values from /proc/net tables.
  • Loading branch information
efd6 committed Apr 20, 2023
1 parent 7929f6b commit 9807be1
Show file tree
Hide file tree
Showing 9 changed files with 268 additions and 113 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Add input instance id to request trace filename for httpjson and cel inputs {pull}35024[35024]
- Fix panic in TCP and UDP inputs on Linux when collecting socket metrics from OS. {issue}35064[35064]
- Correctly collect TCP and UDP metrics for unspecified address values. {pull}35111[35111]
- Fix base for UDP and TCP queue metrics and UDP drops metric. {pull}35123[35123]

*Heartbeat*

Expand Down Expand Up @@ -258,6 +259,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Mention `mito` CEL tool in CEL input docs. {pull}34959[34959]
- Add nginx ingress_controller parsing if one of upstreams fails to return response {pull}34787[34787]
- Allow netflow v9 and ipfix templates to be shared between source addresses. {pull}35036[35036]
- Add support for collecting IPv6 metrics. {pull}35123[35123]

*Auditbeat*
- Migration of system/package module storage from gob encoding to flatbuffer encoding in bolt db. {pull}34817[34817]
Expand Down
2 changes: 1 addition & 1 deletion filebeat/docs/inputs/input-tcp.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ observe the activity of the input.
| `device` | Host/port of the TCP stream.
| `received_events_total` | Total number of packets (events) that have been received.
| `received_bytes_total` | Total number of bytes received.
| `receive_queue_length` | Size of the system receive queue (linux only) (gauge).
| `receive_queue_length` | Aggregated size of the system receive queues (IPv4 and IPv6) (linux only) (gauge).
| `arrival_period` | Histogram of the time between successive packets in nanoseconds.
| `processing_time` | Histogram of the time taken to process packets in nanoseconds.
|=======
Expand Down
4 changes: 2 additions & 2 deletions filebeat/docs/inputs/input-udp.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ observe the activity of the input.
| `udp_read_buffer_length_gauge` | Size of the UDP socket buffer length in bytes (gauge).
| `received_events_total` | Total number of packets (events) that have been received.
| `received_bytes_total` | Total number of bytes received.
| `receive_queue_length` | Size of the system receive queue (linux only) (gauge).
| `system_packet_drops` | Number of system packet drops (linux only) (gauge).
| `receive_queue_length` | Aggregated size of the system receive queues (IPv4 and IPv6) (linux only) (gauge).
| `system_packet_drops` | Aggregated number of system packet drops (IPv4 and IPv6) (linux only) (gauge).
| `arrival_period` | Histogram of the time between successive packets in nanoseconds.
| `processing_time` | Histogram of the time taken to process packets in nanoseconds.
|=======
Expand Down
53 changes: 37 additions & 16 deletions filebeat/input/tcp/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ type inputMetrics struct {
device *monitoring.String // name of the device being monitored
packets *monitoring.Uint // number of packets processed
bytes *monitoring.Uint // number of bytes processed
rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/tcp (only on linux systems)
rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/tcp{,6} (only on linux systems)
arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals
processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication
}
Expand Down Expand Up @@ -196,9 +196,9 @@ func newInputMetrics(id, device string, poll time.Duration, log *logp.Logger) *i
out.device.Set(device)

if poll > 0 && runtime.GOOS == "linux" {
host, port, ok := strings.Cut(device, ":")
if !ok {
log.Warnf("failed to get address for %s: no port separator", device)
host, port, err := net.SplitHostPort(device)
if err != nil {
log.Warnf("failed to get address for %s: could not split host and port:", err)
return out
}
ip, err := net.LookupIP(host)
Expand All @@ -213,15 +213,19 @@ func newInputMetrics(id, device string, poll time.Duration, log *logp.Logger) *i
}
ph := strconv.FormatInt(p, 16)
addr := make([]string, 0, len(ip))
addr6 := make([]string, 0, len(ip))
for _, p := range ip {
p4 := p.To4()
if len(p4) != net.IPv4len {
continue
switch len(p) {
case net.IPv4len:
addr = append(addr, fmt.Sprintf("%X:%s", binary.LittleEndian.Uint32(p.To4()), ph))
case net.IPv6len:
addr6 = append(addr6, fmt.Sprintf("%X:%s", binary.LittleEndian.Uint32(p.To16()), ph))
default:
log.Warnf("unexpected addr length %d for %s", len(p), p)
}
addr = append(addr, fmt.Sprintf("%X:%s", binary.LittleEndian.Uint32(p4), ph))
}
out.done = make(chan struct{})
go out.poll(addr, poll, log)
go out.poll(addr, addr6, poll, log)
}

return out
Expand All @@ -242,10 +246,14 @@ func (m *inputMetrics) log(data []byte, timestamp time.Time) {
}

// poll periodically gets TCP buffer stats from the OS.
func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger) {
func (m *inputMetrics) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) {
hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr)
if badAddr != nil {
log.Warnf("failed to parse addrs for metric collection %q", badAddr)
log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr)
}
hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr)
if badAddr != nil {
log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr)
}
t := time.NewTicker(each)
for {
Expand All @@ -256,7 +264,12 @@ func (m *inputMetrics) poll(addr []string, each time.Duration, log *logp.Logger)
log.Warnf("failed to get tcp stats from /proc: %v", err)
continue
}
m.rxQueue.Set(uint64(rx))
rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
log.Warnf("failed to get tcp6 stats from /proc: %v", err)
continue
}
m.rxQueue.Set(uint64(rx + rx6))
case <-m.done:
t.Stop()
return
Expand Down Expand Up @@ -284,13 +297,17 @@ func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []strin
}

// procNetTCP returns the rx_queue field of the TCP socket table for the
// socket on the provided address formatted in hex, xxxxxxxx:xxxx.
// socket on the provided address formatted in hex, xxxxxxxx:xxxx or the IPv6
// equivalent.
// This function is only useful on linux due to its dependence on the /proc
// filesystem, but is kept in this file for simplicity. If hasUnspecified
// is true, all addresses listed in the file in path are considered, and the
// sum of rx_queue matching the addr ports is returned where the corresponding
// addrIsUnspecified is true.
func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx int64, err error) {
if len(addr) == 0 {
return 0, nil
}
b, err := os.ReadFile(path)
if err != nil {
return 0, err
Expand All @@ -302,14 +319,18 @@ func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecifi
var found bool
for _, l := range lines[1:] {
f := bytes.Fields(l)
if len(f) > 4 && contains(f[1], addr, addrIsUnspecified) {
const queuesField = 4
if len(f) > queuesField && contains(f[1], addr, addrIsUnspecified) {
_, r, ok := bytes.Cut(f[4], []byte(":"))
if !ok {
return 0, errors.New("no rx_queue field " + string(f[4]))
return 0, errors.New("no rx_queue field " + string(f[queuesField]))
}
found = true

v, err := strconv.ParseInt(string(r), 16, 64)
// queue lengths are decimal, e.g.:
// - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/tcp_ipv4.c#L2643
// - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/tcp_ipv6.c#L1987
v, err := strconv.ParseInt(string(r), 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse rx_queue: %w", err)
}
Expand Down
120 changes: 84 additions & 36 deletions filebeat/input/tcp/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,93 @@ import (
)

func TestProcNetTCP(t *testing.T) {
t.Run("with_match", func(t *testing.T) {
addr := []string{"0100007F:17AC"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 1, rx)
})
t.Run("IPv4", func(t *testing.T) {
path := "testdata/proc_net_tcp.txt"
t.Run("with_match", func(t *testing.T) {
addr := []string{"0100007F:17AC"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 1, rx)
})

t.Run("unspecified", func(t *testing.T) {
addr := []string{"00000000:17AC"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 2, rx)
})
t.Run("unspecified", func(t *testing.T) {
addr := []string{"00000000:17AC"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 2, rx)
})

t.Run("without_match", func(t *testing.T) {
addr := []string{"deadbeef:f00d", "ba1dface:1135"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified)
assert.Nil(t, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
})

t.Run("without_match", func(t *testing.T) {
addr := []string{"deadbeef:f00d", "ba1dface:1135"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified)
assert.Nil(t, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
t.Run("bad_addrs", func(t *testing.T) {
addr := []string{"FOO:BAR", "BAR:BAZ"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified)
assert.EqualValues(t, addr, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
})
})

t.Run("bad_addrs", func(t *testing.T) {
addr := []string{"FOO:BAR", "BAR:BAZ"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, err := procNetTCP("testdata/proc_net_tcp.txt", addr, hasUnspecified, addrIsUnspecified)
assert.EqualValues(t, addr, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
t.Run("IPv6", func(t *testing.T) {
path := "testdata/proc_net_tcp6.txt"
t.Run("with_match", func(t *testing.T) {
addr := []string{"0000000000000000000000000100007f:17AC"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 1, rx)
})

t.Run("unspecified", func(t *testing.T) {
addr := []string{"00000000000000000000000000000000:17AC"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
rx, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified)
if err != nil {
t.Fatal(err)
}
assert.Nil(t, bad)
assert.EqualValues(t, 2, rx)
})

t.Run("without_match", func(t *testing.T) {
addr := []string{"deadbeefdeadbeefdeadbeefdeadbeef:f00d", "ba1dfaceba1dfaceba1dfaceba1dface:1135"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified)
assert.Nil(t, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
})

t.Run("bad_addrs", func(t *testing.T) {
addr := []string{"FOO:BAR", "BAR:BAZ"}
hasUnspecified, addrIsUnspecified, bad := containsUnspecifiedAddr(addr)
_, err := procNetTCP(path, addr, hasUnspecified, addrIsUnspecified)
assert.EqualValues(t, addr, bad)
if assert.Error(t, err) {
assert.Contains(t, err.Error(), "entry not found")
}
})
})
}
6 changes: 6 additions & 0 deletions filebeat/input/tcp/testdata/proc_net_tcp6.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
sl local_address remote_address st tx_queue rx_queue tr tm->when retrnsmt uid timeout inode
0: 00000000000000000000000000000000:006F 00000000000000000000000000000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 19587 1 ffff880262360000 100 0 0 10 -1
1: 00000000000000000000000000000000:0050 00000000000000000000000000000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 22011 1 ffff88026c1887c0 100 0 0 10 -1
2: 00000000000000000000000000000000:0016 00000000000000000000000000000000:0000 0A 00000000:00000000 00:00000000 00000000 0 0 21958 1 ffff880216c88000 100 0 0 10 -1
3: 0000000000000000000000000100007f:17AC 00000000000000000000000000000000:0000 0A 00000000:00000001 00:00000000 00000000 0 0 28592 1 ffff8802deeae000 100 0 0 10 -1
3: 00000000000000000000000081000000:17AC 00000000000000000000000000000000:0000 0A 00000000:00000001 00:00000000 00000000 0 0 28592 1 ffff8802deeae000 100 0 0 10 -1

0 comments on commit 9807be1

Please sign in to comment.