Skip to content

Commit

Permalink
Close Tracee upon EOF
Browse files Browse the repository at this point in the history
  • Loading branch information
AlonZivony committed Nov 5, 2023
1 parent 60b33b6 commit 480b866
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 11 deletions.
25 changes: 21 additions & 4 deletions pkg/ebpf/signature_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ebpf

import (
"context"
"sync"

"github.com/aquasecurity/tracee/pkg/containers"
"github.com/aquasecurity/tracee/pkg/events"
Expand Down Expand Up @@ -45,19 +46,25 @@ func (t *Tracee) engineEvents(ctx context.Context, in <-chan *trace.Event) (<-ch

go t.sigEngine.Start(ctx)

// Use wait group to close output channels only upon the end of both internal goroutines
wg := sync.WaitGroup{}
wg.Add(2)

// TODO: in the upcoming releases, the rule engine should be changed to receive trace.Event,
// and return a trace.Event, which should remove the necessity of converting trace.Event to protocol.Event,
// and converting detect.Finding into trace.Event

go func() {
defer close(out)
defer close(errc)
defer close(engineInput)
defer close(engineOutput)
defer wg.Done()

for {
select {
case event := <-in:
case event, ok := <-in:
if !ok {
return
}
if event == nil {
continue // might happen during initialization (ctrl+c seg faults)
}
Expand Down Expand Up @@ -91,9 +98,13 @@ func (t *Tracee) engineEvents(ctx context.Context, in <-chan *trace.Event) (<-ch
}()

go func() {
defer wg.Done()
for {
select {
case finding := <-engineOutput:
case finding, ok := <-engineOutput:
if !ok {
return
}
if finding.Event.Payload == nil {
continue // might happen during initialization (ctrl+c seg faults)
}
Expand All @@ -116,6 +127,12 @@ func (t *Tracee) engineEvents(ctx context.Context, in <-chan *trace.Event) (<-ch
}
}()

go func() {
wg.Wait()
close(out)
close(errc)
}()

return out, errc
}

Expand Down
18 changes: 14 additions & 4 deletions pkg/ebpf/tracee.go
Original file line number Diff line number Diff line change
Expand Up @@ -1446,11 +1446,19 @@ func (t *Tracee) Run(ctx gocontext.Context) error {

// runAnalyze is the version of the Run method for the Analyze mode
func (t *Tracee) runAnalyze(ctx gocontext.Context) error {
go t.handleEvents(ctx)
pipelineDone := make(chan struct{})
go func() {
t.handleEvents(ctx)
close(pipelineDone)
}()

t.running.Store(true) // set running state after writing pid file
t.ready(ctx) // executes ready callback, non blocking
<-ctx.Done() // block until ctx is cancelled elsewhere
select {
case <-ctx.Done(): // block until ctx is cancelled elsewhere
case <-pipelineDone:
break
}

t.Close() // close Tracee

Expand Down Expand Up @@ -1514,8 +1522,10 @@ func (t *Tracee) Close() {
logger.Errorw("failed to clean containers module when closing tracee", "err", err)
}
}
if err := t.cgroups.Destroy(); err != nil {
logger.Errorw("Cgroups destroy", "error", err)
if t.cgroups != nil {
if err := t.cgroups.Destroy(); err != nil {
logger.Errorw("Cgroups destroy", "error", err)
}
}

// set 'running' to false and close 'done' channel only after attempting to close all resources
Expand Down
11 changes: 8 additions & 3 deletions pkg/producer/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
)

type jsonEventProducer struct {
in *bufio.Scanner
in *bufio.Scanner
done chan struct{}
}

func newJsonEventProducer(input io.Reader) *jsonEventProducer {
scanner := bufio.NewScanner(input)
scanner.Split(bufio.ScanLines)
return &jsonEventProducer{in: scanner}
return &jsonEventProducer{in: scanner, done: make(chan struct{})}
}

func (j jsonEventProducer) Init() error {
Expand All @@ -25,7 +26,7 @@ func (j jsonEventProducer) Init() error {

func (j jsonEventProducer) Produce() (trace.Event, error) {
if !j.in.Scan() { // if EOF or error close the done channel and return
// TODO: Create an error for end of stream
close(j.done)
return trace.Event{}, io.EOF
}

Expand All @@ -36,3 +37,7 @@ func (j jsonEventProducer) Produce() (trace.Event, error) {
}
return e, nil
}

func (j jsonEventProducer) Done() <-chan struct{} {
return j.done
}
1 change: 1 addition & 0 deletions pkg/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type EventsProducer interface {
// Produce produces a single event.
// Return io.EOF for end of events stream.
Produce() (trace.Event, error)
Done() <-chan struct{}
}

func New(cfg *config.ProducerConfig) (EventsProducer, error) {
Expand Down

0 comments on commit 480b866

Please sign in to comment.