-
Notifications
You must be signed in to change notification settings - Fork 589
/
kongupstreampolicy_utils.go
411 lines (370 loc) · 15.7 KB
/
kongupstreampolicy_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
package configuration
import (
"context"
"fmt"
"reflect"
"sort"
"github.com/samber/lo"
"github.com/samber/mo"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
gatewaycontroller "github.com/kong/kubernetes-ingress-controller/v3/internal/controllers/gateway"
"github.com/kong/kubernetes-ingress-controller/v3/internal/gatewayapi"
kongv1beta1 "github.com/kong/kubernetes-ingress-controller/v3/pkg/apis/configuration/v1beta1"
incubatorv1alpha1 "github.com/kong/kubernetes-ingress-controller/v3/pkg/apis/incubator/v1alpha1"
)
// maxNAncestors is the maximum number of ancestors that can be stored in the KongUpstreamPolicy status.
// This is a limitation of the Gateway API.
const maxNAncestors = 16
// upstreamPolicyAncestorKind represents kind of KongUpstreamPolicy ancestor (Service or KongServiceFacade).
type upstreamPolicyAncestorKind string
const (
upstreamPolicyAncestorKindService upstreamPolicyAncestorKind = "Service"
upstreamPolicyAncestorKindKongServiceFacade upstreamPolicyAncestorKind = "KongServiceFacade"
)
// ancestorStatus represents the status of an ancestor (Service or KongServiceFacade).
// A collection of all ancestors' statuses is used to build the KongUpstreamPolicy status.
type ancestorStatus struct {
namespacedName k8stypes.NamespacedName
ancestorKind upstreamPolicyAncestorKind
acceptedCondition metav1.Condition
programmedCondition metav1.Condition
creationTimestamp metav1.Time
}
// serviceKey is used as a key for indexing Services by "namespace/name".
type serviceKey string
// servicesSet is a set of serviceKeys.
type servicesSet map[serviceKey]struct{}
// enforceKongUpstreamPolicyStatus gets a list of services (ancestors) along with their desired status and enforce them
// in the KongUpstreamPolicy status.
func (r *KongUpstreamPolicyReconciler) enforceKongUpstreamPolicyStatus(
ctx context.Context,
oldPolicy *kongv1beta1.KongUpstreamPolicy,
) (bool, error) {
policyNN := k8stypes.NamespacedName{
Namespace: oldPolicy.Namespace,
Name: oldPolicy.Name,
}
// Get all objects (Services and KongServiceFacades) that reference this KongUpstreamPolicy.
services, err := r.getServicesReferencingUpstreamPolicy(ctx, policyNN)
if err != nil {
return false, err
}
serviceFacades, err := r.maybeGetServiceFacadesReferencingUpstreamPolicy(ctx, policyNN)
if err != nil {
return false, err
}
// Build the status for each ancestor.
ancestorsStatus, err := r.buildAncestorsStatus(ctx, services, serviceFacades)
if err != nil {
return false, err
}
// Build the desired KongUpstreamPolicy status.
newPolicyStatus, err := r.buildPolicyStatus(policyNN, ancestorsStatus)
if err != nil {
return false, err
}
// If the status is not updated, we don't need to patch the KongUpstreamPolicy.
if isStatusUpdated := isPolicyStatusUpdated(oldPolicy.Status, newPolicyStatus); !isStatusUpdated {
newPolicy := oldPolicy.DeepCopy()
newPolicy.Status = newPolicyStatus
return true, r.Client.Status().Patch(ctx, newPolicy, client.MergeFrom(oldPolicy))
}
return false, nil
}
func (r *KongUpstreamPolicyReconciler) getServicesReferencingUpstreamPolicy(
ctx context.Context,
upstreamPolicyNN k8stypes.NamespacedName,
) ([]corev1.Service, error) {
services := &corev1.ServiceList{}
err := r.List(ctx, services,
client.InNamespace(upstreamPolicyNN.Namespace),
client.MatchingFields{
upstreamPolicyIndexKey: upstreamPolicyNN.Name,
},
)
if err != nil {
return nil, fmt.Errorf("failed listing Services: %w", err)
}
return services.Items, nil
}
// maybeGetServiceFacadesReferencingUpstreamPolicy returns a list of KongServiceFacades that reference the given KongUpstreamPolicy.
// Skips the lookup if KongServiceFacade is not enabled.
func (r *KongUpstreamPolicyReconciler) maybeGetServiceFacadesReferencingUpstreamPolicy(
ctx context.Context,
upstreamPolicyNN k8stypes.NamespacedName,
) ([]incubatorv1alpha1.KongServiceFacade, error) {
if !r.KongServiceFacadeEnabled {
// KongServiceFacade is not enabled, so we don't need to check for it.
return nil, nil
}
serviceFacades := &incubatorv1alpha1.KongServiceFacadeList{}
err := r.List(ctx, serviceFacades,
client.InNamespace(upstreamPolicyNN.Namespace),
client.MatchingFields{
upstreamPolicyIndexKey: upstreamPolicyNN.Name,
},
)
if err != nil {
return nil, fmt.Errorf("failed listing KongServiceFacades: %w", err)
}
return serviceFacades.Items, nil
}
// buildAncestorsStatus creates a list of services with their conditions associated.
func (r *KongUpstreamPolicyReconciler) buildAncestorsStatus(
ctx context.Context,
services []corev1.Service,
serviceFacades []incubatorv1alpha1.KongServiceFacade,
) ([]ancestorStatus, error) {
// Check if any Services have conflicts. We do not verify conflicts for KongServiceFacades as there's
// no scenario in which they would have one.
conflictedServices, err := r.getConflictedServices(ctx, services)
if err != nil {
return nil, err
}
// Prepare conditions.
acceptedCondition := metav1.Condition{
Type: string(gatewayapi.PolicyConditionAccepted),
Status: metav1.ConditionTrue,
Reason: string(gatewayapi.PolicyReasonAccepted),
LastTransitionTime: metav1.Now(),
}
programmedCondition := metav1.Condition{
Type: string(gatewayapi.GatewayConditionProgrammed),
Status: metav1.ConditionTrue,
Reason: string(gatewayapi.GatewayReasonProgrammed),
LastTransitionTime: metav1.Now(),
}
// Build the status for each ancestor (Services and KongServiceFacades).
ancestorsStatus := make([]ancestorStatus, 0, len(services)+len(serviceFacades))
for _, service := range services {
service := service
acceptedCondition := acceptedCondition
programmedCondition := programmedCondition
if _, isConflicted := conflictedServices[buildServiceReference(service.Namespace, service.Name)]; isConflicted {
// If the Service is conflicted, we change both conditions to False.
acceptedCondition.Status = metav1.ConditionFalse
acceptedCondition.Reason = string(gatewayapi.PolicyReasonConflicted)
programmedCondition.Status = metav1.ConditionFalse
programmedCondition.Reason = string(gatewayapi.GatewayReasonPending)
}
if !r.DataplaneClient.KubernetesObjectIsConfigured(&service) {
// If the Service is not configured, we change it to False.
programmedCondition.Status = metav1.ConditionFalse
programmedCondition.Reason = string(gatewayapi.GatewayReasonPending)
}
ancestorsStatus = append(ancestorsStatus, ancestorStatus{
namespacedName: k8stypes.NamespacedName{
Namespace: service.Namespace,
Name: service.Name,
},
ancestorKind: upstreamPolicyAncestorKindService,
acceptedCondition: acceptedCondition,
programmedCondition: programmedCondition,
})
}
for _, serviceFacade := range serviceFacades {
serviceFacade := serviceFacade
programmedCondition := programmedCondition
if !r.DataplaneClient.KubernetesObjectIsConfigured(&serviceFacade) {
// If the KongServiceFacade is not configured, we change it to False.
programmedCondition.Status = metav1.ConditionFalse
programmedCondition.Reason = string(gatewayapi.GatewayReasonPending)
}
ancestorsStatus = append(ancestorsStatus, ancestorStatus{
namespacedName: k8stypes.NamespacedName{
Namespace: serviceFacade.Namespace,
Name: serviceFacade.Name,
},
ancestorKind: upstreamPolicyAncestorKindKongServiceFacade,
acceptedCondition: acceptedCondition,
programmedCondition: programmedCondition,
})
}
return ancestorsStatus, nil
}
// getConflictedServices returns a set of services that have conflicts.
func (r *KongUpstreamPolicyReconciler) getConflictedServices(ctx context.Context, services []corev1.Service) (servicesSet, error) {
// Prepare a mapping for efficient lookups if a Service uses this KongUpstreamPolicy.
upstreamPolicyServices := make(servicesSet)
for _, service := range services {
upstreamPolicyServices[buildServiceReference(service.Namespace, service.Name)] = struct{}{}
}
conflictedServices := make(servicesSet)
for serviceKey := range upstreamPolicyServices {
// We fetch all the HTTPRoutes that reference this service.
httpRoutes := &gatewayapi.HTTPRouteList{}
err := r.List(ctx, httpRoutes,
client.MatchingFields{
routeBackendRefServiceNameIndexKey: string(serviceKey),
},
)
if err != nil {
return nil, err
}
hasConflict := lo.ContainsBy(httpRoutes.Items, func(httpRoute gatewayapi.HTTPRoute) bool {
return httpRouteHasUpstreamPolicyConflictedBackendRefsWithService(httpRoute, upstreamPolicyServices, serviceKey)
})
if hasConflict {
conflictedServices[serviceKey] = struct{}{}
}
}
return conflictedServices, nil
}
// httpRouteHasUpstreamPolicyConflictedBackendRefsWithService checks if there's any HTTPRoute's rule that uses multiple backendRefs
// AND they're not all using the same KongUpstreamPolicy.
// If so, that means that we have a conflict because we cannot apply multiple KongUpstreamPolicy to the same Kong Service.
func httpRouteHasUpstreamPolicyConflictedBackendRefsWithService(
httpRoute gatewayapi.HTTPRoute,
upstreamPolicyServices servicesSet,
serviceKey serviceKey,
) bool {
backendRefsUsedWithThisService := getAllBackendRefsUsedWithService(httpRoute, serviceKey)
hasAnyBackendRefNotUsingSameUpstreamPolicy := lo.ContainsBy(backendRefsUsedWithThisService, func(br gatewayapi.HTTPBackendRef) bool {
serviceRef := backendRefToServiceRef(httpRoute.Namespace, br.BackendRef)
if serviceRef == "" {
return false
}
// If the serviceRef is not in the upstreamPolicyServices, it means it doesn't use this KongUpstreamPolicy.
_, ok := upstreamPolicyServices[serviceRef]
return !ok
})
return hasAnyBackendRefNotUsingSameUpstreamPolicy
}
// getAllBackendRefsUsedWithService returns HTTPRoute's backendRefs that use the given service (excluding the given service).
func getAllBackendRefsUsedWithService(httpRoute gatewayapi.HTTPRoute, serviceKey serviceKey) []gatewayapi.HTTPBackendRef {
var backendRefs []gatewayapi.HTTPBackendRef
for _, rule := range httpRoute.Spec.Rules {
// We will look for a backendRef that matches the given service and keep its index if found.
backendRefMatchingServiceIdx := mo.None[int]()
for i, br := range rule.BackendRefs {
serviceRef := backendRefToServiceRef(httpRoute.Namespace, br.BackendRef)
if serviceRef == serviceKey {
// We found a backendRef that matches the given service, no need to look further.
backendRefMatchingServiceIdx = mo.Some(i)
break
}
}
if matchingIdx, ok := backendRefMatchingServiceIdx.Get(); ok {
// We found a backendRef that matches the given service. We will keep all the backendRefs that are together
// with this backendRef in the rule.
// Below we're suppressing nolintlint to not force `//nolint` instead of `// nolint`. This is to allow
// correctly suppressing looppointer which expects the latter.
backendRefs = append(backendRefs, rule.BackendRefs[:matchingIdx]...) // nolint:nolintlint,looppointer // We do not keep the reference to rule.BackendRefs, but copy it.
backendRefs = append(backendRefs, rule.BackendRefs[matchingIdx+1:]...) // nolint:nolintlint,looppointer // We do not keep the reference to rule.BackendRefs, but copy it.
}
}
return backendRefs
}
// buildPolicyStatus builds the KongUpstreamPolicy status from the ancestors' statuses.
// It ensures that the number of ancestors is not greater than the maximum allowed by the Gateway API
// and that the oldest ancestors are kept.
func (r *KongUpstreamPolicyReconciler) buildPolicyStatus(
upstreamPolicyNN k8stypes.NamespacedName,
ancestorsStatus []ancestorStatus,
) (gatewayapi.PolicyStatus, error) {
// Sort the ancestors by creation timestamp and keep only the oldest ones.
sort.Slice(ancestorsStatus, func(i, j int) bool {
return ancestorsStatus[i].creationTimestamp.Before(&ancestorsStatus[j].creationTimestamp)
})
if len(ancestorsStatus) > maxNAncestors {
r.Log.Info("status has more ancestors than the Gateway API permits, the newest ones will be ignored",
"KongUpstreamPolicy", upstreamPolicyNN.String(),
"ancestorsCount", len(ancestorsStatus),
"maxAllowedAncestors", maxNAncestors,
)
ancestorsStatus = ancestorsStatus[:maxNAncestors]
}
// Populate the KongUpstreamPolicy status with the ancestors' statuses.
policyStatus := gatewayapi.PolicyStatus{}
if len(ancestorsStatus) > 0 {
policyStatus.Ancestors = make([]gatewayapi.PolicyAncestorStatus, 0, len(ancestorsStatus))
}
for _, ss := range ancestorsStatus {
ancestorRef, err := ancestorRef(ss.namespacedName, ss.ancestorKind)
if err != nil {
return gatewayapi.PolicyStatus{}, fmt.Errorf("failed to build ancestor reference: %w", err)
}
policyStatus.Ancestors = append(policyStatus.Ancestors,
gatewayapi.PolicyAncestorStatus{
AncestorRef: ancestorRef,
ControllerName: gatewaycontroller.GetControllerName(),
Conditions: []metav1.Condition{
ss.acceptedCondition,
ss.programmedCondition,
},
},
)
}
return policyStatus, nil
}
func ancestorRef(nn k8stypes.NamespacedName, kind upstreamPolicyAncestorKind) (gatewayapi.ParentReference, error) {
switch kind {
case upstreamPolicyAncestorKindService:
return gatewayapi.ParentReference{
Group: lo.ToPtr(gatewayapi.Group("core")),
Kind: lo.ToPtr(gatewayapi.Kind("Service")),
Namespace: lo.ToPtr(gatewayapi.Namespace(nn.Namespace)),
Name: gatewayapi.ObjectName(nn.Name),
}, nil
case upstreamPolicyAncestorKindKongServiceFacade:
return gatewayapi.ParentReference{
Group: lo.ToPtr(gatewayapi.Group(incubatorv1alpha1.GroupVersion.Group)),
Kind: lo.ToPtr(gatewayapi.Kind(incubatorv1alpha1.KongServiceFacadeKind)),
Namespace: lo.ToPtr(gatewayapi.Namespace(nn.Namespace)),
Name: gatewayapi.ObjectName(nn.Name),
}, nil
}
return gatewayapi.ParentReference{}, fmt.Errorf("unknown ancestor kind %q", kind)
}
func isPolicyStatusUpdated(oldStatus, newStatus gatewayapi.PolicyStatus) bool {
if len(oldStatus.Ancestors) != len(newStatus.Ancestors) {
return false
}
for i, oldAncestor := range oldStatus.Ancestors {
newAncestor := newStatus.Ancestors[i]
if newAncestor.ControllerName != oldAncestor.ControllerName {
return false
}
if !reflect.DeepEqual(newAncestor.AncestorRef, oldAncestor.AncestorRef) {
return false
}
if len(oldAncestor.Conditions) != len(newAncestor.Conditions) {
return false
}
for j, oldCondition := range oldAncestor.Conditions {
newCondition := newAncestor.Conditions[j]
if newCondition.Type != oldCondition.Type ||
newCondition.Status != oldCondition.Status ||
newCondition.Reason != oldCondition.Reason ||
newCondition.Message != oldCondition.Message {
return false
}
}
}
return true
}
func backendRefToServiceRef(routeNamespace string, br gatewayapi.BackendRef) serviceKey {
if !isSupportedHTTPRouteBackendRef(br) {
return ""
}
namespace := routeNamespace
if br.Namespace != nil {
namespace = string(*br.Namespace)
}
return buildServiceReference(namespace, string(br.Name))
}
func buildServiceReference(namespace, name string) serviceKey {
return serviceKey(fmt.Sprintf("%s/%s", namespace, name))
}
func isSupportedHTTPRouteBackendRef(br gatewayapi.BackendRef) bool {
groupIsCoreOrNilOrEmpty := br.Group == nil || *br.Group == "core" || *br.Group == ""
kindIsServiceOrNil := br.Kind == nil || *br.Kind == "Service"
// We only support core Services.
// For Group the specification says when it's unspecified (nil or empty string), core API group should be inferred.
// For Kind nil case should never happen as it defaults on the API level to 'Service'. We can safely consider
// nil to be treated as 'Service' if it would happen for any reason.
return groupIsCoreOrNilOrEmpty && kindIsServiceOrNil
}