diff --git a/pkg/ebpf/events_pipeline.go b/pkg/ebpf/events_pipeline.go
index 369d42eb90b7..01f69b93cff1 100644
--- a/pkg/ebpf/events_pipeline.go
+++ b/pkg/ebpf/events_pipeline.go
@@ -31,7 +31,7 @@ func (t *Tracee) handleEvents(ctx context.Context) {
 	var errcList []<-chan error
 
 	// Source pipeline stage.
-	eventsChan, errc := t.decodeEvents(ctx, t.eventsChannel)
+	eventsChan, evtPool, errc := t.decodeEvents(ctx, t.eventsChannel)
 	errcList = append(errcList, errc)
 
 	if t.config.Cache != nil {
@@ -46,7 +46,7 @@ func (t *Tracee) handleEvents(ctx context.Context) {
 
 	// Process events stage
 	// in this stage we perform event specific logic
-	eventsChan, errc = t.processEvents(ctx, eventsChan)
+	eventsChan, errc = t.processEvents(ctx, eventsChan, evtPool)
 	errcList = append(errcList, errc)
 
 	// Enrichment stage
@@ -70,7 +70,7 @@ func (t *Tracee) handleEvents(ctx context.Context) {
 	}
 
 	// Sink pipeline stage: events go through printers.
-	errc = t.sinkEvents(ctx, eventsChan)
+	errc = t.sinkEvents(ctx, eventsChan, evtPool)
 	errcList = append(errcList, errc)
 
 	// Pipeline started. Waiting for pipeline to complete
@@ -144,8 +144,21 @@ func (t *Tracee) queueEvents(ctx context.Context, in <-chan *trace.Event) (chan
 }
 
 // decodeEvents read the events received from the BPF programs and parse it into trace.Event type
-func (t *Tracee) decodeEvents(outerCtx context.Context, sourceChan chan []byte) (<-chan *trace.Event, <-chan error) {
+func (t *Tracee) decodeEvents(outerCtx context.Context, sourceChan chan []byte) (<-chan *trace.Event, *sync.Pool, <-chan error) {
 	out := make(chan *trace.Event, 10000)
+	evtPool := func() *sync.Pool {
+		pool := sync.Pool{
+			New: func() interface{} {
+				return &trace.Event{}
+			},
+		}
+		// warm up the pool
+		for i := 0; i < 10000; i++ {
+			pool.Put(pool.New())
+		}
+
+		return &pool
+	}()
 	errc := make(chan error, 1)
 	sysCompatTranslation := events.Definitions.IDs32ToIDs()
 	go func() {
@@ -218,55 +231,64 @@ func (t *Tracee) decodeEvents(outerCtx context.Context, sourceChan chan []byte)
 			}
 		}
 
-		evt := trace.Event{
-			Timestamp:             int(ctx.Ts),
-			ThreadStartTime:       int(ctx.StartTime),
-			ProcessorID:           int(ctx.ProcessorId),
-			ProcessID:             int(ctx.Pid),
-			ThreadID:              int(ctx.Tid),
-			ParentProcessID:       int(ctx.Ppid),
-			HostProcessID:         int(ctx.HostPid),
-			HostThreadID:          int(ctx.HostTid),
-			HostParentProcessID:   int(ctx.HostPpid),
-			UserID:                int(ctx.Uid),
-			MountNS:               int(ctx.MntID),
-			PIDNS:                 int(ctx.PidID),
-			ProcessName:           string(bytes.TrimRight(ctx.Comm[:], "\x00")),
-			HostName:              string(bytes.TrimRight(ctx.UtsName[:], "\x00")),
-			CgroupID:              uint(ctx.CgroupID),
-			ContainerID:           containerData.ID,
-			Container:             containerData,
-			Kubernetes:            kubernetesData,
-			EventID:               int(ctx.EventID),
-			EventName:             eventDefinition.Name,
-			MatchedPoliciesKernel: ctx.MatchedPolicies,
-			ArgsNum:               int(argnum),
-			ReturnValue:           int(ctx.Retval),
-			Args:                  args,
-			StackAddresses:        stackAddresses,
-			ContextFlags:          flags,
-			Syscall:               syscall,
-		}
+		// get an event from the pool
+		evt := evtPool.Get().(*trace.Event)
+		// Populate all the fields of the event that are assigned in this stage.
+		// Note: some fields are populated in the next stages of the pipeline.
+		//       MatchedPoliciesUser can be populated in matchPolicies function.
+		//       MatchedPolicies can be populated in sinkEvents function.
+		//       Metadata is used in events created in signatures engine.
+		evt.Timestamp = int(ctx.Ts)
+		evt.ThreadStartTime = int(ctx.StartTime)
+		evt.ProcessorID = int(ctx.ProcessorId)
+		evt.ProcessID = int(ctx.Pid)
+		evt.ThreadID = int(ctx.Tid)
+		evt.ParentProcessID = int(ctx.Ppid)
+		evt.HostProcessID = int(ctx.HostPid)
+		evt.HostThreadID = int(ctx.HostTid)
+		evt.HostParentProcessID = int(ctx.HostPpid)
+		evt.UserID = int(ctx.Uid)
+		evt.MountNS = int(ctx.MntID)
+		evt.PIDNS = int(ctx.PidID)
+		evt.ProcessName = string(bytes.TrimRight(ctx.Comm[:], "\x00"))
+		evt.HostName = string(bytes.TrimRight(ctx.UtsName[:], "\x00"))
+		evt.CgroupID = uint(ctx.CgroupID)
+		evt.ContainerID = containerData.ID
+		evt.Container = containerData
+		evt.Kubernetes = kubernetesData
+		evt.EventID = int(ctx.EventID)
+		evt.EventName = eventDefinition.Name
+		evt.MatchedPoliciesKernel = ctx.MatchedPolicies
+		// evt.MatchedPoliciesUser = 0
+		// evt.MatchedPolicies = []string{}
+		evt.ArgsNum = int(argnum)
+		evt.ReturnValue = int(ctx.Retval)
+		evt.Args = args
+		evt.StackAddresses = stackAddresses
+		evt.ContextFlags = flags
+		evt.Syscall = syscall
+		// evt.Metadata = &trace.Metadata{}
 
 		// If there aren't any policies that need filtering in userland, tracee **may** skip
 		// this event, as long as there aren't any derivatives that depend on it. Some base
 		// events (for derivative ones) might not have set related policy bit, thus the need
 		// to continue with those within the pipeline.
-		if t.matchPolicies(&evt) == 0 {
+		if t.matchPolicies(evt) == 0 {
 			if _, ok := t.eventDerivations[eventId]; !ok {
 				_ = t.stats.EventsFiltered.Increment()
+				evtPool.Put(evt)
 				continue
 			}
 		}
 
 		select {
-		case out <- &evt:
+		case out <- evt:
 		case <-outerCtx.Done():
 			return
 		}
 		}
 	}()
 
-	return out, errc
+	return out, evtPool, errc
 }
 
 // matchPolicies does the userland filtering (policy matching) for events. It iterates through all
@@ -407,7 +429,7 @@ func parseSyscallID(syscallID int, isCompat bool, compatTranslationMap map[event
 // all event processors and check if there is any internal processing needed for that event type.
 // It also clears policy bits for out-of-order container related events (after the processing
 // logic).
-func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) (
+func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event, evtPool *sync.Pool) (
 	<-chan *trace.Event, <-chan error,
 ) {
 	out := make(chan *trace.Event, 10000)
@@ -431,6 +453,7 @@ func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) (
 				for _, err := range errs {
 					t.handleError(err)
 				}
+				evtPool.Put(event)
 				continue
 			}
@@ -461,6 +484,7 @@ func (t *Tracee) processEvents(ctx context.Context, in <-chan *trace.Event) (
 				utils.ClearBits(&event.MatchedPoliciesUser, policiesWithContainerFilter)
 
 				if event.MatchedPoliciesKernel == 0 {
+					evtPool.Put(event)
 					continue
 				}
 			}
@@ -539,7 +563,7 @@ func (t *Tracee) deriveEvents(ctx context.Context, in <-chan *trace.Event) (
 	return out, errc
 }
 
-func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan error {
+func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event, evtPool *sync.Pool) <-chan error {
 	errc := make(chan error, 1)
 
 	go func() {
@@ -554,6 +578,7 @@ func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan
 			id := events.ID(event.EventID)
 			event.MatchedPoliciesUser &= t.events[id].emit
 			if event.MatchedPoliciesUser == 0 {
+				evtPool.Put(event)
 				continue
 			}
@@ -572,7 +597,8 @@ func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan
 			select {
 			case t.config.ChanEvents <- *event:
 				_ = t.stats.EventCount.Increment()
-				event = nil
+				evtPool.Put(event)
+				// event = nil
 			case <-ctx.Done():
 				return
 			}
diff --git a/pkg/ebpf/events_pipeline_bench_test.go b/pkg/ebpf/events_pipeline_bench_test.go
new file mode 100644
index 000000000000..1b6f0108ccce
--- /dev/null
+++ b/pkg/ebpf/events_pipeline_bench_test.go
@@ -0,0 +1,217 @@
+package ebpf
+
+import (
+	"bytes"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/aquasecurity/tracee/pkg/bufferdecoder"
+	"github.com/aquasecurity/tracee/pkg/events"
+	"github.com/aquasecurity/tracee/types/trace"
+)
+
+// BenchmarkEventPool is a benchmark of using a sync.Pool for Event objects, which
+// simulates the way the pipeline works.
+func BenchmarkEventPool(b *testing.B) {
+	evtPool := sync.Pool{
+		New: func() interface{} {
+			return &trace.Event{}
+		},
+	}
+	// warm up the pool
+	for i := 0; i < 10000; i++ {
+		evtPool.Put(evtPool.New())
+	}
+
+	ctx := bufferdecoder.Context{}
+	containerData := trace.Container{}
+	kubernetesData := trace.Kubernetes{}
+	eventDefinition := events.Event{}
+	args := []trace.Argument{}
+	stackAddresses := []uint64{}
+	flags := trace.ContextFlags{}
+	syscall := ""
+	argnum := uint8(0)
+
+	ctxChan := make(chan *bufferdecoder.Context, 10000)
+	stage1Chan := make(chan *trace.Event, 10000)
+	stage2Chan := make(chan *trace.Event)
+	stage3Chan := make(chan *trace.Event)
+
+	b.ResetTimer()
+	go func() {
+		for i := 0; i < b.N; i++ {
+			ctxChan <- &ctx
+		}
+	}()
+
+	go func() {
+		for i := 0; i < b.N; i++ {
+			ctx := <-ctxChan
+			evt := evtPool.Get().(*trace.Event)
+
+			evt.Timestamp = int(ctx.Ts)
+			evt.ThreadStartTime = int(ctx.StartTime)
+			evt.ProcessorID = int(ctx.ProcessorId)
+			evt.ProcessID = int(ctx.Pid)
+			evt.ThreadID = int(ctx.Tid)
+			evt.ParentProcessID = int(ctx.Ppid)
+			evt.HostProcessID = int(ctx.HostPid)
+			evt.HostThreadID = int(ctx.HostTid)
+			evt.HostParentProcessID = int(ctx.HostPpid)
+			evt.UserID = int(ctx.Uid)
+			evt.MountNS = int(ctx.MntID)
+			evt.PIDNS = int(ctx.PidID)
+			evt.ProcessName = string(bytes.TrimRight(ctx.Comm[:], "\x00"))
+			evt.HostName = string(bytes.TrimRight(ctx.UtsName[:], "\x00"))
+			evt.CgroupID = uint(ctx.CgroupID)
+			evt.ContainerID = containerData.ID
+			evt.Container = containerData
+			evt.Kubernetes = kubernetesData
+			evt.EventID = int(ctx.EventID)
+			evt.EventName = eventDefinition.Name
+			evt.MatchedPoliciesKernel = ctx.MatchedPolicies
+			evt.ArgsNum = int(argnum)
+			evt.ReturnValue = int(ctx.Retval)
+			evt.Args = args
+			evt.StackAddresses = stackAddresses
+			evt.ContextFlags = flags
+			evt.Syscall = syscall
+			// these fields are not assigned in this step, see comment on decodeEvents
+			// evt.MatchedPoliciesUser = 0
+			// evt.MatchedPolicies = []string{}
+			// evt.Metadata = &trace.Metadata{}
+
+			stage1Chan <- evt
+		}
+	}()
+
+	done := make(chan struct{})
+	go func() {
+		for i := 0; i < b.N; i++ {
+			evt := <-stage1Chan
+			// simulate some work through the pipeline
+			time.Sleep(1 * time.Millisecond)
+			stage2Chan <- evt
+		}
+	}()
+
+	go func() {
+		for i := 0; i < b.N; i++ {
+			evt := <-stage2Chan
+			// simulate some work through the pipeline
+			time.Sleep(1 * time.Millisecond)
+			stage3Chan <- evt
+		}
+	}()
+
+	go func() {
+		for i := 0; i < b.N; i++ {
+			evt := <-stage3Chan
+			// simulate some work through the pipeline
+			time.Sleep(1 * time.Millisecond)
+			_ = evt
+			evtPool.Put(evt) // return the event to the pool
+		}
+		close(done)
+	}()
+
+	<-done
+}
+
+// BenchmarkEventNew is a benchmark of using a new Event object for each event, which
+// simulates the way the pipeline works.
+func BenchmarkEventNew(b *testing.B) {
+	ctx := bufferdecoder.Context{}
+	containerData := trace.Container{}
+	kubernetesData := trace.Kubernetes{}
+	eventDefinition := events.Event{}
+	args := []trace.Argument{}
+	stackAddresses := []uint64{}
+	flags := trace.ContextFlags{}
+	syscall := ""
+	argnum := uint8(0)
+
+	ctxChan := make(chan *bufferdecoder.Context, 10000)
+	stage1Chan := make(chan *trace.Event, 10000)
+	stage2Chan := make(chan *trace.Event)
+	stage3Chan := make(chan *trace.Event)
+
+	b.ResetTimer()
+	go func() {
+		for i := 0; i < b.N; i++ {
+			ctxChan <- &ctx
+		}
+	}()
+
+	go func() {
+		for i := 0; i < b.N; i++ {
+			ctx := <-ctxChan
+
+			evt := &trace.Event{
+				Timestamp:             int(ctx.Ts),
+				ThreadStartTime:       int(ctx.StartTime),
+				ProcessorID:           int(ctx.ProcessorId),
+				ProcessID:             int(ctx.Pid),
+				ThreadID:              int(ctx.Tid),
+				ParentProcessID:       int(ctx.Ppid),
+				HostProcessID:         int(ctx.HostPid),
+				HostThreadID:          int(ctx.HostTid),
+				HostParentProcessID:   int(ctx.HostPpid),
+				UserID:                int(ctx.Uid),
+				MountNS:               int(ctx.MntID),
+				PIDNS:                 int(ctx.PidID),
+				ProcessName:           string(bytes.TrimRight(ctx.Comm[:], "\x00")),
+				HostName:              string(bytes.TrimRight(ctx.UtsName[:], "\x00")),
+				CgroupID:              uint(ctx.CgroupID),
+				ContainerID:           containerData.ID,
+				Container:             containerData,
+				Kubernetes:            kubernetesData,
+				EventID:               int(ctx.EventID),
+				EventName:             eventDefinition.Name,
+				MatchedPoliciesKernel: ctx.MatchedPolicies,
+				ArgsNum:               int(argnum),
+				ReturnValue:           int(ctx.Retval),
+				Args:                  args,
+				StackAddresses:        stackAddresses,
+				ContextFlags:          flags,
+				Syscall:               syscall,
+			}
+
+			stage1Chan <- evt
+		}
+	}()
+
+	done := make(chan struct{})
+	go func() {
+		for i := 0; i < b.N; i++ {
+			evt := <-stage1Chan
+			// simulate some work through the pipeline
+			time.Sleep(1 * time.Millisecond)
+			stage2Chan <- evt
+		}
+	}()
+
+	go func() {
+		for i := 0; i < b.N; i++ {
+			evt := <-stage2Chan
+			// simulate some work through the pipeline
+			time.Sleep(1 * time.Millisecond)
+			stage3Chan <- evt
+		}
+	}()
+
+	go func() {
+		for i := 0; i < b.N; i++ {
+			evt := <-stage3Chan
+			// simulate some work through the pipeline
+			time.Sleep(1 * time.Millisecond)
+			_ = evt
+			evt = nil // release the reference to the event
+		}
+		close(done)
+	}()
+
+	<-done
+}
diff --git a/pkg/ebpf/net_capture.go b/pkg/ebpf/net_capture.go
index b8a1ba15d054..c3aac98d371d 100644
--- a/pkg/ebpf/net_capture.go
+++ b/pkg/ebpf/net_capture.go
@@ -36,7 +36,7 @@ func (t *Tracee) processNetCaptureEvents(ctx context.Context) {
 	var errChanList []<-chan error
 
 	// source pipeline stage (re-used from regular pipeline)
-	eventsChan, errChan := t.decodeEvents(ctx, t.netCapChannel)
+	eventsChan, _, errChan := t.decodeEvents(ctx, t.netCapChannel)
 	errChanList = append(errChanList, errChan)
 
 	// process events stage (network capture only)
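For readers skimming the diff, here is a minimal, self-contained sketch of the object lifecycle the change introduces. The Event type, stage logic, and channel sizes below are simplified stand-ins, not tracee's real types; the point is the Get/Put discipline the pipeline code above relies on: the decode stage takes an object from a warmed-up pool and overwrites every field it owns, and every path that stops forwarding the event, whether a filter drop or the final sink, returns it to the pool.

```go
// Minimal standalone sketch of the pooled-event lifecycle (assumed names,
// not tracee's real types).
package main

import (
	"fmt"
	"sync"
)

type Event struct {
	ID   int
	Name string
}

func main() {
	evtPool := sync.Pool{
		New: func() interface{} { return &Event{} },
	}
	// Warm up the pool so the first events do not pay the allocation cost.
	for i := 0; i < 1024; i++ {
		evtPool.Put(evtPool.New())
	}

	decoded := make(chan *Event, 1024)
	sunk := make(chan struct{})

	// Decode stage: take an object from the pool and overwrite every field
	// this stage owns (a pooled object still holds its previous contents).
	go func() {
		for i := 0; i < 10; i++ {
			evt := evtPool.Get().(*Event)
			evt.ID = i
			evt.Name = "sched_process_exec"
			if i%2 == 0 { // simulated filter: the event is dropped here...
				evtPool.Put(evt) // ...so it goes straight back to the pool
				continue
			}
			decoded <- evt
		}
		close(decoded)
	}()

	// Sink stage: copy the event out by value, then recycle the pointer.
	go func() {
		for evt := range decoded {
			out := *evt // the copy outlives the pooled object
			fmt.Println("emitted", out.ID, out.Name)
			evtPool.Put(evt)
		}
		close(sunk)
	}()

	<-sunk
}
```

The two benchmarks in the new test file can be compared with the standard tooling, for example `go test -run '^$' -bench 'BenchmarkEvent(Pool|New)' -benchmem ./pkg/ebpf/`. One caveat of the pattern worth keeping in mind: a pooled object keeps whatever was last stored in it, so fields written by later stages (MatchedPoliciesUser, MatchedPolicies, Metadata) would need to be reset before reuse, which is presumably what the commented-out assignments in decodeEvents are pointing at.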