Skip to content

Commit

Permalink
feat: support ServiceFacade as KongUpstreamPolicy ancestor
Browse files Browse the repository at this point in the history
  • Loading branch information
czeslavo committed Jan 12, 2024
1 parent 9caec73 commit e327869
Show file tree
Hide file tree
Showing 5 changed files with 524 additions and 141 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,12 @@ Adding a new version? You'll need three changes:
- using filters in backendRefs of rules
[#5312](https://github.com/Kong/kubernetes-ingress-controller/pull/5312)
- Added functionality to the `KongUpstreamPolicy` controller to properly set and
enforce `KongUpstreamPolicy` status.
enforce `KongUpstreamPolicy` status.
The controller will set an ancestor status in `KongUpstreamPolicy` status for
each of its ancestors (i.e. `Service` or `KongServiceFacade`) with the `Accepted`
condition.
[#5185](https://github.com/Kong/kubernetes-ingress-controller/pull/5185)
[#5428](https://github.com/Kong/kubernetes-ingress-controller/pull/5428)
- New CRD `KongVault` to reperesent a custom Kong vault for storing senstive
data used in plugin configurations. Now users can create a `KongVault` to
create a custom Kong vault and reference the values in configurations of
Expand Down
112 changes: 83 additions & 29 deletions internal/controllers/configuration/kongupstreampolicy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/kong/kubernetes-ingress-controller/v3/internal/gatewayapi"
"github.com/kong/kubernetes-ingress-controller/v3/internal/util"
kongv1beta1 "github.com/kong/kubernetes-ingress-controller/v3/pkg/apis/configuration/v1beta1"
incubatorv1alpha1 "github.com/kong/kubernetes-ingress-controller/v3/pkg/apis/incubator/v1alpha1"
)

// -----------------------------------------------------------------------------
Expand All @@ -36,6 +37,10 @@ type KongUpstreamPolicyReconciler struct {
Scheme *runtime.Scheme
DataplaneClient controllers.DataPlane
CacheSyncTimeout time.Duration

// KongServiceFacadeEnabled determines whether the controller should populate the KongUpstreamPolicy's ancestor
// status for KongServiceFacades.
KongServiceFacadeEnabled bool
}

// SetupWithManager sets up the controller with the Manager.
Expand All @@ -51,28 +56,67 @@ func (r *KongUpstreamPolicyReconciler) SetupWithManager(mgr ctrl.Manager) error
return err
}

if err := mgr.GetCache().IndexField(context.Background(), &corev1.Service{}, upstreamPolicyIndexKey, indexServicesOnUpstreamPolicyAnnotation); err != nil {
return err
}

if err := mgr.GetCache().IndexField(context.Background(), &gatewayapi.HTTPRoute{}, routeBackendRefServiceNameIndexKey, indexRoutesOnBackendRefServiceName); err != nil {
if err := r.setupIndices(mgr); err != nil {
return err
}

if err := c.Watch(
source.Kind(mgr.GetCache(), &corev1.Service{}),
handler.EnqueueRequestsFromMapFunc(r.getUpstreamPolicyForService),
predicate.NewPredicateFuncs(doesServiceUseUpstreamPolicy),
handler.EnqueueRequestsFromMapFunc(r.getUpstreamPolicyForObject),
predicate.NewPredicateFuncs(doesObjectReferUpstreamPolicy),
); err != nil {
return err
}

if r.KongServiceFacadeEnabled {
if err := c.Watch(
source.Kind(mgr.GetCache(), &incubatorv1alpha1.KongServiceFacade{}),
handler.EnqueueRequestsFromMapFunc(r.getUpstreamPolicyForObject),
predicate.NewPredicateFuncs(doesObjectReferUpstreamPolicy),
); err != nil {
return err
}
}

return c.Watch(
source.Kind(mgr.GetCache(), &kongv1beta1.KongUpstreamPolicy{}),
&handler.EnqueueRequestForObject{},
)
}

func (r *KongUpstreamPolicyReconciler) setupIndices(mgr ctrl.Manager) error {
if err := mgr.GetCache().IndexField(
context.Background(),
&corev1.Service{},
upstreamPolicyIndexKey,
indexServicesOnUpstreamPolicyAnnotation,
); err != nil {
return fmt.Errorf("failed to index services on annotation %s: %w", kongv1beta1.KongUpstreamPolicyAnnotationKey, err)
}

if err := mgr.GetCache().IndexField(
context.Background(),
&gatewayapi.HTTPRoute{},
routeBackendRefServiceNameIndexKey,
indexRoutesOnBackendRefServiceName,
); err != nil {
return fmt.Errorf("failed to index HTTPRoutes on backendReferences: %w", err)
}

if r.KongServiceFacadeEnabled {
if err := mgr.GetCache().IndexField(
context.Background(),
&incubatorv1alpha1.KongServiceFacade{},
upstreamPolicyIndexKey,
indexServiceFacadesOnUpstreamPolicyAnnotation,
); err != nil {
return fmt.Errorf("failed to index KongServiceFacades on annotation %s: %w", kongv1beta1.KongUpstreamPolicyAnnotationKey, err)
}
}

return nil
}

// -----------------------------------------------------------------------------
// KongUpstreamPolicy Controller - Indexers
// -----------------------------------------------------------------------------
Expand Down Expand Up @@ -103,69 +147,78 @@ func indexRoutesOnBackendRefServiceName(o client.Object) []string {
return []string{}
}

indexes := []string{}
var indexes []string
for _, rule := range httpRoute.Spec.Rules {
for _, br := range rule.BackendRefs {
serviceRef := backendRefToServiceRef(httpRoute.Namespace, br.BackendRef)
if serviceRef == "" {
continue
}
indexes = append(indexes, serviceRef)
indexes = append(indexes, string(serviceRef))
}
}
return indexes
}

// indexServiceFacadesOnUpstreamPolicyAnnotation indexes the KongServiceFacades on the annotation konghq.com/upstream-policy.
func indexServiceFacadesOnUpstreamPolicyAnnotation(o client.Object) []string {
service, ok := o.(*incubatorv1alpha1.KongServiceFacade)
if !ok {
return []string{}
}
if service.Annotations != nil {
if policy, ok := service.Annotations[kongv1beta1.KongUpstreamPolicyAnnotationKey]; ok {
return []string{policy}
}
}
return []string{}
}

// -----------------------------------------------------------------------------
// KongUpstreamPolicy Controller - Watch Predicates
// -----------------------------------------------------------------------------

// getUpstreamPolicyForService enqueue a new reconcile request for the KongUpstreamPolicy
// referenced by a service.
func (r *KongUpstreamPolicyReconciler) getUpstreamPolicyForService(ctx context.Context, obj client.Object) []reconcile.Request {
service, ok := obj.(*corev1.Service)
if !ok {
return nil
}
if service.Annotations == nil {
// getUpstreamPolicyForObject enqueues a new reconcile request for the KongUpstreamPolicy referenced by an object.
func (r *KongUpstreamPolicyReconciler) getUpstreamPolicyForObject(ctx context.Context, obj client.Object) []reconcile.Request {
annotations := obj.GetAnnotations()
if annotations == nil {
return nil
}
policyName, ok := service.Annotations[kongv1beta1.KongUpstreamPolicyAnnotationKey]
policyName, ok := annotations[kongv1beta1.KongUpstreamPolicyAnnotationKey]
if !ok {
return nil
}

kongUpstreamPolicy := &kongv1beta1.KongUpstreamPolicy{}
if err := r.Get(ctx, k8stypes.NamespacedName{
Namespace: service.Namespace,
Namespace: obj.GetNamespace(),
Name: policyName,
}, kongUpstreamPolicy); err != nil {
if !apierrors.IsNotFound(err) {
r.Log.Error(err, "Failed to retrieve KongUpstreamPolicy in watch predicates", "KongUpstreamPolicy", fmt.Sprintf("%s/%s", service.Namespace, policyName))
r.Log.Error(err, "Failed to retrieve KongUpstreamPolicy in watch predicates",
"KongUpstreamPolicy", fmt.Sprintf("%s/%s", obj.GetNamespace(), policyName),
)
}
return []reconcile.Request{}
}

return []reconcile.Request{
{
NamespacedName: k8stypes.NamespacedName{
Namespace: service.Namespace,
Namespace: obj.GetNamespace(),
Name: policyName,
},
},
}
}

// doesServiceUseUpstreamPolicy filters out all the services not referencing KongUpstreamPolicies.
func doesServiceUseUpstreamPolicy(obj client.Object) bool {
service, ok := obj.(*corev1.Service)
if !ok {
return false
}
if service.Annotations == nil {
// doesObjectReferUpstreamPolicy filters out all the objects not referencing KongUpstreamPolicies.
func doesObjectReferUpstreamPolicy(obj client.Object) bool {
annotations := obj.GetAnnotations()
if annotations == nil {
return false
}
_, ok = service.Annotations[kongv1beta1.KongUpstreamPolicyAnnotationKey]
_, ok := annotations[kongv1beta1.KongUpstreamPolicyAnnotationKey]
return ok
}

Expand All @@ -177,6 +230,7 @@ func doesServiceUseUpstreamPolicy(obj client.Object) bool {
// +kubebuilder:rbac:groups=configuration.konghq.com,resources=kongupstreampolicies/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=httproutes,verbs=get;list;watch
// +kubebuilder:rbac:groups="",resources=services,verbs=get;list;watch
// +kubebuilder:rbac:groups=incubator.ingress-controller.konghq.com,resources=kongservicefacades,verbs=get;list;watch

// Reconcile processes the watched objects.
func (r *KongUpstreamPolicyReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down

0 comments on commit e327869

Please sign in to comment.