Skip to content

Commit

Permalink
fix: workflow stuck in running state when using activeDeadlineSeconds…
Browse files Browse the repository at this point in the history
… on template level. Fixes: #12329

Signed-off-by: shuangkun <tsk2013uestc@163.com>
  • Loading branch information
shuangkun committed Mar 8, 2024
1 parent 61156f2 commit cd5a7f0
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 2 deletions.
99 changes: 99 additions & 0 deletions test/e2e/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,19 @@
package e2e

import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
"github.com/argoproj/argo-workflows/v3/workflow/common"
)

type WorkflowSuite struct {
Expand Down Expand Up @@ -85,6 +89,101 @@ spec:
})
}

func (s *WorkflowSuite) TestWorkflowFailedWhenAllPodSetFailedFromPending() {
(s.Given().Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: active-deadline-fanout-template-level-
namespace: argo
spec:
entrypoint: entrypoint
templates:
- name: entrypoint
steps:
- - name: fanout
template: echo
arguments:
parameters:
- name: item
value: "{{item}}"
withItems:
- 1
- 2
- 3
- 4
- name: echo
inputs:
parameters:
- name: item
container:
image: centos:latest
imagePullPolicy: Always
command:
- sh
- '-c'
args:
- echo
- 'workflow number {{inputs.parameters.item}}'
- sleep
- '20'
activeDeadlineSeconds: 2 # defined on template level, not workflow level !
`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeFailed, time.Minute*11).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
for _, node := range status.Nodes {
if node.Type == wfv1.NodeTypePod {
assert.Equal(t, wfv1.NodeFailed, node.Phase)
assert.Contains(t, node.Message, "Pod was active on the node longer than the specified deadline")
}
}
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "fanout(0:1)")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
for _, c := range pod.Status.ContainerStatuses {
if c.Name == common.WaitContainerName && c.State.Terminated == nil {
assert.NotNil(t, c.State.Waiting)
assert.Contains(t, c.State.Waiting.Reason, "PodInitializing")
}
}
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "fanout(1:2)")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
for _, c := range pod.Status.ContainerStatuses {
if c.Name == common.WaitContainerName && c.State.Terminated == nil {
assert.NotNil(t, c.State.Waiting)
assert.Contains(t, c.State.Waiting.Reason, "PodInitializing")
}
}
})).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "fanout(2:3)")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
for _, c := range pod.Status.ContainerStatuses {
if c.Name == common.WaitContainerName && c.State.Terminated == nil {
assert.NotNil(t, c.State.Waiting)
assert.Contains(t, c.State.Waiting.Reason, "PodInitializing")
}
}
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "fanout(3:4)")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
for _, c := range pod.Status.ContainerStatuses {
if c.Name == common.WaitContainerName && c.State.Terminated == nil {
assert.NotNil(t, c.State.Waiting)
assert.Contains(t, c.State.Waiting.Reason, "PodInitializing")
}
}
})
}

func TestWorkflowSuite(t *testing.T) {
suite.Run(t, new(WorkflowSuite))
}
4 changes: 2 additions & 2 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1444,10 +1444,10 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
}
}

// We cannot fail the node until the wait container is finished (unless any init container has failed) because it may be busy saving outputs, and these
// We cannot fail the node until the wait container is finished (unless any init container has failed or wait contain is not Pending) because it may be busy saving outputs, and these
// would not get captured successfully.
for _, c := range pod.Status.ContainerStatuses {
if (c.Name == common.WaitContainerName && c.State.Terminated == nil && new.Phase.Completed()) && !initContainerFailed {
if (c.Name == common.WaitContainerName && c.State.Terminated == nil && c.State.Waiting == nil && new.Phase.Completed()) && !initContainerFailed {
woc.log.WithField("new.phase", new.Phase).Info("leaving phase un-changed: wait container is not yet terminated ")
new.Phase = old.Phase
}
Expand Down

0 comments on commit cd5a7f0

Please sign in to comment.