feat: Add cached k8s client (#84)
## Which problem is this PR solving?

Using the Kubernetes client directly to query resource details (e.g. pods,
services, and nodes) is both expensive and inefficient because it makes
network requests to the cluster on every lookup. This also led to delays in
processing events because the cluster API would enforce backoffs on repeated
frequent requests.

This PR introduces a cached client built on informers, which set up watchers
for the resources we care about and maintain in-memory caches for us to query
instead.
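
As a minimal, self-contained sketch of that pattern (illustrative only, not the code in this PR; the actual implementation is in `utils/cached_k8s_client.go` below), an informer-backed cache is set up and queried roughly like this:

```go
package main

import (
	"context"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

func main() {
	// Assumes the agent runs in-cluster.
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// The factory sets up watchers and keeps in-memory stores in sync,
	// with a periodic resync as a safety net.
	factory := informers.NewSharedInformerFactory(client, 5*time.Minute)
	podInformer := factory.Core().V1().Pods().Informer()

	ctx := context.Background()
	factory.Start(ctx.Done())            // non-blocking; starts the watchers
	factory.WaitForCacheSync(ctx.Done()) // blocks until the initial list completes

	// Subsequent reads hit the local cache, not the API server.
	for _, obj := range podInformer.GetStore().List() {
		fmt.Println(obj.(*v1.Pod).Name)
	}
}
```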

- Closes #86 

## Short description of the changes
- Create `utils.CachedK8sClient`
- Set up and start the cache in `main.go`
- Replace direct usage of the k8s client with the new cached version
- Re-enable k8s metadata attrs when processing events
- Refactor the k8s metadata function to only add metadata for resources it
can find (pod, service, etc.)

## How to verify that this has the expected result
- k8s metadata is now visible on sent events
- Event processing is no longer slowed down by API query backoffs

---------

Co-authored-by: Purvi Kanal <kanal.purvi@gmail.com>
MikeGoldsmith and pkanal committed Aug 21, 2023
1 parent 380bc11 commit 57028ac
Showing 4 changed files with 175 additions and 86 deletions.
7 changes: 3 additions & 4 deletions bpf/probes/manager.go
@@ -13,7 +13,6 @@ import (
"github.com/honeycombio/libhoney-go"
"github.com/rs/zerolog/log"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
"k8s.io/client-go/kubernetes"
)

//go:generate go run github.com/cilium/ebpf/cmd/bpf2go -target amd64,arm64 -cc clang -cflags $CFLAGS bpf source/tcp_probe.c
@@ -22,10 +21,10 @@ type manager struct {
bpfObjects bpfObjects
probes []link.Link
reader *perf.Reader
client *kubernetes.Clientset
client *utils.CachedK8sClient
}

func New(client *kubernetes.Clientset) manager {
func New(client *utils.CachedK8sClient) manager {
// Load pre-compiled programs and maps into the kernel.
objs := bpfObjects{}
if err := loadBpfObjects(&objs, nil); err != nil {
@@ -95,7 +94,7 @@ func (m *manager) Stop() {
}

// Send event to Honeycomb
func sendEvent(event bpfTcpEvent, client *kubernetes.Clientset) {
func sendEvent(event bpfTcpEvent, client *utils.CachedK8sClient) {

sourceIpAddr := intToIP(event.Saddr).String()
destIpAddr := intToIP(event.Daddr).String()
25 changes: 14 additions & 11 deletions main.go
@@ -1,6 +1,7 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
@@ -80,13 +81,18 @@ func main() {

// creates the clientset
k8sClient, err := kubernetes.NewForConfig(k8sConfig)

if err != nil {
panic(err.Error())
}

// create k8s monitor that caches k8s objects
ctx, done := context.WithCancel(context.Background())
defer done()
cachedK8sClient := utils.NewCachedK8sClient(k8sClient)
cachedK8sClient.Start(ctx)

// setup probes
p := probes.New(k8sClient)
p := probes.New(cachedK8sClient)
go p.Start()
defer p.Stop()

@@ -95,7 +101,7 @@ func main() {
// setup TCP stream reader
httpEvents := make(chan assemblers.HttpEvent, 10000)
assember := assemblers.NewTcpAssembler(*agentConfig, httpEvents)
go handleHttpEvents(httpEvents, k8sClient)
go handleHttpEvents(httpEvents, cachedK8sClient)
go assember.Start()
defer assember.Stop()

@@ -108,7 +114,7 @@ func main() {
log.Info().Msg("Shutting down...")
}

func handleHttpEvents(events chan assemblers.HttpEvent, client *kubernetes.Clientset) {
func handleHttpEvents(events chan assemblers.HttpEvent, client *utils.CachedK8sClient) {
ticker := time.NewTicker(time.Second * 10)
for {
select {
@@ -123,7 +129,7 @@ func handleHttpEvents(events chan assemblers.HttpEvent, client *kubernetes.Clien
}
}

func sendHttpEventToHoneycomb(event assemblers.HttpEvent, client *kubernetes.Clientset) {
func sendHttpEventToHoneycomb(event assemblers.HttpEvent, k8sClient *utils.CachedK8sClient) {
// create libhoney event
ev := libhoney.NewEvent()

@@ -166,13 +172,10 @@ func sendHttpEventToHoneycomb(event assemblers.HttpEvent, client *kubernetes.Cli
ev.AddField("http.response.missing", "no response on this event")
}

// k8s attributes
// TODO: make this faster; the call to the k8s API takes a bit of time and
// slows the processing of the event queue
// k8sEventAttrs := utils.GetK8sEventAttrs(client, event.SrcIp, event.DstIp)
// ev.Add(k8sEventAttrs)
k8sEventAttrs := utils.GetK8sEventAttrs(k8sClient, event.SrcIp, event.DstIp)
ev.Add(k8sEventAttrs)

log.Info().
log.Debug().
Time("event.timestamp", ev.Timestamp).
Str("http.url", event.Request.RequestURI).
Msg("Event sent")
130 changes: 130 additions & 0 deletions utils/cached_k8s_client.go
@@ -0,0 +1,130 @@
package utils

import (
"context"
"time"

"github.com/rs/zerolog/log"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

const (
ResyncTime = time.Minute * 5
)

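// CachedK8sClient wraps a shared informer factory and serves lookups of
// pods, services, and nodes from in-memory caches instead of per-request
// API server calls.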
type CachedK8sClient struct {
factory informers.SharedInformerFactory
nodeInformer cache.SharedInformer
podInformer cache.SharedInformer
serviceInformer cache.SharedInformer
}

func NewCachedK8sClient(client *kubernetes.Clientset) *CachedK8sClient {
factory := informers.NewSharedInformerFactory(client, ResyncTime)
podInformer := factory.Core().V1().Pods().Informer()
serviceInformer := factory.Core().V1().Services().Informer()
nodeInformer := factory.Core().V1().Nodes().Informer()
// TODO: add custom indexes to improve lookup speed
// - podinformer: pod IP
// - serviceinformer: by pod name
// - nodeinformer: by pod IP

return &CachedK8sClient{
factory: factory,
nodeInformer: nodeInformer,
podInformer: podInformer,
serviceInformer: serviceInformer,
}
}

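// Start starts the factory's watchers and blocks until the initial cache
// sync completes.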
func (c *CachedK8sClient) Start(ctx context.Context) {
c.factory.Start(ctx.Done())
c.factory.WaitForCacheSync(ctx.Done())
}

func (c *CachedK8sClient) GetNodes() []*v1.Node {
var nodes []*v1.Node
items := c.nodeInformer.GetStore().List()
for _, node := range items {
nodes = append(nodes, node.(*v1.Node))
}
return nodes
}

func (c *CachedK8sClient) GetPods() []*v1.Pod {
var pods []*v1.Pod
items := c.podInformer.GetStore().List()
for _, pod := range items {
pods = append(pods, pod.(*v1.Pod))
}
return pods
}

func (c *CachedK8sClient) GetServices() ([]*v1.Service, error) {
var services []*v1.Service
items := c.serviceInformer.GetStore().List()
for _, service := range items {
services = append(services, service.(*v1.Service))
}
return services, nil
}

func (c *CachedK8sClient) GetPodsWithSelector(selector labels.Selector) ([]*v1.Pod, error) {
pods := c.GetPods()
var matchedPods []*v1.Pod
for _, pod := range pods {
if selector.Matches(labels.Set(pod.Labels)) {
matchedPods = append(matchedPods, pod)
}
}
return matchedPods, nil
}

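// GetPodByIPAddr returns the cached pod whose pod IP or host IP matches the
// given address, or nil if no pod matches.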
func (c *CachedK8sClient) GetPodByIPAddr(ipAddr string) *v1.Pod {
pods := c.GetPods()
var matchedPod *v1.Pod
for _, pod := range pods {
if ipAddr == pod.Status.PodIP || ipAddr == pod.Status.HostIP {
matchedPod = pod
}
}
return matchedPod
}

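// GetServiceForPod returns a cached service whose label selector matches the
// given pod, or nil if no service matches.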
func (c *CachedK8sClient) GetServiceForPod(inputPod *v1.Pod) *v1.Service {
services, err := c.GetServices()
if err != nil {
log.Error().Err(err).Msg("Failed to get service for pod")
}
var matchedService *v1.Service
for _, service := range services {
set := labels.Set(service.Spec.Selector)
pods, err := c.GetPodsWithSelector(set.AsSelector())
if err != nil {
log.Error().Str("msg", "failed to get service for pod").Msg("failed to get pods")
}
for _, pod := range pods {
if pod.Name == inputPod.Name {
matchedService = service
}
}
}
return matchedService
}

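// GetNodeByPod returns the cached node whose address matches the given pod's
// host IP, or nil if no node matches.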
func (c *CachedK8sClient) GetNodeByPod(pod *v1.Pod) *v1.Node {
nodes := c.GetNodes()
var matchedNode *v1.Node
for _, node := range nodes {
for _, addr := range node.Status.Addresses {
if addr.Address == pod.Status.HostIP {
matchedNode = node
}
}
}
return matchedNode
}
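
A possible follow-up for the TODO in `NewCachedK8sClient` above: client-go's `SharedIndexInformer` supports custom indexers, which would turn the linear scans in this file into constant-time lookups. The sketch below is hypothetical and not part of this commit (`podByIPIndex` and both helper functions are made-up names):

```go
package utils

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
)

// podByIPIndex is a hypothetical index name, not part of this commit.
const podByIPIndex = "pod-by-ip"

// addPodIPIndex registers an index keyed by pod IP so pods can be looked up
// directly instead of scanning every cached pod. Indexers must be added
// before the informer is started.
func addPodIPIndex(podInformer cache.SharedIndexInformer) error {
	return podInformer.AddIndexers(cache.Indexers{
		podByIPIndex: func(obj interface{}) ([]string, error) {
			pod, ok := obj.(*v1.Pod)
			if !ok || pod.Status.PodIP == "" {
				return nil, nil
			}
			return []string{pod.Status.PodIP}, nil
		},
	})
}

// getPodByIPIndexed looks a pod up via the index rather than a linear scan.
func getPodByIPIndexed(podInformer cache.SharedIndexInformer, ipAddr string) *v1.Pod {
	items, err := podInformer.GetIndexer().ByIndex(podByIPIndex, ipAddr)
	if err != nil || len(items) == 0 {
		return nil
	}
	return items[0].(*v1.Pod)
}
```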
99 changes: 28 additions & 71 deletions utils/k8sutils.go
@@ -1,91 +1,48 @@
package utils

import (
"context"
"fmt"
"log"
"strings"

"github.com/rs/zerolog/log"
semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
)

func getPodByIPAddr(client *kubernetes.Clientset, ipAddr string) v1.Pod {
pods, _ := client.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
func GetK8sEventAttrs(client *CachedK8sClient, srcIp string, dstIp string) map[string]any {
log.Debug().
Str("src_ip", srcIp).
Str("dst_ip", dstIp).
Msg("Getting k8s event attrs")

var matchedPod v1.Pod
k8sEventAttrs := map[string]any{}

for _, pod := range pods.Items {
if ipAddr == pod.Status.PodIP {
matchedPod = pod
}
}
if srcPod := client.GetPodByIPAddr(srcIp); srcPod != nil {
k8sEventAttrs[string(semconv.K8SPodNameKey)] = srcPod.Name
k8sEventAttrs[string(semconv.K8SPodUIDKey)] = srcPod.UID
k8sEventAttrs[string(semconv.K8SNamespaceNameKey)] = srcPod.Namespace

return matchedPod
}

func getServiceForPod(client *kubernetes.Clientset, inputPod v1.Pod) v1.Service {
// get list of services
services, _ := client.CoreV1().Services(v1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
var matchedService v1.Service
// loop over services
for _, service := range services.Items {
set := labels.Set(service.Spec.Selector)
listOptions := metav1.ListOptions{LabelSelector: set.AsSelector().String()}
pods, err := client.CoreV1().Pods(v1.NamespaceAll).List(context.TODO(), listOptions)
if err != nil {
log.Println(err)
}
for _, pod := range pods.Items {
if pod.Name == inputPod.Name {
matchedService = service
if len(srcPod.Spec.Containers) > 0 {
var containerNames []string
for _, container := range srcPod.Spec.Containers {
containerNames = append(containerNames, container.Name)
}
k8sEventAttrs[string(semconv.K8SContainerNameKey)] = strings.Join(containerNames, ",")
}
}

return matchedService
}

func getNodeByPod(client *kubernetes.Clientset, pod v1.Pod) *v1.Node {
node, _ := client.CoreV1().Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{})
return node
}

func GetK8sEventAttrs(client *kubernetes.Clientset, srcIp string, dstIp string) map[string]any {
dstPod := getPodByIPAddr(client, dstIp)
srcPod := getPodByIPAddr(client, srcIp)
srcNode := getNodeByPod(client, srcPod)
service := getServiceForPod(client, srcPod)

k8sEventAttrs := map[string]any{
// dest pod
fmt.Sprintf("destination.%s", semconv.K8SPodNameKey): dstPod.Name,
fmt.Sprintf("destination.%s", semconv.K8SPodUIDKey): dstPod.UID,

// source pod
string(semconv.K8SPodNameKey): srcPod.Name,
string(semconv.K8SPodUIDKey): srcPod.UID,

// namespace
string(semconv.K8SNamespaceNameKey): srcPod.Namespace,

// service
// no semconv for service yet
"k8s.service.name": service.Name,
if srcNode := client.GetNodeByPod(srcPod); srcNode != nil {
k8sEventAttrs[string(semconv.K8SNodeNameKey)] = srcNode.Name
k8sEventAttrs[string(semconv.K8SNodeUIDKey)] = srcNode.UID
}

// node
string(semconv.K8SNodeNameKey): srcNode.Name,
string(semconv.K8SNodeUIDKey): srcNode.UID,
}
if len(srcPod.Spec.Containers) > 0 {
var containerNames []string
for _, container := range srcPod.Spec.Containers {
containerNames = append(containerNames, container.Name)
if service := client.GetServiceForPod(srcPod); service != nil {
// no semconv for service yet
k8sEventAttrs["k8s.service.name"] = service.Name
}
k8sEventAttrs[string(semconv.K8SContainerNameKey)] = strings.Join(containerNames, ",")
}

if dstPod := client.GetPodByIPAddr(dstIp); dstPod != nil {
k8sEventAttrs[fmt.Sprintf("destination.%s", semconv.K8SPodNameKey)] = dstPod.Name
k8sEventAttrs[fmt.Sprintf("destination.%s", semconv.K8SPodUIDKey)] = dstPod.UID
}

return k8sEventAttrs
