Skip to content

Commit

Permalink
[kubeapiserver/events] Support namespace labels as tags on kubernetes…
Browse files Browse the repository at this point in the history
… events (#25490)

* Add namespace info to workloadmeta

* Add namespace label tags to kubernetes events

* Unit test adding namespace tags

* Format code

* Use namespace workloadmeta info to get pod labels

* Remove apiserver namespace informer factory

* Add changelog entry

* Minor updates to get ns label API call

* Do not fast return namespace labels if namespace collection disabled

* Use assert.ElementsMatch and add comment in events test
  • Loading branch information
jennchenn committed May 13, 2024
1 parent 2d60714 commit 010f0bd
Show file tree
Hide file tree
Showing 19 changed files with 291 additions and 92 deletions.
9 changes: 5 additions & 4 deletions cmd/cluster-agent/api/v1/kubernetes_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func installKubernetesMetadataEndpoints(r *mux.Router, wmeta workloadmeta.Compon
"getNodeLabels",
func(w http.ResponseWriter, r *http.Request) { getNodeLabels(w, r, wmeta) },
)).Methods("GET")
r.HandleFunc("/tags/namespace/{ns}", api.WithTelemetryWrapper("getNamespaceLabels", getNamespaceLabels)).Methods("GET")
r.HandleFunc("/tags/namespace/{ns}", api.WithTelemetryWrapper("getNamespaceLabels", func(w http.ResponseWriter, r *http.Request) { getNamespaceLabels(w, r, wmeta) })).Methods("GET")
r.HandleFunc("/cluster/id", api.WithTelemetryWrapper("getClusterID", getClusterID)).Methods("GET")
}

Expand Down Expand Up @@ -110,7 +110,7 @@ func getNodeAnnotations(w http.ResponseWriter, r *http.Request, wmeta workloadme
}

// getNamespaceLabels is only used when the node agent hits the DCA for the list of labels
func getNamespaceLabels(w http.ResponseWriter, r *http.Request) {
func getNamespaceLabels(w http.ResponseWriter, r *http.Request, wmeta workloadmeta.Component) {
/*
Input
localhost:5001/api/v1/tags/namespace/default
Expand All @@ -131,12 +131,13 @@ func getNamespaceLabels(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
var labelBytes []byte
nsName := vars["ns"]
nsLabels, err := as.GetNamespaceLabels(nsName)
namespace, err := wmeta.GetKubernetesNamespace(nsName)
if err != nil {
log.Errorf("Could not retrieve the namespace labels of %s: %v", nsName, err.Error()) //nolint:errcheck
log.Errorf("Could not retrieve the namespace %s: %v", nsName, err.Error()) //nolint:errcheck
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
nsLabels := namespace.Labels
labelBytes, err = json.Marshal(nsLabels)
if err != nil {
log.Errorf("Could not process the labels of the namespace %s from the informer's cache: %v", nsName, err.Error()) //nolint:errcheck
Expand Down
26 changes: 25 additions & 1 deletion comp/core/tagger/taggerimpl/collectors/workloadmeta_extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (c *WorkloadMetaCollector) processEvents(evBundle workloadmeta.EventBundle)
case workloadmeta.KindKubernetesNode:
tagInfos = append(tagInfos, c.handleKubeNode(ev)...)
case workloadmeta.KindKubernetesNamespace:
// tagInfos = append(tagInfos, c.handleKubeNamespace(ev)...) No tags for now
tagInfos = append(tagInfos, c.handleKubeNamespace(ev)...)
case workloadmeta.KindECSTask:
tagInfos = append(tagInfos, c.handleECSTask(ev)...)
case workloadmeta.KindContainerImageMetadata:
Expand Down Expand Up @@ -437,6 +437,30 @@ func (c *WorkloadMetaCollector) handleKubeNode(ev workloadmeta.Event) []*types.T
return tagInfos
}

func (c *WorkloadMetaCollector) handleKubeNamespace(ev workloadmeta.Event) []*types.TagInfo {
namespace := ev.Entity.(*workloadmeta.KubernetesNamespace)

tags := utils.NewTagList()

for name, value := range namespace.Labels {
utils.AddMetadataAsTags(name, value, c.nsLabelsAsTags, c.globNsLabels, tags)
}

low, orch, high, standard := tags.Compute()
tagInfos := []*types.TagInfo{
{
Source: nodeSource,
Entity: buildTaggerEntityID(namespace.EntityID),
HighCardTags: high,
OrchestratorCardTags: orch,
LowCardTags: low,
StandardTags: standard,
},
}

return tagInfos
}

func (c *WorkloadMetaCollector) handleECSTask(ev workloadmeta.Event) []*types.TagInfo {
task := ev.Entity.(*workloadmeta.ECSTask)

Expand Down
91 changes: 90 additions & 1 deletion comp/core/tagger/taggerimpl/collectors/workloadmeta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ import (
"sort"
"testing"

"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/log/logimpl"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/comp/core/tagger/utils"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta"
"github.com/DataDog/datadog-agent/pkg/util/fxutil"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes"
"go.uber.org/fx"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -606,6 +607,94 @@ func TestHandleKubePodNoContainerName(t *testing.T) {
}
}

func TestHandleKubeNamespace(t *testing.T) {
const namespace = "foobar"

namespaceEntityID := workloadmeta.EntityID{
Kind: workloadmeta.KindKubernetesNamespace,
ID: namespace,
}

namespaceTaggerEntityID := fmt.Sprintf("namespace://%s", namespaceEntityID.ID)

store := fxutil.Test[workloadmeta.Mock](t, fx.Options(
logimpl.MockModule(),
config.MockModule(),
fx.Supply(workloadmeta.NewParams()),
fx.Supply(context.Background()),
workloadmeta.MockModule(),
))

store.Set(&workloadmeta.Container{
EntityID: workloadmeta.EntityID{
Kind: workloadmeta.KindKubernetesNamespace,
ID: namespace,
},
EntityMeta: workloadmeta.EntityMeta{
Name: namespace,
},
})

tests := []struct {
name string
labelsAsTags map[string]string
annotationsAsTags map[string]string
nsLabelsAsTags map[string]string
namespace workloadmeta.KubernetesNamespace
expected []*types.TagInfo
}{
{
name: "fully formed namespace",
nsLabelsAsTags: map[string]string{
"ns_env": "ns_env",
"ns-ownerteam": "ns-team",
},
namespace: workloadmeta.KubernetesNamespace{
EntityID: namespaceEntityID,
EntityMeta: workloadmeta.EntityMeta{
Name: namespace,
Labels: map[string]string{
"ns_env": "dev",
"ns-ownerteam": "containers",
"foo": "bar",
},
},
},
expected: []*types.TagInfo{
{
Source: nodeSource,
Entity: namespaceTaggerEntityID,
HighCardTags: []string{},
OrchestratorCardTags: []string{},
LowCardTags: []string{
"ns_env:dev",
"ns-team:containers",
},
StandardTags: []string{},
},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
collector := &WorkloadMetaCollector{
store: store,
children: make(map[string]map[string]struct{}),
}

collector.initPodMetaAsTags(tt.labelsAsTags, tt.annotationsAsTags, tt.nsLabelsAsTags)

actual := collector.handleKubeNamespace(workloadmeta.Event{
Type: workloadmeta.EventTypeSet,
Entity: &tt.namespace,
})

assertTagInfoListEqual(t, tt.expected, actual)
})
}
}

func TestHandleECSTask(t *testing.T) {
const (
containerID = "foobarquux"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ func (r *reflectorStore) Delete(obj interface{}) error {
uid = v.UID
case *appsv1.Deployment:
uid = v.UID
case *corev1.Namespace:
uid = v.UID
default:
return fmt.Errorf("failed to identify Kind of object: %#v", obj)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (

"k8s.io/apimachinery/pkg/util/sets"

"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/core/workloadmeta"
apiv1 "github.com/DataDog/datadog-agent/pkg/clusteragent/api/v1"
"github.com/DataDog/datadog-agent/pkg/config"
Expand All @@ -25,7 +27,6 @@ import (
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/kubelet"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/util/retry"
"go.uber.org/fx"
)

const (
Expand Down Expand Up @@ -229,7 +230,7 @@ func (c *collector) parsePods(
}

var nsLabels map[string]string
nsLabels, err = c.getNamespaceLabels(apiserver.GetNamespaceLabels, pod.Metadata.Namespace)
nsLabels, err = c.getNamespaceLabels(pod.Metadata.Namespace)
if err != nil {
log.Debugf("Could not fetch namespace labels for pod %s/%s: %v", pod.Metadata.Namespace, pod.Metadata.Name, err)
}
Expand Down Expand Up @@ -289,16 +290,12 @@ func (c *collector) getMetadata(getPodMetaDataFromAPIServerFunc func(string, str
}

// getNamespaceLabels returns the namespace labels, fast return if namespace labels as tags is disabled.
func (c *collector) getNamespaceLabels(getNamespaceLabelsFromAPIServerFunc func(string) (map[string]string, error), ns string) (map[string]string, error) {
if !c.collectNamespaceLabels {
func (c *collector) getNamespaceLabels(ns string) (map[string]string, error) {
if !c.collectNamespaceLabels || !c.isDCAEnabled() {
return nil, nil
}

if c.isDCAEnabled() {
getNamespaceLabelsFromAPIServerFunc = c.dcaClient.GetNamespaceLabels
}

return getNamespaceLabelsFromAPIServerFunc(ns)
return c.dcaClient.GetNamespaceLabels(ns)
}

func (c *collector) isDCAEnabled() bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,14 +296,10 @@ func TestKubeMetadataCollector_getNamespaceLabels(t *testing.T) {
dcaClient clusteragent.DCAClientInterface
clusterAgentEnabled bool
}
type args struct {
getNamespaceLabelsFromAPIServerFunc func(string) (map[string]string, error)
}

tests := []struct {
name string
fields fields
args args
namespaceLabelsAsTags map[string]string
want map[string]string
wantErr bool
Expand All @@ -315,30 +311,6 @@ func TestKubeMetadataCollector_getNamespaceLabels(t *testing.T) {
},
{
name: "cluster agent not enabled",
args: args{
getNamespaceLabelsFromAPIServerFunc: func(string) (map[string]string, error) {
return map[string]string{
"label": "value",
}, nil
},
},
fields: fields{
clusterAgentEnabled: false,
dcaClient: &FakeDCAClient{},
},
namespaceLabelsAsTags: map[string]string{
"label": "tag",
},
want: map[string]string{"label": "tag"},
wantErr: false,
},
{
name: "cluster agent not enabled and failed to get namespace labels",
args: args{
getNamespaceLabelsFromAPIServerFunc: func(string) (map[string]string, error) {
return nil, errors.New("failed to get namespace labels")
},
},
fields: fields{
clusterAgentEnabled: false,
dcaClient: &FakeDCAClient{},
Expand All @@ -347,11 +319,10 @@ func TestKubeMetadataCollector_getNamespaceLabels(t *testing.T) {
"label": "tag",
},
want: nil,
wantErr: true,
wantErr: false,
},
{
name: "cluster agent enabled",
args: args{},
fields: fields{
clusterAgentEnabled: true,
dcaClient: &FakeDCAClient{
Expand All @@ -369,7 +340,6 @@ func TestKubeMetadataCollector_getNamespaceLabels(t *testing.T) {
},
{
name: "cluster agent enabled and failed to get namespace labels",
args: args{},
fields: fields{
clusterAgentEnabled: true,
dcaClient: &FakeDCAClient{
Expand All @@ -392,7 +362,7 @@ func TestKubeMetadataCollector_getNamespaceLabels(t *testing.T) {
collectNamespaceLabels: len(tt.namespaceLabelsAsTags) > 0,
}

labels, err := c.getNamespaceLabels(tt.args.getNamespaceLabelsFromAPIServerFunc, "foo")
labels, err := c.getNamespaceLabels("foo")
assert.True(t, (err != nil) == tt.wantErr)
assert.EqualValues(&testing.T{}, tt.want, labels)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@
package kubernetesapiserver

import (
"github.com/DataDog/datadog-agent/pkg/metrics/event"
v1 "k8s.io/api/core/v1"

"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/pkg/metrics/event"
)

func newBundledTransformer(clusterName string) eventTransformer {
func newBundledTransformer(clusterName string, taggerInstance tagger.Component) eventTransformer {
return &bundledTransformer{
clusterName: clusterName,
clusterName: clusterName,
taggerInstance: taggerInstance,
}
}

type bundledTransformer struct {
clusterName string
clusterName string
taggerInstance tagger.Component
}

func (c *bundledTransformer) Transform(events []*v1.Event) ([]event.Event, []error) {
Expand Down Expand Up @@ -60,7 +64,7 @@ func (c *bundledTransformer) Transform(events []*v1.Event) ([]event.Event, []err
datadogEvs := make([]event.Event, 0, len(bundlesByObject))

for id, bundle := range bundlesByObject {
datadogEv, err := bundle.formatEvents()
datadogEv, err := bundle.formatEvents(c.taggerInstance)
if err != nil {
errors = append(errors, err)
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ import (

v1 "k8s.io/api/core/v1"

"github.com/patrickmn/go-cache"

"github.com/DataDog/datadog-agent/comp/core/tagger"
"github.com/DataDog/datadog-agent/comp/core/tagger/types"
"github.com/DataDog/datadog-agent/pkg/metrics/event"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes"
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/patrickmn/go-cache"
)

var (
Expand All @@ -44,7 +47,7 @@ func getDDAlertType(k8sType string) event.EventAlertType {
}
}

func getInvolvedObjectTags(involvedObject v1.ObjectReference) []string {
func getInvolvedObjectTags(involvedObject v1.ObjectReference, taggerInstance tagger.Component) []string {
// NOTE: we now standardized on using kube_* tags, instead of
// non-namespaced ones, or kubernetes_*. The latter two are now
// considered deprecated.
Expand All @@ -64,6 +67,12 @@ func getInvolvedObjectTags(involvedObject v1.ObjectReference) []string {
// DEPRECATED:
fmt.Sprintf("namespace:%s", involvedObject.Namespace),
)

namespaceEntityID := fmt.Sprintf("namespace://%s", involvedObject.Namespace)
namespaceEntity, err := taggerInstance.GetEntity(namespaceEntityID)
if err == nil {
tags = append(tags, namespaceEntity.GetTags(types.HighCardinality)...)
}
}

kindTag := getKindTag(involvedObject.Kind, involvedObject.Name)
Expand Down

0 comments on commit 010f0bd

Please sign in to comment.