Skip to content

Commit

Permalink
feat(controller): Support .AnySucceeded / .AllFailed for TaskGroup in…
Browse files Browse the repository at this point in the history
… depends logic. Closes #3405 (#3964)

Signed-off-by: Mark White <mark@markwhite.com>
  • Loading branch information
markterm committed Dec 3, 2020
1 parent 6175458 commit 1212df4
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 41 deletions.
7 changes: 7 additions & 0 deletions docs/enhanced-depends-logic.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ Full boolean logic is also available. Operators include:
depends: "(task-2.Succeeded || task-2.Skipped) && !task-3.Failed"
```

In the case that you're depending on a task that uses withItems, you can depend on
whether any of the item tasks are successful or all have failed using .AnySucceeded and .AllFailed, for example:

```
depends: "task-1.AnySucceeded || task-2.AllFailed"
```

## Compatibility with `dependencies` and `dag.task.continueOn`

This feature is fully compatible with `dependencies` and conversion is easy.
Expand Down
46 changes: 27 additions & 19 deletions workflow/common/ancestry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,19 @@ type DagContext interface {
type TaskResult string

const (
TaskResultSucceeded TaskResult = "Succeeded"
TaskResultFailed TaskResult = "Failed"
TaskResultErrored TaskResult = "Errored"
TaskResultSkipped TaskResult = "Skipped"
TaskResultDaemoned TaskResult = "Daemoned"
TaskResultSucceeded TaskResult = "Succeeded"
TaskResultFailed TaskResult = "Failed"
TaskResultErrored TaskResult = "Errored"
TaskResultSkipped TaskResult = "Skipped"
TaskResultDaemoned TaskResult = "Daemoned"
TaskResultAnySucceeded TaskResult = "AnySucceeded"
TaskResultAllFailed TaskResult = "AllFailed"
)

var (
// TODO: This should use validate.workflowFieldNameFmt, but we can't import it here because an import cycle would be created
taskNameRegex = regexp.MustCompile(`([a-zA-Z0-9][-a-zA-Z0-9]*?\.[A-Z][a-z]+)|([a-zA-Z0-9][-a-zA-Z0-9]*)`)
taskResultRegex = regexp.MustCompile(`([a-zA-Z0-9][-a-zA-Z0-9]*?\.[A-Z][a-z]+)`)
taskNameRegex = regexp.MustCompile(`([a-zA-Z0-9][-a-zA-Z0-9]*?\.[A-Z][a-zA-Z]+)|([a-zA-Z0-9][-a-zA-Z0-9]*)`)
taskResultRegex = regexp.MustCompile(`([a-zA-Z0-9][-a-zA-Z0-9]*?\.[A-Z][a-zA-Z]+)`)
)

type expansionMatch struct {
Expand All @@ -38,31 +40,37 @@ type expansionMatch struct {
end int
}

func GetTaskDependencies(task *wfv1.DAGTask, ctx DagContext) ([]string, string) {
type DependencyType int

const (
DependencyTypeTask DependencyType = iota
DependencyTypeItems
)

func GetTaskDependencies(task *wfv1.DAGTask, ctx DagContext) (map[string]DependencyType, string) {
depends := getTaskDependsLogic(task, ctx)
matches := taskNameRegex.FindAllStringSubmatchIndex(depends, -1)
var expansionMatches []expansionMatch
dependencies := make(map[string]bool)
dependencies := make(map[string]DependencyType)
for _, matchGroup := range matches {
// We have matched a taskName.TaskResult
if matchGroup[2] != -1 {
match := depends[matchGroup[2]:matchGroup[3]]
split := strings.Split(match, ".")
dependencies[split[0]] = true
if split[1] == string(TaskResultAnySucceeded) || split[1] == string(TaskResultAllFailed) {
dependencies[split[0]] = DependencyTypeItems
} else if _, ok := dependencies[split[0]]; !ok { //DependencyTypeItems takes precedence
dependencies[split[0]] = DependencyTypeTask
}
} else if matchGroup[4] != -1 {
match := depends[matchGroup[4]:matchGroup[5]]
dependencies[match] = true
dependencies[match] = DependencyTypeTask
expansionMatches = append(expansionMatches, expansionMatch{taskName: match, start: matchGroup[4], end: matchGroup[5]})
}
}

var out []string
for dependency := range dependencies {
out = append(out, dependency)
}

if len(expansionMatches) == 0 {
return out, depends
return dependencies, depends
}

sort.Slice(expansionMatches, func(i, j int) bool {
Expand All @@ -74,7 +82,7 @@ func GetTaskDependencies(task *wfv1.DAGTask, ctx DagContext) ([]string, string)
depends = depends[:match.start] + expandDependency(match.taskName, matchTask) + depends[match.end:]
}

return out, depends
return dependencies, depends
}

func ValidateTaskResults(dagTask *wfv1.DAGTask) error {
Expand All @@ -88,7 +96,7 @@ func ValidateTaskResults(dagTask *wfv1.DAGTask) error {
split := strings.Split(matchGroup[1], ".")
taskName, taskResult := split[0], TaskResult(split[1])
switch taskResult {
case TaskResultSucceeded, TaskResultFailed, TaskResultSkipped, TaskResultErrored, TaskResultDaemoned:
case TaskResultSucceeded, TaskResultFailed, TaskResultSkipped, TaskResultErrored, TaskResultDaemoned, TaskResultAnySucceeded, TaskResultAllFailed:
// Do nothing
default:
return fmt.Errorf("task result '%s' for task '%s' is invalid", taskResult, taskName)
Expand Down
22 changes: 18 additions & 4 deletions workflow/common/ancestry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ func TestGetTaskDependenciesFromDepends(t *testing.T) {
}
assert.Equal(t, "((task-1.Succeeded || task-1.Skipped || task-1.Daemoned) || task-2.Succeeded) && !task-3.Succeeded", logic)

task = &wfv1.DAGTask{Depends: "(task-1 || task-2.AnySucceeded) && !task-3.Succeeded"}
deps, logic = GetTaskDependencies(task, ctx)
assert.Len(t, deps, 3)
for _, dep := range []string{"task-1", "task-2", "task-3"} {
assert.Contains(t, deps, dep)
}
assert.Equal(t, "((task-1.Succeeded || task-1.Skipped || task-1.Daemoned) || task-2.AnySucceeded) && !task-3.Succeeded", logic)

task = &wfv1.DAGTask{Depends: "(task-1||(task-2.Succeeded || task-2.Failed))&&!task-3.Failed"}
deps, logic = GetTaskDependencies(task, ctx)
assert.Len(t, deps, 3)
Expand All @@ -44,26 +52,32 @@ func TestGetTaskDependenciesFromDepends(t *testing.T) {

task = &wfv1.DAGTask{Depends: "(task-1 || task-1.Succeeded) && !task-1.Failed"}
deps, logic = GetTaskDependencies(task, ctx)
assert.Equal(t, []string{"task-1"}, deps)
assert.Equal(t, map[string]DependencyType{"task-1": DependencyTypeTask}, deps)
assert.Equal(t, "((task-1.Succeeded || task-1.Skipped || task-1.Daemoned) || task-1.Succeeded) && !task-1.Failed", logic)

task = &wfv1.DAGTask{Depends: "task-1.Succeeded && task-1.AnySucceeded"}
deps, logic = GetTaskDependencies(task, ctx)
assert.Equal(t, map[string]DependencyType{"task-1": DependencyTypeItems}, deps)
assert.Equal(t, "task-1.Succeeded && task-1.AnySucceeded", logic)

ctx.testTasks[0].ContinueOn = &wfv1.ContinueOn{Failed: true}
task = &wfv1.DAGTask{Depends: "task-1"}
deps, logic = GetTaskDependencies(task, ctx)
assert.Equal(t, []string{"task-1"}, deps)
assert.Equal(t, map[string]DependencyType{"task-1": DependencyTypeTask}, deps)
assert.Equal(t, "(task-1.Succeeded || task-1.Skipped || task-1.Daemoned || task-1.Failed)", logic)

ctx.testTasks[0].ContinueOn = &wfv1.ContinueOn{Error: true}
task = &wfv1.DAGTask{Depends: "task-1"}
deps, logic = GetTaskDependencies(task, ctx)
assert.Equal(t, []string{"task-1"}, deps)
assert.Equal(t, map[string]DependencyType{"task-1": DependencyTypeTask}, deps)
assert.Equal(t, "(task-1.Succeeded || task-1.Skipped || task-1.Daemoned || task-1.Errored)", logic)

ctx.testTasks[0].ContinueOn = &wfv1.ContinueOn{Failed: true, Error: true}
task = &wfv1.DAGTask{Depends: "task-1"}
deps, logic = GetTaskDependencies(task, ctx)
assert.Equal(t, []string{"task-1"}, deps)
assert.Equal(t, map[string]DependencyType{"task-1": DependencyTypeTask}, deps)
assert.Equal(t, "(task-1.Succeeded || task-1.Skipped || task-1.Daemoned || task-1.Errored || task-1.Failed)", logic)

}

func TestValidateTaskResults(t *testing.T) {
Expand Down
47 changes: 35 additions & 12 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ func (d *dagContext) GetTaskDependsLogic(taskName string) string {

func (d *dagContext) resolveDependencies(taskName string) {
dependencies, resolvedDependsLogic := common.GetTaskDependencies(d.GetTask(taskName), d)
d.dependencies[taskName] = dependencies
var dependencyTasks []string
for dep := range dependencies {
dependencyTasks = append(dependencyTasks, dep)
}

d.dependencies[taskName] = dependencyTasks
d.dependsLogic[taskName] = resolvedDependsLogic
}

Expand Down Expand Up @@ -437,7 +442,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
taskNodeName := dagCtx.taskNodeName(t.Name)
node = dagCtx.getTaskNode(t.Name)
if node == nil {
woc.log.Infof("All of node %s dependencies %s completed", taskNodeName, taskDependencies)
woc.log.Infof("All of node %s dependencies %v completed", taskNodeName, taskDependencies)
// Add the child relationship from our dependency's outbound nodes to this node.
connectDependencies(taskNodeName)

Expand Down Expand Up @@ -674,11 +679,13 @@ func expandTask(task wfv1.DAGTask) ([]wfv1.DAGTask, error) {
}

type TaskResults struct {
Succeeded bool `json:"Succeeded"`
Failed bool `json:"Failed"`
Errored bool `json:"Errored"`
Skipped bool `json:"Skipped"`
Daemoned bool `json:"Daemoned"`
Succeeded bool `json:"Succeeded"`
Failed bool `json:"Failed"`
Errored bool `json:"Errored"`
Skipped bool `json:"Skipped"`
Daemoned bool `json:"Daemoned"`
AnySucceeded bool `json:"AnySucceeded"`
AllFailed bool `json:"AllFailed"`
}

// evaluateDependsLogic returns whether a node should execute and proceed. proceed means that all of its dependencies are
Expand Down Expand Up @@ -706,12 +713,28 @@ func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) {
continue
}

anySucceeded := false
allFailed := false

if depNode.Type == wfv1.NodeTypeTaskGroup {

allFailed = len(depNode.Children) > 0

for _, childNodeID := range depNode.Children {
childNode := d.wf.Status.Nodes[childNodeID]
anySucceeded = anySucceeded || childNode.Phase == wfv1.NodeSucceeded
allFailed = allFailed && childNode.Phase == wfv1.NodeFailed
}
}

evalScope[evalTaskName] = TaskResults{
Succeeded: depNode.Phase == wfv1.NodeSucceeded,
Failed: depNode.Phase == wfv1.NodeFailed,
Errored: depNode.Phase == wfv1.NodeError,
Skipped: depNode.Phase == wfv1.NodeSkipped,
Daemoned: depNode.IsDaemoned() && depNode.Phase != wfv1.NodePending,
Succeeded: depNode.Phase == wfv1.NodeSucceeded,
Failed: depNode.Phase == wfv1.NodeFailed,
Errored: depNode.Phase == wfv1.NodeError,
Skipped: depNode.Phase == wfv1.NodeSkipped,
Daemoned: depNode.IsDaemoned() && depNode.Phase != wfv1.NodePending,
AnySucceeded: anySucceeded,
AllFailed: allFailed,
}
}

Expand Down
103 changes: 103 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,109 @@ func TestEvaluateDependsLogic(t *testing.T) {
assert.True(t, execute)
}

func TestEvaluateAnyAllDependsLogic(t *testing.T) {
testTasks := []wfv1.DAGTask{
{
Name: "A",
},
{
Name: "A-1",
},
{
Name: "A-2",
},
{
Name: "B",
Depends: "A.AnySucceeded",
},
{
Name: "B-1",
},
{
Name: "B-2",
},
{
Name: "C",
Depends: "B.AllFailed",
},
}

d := &dagContext{
boundaryName: "test",
tasks: testTasks,
wf: &wfv1.Workflow{ObjectMeta: metav1.ObjectMeta{Name: "test-wf"}},
dependencies: make(map[string][]string),
dependsLogic: make(map[string]string),
}

// Task A is still running, A-1 succeeded but A-2 failed
d.wf = &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{Name: "test-wf"},
Status: wfv1.WorkflowStatus{
Nodes: map[string]wfv1.NodeStatus{
d.taskNodeID("A"): {
Phase: wfv1.NodeRunning,
Type: wfv1.NodeTypeTaskGroup,
Children: []string{d.taskNodeID("A-1"), d.taskNodeID("A-2")},
},
d.taskNodeID("A-1"): {Phase: wfv1.NodeRunning},
d.taskNodeID("A-2"): {Phase: wfv1.NodeRunning},
},
},
}

// Task B should not proceed as task A is still running
execute, proceed, err := d.evaluateDependsLogic("B")
assert.NoError(t, err)
assert.False(t, proceed)
assert.False(t, execute)

// Task A succeeded
d.wf.Status.Nodes[d.taskNodeID("A")] = wfv1.NodeStatus{
Phase: wfv1.NodeSucceeded,
Type: wfv1.NodeTypeTaskGroup,
Children: []string{d.taskNodeID("A-1"), d.taskNodeID("A-2")},
}

// Task B should proceed, but not execute as none of the children have succeeded yet
execute, proceed, err = d.evaluateDependsLogic("B")
assert.NoError(t, err)
assert.True(t, proceed)
assert.False(t, execute)

// Task A-2 succeeded
d.wf.Status.Nodes[d.taskNodeID("A-2")] = wfv1.NodeStatus{Phase: wfv1.NodeSucceeded}

// Task B should now proceed and execute
execute, proceed, err = d.evaluateDependsLogic("B")
assert.NoError(t, err)
assert.True(t, proceed)
assert.True(t, execute)

// Task B succeeds and B-1 fails
d.wf.Status.Nodes[d.taskNodeID("B")] = wfv1.NodeStatus{
Phase: wfv1.NodeSucceeded,
Type: wfv1.NodeTypeTaskGroup,
Children: []string{d.taskNodeID("B-1"), d.taskNodeID("B-2")},
}
d.wf.Status.Nodes[d.taskNodeID("B-1")] = wfv1.NodeStatus{Phase: wfv1.NodeFailed}

// Task C should proceed, but not execute as not all of B's children have failed yet
execute, proceed, err = d.evaluateDependsLogic("C")
assert.NoError(t, err)
assert.True(t, proceed)
assert.False(t, execute)

d.wf.Status.Nodes[d.taskNodeID("B-2")] = wfv1.NodeStatus{Phase: wfv1.NodeFailed}

// Task C should now proceed and execute as all of B's children have failed
execute, proceed, err = d.evaluateDependsLogic("C")
assert.NoError(t, err)
assert.True(t, proceed)
assert.True(t, execute)

}

func TestAllEvaluateDependsLogic(t *testing.T) {
statusMap := map[common.TaskResult]wfv1.NodePhase{
common.TaskResultSucceeded: wfv1.NodeSucceeded,
Expand Down

0 comments on commit 1212df4

Please sign in to comment.