/
informer.go
375 lines (338 loc) · 11.6 KB
/
informer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
package kube
import (
"context"
"fmt"
"log/slog"
"os"
"path"
"strings"
"time"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
const (
	// kubeConfigEnvVariable is the environment variable pointing to the kubeconfig file
	kubeConfigEnvVariable = "KUBECONFIG"
	// syncTime is the full resynchronization period of the shared informers
	syncTime = 10 * time.Minute
	// IndexPodByContainerIDs names the informer index mapping container IDs to Pods
	IndexPodByContainerIDs = "idx_pod_by_container"
	// IndexReplicaSetNames names the informer index mapping "namespace/name" keys to ReplicaSets
	IndexReplicaSetNames = "idx_rs"
)
// klog returns a logger tagged with this package's component attribute, so
// every kube.Metadata message can be filtered by "component".
func klog() *slog.Logger {
	logger := slog.With("component", "kube.Metadata")
	return logger
}
// ContainerEventHandler listens for the deletion of containers, as triggered
// by a Pod deletion.
type ContainerEventHandler interface {
	// OnDeletion is invoked with the IDs of every container that belonged to a deleted Pod
	OnDeletion(containerID []string)
}
// Metadata stores an in-memory copy of the different Kubernetes objects whose metadata is relevant to us.
type Metadata struct {
	// pods and replicaSets cache the different K8s types to custom, smaller object types
	pods        cache.SharedIndexInformer
	replicaSets cache.SharedIndexInformer
	// containerEventHandlers are notified when a Pod deletion removes its containers
	containerEventHandlers []ContainerEventHandler
}
// PodInfo contains precollected metadata for Pods, Nodes and Services.
// Not all the fields are populated for all the above types. To save
// memory, we just keep in memory the necessary data for each Type.
// For more information about which fields are set for each type, please
// refer to the instantiation function of the respective informers.
type PodInfo struct {
	// Informers need that internal object is an ObjectMeta instance
	metav1.ObjectMeta
	// NodeName is the name of the node this Pod is scheduled on
	NodeName string
	// Owner is the Pod's direct owner (e.g. a ReplicaSet). It may itself link
	// to a higher-level owner (e.g. a Deployment) via Owner.Owner.
	Owner *Owner
	// StartTimeStr caches value of ObjectMeta.StartTimestamp.String()
	// NOTE(review): the Pod informer actually populates this from
	// GetCreationTimestamp() — confirm which timestamp is intended
	StartTimeStr string
	// ContainerIDs holds the schema-less IDs of all the Pod's containers
	// (regular, init and ephemeral)
	ContainerIDs []string
	// IPs are the Pod's IPs, excluding host-networked addresses
	IPs []string
}
// ReplicaSetInfo is a reduced in-memory copy of a ReplicaSet: just its
// identity plus the name of the Deployment that owns it, if any.
type ReplicaSetInfo struct {
	metav1.ObjectMeta
	// DeploymentName is empty when the ReplicaSet has no Deployment owner
	DeploymentName string
}
// qName builds the "namespace/name" qualified identifier that the
// ReplicaSet index uses as key.
func qName(namespace, name string) string {
	var sb strings.Builder
	sb.Grow(len(namespace) + 1 + len(name))
	sb.WriteString(namespace)
	sb.WriteByte('/')
	sb.WriteString(name)
	return sb.String()
}
// podIndexer registers the container-ID index: it allows looking up a cached
// Pod by the ID of any of its containers (one index entry per container).
var podIndexer = cache.Indexers{
	IndexPodByContainerIDs: func(obj interface{}) ([]string, error) {
		// safe to assert *PodInfo: the informer transform stores only that type
		pi := obj.(*PodInfo)
		return pi.ContainerIDs, nil
	},
}
// usually all the data required by the discovery and enrichment is inside
// the v1.Pod object. However, when the Pod object has a ReplicaSet as owner,
// if the ReplicaSet is owned by a Deployment, the reported Pod Owner should
// be the Deployment, as the ReplicaSet is just an intermediate entity
// used by the Deployment that is actually defined by the user.
// replicaSetIndexer registers the "namespace/name" index used to look up a
// cached ReplicaSet from a Pod's owner reference.
var replicaSetIndexer = cache.Indexers{
	IndexReplicaSetNames: func(obj interface{}) ([]string, error) {
		// safe to assert *ReplicaSetInfo: the informer transform stores only that type
		rs := obj.(*ReplicaSetInfo)
		return []string{qName(rs.Namespace, rs.Name)}, nil
	},
}
// GetContainerPod fetches metadata from a Pod given the name of one of its containers.
// The second return value is false when the container is unknown or the index
// lookup fails (the error is logged at debug level, not propagated).
func (k *Metadata) GetContainerPod(containerID string) (*PodInfo, bool) {
	entries, err := k.pods.GetIndexer().ByIndex(IndexPodByContainerIDs, containerID)
	switch {
	case err != nil:
		klog().Debug("error accessing index by container ID. Ignoring", "error", err, "containerID", containerID)
		return nil, false
	case len(entries) == 0:
		return nil, false
	default:
		return entries[0].(*PodInfo), true
	}
}
// initPodInformer sets up the shared Pod informer: it registers the container
// deletion listeners, installs a transform that shrinks each *v1.Pod into a
// *PodInfo, and adds the container-ID index.
func (k *Metadata) initPodInformer(informerFactory informers.SharedInformerFactory) error {
	log := klog().With("informer", "Pod")
	pods := informerFactory.Core().V1().Pods().Informer()
	k.initContainerListeners(log, pods)
	// Transform any *v1.Pod instance into a *PodInfo instance to save space
	// in the informer's cache
	if err := pods.SetTransform(func(i interface{}) (interface{}, error) {
		pod, ok := i.(*v1.Pod)
		if !ok {
			// it's Ok. The K8s library just informed from an entity
			// that has been previously transformed/stored
			if pi, ok := i.(*PodInfo); ok {
				return pi, nil
			}
			return nil, fmt.Errorf("was expecting a Pod. Got: %T", i)
		}
		// gather the IDs of regular, init and ephemeral containers,
		// stripping the runtime schema prefix (e.g. "containerd://")
		containerIDs := make([]string, 0,
			len(pod.Status.ContainerStatuses)+
				len(pod.Status.InitContainerStatuses)+
				len(pod.Status.EphemeralContainerStatuses))
		for i := range pod.Status.ContainerStatuses {
			containerIDs = append(containerIDs,
				rmContainerIDSchema(pod.Status.ContainerStatuses[i].ContainerID))
		}
		for i := range pod.Status.InitContainerStatuses {
			containerIDs = append(containerIDs,
				rmContainerIDSchema(pod.Status.InitContainerStatuses[i].ContainerID))
		}
		for i := range pod.Status.EphemeralContainerStatuses {
			containerIDs = append(containerIDs,
				rmContainerIDSchema(pod.Status.EphemeralContainerStatuses[i].ContainerID))
		}
		ips := make([]string, 0, len(pod.Status.PodIPs))
		for _, ip := range pod.Status.PodIPs {
			// ignoring host-networked Pod IPs
			if ip.IP != pod.Status.HostIP {
				ips = append(ips, ip.IP)
			}
		}
		owner := OwnerFromPodInfo(pod)
		// NOTE(review): this feeds PodInfo.StartTimeStr, but it reads the
		// creation timestamp, not the start timestamp — confirm intended
		startTime := pod.GetCreationTimestamp().String()
		// guard the Debug call so the attribute list isn't built when disabled
		if log.Enabled(context.TODO(), slog.LevelDebug) {
			log.Debug("inserting pod", "name", pod.Name, "namespace", pod.Namespace,
				"uid", pod.UID, "owner", owner,
				"node", pod.Spec.NodeName, "startTime", startTime,
				"containerIDs", containerIDs)
		}
		return &PodInfo{
			ObjectMeta: metav1.ObjectMeta{
				Name:      pod.Name,
				Namespace: pod.Namespace,
				UID:       pod.UID,
				Labels:    pod.Labels,
			},
			Owner:        owner,
			NodeName:     pod.Spec.NodeName,
			StartTimeStr: startTime,
			ContainerIDs: containerIDs,
			IPs:          ips,
		}, nil
	}); err != nil {
		return fmt.Errorf("can't set pods transform: %w", err)
	}
	if err := pods.AddIndexers(podIndexer); err != nil {
		return fmt.Errorf("can't add indexers to Pods informer: %w", err)
	}
	k.pods = pods
	return nil
}
// initContainerListeners listens for deletions of pods, to forward them to the ContainerEventHandler subscribers.
func (k *Metadata) initContainerListeners(log *slog.Logger, pods cache.SharedIndexInformer) {
	if _, err := pods.AddEventHandler(cache.ResourceEventHandlerFuncs{
		DeleteFunc: func(obj interface{}) {
			// on watch disconnections, the deleted object can arrive wrapped
			// in a DeletedFinalStateUnknown tombstone: unwrap it before
			// asserting the type, otherwise the assertion below would panic
			if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
				obj = tombstone.Obj
			}
			pod, ok := obj.(*PodInfo)
			if !ok {
				log.Warn("unexpected object type in Pod delete event. Ignoring",
					"type", fmt.Sprintf("%T", obj))
				return
			}
			log.Debug("deleting containers for pod", "pod", pod.Name, "containers", pod.ContainerIDs)
			for _, listener := range k.containerEventHandlers {
				listener.OnDeletion(pod.ContainerIDs)
			}
		},
	}); err != nil {
		log.Warn("can't attach container listener to the Kubernetes informer."+
			" Your kubernetes metadata might be outdated in the long term", "error", err)
	}
}
// rmContainerIDSchema extracts the hex ID of a container ID that is provided in the form:
// containerd://40c03570b6f4c30bc8d69923d37ee698f5cfcced92c7b7df1c47f6f7887378a9
// If the input has no schema prefix, it is returned unchanged.
func rmContainerIDSchema(containerID string) string {
	// strings.Cut splits once without allocating the parts slice that
	// strings.Split would build
	if _, id, found := strings.Cut(containerID, "://"); found {
		return id
	}
	return containerID
}
// GetReplicaSetInfo fetches metadata from a ReplicaSet given its name.
// The second return value is false when the ReplicaSet is unknown or the
// index lookup fails (the error is logged at debug level, not propagated).
func (k *Metadata) GetReplicaSetInfo(namespace, name string) (*ReplicaSetInfo, bool) {
	entries, err := k.replicaSets.GetIndexer().ByIndex(IndexReplicaSetNames, qName(namespace, name))
	switch {
	case err != nil:
		klog().Debug("error accessing ReplicaSet index by name. Ignoring",
			"error", err, "name", name)
		return nil, false
	case len(entries) == 0:
		return nil, false
	default:
		return entries[0].(*ReplicaSetInfo), true
	}
}
// initReplicaSetInformer sets up the shared ReplicaSet informer: it installs
// a transform that shrinks each *appsv1.ReplicaSet into a *ReplicaSetInfo
// (keeping only name, namespace and owner Deployment name) and adds the
// "namespace/name" index.
func (k *Metadata) initReplicaSetInformer(informerFactory informers.SharedInformerFactory) error {
	log := klog().With("informer", "ReplicaSet")
	rss := informerFactory.Apps().V1().ReplicaSets().Informer()
	// Transform any *appsv1.Replicaset instance into a *ReplicaSetInfo instance to save space
	// in the informer's cache
	if err := rss.SetTransform(func(i interface{}) (interface{}, error) {
		rs, ok := i.(*appsv1.ReplicaSet)
		if !ok {
			// it's Ok. The K8s library just informed from an entity
			// that has been previously transformed/stored
			if rsi, ok := i.(*ReplicaSetInfo); ok {
				return rsi, nil
			}
			return nil, fmt.Errorf("was expecting a ReplicaSet. Got: %T", i)
		}
		// a ReplicaSet owned by a Deployment reports the Deployment name: the
		// Deployment is the user-defined entity we want to surface as owner
		var deployment string
		for i := range rs.OwnerReferences {
			or := &rs.OwnerReferences[i]
			if or.APIVersion == "apps/v1" && or.Kind == "Deployment" {
				deployment = or.Name
				break
			}
		}
		// guard the Debug call so the attribute list isn't built when disabled
		if log.Enabled(context.TODO(), slog.LevelDebug) {
			log.Debug("inserting ReplicaSet", "name", rs.Name, "namespace", rs.Namespace,
				"deployment", deployment)
		}
		return &ReplicaSetInfo{
			ObjectMeta: metav1.ObjectMeta{
				Name:      rs.Name,
				Namespace: rs.Namespace,
			},
			DeploymentName: deployment,
		}, nil
	}); err != nil {
		// fixed: this message previously said "pods" (copy-pasted from initPodInformer)
		return fmt.Errorf("can't set ReplicaSets transform: %w", err)
	}
	if err := rss.AddIndexers(replicaSetIndexer); err != nil {
		return fmt.Errorf("can't add %s indexer to ReplicaSets informer: %w", IndexReplicaSetNames, err)
	}
	k.replicaSets = rss
	return nil
}
// InitFromClient initializes the Metadata informers from an already-built
// Kubernetes client, waiting up to timeout for their caches to sync.
func (k *Metadata) InitFromClient(ctx context.Context, client kubernetes.Interface, timeout time.Duration) error {
	// delegate to the informers' initialization/synchronization logic
	return k.initInformers(ctx, client, timeout)
}
// LoadConfig resolves a Kubernetes client configuration, trying in order:
// the explicit kubeConfigPath argument, the KUBECONFIG environment variable,
// the default $HOME/.kube/config file and, as a last resort, the in-cluster
// configuration. It returns an error only when every source fails.
func LoadConfig(kubeConfigPath string) (*rest.Config, error) {
	// if no config path is provided, load it from the env variable
	if kubeConfigPath == "" {
		kubeConfigPath = os.Getenv(kubeConfigEnvVariable)
	}
	// if still unset, fall back to the $HOME/.kube/config file
	if kubeConfigPath == "" {
		homeDir, err := os.UserHomeDir()
		if err != nil {
			return nil, fmt.Errorf("can't get user home dir: %w", err)
		}
		kubeConfigPath = path.Join(homeDir, ".kube", "config")
	}
	config, err := clientcmd.BuildConfigFromFlags("", kubeConfigPath)
	if err == nil {
		return config, nil
	}
	// fallback: use in-cluster config
	config, err = rest.InClusterConfig()
	if err != nil {
		// fixed typo in user-facing error: "kubenetes" -> "kubernetes"
		return nil, fmt.Errorf("can't access kubernetes. Tried using config from: "+
			"config parameter, %s env, homedir and InClusterConfig. Got: %w",
			kubeConfigEnvVariable, err)
	}
	return config, nil
}
// initInformers creates the Pod and ReplicaSet informers, starts them, and
// blocks until their caches are synced or the given timeout expires.
// On timeout the background sync goroutine keeps running until ctx is done.
func (k *Metadata) initInformers(ctx context.Context, client kubernetes.Interface, timeout time.Duration) error {
	informerFactory := informers.NewSharedInformerFactory(client, syncTime)
	if err := k.initPodInformer(informerFactory); err != nil {
		return err
	}
	if err := k.initReplicaSetInformer(informerFactory); err != nil {
		return err
	}
	log := klog()
	// fixed typo in debug message: "syncronization" -> "synchronization"
	log.Debug("starting kubernetes informers, waiting for synchronization")
	informerFactory.Start(ctx.Done())
	// WaitForCacheSync has no timeout of its own, so run it in a goroutine
	// and bound the wait with a timer
	finishedCacheSync := make(chan struct{})
	go func() {
		informerFactory.WaitForCacheSync(ctx.Done())
		close(finishedCacheSync)
	}()
	select {
	case <-finishedCacheSync:
		log.Debug("kubernetes informers started")
		return nil
	case <-time.After(timeout):
		return fmt.Errorf("kubernetes cache has not been synced after %s timeout", timeout)
	}
}
// FetchPodOwnerInfo updates the pod owner with the Deployment information, if it exists.
// Pod Info might include a ReplicaSet as owner, and ReplicaSet info
// usually has a Deployment as owner reference, which is the one that we'd really like
// to report as owner.
func (k *Metadata) FetchPodOwnerInfo(pod *PodInfo) {
	owner := pod.Owner
	if owner == nil || owner.Type != OwnerReplicaSet {
		return
	}
	rsi, ok := k.GetReplicaSetInfo(pod.Namespace, owner.Name)
	if !ok {
		return
	}
	owner.Owner = &Owner{Type: OwnerDeployment, Name: rsi.DeploymentName}
}
// AddContainerEventHandler subscribes eh to be notified with the container IDs
// of every deleted Pod.
func (k *Metadata) AddContainerEventHandler(eh ContainerEventHandler) {
	k.containerEventHandlers = append(k.containerEventHandlers, eh)
}
// AddPodEventHandler registers h to receive Pod events. The entities already
// present in the informer's store are replayed to h as synthetic Add events,
// asynchronously so the caller is not blocked.
func (k *Metadata) AddPodEventHandler(h cache.ResourceEventHandler) error {
	// fixed: don't replay the snapshot (nor keep a goroutine around) when the
	// handler registration itself failed
	if _, err := k.pods.AddEventHandler(h); err != nil {
		return err
	}
	// passing a snapshot of the currently stored entities
	go func() {
		for _, pod := range k.pods.GetStore().List() {
			h.OnAdd(pod, true)
		}
	}()
	return nil
}
// AddReplicaSetEventHandler registers h to receive ReplicaSet events. The
// entities already present in the informer's store are replayed to h as
// synthetic Add events, asynchronously so the caller is not blocked.
func (k *Metadata) AddReplicaSetEventHandler(h cache.ResourceEventHandler) error {
	// fixed: don't replay the snapshot (nor keep a goroutine around) when the
	// handler registration itself failed
	if _, err := k.replicaSets.AddEventHandler(h); err != nil {
		return err
	}
	// passing a snapshot of the currently stored entities
	// (loop variable renamed: these are ReplicaSets, not Pods)
	go func() {
		for _, rs := range k.replicaSets.GetStore().List() {
			h.OnAdd(rs, true)
		}
	}()
	return nil
}
// ServiceName returns the name that best identifies the workload this Pod
// belongs to: the topmost owner's name when an ownership chain exists,
// otherwise the Pod's own name.
func (i *PodInfo) ServiceName() string {
	owner := i.Owner
	if owner == nil {
		return i.Name
	}
	// ownership chains have two levels at most (e.g. Deployment -> ReplicaSet)
	if top := owner.Owner; top != nil {
		return top.Name
	}
	return owner.Name
}