Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: DAG with continueOn in error after retry. Fixes: #11395 #12817

Merged
merged 11 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions test/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/yaml"
Expand Down Expand Up @@ -913,6 +914,41 @@ func (s *CLISuite) TestWorkflowRetryWithRecreatedPVC() {
})
}

func (s *CLISuite) TestRetryWorkflowWithContinueOn() {
var workflowName string
s.Given().
Workflow(`@testdata/retry-workflow-with-continueon.yaml`).
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeFailed).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
workflowName = metadata.Name
assert.Equal(t, 6, len(status.Nodes))
}).
RunCli([]string{"retry", workflowName}, func(t *testing.T, output string, err error) {
if assert.NoError(t, err, output) {
assert.Contains(t, output, "Name:")
assert.Contains(t, output, "Namespace:")
}
})

s.Given().
When().
WaitForWorkflow(fixtures.ToBeCompleted).
Then().
ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
workflowName = metadata.Name
assert.Equal(t, wfv1.WorkflowFailed, status.Phase)
assert.Equal(t, 6, len(status.Nodes))
}).
ExpectWorkflowNode(func(status wfv1.NodeStatus) bool {
return strings.Contains(status.Name, "retry-workflow-with-continueon.success")
}, func(t *testing.T, status *wfv1.NodeStatus, pod *corev1.Pod) {
assert.Equal(t, 2, len(status.Children))
})
}

func (s *CLISuite) TestWorkflowStop() {
s.Given().
Workflow("@smoke/basic.yaml").
Expand Down
55 changes: 55 additions & 0 deletions test/e2e/testdata/retry-workflow-with-continueon.yaml
shuangkun marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: retry-workflow-with-continueon
spec:
entrypoint: dag
templates:
- name: dag
dag:
failFast: false
tasks:
- name: success
template: node-to-exit
arguments:
parameters:
- name: exitCode
value: 0
- name: failure
template: node-to-exit
dependencies: [success]
arguments:
parameters:
- name: exitCode
value: 1
- name: task-after-failure
template: node-to-exit
dependencies: [failure]
arguments:
parameters:
- name: exitCode
value: 0
- name: continue
template: node-to-exit
continueOn:
failed: true
dependencies: [success]
arguments:
parameters:
- name: exitCode
value: 2
- name: task-after-continue
template: node-to-exit
dependencies: [continue]
arguments:
parameters:
- name: exitCode
value: 0

- name: node-to-exit
inputs:
parameters:
- name: exitCode
container:
image: alpine:3.7
command: [ sh, "-c", "exit {{inputs.parameters.exitCode}}" ]
20 changes: 20 additions & 0 deletions workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,21 @@ func getDescendantNodeIDs(wf *wfv1.Workflow, node wfv1.NodeStatus) []string {
return descendantNodeIDs
}

func isDescendantNodeSucceeded(wf *wfv1.Workflow, node wfv1.NodeStatus, nodeIDsToReset map[string]bool) bool {
for _, child := range node.Children {
childStatus, err := wf.Status.Nodes.Get(child)
if err != nil {
log.Fatalf("Couldn't get child, panicking")
panic("Was not able to obtain child")
agilgur5 marked this conversation as resolved.
Show resolved Hide resolved
}
_, present := nodeIDsToReset[child]
if (!present && childStatus.Phase == wfv1.NodeSucceeded) || isDescendantNodeSucceeded(wf, *childStatus, nodeIDsToReset) {
return true
}
}
return false
}

func deletePodNodeDuringRetryWorkflow(wf *wfv1.Workflow, node wfv1.NodeStatus, deletedPods map[string]bool, podsToDelete []string) (map[string]bool, []string) {
templateName := GetTemplateFromNode(node)
version := GetWorkflowPodNameVersion(wf)
Expand Down Expand Up @@ -960,6 +975,11 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce
log.Debugf("Reset %s node %s since it's a group node", node.Name, string(node.Phase))
continue
} else {
if isDescendantNodeSucceeded(wf, node, nodeIDsToReset) {
log.Debugf("Node %s remains as is since it has succeed child nodes.", node.Name)
newWF.Status.Nodes.Set(node.ID, node)
continue
}
log.Debugf("Deleted %s node %s since it's not a group node", node.Name, string(node.Phase))
deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, node, deletedPods, podsToDelete)
log.Debugf("Deleted pod node: %s", node.Name)
Expand Down
58 changes: 58 additions & 0 deletions workflow/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,64 @@ func TestFormulateRetryWorkflow(t *testing.T) {
}
}
})

t.Run("Retry continue on failed workflow", func(t *testing.T) {
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "continue-on-failed-workflow",
Labels: map[string]string{},
},
Status: wfv1.WorkflowStatus{
Phase: wfv1.WorkflowFailed,
Nodes: map[string]wfv1.NodeStatus{
"continue-on-failed-workflow": {ID: "continue-on-failed-workflow", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypeDAG, Children: []string{"1"}, OutboundNodes: []string{"3", "5"}},
"1": {ID: "1", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow", Children: []string{"2", "4"}, Name: "node1"},
"2": {ID: "2", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow", Children: []string{"3"}, Name: "node2"},
"3": {ID: "3", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow", Name: "node3"},
"4": {ID: "4", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow", Children: []string{"5"}, Name: "node4"},
"5": {ID: "5", Phase: wfv1.NodeOmitted, Type: wfv1.NodeTypeSkipped, BoundaryID: "continue-on-failed-workflow", Name: "node5"}},
},
}
_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
wf, podsToDelete, err := FormulateRetryWorkflow(ctx, wf, false, "", nil)
if assert.NoError(t, err) {
if assert.Len(t, wf.Status.Nodes, 4) {
assert.Equal(t, wfv1.NodeFailed, wf.Status.Nodes["2"].Phase)
assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes["3"].Phase)
assert.Equal(t, 2, len(podsToDelete))
}
}
})

t.Run("Retry continue on failed workflow with restartSuccessful and nodeFieldSelector", func(t *testing.T) {
wf := &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{
Name: "continue-on-failed-workflow-2",
Labels: map[string]string{},
},
Status: wfv1.WorkflowStatus{
Phase: wfv1.WorkflowFailed,
Nodes: map[string]wfv1.NodeStatus{
"continue-on-failed-workflow-2": {ID: "continue-on-failed-workflow-2", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypeDAG, Children: []string{"1"}, OutboundNodes: []string{"3", "5"}},
"1": {ID: "1", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow-2", Children: []string{"2", "4"}, Name: "node1"},
"2": {ID: "2", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow-2", Children: []string{"3"}, Name: "node2"},
"3": {ID: "3", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow-2", Name: "node3"},
"4": {ID: "4", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow-2", Children: []string{"5"}, Name: "node4"},
"5": {ID: "5", Phase: wfv1.NodeOmitted, Type: wfv1.NodeTypeSkipped, BoundaryID: "continue-on-failed-workflow-2", Name: "node5"}},
},
}
_, err := wfClient.Create(ctx, wf, metav1.CreateOptions{})
assert.NoError(t, err)
wf, podsToDelete, err := FormulateRetryWorkflow(ctx, wf, true, "id=3", nil)
if assert.NoError(t, err) {
if assert.Len(t, wf.Status.Nodes, 2) {
assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes["1"].Phase)
assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["continue-on-failed-workflow-2"].Phase)
assert.Equal(t, 4, len(podsToDelete))
}
}
})
}

func TestFromUnstructuredObj(t *testing.T) {
Expand Down
Loading