Skip to content

Commit

Permalink
refactor(operator): Use predicates to filter out events added to a wo…
Browse files Browse the repository at this point in the history
…rker queue (#352)

Signed-off-by: Daniel Pacak <pacak.daniel@gmail.com>
  • Loading branch information
danielpacak committed Jan 19, 2021
1 parent 0d45462 commit 3a548ef
Show file tree
Hide file tree
Showing 7 changed files with 365 additions and 68 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ itests-starboard-operator: check-env get-ginkgo
--v \
-coverprofile=coverage.txt \
-coverpkg=github.com/aquasecurity/starboard/pkg/operator,\
github.com/aquasecurity/starboard/pkg/operator/predicate,\
github.com/aquasecurity/starboard/pkg/operator/controller,\
github.com/aquasecurity/starboard/pkg/operator/controller/job,\
github.com/aquasecurity/starboard/pkg/operator/controller/pod,\
Expand Down
28 changes: 10 additions & 18 deletions pkg/operator/controller/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,17 @@ import (
"context"
"fmt"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aquasecurity/starboard/pkg/operator/controller"

"github.com/aquasecurity/starboard/pkg/kube"
pods "github.com/aquasecurity/starboard/pkg/kube/pod"
"github.com/aquasecurity/starboard/pkg/operator/controller"
"github.com/aquasecurity/starboard/pkg/operator/etc"
"github.com/aquasecurity/starboard/pkg/operator/predicate"
"github.com/aquasecurity/starboard/pkg/resources"
"k8s.io/apimachinery/pkg/api/errors"

batchv1 "k8s.io/api/batch/v1"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -34,11 +32,6 @@ type JobController struct {
func (r *JobController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := log.WithValues("job", req.NamespacedName)

if req.Namespace != r.Namespace {
log.V(1).Info("Ignoring Job not managed by this operator")
return ctrl.Result{}, nil
}

job := &batchv1.Job{}
err := r.Client.Get(ctx, req.NamespacedName, job)
if err != nil {
Expand All @@ -49,11 +42,6 @@ func (r *JobController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return ctrl.Result{}, fmt.Errorf("getting job from cache: %w", err)
}

if len(job.Status.Conditions) == 0 {
log.V(1).Info("Ignoring Job without status conditions")
return ctrl.Result{}, nil
}

switch jobCondition := job.Status.Conditions[0].Type; jobCondition {
case batchv1.JobComplete:
err = r.processCompleteScanJob(ctx, job)
Expand Down Expand Up @@ -125,6 +113,10 @@ func (r *JobController) processFailedScanJob(ctx context.Context, scanJob *batch

func (r *JobController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&batchv1.Job{}).
For(&batchv1.Job{}, builder.WithPredicates(
predicate.InNamespace(r.Config.Namespace),
predicate.ManagedByStarboardOperator,
predicate.JobHasAnyCondition,
)).
Complete(r)
}
44 changes: 10 additions & 34 deletions pkg/operator/controller/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ import (
"fmt"

"github.com/aquasecurity/starboard/pkg/operator/controller"

"k8s.io/apimachinery/pkg/types"

"github.com/aquasecurity/starboard/pkg/resources"

"github.com/aquasecurity/starboard/pkg/operator/etc"
. "github.com/aquasecurity/starboard/pkg/operator/predicate"
"github.com/aquasecurity/starboard/pkg/resources"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -38,6 +37,7 @@ func (r *PodController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return ctrl.Result{}, fmt.Errorf("getting install mode: %w", err)
}

// TODO Consider using a predicate
if r.IgnorePodInOperatorNamespace(installMode, req.NamespacedName) {
log.V(1).Info("Ignoring Pod run in the operator namespace")
return ctrl.Result{}, nil
Expand All @@ -53,24 +53,6 @@ func (r *PodController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return ctrl.Result{}, fmt.Errorf("getting pod from cache: %w", err)
}

// Check if the Pod is managed by the operator, i.e. is controlled by a scan Job created by the PodController.
if IsPodManagedByStarboardOperator(pod) {
log.V(1).Info("Ignoring Pod managed by this operator")
return ctrl.Result{}, nil
}

// Check if the Pod is being terminated.
if pod.DeletionTimestamp != nil {
log.V(1).Info("Ignoring Pod that is being terminated")
return ctrl.Result{}, nil
}

// Check if the Pod containers are ready.
if !resources.HasContainersReadyCondition(pod) {
log.V(1).Info("Ignoring Pod that is being scheduled")
return ctrl.Result{}, nil
}

owner := resources.GetImmediateOwnerReference(pod)
containerImages := resources.GetContainerImagesFromPodSpec(pod.Spec)
hash := resources.ComputeHash(pod.Spec)
Expand Down Expand Up @@ -143,19 +125,13 @@ func (r *PodController) IgnorePodInOperatorNamespace(installMode etc.InstallMode
return false
}

// IsPodManagedByStarboardOperator returns true if the specified Pod
// is managed by the Starboard Operator, false otherwise.
//
// We define managed Pods as ones controlled by Jobs created by the Starboard Operator.
// They're labeled with `app.kubernetes.io/managed-by=starboard-operator`.
func IsPodManagedByStarboardOperator(pod *corev1.Pod) bool {
managedBy, exists := pod.Labels["app.kubernetes.io/managed-by"]
return exists && managedBy == "starboard-operator"
}

func (r *PodController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&corev1.Pod{}).
For(&corev1.Pod{}, builder.WithPredicates(
Not(ManagedByStarboardOperator),
Not(PodBeingTerminated),
PodHasContainersReadyCondition,
)).
Complete(r)
}

Expand Down
78 changes: 78 additions & 0 deletions pkg/operator/predicate/predicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package predicate

import (
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// InNamespace is a predicate.Predicate that returns true if the
// specified client.Object is in the desired namespace.
var InNamespace = func(namespace string) predicate.Predicate {
return predicate.NewPredicateFuncs(func(obj client.Object) bool {
return namespace == obj.GetNamespace()
})
}

// ManagedByStarboardOperator is a predicate.Predicate that returns true if the
// specified client.Object is managed by the Starboard Operator.
//
// For example, pods controlled by jobs scheduled by Starboard Operator are
// labeled with `app.kubernetes.io/managed-by=starboard-operator`.
var ManagedByStarboardOperator = predicate.NewPredicateFuncs(func(obj client.Object) bool {
if managedBy, ok := obj.GetLabels()["app.kubernetes.io/managed-by"]; ok {
return managedBy == "starboard-operator"
}
return false
})

// PodHasContainersReadyCondition is a predicate.Predicate that returns true if the
// specified client.Object is a corev1.Pod with corev1.ContainersReady condition.
var PodHasContainersReadyCondition = predicate.NewPredicateFuncs(func(obj client.Object) bool {
if pod, ok := obj.(*corev1.Pod); ok {
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.ContainersReady {
return true
}
}
}
return false
})

// PodBeingTerminated is a predicate.Predicate that returns true if the specified
// client.Object is a corev1.Pod that is being terminated, i.e. its
// DeletionTimestamp property is set to non nil value.
var PodBeingTerminated = predicate.NewPredicateFuncs(func(obj client.Object) bool {
if pod, ok := obj.(*corev1.Pod); ok {
return pod.DeletionTimestamp != nil
}
return false
})

// JobHasConditions is a predicate.Predicate that returns true if the
// specified client.Object is a batchv1.Job with any batchv1.JobConditionType.
var JobHasAnyCondition = predicate.NewPredicateFuncs(func(obj client.Object) bool {
if job, ok := obj.(*batchv1.Job); ok {
return len(job.Status.Conditions) > 0
}
return false
})

func Not(p predicate.Predicate) predicate.Predicate {
return predicate.Funcs{
CreateFunc: func(event event.CreateEvent) bool {
return !p.Create(event)
},
DeleteFunc: func(event event.DeleteEvent) bool {
return !p.Delete(event)
},
UpdateFunc: func(event event.UpdateEvent) bool {
return !p.Update(event)
},
GenericFunc: func(event event.GenericEvent) bool {
return !p.Generic(event)
},
}
}
20 changes: 20 additions & 0 deletions pkg/operator/predicate/predicate_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package predicate_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

func TestPredicate(t *testing.T) {
RegisterFailHandler(Fail)
suiteName := "Predicate Suite"
RunSpecs(t, suiteName)
}

var _ = BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))
})

0 comments on commit 3a548ef

Please sign in to comment.