Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically enrich Kubernetes module events #7470

Merged
merged 29 commits into from Jul 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0a08eab
Allow metadata generation for any k8s resource
Jun 23, 2018
392bfbb
Add kubernetes module metadata enricher
Jun 28, 2018
776cf83
Enrich `node` events with Kubernetes metadata
Jun 28, 2018
b280707
Enrich `state_node` events with Kubernetes metadata:
Jun 28, 2018
138440d
Enrich `state_pod` events with Kubernetes metadata
Jun 28, 2018
8a427f1
Enrich `pod` events with Kubernetes metadata
Jun 28, 2018
a486f1b
Add flag to disable metadata enriching
Jun 28, 2018
39f423f
Enrich `state_deployment` with Kubernetes metadata
Jun 29, 2018
0314211
Enrich `state_replicaset` with Kubernetes metadata
Jun 29, 2018
25d0bbb
Enrich `container` metricsets with Kubernetes metadata
Jun 29, 2018
7de196b
make hound happy
Jun 29, 2018
90c7ee8
Disable metadata retrieval in tests
Jul 2, 2018
1a43ff2
Add tests
Jul 3, 2018
7c79197
Add docs
Jul 3, 2018
4870722
Update metricbeat role to get access to watch needed resources
Jul 4, 2018
280884b
Enrich `state_statefulset` with Kubernetes metadata
Jul 6, 2018
d28aae2
Implement Closer interface to stop watcher on shutdown
Jul 6, 2018
eb72c5a
Move nilEnricher to its own implementation
Jul 6, 2018
a2707a9
Update config.yml
Jul 6, 2018
7d3d613
Add a note to meta generator config settings
Jul 6, 2018
c2b0454
fix typo
Jul 6, 2018
18d509f
Move warnings to errors
Jul 9, 2018
b6d8f76
Move `uid` back to `pod.uid`
Jul 10, 2018
99b9f84
make update
Jul 10, 2018
68929d0
Merge branch 'master' into kubernetes-module-metadata
Jul 10, 2018
307734b
Add locking to start/stop
Jul 12, 2018
d582e8b
Merge branch 'master' into kubernetes-module-metadata
Jul 12, 2018
cb88d1e
Add missing license headers
Jul 12, 2018
5dcb137
Merge branch 'master' into kubernetes-module-metadata
Jul 12, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion auditbeat/docs/fields.asciidoc
Expand Up @@ -3312,7 +3312,7 @@ Kubernetes pod name
--
type: keyword

Kubernetes pod uid
Kubernetes Pod UID


--
Expand Down
2 changes: 1 addition & 1 deletion auditbeat/include/fields.go

Large diffs are not rendered by default.

17 changes: 12 additions & 5 deletions deploy/kubernetes/metricbeat-kubernetes.yaml
Expand Up @@ -262,15 +262,22 @@ metadata:
labels:
k8s-app: metricbeat
rules:
- apiGroups: [""] # "" indicates the core API group
- apiGroups: [""]
resources:
- nodes
- namespaces
- events
- pods
verbs:
- get
- watch
- list
verbs: ["get", "list", "watch"]
- apiGroups: ["extensions"]
resources:
- deployments
- replicasets
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources:
- statefulsets
verbs: ["get", "list", "watch"]
---
apiVersion: v1
kind: ServiceAccount
Expand Down
17 changes: 12 additions & 5 deletions deploy/kubernetes/metricbeat/metricbeat-role.yaml
Expand Up @@ -5,12 +5,19 @@ metadata:
labels:
k8s-app: metricbeat
rules:
- apiGroups: [""] # "" indicates the core API group
- apiGroups: [""]
resources:
- nodes
- namespaces
- events
- pods
verbs:
- get
- watch
- list
verbs: ["get", "list", "watch"]
- apiGroups: ["extensions"]
resources:
- deployments
- replicasets
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources:
- statefulsets
verbs: ["get", "list", "watch"]
2 changes: 1 addition & 1 deletion filebeat/docs/fields.asciidoc
Expand Up @@ -2343,7 +2343,7 @@ Kubernetes pod name
--
type: keyword

Kubernetes pod uid
Kubernetes Pod UID


--
Expand Down
2 changes: 1 addition & 1 deletion filebeat/include/fields.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion heartbeat/docs/fields.asciidoc
Expand Up @@ -664,7 +664,7 @@ Kubernetes pod name
--
type: keyword

Kubernetes pod uid
Kubernetes Pod UID


--
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/include/fields.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 44 additions & 25 deletions libbeat/common/kubernetes/metadata.go
Expand Up @@ -26,21 +26,29 @@ import (

// MetaGenerator builds metadata objects for pods and containers
type MetaGenerator interface {
// ResourceMetadata generates metadata for the given kubernetes object taking to account certain filters
ResourceMetadata(obj Resource) common.MapStr

// PodMetadata generates metadata for the given pod taking to account certain filters
PodMetadata(pod *Pod) common.MapStr

// Containermetadata generates metadata for the given container of a pod
ContainerMetadata(pod *Pod, container string) common.MapStr
}

type metaGenerator struct {
IncludeLabels []string `config:"include_labels"`
ExcludeLabels []string `config:"exclude_labels"`
IncludeAnnotations []string `config:"include_annotations"`
IncludePodUID bool `config:"include_pod_uid"`
IncludeCreatorMetadata bool `config:"include_creator_metadata"`
// MetaGeneratorConfig settings
type MetaGeneratorConfig struct {
IncludeLabels []string `config:"include_labels"`
ExcludeLabels []string `config:"exclude_labels"`
IncludeAnnotations []string `config:"include_annotations"`

// Undocumented settings, to be deprecated in favor of `drop_fields` processor:
IncludePodUID bool `config:"include_pod_uid"`
IncludeCreatorMetadata bool `config:"include_creator_metadata"`
}

type metaGenerator = MetaGeneratorConfig

// NewMetaGenerator initializes and returns a new kubernetes metadata generator
func NewMetaGenerator(cfg *common.Config) (MetaGenerator, error) {
// default settings:
Expand All @@ -52,41 +60,37 @@ func NewMetaGenerator(cfg *common.Config) (MetaGenerator, error) {
return &generator, err
}

// PodMetadata generates metadata for the given pod taking to account certain filters
func (g *metaGenerator) PodMetadata(pod *Pod) common.MapStr {
// NewMetaGeneratorFromConfig initializes and returns a new kubernetes metadata generator
func NewMetaGeneratorFromConfig(cfg *MetaGeneratorConfig) MetaGenerator {
return cfg
}

// ResourceMetadata generates metadata for the given kubernetes object taking to account certain filters
func (g *metaGenerator) ResourceMetadata(obj Resource) common.MapStr {
objMeta := obj.GetMetadata()
labelMap := common.MapStr{}
if len(g.IncludeLabels) == 0 {
for k, v := range pod.Metadata.Labels {
for k, v := range obj.GetMetadata().Labels {
safemapstr.Put(labelMap, k, v)
}
} else {
labelMap = generateMapSubset(pod.Metadata.Labels, g.IncludeLabels)
labelMap = generateMapSubset(objMeta.Labels, g.IncludeLabels)
}

// Exclude any labels that are present in the exclude_labels config
for _, label := range g.ExcludeLabels {
delete(labelMap, label)
}

annotationsMap := generateMapSubset(pod.Metadata.Annotations, g.IncludeAnnotations)
meta := common.MapStr{
"pod": common.MapStr{
"name": pod.Metadata.GetName(),
},
"node": common.MapStr{
"name": pod.Spec.GetNodeName(),
},
"namespace": pod.Metadata.GetNamespace(),
}

// Add Pod UID metadata if enabled
if g.IncludePodUID {
safemapstr.Put(meta, "pod.uid", pod.Metadata.GetUid())
annotationsMap := generateMapSubset(objMeta.Annotations, g.IncludeAnnotations)
meta := common.MapStr{}
if objMeta.GetNamespace() != "" {
meta["namespace"] = objMeta.GetNamespace()
}

// Add controller metadata if present
if g.IncludeCreatorMetadata {
for _, ref := range pod.Metadata.OwnerReferences {
for _, ref := range objMeta.OwnerReferences {
if ref.GetController() {
switch ref.GetKind() {
// TODO grow this list as we keep adding more `state_*` metricsets
Expand All @@ -110,6 +114,21 @@ func (g *metaGenerator) PodMetadata(pod *Pod) common.MapStr {
return meta
}

// PodMetadata generates metadata for the given pod taking to account certain filters
func (g *metaGenerator) PodMetadata(pod *Pod) common.MapStr {
podMeta := g.ResourceMetadata(pod)

// Add UID metadata if enabled
if g.IncludePodUID {
safemapstr.Put(podMeta, "pod.uid", pod.GetMetadata().GetUid())
}

safemapstr.Put(podMeta, "pod.name", pod.GetMetadata().GetName())
safemapstr.Put(podMeta, "node.name", pod.Spec.GetNodeName())

return podMeta
}

// Containermetadata generates metadata for the given container of a pod
func (g *metaGenerator) ContainerMetadata(pod *Pod, container string) common.MapStr {
podMeta := g.PodMetadata(pod)
Expand Down
22 changes: 12 additions & 10 deletions libbeat/common/kubernetes/metadata_test.go
Expand Up @@ -28,7 +28,7 @@ import (
)

func TestPodMetadataDeDot(t *testing.T) {
withPodUID, _ := common.NewConfigFrom(map[string]interface{}{"include_pod_uid": true})
withUID, _ := common.NewConfigFrom(map[string]interface{}{"include_pod_uid": true})

UID := "005f3b90-4b9d-12f8-acf0-31020a840133"
Deployment := "Deployment"
Expand All @@ -44,17 +44,18 @@ func TestPodMetadataDeDot(t *testing.T) {
{
pod: &Pod{
Metadata: &metav1.ObjectMeta{
Labels: map[string]string{"a.key": "foo", "a": "bar"},
Uid: &UID,
Labels: map[string]string{"a.key": "foo", "a": "bar"},
Uid: &UID,
Namespace: &test,
},
Spec: &v1.PodSpec{
NodeName: &test,
},
},
meta: common.MapStr{
"pod": common.MapStr{"name": ""},
"namespace": "",
"node": common.MapStr{"name": "test"},
"namespace": "test",
"labels": common.MapStr{"a": common.MapStr{"value": "bar", "key": "foo"}},
},
config: common.NewConfig(),
Expand All @@ -70,12 +71,14 @@ func TestPodMetadataDeDot(t *testing.T) {
},
},
meta: common.MapStr{
"pod": common.MapStr{"name": "", "uid": "005f3b90-4b9d-12f8-acf0-31020a840133"},
"namespace": "",
"node": common.MapStr{"name": "test"},
"labels": common.MapStr{"a": common.MapStr{"value": "bar", "key": "foo"}},
"pod": common.MapStr{
"name": "",
"uid": "005f3b90-4b9d-12f8-acf0-31020a840133",
},
"node": common.MapStr{"name": "test"},
"labels": common.MapStr{"a": common.MapStr{"value": "bar", "key": "foo"}},
},
config: withPodUID,
config: withUID,
},
{
pod: &Pod{
Expand All @@ -101,7 +104,6 @@ func TestPodMetadataDeDot(t *testing.T) {
},
meta: common.MapStr{
"pod": common.MapStr{"name": ""},
"namespace": "",
"node": common.MapStr{"name": "test"},
"labels": common.MapStr{"a": common.MapStr{"value": "bar", "key": "foo"}},
"deployment": common.MapStr{"name": "test"},
Expand Down
14 changes: 14 additions & 0 deletions libbeat/common/kubernetes/types.go
Expand Up @@ -22,7 +22,9 @@ import (
"time"

"github.com/ericchiang/k8s"
appsv1 "github.com/ericchiang/k8s/apis/apps/v1beta1"
"github.com/ericchiang/k8s/apis/core/v1"
extv1 "github.com/ericchiang/k8s/apis/extensions/v1beta1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"
)

Expand All @@ -46,6 +48,9 @@ type PodSpec = v1.PodSpec
// PodStatus data
type PodStatus = v1.PodStatus

// Node data
type Node = v1.Node

// Container data
type Container = v1.Container

Expand All @@ -58,6 +63,15 @@ type Event = v1.Event
// PodContainerStatus data
type PodContainerStatus = v1.ContainerStatus

// Deployment data
type Deployment = appsv1.Deployment

// ReplicaSet data
type ReplicaSet = extv1.ReplicaSet

// StatefulSet data
type StatefulSet = appsv1.StatefulSet

// Time extracts time from k8s.Time type
func Time(t *metav1.Time) time.Time {
return time.Unix(t.GetSeconds(), int64(t.GetNanos()))
Expand Down
47 changes: 46 additions & 1 deletion libbeat/common/kubernetes/watcher.go
Expand Up @@ -24,7 +24,9 @@ import (
"time"

"github.com/ericchiang/k8s"
appsv1 "github.com/ericchiang/k8s/apis/apps/v1beta1"
"github.com/ericchiang/k8s/apis/core/v1"
extv1 "github.com/ericchiang/k8s/apis/extensions/v1beta1"

"github.com/elastic/beats/libbeat/logp"
)
Expand Down Expand Up @@ -109,6 +111,50 @@ func NewWatcher(client *k8s.Client, resource Resource, options WatchOptions) (Wa
}
return rs
}
case *Node:
list := &v1.NodeList{}
w.resourceList = list
w.k8sResourceFactory = func() k8s.Resource { return &v1.Node{} }
w.items = func() []k8s.Resource {
rs := make([]k8s.Resource, 0, len(list.Items))
for _, item := range list.Items {
rs = append(rs, item)
}
return rs
}
case *Deployment:
list := &appsv1.DeploymentList{}
w.resourceList = list
w.k8sResourceFactory = func() k8s.Resource { return &appsv1.Deployment{} }
w.items = func() []k8s.Resource {
rs := make([]k8s.Resource, 0, len(list.Items))
for _, item := range list.Items {
rs = append(rs, item)
}
return rs
}
case *ReplicaSet:
list := &extv1.ReplicaSetList{}
w.resourceList = list
w.k8sResourceFactory = func() k8s.Resource { return &extv1.ReplicaSet{} }
w.items = func() []k8s.Resource {
rs := make([]k8s.Resource, 0, len(list.Items))
for _, item := range list.Items {
rs = append(rs, item)
}
return rs
}
case *StatefulSet:
list := &appsv1.StatefulSetList{}
w.resourceList = list
w.k8sResourceFactory = func() k8s.Resource { return &appsv1.StatefulSet{} }
w.items = func() []k8s.Resource {
rs := make([]k8s.Resource, 0, len(list.Items))
for _, item := range list.Items {
rs = append(rs, item)
}
return rs
}
default:
return nil, fmt.Errorf("unsupported resource type for watching %T", resource)
}
Expand Down Expand Up @@ -163,7 +209,6 @@ func (w *watcher) onDelete(obj Resource) {

// Start watching pods
func (w *watcher) Start() error {

// Make sure that events don't flow into the annotator before informer is fully set up
// Sync initial state:
err := w.sync()
Expand Down
Expand Up @@ -16,7 +16,7 @@
- name: pod.uid
type: keyword
description: >
Kubernetes pod uid
Kubernetes Pod UID

- name: namespace
type: keyword
Expand Down
2 changes: 1 addition & 1 deletion metricbeat/docs/fields.asciidoc
Expand Up @@ -6945,7 +6945,7 @@ Kubernetes pod name
--
type: keyword

Kubernetes pod uid
Kubernetes Pod UID


--
Expand Down