Skip to content

Commit

Permalink
maint: Replace custom request/response counter with TCP seq & ack cou…
Browse files Browse the repository at this point in the history
…nters (#163)

## Which problem is this PR solving?
We currently use a custom request / response counter struct to pair up
HTTP request and response pairs. This PR switches to using TCP segment
seq and ack references received by the stream to match requests and
responses.

HTTP requires the request to finish before sending a response. The TCP
ack number on the first segment of a HTTP request will equal TCP seq
number on the first segment of the corresponding HTTP response. This
allows us to track multiple HTTP request / pairs on the same TCP stream.

## Short description of the changes
- Record Seq and Ack field in AssemblyContext struct during packet
handling
- Use seq or ack from context based on flow direction when passing the
packet to a tcp readers in place of request/response counter
- Remove RequestCounter struct and remove from stream struct

## How to verify that this has the expected result
No external change, HTTP requests / response matching better than
before!

---------

Co-authored-by: JamieDanielson <jamieedanielson@gmail.com>
Co-authored-by: Robb Kidd <robbkidd@honeycomb.io>
  • Loading branch information
3 people committed Sep 11, 2023
1 parent bdf0563 commit bd1ac45
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 32 deletions.
11 changes: 7 additions & 4 deletions assemblers/http_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (
type message struct {
data []byte
timestamp time.Time
// Seq will hold SEQ or ACK number for incoming or outgoing HTTP TCP segments
// https://madpackets.com/2018/04/25/tcp-sequence-and-acknowledgement-numbers-explained/
Seq int
}

type httpReader struct {
Expand All @@ -27,6 +30,7 @@ type httpReader struct {
parent *tcpStream
messages chan message
timestamp time.Time
seq int
}

func (h *httpReader) Read(p []byte) (int, error) {
Expand All @@ -36,6 +40,7 @@ func (h *httpReader) Read(p []byte) (int, error) {
msg, ok = <-h.messages
h.data = msg.data
h.timestamp = msg.timestamp
h.seq = msg.Seq
}
if !ok || len(h.data) == 0 {
return 0, io.EOF
Expand All @@ -62,8 +67,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
continue
}

requestCount := h.parent.counter.incrementRequest()
ident := fmt.Sprintf("%s:%d", h.parent.ident, requestCount)
ident := fmt.Sprintf("%s:%d", h.parent.ident, h.seq)
if entry, ok := h.parent.matcher.GetOrStoreRequest(ident, h.timestamp, req); ok {
// we have a match, process complete request/response pair
h.processEvent(ident, entry)
Expand All @@ -80,8 +84,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
continue
}

responseCount := h.parent.counter.incrementResponse()
ident := fmt.Sprintf("%s:%d", h.parent.ident, responseCount)
ident := fmt.Sprintf("%s:%d", h.parent.ident, h.seq)
if entry, ok := h.parent.matcher.GetOrStoreResponse(ident, h.timestamp, res); ok {
// we have a match, process complete request/response pair
h.processEvent(ident, entry)
Expand Down
25 changes: 0 additions & 25 deletions assemblers/request_counter.go

This file was deleted.

8 changes: 8 additions & 0 deletions assemblers/tcp_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var stats struct {

type Context struct {
CaptureInfo gopacket.CaptureInfo
seq, ack reassembly.Sequence
}

func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
Expand Down Expand Up @@ -115,6 +116,11 @@ func (h *tcpAssembler) Start() {
case <-statsTicker.C:
h.logAssemblerStats()
case packet := <-h.packetSource.Packets():
if packet.NetworkLayer() == nil {
// can't use this packet
continue
}

// defrag the IPv4 packet if required
if ipv4Layer := packet.Layer(layers.LayerTypeIPv4); ipv4Layer != nil {
ipv4 := ipv4Layer.(*layers.IPv4)
Expand Down Expand Up @@ -152,6 +158,8 @@ func (h *tcpAssembler) Start() {
}
context := Context{
CaptureInfo: packet.Metadata().CaptureInfo,
seq: reassembly.Sequence(tcp.Seq),
ack: reassembly.Sequence(tcp.Ack),
}
stats.totalsz += len(tcp.Payload)
h.assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &context)
Expand Down
14 changes: 11 additions & 3 deletions assemblers/tcp_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/rs/zerolog/log"
)

// tcpStream has two unidirectional httpReaders, one for client and one for server
type tcpStream struct {
id uint64
tcpstate *reassembly.TCPSimpleFSM
Expand All @@ -19,7 +20,6 @@ type tcpStream struct {
net, transport gopacket.Flow
client httpReader
server httpReader
counter requestCounter
ident string
closed bool
config config.Config
Expand Down Expand Up @@ -131,17 +131,25 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
return
}

ctx, ok := ac.(*Context)
if !ok {
log.Warn().
Msg("Failed to cast ScatterGather to ContextWithSeq")
}

if length > 0 {
data := sg.Fetch(length)
if dir == reassembly.TCPDirClientToServer {
t.client.messages <- message{
data: data,
timestamp: ac.GetCaptureInfo().Timestamp,
timestamp: ctx.CaptureInfo.Timestamp,
Seq: int(ctx.ack), // client ACK matches server SEQ
}
} else {
t.server.messages <- message{
data: data,
timestamp: ac.GetCaptureInfo().Timestamp,
timestamp: ctx.CaptureInfo.Timestamp,
Seq: int(ctx.seq), // server SEQ matches client ACK
}
}
}
Expand Down

0 comments on commit bd1ac45

Please sign in to comment.