Skip to content

Commit

Permalink
fix: retry only proper node.
Browse files Browse the repository at this point in the history
Signed-off-by: toyamagu2021@gmail.com <toyamagu2021@gmail.com>
  • Loading branch information
toyamagu-2021 committed Sep 24, 2023
1 parent d0588ea commit cacf70b
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 27 deletions.
65 changes: 65 additions & 0 deletions test/e2e/hooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,71 @@ spec:
})
}

func (s *HooksSuite) TestWorkflowLevelHooksWithRetry() {
s.Given().
Workflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: test-workflow-level-hooks-with-retry
spec:
templates:
- name: argosay
container:
image: argoproj/argosay:v2
command:
- /bin/sh
- '-c'
args:
- /bin/sleep 1; exit 1
retryStrategy:
limit: 1
- name: hook
container:
image: argoproj/argosay:v2
command:
- /bin/sh
- '-c'
args:
- /argosay
entrypoint: argosay
hooks:
failed:
template: hook
expression: workflow.status == "Failed"
running:
template: hook
expression: workflow.status == "Running"
`).When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeFailed).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, status.Phase, v1alpha1.WorkflowFailed)
assert.Equal(t, status.Progress, v1alpha1.Progress("2/4"))
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "test-workflow-level-hooks-with-retry.hooks.running")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "test-workflow-level-hooks-with-retry.hooks.failed")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeSucceeded, status.Phase)
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "test-workflow-level-hooks-with-retry(0)")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeFailed, status.Phase)
}).
ExpectWorkflowNode(func(status v1alpha1.NodeStatus) bool {
return strings.Contains(status.Name, "test-workflow-level-hooks-with-retry(1)")
}, func(t *testing.T, status *v1alpha1.NodeStatus, pod *apiv1.Pod) {
assert.Equal(t, v1alpha1.NodeFailed, status.Phase)
})
}

func TestHooksSuite(t *testing.T) {
suite.Run(t, new(HooksSuite))
}
57 changes: 45 additions & 12 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -921,9 +921,16 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return node, true, nil
}

lastChildNode := getChildNodeIndex(node, woc.wf.Status.Nodes, -1)
childNodeIds := getChildNodeIdsRetried(node, woc.wf.Status.Nodes)
if len(childNodeIds) == 0 {
return node, true, nil
}
lastChildNode, err := woc.wf.Status.Nodes.Get(childNodeIds[len(childNodeIds)-1])
if err != nil {
return node, true, nil
}

if retryStrategy.Expression != "" && len(node.Children) > 0 {
if retryStrategy.Expression != "" && len(childNodeIds) > 0 {
localScope := buildRetryStrategyLocalScope(node, woc.wf.Status.Nodes)
scope := env.GetFuncMap(localScope)
shouldContinue, err := argoexpr.EvalBool(retryStrategy.Expression, scope)
Expand Down Expand Up @@ -964,12 +971,15 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
if retryStrategy.Backoff != nil {
maxDurationDeadline := time.Time{}
// Process max duration limit
if retryStrategy.Backoff.MaxDuration != "" && len(node.Children) > 0 {
if retryStrategy.Backoff.MaxDuration != "" && len(childNodeIds) > 0 {
maxDuration, err := parseStringToDuration(retryStrategy.Backoff.MaxDuration)
if err != nil {
return nil, false, err
}
firstChildNode := getChildNodeIndex(node, woc.wf.Status.Nodes, 0)
firstChildNode, err := woc.wf.Status.Nodes.Get(childNodeIds[0])
if err != nil {
return nil, false, err
}
maxDurationDeadline = firstChildNode.StartedAt.Add(maxDuration)
if time.Now().After(maxDurationDeadline) {
woc.log.Infoln("Max duration limit exceeded. Failing...")
Expand All @@ -995,7 +1005,7 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
if retryStrategyBackoffFactor != nil && *retryStrategyBackoffFactor > 0 {
// Formula: timeToWait = duration * factor^retry_number
// Note that timeToWait should equal to duration for the first retry attempt.
timeToWait = baseDuration * time.Duration(math.Pow(float64(*retryStrategyBackoffFactor), float64(len(node.Children)-1)))
timeToWait = baseDuration * time.Duration(math.Pow(float64(*retryStrategyBackoffFactor), float64(len(childNodeIds)-1)))
}
waitingDeadline := lastChildNode.FinishedAt.Add(timeToWait)

Expand All @@ -1006,7 +1016,7 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
}

// See if we have waited past the deadline
if time.Now().Before(waitingDeadline) && retryStrategy.Limit != nil && int32(len(node.Children)) <= int32(retryStrategy.Limit.IntValue()) {
if time.Now().Before(waitingDeadline) && retryStrategy.Limit != nil && int32(len(childNodeIds)) <= int32(retryStrategy.Limit.IntValue()) {
woc.requeueAfter(timeToWait)
retryMessage := fmt.Sprintf("Backoff for %s", humanize.Duration(timeToWait))
return woc.markNodePhase(node.Name, node.Phase, retryMessage), false, nil
Expand Down Expand Up @@ -1054,12 +1064,12 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
if err != nil {
return nil, false, err
}
if retryStrategy.Limit != nil && limit != nil && int32(len(node.Children)) > *limit {
if retryStrategy.Limit != nil && limit != nil && int32(len(childNodeIds)) > *limit {
woc.log.Infoln("No more retries left. Failing...")
return woc.markNodePhase(node.Name, lastChildNode.Phase, "No more retries left"), true, nil
}

woc.log.Infof("%d child nodes of %s failed. Trying again...", len(node.Children), node.Name)
woc.log.Infof("%d child nodes of %s failed. Trying again...", len(childNodeIds), node.Name)
return node, true, nil
}

Expand Down Expand Up @@ -1719,6 +1729,21 @@ func getChildNodeIndex(node *wfv1.NodeStatus, nodes wfv1.Nodes, index int) *wfv1
return &lastChildNode
}

func getChildNodeIdsRetried(node *wfv1.NodeStatus, nodes wfv1.Nodes) []string {
childrenIds := []string{}
r := regexp.MustCompile(`^` + regexp.QuoteMeta(node.Name) + `\(\d+\)`)
for i := 0; i < len(node.Children); i++ {
n := getChildNodeIndex(node, nodes, i)
if node == nil {
continue
}
if r.MatchString(n.Name) {
childrenIds = append(childrenIds, n.ID)
}
}
return childrenIds
}

func getRetryNodeChildrenIds(node *wfv1.NodeStatus, nodes wfv1.Nodes) []string {
// A fulfilled Retry node will always reflect the status of its last child node, so its individual attempts don't interest us.
// To resume the traversal, we look at the children of the last child node and of any on exit nodes.
Expand All @@ -1741,9 +1766,16 @@ func buildRetryStrategyLocalScope(node *wfv1.NodeStatus, nodes wfv1.Nodes) map[s
localScope := make(map[string]interface{})

// `retries` variable
localScope[common.LocalVarRetries] = strconv.Itoa(len(node.Children) - 1)
childNodeIds := getChildNodeIdsRetried(node, nodes)
if len(childNodeIds) == 0 {
return localScope
}
localScope[common.LocalVarRetries] = strconv.Itoa(len(childNodeIds) - 1)

lastChildNode := getChildNodeIndex(node, nodes, -1)
lastChildNode, err := nodes.Get(childNodeIds[len(childNodeIds)-1])
if err != nil {
return localScope
}

exitCode := "-1"
if lastChildNode.Outputs != nil && lastChildNode.Outputs.ExitCode != nil {
Expand Down Expand Up @@ -2027,8 +2059,9 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
nodeName = lastChildNode.Name
node = lastChildNode
} else {
retryNum := len(getChildNodeIdsRetried(retryParentNode, woc.wf.Status.Nodes))
// Create a new child node and append it to the retry node.
nodeName = fmt.Sprintf("%s(%d)", retryNodeName, len(retryParentNode.Children))
nodeName = fmt.Sprintf("%s(%d)", retryNodeName, retryNum)
woc.addChildNode(retryNodeName, nodeName)
node = nil

Expand All @@ -2038,7 +2071,7 @@ func (woc *wfOperationCtx) executeTemplate(ctx context.Context, nodeName string,
localParams[common.LocalVarPodName] = woc.getPodName(nodeName, processedTmpl.Name)
}
// Inject the retryAttempt number
localParams[common.LocalVarRetries] = strconv.Itoa(len(retryParentNode.Children))
localParams[common.LocalVarRetries] = strconv.Itoa(retryNum)

processedTmpl, err = common.SubstituteParams(processedTmpl, map[string]string{}, localParams)
if errorsutil.IsTransientErr(err) {
Expand Down
40 changes: 25 additions & 15 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func TestProcessNodeRetries(t *testing.T) {

// Add child nodes.
for i := 0; i < 2; i++ {
childNode := fmt.Sprintf("child-node-%d", i)
childNode := fmt.Sprintf("%s(%d)", nodeName, i)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning)
woc.addChildNode(nodeName, childNode)
}
Expand Down Expand Up @@ -572,8 +572,18 @@ func TestProcessNodeRetries(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, n.Phase, wfv1.NodeRunning)

// Add a hook node that has Succeeded
childHookedNode := "child-node.hooks.running"
woc.initializeNode(childHookedNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeSucceeded)
woc.addChildNode(nodeName, childHookedNode)
n, err = woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)
n, _, err = woc.processNodeRetries(n, retries, &executeTemplateOpts{})
assert.NoError(t, err)
assert.Equal(t, n.Phase, wfv1.NodeRunning)

// Add a third node that has failed.
childNode := "child-node-3"
childNode := fmt.Sprintf("%s(%d)", nodeName, 3)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeFailed)
woc.addChildNode(nodeName, childNode)
n, err = woc.wf.GetNodeByName(nodeName)
Expand Down Expand Up @@ -612,7 +622,7 @@ func TestProcessNodeRetriesOnErrors(t *testing.T) {

// Add child nodes.
for i := 0; i < 2; i++ {
childNode := fmt.Sprintf("child-node-%d", i)
childNode := fmt.Sprintf("%s(%d)", nodeName, i)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning)
woc.addChildNode(nodeName, childNode)
}
Expand Down Expand Up @@ -645,7 +655,7 @@ func TestProcessNodeRetriesOnErrors(t *testing.T) {
assert.Equal(t, n.Phase, wfv1.NodeRunning)

// Add a third node that has errored.
childNode := "child-node-3"
childNode := fmt.Sprintf("%s(%d)", nodeName, 3)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeError)
woc.addChildNode(nodeName, childNode)
n, err = woc.wf.GetNodeByName(nodeName)
Expand Down Expand Up @@ -684,7 +694,7 @@ func TestProcessNodeRetriesOnTransientErrors(t *testing.T) {

// Add child nodes.
for i := 0; i < 2; i++ {
childNode := fmt.Sprintf("child-node-%d", i)
childNode := fmt.Sprintf("%s(%d)", nodeName, i)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning)
woc.addChildNode(nodeName, childNode)
}
Expand Down Expand Up @@ -722,7 +732,7 @@ func TestProcessNodeRetriesOnTransientErrors(t *testing.T) {
_ = os.Unsetenv(transientEnvVarKey)

// Add a third node that has errored.
childNode := "child-node-3"
childNode := fmt.Sprintf("%s(%d)", nodeName, 3)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeError)
woc.addChildNode(nodeName, childNode)
n, err = woc.wf.GetNodeByName(nodeName)
Expand Down Expand Up @@ -764,8 +774,8 @@ func TestProcessNodeRetriesWithBackoff(t *testing.T) {
lastChild := getChildNodeIndex(node, woc.wf.Status.Nodes, -1)
assert.Nil(t, lastChild)

woc.initializeNode("child-node-1", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning)
woc.addChildNode(nodeName, "child-node-1")
woc.initializeNode(nodeName+"(0)", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning)
woc.addChildNode(nodeName, nodeName+"(0)")

n, err := woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)
Expand Down Expand Up @@ -819,8 +829,8 @@ func TestProcessNodeRetriesWithExponentialBackoff(t *testing.T) {
lastChild := getChildNodeIndex(node, woc.wf.Status.Nodes, -1)
require.Nil(lastChild)

woc.initializeNode("child-node-1", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeFailed)
woc.addChildNode(nodeName, "child-node-1")
woc.initializeNode(nodeName+"(0)", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeFailed)
woc.addChildNode(nodeName, nodeName+"(0)")

n, err := woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)
Expand All @@ -836,8 +846,8 @@ func TestProcessNodeRetriesWithExponentialBackoff(t *testing.T) {
require.LessOrEqual(backoff, 300)
require.Less(295, backoff)

woc.initializeNode("child-node-2", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeError)
woc.addChildNode(nodeName, "child-node-2")
woc.initializeNode(nodeName+"(1)", wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeError)
woc.addChildNode(nodeName, nodeName+"(1)")
n, err = woc.wf.GetNodeByName(nodeName)
assert.NoError(t, err)

Expand Down Expand Up @@ -911,7 +921,7 @@ func TestProcessNodesNoRetryWithError(t *testing.T) {

// Add child nodes.
for i := 0; i < 2; i++ {
childNode := fmt.Sprintf("child-node-%d", i)
childNode := fmt.Sprintf("%s(%d)", nodeName, i)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning)
woc.addChildNode(nodeName, childNode)
}
Expand Down Expand Up @@ -5481,7 +5491,7 @@ func TestPropagateMaxDurationProcess(t *testing.T) {
}
woc.wf.Status.Nodes[woc.wf.NodeID(nodeName)] = *node

childNode := fmt.Sprintf("child-node-%d", 0)
childNode := fmt.Sprintf("%s(%d)", nodeName, 0)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeFailed)
woc.addChildNode(nodeName, childNode)

Expand Down Expand Up @@ -6768,7 +6778,7 @@ func TestRetryOnDiffHost(t *testing.T) {
assert.Nil(t, lastChild)

// Add child node.
childNode := fmt.Sprintf("child-node-%d", 0)
childNode := fmt.Sprintf("%s(%d)", nodeName, 0)
woc.initializeNode(childNode, wfv1.NodeTypePod, "", &wfv1.WorkflowStep{}, "", wfv1.NodeRunning)
woc.addChildNode(nodeName, childNode)

Expand Down

0 comments on commit cacf70b

Please sign in to comment.