/
operator.go
266 lines (223 loc) · 8.28 KB
/
operator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package platform
import (
"context"
"fmt"
"os"
"strings"
camelv1 "github.com/apache/camel-k/pkg/apis/camel/v1"
"github.com/apache/camel-k/pkg/util/defaults"
coordination "k8s.io/api/coordination/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/apache/camel-k/pkg/util/log"
ctrl "sigs.k8s.io/controller-runtime/pkg/client"
)
// Names of the environment variables read by the operator helpers in this file.
const (
// OperatorWatchNamespaceEnvVariable names the environment variable holding the namespace the operator watches.
// IsCurrentOperatorGlobal treats the operator as global when this variable is unset or blank.
OperatorWatchNamespaceEnvVariable = "WATCH_NAMESPACE"
// operatorNamespaceEnvVariable names the environment variable read by GetOperatorNamespace.
operatorNamespaceEnvVariable = "NAMESPACE"
// operatorPodNameEnvVariable names the environment variable read by GetOperatorPodName.
operatorPodNameEnvVariable = "POD_NAME"
)
// OperatorLockName is the lock lease name IsNamespaceLocked falls back to for a platform without a name.
const OperatorLockName = "camel-k-lock"
// OperatorImage is declared here but not assigned in the visible part of this file.
// NOTE(review): presumably the container image of the running operator, set elsewhere — confirm.
var OperatorImage string
// IsCurrentOperatorGlobal reports whether the operator is configured to watch all namespaces,
// i.e. the watch-namespace environment variable is unset or contains only whitespace.
// The outcome is also written to the debug log.
func IsCurrentOperatorGlobal() bool {
	// os.Getenv yields "" for an unset variable, so one emptiness check covers both cases.
	watched := strings.TrimSpace(os.Getenv(OperatorWatchNamespaceEnvVariable))
	if watched != "" {
		log.Debug("Operator is local to namespace")
		return false
	}

	log.Debug("Operator is global to all namespaces")
	return true
}
// GetOperatorWatchNamespace returns the namespace the operator watches, taken verbatim from the
// watch-namespace environment variable. The result is empty when the variable is not set.
func GetOperatorWatchNamespace() string {
	return os.Getenv(OperatorWatchNamespaceEnvVariable)
}
// GetOperatorNamespace returns the namespace where the current operator is located, as provided by
// the operator namespace environment variable. The result is empty when the variable is not set.
func GetOperatorNamespace() string {
	return os.Getenv(operatorNamespaceEnvVariable)
}
// GetOperatorPodName returns the name of the pod that is running the current operator, as provided
// by the operator pod name environment variable. The result is empty when the variable is not set.
func GetOperatorPodName() string {
	return os.Getenv(operatorPodNameEnvVariable)
}
// GetOperatorLockName returns the name of the lock lease that is electing a leader on the particular
// namespace: the given operator id followed by "-lock".
func GetOperatorLockName(operatorID string) string {
	const lockSuffix = "lock"

	return fmt.Sprintf("%s-%s", operatorID, lockSuffix)
}
// IsNamespaceLocked tells if the namespace contains a lock lease indicating that an operator owns it.
//
// An empty namespace is never reported as locked. Otherwise, for each primary platform listed in the
// namespace, the lease named after that platform (or OperatorLockName when the platform has no name)
// is looked up. A lease that exists means the namespace is locked. A failure to list the platforms,
// or a lookup failure other than not-found, is reported as locked together with the error.
func IsNamespaceLocked(ctx context.Context, c ctrl.Reader, namespace string) (bool, error) {
	if namespace == "" {
		return false, nil
	}

	primaries, err := ListPrimaryPlatforms(ctx, c, namespace)
	if err != nil {
		return true, err
	}

	for i := range primaries.Items {
		// Start from the default lock name and override it when the platform is named.
		lockName := OperatorLockName
		if name := primaries.Items[i].Name; name != "" {
			lockName = GetOperatorLockName(name)
		}

		var lease coordination.Lease
		key := ctrl.ObjectKey{Namespace: namespace, Name: lockName}
		getErr := c.Get(ctx, key, &lease)
		if getErr == nil {
			// The lease exists: somebody holds the lock.
			return true, nil
		}
		if !k8serrors.IsNotFound(getErr) {
			// Unknown state: stay on the safe side and report the namespace as locked.
			return true, getErr
		}
	}

	return false, nil
}
// IsOperatorAllowedOnNamespace returns true if the current operator is allowed to react on changes
// in the given namespace.
//
// Local operators, global operators with a non-empty operator id, and a global operator acting on
// its own namespace are always allowed. Any other global operator is allowed only when the
// namespace is not locked; an error from the lock check is returned with a false result.
func IsOperatorAllowedOnNamespace(ctx context.Context, c ctrl.Reader, namespace string) (bool, error) {
	switch {
	case !IsCurrentOperatorGlobal():
		// allow all local operators
		return true, nil
	case defaults.OperatorID() != "":
		// allow global operators that use a proper operator id
		log.Debugf("Operator ID: %s", defaults.OperatorID())
		return true, nil
	case GetOperatorNamespace() == namespace:
		// Global operator is allowed on its own namespace
		return true, nil
	}

	locked, err := IsNamespaceLocked(ctx, c, namespace)
	if err != nil {
		log.Debugf("Error occurred while testing whether namespace is locked: %v", err)
		return false, err
	}

	log.Debugf("Lock status of namespace %s: %t", namespace, locked)
	return !locked, nil
}
// IsOperatorHandler compares the operator id annotation of the resource with the id of this
// operator instance. Operators whose id matches the annotation are allowed to reconcile.
// Legacy resources that carry no operator id annotation may be reconciled by the default global
// operator or by a local operator. A nil object is always accepted.
func IsOperatorHandler(object ctrl.Object) bool {
	if object == nil {
		return true
	}

	resourceID := camelv1.GetOperatorIDAnnotation(object)
	operatorID := defaults.OperatorID()

	switch {
	case resourceID == operatorID:
		// the operator with the matching id handles the resource
		return true
	case resourceID != "":
		// the resource is explicitly assigned to a different operator
		return false
	default:
		// legacy resource without an operator id annotation: the default global operator
		// or any local operator may handle it
		return operatorID == DefaultPlatformName || !IsCurrentOperatorGlobal()
	}
}
// IsOperatorHandlerConsideringLock applies the IsOperatorHandler checks and then an extra check for
// legacy resources that have no operator id annotation. Two kinds of operators compete to reconcile
// such resources: the local operator of the namespace and the default global operator. When the
// namespace is locked (or the lock check fails) only a local operator is accepted; when no lock
// exists the resource is accepted. An operator is always accepted on its own namespace.
func IsOperatorHandlerConsideringLock(ctx context.Context, c ctrl.Reader, namespace string, object ctrl.Object) bool {
	if !IsOperatorHandler(object) {
		return false
	}

	// resources with an explicit operator id need no further checks
	if camelv1.GetOperatorIDAnnotation(object) != "" {
		return true
	}

	// Global operator is allowed on its own namespace
	if GetOperatorNamespace() == namespace {
		return true
	}

	locked, err := IsNamespaceLocked(ctx, c, namespace)
	if err == nil && !locked {
		return true
	}

	// namespace is locked (or its state is unknown) so local operators do have precedence
	return !IsCurrentOperatorGlobal()
}
// FilteringFuncs do preliminary checks to determine if certain events should be handled by the controller
// based on the operator id annotation of the resources (e.g. camel.apache.org/operator.id) and the operator
// configuration, before handing the computation over to the user code.
// Each function below is optional: it is consulted only after the IsOperatorHandler check has passed,
// and when it is nil the event is processed.
type FilteringFuncs struct {
// CreateFunc returns true if the Create event should be processed
CreateFunc func(event.CreateEvent) bool
// DeleteFunc returns true if the Delete event should be processed
DeleteFunc func(event.DeleteEvent) bool
// UpdateFunc returns true if the Update event should be processed.
// It is not consulted when the operator id annotation differs between the old and the new object.
UpdateFunc func(event.UpdateEvent) bool
// GenericFunc returns true if the Generic event should be processed
GenericFunc func(event.GenericEvent) bool
}
// Create reports whether the create event should be processed: the object must pass
// IsOperatorHandler, after which the decision is delegated to CreateFunc when one is set.
func (f FilteringFuncs) Create(e event.CreateEvent) bool {
	if !IsOperatorHandler(e.Object) {
		return false
	}
	if f.CreateFunc == nil {
		return true
	}

	return f.CreateFunc(e)
}
// Delete reports whether the delete event should be processed: the object must pass
// IsOperatorHandler, after which the decision is delegated to DeleteFunc when one is set.
func (f FilteringFuncs) Delete(e event.DeleteEvent) bool {
	if !IsOperatorHandler(e.Object) {
		return false
	}
	if f.DeleteFunc == nil {
		return true
	}

	return f.DeleteFunc(e)
}
// Update reports whether the update event should be processed. The new object must pass
// IsOperatorHandler. A change of the operator id annotation between the old and the new object
// always triggers processing; otherwise the decision is delegated to UpdateFunc when one is set.
func (f FilteringFuncs) Update(e event.UpdateEvent) bool {
	if !IsOperatorHandler(e.ObjectNew) {
		return false
	}

	if e.ObjectOld != nil && e.ObjectNew != nil {
		previousID := camelv1.GetOperatorIDAnnotation(e.ObjectOld)
		currentID := camelv1.GetOperatorIDAnnotation(e.ObjectNew)
		if previousID != currentID {
			// Always force reconciliation when the object becomes managed by the current operator
			return true
		}
	}

	if f.UpdateFunc == nil {
		return true
	}

	return f.UpdateFunc(e)
}
// Generic reports whether the generic event should be processed: the object must pass
// IsOperatorHandler, after which the decision is delegated to GenericFunc when one is set.
func (f FilteringFuncs) Generic(e event.GenericEvent) bool {
	if !IsOperatorHandler(e.Object) {
		return false
	}
	if f.GenericFunc == nil {
		return true
	}

	return f.GenericFunc(e)
}
// Compile-time assertion that FilteringFuncs satisfies the predicate.Predicate interface.
var _ predicate.Predicate = FilteringFuncs{}