Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(controller): Fix node status when daemon pod deleted but its children nodes are still running #4683

Merged
merged 9 commits into from
Jan 13, 2021
5 changes: 5 additions & 0 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,11 @@ type TaskResults struct {
// evaluateDependsLogic returns whether a node should execute and proceed. proceed means that all of its dependencies are
// completed and execute means that given the results of its dependencies, this node should execute.
func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) {
node := d.getTaskNode(taskName)
if node != nil {
return true, true, nil
}
Comment on lines +695 to +698
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain this particular change more? Not sure if we want this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The case for this code is "explained" in the unittest.


evalScope := make(map[string]TaskResults)

for _, taskName := range d.GetTaskDependencies(taskName) {
Expand Down
49 changes: 49 additions & 0 deletions workflow/controller/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,55 @@ func TestEvaluateAnyAllDependsLogic(t *testing.T) {

}

func TestEvaluateDependsLogicWhenDaemonFailed(t *testing.T) {
testTasks := []wfv1.DAGTask{
{
Name: "A",
},
{
Name: "B",
Depends: "A",
},
}

d := &dagContext{
boundaryName: "test",
tasks: testTasks,
wf: &wfv1.Workflow{ObjectMeta: metav1.ObjectMeta{Name: "test-wf"}},
dependencies: make(map[string][]string),
dependsLogic: make(map[string]string),
}

// Task A is running
daemon := true
d.wf = &wfv1.Workflow{
ObjectMeta: metav1.ObjectMeta{Name: "test-wf"},
Status: wfv1.WorkflowStatus{
Nodes: map[string]wfv1.NodeStatus{
d.taskNodeID("A"): {Phase: wfv1.NodeRunning, Daemoned: &daemon},
},
},
}

// Task B should proceed and execute
execute, proceed, err := d.evaluateDependsLogic("B")
assert.NoError(t, err)
assert.True(t, proceed)
assert.True(t, execute)

// Task B running
d.wf.Status.Nodes[d.taskNodeID("B")] = wfv1.NodeStatus{Phase: wfv1.NodeRunning}

// Task A failed or error
d.wf.Status.Nodes[d.taskNodeID("A")] = wfv1.NodeStatus{Phase: wfv1.NodeFailed}

// Task B should proceed and execute
execute, proceed, err = d.evaluateDependsLogic("B")
assert.NoError(t, err)
assert.True(t, proceed)
assert.True(t, execute)
}

func TestAllEvaluateDependsLogic(t *testing.T) {
statusMap := map[common.TaskResult]wfv1.NodePhase{
common.TaskResultSucceeded: wfv1.NodeSucceeded,
Expand Down
7 changes: 6 additions & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
// It is now impossible to infer pod status. We can do at this point is to mark the node with Error, or
// we can re-submit it.
for nodeID, node := range woc.wf.Status.Nodes {
if node.Type != wfv1.NodeTypePod || node.Fulfilled() || node.StartedAt.IsZero() {
if node.Type != wfv1.NodeTypePod || node.Phase.Fulfilled() || node.StartedAt.IsZero() {
// node is not a pod, it is already complete, or it can be re-run.
continue
}
Expand All @@ -950,6 +950,10 @@ func (woc *wfOperationCtx) podReconciliation(ctx context.Context) error {
continue
}

if node.Daemoned != nil && *node.Daemoned {
node.Daemoned = nil
woc.updated = true
}
woc.markNodePhase(node.Name, wfv1.NodeError, "pod deleted")
} else {
// At this point we are certain that the pod associated with our node is running or has been run;
Expand Down Expand Up @@ -1100,6 +1104,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu
if pod.DeletionTimestamp != nil {
// pod is being terminated
newPhase = wfv1.NodeError
newDaemonStatus = pointer.BoolPtr(false)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and the purpose of this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto.

message = "pod deleted during operation"
woc.log.WithField("displayName", node.DisplayName).WithField("templateName", node.TemplateName).
WithField("pod", pod.Name).Error(message)
Expand Down