This repository has been archived by the owner on Nov 3, 2023. It is now read-only.
/
pods.go
133 lines (113 loc) · 3.77 KB
/
pods.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package k8s
import (
"context"
"fmt"
"time"
"github.com/elastic/assetbeat/input/internal"
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
kube "github.com/elastic/elastic-agent-autodiscover/kubernetes"
"github.com/elastic/elastic-agent-libs/logp"
kuberntescli "k8s.io/client-go/kubernetes"
)
type pod struct {
watcher kube.Watcher
client kuberntescli.Interface
logger *logp.Logger
ctx context.Context
}
// getPodWatcher initiates and returns a watcher of kubernetes pods
func getPodWatcher(ctx context.Context, log *logp.Logger, client kuberntescli.Interface, timeout time.Duration) (kube.Watcher, error) {
watcher, err := kube.NewNamedWatcher("pod", client, &kube.Pod{}, kube.WatchOptions{
SyncTimeout: timeout,
Node: "",
Namespace: "",
HonorReSyncs: true,
}, nil)
if err != nil {
log.Errorf("could not create kubernetes watcher %v", err)
return nil, err
}
p := &pod{
watcher: watcher,
client: client,
logger: log,
ctx: ctx,
}
watcher.AddEventHandler(p)
return watcher, nil
}
// Start starts the eventer
func (p *pod) Start() error {
return p.watcher.Start()
}
// Stop stops the eventer
func (p *pod) Stop() {
p.watcher.Stop()
}
// OnUpdate handles events for pods that have been updated.
func (p *pod) OnUpdate(obj interface{}) {
o := obj.(*kube.Pod)
p.logger.Debugf("Watcher Pod update: %+v", o.Name)
}
// OnDelete stops pod objects that are deleted.
func (p *pod) OnDelete(obj interface{}) {
o := obj.(*kube.Pod)
p.logger.Debugf("Watcher Pod delete: %+v", o.Name)
}
// OnAdd ensures processing of pod objects that are newly added.
func (p *pod) OnAdd(obj interface{}) {
o := obj.(*kube.Pod)
p.logger.Debugf("Watcher Pod add: %+v", o.Name)
}
// publishK8sPods publishes the pod assets stored in pod watcher cache
func publishK8sPods(ctx context.Context, log *logp.Logger, publisher stateless.Publisher, podWatcher, nodeWatcher kube.Watcher) {
log.Info("Publishing pod assets\n")
assetType := "k8s.pod"
assetKind := "container_group"
for _, obj := range podWatcher.Store().List() {
o, ok := obj.(*kube.Pod)
if ok {
log.Debugf("Publish Pod: %+v", o.Name)
assetName := o.Name
assetId := string(o.UID)
assetStartTime := o.Status.StartTime
namespace := o.Namespace
nodeName := o.Spec.NodeName
assetParents := []string{}
if nodeWatcher != nil {
nodeId, err := getNodeIdFromName(nodeName, nodeWatcher)
if err == nil {
nodeAssetName := fmt.Sprintf("%s:%s", "host", nodeId)
assetParents = append(assetParents, nodeAssetName)
} else {
log.Errorf("pod asset parents not collected: %w", err)
}
}
internal.Publish(publisher, nil,
internal.WithAssetKindAndID(assetKind, assetId),
internal.WithAssetType(assetType),
internal.WithAssetParents(assetParents),
internal.WithAssetName(assetName),
internal.WithPodData(assetName, assetId, namespace, assetStartTime),
)
} else {
log.Error("Publishing pod assets failed. Type assertion of pod object failed")
}
}
}