Skip to content

Commit

Permalink
fix: Use DEFAULT_REQUEUE_TIME for Agent. Fixes #7269 (#7296)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Behar <simbeh7@gmail.com>
  • Loading branch information
simster7 committed Dec 1, 2021
1 parent 242360a commit e0d5abc
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 18 deletions.
1 change: 1 addition & 0 deletions docs/environment-variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ most users. Note that these environment variables may be removed at any time.
| `ARGO_PROGRESS_FILE_TICK_DURATION` | `time.Duration` | `3s` | How often the progress file is read by the executor. Set to 0 to disable self reporting progress. |
| `ARGO_REMOVE_PVC_PROTECTION_FINALIZER` | `bool` | `false` | Remove the `kubernetes.io/pvc-protection` finalizer from persistent volume claims (PVC) after marking PVCs created for the workflow for deletion, so deletion is not blocked until the pods are deleted. [#6629](https://github.com/argoproj/argo-workflows/issues/6629) |
| `ARGO_TRACE` | `string` | `"1"` | Whether to enable tracing statements in Argo components. |
| `ARGO_AGENT_PATCH_RATE` | `time.Duration` | `DEFAULT_REQUEUE_TIME` | Rate at which the Argo Agent will patch the Workflow TaskSet. |
| `BUBBLE_ENTRY_TEMPLATE_ERR` | `bool` | `true` | Whether to bubble up template errors to workflow. |
| `CACHE_GC_PERIOD` | `time.Duration` | `0s` | How often to perform memoization cache GC, which is disabled by default and can be enabled by providing a non-zero duration. |
| `CACHE_GC_AFTER_NOT_HIT_DURATION` | `time.Duration` | `30s` | When a memoization cache has not been hit after this duration, it will be deleted. |
Expand Down
8 changes: 8 additions & 0 deletions util/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,11 @@ func LookupEnvFloatOr(key string, o float64) float64 {
}
return o
}

// LookupEnvStringOr returns the value of the environment variable named by
// key. If the variable is unset or set to the empty string, the fallback o is
// returned instead.
func LookupEnvStringOr(key string, o string) string {
	// os.Getenv reports "" for both unset and empty variables, which are
	// exactly the two cases that must yield the fallback.
	if value := os.Getenv(key); value != "" {
		return value
	}
	return o
}
9 changes: 9 additions & 0 deletions util/env/env_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,12 @@ func TestLookupEnvFloatOr(t *testing.T) {
_ = os.Setenv("FOO", "")
assert.Equal(t, 1., LookupEnvFloatOr("FOO", 1.), "empty var value; default value")
}

// TestLookupEnvStringOr exercises LookupEnvStringOr with an empty key, a
// populated variable, and a variable set to the empty string.
func TestLookupEnvStringOr(t *testing.T) {
	const key = "FOO"
	// Leave the process environment clean for other tests.
	defer func() { _ = os.Unsetenv(key) }()

	// An empty key is expected to yield the fallback.
	assert.Equal(t, "a", LookupEnvStringOr("", "a"), "default value")

	// A non-empty value takes precedence over the fallback.
	_ = os.Setenv(key, "b")
	assert.Equal(t, "b", LookupEnvStringOr(key, "a"), "env var value")

	// A variable that is set but empty is treated like an unset one.
	_ = os.Setenv(key, "")
	assert.Equal(t, "a", LookupEnvStringOr(key, "a"), "empty var value; default value")
}
9 changes: 7 additions & 2 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ const (

// EnvVarPodName contains the name of the pod (currently unused)
EnvVarPodName = "ARGO_POD_NAME"
// EnvVarWorkflowName
// EnvVarWorkflowName is the name of the workflow for which an agent is responsible
EnvVarWorkflowName = "ARGO_WORKFLOW_NAME"
// EnvVarContainerName contains the container's name for the current pod
EnvVarContainerName = "ARGO_CONTAINER_NAME"
Expand All @@ -123,7 +123,6 @@ const (
EnvVarKubeletInsecure = "ARGO_KUBELET_INSECURE"
// EnvVarArgoTrace is used to enable tracing statements in Argo components
EnvVarArgoTrace = "ARGO_TRACE"

// EnvVarProgressPatchTickDuration sets the tick duration for patching pod annotations upon progress changes.
// Setting this or EnvVarProgressFileTickDuration to 0 will disable monitoring progress.
EnvVarProgressPatchTickDuration = "ARGO_PROGRESS_PATCH_TICK_DURATION"
Expand All @@ -132,6 +131,12 @@ const (
EnvVarProgressFileTickDuration = "ARGO_PROGRESS_FILE_TICK_DURATION"
// EnvVarProgressFile is the file watched for reporting progress
EnvVarProgressFile = "ARGO_PROGRESS_FILE"
// EnvVarDefaultRequeueTime is the default requeue time for Workflow Informers. For more info, see rate_limiters.go
EnvVarDefaultRequeueTime = "DEFAULT_REQUEUE_TIME"
// EnvAgentTaskWorkers is the number of task workers for the agent pod
EnvAgentTaskWorkers = "ARGO_AGENT_TASK_WORKERS"
// EnvAgentPatchRate is the rate that the Argo Agent will patch the Workflow TaskSet
EnvAgentPatchRate = "ARGO_AGENT_PATCH_RATE"

// ContainerRuntimeExecutorDocker to use docker as container runtime executor
ContainerRuntimeExecutorDocker = "docker"
Expand Down
7 changes: 4 additions & 3 deletions workflow/controller/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (
"github.com/argoproj/argo-workflows/v3/errors"
"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util/env"
"github.com/argoproj/argo-workflows/v3/workflow/common"
"github.com/argoproj/argo-workflows/v3/workflow/executor"
)

func (woc *wfOperationCtx) getAgentPodName() string {
Expand Down Expand Up @@ -84,12 +84,13 @@ func (woc *wfOperationCtx) createAgentPod(ctx context.Context) (*apiv1.Pod, erro

envVars := []apiv1.EnvVar{
{Name: common.EnvVarWorkflowName, Value: woc.wf.Name},
{Name: common.EnvAgentPatchRate, Value: env.LookupEnvStringOr(common.EnvAgentPatchRate, GetRequeueTime().String())},
}

// If the default number of task workers is overridden, then pass it to the agent pod.
if taskWorkers, exists := os.LookupEnv(executor.EnvAgentTaskWorkers); exists {
if taskWorkers, exists := os.LookupEnv(common.EnvAgentTaskWorkers); exists {
envVars = append(envVars, apiv1.EnvVar{
Name: executor.EnvAgentTaskWorkers,
Name: common.EnvAgentTaskWorkers,
Value: taskWorkers,
})
}
Expand Down
10 changes: 8 additions & 2 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,14 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
defer wfc.podQueue.ShutDown()
defer wfc.podCleanupQueue.ShutDown()

log.WithField("version", argo.GetVersion().Version).Info("Starting Workflow Controller")
log.Infof("Workers: workflow: %d, pod: %d, pod cleanup: %d", wfWorkers, podWorkers, podCleanupWorkers)
log.WithField("version", argo.GetVersion().Version).
WithField("defaultRequeueTime", GetRequeueTime()).
Info("Starting Workflow Controller")
log.WithField("workflow", wfWorkers).
WithField("workflowTtl", workflowTTLWorkers).
WithField("pod", podWorkers).
WithField("podCleanup", podCleanupWorkers).
Infof("Current Worker Numbers")

wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListOptions, indexers)
wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace)
Expand Down
13 changes: 9 additions & 4 deletions workflow/controller/rate_limiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,24 @@ import (
"k8s.io/client-go/util/workqueue"

"github.com/argoproj/argo-workflows/v3/util/env"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

type fixedItemIntervalRateLimiter struct{}

func (r *fixedItemIntervalRateLimiter) When(interface{}) time.Duration {
func GetRequeueTime() time.Duration {
// We need to rate limit to a minimum of 1s, otherwise informers are unlikely to be up to date
// and we'll operate on an out of date version of a workflow.
// Under high load, the informer can get many seconds behind. Increasing this to 30s
// would be sensible for some users.
// Higher values mean that workflows with many short running (<20s) nodes do not progress as quickly.
// So some users may wish to have this as low as 2s.
// The default of 10s provides a balance for most users.
return env.LookupEnvDurationOr("DEFAULT_REQUEUE_TIME", 10*time.Second)
return env.LookupEnvDurationOr(common.EnvVarDefaultRequeueTime, 10*time.Second)
}

type fixedItemIntervalRateLimiter struct{}

// When returns the delay to apply before an item is requeued. The item itself
// is ignored: every item gets the interval reported by GetRequeueTime.
func (r *fixedItemIntervalRateLimiter) When(interface{}) time.Duration {
return GetRequeueTime()
}

func (r *fixedItemIntervalRateLimiter) Forget(interface{}) {}
Expand Down
13 changes: 6 additions & 7 deletions workflow/executor/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,18 @@ type response struct {
Result *wfv1.NodeResult
}

const EnvAgentTaskWorkers = "ARGO_AGENT_TASK_WORKERS"

func (ae *AgentExecutor) Agent(ctx context.Context) error {
defer runtimeutil.HandleCrash(runtimeutil.PanicHandlers...)

taskWorkers := env.LookupEnvIntOr(EnvAgentTaskWorkers, 16)
log.WithField("task_workers", taskWorkers).Info("Starting Agent s15")
taskWorkers := env.LookupEnvIntOr(common.EnvAgentTaskWorkers, 16)
requeueTime := env.LookupEnvDurationOr(common.EnvAgentPatchRate, 10*time.Second)
log.WithFields(log.Fields{"taskWorkers": taskWorkers, "requeueTime": requeueTime}).Info("Starting Agent")

taskQueue := make(chan task)
responseQueue := make(chan response)
taskSetInterface := ae.WorkflowInterface.ArgoprojV1alpha1().WorkflowTaskSets(ae.Namespace)

go ae.patchWorker(ctx, taskSetInterface, responseQueue)
go ae.patchWorker(ctx, taskSetInterface, responseQueue, requeueTime)
for i := 0; i < taskWorkers; i++ {
go ae.taskWorker(ctx, taskQueue, responseQueue)
}
Expand Down Expand Up @@ -130,8 +129,8 @@ func (ae *AgentExecutor) taskWorker(ctx context.Context, taskQueue chan task, re
}
}

func (ae *AgentExecutor) patchWorker(ctx context.Context, taskSetInterface v1alpha1.WorkflowTaskSetInterface, responseQueue chan response) {
ticker := time.NewTicker(1 * time.Second)
func (ae *AgentExecutor) patchWorker(ctx context.Context, taskSetInterface v1alpha1.WorkflowTaskSetInterface, responseQueue chan response, requeueTime time.Duration) {
ticker := time.NewTicker(requeueTime)
nodeResults := map[string]wfv1.NodeResult{}
for {
select {
Expand Down

0 comments on commit e0d5abc

Please sign in to comment.