Skip to content

Commit

Permalink
fix: Argo Workflows does not honour global timeout if step/pod is not…
Browse files Browse the repository at this point in the history
… able to schedule (#3581)
  • Loading branch information
sarabala1979 authored and alexec committed Jul 28, 2020
1 parent 3879827 commit 35a0049
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 5 deletions.
17 changes: 12 additions & 5 deletions workflow/controller/operator.go
Expand Up @@ -222,6 +222,12 @@ func (woc *wfOperationCtx) operate() {

woc.workflowDeadline = woc.getWorkflowDeadline()

// Workflow will not be requeued if workflow steps are in pending state.
// Workflow needs to requeue on its deadline,
if woc.workflowDeadline != nil {
woc.requeue(time.Until(*woc.workflowDeadline))
}

if woc.wfSpec.Metrics != nil {
realTimeScope := map[string]func() float64{common.GlobalVarWorkflowDuration: func() float64 {
return time.Since(woc.wf.Status.StartedAt.Time).Seconds()
Expand All @@ -232,7 +238,7 @@ func (woc *wfOperationCtx) operate() {
woc.workflowDeadline = woc.getWorkflowDeadline()
err := woc.podReconciliation()
if err == nil {
err = woc.failSuspendedNodesAfterDeadlineOrShutdown()
err = woc.failSuspendedAndPendingNodesAfterDeadlineOrShutdown()
}
if err != nil {
woc.log.WithError(err).WithField("workflow", woc.wf.ObjectMeta.Name).Error("workflow timeout")
Expand Down Expand Up @@ -871,11 +877,12 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node wfv1.NodeStatus) bool {
(woc.controller.Config.PodSpecLogStrategy.FailedPod && node.FailedOrError())
}

//fails any suspended nodes if the workflow deadline has passed
func (woc *wfOperationCtx) failSuspendedNodesAfterDeadlineOrShutdown() error {
if woc.wfSpec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
//fails any suspended and pending nodes if the workflow deadline has passed
func (woc *wfOperationCtx) failSuspendedAndPendingNodesAfterDeadlineOrShutdown() error {
deadlineExceeded := woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)
if woc.wfSpec.Shutdown != "" || deadlineExceeded {
for _, node := range woc.wf.Status.Nodes {
if node.IsActiveSuspendNode() {
if node.IsActiveSuspendNode() || (node.Phase == wfv1.NodePending && deadlineExceeded) {
var message string
if woc.wfSpec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.wfSpec.Shutdown)
Expand Down
123 changes: 123 additions & 0 deletions workflow/controller/operator_test.go
Expand Up @@ -4327,3 +4327,126 @@ func TestGlobalVarsOnExit(t *testing.T) {
assert.Equal(t, "nononono", node.Inputs.Parameters[0].Value.StrVal)
}
}

var deadlineWf = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: steps-9fvnv
namespace: argo
spec:
activeDeadlineSeconds: 1
entrypoint: main
templates:
- name: main
steps:
- - name: approve
template: approve
- name: hello1
template: whalesay
- name: approve
suspend: {}
- container:
args:
- sleep 50
command:
- sh
- -c
image: alpine:latest
resources:
requests:
memory: 1Gi
name: whalesay
resubmitPendingPods: true
status:
finishedAt: null
nodes:
steps-9fvnv:
children:
- steps-9fvnv-3514116232
displayName: steps-9fvnv
finishedAt: null
id: steps-9fvnv
name: steps-9fvnv
phase: Running
startedAt: "2020-07-24T16:39:25Z"
templateName: main
templateScope: local/steps-9fvnv
type: Steps
steps-9fvnv-3392004273:
boundaryID: steps-9fvnv
displayName: hello1
finishedAt: null
hostNodeName: k3d-k3s-default-server
id: steps-9fvnv-3392004273
message: 'ErrImageNeverPull: Container image "argoproj/argoexec:latest" is not
present with pull policy of Never'
name: steps-9fvnv[0].hello1
phase: Pending
startedAt: "2020-07-24T16:39:25Z"
templateName: whalesay
templateScope: local/steps-9fvnv
type: Pod
steps-9fvnv-3514116232:
boundaryID: steps-9fvnv
children:
- steps-9fvnv-3700512507
- steps-9fvnv-3392004273
displayName: '[0]'
finishedAt: null
id: steps-9fvnv-3514116232
name: steps-9fvnv[0]
phase: Running
startedAt: "2020-07-24T16:39:25Z"
templateName: main
templateScope: local/steps-9fvnv
type: StepGroup
steps-9fvnv-3700512507:
boundaryID: steps-9fvnv
displayName: approve
finishedAt: null
id: steps-9fvnv-3700512507
name: steps-9fvnv[0].approve
phase: Running
startedAt: "2020-07-24T16:39:25Z"
templateName: approve
templateScope: local/steps-9fvnv
type: Suspend
phase: Running
startedAt: "2020-07-24T16:39:25Z"
`

func TestFailSuspendedAndPendingNodesAfterDeadline(t *testing.T) {
wf := unmarshalWF(deadlineWf)
wf.Status.StartedAt = metav1.Now()
cancel, controller := newController(wf)
defer cancel()
woc := newWorkflowOperationCtx(wf, controller)
t.Run("Before Deadline", func(t *testing.T) {
woc.operate()
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase)
})
time.Sleep(1 * time.Second)
t.Run("After Deadline", func(t *testing.T) {
woc.operate()
assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Phase)
for _, node := range woc.wf.Status.Nodes {
assert.Equal(t, wfv1.NodeFailed, node.Phase)
}
})
}

func TestFailSuspendedAndPendingNodesAfterShutdown(t *testing.T) {
wf := unmarshalWF(deadlineWf)
wf.Spec.Shutdown = wfv1.ShutdownStrategyStop
cancel, controller := newController(wf)
defer cancel()
woc := newWorkflowOperationCtx(wf, controller)
t.Run("After Shutdown", func(t *testing.T) {
woc.operate()
assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Phase)
for _, node := range woc.wf.Status.Nodes {
assert.Equal(t, wfv1.NodeFailed, node.Phase)
}
})
}

0 comments on commit 35a0049

Please sign in to comment.