Skip to content

Commit

Permalink
fix: DAG with continueOn in error after retry. Fixes: #11395 (#12817)
Browse files Browse the repository at this point in the history
Signed-off-by: shuangkun <tsk2013uestc@163.com>
  • Loading branch information
shuangkun committed Apr 12, 2024
1 parent 87a2041 commit 2eb2415
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 0 deletions.
36 changes: 36 additions & 0 deletions test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/yaml"
Expand Down Expand Up @@ -927,6 +928,41 @@ func (s *CLISuite) TestWorkflowRetryWithRecreatedPVC() {
})
}

// TestRetryWorkflowWithContinueOn submits the continueOn DAG fixture, waits for
// it to fail, retries it through the CLI, and then checks that the retried
// workflow still ends in the Failed phase with the same number of nodes and
// that the "success" node keeps both of its children.
func (s *CLISuite) TestRetryWorkflowWithContinueOn() {
	var workflowName string
	then := s.Given().
		Workflow(`@testdata/retry-workflow-with-continueon.yaml`).
		When().
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToBeFailed).
		Then().
		ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
			workflowName = metadata.Name
			assert.Len(t, status.Nodes, 6)
		})

	// RunCli is issued as its own statement so that workflowName is read only
	// after the ExpectWorkflow call above has returned. Inside a single call
	// chain the Go spec does not order a plain variable read relative to the
	// preceding method calls.
	// NOTE(review): this still assumes ExpectWorkflow invokes its callback
	// before returning, as the original chain did — confirm against fixtures.
	then.RunCli([]string{"retry", workflowName}, func(t *testing.T, output string, err error) {
		if assert.NoError(t, err, output) {
			assert.Contains(t, output, "Name:")
			assert.Contains(t, output, "Namespace:")
		}
	})

	s.Given().
		When().
		WaitForWorkflow(fixtures.ToBeCompleted).
		Then().
		ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
			assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
			assert.Len(t, status.Nodes, 6)
		}).
		ExpectWorkflowNode(func(status wfv1.NodeStatus) bool {
			return strings.Contains(status.Name, "retry-workflow-with-continueon.success")
		}, func(t *testing.T, status *wfv1.NodeStatus, pod *corev1.Pod) {
			assert.Len(t, status.Children, 2)
		})
}

func (s *CLISuite) TestWorkflowStop() {
s.Given().
Workflow("@smoke/basic.yaml").
Expand Down
55 changes: 55 additions & 0 deletions test/e2e/testdata/retry-workflow-with-continueon.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Fixture for retrying a DAG that contains a continueOn task.
#
# Task graph (every task runs the node-to-exit template with the given exit code):
#
#   success (0) ─┬─> failure (1)  ──> task-after-failure (0)
#                └─> continue (2) ──> task-after-continue (0)
#
# "continue" sets continueOn.failed, so the task that depends on it is still
# scheduled even though "continue" exits non-zero.
#
# NOTE(review): the e2e test loads this file through the Workflow(...) fixture
# and matches nodes by the "retry-workflow-with-continueon." name prefix —
# confirm that `kind: WorkflowTemplate` (rather than `kind: Workflow`) is intended.
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: retry-workflow-with-continueon
spec:
  entrypoint: dag
  templates:
    - name: dag
      dag:
        # Keep scheduling the remaining branches after one task has failed.
        failFast: false
        tasks:
          - name: success
            template: node-to-exit
            arguments:
              parameters:
                - name: exitCode
                  value: 0
          - name: failure
            template: node-to-exit
            dependencies: [success]
            arguments:
              parameters:
                - name: exitCode
                  value: 1
          - name: task-after-failure
            template: node-to-exit
            dependencies: [failure]
            arguments:
              parameters:
                - name: exitCode
                  value: 0
          - name: continue
            template: node-to-exit
            continueOn:
              failed: true
            dependencies: [success]
            arguments:
              parameters:
                - name: exitCode
                  value: 2
          - name: task-after-continue
            template: node-to-exit
            dependencies: [continue]
            arguments:
              parameters:
                - name: exitCode
                  value: 0

    # Container template that exits with the status code passed as exitCode.
    - name: node-to-exit
      inputs:
        parameters:
          - name: exitCode
      container:
        image: alpine:3.7
        command: [ sh, "-c", "exit {{inputs.parameters.exitCode}}" ]
20 changes: 20 additions & 0 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,21 @@ func getDescendantNodeIDs(wf *wfv1.Workflow, node wfv1.NodeStatus) []string {
return descendantNodeIDs
}

// isDescendantNodeSucceeded reports whether any node reachable from node
// through Children links is in the Succeeded phase and is not a key of
// nodeIDsToReset.
//
// Each descendant is inspected at most once. A true result short-circuits the
// whole walk, so a node that has already been visited is known to have
// contributed nothing and can be skipped. This keeps the walk linear on DAGs
// whose branches share descendants and stops it from looping if the Children
// links ever form a cycle.
//
// If a child ID cannot be looked up in wf.Status.Nodes, the failure is logged
// with log.Fatalf; the panic after it is kept as a backstop in case the logger
// does not terminate the process.
func isDescendantNodeSucceeded(wf *wfv1.Workflow, node wfv1.NodeStatus, nodeIDsToReset map[string]bool) bool {
	visited := make(map[string]struct{})

	var walk func(parent wfv1.NodeStatus) bool
	walk = func(parent wfv1.NodeStatus) bool {
		for _, childID := range parent.Children {
			if _, seen := visited[childID]; seen {
				continue
			}
			visited[childID] = struct{}{}

			childStatus, err := wf.Status.Nodes.Get(childID)
			if err != nil {
				log.Fatalf("Couldn't get child %s of node %s, panicking: %v", childID, parent.ID, err)
				panic("Was not able to obtain child")
			}

			// Only key presence matters here; the stored bool is ignored.
			if _, resetting := nodeIDsToReset[childID]; !resetting && childStatus.Phase == wfv1.NodeSucceeded {
				return true
			}
			if walk(*childStatus) {
				return true
			}
		}
		return false
	}

	return walk(node)
}

func deletePodNodeDuringRetryWorkflow(wf *wfv1.Workflow, node wfv1.NodeStatus, deletedPods map[string]bool, podsToDelete []string) (map[string]bool, []string) {
templateName := GetTemplateFromNode(node)
version := GetWorkflowPodNameVersion(wf)
Expand Down Expand Up @@ -960,6 +975,11 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
log.Debugf("Reset %s node %s since it's a group node", node.Name, string(node.Phase))
continue
} else {
if isDescendantNodeSucceeded(wf, node, nodeIDsToReset) {
log.Debugf("Node %s remains as is since it has succeed child nodes.", node.Name)
newWF.Status.Nodes.Set(node.ID, node)
continue
}
log.Debugf("Deleted %s node %s since it's not a group node", node.Name, string(node.Phase))
deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, node, deletedPods, podsToDelete)
log.Debugf("Deleted pod node: %s", node.Name)
Expand Down
58 changes: 58 additions & 0 deletions workflow/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,64 @@ func TestFormulateRetryWorkflow(t *testing.T) {
}
}
})

t.Run("Retry continue on failed workflow", func(t *testing.T) {
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "continue-on-failed-workflow",
Labels: map[string]string{},
},
Status: wfv1.WorkflowStatus{
Phase: wfv1.WorkflowFailed,
Nodes: map[string]wfv1.NodeStatus{
"continue-on-failed-workflow": {ID: "continue-on-failed-workflow", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypeDAG, Children: []string{"1"}, OutboundNodes: []string{"3", "5"}},
"1": {ID: "1", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow", Children: []string{"2", "4"}, Name: "node1"},
"2": {ID: "2", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow", Children: []string{"3"}, Name: "node2"},
"3": {ID: "3", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow", Name: "node3"},
"4": {ID: "4", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow", Children: []string{"5"}, Name: "node4"},
"5": {ID: "5", Phase: wfv1.NodeOmitted, Type: wfv1.NodeTypeSkipped, BoundaryID: "continue-on-failed-workflow", Name: "node5"}},
},
}
_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
wf, podsToDelete, err := FormulateRetryWorkflow(ctx, wf, false, "", nil)
if assert.NoError(t, err) {
if assert.Len(t, wf.Status.Nodes, 4) {
assert.Equal(t, wfv1.NodeFailed, wf.Status.Nodes["2"].Phase)
assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes["3"].Phase)
assert.Equal(t, 2, len(podsToDelete))
}
}
})

t.Run("Retry continue on failed workflow with restartSuccessful and nodeFieldSelector", func(t *testing.T) {
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "continue-on-failed-workflow-2",
Labels: map[string]string{},
},
Status: wfv1.WorkflowStatus{
Phase: wfv1.WorkflowFailed,
Nodes: map[string]wfv1.NodeStatus{
"continue-on-failed-workflow-2": {ID: "continue-on-failed-workflow-2", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypeDAG, Children: []string{"1"}, OutboundNodes: []string{"3", "5"}},
"1": {ID: "1", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow-2", Children: []string{"2", "4"}, Name: "node1"},
"2": {ID: "2", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow-2", Children: []string{"3"}, Name: "node2"},
"3": {ID: "3", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow-2", Name: "node3"},
"4": {ID: "4", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow-2", Children: []string{"5"}, Name: "node4"},
"5": {ID: "5", Phase: wfv1.NodeOmitted, Type: wfv1.NodeTypeSkipped, BoundaryID: "continue-on-failed-workflow-2", Name: "node5"}},
},
}
_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
wf, podsToDelete, err := FormulateRetryWorkflow(ctx, wf, true, "id=3", nil)
if assert.NoError(t, err) {
if assert.Len(t, wf.Status.Nodes, 2) {
assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes["1"].Phase)
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["continue-on-failed-workflow-2"].Phase)
assert.Equal(t, 4, len(podsToDelete))
}
}
})
}

func TestFromUnstructuredObj(t *testing.T) {
Expand Down

0 comments on commit 2eb2415

Please sign in to comment.