Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Differentiate between Fulfilled and Completed #3083

Merged
merged 8 commits into from
May 27, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 16 additions & 6 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1189,16 +1189,21 @@ func (n Nodes) GetResourcesDuration() ResourcesDuration {
return i
}

// Fulfilled returns whether a phase is fulfilled, i.e. it finished execution or was skipped
func (phase NodePhase) Fulfilled() bool {
return phase.Completed() || phase == NodeSkipped
}

// Completed returns whether or not a phase completed. Notably, a skipped phase is not considered as having completed
func (phase NodePhase) Completed() bool {
simster7 marked this conversation as resolved.
Show resolved Hide resolved
return phase == NodeSucceeded ||
phase == NodeFailed ||
phase == NodeError ||
phase == NodeSkipped
phase == NodeError
}

// Completed returns whether or not the workflow has completed execution
func (ws WorkflowStatus) Completed() bool {
simster7 marked this conversation as resolved.
Show resolved Hide resolved
return ws.Phase.Completed()
return ws.Phase.Fulfilled()
}

// Successful return whether or not the workflow has succeeded
Expand All @@ -1219,9 +1224,14 @@ func (ws WorkflowStatus) FinishTime() *metav1.Time {
return &ws.FinishedAt
}

// Completed returns whether or not the node has completed execution
// Fulfilled returns whether a node is fulfilled, i.e. it finished execution, was skipped, or was dameoned successfully
func (n NodeStatus) Fulfilled() bool {
return n.Phase.Fulfilled() || n.IsDaemoned() && n.Phase != NodePending
}

// Completed returns whether a node completed. Notably, a skipped node is not considered as having completed
func (n NodeStatus) Completed() bool {
return n.Phase.Completed() || n.IsDaemoned() && n.Phase != NodePending
return n.Phase.Completed()
}

func (in *WorkflowStatus) AnyActiveSuspendNode() bool {
Expand Down Expand Up @@ -1261,7 +1271,7 @@ func (n NodeStatus) FinishTime() *metav1.Time {
// CanRetry returns whether the node should be retried or not.
func (n NodeStatus) CanRetry() bool {
// TODO(shri): Check if there are some 'unretryable' errors.
return n.Completed() && !n.Successful()
return n.Fulfilled() && !n.Successful()
}

func (n NodeStatus) GetTemplateScope() (ResourceScope, string) {
Expand Down
2 changes: 1 addition & 1 deletion util/printer/workflow-printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func countPendingRunningCompleted(wf *wfv1.Workflow) (int, int, int) {
if tmpl == nil || !tmpl.IsPodType() {
continue
}
if node.Completed() {
if node.Fulfilled() {
completed++
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename variable?

} else if node.Phase == wfv1.NodeRunning {
running++
Expand Down
22 changes: 12 additions & 10 deletions workflow/controller/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func (d *dagContext) assessDAGPhase(targetTasks []string, nodes map[string]wfv1.
if node.BoundaryID != d.boundaryID {
continue
}
if !node.Completed() {
if !node.Fulfilled() {
return wfv1.NodeRunning
}
if node.Successful() {
Expand Down Expand Up @@ -278,7 +278,7 @@ func (woc *wfOperationCtx) executeDAG(nodeName string, tmplCtx *templateresoluti
}

defer func() {
if woc.wf.Status.Nodes[node.ID].Completed() {
if woc.wf.Status.Nodes[node.ID].Fulfilled() {
_ = woc.killDaemonedChildren(node.ID)
}
}()
Expand Down Expand Up @@ -377,12 +377,14 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {

node := dagCtx.getTaskNode(taskName)
task := dagCtx.GetTask(taskName)
if node != nil && node.Completed() {
// Run the node's onExit node, if any.
hasOnExitNode, onExitNode, err := woc.runOnExitNode(task.Name, task.OnExit, dagCtx.boundaryID, dagCtx.tmplCtx)
if hasOnExitNode && (onExitNode == nil || !onExitNode.Completed() || err != nil) {
// The onExit node is either not complete or has errored out, return.
return
if node != nil && node.Fulfilled() {
if node.Completed() {
simster7 marked this conversation as resolved.
Show resolved Hide resolved
// Run the node's onExit node, if any.
hasOnExitNode, onExitNode, err := woc.runOnExitNode(task.Name, task.OnExit, dagCtx.boundaryID, dagCtx.tmplCtx)
if hasOnExitNode && (onExitNode == nil || !onExitNode.Fulfilled() || err != nil) {
// The onExit node is either not complete or has errored out, return.
return
}
}
return
}
Expand Down Expand Up @@ -506,7 +508,7 @@ func (woc *wfOperationCtx) executeDAGTask(dagCtx *dagContext, taskName string) {
for _, t := range expandedTasks {
// Add the child relationship from our dependency's outbound nodes to this node.
node := dagCtx.getTaskNode(t.Name)
if node == nil || !node.Completed() {
if node == nil || !node.Fulfilled() {
return
}
if !node.Successful() {
Expand Down Expand Up @@ -702,7 +704,7 @@ func (d *dagContext) evaluateDependsLogic(taskName string) (bool, bool, error) {

// If the task is still running, we should not proceed.
depNode := d.getTaskNode(taskName)
if depNode == nil || !depNode.Completed() {
if depNode == nil || !depNode.Fulfilled() {
return false, false, nil
}

Expand Down
34 changes: 17 additions & 17 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func (woc *wfOperationCtx) operate() {
return
}

if node == nil || !node.Completed() {
if node == nil || !node.Fulfilled() {
// node can be nil if a workflow created immediately in a parallelism == 0 state
return
}
Expand Down Expand Up @@ -347,7 +347,7 @@ func (woc *wfOperationCtx) operate() {
woc.log.Errorf("error in exit template execution: %+v", err)
return
}
if onExitNode == nil || !onExitNode.Completed() {
if onExitNode == nil || !onExitNode.Fulfilled() {
return
}
}
Expand Down Expand Up @@ -622,7 +622,7 @@ func (woc *wfOperationCtx) requeue(afterDuration time.Duration) {

// processNodeRetries updates the retry node state based on the child node state and the retry strategy and returns the node.
func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrategy wfv1.RetryStrategy) (*wfv1.NodeStatus, bool, error) {
if node.Completed() {
if node.Fulfilled() {
return node, true, nil
}
lastChildNode, err := woc.getLastChildNode(node)
Expand All @@ -634,7 +634,7 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return node, true, nil
}

if !lastChildNode.Completed() {
if !lastChildNode.Fulfilled() {
// last child node is still running.
return node, true, nil
}
Expand Down Expand Up @@ -774,7 +774,7 @@ func (woc *wfOperationCtx) podReconciliation() error {
woc.updated = true
}
node := woc.wf.Status.Nodes[pod.ObjectMeta.Name]
if node.Completed() && !node.IsDaemoned() {
if node.Fulfilled() && !node.IsDaemoned() {
if tmpVal, tmpOk := pod.Labels[common.LabelKeyCompleted]; tmpOk {
if tmpVal == "true" {
return
Expand All @@ -784,7 +784,7 @@ func (woc *wfOperationCtx) podReconciliation() error {
if woc.shouldPrintPodSpec(node) {
printPodSpecLog(pod, woc.wf.Name)
}
if !woc.orig.Status.Nodes[node.ID].Completed() {
if !woc.orig.Status.Nodes[node.ID].Fulfilled() {
woc.onNodeComplete(&node)
}
}
Expand Down Expand Up @@ -818,7 +818,7 @@ func (woc *wfOperationCtx) podReconciliation() error {
// It is now impossible to infer pod status. The only thing we can do at this point is to mark
// the node with Error.
for nodeID, node := range woc.wf.Status.Nodes {
if node.Type != wfv1.NodeTypePod || node.Completed() || node.StartedAt.IsZero() {
if node.Type != wfv1.NodeTypePod || node.Fulfilled() || node.StartedAt.IsZero() {
// node is not a pod, it is already complete, or it can be re-run.
continue
}
Expand Down Expand Up @@ -1059,7 +1059,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatu
node.Message = message
}

if node.Completed() && node.FinishedAt.IsZero() {
if node.Fulfilled() && node.FinishedAt.IsZero() {
updated = true
if !node.IsDaemoned() {
node.FinishedAt = getLatestFinishedAt(pod)
Expand Down Expand Up @@ -1402,13 +1402,13 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
}

if node != nil {
if node.Completed() {
if node.Fulfilled() {
woc.log.Debugf("Node %s already completed", nodeName)
if resolvedTmpl.Metrics != nil {
// Check if this node completed between executions. If it did, emit metrics. If a node completes within
// the same execution, its metrics are emitted below.
// We can infer that this node completed during the current operation, emit metrics
if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; ok && !prevNodeStatus.Completed() {
if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; ok && !prevNodeStatus.Fulfilled() {
localScope, realTimeScope := woc.prepareMetricScope(node)
woc.computeMetrics(resolvedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
}
Expand Down Expand Up @@ -1470,14 +1470,14 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
}
retryParentNode = processedRetryParentNode
// The retry node might have completed by now.
if retryParentNode.Completed() {
if retryParentNode.Fulfilled() {
return retryParentNode, nil
}
lastChildNode, err := woc.getLastChildNode(retryParentNode)
if err != nil {
return woc.markNodeError(retryNodeName, err), err
}
if lastChildNode != nil && !lastChildNode.Completed() {
if lastChildNode != nil && !lastChildNode.Fulfilled() {
// Last child node is still running.
nodeName = lastChildNode.Name
node = lastChildNode
Expand Down Expand Up @@ -1547,7 +1547,7 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
// completed before this execution. If it did not exist prior, then we can infer that it was completed during this execution.
// The statement "(!ok || prevNodeStatus.Completed())" checks for this behavior and represents the material conditional
// "ok -> prevNodeStatus.Completed()" (https://en.wikipedia.org/wiki/Material_conditional)
if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; (!ok || prevNodeStatus.Completed()) && node.Completed() {
if prevNodeStatus, ok := woc.preExecutionNodePhases[node.ID]; (!ok || prevNodeStatus.Fulfilled()) && node.Fulfilled() {
localScope, realTimeScope := woc.prepareMetricScope(node)
woc.computeMetrics(resolvedTmpl.Metrics.Prometheus, localScope, realTimeScope, false)
}
Expand All @@ -1558,7 +1558,7 @@ func (woc *wfOperationCtx) executeTemplate(nodeName string, orgTmpl wfv1.Templat
// Swap the node back to retry node
if retryNodeName != "" {
retryNode := woc.getNodeByName(retryNodeName)
if !retryNode.Completed() && node.Completed() { //if the retry child has completed we need to update outself
if !retryNode.Fulfilled() && node.Fulfilled() { //if the retry child has completed we need to update outself
node, err = woc.executeTemplate(retryNodeName, orgTmpl, tmplCtx, args, opts)
if err != nil {
return woc.markNodeError(node.Name, err), err
Expand Down Expand Up @@ -1714,7 +1714,7 @@ func (woc *wfOperationCtx) initializeNode(nodeName string, nodeType wfv1.NodeTyp
node.DisplayName = nodeName
}

if node.Completed() && node.FinishedAt.IsZero() {
if node.Fulfilled() && node.FinishedAt.IsZero() {
node.FinishedAt = node.StartedAt
}
var message string
Expand Down Expand Up @@ -1746,12 +1746,12 @@ func (woc *wfOperationCtx) markNodePhase(nodeName string, phase wfv1.NodePhase,
woc.updated = true
}
}
if node.Completed() && node.FinishedAt.IsZero() {
if node.Fulfilled() && node.FinishedAt.IsZero() {
node.FinishedAt = metav1.Time{Time: time.Now().UTC()}
woc.log.Infof("node %s finished: %s", node.ID, node.FinishedAt)
woc.updated = true
}
if !woc.orig.Status.Nodes[node.ID].Completed() && node.Completed() {
if !woc.orig.Status.Nodes[node.ID].Fulfilled() && node.Fulfilled() {
woc.onNodeComplete(node)
}
woc.wf.Status.Nodes[node.ID] = *node
Expand Down