diff --git a/test/e2e/hooks_test.go b/test/e2e/hooks_test.go index 666d5942e81f..84a2ddda7bbe 100644 --- a/test/e2e/hooks_test.go +++ b/test/e2e/hooks_test.go @@ -567,6 +567,71 @@ spec: }) } +func (s *HooksSuite) TestWorkflowLevelHooksWithRetry() { + s.Given(). + Workflow(` +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + name: test-workflow-level-hooks-with-retry +spec: + templates: + - name: argosay + container: + image: argoproj/argosay:v2 + command: + - /bin/sh + - '-c' + args: + - /bin/sleep 1; exit 1 + retryStrategy: + limit: 1 + - name: hook + container: + image: argoproj/argosay:v2 + command: + - /bin/sh + - '-c' + args: + - /argosay + entrypoint: argosay + hooks: + failed: + template: hook + expression: workflow.status == "Failed" + running: + template: hook + expression: workflow.status == "Running" +`).When(). + SubmitWorkflow(). + WaitForWorkflow(fixtures.ToBeFailed). + Then(). + ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) { + assert.Equal(t, status.Phase, v1alpha1.WorkflowFailed) + assert.Equal(t, status.Progress, v1alpha1.Progress("2/4")) + }). + ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool { + return strings.Contains(status.Name, "test-workflow-level-hooks-with-retry.hooks.running") + }, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) { + assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase) + }). + ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool { + return strings.Contains(status.Name, "test-workflow-level-hooks-with-retry.hooks.failed") + }, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) { + assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase) + }). + ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool { + return strings.Contains(status.Name, "test-workflow-level-hooks-with-retry(0)") + }, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) { + assert.Equal(t, v1alpha1.NodeFailed, status.Phase) + }). + ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool { + return strings.Contains(status.Name, "test-workflow-level-hooks-with-retry(1)") + }, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) { + assert.Equal(t, v1alpha1.NodeFailed, status.Phase) + }) +} + func TestHooksSuite(t *testing.T) { suite.Run(t, new(HooksSuite)) } diff --git a/workflow/controller/operator.go b/workflow/controller/operator.go index ffc29948ff8a..f84ed499077d 100644 --- a/workflow/controller/operator.go +++ b/workflow/controller/operator.go @@ -921,9 +921,16 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate return node, true, nil } - lastChildNode := getChildNodeIndex(node, woc.wf.Status.Nodes, -1) + childNodeIds := getChildNodeIdsRetried(node, woc.wf.Status.Nodes) + if len(childNodeIds) == 0 { + return node, true, nil + } + lastChildNode, err := woc.wf.Status.Nodes.Get(childNodeIds[len(childNodeIds)-1]) + if err != nil { + return node, true, nil + } - if retryStrategy.Expression != "" && len(node.Children) > 0 { + if retryStrategy.Expression != "" && len(childNodeIds) > 0 { localScope := buildRetryStrategyLocalScope(node, woc.wf.Status.Nodes) scope := env.GetFuncMap(localScope) shouldContinue, err := argoexpr.EvalBool(retryStrategy.Expression, scope) @@ -964,12 +971,15 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate if retryStrategy.Backoff != nil { maxDurationDeadline := time.Time{} // Process max duration limit - if retryStrategy.Backoff.MaxDuration != "" && len(node.Children) > 0 { + if retryStrategy.Backoff.MaxDuration != "" && len(childNodeIds) > 0 { maxDuration, err := parseStringToDuration(retryStrategy.Backoff.MaxDuration) if err != nil { return nil, false, err } - firstChildNode := getChildNodeIndex(node, woc.wf.Status.Nodes, 0) + firstChildNode, err := woc.wf.Status.Nodes.Get(childNodeIds[0]) + if err != nil { + return nil, false, err + } maxDurationDeadline = firstChildNode.StartedAt.Add(maxDuration) if time.Now().After(maxDurationDeadline) { woc.log.Infoln("Max duration limit exceeded. Failing...") @@ -995,7 +1005,7 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate if retryStrategyBackoffFactor != nil && *retryStrategyBackoffFactor > 0 { // Formula: timeToWait = duration * factor^retry_number // Note that timeToWait should equal to duration for the first retry attempt. - timeToWait = baseDuration * time.Duration(math.Pow(float64(*retryStrategyBackoffFactor), float64(len(node.Children)-1))) + timeToWait = baseDuration * time.Duration(math.Pow(float64(*retryStrategyBackoffFactor), float64(len(childNodeIds)-1))) } waitingDeadline := lastChildNode.FinishedAt.Add(timeToWait) @@ -1006,7 +1016,7 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate } // See if we have waited past the deadline - if time.Now().Before(waitingDeadline) && retryStrategy.Limit != nil && int32(len(node.Children)) <= int32(retryStrategy.Limit.IntValue()) { + if time.Now().Before(waitingDeadline) && retryStrategy.Limit != nil && int32(len(childNodeIds)) <= int32(retryStrategy.Limit.IntValue()) { woc.requeueAfter(timeToWait) retryMessage := fmt.Sprintf("Backoff for %s", humanize.Duration(timeToWait)) return woc.markNodePhase(node.Name, node.Phase, retryMessage), false, nil @@ -1054,12 +1064,12 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate if err != nil { return nil, false, err } - if retryStrategy.Limit != nil && limit != nil && int32(len(node.Children)) > *limit { + if retryStrategy.Limit != nil && limit != nil && int32(len(childNodeIds)) > *limit { woc.log.Infoln("No more retries left. Failing...") return woc.markNodePhase(node.Name, lastChildNode.Phase, "No more retries left"), true, nil } - woc.log.Infof("%d child nodes of %s failed. Trying again...", len(node.Children), node.Name) + woc.log.Infof("%d child nodes of %s failed. Trying again...", len(childNodeIds), node.Name) return node, true, nil } @@ -1719,6 +1729,21 @@ func getChildNodeIndex(node *wfv1.NodeStatus, nodes wfv1.Nodes, index int) *wfv1 return &lastChildNode } +func getChildNodeIdsRetried(node *wfv1.NodeStatus, nodes wfv1.Nodes) []string { + childrenIds := []string{} + r := regexp.MustCompile(`^` + regexp.QuoteMeta(node.Name) + `\(\d+\)`) + for i := 0; i < len(node.Children); i++ { + n := getChildNodeIndex(node, nodes, i) + if node == nil { + continue + } + if r.MatchString(n.Name) { + childrenIds = append(childrenIds, n.ID) + } + } + return childrenIds +} + func getRetryNodeChildrenIds(node *wfv1.NodeStatus, nodes wfv1.Nodes) []string { // A fulfilled Retry node will always reflect the status of its last child node, so its individual attempts don't interest us. // To resume the traversal, we look at the children of the last child node and of any on exit nodes. @@ -1741,9 +1766,13 @@ func buildRetryStrategyLocalScope(node *wfv1.NodeStatus, nodes wfv1.Nodes) map[s localScope := make(map[string]interface{}) // `retries` variable - localScope[common.LocalVarRetries] = strconv.Itoa(len(node.Children) - 1) + childNodeIds := getChildNodeIdsRetried(node, nodes) + localScope[common.LocalVarRetries] = strconv.Itoa(len(childNodeIds) - 1) - lastChildNode := getChildNodeIndex(node, nodes, -1) + lastChildNode, err := nodes.Get(childNodeIds[len(childNodeIds)-1]) + if err != nil { + return localScope + } exitCode := "-1" if lastChildNode.Outputs != nil && lastChildNode.Outputs.ExitCode != nil { @@ -2027,8 +2056,9 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, nodeName = lastChildNode.Name node = lastChildNode } else { + retryNum := len(getChildNodeIdsRetried(retryParentNode, woc.wf.Status.Nodes)) // Create a new child node and append it to the retry node. - nodeName = fmt.Sprintf("%s(%d)", retryNodeName, len(retryParentNode.Children)) + nodeName = fmt.Sprintf("%s(%d)", retryNodeName, retryNum) woc.addChildNode(retryNodeName, nodeName) node = nil @@ -2038,7 +2068,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string, localParams[common.LocalVarPodName] = woc.getPodName(nodeName, processedTmpl.Name) } // Inject the retryAttempt number - localParams[common.LocalVarRetries] = strconv.Itoa(len(retryParentNode.Children)) + localParams[common.LocalVarRetries] = strconv.Itoa(retryNum) processedTmpl, err = common.SubstituteParams(processedTmpl, map[string]string{}, localParams) if errorsutil.IsTransientErr(err) { diff --git a/workflow/controller/operator_test.go b/workflow/controller/operator_test.go index 41965b27c79d..c60af7a42009 100644 --- a/workflow/controller/operator_test.go +++ b/workflow/controller/operator_test.go @@ -540,7 +540,7 @@ func TestProcessNodeRetries(t *testing.T) { // Add child nodes. for i := 0; i < 2; i++ { - childNode := fmt.Sprintf("child-node-%d", i) + childNode := fmt.Sprintf("%s(%d)", nodeName, i) woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning) woc.addChildNode(nodeName, childNode) } @@ -572,8 +572,18 @@ func TestProcessNodeRetries(t *testing.T) { assert.NoError(t, err) assert.Equal(t, n.Phase, wfv1.NodeRunning) + // Add a hook node that has Succeeded + childHookedNode := "child-node.hooks.running" + woc.initializeNode(childHookedNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeSucceeded) + woc.addChildNode(nodeName, childHookedNode) + n, err = woc.wf.GetNodeByName(nodeName) + assert.NoError(t, err) + n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{}) + assert.NoError(t, err) + assert.Equal(t, n.Phase, wfv1.NodeRunning) + // Add a third node that has failed. - childNode := "child-node-3" + childNode := fmt.Sprintf("%s(%d)", nodeName, 3) woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeFailed) woc.addChildNode(nodeName, childNode) n, err = woc.wf.GetNodeByName(nodeName) @@ -612,7 +622,7 @@ func TestProcessNodeRetriesOnErrors(t *testing.T) { // Add child nodes. for i := 0; i < 2; i++ { - childNode := fmt.Sprintf("child-node-%d", i) + childNode := fmt.Sprintf("%s(%d)", nodeName, i) woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning) woc.addChildNode(nodeName, childNode) } @@ -645,7 +655,7 @@ func TestProcessNodeRetriesOnErrors(t *testing.T) { assert.Equal(t, n.Phase, wfv1.NodeRunning) // Add a third node that has errored. - childNode := "child-node-3" + childNode := fmt.Sprintf("%s(%d)", nodeName, 3) woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeError) woc.addChildNode(nodeName, childNode) n, err = woc.wf.GetNodeByName(nodeName) @@ -684,7 +694,7 @@ func TestProcessNodeRetriesOnTransientErrors(t *testing.T) { // Add child nodes. for i := 0; i < 2; i++ { - childNode := fmt.Sprintf("child-node-%d", i) + childNode := fmt.Sprintf("%s(%d)", nodeName, i) woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning) woc.addChildNode(nodeName, childNode) } @@ -722,7 +732,7 @@ func TestProcessNodeRetriesOnTransientErrors(t *testing.T) { _ = os.Unsetenv(transientEnvVarKey) // Add a third node that has errored. - childNode := "child-node-3" + childNode := fmt.Sprintf("%s(%d)", nodeName, 3) woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeError) woc.addChildNode(nodeName, childNode) n, err = woc.wf.GetNodeByName(nodeName) @@ -764,8 +774,8 @@ func TestProcessNodeRetriesWithBackoff(t *testing.T) { lastChild := getChildNodeIndex(node, woc.wf.Status.Nodes, -1) assert.Nil(t, lastChild) - woc.initializeNode("child-node-1", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning) - woc.addChildNode(nodeName, "child-node-1") + woc.initializeNode(nodeName+"(0)", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning) + woc.addChildNode(nodeName, nodeName+"(0)") n, err := woc.wf.GetNodeByName(nodeName) assert.NoError(t, err) @@ -819,8 +829,8 @@ func TestProcessNodeRetriesWithExponentialBackoff(t *testing.T) { lastChild := getChildNodeIndex(node, woc.wf.Status.Nodes, -1) require.Nil(lastChild) - woc.initializeNode("child-node-1", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeFailed) - woc.addChildNode(nodeName, "child-node-1") + woc.initializeNode(nodeName+"(0)", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeFailed) + woc.addChildNode(nodeName, nodeName+"(0)") n, err := woc.wf.GetNodeByName(nodeName) assert.NoError(t, err) @@ -836,8 +846,8 @@ func TestProcessNodeRetriesWithExponentialBackoff(t *testing.T) { require.LessOrEqual(backoff, 300) require.Less(295, backoff) - woc.initializeNode("child-node-2", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeError) - woc.addChildNode(nodeName, "child-node-2") + woc.initializeNode(nodeName+"(1)", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeError) + woc.addChildNode(nodeName, nodeName+"(1)") n, err = woc.wf.GetNodeByName(nodeName) assert.NoError(t, err) @@ -911,7 +921,7 @@ func TestProcessNodesNoRetryWithError(t *testing.T) { // Add child nodes. for i := 0; i < 2; i++ { - childNode := fmt.Sprintf("child-node-%d", i) + childNode := fmt.Sprintf("%s(%d)", nodeName, i) woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning) woc.addChildNode(nodeName, childNode) } @@ -5481,7 +5491,7 @@ func TestPropagateMaxDurationProcess(t *testing.T) { } woc.wf.Status.Nodes[woc.wf.NodeID(nodeName)] = *node - childNode := fmt.Sprintf("child-node-%d", 0) + childNode := fmt.Sprintf("%s(%d)", nodeName, 0) woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeFailed) woc.addChildNode(nodeName, childNode) @@ -6768,7 +6778,7 @@ func TestRetryOnDiffHost(t *testing.T) { assert.Nil(t, lastChild) // Add child node. - childNode := fmt.Sprintf("child-node-%d", 0) + childNode := fmt.Sprintf("%s(%d)", nodeName, 0) woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning) woc.addChildNode(nodeName, childNode)