Skip to content

Commit

Permalink
perf:(pipeline) memory efficiency using pool
Browse files Browse the repository at this point in the history
This commit employs sync.Pool to bolster memory performance in event
handling. The benchmarking file, events_pipeline_bench_test.go,
simulates the pipeline execution and measures sync.Pool's effectiveness.

Benchmark command:

go test \
  -benchmem -benchtime=10000x -cpu=1 \
  -run=^$ -bench ^(BenchmarkEventPool|BenchmarkEventNew)$ \
  github.com/aquasecurity/tracee/pkg/ebpf

Findings:

Execution Time (ns/op): EventPool's execution time compared to EventNew
varies slightly between -1.43% to +1.03%.

Memory Allocation (B/op): EventPool is significantly memory-efficient,
allocating just 26 bytes against EventNew's 448 bytes, yielding around
94.20% memory reduction.

Allocation Counts (allocs/op): EventPool reuses memory allocations,
achieving 0 allocations per operation, while EventNew makes 1 allocation
per operation.

In conclusion, sync.Pool results in remarkable memory optimization
without substantial impact on runtime. This efficiency is critical in
the pipeline that must have high throughput, as it can facilitate
garbage collection and promote resource efficiency.
  • Loading branch information
geyslan committed Jul 3, 2023
1 parent 84bcb2e commit 19adaff
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 40 deletions.
104 changes: 65 additions & 39 deletions pkg/ebpf/events_pipeline.go
Expand Up @@ -31,7 +31,7 @@ func (t *Tracee) handleEvents(ctx context.Context) {
var errcList []<-chan error

// Source pipeline stage.
eventsChan, errc := t.decodeEvents(ctx, t.eventsChannel)
eventsChan, evtPool, errc := t.decodeEvents(ctx, t.eventsChannel)
errcList = append(errcList, errc)

if t.config.Cache != nil {
Expand All @@ -46,7 +46,7 @@ func (t *Tracee) handleEvents(ctx context.Context) {

// Process events stage
// in this stage we perform event specific logic
eventsChan, errc = t.processEvents(ctx, eventsChan)
eventsChan, errc = t.processEvents(ctx, eventsChan, evtPool)
errcList = append(errcList, errc)

// Enrichment stage
Expand All @@ -70,7 +70,7 @@ func (t *Tracee) handleEvents(ctx context.Context) {
}

// Sink pipeline stage: events go through printers.
errc = t.sinkEvents(ctx, eventsChan)
errc = t.sinkEvents(ctx, eventsChan, evtPool)
errcList = append(errcList, errc)

// Pipeline started. Waiting for pipeline to complete
Expand Down Expand Up @@ -144,8 +144,21 @@ func (t *Tracee) queueEvents(ctx context.Context, in <-chan *trace.Event) (chan
}

// decodeEvents read the events received from the BPF programs and parse it into trace.Event type
func (t *Tracee) decodeEvents(outerCtx context.Context, sourceChan chan []byte) (<-chan *trace.Event, <-chan error) {
func (t *Tracee) decodeEvents(outerCtx context.Context, sourceChan chan []byte) (<-chan *trace.Event, *sync.Pool, <-chan error) {
out := make(chan *trace.Event, 10000)
evtPool := func() *sync.Pool {
pool := sync.Pool{
New: func() interface{} {
return &trace.Event{}
},
}
// warm up the pool
for i := 0; i < 10000; i++ {
pool.Put(pool.New())
}

return &pool
}()
errc := make(chan error, 1)
sysCompatTranslation := events.Definitions.IDs32ToIDs()
go func() {
Expand Down Expand Up @@ -218,55 +231,64 @@ func (t *Tracee) decodeEvents(outerCtx context.Context, sourceChan chan []byte)
}
}

evt := trace.Event{
Timestamp: int(ctx.Ts),
ThreadStartTime: int(ctx.StartTime),
ProcessorID: int(ctx.ProcessorId),
ProcessID: int(ctx.Pid),
ThreadID: int(ctx.Tid),
ParentProcessID: int(ctx.Ppid),
HostProcessID: int(ctx.HostPid),
HostThreadID: int(ctx.HostTid),
HostParentProcessID: int(ctx.HostPpid),
UserID: int(ctx.Uid),
MountNS: int(ctx.MntID),
PIDNS: int(ctx.PidID),
ProcessName: string(bytes.TrimRight(ctx.Comm[:], "\x00")),
HostName: string(bytes.TrimRight(ctx.UtsName[:], "\x00")),
CgroupID: uint(ctx.CgroupID),
ContainerID: containerData.ID,
Container: containerData,
Kubernetes: kubernetesData,
EventID: int(ctx.EventID),
EventName: eventDefinition.Name,
MatchedPoliciesKernel: ctx.MatchedPolicies,
ArgsNum: int(argnum),
ReturnValue: int(ctx.Retval),
Args: args,
StackAddresses: stackAddresses,
ContextFlags: flags,
Syscall: syscall,
}
// get an event from the pool
evt := evtPool.Get().(*trace.Event)
// Populate all the fields of the event that are assigned in this stage.
// Note: some fields are populated in the next stages of the pipeline.
// MatchedPoliciesUser can be populated in matchPolicies function.
// MatchedPolicies can be populated in sinkEvents function.
// Metadata is used in events created in signatures engine.
evt.Timestamp = int(ctx.Ts)
evt.ThreadStartTime = int(ctx.StartTime)
evt.ProcessorID = int(ctx.ProcessorId)
evt.ProcessID = int(ctx.Pid)
evt.ThreadID = int(ctx.Tid)
evt.ParentProcessID = int(ctx.Ppid)
evt.HostProcessID = int(ctx.HostPid)
evt.HostThreadID = int(ctx.HostTid)
evt.HostParentProcessID = int(ctx.HostPpid)
evt.UserID = int(ctx.Uid)
evt.MountNS = int(ctx.MntID)
evt.PIDNS = int(ctx.PidID)
evt.ProcessName = string(bytes.TrimRight(ctx.Comm[:], "\x00"))
evt.HostName = string(bytes.TrimRight(ctx.UtsName[:], "\x00"))
evt.CgroupID = uint(ctx.CgroupID)
evt.ContainerID = containerData.ID
evt.Container = containerData
evt.Kubernetes = kubernetesData
evt.EventID = int(ctx.EventID)
evt.EventName = eventDefinition.Name
evt.MatchedPoliciesKernel = ctx.MatchedPolicies
// evt.MatchedPoliciesUser = 0
// evt.MatchedPolicies = []string{}
evt.ArgsNum = int(argnum)
evt.ReturnValue = int(ctx.Retval)
evt.Args = args
evt.StackAddresses = stackAddresses
evt.ContextFlags = flags
evt.Syscall = syscall
// evt.Metadata = &trace.Metadata{}

// If there aren't any policies that need filtering in userland, tracee **may** skip
// this event, as long as there aren't any derivatives that depend on it. Some base
// events (for derivative ones) might not have set related policy bit, thus the need
// to continue with those within the pipeline.
if t.matchPolicies(&evt) == 0 {
if t.matchPolicies(evt) == 0 {
if _, ok := t.eventDerivations[eventId]; !ok {
_ = t.stats.EventsFiltered.Increment()
evtPool.Put(evt)
continue
}
}

select {
case out <- &evt:
case out <- evt:
case <-outerCtx.Done():
return
}
}
}()
return out, errc
return out, evtPool, errc
}

// matchPolicies does the userland filtering (policy matching) for events. It iterates through all
Expand Down Expand Up @@ -407,7 +429,7 @@ func parseSyscallID(syscallID int, isCompat bool, compatTranslationMap map[event
// all event processors and check if there is any internal processing needed for that event type.
// It also clears policy bits for out-of-order container related events (after the processing
// logic).
func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) (
func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event, evtPool *sync.Pool) (
<-chan *trace.Event, <-chan error,
) {
out := make(chan *trace.Event, 10000)
Expand All @@ -431,6 +453,7 @@ func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) (
for _, err := range errs {
t.handleError(err)
}
evtPool.Put(event)
continue
}

Expand Down Expand Up @@ -461,6 +484,7 @@ func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) (
utils.ClearBits(&event.MatchedPoliciesUser, policiesWithContainerFilter)

if event.MatchedPoliciesKernel == 0 {
evtPool.Put(event)
continue
}
}
Expand Down Expand Up @@ -539,7 +563,7 @@ func (t *Tracee) deriveEvents(ctx context.Context, in <-chan *trace.Event) (
return out, errc
}

func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan error {
func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event, evtPool *sync.Pool) <-chan error {
errc := make(chan error, 1)

go func() {
Expand All @@ -554,6 +578,7 @@ func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan
id := events.ID(event.EventID)
event.MatchedPoliciesUser &= t.events[id].emit
if event.MatchedPoliciesUser == 0 {
evtPool.Put(event)
continue
}

Expand All @@ -572,7 +597,8 @@ func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan
select {
case t.config.ChanEvents <- *event:
_ = t.stats.EventCount.Increment()
event = nil
evtPool.Put(event)
// event = nil
case <-ctx.Done():
return
}
Expand Down

0 comments on commit 19adaff

Please sign in to comment.