Skip to content

Commit

Permalink
fix: ensure wftmplLifecycleHook wait for each dag task
Browse files Browse the repository at this point in the history
Signed-off-by: toyamagu-2021 <toyamagu2021@gmail.com>
  • Loading branch information
toyamagu-2021 committed Nov 13, 2023
1 parent 877a2e7 commit 8d8751a
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 12 deletions.
15 changes: 15 additions & 0 deletions workflow/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,3 +322,18 @@ func IsDone(un *unstructured.Unstructured) bool {
un.GetLabels()[LabelKeyCompleted] == "true" &&
un.GetLabels()[LabelKeyWorkflowArchivingStatus] != "Pending"
}

// CheckAllHooksFullfilled reports whether every hooked child of node that is
// present in nodes has been fulfilled.
//
// Child IDs with no entry in nodes are ignored, as are children whose NodeFlag
// is nil or not marked Hooked. A node without children yields true.
func CheckAllHooksFullfilled(node *wfv1.NodeStatus, nodes wfv1.Nodes) bool {
	for _, childID := range node.Children {
		child, found := nodes[childID]
		if !found || child.NodeFlag == nil || !child.NodeFlag.Hooked {
			// Not in the node map, or not a hook node: nothing to wait for.
			continue
		}
		if !child.Fulfilled() {
			return false
		}
	}
	return true
}
12 changes: 1 addition & 11 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,20 +838,10 @@ func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) {

// If the task is still running, we should not proceed.
depNode := d.getTaskNode(taskName)
if depNode == nil || !depNode.Fulfilled() {
if depNode == nil || !depNode.Fulfilled() || !common.CheckAllHooksFullfilled(depNode, d.wf.Status.Nodes) {
return false, false, nil
}

// If a task happens to have an onExit node, don't proceed until the onExit node is fulfilled
if onExitNode, err := d.wf.GetNodeByName(common.GenerateOnExitNodeName(depNode.Name)); onExitNode != nil {
if err != nil {
return false, false, err
}
if !onExitNode.Fulfilled() {
return false, false, nil
}
}

evalTaskName := strings.Replace(taskName, "-", "_", -1)
if _, ok := evalScope[evalTaskName]; ok {
continue
Expand Down
54 changes: 54 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3655,3 +3655,57 @@ spec:
woc1.operate(ctx)
assert.Equal(t, wfv1.WorkflowRunning, woc.wf.Status.Phase)
}

// TestDagWftmplHookWithRetry exercises a DAG task whose failure lifecycle hook
// runs a template with a retryStrategy (fixture:
// testdata/dag_wftmpl_hook_with_retry.yaml). It steps the controller one
// operate pass at a time, failing the pods between passes, and checks that the
// hook's retry node and its attempts advance as expected and that the
// dependent "finish" task is only resolved (as Omitted) after the hook's retry
// node has itself failed.
func TestDagWftmplHookWithRetry(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow("@testdata/dag_wftmpl_hook_with_retry.yaml")
woc := newWoc(*wf)
ctx := context.Background()
woc.operate(ctx)

// After the first pass the task node exists and is still Pending.
taskNode := woc.wf.Status.Nodes.FindByDisplayName("task")
assert.Equal(t, wfv1.NodePending, taskNode.Phase)

// Fail the task's pod and reconcile.
makePodsPhase(ctx, woc, v1.PodFailed)
woc.operate(ctx)

// The task is Failed; the failure hook's retry node is Running and its
// first attempt (0) has been created in Pending.
taskNode = woc.wf.Status.Nodes.FindByDisplayName("task")
assert.Equal(t, wfv1.NodeFailed, taskNode.Phase)
failHookRetryNode := woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure")
failHookChild0Node := woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure(0)")
assert.Equal(t, wfv1.NodeRunning, failHookRetryNode.Phase)
assert.Equal(t, wfv1.NodePending, failHookChild0Node.Phase)

// Fail hook attempt (0) and reconcile.
makePodsPhase(ctx, woc, v1.PodFailed)
woc.operate(ctx)

// Attempt (0) is Failed, attempt (1) has been created in Pending, and the
// retry node is still Running.
taskNode = woc.wf.Status.Nodes.FindByDisplayName("task")
assert.Equal(t, wfv1.NodeFailed, taskNode.Phase)
failHookRetryNode = woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure")
failHookChild0Node = woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure(0)")
failHookChild1Node := woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure(1)")
assert.Equal(t, wfv1.NodeRunning, failHookRetryNode.Phase)
assert.Equal(t, wfv1.NodeFailed, failHookChild0Node.Phase)
assert.Equal(t, wfv1.NodePending, failHookChild1Node.Phase)

// Fail hook attempt (1) and reconcile.
makePodsPhase(ctx, woc, v1.PodFailed)
woc.operate(ctx)

// Both attempts have failed, so the hook's retry node is now Failed too.
taskNode = woc.wf.Status.Nodes.FindByDisplayName("task")
assert.Equal(t, wfv1.NodeFailed, taskNode.Phase)
failHookRetryNode = woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure")
failHookChild0Node = woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure(0)")
failHookChild1Node = woc.wf.Status.Nodes.FindByDisplayName("task.hooks.failure(1)")
assert.Equal(t, wfv1.NodeFailed, failHookRetryNode.Phase)
assert.Equal(t, wfv1.NodeFailed, failHookChild0Node.Phase)
assert.Equal(t, wfv1.NodeFailed, failHookChild1Node.Phase)
// With the hook finished, the dependent "finish" task is marked Omitted.
finishNode := woc.wf.Status.Nodes.FindByDisplayName("finish")
assert.Equal(t, wfv1.NodeOmitted, finishNode.Phase)
}
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1735,7 +1735,7 @@ func getRetryNodeChildrenIds(node *wfv1.NodeStatus, nodes wfv1.Nodes) []string {
if node == nil {
continue
}
if strings.HasSuffix(node.Name, ".onExit") {
if node.NodeFlag != nil && node.NodeFlag.Hooked {
childrenIds = append(childrenIds, node.ID)
} else if len(node.Children) > 0 {
childrenIds = append(childrenIds, node.Children...)
Expand Down
52 changes: 52 additions & 0 deletions workflow/controller/testdata/dag_wftmpl_hook_with_retry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# https://github.com/argoproj/argo-workflows/issues/12120
# Fixture: a DAG whose "task" always fails and carries a failure hook that runs
# a retrying template; "finish" depends on "task".
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: dag-wftmpl-hook-with-retry
namespace: argo
spec:
templates:
# Entrypoint DAG: "task" (with a failure hook) followed by "finish".
- name: main
dag:
tasks:
- name: task
template: task
hooks:
# Runs the exit-handler template when the expression below holds.
failure:
template: exit-handler
expression: tasks["task"].status == "Failed"
- name: finish
template: finish
dependencies:
- task
# Container that exits with status 1.
- name: task
container:
name: ''
image: alpine:latest
command:
- sh
- '-c'
args:
- exit 1;
# Hook template: also exits with status 1, with a retryStrategy (limit 1).
- name: exit-handler
container:
name: ''
image: alpine:latest
command:
- sh
- '-c'
args:
- exit 1;
retryStrategy:
limit: 1
retryPolicy: Always
# Final step; echoes a message.
- name: finish
container:
name: ''
image: alpine:latest
command:
- sh
- '-c'
args:
- echo "Finished!";
entrypoint: main

0 comments on commit 8d8751a

Please sign in to comment.