Revert "fix: Pod OOMKilled should fail workflow. Fixes #8456 (#8478)"
This reverts commit f2b075c.

Signed-off-by: Alex Collins <alex_collins@intuit.com>
alexec committed May 3, 2022
1 parent 29f3ad8 commit 93cb050
Showing 4 changed files with 18 additions and 27 deletions.
2 changes: 1 addition & 1 deletion workflow/controller/controller.go
@@ -545,7 +545,7 @@ func (wfc *WorkflowController) signalContainers(namespace string, podName string
 	}
 
 	for _, c := range pod.Status.ContainerStatuses {
-		if c.State.Terminated != nil {
+		if c.Name == common.WaitContainerName || c.State.Terminated != nil {
 			continue
 		}
 		if err := signal.SignalContainer(wfc.restConfig, pod, c.Name, sig); err != nil {
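Restoring the common.WaitContainerName check means signalContainers leaves the wait sidecar (and anything already terminated) untouched, so it can keep saving outputs while the other containers are signaled. Below is a minimal sketch of that selection using only k8s.io/api/core/v1 types; the literal container name "wait" and the helper containersToSignal are illustrative stand-ins for Argo's common.WaitContainerName and the real controller code, not the actual implementation.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// waitContainerName stands in for Argo's common.WaitContainerName.
const waitContainerName = "wait"

// containersToSignal returns the containers that should receive the signal:
// everything except the wait sidecar and containers that already terminated.
func containersToSignal(pod *corev1.Pod) []string {
	var names []string
	for _, c := range pod.Status.ContainerStatuses {
		if c.Name == waitContainerName || c.State.Terminated != nil {
			continue // leave the wait container alone so it can finish saving outputs
		}
		names = append(names, c.Name)
	}
	return names
}

func main() {
	pod := &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{
		{Name: "wait"},
		{Name: "main"},
		{Name: "sidecar", State: corev1.ContainerState{Terminated: &corev1.ContainerStateTerminated{ExitCode: 0}}},
	}}}
	fmt.Println(containersToSignal(pod)) // [main]
}
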
21 changes: 7 additions & 14 deletions workflow/controller/operator.go
@@ -1273,15 +1273,6 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
 		new.Outputs.ExitCode = pointer.StringPtr(fmt.Sprint(*exitCode))
 	}
 
-	// We cannot fail the node until the wait container is finished because it may be busy saving outputs, and these
-	// would not get captured successfully.
-	for _, c := range pod.Status.ContainerStatuses {
-		if c.Name == common.WaitContainerName && c.State.Terminated == nil && new.Phase.Completed() {
-			woc.log.WithField("new.phase", new.Phase).Info("leaving phase un-changed: wait container is not yet terminated ")
-			new.Phase = old.Phase
-		}
-	}
-
 	// if we are transitioning from Pending to a different state, clear out unchanged message
 	if old.Phase == wfv1.NodePending && new.Phase != wfv1.NodePending && old.Message == new.Message {
 		new.Message = ""
@@ -1293,7 +1284,7 @@
 	}
 
 	if !reflect.DeepEqual(old, new) {
-		woc.log.WithField("nodeID", old.ID).
+		log.WithField("nodeID", old.ID).
 			WithField("old.phase", old.Phase).
 			WithField("new.phase", new.Phase).
 			WithField("old.message", old.Message).
@@ -1303,7 +1294,7 @@ func (woc *wfOperationCtx) assessNodeStatus(pod *apiv1.Pod, old *wfv1.NodeStatus
 			Info("node changed")
 		return new
 	}
-	woc.log.WithField("nodeID", old.ID).
+	log.WithField("nodeID", old.ID).
 		Info("node unchanged")
 	return nil
 }
@@ -1319,9 +1310,11 @@ func getExitCode(pod *apiv1.Pod) *int32 {
 
 func podHasContainerNeedingTermination(pod *apiv1.Pod, tmpl wfv1.Template) bool {
 	for _, c := range pod.Status.ContainerStatuses {
-		// Only clean up pod when all main containers are terminated
-		if tmpl.IsMainContainerName(c.Name) && c.State.Terminated == nil {
-			return false
+		// Only clean up pod when both the wait and the main containers are terminated
+		if c.Name == common.WaitContainerName || tmpl.IsMainContainerName(c.Name) {
+			if c.State.Terminated == nil {
+				return false
+			}
 		}
 	}
 	return true
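After the revert, podHasContainerNeedingTermination again requires both the wait container and every main container to have terminated before the pod is cleaned up. The following is a self-contained sketch of that predicate under the assumption that the wait container is literally named "wait"; the isMain callback stands in for tmpl.IsMainContainerName.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// readyForCleanup mirrors the restored podHasContainerNeedingTermination:
// the pod may only be cleaned up once the wait container and all main
// containers have terminated.
func readyForCleanup(pod *corev1.Pod, isMain func(string) bool) bool {
	for _, c := range pod.Status.ContainerStatuses {
		// Only clean up the pod when both the wait and the main containers are terminated.
		if c.Name == "wait" || isMain(c.Name) {
			if c.State.Terminated == nil {
				return false
			}
		}
	}
	return true
}

func main() {
	isMain := func(name string) bool { return name == "main" }
	pod := &corev1.Pod{Status: corev1.PodStatus{ContainerStatuses: []corev1.ContainerStatus{
		{Name: "wait"}, // still running: no Terminated state
		{Name: "main", State: corev1.ContainerState{Terminated: &corev1.ContainerStateTerminated{ExitCode: 1}}},
	}}}
	fmt.Println(readyForCleanup(pod, isMain)) // false: the wait container has not terminated yet
}

Run against a pod whose main container exited with code 1 while the wait container is still running, the predicate returns false, which matches the expectation flipped back in operator_test.go below.
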
2 changes: 1 addition & 1 deletion workflow/controller/operator_test.go
@@ -6364,7 +6364,7 @@ func TestPodHasContainerNeedingTermination(t *testing.T) {
 			State: apiv1.ContainerState{Terminated: &apiv1.ContainerStateTerminated{ExitCode: 1}},
 		},
 	}}}
-	assert.True(t, podHasContainerNeedingTermination(&pod, tmpl))
+	assert.False(t, podHasContainerNeedingTermination(&pod, tmpl))
 
 	pod = apiv1.Pod{
 		Status: apiv1.PodStatus{
20 changes: 9 additions & 11 deletions workflow/executor/executor.go
@@ -968,21 +968,15 @@ func (we *WorkflowExecutor) Wait(ctx context.Context) error {
 		log.WithField("annotationPatchTickDuration", we.annotationPatchTickDuration).WithField("readProgressFileTickDuration", we.readProgressFileTickDuration).Info("monitoring progress disabled")
 	}
 
-	// this allows us to gracefully shutdown, capturing artifacts
-	ctx, cancel := signal.NotifyContext(ctx, syscall.SIGTERM)
-	defer cancel()
-
 	go we.monitorDeadline(ctx, containerNames)
 
-	err := retryutil.OnError(executorretry.ExecutorRetry, errorsutil.IsTransientErr, func() error {
-		return we.RuntimeExecutor.Wait(ctx, containerNames)
+	err := waitutil.Backoff(executorretry.ExecutorRetry, func() (bool, error) {
+		err := we.RuntimeExecutor.Wait(ctx, containerNames)
+		return err == nil, err
 	})
-
-	log.WithError(err).Info("Main container completed")
-
-	if err != nil && err != context.Canceled {
+	if err != nil {
 		return fmt.Errorf("failed to wait for main container to complete: %w", err)
 	}
+	log.Infof("Main container completed")
 	return nil
 }

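The restored err := waitutil.Backoff(...) call retries RuntimeExecutor.Wait as a done/error predicate under an exponential backoff policy. Here is a rough sketch of that pattern built directly on k8s.io/apimachinery/pkg/util/wait rather than Argo's waitutil wrapper; the backoff values are placeholders, not executorretry.ExecutorRetry, and waitForMain is an illustrative helper.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// waitForMain keeps calling waitOnce under an exponential backoff until it
// succeeds or the retry budget is exhausted, mirroring the shape of the
// restored waitutil.Backoff(executorretry.ExecutorRetry, ...) call.
func waitForMain(ctx context.Context, waitOnce func(context.Context) error) error {
	backoff := wait.Backoff{ // placeholder policy, not executorretry.ExecutorRetry
		Duration: 1 * time.Second,
		Factor:   2.0,
		Jitter:   0.1,
		Steps:    5,
	}
	var lastErr error
	err := wait.ExponentialBackoff(backoff, func() (bool, error) {
		lastErr = waitOnce(ctx)
		// Return a nil error so ExponentialBackoff keeps retrying; a non-nil
		// error would abort the backoff loop immediately.
		return lastErr == nil, nil
	})
	if err != nil {
		if lastErr != nil {
			return fmt.Errorf("failed to wait for main container to complete: %w", lastErr)
		}
		return err
	}
	return nil
}

func main() {
	attempts := 0
	err := waitForMain(context.Background(), func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return errors.New("transient runtime error") // fails twice, then succeeds
		}
		return nil
	})
	fmt.Println(err, "after", attempts, "attempts") // <nil> after 3 attempts
}
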
@@ -1041,6 +1035,8 @@ func (we *WorkflowExecutor) monitorProgress(ctx context.Context, progressFile st
 // monitorDeadline checks to see if we exceeded the deadline for the step and
 // terminates the main container if we did
 func (we *WorkflowExecutor) monitorDeadline(ctx context.Context, containerNames []string) {
+	terminate := make(chan os.Signal, 1)
+	signal.Notify(terminate, syscall.SIGTERM)
 
 	deadlineExceeded := make(chan bool, 1)
 	if !we.Deadline.IsZero() {
@@ -1058,6 +1054,8 @@ func (we *WorkflowExecutor) monitorDeadline(ctx context.Context, containerNames
 			return
 		case <-deadlineExceeded:
 			message = "Step exceeded its deadline"
+		case <-terminate:
+			message = "Step terminated"
 		}
 		log.Info(message)
 		util.WriteTerminateMessage(message)
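With the NotifyContext-based shutdown removed from Wait, SIGTERM handling lives back in monitorDeadline: a buffered channel registered with signal.Notify, consumed in the same select as the deadline timer. Below is a stripped-down, standard-library-only sketch of that shape; the helper names and the deadline wiring are illustrative, and only the two message strings come from the diff.

package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// monitorDeadline blocks until the context is cancelled, the step deadline
// passes, or a SIGTERM arrives, then reports why the step should stop.
func monitorDeadline(ctx context.Context, deadline time.Time, writeTerminateMessage func(string)) {
	terminate := make(chan os.Signal, 1)
	signal.Notify(terminate, syscall.SIGTERM)
	defer signal.Stop(terminate)

	deadlineExceeded := make(chan bool, 1)
	if !deadline.IsZero() {
		t := time.AfterFunc(time.Until(deadline), func() { deadlineExceeded <- true })
		defer t.Stop()
	}

	var message string
	select {
	case <-ctx.Done():
		return
	case <-deadlineExceeded:
		message = "Step exceeded its deadline"
	case <-terminate:
		message = "Step terminated"
	}
	log.Print(message)
	writeTerminateMessage(message)
}

func main() {
	// With a 2-second deadline and no SIGTERM, this prints the deadline message.
	monitorDeadline(context.Background(), time.Now().Add(2*time.Second), func(msg string) {
		log.Printf("would signal the main containers: %q", msg)
	})
}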
