diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 01ddadad8d1..f45083c0d19 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -94,8 +94,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Auditbeat* - Set field types to correctly match ECS in sessionmd processor {issue}38955[38955] {pull}38994[38994] -- Keep process info on exited processes, to avoid failing to enrich events in sessionmd processor {pull}39173[39173] - +- Fix failing to enrich process events in sessionmd processor {issue}38955[38955] {pull}39173[39173] {pull}39243[39243] - Prevent scenario of losing children-related file events in a directory for recursive fsnotify backend of auditbeat file integrity module {pull}39133[39133] diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go index ff9fa54e556..766e9623b9e 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go @@ -96,13 +96,24 @@ func New(cfg *cfg.C) (beat.Processor, error) { } func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) { - _, err := ev.GetValue(p.config.PIDField) + pi, err := ev.GetValue(p.config.PIDField) if err != nil { // Do not attempt to enrich events without PID; it's not a supported event return ev, nil //nolint:nilerr // Running on events without PID is expected } - err = p.provider.UpdateDB(ev) + // Do not enrich failed syscalls, as there was no actual process change related to it + v, err := ev.GetValue("auditd.result") + if err == nil && v == "fail" { + return ev, nil + } + + pid, err := pidToUInt32(pi) + if err != nil { + return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error + } + + err = p.provider.UpdateDB(ev, pid) if err != nil { return ev, err } @@ -136,7 +147,9 @@ func (p *addSessionMetadata) enrich(ev *beat.Event) (*beat.Event, error) { fullProcess, err := p.db.GetProcess(pid) if err != nil { - return nil, fmt.Errorf("pid %v not found in db: %w", pid, err) + e := fmt.Errorf("pid %v not found in db: %w", pid, err) + p.logger.Errorf("%v", e) + return nil, e } processMap := fullProcess.ToMap() diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/db.go b/x-pack/auditbeat/processors/sessionmd/processdb/db.go index 2c7c228e2c1..b8c624abe00 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/db.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/db.go @@ -238,7 +238,6 @@ func (db *DB) InsertFork(fork types.ProcessForkEvent) { pid := fork.ChildPIDs.Tgid ppid := fork.ParentPIDs.Tgid - db.scrapeAncestors(db.processes[pid]) if entry, ok := db.processes[ppid]; ok { entry.PIDs = pidInfoFromProto(fork.ChildPIDs) @@ -282,7 +281,6 @@ func (db *DB) InsertExec(exec types.ProcessExecEvent) { } db.processes[exec.PIDs.Tgid] = proc - db.scrapeAncestors(proc) entryLeaderPID := db.evaluateEntryLeader(proc) if entryLeaderPID != nil { db.entryLeaderRelationships[exec.PIDs.Tgid] = *entryLeaderPID @@ -568,6 +566,14 @@ func setSameAsProcess(process *types.Process) { } } +func (db *DB) HasProcess(pid uint32) bool { + db.mutex.RLock() + defer db.mutex.RUnlock() + + _, ok := db.processes[pid] + return ok +} + func (db *DB) GetProcess(pid uint32) (types.Process, error) { db.mutex.RLock() defer db.mutex.RUnlock() @@ -585,8 +591,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) { fillParent(&ret, parent) break } - db.logger.Debugf("failed to find %d in DB (parent of %d), attempting to scrape", process.PIDs.Ppid, pid) - db.scrapeAncestors(process) } } @@ -596,8 +600,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) { fillGroupLeader(&ret, groupLeader) break } - db.logger.Debugf("failed to find %d in DB (group leader of %d), attempting to scrape", process.PIDs.Pgid, pid) - db.scrapeAncestors(process) } } @@ -607,8 +609,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) { fillSessionLeader(&ret, sessionLeader) break } - db.logger.Debugf("failed to find %d in DB (session leader of %d), attempting to scrape", process.PIDs.Sid, pid) - db.scrapeAncestors(process) } } @@ -712,29 +712,6 @@ func getTTYType(major uint16, minor uint16) TTYType { return TTYUnknown } -func (db *DB) scrapeAncestors(proc Process) { - for _, pid := range []uint32{proc.PIDs.Pgid, proc.PIDs.Ppid, proc.PIDs.Sid} { - if _, exists := db.processes[pid]; pid == 0 || exists { - continue - } - procInfo, err := db.procfs.GetProcess(pid) - if err != nil { - db.logger.Debugf("couldn't get %v from procfs: %w", pid, err) - continue - } - p := Process{ - PIDs: pidInfoFromProto(procInfo.PIDs), - Creds: credInfoFromProto(procInfo.Creds), - CTTY: ttyDevFromProto(procInfo.CTTY), - Argv: procInfo.Argv, - Cwd: procInfo.Cwd, - Env: procInfo.Env, - Filename: procInfo.Filename, - } - db.insertProcess(p) - } -} - func (db *DB) Close() { close(db.stopChan) } diff --git a/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go b/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go index 2b9b540e037..f1b8bae0b67 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go @@ -9,6 +9,7 @@ package ebpf_provider import ( "context" "fmt" + "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/ebpf" @@ -151,7 +152,80 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB) (pr return &p, nil } -func (s prvdr) UpdateDB(ev *beat.Event) error { - // no-op for ebpf, DB is updated from pushed ebpf events - return nil +const ( + maxWaitLimit = 200 * time.Millisecond // Maximum time UpdateDB will wait for process + combinedWaitLimit = 2 * time.Second // Multiple UpdateDB calls will wait up to this amount within resetDuration + backoffDuration = 10 * time.Second // UpdateDB will stop waiting for processes for this time + resetDuration = 5 * time.Second // After this amount of times with no backoffs, the combinedWait will be reset +) + +var ( + combinedWait = 0 * time.Millisecond + inBackoff = false + backoffStart = time.Now() + since = time.Now() + backoffSkipped = 0 +) + +// With ebpf, process events are pushed to the DB by the above goroutine, so this doesn't actually update the DB. +// It does to try sync the processor and ebpf events, so that the process is in the process db before continuing. +// +// It's possible that the event to enrich arrives before the process is inserted into the DB. In that case, this +// will block continuing the enrichment until the process is seen (or the timeout is reached). +// +// If for some reason a lot of time has been spent waiting for missing processes, this also has a backoff timer during +// which it will continue without waiting for missing events to arrive, so the processor doesn't become overly backed-up +// waiting for these processes, at the cost of possibly not enriching some processes. +func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error { + if s.db.HasProcess(pid) { + return nil + } + + now := time.Now() + if inBackoff { + if now.Sub(backoffStart) > backoffDuration { + s.logger.Warnf("ended backoff, skipped %d processes", backoffSkipped) + inBackoff = false + combinedWait = 0 * time.Millisecond + } else { + backoffSkipped += 1 + return nil + } + } else { + if combinedWait > combinedWaitLimit { + s.logger.Warn("starting backoff") + inBackoff = true + backoffStart = now + backoffSkipped = 0 + return nil + } + // maintain a moving window of time for the delays we track + if now.Sub(since) > resetDuration { + since = now + combinedWait = 0 * time.Millisecond + } + } + + start := now + nextWait := 5 * time.Millisecond + for { + waited := time.Since(start) + if s.db.HasProcess(pid) { + s.logger.Debugf("got process that was missing after %v", waited) + combinedWait = combinedWait + waited + return nil + } + if waited >= maxWaitLimit { + e := fmt.Errorf("process %v was not seen after %v", pid, waited) + s.logger.Warnf("%w", e) + combinedWait = combinedWait + waited + return e + } + time.Sleep(nextWait) + if nextWait*2+waited > maxWaitLimit { + nextWait = maxWaitLimit - waited + } else { + nextWait = nextWait * 2 + } + } } diff --git a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go index 2f99dd72b1f..6525b860b6d 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go @@ -41,16 +41,7 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB, rea } // UpdateDB will update the process DB with process info from procfs or the event itself -func (s prvdr) UpdateDB(ev *beat.Event) error { - pi, err := ev.Fields.GetValue(s.pidField) - if err != nil { - return fmt.Errorf("event not supported, no pid") - } - pid, ok := pi.(int) - if !ok { - return fmt.Errorf("pid field not int") - } - +func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error { syscall, err := ev.GetValue(syscallField) if err != nil { return fmt.Errorf("event not supported, no syscall data") @@ -59,7 +50,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error { switch syscall { case "execveat", "execve": pe := types.ProcessExecEvent{} - proc_info, err := s.reader.GetProcess(uint32(pid)) + proc_info, err := s.reader.GetProcess(pid) if err == nil { pe.PIDs = proc_info.PIDs pe.Creds = proc_info.Creds @@ -72,7 +63,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error { s.logger.Warnf("couldn't get process info from proc for pid %v: %w", pid, err) // If process info couldn't be taken from procfs, populate with as much info as // possible from the event - pe.PIDs.Tgid = uint32(pid) + pe.PIDs.Tgid = pid var intr interface{} var i int var ok bool @@ -106,7 +97,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error { case "exit_group": pe := types.ProcessExitEvent{ PIDs: types.PIDInfo{ - Tgid: uint32(pid), + Tgid: pid, }, } s.db.InsertExit(pe) @@ -122,8 +113,8 @@ func (s prvdr) UpdateDB(ev *beat.Event) error { if result == "success" { setsid_ev := types.ProcessSetsidEvent{ PIDs: types.PIDInfo{ - Tgid: uint32(pid), - Sid: uint32(pid), + Tgid: pid, + Sid: pid, }, } s.db.InsertSetsid(setsid_ev) diff --git a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider_test.go b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider_test.go index 6fd333c4711..c438efcfe1a 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider_test.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider_test.go @@ -124,7 +124,7 @@ func TestExecveEvent(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event) + err = provider.UpdateDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -234,7 +234,7 @@ func TestExecveatEvent(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event) + err = provider.UpdateDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -317,7 +317,7 @@ func TestSetSidEvent(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event) + err = provider.UpdateDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -399,7 +399,7 @@ func TestSetSidEventFailed(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event) + err = provider.UpdateDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -470,7 +470,7 @@ func TestSetSidSessionLeaderNotScraped(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event) + err = provider.UpdateDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) diff --git a/x-pack/auditbeat/processors/sessionmd/provider/provider.go b/x-pack/auditbeat/processors/sessionmd/provider/provider.go index e3fa1547806..6452eb9e2bf 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/provider.go @@ -11,5 +11,5 @@ import ( ) type Provider interface { - UpdateDB(*beat.Event) error + UpdateDB(*beat.Event, uint32) error }