Skip to content

Commit

Permalink
fix: Allow termination of workflow to update on exit handler nodes. f…
Browse files Browse the repository at this point in the history
…ixes #13052 (#13120)

Signed-off-by: Miltiadis Alexis <alexmiltiadis@gmail.com>
  • Loading branch information
miltalex committed Jun 7, 2024
1 parent 64850e0 commit bda0280
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 25 deletions.
20 changes: 20 additions & 0 deletions test/e2e/functional/workflow-exit-handler-sleep.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: workflow-exit-handler-sleep
spec:
entrypoint: argosay-template
onExit: exit-handler
templates:
- name: argosay-template
container:
image: argoproj/argosay:v2
args: ["echo", "hello-world"]
- name: sleep
container:
image: argoproj/argosay:v2
args: ["sleep", "600"]
- name: exit-handler
steps:
- - name: exit-handler-task
template: sleep
26 changes: 26 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/suite"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow"
Expand Down Expand Up @@ -1355,3 +1356,28 @@ func (s *FunctionalSuite) TestWithItemsWithHooks() {
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeSucceeded)
}

// when you terminate a workflow with onexit handler,
// then the onexit handler should fail along with steps and stepsGroup
func (s *FunctionalSuite) TestTerminateWorkflowWhileOnExitHandlerRunning() {
s.Given().
Workflow("@functional/workflow-exit-handler-sleep.yaml").
When().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeRunning).
WaitForWorkflow(fixtures.Condition(func(wf *wfv1.Workflow) (bool, string) {
a := wf.Status.Nodes.FindByDisplayName("workflow-exit-handler-sleep")
return wfv1.NodeSucceeded == a.Phase, "nodes succeeded"
})).
ShutdownWorkflow(wfv1.ShutdownStrategyTerminate).
WaitForWorkflow(fixtures.ToBeFailed).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *wfv1.WorkflowStatus) {
for _, node := range status.Nodes {
if node.Type == wfv1.NodeTypeStepGroup || node.Type == wfv1.NodeTypeSteps {
assert.Equal(t, node.Phase, wfv1.NodeFailed)
}
}
assert.Equal(t, status.Phase, wfv1.WorkflowFailed)
})
}
54 changes: 29 additions & 25 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,38 +440,42 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
}

var onExitNode *wfv1.NodeStatus
if woc.execWf.Spec.HasExitHook() && woc.GetShutdownStrategy().ShouldExecute(true) {
if woc.execWf.Spec.HasExitHook() {
woc.log.Infof("Running OnExit handler: %s", woc.execWf.Spec.OnExit)
onExitNodeName := common.GenerateOnExitNodeName(woc.wf.ObjectMeta.Name)
exitHook := woc.execWf.Spec.GetExitHook(woc.execWf.Spec.Arguments)
onExitNode, err = woc.executeTemplate(ctx, onExitNodeName, &wfv1.WorkflowStep{Template: exitHook.Template, TemplateRef: exitHook.TemplateRef}, tmplCtx, exitHook.Arguments, &executeTemplateOpts{
onExitTemplate: true, nodeFlag: &wfv1.NodeFlag{Hooked: true}})
if err != nil {
x := fmt.Errorf("error in exit template execution : %w", err)
switch err {
case ErrDeadlineExceeded:
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowTimedOut", x.Error())
case ErrParallelismReached:
default:
if !errorsutil.IsTransientErr(err) && !woc.wf.Status.Phase.Completed() && os.Getenv("BUBBLE_ENTRY_TEMPLATE_ERR") != "false" {
woc.markWorkflowError(ctx, x)

// Garbage collect PVCs if Onexit template execution returns error
if err := woc.deletePVCs(ctx); err != nil {
woc.log.WithError(err).Warn("failed to delete PVCs")
onExitNode, _ = woc.execWf.GetNodeByName(onExitNodeName)
if onExitNode != nil || woc.GetShutdownStrategy().ShouldExecute(true) {
exitHook := woc.execWf.Spec.GetExitHook(woc.execWf.Spec.Arguments)
onExitNode, err = woc.executeTemplate(ctx, onExitNodeName, &wfv1.WorkflowStep{Template: exitHook.Template, TemplateRef: exitHook.TemplateRef}, tmplCtx, exitHook.Arguments, &executeTemplateOpts{
onExitTemplate: true, nodeFlag: &wfv1.NodeFlag{Hooked: true},
})
if err != nil {
x := fmt.Errorf("error in exit template execution : %w", err)
switch err {
case ErrDeadlineExceeded:
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowTimedOut", x.Error())
case ErrParallelismReached:
default:
if !errorsutil.IsTransientErr(err) && !woc.wf.Status.Phase.Completed() && os.Getenv("BUBBLE_ENTRY_TEMPLATE_ERR") != "false" {
woc.markWorkflowError(ctx, x)

// Garbage collect PVCs if Onexit template execution returns error
if err := woc.deletePVCs(ctx); err != nil {
woc.log.WithError(err).Warn("failed to delete PVCs")
}
}
}
return
}
return
}

// If the onExit node (or any child of the onExit node) requires HTTP reconciliation, do it here
if onExitNode != nil && woc.nodeRequiresTaskSetReconciliation(onExitNode.Name) {
woc.taskSetReconciliation(ctx)
}
// If the onExit node (or any child of the onExit node) requires HTTP reconciliation, do it here
if onExitNode != nil && woc.nodeRequiresTaskSetReconciliation(onExitNode.Name) {
woc.taskSetReconciliation(ctx)
}

if onExitNode == nil || !onExitNode.Fulfilled() {
return
if onExitNode == nil || !onExitNode.Fulfilled() {
return
}
}
}

Expand Down

0 comments on commit bda0280

Please sign in to comment.