Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(metrics): add pipeline average time metrics #3845

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 5 additions & 3 deletions pkg/cmd/flags/server/server.go
@@ -1,6 +1,8 @@
package server

import (
"fmt"

"github.com/aquasecurity/tracee/pkg/errfmt"
"github.com/aquasecurity/tracee/pkg/logger"
"github.com/aquasecurity/tracee/pkg/server/http"
Expand Down Expand Up @@ -30,17 +32,17 @@ func PrepareHTTPServer(listenAddr string, metrics, healthz, pprof, pyro bool) (*
httpServer := http.New(listenAddr)

if metrics {
logger.Debugw("Enabling metrics endpoint")
NDStrahilevitz marked this conversation as resolved.
Show resolved Hide resolved
logger.Debugw(fmt.Sprintf("Enabling metrics endpoint at %s/metrics", listenAddr))
httpServer.EnableMetricsEndpoint()
}

if healthz {
logger.Debugw("Enabling healthz endpoint")
logger.Debugw(fmt.Sprintf("Enabling healthz endpoint at %s/healthz", listenAddr))
httpServer.EnableHealthzEndpoint()
}

if pprof {
logger.Debugw("Enabling pprof endpoint")
logger.Debugw(fmt.Sprintf("Enabling pprof endpoint at %s/debug/pprof", listenAddr))
httpServer.EnablePProfEndpoint()
}
if pyro {
Expand Down
55 changes: 55 additions & 0 deletions pkg/counter/average.go
@@ -0,0 +1,55 @@
package counter

import (
"fmt"
"sync"
)

// Average accumulates a running total and a sample count, from which its
// methods derive an arithmetic mean.
type Average struct {
// sum holds the total of every value passed to Add/AddAndRead.
sum Counter
// c holds the number of values added so far.
c Counter
// m guards sum and c. It is a pointer, so copies of an Average share one lock.
m *sync.RWMutex
}

// NewAverage returns an Average whose sum and sample count both start at
// zero and which owns a freshly allocated lock.
func NewAverage() Average {
	var avg Average

	avg.sum = NewCounter(0)
	avg.c = NewCounter(0)
	avg.m = &sync.RWMutex{}

	return avg
}

// Read returns the current mean of all values added so far.
//
// It takes the read lock, so it may run concurrently with other readers but
// not with AddAndRead. When no value has been added yet it returns 0 instead
// of dividing zero by zero, which would yield NaN.
func (avg *Average) Read() float64 {
	avg.m.RLock()
	defer avg.m.RUnlock()

	count := float64(avg.c.Get())
	if count == 0 {
		// Empty average: avoid 0/0 (NaN) leaking into metrics and String().
		return 0
	}

	return float64(avg.sum.Get()) / count
}

// Add records val as a new sample. It delegates to AddAndRead and discards
// the resulting mean, returning only the error (if any).
func (avg *Average) Add(val uint64) error {
	if _, err := avg.AddAndRead(val); err != nil {
		return err
	}
	return nil
}

// AddAndRead records val as a new sample and returns the updated mean.
//
// The write lock is held for the whole update so the sum and the sample count
// change together with respect to Read. If either counter increment fails,
// the returned mean is 0 and the underlying error is wrapped with %w so that
// callers can inspect it with errors.Is / errors.As.
func (avg *Average) AddAndRead(val uint64) (float64, error) {
	avg.m.Lock()
	defer avg.m.Unlock()

	sum, err := avg.sum.IncrementValueAndRead(val)
	if err != nil {
		return 0, fmt.Errorf("failed to increment average sum: %w", err)
	}

	count, err := avg.c.IncrementValueAndRead(1)
	if err != nil {
		return 0, fmt.Errorf("failed to increment average count: %w", err)
	}

	return float64(sum) / float64(count), nil
}

// String formats the current mean, as returned by Read, using the "%f" verb.
func (avg Average) String() string {
	mean := avg.Read()
	return fmt.Sprintf("%f", mean)
}
7 changes: 6 additions & 1 deletion pkg/ebpf/events_pipeline.go
Expand Up @@ -165,10 +165,13 @@ func (t *Tracee) decodeEvents(ctx context.Context, sourceChan chan []byte) (<-ch
for dataRaw := range sourceChan {
ebpfMsgDecoder := bufferdecoder.New(dataRaw)
var eCtx bufferdecoder.EventContext
endTimeKernel := uint64(utils.GetMonotonicTime())
if err := ebpfMsgDecoder.DecodeContext(&eCtx); err != nil {
t.handleError(err)
continue
}
startTimeKernel := eCtx.Ts
_ = t.stats.AvgTimeInKernel.Add(endTimeKernel - startTimeKernel)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This value is actually the time in the kernel + submit time + the time it took tracee to read the buffer, isn't it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be kernel + submit time + time blocked in channel (what you meant by read time?). Note that the endpoint timestamp is taken before we decode the buffer.
This time blocked in channel is actually critical and I haven't considered it. This measurement should be rethought.

var argnum uint8
if err := ebpfMsgDecoder.DecodeUint8(&argnum); err != nil {
t.handleError(err)
Expand Down Expand Up @@ -631,8 +634,10 @@ func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan
case <-ctx.Done():
return
default:
startTime := uint64(t.getOrigEvtTimestamp(event)) // convert back to monotonic
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage of this getOrigEvtTimestamp is discouraged since future fixes to the timestamp normalization may cause it to break. Also see here: #3820 (comment)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it's not ideal, but I'm not sure there's any better option until #3820 is resolved.

t.streamsManager.Publish(ctx, *event)
_ = t.stats.EventCount.Increment()
endTime := uint64(utils.GetMonotonicTime())
_ = t.stats.AvgTimeInPipeline.Add(endTime - startTime)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this actually the time in the kernel+pipeline?
If so, consider renaming this to something else, e.g. AvgEventProcessingTime

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this should be renamed. I think I originally included a subtraction of the former kernel time here, but it didn't work out. Anyway, that is why the original name was left over.

t.eventsPool.Put(event)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ebpf/tracee.go
Expand Up @@ -226,6 +226,7 @@ func New(cfg config.Config) (*Tracee, error) {

t := &Tracee{
config: cfg,
stats: metrics.NewStats(),
done: make(chan struct{}),
writtenFiles: make(map[string]string),
readFiles: make(map[string]string),
Expand Down
72 changes: 62 additions & 10 deletions pkg/metrics/stats.go
Expand Up @@ -9,15 +9,33 @@ import (

// When updating this struct, please make sure to update the relevant exporting functions
type Stats struct {
EventCount counter.Counter
EventsFiltered counter.Counter
NetCapCount counter.Counter // network capture events
BPFLogsCount counter.Counter
ErrorCount counter.Counter
LostEvCount counter.Counter
LostWrCount counter.Counter
LostNtCapCount counter.Counter // lost network capture events
LostBPFLogsCount counter.Counter
EventCount counter.Counter
EventsFiltered counter.Counter
NetCapCount counter.Counter // network capture events
BPFLogsCount counter.Counter
ErrorCount counter.Counter
LostEvCount counter.Counter
LostWrCount counter.Counter
LostNtCapCount counter.Counter // lost network capture events
LostBPFLogsCount counter.Counter
AvgTimeInPipeline counter.Average
AvgTimeInKernel counter.Average
Comment on lines +21 to +22
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consider making these stats per-event type.
Different events have different behavior and processing time, and it would be much more informative to know about the average time of the different events.
WDYT?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's how I originally wanted to do it, but I couldn't find a good way to represent it in prometheus (ideally a histogram, yet I couldn't figure out at the time how to implement it with their SDK). If you find it critical, this PR should probably be closed and reintroduced with that implementation in mind.

}

// NewStats returns a Stats value with every counter initialised to zero and
// both averages created through counter.NewAverage.
func NewStats() Stats {
	var stats Stats

	// Event, error and loss counters.
	stats.EventCount = counter.NewCounter(0)
	stats.EventsFiltered = counter.NewCounter(0)
	stats.NetCapCount = counter.NewCounter(0)
	stats.BPFLogsCount = counter.NewCounter(0)
	stats.ErrorCount = counter.NewCounter(0)
	stats.LostEvCount = counter.NewCounter(0)
	stats.LostWrCount = counter.NewCounter(0)
	stats.LostNtCapCount = counter.NewCounter(0)
	stats.LostBPFLogsCount = counter.NewCounter(0)

	// Running averages.
	stats.AvgTimeInPipeline = counter.NewAverage()
	stats.AvgTimeInKernel = counter.NewAverage()

	return stats
}

// Register Stats to prometheus metrics exporter
Expand Down Expand Up @@ -98,5 +116,39 @@ func (stats *Stats) RegisterPrometheus() error {
Help: "errors accumulated by tracee-ebpf",
}, func() float64 { return float64(stats.ErrorCount.Get()) }))

return errfmt.WrapError(err)
if err != nil {
return errfmt.WrapError(err)
}

err = prometheus.Register(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "tracee_ebpf",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly related to this PR, but we should consider renaming this namespace

Name: "avg_pipeline_time",
Help: "average time (in milliseconds) an event spends from creation in kernel until publishing",
NDStrahilevitz marked this conversation as resolved.
Show resolved Hide resolved
},
func() float64 {
return stats.AvgTimeInPipeline.Read() / 1e6
},
))

if err != nil {
return errfmt.WrapError(err)
}

err = prometheus.Register(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "tracee_ebpf",
Name: "avg_pipeline_kernel_time",
Help: "average time (in milliseconds) an event spend from creation in kernel until decoding",
},
func() float64 {
return stats.AvgTimeInKernel.Read() / 1e6
},
))

if err != nil {
return errfmt.WrapError(err)
}

return nil
Comment on lines +149 to +153
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if err != nil {
return errfmt.WrapError(err)
}
return nil
return errfmt.WrapError(err)

}
11 changes: 9 additions & 2 deletions pkg/utils/time.go
Expand Up @@ -75,8 +75,8 @@ func getBootTimeInJiffies() int64 {
// Boot time functions
//

// GetStartTimeNS returns the system start time in nanoseconds (using CLOCK_MONOTONIC).
func GetStartTimeNS() int64 {
// GetMonotonicTime returns the current CLOCK_MONOTONIC system time.
func GetMonotonicTime() int64 {
NDStrahilevitz marked this conversation as resolved.
Show resolved Hide resolved
var ts unix.Timespec

// Tracee bpf code uses monotonic clock as event timestamp. Get current
Expand All @@ -89,6 +89,13 @@ func GetStartTimeNS() int64 {
return ts.Nano()
}

// GetStartTimeNS returns the system start time in nanoseconds (using CLOCK_MONOTONIC).
func GetStartTimeNS() int64 {
return sync.OnceValue[int64](func() int64 {
return GetMonotonicTime()
})()
}

// GetBootTimeNS returns the boot time of the system in nanoseconds.
func GetBootTimeNS() int64 {
// Calculate the boot time using the monotonic time (since this is the clock
Expand Down