Skip to content

Commit

Permalink
feat(metrics): add pipeline average time metrics
Browse files Browse the repository at this point in the history
Add two prometheus gauges measuring the following metrics:
1. Average time spent from kernel to decoding
2. Average time spent from kernel to publishing
  • Loading branch information
NDStrahilevitz committed Feb 21, 2024
1 parent 8b93817 commit 2277090
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 17 deletions.
10 changes: 6 additions & 4 deletions pkg/cmd/flags/server/server.go
@@ -1,6 +1,8 @@
package server

import (
"fmt"

"github.com/aquasecurity/tracee/pkg/errfmt"
"github.com/aquasecurity/tracee/pkg/logger"
"github.com/aquasecurity/tracee/pkg/server/http"
Expand Down Expand Up @@ -30,21 +32,21 @@ func PrepareHTTPServer(listenAddr string, metrics, healthz, pprof, pyro bool) (*
httpServer := http.New(listenAddr)

if metrics {
logger.Debugw("Enabling metrics endpoint")
logger.Infow(fmt.Sprintf("Enabling metrics endpoint at %s/metrics", listenAddr))
httpServer.EnableMetricsEndpoint()
}

if healthz {
logger.Debugw("Enabling healthz endpoint")
logger.Infow(fmt.Sprintf("Enabling healthz endpoint at %s/healthz", listenAddr))
httpServer.EnableHealthzEndpoint()
}

if pprof {
logger.Debugw("Enabling pprof endpoint")
logger.Infow(fmt.Sprintf("Enabling pprof endpoint at %s/debug/pprof", listenAddr))
httpServer.EnablePProfEndpoint()
}
if pyro {
logger.Debugw("Enabling pyroscope agent")
logger.Infow("Enabling pyroscope agent")
err := httpServer.EnablePyroAgent()
if err != nil {
return httpServer, err
Expand Down
55 changes: 55 additions & 0 deletions pkg/counter/average.go
@@ -0,0 +1,55 @@
package counter

import (
"fmt"
"sync"
)

type Average struct {
sum Counter
c Counter
m *sync.RWMutex
}

func NewAverage() Average {
return Average{
sum: NewCounter(0),
c: NewCounter(0),
m: new(sync.RWMutex),
}
}

func (avg *Average) Read() float64 {
avg.m.RLock()
defer avg.m.RUnlock()

sum := float64(avg.sum.Get())
count := float64(avg.c.Get())

return sum / count
}

func (avg *Average) Add(val uint64) error {
_, err := avg.AddAndRead(val)
return err
}

func (avg *Average) AddAndRead(val uint64) (float64, error) {
avg.m.Lock()
defer avg.m.Unlock()

sum, err := avg.sum.IncrementValueAndRead(val)
if err != nil {
return 0, fmt.Errorf("failed to increment average sum: %v", err)
}
count, err := avg.c.IncrementValueAndRead(1)
if err != nil {
return 0, fmt.Errorf("failed to increment average count: %v", err)
}

return float64(sum) / float64(count), nil
}

func (avg Average) String() string {
return fmt.Sprintf("%f", avg.Read())
}
7 changes: 6 additions & 1 deletion pkg/ebpf/events_pipeline.go
Expand Up @@ -165,10 +165,13 @@ func (t *Tracee) decodeEvents(ctx context.Context, sourceChan chan []byte) (<-ch
for dataRaw := range sourceChan {
ebpfMsgDecoder := bufferdecoder.New(dataRaw)
var eCtx bufferdecoder.EventContext
endTimeKernel := uint64(utils.GetMonotonicTime())
if err := ebpfMsgDecoder.DecodeContext(&eCtx); err != nil {
t.handleError(err)
continue
}
startTimeKernel := eCtx.Ts
_ = t.stats.AvgTimeInKernel.Add(endTimeKernel - startTimeKernel)
var argnum uint8
if err := ebpfMsgDecoder.DecodeUint8(&argnum); err != nil {
t.handleError(err)
Expand Down Expand Up @@ -629,8 +632,10 @@ func (t *Tracee) sinkEvents(ctx context.Context, in <-chan *trace.Event) <-chan
case <-ctx.Done():
return
default:
startTime := uint64(t.getOrigEvtTimestamp(event)) // convert back to monotonic
t.streamsManager.Publish(ctx, *event)
_ = t.stats.EventCount.Increment()
endTime := uint64(utils.GetMonotonicTime())
_ = t.stats.AvgTimeInPipeline.Add(endTime - startTime)
t.eventsPool.Put(event)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/ebpf/tracee.go
Expand Up @@ -196,6 +196,7 @@ func New(cfg config.Config) (*Tracee, error) {

t := &Tracee{
config: cfg,
stats: metrics.NewStats(),
done: make(chan struct{}),
writtenFiles: make(map[string]string),
readFiles: make(map[string]string),
Expand Down
72 changes: 62 additions & 10 deletions pkg/metrics/stats.go
Expand Up @@ -9,15 +9,33 @@ import (

// When updating this struct, please make sure to update the relevant exporting functions
type Stats struct {
EventCount counter.Counter
EventsFiltered counter.Counter
NetCapCount counter.Counter // network capture events
BPFLogsCount counter.Counter
ErrorCount counter.Counter
LostEvCount counter.Counter
LostWrCount counter.Counter
LostNtCapCount counter.Counter // lost network capture events
LostBPFLogsCount counter.Counter
EventCount counter.Counter
EventsFiltered counter.Counter
NetCapCount counter.Counter // network capture events
BPFLogsCount counter.Counter
ErrorCount counter.Counter
LostEvCount counter.Counter
LostWrCount counter.Counter
LostNtCapCount counter.Counter // lost network capture events
LostBPFLogsCount counter.Counter
AvgTimeInPipeline counter.Average
AvgTimeInKernel counter.Average
}

func NewStats() Stats {
return Stats{
EventCount: counter.NewCounter(0),
EventsFiltered: counter.NewCounter(0),
NetCapCount: counter.NewCounter(0),
BPFLogsCount: counter.NewCounter(0),
ErrorCount: counter.NewCounter(0),
LostEvCount: counter.NewCounter(0),
LostWrCount: counter.NewCounter(0),
LostNtCapCount: counter.NewCounter(0),
LostBPFLogsCount: counter.NewCounter(0),
AvgTimeInPipeline: counter.NewAverage(),
AvgTimeInKernel: counter.NewAverage(),
}
}

// Register Stats to prometheus metrics exporter
Expand Down Expand Up @@ -98,5 +116,39 @@ func (stats *Stats) RegisterPrometheus() error {
Help: "errors accumulated by tracee-ebpf",
}, func() float64 { return float64(stats.ErrorCount.Get()) }))

return errfmt.WrapError(err)
if err != nil {
return errfmt.WrapError(err)
}

err = prometheus.Register(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "tracee_ebpf",
Name: "avg_pipeline_time",
Help: "average time (in milliseconds) an event spends from creation in kernel until publishing",
},
func() float64 {
return stats.AvgTimeInPipeline.Read() / 1e6
},
))

if err != nil {
return errfmt.WrapError(err)
}

err = prometheus.Register(prometheus.NewGaugeFunc(
prometheus.GaugeOpts{
Namespace: "tracee_ebpf",
Name: "avg_pipeline_kernel_time",
Help: "average time (in milliseconds) an event spend from creation in kernel until decoding",
},
func() float64 {
return stats.AvgTimeInKernel.Read() / 1e6
},
))

if err != nil {
return errfmt.WrapError(err)
}

return nil
}
11 changes: 9 additions & 2 deletions pkg/utils/time.go
Expand Up @@ -75,8 +75,8 @@ func getBootTimeInJiffies() int64 {
// Boot time functions
//

// GetStartTimeNS returns the system start time in nanoseconds (using CLOCK_MONOTONIC).
func GetStartTimeNS() int64 {
// GetMonotonicTime returns the current CLOCK_MONOTONIC system time.
func GetMonotonicTime() int64 {
var ts unix.Timespec

// Tracee bpf code uses monotonic clock as event timestamp. Get current
Expand All @@ -89,6 +89,13 @@ func GetStartTimeNS() int64 {
return ts.Nano()
}

// GetStartTimeNS returns the system start time in nanoseconds (using CLOCK_MONOTONIC).
func GetStartTimeNS() int64 {
return sync.OnceValue[int64](func() int64 {
return GetMonotonicTime()
})()
}

// GetBootTimeNS returns the boot time of the system in nanoseconds.
func GetBootTimeNS() int64 {
// Calculate the boot time using the monotonic time (since this is the clock
Expand Down

0 comments on commit 2277090

Please sign in to comment.