Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Use WorkflowTemplate/ClusterWorkflowTemplate Informers when validating CronWorkflows #11470

Merged
merged 9 commits into from
Jul 29, 2023
2 changes: 1 addition & 1 deletion examples/cluster-workflow-template/clustertemplates.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ kind: ClusterWorkflowTemplate
metadata:
name: cluster-workflow-template-submittable
spec:
entryPoint: whalesay-template
entrypoint: whalesay-template
arguments:
parameters:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not related but this is a typo

- name: message
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (wfc *WorkflowController) runGCcontroller(ctx context.Context, workflowTTLW
func (wfc *WorkflowController) runCronController(ctx context.Context) {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager)
cronController := cron.NewCronController(wfc.wfclientset, wfc.dynamicInterface, wfc.namespace, wfc.GetManagedNamespace(), wfc.Config.InstanceID, wfc.metrics, wfc.eventRecorderManager, wfc.wftmplInformer, wfc.cwftmplInformer)
cronController.Run(ctx)
}

Expand Down
41 changes: 35 additions & 6 deletions workflow/controller/informer/cluster_workflow_template_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,25 @@ import (
"k8s.io/apimachinery/pkg/runtime"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
"github.com/argoproj/argo-workflows/v3/workflow/util"
)

// this function always tries to return a value, even if it is badly formed
func objectToClusterWorkflowTemplate(object runtime.Object) (*wfv1.ClusterWorkflowTemplate, error) {
return interfaceToClusterWorkflowTemplate(object)
}

func objectsToClusterWorkflowTemplates(list []runtime.Object) []*wfv1.ClusterWorkflowTemplate {
ret := make([]*wfv1.ClusterWorkflowTemplate, len(list))
for i, object := range list {
ret[i], _ = objectToClusterWorkflowTemplate(object)
}
return ret
}

// this function always tries to return a value, even if it is badly formed
func interfaceToClusterWorkflowTemplate(object interface{}) (*wfv1.ClusterWorkflowTemplate, error) {
v := &wfv1.ClusterWorkflowTemplate{}
un, ok := object.(*unstructured.Unstructured)
if !ok {
Expand All @@ -25,10 +39,25 @@ func objectToClusterWorkflowTemplate(object runtime.Object) (*wfv1.ClusterWorkfl
return v, nil
}

func objectsToClusterWorkflowTemplates(list []runtime.Object) []*wfv1.ClusterWorkflowTemplate {
ret := make([]*wfv1.ClusterWorkflowTemplate, len(list))
for i, object := range list {
ret[i], _ = objectToClusterWorkflowTemplate(object)
type ClusterWorkflowTemplateFromInformerGetter struct {
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
}

func (getter *ClusterWorkflowTemplateFromInformerGetter) Get(name string) (*wfv1.ClusterWorkflowTemplate, error) {
obj, exists, err := getter.cwftmplInformer.Informer().GetStore().GetByKey(name)
if err != nil {
return nil, err
}
return ret
if !exists {
return nil, fmt.Errorf("ClusterWorkflowTemplate Informer cannot find ClusterWorkflowTemplate of name %q", name)
}
cwfTmpl, err := interfaceToClusterWorkflowTemplate(obj)
if err != nil {
return nil, err
}
return cwfTmpl, nil
}

func NewClusterWorkflowTemplateFromInformerGetter(cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer) templateresolution.ClusterWorkflowTemplateGetter {
return &ClusterWorkflowTemplateFromInformerGetter{cwftmplInformer: cwftmplInformer}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me know if there's a better place to put this struct...

41 changes: 36 additions & 5 deletions workflow/controller/informer/workflow_template_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,24 @@ import (
"k8s.io/apimachinery/pkg/runtime"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
"github.com/argoproj/argo-workflows/v3/workflow/util"
)

func objectToWorkflowTemplate(object runtime.Object) (*wfv1.WorkflowTemplate, error) {
return interfaceToWorkflowTemplate(object)
}

func objectsToWorkflowTemplates(list []runtime.Object) []*wfv1.WorkflowTemplate {
ret := make([]*wfv1.WorkflowTemplate, len(list))
for i, object := range list {
ret[i], _ = objectToWorkflowTemplate(object)
}
return ret
}

func interfaceToWorkflowTemplate(object interface{}) (*wfv1.WorkflowTemplate, error) {
v := &wfv1.WorkflowTemplate{}
un, ok := object.(*unstructured.Unstructured)
if !ok {
Expand All @@ -24,10 +38,27 @@ func objectToWorkflowTemplate(object runtime.Object) (*wfv1.WorkflowTemplate, er
return v, nil
}

func objectsToWorkflowTemplates(list []runtime.Object) []*wfv1.WorkflowTemplate {
ret := make([]*wfv1.WorkflowTemplate, len(list))
for i, object := range list {
ret[i], _ = objectToWorkflowTemplate(object)
// Get WorkflowTemplates from Informer
type WorkflowTemplateFromInformerGetter struct {
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
namespace string
}

func (getter *WorkflowTemplateFromInformerGetter) Get(name string) (*wfv1.WorkflowTemplate, error) {

obj, exists, err := getter.wftmplInformer.Informer().GetStore().GetByKey(getter.namespace + "/" + name)
if err != nil {
return nil, err
}
return ret
if !exists {
return nil, fmt.Errorf("WorkflowTemplate Informer cannot find WorkflowTemplate of name %q in namespace %q", name, getter.namespace)
}
wfTmpl, err := interfaceToWorkflowTemplate(obj)
if err != nil {
return nil, err
}
return wfTmpl, nil
}
func NewWorkflowTemplateFromInformerGetter(wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer, namespace string) templateresolution.WorkflowTemplateNamespacedGetter {
return &WorkflowTemplateFromInformerGetter{wftmplInformer: wftmplInformer, namespace: namespace}
}
14 changes: 10 additions & 4 deletions workflow/cron/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/env"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/events"
Expand All @@ -43,7 +44,9 @@ type Controller struct {
keyLock sync.KeyLock
wfClientset versioned.Interface
wfLister util.WorkflowLister
cronWfInformer informers.GenericInformer
cronWfInformer informers.GenericInformer // todo: might be nicer to group the informers in their own struct
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer //todo: is this type okay?
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
cronWfQueue workqueue.RateLimitingInterface
dynamicInterface dynamic.Interface
metrics *metrics.Metrics
Expand All @@ -68,7 +71,8 @@ func init() {
log.WithField("cronSyncPeriod", cronSyncPeriod).Info("cron config")
}

func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics, eventRecorderManager events.EventRecorderManager) *Controller {
func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic.Interface, namespace string, managedNamespace string, instanceId string, metrics *metrics.Metrics,
eventRecorderManager events.EventRecorderManager, wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer, cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer) *Controller {
return &Controller{
wfClientset: wfclientset,
namespace: namespace,
Expand All @@ -80,6 +84,8 @@ func NewCronController(wfclientset versioned.Interface, dynamicInterface dynamic
cronWfQueue: metrics.RateLimiterWithBusyWorkers(workqueue.DefaultControllerRateLimiter(), "cron_wf_queue"),
metrics: metrics,
eventRecorderManager: eventRecorderManager,
wftmplInformer: wftmplInformer,
cwftmplInformer: cwftmplInformer,
}
}

Expand Down Expand Up @@ -162,7 +168,7 @@ func (cc *Controller) processNextCronItem(ctx context.Context) bool {
return true
}

cronWorkflowOperationCtx := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics)
cronWorkflowOperationCtx := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics, cc.wftmplInformer, cc.cwftmplInformer)

err = cronWorkflowOperationCtx.validateCronWorkflow()
if err != nil {
Expand Down Expand Up @@ -256,7 +262,7 @@ func (cc *Controller) syncCronWorkflow(ctx context.Context, cronWf *v1alpha1.Cro
cc.keyLock.Lock(key)
defer cc.keyLock.Unlock(key)

cwoc := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics)
cwoc := newCronWfOperationCtx(cronWf, cc.wfClientset, cc.metrics, cc.wftmplInformer, cc.cwftmplInformer)
err := cwoc.enforceHistoryLimit(ctx, workflows)
if err != nil {
return err
Expand Down
40 changes: 24 additions & 16 deletions workflow/cron/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sort"
"time"

wfextvv1alpha1 "github.com/argoproj/argo-workflows/v3/pkg/client/informers/externalversions/workflow/v1alpha1"
"github.com/robfig/cron/v3"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
Expand All @@ -22,31 +23,38 @@ import (
waitutil "github.com/argoproj/argo-workflows/v3/util/wait"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/metrics"
"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"

//"github.com/argoproj/argo-workflows/v3/workflow/templateresolution"
"github.com/argoproj/argo-workflows/v3/workflow/controller/informer"
"github.com/argoproj/argo-workflows/v3/workflow/util"
"github.com/argoproj/argo-workflows/v3/workflow/validate"
)

type cronWfOperationCtx struct {
// CronWorkflow is the CronWorkflow to be run
name string
cronWf *v1alpha1.CronWorkflow
wfClientset versioned.Interface
wfClient typed.WorkflowInterface
cronWfIf typed.CronWorkflowInterface
log *log.Entry
metrics *metrics.Metrics
name string
cronWf *v1alpha1.CronWorkflow
wfClientset versioned.Interface
wfClient typed.WorkflowInterface
cronWfIf typed.CronWorkflowInterface
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
log *log.Entry
metrics *metrics.Metrics
// scheduledTimeFunc returns the last scheduled time when it is called
scheduledTimeFunc ScheduledTimeFunc
}

func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, metrics *metrics.Metrics) *cronWfOperationCtx {
func newCronWfOperationCtx(cronWorkflow *v1alpha1.CronWorkflow, wfClientset versioned.Interface, metrics *metrics.Metrics,
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer, cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer) *cronWfOperationCtx {
return &cronWfOperationCtx{
name: cronWorkflow.ObjectMeta.Name,
cronWf: cronWorkflow,
wfClientset: wfClientset,
wfClient: wfClientset.ArgoprojV1alpha1().Workflows(cronWorkflow.Namespace),
cronWfIf: wfClientset.ArgoprojV1alpha1().CronWorkflows(cronWorkflow.Namespace),
name: cronWorkflow.ObjectMeta.Name,
cronWf: cronWorkflow,
wfClientset: wfClientset,
wfClient: wfClientset.ArgoprojV1alpha1().Workflows(cronWorkflow.Namespace),
cronWfIf: wfClientset.ArgoprojV1alpha1().CronWorkflows(cronWorkflow.Namespace),
wftmplInformer: wftmplInformer,
cwftmplInformer: cwftmplInformer,
log: log.WithFields(log.Fields{
"workflow": cronWorkflow.ObjectMeta.Name,
"namespace": cronWorkflow.ObjectMeta.Namespace,
Expand Down Expand Up @@ -109,8 +117,8 @@ func (woc *cronWfOperationCtx) run(ctx context.Context, scheduledRuntime time.Ti
}

func (woc *cronWfOperationCtx) validateCronWorkflow() error {
wftmplGetter := templateresolution.WrapWorkflowTemplateInterface(woc.wfClientset.ArgoprojV1alpha1().WorkflowTemplates(woc.cronWf.Namespace))
cwftmplGetter := templateresolution.WrapClusterWorkflowTemplateInterface(woc.wfClientset.ArgoprojV1alpha1().ClusterWorkflowTemplates())
wftmplGetter := informer.NewWorkflowTemplateFromInformerGetter(woc.wftmplInformer, woc.cronWf.ObjectMeta.Namespace)
cwftmplGetter := informer.NewClusterWorkflowTemplateFromInformerGetter(woc.cwftmplInformer)
err := validate.ValidateCronWorkflow(wftmplGetter, cwftmplGetter, woc.cronWf)
if err != nil {
woc.reportCronWorkflowError(v1alpha1.ConditionTypeSpecError, fmt.Sprint(err))
Expand Down
1 change: 1 addition & 0 deletions workflow/templateresolution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
typed "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/workflow/common"
//"github.com/argoproj/argo-workflows/v3/workflow/util"
)

// workflowTemplateInterfaceWrapper is an internal struct to wrap clientset.
Expand Down
Loading