Skip to content

Commit

Permalink
feat: Request / Response matching (#70)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

- Closes #66 

## Short description of the changes
- Make sure there is only one event being created for every request

## How to verify that this has the expected result
- Run the agent and look at the events in Honeycomb

## Known issues
- Some requests don't have a response pair, we'll be addressing this in
a future PR
- Request / response body byte size doesn't currently work

---------

Co-authored-by: JamieDanielson <jamieedanielson@gmail.com>
Co-authored-by: Mike Goldsmth <goldsmith.mike@gmail.com>
Co-authored-by: Jamie Danielson <JamieDanielson@users.noreply.github.com>
Co-authored-by: Robb Kidd <robb@thekidds.org>
Co-authored-by: Vera Reynolds <verareynolds@honeycomb.io>
Co-authored-by: Mike Goldsmith <MikeGoldsmith@users.noreply.github.com>
  • Loading branch information
7 people committed Aug 15, 2023
1 parent da8c2b3 commit 0793c6e
Show file tree
Hide file tree
Showing 7 changed files with 185 additions and 136 deletions.
15 changes: 15 additions & 0 deletions assemblers/http_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package assemblers

import (
"net/http"
"time"
)

type httpEvent struct {
requestId string
request *http.Request
response *http.Response
duration time.Duration
srcIp string
dstIp string
}
61 changes: 61 additions & 0 deletions assemblers/http_matcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package assemblers

import (
"net/http"
"sync"
"time"
)

type httpMatcher struct {
messages *sync.Map
}

type entry struct {
request *http.Request
requestTimestamp time.Time

response *http.Response
responseTimestamp time.Time
}

func newRequestResponseMatcher() httpMatcher {
return httpMatcher{
messages: &sync.Map{},
}
}

func (m *httpMatcher) LoadOrStoreRequest(requestID string, request *http.Request) *entry {

// check if we already have a response for this request, if yes, return it
if e, ok := m.messages.LoadAndDelete(requestID); ok {
e.(*entry).request = request
e.(*entry).requestTimestamp = time.Now()
return e.(*entry)
}

// we don't have a response for this request, so store it for later
entry := entry{
request: request,
requestTimestamp: time.Now(),
}
m.messages.Store(requestID, &entry)
return nil
}

func (m *httpMatcher) LoadOrStoreResponse(requestID string, response *http.Response) *entry {

// check if we already have a request for this response, if yes, return it
if e, ok := m.messages.LoadAndDelete(requestID); ok {
e.(*entry).response = response
e.(*entry).responseTimestamp = time.Now()
return e.(*entry)
}

// we don't have a request for this response, so store it for later
entry := entry{
response: response,
responseTimestamp: time.Now(),
}
m.messages.Store(requestID, &entry)
return nil
}
143 changes: 21 additions & 122 deletions assemblers/http_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,12 @@ package assemblers

import (
"bufio"
"bytes"
"compress/gzip"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path"
"sync"

"github.com/honeycombio/libhoney-go"
)

type httpReader struct {
ident string
isClient bool
srcIp string
srcPort string
Expand All @@ -41,7 +32,6 @@ func (h *httpReader) Read(p []byte) (int, error) {
return l, nil
}


func (h *httpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
Expand All @@ -51,130 +41,39 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
Error("HTTP-request", "HTTP/%s Request error: %s (%v,%+v)\n", h.ident, err, err, err)
// Error("HTTP-request", "HTTP/%s Request error: %s (%v,%+v)\n", h.ident, err, err, err)
continue
}
body, err := io.ReadAll(req.Body)
s := len(body)
if err != nil {
Error("HTTP-request-body", "Got body err: %s\n", err)
}
req.Body.Close()

eventAttrs := map[string]string{
"name": fmt.Sprintf("HTTP %s", req.Method),
"http.request_method": req.Method,
"http.request_ident": h.ident,
"http.request_source_ip": h.srcIp,
"http.request_source_port": h.srcPort,
"http.request_dest_ip": h.dstIp,
"http.request_dest_port": h.dstPort,
"http.request_url": req.RequestURI,
"http.request_body": fmt.Sprintf("%v", req.Body),
"http.request_headers": fmt.Sprintf("%v", req.Header),
"http.h_request_bytes": string(<-h.bytes),
entry := h.parent.matcher.LoadOrStoreRequest(h.parent.ident, req)
if entry != nil {
// we have a match, process complete request/response pair
h.processEvent(entry)
}

Info("HTTP/%s Request: %s %s (body:%d)\n", h.ident, req.Method, req.URL, s)
h.parent.Lock()
h.parent.urls = append(h.parent.urls, req.URL.String())
h.parent.eventAttrs = eventAttrs
h.parent.Unlock()
} else {
res, err := http.ReadResponse(b, nil)
var req string
var eventAttrs map[string]string
h.parent.Lock()
if len(h.parent.urls) == 0 {
req = fmt.Sprintf("<no-request-seen>")
} else {
req, h.parent.urls = h.parent.urls[0], h.parent.urls[1:]
eventAttrs = h.parent.eventAttrs
}
h.parent.Unlock()
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
Error("HTTP-response", "HTTP/%s Response error: %s (%v,%+v)\n", h.ident, err, err, err)
// Error("HTTP-response", "HTTP/%s Response error: %s (%v,%+v)\n", h.ident, err, err, err)
continue
}

body, err := io.ReadAll(res.Body)
s := len(body)
if err != nil {
Error("HTTP-response-body", "HTTP/%s: failed to get body(parsed len:%d): %s\n", h.ident, s, err)
}
res.Body.Close()

ev := libhoney.NewEvent()
ev.Add(eventAttrs)
ev.AddField("http.response_ident", h.ident)
ev.AddField("http.response_body", res.Body)
ev.AddField("http.response_code", res.StatusCode)
ev.AddField("http.response_headers", res.Header)
ev.AddField("http.h_response_bytes", h.bytes)
ev.AddField("http.response_request_url", req)

err = ev.Send()
if err != nil {
Error("Error sending even", "error sending event: %e\n", err)
}

sym := ","
if res.ContentLength > 0 && res.ContentLength != int64(s) {
sym = "!="
}
contentType, ok := res.Header["Content-Type"]
if !ok {
contentType = []string{http.DetectContentType(body)}
}
encoding := res.Header["Content-Encoding"]
Info("HTTP/%s Response: %s URL:%s (%d%s%d%s) -> %s\n", h.ident, res.Status, req, res.ContentLength, sym, s, contentType, encoding)
if err == nil {
base := url.QueryEscape(path.Base(req))
if err != nil {
base = "incomplete-" + base
}
if len(base) > 250 {
base = base[:250] + "..."
}
target := base
n := 0
for true {
_, err := os.Stat(target)
//if os.IsNotExist(err) != nil {
if err != nil {
break
}
target = fmt.Sprintf("%s-%d", base, n)
n++
}
f, err := os.Create(target)
if err != nil {
Error("HTTP-create", "Cannot create %s: %s\n", target, err)
continue
}
var r io.Reader
r = bytes.NewBuffer(body)
if len(encoding) > 0 && (encoding[0] == "gzip" || encoding[0] == "deflate") {
r, err = gzip.NewReader(r)
if err != nil {
Error("HTTP-gunzip", "Failed to gzip decode: %s", err)
}
}
if err == nil {
w, err := io.Copy(f, r)
if _, ok := r.(*gzip.Reader); ok {
r.(*gzip.Reader).Close()
}
f.Close()
if err != nil {
Error("HTTP-save", "%s: failed to save %s (l:%d): %s\n", h.ident, target, w, err)
} else {
Info("%s: Saved %s (l:%d)\n", h.ident, target, w)
}
}
entry := h.parent.matcher.LoadOrStoreResponse(h.parent.ident, res)
if entry != nil {
// we have a match, process complete request/response pair
h.processEvent(entry)
}
}
}
}

func (h *httpReader) processEvent(entry *entry) {
h.parent.events <- httpEvent{
requestId: h.parent.ident,
request: entry.request,
response: entry.response,
duration: entry.responseTimestamp.Sub(entry.requestTimestamp),
srcIp: h.srcIp,
dstIp: h.dstIp,
}
}
76 changes: 68 additions & 8 deletions assemblers/tcp_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package assemblers
import (
"flag"
"fmt"
"io"
"log"
"os"
"strings"
Expand All @@ -14,6 +15,8 @@ import (
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/reassembly"
"github.com/honeycombio/libhoney-go"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
)

var stats struct {
Expand Down Expand Up @@ -54,6 +57,7 @@ type tcpAssembler struct {
streamFactory *tcpStreamFactory
streamPool *reassembly.StreamPool
assembler *reassembly.Assembler
httpEvents chan httpEvent
}

func NewTcpAssembler(config config) tcpAssembler {
Expand Down Expand Up @@ -93,33 +97,36 @@ func NewTcpAssembler(config config) tcpAssembler {
packetSource.NoCopy = true
Info("Starting to read packets\n")

streamFactory := &tcpStreamFactory{}
streamPool := reassembly.NewStreamPool(streamFactory)
httpEvents := make(chan httpEvent, 10000)
streamFactory := NewTcpStreamFactory(httpEvents)
streamPool := reassembly.NewStreamPool(&streamFactory)
assembler := reassembly.NewAssembler(streamPool)

return tcpAssembler{
handle: handle,
config: &config,
handle: handle,
packetSource: packetSource,
streamFactory: streamFactory,
streamFactory: &streamFactory,
streamPool: streamPool,
assembler: assembler,
httpEvents: httpEvents,
}
}

func (h *tcpAssembler) Start() {

// start up http event handler
// TODO: move this up to main.go level to acces k8s pod metadata
go handleHttpEvents(h.httpEvents)

count := 0
bytes := int64(0)
start := time.Now()
defragger := ip4defrag.NewIPv4Defragmenter()

for packet := range h.packetSource.Packets() {
count++
// Debug("PACKET #%d\n", count)
log.Printf("PACKET #%d\n", count)
data := packet.Data()
bytes += int64(len(data))

// defrag the IPv4 packet if required
if !h.config.nodefrag {
ip4Layer := packet.Layer(layers.LayerTypeIPv4)
Expand Down Expand Up @@ -215,3 +222,56 @@ func (h *tcpAssembler) Stop() {
fmt.Printf(" %s:\t\t%d\n", e, errorsMap[e])
}
}

func handleHttpEvents(events chan httpEvent) {
for {
select {
case event := <-events:

ev := libhoney.NewEvent()
ev.AddField("duration_ms", event.duration.Microseconds())
ev.AddField("http.source_ip", event.srcIp)
ev.AddField("http.destination_ip", event.dstIp)
if event.request != nil {
ev.AddField("name", fmt.Sprintf("HTTP %s", event.request.Method))
ev.AddField(string(semconv.HTTPMethodKey), event.request.Method)
ev.AddField(string(semconv.HTTPURLKey), event.request.RequestURI)
ev.AddField("http.request.body", fmt.Sprintf("%v", event.request.Body))
ev.AddField("http.request.headers", fmt.Sprintf("%v", event.request.Header))
} else {
ev.AddField("name", "HTTP")
ev.AddField("http.request.missing", "no request on this event")
}

if event.response != nil {
ev.AddField(string(semconv.HTTPStatusCodeKey), event.response.StatusCode)
ev.AddField("http.response.body", event.response.Body)
ev.AddField("http.response.headers", event.response.Header)
} else {
ev.AddField("http.response.missing", "no request on this event")
}

//TODO: Body size produces a runtime error, commenting out for now.
// requestSize := getBodySize(event.request.Body)
// ev.AddField("http.request.body.size", requestSize)
// responseSize := getBodySize(event.response.Body)
// ev.AddField("http.response.body.size", responseSize)

err := ev.Send()
if err != nil {
log.Printf("error sending event: %v\n", err)
}
}
}
}

func getBodySize(r io.ReadCloser) int {
length := 0
b, err := io.ReadAll(r)
if err == nil {
length = len(b)
r.Close()
}

return length
}
Loading

0 comments on commit 0793c6e

Please sign in to comment.