Skip to content

Commit

Permalink
feat: add Programmed condition to KongUpstreamPolicy ancestors status
Browse files Browse the repository at this point in the history
  • Loading branch information
czeslavo committed Jan 17, 2024
1 parent 3936d7d commit 756d405
Show file tree
Hide file tree
Showing 8 changed files with 439 additions and 29 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,10 @@ Adding a new version? You'll need three changes:
enforce `KongUpstreamPolicy` status.
The controller will set an ancestor status in `KongUpstreamPolicy` status for
each of its ancestors (i.e. `Service` or `KongServiceFacade`) with the `Accepted`
condition.
and `Programmed` condition.
[#5185](https://github.com/Kong/kubernetes-ingress-controller/pull/5185)
[#5428](https://github.com/Kong/kubernetes-ingress-controller/pull/5428)
[#5444](https://github.com/Kong/kubernetes-ingress-controller/pull/5444)
- New CRD `KongVault` to reperesent a custom Kong vault for storing senstive
data used in plugin configurations. Now users can create a `KongVault` to
create a custom Kong vault and reference the values in configurations of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/kong/kubernetes-ingress-controller/v3/internal/util/kubernetes/object/status"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
k8stypes "k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -37,6 +39,7 @@ type KongUpstreamPolicyReconciler struct {
Scheme *runtime.Scheme
DataplaneClient controllers.DataPlane
CacheSyncTimeout time.Duration
StatusQueue *status.Queue

// KongServiceFacadeEnabled determines whether the controller should populate the KongUpstreamPolicy's ancestor
// status for KongServiceFacades.
Expand Down Expand Up @@ -68,6 +71,15 @@ func (r *KongUpstreamPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error
return err
}

// Watch for HTTPRoute changes to trigger reconciliation for the KongUpstreamPolicies referenced by the Services
// of the HTTPRoute.
if err := c.Watch(
source.Kind(mgr.GetCache(), &gatewayapi.HTTPRoute{}),
handler.EnqueueRequestsFromMapFunc(r.getUpstreamPoliciesForHTTPRouteServices),
); err != nil {
return err
}

if r.KongServiceFacadeEnabled {
if err := c.Watch(
source.Kind(mgr.GetCache(), &incubatorv1alpha1.KongServiceFacade{}),
Expand All @@ -78,10 +90,40 @@ func (r *KongUpstreamPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error
}
}

return c.Watch(
if err := c.Watch(
source.Kind(mgr.GetCache(), &kongv1beta1.KongUpstreamPolicy{}),
&handler.EnqueueRequestForObject{},
)
); err != nil {
return err
}

if r.StatusQueue != nil {
// Watch for notifications on the status queue from Services and KongServiceFacades as their status change
// needs to be propagated to the KongUpstreamPolicy's ancestor Programmed status.
if err := c.Watch(
&source.Channel{Source: r.StatusQueue.Subscribe(schema.GroupVersionKind{
Version: "v1",
Kind: "Service",
})},
handler.EnqueueRequestsFromMapFunc(r.getUpstreamPolicyForObject),
predicate.NewPredicateFuncs(doesObjectReferUpstreamPolicy),
); err != nil {
return err
}
if err := c.Watch(
&source.Channel{Source: r.StatusQueue.Subscribe(schema.GroupVersionKind{
Version: incubatorv1alpha1.SchemeGroupVersion.Version,
Group: incubatorv1alpha1.SchemeGroupVersion.Group,
Kind: incubatorv1alpha1.KongServiceFacadeKind,
})},
handler.EnqueueRequestsFromMapFunc(r.getUpstreamPolicyForObject),
predicate.NewPredicateFuncs(doesObjectReferUpstreamPolicy),
); err != nil {
return err
}
}

return nil
}

func (r *KongUpstreamPolicyReconciler) setupIndices(mgr ctrl.Manager) error {
Expand Down Expand Up @@ -212,6 +254,56 @@ func (r *KongUpstreamPolicyReconciler) getUpstreamPolicyForObject(ctx context.Co
}
}

// getUpstreamPoliciesForHTTPRouteServices enqueues a new reconcile request for the KongUpstreamPolicies referenced by
// the Services of an HTTPRoute.
func (r *KongUpstreamPolicyReconciler) getUpstreamPoliciesForHTTPRouteServices(ctx context.Context, obj client.Object) []reconcile.Request {
httpRoute, ok := obj.(*gatewayapi.HTTPRoute)
if !ok {
return nil
}

var requests []reconcile.Request
for _, rule := range httpRoute.Spec.Rules {
for _, br := range rule.BackendRefs {
if !isSupportedHTTPRouteBackendRef(br.BackendRef) {
continue
}

namespace := httpRoute.Namespace
if br.BackendRef.Namespace != nil {
namespace = string(*br.BackendRef.Namespace)
}
service := &corev1.Service{}
if err := r.Client.Get(ctx, k8stypes.NamespacedName{
Namespace: namespace,
Name: string(br.BackendRef.Name),
}, service); err != nil {
if !apierrors.IsNotFound(err) {
r.Log.Error(err, "Failed to retrieve Service in watch predicates",
"Service", fmt.Sprintf("%s/%s", namespace, string(br.BackendRef.Name)),
)
}
continue
}

if service.Annotations == nil {
continue
}
upstreamPolicy, ok := service.Annotations[kongv1beta1.KongUpstreamPolicyAnnotationKey]
if !ok {
continue
}
requests = append(requests, reconcile.Request{
NamespacedName: k8stypes.NamespacedName{
Namespace: httpRoute.Namespace,
Name: upstreamPolicy,
},
})
}
}
return requests
}

// doesObjectReferUpstreamPolicy filters out all the objects not referencing KongUpstreamPolicies.
func doesObjectReferUpstreamPolicy(obj client.Object) bool {
annotations := obj.GetAnnotations()
Expand Down
70 changes: 52 additions & 18 deletions internal/controllers/configuration/kongupstreampolicy_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
incubatorv1alpha1 "github.com/kong/kubernetes-ingress-controller/v3/pkg/apis/incubator/v1alpha1"
)

// maxNAncestors is the maximum number of ancestors that can be stored in the KongUpstreamPolicy status.
// This is a limitation of the Gateway API.
const maxNAncestors = 16

// upstreamPolicyAncestorKind represents kind of KongUpstreamPolicy ancestor (Service or KongServiceFacade).
Expand All @@ -32,10 +34,11 @@ const (
// ancestorStatus represents the status of an ancestor (Service or KongServiceFacade).
// A collection of all ancestors' statuses is used to build the KongUpstreamPolicy status.
type ancestorStatus struct {
namespacedName k8stypes.NamespacedName
ancestorKind upstreamPolicyAncestorKind
acceptedCondition metav1.Condition
creationTimestamp metav1.Time
namespacedName k8stypes.NamespacedName
ancestorKind upstreamPolicyAncestorKind
acceptedCondition metav1.Condition
programmedCondition metav1.Condition
creationTimestamp metav1.Time
}

// serviceKey is used as a key for indexing Services by "namespace/name".
Expand Down Expand Up @@ -146,38 +149,61 @@ func (r *KongUpstreamPolicyReconciler) buildAncestorsStatus(
Reason: string(gatewayapi.PolicyReasonAccepted),
LastTransitionTime: metav1.Now(),
}
conflictedCondition := metav1.Condition{
Type: string(gatewayapi.PolicyConditionAccepted),
Status: metav1.ConditionFalse,
Reason: string(gatewayapi.PolicyReasonConflicted),
programmedCondition := metav1.Condition{
Type: string(gatewayapi.GatewayConditionProgrammed),
Status: metav1.ConditionTrue,
Reason: string(gatewayapi.GatewayReasonProgrammed),
LastTransitionTime: metav1.Now(),
}

// Build the status for each ancestor (Services and KongServiceFacades).
ancestorsStatus := make([]ancestorStatus, 0, len(services)+len(serviceFacades))
for _, service := range services {
condition := acceptedCondition
service := service
acceptedCondition := acceptedCondition
programmedCondition := programmedCondition

if _, isConflicted := conflictedServices[buildServiceReference(service.Namespace, service.Name)]; isConflicted {
// If the service is conflicted, we will use the conflictedCondition.
condition = conflictedCondition
// If the Service is conflicted, we change both conditions to False.
acceptedCondition.Status = metav1.ConditionFalse
acceptedCondition.Reason = string(gatewayapi.PolicyReasonConflicted)
programmedCondition.Status = metav1.ConditionFalse
programmedCondition.Reason = string(gatewayapi.GatewayReasonPending)
}

if !r.DataplaneClient.KubernetesObjectIsConfigured(&service) {
// If the Service is not configured, we change it to False.
programmedCondition.Status = metav1.ConditionFalse
programmedCondition.Reason = string(gatewayapi.GatewayReasonPending)
}

ancestorsStatus = append(ancestorsStatus, ancestorStatus{
namespacedName: k8stypes.NamespacedName{
Namespace: service.Namespace,
Name: service.Name,
},
ancestorKind: upstreamPolicyAncestorKindService,
acceptedCondition: condition,
ancestorKind: upstreamPolicyAncestorKindService,
acceptedCondition: acceptedCondition,
programmedCondition: programmedCondition,
})
}
for _, serviceFacade := range serviceFacades {
serviceFacade := serviceFacade
programmedCondition := programmedCondition
if !r.DataplaneClient.KubernetesObjectIsConfigured(&serviceFacade) {
// If the KongServiceFacade is not configured, we change it to False.
programmedCondition.Status = metav1.ConditionFalse
programmedCondition.Reason = string(gatewayapi.GatewayReasonPending)
}

ancestorsStatus = append(ancestorsStatus, ancestorStatus{
namespacedName: k8stypes.NamespacedName{
Namespace: serviceFacade.Namespace,
Name: serviceFacade.Name,
},
ancestorKind: upstreamPolicyAncestorKindKongServiceFacade,
acceptedCondition: acceptedCondition,
ancestorKind: upstreamPolicyAncestorKindKongServiceFacade,
acceptedCondition: acceptedCondition,
programmedCondition: programmedCondition,
})
}

Expand Down Expand Up @@ -262,6 +288,9 @@ func getAllBackendRefsUsedWithService(httpRoute gatewayapi.HTTPRoute, serviceKey
return backendRefs
}

// buildPolicyStatus builds the KongUpstreamPolicy status from the ancestors' statuses.
// It ensures that the number of ancestors is not greater than the maximum allowed by the Gateway API
// and that the oldest ancestors are kept.
func (r *KongUpstreamPolicyReconciler) buildPolicyStatus(
upstreamPolicyNN k8stypes.NamespacedName,
ancestorsStatus []ancestorStatus,
Expand All @@ -271,7 +300,7 @@ func (r *KongUpstreamPolicyReconciler) buildPolicyStatus(
return ancestorsStatus[i].creationTimestamp.Before(&ancestorsStatus[j].creationTimestamp)
})
if len(ancestorsStatus) > maxNAncestors {
r.Log.Info("status has too many ancestors, the newest ones will be ignored",
r.Log.Info("status has more ancestors than the Gateway API permits, the newest ones will be ignored",
"KongUpstreamPolicy", upstreamPolicyNN.String(),
"ancestorsCount", len(ancestorsStatus),
"maxAllowedAncestors", maxNAncestors,
Expand All @@ -295,6 +324,7 @@ func (r *KongUpstreamPolicyReconciler) buildPolicyStatus(
ControllerName: gatewaycontroller.GetControllerName(),
Conditions: []metav1.Condition{
ss.acceptedCondition,
ss.programmedCondition,
},
},
)
Expand Down Expand Up @@ -370,8 +400,12 @@ func buildServiceReference(namespace, name string) serviceKey {
}

func isSupportedHTTPRouteBackendRef(br gatewayapi.BackendRef) bool {
groupIsCoreOrNil := br.Group == nil || *br.Group == "core"
groupIsCoreOrNilOrEmpty := br.Group == nil || *br.Group == "core" || *br.Group == ""
kindIsServiceOrNil := br.Kind == nil || *br.Kind == "Service"

// We only support core Services.
return groupIsCoreOrNil && kindIsServiceOrNil
// For Group the specification says when it's unspecified (nil or empty string), core API group should be inferred.
// For Kind nil case should never happen as it defaults on the API level to 'Service'. We can safely consider
// nil to be treated as 'Service' if it would happen for any reason.
return groupIsCoreOrNilOrEmpty && kindIsServiceOrNil
}
Loading

0 comments on commit 756d405

Please sign in to comment.