From 19adaffa11ec97ff9bd9bd3ade0fe8b02b4b08c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Geyslan=20Greg=C3=B3rio?= Date: Sat, 1 Jul 2023 13:53:05 -0300 Subject: [PATCH] perf:(pipeline) memory efficiency using pool This commit employs sync.Pool to bolster memory performance in event handling. The benchmarking file, events_pipeline_bench_test.go, simulates the pipeline execution and measures sync.Pool's effectiveness. Benchmark command: go test \ -benchmem -benchtime=10000x -cpu=1 \ -run=^$ -bench ^(BenchmarkEventPool|BenchmarkEventNew)$ \ github.com/aquasecurity/tracee/pkg/ebpf Findings: Execution Time (ns/op): EventPool's execution time compared to EventNew varies slightly between -1.43% to +1.03%. Memory Allocation (B/op): EventPool is significantly memory-efficient, allocating just 26 bytes against EventNew's 448 bytes, yielding around 94.20% memory reduction. Allocation Counts (allocs/op): EventPool reuses memory allocations, achieving 0 allocations per operation, while EventNew makes 1 allocation per operation. In conclusion, sync.Pool results in remarkable memory optimization without substantial impact on runtime. This efficiency is critical in the pipeline that must have high throughput, as it can facilitate garbage collection and promote resource efficiency. --- pkg/ebpf/events_pipeline.go | 104 +++++++----- pkg/ebpf/events_pipeline_bench_test.go | 217 +++++++++++++++++++++++++ pkg/ebpf/net_capture.go | 2 +- 3 files changed, 283 insertions(+), 40 deletions(-) create mode 100644 pkg/ebpf/events_pipeline_bench_test.go diff --git a/pkg/ebpf/events_pipeline.go b/pkg/ebpf/events_pipeline.go index 369d42eb90b7..01f69b93cff1 100644 --- a/pkg/ebpf/events_pipeline.go +++ b/pkg/ebpf/events_pipeline.go @@ -31,7 +31,7 @@ func (t *Tracee) handleEvents(ctx context.Context) { var errcList []<-chan error // Source pipeline stage. - eventsChan, errc := t.decodeEvents(ctx, t.eventsChannel) + eventsChan, evtPool, errc := t.decodeEvents(ctx, t.eventsChannel) errcList = append(errcList, errc) if t.config.Cache != nil { @@ -46,7 +46,7 @@ func (t *Tracee) handleEvents(ctx context.Context) { // Process events stage // in this stage we perform event specific logic - eventsChan, errc = t.processEvents(ctx, eventsChan) + eventsChan, errc = t.processEvents(ctx, eventsChan, evtPool) errcList = append(errcList, errc) // Enrichment stage @@ -70,7 +70,7 @@ func (t *Tracee) handleEvents(ctx context.Context) { } // Sink pipeline stage: events go through printers. - errc = t.sinkEvents(ctx, eventsChan) + errc = t.sinkEvents(ctx, eventsChan, evtPool) errcList = append(errcList, errc) // Pipeline started. Waiting for pipeline to complete @@ -144,8 +144,21 @@ func (t *Tracee) queueEvents(ctx context.Context, in <-chan *trace.Event) (chan } // decodeEvents read the events received from the BPF programs and parse it into trace.Event type -func (t *Tracee) decodeEvents(outerCtx context.Context, sourceChan chan []byte) (<-chan *trace.Event, <-chan error) { +func (t *Tracee) decodeEvents(outerCtx context.Context, sourceChan chan []byte) (<-chan *trace.Event, *sync.Pool, <-chan error) { out := make(chan *trace.Event, 10000) + evtPool := func() *sync.Pool { + pool := sync.Pool{ + New: func() interface{} { + return &trace.Event{} + }, + } + // warm up the pool + for i := 0; i < 10000; i++ { + pool.Put(pool.New()) + } + + return &pool + }() errc := make(chan error, 1) sysCompatTranslation := events.Definitions.IDs32ToIDs() go func() { @@ -218,55 +231,64 @@ func (t *Tracee) decodeEvents(outerCtx context.Context, sourceChan chan []byte) } } - evt := trace.Event{ - Timestamp: int(ctx.Ts), - ThreadStartTime: int(ctx.StartTime), - ProcessorID: int(ctx.ProcessorId), - ProcessID: int(ctx.Pid), - ThreadID: int(ctx.Tid), - ParentProcessID: int(ctx.Ppid), - HostProcessID: int(ctx.HostPid), - HostThreadID: int(ctx.HostTid), - HostParentProcessID: int(ctx.HostPpid), - UserID: int(ctx.Uid), - MountNS: int(ctx.MntID), - PIDNS: int(ctx.PidID), - ProcessName: string(bytes.TrimRight(ctx.Comm[:], "\x00")), - HostName: string(bytes.TrimRight(ctx.UtsName[:], "\x00")), - CgroupID: uint(ctx.CgroupID), - ContainerID: containerData.ID, - Container: containerData, - Kubernetes: kubernetesData, - EventID: int(ctx.EventID), - EventName: eventDefinition.Name, - MatchedPoliciesKernel: ctx.MatchedPolicies, - ArgsNum: int(argnum), - ReturnValue: int(ctx.Retval), - Args: args, - StackAddresses: stackAddresses, - ContextFlags: flags, - Syscall: syscall, - } + // get an event from the pool + evt := evtPool.Get().(*trace.Event) + // Populate all the fields of the event that are assigned in this stage. + // Note: some fields are populated in the next stages of the pipeline. + // MatchedPoliciesUser can be populated in matchPolicies function. + // MatchedPolicies can be populated in sinkEvents function. + // Metadata is used in events created in signatures engine. + evt.Timestamp = int(ctx.Ts) + evt.ThreadStartTime = int(ctx.StartTime) + evt.ProcessorID = int(ctx.ProcessorId) + evt.ProcessID = int(ctx.Pid) + evt.ThreadID = int(ctx.Tid) + evt.ParentProcessID = int(ctx.Ppid) + evt.HostProcessID = int(ctx.HostPid) + evt.HostThreadID = int(ctx.HostTid) + evt.HostParentProcessID = int(ctx.HostPpid) + evt.UserID = int(ctx.Uid) + evt.MountNS = int(ctx.MntID) + evt.PIDNS = int(ctx.PidID) + evt.ProcessName = string(bytes.TrimRight(ctx.Comm[:], "\x00")) + evt.HostName = string(bytes.TrimRight(ctx.UtsName[:], "\x00")) + evt.CgroupID = uint(ctx.CgroupID) + evt.ContainerID = containerData.ID + evt.Container = containerData + evt.Kubernetes = kubernetesData + evt.EventID = int(ctx.EventID) + evt.EventName = eventDefinition.Name + evt.MatchedPoliciesKernel = ctx.MatchedPolicies + // evt.MatchedPoliciesUser = 0 + // evt.MatchedPolicies = []string{} + evt.ArgsNum = int(argnum) + evt.ReturnValue = int(ctx.Retval) + evt.Args = args + evt.StackAddresses = stackAddresses + evt.ContextFlags = flags + evt.Syscall = syscall + // evt.Metadata = &trace.Metadata{} // If there aren't any policies that need filtering in userland, tracee **may** skip // this event, as long as there aren't any derivatives that depend on it. Some base // events (for derivative ones) might not have set related policy bit, thus the need // to continue with those within the pipeline. - if t.matchPolicies(&evt) == 0 { + if t.matchPolicies(evt) == 0 { if _, ok := t.eventDerivations[eventId]; !ok { _ = t.stats.EventsFiltered.Increment() + evtPool.Put(evt) continue } } select { - case out <- &evt: + case out <- evt: case <-outerCtx.Done(): return } } }() - return out, errc + return out, evtPool, errc } // matchPolicies does the userland filtering (policy matching) for events. It iterates through all @@ -407,7 +429,7 @@ func parseSyscallID(syscallID int, isCompat bool, compatTranslationMap map[event // all event processors and check if there is any internal processing needed for that event type. // It also clears policy bits for out-of-order container related events (after the processing // logic). -func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) ( +func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event, evtPool *sync.Pool) ( <-chan *trace.Event, <-chan error, ) { out := make(chan *trace.Event, 10000) @@ -431,6 +453,7 @@ func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) ( for _, err := range errs { t.handleError(err) } + evtPool.Put(event) continue } @@ -461,6 +484,7 @@ func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) ( utils.ClearBits(&event.MatchedPoliciesUser, policiesWithContainerFilter) if event.MatchedPoliciesKernel == 0 { + evtPool.Put(event) continue } } @@ -539,7 +563,7 @@ func (t *Tracee) deriveEvents(ctx context.Context, in <-chan *trace.Event) ( return out, errc } -func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan error { +func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event, evtPool *sync.Pool) <-chan error { errc := make(chan error, 1) go func() { @@ -554,6 +578,7 @@ func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan id := events.ID(event.EventID) event.MatchedPoliciesUser &= t.events[id].emit if event.MatchedPoliciesUser == 0 { + evtPool.Put(event) continue } @@ -572,7 +597,8 @@ func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan select { case t.config.ChanEvents <- *event: _ = t.stats.EventCount.Increment() - event = nil + evtPool.Put(event) + // event = nil case <-ctx.Done(): return } diff --git a/pkg/ebpf/events_pipeline_bench_test.go b/pkg/ebpf/events_pipeline_bench_test.go new file mode 100644 index 000000000000..1b6f0108ccce --- /dev/null +++ b/pkg/ebpf/events_pipeline_bench_test.go @@ -0,0 +1,217 @@ +package ebpf + +import ( + "bytes" + "sync" + "testing" + "time" + + "github.com/aquasecurity/tracee/pkg/bufferdecoder" + "github.com/aquasecurity/tracee/pkg/events" + "github.com/aquasecurity/tracee/types/trace" +) + +// BenchmarkEventPool is a benchmark of using a sync.Pool for Event objects, which +// simulates the way the pipeline works. +func BenchmarkEventPool(b *testing.B) { + evtPool := sync.Pool{ + New: func() interface{} { + return &trace.Event{} + }, + } + // warm up the pool + for i := 0; i < 10000; i++ { + evtPool.Put(evtPool.New()) + } + + ctx := bufferdecoder.Context{} + containerData := trace.Container{} + kubernetesData := trace.Kubernetes{} + eventDefinition := events.Event{} + args := []trace.Argument{} + stackAddresses := []uint64{} + flags := trace.ContextFlags{} + syscall := "" + argnum := uint8(0) + + ctxChan := make(chan *bufferdecoder.Context, 10000) + stage1Chan := make(chan *trace.Event, 10000) + stage2Chan := make(chan *trace.Event) + stage3Chan := make(chan *trace.Event) + + b.ResetTimer() + go func() { + for i := 0; i < b.N; i++ { + ctxChan <- &ctx + } + }() + + go func() { + for i := 0; i < b.N; i++ { + ctx := <-ctxChan + evt := evtPool.Get().(*trace.Event) + + evt.Timestamp = int(ctx.Ts) + evt.ThreadStartTime = int(ctx.StartTime) + evt.ProcessorID = int(ctx.ProcessorId) + evt.ProcessID = int(ctx.Pid) + evt.ThreadID = int(ctx.Tid) + evt.ParentProcessID = int(ctx.Ppid) + evt.HostProcessID = int(ctx.HostPid) + evt.HostThreadID = int(ctx.HostTid) + evt.HostParentProcessID = int(ctx.HostPpid) + evt.UserID = int(ctx.Uid) + evt.MountNS = int(ctx.MntID) + evt.PIDNS = int(ctx.PidID) + evt.ProcessName = string(bytes.TrimRight(ctx.Comm[:], "\x00")) + evt.HostName = string(bytes.TrimRight(ctx.UtsName[:], "\x00")) + evt.CgroupID = uint(ctx.CgroupID) + evt.ContainerID = containerData.ID + evt.Container = containerData + evt.Kubernetes = kubernetesData + evt.EventID = int(ctx.EventID) + evt.EventName = eventDefinition.Name + evt.MatchedPoliciesKernel = ctx.MatchedPolicies + evt.ArgsNum = int(argnum) + evt.ReturnValue = int(ctx.Retval) + evt.Args = args + evt.StackAddresses = stackAddresses + evt.ContextFlags = flags + evt.Syscall = syscall + // these fields are not assigned in this step, see comment on decodeEvents + // evt.MatchedPoliciesUser = 0 + // evt.MatchedPolicies = []string{} + // evt.Metadata = &trace.Metadata{} + + stage1Chan <- evt + } + }() + + done := make(chan struct{}) + go func() { + for i := 0; i < b.N; i++ { + evt := <-stage1Chan + // simulate some work through the pipeline + time.Sleep(1 * time.Millisecond) + stage2Chan <- evt + } + }() + + go func() { + for i := 0; i < b.N; i++ { + evt := <-stage2Chan + // simulate some work through the pipeline + time.Sleep(1 * time.Millisecond) + stage3Chan <- evt + } + }() + + go func() { + for i := 0; i < b.N; i++ { + evt := <-stage3Chan + // simulate some work through the pipeline + time.Sleep(1 * time.Millisecond) + _ = evt + evtPool.Put(evt) // return the event to the pool + } + close(done) + }() + + <-done +} + +// BenchmarkEventNew is a benchmark of using a new Event object for each event, which +// simulates the way the pipeline works. +func BenchmarkEventNew(b *testing.B) { + ctx := bufferdecoder.Context{} + containerData := trace.Container{} + kubernetesData := trace.Kubernetes{} + eventDefinition := events.Event{} + args := []trace.Argument{} + stackAddresses := []uint64{} + flags := trace.ContextFlags{} + syscall := "" + argnum := uint8(0) + + ctxChan := make(chan *bufferdecoder.Context, 10000) + stage1Chan := make(chan *trace.Event, 10000) + stage2Chan := make(chan *trace.Event) + stage3Chan := make(chan *trace.Event) + + b.ResetTimer() + go func() { + for i := 0; i < b.N; i++ { + ctxChan <- &ctx + } + }() + + go func() { + for i := 0; i < b.N; i++ { + ctx := <-ctxChan + + evt := &trace.Event{ + Timestamp: int(ctx.Ts), + ThreadStartTime: int(ctx.StartTime), + ProcessorID: int(ctx.ProcessorId), + ProcessID: int(ctx.Pid), + ThreadID: int(ctx.Tid), + ParentProcessID: int(ctx.Ppid), + HostProcessID: int(ctx.HostPid), + HostThreadID: int(ctx.HostTid), + HostParentProcessID: int(ctx.HostPpid), + UserID: int(ctx.Uid), + MountNS: int(ctx.MntID), + PIDNS: int(ctx.PidID), + ProcessName: string(bytes.TrimRight(ctx.Comm[:], "\x00")), + HostName: string(bytes.TrimRight(ctx.UtsName[:], "\x00")), + CgroupID: uint(ctx.CgroupID), + ContainerID: containerData.ID, + Container: containerData, + Kubernetes: kubernetesData, + EventID: int(ctx.EventID), + EventName: eventDefinition.Name, + MatchedPoliciesKernel: ctx.MatchedPolicies, + ArgsNum: int(argnum), + ReturnValue: int(ctx.Retval), + Args: args, + StackAddresses: stackAddresses, + ContextFlags: flags, + Syscall: syscall, + } + + stage1Chan <- evt + } + }() + + done := make(chan struct{}) + go func() { + for i := 0; i < b.N; i++ { + evt := <-stage1Chan + // simulate some work through the pipeline + time.Sleep(1 * time.Millisecond) + stage2Chan <- evt + } + }() + + go func() { + for i := 0; i < b.N; i++ { + evt := <-stage2Chan + // simulate some work through the pipeline + time.Sleep(1 * time.Millisecond) + stage3Chan <- evt + } + }() + + go func() { + for i := 0; i < b.N; i++ { + evt := <-stage3Chan + // simulate some work through the pipeline + time.Sleep(1 * time.Millisecond) + _ = evt + evt = nil // release the reference to the event + } + close(done) + }() + + <-done +} diff --git a/pkg/ebpf/net_capture.go b/pkg/ebpf/net_capture.go index b8a1ba15d054..c3aac98d371d 100644 --- a/pkg/ebpf/net_capture.go +++ b/pkg/ebpf/net_capture.go @@ -36,7 +36,7 @@ func (t *Tracee) processNetCaptureEvents(ctx context.Context) { var errChanList []<-chan error // source pipeline stage (re-used from regular pipeline) - eventsChan, errChan := t.decodeEvents(ctx, t.netCapChannel) + eventsChan, _, errChan := t.decodeEvents(ctx, t.netCapChannel) errChanList = append(errChanList, errChan) // process events stage (network capture only)