Skip to content

Commit

Permalink
feat: Use WorkflowTemplate/ClusterWorkflowTemplate Informers when val…
Browse files Browse the repository at this point in the history
…idating CronWorkflows (#11470)

Signed-off-by: Julie Vogelman <julievogelman0@gmail.com>
  • Loading branch information
juliev0 authored and terrytangyuan committed Nov 27, 2023
1 parent d9a0797 commit 3201f61
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 32 deletions.
2 changes: 1 addition & 1 deletion examples/cluster-workflow-template/clustertemplates.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ kind: ClusterWorkflowTemplate
metadata:
name: cluster-workflow-template-submittable
spec:
entryPoint: whalesay-template
entrypoint: whalesay-template
arguments:
parameters:
- name: message
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (wfc *WorkflowController) runGCcontroller(ctx context.Context, workflowTTLW
func (wfc *WorkflowController) runCronController(ctx context.Context) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager)
cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager, wfc.wftmplInformer, wfc.cwftmplInformer)
cronController.Run(ctx)
}

Expand Down
42 changes: 36 additions & 6 deletions workflow/controller/informer/cluster_workflow_template_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,25 @@ import (
"k8s.io/apimachinery/pkg/runtime"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
"github.com/argoproj/argo-workflows/v3/workflow/util"
)

// this function always tries to return a value, even if it is badly formed
func objectToClusterWorkflowTemplate(object runtime.Object) (*wfv1.ClusterWorkflowTemplate, error) {
return interfaceToClusterWorkflowTemplate(object)
}

func objectsToClusterWorkflowTemplates(list []runtime.Object) []*wfv1.ClusterWorkflowTemplate {
ret := make([]*wfv1.ClusterWorkflowTemplate, len(list))
for i, object := range list {
ret[i], _ = objectToClusterWorkflowTemplate(object)
}
return ret
}

// this function always tries to return a value, even if it is badly formed
func interfaceToClusterWorkflowTemplate(object interface{}) (*wfv1.ClusterWorkflowTemplate, error) {
v := &wfv1.ClusterWorkflowTemplate{}
un, ok := object.(*unstructured.Unstructured)
if !ok {
Expand All @@ -25,10 +39,26 @@ func objectToClusterWorkflowTemplate(object runtime.Object) (*wfv1.ClusterWorkfl
return v, nil
}

func objectsToClusterWorkflowTemplates(list []runtime.Object) []*wfv1.ClusterWorkflowTemplate {
ret := make([]*wfv1.ClusterWorkflowTemplate, len(list))
for i, object := range list {
ret[i], _ = objectToClusterWorkflowTemplate(object)
// Get ClusterWorkflowTemplates from Informer
type ClusterWorkflowTemplateFromInformerGetter struct {
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
}

func (getter *ClusterWorkflowTemplateFromInformerGetter) Get(name string) (*wfv1.ClusterWorkflowTemplate, error) {
obj, exists, err := getter.cwftmplInformer.Informer().GetStore().GetByKey(name)
if err != nil {
return nil, err
}
return ret
if !exists {
return nil, fmt.Errorf("ClusterWorkflowTemplate Informer cannot find ClusterWorkflowTemplate of name %q", name)
}
cwfTmpl, err := interfaceToClusterWorkflowTemplate(obj)
if err != nil {
return nil, err
}
return cwfTmpl, nil
}

func NewClusterWorkflowTemplateFromInformerGetter(cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer) templateresolution.ClusterWorkflowTemplateGetter {
return &ClusterWorkflowTemplateFromInformerGetter{cwftmplInformer: cwftmplInformer}
}
41 changes: 36 additions & 5 deletions workflow/controller/informer/workflow_template_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,24 @@ import (
"k8s.io/apimachinery/pkg/runtime"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
"github.com/argoproj/argo-workflows/v3/workflow/util"
)

func objectToWorkflowTemplate(object runtime.Object) (*wfv1.WorkflowTemplate, error) {
return interfaceToWorkflowTemplate(object)
}

func objectsToWorkflowTemplates(list []runtime.Object) []*wfv1.WorkflowTemplate {
ret := make([]*wfv1.WorkflowTemplate, len(list))
for i, object := range list {
ret[i], _ = objectToWorkflowTemplate(object)
}
return ret
}

func interfaceToWorkflowTemplate(object interface{}) (*wfv1.WorkflowTemplate, error) {
v := &wfv1.WorkflowTemplate{}
un, ok := object.(*unstructured.Unstructured)
if !ok {
Expand All @@ -24,10 +38,27 @@ func objectToWorkflowTemplate(object runtime.Object) (*wfv1.WorkflowTemplate, er
return v, nil
}

func objectsToWorkflowTemplates(list []runtime.Object) []*wfv1.WorkflowTemplate {
ret := make([]*wfv1.WorkflowTemplate, len(list))
for i, object := range list {
ret[i], _ = objectToWorkflowTemplate(object)
// Get WorkflowTemplates from Informer
type WorkflowTemplateFromInformerGetter struct {
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
namespace string
}

func (getter *WorkflowTemplateFromInformerGetter) Get(name string) (*wfv1.WorkflowTemplate, error) {

obj, exists, err := getter.wftmplInformer.Informer().GetStore().GetByKey(getter.namespace + "/" + name)
if err != nil {
return nil, err
}
return ret
if !exists {
return nil, fmt.Errorf("WorkflowTemplate Informer cannot find WorkflowTemplate of name %q in namespace %q", name, getter.namespace)
}
wfTmpl, err := interfaceToWorkflowTemplate(obj)
if err != nil {
return nil, err
}
return wfTmpl, nil
}
func NewWorkflowTemplateFromInformerGetter(wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer, namespace string) templateresolution.WorkflowTemplateNamespacedGetter {
return &WorkflowTemplateFromInformerGetter{wftmplInformer: wftmplInformer, namespace: namespace}
}
12 changes: 9 additions & 3 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/env"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/events"
Expand All @@ -44,6 +45,8 @@ type Controller struct {
wfClientset versioned.Interface
wfLister util.WorkflowLister
cronWfInformer informers.GenericInformer
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
cronWfQueue workqueue.RateLimitingInterface
dynamicInterface dynamic.Interface
metrics *metrics.Metrics
Expand All @@ -68,7 +71,8 @@ func init() {
log.WithField("cronSyncPeriod", cronSyncPeriod).Info("cron config")
}

func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager) *Controller {
func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics,
eventRecorderManager events.EventRecorderManager, wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer, cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer) *Controller {
return &Controller{
wfClientset: wfclientset,
namespace: namespace,
Expand All @@ -80,6 +84,8 @@ func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic
cronWfQueue: metrics.RateLimiterWithBusyWorkers(workqueue.DefaultControllerRateLimiter(), "cron_wf_queue"),
metrics: metrics,
eventRecorderManager: eventRecorderManager,
wftmplInformer: wftmplInformer,
cwftmplInformer: cwftmplInformer,
}
}

Expand Down Expand Up @@ -163,7 +169,7 @@ func (cc *Controller) processNextCronItem(ctx context.Context) bool {
return true
}

cronWorkflowOperationCtx := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics)
cronWorkflowOperationCtx := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics, cc.wftmplInformer, cc.cwftmplInformer)

err = cronWorkflowOperationCtx.validateCronWorkflow()
if err != nil {
Expand Down Expand Up @@ -257,7 +263,7 @@ func (cc *Controller) syncCronWorkflow(ctx context.Context, cronWf *v1alpha1.Cro
cc.keyLock.Lock(key)
defer cc.keyLock.Unlock(key)

cwoc := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics)
cwoc := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics, cc.wftmplInformer, cc.cwftmplInformer)
err := cwoc.enforceHistoryLimit(ctx, workflows)
if err != nil {
return err
Expand Down
39 changes: 23 additions & 16 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,42 @@ import (
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
typed "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
errorsutil "github.com/argoproj/argo-workflows/v3/util/errors"
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/metrics"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"

"github.com/argoproj/argo-workflows/v3/workflow/controller/informer"
"github.com/argoproj/argo-workflows/v3/workflow/util"
"github.com/argoproj/argo-workflows/v3/workflow/validate"
)

type cronWfOperationCtx struct {
// CronWorkflow is the CronWorkflow to be run
name string
cronWf *v1alpha1.CronWorkflow
wfClientset versioned.Interface
wfClient typed.WorkflowInterface
cronWfIf typed.CronWorkflowInterface
log *log.Entry
metrics *metrics.Metrics
name string
cronWf *v1alpha1.CronWorkflow
wfClientset versioned.Interface
wfClient typed.WorkflowInterface
cronWfIf typed.CronWorkflowInterface
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
log *log.Entry
metrics *metrics.Metrics
// scheduledTimeFunc returns the last scheduled time when it is called
scheduledTimeFunc ScheduledTimeFunc
}

func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, metrics *metrics.Metrics) *cronWfOperationCtx {
func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, metrics *metrics.Metrics,
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer, cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer) *cronWfOperationCtx {
return &cronWfOperationCtx{
name: cronWorkflow.ObjectMeta.Name,
cronWf: cronWorkflow,
wfClientset: wfClientset,
wfClient: wfClientset.ArgoprojV1alpha1().Workflows(cronWorkflow.Namespace),
cronWfIf: wfClientset.ArgoprojV1alpha1().CronWorkflows(cronWorkflow.Namespace),
name: cronWorkflow.ObjectMeta.Name,
cronWf: cronWorkflow,
wfClientset: wfClientset,
wfClient: wfClientset.ArgoprojV1alpha1().Workflows(cronWorkflow.Namespace),
cronWfIf: wfClientset.ArgoprojV1alpha1().CronWorkflows(cronWorkflow.Namespace),
wftmplInformer: wftmplInformer,
cwftmplInformer: cwftmplInformer,
log: log.WithFields(log.Fields{
"workflow": cronWorkflow.ObjectMeta.Name,
"namespace": cronWorkflow.ObjectMeta.Namespace,
Expand Down Expand Up @@ -109,8 +116,8 @@ func (woc *cronWfOperationCtx) run(ctx context.Context, scheduledRuntime time.Ti
}

func (woc *cronWfOperationCtx) validateCronWorkflow() error {
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(woc.wfClientset.ArgoprojV1alpha1().WorkflowTemplates(woc.cronWf.Namespace))
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(woc.wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates())
wftmplGetter := informer.NewWorkflowTemplateFromInformerGetter(woc.wftmplInformer, woc.cronWf.ObjectMeta.Namespace)
cwftmplGetter := informer.NewClusterWorkflowTemplateFromInformerGetter(woc.cwftmplInformer)
err := validate.ValidateCronWorkflow(wftmplGetter, cwftmplGetter, woc.cronWf)
if err != nil {
woc.reportCronWorkflowError(v1alpha1.ConditionTypeSpecError, fmt.Sprint(err))
Expand Down

0 comments on commit 3201f61

Please sign in to comment.