forked from argoproj/argo-workflows
/
controller.go
130 lines (120 loc) · 3.66 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package config
import (
"fmt"
"strings"
apiv1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
log "github.com/sirupsen/logrus"
"sigs.k8s.io/yaml"
)
type Controller interface {
Run(stopCh <-chan struct{}, onChange func(config Config) error)
Get() (Config, error)
}
type controller struct {
namespace string
// name of the config map
configMap string
kubeclientset kubernetes.Interface
}
func NewController(namespace, name string, kubeclientset kubernetes.Interface) Controller {
log.WithField("name", name).Info("config map")
return &controller{
namespace: namespace,
configMap: name,
kubeclientset: kubeclientset,
}
}
func (cc *controller) updateConfig(cm *apiv1.ConfigMap, onChange func(config Config) error) error {
config, err := parseConfigMap(cm)
if err != nil {
return err
}
return onChange(config)
}
func parseConfigMap(cm *apiv1.ConfigMap) (Config, error) {
if cm == nil {
return emptyConfig, nil
}
// The key in the configmap to retrieve workflow configuration from.
// Content encoding is expected to be YAML.
var config Config
rawConfig, ok := cm.Data["config"]
if ok && len(cm.Data) != 1 {
return emptyConfig, fmt.Errorf("if you have an item in your config map named 'config', you must only have one item")
}
if !ok {
for name, value := range cm.Data {
if strings.Contains(value, "\n") {
// this mucky code indents with two spaces
rawConfig = rawConfig + name + ":\n " + strings.Join(strings.Split(strings.Trim(value, "\n"), "\n"), "\n ") + "\n"
} else {
rawConfig = rawConfig + name + ": " + value + "\n"
}
}
}
err := yaml.Unmarshal([]byte(rawConfig), &config)
if err != nil {
return emptyConfig, err
}
return config, nil
}
func (cc *controller) Run(stopCh <-chan struct{}, onChange func(config Config) error) {
restClient := cc.kubeclientset.CoreV1().RESTClient()
resource := "configmaps"
fieldSelector := fields.ParseSelectorOrDie(fmt.Sprintf("metadata.name=%s", cc.configMap))
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fieldSelector.String()
req := restClient.Get().
Namespace(cc.namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec)
return req.Do().Get()
}
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
options.FieldSelector = fieldSelector.String()
req := restClient.Get().
Namespace(cc.namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec)
return req.Watch()
}
source := &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
_, controller := cache.NewInformer(
source,
&apiv1.ConfigMap{},
0,
cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
oldCM := old.(*apiv1.ConfigMap)
newCM := new.(*apiv1.ConfigMap)
if oldCM.ResourceVersion == newCM.ResourceVersion {
return
}
if newCm, ok := new.(*apiv1.ConfigMap); ok {
log.Infof("Detected ConfigMap update.")
err := cc.updateConfig(newCm, onChange)
if err != nil {
log.Errorf("Update of config failed due to: %v", err)
}
}
},
})
controller.Run(stopCh)
log.Info("Watching config map updates")
}
func (cc *controller) Get() (Config, error) {
cmClient := cc.kubeclientset.CoreV1().ConfigMaps(cc.namespace)
cm, err := cmClient.Get(cc.configMap, metav1.GetOptions{})
if err != nil && !apierr.IsNotFound(err) {
return emptyConfig, err
}
return parseConfigMap(cm)
}