From 2eb241586746326f0fa6e2ee4dd3b45d77910551 Mon Sep 17 00:00:00 2001 From: shuangkun tian <72060326+shuangkun@users.noreply.github.com> Date: Fri, 12 Apr 2024 08:36:59 +0800 Subject: [PATCH] fix: DAG with continueOn in error after retry. Fixes: #11395 (#12817) Signed-off-by: shuangkun --- test/e2e/cli_test.go | 36 ++++++++++++ .../retry-workflow-with-continueon.yaml | 55 ++++++++++++++++++ workflow/util/util.go | 20 +++++++ workflow/util/util_test.go | 58 +++++++++++++++++++ 4 files changed, 169 insertions(+) create mode 100644 test/e2e/testdata/retry-workflow-with-continueon.yaml diff --git a/test/e2e/cli_test.go b/test/e2e/cli_test.go index 54dc4fd517ad..6c84c4e2c4d8 100644 --- a/test/e2e/cli_test.go +++ b/test/e2e/cli_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/yaml" @@ -927,6 +928,41 @@ func (s *CLISuite) TestWorkflowRetryWithRecreatedPVC() { }) } +func (s *CLISuite) TestRetryWorkflowWithContinueOn() { + var workflowName string + s.Given(). + Workflow(`@testdata/retry-workflow-with-continueon.yaml`). + When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeFailed). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + workflowName = metadata.Name + assert.Equal(t, 6, len(status.Nodes)) + }). + RunCli([]string{"retry", workflowName}, func(t *testing.T, output string, err error) { + if assert.NoError(t, err, output) { + assert.Contains(t, output, "Name:") + assert.Contains(t, output, "Namespace:") + } + }) + + s.Given(). + When(). + WaitForWorkflow(fixtures.ToBeCompleted). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus) { + workflowName = metadata.Name + assert.Equal(t, wfv1.WorkflowFailed, status.Phase) + assert.Equal(t, 6, len(status.Nodes)) + }). + ExpectWorkflowNode(func(status wfv1.NodeStatus) bool { + return strings.Contains(status.Name, "retry-workflow-with-continueon.success") + }, func(t *testing.T, status *wfv1.NodeStatus, pod *corev1.Pod) { + assert.Equal(t, 2, len(status.Children)) + }) +} + func (s *CLISuite) TestWorkflowStop() { s.Given(). Workflow("@smoke/basic.yaml"). diff --git a/test/e2e/testdata/retry-workflow-with-continueon.yaml b/test/e2e/testdata/retry-workflow-with-continueon.yaml new file mode 100644 index 000000000000..9d2ce1414860 --- /dev/null +++ b/test/e2e/testdata/retry-workflow-with-continueon.yaml @@ -0,0 +1,55 @@ +apiVersion: argoproj.io/v1alpha1 +kind: WorkflowTemplate +metadata: + name: retry-workflow-with-continueon +spec: + entrypoint: dag + templates: + - name: dag + dag: + failFast: false + tasks: + - name: success + template: node-to-exit + arguments: + parameters: + - name: exitCode + value: 0 + - name: failure + template: node-to-exit + dependencies: [success] + arguments: + parameters: + - name: exitCode + value: 1 + - name: task-after-failure + template: node-to-exit + dependencies: [failure] + arguments: + parameters: + - name: exitCode + value: 0 + - name: continue + template: node-to-exit + continueOn: + failed: true + dependencies: [success] + arguments: + parameters: + - name: exitCode + value: 2 + - name: task-after-continue + template: node-to-exit + dependencies: [continue] + arguments: + parameters: + - name: exitCode + value: 0 + + - name: node-to-exit + inputs: + parameters: + - name: exitCode + container: + image: alpine:3.7 + command: [ sh, "-c", "exit {{inputs.parameters.exitCode}}" ] \ No newline at end of file diff --git a/workflow/util/util.go b/workflow/util/util.go index 2f69ef282d99..ff10e1101196 100644 --- a/workflow/util/util.go +++ b/workflow/util/util.go @@ -772,6 +772,21 @@ func getDescendantNodeIDs(wf *wfv1.Workflow, node wfv1.NodeStatus) []string { return descendantNodeIDs } +func isDescendantNodeSucceeded(wf *wfv1.Workflow, node wfv1.NodeStatus, nodeIDsToReset map[string]bool) bool { + for _, child := range node.Children { + childStatus, err := wf.Status.Nodes.Get(child) + if err != nil { + log.Fatalf("Couldn't get child, panicking") + panic("Was not able to obtain child") + } + _, present := nodeIDsToReset[child] + if (!present && childStatus.Phase == wfv1.NodeSucceeded) || isDescendantNodeSucceeded(wf, *childStatus, nodeIDsToReset) { + return true + } + } + return false +} + func deletePodNodeDuringRetryWorkflow(wf *wfv1.Workflow, node wfv1.NodeStatus, deletedPods map[string]bool, podsToDelete []string) (map[string]bool, []string) { templateName := GetTemplateFromNode(node) version := GetWorkflowPodNameVersion(wf) @@ -960,6 +975,11 @@ func FormulateRetryWorkflow(ctx context.Context, wf *wfv1.Workflow, restartSucce log.Debugf("Reset %s node %s since it's a group node", node.Name, string(node.Phase)) continue } else { + if isDescendantNodeSucceeded(wf, node, nodeIDsToReset) { + log.Debugf("Node %s remains as is since it has succeed child nodes.", node.Name) + newWF.Status.Nodes.Set(node.ID, node) + continue + } log.Debugf("Deleted %s node %s since it's not a group node", node.Name, string(node.Phase)) deletedPods, podsToDelete = deletePodNodeDuringRetryWorkflow(wf, node, deletedPods, podsToDelete) log.Debugf("Deleted pod node: %s", node.Name) diff --git a/workflow/util/util_test.go b/workflow/util/util_test.go index 0328b37d04ab..0ea6844beb4b 100644 --- a/workflow/util/util_test.go +++ b/workflow/util/util_test.go @@ -1367,6 +1367,64 @@ func TestFormulateRetryWorkflow(t *testing.T) { } } }) + + t.Run("Retry continue on failed workflow", func(t *testing.T) { + wf := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "continue-on-failed-workflow", + Labels: map[string]string{}, + }, + Status: wfv1.WorkflowStatus{ + Phase: wfv1.WorkflowFailed, + Nodes: map[string]wfv1.NodeStatus{ + "continue-on-failed-workflow": {ID: "continue-on-failed-workflow", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypeDAG, Children: []string{"1"}, OutboundNodes: []string{"3", "5"}}, + "1": {ID: "1", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow", Children: []string{"2", "4"}, Name: "node1"}, + "2": {ID: "2", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow", Children: []string{"3"}, Name: "node2"}, + "3": {ID: "3", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow", Name: "node3"}, + "4": {ID: "4", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow", Children: []string{"5"}, Name: "node4"}, + "5": {ID: "5", Phase: wfv1.NodeOmitted, Type: wfv1.NodeTypeSkipped, BoundaryID: "continue-on-failed-workflow", Name: "node5"}}, + }, + } + _, err := wfClient.Create(ctx, wf, metav1.CreateOptions{}) + assert.NoError(t, err) + wf, podsToDelete, err := FormulateRetryWorkflow(ctx, wf, false, "", nil) + if assert.NoError(t, err) { + if assert.Len(t, wf.Status.Nodes, 4) { + assert.Equal(t, wfv1.NodeFailed, wf.Status.Nodes["2"].Phase) + assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes["3"].Phase) + assert.Equal(t, 2, len(podsToDelete)) + } + } + }) + + t.Run("Retry continue on failed workflow with restartSuccessful and nodeFieldSelector", func(t *testing.T) { + wf := &wfv1.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Name: "continue-on-failed-workflow-2", + Labels: map[string]string{}, + }, + Status: wfv1.WorkflowStatus{ + Phase: wfv1.WorkflowFailed, + Nodes: map[string]wfv1.NodeStatus{ + "continue-on-failed-workflow-2": {ID: "continue-on-failed-workflow-2", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypeDAG, Children: []string{"1"}, OutboundNodes: []string{"3", "5"}}, + "1": {ID: "1", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow-2", Children: []string{"2", "4"}, Name: "node1"}, + "2": {ID: "2", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow-2", Children: []string{"3"}, Name: "node2"}, + "3": {ID: "3", Phase: wfv1.NodeSucceeded, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow-2", Name: "node3"}, + "4": {ID: "4", Phase: wfv1.NodeFailed, Type: wfv1.NodeTypePod, BoundaryID: "continue-on-failed-workflow-2", Children: []string{"5"}, Name: "node4"}, + "5": {ID: "5", Phase: wfv1.NodeOmitted, Type: wfv1.NodeTypeSkipped, BoundaryID: "continue-on-failed-workflow-2", Name: "node5"}}, + }, + } + _, err := wfClient.Create(ctx, wf, metav1.CreateOptions{}) + assert.NoError(t, err) + wf, podsToDelete, err := FormulateRetryWorkflow(ctx, wf, true, "id=3", nil) + if assert.NoError(t, err) { + if assert.Len(t, wf.Status.Nodes, 2) { + assert.Equal(t, wfv1.NodeSucceeded, wf.Status.Nodes["1"].Phase) + assert.Equal(t, wfv1.NodeRunning, wf.Status.Nodes["continue-on-failed-workflow-2"].Phase) + assert.Equal(t, 4, len(podsToDelete)) + } + } + }) } func TestFromUnstructuredObj(t *testing.T) {