Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(controller): shutdownstrategy on running workflow #5289

Merged
merged 12 commits into from
Mar 6, 2021
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,13 @@ type ShutdownStrategy string
const (
ShutdownStrategyTerminate ShutdownStrategy = "Terminate"
ShutdownStrategyStop ShutdownStrategy = "Stop"
ShutdownStrategyNone ShutdownStrategy = ""
)

func (s ShutdownStrategy) Enabled() bool {
return s != ShutdownStrategyNone
}

func (s ShutdownStrategy) ShouldExecute(isOnExitPod bool) bool {
switch s {
case ShutdownStrategyTerminate:
Expand Down
12 changes: 6 additions & 6 deletions workflow/controller/exec_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,18 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
return nil
case apiv1.PodPending:
// Check if we are currently shutting down
if woc.execWf.Spec.Shutdown != "" {
if woc.GetShutdownStrategy().Enabled() {
// Only delete pods that are not part of an onExit handler if we are "Stopping" or all pods if we are "Terminating"
_, onExitPod := pod.Labels[common.LabelKeyOnExit]

if !woc.wf.Spec.Shutdown.ShouldExecute(onExitPod) {
woc.log.Infof("Deleting Pending pod %s/%s as part of workflow shutdown with strategy: %s", pod.Namespace, pod.Name, woc.wf.Spec.Shutdown)
if !woc.GetShutdownStrategy().ShouldExecute(onExitPod) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor - could even be woc.ShouldExecute(...)

woc.log.Infof("Deleting Pending pod %s/%s as part of workflow shutdown with strategy: %s", pod.Namespace, pod.Name, woc.GetShutdownStrategy())
err := woc.controller.kubeclientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err == nil {
wfNodesLock.Lock()
defer wfNodesLock.Unlock()
node := woc.wf.Status.Nodes[pod.Name]
woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("workflow shutdown with strategy: %s", woc.execWf.Spec.Shutdown))
woc.markNodePhase(node.Name, wfv1.NodeFailed, fmt.Sprintf("workflow shutdown with strategy: %s", woc.GetShutdownStrategy()))
return nil
}
// If we fail to delete the pod, fall back to setting the annotation
Expand Down Expand Up @@ -75,8 +75,8 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
}

for _, c := range woc.findTemplate(pod).GetMainContainerNames() {
if woc.wf.Spec.Shutdown != "" {
if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.wf.Spec.Shutdown.ShouldExecute(onExitPod) {
if woc.GetShutdownStrategy().Enabled() {
if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.GetShutdownStrategy().ShouldExecute(onExitPod) {
podExecCtl.Deadline = &time.Time{}
woc.log.Infof("Applying shutdown deadline for pod %s", pod.Name)
return woc.updateExecutionControl(ctx, pod.Name, podExecCtl, c)
Expand Down
50 changes: 32 additions & 18 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ type wfOperationCtx struct {
// execWf holds the Workflow for use in execution.
// In Normal workflow scenario: It holds copy of workflow object
// In Submit From WorkflowTemplate: It holds merged workflow with WorkflowDefault, Workflow and WorkflowTemplate
// 'execWf.Spec' should usually be used instead `wf.Spec`, with two exceptions for user editable fields:
// 1. `wf.Spec.Suspend`
// 2. `wf.Spec.Shutdown`
// 'execWf.Spec' should usually be used instead `wf.Spec`
execWf *wfv1.Workflow
}

Expand Down Expand Up @@ -299,7 +297,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}
}

if woc.wf.Spec.Suspend != nil && *woc.wf.Spec.Suspend {
if woc.ShouldSuspend() {
woc.log.Infof("workflow suspended")
return
}
Expand Down Expand Up @@ -373,7 +371,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}[node.Phase]

var onExitNode *wfv1.NodeStatus
if woc.execWf.Spec.OnExit != "" && woc.wf.Spec.Shutdown.ShouldExecute(true) {
if woc.execWf.Spec.OnExit != "" && woc.GetShutdownStrategy().ShouldExecute(true) {
woc.globalParams[common.GlobalVarWorkflowStatus] = string(workflowStatus)

var failures []failedNodeStatus
Expand Down Expand Up @@ -412,8 +410,8 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}

var workflowMessage string
if node.FailedOrError() && woc.execWf.Spec.Shutdown != "" {
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
workflowMessage = fmt.Sprintf("Stopped with strategy '%s'", woc.execWf.Spec.Shutdown)
if node.FailedOrError() && woc.GetShutdownStrategy().Enabled() {
workflowMessage = fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
} else {
workflowMessage = node.Message
}
Expand Down Expand Up @@ -753,10 +751,10 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return woc.markNodePhase(node.Name, wfv1.NodeSucceeded), true, nil
}

if woc.execWf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
if woc.GetShutdownStrategy().Enabled() || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
var message string
if woc.execWf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.execWf.Spec.Shutdown)
if woc.GetShutdownStrategy().Enabled() {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
} else {
message = fmt.Sprintf("retry exceeded workflow deadline %s", *woc.workflowDeadline)
}
Expand Down Expand Up @@ -1018,12 +1016,12 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node wfv1.NodeStatus) bool {
// fails any suspended and pending nodes if the workflow deadline has passed
func (woc *wfOperationCtx) failSuspendedAndPendingNodesAfterDeadlineOrShutdown() {
deadlineExceeded := woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)
if woc.execWf.Spec.Shutdown != "" || deadlineExceeded {
if woc.GetShutdownStrategy().Enabled() || deadlineExceeded {
for _, node := range woc.wf.Status.Nodes {
if node.IsActiveSuspendNode() || (node.Phase == wfv1.NodePending && deadlineExceeded) {
var message string
if woc.execWf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.execWf.Spec.Shutdown)
if woc.GetShutdownStrategy().Enabled() {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.GetShutdownStrategy())
} else {
message = "Step exceeded its deadline"
}
Expand Down Expand Up @@ -2927,7 +2925,7 @@ func (woc *wfOperationCtx) createTemplateContext(scope wfv1.ResourceScope, resou
}

func (woc *wfOperationCtx) runOnExitNode(ctx context.Context, templateRef, parentDisplayName, parentNodeName, boundaryID string, tmplCtx *templateresolution.Context) (bool, *wfv1.NodeStatus, error) {
if templateRef != "" && woc.wf.Spec.Shutdown.ShouldExecute(true) {
if templateRef != "" && woc.GetShutdownStrategy().ShouldExecute(true) {
woc.log.Infof("Running OnExit handler: %s", templateRef)
onExitNodeName := common.GenerateOnExitNodeName(parentDisplayName)
onExitNode, err := woc.executeTemplate(ctx, onExitNodeName, &wfv1.WorkflowStep{Template: templateRef}, tmplCtx, woc.execWf.Spec.Arguments, &executeTemplateOpts{
Expand Down Expand Up @@ -3182,15 +3180,31 @@ func (woc *wfOperationCtx) setExecWorkflow() error {
return nil
}

func (woc *wfOperationCtx) GetShutdownStrategy() wfv1.ShutdownStrategy {
return woc.execWf.Spec.Shutdown
}

func (woc *wfOperationCtx) ShouldSuspend() bool {
return woc.execWf.Spec.Suspend != nil && *woc.execWf.Spec.Suspend
}

func (woc *wfOperationCtx) needsStoredWfSpecUpdate() bool {
// woc.wf.Status.StoredWorkflowSpec.Entrypoint == "" check is mainly to support backward compatible with 2.11.x workflow to 2.12.x
// Need to recalculate StoredWorkflowSpec in 2.12.x format.
// This check can be removed once all user migrated from 2.11.x to 2.12.x
return woc.wf.Status.StoredWorkflowSpec == nil || (woc.wf.Spec.Entrypoint != "" && woc.wf.Status.StoredWorkflowSpec.Entrypoint == "") ||
(woc.wf.Spec.Suspend != nil && woc.wf.Status.StoredWorkflowSpec.Suspend == nil) ||
(woc.wf.Spec.Shutdown != "" && woc.wf.Status.StoredWorkflowSpec.Shutdown == "") ||
(woc.wf.Spec.Shutdown != woc.wf.Status.StoredWorkflowSpec.Shutdown)
}

func (woc *wfOperationCtx) setStoredWfSpec() error {
wfDefault := woc.controller.Config.WorkflowDefaults
if wfDefault == nil {
wfDefault = &wfv1.Workflow{}
}
// woc.wf.Status.StoredWorkflowSpec.Entrypoint == "" check is mainly to support backward compatible with 2.11.x workflow to 2.12.x
// Need to recalculate StoredWorkflowSpec in 2.12.x format.
// This check can be removed once all user migrated from 2.11.x to 2.12.x
if woc.wf.Status.StoredWorkflowSpec == nil || woc.wf.Status.StoredWorkflowSpec.Entrypoint == "" {

if woc.needsStoredWfSpecUpdate() {
wftHolder, err := woc.fetchWorkflowSpec()
if err != nil {
return err
Expand Down
65 changes: 65 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5890,6 +5890,71 @@ func TestWorkflowScheduledTimeVariable(t *testing.T) {
assert.Equal(t, "2006-01-02T15:04:05-07:00", woc.globalParams[common.GlobalVarWorkflowCronScheduleTime])
}

func TestWorkflowShutdownStrategy(t *testing.T) {
wf := unmarshalWF(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: whalesay
namespace: default
spec:
entrypoint: whalesay
templates:
- name: whalesay
container:
image: docker/whalesay:latest
command: [sh, -c]
args: ["cowsay hellow"]`)

cancel, controller := newController()
defer cancel()
wf1 := wf.DeepCopy()
t.Run("StopStrategy", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)

for _, node := range woc.wf.Status.Nodes {
assert.Equal(t, wfv1.NodePending, node.Phase)
}
// Updating Pod state
makePodsPhase(ctx, woc, apiv1.PodPending)
// Simulate the Stop command
wf1 := woc.wf
wf1.Spec.Shutdown = wfv1.ShutdownStrategyStop
woc1 := newWorkflowOperationCtx(wf1, controller)
woc1.operate(ctx)

node := woc1.wf.Status.Nodes.FindByDisplayName("whalesay")
if assert.NotNil(t, node) {
assert.Contains(t, node.Message, "workflow shutdown with strategy: Stop")
}
})

t.Run("TerminateStrategy", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(wf1, controller)
woc.operate(ctx)

for _, node := range woc.wf.Status.Nodes {
assert.Equal(t, wfv1.NodePending, node.Phase)
}
// Updating Pod state
makePodsPhase(ctx, woc, apiv1.PodPending)
// Simulate the Terminate command
wfOut := woc.wf
wfOut.Spec.Shutdown = wfv1.ShutdownStrategyTerminate
woc1 := newWorkflowOperationCtx(wfOut, controller)
woc1.operate(ctx)
for _, node := range woc1.wf.Status.Nodes {
if assert.NotNil(t, node) {
assert.Contains(t, node.Message, "workflow shutdown with strategy")
assert.Contains(t, node.Message, "Terminate")
}
}
})
}

const resultVarRefWf = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
Expand Down
74 changes: 74 additions & 0 deletions workflow/controller/operator_workflow_template_ref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/utils/pointer"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util"
Expand Down Expand Up @@ -283,3 +285,75 @@ func TestWorkflowTemplateRefGetArtifactsFromTemplate(t *testing.T) {
assert.Equal(t, "data-file", woc.execWf.Spec.Arguments.Artifacts[2].Name)
})
}

func TestWorkflowTemplateRefWithShutdownAndSuspend(t *testing.T) {
cancel, controller := newController(unmarshalWF(wfWithTmplRef), unmarshalWFTmpl(wfTmpl))
defer cancel()
t.Run("EntrypointMissingInStoredWfSpec", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller)
woc.operate(ctx)
assert.Nil(t, woc.wf.Status.StoredWorkflowSpec.Suspend)
wf1 := woc.wf.DeepCopy()
// Updating Pod state
makePodsPhase(ctx, woc, apiv1.PodPending)
wf1.Status.StoredWorkflowSpec.Entrypoint = ""
woc1 := newWorkflowOperationCtx(wf1, controller)
woc1.operate(ctx)
assert.NotNil(t, woc1.wf.Status.StoredWorkflowSpec.Entrypoint)
assert.Equal(t, woc.wf.Spec.Entrypoint, woc1.wf.Status.StoredWorkflowSpec.Entrypoint)
})

t.Run("WorkflowTemplateRefWithSuspend", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller)
woc.operate(ctx)
assert.Nil(t, woc.wf.Status.StoredWorkflowSpec.Suspend)
wf1 := woc.wf.DeepCopy()
// Updating Pod state
makePodsPhase(ctx, woc, apiv1.PodPending)
wf1.Spec.Suspend = pointer.BoolPtr(true)
woc1 := newWorkflowOperationCtx(wf1, controller)
woc1.operate(ctx)
assert.NotNil(t, woc1.wf.Status.StoredWorkflowSpec.Suspend)
assert.True(t, *woc1.wf.Status.StoredWorkflowSpec.Suspend)
})
t.Run("WorkflowTemplateRefWithShutdownTerminate", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller)
woc.operate(ctx)
assert.Empty(t, woc.wf.Status.StoredWorkflowSpec.Shutdown)
wf1 := woc.wf.DeepCopy()
// Updating Pod state
makePodsPhase(ctx, woc, apiv1.PodPending)
wf1.Spec.Shutdown = wfv1.ShutdownStrategyTerminate
woc1 := newWorkflowOperationCtx(wf1, controller)
woc1.operate(ctx)
assert.NotEmpty(t, woc1.wf.Status.StoredWorkflowSpec.Shutdown)
assert.Equal(t, wfv1.ShutdownStrategyTerminate, woc1.wf.Status.StoredWorkflowSpec.Shutdown)
for _, node := range woc1.wf.Status.Nodes {
if assert.NotNil(t, node) {
assert.Contains(t, node.Message, "workflow shutdown with strategy: Terminate")
}
}
})
t.Run("WorkflowTemplateRefWithShutdownStop", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller)
woc.operate(ctx)
assert.Empty(t, woc.wf.Status.StoredWorkflowSpec.Shutdown)
wf1 := woc.wf.DeepCopy()
// Updating Pod state
makePodsPhase(ctx, woc, apiv1.PodPending)
wf1.Spec.Shutdown = wfv1.ShutdownStrategyStop
woc1 := newWorkflowOperationCtx(wf1, controller)
woc1.operate(ctx)
assert.NotEmpty(t, woc1.wf.Status.StoredWorkflowSpec.Shutdown)
assert.Equal(t, wfv1.ShutdownStrategyStop, woc1.wf.Status.StoredWorkflowSpec.Shutdown)
for _, node := range woc1.wf.Status.Nodes {
if assert.NotNil(t, node) {
assert.Contains(t, node.Message, "workflow shutdown with strategy: Stop")
}
}
})
}
4 changes: 2 additions & 2 deletions workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,9 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
}
}

if !woc.execWf.Spec.Shutdown.ShouldExecute(opts.onExitPod) {
if !woc.GetShutdownStrategy().ShouldExecute(opts.onExitPod) {
// Do not create pods if we are shutting down
woc.markNodePhase(nodeName, wfv1.NodeSkipped, fmt.Sprintf("workflow shutdown with strategy: %s", woc.execWf.Spec.Shutdown))
woc.markNodePhase(nodeName, wfv1.NodeSkipped, fmt.Sprintf("workflow shutdown with strategy: %s", woc.GetShutdownStrategy()))
return nil, nil
}

Expand Down