Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
9937b76
fix(toptalk): return receiver's window info for TCP flow queries
acooks Dec 17, 2025
3a86f5b
fix(tcp): accumulate congestion events per-interval for reliable markers
acooks Dec 18, 2025
9662e06
perf(toptalk): efficient per-interval TCP window tracking
acooks Dec 19, 2025
180d01e
fix(tcp): propagate window events to correct flow direction
acooks Dec 19, 2025
781a949
refactor(flow): extract flow_reverse() as inline utility
acooks Dec 19, 2025
f3c11da
test(flow): add unit tests for flow_reverse()
acooks Dec 19, 2025
4fa3417
test(tcp): add event receiver-direction semantics test
acooks Dec 19, 2025
2d164f7
refactor(tcp): extract make_canonical_key() to shared header
acooks Dec 19, 2025
2c38b3a
test(tcp): add unit tests for make_canonical_key()
acooks Dec 19, 2025
13b8bba
fix(intervals): add NULL checks for malloc allocations
acooks Dec 19, 2025
e41c154
refactor(intervals): clarify table rotation with free_flow_table helper
acooks Dec 19, 2025
090685d
perf(decode): store L4 offset during decode to avoid re-parsing
acooks Dec 20, 2025
7e81c97
perf(intervals): replace per-packet malloc with ring buffer
acooks Dec 20, 2025
0c4eac3
perf(intervals): O(1) pointer swap for table rotation
acooks Dec 20, 2025
6b3cfdb
perf(intervals): replace full sort with top-N selection
acooks Dec 20, 2025
d61b08b
fix(intervals): expire oldest ring entry before overwriting
acooks Dec 20, 2025
cd55d48
perf(intervals): use compact ring buffer entries (736 -> 68 bytes)
acooks Dec 20, 2025
874f30d
fix(intervals): use packet time domain for expiration
acooks Dec 20, 2025
abda02c
perf(flow): optimize pkt_ring_entry to 64 bytes (1 cache line)
acooks Dec 20, 2025
8fbe8c0
perf(intervals): defer interval updates and add runtime ring config
acooks Dec 20, 2025
51ca37c
perf(intervals): add flow pool, CRC32C hash, and prefetch hints
acooks Dec 20, 2025
134a607
fix(toptalk): add stable tie-breaker to flow ranking sort
acooks Dec 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 23 additions & 4 deletions deps/toptalk/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ LIB = toptalk.a
TEST = test-toptalk
TEST_RTT = test-tcp-rtt
TEST_WINDOW = test-tcp-window
TEST_FLOW = test-flow
TEST_FLOW_KEY = test-flow-key
TEST_VIDEO = test-video-detect
TEST_RTSP = test-rtsp-tap

Expand All @@ -22,6 +24,7 @@ HEADERS = \
flow.h \
intervals.h \
rtsp_tap.h \
tcp_flow_key.h \
tcp_rtt.h \
tcp_window.h \
video_detect.h \
Expand Down Expand Up @@ -54,6 +57,10 @@ CFLAGS_HARDENED = \
-Warray-bounds \
-Wstringop-overflow

# Enable SSE4.2 for hardware-accelerated CRC32C hash function (if available)
# Falls back to software implementation on CPUs without SSE4.2
CFLAGS_SIMD = -msse4.2

ifeq ($(BUILD_TYPE),release)
CFLAGS_OPT = -O2 -D_FORTIFY_SOURCE=2
CFLAGS_SANITIZER =
Expand All @@ -70,10 +77,10 @@ LDFLAGS := -lrt -lpthread \
$(LDFLAGS_SANITIZER) \
$(LDFLAGS)

CFLAGS := -g $(CFLAGS_OPT) -Wall -pedantic -std=c11 $(DEFINES) $(CFLAGS_HARDENED) $(CFLAGS_SANITIZER) $(CFLAGS)
CFLAGS := -g $(CFLAGS_OPT) -Wall -pedantic -std=c11 $(DEFINES) $(CFLAGS_HARDENED) $(CFLAGS_SIMD) $(CFLAGS_SANITIZER) $(CFLAGS)

.PHONY: all
all: $(LIB) $(TEST) $(TEST_RTT) $(TEST_WINDOW) $(TEST_VIDEO) $(TEST_RTSP) $(PROG)
all: $(LIB) $(TEST) $(TEST_RTT) $(TEST_WINDOW) $(TEST_FLOW) $(TEST_FLOW_KEY) $(TEST_VIDEO) $(TEST_RTSP) $(PROG)

%.o: %.c %.h Makefile ../make.config make.config
$(COMPILE.c) $(DEFINES) $< -o $@
Expand Down Expand Up @@ -101,6 +108,14 @@ $(TEST_WINDOW): $(LIB) test_tcp_window.c
@echo Building $(TEST_WINDOW)
$(CC) -o $(TEST_WINDOW) test_tcp_window.c timeywimey.c $(LIB) $(LDLIBS) $(LDFLAGS) $(CFLAGS)

$(TEST_FLOW): $(LIB) test_flow.c
@echo Building $(TEST_FLOW)
$(CC) -o $(TEST_FLOW) test_flow.c $(LIB) $(LDLIBS) $(LDFLAGS) $(CFLAGS)

$(TEST_FLOW_KEY): $(LIB) test_tcp_flow_key.c
@echo Building $(TEST_FLOW_KEY)
$(CC) -o $(TEST_FLOW_KEY) test_tcp_flow_key.c $(LIB) $(LDLIBS) $(LDFLAGS) $(CFLAGS)

$(TEST_VIDEO): $(LIB) test_video_detect.c
@echo Building $(TEST_VIDEO)
$(CC) -o $(TEST_VIDEO) test_video_detect.c $(LIB) $(LDLIBS) $(LDFLAGS) $(CFLAGS)
Expand All @@ -110,11 +125,15 @@ $(TEST_RTSP): $(LIB) test_rtsp_tap.c
$(CC) -o $(TEST_RTSP) test_rtsp_tap.c $(LIB) $(LDLIBS) $(LDFLAGS) $(CFLAGS)

.PHONY: test
test: $(TEST) $(TEST_RTT) $(TEST_WINDOW) $(TEST_VIDEO) $(TEST_RTSP)
test: $(TEST) $(TEST_RTT) $(TEST_WINDOW) $(TEST_FLOW) $(TEST_FLOW_KEY) $(TEST_VIDEO) $(TEST_RTSP)
@echo "Running RTT unit tests (no root required)..."
@./$(TEST_RTT) && echo -e "RTT unit tests OK\n"
@echo "Running window unit tests (no root required)..."
@./$(TEST_WINDOW) && echo -e "Window unit tests OK\n"
@echo "Running flow unit tests (no root required)..."
@./$(TEST_FLOW) && echo -e "Flow unit tests OK\n"
@echo "Running flow key unit tests (no root required)..."
@./$(TEST_FLOW_KEY) && echo -e "Flow key unit tests OK\n"
@echo "Running video detection unit tests (no root required)..."
@./$(TEST_VIDEO) && echo -e "Video detection unit tests OK\n"
@echo "Running RTSP tap unit tests (no root required)..."
Expand All @@ -132,5 +151,5 @@ clang-analyze: clean

.PHONY: clean
clean:
rm $(LIB) $(PROG) $(TEST) $(TEST_RTT) $(TEST_WINDOW) $(TEST_VIDEO) $(TEST_RTSP) *.o *.a || true
rm $(LIB) $(PROG) $(TEST) $(TEST_RTT) $(TEST_WINDOW) $(TEST_FLOW) $(TEST_FLOW_KEY) $(TEST_VIDEO) $(TEST_RTSP) *.o *.a || true
rm *.gcno *.gcov *.gcda || true
83 changes: 72 additions & 11 deletions deps/toptalk/decode.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,51 @@
#include "flow.h"
#include "decode.h"

/* Internal versions that track packet start for L4 offset calculation */
static int decode_ip4_internal(const uint8_t *packet,
const uint8_t *end_of_packet,
const uint8_t *packet_start,
struct flow_pkt *pkt, char *errstr);
static int decode_ip6_internal(const uint8_t *packet,
const uint8_t *end_of_packet,
const uint8_t *packet_start,
struct flow_pkt *pkt, char *errstr);

int decode_ethernet(const struct pcap_pkthdr *h, const uint8_t *wirebits,
struct flow_pkt *pkt, char *errstr)
{
const struct hdr_ethernet *ethernet;
const uint8_t *end_of_packet = wirebits + h->caplen;
const uint8_t *packet_start = wirebits;
int ret;

pkt->timestamp.tv_sec = h->ts.tv_sec;
pkt->timestamp.tv_usec = h->ts.tv_usec;
pkt->flow_rec.bytes = h->len;
pkt->flow_rec.packets = 1;
pkt->has_l4_offset = 0; /* Default: no L4 offset */

ethernet = (struct hdr_ethernet *)wirebits;

switch (ntohs(ethernet->type)) {
case ETHERTYPE_IP:
ret = decode_ip4(wirebits + HDR_LEN_ETHER, end_of_packet, pkt,
errstr);
ret = decode_ip4_internal(wirebits + HDR_LEN_ETHER, end_of_packet,
packet_start, pkt, errstr);
break;
case ETHERTYPE_VLAN:
/* For VLAN, recurse with position shifted past VLAN tag.
* The inner call computes l4_offset from its packet_start,
* so we adjust to get offset from original packet start. */
ret = decode_ethernet(h, wirebits + HDR_LEN_ETHER_VLAN, pkt,
errstr);
if (ret == 0 && pkt->has_l4_offset) {
/* Add 4 bytes to account for VLAN tag the inner call skipped */
pkt->l4_offset += (HDR_LEN_ETHER_VLAN - HDR_LEN_ETHER);
}
break;
case ETHERTYPE_IPV6:
ret = decode_ip6(wirebits + HDR_LEN_ETHER, end_of_packet, pkt,
errstr);
ret = decode_ip6_internal(wirebits + HDR_LEN_ETHER, end_of_packet,
packet_start, pkt, errstr);
break;
case ETHERTYPE_ARP:
snprintf(errstr, DECODE_ERRBUF_SIZE, "%s", "ARP ignored");
Expand All @@ -61,22 +80,24 @@ int decode_linux_sll(const struct pcap_pkthdr *h, const uint8_t *wirebits,
{
const struct sll_header *sll;
const uint8_t *end_of_packet = wirebits + h->caplen;
const uint8_t *packet_start = wirebits;
int ret;

pkt->timestamp.tv_sec = h->ts.tv_sec;
pkt->timestamp.tv_usec = h->ts.tv_usec;
pkt->flow_rec.bytes = h->len;
pkt->flow_rec.packets = 1;
pkt->has_l4_offset = 0; /* Default: no L4 offset */

sll = (struct sll_header *)wirebits;
switch (ntohs(sll->sll_protocol)) {
case ETHERTYPE_IP:
ret = decode_ip4(wirebits + SLL_HDR_LEN, end_of_packet, pkt,
errstr);
ret = decode_ip4_internal(wirebits + SLL_HDR_LEN, end_of_packet,
packet_start, pkt, errstr);
break;
case ETHERTYPE_IPV6:
ret = decode_ip6(wirebits + SLL_HDR_LEN, end_of_packet, pkt,
errstr);
ret = decode_ip6_internal(wirebits + SLL_HDR_LEN, end_of_packet,
packet_start, pkt, errstr);
break;
default:
snprintf(errstr, DECODE_ERRBUF_SIZE,
Expand All @@ -87,8 +108,11 @@ int decode_linux_sll(const struct pcap_pkthdr *h, const uint8_t *wirebits,
return ret;
}

int decode_ip6(const uint8_t *packet, const uint8_t *end_of_packet,
struct flow_pkt *pkt, char *errstr)
/* Internal version with packet_start for L4 offset calculation */
static int decode_ip6_internal(const uint8_t *packet,
const uint8_t *end_of_packet,
const uint8_t *packet_start,
struct flow_pkt *pkt, char *errstr)
{
int ret;
const void *next = (uint8_t *)packet + sizeof(struct hdr_ipv6);
Expand Down Expand Up @@ -141,9 +165,19 @@ int decode_ip6(const uint8_t *packet, const uint8_t *end_of_packet,
switch (next_hdr) {
case IPPROTO_TCP:
ret = decode_tcp(next, pkt, errstr);
/* Store L4 offset for TCP packets */
if (ret == 0 && packet_start) {
pkt->l4_offset = (uint16_t)((const uint8_t *)next - packet_start);
pkt->has_l4_offset = 1;
}
break;
case IPPROTO_UDP:
ret = decode_udp(next, pkt, errstr);
/* Store L4 offset for UDP packets */
if (ret == 0 && packet_start) {
pkt->l4_offset = (uint16_t)((const uint8_t *)next - packet_start);
pkt->has_l4_offset = 1;
}
break;
case IPPROTO_ICMP:
ret = decode_icmp(next, pkt, errstr);
Expand All @@ -166,8 +200,18 @@ int decode_ip6(const uint8_t *packet, const uint8_t *end_of_packet,
return ret;
}

int decode_ip4(const uint8_t *packet, const uint8_t *end_of_packet,
/* Public wrapper for backwards compatibility */
int decode_ip6(const uint8_t *packet, const uint8_t *end_of_packet,
struct flow_pkt *pkt, char *errstr)
{
return decode_ip6_internal(packet, end_of_packet, NULL, pkt, errstr);
}

/* Internal version with packet_start for L4 offset calculation */
static int decode_ip4_internal(const uint8_t *packet,
const uint8_t *end_of_packet,
const uint8_t *packet_start,
struct flow_pkt *pkt, char *errstr)
{
int ret;
const void *next;
Expand All @@ -189,9 +233,19 @@ int decode_ip4(const uint8_t *packet, const uint8_t *end_of_packet,
switch (ip4_packet->ip_p) {
case IPPROTO_TCP:
ret = decode_tcp(next, pkt, errstr);
/* Store L4 offset for TCP packets */
if (ret == 0 && packet_start) {
pkt->l4_offset = (uint16_t)((const uint8_t *)next - packet_start);
pkt->has_l4_offset = 1;
}
break;
case IPPROTO_UDP:
ret = decode_udp(next, pkt, errstr);
/* Store L4 offset for UDP packets */
if (ret == 0 && packet_start) {
pkt->l4_offset = (uint16_t)((const uint8_t *)next - packet_start);
pkt->has_l4_offset = 1;
}
break;
case IPPROTO_ICMP:
ret = decode_icmp(next, pkt, errstr);
Expand All @@ -211,6 +265,13 @@ int decode_ip4(const uint8_t *packet, const uint8_t *end_of_packet,
return ret;
}

/* Public wrapper for backwards compatibility */
int decode_ip4(const uint8_t *packet, const uint8_t *end_of_packet,
struct flow_pkt *pkt, char *errstr)
{
return decode_ip4_internal(packet, end_of_packet, NULL, pkt, errstr);
}

int decode_tcp(const struct hdr_tcp *packet, struct flow_pkt *pkt, char *errstr)
{
unsigned int size_tcp = (TH_OFF(packet) * 4);
Expand Down
70 changes: 69 additions & 1 deletion deps/toptalk/flow.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <string.h>
#include <sys/time.h>
#include <netinet/in.h>
#include <net/ethernet.h>

/* Video stream type detection */
enum video_stream_type {
Expand Down Expand Up @@ -161,13 +162,47 @@ static inline int flow_cmp(const struct flow *a, const struct flow *b)
return 0; /* Equal */
}

/*
* Create a reversed copy of a flow (swap src/dst addresses and ports).
* Used for TCP window event propagation: events detected in packets FROM host A
* are stored on the flow TO host A (where host A's window is displayed).
*
* Performance: O(1), inline, no heap allocation. Safe for hot path.
*/
static inline struct flow flow_reverse(const struct flow *f)
{
struct flow rev = *f;

if (f->ethertype == ETHERTYPE_IP) {
struct in_addr tmp = rev.src_ip;
rev.src_ip = rev.dst_ip;
rev.dst_ip = tmp;
} else {
struct in6_addr tmp6 = rev.src_ip6;
rev.src_ip6 = rev.dst_ip6;
rev.dst_ip6 = tmp6;
}

uint16_t tmp_port = rev.sport;
rev.sport = rev.dport;
rev.dport = tmp_port;

return rev;
}

/* Cached TCP RTT info - populated by tt_get_top5() for thread-safe access */
struct flow_rtt_info {
int64_t rtt_us; /* RTT in microseconds, -1 if unknown */
int32_t tcp_state; /* TCP connection state, -1 if unknown */
int32_t saw_syn; /* 1 if SYN was observed, 0 otherwise */
};

/* Window condition flags (computed at interval boundary) */
#define WINDOW_COND_LOW 0x01 /* Avg window below threshold this interval */
#define WINDOW_COND_ZERO_SEEN 0x02 /* Min window was 0 this interval */
#define WINDOW_COND_STARVING 0x04 /* Low window for 3+ consecutive intervals */
#define WINDOW_COND_RECOVERED 0x08 /* Recovered from low/zero window */

/* Cached TCP window/congestion info - populated by tt_get_top5() */
struct flow_window_info {
int64_t rwnd_bytes; /* Receive window in bytes, -1 if unknown */
Expand All @@ -176,7 +211,17 @@ struct flow_window_info {
uint32_t dup_ack_cnt; /* Duplicate ACK events */
uint32_t retransmit_cnt; /* Retransmission events */
uint32_t ece_cnt; /* ECE flag count */
uint8_t recent_events; /* Bitmask of recent congestion events */
uint64_t recent_events; /* Bitmask of recent congestion events */

/* Per-interval window accumulation (for historical window tracking) */
uint64_t window_sum; /* Sum of scaled window values */
uint32_t window_samples; /* Count of samples */
uint32_t window_min; /* Min window this interval */
uint32_t window_max; /* Max window this interval */

/* Condition flags (computed at interval boundary) */
uint8_t window_conditions; /* Bitmask of WINDOW_COND_* flags */
uint8_t low_window_streak; /* Consecutive intervals with low window */
};

/* Inter-packet gap (IPG) histogram bucket count */
Expand Down Expand Up @@ -313,6 +358,29 @@ struct flow_record {
struct flow_pkt {
struct flow_record flow_rec;
struct timeval timestamp;
/* TCP window value (set during TCP packet processing) */
uint32_t tcp_scaled_window; /* Scaled window value, 0 if not TCP */
uint8_t has_tcp_window; /* 1 if tcp_scaled_window is valid */
/* L4 header offset - computed during decode to avoid re-parsing */
uint16_t l4_offset; /* Offset to TCP/UDP header from packet start */
uint8_t has_l4_offset; /* 1 if l4_offset is valid (TCP/UDP packet) */
};

/* Compact ring buffer entry - stores only what's needed for expiration.
* Used instead of full flow_pkt to reduce memory ~10x (736 -> 64 bytes).
* The ring buffer only needs to track:
* - flow key for hash table lookup during expiration
* - bytes to subtract from flow totals
* - timestamp to determine when entry expires
*
* Layout optimized for 64-byte cache line:
* - bytes (uint32_t) fits in padding gap after flow (44 bytes + 4 = 48)
* - timestamp (struct timeval) is 8-byte aligned at offset 48
*/
struct pkt_ring_entry {
struct flow flow; /* 44 bytes - hash key for lookup */
uint32_t bytes; /* 4 bytes - fits in padding, max 4GB/pkt */
struct timeval timestamp; /* 16 bytes - for age check */
}; /* Total: 64 bytes (1 cache line) */

#endif
Loading