Skip to content

Commit

Permalink
maint: Refactor stats collection & include pcap stats (#153)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
Refactors stats collection into a single func and includes pcap total
packet, dropped and interface dropped counts.

## Short description of the changes
- Move assembler start time to struct property instead of function
variable
- Add source_received, source_dropped, source_if_dropped to stats struct
- Consolidate stats logic from stats ticker, packet tickers, close func
and logPcapStats func into new `logAssemblerStats` func

## How to verify that this has the expected result
one complete set of stats are generated and are both debug logged and
sent to Honeycomb in an event

---------

Co-authored-by: JamieDanielson <jamieedanielson@gmail.com>
Co-authored-by: Vera Reynolds <vreynolds@users.noreply.github.com>
Co-authored-by: Robb Kidd <robbkidd@honeycomb.io>
  • Loading branch information
4 people committed Sep 7, 2023
1 parent b1b5ef0 commit c1645ad
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 75 deletions.
113 changes: 48 additions & 65 deletions assemblers/tcp_assembler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package assemblers

import (
"runtime"
"time"

"github.com/google/gopacket"
Expand Down Expand Up @@ -30,6 +31,9 @@ var stats struct {
biggestChunkPackets int
overlapBytes int
overlapPackets int
source_received int
source_dropped int
source_if_dropped int
}

type Context struct {
Expand All @@ -41,6 +45,7 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
}

type tcpAssembler struct {
startedAt time.Time
config config.Config
packetSource *gopacket.PacketSource
streamFactory *tcpStreamFactory
Expand Down Expand Up @@ -85,15 +90,9 @@ func (h *tcpAssembler) Start() {
log.Info().Msg("Starting TCP assembler")
flushTicker := time.NewTicker(time.Second * 5)
statsTicker := time.NewTicker(time.Second * 10)
count := 0
bytes := int64(0)
start := time.Now()
h.startedAt = time.Now()
defragger := ip4defrag.NewIPv4Defragmenter()

// statsEvent is for sending packet processing stats to Honeycomb and
// is declared outside the loop for memory re-use.
var statsEvent *libhoney.Event

for {
select {
case <-flushTicker.C:
Expand All @@ -103,25 +102,8 @@ func (h *tcpAssembler) Start() {
Int("closed", closed).
Msg("Flushing old streams")
case <-statsTicker.C:
// intentionally reusing the variable above
statsEvent = libhoney.NewEvent()
statsEvent.Dataset = "hny-ebpf-agent-stats"
statsEvent.Add(map[string]interface{}{
"name": "tcp_assembler_processed",
"packet_count_since_start": count,
"uptime_ms": time.Since(start).Milliseconds(),
"bytes": bytes,
})
statsEvent.Send()
log.Debug().
Int("processed_count_since_start", count).
Int64("uptime_ms", time.Since(start).Milliseconds()).
Int64("bytes", bytes).
Msg("Processed Packets")
h.logAssemblerStats()
case packet := <-h.packetSource.Packets():
count++
data := packet.Data()
bytes += int64(len(data))
// defrag the IPv4 packet if required
if ipv4Layer := packet.Layer(layers.LayerTypeIPv4); ipv4Layer != nil {
ipv4 := ipv4Layer.(*layers.IPv4)
Expand Down Expand Up @@ -163,15 +145,6 @@ func (h *tcpAssembler) Start() {
stats.totalsz += len(tcp.Payload)
h.assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &context)
}

done := h.config.Maxcount > 0 && count >= h.config.Maxcount
if count%h.config.Statsevery == 0 || done {
log.Debug().
Int("processed_count_since_start", count).
Int64("milliseconds_since_start", time.Since(start).Milliseconds()).
Int64("bytes", bytes).
Msg("Processed Packets")
}
}
}
}
Expand All @@ -184,27 +157,48 @@ func (h *tcpAssembler) Stop() {
}

h.streamFactory.WaitGoRoutines()
h.logAssemblerStats()
log.Debug().
Int("closed", closed).
Str("assember_page_usage", h.assembler.Dump()).
Int("IPdefrag", stats.ipdefrag).
Int("missed_bytes", stats.missedBytes).
Int("total_packets", stats.pkt).
Int("rejected_FSM", stats.rejectFsm).
Int("rejected_Options", stats.rejectOpt).
Int("reassembled_bytes", stats.sz).
Int("total_TCP_bytes", stats.totalsz).
Int("conn_rejected_FSM", stats.rejectConnFsm).
Int("reassembled_chunks", stats.reassembled).
Int("out_of_order_packets", stats.outOfOrderPackets).
Int("out_of_order_bytes", stats.outOfOrderBytes).
Int("biggest_chunk_packets", stats.biggestChunkPackets).
Int("biggest_chunk_bytes", stats.biggestChunkBytes).
Int("overlap_packets", stats.overlapPackets).
Int("overlap_bytes", stats.overlapBytes).
Str("assembler_page_usage", h.assembler.Dump()).
Msg("Stopping TCP assembler")
}

func (a *tcpAssembler) logAssemblerStats() {
statsFields := map[string]interface{}{
"uptime_ms": time.Since(a.startedAt).Milliseconds(),
"IPdefrag": stats.ipdefrag,
"missed_bytes": stats.missedBytes,
"total_packets": stats.pkt,
"rejected_FSM": stats.rejectFsm,
"rejected_Options": stats.rejectOpt,
"reassembled_bytes": stats.sz,
"total_TCP_bytes": stats.totalsz,
"conn_rejected_FSM": stats.rejectConnFsm,
"reassembled_chunks": stats.reassembled,
"out_of_order_packets": stats.outOfOrderPackets,
"out_of_order_bytes": stats.outOfOrderBytes,
"biggest_chunk_packets": stats.biggestChunkPackets,
"biggest_chunk_bytes": stats.biggestChunkBytes,
"overlap_packets": stats.overlapPackets,
"overlap_bytes": stats.overlapBytes,
"source_received": stats.source_received,
"source_dropped": stats.source_dropped,
"source_if_dropped": stats.source_if_dropped,
"event_queue_length": len(a.httpEvents),
"goroutines": runtime.NumGoroutine(),
}
statsEvent := libhoney.NewEvent()
statsEvent.Dataset = "hny-ebpf-agent-stats"
statsEvent.AddField("name", "tcp_assembler_stats")
statsEvent.Add(statsFields)
statsEvent.Send()

log.Debug().
Fields(statsFields).
Msg("TCP assembler stats")
}

func newPcapPacketSource(config config.Config) (*gopacket.PacketSource, error) {
log.Info().
Str("interface", config.Interface).
Expand Down Expand Up @@ -240,24 +234,13 @@ func logPcapHandleStats(handle *pcap.Handle) {
ticker := time.NewTicker(time.Second * 10)
for {
<-ticker.C
stats, err := handle.Stats()
pcapStats, err := handle.Stats()
if err != nil {
log.Error().Err(err).Msg("Failed to get pcap handle stats")
continue
}
// TODO use config for different dataset for stats telemetry
// create libhoney event
ev := libhoney.NewEvent()
ev.Dataset = "hny-ebpf-agent-stats"
ev.AddField("name", "tcp_assembler_pcap")
ev.AddField("pcap.packets_received", stats.PacketsReceived)
ev.AddField("pcap.packets_dropped", stats.PacketsDropped)
ev.AddField("pcap.packets_if_dropped", stats.PacketsIfDropped)
log.Info().
Int("pcap.packets_received", stats.PacketsReceived).
Int("pcap.packets_dropped", stats.PacketsDropped).
Int("pcap.packets_if_dropped", stats.PacketsIfDropped).
Msg("Pcap handle stats")
ev.Send()
stats.source_received += pcapStats.PacketsReceived
stats.source_dropped += pcapStats.PacketsDropped
stats.source_if_dropped += pcapStats.PacketsIfDropped
}
}
12 changes: 2 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,17 +113,9 @@ func main() {
}

func handleHttpEvents(events chan assemblers.HttpEvent, client *utils.CachedK8sClient) {
ticker := time.NewTicker(time.Second * 10)
for {
select {
case event := <-events:
sendHttpEventToHoneycomb(event, client)
case <-ticker.C:
log.Info().
Int("event queue length", len(events)).
Int("goroutines", runtime.NumGoroutine()).
Msg("Queue length ticker")
}
event := <-events
sendHttpEventToHoneycomb(event, client)
}
}

Expand Down

0 comments on commit c1645ad

Please sign in to comment.