From 0463f24165e360344b5ff743915d16a12fef0ba0 Mon Sep 17 00:00:00 2001 From: Simon Behar Date: Wed, 8 Jul 2020 13:43:12 -0700 Subject: [PATCH] fix: Use a unique queue to visit nodes (#3418) --- workflow/controller/dag.go | 25 ++-------- workflow/controller/phase_node.go | 69 ++++++++++++++++++++++++++ workflow/controller/phase_node_test.go | 66 ++++++++++++++++++++++++ 3 files changed, 140 insertions(+), 20 deletions(-) create mode 100644 workflow/controller/phase_node.go create mode 100644 workflow/controller/phase_node_test.go diff --git a/workflow/controller/dag.go b/workflow/controller/dag.go index efb3a19e21b9..bfb9a0129e20 100644 --- a/workflow/controller/dag.go +++ b/workflow/controller/dag.go @@ -114,19 +114,6 @@ func (d *dagContext) getTaskNode(taskName string) *wfv1.NodeStatus { return &node } -type phaseNode struct { - nodeId string - phase wfv1.NodePhase -} - -func generatePhaseNodes(children []string, branchPhase wfv1.NodePhase) []phaseNode { - var out []phaseNode - for _, child := range children { - out = append(out, phaseNode{nodeId: child, phase: branchPhase}) - } - return out -} - // assessDAGPhase assesses the overall DAG status func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes) wfv1.NodePhase { @@ -142,10 +129,9 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes) wfv1 } // BFS over the children of the DAG - var curr phaseNode - queue := generatePhaseNodes(nodes[d.boundaryID].Children, wfv1.NodeSucceeded) - for len(queue) != 0 { - curr, queue = queue[0], queue[1:] + uniqueQueue := newUniquePhaseNodeQueue(generatePhaseNodes(nodes[d.boundaryID].Children, wfv1.NodeSucceeded)...) + for !uniqueQueue.empty() { + curr := uniqueQueue.pop() // We need to store the current branchPhase to remember the last completed phase in this branch so that we can apply it to omitted nodes node, branchPhase := nodes[curr.nodeId], curr.phase @@ -169,7 +155,6 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes) wfv1 } } - // TODO: Might have to consider OutboundNodes for DAG and Steps NodeTypes if node.Type == wfv1.NodeTypeRetry || node.Type == wfv1.NodeTypeTaskGroup { // A fulfilled Retry node will always reflect the status of its last child node, so its individual attempts don't interest us. // To resume the traversal, we look at the children of the last child node. @@ -177,10 +162,10 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes) wfv1 // expanded tasks have succeeded), so each individual expanded task doesn't interest us. To resume the traversal, we look at the // children of its last child node (note that this is arbitrary, since all expanded tasks will have the same children). if childNode := getChildNodeIndex(&node, nodes, -1); childNode != nil { - queue = append(queue, generatePhaseNodes(childNode.Children, branchPhase)...) + uniqueQueue.add(generatePhaseNodes(childNode.Children, branchPhase)...) } } else { - queue = append(queue, generatePhaseNodes(node.Children, branchPhase)...) + uniqueQueue.add(generatePhaseNodes(node.Children, branchPhase)...) } } diff --git a/workflow/controller/phase_node.go b/workflow/controller/phase_node.go new file mode 100644 index 000000000000..15f0e299701e --- /dev/null +++ b/workflow/controller/phase_node.go @@ -0,0 +1,69 @@ +package controller + +import ( + "fmt" + + wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" +) + +// A phaseNode is a node in a BFS of all nodes for the purposes of determining overall DAG phase. nodeId is the corresponding +// nodeId and phase is the current branchPhase associated with the node +type phaseNode struct { + nodeId string + phase wfv1.NodePhase +} + +func generatePhaseNodes(children []string, branchPhase wfv1.NodePhase) []phaseNode { + out := make([]phaseNode, len(children)) + for i, child := range children { + out[i] = phaseNode{nodeId: child, phase: branchPhase} + } + return out +} + +type uniquePhaseNodeQueue struct { + seen map[string]bool + queue []phaseNode +} + +// A uniquePhaseNodeQueue is a queue that only accepts a phaseNode only once during its life. If a node with a +// phaseNode is added while another had already been added before, the add will not succeed. Even if a phaseNode +// is added, popped, and re-added, the re-add will not succeed. Failed adds fail silently. Note that two phaseNodes +// with the same nodeId but different phases may be added, but only once per nodeId-phase combination. This is to ensure +// that branches with different branchPhases can still be processed: if an Omitted node is reached first from a step +// that succeeded, we consider the omitted node succeeded. However, it may be subsequently reached from another step +// that did not succeed. In that case we want to update the deduced status of the omitted node, and we may only do so by +// adding it to the queue again. +func newUniquePhaseNodeQueue(nodes ...phaseNode) *uniquePhaseNodeQueue { + uq := &uniquePhaseNodeQueue{ + seen: make(map[string]bool), + queue: []phaseNode{}, + } + uq.add(nodes...) + return uq +} + +// If a phaseNode has already existed, it will not be added silently +func (uq *uniquePhaseNodeQueue) add(nodes ...phaseNode) { + for _, node := range nodes { + key := fmt.Sprintf("%s-%s", node.nodeId, node.phase) + if _, ok := uq.seen[key]; !ok { + uq.seen[key] = true + uq.queue = append(uq.queue, node) + } + } +} + +func (uq *uniquePhaseNodeQueue) pop() phaseNode { + var head phaseNode + head, uq.queue = uq.queue[0], uq.queue[1:] + return head +} + +func (uq *uniquePhaseNodeQueue) empty() bool { + return uq.len() == 0 +} + +func (uq *uniquePhaseNodeQueue) len() int { + return len(uq.queue) +} diff --git a/workflow/controller/phase_node_test.go b/workflow/controller/phase_node_test.go new file mode 100644 index 000000000000..9d6822f25f26 --- /dev/null +++ b/workflow/controller/phase_node_test.go @@ -0,0 +1,66 @@ +package controller + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestUniqueQueue(t *testing.T) { + queue := newUniquePhaseNodeQueue() + assert.True(t, queue.empty()) + + phaseNodeA := phaseNode{nodeId: "node-a"} + queue.add(phaseNodeA) + assert.Equal(t, 1, queue.len()) + assert.False(t, queue.empty()) + queue.add(phaseNodeA) + assert.Equal(t, 1, queue.len()) + + phaseNodeB := phaseNode{nodeId: "node-b"} + queue.add(phaseNodeB) + assert.Equal(t, 2, queue.len()) + queue.add(phaseNodeB) + assert.Equal(t, 2, queue.len()) + + pop := queue.pop() + assert.Equal(t, "node-a", pop.nodeId) + assert.Equal(t, 1, queue.len()) + pop = queue.pop() + assert.True(t, queue.empty()) + assert.Equal(t, "node-b", pop.nodeId) + assert.Equal(t, 0, queue.len()) + + queue.add(phaseNodeA) + assert.Equal(t, 0, queue.len()) + queue.add(phaseNodeB) + assert.Equal(t, 0, queue.len()) +} + +func TestUniqueQueueConstructor(t *testing.T) { + phaseNodeA := phaseNode{nodeId: "node-a"} + queue := newUniquePhaseNodeQueue(phaseNodeA) + assert.Equal(t, 1, queue.len()) + assert.False(t, queue.empty()) + queue.add(phaseNodeA) + assert.Equal(t, 1, queue.len()) + + phaseNodeB := phaseNode{nodeId: "node-b"} + queue.add(phaseNodeB) + assert.Equal(t, 2, queue.len()) + queue.add(phaseNodeB) + assert.Equal(t, 2, queue.len()) + + pop := queue.pop() + assert.Equal(t, "node-a", pop.nodeId) + assert.Equal(t, 1, queue.len()) + pop = queue.pop() + assert.True(t, queue.empty()) + assert.Equal(t, "node-b", pop.nodeId) + assert.Equal(t, 0, queue.len()) + + queue.add(phaseNodeA) + assert.Equal(t, 0, queue.len()) + queue.add(phaseNodeB) + assert.Equal(t, 0, queue.len()) +}