Skip to content

Commit

Permalink
fix(controller): Fix depends logic when daemon depend failed. Fixes (#…
Browse files Browse the repository at this point in the history
…4665)

Signed-off-by: lons <lonsdale8734@gmail.com>
  • Loading branch information
lonsdale8734 committed Dec 16, 2020
1 parent ff09725 commit 9c0dbfd
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 0 deletions.
5 changes: 5 additions & 0 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,11 @@ type TaskResults struct {
// evaluateDependsLogic returns whether a node should execute and proceed. proceed means that all of its dependencies are
// completed and execute means that given the results of its dependencies, this node should execute.
func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) {
node := d.getTaskNode(taskName)
if node != nil {
return true, true, nil
}

evalScope := make(map[string]TaskResults)

for _, taskName := range d.GetTaskDependencies(taskName) {
Expand Down
49 changes: 49 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,55 @@ func TestEvaluateAnyAllDependsLogic(t *testing.T) {

}

func TestEvaluateDependsLogicWhenDaemonFailed(t *testing.T) {
testTasks := []wfv1.DAGTask{
{
Name: "A",
},
{
Name: "B",
Depends: "A",
},
}

d := &dagContext{
boundaryName: "test",
tasks: testTasks,
wf: &wfv1.Workflow{ObjectMeta: metav1.ObjectMeta{Name: "test-wf"}},
dependencies: make(map[string][]string),
dependsLogic: make(map[string]string),
}

// Task A is running
daemon := true
d.wf = &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{Name: "test-wf"},
Status: wfv1.WorkflowStatus{
Nodes: map[string]wfv1.NodeStatus{
d.taskNodeID("A"): {Phase: wfv1.NodeRunning, Daemoned: &daemon},
},
},
}

// Task B should proceed and execute
execute, proceed, err := d.evaluateDependsLogic("B")
assert.NoError(t, err)
assert.True(t, proceed)
assert.True(t, execute)

// Task B running
d.wf.Status.Nodes[d.taskNodeID("B")] = wfv1.NodeStatus{Phase: wfv1.NodeRunning}

// Task A failed or error
d.wf.Status.Nodes[d.taskNodeID("A")] = wfv1.NodeStatus{Phase: wfv1.NodeFailed}

// Task B should proceed and execute
execute, proceed, err = d.evaluateDependsLogic("B")
assert.NoError(t, err)
assert.True(t, proceed)
assert.True(t, execute)
}

func TestAllEvaluateDependsLogic(t *testing.T) {
statusMap := map[common.TaskResult]wfv1.NodePhase{
common.TaskResultSucceeded: wfv1.NodeSucceeded,
Expand Down

0 comments on commit 9c0dbfd

Please sign in to comment.