Skip to content

Commit

Permalink
fix(controller): Various v2.12 fixes. Fixes #4798, #4801, #4806 (#4808)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <alex_collins@intuit.com>
  • Loading branch information
alexec committed Jan 4, 2021
1 parent ee59d49 commit a0024d0
Show file tree
Hide file tree
Showing 25 changed files with 264 additions and 214 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ STATIC_BUILD ?= true
STATIC_FILES ?= true
GOTEST ?= go test
PROFILE ?= minimal
# by keeping this short we speed up the tests
DEFAULT_REQUEUE_TIME ?= 2s
# whether or not to start the Argo Service in TLS mode
SECURE := false
AUTH_MODE := hybrid
Expand Down Expand Up @@ -450,7 +452,7 @@ endif
grep '127.0.0.1[[:blank:]]*postgres' /etc/hosts
grep '127.0.0.1[[:blank:]]*mysql' /etc/hosts
ifeq ($(RUN_MODE),local)
env SECURE=$(SECURE) ALWAYS_OFFLOAD_NODE_STATUS=$(ALWAYS_OFFLOAD_NODE_STATUS) LOG_LEVEL=$(LOG_LEVEL) UPPERIO_DB_DEBUG=$(UPPERIO_DB_DEBUG) VERSION=$(VERSION) AUTH_MODE=$(AUTH_MODE) NAMESPACED=$(NAMESPACED) NAMESPACE=$(KUBE_NAMESPACE) $(GOPATH)/bin/goreman -set-ports=false -logtime=false start
env DEFAULT_REQUEUE_TIME=$(DEFAULT_REQUEUE_TIME) SECURE=$(SECURE) ALWAYS_OFFLOAD_NODE_STATUS=$(ALWAYS_OFFLOAD_NODE_STATUS) LOG_LEVEL=$(LOG_LEVEL) UPPERIO_DB_DEBUG=$(UPPERIO_DB_DEBUG) VERSION=$(VERSION) AUTH_MODE=$(AUTH_MODE) NAMESPACED=$(NAMESPACED) NAMESPACE=$(KUBE_NAMESPACE) $(GOPATH)/bin/goreman -set-ports=false -logtime=false start
endif

.PHONY: wait
Expand Down
2 changes: 1 addition & 1 deletion Procfile
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
controller: LEADER_ELECTION_IDENTITY=local ALWAYS_OFFLOAD_NODE_STATUS=${ALWAYS_OFFLOAD_NODE_STATUS} OFFLOAD_NODE_STATUS_TTL=30s WORKFLOW_GC_PERIOD=30s UPPERIO_DB_DEBUG=${UPPERIO_DB_DEBUG} ARCHIVED_WORKFLOW_GC_PERIOD=30s ./dist/workflow-controller --executor-image argoproj/argoexec:${VERSION} --namespaced=${NAMESPACED} --namespace ${NAMESPACE} --loglevel ${LOG_LEVEL}
controller: DEFAULT_REQUEUE_TIME=${DEFAULT_REQUEUE_TIME} LEADER_ELECTION_IDENTITY=local ALWAYS_OFFLOAD_NODE_STATUS=${ALWAYS_OFFLOAD_NODE_STATUS} OFFLOAD_NODE_STATUS_TTL=30s WORKFLOW_GC_PERIOD=30s UPPERIO_DB_DEBUG=${UPPERIO_DB_DEBUG} ARCHIVED_WORKFLOW_GC_PERIOD=30s ./dist/workflow-controller --executor-image argoproj/argoexec:${VERSION} --namespaced=${NAMESPACED} --namespace ${NAMESPACE} --loglevel ${LOG_LEVEL}
argo-server: UPPERIO_DB_DEBUG=${UPPERIO_DB_DEBUG} ./dist/argo --loglevel ${LOG_LEVEL} server --namespaced=${NAMESPACED} --namespace ${NAMESPACE} --auth-mode ${AUTH_MODE} --secure=$SECURE
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ spec:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
ports:
- name: metrics
containerPort: 9090
# Periodically check that we are still listening on the metrics port,
# and restart the container if the check fails.
# This takes advantage of the fact that if the metrics service has died,
# then the controller has died.
# In testing, it appears to take 60-90s from failure to restart.
livenessProbe:
httpGet:
port: metrics
path: /metrics
initialDelaySeconds: 30
periodSeconds: 30
securityContext:
runAsNonRoot: true
nodeSelector:
Expand Down
9 changes: 9 additions & 0 deletions manifests/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,16 @@ spec:
apiVersion: v1
fieldPath: metadata.name
image: argoproj/workflow-controller:latest
livenessProbe:
httpGet:
path: /metrics
port: metrics
initialDelaySeconds: 30
periodSeconds: 30
name: workflow-controller
ports:
- containerPort: 9090
name: metrics
securityContext:
capabilities:
drop:
Expand Down
9 changes: 9 additions & 0 deletions manifests/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,16 @@ spec:
apiVersion: v1
fieldPath: metadata.name
image: argoproj/workflow-controller:latest
livenessProbe:
httpGet:
path: /metrics
port: metrics
initialDelaySeconds: 30
periodSeconds: 30
name: workflow-controller
ports:
- containerPort: 9090
name: metrics
securityContext:
capabilities:
drop:
Expand Down
9 changes: 9 additions & 0 deletions manifests/quick-start-minimal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,16 @@ spec:
apiVersion: v1
fieldPath: metadata.name
image: argoproj/workflow-controller:latest
livenessProbe:
httpGet:
path: /metrics
port: metrics
initialDelaySeconds: 30
periodSeconds: 30
name: workflow-controller
ports:
- containerPort: 9090
name: metrics
securityContext:
capabilities:
drop:
Expand Down
9 changes: 9 additions & 0 deletions manifests/quick-start-mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,16 @@ spec:
apiVersion: v1
fieldPath: metadata.name
image: argoproj/workflow-controller:latest
livenessProbe:
httpGet:
path: /metrics
port: metrics
initialDelaySeconds: 30
periodSeconds: 30
name: workflow-controller
ports:
- containerPort: 9090
name: metrics
securityContext:
capabilities:
drop:
Expand Down
9 changes: 9 additions & 0 deletions manifests/quick-start-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,16 @@ spec:
apiVersion: v1
fieldPath: metadata.name
image: argoproj/workflow-controller:latest
livenessProbe:
httpGet:
path: /metrics
port: metrics
initialDelaySeconds: 30
periodSeconds: 30
name: workflow-controller
ports:
- containerPort: 9090
name: metrics
securityContext:
capabilities:
drop:
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/functional/stop-terminate.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: stop-terminate
generateName: stop-terminate-
labels:
argo-e2e: true
spec:
Expand All @@ -23,7 +23,7 @@ spec:
- name: message
container:
image: argoproj/argosay:v2
args: [sleep, "10s"]
args: [sleep, "30s"]

- name: exit
container:
Expand Down
16 changes: 8 additions & 8 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,19 +575,19 @@ func (s *FunctionalSuite) TestStopBehavior() {
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToStart, "to start").
RunCli([]string{"stop", "stop-terminate"}, func(t *testing.T, output string, err error) {
RunCli([]string{"stop", "@latest"}, func(t *testing.T, output string, err error) {
assert.NoError(t, err)
assert.Contains(t, output, "workflow stop-terminate stopped")
assert.Regexp(t, "workflow stop-terminate-.* stopped", output)
}).
WaitForWorkflow(45 * time.Second).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
ExpectWorkflow(func(t *testing.T, m *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeFailed, status.Phase)
nodeStatus := status.Nodes.FindByDisplayName("A.onExit")
if assert.NotNil(t, nodeStatus) {
assert.Equal(t, wfv1.NodeSucceeded, nodeStatus.Phase)
}
nodeStatus = status.Nodes.FindByDisplayName("stop-terminate.onExit")
nodeStatus = status.Nodes.FindByDisplayName(m.Name + ".onExit")
if assert.NotNil(t, nodeStatus) {
assert.Equal(t, wfv1.NodeSucceeded, nodeStatus.Phase)
}
Expand All @@ -600,17 +600,17 @@ func (s *FunctionalSuite) TestTerminateBehavior() {
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToStart, "to start").
RunCli([]string{"terminate", "stop-terminate"}, func(t *testing.T, output string, err error) {
RunCli([]string{"terminate", "@latest"}, func(t *testing.T, output string, err error) {
assert.NoError(t, err)
assert.Contains(t, output, "workflow stop-terminate terminated")
assert.Regexp(t, "workflow stop-terminate-.* terminated", output)
}).
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
ExpectWorkflow(func(t *testing.T, m *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeFailed, status.Phase)
nodeStatus := status.Nodes.FindByDisplayName("A.onExit")
assert.Nil(t, nodeStatus)
nodeStatus = status.Nodes.FindByDisplayName("stop-terminate.onExit")
nodeStatus = status.Nodes.FindByDisplayName(m.Name + ".onExit")
assert.Nil(t, nodeStatus)
})
}
Expand Down
2 changes: 1 addition & 1 deletion util/logs/workflow-logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func WorkflowLogs(ctx context.Context, wfClient versioned.Interface, kubeClient
ensureWeAreStreaming(&pod)
}

if req.GetLogOptions().Follow {
if logOptions.Follow {
wfListOptions := metav1.ListOptions{FieldSelector: "metadata.name=" + req.GetName(), ResourceVersion: "0"}
wfWatch, err := wfInterface.Watch(wfListOptions)
if err != nil {
Expand Down
11 changes: 3 additions & 8 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ func (wfc *WorkflowController) createSynchronizationManager() error {
}

nextWorkflow := func(key string) {
wfc.wfQueue.Add(key)
wfc.wfQueue.AddRateLimited(key)
}

wfc.syncManager = sync.NewLockManager(getSyncLimit, nextWorkflow)
Expand Down Expand Up @@ -353,12 +353,7 @@ func (wfc *WorkflowController) notifySemaphoreConfigUpdate(cm *apiv1.ConfigMap)
log.Warnf("received object from indexer %s is not an unstructured", indexes.SemaphoreConfigIndexName)
continue
}
wf, err := util.FromUnstructured(un)
if err != nil {
log.Errorf("failed to convert to workflow from unstructured: %v", err)
continue
}
wfc.wfQueue.Add(fmt.Sprintf("%s/%s", wf.Namespace, wf.Name))
wfc.wfQueue.AddRateLimited(fmt.Sprintf("%s/%s", un.GetNamespace(), un.GetName()))
}
}

Expand Down Expand Up @@ -749,7 +744,7 @@ func (wfc *WorkflowController) addWorkflowInformerHandlers() {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
wfc.releaseAllWorkflowLocks(obj)
wfc.wfQueue.AddRateLimited(key)
// no need to add to the queue - this workflow is done
wfc.throttler.Remove(key)
}
},
Expand Down
2 changes: 2 additions & 0 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"context"
"testing"
"time"

"github.com/argoproj/pkg/sync"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -595,5 +596,6 @@ func TestNotifySemaphoreConfigUpdate(t *testing.T) {
assert.Equal(0, controller.wfQueue.Len())

controller.notifySemaphoreConfigUpdate(&cm)
time.Sleep(2 * time.Second)
assert.Equal(2, controller.wfQueue.Len())
}
18 changes: 9 additions & 9 deletions workflow/controller/estimation/estimator_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) {
if err != nil {
return defaultEstimator, fmt.Errorf("failed to list workflows by index: %v", err)
}
var newestWf *wfv1.Workflow
var newestUn *unstructured.Unstructured
for _, obj := range objs {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
Expand All @@ -55,16 +55,16 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) {
if un.GetLabels()[common.LabelKeyPhase] != string(wfv1.NodeSucceeded) {
continue
}
candidateWf, err := util.FromUnstructured(un)
// we use `creationTimestamp` because it's fast
if newestUn == nil || un.GetCreationTimestamp().After(newestUn.GetCreationTimestamp().Time) {
newestUn = un
}
}
if newestUn != nil {
newestWf, err := util.FromUnstructured(newestUn)
if err != nil {
return defaultEstimator, fmt.Errorf("failed convert unstructured to workflow: %w", err)
}
// we use `startedAt` because that's same as how the archive sorts
if newestWf == nil || candidateWf.Status.StartedAt.Time.After(newestWf.Status.StartedAt.Time) {
newestWf = candidateWf
}
}
if newestWf != nil {
err = f.hydrator.Hydrate(newestWf)
if err != nil {
return defaultEstimator, fmt.Errorf("failed hydrate last workflow: %w", err)
Expand All @@ -78,7 +78,7 @@ func (f *estimatorFactory) NewEstimator(wf *wfv1.Workflow) (Estimator, error) {
}
workflows, err := f.wfArchive.ListWorkflows(wf.Namespace, time.Time{}, time.Time{}, requirements, 1, 0)
if err != nil {
return nil, fmt.Errorf("failed to list archived workflows: %v", err)
return defaultEstimator, fmt.Errorf("failed to list archived workflows: %v", err)
}
if len(workflows) > 0 {
return &estimator{wf, &workflows[0]}, nil
Expand Down
6 changes: 6 additions & 0 deletions workflow/controller/indexes/indexes.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package indexes

// Indexers (i.e. IndexFunc) should be fast and should not return errors.
// If an indexer returns an error, the cache will panic and crash the process.
// If an indexer is slow, the informer will fall behind and start returning
// stale workflows, so the operator reconciles out-of-date information,
// causing conflict errors and risking corrupted data.

const (
ClusterWorkflowTemplateIndex = "clusterworkflowtemplate"
CronWorkflowIndex = "cronworkflow"
Expand Down
8 changes: 3 additions & 5 deletions workflow/controller/indexes/labels.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package indexes

import (
"fmt"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"

Expand All @@ -18,7 +16,7 @@ func MetaWorkflowPhaseIndexFunc() cache.IndexFunc {
return func(obj interface{}) ([]string, error) {
v, err := meta.Accessor(obj)
if err != nil {
return []string{}, fmt.Errorf("object has no meta: %v", err)
return nil, nil
}
if value, exists := v.GetLabels()[common.LabelKeyPhase]; exists {
return []string{value}, nil
Expand All @@ -33,12 +31,12 @@ func MetaNamespaceLabelIndexFunc(label string) cache.IndexFunc {
return func(obj interface{}) ([]string, error) {
v, err := meta.Accessor(obj)
if err != nil {
return []string{}, fmt.Errorf("object has no meta: %v", err)
return nil, nil
}
if value, exists := v.GetLabels()[label]; exists {
return []string{MetaNamespaceLabelIndex(v.GetNamespace(), value)}, nil
} else {
return []string{}, nil
return nil, nil
}
}
}
13 changes: 4 additions & 9 deletions workflow/controller/indexes/workflow_index.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package indexes

import (
"fmt"

"github.com/prometheus/common/log"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/cache"
Expand All @@ -15,11 +12,11 @@ import (
func MetaWorkflowIndexFunc(obj interface{}) ([]string, error) {
m, err := meta.Accessor(obj)
if err != nil {
return []string{}, fmt.Errorf("object has no meta: %v", err)
return nil, nil
}
name, ok := m.GetLabels()[common.LabelKeyWorkflow]
if !ok {
return []string{}, fmt.Errorf("object has no workflow label")
return nil, nil
}
return []string{WorkflowIndexValue(m.GetNamespace(), name)}, nil
}
Expand All @@ -32,13 +29,11 @@ func WorkflowSemaphoreKeysIndexFunc() cache.IndexFunc {
return func(obj interface{}) ([]string, error) {
un, ok := obj.(*unstructured.Unstructured)
if !ok {
log.Warnf("cannot convert obj into unstructured.Unstructured in Indexer %s", SemaphoreConfigIndexName)
return []string{}, nil
return nil, nil
}
wf, err := util.FromUnstructured(un)
if err != nil {
log.Warnf("failed to convert to workflow from unstructured: %v", err)
return []string{}, nil
return nil, nil
}
return wf.GetSemaphoreKeys(), nil
}
Expand Down
Loading

0 comments on commit a0024d0

Please sign in to comment.