Skip to content

Commit

Permalink
fix: Skip empty withParam tasks. Fixes #6834 (#6912)
Browse files Browse the repository at this point in the history
Signed-off-by: Iven Hsu <ivenvd@gmail.com>
  • Loading branch information
iven committed Oct 18, 2021
1 parent b0d1f65 commit 79d03a9
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 1 deletion.
39 changes: 39 additions & 0 deletions test/e2e/functional/dag-empty-param.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: dag-param-result-
spec:
entrypoint: dag-param-result-example
templates:
- name: dag-param-result-example
dag:
tasks:
- name: generate
template: gen-number-list
- name: sleep
template: sleep-n-sec
arguments:
parameters:
- name: seconds
value: "{{item}}"
withParam: "{{tasks.generate.outputs.result}}"
dependencies:
- generate

- name: gen-number-list
script:
image: python:alpine3.6
command: [python]
source: |
import json
import sys
json.dump([i for i in range(0, -1)], sys.stdout)
- name: sleep-n-sec
inputs:
parameters:
- name: seconds
container:
image: argoproj/argosay:v1
command: [sh, -c]
args: ["echo sleeping for {{inputs.parameters.seconds}} seconds; sleep {{inputs.parameters.seconds}}; echo done"]
17 changes: 17 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,23 @@ func (s *FunctionalSuite) TestLoopEmptyParam() {
})
}

func (s *FunctionalSuite) TestDAGEmptyParam() {
s.Given().
Workflow("@functional/dag-empty-param.yaml").
When().
SubmitWorkflow().
WaitForWorkflow().
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.WorkflowSucceeded, status.Phase)
if assert.Len(t, status.Nodes, 3) {
nodeStatus := status.Nodes.FindByDisplayName("sleep")
assert.Equal(t, wfv1.NodeSkipped, nodeStatus.Phase)
assert.Equal(t, "Skipped, empty params", nodeStatus.Message)
}
})
}

// 128M is for argo executor
func (s *FunctionalSuite) TestPendingRetryWorkflow() {
s.Given().
Expand Down
7 changes: 6 additions & 1 deletion workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,12 @@ func (woc *wfOperationCtx) executeDAGTask(ctx context.Context, dagCtx *dagContex
// For example, if we had task A with withItems of ['foo', 'bar'] which expanded to ['A(0:foo)', 'A(1:bar)'], we still
// need to create a node for A.
if task.ShouldExpand() {
if taskGroupNode == nil {
// DAG task with empty withParams list should be skipped
if len(expandedTasks) == 0 {
skipReason := "Skipped, empty params"
woc.initializeNode(nodeName, wfv1.NodeTypeSkipped, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeSkipped, skipReason)
connectDependencies(nodeName)
} else if taskGroupNode == nil {
connectDependencies(nodeName)
taskGroupNode = woc.initializeNode(nodeName, wfv1.NodeTypeTaskGroup, dagTemplateScope, task, dagCtx.boundaryID, wfv1.NodeRunning, "")
}
Expand Down

0 comments on commit 79d03a9

Please sign in to comment.