Skip to content

Commit

Permalink
fix: separate event transform function (#81)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

- Latency between when http request is made and when the event is sent
to Honeycomb

## Short description of the changes

+ use request timestamp as libhoney event timestamp
+ added a ticker to monitor httpEvent events queue size & concurrent
goroutines
+ commented out k8s metadata for the moment because it was taking too
long and queue was backing up

<img width="1094" alt="Screenshot 2023-08-17 at 4 50 16 PM"
src="https://github.com/honeycombio/honeycomb-ebpf-agent/assets/517302/2304d64b-e4ec-407e-b37f-de05a7f3978a">

Co-authored-by: Mike Goldsmith <MikeGoldsmith@users.noreply.github.com>
  • Loading branch information
robbkidd and MikeGoldsmith committed Aug 17, 2023
1 parent 97ddeed commit ff8e804
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 58 deletions.
1 change: 1 addition & 0 deletions assemblers/http_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ type HttpEvent struct {
RequestId string
Request *http.Request
Response *http.Response
Timestamp time.Time
Duration time.Duration
SrcIp string
DstIp string
Expand Down
17 changes: 9 additions & 8 deletions assemblers/http_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
)

type httpReader struct {
isClient bool
srcIp string
srcPort string
dstIp string
dstPort string
bytes chan []byte
data []byte
parent *tcpStream
isClient bool
srcIp string
srcPort string
dstIp string
dstPort string
bytes chan []byte
data []byte
parent *tcpStream
timestamp time.Time
}

Expand Down Expand Up @@ -74,6 +74,7 @@ func (h *httpReader) processEvent(entry *entry) {
RequestId: h.parent.ident,
Request: entry.request,
Response: entry.response,
Timestamp: entry.requestTimestamp,
Duration: entry.responseTimestamp.Sub(entry.requestTimestamp),
SrcIp: h.srcIp,
DstIp: h.dstIp,
Expand Down
2 changes: 1 addition & 1 deletion assemblers/tcp_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (h *tcpAssembler) Start() {

done := h.config.Maxcount > 0 && count >= h.config.Maxcount
if count%h.config.Statsevery == 0 || done {
log.Info().
log.Debug().
Int("processed_count_since_start", count).
Int64("milliseconds_since_start", time.Since(start).Milliseconds()).
Int64("bytes", bytes).
Expand Down
119 changes: 70 additions & 49 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"
"os"
"os/signal"
"runtime"
"strconv"
"syscall"
"time"

"github.com/honeycombio/ebpf-agent/assemblers"
"github.com/honeycombio/ebpf-agent/bpf/probes"
Expand Down Expand Up @@ -107,62 +109,81 @@ func main() {
}

func handleHttpEvents(events chan assemblers.HttpEvent, client *kubernetes.Clientset) {
ticker := time.NewTicker(time.Second * 10)
for {
select {
case event := <-events:

// create libhoney event
ev := libhoney.NewEvent()

// common attributes
ev.AddField("duration_ms", event.Duration.Microseconds())
ev.AddField(string(semconv.NetSockHostAddrKey), event.SrcIp)
ev.AddField("destination.address", event.DstIp)

// request attributes
if event.Request != nil {
bodySizeString := event.Request.Header.Get("Content-Length")
bodySize, _ := strconv.ParseInt(bodySizeString, 10, 64)
ev.AddField("name", fmt.Sprintf("HTTP %s", event.Request.Method))
ev.AddField(string(semconv.HTTPMethodKey), event.Request.Method)
ev.AddField(string(semconv.HTTPURLKey), event.Request.RequestURI)
ev.AddField("http.request.body", fmt.Sprintf("%v", event.Request.Body))
ev.AddField("http.request.headers", fmt.Sprintf("%v", event.Request.Header))
ev.AddField(string(semconv.UserAgentOriginalKey), event.Request.Header.Get("User-Agent"))
ev.AddField("http.request.body.size", bodySize)
} else {
ev.AddField("name", "HTTP")
ev.AddField("http.request.missing", "no request on this event")
}

// response attributes
if event.Response != nil {
bodySizeString := event.Response.Header.Get("Content-Length")
bodySize, _ := strconv.ParseInt(bodySizeString, 10, 64)

ev.AddField(string(semconv.HTTPStatusCodeKey), event.Response.StatusCode)
ev.AddField("http.response.body", event.Response.Body)
ev.AddField("http.response.headers", event.Response.Header)
ev.AddField("http.response.body.size", bodySize)

} else {
ev.AddField("http.response.missing", "no response on this event")
}

// k8s attributes
k8sEventAttrs := utils.GetK8sEventAttrs(client, event.SrcIp, event.DstIp)
ev.Add(k8sEventAttrs)

err := ev.Send()
if err != nil {
log.Debug().
Err(err).
Msg("error sending event")
}
sendHttpEventToHoneycomb(event, client)
case <-ticker.C:
log.Info().
Int("event queue length", len(events)).
Int("goroutines", runtime.NumGoroutine()).
Msg("Queue length ticker")
}
}
}

func sendHttpEventToHoneycomb(event assemblers.HttpEvent, client *kubernetes.Clientset) {
// create libhoney event
ev := libhoney.NewEvent()

// common attributes
ev.Timestamp = event.Timestamp
ev.AddField("httpEvent_handled_at", time.Now())
ev.AddField("httpEvent_handled_latency", time.Now().Sub(event.Timestamp))
ev.AddField("goroutine_count", runtime.NumGoroutine())
ev.AddField("duration_ms", event.Duration.Microseconds())
ev.AddField(string(semconv.NetSockHostAddrKey), event.SrcIp)
ev.AddField("destination.address", event.DstIp)

// request attributes
if event.Request != nil {
bodySizeString := event.Request.Header.Get("Content-Length")
bodySize, _ := strconv.ParseInt(bodySizeString, 10, 64)
ev.AddField("name", fmt.Sprintf("HTTP %s", event.Request.Method))
ev.AddField(string(semconv.HTTPMethodKey), event.Request.Method)
ev.AddField(string(semconv.HTTPURLKey), event.Request.RequestURI)
ev.AddField("http.request.body", fmt.Sprintf("%v", event.Request.Body))
ev.AddField("http.request.headers", fmt.Sprintf("%v", event.Request.Header))
ev.AddField(string(semconv.UserAgentOriginalKey), event.Request.Header.Get("User-Agent"))
ev.AddField("http.request.body.size", bodySize)
} else {
ev.AddField("name", "HTTP")
ev.AddField("http.request.missing", "no request on this event")
}

// response attributes
if event.Response != nil {
bodySizeString := event.Response.Header.Get("Content-Length")
bodySize, _ := strconv.ParseInt(bodySizeString, 10, 64)

ev.AddField(string(semconv.HTTPStatusCodeKey), event.Response.StatusCode)
ev.AddField("http.response.body", event.Response.Body)
ev.AddField("http.response.headers", event.Response.Header)
ev.AddField("http.response.body.size", bodySize)

} else {
ev.AddField("http.response.missing", "no response on this event")
}

// k8s attributes
// TODO: make this faster; the call to the k8s API takes a bit of time and
// slows the processing of the event queue
// k8sEventAttrs := utils.GetK8sEventAttrs(client, event.SrcIp, event.DstIp)
// ev.Add(k8sEventAttrs)

log.Info().
Time("event.timestamp", ev.Timestamp).
Str("http.url", event.Request.RequestURI).
Msg("Event sent")
err := ev.Send()
if err != nil {
log.Debug().
Err(err).
Msg("error sending event")
}
}

func getEnvOrDefault(key string, defaultValue string) string {
if value, ok := os.LookupEnv(key); ok {
return value
Expand Down

0 comments on commit ff8e804

Please sign in to comment.