Skip to content

Commit

Permalink
feat: Record total and active stream counts (#178)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
The agent assigns stream IDs to new streams to help identify them.
However, we do not track the total number of streams created or the
number of active streams. Streams are considered active once created and
while receiving data, when data stops flowing they become eligible for
closing.

Understanding the impact of total and active streams is important
because we believe this is directly related to memory utilisation and
growth.

The PR records the number of active streams and adds both the total and
active stream counts to logged stats.

## Short description of the changes
- Move streamID count from stream factory to stats struct as
total_streams - this is incremented whenever a new stream is created by
the stream factory
- Add active_streams count to stats struct and increment / decrement as
new streams are created and closed
- Add both total_streams and active_streams counts to stats fields that
are both sent in stats events sent to honeycomb and logged at debug
level

## How to verify that this has the expected result
Total and active stream counts are visible in logs and in honeycomb
agent stats events.

<img width="1320" alt="image"
src="https://github.com/honeycombio/honeycomb-network-agent/assets/3481731/f1743bef-45b6-412d-81c8-64c111077f61">
  • Loading branch information
MikeGoldsmith committed Sep 14, 2023
1 parent 2a801f9 commit 155301e
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
17 changes: 17 additions & 0 deletions assemblers/tcp_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package assemblers

import (
"runtime"
"sync/atomic"
"time"

"github.com/honeycombio/ebpf-agent/config"
Expand Down Expand Up @@ -34,6 +35,20 @@ var stats struct {
source_received int
source_dropped int
source_if_dropped int
total_streams uint64
active_streams int64
}

func IncrementStreamCount() uint64 {
return atomic.AddUint64(&stats.total_streams, 1)
}

func IncrementActiveStreamCount() {
atomic.AddInt64(&stats.active_streams, 1)
}

func DecrementActiveStreamCount() {
atomic.AddInt64(&stats.active_streams, -1)
}

type Context struct {
Expand Down Expand Up @@ -206,6 +221,8 @@ func (a *tcpAssembler) logAssemblerStats() {
"source_if_dropped": stats.source_if_dropped,
"event_queue_length": len(a.httpEvents),
"goroutines": runtime.NumGoroutine(),
"total_streams": stats.total_streams,
"active_streams": stats.active_streams,
}
statsEvent := libhoney.NewEvent()
statsEvent.Dataset = "hny-ebpf-agent-stats"
Expand Down
3 changes: 3 additions & 0 deletions assemblers/tcp_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Str("tcp_stream_ident", t.ident).
Msg("Connection closed")
t.close()

// decrement the number of active streams
DecrementActiveStreamCount()
return true // remove the connection, heck with the last ACK
}

Expand Down
10 changes: 6 additions & 4 deletions assemblers/tcp_stream_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package assemblers
import (
"fmt"
"sync"
"sync/atomic"

"github.com/honeycombio/ebpf-agent/config"
"github.com/honeycombio/gopacket"
Expand All @@ -12,8 +11,6 @@ import (
"github.com/rs/zerolog/log"
)

var streamId uint64 = 0

type tcpStreamFactory struct {
config config.Config
wg sync.WaitGroup
Expand All @@ -35,7 +32,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
fsmOptions := reassembly.TCPSimpleFSMOptions{
SupportMissingEstablishment: true,
}
streamId := atomic.AddUint64(&streamId, 1)

// increment total stream count and use as stream id
streamId := IncrementStreamCount()
stream := &tcpStream{
config: factory.config,
id: streamId,
Expand Down Expand Up @@ -69,6 +68,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
factory.wg.Add(2)
go stream.client.run(&factory.wg)
go stream.server.run(&factory.wg)

// increment the number of active streams
IncrementActiveStreamCount()
return stream
}

Expand Down

0 comments on commit 155301e

Please sign in to comment.