Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[add_session_metadata processor] Keep exited processes in the process DB #39173

Merged
merged 12 commits into from
Apr 25, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Auditbeat*
- Set field types to correctly match ECS in sessionmd processor {issue}38955[38955] {pull}38994[38994]
- Keep process info on exited processes, to avoid failing to enrich events in sessionmd processor {pull}39173[39173]

*Filebeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func New(cfg *cfg.C) (beat.Processor, error) {
}

backfilledPIDs := db.ScrapeProcfs()
logger.Debugf("backfilled %d processes", len(backfilledPIDs))
logger.Infof("backfilled %d processes", len(backfilledPIDs))

var p provider.Provider

Expand All @@ -70,6 +70,9 @@ func New(cfg *cfg.C) (beat.Processor, error) {
if err != nil {
return nil, fmt.Errorf("failed to create provider: %w", err)
}
logger.Info("backend=auto using procfs")
} else {
logger.Info("backend=auto using ebpf")
}
case "ebpf":
p, err = ebpf_provider.NewProvider(ctx, logger, db)
Expand Down Expand Up @@ -111,6 +114,11 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) {
return result, nil
}

func (p *addSessionMetadata) Close() error {
p.db.Close()
return nil
}

func (p *addSessionMetadata) String() string {
return fmt.Sprintf("%v=[backend=%s, pid_field=%s]",
processorName, p.config.Backend, p.config.PIDField)
Expand Down
32 changes: 27 additions & 5 deletions x-pack/auditbeat/processors/sessionmd/processdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package processdb

import (
"container/heap"
"encoding/base64"
"errors"
"fmt"
Expand Down Expand Up @@ -82,6 +83,7 @@ type Process struct {
Cwd string
Env map[string]string
Filename string
ExitCode int32
}

var (
Expand Down Expand Up @@ -185,20 +187,26 @@ type DB struct {
entryLeaders map[uint32]EntryType
entryLeaderRelationships map[uint32]uint32
procfs procfs.Reader
stopChan chan struct{}
removalCandidates rcHeap
}

func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) {
once.Do(initialize)
if initError != nil {
return &DB{}, initError
}
return &DB{
db := DB{
logger: logp.NewLogger("processdb"),
processes: make(map[uint32]Process),
entryLeaders: make(map[uint32]EntryType),
entryLeaderRelationships: make(map[uint32]uint32),
procfs: reader,
}, nil
stopChan: make(chan struct{}),
removalCandidates: make(rcHeap, 0),
}
db.startReaper()
return &db, nil
}

func (db *DB) calculateEntityIDv1(pid uint32, startTime time.Time) string {
Expand Down Expand Up @@ -406,9 +414,18 @@ func (db *DB) InsertExit(exit types.ProcessExitEvent) {
defer db.mutex.Unlock()

pid := exit.PIDs.Tgid
delete(db.processes, pid)
mjwolf marked this conversation as resolved.
Show resolved Hide resolved
delete(db.entryLeaders, pid)
delete(db.entryLeaderRelationships, pid)
process, ok := db.processes[pid]
if !ok {
db.logger.Errorf("could not insert exit, pid %v not found in db", pid)
return
}
process.ExitCode = exit.ExitCode
db.processes[pid] = process
heap.Push(&db.removalCandidates, removalCandidate{
pid: pid,
startTime: process.PIDs.StartTimeNS,
exitTime: time.Now(),
})
}

func interactiveFromTTY(tty types.TTYDev) bool {
Expand Down Expand Up @@ -437,6 +454,7 @@ func fullProcessFromDBProcess(p Process) types.Process {
ret.Thread.Capabilities.Effective, _ = capabilities.FromUint64(p.Creds.CapEffective)
ret.TTY.CharDevice.Major = p.CTTY.Major
ret.TTY.CharDevice.Minor = p.CTTY.Minor
ret.ExitCode = p.ExitCode

return ret
}
Expand Down Expand Up @@ -716,3 +734,7 @@ func (db *DB) scrapeAncestors(proc Process) {
db.insertProcess(p)
}
}

func (db *DB) Close() {
close(db.stopChan)
}