Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pipeline memory efficiency using pool #3297

Merged
merged 1 commit into from Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
72 changes: 40 additions & 32 deletions pkg/ebpf/events_pipeline.go
Expand Up @@ -218,52 +218,57 @@ func (t *Tracee) decodeEvents(outerCtx context.Context, sourceChan chan []byte)
}
}

evt := trace.Event{
Timestamp: int(ctx.Ts),
ThreadStartTime: int(ctx.StartTime),
ProcessorID: int(ctx.ProcessorId),
ProcessID: int(ctx.Pid),
ThreadID: int(ctx.Tid),
ParentProcessID: int(ctx.Ppid),
HostProcessID: int(ctx.HostPid),
HostThreadID: int(ctx.HostTid),
HostParentProcessID: int(ctx.HostPpid),
UserID: int(ctx.Uid),
MountNS: int(ctx.MntID),
PIDNS: int(ctx.PidID),
ProcessName: string(bytes.TrimRight(ctx.Comm[:], "\x00")),
HostName: string(bytes.TrimRight(ctx.UtsName[:], "\x00")),
CgroupID: uint(ctx.CgroupID),
ContainerID: containerData.ID,
Container: containerData,
Kubernetes: kubernetesData,
EventID: int(ctx.EventID),
EventName: eventDefinition.Name,
MatchedPoliciesKernel: ctx.MatchedPolicies,
ArgsNum: int(argnum),
ReturnValue: int(ctx.Retval),
Args: args,
StackAddresses: stackAddresses,
ContextFlags: flags,
Syscall: syscall,
}
// get an event pointer from the pool
NDStrahilevitz marked this conversation as resolved.
Show resolved Hide resolved
evt := t.eventsPool.Get().(*trace.Event)
// populate all the fields of the event used in this stage, and reset the rest
evt.Timestamp = int(ctx.Ts)
evt.ThreadStartTime = int(ctx.StartTime)
evt.ProcessorID = int(ctx.ProcessorId)
evt.ProcessID = int(ctx.Pid)
evt.ThreadID = int(ctx.Tid)
evt.ParentProcessID = int(ctx.Ppid)
evt.HostProcessID = int(ctx.HostPid)
evt.HostThreadID = int(ctx.HostTid)
evt.HostParentProcessID = int(ctx.HostPpid)
evt.UserID = int(ctx.Uid)
evt.MountNS = int(ctx.MntID)
evt.PIDNS = int(ctx.PidID)
evt.ProcessName = string(bytes.TrimRight(ctx.Comm[:], "\x00"))
evt.HostName = string(bytes.TrimRight(ctx.UtsName[:], "\x00"))
evt.CgroupID = uint(ctx.CgroupID)
evt.ContainerID = containerData.ID
evt.Container = containerData
evt.Kubernetes = kubernetesData
evt.EventID = int(ctx.EventID)
evt.EventName = eventDefinition.Name
evt.MatchedPoliciesKernel = ctx.MatchedPolicies
evt.MatchedPoliciesUser = 0
evt.MatchedPolicies = []string{}
evt.ArgsNum = int(argnum)
evt.ReturnValue = int(ctx.Retval)
evt.Args = args
evt.StackAddresses = stackAddresses
evt.ContextFlags = flags
evt.Syscall = syscall
evt.Metadata = nil
geyslan marked this conversation as resolved.
Show resolved Hide resolved

// If there aren't any policies that need filtering in userland, tracee **may** skip
// this event, as long as there aren't any derivatives or signatures that depend on it.
// Some base events (derivative and signatures) might not have set related policy bit,
// thus the need to continue with those within the pipeline.
if t.matchPolicies(&evt) == 0 {
if t.matchPolicies(evt) == 0 {
_, hasDerivation := t.eventDerivations[eventId]
_, hasSignature := t.eventSignatures[eventId]

if !hasDerivation && !hasSignature {
_ = t.stats.EventsFiltered.Increment()
t.eventsPool.Put(evt)
continue
}
}

select {
case out <- &evt:
case out <- evt:
case <-outerCtx.Done():
return
}
Expand Down Expand Up @@ -434,6 +439,7 @@ func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) (
for _, err := range errs {
t.handleError(err)
}
t.eventsPool.Put(event)
continue
}

Expand Down Expand Up @@ -464,6 +470,7 @@ func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) (
utils.ClearBits(&event.MatchedPoliciesUser, policiesWithContainerFilter)

if event.MatchedPoliciesKernel == 0 {
t.eventsPool.Put(event)
continue
}
}
Expand Down Expand Up @@ -557,6 +564,7 @@ func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan
id := events.ID(event.EventID)
event.MatchedPoliciesUser &= t.events[id].emit
if event.MatchedPoliciesUser == 0 {
t.eventsPool.Put(event)
continue
}

Expand All @@ -575,7 +583,7 @@ func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan
select {
case t.config.ChanEvents <- *event:
_ = t.stats.EventCount.Increment()
event = nil
t.eventsPool.Put(event)
case <-ctx.Done():
return
}
Expand Down