Skip to content

Commit

Permalink
fix(controller): Fix node status when daemon pod deleted but its chil…
Browse files Browse the repository at this point in the history
…dren nodes are still running (#4683)

Signed-off-by: lons <lonsdale8734@gmail.com>
  • Loading branch information
lonsdale8734 authored and simster7 committed Jan 19, 2021
1 parent 8cd9635 commit 84b44cf
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 1 deletion.
5 changes: 5 additions & 0 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,11 @@ type TaskResults struct {
// evaluateDependsLogic returns whether a node should execute and proceed. proceed means that all of its dependencies are
// completed and execute means that given the results of its dependencies, this node should execute.
func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) {
node := d.getTaskNode(taskName)
if node != nil {
return true, true, nil
}

evalScope := make(map[string]TaskResults)

for _, taskName := range d.GetTaskDependencies(taskName) {
Expand Down
152 changes: 152 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,158 @@ func TestEvaluateDependsLogic(t *testing.T) {
assert.True(t, execute)
}

func TestEvaluateAnyAllDependsLogic(t *testing.T) {
testTasks := []wfv1.DAGTask{
{
Name: "A",
},
{
Name: "A-1",
},
{
Name: "A-2",
},
{
Name: "B",
Depends: "A.AnySucceeded",
},
{
Name: "B-1",
},
{
Name: "B-2",
},
{
Name: "C",
Depends: "B.AllFailed",
},
}

d := &dagContext{
boundaryName: "test",
tasks: testTasks,
wf: &wfv1.Workflow{ObjectMeta: metav1.ObjectMeta{Name: "test-wf"}},
dependencies: make(map[string][]string),
dependsLogic: make(map[string]string),
}

// Task A is still running, A-1 succeeded but A-2 failed
d.wf = &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{Name: "test-wf"},
Status: wfv1.WorkflowStatus{
Nodes: map[string]wfv1.NodeStatus{
d.taskNodeID("A"): {
Phase: wfv1.NodeRunning,
Type: wfv1.NodeTypeTaskGroup,
Children: []string{d.taskNodeID("A-1"), d.taskNodeID("A-2")},
},
d.taskNodeID("A-1"): {Phase: wfv1.NodeRunning},
d.taskNodeID("A-2"): {Phase: wfv1.NodeRunning},
},
},
}

// Task B should not proceed as task A is still running
execute, proceed, err := d.evaluateDependsLogic("B")
assert.NoError(t, err)
assert.False(t, proceed)
assert.False(t, execute)

// Task A succeeded
d.wf.Status.Nodes[d.taskNodeID("A")] = wfv1.NodeStatus{
Phase: wfv1.NodeSucceeded,
Type: wfv1.NodeTypeTaskGroup,
Children: []string{d.taskNodeID("A-1"), d.taskNodeID("A-2")},
}

// Task B should proceed, but not execute as none of the children have succeeded yet
execute, proceed, err = d.evaluateDependsLogic("B")
assert.NoError(t, err)
assert.True(t, proceed)
assert.False(t, execute)

// Task A-2 succeeded
d.wf.Status.Nodes[d.taskNodeID("A-2")] = wfv1.NodeStatus{Phase: wfv1.NodeSucceeded}

// Task B should now proceed and execute
execute, proceed, err = d.evaluateDependsLogic("B")
assert.NoError(t, err)
assert.True(t, proceed)
assert.True(t, execute)

// Task B succeeds and B-1 fails
d.wf.Status.Nodes[d.taskNodeID("B")] = wfv1.NodeStatus{
Phase: wfv1.NodeSucceeded,
Type: wfv1.NodeTypeTaskGroup,
Children: []string{d.taskNodeID("B-1"), d.taskNodeID("B-2")},
}
d.wf.Status.Nodes[d.taskNodeID("B-1")] = wfv1.NodeStatus{Phase: wfv1.NodeFailed}

// Task C should proceed, but not execute as not all of B's children have failed yet
execute, proceed, err = d.evaluateDependsLogic("C")
assert.NoError(t, err)
assert.True(t, proceed)
assert.False(t, execute)

d.wf.Status.Nodes[d.taskNodeID("B-2")] = wfv1.NodeStatus{Phase: wfv1.NodeFailed}

// Task C should now proceed and execute as all of B's children have failed
execute, proceed, err = d.evaluateDependsLogic("C")
assert.NoError(t, err)
assert.True(t, proceed)
assert.True(t, execute)

}

func TestEvaluateDependsLogicWhenDaemonFailed(t *testing.T) {
testTasks := []wfv1.DAGTask{
{
Name: "A",
},
{
Name: "B",
Depends: "A",
},
}

d := &dagContext{
boundaryName: "test",
tasks: testTasks,
wf: &wfv1.Workflow{ObjectMeta: metav1.ObjectMeta{Name: "test-wf"}},
dependencies: make(map[string][]string),
dependsLogic: make(map[string]string),
}

// Task A is running
daemon := true
d.wf = &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{Name: "test-wf"},
Status: wfv1.WorkflowStatus{
Nodes: map[string]wfv1.NodeStatus{
d.taskNodeID("A"): {Phase: wfv1.NodeRunning, Daemoned: &daemon},
},
},
}

// Task B should proceed and execute
execute, proceed, err := d.evaluateDependsLogic("B")
assert.NoError(t, err)
assert.True(t, proceed)
assert.True(t, execute)

// Task B running
d.wf.Status.Nodes[d.taskNodeID("B")] = wfv1.NodeStatus{Phase: wfv1.NodeRunning}

// Task A failed or error
d.wf.Status.Nodes[d.taskNodeID("A")] = wfv1.NodeStatus{Phase: wfv1.NodeFailed}

// Task B should proceed and execute
execute, proceed, err = d.evaluateDependsLogic("B")
assert.NoError(t, err)
assert.True(t, proceed)
assert.True(t, execute)
}

func TestAllEvaluateDependsLogic(t *testing.T) {
statusMap := map[common.TaskResult]wfv1.NodePhase{
common.TaskResultSucceeded: wfv1.NodeSucceeded,
Expand Down
7 changes: 6 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ func (woc *wfOperationCtx) podReconciliation() error {
// It is now impossible to infer pod status. All we can do at this point is to mark the node with Error, or
// we can re-submit it.
for nodeID, node := range woc.wf.Status.Nodes {
if node.Type != wfv1.NodeTypePod || node.Fulfilled() || node.StartedAt.IsZero() {
if node.Type != wfv1.NodeTypePod || node.Phase.Fulfilled() || node.StartedAt.IsZero() {
// node is not a pod, it is already complete, or it can be re-run.
continue
}
Expand All @@ -937,6 +937,10 @@ func (woc *wfOperationCtx) podReconciliation() error {
continue
}

if node.Daemoned != nil && *node.Daemoned {
node.Daemoned = nil
woc.updated = true
}
woc.markNodePhase(node.Name, wfv1.NodeError, "pod deleted")
} else {
// At this point we are certain that the pod associated with our node is running or has been run;
Expand Down Expand Up @@ -1087,6 +1091,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu
if pod.DeletionTimestamp != nil {
// pod is being terminated
newPhase = wfv1.NodeError
newDaemonStatus = pointer.BoolPtr(false)
message = "pod deleted during operation"
woc.log.WithField("displayName", node.DisplayName).WithField("templateName", node.TemplateName).
WithField("pod", pod.Name).Error(message)
Expand Down

0 comments on commit 84b44cf

Please sign in to comment.