Skip to content

Commit

Permalink
feat: Add k8s metadata to gopacket events (#71)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
Connects gopacket data to k8s metadata

## Short description of the changes
- Add k8s util to gather k8s metadata
- Use k8s util in bpf and gopacket event stream
  • Loading branch information
pkanal committed Aug 16, 2023
1 parent 0793c6e commit 28443ac
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 118 deletions.
14 changes: 7 additions & 7 deletions assemblers/http_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"time"
)

type httpEvent struct {
requestId string
request *http.Request
response *http.Response
duration time.Duration
srcIp string
dstIp string
type HttpEvent struct {
RequestId string
Request *http.Request
Response *http.Response
Duration time.Duration
SrcIp string
DstIp string
}
14 changes: 7 additions & 7 deletions assemblers/http_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
}

func (h *httpReader) processEvent(entry *entry) {
h.parent.events <- httpEvent{
requestId: h.parent.ident,
request: entry.request,
response: entry.response,
duration: entry.responseTimestamp.Sub(entry.requestTimestamp),
srcIp: h.srcIp,
dstIp: h.dstIp,
h.parent.events <- HttpEvent{
RequestId: h.parent.ident,
Request: entry.request,
Response: entry.response,
Duration: entry.responseTimestamp.Sub(entry.requestTimestamp),
SrcIp: h.srcIp,
DstIp: h.dstIp,
}
}
63 changes: 2 additions & 61 deletions assemblers/tcp_assembler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package assemblers
import (
"flag"
"fmt"
"io"
"log"
"os"
"strings"
Expand All @@ -15,8 +14,6 @@ import (
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/reassembly"
"github.com/honeycombio/libhoney-go"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
)

var stats struct {
Expand Down Expand Up @@ -57,10 +54,10 @@ type tcpAssembler struct {
streamFactory *tcpStreamFactory
streamPool *reassembly.StreamPool
assembler *reassembly.Assembler
httpEvents chan httpEvent
httpEvents chan HttpEvent
}

func NewTcpAssembler(config config) tcpAssembler {
func NewTcpAssembler(config config, httpEvents chan HttpEvent) tcpAssembler {
var handle *pcap.Handle
var err error

Expand Down Expand Up @@ -97,7 +94,6 @@ func NewTcpAssembler(config config) tcpAssembler {
packetSource.NoCopy = true
Info("Starting to read packets\n")

httpEvents := make(chan httpEvent, 10000)
streamFactory := NewTcpStreamFactory(httpEvents)
streamPool := reassembly.NewStreamPool(&streamFactory)
assembler := reassembly.NewAssembler(streamPool)
Expand All @@ -116,8 +112,6 @@ func NewTcpAssembler(config config) tcpAssembler {
func (h *tcpAssembler) Start() {

// start up http event handler
// TODO: move this up to main.go level to acces k8s pod metadata
go handleHttpEvents(h.httpEvents)

count := 0
bytes := int64(0)
Expand Down Expand Up @@ -222,56 +216,3 @@ func (h *tcpAssembler) Stop() {
fmt.Printf(" %s:\t\t%d\n", e, errorsMap[e])
}
}

func handleHttpEvents(events chan httpEvent) {
for {
select {
case event := <-events:

ev := libhoney.NewEvent()
ev.AddField("duration_ms", event.duration.Microseconds())
ev.AddField("http.source_ip", event.srcIp)
ev.AddField("http.destination_ip", event.dstIp)
if event.request != nil {
ev.AddField("name", fmt.Sprintf("HTTP %s", event.request.Method))
ev.AddField(string(semconv.HTTPMethodKey), event.request.Method)
ev.AddField(string(semconv.HTTPURLKey), event.request.RequestURI)
ev.AddField("http.request.body", fmt.Sprintf("%v", event.request.Body))
ev.AddField("http.request.headers", fmt.Sprintf("%v", event.request.Header))
} else {
ev.AddField("name", "HTTP")
ev.AddField("http.request.missing", "no request on this event")
}

if event.response != nil {
ev.AddField(string(semconv.HTTPStatusCodeKey), event.response.StatusCode)
ev.AddField("http.response.body", event.response.Body)
ev.AddField("http.response.headers", event.response.Header)
} else {
ev.AddField("http.response.missing", "no request on this event")
}

//TODO: Body size produces a runtime error, commenting out for now.
// requestSize := getBodySize(event.request.Body)
// ev.AddField("http.request.body.size", requestSize)
// responseSize := getBodySize(event.response.Body)
// ev.AddField("http.response.body.size", responseSize)

err := ev.Send()
if err != nil {
log.Printf("error sending event: %v\n", err)
}
}
}
}

func getBodySize(r io.ReadCloser) int {
length := 0
b, err := io.ReadAll(r)
if err == nil {
length = len(b)
r.Close()
}

return length
}
6 changes: 3 additions & 3 deletions assemblers/tcp_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

type tcpStream struct {
id uint64
id uint64
tcpstate *reassembly.TCPSimpleFSM
fsmerr bool
optchecker reassembly.TCPOptionCheck
Expand All @@ -20,8 +20,8 @@ type tcpStream struct {
urls []string
ident string
sync.Mutex
matcher httpMatcher
events chan httpEvent
matcher httpMatcher
events chan HttpEvent
}

func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
Expand Down
4 changes: 2 additions & 2 deletions assemblers/tcp_stream_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ var streamId uint64 = 0

type tcpStreamFactory struct {
wg sync.WaitGroup
httpEvents chan httpEvent
httpEvents chan HttpEvent
}

func NewTcpStreamFactory(httpEvents chan httpEvent) tcpStreamFactory {
func NewTcpStreamFactory(httpEvents chan HttpEvent) tcpStreamFactory {
return tcpStreamFactory{
httpEvents: httpEvents,
}
Expand Down
45 changes: 8 additions & 37 deletions bpf/probes/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,13 @@ import (
"context"
"encoding/binary"
"errors"
"fmt"
"log"
"net"
"os"
"strings"

"github.com/cilium/ebpf/link"
"github.com/cilium/ebpf/perf"
"github.com/honeycombio/ebpf-agent/utils"
"github.com/honeycombio/libhoney-go"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
v1 "k8s.io/api/core/v1"
Expand All @@ -27,7 +26,7 @@ const mapKey uint32 = 0

type manager struct {
bpfObjects bpfObjects
probes []link.Link
probes []link.Link
reader *perf.Reader
client *kubernetes.Clientset
}
Expand Down Expand Up @@ -62,9 +61,9 @@ func New(client *kubernetes.Clientset) manager {

return manager{
bpfObjects: objs,
probes: []link.Link{ kprobeTcpConnect, kprobeTcpClose },
reader: reader,
client: client,
probes: []link.Link{kprobeTcpConnect, kprobeTcpClose},
reader: reader,
client: client,
}
}

Expand Down Expand Up @@ -150,10 +149,6 @@ func sendEvent(event bpfTcpEvent, client *kubernetes.Clientset) {
sourceIpAddr := intToIP(event.Saddr).String()
destIpAddr := intToIP(event.Daddr).String()

destPod := getPodByIPAddr(client, destIpAddr)
sourcePod := getPodByIPAddr(client, sourceIpAddr)
sourceNode := getNodeByPod(client, sourcePod)

ev := libhoney.NewEvent()
ev.AddField("name", "tcp_event")
ev.AddField("duration_ms", (event.EndTime-event.StartTime)/1_000_000) // convert ns to ms
Expand All @@ -163,33 +158,9 @@ func sendEvent(event bpfTcpEvent, client *kubernetes.Clientset) {
ev.AddField(string(semconv.NetHostPortKey), event.Sport)
ev.AddField("destination.port", event.Dport)

// dest pod
ev.AddField(fmt.Sprintf("destination.%s", semconv.K8SPodNameKey), destPod.Name)
ev.AddField(fmt.Sprintf("destination.%s", semconv.K8SPodUIDKey), destPod.UID)

// source pod
ev.AddField(string(semconv.K8SPodNameKey), sourcePod.Name)
ev.AddField(string(semconv.K8SPodUIDKey), sourcePod.UID)

// namespace
ev.AddField(string(semconv.K8SNamespaceNameKey), sourcePod.Namespace)

// service
// no semconv for service yet
ev.AddField("k8s.service.name", getServiceForPod(client, sourcePod).Name)

// node
ev.AddField(string(semconv.K8SNodeNameKey), sourceNode.Name)
ev.AddField(string(semconv.K8SNodeUIDKey), sourceNode.UID)

// container names
if len(sourcePod.Spec.Containers) > 0 {
var containerNames []string
for _, container := range sourcePod.Spec.Containers {
containerNames = append(containerNames, container.Name)
}
ev.AddField(string(semconv.K8SContainerNameKey), strings.Join(containerNames, ","))
}
// k8s metadata
k8sEventAttrs := utils.GetK8sEventAttrs(client, sourceIpAddr, destIpAddr)
ev.Add(k8sEventAttrs)

err := ev.Send()
if err != nil {
Expand Down
63 changes: 62 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"
"io"
"log"
"os"
"os/signal"
Expand All @@ -11,6 +12,7 @@ import (
"github.com/honeycombio/ebpf-agent/bpf/probes"
"github.com/honeycombio/ebpf-agent/utils"
"github.com/honeycombio/libhoney-go"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
Expand Down Expand Up @@ -80,7 +82,9 @@ func main() {
agentConfig := assemblers.NewConfig()

// setup TCP stream reader
assember := assemblers.NewTcpAssembler(*agentConfig)
httpEvents := make(chan assemblers.HttpEvent, 10000)
assember := assemblers.NewTcpAssembler(*agentConfig, httpEvents)
go handleHttpEvents(httpEvents, k8sClient)
go assember.Start()
defer assember.Stop()

Expand All @@ -93,6 +97,63 @@ func main() {
log.Println("Shutting down...")
}

func handleHttpEvents(events chan assemblers.HttpEvent, client *kubernetes.Clientset) {
for {
select {
case event := <-events:

ev := libhoney.NewEvent()
ev.AddField("duration_ms", event.Duration.Microseconds())
ev.AddField(string(semconv.NetSockHostAddrKey), event.SrcIp)
ev.AddField("destination.address", event.DstIp)
if event.Request != nil {
ev.AddField("name", fmt.Sprintf("HTTP %s", event.Request.Method))
ev.AddField(string(semconv.HTTPMethodKey), event.Request.Method)
ev.AddField(string(semconv.HTTPURLKey), event.Request.RequestURI)
ev.AddField("http.request.body", fmt.Sprintf("%v", event.Request.Body))
ev.AddField("http.request.headers", fmt.Sprintf("%v", event.Request.Header))
} else {
ev.AddField("name", "HTTP")
ev.AddField("http.request.missing", "no request on this event")
}

if event.Response != nil {
ev.AddField(string(semconv.HTTPStatusCodeKey), event.Response.StatusCode)
ev.AddField("http.response.body", event.Response.Body)
ev.AddField("http.response.headers", event.Response.Header)
} else {
ev.AddField("http.response.missing", "no response on this event")
}

// k8s metadata
k8sEventAttrs := utils.GetK8sEventAttrs(client, event.SrcIp, event.DstIp)
ev.Add(k8sEventAttrs)

//TODO: Body size produces a runtime error, commenting out for now.
// requestSize := getBodySize(event.request.Body)
// ev.AddField("http.request.body.size", requestSize)
// responseSize := getBodySize(event.response.Body)
// ev.AddField("http.response.body.size", responseSize)

err := ev.Send()
if err != nil {
log.Printf("error sending event: %v\n", err)
}
}
}
}

func getBodySize(r io.ReadCloser) int {
length := 0
b, err := io.ReadAll(r)
if err == nil {
length = len(b)
r.Close()
}

return length
}

func getEnvOrDefault(key string, defaultValue string) string {
if value, ok := os.LookupEnv(key); ok {
return value
Expand Down
Loading

0 comments on commit 28443ac

Please sign in to comment.