Skip to content

Commit

Permalink
feat: Separate stream flush and close timeouts (#162)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
We currently use the same timeout for both flushing and closing streams.
This is inefficient because we should flush stream regularlary to clear
stock packets but not close stream prematurely.

## Short description of the changes
- Separate stream timeouts into flush and close config options; flush
defaults to 10 seconds and close to 90 seconds
- Update assembler to use both config options when calling
`FlushAndCloseStreams` func
- Update ticker to run more frequently by using flush ticker / 4

## How to verify that this has the expected result
Better stream memory management by flushing stream more regularly and
allowing streams to stay open longer.

---------

Co-authored-by: Jamie Danielson <jamieedanielson@gmail.com>
  • Loading branch information
MikeGoldsmith and JamieDanielson committed Sep 11, 2023
1 parent 0d49360 commit bdf0563
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
13 changes: 10 additions & 3 deletions assemblers/tcp_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,22 @@ func NewTcpAssembler(config config.Config, httpEvents chan HttpEvent) tcpAssembl

func (h *tcpAssembler) Start() {
log.Info().Msg("Starting TCP assembler")
flushTicker := time.NewTicker(time.Second * 5)
// Tick on the tightest loop. The flush timeout is the shorter of the two timeouts using this ticker.
// Tick even more frequently than the flush interval (4 is somewhat arbitrary)
flushCloseTicker := time.NewTicker(h.config.StreamFlushTimeout / 4)
statsTicker := time.NewTicker(time.Second * 10)
h.startedAt = time.Now()
defragger := ip4defrag.NewIPv4Defragmenter()

for {
select {
case <-flushTicker.C:
flushed, closed := h.assembler.FlushCloseOlderThan(time.Now().Add(-h.config.Timeout))
case <-flushCloseTicker.C:
flushed, closed := h.assembler.FlushWithOptions(
reassembly.FlushOptions{
T: time.Now().Add(-h.config.StreamFlushTimeout),
TC: time.Now().Add(-h.config.StreamCloseTimeout),
},
)
log.Debug().
Int("flushed", flushed).
Int("closed", closed).
Expand Down
11 changes: 6 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/rs/zerolog/log"
)

const timeout time.Duration = time.Second * 30

var maxcount = flag.Int("c", -1, "Only grab this many packets, then exit")
var statsevery = flag.Int("stats", 1000, "Output statistics every N packets")
var lazy = flag.Bool("lazy", false, "If true, do lazy decoding")
Expand All @@ -32,6 +30,8 @@ var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
var packetSource = flag.String("source", "pcap", "Packet source (defaults to pcap)")
var bpfFilter = flag.String("filter", "tcp", "BPF filter")
var channelBufferSize = flag.Int("channel_buffer_size", 1000, "Channel buffer size (defaults to 1000)")
var streamFlushTimeout = flag.Int("stream_flush_timeout", 10, "Stream flush timeout in seconds (defaults to 10)")
var streamCloseTimeout = flag.Int("stream_close_timeout", 90, "Stream close timeout in seconds (defaults to 90)")
var maxBufferedPagesTotal = flag.Int("gopacket_pages", 150_000, "Maximum number of TCP reassembly pages to allocate per interface")
var maxBufferedPagesPerConnection = flag.Int("gopacket_per_conn", 4000, "Maximum number of TCP reassembly pages per connection")

Expand All @@ -52,8 +52,8 @@ type Config struct {
Snaplen int
TsType string
Promiscuous bool
CloseTimeout time.Duration
Timeout time.Duration
StreamFlushTimeout time.Duration
StreamCloseTimeout time.Duration
PacketSource string
BpfFilter string
ChannelBufferSize int
Expand All @@ -79,7 +79,8 @@ func NewConfig() Config {
Snaplen: *snaplen,
TsType: *tstype,
Promiscuous: *promisc,
Timeout: timeout,
StreamFlushTimeout: time.Duration(*streamFlushTimeout) * time.Second,
StreamCloseTimeout: time.Duration(*streamCloseTimeout) * time.Second,
PacketSource: *packetSource,
BpfFilter: *bpfFilter,
ChannelBufferSize: *channelBufferSize,
Expand Down

0 comments on commit bdf0563

Please sign in to comment.