feat(controller): Rate-limit workflows. Closes #4718 #4726

Merged · 18 commits · Dec 18, 2020
16 changes: 7 additions & 9 deletions workflow/controller/controller.go
@@ -109,7 +109,6 @@ const (
      workflowTemplateResyncPeriod = 20 * time.Minute
      podResyncPeriod = 30 * time.Minute
      clusterWorkflowTemplateResyncPeriod = 20 * time.Minute
-     enoughTimeForInformerSync = 1 * time.Second
  )

  // NewWorkflowController instantiates a new WorkflowController
@@ -140,7 +139,7 @@ func NewWorkflowController(restConfig *rest.Config, kubeclientset kubernetes.Int
      wfc.metrics = metrics.New(wfc.getMetricsServerConfig())

      workqueue.SetProvider(wfc.metrics) // must execute SetProvider before we created the queues
-     wfc.wfQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "workflow_queue")
+     wfc.wfQueue = workqueue.NewNamedRateLimitingQueue(&fixedItemIntervalRateLimiter{}, "workflow_queue")
      wfc.throttler = wfc.newThrottler()
      wfc.podQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod_queue")
      wfc.podCleanupQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "pod_cleanup_queue")
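The limiter swapped out here matters: client-go's `DefaultControllerRateLimiter` combines a per-item exponential backoff (starting around 5ms and growing toward 1000s) with an overall token bucket, so a workflow key that keeps getting requeued is delayed longer and longer. A minimal sketch, not part of this PR, that just prints those growing delays (the key name is illustrative):

```go
package main

import (
    "fmt"

    "k8s.io/client-go/util/workqueue"
)

func main() {
    // The limiter previously used for workflow_queue: per-item exponential
    // backoff max'd with a token bucket.
    rl := workqueue.DefaultControllerRateLimiter()
    for i := 0; i < 5; i++ {
        // Each call for the same key roughly doubles the delay: 5ms, 10ms, 20ms, ...
        fmt.Println(rl.When("argo/my-workflow"))
    }
}
```

The `fixedItemIntervalRateLimiter` introduced below in `rate_limiters.go` instead returns the same interval on every call, so a busy workflow is reconciled at a steady cadence rather than being backed off further and further.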
@@ -149,7 +148,7 @@ func NewWorkflowController(restConfig *rest.Config, kubeclientset kubernetes.Int
  }

  func (wfc *WorkflowController) newThrottler() sync.Throttler {
-     return sync.NewThrottler(wfc.Config.Parallelism, func(key string) { wfc.wfQueue.Add(key) })
+     return sync.NewThrottler(wfc.Config.Parallelism, func(key string) { wfc.wfQueue.AddRateLimited(key) })
  }

  // RunTTLController runs the workflow TTL controller
@@ -292,7 +291,7 @@ func (wfc *WorkflowController) createSynchronizationManager() error {
      }

      nextWorkflow := func(key string) {
-         wfc.wfQueue.AddAfter(key, enoughTimeForInformerSync)
+         wfc.wfQueue.Add(key)
      }

      wfc.syncManager = sync.NewLockManager(getSyncLimit, nextWorkflow)
@@ -689,9 +688,7 @@ func (wfc *WorkflowController) enqueueWfFromPodLabel(obj interface{}) error {
          // Ignore pods unrelated to workflow (this shouldn't happen unless the watch is setup incorrectly)
          return fmt.Errorf("Watch returned pod unrelated to any workflow")
      }
-     // add this change after 1s - this reduces the number of workflow reconciliations -
-     //with each reconciliation doing more work
-     wfc.wfQueue.AddAfter(pod.ObjectMeta.Namespace+"/"+workflowName, enoughTimeForInformerSync)
+     wfc.wfQueue.AddRateLimited(pod.ObjectMeta.Namespace + "/" + workflowName)
      return nil
  }

@@ -727,6 +724,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers() {
          AddFunc: func(obj interface{}) {
              key, err := cache.MetaNamespaceKeyFunc(obj)
              if err == nil {
+                 // for a new workflow, we do not want to rate limit its execution using AddRateLimited
                  wfc.wfQueue.AddAfter(key, wfc.Config.InitialDelay.Duration)
                  priority, creation := getWfPriority(obj)
                  wfc.throttler.Add(key, priority, creation)
@@ -740,7 +738,7 @@
              }
              key, err := cache.MetaNamespaceKeyFunc(new)
              if err == nil {
-                 wfc.wfQueue.Add(key)
+                 wfc.wfQueue.AddRateLimited(key)
                  priority, creation := getWfPriority(new)
                  wfc.throttler.Add(key, priority, creation)
              }
@@ -751,7 +749,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers() {
              key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
              if err == nil {
                  wfc.releaseAllWorkflowLocks(obj)
-                 wfc.wfQueue.Add(key)
+                 wfc.wfQueue.AddRateLimited(key)
                  wfc.throttler.Remove(key)
              }
          },
10 changes: 1 addition & 9 deletions workflow/controller/controller_test.go
@@ -256,15 +256,7 @@ func unmarshalArtifact(yamlStr string) *wfv1.Artifact {
  }

  func expectWorkflow(controller *WorkflowController, name string, test func(wf *wfv1.Workflow)) {
-     obj, exists, err := controller.wfInformer.GetStore().GetByKey(name)
-     if err != nil {
-         panic(err)
-     }
-     if !exists {
-         test(nil)
-         return
-     }
-     wf, err := util.FromUnstructured(obj.(*unstructured.Unstructured))
+     wf, err := controller.wfclientset.ArgoprojV1alpha1().Workflows("").Get(name, metav1.GetOptions{})
      if err != nil {
          panic(err)
      }
37 changes: 19 additions & 18 deletions workflow/controller/operator.go
@@ -118,8 +118,7 @@ var (
  // maxOperationTime is the maximum time a workflow operation is allowed to run
  // for before requeuing the workflow onto the workqueue.
  var (
-     maxOperationTime = envutil.LookupEnvDurationOr("MAX_OPERATION_TIME", 30*time.Second)
-     defaultRequeueTime = envutil.LookupEnvDurationOr("DEFAULT_REQUEUE_TIME", maxOperationTime/2)
+     maxOperationTime = envutil.LookupEnvDurationOr("MAX_OPERATION_TIME", 30*time.Second)
  )

  // failedNodeStatus is a subset of NodeStatus that is only used to Marshal certain fields into a JSON of failed nodes
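Worth noting: `DEFAULT_REQUEUE_TIME` is not gone, it just moves. It is no longer read in `operator.go` and is instead consulted by the new rate limiter (see `rate_limiters.go` below), with its fallback changing from `maxOperationTime/2` (15s under the default 30s `MAX_OPERATION_TIME`) to a flat 2s. A small sketch of the before/after defaults, reusing only the `LookupEnvDurationOr` call already shown in this diff:

```go
package main

import (
    "fmt"
    "time"

    "github.com/argoproj/argo/util/env"
)

func main() {
    // Pre-PR fallback: half of MAX_OPERATION_TIME (30s by default), i.e. 15s.
    maxOperationTime := env.LookupEnvDurationOr("MAX_OPERATION_TIME", 30*time.Second)
    oldDefault := maxOperationTime / 2

    // Post-PR fallback: a flat 2s, read inside fixedItemIntervalRateLimiter.When.
    newDefault := env.LookupEnvDurationOr("DEFAULT_REQUEUE_TIME", 2*time.Second)

    fmt.Println(oldDefault, newDefault) // 15s 2s when neither variable is set
}
```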
@@ -277,7 +276,7 @@ func (woc *wfOperationCtx) operate() {
          // Workflow will not be requeued if workflow steps are in pending state.
          // Workflow needs to requeue on its deadline,
          if woc.workflowDeadline != nil {
-             woc.requeue(time.Until(*woc.workflowDeadline))
+             woc.requeueAfter(time.Until(*woc.workflowDeadline))
          }

          if woc.execWf.Spec.Metrics != nil {
@@ -330,7 +329,7 @@ func (woc *wfOperationCtx) operate() {
              // Error was most likely caused by a lack of resources.
              // In this case, Workflow will be in pending state and requeue.
              woc.markWorkflowPhase(wfv1.NodePending, fmt.Sprintf("Waiting for a PVC to be created. %v", err))
-             woc.requeue(defaultRequeueTime)
+             woc.requeue()
              return
          }
          err = fmt.Errorf("pvc create error: %w", err)
@@ -555,13 +554,14 @@ func (woc *wfOperationCtx) persistUpdates() {

      woc.log.WithFields(log.Fields{"resourceVersion": woc.wf.ResourceVersion, "phase": woc.wf.Status.Phase}).Info("Workflow update successful")

-     if os.Getenv("INFORMER_WRITE_BACK") != "false" {
+     switch os.Getenv("INFORMER_WRITE_BACK") {
+     case "true":
          if err := woc.writeBackToInformer(); err != nil {
              woc.markWorkflowError(err)
              return
          }
-     } else {
-         time.Sleep(enoughTimeForInformerSync)
+     case "false":
+         time.Sleep(1 * time.Second)
      }

      // It is important that we *never* label pods as completed until we successfully updated the workflow
@@ -681,15 +681,16 @@ func (woc *wfOperationCtx) reapplyUpdate(wfClient v1alpha1.WorkflowInterface, no
  }

  // requeue this workflow onto the workqueue for later processing
- func (woc *wfOperationCtx) requeue(afterDuration time.Duration) {
-     key, err := cache.MetaNamespaceKeyFunc(woc.wf)
-     if err != nil {
-         woc.log.Errorf("Failed to requeue workflow %s: %v", woc.wf.ObjectMeta.Name, err)
-         return
-     }
+ func (woc *wfOperationCtx) requeueAfter(afterDuration time.Duration) {
+     key, _ := cache.MetaNamespaceKeyFunc(woc.wf)
      woc.controller.wfQueue.AddAfter(key, afterDuration)
  }

+ func (woc *wfOperationCtx) requeue() {
+     key, _ := cache.MetaNamespaceKeyFunc(woc.wf)
+     woc.controller.wfQueue.AddRateLimited(key)
+ }
+
  // processNodeRetries updates the retry node state based on the child node state and the retry strategy and returns the node.
  func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrategy wfv1.RetryStrategy, opts *executeTemplateOpts) (*wfv1.NodeStatus, bool, error) {
      if node.Fulfilled() {
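The single `requeue(afterDuration)` helper is split in two: `requeueAfter` keeps the caller-chosen delay (used for known wake-up times such as workflow deadlines and retry backoffs), while the new zero-argument `requeue` hands the decision to the queue's rate limiter via `AddRateLimited`. A standalone sketch of the two underlying workqueue calls, with the 2s interval and key names as assumptions:

```go
package main

import (
    "fmt"
    "time"

    "k8s.io/client-go/util/workqueue"
)

// Mirrors the fixed-interval limiter added by this PR.
type fixedItemIntervalRateLimiter struct{}

func (r *fixedItemIntervalRateLimiter) When(interface{}) time.Duration { return 2 * time.Second }
func (r *fixedItemIntervalRateLimiter) Forget(interface{})             {}
func (r *fixedItemIntervalRateLimiter) NumRequeues(interface{}) int    { return 1 }

func main() {
    q := workqueue.NewNamedRateLimitingQueue(&fixedItemIntervalRateLimiter{}, "workflow_queue")

    // requeueAfter(d): the caller picks the exact delay, e.g. time until a deadline.
    q.AddAfter("argo/deadline-wf", 5*time.Second)

    // requeue(): the limiter picks the delay (a fixed 2s here).
    q.AddRateLimited("argo/busy-wf")

    start := time.Now()
    for i := 0; i < 2; i++ {
        key, _ := q.Get() // blocks until an item's delay has elapsed
        fmt.Printf("%v ready after ~%v\n", key, time.Since(start).Round(time.Second))
        q.Done(key)
    }
}
```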
@@ -769,7 +770,7 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate

      // See if we have waited past the deadline
      if time.Now().Before(waitingDeadline) {
-         woc.requeue(timeToWait)
+         woc.requeueAfter(timeToWait)
          retryMessage := fmt.Sprintf("Backoff for %s", humanize.Duration(timeToWait))
          return woc.markNodePhase(node.Name, node.Phase, retryMessage), false, nil
      }
@@ -923,7 +924,7 @@ func (woc *wfOperationCtx) podReconciliation() error {
          // If the pod was deleted, then we it is possible that the controller never get another informer message about it.
          // In this case, the workflow will only be requeued after the resync period (20m). This means
          // workflow will not update for 20m. Requeuing here prevents that happening.
-         woc.requeue(defaultRequeueTime)
+         woc.requeue()
          continue
      }

@@ -1598,7 +1599,7 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
      // Check if we took too long operating on this workflow and immediately return if we did
      if time.Now().UTC().After(woc.deadline) {
          woc.log.Warnf("Deadline exceeded")
-         woc.requeue(defaultRequeueTime)
+         woc.requeue()
          return node, ErrDeadlineExceeded
      }

@@ -2343,7 +2344,7 @@ func (woc *wfOperationCtx) executeScript(nodeName string, templateScope string,
  func (woc *wfOperationCtx) requeueIfTransientErr(err error, nodeName string) (*wfv1.NodeStatus, error) {
      if errorsutil.IsTransientErr(err) {
          // Our error was most likely caused by a lack of resources.
-         woc.requeue(defaultRequeueTime)
+         woc.requeue()
          return woc.markNodePending(nodeName, err), nil
      }
      return nil, err
@@ -2672,7 +2673,7 @@ func (woc *wfOperationCtx) executeSuspend(nodeName string, templateScope string,
      }

      if requeueTime != nil {
-         woc.requeue(time.Until(*requeueTime))
+         woc.requeueAfter(time.Until(*requeueTime))
      }

      _ = woc.markNodePhase(nodeName, wfv1.NodeRunning)
23 changes: 23 additions & 0 deletions workflow/controller/rate_limiters.go
@@ -0,0 +1,23 @@
package controller

import (
    "time"

    "k8s.io/client-go/util/workqueue"

    "github.com/argoproj/argo/util/env"
)

type fixedItemIntervalRateLimiter struct{}

func (r *fixedItemIntervalRateLimiter) When(interface{}) time.Duration {
    return env.LookupEnvDurationOr("DEFAULT_REQUEUE_TIME", 2*time.Second)
}

func (r *fixedItemIntervalRateLimiter) Forget(interface{}) {}

func (r *fixedItemIntervalRateLimiter) NumRequeues(interface{}) int {
    return 1
}

var _ workqueue.RateLimiter = &fixedItemIntervalRateLimiter{}
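Because `When` always returns the same interval and the workqueue de-duplicates keys, a burst of events for one workflow collapses into a single reconciliation roughly every `DEFAULT_REQUEUE_TIME`. A hypothetical test for this file, not part of the PR, assuming `DEFAULT_REQUEUE_TIME` is unset so the 2s default applies:

```go
package controller

import (
    "testing"
    "time"

    "k8s.io/client-go/util/workqueue"
)

func TestFixedItemIntervalRateLimiter(t *testing.T) {
    r := &fixedItemIntervalRateLimiter{}
    if r.When("argo/my-wf") != 2*time.Second {
        t.Errorf("expected the 2s default interval")
    }

    q := workqueue.NewNamedRateLimitingQueue(r, "test_queue")
    for i := 0; i < 100; i++ {
        q.AddRateLimited("argo/my-wf") // e.g. a burst of pod updates for one workflow
    }
    time.Sleep(3 * time.Second) // longer than the 2s interval
    if q.Len() != 1 {
        t.Errorf("expected the burst to coalesce into one queued item, got %d", q.Len())
    }
}
```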
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
@@ -142,7 +142,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
      if exists {
          existing, ok := obj.(*apiv1.Pod)
          if ok {
-             woc.log.WithField("podPhase", existing.Status.Phase).Infof("Skipped pod %s (%s) creation: already exists", nodeName, nodeID)
+             woc.log.WithField("podPhase", existing.Status.Phase).Debugf("Skipped pod %s (%s) creation: already exists", nodeName, nodeID)
              return existing, nil
          }
      }