Skip to content

Commit

Permalink
feat: Emit events for packet stats to send to Honeycomb (#142)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

- Closes #141 

## Short description of the changes

- Add ticker and emit event for pcap handle stats, sent to separate
dataset in Honeycomb
- Add statsTicker and emit event for processed packet stats, sent to
separate dataset in Honeycomb

## How to verify that this has the expected result

See new dataset in Honeycomb "hny-ebpf-agent-stats" with data for events
named tcp_assembler_processed and tcp_assembler_pcap

Follow-up items for separate PRs:
- make tickers configurable
- make stats dataset configurable
- move dataset and other similar config to config file

---------

Co-authored-by: Mike Goldsmth <goldsmith.mike@gmail.com>
Co-authored-by: Robb Kidd <robbkidd@honeycomb.io>
  • Loading branch information
3 people committed Sep 6, 2023
1 parent f54bb5d commit 7401edc
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 4 deletions.
52 changes: 51 additions & 1 deletion assemblers/tcp_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/reassembly"
"github.com/honeycombio/libhoney-go"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
Expand Down Expand Up @@ -82,11 +83,16 @@ func NewTcpAssembler(config config, httpEvents chan HttpEvent) tcpAssembler {
func (h *tcpAssembler) Start() {
log.Info().Msg("Starting TCP assembler")
flushTicker := time.NewTicker(time.Second * 5)
statsTicker := time.NewTicker(time.Second * 10)
count := 0
bytes := int64(0)
start := time.Now()
defragger := ip4defrag.NewIPv4Defragmenter()

// statsEvent is for sending packet processing stats to Honeycomb and
// is declared outside the loop for memory re-use.
var statsEvent *libhoney.Event

for {
select {
case <-flushTicker.C:
Expand All @@ -95,6 +101,22 @@ func (h *tcpAssembler) Start() {
Int("flushed", flushed).
Int("closed", closed).
Msg("Flushing old streams")
case <-statsTicker.C:
// intentionally reusing the variable above
statsEvent = libhoney.NewEvent()
statsEvent.Dataset = "hny-ebpf-agent-stats"
statsEvent.Add(map[string]interface{}{
"name": "tcp_assembler_processed",
"packet_count_since_start": count,
"uptime_ms": time.Since(start).Milliseconds(),
"bytes": bytes,
})
statsEvent.Send()
log.Debug().
Int("processed_count_since_start", count).
Int64("uptime_ms", time.Since(start).Milliseconds()).
Int64("bytes", bytes).
Msg("Processed Packets")
case packet := <-h.packetSource.Packets():
count++
data := packet.Data()
Expand All @@ -108,7 +130,7 @@ func (h *tcpAssembler) Start() {
continue
}
if newipv4 == nil {
log.Debug().Msg("Ingoring packet fragment")
log.Debug().Msg("Ignoring packet fragment")
continue
}

Expand Down Expand Up @@ -205,8 +227,36 @@ func newPcapPacketSource(config config) (*gopacket.PacketSource, error) {
}
}

go logPcapHandleStats(handle)
return gopacket.NewPacketSource(
handle,
handle.LinkType(),
), nil
}

func logPcapHandleStats(handle *pcap.Handle) {
// TODO make ticker configurable
ticker := time.NewTicker(time.Second * 10)
for {
<-ticker.C
stats, err := handle.Stats()
if err != nil {
log.Error().Err(err).Msg("Failed to get pcap handle stats")
continue
}
// TODO use config for different dataset for stats telemetry
// create libhoney event
ev := libhoney.NewEvent()
ev.Dataset = "hny-ebpf-agent-stats"
ev.AddField("name", "tcp_assembler_pcap")
ev.AddField("pcap.packets_received", stats.PacketsReceived)
ev.AddField("pcap.packets_dropped", stats.PacketsDropped)
ev.AddField("pcap.packets_if_dropped", stats.PacketsIfDropped)
log.Info().
Int("pcap.packets_received", stats.PacketsReceived).
Int("pcap.packets_dropped", stats.PacketsDropped).
Int("pcap.packets_if_dropped", stats.PacketsIfDropped).
Msg("Pcap handle stats")
ev.Send()
}
}
6 changes: 3 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ func main() {

// setup TCP stream reader
httpEvents := make(chan assemblers.HttpEvent, 10000)
assember := assemblers.NewTcpAssembler(*agentConfig, httpEvents)
assembler := assemblers.NewTcpAssembler(*agentConfig, httpEvents)
go handleHttpEvents(httpEvents, cachedK8sClient)
go assember.Start()
defer assember.Stop()
go assembler.Start()
defer assembler.Stop()

log.Info().Msg("Agent is ready!")

Expand Down

0 comments on commit 7401edc

Please sign in to comment.