feat(operator): Limit the number of concurrent scan jobs (#232)
Resolves: #202

Signed-off-by: Daniel Pacak <pacak.daniel@gmail.com>
danielpacak committed Oct 27, 2020
1 parent cb78a84 commit 50d179c
Showing 13 changed files with 427 additions and 322 deletions.
32 changes: 21 additions & 11 deletions cmd/starboard-operator/main.go
@@ -5,6 +5,8 @@ import (
"errors"
"fmt"

"github.com/aquasecurity/starboard/pkg/operator/controller"

"github.com/aquasecurity/starboard/pkg/vulnerabilityreport"

"github.com/aquasecurity/starboard/pkg/ext"
@@ -178,24 +180,32 @@ func run() error {
return err
}

analyzer := controller.NewAnalyzer(operatorConfig.Operator,
store,
mgr.GetClient())

reconciler := controller.NewReconciler(mgr.GetScheme(),
operatorConfig.Operator,
mgr.GetClient(),
store,
idGenerator,
scanner,
logs.NewReader(kubernetesClientset))

if err = (&pod.PodController{
Config: operatorConfig.Operator,
Client: mgr.GetClient(),
IDGenerator: idGenerator,
Store: store,
Scanner: scanner,
Scheme: mgr.GetScheme(),
Operator: operatorConfig.Operator,
Client: mgr.GetClient(),
Analyzer: analyzer,
Reconciler: reconciler,
}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to create pod controller: %w", err)
}

if err = (&job.JobController{
Config: operatorConfig.Operator,
LogsReader: logs.NewReader(kubernetesClientset),
Operator: operatorConfig.Operator,
Client: mgr.GetClient(),
Store: store,
Scanner: scanner,
Scheme: mgr.GetScheme(),
Analyzer: analyzer,
Reconciler: reconciler,
}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to create job controller: %w", err)
}
4 changes: 4 additions & 0 deletions deploy/kubectl/05-starboard-operator.deployment.yaml
@@ -38,6 +38,10 @@ spec:
value: "starboard-operator"
- name: OPERATOR_TARGET_NAMESPACES
value: "default"
- name: OPERATOR_CONCURRENT_SCAN_JOBS_LIMIT
value: "3"
- name: OPERATOR_SCAN_JOB_RETRY_AFTER
value: "30s"
- name: OPERATOR_SCANNER_TRIVY_ENABLED
value: "true"
- name: OPERATOR_SCANNER_TRIVY_VERSION
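The two new environment variables presumably map onto the operator's configuration struct, which is outside this diff. The following is only a sketch: `ConcurrentScanJobsLimit` and `Namespace` are field names confirmed by `analyzer.go` below, while the `ScanJobRetryAfter` field name and the struct tags are assumptions.

```go
// Hypothetical sketch of the relevant etc.Operator fields; not part of this diff.
package etc

import "time"

type Operator struct {
	// Namespace in which the operator runs and creates scan Jobs.
	Namespace string `env:"OPERATOR_NAMESPACE"`

	// ConcurrentScanJobsLimit caps how many scan Jobs may run at the same time;
	// the deployment manifest above sets it to 3.
	ConcurrentScanJobsLimit int `env:"OPERATOR_CONCURRENT_SCAN_JOBS_LIMIT"`

	// ScanJobRetryAfter (assumed name) is how long a reconciliation is pushed
	// back when the limit has been reached; the manifest sets it to 30s.
	ScanJobRetryAfter time.Duration `env:"OPERATOR_SCAN_JOB_RETRY_AFTER"`
}
```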
94 changes: 94 additions & 0 deletions pkg/operator/controller/analyzer.go
@@ -0,0 +1,94 @@
package controller

import (
"context"
"fmt"
"reflect"

"github.com/aquasecurity/starboard/pkg/kube"
"github.com/aquasecurity/starboard/pkg/operator/etc"
"github.com/aquasecurity/starboard/pkg/vulnerabilityreport"
batchv1 "k8s.io/api/batch/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type Analyzer interface {
HasVulnerabilityReports(ctx context.Context, owner kube.Object, images kube.ContainerImages, hash string) (bool, error)
GetActiveScanJob(ctx context.Context, owner kube.Object, hash string) (*batchv1.Job, error)
IsConcurrentScanJobsLimitExceeded(ctx context.Context) (bool, int, error)
}

func NewAnalyzer(config etc.Operator, store vulnerabilityreport.StoreInterface, client client.Client) Analyzer {
return &analyzer{
config: config,
store: store,
client: client,
}
}

type analyzer struct {
config etc.Operator
client client.Client
store vulnerabilityreport.StoreInterface
}

func (a *analyzer) HasVulnerabilityReports(ctx context.Context, owner kube.Object, images kube.ContainerImages, hash string) (bool, error) {
list, err := a.store.FindByOwner(ctx, owner)
if err != nil {
return false, err
}

actual := map[string]bool{}
for _, report := range list {
if containerName, ok := report.Labels[kube.LabelContainerName]; ok {
if hash == report.Labels[kube.LabelPodSpecHash] {
actual[containerName] = true
}
}
}

expected := map[string]bool{}
for containerName := range images {
expected[containerName] = true
}

return reflect.DeepEqual(actual, expected), nil
}

func (a *analyzer) GetActiveScanJob(ctx context.Context, owner kube.Object, hash string) (*batchv1.Job, error) {
jobList := &batchv1.JobList{}
err := a.client.List(ctx, jobList, client.MatchingLabels{
kube.LabelResourceNamespace: owner.Namespace,
kube.LabelResourceKind: string(owner.Kind),
kube.LabelResourceName: owner.Name,
kube.LabelPodSpecHash: hash,
}, client.InNamespace(a.config.Namespace))
if err != nil {
return nil, fmt.Errorf("listing scan jobs: %w", err)
}
if len(jobList.Items) > 0 {
return jobList.Items[0].DeepCopy(), nil
}
return nil, nil
}

func (a *analyzer) IsConcurrentScanJobsLimitExceeded(ctx context.Context) (bool, int, error) {
scanJobsCount, err := a.countScanJobs(ctx, a.config.Namespace)
if err != nil {
return false, 0, err
}

return scanJobsCount >= a.config.ConcurrentScanJobsLimit, scanJobsCount, nil
}

func (a *analyzer) countScanJobs(ctx context.Context, namespace string) (int, error) {
var scanJobs batchv1.JobList
err := a.client.List(ctx, &scanJobs, client.MatchingLabels{
"app.kubernetes.io/managed-by": "starboard-operator",
}, client.InNamespace(namespace))
if err != nil {
return 0, err
}

return len(scanJobs.Items), nil
}
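Neither the `PodController` changes nor any other caller of `IsConcurrentScanJobsLimitExceeded` appear in this excerpt, so the snippet below is only a sketch of how a workload controller might gate scan-job creation on the new limit; the helper name and the `retryAfter` parameter (fed from `OPERATOR_SCAN_JOB_RETRY_AFTER`) are hypothetical.

```go
// Hypothetical sketch; not part of this commit's diff.
package sketch

import (
	"context"
	"time"

	"github.com/aquasecurity/starboard/pkg/operator/controller"
	ctrl "sigs.k8s.io/controller-runtime"
)

// checkScanJobsLimit reports whether reconciliation should be requeued because
// the operator is already running the maximum number of scan Jobs.
func checkScanJobsLimit(ctx context.Context, analyzer controller.Analyzer, retryAfter time.Duration) (ctrl.Result, bool, error) {
	// The second return value is the current number of scan Jobs; a real
	// controller would log it for visibility.
	limitExceeded, _, err := analyzer.IsConcurrentScanJobsLimitExceeded(ctx)
	if err != nil {
		return ctrl.Result{}, false, err
	}
	if limitExceeded {
		// Back off; the JobController deletes completed scan Jobs and thereby
		// frees capacity for a later reconciliation.
		return ctrl.Result{RequeueAfter: retryAfter}, true, nil
	}
	return ctrl.Result{}, false, nil
}
```

Note that, as implemented in `IsConcurrentScanJobsLimitExceeded`, the limit counts as exceeded once the number of scan Jobs reaches it (`>=`), not only when it is surpassed.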
1 change: 1 addition & 0 deletions pkg/operator/controller/analyzer_test.go
@@ -0,0 +1 @@
package controller_test
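
The test file is added only as a stub in this commit. Below is a minimal sketch of a first test, assuming controller-runtime's fake client; it relies only on the `NewAnalyzer` signature and the `Namespace`/`ConcurrentScanJobsLimit` fields visible in `analyzer.go`, and passes a nil store because `IsConcurrentScanJobsLimitExceeded` never uses it.

```go
// Hypothetical sketch; not part of this commit's diff.
package controller_test

import (
	"context"
	"testing"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"

	"github.com/aquasecurity/starboard/pkg/operator/controller"
	"github.com/aquasecurity/starboard/pkg/operator/etc"
)

func TestAnalyzer_IsConcurrentScanJobsLimitExceeded(t *testing.T) {
	// One scan job already exists in the operator's namespace.
	scanJob := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "scan-vulnerabilityreport-abc",
			Namespace: "starboard-operator",
			Labels:    map[string]string{"app.kubernetes.io/managed-by": "starboard-operator"},
		},
	}
	fakeClient := fake.NewFakeClientWithScheme(scheme.Scheme, scanJob)

	analyzer := controller.NewAnalyzer(etc.Operator{
		Namespace:               "starboard-operator",
		ConcurrentScanJobsLimit: 1,
	}, nil, fakeClient)

	exceeded, count, err := analyzer.IsConcurrentScanJobsLimitExceeded(context.TODO())
	if err != nil {
		t.Fatal(err)
	}
	if !exceeded || count != 1 {
		t.Errorf("expected limit exceeded with 1 job, got exceeded=%v count=%d", exceeded, count)
	}
}
```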
33 changes: 0 additions & 33 deletions pkg/operator/controller/controllers.go

This file was deleted.

133 changes: 16 additions & 117 deletions pkg/operator/controller/job/job_controller.go
@@ -4,28 +4,18 @@ import (
"context"
"fmt"

appsv1 "k8s.io/api/apps/v1"
"k8s.io/api/batch/v1beta1"
"k8s.io/apimachinery/pkg/types"
"github.com/aquasecurity/starboard/pkg/operator/controller"

"github.com/aquasecurity/starboard/pkg/vulnerabilityreport"

"github.com/aquasecurity/starboard/pkg/operator/resources"
"github.com/aquasecurity/starboard/pkg/resources"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/aquasecurity/starboard/pkg/apis/aquasecurity/v1alpha1"
"github.com/aquasecurity/starboard/pkg/kube"
pods "github.com/aquasecurity/starboard/pkg/kube/pod"
"github.com/aquasecurity/starboard/pkg/operator/etc"
"github.com/aquasecurity/starboard/pkg/operator/logs"
"github.com/aquasecurity/starboard/pkg/operator/scanner"
corev1 "k8s.io/api/core/v1"

"github.com/aquasecurity/starboard/pkg/kube"
"k8s.io/apimachinery/pkg/api/errors"

batchv1 "k8s.io/api/batch/v1"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)
@@ -35,19 +25,17 @@ var (
)

type JobController struct {
Config etc.Operator
Client client.Client
LogsReader *logs.Reader
Scheme *runtime.Scheme
Scanner scanner.VulnerabilityScanner
Store vulnerabilityreport.StoreInterface
etc.Operator
client.Client
controller.Analyzer
controller.Reconciler
}

func (r *JobController) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background()
log := log.WithValues("job", req.NamespacedName)

if req.Namespace != r.Config.Namespace {
if req.Namespace != r.Namespace {
log.V(1).Info("Ignoring Job not managed by this operator")
return ctrl.Result{}, nil
}
@@ -81,7 +69,7 @@ func (r *JobController) Reconcile(req ctrl.Request) (ctrl.Result, error) {

func (r *JobController) processCompleteScanJob(ctx context.Context, scanJob *batchv1.Job) error {
log := log.WithValues("job", fmt.Sprintf("%s/%s", scanJob.Namespace, scanJob.Name))
workload, err := kube.ObjectFromLabelsSet(scanJob.Labels)
owner, err := kube.ObjectFromLabelsSet(scanJob.Labels)
if err != nil {
return fmt.Errorf("getting workload from scan job labels set: %w", err)
}
@@ -96,121 +84,32 @@ func (r *JobController) processCompleteScanJob(ctx context.Context, scanJob *bat
return fmt.Errorf("expected label %s not set", kube.LabelPodSpecHash)
}

hasVulnerabilityReports, err := r.Store.HasVulnerabilityReports(ctx, workload, hash, containerImages)
log.V(1).Info("Resolving workload properties",
"owner", owner, "hash", hash, "containerImages", containerImages)

hasVulnerabilityReports, err := r.HasVulnerabilityReports(ctx, owner, containerImages, hash)
if err != nil {
return err
}

if hasVulnerabilityReports {
log.V(1).Info("VulnerabilityReports already exist", "owner", workload)
log.V(1).Info("VulnerabilityReports already exist", "owner", owner)
log.V(1).Info("Deleting scan job")
return r.Client.Delete(ctx, scanJob, client.PropagationPolicy(metav1.DeletePropagationBackground))
}

owner, err := r.getRuntimeObjectFor(ctx, workload)
err = r.ParseLogsAndSaveVulnerabilityReports(ctx, scanJob, owner, containerImages, hash)
if err != nil {
return err
}

pod, err := r.GetPodControlledBy(ctx, scanJob)
if err != nil {
return fmt.Errorf("getting pod controlled by %s/%s: %w", scanJob.Namespace, scanJob.Name, err)
}

var vulnerabilityReports []v1alpha1.VulnerabilityReport

for _, container := range pod.Spec.Containers {
logsReader, err := r.LogsReader.GetLogsForPod(ctx, client.ObjectKey{Namespace: pod.Namespace, Name: pod.Name}, &corev1.PodLogOptions{
Container: container.Name,
Follow: true,
})
if err != nil {
return fmt.Errorf("getting logs for pod %s/%s: %w", pod.Namespace, pod.Name, err)
}
scanResult, err := r.Scanner.ParseVulnerabilityScanResult(containerImages[container.Name], logsReader)
if err != nil {
return err
}
_ = logsReader.Close()

reportName, err := vulnerabilityreport.NewNameBuilder(r.Scheme).
Owner(owner).
Container(container.Name).Get()
if err != nil {
return err
}

report, err := vulnerabilityreport.NewBuilder(r.Scheme).
Owner(owner).
Container(container.Name).
ReportName(reportName).
ScanResult(scanResult).
PodSpecHash(hash).Get()
if err != nil {
return err
}

vulnerabilityReports = append(vulnerabilityReports, report)
}

log.Info("Writing VulnerabilityReports", "owner", workload)
err = r.Store.Save(ctx, vulnerabilityReports)
if err != nil {
return fmt.Errorf("writing vulnerability reports: %w", err)
}
log.V(1).Info("Deleting complete scan job")
return r.Client.Delete(ctx, scanJob, client.PropagationPolicy(metav1.DeletePropagationBackground))
}

func (r *JobController) getRuntimeObjectFor(ctx context.Context, workload kube.Object) (metav1.Object, error) {
var obj runtime.Object
switch workload.Kind {
case kube.KindPod:
obj = &corev1.Pod{}
case kube.KindReplicaSet:
obj = &appsv1.ReplicaSet{}
case kube.KindReplicationController:
obj = &corev1.ReplicationController{}
case kube.KindDeployment:
obj = &appsv1.Deployment{}
case kube.KindStatefulSet:
obj = &appsv1.StatefulSet{}
case kube.KindDaemonSet:
obj = &appsv1.DaemonSet{}
case kube.KindCronJob:
obj = &v1beta1.CronJob{}
case kube.KindJob:
obj = &batchv1.Job{}
default:
return nil, fmt.Errorf("unknown workload kind: %s", workload.Kind)
}
err := r.Client.Get(ctx, types.NamespacedName{Name: workload.Name, Namespace: workload.Namespace}, obj)
if err != nil {
return nil, err
}
return obj.(metav1.Object), nil
}

func (r *JobController) GetPodControlledBy(ctx context.Context, job *batchv1.Job) (*corev1.Pod, error) {
controllerUID, ok := job.Spec.Selector.MatchLabels["controller-uid"]
if !ok {
return nil, fmt.Errorf("controller-uid not found for job %s/%s", job.Namespace, job.Name)
}
podList := &corev1.PodList{}
err := r.Client.List(ctx, podList, client.MatchingLabels{"controller-uid": controllerUID})
if err != nil {
return nil, fmt.Errorf("listing pods controlled by job %s/%s: %w", job.Namespace, job.Name, err)
}
if len(podList.Items) != 1 {
return nil, fmt.Errorf("expected 1 Pod, but got %d", len(podList.Items))
}
return podList.Items[0].DeepCopy(), nil
}

func (r *JobController) processFailedScanJob(ctx context.Context, scanJob *batchv1.Job) error {
log := log.WithValues("job", fmt.Sprintf("%s/%s", scanJob.Namespace, scanJob.Name))

pod, err := r.GetPodControlledBy(ctx, scanJob)
pod, err := r.Reconciler.GetPodControlledBy(ctx, scanJob)
if err != nil {
return err
}
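The new `controller.Reconciler` collaborator, to which the log-parsing and report-writing logic was moved, is not included in this excerpt. Judging only from the call sites above (`GetPodControlledBy`, `ParseLogsAndSaveVulnerabilityReports`) and the arguments passed to `NewReconciler` in `main.go`, its interface presumably looks roughly like the sketch below; the exact method set is an assumption, and the real type likely exposes more (e.g. submitting scan jobs for the `PodController`).

```go
// Hypothetical sketch inferred from the call sites in this diff;
// the real pkg/operator/controller/reconciler.go is not shown here.
package controller

import (
	"context"

	"github.com/aquasecurity/starboard/pkg/kube"
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
)

type Reconciler interface {
	// GetPodControlledBy resolves the Pod created for a scan Job via its
	// controller-uid selector (previously a JobController method).
	GetPodControlledBy(ctx context.Context, job *batchv1.Job) (*corev1.Pod, error)

	// ParseLogsAndSaveVulnerabilityReports reads scanner output from the scan
	// Job's Pod logs, builds a VulnerabilityReport per container image of the
	// owner workload, and persists them via the store.
	ParseLogsAndSaveVulnerabilityReports(ctx context.Context, scanJob *batchv1.Job, owner kube.Object, images kube.ContainerImages, hash string) error
}
```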
