Skip to content

Commit

Permalink
[add_session_metadata processor] Keep exited processes in the process…
Browse files Browse the repository at this point in the history
… DB (#39173) (#39225)

With the add_session_metadata processor, don't remove processes from the process db when the process has exited.

The processor can be run on an fork/exec events after the process has actually exited, so the process must remain in the DB after it has exited, so the info can be used in enrichment of these events.

Now that the process is kept in the DB, the exit code is also appended on exit events, so the exit code can be used in enrichment of the exit events.

(cherry picked from commit 9649588)

Co-authored-by: Michael Wolf <michael.wolf@elastic.co>
  • Loading branch information
mergify[bot] and mjwolf committed Apr 25, 2024
1 parent 752ae6f commit 0376f3b
Show file tree
Hide file tree
Showing 7 changed files with 495 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Auditbeat*
- Set field types to correctly match ECS in sessionmd processor {issue}38955[38955] {pull}38994[38994]
- Keep process info on exited processes, to avoid failing to enrich events in sessionmd processor {pull}39173[39173]

*Filebeat*

Expand Down
10 changes: 9 additions & 1 deletion x-pack/auditbeat/processors/sessionmd/add_session_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func New(cfg *cfg.C) (beat.Processor, error) {
}

backfilledPIDs := db.ScrapeProcfs()
logger.Debugf("backfilled %d processes", len(backfilledPIDs))
logger.Infof("backfilled %d processes", len(backfilledPIDs))

var p provider.Provider

Expand All @@ -70,6 +70,9 @@ func New(cfg *cfg.C) (beat.Processor, error) {
if err != nil {
return nil, fmt.Errorf("failed to create provider: %w", err)
}
logger.Info("backend=auto using procfs")
} else {
logger.Info("backend=auto using ebpf")
}
case "ebpf":
p, err = ebpf_provider.NewProvider(ctx, logger, db)
Expand Down Expand Up @@ -111,6 +114,11 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) {
return result, nil
}

func (p *addSessionMetadata) Close() error {
p.db.Close()
return nil
}

func (p *addSessionMetadata) String() string {
return fmt.Sprintf("%v=[backend=%s, pid_field=%s]",
processorName, p.config.Backend, p.config.PIDField)
Expand Down
32 changes: 27 additions & 5 deletions x-pack/auditbeat/processors/sessionmd/processdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package processdb

import (
"container/heap"
"encoding/base64"
"errors"
"fmt"
Expand Down Expand Up @@ -82,6 +83,7 @@ type Process struct {
Cwd string
Env map[string]string
Filename string
ExitCode int32
}

var (
Expand Down Expand Up @@ -185,20 +187,26 @@ type DB struct {
entryLeaders map[uint32]EntryType
entryLeaderRelationships map[uint32]uint32
procfs procfs.Reader
stopChan chan struct{}
removalCandidates rcHeap
}

func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) {
once.Do(initialize)
if initError != nil {
return &DB{}, initError
}
return &DB{
db := DB{
logger: logp.NewLogger("processdb"),
processes: make(map[uint32]Process),
entryLeaders: make(map[uint32]EntryType),
entryLeaderRelationships: make(map[uint32]uint32),
procfs: reader,
}, nil
stopChan: make(chan struct{}),
removalCandidates: make(rcHeap, 0),
}
db.startReaper()
return &db, nil
}

func (db *DB) calculateEntityIDv1(pid uint32, startTime time.Time) string {
Expand Down Expand Up @@ -406,9 +414,18 @@ func (db *DB) InsertExit(exit types.ProcessExitEvent) {
defer db.mutex.Unlock()

pid := exit.PIDs.Tgid
delete(db.processes, pid)
delete(db.entryLeaders, pid)
delete(db.entryLeaderRelationships, pid)
process, ok := db.processes[pid]
if !ok {
db.logger.Errorf("could not insert exit, pid %v not found in db", pid)
return
}
process.ExitCode = exit.ExitCode
db.processes[pid] = process
heap.Push(&db.removalCandidates, removalCandidate{
pid: pid,
startTime: process.PIDs.StartTimeNS,
exitTime: time.Now(),
})
}

func interactiveFromTTY(tty types.TTYDev) bool {
Expand Down Expand Up @@ -437,6 +454,7 @@ func fullProcessFromDBProcess(p Process) types.Process {
ret.Thread.Capabilities.Effective, _ = capabilities.FromUint64(p.Creds.CapEffective)
ret.TTY.CharDevice.Major = p.CTTY.Major
ret.TTY.CharDevice.Minor = p.CTTY.Minor
ret.ExitCode = p.ExitCode

return ret
}
Expand Down Expand Up @@ -716,3 +734,7 @@ func (db *DB) scrapeAncestors(proc Process) {
db.insertProcess(p)
}
}

func (db *DB) Close() {
close(db.stopChan)
}
Loading

0 comments on commit 0376f3b

Please sign in to comment.