Skip to content

Commit

Permalink
feat: Refactor readers to not hold byte buffers (#184)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
Refactors the TCP readers so they no longer hold an internal byte buffer.
Instead, each reader attempts to convert completed packets directly into
HTTP requests or responses.

## Short description of the changes
- Remove channel buffer, use reassembled packet data directly when
parsing request / response
- Move reassembly logic from stream into reader
  • Loading branch information
MikeGoldsmith committed Sep 15, 2023
1 parent 8ec1641 commit e6b6ead
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 211 deletions.
4 changes: 2 additions & 2 deletions assemblers/http_matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ type entry struct {
responseTimestamp time.Time
}

// newRequestResponseMatcher builds an httpMatcher whose messages map is
// allocated and empty.
//
// NOTE(review): this span is a rendered diff with its +/- markers stripped,
// which is why two signatures appear. The value-returning pair of lines looks
// like the removed version and the pointer-returning pair like the added one —
// confirm against the commit.
func newRequestResponseMatcher() httpMatcher {
return httpMatcher{
func newRequestResponseMatcher() *httpMatcher {
return &httpMatcher{
messages: make(map[string]entry),
}
}
Expand Down
157 changes: 66 additions & 91 deletions assemblers/tcp_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,113 +2,94 @@ package assemblers

import (
"bufio"
"bytes"
"fmt"
"io"
"net/http"
"sync"
"time"

"github.com/gopacket/gopacket"
"github.com/honeycombio/honeycomb-network-agent/config"
"github.com/gopacket/gopacket/reassembly"
"github.com/rs/zerolog/log"
)

type message struct {
data []byte
timestamp time.Time
// Seq will hold SEQ or ACK number for incoming or outgoing HTTP TCP segments
// https://madpackets.com/2018/04/25/tcp-sequence-and-acknowledgement-numbers-explained/
Seq int
}

// tcpReader handles one direction of a TCP stream. When isClient is set it
// parses HTTP requests, otherwise HTTP responses.
//
// NOTE(review): this span is a rendered diff with its +/- markers stripped,
// which is why isClient and the src/dst fields are listed twice. The first
// group (isClient through seq) looks like the removed layout and the second
// group (streamIdent through events) like the added one — confirm against the
// commit.
type tcpReader struct {
isClient bool
srcIp string
srcPort string
dstIp string
dstPort string
// data is the not-yet-consumed remainder of the message last received from
// the messages channel.
data []byte
// parent is the owning stream; its ident, matcher and events are used by run.
parent *tcpStream
// messages is the channel the owning stream sends reassembled chunks on.
messages chan message
// timestamp and seq are copied from the most recently received message.
timestamp time.Time
seq int
// streamIdent is the stream identifier used as the prefix of the
// request/response ident.
streamIdent string
// isClient selects request parsing (true) or response parsing (false).
isClient bool
srcIp string
srcPort string
dstIp string
dstPort string
// matcher pairs requests with responses by ident.
matcher *httpMatcher
// events receives an HttpEvent for each matched request/response pair.
events chan HttpEvent
}

// NewTcpReader builds a tcpReader for one direction of a stream. The source
// and destination IPs are taken from the net flow and the ports from the
// transport flow, each stored in string form.
//
// NOTE(review): this span is a rendered diff with its +/- markers stripped, so
// two signatures and two sets of struct-literal fields are interleaved. The
// variant taking (isClient, stream, ..., config) and allocating the messages
// channel with config.ChannelBufferSize looks like the removed one; the variant
// taking (streamIdent, ..., matcher, httpEvents) looks like the added one —
// confirm against the commit.
func NewTcpReader(isClient bool, stream *tcpStream, net gopacket.Flow, transport gopacket.Flow, config config.Config) *tcpReader {
func NewTcpReader(streamIdent string, isClient bool, net gopacket.Flow, transport gopacket.Flow, matcher *httpMatcher, httpEvents chan HttpEvent) *tcpReader {
return &tcpReader{
parent: stream,
isClient: isClient,
srcIp: net.Src().String(),
dstIp: net.Dst().String(),
srcPort: transport.Src().String(),
dstPort: transport.Dst().String(),
messages: make(chan message, config.ChannelBufferSize),
streamIdent: streamIdent,
isClient: isClient,
srcIp: net.Src().String(),
dstIp: net.Dst().String(),
srcPort: transport.Src().String(),
dstPort: transport.Dst().String(),
matcher: matcher,
events: httpEvents,
}
}

// NOTE(review): this span is a rendered diff with its +/- markers stripped. It
// interleaves three methods: Read and run (which look like the removed,
// channel-driven implementation) and reassembledSG (which looks like the added,
// direct-parse implementation) — confirm against the commit before treating any
// of it as a single compilable function.

// Read copies buffered message bytes into p. While reader.data is empty it
// receives from reader.messages, recording the message's timestamp and Seq on
// the reader. It returns io.EOF once the channel is closed (or an empty message
// ends the loop with no data).
func (reader *tcpReader) Read(p []byte) (int, error) {
var msg message
ok := true
for ok && len(reader.data) == 0 {
msg, ok = <-reader.messages
reader.timestamp = msg.timestamp
reader.seq = msg.Seq
reader.data = msg.data
msg.data = nil // clear the []byte so we can release the memory
}
if !ok || len(reader.data) == 0 {
return 0, io.EOF
// reassembledSG fetches all bytes currently available in sg and tries to parse
// them as a single HTTP request (client reader) or response (server reader),
// then hands the result to the matcher.
func (reader *tcpReader) reassembledSG(sg reassembly.ScatterGather, ac reassembly.AssemblerContext) {
// NOTE(review): the local name len shadows the builtin for the rest of this
// function.
len, _ := sg.Lengths()
data := sg.Fetch(len)
// NOTE(review): a failed assertion is only logged; ctx is then nil and the
// later ctx.ack / ctx.seq / ctx.CaptureInfo accesses would panic with a nil
// pointer dereference. The log text also names ScatterGather/ContextWithSeq
// although the value asserted is ac, to *Context.
ctx, ok := ac.(*Context)
if !ok {
log.Warn().
Msg("Failed to cast ScatterGather to ContextWithSeq")
}

// Hand out as much of the buffered data as fits in p and keep the rest.
l := copy(p, reader.data)
reader.data = reader.data[l:]
return l, nil
}

// run parses HTTP messages from the reader until EOF and signals wg when done.
// A client reader parses requests, any other reader parses responses; each
// parsed message is stored in (or matched by) the parent stream's matcher under
// "<stream ident>:<seq>".
func (reader *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
for {
// A fresh bufio.Reader is created on every iteration.
b := bufio.NewReader(reader)
if reader.isClient {
req, err := http.ReadRequest(b)
// EOF ends the loop; any other parse error is logged at debug level and the
// loop tries again.
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
log.Debug().
Err(err).
Str("ident", reader.parent.ident).
Msg("Error reading HTTP request")
continue
}

ident := fmt.Sprintf("%s:%d", reader.parent.ident, reader.seq)
if entry, ok := reader.parent.matcher.GetOrStoreRequest(ident, reader.timestamp, req); ok {
// we have a match, process complete request/response pair
reader.processEvent(ident, entry)
}
} else {
res, err := http.ReadResponse(b, nil)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
log.Debug().
Err(err).
Str("ident", reader.parent.ident).
Msg("Error reading HTTP response")
continue
}

ident := fmt.Sprintf("%s:%d", reader.parent.ident, reader.seq)
if entry, ok := reader.parent.matcher.GetOrStoreResponse(ident, reader.timestamp, res); ok {
// we have a match, process complete request/response pair
reader.processEvent(ident, entry)
}
// The lines from here to the end of the span read data, ctx and
// reader.streamIdent, so they appear to continue reassembledSG rather than run.
// NOTE(review): no loop surrounds the parse in these lines, so at most one
// request or response is parsed per call; bytes left in r afterwards are not
// examined.
b := bytes.NewReader(data)
r := bufio.NewReader(b)
if reader.isClient {
// We use TCP SEQ & ACK numbers to identify request/response pairs
// ACK corresponds to SEQ of the HTTP response
// https://madpackets.com/2018/04/25/tcp-sequence-and-acknowledgement-numbers-explained/
reqIdent := fmt.Sprintf("%s:%d", reader.streamIdent, ctx.ack)
req, err := http.ReadRequest(r)
// EOF (including empty data) returns silently; other parse errors are logged
// at info level.
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
} else if err != nil {
// NOTE(review): this branch logs reader.streamIdent as "ident" while the
// response branch below logs resIdent.
log.Info().
Err(err).
Str("ident", reader.streamIdent).
Msg("Error reading HTTP request")
return
}
if entry, ok := reader.matcher.GetOrStoreRequest(reqIdent, ctx.CaptureInfo.Timestamp, req); ok {
// we have a match, process complete request/response pair
reader.processEvent(reqIdent, entry)
}
} else {
// We use TCP SEQ & ACK numbers to identify request/response pairs
// SEQ corresponds to ACK of the HTTP request
// https://madpackets.com/2018/04/25/tcp-sequence-and-acknowledgement-numbers-explained/
resIdent := fmt.Sprintf("%s:%d", reader.streamIdent, ctx.seq)
res, err := http.ReadResponse(r, nil)
if err == io.EOF || err == io.ErrUnexpectedEOF {
return
} else if err != nil {
log.Info().
Err(err).
Str("ident", resIdent).
Msg("Error reading HTTP response")
return
}
if entry, ok := reader.matcher.GetOrStoreResponse(resIdent, ctx.CaptureInfo.Timestamp, res); ok {
// we have a match, process complete request/response pair
reader.processEvent(resIdent, entry)
}
}
}

func (reader *tcpReader) processEvent(ident string, entry *entry) {
reader.parent.events <- HttpEvent{
reader.events <- HttpEvent{
RequestId: ident,
Request: entry.request,
Response: entry.response,
Expand All @@ -118,9 +99,3 @@ func (reader *tcpReader) processEvent(ident string, entry *entry) {
DstIp: reader.dstIp,
}
}

// close closes the reader's messages channel and drops its buffered data.
// It always returns nil.
func (reader *tcpReader) close() error {
// Closing the channel makes a pending or later receive in Read report !ok.
close(reader.messages)
reader.data = nil // release the data, free up that memory! ᕕ( ᐛ )ᕗ
return nil
}
135 changes: 21 additions & 114 deletions assemblers/tcp_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package assemblers

import (
"fmt"
"sync"

"github.com/gopacket/gopacket"
"github.com/gopacket/gopacket/layers"
Expand All @@ -11,40 +10,33 @@ import (
"github.com/rs/zerolog/log"
)

// NOTE(review): this span is a rendered diff with its +/- markers stripped,
// which is why the doc comment and several fields appear twice. The first field
// group (id through events, including the embedded sync.Mutex) looks like the
// removed layout and the second group (id through server) like the added one —
// confirm against the commit.
//
// tcpStream has two unidirectional httpReaders, one for client and one for server
// tcpStream has two unidirectional tcpReaders, one for client and one for server
type tcpStream struct {
id uint64
tcpstate *reassembly.TCPSimpleFSM
fsmerr bool
optchecker reassembly.TCPOptionCheck
net, transport gopacket.Flow
client *tcpReader
server *tcpReader
// ident is formatted by NewTcpStream as "<net>:<transport>:<streamId>".
ident string
// closed is set by close so the readers are closed only once.
closed bool
config config.Config
// The embedded mutex is locked by close.
sync.Mutex
matcher httpMatcher
events chan HttpEvent
id uint64
// ident is formatted by NewTcpStream as "<net>:<transport>:<streamId>".
ident string
config config.Config
tcpstate *reassembly.TCPSimpleFSM
fsmerr bool
optchecker reassembly.TCPOptionCheck
// client receives client-to-server data in ReassembledSG, server the rest.
client *tcpReader
server *tcpReader
}

// NewTcpStream builds the tcpStream for one connection: it formats the stream
// ident as "<net>:<transport>:<streamId>", creates the TCP state machine with
// SupportMissingEstablishment enabled, and creates a client reader for the
// given flows and a server reader for the reversed flows.
//
// NOTE(review): this span is a rendered diff with its +/- markers stripped, so
// two construction paths are interleaved. The path that assigns
// `stream := &tcpStream{...}`, attaches the readers afterwards and returns
// stream looks like the removed one; the path that computes streamIdent and a
// shared matcher first and returns the literal directly looks like the added
// one — confirm against the commit.
func NewTcpStream(streamId uint64, net gopacket.Flow, transport gopacket.Flow, config config.Config, httpEvents chan HttpEvent) *tcpStream {
stream := &tcpStream{
config: config,
id: streamId,
net: net,
transport: transport,
streamIdent := fmt.Sprintf("%s:%s:%d", net, transport, streamId)
// One matcher is passed to both readers below.
matcher := newRequestResponseMatcher()
return &tcpStream{
id: streamId,
ident: streamIdent,
config: config,
tcpstate: reassembly.NewTCPSimpleFSM(reassembly.TCPSimpleFSMOptions{
SupportMissingEstablishment: true,
}),
ident: fmt.Sprintf("%s:%s:%d", net, transport, streamId),
fsmerr: false, // TODO: verify whether we need this
optchecker: reassembly.NewTCPOptionCheck(),
matcher: newRequestResponseMatcher(),
events: httpEvents,
// The server reader is built from the reversed flows, so its src/dst fields
// are swapped relative to the client reader's.
client: NewTcpReader(streamIdent, true, net, transport, matcher, httpEvents),
server: NewTcpReader(streamIdent, false, net.Reverse(), transport.Reverse(), matcher, httpEvents),
}
stream.client = NewTcpReader(true, stream, net, transport, config)
stream.server = NewTcpReader(false, stream, net.Reverse(), transport.Reverse(), config)
return stream
}

func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
Expand Down Expand Up @@ -94,83 +86,11 @@ func (stream *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir re
}

// ReassembledSG receives reassembled data for the stream and routes it to the
// client reader (client-to-server direction) or the server reader (otherwise).
//
// NOTE(review): this span is a rendered diff with its +/- markers stripped. The
// stats updates, the debug log, the skip check and the sends on the readers'
// messages channels look like the removed implementation; the second sg.Info()
// call and the two reader.reassembledSG(sg, ac) calls look like the added one —
// confirm against the commit. If so, the added path forwards data without the
// skip (missing bytes) check that the removed path performed.
func (stream *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.AssemblerContext) {
dir, start, end, skip := sg.Info()
length, saved := sg.Lengths()
// update stats
// (stats is declared outside this view.)
sgStats := sg.Stats()
if skip > 0 {
stats.missedBytes += skip
}
stats.sz += length - saved
stats.pkt += sgStats.Packets
if sgStats.Chunks > 1 {
stats.reassembled++
}
stats.outOfOrderPackets += sgStats.QueuedPackets
stats.outOfOrderBytes += sgStats.QueuedBytes
if length > stats.biggestChunkBytes {
stats.biggestChunkBytes = length
}
if sgStats.Packets > stats.biggestChunkPackets {
stats.biggestChunkPackets = sgStats.Packets
}
// Overlapping bytes reported with zero overlapping packets is treated as an
// invariant violation.
// NOTE(review): zerolog's Fatal level presumably exits the process when the
// event is written, which would make the panic below unreachable — confirm.
if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 {
log.Fatal().
Int("bytes", sgStats.OverlapBytes).
Int("packets", sgStats.OverlapPackets).
Msg("Invalid overlap")
panic("Invalid overlap")
}
stats.overlapBytes += sgStats.OverlapBytes
stats.overlapPackets += sgStats.OverlapPackets

var ident string
dir, _, _, _ := sg.Info()
if dir == reassembly.TCPDirClientToServer {
ident = fmt.Sprintf("%v %v", stream.net, stream.transport)
stream.client.reassembledSG(sg, ac)
} else {
ident = fmt.Sprintf("%v %v", stream.net.Reverse(), stream.transport.Reverse())
}
log.Debug().
Str("ident", ident). // ex: "192.168.65.4->192.168.65.4 6443->38304"
Str("direction", dir.String()). // ex: "client->server" or "server->client"
Int("byte_count", length).
Bool("start", start).
Bool("end", end).
Int("skip", skip).
Int("saved", saved).
Int("packet_count", sgStats.Packets).
Int("chunk_count", sgStats.Chunks).
Int("overlap_byte_count", sgStats.OverlapBytes).
Int("overlap_packet_count", sgStats.OverlapPackets).
Msg("SG reassembled packet")
// A skip of -1 is tolerated only when config.Allowmissinginit is set; any
// other non-zero skip abandons this chunk.
if skip == -1 && stream.config.Allowmissinginit {
// this is allowed
} else if skip != 0 {
// Missing bytes in stream: do not even try to parse it
return
}

// NOTE(review): a failed assertion is only logged; ctx is then nil and the
// ctx.CaptureInfo / ctx.ack / ctx.seq accesses below would panic with a nil
// pointer dereference.
ctx, ok := ac.(*Context)
if !ok {
log.Warn().
Msg("Failed to cast ScatterGather to ContextWithSeq")
}

if length > 0 {
data := sg.Fetch(length)
if dir == reassembly.TCPDirClientToServer {
stream.client.messages <- message{
data: data,
timestamp: ctx.CaptureInfo.Timestamp,
Seq: int(ctx.ack), // client ACK matches server SEQ
}
} else {
stream.server.messages <- message{
data: data,
timestamp: ctx.CaptureInfo.Timestamp,
Seq: int(ctx.seq), // server SEQ matches client ACK
}
}
stream.server.reassembledSG(sg, ac)
}
}

Expand All @@ -179,21 +99,8 @@ func (stream *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool
log.Debug().
Str("tcp_stream_ident", stream.ident).
Msg("Connection closed")
stream.close()

// decrement the number of active streams
DecrementActiveStreamCount()
return true // remove the connection, heck with the last ACK
}

// close shuts the stream down at most once: under the stream's lock it marks
// the stream closed and then closes the client and server readers. Calls made
// after the first are no-ops.
func (stream *tcpStream) close() {
	stream.Lock()
	defer stream.Unlock()

	// Already closed: nothing left to release.
	if stream.closed {
		return
	}

	stream.closed = true
	stream.client.close()
	stream.server.close()
}
4 changes: 0 additions & 4 deletions assemblers/tcp_stream_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
streamId := IncrementStreamCount()
stream := NewTcpStream(streamId, net, transport, factory.config, factory.httpEvents)

factory.wg.Add(2)
go stream.client.run(&factory.wg)
go stream.server.run(&factory.wg)

// increment the number of active streams
IncrementActiveStreamCount()
return stream
Expand Down

0 comments on commit e6b6ead

Please sign in to comment.