/
single-config.go
executable file
·240 lines (202 loc) · 6.41 KB
/
single-config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
// This is functionally identical to the hard-coded controllers.
// However, it reads its configuration from a ConfigMap, which is reloaded whenever it changes.
package main // import "github.com/carsonoid/kube-crds-and-controllers/hard-coded-controller"
import (
"encoding/json"
"flag"
logging "log"
"os"
"path/filepath"
// Better yaml handling
"github.com/ghodss/yaml"
// Kubernetes and client-go
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
// "k8s.io/apimachinery/pkg/api/errors"
// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// log is the package-wide logger. It writes to stdout with no prefix and tags
// each line with the short file name and line number of the call site.
var log = logging.New(os.Stdout, "", logging.Lshortfile)
// Location of the ConfigMap that WatchConfigMap watches.
// Both values are fixed at compile time; they could be exposed as flags.
const (
	// configNamespace is the namespace the ConfigMap is looked up in.
	configNamespace string = "kube-system"
	// configName is the metadata.name the ConfigMap watch is filtered on.
	configName string = "pod-labeler-config"
)
// PodLabelConfig is the configuration decoded from the ConfigMap: which
// namespace to watch for pods and which labels every pod there must carry.
type PodLabelConfig struct {
	// TargetNamespace is the namespace whose pods are listed and watched.
	TargetNamespace string `json:"targetNamespace"`
	// Labels maps label keys to the values that are ensured on each pod.
	Labels map[string]string `json:"labels"`
}
// PodLabelController labels pods according to a PodLabelConfig that is loaded
// from a ConfigMap at runtime.
type PodLabelController struct {
	// client is the Kubernetes clientset used for every API call.
	client *kubernetes.Clientset
	// Config is the most recently loaded configuration. It stays nil until
	// loadConfigMap has stored a first config.
	Config *PodLabelConfig
	// configLoadChan carries one value per successful config load;
	// loadConfigMap sends on it and Run receives from it.
	configLoadChan chan bool
}
// NewPodLabelController returns a PodLabelController that talks to the cluster
// through client. The controller starts without a config (Config is nil) and
// with an unbuffered reload-signal channel.
func NewPodLabelController(client *kubernetes.Clientset) *PodLabelController {
	plc := new(PodLabelController)
	plc.client = client
	plc.configLoadChan = make(chan bool)
	return plc
}
// Run starts the ConfigMap watcher, waits for the first config to be loaded,
// and then loops forever: each iteration starts a pod informer for the
// currently configured namespace and stops it again as soon as the next
// config load is signalled, so that every pod is re-evaluated against the new
// config. Run never returns.
func (plc *PodLabelController) Run() {
	// The watcher runs in the background and signals configLoadChan per load.
	go plc.WatchConfigMap()

	log.Print("Waiting for initial config load")
	// plc.Config is read below, so do not proceed before the first load.
	<-plc.configLoadChan

	// reconcile is the shared body of the add and update handlers.
	reconcile := func(pod *corev1.Pod) {
		if err := plc.handlePod(pod); err != nil {
			log.Printf("Error handling pod: %s", err)
		}
	}
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			log.Print("Pod Add Event")
			reconcile(obj.(*corev1.Pod))
		},
		UpdateFunc: func(oldobj interface{}, newobj interface{}) {
			log.Print("Pod Update Event")
			reconcile(newobj.(*corev1.Pod))
		},
		DeleteFunc: func(obj interface{}) {
			// Deleted pods need no action beyond the log line.
			log.Print("Pod Delete Event")
		},
	}

	for {
		// Build a fresh informer each round so the namespace from the latest
		// config is picked up.
		podSource := cache.NewListWatchFromClient(
			plc.client.CoreV1().RESTClient(),
			"pods",
			plc.Config.TargetNamespace,
			fields.Everything(),
		)
		_, informer := cache.NewInformer(podSource, &corev1.Pod{}, 0, handlers)

		log.Print("Starting Controller")
		stop := make(chan struct{})
		go informer.Run(stop)

		// Wait for the next config load, then shut this informer down.
		<-plc.configLoadChan
		log.Print("killing controller")
		close(stop)
	}
}
// handlePod ensures the configured labels on a single pod. It applies the
// labels to a deep copy, and if that changed anything it sends the difference
// between the original and the copy to the API server as a strategic merge
// patch. The pod passed in is left unmodified and serves as the patch
// baseline. Returns nil when no labels were missing, otherwise the first
// error from copying, marshalling, patch creation or the patch request.
func (plc *PodLabelController) handlePod(pod *corev1.Pod) error {
	copied, err := runtime.NewScheme().DeepCopy(pod)
	if err != nil {
		return err
	}
	desired := copied.(*corev1.Pod)

	// Nothing to patch when the pod already carries every configured label.
	if changed := plc.labelPod(desired); !changed {
		return nil
	}

	before, err := json.Marshal(pod)
	if err != nil {
		return err
	}
	after, err := json.Marshal(desired)
	if err != nil {
		return err
	}

	patch, err := strategicpatch.CreateTwoWayMergePatch(before, after, corev1.Pod{})
	if err != nil {
		return err
	}

	_, err = plc.client.CoreV1().Pods(pod.Namespace).Patch(pod.Name, types.StrategicMergePatchType, patch)
	return err
}
// labelPod sets every label from plc.Config.Labels on pod, mutating pod in
// place, and logs each label that was missing or had a different value.
// It reports whether at least one label was added or changed.
func (plc *PodLabelController) labelPod(pod *corev1.Pod) bool {
	// Writing to a nil map panics, so start from a fresh map when the pod
	// has no labels.
	if len(pod.GetLabels()) == 0 {
		pod.ObjectMeta.Labels = make(map[string]string)
	}

	modified := false
	for key, want := range plc.Config.Labels {
		have, present := pod.GetLabels()[key]
		if present && have == want {
			// Label already has the desired value.
			continue
		}
		log.Printf("Pod %s needs label: %s=%s", pod.GetName(), key, want)
		pod.Labels[key] = want
		modified = true
	}
	return modified
}
// WatchConfigMap watches the ConfigMap named configName in configNamespace
// and passes it to loadConfigMap on every add and update event. Delete events
// are only logged, so the last loaded config stays in effect. The informer is
// run synchronously on the calling goroutine, and the stop channel handed to
// it is never closed by this method.
func (plc *PodLabelController) WatchConfigMap() {
	log.Print("Watching for ConfigMap")

	// reload is the shared body of the add and update handlers.
	reload := func(cm *corev1.ConfigMap) {
		if err := plc.loadConfigMap(cm); err != nil {
			log.Printf("Error loading config from configmap: %s", err)
		}
	}
	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			log.Print("ConfigMap Add Event")
			reload(obj.(*corev1.ConfigMap))
		},
		UpdateFunc: func(oldobj interface{}, newobj interface{}) {
			log.Print("ConfigMap Update Event")
			reload(newobj.(*corev1.ConfigMap))
		},
		DeleteFunc: func(obj interface{}) {
			// Deliberately no config change on delete.
			log.Print("ConfigMap Deleted - last known config will be retained")
		},
	}

	// Restrict the list/watch to the one ConfigMap this controller reads.
	byName := fields.OneTermEqualSelector("metadata.name", configName)
	source := cache.NewListWatchFromClient(plc.client.CoreV1().RESTClient(), "configmaps", configNamespace, byName)
	_, informer := cache.NewInformer(source, &corev1.ConfigMap{}, 0, handlers)

	stop := make(chan struct{})
	informer.Run(stop)
	<-stop
}
// loadConfigMap decodes the YAML stored under the "podLabelConfig" key of cm
// into a PodLabelConfig, stores it in plc.Config, and signals the load on
// configLoadChan.
//
// If cm has no "podLabelConfig" key, the ConfigMap is ignored: plc.Config is
// left as it was (possibly still nil), no load is signalled, and nil is
// returned. If the YAML cannot be decoded, the error is returned and
// plc.Config is likewise left untouched.
func (plc *PodLabelController) loadConfigMap(cm *corev1.ConfigMap) error {
	log.Print("Loading ConfigMap")

	confYaml, ok := cm.Data["podLabelConfig"]
	if !ok {
		// plc.Config may still be nil here (no successful load yet), so it
		// must not be dereferenced; and nothing was loaded, so say so rather
		// than logging a "loaded" message.
		log.Printf("ConfigMap %s/%s has no podLabelConfig key; keeping current config", cm.Namespace, cm.Name)
		return nil
	}

	// Decode into a fresh struct so a failed parse cannot clobber the
	// config that is currently in use.
	c := PodLabelConfig{}
	if err := yaml.Unmarshal([]byte(confYaml), &c); err != nil {
		return err
	}

	// Publish the new config, then send the load/reload signal. The channel
	// is unbuffered, so this blocks until Run receives.
	plc.Config = &c
	plc.configLoadChan <- true

	log.Printf("Loaded new config: %+v", c)
	return nil
}
// main parses the -kubeconfig flag (default: $HOME/.kube/config), builds a
// Kubernetes clientset from that file, and runs a PodLabelController with it.
// It panics if the kubeconfig cannot be loaded or the clientset cannot be
// created.
func main() {
	log.SetOutput(os.Stdout)

	defaultKubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
	kubeconfig := flag.String("kubeconfig", defaultKubeconfig, "(optional) absolute path to the kubeconfig file")
	flag.Parse()

	// An empty master URL means the current context of the kubeconfig is used.
	config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
	if err != nil {
		panic(err.Error())
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// The controller only needs the client; its config comes from the ConfigMap.
	NewPodLabelController(clientset).Run()
}