Skip to content

Commit

Permalink
fix: Use a unique queue to visit nodes (#3418)
Browse files Browse the repository at this point in the history
  • Loading branch information
simster7 committed Jul 8, 2020
1 parent eddcac6 commit 0463f24
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 20 deletions.
25 changes: 5 additions & 20 deletions workflow/controller/dag.go
Expand Up @@ -114,19 +114,6 @@ func (d *dagContext) getTaskNode(taskName string) *wfv1.NodeStatus {
return &node
}

// phaseNode pairs a node ID with the branch phase that was in effect when the
// node was reached. It is the element type of the BFS queue built by
// generatePhaseNodes.
type phaseNode struct {
// nodeId is the ID of the node this entry refers to.
nodeId string
// phase is the branch phase carried along to this node.
phase wfv1.NodePhase
}

// generatePhaseNodes turns each ID in children into a phaseNode tagged with
// branchPhase, keeping the order of children. It returns nil when children
// is empty.
func generatePhaseNodes(children []string, branchPhase wfv1.NodePhase) []phaseNode {
	var nodes []phaseNode
	for i := range children {
		nodes = append(nodes, phaseNode{nodeId: children[i], phase: branchPhase})
	}
	return nodes
}

// assessDAGPhase assesses the overall DAG status
func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes) wfv1.NodePhase {

Expand All @@ -142,10 +129,9 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes) wfv1
}

// BFS over the children of the DAG
var curr phaseNode
queue := generatePhaseNodes(nodes[d.boundaryID].Children, wfv1.NodeSucceeded)
for len(queue) != 0 {
curr, queue = queue[0], queue[1:]
uniqueQueue := newUniquePhaseNodeQueue(generatePhaseNodes(nodes[d.boundaryID].Children, wfv1.NodeSucceeded)...)
for !uniqueQueue.empty() {
curr := uniqueQueue.pop()
// We need to store the current branchPhase to remember the last completed phase in this branch so that we can apply it to omitted nodes
node, branchPhase := nodes[curr.nodeId], curr.phase

Expand All @@ -169,18 +155,17 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes wfv1.Nodes) wfv1
}
}

// TODO: Might have to consider OutboundNodes for DAG and Steps NodeTypes
if node.Type == wfv1.NodeTypeRetry || node.Type == wfv1.NodeTypeTaskGroup {
// A fulfilled Retry node will always reflect the status of its last child node, so its individual attempts don't interest us.
// To resume the traversal, we look at the children of the last child node.
// A TaskGroup node will always reflect the status of its expanded tasks (mainly it will be Succeeded if and only if all of its
// expanded tasks have succeeded), so each individual expanded task doesn't interest us. To resume the traversal, we look at the
// children of its last child node (note that this is arbitrary, since all expanded tasks will have the same children).
if childNode := getChildNodeIndex(&node, nodes, -1); childNode != nil {
queue = append(queue, generatePhaseNodes(childNode.Children, branchPhase)...)
uniqueQueue.add(generatePhaseNodes(childNode.Children, branchPhase)...)
}
} else {
queue = append(queue, generatePhaseNodes(node.Children, branchPhase)...)
uniqueQueue.add(generatePhaseNodes(node.Children, branchPhase)...)
}
}

Expand Down
69 changes: 69 additions & 0 deletions workflow/controller/phase_node.go
@@ -0,0 +1,69 @@
package controller

import (
"fmt"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
)

// A phaseNode is one entry in the BFS over all nodes that is used to determine
// the overall DAG phase.
type phaseNode struct {
// nodeId is the ID of the node this entry corresponds to.
nodeId string
// phase is the current branchPhase associated with the node.
phase wfv1.NodePhase
}

// generatePhaseNodes wraps every ID in children in a phaseNode tagged with
// branchPhase, preserving the order of children. The result is always a
// non-nil slice holding exactly one entry per child.
func generatePhaseNodes(children []string, branchPhase wfv1.NodePhase) []phaseNode {
	nodes := make([]phaseNode, 0, len(children))
	for _, id := range children {
		nodes = append(nodes, phaseNode{nodeId: id, phase: branchPhase})
	}
	return nodes
}

// uniquePhaseNodeQueue is a FIFO queue of phaseNodes that remembers every
// phaseNode it has accepted, so the same nodeId-phase combination is never
// enqueued twice.
type uniquePhaseNodeQueue struct {
// seen holds the "<nodeId>-<phase>" key of every phaseNode accepted by add.
seen map[string]bool
// queue holds the pending phaseNodes in insertion order; pop takes from the front.
queue []phaseNode
}

// newUniquePhaseNodeQueue returns a uniquePhaseNodeQueue pre-loaded with nodes.
//
// A uniquePhaseNodeQueue accepts any given phaseNode at most once during its
// lifetime. Adding a phaseNode that was already added is a silent no-op, even
// if the earlier copy has since been popped. Uniqueness is per nodeId-phase
// combination, so two phaseNodes with the same nodeId but different phases
// are both accepted. This ensures that branches with different branchPhases
// can still be processed: if an Omitted node is first reached from a step
// that succeeded, it is considered succeeded, but it may later be reached
// from another step that did not succeed. In that case the deduced status of
// the omitted node has to be updated, and the only way to do so is to add it
// to the queue again.
func newUniquePhaseNodeQueue(nodes ...phaseNode) *uniquePhaseNodeQueue {
	uq := new(uniquePhaseNodeQueue)
	uq.seen = map[string]bool{}
	uq.queue = []phaseNode{}
	uq.add(nodes...)
	return uq
}

// add appends each of nodes to the back of the queue, unless a phaseNode with
// the same nodeId and phase has been added before, in which case that node is
// silently skipped.
func (uq *uniquePhaseNodeQueue) add(nodes ...phaseNode) {
	for i := range nodes {
		key := fmt.Sprintf("%s-%s", nodes[i].nodeId, nodes[i].phase)
		if _, dup := uq.seen[key]; dup {
			// Already accepted once during the life of this queue.
			continue
		}
		uq.seen[key] = true
		uq.queue = append(uq.queue, nodes[i])
	}
}

// pop removes the phaseNode at the front of the queue and returns it.
// It panics if the queue is empty, so callers should check empty() first.
func (uq *uniquePhaseNodeQueue) pop() phaseNode {
	front := uq.queue[0]
	uq.queue = uq.queue[1:]
	return front
}

// empty reports whether there are no phaseNodes left to pop.
func (uq *uniquePhaseNodeQueue) empty() bool {
	return len(uq.queue) == 0
}

// len reports how many phaseNodes are currently waiting in the queue.
func (uq *uniquePhaseNodeQueue) len() int {
return len(uq.queue)
}
66 changes: 66 additions & 0 deletions workflow/controller/phase_node_test.go
@@ -0,0 +1,66 @@
package controller

import (
"testing"

"github.com/stretchr/testify/assert"
)

// TestUniqueQueue exercises add, pop, len and empty on a queue that starts
// out empty, including the rule that uniqueness is tracked per nodeId-phase
// combination rather than per nodeId.
func TestUniqueQueue(t *testing.T) {
	queue := newUniquePhaseNodeQueue()
	assert.True(t, queue.empty())

	// A node is accepted the first time and ignored on a repeated add.
	phaseNodeA := phaseNode{nodeId: "node-a"}
	queue.add(phaseNodeA)
	assert.Equal(t, 1, queue.len())
	assert.False(t, queue.empty())
	queue.add(phaseNodeA)
	assert.Equal(t, 1, queue.len())

	phaseNodeB := phaseNode{nodeId: "node-b"}
	queue.add(phaseNodeB)
	assert.Equal(t, 2, queue.len())
	queue.add(phaseNodeB)
	assert.Equal(t, 2, queue.len())

	// Nodes come back out in insertion order.
	pop := queue.pop()
	assert.Equal(t, "node-a", pop.nodeId)
	assert.Equal(t, 1, queue.len())
	pop = queue.pop()
	assert.True(t, queue.empty())
	assert.Equal(t, "node-b", pop.nodeId)
	assert.Equal(t, 0, queue.len())

	// Once seen, a node is rejected even after it has left the queue.
	queue.add(phaseNodeA)
	assert.Equal(t, 0, queue.len())
	queue.add(phaseNodeB)
	assert.Equal(t, 0, queue.len())

	// The same nodeId paired with a different phase is a distinct entry and
	// is accepted exactly once.
	// NOTE(review): the untyped string constant relies on wfv1.NodePhase
	// being a string type; confirm against the API package.
	phaseNodeAFailed := phaseNode{nodeId: "node-a", phase: "Failed"}
	queue.add(phaseNodeAFailed)
	assert.Equal(t, 1, queue.len())
	queue.add(phaseNodeAFailed)
	assert.Equal(t, 1, queue.len())
	pop = queue.pop()
	assert.Equal(t, "node-a", pop.nodeId)
	assert.True(t, queue.empty())
}

// TestUniqueQueueConstructor checks that a node handed to
// newUniquePhaseNodeQueue is tracked just like one passed to add.
func TestUniqueQueueConstructor(t *testing.T) {
	nodeA := phaseNode{nodeId: "node-a"}
	nodeB := phaseNode{nodeId: "node-b"}

	// node-a enters through the constructor, so adding it again is a no-op.
	uq := newUniquePhaseNodeQueue(nodeA)
	assert.Equal(t, 1, uq.len())
	assert.False(t, uq.empty())
	uq.add(nodeA)
	assert.Equal(t, 1, uq.len())

	// node-b is new and is accepted exactly once.
	uq.add(nodeB)
	assert.Equal(t, 2, uq.len())
	uq.add(nodeB)
	assert.Equal(t, 2, uq.len())

	// Nodes come back out in insertion order.
	first := uq.pop()
	assert.Equal(t, "node-a", first.nodeId)
	assert.Equal(t, 1, uq.len())
	second := uq.pop()
	assert.True(t, uq.empty())
	assert.Equal(t, "node-b", second.nodeId)
	assert.Equal(t, 0, uq.len())

	// Nodes that were seen before stay rejected after being popped.
	for _, seen := range []phaseNode{nodeA, nodeB} {
		uq.add(seen)
		assert.Equal(t, 0, uq.len())
	}
}

0 comments on commit 0463f24

Please sign in to comment.