Skip to content

Commit

Permalink
feat: Add service k8s metadata to events (#257)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
Kubernetes services are commonly used as load balancers between pod
deployments so the pods can be managed separately from routing traffic
to live pods. However, the agent only appends Kubernetes metadata when
the source and destination IPs of generated events are pods.

This PR updates the process that applies Kubernetes metadata to also
lookup services using the source and destination IPs.

- Closes #251

## Short description of the changes
- Update the k8sCachedClient to index services in the serviceInformer by
it's registered cluster IP
- ClusterIP will be assigned when using services with a type of
ClusterIP, NodePort and LoadBalancer
- Update k8sutils to attempt to lookup a service if pod lookup failed
and if a match is found, apply common service attributes
- Extend both pod and service metadata to include a new
"k8s.resource.type" attribute (will be set to either "pod" or "service")
- Move custom k8s attribute keys to consts
- Update existing libhoney handler test to verify the new resource type
attribute is set
- Add new test where the destination IP of an httpEvent targets a
service and verify the expected attributes are set

## How to verify that this has the expected result
Events generated by the agent now include a resource type attribute and
apply service metadata when the service is routed through a service
resource and not just pods.
  • Loading branch information
MikeGoldsmith committed Oct 3, 2023
1 parent 1fa24dd commit 0dfe201
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 5 deletions.
113 changes: 113 additions & 0 deletions handlers/libhoney_event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,11 @@ func Test_libhoneyEventHandler_handleEvent(t *testing.T) {
"error": "HTTP client error",
"duration_ms": int64(3),
"user_agent.original": "teapot-checker/1.0",
"source.k8s.resource.type": "pod",
"source.k8s.namespace.name": "unit-tests",
"source.k8s.pod.name": "src-pod",
"source.k8s.pod.uid": srcPod.UID,
"destination.k8s.resource.type": "pod",
"destination.k8s.namespace.name": "unit-tests",
"destination.k8s.pod.name": "dest-pod",
"destination.k8s.pod.uid": destPod.UID,
Expand All @@ -127,6 +129,117 @@ func Test_libhoneyEventHandler_handleEvent(t *testing.T) {
assert.Equal(t, expectedAttrs, attrs)
}

func Test_libhoneyEventHandler_handleEvent_routed_to_service(t *testing.T) {
// TEST SETUP

// Test Data - an assembled HTTP Event
testReqTime := time.Now()
httpEvent := assemblers.HttpEvent{
StreamIdent: "c->s:1->2",
Request: &http.Request{
Method: "GET",
RequestURI: "/check?teapot=true",
ContentLength: 42,
Header: http.Header{"User-Agent": []string{"teapot-checker/1.0"}},
},
Response: &http.Response{
StatusCode: 418,
ContentLength: 84,
},
RequestTimestamp: testReqTime,
ResponseTimestamp: testReqTime.Add(3 * time.Millisecond),
SrcIp: "1.2.3.4",
DstIp: "5.6.7.8",
}

// Test Data - k8s metadata
srcPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "src-pod",
Namespace: "unit-tests",
UID: "src-pod-uid",
},
Status: v1.PodStatus{
PodIP: httpEvent.SrcIp,
},
}

destService := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "dest-service",
Namespace: "unit-tests",
UID: "dest-service-uid",
},
Spec: v1.ServiceSpec{
ClusterIP: httpEvent.DstIp,
},
}

// create a fake k8s clientset with the test pod metadata and start the cached client with it
fakeCachedK8sClient := utils.NewCachedK8sClient(fake.NewSimpleClientset(srcPod, destService))
cancelableCtx, done := context.WithCancel(context.Background())
fakeCachedK8sClient.Start(cancelableCtx)

// create event channel used to pass in events to the handler
eventsChannel := make(chan assemblers.HttpEvent, 1)

wgTest := sync.WaitGroup{} // used to wait for the event handler to finish

// create the event handler with default config, fake k8s client & event channel then start it
handler := NewLibhoneyEventHandler(config.Config{}, fakeCachedK8sClient, eventsChannel, "test")
wgTest.Add(1)
go handler.Start(cancelableCtx, &wgTest)

// Setup libhoney for testing, use mock transmission to retrieve events "sent"
// must be done after the event handler is created
mockTransmission := setupTestLibhoney(t)

// TEST ACTION: pass in httpEvent to handler
eventsChannel <- httpEvent
time.Sleep(10 * time.Millisecond) // give the handler time to process the event

done()
wgTest.Wait()
handler.Close()

// VALIDATE
events := mockTransmission.Events()
assert.Equal(t, 1, len(events), "Expected 1 and only 1 event to be sent")

attrs := events[0].Data
// remove dynamic time-based data before comparing
delete(attrs, "meta.httpEvent_handled_at")
delete(attrs, "meta.httpEvent_request_handled_latency_ms")
delete(attrs, "meta.httpEvent_response_handled_latency_ms")

expectedAttrs := map[string]interface{}{
"name": "HTTP GET",
"client.socket.address": "1.2.3.4",
"server.socket.address": "5.6.7.8",
"meta.stream.ident": "c->s:1->2",
"http.request.method": "GET",
"url.path": "/check?teapot=true",
"http.request.body.size": int64(42),
"http.request.timestamp": testReqTime,
"http.response.timestamp": testReqTime.Add(3 * time.Millisecond),
"http.response.status_code": 418,
"http.response.body.size": int64(84),
"error": "HTTP client error",
"duration_ms": int64(3),
"user_agent.original": "teapot-checker/1.0",
"source.k8s.resource.type": "pod",
"source.k8s.namespace.name": "unit-tests",
"source.k8s.pod.name": "src-pod",
"source.k8s.pod.uid": srcPod.UID,
"destination.k8s.resource.type": "service",
"destination.k8s.namespace.name": "unit-tests",
"destination.k8s.service.name": "dest-service",
"destination.k8s.service.uid": destService.UID,
}

assert.Equal(t, expectedAttrs, attrs)
}

// setupTestLibhoney configures a Libhoney with a mock transmission for testing.
//
// Events sent can be found on the mock transmission:
Expand Down
25 changes: 22 additions & 3 deletions utils/cached_k8s_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

const (
ResyncTime = time.Minute * 5
podByIPIndex = "podIP"
byIPIndex = "ipAddr"
nodeByNameIndex = "nodeName"
)

Expand All @@ -32,11 +32,17 @@ func NewCachedK8sClient(clientset kubernetes.Interface) *CachedK8sClient {
nodeInformer := factory.Core().V1().Nodes().Informer()

podInformer.AddIndexers(map[string]cache.IndexFunc{
podByIPIndex: func(obj interface{}) ([]string, error) {
byIPIndex: func(obj interface{}) ([]string, error) {
pod := obj.(*v1.Pod)
return []string{pod.Status.PodIP}, nil
},
})
serviceInformer.AddIndexers(map[string]cache.IndexFunc{
byIPIndex: func(obj interface{}) ([]string, error) {
service := obj.(*v1.Service)
return []string{service.Spec.ClusterIP}, nil
},
})
nodeInformer.AddIndexers(map[string]cache.IndexFunc{
nodeByNameIndex: func(obj interface{}) ([]string, error) {
node := obj.(*v1.Node)
Expand All @@ -59,7 +65,7 @@ func (c *CachedK8sClient) Start(ctx context.Context) {

// GetPodByIPAddr returns the pod with the given IP address
func (c *CachedK8sClient) GetPodByIPAddr(ipAddr string) *v1.Pod {
val, err := c.podInformer.GetIndexer().ByIndex(podByIPIndex, ipAddr)
val, err := c.podInformer.GetIndexer().ByIndex(byIPIndex, ipAddr)
if err != nil {
log.Err(err).Msg("Error getting pod by IP")
return nil
Expand All @@ -70,6 +76,19 @@ func (c *CachedK8sClient) GetPodByIPAddr(ipAddr string) *v1.Pod {
return val[0].(*v1.Pod)
}

// GetServiceByIPAddr returns the service with the given IP address
func (c *CachedK8sClient) GetServiceByIPAddr(ipAddr string) *v1.Service {
val, err := c.serviceInformer.GetIndexer().ByIndex(byIPIndex, ipAddr)
if err != nil {
log.Err(err).Msg("Error getting service by IP")
return nil
}
if len(val) == 0 {
return nil
}
return val[0].(*v1.Service)
}

// GetServiceForPod returns the service that the given pod is associated with
func (c *CachedK8sClient) GetServiceForPod(pod *v1.Pod) *v1.Service {
podLabels := labels.Set(pod.Labels)
Expand Down
19 changes: 17 additions & 2 deletions utils/k8sutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
)

const (
k8sResourceType = "k8s.resource.type"
k8sResourceTypePod = "pod"
k8sResourceTypeService = "service"
k8sServiceName = "k8s.service.name"
k8sServiceUID = "k8s.service.uid"
)

// GetK8sAttrsForSourceIp returns a map of kubernetes metadata attributes for
// a given IP address. Attribute names will be prefixed with "source.".
func GetK8sAttrsForSourceIp(client *CachedK8sClient, ip string) map[string]any {
Expand Down Expand Up @@ -36,6 +44,7 @@ func GetK8sAttrsForIp(client *CachedK8sClient, ip string, prefix string) map[str
}

if pod := client.GetPodByIPAddr(ip); pod != nil {
k8sAttrs[prefix+k8sResourceType] = k8sResourceTypePod
k8sAttrs[prefix+string(semconv.K8SPodNameKey)] = pod.Name
k8sAttrs[prefix+string(semconv.K8SPodUIDKey)] = pod.UID
k8sAttrs[prefix+string(semconv.K8SNamespaceNameKey)] = pod.Namespace
Expand All @@ -55,9 +64,15 @@ func GetK8sAttrsForIp(client *CachedK8sClient, ip string, prefix string) map[str

if service := client.GetServiceForPod(pod); service != nil {
// no semconv for service yet
k8sAttrs[prefix+"k8s.service.name"] = service.Name
k8sAttrs[prefix+"k8s.service.uid"] = service.UID
k8sAttrs[prefix+k8sServiceName] = service.Name
k8sAttrs[prefix+k8sServiceUID] = service.UID
}
} else if service := client.GetServiceByIPAddr(ip); service != nil {
k8sAttrs[prefix+k8sResourceType] = k8sResourceTypeService
k8sAttrs[prefix+string(semconv.K8SNamespaceNameKey)] = service.Namespace
// no semconv for service yet
k8sAttrs[prefix+k8sServiceName] = service.Name
k8sAttrs[prefix+k8sServiceUID] = service.UID
}

return k8sAttrs
Expand Down

0 comments on commit 0dfe201

Please sign in to comment.