Skip to content

Commit

Permalink
Issue #1113 - Wait for daemon pods completion to handle annotations (#…
Browse files Browse the repository at this point in the history
…1177)

* Issue #1113 - Wait for daemon pods completion to handle annotations

* Add output artifacts to influxdb-ci example
  • Loading branch information
alexmt committed Jan 18, 2019
1 parent 2b2651b commit f07b5af
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
4 changes: 4 additions & 0 deletions examples/influxdb-ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ spec:
- name: influxd
path: /app
daemon: true
outputs:
artifacts:
- name: data
path: /var/lib/influxdb/data
container:
image: debian:9.4
readinessProbe:
Expand Down
4 changes: 2 additions & 2 deletions pkg/apis/workflow/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ func (ws *WorkflowStatus) Completed() bool {

// Remove returns whether or not the node has completed execution
func (n NodeStatus) Completed() bool {
return isCompletedPhase(n.Phase)
return isCompletedPhase(n.Phase) || n.IsDaemoned() && n.Phase != NodePending
}

// IsDaemoned returns whether or not the node is deamoned
Expand All @@ -574,7 +574,7 @@ func (n NodeStatus) IsDaemoned() bool {

// Successful returns whether or not this node completed successfully
func (n NodeStatus) Successful() bool {
return n.Phase == NodeSucceeded || n.Phase == NodeSkipped
return n.Phase == NodeSucceeded || n.Phase == NodeSkipped || n.IsDaemoned() && n.Phase != NodePending
}

// CanRetry returns whether the node should be retried or not.
Expand Down
26 changes: 21 additions & 5 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,8 @@ func (woc *wfOperationCtx) podReconciliation() error {
woc.addOutputsToScope("workflow", node.Outputs, nil)
woc.updated = true
}
if woc.wf.Status.Nodes[pod.ObjectMeta.Name].Completed() {
node := woc.wf.Status.Nodes[pod.ObjectMeta.Name]
if node.Completed() && !node.IsDaemoned() {
woc.completedPods[pod.ObjectMeta.Name] = true
}
}
Expand Down Expand Up @@ -556,7 +557,12 @@ func assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatus) *wfv1.NodeStatus {
newPhase = wfv1.NodeSucceeded
newDaemonStatus = &f
case apiv1.PodFailed:
newPhase, message = inferFailedReason(pod)
// ignore pod failure for daemoned steps
if node.IsDaemoned() {
newPhase = wfv1.NodeSucceeded
} else {
newPhase, message = inferFailedReason(pod)
}
newDaemonStatus = &f
case apiv1.PodRunning:
newPhase = wfv1.NodeRunning
Expand All @@ -578,8 +584,8 @@ func assessNodeStatus(pod *apiv1.Pod, node *wfv1.NodeStatus) *wfv1.NodeStatus {
return nil
}
}
// proceed to mark node status as succeeded (and daemoned)
newPhase = wfv1.NodeSucceeded
// proceed to mark node status as running (and daemoned)
newPhase = wfv1.NodeRunning
t := true
newDaemonStatus = &t
log.Infof("Processing ready daemon pod: %v", pod.ObjectMeta.SelfLink)
Expand Down Expand Up @@ -1025,7 +1031,8 @@ func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted

switch phase {
case wfv1.NodeSucceeded, wfv1.NodeFailed, wfv1.NodeError:
if markCompleted {
// wait for all daemon nodes to get terminated before marking workflow completed
if markCompleted && !woc.hasDaemonNodes() {
woc.log.Infof("Marking workflow completed")
woc.wf.Status.FinishedAt = metav1.Time{Time: time.Now().UTC()}
if woc.wf.ObjectMeta.Labels == nil {
Expand All @@ -1037,6 +1044,15 @@ func (woc *wfOperationCtx) markWorkflowPhase(phase wfv1.NodePhase, markCompleted
}
}

func (woc *wfOperationCtx) hasDaemonNodes() bool {
for _, node := range woc.wf.Status.Nodes {
if node.IsDaemoned() {
return true
}
}
return false
}

func (woc *wfOperationCtx) markWorkflowRunning() {
woc.markWorkflowPhase(wfv1.NodeRunning, false)
}
Expand Down

0 comments on commit f07b5af

Please sign in to comment.