Skip to content

Commit

Permalink
[SIEM][Auditbeat] Make socket dataset work with IPv6 disabled (elasti…
Browse files Browse the repository at this point in the history
…c#13667)

This allows the dataset to run when IPv6 is not available, that is,
has been disabled in `/proc/net/ipv6/conf/lo/disable_ipv6`, which is the
default in RHEL 8 distro.

This behavior is controlled by a new configuration option, `socket.enable_ipv6`:
- When true, it works as before. IPv6 is required to be enabled in loopback.
- When false, IPv6 is not required in order to run.
- When unset (default), the dataset will detect if IPv6 support is
present or not.

Note that this still needs IPv6 support in the kernel (CONFIG_IPV6=y or
=m).

Fixes elastic#13658
  • Loading branch information
adriansr committed Sep 16, 2019
1 parent 19da163 commit ef177a6
Show file tree
Hide file tree
Showing 14 changed files with 473 additions and 181 deletions.
22 changes: 16 additions & 6 deletions x-pack/auditbeat/module/system/socket/_meta/docs.asciidoc
Expand Up @@ -51,6 +51,7 @@ distributions under which the dataset is known to work:
| CentOS 6.5 | 2.6.32-431.el6 | NO^<<anchor-1,[1]>>^
| CentOS 6.9 | 2.6.32-696.30.1.el6 | &#10003;
| CentOS 7.6 | 3.10.0-957.1.3.el7 | &#10003;
| RHEL 8 | 4.18.0-80.rhel8 | &#10003;
| Debian 8 | 3.16.0-6 | &#10003;
| Debian 9 | 4.9.0-8 | &#10003;
| Debian 10 | 4.19.0-5 | &#10003;
Expand All @@ -75,23 +76,25 @@ A kernel built with the following configuration options enabled is required:

- `CONFIG_KPROBE_EVENTS`: Enables the KProbes subsystem.
- `CONFIG_DEBUG_FS`: For kernels laking `tracefs` support (<4.1).
- `CONFIG_IPV6`.
- `CONFIG_IPV6`: IPv6 support in the kernel is needed even if disabled with
`socket.enable_ipv6: false`.

These settings are enabled by default in most distributions.

The following configuration settings can prevent the dataset from starting:

- `/sys/kernel/debug/kprobes/enabled` must be 1.
- `/proc/sys/net/ipv6/conf/lo/disable_ipv6` must be 0
(IPv6 enabled in the loopback device).
- `/proc/sys/net/ipv6/conf/lo/disable_ipv6` (IPv6 enabled in loopback device) is
required when running with IPv6 enabled.


[float]
==== Running on docker

The dataset can monitor the Docker host when running inside a container. However
it needs to run on a `privileged` container with `CAP_NET_ADMIN` and IPv6
support. The docker container running {beatname_uc} needs access to the host's
tracefs or debugfs directory. This is achieved by bind-mounting `/sys`.
it needs to run on a `privileged` container with `CAP_NET_ADMIN`. The docker
container running {beatname_uc} needs access to the host's tracefs or debugfs
directory. This is achieved by bind-mounting `/sys`.

[float]
=== Configuration
Expand All @@ -106,6 +109,13 @@ the default locations: `/sys/kernel/tracing` and `/sys/kernel/debug/tracing`.
If not found, it will attempt to mount `tracefs` and `debugfs` at their
default locations.

- `socket.enable_ipv6` (default: unset)

Determines whether IPv6 must be monitored. When unset (default), IPv6 support
is automatically detected. Even when IPv6 is disabled, in order to run the
dataset you still need a kernel with IPv6 support (the `ipv6` module must be
loaded if compiled as a module).

- `socket.flow_inactive_timeout` (default: 30s)

Determines how long a flow has to be inactive to be considered closed.
Expand Down
4 changes: 4 additions & 0 deletions x-pack/auditbeat/module/system/socket/config.go
Expand Up @@ -54,6 +54,10 @@ type Config struct {
// DevelopmentMode is an undocumented flag to ignore SSH traffic so that the
// dataset can be run with debug output without creating a feedback loop.
DevelopmentMode bool `config:"socket.development_mode"`

// EnableIPv6 allows to control IPv6 support. When unset (default) IPv6
// will be automatically detected on runtime.
EnableIPv6 *bool `config:"socket.enable_ipv6"`
}

// Validate validates the host metricset config.
Expand Down
81 changes: 81 additions & 0 deletions x-pack/auditbeat/module/system/socket/events.go
Expand Up @@ -221,6 +221,45 @@ func (e *tcpAcceptResult) Update(s *state) error {
return nil
}

type tcpAcceptResult4 struct {
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
LAddr uint32 `kprobe:"laddr"`
RAddr uint32 `kprobe:"raddr"`
LPort uint16 `kprobe:"lport"`
RPort uint16 `kprobe:"rport"`
Af uint16 `kprobe:"family"`
}

func (e *tcpAcceptResult4) asFlow() flow {
f := flow{
sock: e.Sock,
pid: e.Meta.PID,
inetType: inetType(e.Af),
proto: protoTCP,
dir: directionInbound,
complete: true,
lastSeen: kernelTime(e.Meta.Timestamp),
}
f.local = newEndpointIPv4(e.LAddr, e.LPort, 0, 0)
f.remote = newEndpointIPv4(e.RAddr, e.RPort, 0, 0)
return f
}

// String returns a representation of the event.
func (e *tcpAcceptResult4) String() string {
f := e.asFlow()
return fmt.Sprintf("%s <- accept(sock=0x%x, af=%s, %s <- %s)", header(e.Meta), e.Sock, inetType(e.Af), f.local.String(), f.remote.String())
}

// Update the state with the contents of this event.
func (e *tcpAcceptResult4) Update(s *state) error {
if e.Sock != 0 {
return s.UpdateFlow(e.asFlow())
}
return nil
}

type tcpSendMsgCall struct {
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
Expand Down Expand Up @@ -272,6 +311,48 @@ func (e *tcpSendMsgCall) Update(s *state) error {
return s.UpdateFlow(e.asFlow())
}

type tcpSendMsgCall4 struct {
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
Size uintptr `kprobe:"size"`
LAddr uint32 `kprobe:"laddr"`
RAddr uint32 `kprobe:"raddr"`
LPort uint16 `kprobe:"lport"`
RPort uint16 `kprobe:"rport"`
Af uint16 `kprobe:"family"`
}

func (e *tcpSendMsgCall4) asFlow() flow {
f := flow{
sock: e.Sock,
pid: e.Meta.PID,
inetType: inetType(e.Af),
proto: protoTCP,
lastSeen: kernelTime(e.Meta.Timestamp),
}
f.local = newEndpointIPv4(e.LAddr, e.LPort, 0, 0)
f.remote = newEndpointIPv4(e.RAddr, e.RPort, 0, 0)
return f
}

// String returns a representation of the event.
func (e *tcpSendMsgCall4) String() string {
flow := e.asFlow()
return fmt.Sprintf(
"%s tcp_sendmsg(sock=0x%x, size=%d, af=%s, %s -> %s)",
header(e.Meta),
flow.sock,
e.Size,
inetType(e.Af),
flow.local.String(),
flow.remote.String())
}

// Update the state with the contents of this event.
func (e *tcpSendMsgCall4) Update(s *state) error {
return s.UpdateFlow(e.asFlow())
}

type ipLocalOutCall struct {
Meta tracing.Metadata `kprobe:"metadata"`
Sock uintptr `kprobe:"sock"`
Expand Down
13 changes: 9 additions & 4 deletions x-pack/auditbeat/module/system/socket/guess/cskxmit6.go
Expand Up @@ -41,7 +41,7 @@ func init() {

type guessInet6CskXmit struct {
ctx Context
loopback ipv6loopback
loopback helper.IPv6Loopback
clientAddr, serverAddr unix.SockaddrInet6
client, server int
sock uintptr
Expand Down Expand Up @@ -69,6 +69,11 @@ func (g *guessInet6CskXmit) Requires() []string {
}
}

// Condition allows this probe to run only when IPv6 is enabled.
func (g *guessInet6CskXmit) Condition(ctx Context) (bool, error) {
return isIPv6Enabled(ctx.Vars)
}

// Probes returns 2 probes:
// - kretprobe on inet_csk_accept, which returns a struct sock*
// - kprobe on inet6_csk_xmit, returning 1st argument as pointer and dump.
Expand Down Expand Up @@ -99,7 +104,7 @@ func (g *guessInet6CskXmit) Probes() ([]helper.ProbeDef, error) {
func (g *guessInet6CskXmit) Prepare(ctx Context) (err error) {
g.ctx = ctx
g.acceptedFd = -1
g.loopback, err = newIPv6Loopback()
g.loopback, err = helper.NewIPv6Loopback()
if err != nil {
return errors.Wrap(err, "detect IPv6 loopback failed")
}
Expand All @@ -108,11 +113,11 @@ func (g *guessInet6CskXmit) Prepare(ctx Context) (err error) {
g.loopback.Cleanup()
}
}()
clientIP, err := g.loopback.addRandomAddress()
clientIP, err := g.loopback.AddRandomAddress()
if err != nil {
return errors.Wrap(err, "failed adding first device address")
}
serverIP, err := g.loopback.addRandomAddress()
serverIP, err := g.loopback.AddRandomAddress()
if err != nil {
return errors.Wrap(err, "failed adding second device address")
}
Expand Down
43 changes: 43 additions & 0 deletions x-pack/auditbeat/module/system/socket/guess/guess.go
Expand Up @@ -72,6 +72,16 @@ type EventualGuesser interface {
MaxRepeats() int
}

// ConditionalGuesser is a guess that might only need to run under certain
// conditions (i.e. when IPv6 is enabled).
type ConditionalGuesser interface {
Guesser
// Condition returns if this guess has to be run.
// When false, it must set all its Provides() to dummy values to ensure that
// dependent guesses are run.
Condition(ctx Context) (bool, error)
}

// Guess is a helper function to easily determine memory layouts of kernel
// structs and similar tasks. It installs the guesser's Probe, starts a perf
// channel and executes the Trigger function. Each record received through the
Expand Down Expand Up @@ -251,6 +261,16 @@ func GuessAll(installer helper.ProbeInstaller, ctx Context) (err error) {
for len(list) > 0 {
var next []Guesser
for _, guesser := range list {
if cond, isCond := guesser.(ConditionalGuesser); isCond {
mustRun, err := cond.Condition(ctx)
if err != nil {
return errors.Wrapf(err, "condition failed for %s", cond.Name())
}
if !mustRun {
ctx.Log.Debugf("Guess %s skipped.", cond.Name())
continue
}
}
if !containsAll(guesser.Requires(), ctx.Vars) {
next = append(next, guesser)
continue
Expand All @@ -267,10 +287,33 @@ func GuessAll(installer helper.ProbeInstaller, ctx Context) (err error) {
ctx.Log.Debugf("Guess %s completed: %v", guesser.Name(), result)
}
if len(next) == len(list) {
ctx.Log.Warnf("Internal error: No guess can be run (%d pending):", len(list))
for _, guess := range list {
requires := guess.Requires()
var missing []string
for _, req := range requires {
if _, found := ctx.Vars[req]; !found {
missing = append(missing, req)
}
}
ctx.Log.Warnf(" guess '%s' requires missing vars: %v", guess.Name(), missing)
}
return errors.New("no guess can be run")
}
list = next
}
ctx.Log.Infof("Guessing done after %v", time.Since(start))
return nil
}

func isIPv6Enabled(vars common.MapStr) (bool, error) {
iface, err := vars.GetValue("HAS_IPV6")
if err != nil {
return false, err
}
hasIPv6, ok := iface.(bool)
if !ok {
return false, errors.New("HAS_IPV6 is not a bool")
}
return hasIPv6, nil
}
113 changes: 0 additions & 113 deletions x-pack/auditbeat/module/system/socket/guess/helpers.go
Expand Up @@ -10,11 +10,7 @@ import (
"bytes"
"fmt"
"math/rand"
"net"
"time"
"unsafe"

"github.com/joeshaw/multierror"
"github.com/pkg/errors"
"golang.org/x/sys/unix"

Expand Down Expand Up @@ -110,115 +106,6 @@ func getListField(m common.MapStr, key string) ([]int, error) {
return list, nil
}

type ipv6loopback struct {
fd int
deviceName string
addresses []net.IP
ifreq ifReq
}

type in6Ifreq struct {
addr [16]byte
prefix uint32
ifindex int32
}

const ifnamsiz = 16

type ifReq struct {
name [ifnamsiz]byte
index int32
padding [128]byte
}

func newIPv6Loopback() (lo ipv6loopback, err error) {
lo.fd = -1
devs, err := net.Interfaces()
if err != nil {
return lo, errors.Wrap(err, "cannot list interfaces")
}
for _, dev := range devs {
addrs, err := dev.Addrs()
if err != nil || len(dev.Name) >= ifnamsiz {
continue
}
for _, addr := range addrs {
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.IsLoopback() {
lo.deviceName = dev.Name
lo.fd, err = unix.Socket(unix.AF_INET6, unix.SOCK_DGRAM, unix.IPPROTO_IP)
if err != nil {
lo.fd = -1
return lo, errors.Wrap(err, "ipv6 socket failed")
}
copy(lo.ifreq.name[:], dev.Name)
lo.ifreq.name[len(dev.Name)] = 0
_, _, errno := unix.Syscall(unix.SYS_IOCTL, uintptr(lo.fd), unix.SIOCGIFINDEX, uintptr(unsafe.Pointer(&lo.ifreq)))
if errno != 0 {
unix.Close(lo.fd)
return lo, errors.Wrap(errno, "ioctl(SIOCGIFINDEX) failed")
}
return lo, nil
}
}
}
return lo, errors.New("no loopback interface detected")
}

// adds a randomly-generated address from the fd00::/8 prefix (Unique Local Address)
func (lo *ipv6loopback) addRandomAddress() (addr net.IP, err error) {
addr = make(net.IP, 16)
addr[0] = 0xFD
rand.Read(addr[1:])
var req in6Ifreq
copy(req.addr[:], addr)
req.ifindex = lo.ifreq.index
req.prefix = 128
_, _, e := unix.Syscall(unix.SYS_IOCTL, uintptr(lo.fd), unix.SIOCSIFADDR, uintptr(unsafe.Pointer(&req)))
if e != 0 {
return nil, errors.Wrap(e, "ioctl SIOCSIFADDR failed")
}
lo.addresses = append(lo.addresses, addr)

// wait for the added address to be available. There seems to be a small
// delay in some systems between the time an address is added and it is
// available to bind.
fd, err := unix.Socket(unix.AF_INET6, unix.SOCK_DGRAM, 0)
if err != nil {
return addr, errors.Wrap(err, "socket ipv6 dgram failed")
}
defer unix.Close(fd)
var bindAddr unix.SockaddrInet6
copy(bindAddr.Addr[:], addr)
for i := 1; i < 50; i++ {
if err = unix.Bind(fd, &bindAddr); err == nil {
break
}
if errno, ok := err.(unix.Errno); !ok || errno != unix.EADDRNOTAVAIL {
break
}
time.Sleep(time.Millisecond * time.Duration(i))
}
return addr, errors.Wrap(err, "bind failed")
}

func (lo *ipv6loopback) Cleanup() error {
var errs multierror.Errors
var req in6Ifreq
req.ifindex = lo.ifreq.index
req.prefix = 128
for _, addr := range lo.addresses {
copy(req.addr[:], addr)
_, _, e := unix.Syscall(unix.SYS_IOCTL, uintptr(lo.fd), unix.SIOCDIFADDR, uintptr(unsafe.Pointer(&req)))
if e != 0 {
errs = append(errs, e)
}
}
if lo.fd != -1 {
unix.Close(lo.fd)
}
return errs.Err()
}

// consolidate takes a list of guess results in the form of maps with []int
// values, and returns a map where for each key the value is an []int with
// the values that appeared in all the guesses.
Expand Down

0 comments on commit ef177a6

Please sign in to comment.