Skip to content

Commit

Permalink
fix: Check child node status before backoff in retry (#2407)
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 committed Mar 15, 2020
1 parent 8ad643c commit 9c608e5
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 11 deletions.
22 changes: 11 additions & 11 deletions workflow/controller/operator.go
Expand Up @@ -576,6 +576,17 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return node, true, nil
}

if !lastChildNode.Completed() {
// last child node is still running.
return node, true, nil
}

if lastChildNode.Successful() {
node.Outputs = lastChildNode.Outputs.DeepCopy()
woc.wf.Status.Nodes[node.ID] = *node
return woc.markNodePhase(node.Name, wfv1.NodeSucceeded), true, nil
}

if retryStrategy.Backoff != nil {
// Process max duration limit
if retryStrategy.Backoff.MaxDuration != "" && len(node.Children) > 0 {
Expand Down Expand Up @@ -634,17 +645,6 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return nil, false, fmt.Errorf("%s is not a valid RetryPolicy", retryStrategy.RetryPolicy)
}

if !lastChildNode.Completed() {
// last child node is still running.
return node, true, nil
}

if lastChildNode.Successful() {
node.Outputs = lastChildNode.Outputs.DeepCopy()
woc.wf.Status.Nodes[node.ID] = *node
return woc.markNodePhase(node.Name, wfv1.NodeSucceeded), true, nil
}

if (lastChildNode.Phase == wfv1.NodeFailed && !retryOnFailed) || (lastChildNode.Phase == wfv1.NodeError && !retryOnError) {
woc.log.Infof("Node not set to be retried after status: %s", lastChildNode.Phase)
return woc.markNodePhase(node.Name, lastChildNode.Phase, lastChildNode.Message), true, nil
Expand Down
55 changes: 55 additions & 0 deletions workflow/controller/operator_test.go
Expand Up @@ -254,6 +254,61 @@ func TestProcessNodesWithRetriesOnErrors(t *testing.T) {
assert.Equal(t, n.Phase, wfv1.NodeError)
}

// TestProcessNodesWithRetriesWithBackoff exercises processNodeRetries on a
// retry node that has a Backoff configured. It checks that the state of the
// last child node drives the parent's phase: while the child is still running
// the parent stays Running, and once the child succeeds the parent is marked
// Succeeded.
func TestProcessNodesWithRetriesWithBackoff(t *testing.T) {
	controller := newController()
	assert.NotNil(t, controller)
	wf := unmarshalWF(helloWorldWf)
	assert.NotNil(t, wf)
	woc := newWorkflowOperationCtx(wf, controller)
	assert.NotNil(t, woc)

	// Verify that there are no nodes in the wf status.
	assert.Zero(t, len(woc.wf.Status.Nodes))

	// Add the parent node for retries.
	nodeName := "test-node"
	nodeID := woc.wf.NodeID(nodeName)
	node := woc.initializeNode(nodeName, wfv1.NodeTypeRetry, "", &wfv1.Template{}, "", wfv1.NodeRunning)

	// Retry strategy under test: a limited number of retries with an
	// exponential backoff and a maximum total duration.
	retryLimit := int32(2)
	retries := wfv1.RetryStrategy{
		Limit: &retryLimit,
		Backoff: &wfv1.Backoff{
			Duration:    "10s",
			Factor:      2,
			MaxDuration: "10m",
		},
		RetryPolicy: wfv1.RetryPolicyAlways,
	}
	woc.wf.Status.Nodes[nodeID] = *node

	assert.Equal(t, wfv1.NodeRunning, node.Phase)

	// Ensure there are no child nodes yet.
	lastChild, err := woc.getLastChildNode(node)
	assert.NoError(t, err)
	assert.Nil(t, lastChild)

	// Attach a single child that is still running.
	woc.initializeNode("child-node-1", wfv1.NodeTypePod, "", &wfv1.Template{}, "", wfv1.NodeRunning)
	woc.addChildNode(nodeName, "child-node-1")

	// Re-read the parent so that it reflects the newly added child.
	n := woc.getNodeByName(nodeName)
	lastChild, err = woc.getLastChildNode(n)
	assert.NoError(t, err)
	assert.NotNil(t, lastChild)

	// Last child is still running, so processNodeRetries() must leave the
	// parent node in the Running phase.
	n, _, err = woc.processNodeRetries(n, retries)
	assert.NoError(t, err)
	assert.Equal(t, wfv1.NodeRunning, n.Phase)

	// Mark lastChild as successful.
	woc.markNodePhase(lastChild.Name, wfv1.NodeSucceeded)
	n, _, err = woc.processNodeRetries(n, retries)
	assert.NoError(t, err)
	// The parent node also gets marked as Succeeded.
	assert.Equal(t, wfv1.NodeSucceeded, n.Phase)
}

// TestProcessNodesNoRetryWithError tests retrying when RetryOn.Error is disabled
func TestProcessNodesNoRetryWithError(t *testing.T) {
controller := newController()
Expand Down

0 comments on commit 9c608e5

Please sign in to comment.