
Commit

feat(controller): Rate-limit workflows. Closes #4718 (#4726)
Signed-off-by: Alex Collins <alex_collins@intuit.com>
alexec authored and simster7 committed Jan 4, 2021
1 parent a602798 commit 0bbc082
Showing 5 changed files with 52 additions and 38 deletions.
18 changes: 8 additions & 10 deletions workflow/controller/controller.go
@@ -106,7 +106,6 @@ const (
 	workflowTemplateResyncPeriod        = 20 * time.Minute
 	podResyncPeriod                     = 30 * time.Minute
 	clusterWorkflowTemplateResyncPeriod = 20 * time.Minute
-	enoughTimeForInformerSync           = 1 * time.Second
 )
 
 // NewWorkflowController instantiates a new WorkflowController
@@ -138,16 +137,16 @@ func NewWorkflowController(restConfig *rest.Config, kubeclientset kubernetes.Int
 
 	wfc.metrics = metrics.New(wfc.getMetricsServerConfig())
 
-	workqueue.SetProvider(wfc.metrics)
-	wfc.wfQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "workflow_queue")
+	workqueue.SetProvider(wfc.metrics) // must execute SetProvider before we created the queues
+	wfc.wfQueue = workqueue.NewNamedRateLimitingQueue(&fixedItemIntervalRateLimiter{}, "workflow_queue")
 	wfc.throttler = wfc.newThrottler()
 	wfc.podQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod_queue")
 
 	return &wfc, nil
 }
 
 func (wfc *WorkflowController) newThrottler() sync.Throttler {
-	return sync.NewThrottler(wfc.Config.Parallelism, func(key string) { wfc.wfQueue.Add(key) })
+	return sync.NewThrottler(wfc.Config.Parallelism, func(key string) { wfc.wfQueue.AddRateLimited(key) })
 }
 
 // RunTTLController runs the workflow TTL controller
@@ -253,7 +252,7 @@ func (wfc *WorkflowController) createSynchronizationManager() error {
 	}
 
 	nextWorkflow := func(key string) {
-		wfc.wfQueue.AddAfter(key, enoughTimeForInformerSync)
+		wfc.wfQueue.Add(key)
 	}
 
 	wfc.syncManager = sync.NewLockManager(getSyncLimit, nextWorkflow)
@@ -656,9 +655,7 @@ func (wfc *WorkflowController) enqueueWfFromPodLabel(obj interface{}) error {
 		// Ignore pods unrelated to workflow (this shouldn't happen unless the watch is setup incorrectly)
 		return fmt.Errorf("Watch returned pod unrelated to any workflow")
 	}
-	// add this change after 1s - this reduces the number of workflow reconciliations -
-	// with each reconciliation doing more work
-	wfc.wfQueue.AddAfter(pod.ObjectMeta.Namespace+"/"+workflowName, enoughTimeForInformerSync)
+	wfc.wfQueue.AddRateLimited(pod.ObjectMeta.Namespace + "/" + workflowName)
 	return nil
 }
 
@@ -694,6 +691,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers() {
 			AddFunc: func(obj interface{}) {
 				key, err := cache.MetaNamespaceKeyFunc(obj)
 				if err == nil {
+					// for a new workflow, we do not want to rate limit its execution using AddRateLimited
 					wfc.wfQueue.AddAfter(key, wfc.Config.InitialDelay.Duration)
 					priority, creation := getWfPriority(obj)
 					wfc.throttler.Add(key, priority, creation)
@@ -707,7 +705,7 @@
 				}
 				key, err := cache.MetaNamespaceKeyFunc(new)
 				if err == nil {
-					wfc.wfQueue.Add(key)
+					wfc.wfQueue.AddRateLimited(key)
 					priority, creation := getWfPriority(new)
 					wfc.throttler.Add(key, priority, creation)
 				}
@@ -718,7 +716,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers() {
 				key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
 				if err == nil {
 					wfc.releaseAllWorkflowLocks(obj)
-					wfc.wfQueue.Add(key)
+					wfc.wfQueue.AddRateLimited(key)
 					wfc.throttler.Remove(key)
 				}
 			},
10 changes: 1 addition & 9 deletions workflow/controller/controller_test.go
@@ -250,15 +250,7 @@ func unmarshalArtifact(yamlStr string) *wfv1.Artifact {
 }
 
 func expectWorkflow(controller *WorkflowController, name string, test func(wf *wfv1.Workflow)) {
-	obj, exists, err := controller.wfInformer.GetStore().GetByKey(name)
-	if err != nil {
-		panic(err)
-	}
-	if !exists {
-		test(nil)
-		return
-	}
-	wf, err := util.FromUnstructured(obj.(*unstructured.Unstructured))
+	wf, err := controller.wfclientset.ArgoprojV1alpha1().Workflows("").Get(name, metav1.GetOptions{})
 	if err != nil {
 		panic(err)
 	}
37 changes: 19 additions & 18 deletions workflow/controller/operator.go
@@ -118,8 +118,7 @@ var (
 // maxOperationTime is the maximum time a workflow operation is allowed to run
 // for before requeuing the workflow onto the workqueue.
 var (
-	maxOperationTime   = envutil.LookupEnvDurationOr("MAX_OPERATION_TIME", 30*time.Second)
-	defaultRequeueTime = envutil.LookupEnvDurationOr("DEFAULT_REQUEUE_TIME", maxOperationTime/2)
+	maxOperationTime = envutil.LookupEnvDurationOr("MAX_OPERATION_TIME", 30*time.Second)
 )
 
 // failedNodeStatus is a subset of NodeStatus that is only used to Marshal certain fields into a JSON of failed nodes
@@ -261,7 +260,7 @@ func (woc *wfOperationCtx) operate() {
 		// Workflow will not be requeued if workflow steps are in pending state.
 		// Workflow needs to requeue on its deadline,
 		if woc.workflowDeadline != nil {
-			woc.requeue(time.Until(*woc.workflowDeadline))
+			woc.requeueAfter(time.Until(*woc.workflowDeadline))
 		}
 
 		if woc.execWf.Spec.Metrics != nil {
@@ -324,7 +323,7 @@ func (woc *wfOperationCtx) operate() {
 			// Error was most likely caused by a lack of resources.
 			// In this case, Workflow will be in pending state and requeue.
 			woc.markWorkflowPhase(wfv1.NodePending, fmt.Sprintf("Waiting for a PVC to be created. %v", err))
-			woc.requeue(defaultRequeueTime)
+			woc.requeue()
 			return
 		}
 		err = fmt.Errorf("pvc create error: %w", err)
@@ -549,13 +548,14 @@ func (woc *wfOperationCtx) persistUpdates() {
 
 	woc.log.WithFields(log.Fields{"resourceVersion": woc.wf.ResourceVersion, "phase": woc.wf.Status.Phase}).Info("Workflow update successful")
 
-	if os.Getenv("INFORMER_WRITE_BACK") != "false" {
+	switch os.Getenv("INFORMER_WRITE_BACK") {
+	case "true":
 		if err := woc.writeBackToInformer(); err != nil {
 			woc.markWorkflowError(err)
 			return
 		}
-	} else {
-		time.Sleep(enoughTimeForInformerSync)
+	case "false":
+		time.Sleep(1 * time.Second)
 	}
 
 	// It is important that we *never* label pods as completed until we successfully updated the workflow
@@ -675,15 +675,16 @@ func (woc *wfOperationCtx) reapplyUpdate(wfClient v1alpha1.WorkflowInterface, no
 }
 
 // requeue this workflow onto the workqueue for later processing
-func (woc *wfOperationCtx) requeue(afterDuration time.Duration) {
-	key, err := cache.MetaNamespaceKeyFunc(woc.wf)
-	if err != nil {
-		woc.log.Errorf("Failed to requeue workflow %s: %v", woc.wf.ObjectMeta.Name, err)
-		return
-	}
+func (woc *wfOperationCtx) requeueAfter(afterDuration time.Duration) {
+	key, _ := cache.MetaNamespaceKeyFunc(woc.wf)
 	woc.controller.wfQueue.AddAfter(key, afterDuration)
 }
 
+func (woc *wfOperationCtx) requeue() {
+	key, _ := cache.MetaNamespaceKeyFunc(woc.wf)
+	woc.controller.wfQueue.AddRateLimited(key)
+}
+
 // processNodeRetries updates the retry node state based on the child node state and the retry strategy and returns the node.
 func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrategy wfv1.RetryStrategy, opts *executeTemplateOpts) (*wfv1.NodeStatus, bool, error) {
 	if node.Fulfilled() {
@@ -763,7 +764,7 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
 
 	// See if we have waited past the deadline
 	if time.Now().Before(waitingDeadline) {
-		woc.requeue(timeToWait)
+		woc.requeueAfter(timeToWait)
 		retryMessage := fmt.Sprintf("Backoff for %s", humanize.Duration(timeToWait))
 		return woc.markNodePhase(node.Name, node.Phase, retryMessage), false, nil
 	}
@@ -917,7 +918,7 @@ func (woc *wfOperationCtx) podReconciliation() error {
 			// If the pod was deleted, then we it is possible that the controller never get another informer message about it.
 			// In this case, the workflow will only be requeued after the resync period (20m). This means
 			// workflow will not update for 20m. Requeuing here prevents that happening.
-			woc.requeue(defaultRequeueTime)
+			woc.requeue()
 			continue
 		}
 
@@ -1592,7 +1593,7 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
 	// Check if we took too long operating on this workflow and immediately return if we did
 	if time.Now().UTC().After(woc.deadline) {
 		woc.log.Warnf("Deadline exceeded")
-		woc.requeue(defaultRequeueTime)
+		woc.requeue()
 		return node, ErrDeadlineExceeded
 	}
 
@@ -2337,7 +2338,7 @@ func (woc *wfOperationCtx) executeScript(nodeName string, templateScope string,
 func (woc *wfOperationCtx) requeueIfTransientErr(err error, nodeName string) (*wfv1.NodeStatus, error) {
 	if errorsutil.IsTransientErr(err) {
 		// Our error was most likely caused by a lack of resources.
-		woc.requeue(defaultRequeueTime)
+		woc.requeue()
 		return woc.markNodePending(nodeName, err), nil
 	}
 	return nil, err
@@ -2653,7 +2654,7 @@ func (woc *wfOperationCtx) executeSuspend(nodeName string, templateScope string,
 	}
 
 	if requeueTime != nil {
-		woc.requeue(time.Until(*requeueTime))
+		woc.requeueAfter(time.Until(*requeueTime))
 	}
 
 	_ = woc.markNodePhase(nodeName, wfv1.NodeRunning)
23 changes: 23 additions & 0 deletions workflow/controller/rate_limiters.go
@@ -0,0 +1,23 @@
+package controller
+
+import (
+	"time"
+
+	"k8s.io/client-go/util/workqueue"
+
+	"github.com/argoproj/argo/util/env"
+)
+
+type fixedItemIntervalRateLimiter struct{}
+
+func (r *fixedItemIntervalRateLimiter) When(interface{}) time.Duration {
+	return env.LookupEnvDurationOr("DEFAULT_REQUEUE_TIME", 2*time.Second)
+}
+
+func (r *fixedItemIntervalRateLimiter) Forget(interface{}) {}
+
+func (r *fixedItemIntervalRateLimiter) NumRequeues(interface{}) int {
+	return 1
+}
+
+var _ workqueue.RateLimiter = &fixedItemIntervalRateLimiter{}
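
For context, the new fixedItemIntervalRateLimiter returns the same fixed delay (DEFAULT_REQUEUE_TIME, defaulting to 2s) for every item, so AddRateLimited defers each workflow key by a constant interval instead of client-go's default exponential backoff. Below is a minimal, self-contained sketch of how such a limiter plugs into a client-go workqueue; the limiter type, queue name, and keys here are illustrative only and are not part of this commit.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// fixedIntervalLimiter returns the same delay for every item, so AddRateLimited
// always defers work by a constant interval rather than backing off exponentially.
type fixedIntervalLimiter struct{ interval time.Duration }

func (r *fixedIntervalLimiter) When(interface{}) time.Duration { return r.interval }
func (r *fixedIntervalLimiter) Forget(interface{})             {}
func (r *fixedIntervalLimiter) NumRequeues(interface{}) int    { return 1 }

func main() {
	// "demo_queue" is a hypothetical queue name for this sketch.
	q := workqueue.NewNamedRateLimitingQueue(&fixedIntervalLimiter{interval: 2 * time.Second}, "demo_queue")
	defer q.ShutDown()

	q.AddRateLimited("default/my-workflow") // deferred by the fixed interval
	q.Add("default/new-workflow")           // enqueued immediately, bypassing the limiter

	key, _ := q.Get() // the immediately-added key comes out first
	fmt.Println("processing:", key)
	q.Done(key)
}

This mirrors why the commit keeps Add/AddAfter for paths that should not be throttled (new workflows, sync-manager wake-ups) and switches informer update/delete handlers and requeues to AddRateLimited.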
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
@@ -142,7 +142,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
 	if exists {
 		existing, ok := obj.(*apiv1.Pod)
 		if ok {
-			woc.log.WithField("podPhase", existing.Status.Phase).Infof("Skipped pod %s (%s) creation: already exists", nodeName, nodeID)
+			woc.log.WithField("podPhase", existing.Status.Phase).Debugf("Skipped pod %s (%s) creation: already exists", nodeName, nodeID)
 			return existing, nil
 		}
 	}
