Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠ Apply predicates on handler instead of passing them to the source #1

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/builtins/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func main() {
}

// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.EnqueueRequestForObject{})); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.TypedEnqueueRequestForObject[*appsv1.ReplicaSet]{})); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}

// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{},
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
handler.TypedEnqueueRequestForOwner[*corev1.Pod](mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()))); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
Expand Down
57 changes: 30 additions & 27 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -56,6 +55,7 @@ const (
type Builder struct {
forInput ForInput
ownsInput []OwnsInput
rawSources []source.Source
watchesInput []WatchesInput
mgr manager.Manager
globalPredicates []predicate.Predicate
Expand Down Expand Up @@ -123,7 +123,8 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
obj client.Object
handler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
}
Expand All @@ -132,15 +133,19 @@ type WatchesInput struct {
// update events by *reconciling the object* with the given EventHandler.
//
// This is the equivalent of calling
// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
// WatchesRawSource(source.Kind(cache, object, eventHandler, predicates...)).
func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{}
input := WatchesInput{
obj: object,
handler: eventHandler,
}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}
src := source.Kind(blder.mgr.GetCache(), object, eventHandler, input.predicates...)

return blder.WatchesRawSource(src, opts...)
blder.watchesInput = append(blder.watchesInput, input)

return blder
}

// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
Expand Down Expand Up @@ -180,13 +185,11 @@ func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler
//
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
func (blder *Builder) WatchesRawSource(src source.Source, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}
//
// WatchesRawSource does not respect predicates configured through WithEventFilter.
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
blder.rawSources = append(blder.rawSources, src)

blder.watchesInput = append(blder.watchesInput, input)
return blder
}

Expand Down Expand Up @@ -279,7 +282,7 @@ func (blder *Builder) doWatch() error {
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
src := source.Kind(blder.mgr.GetCache(), obj, handler.WithPredicates(hdler, allPredicates...))
if err := blder.ctrl.Watch(src); err != nil {
return err
}
Expand All @@ -305,29 +308,29 @@ func (blder *Builder) doWatch() error {
)
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
src := source.Kind(blder.mgr.GetCache(), obj, handler.WithPredicates(hdler, allPredicates...))
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}

// Do the watch requests
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
if len(blder.watchesInput) == 0 && blder.forInput.object == nil && len(blder.rawSources) == 0 {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
for _, w := range blder.watchesInput {
// If the source of this watch is of type Kind, project it.
if srcKind, ok := w.src.(*internalsource.Kind); ok {
allPredicates := append(allPredicates, w.predicates...)
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
if err != nil {
return err
}
srcKind.Type = typeForSrc
srcKind.Predicates = append(srcKind.Predicates, allPredicates...)
projected, err := blder.project(w.obj, w.objectProjection)
if err != nil {
return fmt.Errorf("failed to project for %T: %w", w.obj, err)
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(source.Kind(blder.mgr.GetCache(), projected, handler.WithPredicates(w.handler, allPredicates...))); err != nil {
return err
}
if err := blder.ctrl.Watch(w.src); err != nil {
}
for _, src := range blder.rawSources {
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ var _ = Describe("application", func() {
instance, err := ControllerManagedBy(m).
Named("my_controller").
Build(noop)
Expect(err).To(MatchError(ContainSubstring("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")))
Expect(err).To(MatchError(ContainSubstring("there are no watches configured, controller will never get triggered. Use For(), Owns(), Watches() or WatchesRawSource() to set them up")))
Expect(instance).To(BeNil())
})

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ var _ = Describe("controller", func() {
By("Watching Resources")
err = instance.Watch(
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{},
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
handler.TypedEnqueueRequestForOwner[*appsv1.ReplicaSet](cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
),
)
Expect(err).NotTo(HaveOccurred())

err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}))
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.TypedEnqueueRequestForObject[*appsv1.Deployment]{}))
Expect(err).NotTo(HaveOccurred())

err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var _ = Describe("controller.Controller", func() {

ctx, cancel := context.WithCancel(context.Background())
watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan, Handler: &handler.EnqueueRequestForObject{}}
watch := source.Channel(watchChan, &handler.EnqueueRequestForObject{})
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func ExampleController() {
}

// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}))
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -108,7 +108,7 @@ func ExampleController_unstructured() {
Version: "v1",
})
// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.EnqueueRequestForObject{}))
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.TypedEnqueueRequestForObject[*unstructured.Unstructured]{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -139,7 +139,7 @@ func ExampleNewUnmanaged() {
os.Exit(1)
}

if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{})); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.TypedEnqueueRequestForObject[*corev1.Pod]{})); err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
}
Expand Down
51 changes: 34 additions & 17 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,55 @@ package event

import "sigs.k8s.io/controller-runtime/pkg/client"

// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated
// CreateEvent is an event where a Kubernetes object was created. CreateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by a handler.EventHandler.
type CreateEvent = TypedCreateEvent[client.Object]

// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type UpdateEvent = TypedUpdateEvent[client.Object]

// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type CreateEvent struct {
type DeleteEvent = TypedDeleteEvent[client.Object]

// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.EventHandler.
type GenericEvent = TypedGenericEvent[client.Object]

// TypedCreateEvent is an event where a Kubernetes object was created. TypedCreateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedCreateEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T
}

// UpdateEvent is an event where a Kubernetes object was updated. UpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type UpdateEvent struct {
// TypedUpdateEvent is an event where a Kubernetes object was updated. TypedUpdateEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedUpdateEvent[T any] struct {
// ObjectOld is the object from the event
ObjectOld client.Object
ObjectOld T

// ObjectNew is the object from the event
ObjectNew client.Object
ObjectNew T
}

// DeleteEvent is an event where a Kubernetes object was deleted. DeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.EventHandler.
type DeleteEvent struct {
// TypedDeleteEvent is an event where a Kubernetes object was deleted. TypedDeleteEvent should be generated
// by a source.Source and transformed into a reconcile.Request by an handler.TypedEventHandler.
type TypedDeleteEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T

// DeleteStateUnknown is true if the Delete event was missed but we identified the object
// as having been deleted.
DeleteStateUnknown bool
}

// GenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// GenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.EventHandler.
type GenericEvent struct {
// TypedGenericEvent is an event where the operation type is unknown (e.g. polling or event originating outside the cluster).
// TypedGenericEvent should be generated by a source.Source and transformed into a reconcile.Request by an
// handler.TypedEventHandler.
type TypedGenericEvent[T any] struct {
// Object is the object from the event
Object client.Object
Object T
}
41 changes: 30 additions & 11 deletions pkg/handler/enqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ package handler

import (
"context"
"reflect"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -33,13 +35,18 @@ type empty struct{}
var _ EventHandler = &EnqueueRequestForObject{}

// EnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all
// (e.g. the created / deleted / updated objects Name and Namespace). handler.EnqueueRequestForObject is used by almost all
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
type EnqueueRequestForObject struct{}
type EnqueueRequestForObject = TypedEnqueueRequestForObject[client.Object]

// TypedEnqueueRequestForObject enqueues a Request containing the Name and Namespace of the object that is the source of the Event.
// (e.g. the created / deleted / updated objects Name and Namespace). handler.TypedEnqueueRequestForObject is used by almost all
// Controllers that have associated Resources (e.g. CRDs) to reconcile the associated Resource.
type TypedEnqueueRequestForObject[T client.Object] struct{}

// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
func (e *TypedEnqueueRequestForObject[T]) Create(ctx context.Context, evt event.TypedCreateEvent[T], q workqueue.RateLimitingInterface) {
if isNil(evt.Object) {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
Expand All @@ -50,14 +57,14 @@ func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEv
}

// Update implements EventHandler.
func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) {
func (e *TypedEnqueueRequestForObject[T]) Update(ctx context.Context, evt event.TypedUpdateEvent[T], q workqueue.RateLimitingInterface) {
switch {
case evt.ObjectNew != nil:
case !isNil(evt.ObjectNew):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectNew.GetName(),
Namespace: evt.ObjectNew.GetNamespace(),
}})
case evt.ObjectOld != nil:
case !isNil(evt.ObjectOld):
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.ObjectOld.GetName(),
Namespace: evt.ObjectOld.GetNamespace(),
Expand All @@ -68,8 +75,8 @@ func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEv
}

// Delete implements EventHandler.
func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
func (e *TypedEnqueueRequestForObject[T]) Delete(ctx context.Context, evt event.TypedDeleteEvent[T], q workqueue.RateLimitingInterface) {
if isNil(evt.Object) {
enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt)
return
}
Expand All @@ -80,8 +87,8 @@ func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEv
}

// Generic implements EventHandler.
func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.GenericEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
func (e *TypedEnqueueRequestForObject[T]) Generic(ctx context.Context, evt event.TypedGenericEvent[T], q workqueue.RateLimitingInterface) {
if isNil(evt.Object) {
enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt)
return
}
Expand All @@ -90,3 +97,15 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic
Namespace: evt.Object.GetNamespace(),
}})
}

func isNil(arg any) bool {
if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr ||
v.Kind() == reflect.Interface ||
v.Kind() == reflect.Slice ||
v.Kind() == reflect.Map ||
v.Kind() == reflect.Chan ||
v.Kind() == reflect.Func) && v.IsNil()) {
return true
}
return false
}
Loading