Skip to content

Commit

Permalink
fix: Fix certain sibling tasks not connected to parent (#6193)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Behar <simbeh7@gmail.com>
  • Loading branch information
simster7 authored and alexec committed Jun 28, 2021
1 parent 99b42eb commit 401a661
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 2 deletions.
21 changes: 20 additions & 1 deletion workflow/controller/operator.go
Expand Up @@ -2268,7 +2268,26 @@ func (woc *wfOperationCtx) getOutboundNodes(nodeID string) []string {
switch node.Type {
case wfv1.NodeTypeSkipped, wfv1.NodeTypeSuspend:
return []string{node.ID}
case wfv1.NodeTypeContainer, wfv1.NodeTypePod, wfv1.NodeTypeTaskGroup:
case wfv1.NodeTypePod:

// Recover the template that created this pod. If we can't just let the pod be its own outbound node
tmplCtx, err := woc.createTemplateContext(node.GetTemplateScope())
if err != nil {
return []string{node.ID}
}
_, parentTemplate, _, err := tmplCtx.ResolveTemplate(&node)
if err != nil {
return []string{node.ID}
}

// If this pod does not come from a container set, its outbound node is itself
if parentTemplate.GetType() != wfv1.TemplateTypeContainerSet {
return []string{node.ID}
}

// If this pod comes from a container set, it should be treated as a container or task group
fallthrough
case wfv1.NodeTypeContainer, wfv1.NodeTypeTaskGroup:
if len(node.Children) == 0 {
return []string{node.ID}
}
Expand Down
168 changes: 167 additions & 1 deletion workflow/controller/operator_test.go
Expand Up @@ -31,7 +31,6 @@ import (

"github.com/argoproj/argo-workflows/v3/config"
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"

intstrutil "github.com/argoproj/argo-workflows/v3/util/intstr"
"github.com/argoproj/argo-workflows/v3/util/template"
"github.com/argoproj/argo-workflows/v3/workflow/common"
Expand Down Expand Up @@ -6949,3 +6948,170 @@ func TestWFGlobalArtifactNil(t *testing.T) {
woc.operate(ctx)
assert.NotPanics(t, func() { woc.operate(ctx) })
}

const testDagTwoChildrenWithNonExpectedNodeType = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: ingest-pipeline-cdw-m2fnc
spec:
arguments:
parameters:
- name: job_name
value: all_the_data
entrypoint: ingest-task
templates:
- dag:
tasks:
- dependencies:
- sent
name: ing
template: ingest-collections
- dependencies:
- sent
name: mat
template: materializations
- arguments:
parameters:
- name: job_name
value: all_the_data
name: sent
template: sentinel
name: ingest-task
- container:
args:
- sleep 30; date; echo got sentinel for {{inputs.parameters.job_name}}
command:
- sh
- -c
image: alpine:3.13.5
inputs:
parameters:
- name: job_name
name: sentinel
- name: ingest-collections
steps:
- - name: get-ingest-collections
template: get-ingest-collections
- name: get-ingest-collections
script:
command:
- python
image: python:alpine3.6
source: |
import json
- name: materializations
steps:
- - name: get-materializations
template: get-materializations
- name: get-materializations
script:
command:
- python
image: python:alpine3.6
name: ""
resources: {}
source: |
import json
status:
nodes:
ingest-pipeline-cdw-m2fnc:
children:
- ingest-pipeline-cdw-m2fnc-141178578
displayName: ingest-pipeline-cdw-m2fnc
id: ingest-pipeline-cdw-m2fnc
name: ingest-pipeline-cdw-m2fnc
phase: Running
startedAt: "2021-06-22T18:51:02Z"
templateName: ingest-task
templateScope: local/ingest-pipeline-cdw-m2fnc
type: DAG
ingest-pipeline-cdw-m2fnc-141178578:
boundaryID: ingest-pipeline-cdw-m2fnc
displayName: sent
finishedAt: "2021-06-22T18:51:34Z"
hostNodeName: k3d-k3s-default-server-0
id: ingest-pipeline-cdw-m2fnc-141178578
name: ingest-pipeline-cdw-m2fnc.sent
phase: Succeeded
startedAt: "2021-06-22T18:51:02Z"
templateName: sentinel
templateScope: local/ingest-pipeline-cdw-m2fnc
type: Pod
phase: Running
startedAt: "2021-06-22T18:51:02Z"
`

func TestDagTwoChildrenWithNonExpectedNodeType(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(testDagTwoChildrenWithNonExpectedNodeType)
cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)

sentNode := woc.wf.Status.Nodes.FindByDisplayName("sent")

//Ensure that both child tasks are labeled as children of the "sent" node
assert.Len(t, sentNode.Children, 2)
}

const testDagTwoChildrenContainerSet = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: outputs-result-pn6gb
spec:
entrypoint: main
templates:
- dag:
tasks:
- name: a
template: group
- arguments:
parameters:
- name: x
value: '{{tasks.a.outputs.result}}'
depends: a
name: b
template: verify
name: main
- containerSet:
containers:
- args:
- -c
- |
print("hi")
image: python:alpine3.6
name: main
name: group
- inputs:
parameters:
- name: x
name: verify
script:
image: python:alpine3.6
source: |
assert "{{inputs.parameters.x}}" == "hi"
status:
phase: Running
startedAt: "2021-06-24T18:05:35Z"
`

// In this test, a pod originating from a container set should not be its own outbound node. "a" should only have one child
// and "main" should be the outbound node.
func TestDagTwoChildrenContainerSet(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(testDagTwoChildrenContainerSet)
cancel, controller := newController(wf)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)

woc.operate(ctx)
woc.operate(ctx)

sentNode := woc.wf.Status.Nodes.FindByDisplayName("a")

assert.Len(t, sentNode.Children, 1)
}

0 comments on commit 401a661

Please sign in to comment.