Skip to content

Commit

Permalink
feat: more better logging (#72)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

- Closes #69 

## Short description of the changes

- add zerolog as a fast, low-resource logging library (among those
recommended by logrus)
- structure our calls formerly to stdlib's `log` now to zerolog's `log`

TODO:

- [x] replace or remove all the `fmt.Printf()`s

## How to verify that this has the expected result

- Beyond "lookit the console", TBD

---------

Co-authored-by: Mike Goldsmth <goldsmith.mike@gmail.com>
  • Loading branch information
robbkidd and MikeGoldsmith committed Aug 17, 2023
1 parent 340bcba commit 2b20759
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 118 deletions.
6 changes: 4 additions & 2 deletions assemblers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"flag"
"time"

"github.com/rs/zerolog/log"
)

const closeTimeout time.Duration = time.Hour * 24
Expand Down Expand Up @@ -74,9 +76,9 @@ func NewConfig() *config {
if c.Debug {
b, err := json.MarshalIndent(c, "", " ")
if err != nil {
Debug("Failed to marshal agent config: %e", err)
log.Debug().Err(err).Msg("Failed to marshal agent config")
} else {
Debug("Agent config: %s", string(b))
log.Debug().RawJSON("Agent config", b)
}
}
return c
Expand Down
24 changes: 0 additions & 24 deletions assemblers/log.go

This file was deleted.

119 changes: 58 additions & 61 deletions assemblers/tcp_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,16 @@ package assemblers

import (
"flag"
"fmt"
"log"
"strings"
"sync"
"time"

"github.com/google/gopacket"
"github.com/google/gopacket/ip4defrag"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/reassembly"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)

var stats struct {
Expand All @@ -33,11 +32,6 @@ var stats struct {
overlapPackets int
}

var logLevel int
var errorsMap map[string]uint
var errorsMapMutex sync.Mutex
var errors uint

type Context struct {
CaptureInfo gopacket.CaptureInfo
}
Expand All @@ -60,39 +54,40 @@ func NewTcpAssembler(config config, httpEvents chan HttpEvent) tcpAssembler {
var handle *pcap.Handle
var err error

// Set logging level
if *debug {
logLevel = 2
} else if *verbose {
logLevel = 1
} else if *quiet {
logLevel = -1
}

errorsMap = make(map[string]uint)
// Set up pcap packet capture
if *fname != "" {
log.Printf("Reading from pcap dump %q", *fname)
log.Info().
Str("filename", *fname).
Msg("Reading from pcap dump")
handle, err = pcap.OpenOffline(*fname)
} else {
log.Printf("Starting capture on interface %q", *iface)
log.Info().
Str("interface", *iface).
Msg("Starting capture")
handle, err = pcap.OpenLive(*iface, int32(*snaplen), true, pcap.BlockForever)
}
if err != nil {
log.Fatal(err)
log.Fatal().
Err(err).
Msg("Failed to open a pcap handle")
}
if len(flag.Args()) > 0 {
bpffilter := strings.Join(flag.Args(), " ")
Info("Using BPF filter %q\n", bpffilter)
log.Info().
Str("bpf_filter", bpffilter).
Msg("Using BPF filter")
if err = handle.SetBPFFilter(bpffilter); err != nil {
log.Fatal("BPF filter error:", err)
log.Fatal().
Err(err).
Str("bpf_filter", bpffilter).
Msg("BPF filter error")
}
}

packetSource := gopacket.NewPacketSource(handle, handle.LinkType())
packetSource.Lazy = *lazy
packetSource.NoCopy = true
Info("Starting to read packets\n")
log.Info().Msg("Starting to read packets")

streamFactory := NewTcpStreamFactory(httpEvents)
streamPool := reassembly.NewStreamPool(&streamFactory)
Expand Down Expand Up @@ -131,16 +126,16 @@ func (h *tcpAssembler) Start() {
l := ip4.Length
newip4, err := defragger.DefragIPv4(ip4)
if err != nil {
log.Fatalln("Error while de-fragmenting", err)
log.Fatal().Err(err).Msg("Error while de-fragmenting")
} else if newip4 == nil {
// Debug("Fragment...\n")
log.Printf("Fragment...\n")
log.Debug().Msg("Fragment...\n")
continue // packet fragment, we don't have whole packet yet.
}
if newip4.Length != l {
stats.ipdefrag++
// Debug("Decoding re-assembled packet: %s\n", newip4.NextLayerType())
log.Printf("Decoding re-assembled packet: %s\n", newip4.NextLayerType())
log.Debug().
Str("network_layer_type", newip4.NextLayerType().String()).
Msg("Decoding re-assembled packet")
pb, ok := packet.(gopacket.PacketBuilder)
if !ok {
panic("Not a PacketBuilder")
Expand All @@ -156,7 +151,7 @@ func (h *tcpAssembler) Start() {
if h.config.Checksum {
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
if err != nil {
log.Fatalf("Failed to set network layer for checksum: %s\n", err)
log.Fatal().Err(err).Msg("Failed to set network layer for checksum")
}
}
c := Context{
Expand All @@ -168,50 +163,52 @@ func (h *tcpAssembler) Start() {
if count%h.config.Statsevery == 0 {
ref := packet.Metadata().CaptureInfo.Timestamp
flushed, closed := h.assembler.FlushWithOptions(reassembly.FlushOptions{T: ref.Add(-h.config.Timeout), TC: ref.Add(-h.config.CloseTimeout)})
Debug("Forced flush: %d flushed, %d closed (%s)", flushed, closed, ref)
log.Debug().
Int("flushed", flushed).
Int("closed", closed).
Time("packet_timestamp", ref).
Msg("Forced flush")
}

done := h.config.Maxcount > 0 && count >= h.config.Maxcount
if count%h.config.Statsevery == 0 || done {
errorsMapMutex.Lock()
errorMapLen := len(errorsMap)
errorsMapMutex.Unlock()
Debug("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)\n", count, bytes, time.Since(start), errors, errorMapLen)
log.Info().
Int("processed_count_since_start", count).
Int64("milliseconds_since_start", time.Since(start).Milliseconds()).
Int64("bytes", bytes).
Msg("Processed Packets")
}
}
}

func (h *tcpAssembler) Stop() {
closed := h.assembler.FlushAll()
// Debug("Final flush: %d closed", closed)
log.Printf("Final flush: %d closed", closed)
if logLevel >= 2 {
log.Debug().
Int("closed", closed).
Msg("Final flush")
if zerolog.GlobalLevel() >= zerolog.DebugLevel {
// this uses stdlib's log, but oh well
h.streamPool.Dump()
}

h.streamFactory.WaitGoRoutines()
// Debug("%s\n", h.assembler.Dump())
log.Printf("%s\n", h.assembler.Dump())
if !h.config.Nodefrag {
fmt.Printf("IPdefrag:\t\t%d\n", stats.ipdefrag)
}
fmt.Printf("TCP stats:\n")
fmt.Printf(" missed bytes:\t\t%d\n", stats.missedBytes)
fmt.Printf(" total packets:\t\t%d\n", stats.pkt)
fmt.Printf(" rejected FSM:\t\t%d\n", stats.rejectFsm)
fmt.Printf(" rejected Options:\t%d\n", stats.rejectOpt)
fmt.Printf(" reassembled bytes:\t%d\n", stats.sz)
fmt.Printf(" total TCP bytes:\t%d\n", stats.totalsz)
fmt.Printf(" conn rejected FSM:\t%d\n", stats.rejectConnFsm)
fmt.Printf(" reassembled chunks:\t%d\n", stats.reassembled)
fmt.Printf(" out-of-order packets:\t%d\n", stats.outOfOrderPackets)
fmt.Printf(" out-of-order bytes:\t%d\n", stats.outOfOrderBytes)
fmt.Printf(" biggest-chunk packets:\t%d\n", stats.biggestChunkPackets)
fmt.Printf(" biggest-chunk bytes:\t%d\n", stats.biggestChunkBytes)
fmt.Printf(" overlap packets:\t%d\n", stats.overlapPackets)
fmt.Printf(" overlap bytes:\t\t%d\n", stats.overlapBytes)
fmt.Printf("Errors: %d\n", errors)
for e, _ := range errorsMap {
fmt.Printf(" %s:\t\t%d\n", e, errorsMap[e])
}
log.Debug().
Str("assember_page_usage", h.assembler.Dump()).
Int("IPdefrag", stats.ipdefrag).
Int("missed_bytes", stats.missedBytes).
Int("total_packets", stats.pkt).
Int("rejected_FSM", stats.rejectFsm).
Int("rejected_Options", stats.rejectOpt).
Int("reassembled_bytes", stats.sz).
Int("total_TCP_bytes", stats.totalsz).
Int("conn_rejected_FSM", stats.rejectConnFsm).
Int("reassembled_chunks", stats.reassembled).
Int("out_of_order_packets", stats.outOfOrderPackets).
Int("out_of_order_bytes", stats.outOfOrderBytes).
Int("biggest_chunk_packets", stats.biggestChunkPackets).
Int("biggest_chunk_bytes", stats.biggestChunkBytes).
Int("overlap_packets", stats.overlapPackets).
Int("overlap_bytes", stats.overlapBytes).
Msg("Stop")
}
38 changes: 31 additions & 7 deletions assemblers/tcp_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/reassembly"
"github.com/rs/zerolog/log"
)

type tcpStream struct {
Expand Down Expand Up @@ -51,10 +52,16 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
if *checksum {
c, err := tcp.ComputeChecksum()
if err != nil {
Error("ChecksumCompute", "%s: Got error computing checksum: %s\n", t.ident, err)
log.Error().
Err(err).
Str("tcp_stream_ident", t.ident).
Msg("ChecksumCompute")
accept = false
} else if c != 0x0 {
Error("Checksum", "%s: Invalid checksum: 0x%x\n", t.ident, c)
log.Error().
Str("tcp_stream_ident", t.ident).
Uint16("checksum", c).
Msg("InvalidChecksum")
accept = false
}
}
Expand Down Expand Up @@ -86,19 +93,34 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
stats.biggestChunkPackets = sgStats.Packets
}
if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 {
fmt.Printf("bytes:%d, pkts:%d\n", sgStats.OverlapBytes, sgStats.OverlapPackets)
log.Fatal().
Int("bytes", sgStats.OverlapBytes).
Int("packets", sgStats.OverlapPackets).
Msg("Invalid overlap")
panic("Invalid overlap")
}
stats.overlapBytes += sgStats.OverlapBytes
stats.overlapPackets += sgStats.OverlapPackets

var ident string
if dir == reassembly.TCPDirClientToServer {
ident = fmt.Sprintf("%v %v(%s): ", t.net, t.transport, dir)
ident = fmt.Sprintf("%v %v", t.net, t.transport)
} else {
ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir)
ident = fmt.Sprintf("%v %v", t.net.Reverse(), t.transport.Reverse())
}
Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)\n", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
log.Debug().
Str("ident", ident). // ex: "192.168.65.4->192.168.65.4 6443->38304"
Str("direction", dir.String()). // ex: "client->server" or "server->client"
Int("byte_count", length).
Bool("start", start).
Bool("end", end).
Int("skip", skip).
Int("saved", saved).
Int("packet_count", sgStats.Packets).
Int("chunk_count", sgStats.Chunks).
Int("overlap_byte_count", sgStats.OverlapBytes).
Int("overlap_packet_count", sgStats.OverlapPackets).
Msg("SG reassembled packet")
if skip == -1 && *allowmissinginit {
// this is allowed
} else if skip != 0 {
Expand All @@ -119,7 +141,9 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}

func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Debug("%s: Connection closed\n", t.ident)
log.Debug().
Str("tcp_stream_ident", t.ident).
Msg("Connection closed")
close(t.client.bytes)
close(t.server.bytes)
// do not remove the connection to allow last ACK
Expand Down
6 changes: 5 additions & 1 deletion assemblers/tcp_stream_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/reassembly"
"github.com/rs/zerolog/log"
)

var streamId uint64 = 0
Expand All @@ -24,7 +25,10 @@ func NewTcpStreamFactory(httpEvents chan HttpEvent) tcpStreamFactory {
}

func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
Debug("* NEW: %s %s\n", net, transport)
log.Debug().
Str("net", net.String()).
Str("transport", transport.String()).
Msg("NEW tcp stream")
fsmOptions := reassembly.TCPSimpleFSMOptions{
SupportMissingEstablishment: true,
}
Expand Down
Loading

0 comments on commit 2b20759

Please sign in to comment.