Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(controller): shutdownstrategy on running workflow #5289

Merged
merged 12 commits into from
Mar 6, 2021
10 changes: 5 additions & 5 deletions workflow/controller/exec_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
return nil
case apiv1.PodPending:
// Check if we are currently shutting down
if woc.wf.Spec.Shutdown != "" {
if woc.execWf.Spec.Shutdown != "" {
// Only delete pods that are not part of an onExit handler if we are "Stopping" or all pods if we are "Terminating"
_, onExitPod := pod.Labels[common.LabelKeyOnExit]

if !woc.wf.Spec.Shutdown.ShouldExecute(onExitPod) {
woc.log.Infof("Deleting Pending pod %s/%s as part of workflow shutdown with strategy: %s", pod.Namespace, pod.Name, woc.wf.Spec.Shutdown)
if !woc.execWf.Spec.Shutdown.ShouldExecute(onExitPod) {
woc.log.Infof("Deleting Pending pod %s/%s as part of workflow shutdown with strategy: %s", pod.Namespace, pod.Name, woc.execWf.Spec.Shutdown)
err := woc.controller.kubeclientset.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err == nil {
wfNodesLock.Lock()
Expand Down Expand Up @@ -75,8 +75,8 @@ func (woc *wfOperationCtx) applyExecutionControl(ctx context.Context, pod *apiv1
}

for _, c := range woc.findTemplate(pod).GetMainContainerNames() {
if woc.wf.Spec.Shutdown != "" {
if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.wf.Spec.Shutdown.ShouldExecute(onExitPod) {
if woc.execWf.Spec.Shutdown != "" {
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
if _, onExitPod := pod.Labels[common.LabelKeyOnExit]; !woc.execWf.Spec.Shutdown.ShouldExecute(onExitPod) {
podExecCtl.Deadline = &time.Time{}
woc.log.Infof("Applying shutdown deadline for pod %s", pod.Name)
return woc.updateExecutionControl(ctx, pod.Name, podExecCtl, c)
Expand Down
42 changes: 24 additions & 18 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,7 @@ type wfOperationCtx struct {
// execWf holds the Workflow for use in execution.
// In Normal workflow scenario: It holds copy of workflow object
// In Submit From WorkflowTemplate: It holds merged workflow with WorkflowDefault, Workflow and WorkflowTemplate
// 'execWf.Spec' should usually be used instead `wf.Spec`, with two exceptions for user editable fields:
// 1. `wf.Spec.Suspend`
// 2. `wf.Spec.Shutdown`
// 'execWf.Spec' should usually be used instead `wf.Spec`
execWf *wfv1.Workflow
}

Expand Down Expand Up @@ -299,7 +297,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}
}

if woc.wf.Spec.Suspend != nil && *woc.wf.Spec.Suspend {
if woc.execWf.Spec.Suspend != nil && *woc.execWf.Spec.Suspend {
woc.log.Infof("workflow suspended")
return
}
Expand Down Expand Up @@ -373,7 +371,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}[node.Phase]

var onExitNode *wfv1.NodeStatus
if woc.execWf.Spec.OnExit != "" && woc.wf.Spec.Shutdown.ShouldExecute(true) {
if woc.execWf.Spec.OnExit != "" && woc.execWf.Spec.Shutdown.ShouldExecute(true) {
woc.globalParams[common.GlobalVarWorkflowStatus] = string(workflowStatus)

var failures []failedNodeStatus
Expand Down Expand Up @@ -412,8 +410,8 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}

var workflowMessage string
if node.FailedOrError() && woc.wf.Spec.Shutdown != "" {
workflowMessage = fmt.Sprintf("Stopped with strategy '%s'", woc.wf.Spec.Shutdown)
if node.FailedOrError() && woc.execWf.Spec.Shutdown != "" {
workflowMessage = fmt.Sprintf("Stopped with strategy '%s'", woc.execWf.Spec.Shutdown)
} else {
workflowMessage = node.Message
}
Expand Down Expand Up @@ -753,10 +751,10 @@ func (woc *wfOperationCtx) processNodeRetries(node *wfv1.NodeStatus, retryStrate
return woc.markNodePhase(node.Name, wfv1.NodeSucceeded), true, nil
}

if woc.wf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
if woc.execWf.Spec.Shutdown != "" || (woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)) {
var message string
if woc.wf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.wf.Spec.Shutdown)
if woc.execWf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.execWf.Spec.Shutdown)
} else {
message = fmt.Sprintf("retry exceeded workflow deadline %s", *woc.workflowDeadline)
}
Expand Down Expand Up @@ -1018,12 +1016,12 @@ func (woc *wfOperationCtx) shouldPrintPodSpec(node wfv1.NodeStatus) bool {
// fails any suspended and pending nodes if the workflow deadline has passed
func (woc *wfOperationCtx) failSuspendedAndPendingNodesAfterDeadlineOrShutdown() {
deadlineExceeded := woc.workflowDeadline != nil && time.Now().UTC().After(*woc.workflowDeadline)
if woc.wf.Spec.Shutdown != "" || deadlineExceeded {
if woc.execWf.Spec.Shutdown != "" || deadlineExceeded {
for _, node := range woc.wf.Status.Nodes {
if node.IsActiveSuspendNode() || (node.Phase == wfv1.NodePending && deadlineExceeded) {
var message string
if woc.wf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.wf.Spec.Shutdown)
if woc.execWf.Spec.Shutdown != "" {
message = fmt.Sprintf("Stopped with strategy '%s'", woc.execWf.Spec.Shutdown)
} else {
message = "Step exceeded its deadline"
}
Expand Down Expand Up @@ -2927,7 +2925,7 @@ func (woc *wfOperationCtx) createTemplateContext(scope wfv1.ResourceScope, resou
}

func (woc *wfOperationCtx) runOnExitNode(ctx context.Context, templateRef, parentDisplayName, parentNodeName, boundaryID string, tmplCtx *templateresolution.Context) (bool, *wfv1.NodeStatus, error) {
if templateRef != "" && woc.wf.Spec.Shutdown.ShouldExecute(true) {
if templateRef != "" && woc.execWf.Spec.Shutdown.ShouldExecute(true) {
woc.log.Infof("Running OnExit handler: %s", templateRef)
onExitNodeName := common.GenerateOnExitNodeName(parentDisplayName)
onExitNode, err := woc.executeTemplate(ctx, onExitNodeName, &wfv1.WorkflowStep{Template: templateRef}, tmplCtx, woc.execWf.Spec.Arguments, &executeTemplateOpts{
Expand Down Expand Up @@ -3182,15 +3180,23 @@ func (woc *wfOperationCtx) setExecWorkflow() error {
return nil
}

func (woc *wfOperationCtx) needsStoredWfSpecUpdate() bool {
// woc.wf.Status.StoredWorkflowSpec.Entrypoint == "" check is mainly to support backward compatible with 2.11.x workflow to 2.12.x
// Need to recalculate StoredWorkflowSpec in 2.12.x format.
// This check can be removed once all user migrated from 2.11.x to 2.12.x
return (woc.wf.Spec.Entrypoint != "" && woc.wf.Status.StoredWorkflowSpec.Entrypoint == "") ||
(woc.wf.Spec.Suspend != nil && woc.wf.Status.StoredWorkflowSpec.Suspend == nil) ||
(woc.wf.Spec.Shutdown != "" && woc.wf.Status.StoredWorkflowSpec.Shutdown == "") ||
(woc.wf.Spec.Shutdown != woc.wf.Status.StoredWorkflowSpec.Shutdown)
}

func (woc *wfOperationCtx) setStoredWfSpec() error {
wfDefault := woc.controller.Config.WorkflowDefaults
if wfDefault == nil {
wfDefault = &wfv1.Workflow{}
}
// woc.wf.Status.StoredWorkflowSpec.Entrypoint == "" check is mainly to support backward compatible with 2.11.x workflow to 2.12.x
// Need to recalculate StoredWorkflowSpec in 2.12.x format.
// This check can be removed once all user migrated from 2.11.x to 2.12.x
if woc.wf.Status.StoredWorkflowSpec == nil || woc.wf.Status.StoredWorkflowSpec.Entrypoint == "" {

if woc.wf.Status.StoredWorkflowSpec == nil || woc.needsStoredWfSpecUpdate() {
wftHolder, err := woc.fetchWorkflowSpec()
sarabala1979 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
Expand Down
49 changes: 49 additions & 0 deletions workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5953,3 +5953,52 @@ spec:
}
})
}

const resultVarRefWf = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: scripts-bash-
spec:
entrypoint: bash-script-example
templates:
- name: bash-script-example
steps:
- - name: generate-random
template: gen-random-int
- - name: generate-random-1
template: gen-random-int
- - name: from
template: print-message
arguments:
parameters:
- name: message
value: "{{steps.generate-random.outputs.result}}"
outputs:
parameters:
- name: stepresult
valueFrom:
expression: "steps['generate-random-1'].outputs.result"

- name: gen-random-int
script:
image: debian:9.4
command: [bash]
source: |
cat /dev/urandom | od -N2 -An -i | awk -v f=1 -v r=100 '{printf "%i\n", f + r * $1 / 65536}'

- name: print-message
inputs:
parameters:
- name: message
container:
image: alpine:latest
command: [sh, -c]
args: ["echo result was: {{inputs.parameters.message}}"]
`

func TestHasOutputResultRef(t *testing.T) {
wf := unmarshalWF(resultVarRefWf)
assert.True(t, hasOutputResultRef("generate-random", &wf.Spec.Templates[0]))
assert.True(t, hasOutputResultRef("generate-random-1", &wf.Spec.Templates[0]))
}
74 changes: 74 additions & 0 deletions workflow/controller/operator_workflow_template_ref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"testing"

"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
"k8s.io/utils/pointer"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo-workflows/v3/util"
Expand Down Expand Up @@ -283,3 +285,75 @@ func TestWorkflowTemplateRefGetArtifactsFromTemplate(t *testing.T) {
assert.Equal(t, "data-file", woc.execWf.Spec.Arguments.Artifacts[2].Name)
})
}

func TestWorkflowTemplateRefWithShutdownAndSuspend(t *testing.T) {
cancel, controller := newController(unmarshalWF(wfWithTmplRef), unmarshalWFTmpl(wfTmpl))
defer cancel()
t.Run("EntrypointMissingInStoredWfSpec", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller)
woc.operate(ctx)
assert.Nil(t, woc.wf.Status.StoredWorkflowSpec.Suspend)
wf1 := woc.wf.DeepCopy()
// Updating Pod state
makePodsPhase(ctx, woc, apiv1.PodPending)
wf1.Status.StoredWorkflowSpec.Entrypoint = ""
woc1 := newWorkflowOperationCtx(wf1, controller)
woc1.operate(ctx)
assert.NotNil(t, woc1.wf.Status.StoredWorkflowSpec.Entrypoint)
assert.Equal(t, woc.wf.Spec.Entrypoint, woc1.wf.Status.StoredWorkflowSpec.Entrypoint)
})

t.Run("WorkflowTemplateRefWithSuspend", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller)
woc.operate(ctx)
assert.Nil(t, woc.wf.Status.StoredWorkflowSpec.Suspend)
wf1 := woc.wf.DeepCopy()
// Updating Pod state
makePodsPhase(ctx, woc, apiv1.PodPending)
wf1.Spec.Suspend = pointer.BoolPtr(true)
woc1 := newWorkflowOperationCtx(wf1, controller)
woc1.operate(ctx)
assert.NotNil(t, woc1.wf.Status.StoredWorkflowSpec.Suspend)
assert.True(t, *woc1.wf.Status.StoredWorkflowSpec.Suspend)
})
t.Run("WorkflowTemplateRefWithShutdownTerminate", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller)
woc.operate(ctx)
assert.Empty(t, woc.wf.Status.StoredWorkflowSpec.Shutdown)
wf1 := woc.wf.DeepCopy()
// Updating Pod state
makePodsPhase(ctx, woc, apiv1.PodPending)
wf1.Spec.Shutdown = wfv1.ShutdownStrategyTerminate
woc1 := newWorkflowOperationCtx(wf1, controller)
woc1.operate(ctx)
assert.NotEmpty(t, woc1.wf.Status.StoredWorkflowSpec.Shutdown)
assert.Equal(t, wfv1.ShutdownStrategyTerminate, woc1.wf.Status.StoredWorkflowSpec.Shutdown)
for _, node := range woc1.wf.Status.Nodes {
if assert.NotNil(t, node) {
assert.Contains(t, node.Message, "workflow shutdown with strategy: Terminate")
}
}
})
t.Run("WorkflowTemplateRefWithShutdownStop", func(t *testing.T) {
ctx := context.Background()
woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller)
woc.operate(ctx)
assert.Empty(t, woc.wf.Status.StoredWorkflowSpec.Shutdown)
wf1 := woc.wf.DeepCopy()
// Updating Pod state
makePodsPhase(ctx, woc, apiv1.PodPending)
wf1.Spec.Shutdown = wfv1.ShutdownStrategyStop
woc1 := newWorkflowOperationCtx(wf1, controller)
woc1.operate(ctx)
assert.NotEmpty(t, woc1.wf.Status.StoredWorkflowSpec.Shutdown)
assert.Equal(t, wfv1.ShutdownStrategyStop, woc1.wf.Status.StoredWorkflowSpec.Shutdown)
for _, node := range woc1.wf.Status.Nodes {
if assert.NotNil(t, node) {
assert.Contains(t, node.Message, "workflow shutdown with strategy: Stop")
}
}
})
}
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (woc *wfOperationCtx) createWorkflowPod(ctx context.Context, nodeName strin
}
}

if !woc.wf.Spec.Shutdown.ShouldExecute(opts.onExitPod) {
if !woc.execWf.Spec.Shutdown.ShouldExecute(opts.onExitPod) {
// Do not create pods if we are shutting down
woc.markNodePhase(nodeName, wfv1.NodeSkipped, fmt.Sprintf("workflow shutdown with strategy: %s", woc.execWf.Spec.Shutdown))
return nil, nil
Expand Down