-
Notifications
You must be signed in to change notification settings - Fork 376
/
config_fetcher.go
148 lines (130 loc) · 3.98 KB
/
config_fetcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package prometheus_adapter
import (
"context"
"fmt"
"github.com/fsnotify/fsnotify"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/prometheus-adapter/pkg/config"
)
type PrometheusAdapterConfigFetcher struct {
client.Client
Scheme *runtime.Scheme
RestMapper meta.RESTMapper
Recorder record.EventRecorder
AdapterConfigMapNS string
AdapterConfigMapName string
AdapterConfigMapKey string
AdapterConfig string
}
type PrometheusAdapterConfigChangedPredicate struct {
predicate.Funcs
Name string
Namespace string
}
func (pc *PrometheusAdapterConfigFetcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
if req.NamespacedName.String() != pc.AdapterConfigMapNS+"/"+pc.AdapterConfigMapName {
return ctrl.Result{}, fmt.Errorf("configmap %s/%s not matched", req.NamespacedName, req.NamespacedName.Name)
}
klog.V(4).Infof("Got prometheus adapter configmap %s", req.NamespacedName)
//get configmap content
var cm corev1.ConfigMap
err := pc.Client.Get(ctx, req.NamespacedName, &cm)
if err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
cfg, err := config.FromYAML([]byte(cm.Data[pc.AdapterConfigMapKey]))
if err != nil {
klog.Errorf("Got metricsDiscoveryConfig failed[%s] %v", pc.AdapterConfigMapName, err)
}
err = FlushRules(*cfg, pc.RestMapper)
if err != nil {
klog.Errorf("Flush rules failed %v", err)
}
return ctrl.Result{}, nil
}
// SetupWithManager creates a controller and register to controller manager.
func (pc *PrometheusAdapterConfigFetcher) SetupWithManager(mgr ctrl.Manager) error {
var promAdapterConfigMapChangedPredicate = &PrometheusAdapterConfigChangedPredicate{
Namespace: pc.AdapterConfigMapNS,
Name: pc.AdapterConfigMapName,
}
// Watch for changes to ConfigMap
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.ConfigMap{}, builder.WithPredicates(promAdapterConfigMapChangedPredicate)).
Complete(pc)
}
// Update fetched metricRule if configmap is updated
func (paCm *PrometheusAdapterConfigChangedPredicate) Update(e event.UpdateEvent) bool {
if e.ObjectOld == nil {
return false
}
if e.ObjectNew == nil {
return false
}
if e.ObjectNew.GetName() == paCm.Name && e.ObjectNew.GetNamespace() == paCm.Namespace {
return e.ObjectNew.GetResourceVersion() != e.ObjectOld.GetResourceVersion()
}
return false
}
// if set promAdapterConfig, daemon reload by config fsnotify
func (pc *PrometheusAdapterConfigFetcher) Reload() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
klog.Error(err)
return
}
defer watcher.Close()
err = watcher.Add(pc.AdapterConfig)
if err != nil {
klog.ErrorS(err, "Failed to watch", "file", pc.AdapterConfig)
return
}
klog.Infof("Start watching %s for update.", pc.AdapterConfig)
for {
select {
case event, ok := <-watcher.Events:
klog.Infof("Watched an event: %v", event)
if !ok {
return
}
metricsDiscoveryConfig, err := config.FromFile(pc.AdapterConfig)
if err != nil {
klog.Errorf("Got metricsDiscoveryConfig failed[%s] %v", pc.AdapterConfig, err)
} else {
err = FlushRules(*metricsDiscoveryConfig, pc.RestMapper)
if err != nil {
klog.Errorf("Flush rules failed %v", err)
}
}
case err, ok := <-watcher.Errors:
if !ok {
return
}
klog.Error(err)
}
}
}
func FlushRules(metricsDiscoveryConfig config.MetricsDiscoveryConfig, mapper meta.RESTMapper) error {
err := ParsingResourceRules(metricsDiscoveryConfig, mapper)
if err != nil {
return err
}
err = ParsingRules(metricsDiscoveryConfig, mapper)
if err != nil {
return err
}
err = ParsingExternalRules(metricsDiscoveryConfig, mapper)
if err != nil {
return err
}
return nil
}