From a0024d0d4625c8660badff5a7d8eca883e7e2a3e Mon Sep 17 00:00:00 2001 From: Alex Collins Date: Mon, 4 Jan 2021 09:35:46 -0800 Subject: [PATCH] fix(controller): Various v2.12 fixes. Fixes #4798, #4801, #4806 (#4808) Signed-off-by: Alex Collins --- Makefile | 4 +- Procfile | 2 +- .../workflow-controller-deployment.yaml | 14 ++ manifests/install.yaml | 9 + manifests/namespace-install.yaml | 9 + manifests/quick-start-minimal.yaml | 9 + manifests/quick-start-mysql.yaml | 9 + manifests/quick-start-postgres.yaml | 9 + test/e2e/functional/stop-terminate.yaml | 4 +- test/e2e/functional_test.go | 16 +- util/logs/workflow-logger.go | 2 +- workflow/controller/controller.go | 11 +- workflow/controller/controller_test.go | 2 + .../estimation/estimator_factory.go | 18 +- workflow/controller/indexes/indexes.go | 6 + workflow/controller/indexes/labels.go | 8 +- workflow/controller/indexes/workflow_index.go | 13 +- workflow/controller/operator.go | 17 +- workflow/controller/operator_test.go | 68 +++++-- workflow/controller/rate_limiters.go | 9 +- workflow/executor/pns/pns.go | 13 +- workflow/executor/pns/pns_test.go | 12 ++ workflow/ttlcontroller/ttlcontroller.go | 192 +++++------------- workflow/ttlcontroller/ttlcontroller_test.go | 14 +- workflow/util/util.go | 8 +- 25 files changed, 264 insertions(+), 214 deletions(-) diff --git a/Makefile b/Makefile index 4141680de070..0c9181bedd8c 100644 --- a/Makefile +++ b/Makefile @@ -50,6 +50,8 @@ STATIC_BUILD ?= true STATIC_FILES ?= true GOTEST ?= go test PROFILE ?= minimal +# by keeping this short we speed up the tests +DEFAULT_REQUEUE_TIME ?= 2s # whether or not to start the Argo Service in TLS mode SECURE := false AUTH_MODE := hybrid @@ -450,7 +452,7 @@ endif grep '127.0.0.1[[:blank:]]*postgres' /etc/hosts grep '127.0.0.1[[:blank:]]*mysql' /etc/hosts ifeq ($(RUN_MODE),local) - env SECURE=$(SECURE) ALWAYS_OFFLOAD_NODE_STATUS=$(ALWAYS_OFFLOAD_NODE_STATUS) LOG_LEVEL=$(LOG_LEVEL) UPPERIO_DB_DEBUG=$(UPPERIO_DB_DEBUG) VERSION=$(VERSION) 
AUTH_MODE=$(AUTH_MODE) NAMESPACED=$(NAMESPACED) NAMESPACE=$(KUBE_NAMESPACE) $(GOPATH)/bin/goreman -set-ports=false -logtime=false start + env DEFAULT_REQUEUE_TIME=$(DEFAULT_REQUEUE_TIME) SECURE=$(SECURE) ALWAYS_OFFLOAD_NODE_STATUS=$(ALWAYS_OFFLOAD_NODE_STATUS) LOG_LEVEL=$(LOG_LEVEL) UPPERIO_DB_DEBUG=$(UPPERIO_DB_DEBUG) VERSION=$(VERSION) AUTH_MODE=$(AUTH_MODE) NAMESPACED=$(NAMESPACED) NAMESPACE=$(KUBE_NAMESPACE) $(GOPATH)/bin/goreman -set-ports=false -logtime=false start endif .PHONY: wait diff --git a/Procfile b/Procfile index 3364a14aea90..a5b5c844e43f 100644 --- a/Procfile +++ b/Procfile @@ -1,2 +1,2 @@ -controller: LEADER_ELECTION_IDENTITY=local ALWAYS_OFFLOAD_NODE_STATUS=${ALWAYS_OFFLOAD_NODE_STATUS} OFFLOAD_NODE_STATUS_TTL=30s WORKFLOW_GC_PERIOD=30s UPPERIO_DB_DEBUG=${UPPERIO_DB_DEBUG} ARCHIVED_WORKFLOW_GC_PERIOD=30s ./dist/workflow-controller --executor-image argoproj/argoexec:${VERSION} --namespaced=${NAMESPACED} --namespace ${NAMESPACE} --loglevel ${LOG_LEVEL} +controller: DEFAULT_REQUEUE_TIME=${DEFAULT_REQUEUE_TIME} LEADER_ELECTION_IDENTITY=local ALWAYS_OFFLOAD_NODE_STATUS=${ALWAYS_OFFLOAD_NODE_STATUS} OFFLOAD_NODE_STATUS_TTL=30s WORKFLOW_GC_PERIOD=30s UPPERIO_DB_DEBUG=${UPPERIO_DB_DEBUG} ARCHIVED_WORKFLOW_GC_PERIOD=30s ./dist/workflow-controller --executor-image argoproj/argoexec:${VERSION} --namespaced=${NAMESPACED} --namespace ${NAMESPACE} --loglevel ${LOG_LEVEL} argo-server: UPPERIO_DB_DEBUG=${UPPERIO_DB_DEBUG} ./dist/argo --loglevel ${LOG_LEVEL} server --namespaced=${NAMESPACED} --namespace ${NAMESPACE} --auth-mode ${AUTH_MODE} --secure=$SECURE diff --git a/manifests/base/workflow-controller/workflow-controller-deployment.yaml b/manifests/base/workflow-controller/workflow-controller-deployment.yaml index ecb5c55f501b..0d99d7bcc0c3 100644 --- a/manifests/base/workflow-controller/workflow-controller-deployment.yaml +++ b/manifests/base/workflow-controller/workflow-controller-deployment.yaml @@ -32,6 +32,20 @@ spec: fieldRef: apiVersion: v1 fieldPath: 
metadata.name + ports: + - name: metrics + containerPort: 9090 + # Periodically check we are listening on the metrics port + # causing a restart if it is not OK. + # This takes advantage of the fact that if the metrics service has died, + # then the controller has died. + # In testing, it appears to take 60-90s from failure to restart. + livenessProbe: + httpGet: + port: metrics + path: /metrics + initialDelaySeconds: 30 + periodSeconds: 30 securityContext: runAsNonRoot: true nodeSelector: diff --git a/manifests/install.yaml b/manifests/install.yaml index d4dd9e72c1c1..2da2349769ad 100644 --- a/manifests/install.yaml +++ b/manifests/install.yaml @@ -523,7 +523,16 @@ spec: apiVersion: v1 fieldPath: metadata.name image: argoproj/workflow-controller:latest + livenessProbe: + httpGet: + path: /metrics + port: metrics + initialDelaySeconds: 30 + periodSeconds: 30 name: workflow-controller + ports: + - containerPort: 9090 + name: metrics securityContext: capabilities: drop: diff --git a/manifests/namespace-install.yaml b/manifests/namespace-install.yaml index 900bfb52b5bf..1ea2f1f29ff9 100644 --- a/manifests/namespace-install.yaml +++ b/manifests/namespace-install.yaml @@ -418,7 +418,16 @@ spec: apiVersion: v1 fieldPath: metadata.name image: argoproj/workflow-controller:latest + livenessProbe: + httpGet: + path: /metrics + port: metrics + initialDelaySeconds: 30 + periodSeconds: 30 name: workflow-controller + ports: + - containerPort: 9090 + name: metrics securityContext: capabilities: drop: diff --git a/manifests/quick-start-minimal.yaml b/manifests/quick-start-minimal.yaml index 1a9454a8d3f1..67e3819c8b1c 100644 --- a/manifests/quick-start-minimal.yaml +++ b/manifests/quick-start-minimal.yaml @@ -672,7 +672,16 @@ spec: apiVersion: v1 fieldPath: metadata.name image: argoproj/workflow-controller:latest + livenessProbe: + httpGet: + path: /metrics + port: metrics + initialDelaySeconds: 30 + periodSeconds: 30 name: workflow-controller + ports: + - containerPort: 9090 + 
name: metrics securityContext: capabilities: drop: diff --git a/manifests/quick-start-mysql.yaml b/manifests/quick-start-mysql.yaml index 85490527d737..56ee0dfdc13e 100644 --- a/manifests/quick-start-mysql.yaml +++ b/manifests/quick-start-mysql.yaml @@ -761,7 +761,16 @@ spec: apiVersion: v1 fieldPath: metadata.name image: argoproj/workflow-controller:latest + livenessProbe: + httpGet: + path: /metrics + port: metrics + initialDelaySeconds: 30 + periodSeconds: 30 name: workflow-controller + ports: + - containerPort: 9090 + name: metrics securityContext: capabilities: drop: diff --git a/manifests/quick-start-postgres.yaml b/manifests/quick-start-postgres.yaml index 23e2dfdf7f5c..f7d900baaee3 100644 --- a/manifests/quick-start-postgres.yaml +++ b/manifests/quick-start-postgres.yaml @@ -753,7 +753,16 @@ spec: apiVersion: v1 fieldPath: metadata.name image: argoproj/workflow-controller:latest + livenessProbe: + httpGet: + path: /metrics + port: metrics + initialDelaySeconds: 30 + periodSeconds: 30 name: workflow-controller + ports: + - containerPort: 9090 + name: metrics securityContext: capabilities: drop: diff --git a/test/e2e/functional/stop-terminate.yaml b/test/e2e/functional/stop-terminate.yaml index 9c6ea383608f..60379ef6cb9a 100644 --- a/test/e2e/functional/stop-terminate.yaml +++ b/test/e2e/functional/stop-terminate.yaml @@ -1,7 +1,7 @@ apiVersion: argoproj.io/v1alpha1 kind: Workflow metadata: - name: stop-terminate + generateName: stop-terminate- labels: argo-e2e: true spec: @@ -23,7 +23,7 @@ spec: - name: message container: image: argoproj/argosay:v2 - args: [sleep, "10s"] + args: [sleep, "30s"] - name: exit container: diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go index fb9919afcb34..72ec08e7a596 100644 --- a/test/e2e/functional_test.go +++ b/test/e2e/functional_test.go @@ -575,19 +575,19 @@ func (s *FunctionalSuite) TestStopBehavior() { When(). SubmitWorkflow(). WaitForWorkflow(fixtures.ToStart, "to start"). 
- RunCli([]string{"stop", "stop-terminate"}, func(t *testing.T, output string, err error) { + RunCli([]string{"stop", "@latest"}, func(t *testing.T, output string, err error) { assert.NoError(t, err) - assert.Contains(t, output, "workflow stop-terminate stopped") + assert.Regexp(t, "workflow stop-terminate-.* stopped", output) }). WaitForWorkflow(45 * time.Second). Then(). - ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + ExpectWorkflow(func(t *testing.T, m *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { assert.Equal(t, wfv1.NodeFailed, status.Phase) nodeStatus := status.Nodes.FindByDisplayName("A.onExit") if assert.NotNil(t, nodeStatus) { assert.Equal(t, wfv1.NodeSucceeded, nodeStatus.Phase) } - nodeStatus = status.Nodes.FindByDisplayName("stop-terminate.onExit") + nodeStatus = status.Nodes.FindByDisplayName(m.Name + ".onExit") if assert.NotNil(t, nodeStatus) { assert.Equal(t, wfv1.NodeSucceeded, nodeStatus.Phase) } @@ -600,17 +600,17 @@ func (s *FunctionalSuite) TestTerminateBehavior() { When(). SubmitWorkflow(). WaitForWorkflow(fixtures.ToStart, "to start"). - RunCli([]string{"terminate", "stop-terminate"}, func(t *testing.T, output string, err error) { + RunCli([]string{"terminate", "@latest"}, func(t *testing.T, output string, err error) { assert.NoError(t, err) - assert.Contains(t, output, "workflow stop-terminate terminated") + assert.Regexp(t, "workflow stop-terminate-.* terminated", output) }). WaitForWorkflow(). Then(). 
- ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + ExpectWorkflow(func(t *testing.T, m *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { assert.Equal(t, wfv1.NodeFailed, status.Phase) nodeStatus := status.Nodes.FindByDisplayName("A.onExit") assert.Nil(t, nodeStatus) - nodeStatus = status.Nodes.FindByDisplayName("stop-terminate.onExit") + nodeStatus = status.Nodes.FindByDisplayName(m.Name + ".onExit") assert.Nil(t, nodeStatus) }) } diff --git a/util/logs/workflow-logger.go b/util/logs/workflow-logger.go index 10495277b980..03cba82e5339 100644 --- a/util/logs/workflow-logger.go +++ b/util/logs/workflow-logger.go @@ -147,7 +147,7 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient ensureWeAreStreaming(&pod) } - if req.GetLogOptions().Follow { + if logOptions.Follow { wfListOptions := metav1.ListOptions{FieldSelector: "metadata.name=" + req.GetName(), ResourceVersion: "0"} wfWatch, err := wfInterface.Watch(wfListOptions) if err != nil { diff --git a/workflow/controller/controller.go b/workflow/controller/controller.go index 4168e8d1ca0c..a789718412c3 100644 --- a/workflow/controller/controller.go +++ b/workflow/controller/controller.go @@ -291,7 +291,7 @@ func (wfc *WorkflowController) createSynchronizationManager() error { } nextWorkflow := func(key string) { - wfc.wfQueue.Add(key) + wfc.wfQueue.AddRateLimited(key) } wfc.syncManager = sync.NewLockManager(getSyncLimit, nextWorkflow) @@ -353,12 +353,7 @@ func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap) log.Warnf("received object from indexer %s is not an unstructured", indexes.SemaphoreConfigIndexName) continue } - wf, err := util.FromUnstructured(un) - if err != nil { - log.Errorf("failed to convert to workflow from unstructured: %v", err) - continue - } - wfc.wfQueue.Add(fmt.Sprintf("%s/%s", wf.Namespace, wf.Name)) + wfc.wfQueue.AddRateLimited(fmt.Sprintf("%s/%s", un.GetNamespace(), un.GetName())) } } @@ -749,7 
+744,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers() { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) if err == nil { wfc.releaseAllWorkflowLocks(obj) - wfc.wfQueue.AddRateLimited(key) + // no need to add to the queue - this workflow is done wfc.throttler.Remove(key) } }, diff --git a/workflow/controller/controller_test.go b/workflow/controller/controller_test.go index b0a413740454..7e00ec20665b 100644 --- a/workflow/controller/controller_test.go +++ b/workflow/controller/controller_test.go @@ -3,6 +3,7 @@ package controller import ( "context" "testing" + "time" "github.com/argoproj/pkg/sync" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -595,5 +596,6 @@ func TestNotifySemaphoreConfigUpdate(t *testing.T) { assert.Equal(0, controller.wfQueue.Len()) controller.notifySemaphoreConfigUpdate(&cm) + time.Sleep(2 * time.Second) assert.Equal(2, controller.wfQueue.Len()) } diff --git a/workflow/controller/estimation/estimator_factory.go b/workflow/controller/estimation/estimator_factory.go index 8996401388b2..0e182fe9489f 100644 --- a/workflow/controller/estimation/estimator_factory.go +++ b/workflow/controller/estimation/estimator_factory.go @@ -46,7 +46,7 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) { if err != nil { return defaultEstimator, fmt.Errorf("failed to list workflows by index: %v", err) } - var newestWf *wfv1.Workflow + var newestUn *unstructured.Unstructured for _, obj := range objs { un, ok := obj.(*unstructured.Unstructured) if !ok { @@ -55,16 +55,16 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) { if un.GetLabels()[common.LabelKeyPhase] != string(wfv1.NodeSucceeded) { continue } - candidateWf, err := util.FromUnstructured(un) + // we use `creationTimestamp` because it's fast + if newestUn == nil || un.GetCreationTimestamp().After(newestUn.GetCreationTimestamp().Time) { + newestUn = un + } + } + if newestUn != nil { + newestWf, err := 
util.FromUnstructured(newestUn) if err != nil { return defaultEstimator, fmt.Errorf("failed convert unstructured to workflow: %w", err) } - // we use `startedAt` because that's same as how the archive sorts - if newestWf == nil || candidateWf.Status.StartedAt.Time.After(newestWf.Status.StartedAt.Time) { - newestWf = candidateWf - } - } - if newestWf != nil { err = f.hydrator.Hydrate(newestWf) if err != nil { return defaultEstimator, fmt.Errorf("failed hydrate last workflow: %w", err) @@ -78,7 +78,7 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) { } workflows, err := f.wfArchive.ListWorkflows(wf.Namespace, time.Time{}, time.Time{}, requirements, 1, 0) if err != nil { - return nil, fmt.Errorf("failed to list archived workflows: %v", err) + return defaultEstimator, fmt.Errorf("failed to list archived workflows: %v", err) } if len(workflows) > 0 { return &estimator{wf, &workflows[0]}, nil diff --git a/workflow/controller/indexes/indexes.go b/workflow/controller/indexes/indexes.go index baee15f5caa0..24a6a54105fc 100644 --- a/workflow/controller/indexes/indexes.go +++ b/workflow/controller/indexes/indexes.go @@ -1,5 +1,11 @@ package indexes +// Indexers (i.e. IndexFunc) should be fast and should not return errors +// If an indexers returns an error, the cache will panic, and crash the Go VM. +// Indexers should be fast, if they are not, then the informer will get out of date +// and start returning old workflows, resulting in the operator reconciling out of date +// information, causing conflict errors and risking corrupted data. 
+ const ( ClusterWorkflowTemplateIndex = "clusterworkflowtemplate" CronWorkflowIndex = "cronworkflow" diff --git a/workflow/controller/indexes/labels.go b/workflow/controller/indexes/labels.go index 7baab6a3ca4c..3cc47376511d 100644 --- a/workflow/controller/indexes/labels.go +++ b/workflow/controller/indexes/labels.go @@ -1,8 +1,6 @@ package indexes import ( - "fmt" - "k8s.io/apimachinery/pkg/api/meta" "k8s.io/client-go/tools/cache" @@ -18,7 +16,7 @@ func MetaWorkflowPhaseIndexFunc() cache.IndexFunc { return func(obj interface{}) ([]string, error) { v, err := meta.Accessor(obj) if err != nil { - return []string{}, fmt.Errorf("object has no meta: %v", err) + return nil, nil } if value, exists := v.GetLabels()[common.LabelKeyPhase]; exists { return []string{value}, nil @@ -33,12 +31,12 @@ func MetaNamespaceLabelIndexFunc(label string) cache.IndexFunc { return func(obj interface{}) ([]string, error) { v, err := meta.Accessor(obj) if err != nil { - return []string{}, fmt.Errorf("object has no meta: %v", err) + return nil, nil } if value, exists := v.GetLabels()[label]; exists { return []string{MetaNamespaceLabelIndex(v.GetNamespace(), value)}, nil } else { - return []string{}, nil + return nil, nil } } } diff --git a/workflow/controller/indexes/workflow_index.go b/workflow/controller/indexes/workflow_index.go index d682550aa369..1c914c48ea11 100644 --- a/workflow/controller/indexes/workflow_index.go +++ b/workflow/controller/indexes/workflow_index.go @@ -1,9 +1,6 @@ package indexes import ( - "fmt" - - "github.com/prometheus/common/log" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/client-go/tools/cache" @@ -15,11 +12,11 @@ import ( func MetaWorkflowIndexFunc(obj interface{}) ([]string, error) { m, err := meta.Accessor(obj) if err != nil { - return []string{}, fmt.Errorf("object has no meta: %v", err) + return nil, nil } name, ok := m.GetLabels()[common.LabelKeyWorkflow] if !ok { - return []string{}, fmt.Errorf("object 
has no workflow label") + return nil, nil } return []string{WorkflowIndexValue(m.GetNamespace(), name)}, nil } @@ -32,13 +29,11 @@ func WorkflowSemaphoreKeysIndexFunc() cache.IndexFunc { return func(obj interface{}) ([]string, error) { un, ok := obj.(*unstructured.Unstructured) if !ok { - log.Warnf("cannot convert obj into unstructured.Unstructured in Indexer %s", SemaphoreConfigIndexName) - return []string{}, nil + return nil, nil } wf, err := util.FromUnstructured(un) if err != nil { - log.Warnf("failed to convert to workflow from unstructured: %v", err) - return []string{}, nil + return nil, nil } return wf.GetSemaphoreKeys(), nil } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index e72c949a2940..eee36977f996 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -556,7 +556,9 @@ func (woc *wfOperationCtx) persistUpdates() { woc.log.WithFields(log.Fields{"resourceVersion": woc.wf.ResourceVersion, "phase": woc.wf.Status.Phase}).Info("Workflow update successful") switch os.Getenv("INFORMER_WRITE_BACK") { - case "true": + // By default we write back (as per v2.11), this does not reduce errors, but does reduce + // conflicts and therefore we log fewer warning messages. + case "", "true": if err := woc.writeBackToInformer(); err != nil { woc.markWorkflowError(err) return @@ -646,10 +648,23 @@ func (woc *wfOperationCtx) reapplyUpdate(wfClient v1alpha1.WorkflowInterface, no if err != nil { return nil, err } + // There is something about having informer indexers (introduced in v2.12) that means we are more likely to operate on the + // previous version of the workflow. This means under high load, a previously successful workflow could + // be operated on again. This can error (e.g. if any pod was deleted as part of clean-up). This check prevents that. 
+ // https://github.com/argoproj/argo/issues/4798 + if currWf.Status.Fulfilled() { + return nil, fmt.Errorf("must never update completed workflows") + } err = woc.controller.hydrator.Hydrate(currWf) if err != nil { return nil, err } + for id, node := range woc.wf.Status.Nodes { + currNode, exists := currWf.Status.Nodes[id] + if exists && currNode.Fulfilled() && node.Phase != currNode.Phase { + return nil, fmt.Errorf("must never update completed node %s", id) + } + } currWfBytes, err := json.Marshal(currWf) if err != nil { return nil, err diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 4cf64b966fb5..cfd70d7e44f8 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -58,28 +58,56 @@ func TestOperateWorkflowPanicRecover(t *testing.T) { } func Test_wfOperationCtx_reapplyUpdate(t *testing.T) { - wf := &wfv1.Workflow{ - ObjectMeta: metav1.ObjectMeta{Name: "my-wf"}, - Status: wfv1.WorkflowStatus{Nodes: wfv1.Nodes{"foo": wfv1.NodeStatus{Name: "my-foo"}}}, - } - cancel, controller := newController(wf) - defer cancel() - controller.hydrator = hydratorfake.Always - woc := newWorkflowOperationCtx(wf, controller) + t.Run("Success", func(t *testing.T) { + wf := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{Name: "my-wf"}, + Status: wfv1.WorkflowStatus{Nodes: wfv1.Nodes{"foo": wfv1.NodeStatus{Name: "my-foo"}}}, + } + cancel, controller := newController(wf) + defer cancel() + controller.hydrator = hydratorfake.Always + woc := newWorkflowOperationCtx(wf, controller) - // fake the behaviour woc.operate() - assert.NoError(t, controller.hydrator.Hydrate(wf)) - nodes := wfv1.Nodes{"foo": wfv1.NodeStatus{Name: "my-foo", Phase: wfv1.NodeSucceeded}} - - // now force a re-apply update - updatedWf, err := woc.reapplyUpdate(controller.wfclientset.ArgoprojV1alpha1().Workflows(""), nodes) - if assert.NoError(t, err) && assert.NotNil(t, updatedWf) { - assert.True(t, 
woc.controller.hydrator.IsHydrated(updatedWf)) - if assert.Contains(t, updatedWf.Status.Nodes, "foo") { - assert.Equal(t, "my-foo", updatedWf.Status.Nodes["foo"].Name) - assert.Equal(t, wfv1.NodeSucceeded, updatedWf.Status.Nodes["foo"].Phase, "phase is merged") + // fake the behaviour woc.operate() + assert.NoError(t, controller.hydrator.Hydrate(wf)) + nodes := wfv1.Nodes{"foo": wfv1.NodeStatus{Name: "my-foo", Phase: wfv1.NodeSucceeded}} + + // now force a re-apply update + updatedWf, err := woc.reapplyUpdate(controller.wfclientset.ArgoprojV1alpha1().Workflows(""), nodes) + if assert.NoError(t, err) && assert.NotNil(t, updatedWf) { + assert.True(t, woc.controller.hydrator.IsHydrated(updatedWf)) + if assert.Contains(t, updatedWf.Status.Nodes, "foo") { + assert.Equal(t, "my-foo", updatedWf.Status.Nodes["foo"].Name) + assert.Equal(t, wfv1.NodeSucceeded, updatedWf.Status.Nodes["foo"].Phase, "phase is merged") + } } - } + }) + t.Run("ErrUpdatingCompletedWorkflow", func(t *testing.T) { + wf := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{Name: "my-wf"}, + Status: wfv1.WorkflowStatus{Phase: wfv1.NodeError}, + } + currWf := wf.DeepCopy() + currWf.Status.Phase = wfv1.NodeSucceeded + cancel, controller := newController(currWf) + defer cancel() + woc := newWorkflowOperationCtx(wf, controller) + _, err := woc.reapplyUpdate(controller.wfclientset.ArgoprojV1alpha1().Workflows(""), wfv1.Nodes{}) + assert.EqualError(t, err, "must never update completed workflows") + }) + t.Run("ErrUpdatingCompletedNode", func(t *testing.T) { + wf := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{Name: "my-wf"}, + Status: wfv1.WorkflowStatus{Nodes: wfv1.Nodes{"my-node": wfv1.NodeStatus{Phase: wfv1.NodeError}}}, + } + currWf := wf.DeepCopy() + currWf.Status.Nodes = wfv1.Nodes{"my-node": wfv1.NodeStatus{Phase: wfv1.NodeSucceeded}} + cancel, controller := newController(currWf) + defer cancel() + woc := newWorkflowOperationCtx(wf, controller) + _, err := 
woc.reapplyUpdate(controller.wfclientset.ArgoprojV1alpha1().Workflows(""), wf.Status.Nodes) + assert.EqualError(t, err, "must never update completed node my-node") + }) } func TestResourcesDuration(t *testing.T) { diff --git a/workflow/controller/rate_limiters.go b/workflow/controller/rate_limiters.go index 41f949191ac6..8e0273242048 100644 --- a/workflow/controller/rate_limiters.go +++ b/workflow/controller/rate_limiters.go @@ -11,7 +11,14 @@ import ( type fixedItemIntervalRateLimiter struct{} func (r *fixedItemIntervalRateLimiter) When(interface{}) time.Duration { - return env.LookupEnvDurationOr("DEFAULT_REQUEUE_TIME", 2*time.Second) + // We need to rate limit a minimum 1s, otherwise informers are unlikey to be upto date + // and we'll operate on an out of date version of a workflow. + // Under high load, the informer can get many seconds behind. Increasing this to 30s + // would be sensible for some users. + // Higher values mean that workflows with many short running (<20s) nodes do not progress as quickly. + // So some users may wish to have this as low as 2s. + // The default of 10s provides a balance more most users. 
+ return env.LookupEnvDurationOr("DEFAULT_REQUEUE_TIME", 10*time.Second) } func (r *fixedItemIntervalRateLimiter) Forget(interface{}) {} diff --git a/workflow/executor/pns/pns.go b/workflow/executor/pns/pns.go index 1891e969b9e6..936e43eff9a9 100644 --- a/workflow/executor/pns/pns.go +++ b/workflow/executor/pns/pns.go @@ -21,6 +21,7 @@ import ( "github.com/argoproj/argo/errors" "github.com/argoproj/argo/util/archive" + errorsutil "github.com/argoproj/argo/util/errors" "github.com/argoproj/argo/workflow/common" execcommon "github.com/argoproj/argo/workflow/executor/common" argowait "github.com/argoproj/argo/workflow/executor/common/wait" @@ -369,13 +370,21 @@ func (p *PNSExecutor) updateCtrIDMap() { } } +var backoffOver30s = wait.Backoff{ + Duration: 1 * time.Second, + Steps: 7, + Factor: 2, +} + func (p *PNSExecutor) GetTerminatedContainerStatus(containerID string) (*corev1.Pod, *corev1.ContainerStatus, error) { var pod *corev1.Pod var containerStatus *corev1.ContainerStatus - err := wait.Poll(1*time.Second, 3*time.Second, func() (bool, error) { + // Under high load, the Kubernetes API may be unresponsive for some time (30s). This would have failed the workflow + // previously (<=v2.11) but a 30s back-off mitigates this. 
+ err := wait.ExponentialBackoff(backoffOver30s, func() (bool, error) { podRes, err := p.clientset.CoreV1().Pods(p.namespace).Get(p.podName, metav1.GetOptions{}) if err != nil { - return false, fmt.Errorf("could not get pod: %s", err) + return !errorsutil.IsTransientErr(err), fmt.Errorf("could not get pod: %w", err) } for _, containerStatusRes := range podRes.Status.ContainerStatuses { if execcommon.GetContainerID(&containerStatusRes) != containerID { diff --git a/workflow/executor/pns/pns_test.go b/workflow/executor/pns/pns_test.go index b04e1540c2b1..40cc9918db6b 100644 --- a/workflow/executor/pns/pns_test.go +++ b/workflow/executor/pns/pns_test.go @@ -2,10 +2,22 @@ package pns import ( "testing" + "time" "github.com/stretchr/testify/assert" ) +func Test_backoffOver30s(t *testing.T) { + x := backoffOver30s + assert.Equal(t, 1*time.Second, x.Step()) + assert.Equal(t, 2*time.Second, x.Step()) + assert.Equal(t, 4*time.Second, x.Step()) + assert.Equal(t, 8*time.Second, x.Step()) + assert.Equal(t, 16*time.Second, x.Step()) + assert.Equal(t, 32*time.Second, x.Step()) + assert.Equal(t, 64*time.Second, x.Step()) +} + func TestPNSExecutor_parseContainerIDFromCgroupLine(t *testing.T) { testCases := []struct { line string diff --git a/workflow/ttlcontroller/ttlcontroller.go b/workflow/ttlcontroller/ttlcontroller.go index 6624b7c5d856..26db2df9770d 100644 --- a/workflow/ttlcontroller/ttlcontroller.go +++ b/workflow/ttlcontroller/ttlcontroller.go @@ -14,7 +14,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" - "github.com/argoproj/argo/config" wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned" commonutil "github.com/argoproj/argo/util" @@ -22,30 +21,20 @@ import ( "github.com/argoproj/argo/workflow/util" ) -const ( - workflowTTLResyncPeriod = 20 * time.Minute - // 1s is usually enough time for the informer to get synced and be up-to-date - enoughTimeForInformerSync = 
time.Second -) - -type ConfigSupplier func() *config.Config - type Controller struct { - wfclientset wfclientset.Interface - wfInformer cache.SharedIndexInformer - workqueue workqueue.DelayingInterface - resyncPeriod time.Duration - clock clock.Clock + wfclientset wfclientset.Interface + wfInformer cache.SharedIndexInformer + workqueue workqueue.DelayingInterface + clock clock.Clock } // NewController returns a new workflow ttl controller func NewController(wfClientset wfclientset.Interface, wfInformer cache.SharedIndexInformer) *Controller { controller := &Controller{ - wfclientset: wfClientset, - wfInformer: wfInformer, - workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "workflow_ttl_queue"), - resyncPeriod: workflowTTLResyncPeriod, - clock: clock.RealClock{}, + wfclientset: wfClientset, + wfInformer: wfInformer, + workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "workflow_ttl_queue"), + clock: clock.RealClock{}, } wfInformer.AddEventHandler(cache.FilteringResourceEventHandler{ @@ -66,7 +55,7 @@ func NewController(wfClientset wfclientset.Interface, wfInformer cache.SharedInd func (c *Controller) Run(stopCh <-chan struct{}, workflowTTLWorkers int) error { defer runtimeutil.HandleCrash() defer c.workqueue.ShutDown() - log.Infof("Starting workflow TTL controller (resync %v, workflowTTLWorkers %d)", c.resyncPeriod, workflowTTLWorkers) + log.Infof("Starting workflow TTL controller (workflowTTLWorkers %d)", workflowTTLWorkers) go c.wfInformer.Run(stopCh) if ok := cache.WaitForCacheSync(stopCh, c.wfInformer.HasSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") @@ -91,36 +80,13 @@ func (c *Controller) runWorker() { // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the syncHandler. 
func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() - - if shutdown { + key, quit := c.workqueue.Get() + if quit { return false } + defer c.workqueue.Done(key) - // We wrap this block in a func so we can defer c.workqueue.Done. - err := func(obj interface{}) error { - defer c.workqueue.Done(obj) - var key string - var ok bool - if key, ok = obj.(string); !ok { - // As the item in the workqueue is actually invalid, we call - // Forget here else we'd go into a loop of attempting to - // process a work item that is invalid. - //c.workqueue.Forget(obj) - runtimeutil.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) - return nil - } - if err := c.deleteWorkflow(key); err != nil { - return fmt.Errorf("error deleting '%s': %s", key, err.Error()) - } - //c.workqueue.Forget(obj) - return nil - }(obj) - - if err != nil { - runtimeutil.HandleError(err) - return true - } + runtimeutil.HandleError(c.deleteWorkflow(key.(string))) return true } @@ -137,121 +103,71 @@ func (c *Controller) enqueueWF(obj interface{}) { log.Warnf("Failed to unmarshal workflow %v object: %v", obj, err) return } - now := c.clock.Now() - remaining, expiration := timeLeft(wf, &now) - if remaining == nil || *remaining > c.resyncPeriod { - return - } - log.Infof("Found Workflow %s/%s set expire at %v (%s from now)", wf.Namespace, wf.Name, expiration, remaining) - var addAfter time.Duration - if *remaining > 0 { - addAfter = *remaining - } - var key string - if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { - runtimeutil.HandleError(err) + remaining, ok := c.expiresIn(wf) + if !ok { return } // if we try and delete in the next second, it is almost certain that the informer is out of sync. Because we // double-check that sees if the workflow in the informer is already deleted and we'll make 2 API requests when // one is enough. 
- if addAfter < enoughTimeForInformerSync {
- addAfter = enoughTimeForInformerSync
- }
- log.Infof("Queueing workflow %s/%s for delete in %v", wf.Namespace, wf.Name, addAfter)
+ // Additionally, this allows enough time to make sure the double checking that the workflow is actually expired
+ // truly works.
+ addAfter := remaining + time.Second
+ key, _ := cache.MetaNamespaceKeyFunc(obj)
+ log.Infof("Queueing %v workflow %s for delete in %v", wf.Status.Phase, key, addAfter.Truncate(time.Second))
 c.workqueue.AddAfter(key, addAfter)
 }
 
 func (c *Controller) deleteWorkflow(key string) error {
- obj, exists, err := c.wfInformer.GetIndexer().GetByKey(key)
+ // It should be impossible for a workflow to have been queued without a valid key.
+ namespace, name, _ := cache.SplitMetaNamespaceKey(key)
+ // Any workflow that was queued must need deleting, therefore we do not check the expiry again.
+ log.Infof("Deleting TTL expired workflow '%s'", key)
+ err := c.wfclientset.ArgoprojV1alpha1().Workflows(namespace).Delete(name, &metav1.DeleteOptions{PropagationPolicy: commonutil.GetDeletePropagation()})
 if err != nil {
 if apierr.IsNotFound(err) {
- runtimeutil.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
- return nil
- }
- return err
- }
- if !exists {
- return nil
- }
-
- // The workflow informer receives unstructured objects to deal with the possibility of invalid
- // workflow manifests that are unable to unmarshal to workflow objects
- un, ok := obj.(*unstructured.Unstructured)
- if !ok {
- log.Warnf("Key '%s' in index is not an unstructured", key)
- return nil
- }
- if un.GetDeletionTimestamp() != nil {
- return nil
- }
- wf, err := util.FromUnstructured(un)
- if err != nil {
- log.Warnf("Failed to unmarshal key '%s' to workflow object: %v", key, err)
- return nil
- }
- if c.ttlExpired(wf) {
- log.Infof("Deleting TTL expired workflow %s/%s", wf.Namespace, wf.Name)
-
- err = c.wfclientset.ArgoprojV1alpha1().Workflows(wf.Namespace).Delete(wf.Name,
&metav1.DeleteOptions{PropagationPolicy: commonutil.GetDeletePropagation()}) - if err != nil { - if apierr.IsNotFound(err) { - log.Infof("workflow already deleted '%s'", key) - } else { - return err - } + log.Infof("Workflow already deleted '%s'", key) } else { - log.Infof("Successfully deleted '%s'", key) + return err } + } else { + log.Infof("Successfully deleted '%s'", key) } return nil } +// if the workflow both has a TTL and is expired func (c *Controller) ttlExpired(wf *wfv1.Workflow) bool { - ttlStrategy := wf.GetTTLStrategy() - - // We don't care about the Workflows that are going to be deleted, or the ones that don't need clean up. - if wf.DeletionTimestamp != nil || ttlStrategy == nil || wf.Status.FinishedAt.IsZero() { + expiresIn, ok := c.expiresIn(wf) + if !ok { return false } - now := c.clock.Now() + return expiresIn <= 0 +} - if wf.Status.Failed() && ttlStrategy.SecondsAfterFailure != nil { - expiry := wf.Status.FinishedAt.Add(time.Second * time.Duration(*ttlStrategy.SecondsAfterFailure)) - return now.After(expiry) - } else if wf.Status.Successful() && ttlStrategy.SecondsAfterSuccess != nil { - expiry := wf.Status.FinishedAt.Add(time.Second * time.Duration(*ttlStrategy.SecondsAfterSuccess)) - return now.After(expiry) - } else { - expiry := wf.Status.FinishedAt.Add(time.Second * time.Duration(*ttlStrategy.SecondsAfterCompletion)) - return now.After(expiry) +// expiresIn - seconds from now the workflow expires in, maybe <= 0 +// ok - if the workflow has a TTL +func (c *Controller) expiresIn(wf *wfv1.Workflow) (expiresIn time.Duration, ok bool) { + ttl, ok := ttl(wf) + if !ok { + return 0, false } + expiresAt := wf.Status.FinishedAt.Add(ttl) + return expiresAt.Sub(c.clock.Now()), true } -func timeLeft(wf *wfv1.Workflow, since *time.Time) (*time.Duration, *time.Time) { +// ttl - the workflow's TTL +// ok - if the workflow has a TTL +func ttl(wf *wfv1.Workflow) (ttl time.Duration, ok bool) { ttlStrategy := wf.GetTTLStrategy() - if wf.DeletionTimestamp != 
nil || ttlStrategy == nil || wf.Status.FinishedAt.IsZero() { - return nil, nil - } - - sinceUTC := since.UTC() - finishAtUTC := wf.Status.FinishedAt.UTC() - if finishAtUTC.After(sinceUTC) { - log.Infof("Warning: Found Workflow %s/%s finished in the future. This is likely due to time skew in the cluster. Workflow cleanup will be deferred.", wf.Namespace, wf.Name) - } - if wf.Status.Failed() && ttlStrategy.SecondsAfterFailure != nil { - expireAtUTC := finishAtUTC.Add(time.Duration(*ttlStrategy.SecondsAfterFailure) * time.Second) - remaining := expireAtUTC.Sub(sinceUTC) - return &remaining, &expireAtUTC - } else if wf.Status.Successful() && ttlStrategy.SecondsAfterSuccess != nil { - expireAtUTC := finishAtUTC.Add(time.Duration(*ttlStrategy.SecondsAfterSuccess) * time.Second) - remaining := expireAtUTC.Sub(sinceUTC) - return &remaining, &expireAtUTC - } else if ttlStrategy.SecondsAfterCompletion != nil { - expireAtUTC := finishAtUTC.Add(time.Duration(*ttlStrategy.SecondsAfterCompletion) * time.Second) - remaining := expireAtUTC.Sub(sinceUTC) - return &remaining, &expireAtUTC - } else { - return nil, nil + if ttlStrategy != nil { + if wf.Status.Failed() && ttlStrategy.SecondsAfterFailure != nil { + return time.Duration(*ttlStrategy.SecondsAfterFailure) * time.Second, true + } else if wf.Status.Successful() && ttlStrategy.SecondsAfterSuccess != nil { + return time.Duration(*ttlStrategy.SecondsAfterSuccess) * time.Second, true + } else if wf.Status.Phase.Completed() && ttlStrategy.SecondsAfterCompletion != nil { + return time.Duration(*ttlStrategy.SecondsAfterCompletion) * time.Second, true + } } + return 0, false } diff --git a/workflow/ttlcontroller/ttlcontroller_test.go b/workflow/ttlcontroller/ttlcontroller_test.go index 4eb6a5f593c5..2a005a346120 100644 --- a/workflow/ttlcontroller/ttlcontroller_test.go +++ b/workflow/ttlcontroller/ttlcontroller_test.go @@ -44,8 +44,9 @@ spec: image: docker/whalesay:latest name: whalesay status: - phase: Running + phase: Succeeded 
 startedAt: 2018-08-27T20:41:38Z
+ finishedAt: 2018-08-27T20:41:38Z
 `
 
 var succeededWf = `
@@ -344,17 +345,16 @@ func newTTLController() *Controller {
 wfclientset := fakewfclientset.NewSimpleClientset()
 wfInformer := cache.NewSharedIndexInformer(nil, nil, 0, nil)
 return &Controller{
- wfclientset: wfclientset,
- wfInformer: wfInformer,
- resyncPeriod: workflowTTLResyncPeriod,
- clock: clock,
- workqueue: workqueue.NewDelayingQueue(),
+ wfclientset: wfclientset,
+ wfInformer: wfInformer,
+ clock: clock,
+ workqueue: workqueue.NewDelayingQueue(),
 }
 }
 
 func enqueueWF(controller *Controller, un *unstructured.Unstructured) {
 controller.enqueueWF(un)
- time.Sleep(100*time.Millisecond + enoughTimeForInformerSync)
+ time.Sleep(100*time.Millisecond + time.Second)
 }
 
 func TestEnqueueWF(t *testing.T) {
diff --git a/workflow/util/util.go b/workflow/util/util.go
index 2ad1bf3ec06a..aef6b25459ee 100644
--- a/workflow/util/util.go
+++ b/workflow/util/util.go
@@ -116,7 +116,13 @@ func NewWorkflowLister(informer cache.SharedIndexInformer) WorkflowLister {
 }
 }
 
-// FromUnstructured converts an unstructured object to a workflow
+// FromUnstructured converts an unstructured object to a workflow.
+// This function performs a lot of allocations and can result in a lot of memory
+// being used. Users should avoid invoking this function if the data they need is
+// available from `unstructured.Unstructured`, especially if they're looping.
+// Available values include: `GetLabels()`, `GetName()`, `GetNamespace()` etc.
+// Single values can be accessed using `unstructured.Nested*`, e.g.
+// `unstructured.NestedString(un.Object, "spec", "phase")`.
 func FromUnstructured(un *unstructured.Unstructured) (*wfv1.Workflow, error) {
 var wf wfv1.Workflow
 err := FromUnstructuredObj(un, &wf)