From 02843dd9565b79e3fb1c7e55e5e9a6148fcee7e2 Mon Sep 17 00:00:00 2001 From: Isitha Subasinghe Date: Mon, 29 Apr 2024 15:11:21 +1000 Subject: [PATCH 1/3] fix: ensure node phases propagate correctly Signed-off-by: Isitha Subasinghe --- workflow/controller/dag.go | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index fd60b5262eed..cec75755442e 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -29,6 +29,9 @@ type dagContext struct { // tasks are all the tasks in the template tasks []wfv1.DAGTask + // convert to get the task + nodeToTask map[string]*wfv1.DAGTask + // visited keeps track of tasks we have already visited during an invocation of executeDAG // in order to avoid duplicating work visited map[string]bool @@ -143,7 +146,7 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh // will take precedence as the branch phase for their descendents. targetTaskPhases := make(map[string]wfv1.NodePhase) for _, task := range targetTasks { - targetTaskPhases[d.taskNodeID(task)] = "" + targetTaskPhases[d.taskNodeID(task)] = wfv1.NodeOmitted } boundaryNode, err := nodes.Get(d.boundaryID) @@ -168,13 +171,19 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh return wfv1.NodeRunning, nil } + task := d.nodeToTask[curr.nodeId] + // Only overwrite the branchPhase if this node completed. (If it didn't we can just inherit our parent's branchPhase). if node.Completed() { - branchPhase = node.Phase + + if !curr.phase.FailedOrError() || (task != nil && !task.ContinuesOn(curr.phase)) { + branchPhase = node.Phase + } } + previousPhase, isTargetTask := targetTaskPhases[node.ID] // This node is a target task, so it will not have any children. Store or deduce its phase - if previousPhase, isTargetTask := targetTaskPhases[node.ID]; isTargetTask { + if isTargetTask { // Since we want Failed or Errored phases to have preference over Succeeded in case of ambiguity, only update // the deduced phase of the target task if it is not already Failed or Errored. // Note that if the target task is NOT omitted (i.e. it Completed), then this check is moot, because every time @@ -193,11 +202,15 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh // We only succeed if all the target tasks have been considered (i.e. its nodes created) and there are no failures failFast := d.tmpl.DAG.FailFast == nil || *d.tmpl.DAG.FailFast - result := wfv1.NodeSucceeded + result := boundaryNode.Phase + if result == wfv1.NodeRunning { + result = wfv1.NodeSucceeded + } + for _, depName := range targetTasks { - branchPhase := targetTaskPhases[d.taskNodeID(depName)] - if branchPhase == "" { - result = wfv1.NodeRunning + branchPhase, _ := targetTaskPhases[d.taskNodeID(depName)] + if branchPhase == wfv1.NodeOmitted { + result = wfv1.NodeSucceeded // If failFast is disabled, we will want to let all tasks complete before checking for failures if !failFast { break @@ -211,7 +224,6 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh if task := d.GetTask(depName); task.ContinuesOn(branchPhase) { continue } - result = branchPhase // If failFast is enabled, don't check to see if other target tasks are complete and fail now instead if failFast { @@ -246,6 +258,7 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl boundaryID: node.ID, tasks: tmpl.DAG.Tasks, visited: make(map[string]bool), + nodeToTask: make(map[string]*wfv1.DAGTask), tmpl: tmpl, wf: woc.wf, tmplCtx: tmplCtx, @@ -263,6 +276,11 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl targetTasks = strings.Split(tmpl.DAG.Target, " ") } + for _, task := range tmpl.DAG.Tasks { + taskId := dagCtx.taskNodeID(task.Name) + dagCtx.nodeToTask[taskId] = &task + } + // kick off execution of each target task asynchronously onExitCompleted := true for _, taskName := range targetTasks { @@ -350,6 +368,8 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl } outputs, err := getTemplateOutputsFromScope(tmpl, scope) if err != nil { + _ = woc.markNodePhase(nodeName, wfv1.NodeError) + err = woc.updateOutboundNodesForTargetTasks(dagCtx, targetTasks, nodeName) woc.log.Errorf("unable to get outputs") return node, err } From f3ff1c96c43c93dc695c00cb7b50bee02deed340 Mon Sep 17 00:00:00 2001 From: Isitha Subasinghe Date: Mon, 29 Apr 2024 15:24:26 +1000 Subject: [PATCH 2/3] fix: removed unneeded assignment Signed-off-by: Isitha Subasinghe --- workflow/controller/dag.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index cec75755442e..2c500cdf6b21 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -208,7 +208,7 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes, isSh } for _, depName := range targetTasks { - branchPhase, _ := targetTaskPhases[d.taskNodeID(depName)] + branchPhase := targetTaskPhases[d.taskNodeID(depName)] if branchPhase == wfv1.NodeOmitted { result = wfv1.NodeSucceeded // If failFast is disabled, we will want to let all tasks complete before checking for failures From 313ef8f5df72e5d6622d69f4c385ddc152b3e5a2 Mon Sep 17 00:00:00 2001 From: Isitha Subasinghe Date: Mon, 29 Apr 2024 15:33:44 +1000 Subject: [PATCH 3/3] fix: ensure no loop pointer exportation Signed-off-by: Isitha Subasinghe --- workflow/controller/dag.go | 1 + 1 file changed, 1 insertion(+) diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index 2c500cdf6b21..34983d84c709 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -277,6 +277,7 @@ func (woc *wfOperationCtx) executeDAG(ctx context.Context, nodeName string, tmpl } for _, task := range tmpl.DAG.Tasks { + task := task taskId := dagCtx.taskNodeID(task.Name) dagCtx.nodeToTask[taskId] = &task }