-
Notifications
You must be signed in to change notification settings - Fork 51
/
kubernetes.go
85 lines (65 loc) · 2.87 KB
/
kubernetes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package kubernetesmonitor
import (
"context"
"time"
"go.uber.org/zap"
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
kubecache "k8s.io/client-go/tools/cache"
)
// KubernetesPodNameIdentifier is the label used by Docker for the K8S pod name.
const KubernetesPodNameIdentifier = "@usr:io.kubernetes.pod.name"
// KubernetesPodNamespaceIdentifier is the label used by Docker for the K8S namespace.
const KubernetesPodNamespaceIdentifier = "@usr:io.kubernetes.pod.namespace"
// KubernetesContainerNameIdentifier is the label used by Docker for the K8S container name.
const KubernetesContainerNameIdentifier = "@usr:io.kubernetes.container.name"
// KubernetesInfraContainerName is the name of the infra POD.
const KubernetesInfraContainerName = "POD"
// UpstreamNameIdentifier is the identifier used to identify the nane on the resulting PU
const UpstreamNameIdentifier = "k8s:name"
// UpstreamNamespaceIdentifier is the identifier used to identify the nanespace on the resulting PU
const UpstreamNamespaceIdentifier = "k8s:namespace"
func (m *KubernetesMonitor) addPod(addedPod *api.Pod) error {
zap.L().Debug("pod added event", zap.String("name", addedPod.GetName()), zap.String("namespace", addedPod.GetNamespace()))
// This event is not needed as the trigger is the DockerMonitor event
// The pod obejct is cached in order to reuse it and avoid an API request possibly laster on
return nil
}
func (m *KubernetesMonitor) deletePod(deletedPod *api.Pod) error {
zap.L().Debug("pod deleted event", zap.String("name", deletedPod.GetName()), zap.String("namespace", deletedPod.GetNamespace()))
return nil
}
func (m *KubernetesMonitor) updatePod(oldPod, updatedPod *api.Pod) error {
zap.L().Debug("pod modified event", zap.String("name", updatedPod.GetName()), zap.String("namespace", updatedPod.GetNamespace()))
if !isPolicyUpdateNeeded(oldPod, updatedPod) {
zap.L().Debug("no modified labels for Pod", zap.String("name", updatedPod.GetName()), zap.String("namespace", updatedPod.GetNamespace()))
return nil
}
// This event requires sending the Runtime upstream again.
// TODO: Use propagated context
return m.RefreshPUs(context.TODO(), updatedPod)
}
func (m *KubernetesMonitor) getPod(podNamespace, podName string) (*api.Pod, error) {
zap.L().Debug("no pod cached, querying Kubernetes API")
// TODO: Use cached Kube Store (from a shared informer)
return m.Pod(podName, podNamespace)
}
func isPolicyUpdateNeeded(oldPod, newPod *api.Pod) bool {
if !(oldPod.Status.PodIP == newPod.Status.PodIP) {
return true
}
if !labels.Equals(oldPod.GetLabels(), newPod.GetLabels()) {
return true
}
return false
}
// hasSynced sends an event on the Sync chan when the attachedController finished syncing.
func hasSynced(sync chan struct{}, controller kubecache.Controller) {
for {
if controller.HasSynced() {
sync <- struct{}{}
return
}
<-time.After(100 * time.Millisecond)
}
}