From 3201f61fba1a11147a55e57e57972c3df5758cc7 Mon Sep 17 00:00:00 2001 From: Julie Vogelman Date: Fri, 28 Jul 2023 18:56:54 -0700 Subject: [PATCH] feat: Use WorkflowTemplate/ClusterWorkflowTemplate Informers when validating CronWorkflows (#11470) Signed-off-by: Julie Vogelman --- .../clustertemplates.yaml | 2 +- workflow/controller/controller.go | 2 +- .../cluster_workflow_template_convert.go | 42 ++++++++++++++++--- .../informer/workflow_template_convert.go | 41 +++++++++++++++--- workflow/cron/controller.go | 12 ++++-- workflow/cron/operator.go | 39 ++++++++++------- 6 files changed, 106 insertions(+), 32 deletions(-) diff --git a/examples/cluster-workflow-template/clustertemplates.yaml b/examples/cluster-workflow-template/clustertemplates.yaml index a2c4db448f0d..a3a13f75381c 100644 --- a/examples/cluster-workflow-template/clustertemplates.yaml +++ b/examples/cluster-workflow-template/clustertemplates.yaml @@ -135,7 +135,7 @@ kind: ClusterWorkflowTemplate metadata: name: cluster-workflow-template-submittable spec: - entryPoint: whalesay-template + entrypoint: whalesay-template arguments: parameters: - name: message diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index dad79b620317..956b67018896 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -221,7 +221,7 @@ func (wfc *WorkflowController) runGCcontroller(ctx context.Context, workflowTTLW func (wfc *WorkflowController) runCronController(ctx context.Context) { defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...) - cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager) + cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager, wfc.wftmplInformer, wfc.cwftmplInformer) cronController.Run(ctx) } diff --git a/workflow/controller/informer/cluster_workflow_template_convert.go b/workflow/controller/informer/cluster_workflow_template_convert.go index 9211c527511b..2c08f1a3e3f9 100644 --- a/workflow/controller/informer/cluster_workflow_template_convert.go +++ b/workflow/controller/informer/cluster_workflow_template_convert.go @@ -8,11 +8,25 @@ import ( "k8s.io/apimachinery/pkg/runtime" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/workflow/templateresolution" "github.com/argoproj/argo-workflows/v3/workflow/util" ) -// this function always tries to return a value, even if it is badly formed func objectToClusterWorkflowTemplate(object runtime.Object) (*wfv1.ClusterWorkflowTemplate, error) { + return interfaceToClusterWorkflowTemplate(object) +} + +func objectsToClusterWorkflowTemplates(list []runtime.Object) []*wfv1.ClusterWorkflowTemplate { + ret := make([]*wfv1.ClusterWorkflowTemplate, len(list)) + for i, object := range list { + ret[i], _ = objectToClusterWorkflowTemplate(object) + } + return ret +} + +// this function always tries to return a value, even if it is badly formed +func interfaceToClusterWorkflowTemplate(object interface{}) (*wfv1.ClusterWorkflowTemplate, error) { v := &wfv1.ClusterWorkflowTemplate{} un, ok := object.(*unstructured.Unstructured) if !ok { @@ -25,10 +39,26 @@ func objectToClusterWorkflowTemplate(object runtime.Object) (*wfv1.ClusterWorkfl return v, nil } -func objectsToClusterWorkflowTemplates(list []runtime.Object) []*wfv1.ClusterWorkflowTemplate { - ret := make([]*wfv1.ClusterWorkflowTemplate, len(list)) - for i, object := range list { - ret[i], _ = objectToClusterWorkflowTemplate(object) +// Get ClusterWorkflowTemplates from Informer +type ClusterWorkflowTemplateFromInformerGetter struct { + cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer +} + +func (getter *ClusterWorkflowTemplateFromInformerGetter) Get(name string) (*wfv1.ClusterWorkflowTemplate, error) { + obj, exists, err := getter.cwftmplInformer.Informer().GetStore().GetByKey(name) + if err != nil { + return nil, err } - return ret + if !exists { + return nil, fmt.Errorf("ClusterWorkflowTemplate Informer cannot find ClusterWorkflowTemplate of name %q", name) + } + cwfTmpl, err := interfaceToClusterWorkflowTemplate(obj) + if err != nil { + return nil, err + } + return cwfTmpl, nil +} + +func NewClusterWorkflowTemplateFromInformerGetter(cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer) templateresolution.ClusterWorkflowTemplateGetter { + return &ClusterWorkflowTemplateFromInformerGetter{cwftmplInformer: cwftmplInformer} } diff --git a/workflow/controller/informer/workflow_template_convert.go b/workflow/controller/informer/workflow_template_convert.go index 18befaf3ca93..38b5c5949484 100644 --- a/workflow/controller/informer/workflow_template_convert.go +++ b/workflow/controller/informer/workflow_template_convert.go @@ -8,10 +8,24 @@ import ( "k8s.io/apimachinery/pkg/runtime" wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" + wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1" + "github.com/argoproj/argo-workflows/v3/workflow/templateresolution" "github.com/argoproj/argo-workflows/v3/workflow/util" ) func objectToWorkflowTemplate(object runtime.Object) (*wfv1.WorkflowTemplate, error) { + return interfaceToWorkflowTemplate(object) +} + +func objectsToWorkflowTemplates(list []runtime.Object) []*wfv1.WorkflowTemplate { + ret := make([]*wfv1.WorkflowTemplate, len(list)) + for i, object := range list { + ret[i], _ = objectToWorkflowTemplate(object) + } + return ret +} + +func interfaceToWorkflowTemplate(object interface{}) (*wfv1.WorkflowTemplate, error) { v := &wfv1.WorkflowTemplate{} un, ok := object.(*unstructured.Unstructured) if !ok { @@ -24,10 +38,27 @@ func objectToWorkflowTemplate(object runtime.Object) (*wfv1.WorkflowTemplate, er return v, nil } -func objectsToWorkflowTemplates(list []runtime.Object) []*wfv1.WorkflowTemplate { - ret := make([]*wfv1.WorkflowTemplate, len(list)) - for i, object := range list { - ret[i], _ = objectToWorkflowTemplate(object) +// Get WorkflowTemplates from Informer +type WorkflowTemplateFromInformerGetter struct { + wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer + namespace string +} + +func (getter *WorkflowTemplateFromInformerGetter) Get(name string) (*wfv1.WorkflowTemplate, error) { + + obj, exists, err := getter.wftmplInformer.Informer().GetStore().GetByKey(getter.namespace + "/" + name) + if err != nil { + return nil, err } - return ret + if !exists { + return nil, fmt.Errorf("WorkflowTemplate Informer cannot find WorkflowTemplate of name %q in namespace %q", name, getter.namespace) + } + wfTmpl, err := interfaceToWorkflowTemplate(obj) + if err != nil { + return nil, err + } + return wfTmpl, nil +} +func NewWorkflowTemplateFromInformerGetter(wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer, namespace string) templateresolution.WorkflowTemplateNamespacedGetter { + return &WorkflowTemplateFromInformerGetter{wftmplInformer: wftmplInformer, namespace: namespace} } diff --git a/workflow/cron/controller.go b/workflow/cron/controller.go index ac967a2df22a..26c1fb1b34ef 100644 --- a/workflow/cron/controller.go +++ b/workflow/cron/controller.go @@ -27,6 +27,7 @@ import ( "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" + wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/util/env" "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/events" @@ -44,6 +45,8 @@ type Controller struct { wfClientset versioned.Interface wfLister util.WorkflowLister cronWfInformer informers.GenericInformer + wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer + cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer cronWfQueue workqueue.RateLimitingInterface dynamicInterface dynamic.Interface metrics *metrics.Metrics @@ -68,7 +71,8 @@ func init() { log.WithField("cronSyncPeriod", cronSyncPeriod).Info("cron config") } -func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager) *Controller { +func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, + eventRecorderManager events.EventRecorderManager, wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer, cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer) *Controller { return &Controller{ wfClientset: wfclientset, namespace: namespace, @@ -80,6 +84,8 @@ func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic cronWfQueue: metrics.RateLimiterWithBusyWorkers(workqueue.DefaultControllerRateLimiter(), "cron_wf_queue"), metrics: metrics, eventRecorderManager: eventRecorderManager, + wftmplInformer: wftmplInformer, + cwftmplInformer: cwftmplInformer, } } @@ -163,7 +169,7 @@ func (cc *Controller) processNextCronItem(ctx context.Context) bool { return true } - cronWorkflowOperationCtx := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics) + cronWorkflowOperationCtx := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics, cc.wftmplInformer, cc.cwftmplInformer) err = cronWorkflowOperationCtx.validateCronWorkflow() if err != nil { @@ -257,7 +263,7 @@ func (cc *Controller) syncCronWorkflow(ctx context.Context, cronWf *v1alpha1.Cro cc.keyLock.Lock(key) defer cc.keyLock.Unlock(key) - cwoc := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics) + cwoc := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics, cc.wftmplInformer, cc.cwftmplInformer) err := cwoc.enforceHistoryLimit(ctx, workflows) if err != nil { return err diff --git a/workflow/cron/operator.go b/workflow/cron/operator.go index 631da5272823..0084f9095fc5 100644 --- a/workflow/cron/operator.go +++ b/workflow/cron/operator.go @@ -18,35 +18,42 @@ import ( "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned" typed "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1" + wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1" errorsutil "github.com/argoproj/argo-workflows/v3/util/errors" waitutil "github.com/argoproj/argo-workflows/v3/util/wait" "github.com/argoproj/argo-workflows/v3/workflow/common" "github.com/argoproj/argo-workflows/v3/workflow/metrics" - "github.com/argoproj/argo-workflows/v3/workflow/templateresolution" + + "github.com/argoproj/argo-workflows/v3/workflow/controller/informer" "github.com/argoproj/argo-workflows/v3/workflow/util" "github.com/argoproj/argo-workflows/v3/workflow/validate" ) type cronWfOperationCtx struct { // CronWorkflow is the CronWorkflow to be run - name string - cronWf *v1alpha1.CronWorkflow - wfClientset versioned.Interface - wfClient typed.WorkflowInterface - cronWfIf typed.CronWorkflowInterface - log *log.Entry - metrics *metrics.Metrics + name string + cronWf *v1alpha1.CronWorkflow + wfClientset versioned.Interface + wfClient typed.WorkflowInterface + cronWfIf typed.CronWorkflowInterface + wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer + cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer + log *log.Entry + metrics *metrics.Metrics // scheduledTimeFunc returns the last scheduled time when it is called scheduledTimeFunc ScheduledTimeFunc } -func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, metrics *metrics.Metrics) *cronWfOperationCtx { +func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, metrics *metrics.Metrics, + wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer, cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer) *cronWfOperationCtx { return &cronWfOperationCtx{ - name: cronWorkflow.ObjectMeta.Name, - cronWf: cronWorkflow, - wfClientset: wfClientset, - wfClient: wfClientset.ArgoprojV1alpha1().Workflows(cronWorkflow.Namespace), - cronWfIf: wfClientset.ArgoprojV1alpha1().CronWorkflows(cronWorkflow.Namespace), + name: cronWorkflow.ObjectMeta.Name, + cronWf: cronWorkflow, + wfClientset: wfClientset, + wfClient: wfClientset.ArgoprojV1alpha1().Workflows(cronWorkflow.Namespace), + cronWfIf: wfClientset.ArgoprojV1alpha1().CronWorkflows(cronWorkflow.Namespace), + wftmplInformer: wftmplInformer, + cwftmplInformer: cwftmplInformer, log: log.WithFields(log.Fields{ "workflow": cronWorkflow.ObjectMeta.Name, "namespace": cronWorkflow.ObjectMeta.Namespace, @@ -109,8 +116,8 @@ func (woc *cronWfOperationCtx) run(ctx context.Context, scheduledRuntime time.Ti } func (woc *cronWfOperationCtx) validateCronWorkflow() error { - wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(woc.wfClientset.ArgoprojV1alpha1().WorkflowTemplates(woc.cronWf.Namespace)) - cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(woc.wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates()) + wftmplGetter := informer.NewWorkflowTemplateFromInformerGetter(woc.wftmplInformer, woc.cronWf.ObjectMeta.Namespace) + cwftmplGetter := informer.NewClusterWorkflowTemplateFromInformerGetter(woc.cwftmplInformer) err := validate.ValidateCronWorkflow(wftmplGetter, cwftmplGetter, woc.cronWf) if err != nil { woc.reportCronWorkflowError(v1alpha1.ConditionTypeSpecError, fmt.Sprint(err))