Skip to content

Commit

Permalink
[filebeat] add netflow input metrics (#38055)
Browse files Browse the repository at this point in the history
* feat: introduce input/netmetrics to allow multiple inputs utilise the same tcp and/or udp metrics

* feat: add netflow input specific metrics discarded_events_total, flows_total, decode_errors_total

* feat: add netflow input specific metric open_connections

* fix: omit the name of unused receivers

* fix: remove deprecated ioutil with io

* fix: remove redundant type conversion

* fix: address naked return

* fix: proper equality check in TestSessionState

* fix: ignore explicitly the error from registering v9 protocol

* doc: update changelog

* feat: refactor netmetrics to instantiate and expose monitoring registry

* feat: rename GetRegistry to Registry in TCP and UDP of netmetrics to promote idiomatic go

* doc: update input-netflow.asciidoc to capture the new metrics

* fix: move metrics last in input-netflow.asciidoc
  • Loading branch information
pkoutsovasilis committed Feb 22, 2024
1 parent d762071 commit a0a567c
Show file tree
Hide file tree
Showing 22 changed files with 801 additions and 665 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ fields added to events containing the Beats version. {pull}37553[37553]
*Filebeat*

- Convert netflow input to API v2 and disable event normalisation {pull}37901[37901]
- Introduce input/netmetrics and refactor netflow input metrics {pull}38055[38055]


*Heartbeat*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,27 @@
// specific language governing permissions and limitations
// under the License.

// Package procnet provides support for obtaining and formatting /proc/net
// network addresses for linux systems.
package procnet
// Package netmetrics provides different metricsets for capturing network-related metrics,
// such as UDP and TCP metrics through NewUDP and NewTCP, respectively.
package netmetrics

import (
"bytes"
"encoding/hex"
"fmt"
"net"
"strconv"
"strings"

"github.com/elastic/elastic-agent-libs/logp"
)

// Addrs returns the linux /proc/net/tcp or /proc/net/udp addresses for the
// addrs returns the linux /proc/net/tcp or /proc/net/udp addresses for the
// provided host address, addr. addr is a host:port address and host may be
// an IPv4 or IPv6 address, or an FQDN for the host. The returned slices
// contain the string representations of the address as they would appear in
// the /proc/net tables.
func Addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) {
func addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) {
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, nil, fmt.Errorf("failed to get address for %s: could not split host and port: %w", addr, err)
Expand All @@ -54,23 +57,23 @@ func Addrs(addr string, log *logp.Logger) (addr4, addr6 []string, err error) {
// the len constants all addresses may appear to be IPv6.
switch {
case len(p.To4()) == net.IPv4len:
addr4 = append(addr4, IPv4(p, int(pn)))
addr4 = append(addr4, ipV4(p, int(pn)))
case len(p.To16()) == net.IPv6len:
addr6 = append(addr6, IPv6(p, int(pn)))
addr6 = append(addr6, ipV6(p, int(pn)))
default:
log.Warnf("unexpected addr length %d for %s", len(p), p)
}
}
return addr4, addr6, nil
}

// IPv4 returns the string representation of an IPv4 address in a /proc/net table.
func IPv4(ip net.IP, port int) string {
// ipV4 returns the string representation of an IPv4 address in a /proc/net table.
func ipV4(ip net.IP, port int) string {
return fmt.Sprintf("%08X:%04X", reverse(ip.To4()), port)
}

// IPv6 returns the string representation of an IPv6 address in a /proc/net table.
func IPv6(ip net.IP, port int) string {
// ipV6 returns the string representation of an IPv6 address in a /proc/net table.
func ipV6(ip net.IP, port int) string {
return fmt.Sprintf("%032X:%04X", reverse(ip.To16()), port)
}

Expand All @@ -81,3 +84,37 @@ func reverse(b []byte) []byte {
}
return c
}

func contains(b []byte, addr []string, addrIsUnspecified []bool) bool {
for i, a := range addr {
if addrIsUnspecified[i] {
_, ap, pok := strings.Cut(a, ":")
_, bp, bok := bytes.Cut(b, []byte(":"))
if pok && bok && strings.EqualFold(string(bp), ap) {
return true
}
} else if strings.EqualFold(string(b), a) {
return true
}
}
return false
}

func containsUnspecifiedAddr(addr []string) (yes bool, which []bool, bad []string) {
which = make([]bool, len(addr))
for i, a := range addr {
prefix, _, ok := strings.Cut(a, ":")
if !ok {
continue
}
ip, err := hex.DecodeString(prefix)
if err != nil {
bad = append(bad, a)
}
if net.IP(ip).IsUnspecified() {
yes = true
which[i] = true
}
}
return yes, which, bad
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package procnet
package netmetrics

import (
"testing"
Expand All @@ -25,7 +25,7 @@ import (

func TestAddrs(t *testing.T) {
t.Run("ipv4", func(t *testing.T) {
addr4, addr6, err := Addrs("0.0.0.0:9001", logp.L())
addr4, addr6, err := addrs("0.0.0.0:9001", logp.L())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand All @@ -38,7 +38,7 @@ func TestAddrs(t *testing.T) {
})

t.Run("ipv6", func(t *testing.T) {
addr4, addr6, err := Addrs("[::]:9001", logp.L())
addr4, addr6, err := addrs("[::]:9001", logp.L())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
Expand Down
251 changes: 251 additions & 0 deletions filebeat/input/netmetrics/tcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package netmetrics

import (
"bytes"
"errors"
"fmt"
"os"
"runtime"
"strconv"
"time"

"github.com/rcrowley/go-metrics"

"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
)

// TCP handles the TCP metric reporting.
type TCP struct {
unregister func()
done chan struct{}

monitorRegistry *monitoring.Registry

lastPacket time.Time

device *monitoring.String // name of the device being monitored
packets *monitoring.Uint // number of packets processed
bytes *monitoring.Uint // number of bytes processed
rxQueue *monitoring.Uint // value of the rx_queue field from /proc/net/tcp{,6} (only on linux systems)
arrivalPeriod metrics.Sample // histogram of the elapsed time between packet arrivals
processingTime metrics.Sample // histogram of the elapsed time between packet receipt and publication
}

// NewTCP returns a new TCP input metricset. Note that if the id is empty then a nil TCP metricset is returned.
func NewTCP(inputName string, id string, device string, poll time.Duration, log *logp.Logger) *TCP {
if id == "" {
return nil
}
reg, unreg := inputmon.NewInputRegistry(inputName, id, nil)
out := &TCP{
unregister: unreg,
monitorRegistry: reg,
device: monitoring.NewString(reg, "device"),
packets: monitoring.NewUint(reg, "received_events_total"),
bytes: monitoring.NewUint(reg, "received_bytes_total"),
rxQueue: monitoring.NewUint(reg, "receive_queue_length"),
arrivalPeriod: metrics.NewUniformSample(1024),
processingTime: metrics.NewUniformSample(1024),
}
_ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.arrivalPeriod))
_ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.processingTime))

out.device.Set(device)

if poll > 0 && runtime.GOOS == "linux" {
addr4, addr6, err := addrs(device, log)
if err != nil {
log.Warn(err)
return out
}
out.done = make(chan struct{})
go out.poll(addr4, addr6, poll, log)
}

return out
}

// Log logs metric for the given packet.
func (m *TCP) Log(data []byte, timestamp time.Time) {
if m == nil {
return
}
m.processingTime.Update(time.Since(timestamp).Nanoseconds())
m.packets.Add(1)
m.bytes.Add(uint64(len(data)))
if !m.lastPacket.IsZero() {
m.arrivalPeriod.Update(timestamp.Sub(m.lastPacket).Nanoseconds())
}
m.lastPacket = timestamp
}

// poll periodically gets TCP buffer stats from the OS.
func (m *TCP) poll(addr, addr6 []string, each time.Duration, log *logp.Logger) {
hasUnspecified, addrIsUnspecified, badAddr := containsUnspecifiedAddr(addr)
if badAddr != nil {
log.Warnf("failed to parse IPv4 addrs for metric collection %q", badAddr)
}
hasUnspecified6, addrIsUnspecified6, badAddr := containsUnspecifiedAddr(addr6)
if badAddr != nil {
log.Warnf("failed to parse IPv6 addrs for metric collection %q", badAddr)
}

// Do an initial check for access to the filesystem and of the
// value constructed by containsUnspecifiedAddr. This gives a
// base level for the rx_queue values and ensures that if the
// constructed address values are malformed we panic early
// within the period of system testing.
want4 := true
rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
want4 = false
log.Infof("did not get initial tcp stats from /proc: %v", err)
}
want6 := true
rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
want6 = false
log.Infof("did not get initial tcp6 stats from /proc: %v", err)
}
if !want4 && !want6 {
log.Warnf("failed to get initial tcp or tcp6 stats from /proc: %v", err)
} else {
m.rxQueue.Set(uint64(rx + rx6))
}

t := time.NewTicker(each)
for {
select {
case <-t.C:
var found bool
rx, err := procNetTCP("/proc/net/tcp", addr, hasUnspecified, addrIsUnspecified)
if err != nil {
if want4 {
log.Warnf("failed to get tcp stats from /proc: %v", err)
}
} else {
found = true
want4 = true
}
rx6, err := procNetTCP("/proc/net/tcp6", addr6, hasUnspecified6, addrIsUnspecified6)
if err != nil {
if want6 {
log.Warnf("failed to get tcp6 stats from /proc: %v", err)
}
} else {
found = true
want6 = true
}
if found {
m.rxQueue.Set(uint64(rx + rx6))
}
case <-m.done:
t.Stop()
return
}
}
}

// Registry returns the monitoring registry of the TCP metricset.
func (m *TCP) Registry() *monitoring.Registry {
if m == nil {
return nil
}

return m.monitorRegistry
}

// procNetTCP returns the rx_queue field of the TCP socket table for the
// socket on the provided address formatted in hex, xxxxxxxx:xxxx or the IPv6
// equivalent.
// This function is only useful on linux due to its dependence on the /proc
// filesystem, but is kept in this file for simplicity. If hasUnspecified
// is true, all addresses listed in the file in path are considered, and the
// sum of rx_queue matching the addr ports is returned where the corresponding
// addrIsUnspecified is true.
func procNetTCP(path string, addr []string, hasUnspecified bool, addrIsUnspecified []bool) (rx int64, err error) {
if len(addr) == 0 {
return 0, nil
}
if len(addr) != len(addrIsUnspecified) {
return 0, errors.New("mismatched address/unspecified lists: please report this")
}
b, err := os.ReadFile(path)
if err != nil {
return 0, err
}
lines := bytes.Split(b, []byte("\n"))
if len(lines) < 2 {
return 0, fmt.Errorf("%s entry not found for %s (no line)", path, addr)
}
var found bool
for _, l := range lines[1:] {
f := bytes.Fields(l)
const queuesField = 4
if len(f) > queuesField && contains(f[1], addr, addrIsUnspecified) {
_, r, ok := bytes.Cut(f[4], []byte(":"))
if !ok {
return 0, errors.New("no rx_queue field " + string(f[queuesField]))
}
found = true

// queue lengths are hex, e.g.:
// - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv4/tcp_ipv4.c#L2643
// - https://elixir.bootlin.com/linux/v6.2.11/source/net/ipv6/tcp_ipv6.c#L1987
v, err := strconv.ParseInt(string(r), 16, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse rx_queue: %w", err)
}
rx += v

if hasUnspecified {
continue
}
return rx, nil
}
}
if found {
return rx, nil
}
return 0, fmt.Errorf("%s entry not found for %s", path, addr)
}

// Close closes the TCP metricset and unregister the metrics.
func (m *TCP) Close() {
if m == nil {
return
}
if m.done != nil {
// Shut down poller and wait until done before unregistering metrics.
m.done <- struct{}{}
}

if m.unregister != nil {
m.unregister()
m.unregister = nil
}

m.monitorRegistry = nil
}
Loading

0 comments on commit a0a567c

Please sign in to comment.