Skip to content

Commit

Permalink
fix: delete PVCs upon onExit error when OnWorkflowCompletion is enabl…
Browse files Browse the repository at this point in the history
…ed. Fixes #10408 (#10424)

Signed-off-by: Jiacheng Xu <xjcmaxwellcjx@gmail.com>
  • Loading branch information
jiachengxu authored and terrytangyuan committed Mar 29, 2023
1 parent 1253f44 commit 17ea4bc
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 10 deletions.
4 changes: 2 additions & 2 deletions pkg/apis/workflow/v1alpha1/workflow_types.go
Expand Up @@ -1649,11 +1649,11 @@ type WorkflowTemplateRef struct {
ClusterScope bool `json:"clusterScope,omitempty" protobuf:"varint,2,opt,name=clusterScope"`
}

func (ref *WorkflowTemplateRef) ToTemplateRef(entrypoint string) *TemplateRef {
func (ref *WorkflowTemplateRef) ToTemplateRef(template string) *TemplateRef {
return &TemplateRef{
Name: ref.Name,
ClusterScope: ref.ClusterScope,
Template: entrypoint,
Template: template,
}
}

Expand Down
33 changes: 33 additions & 0 deletions test/e2e/fixtures/then.go
Expand Up @@ -8,6 +8,7 @@ import (
"time"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -185,6 +186,38 @@ func (t *Then) ExpectAuditEvents(filter func(event apiv1.Event) bool, num int, b
return t
}

func (t *Then) ExpectPVCDeleted() *Then {
t.t.Helper()
timeout := defaultTimeout
_, _ = fmt.Println("Checking", timeout.String(), "for expecting PVCs deletion")
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
select {
case <-ctx.Done():
t.t.Errorf("timeout after %v waiting for condition", timeout)
return t
default:
num := len(t.wf.Status.PersistentVolumeClaims)
pvcClient := t.kubeClient.CoreV1().PersistentVolumeClaims(t.wf.ObjectMeta.Namespace)
for _, p := range t.wf.Status.PersistentVolumeClaims {
_, err := pvcClient.Get(ctx, p.PersistentVolumeClaim.ClaimName, metav1.GetOptions{})
if err == nil {
break
} else if errors.IsNotFound(err) {
num--
} else {
t.t.Fatal(err)
return t
}
}
if num == 0 {
return t
}
}
}
}

func (t *Then) ExpectArtifact(nodeName string, artifactName string, bucketName string, f func(t *testing.T, object minio.ObjectInfo, err error)) {
t.t.Helper()

Expand Down
4 changes: 2 additions & 2 deletions test/e2e/functional_test.go
Expand Up @@ -634,10 +634,10 @@ spec:
When().
CreateWorkflowTemplates().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeErrored).
WaitForWorkflow(fixtures.ToBeFailed).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Contains(t, status.Message, "error in exit template execution")
assert.Contains(t, status.Message, "invalid spec")
})
}

Expand Down
33 changes: 33 additions & 0 deletions test/e2e/testdata/workflow-template-invalid-entrypoint.yaml
@@ -0,0 +1,33 @@
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: workflow-template-invalid-entrypoint
spec:
volumeClaimTemplates:
- metadata:
name: builddir
spec:
accessModes: [ "ReadWriteMany" ]
resources:
requests:
storage: 1Mi
volumeClaimGC:
strategy: OnWorkflowCompletion
entrypoint: whalesay
onExit: notify
templates:
- name: whalesay
container:
image: 'argoproj/argosay:v2'
command:
- /argosay
args:
- echo
- 'hello world'
volumeMounts:
- name: foo # <<< invalid since no volume `foo`
- name: notify
container:
image: 'argoproj/argosay:v2'
command: [sh, -c]
args: ["exit 0"]
33 changes: 33 additions & 0 deletions test/e2e/testdata/workflow-template-invalid-onexit.yaml
@@ -0,0 +1,33 @@
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: workflow-template-invalid-onexit
spec:
volumeClaimTemplates:
- metadata:
name: builddir
spec:
accessModes: [ "ReadWriteMany" ]
resources:
requests:
storage: 1Mi
volumeClaimGC:
strategy: OnWorkflowCompletion
entrypoint: whalesay
onExit: notify
templates:
- name: whalesay
container:
image: 'argoproj/argosay:v2'
command:
- /argosay
args:
- echo
- 'hello world'
- name: notify
container:
image: 'argoproj/argosay:v2'
command: [sh, -c]
args: ["exit 0"]
volumeMounts:
- name: foo # <<< invalid since no volume `foo`
44 changes: 44 additions & 0 deletions test/e2e/workflow_template_test.go
Expand Up @@ -77,6 +77,50 @@ func (s *WorkflowTemplateSuite) TestSubmitWorkflowTemplateWorkflowMetadataSubsti
})
}

func (s *WorkflowTemplateSuite) TestWorkflowTemplateInvalidOnExit() {
s.Given().
WorkflowTemplate("@testdata/workflow-template-invalid-onexit.yaml").
Workflow(`
metadata:
generateName: workflow-template-invalid-onexit-
spec:
workflowTemplateRef:
name: workflow-template-invalid-onexit
`).
When().
CreateWorkflowTemplates().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeErrored).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, status.Phase, v1alpha1.WorkflowError)
assert.Contains(t, status.Message, "error in exit template execution")
}).
ExpectPVCDeleted()
}

func (s *WorkflowTemplateSuite) TestWorkflowTemplateInvalidEntryPoint() {
s.Given().
WorkflowTemplate("@testdata/workflow-template-invalid-entrypoint.yaml").
Workflow(`
metadata:
generateName: workflow-template-invalid-entrypoint-
spec:
workflowTemplateRef:
name: workflow-template-invalid-entrypoint
`).
When().
CreateWorkflowTemplates().
SubmitWorkflow().
WaitForWorkflow(fixtures.ToBeErrored).
Then().
ExpectWorkflow(func(t *testing.T, metadata *v1.ObjectMeta, status *v1alpha1.WorkflowStatus) {
assert.Equal(t, status.Phase, v1alpha1.WorkflowError)
assert.Contains(t, status.Message, "error in entry template execution")
}).
ExpectPVCDeleted()
}

func TestWorkflowTemplateSuite(t *testing.T) {
suite.Run(t, new(WorkflowTemplateSuite))
}
14 changes: 10 additions & 4 deletions workflow/controller/operator.go
Expand Up @@ -66,7 +66,7 @@ import (

// wfOperationCtx is the context for evaluation and operation of a single workflow
type wfOperationCtx struct {
// wf is the workflow object. It should not be used in execution logic. woc.wfSpec should be used instead
// wf is the workflow object. It should not be used in execution logic. woc.execWf.Spec should be used instead
wf *wfv1.Workflow
// orig is the original workflow object for purposes of creating a patch
orig *wfv1.Workflow
Expand Down Expand Up @@ -357,6 +357,10 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.markWorkflowError(ctx, x)
}
}
// Garbage collect PVCs if Entrypoint template execution returns error
if err := woc.deletePVCs(ctx); err != nil {
woc.log.WithError(err).Warn("failed to delete PVCs")
}
return
}

Expand Down Expand Up @@ -421,9 +425,12 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
default:
if !errorsutil.IsTransientErr(err) && !woc.wf.Status.Phase.Completed() && os.Getenv("BUBBLE_ENTRY_TEMPLATE_ERR") != "false" {
woc.markWorkflowError(ctx, x)
return
}
}
// Garbage collect PVCs if Onexit template execution returns error
if err := woc.deletePVCs(ctx); err != nil {
woc.log.WithError(err).Warn("failed to delete PVCs")
}
return
}

Expand Down Expand Up @@ -478,8 +485,7 @@ func (woc *wfOperationCtx) operate(ctx context.Context) {
woc.computeMetrics(woc.execWf.Spec.Metrics.Prometheus, localScope, realTimeScope, false)
}

err = woc.deletePVCs(ctx)
if err != nil {
if err := woc.deletePVCs(ctx); err != nil {
woc.log.WithError(err).Warn("failed to delete PVCs")
}
}
Expand Down
46 changes: 46 additions & 0 deletions workflow/controller/operator_test.go
Expand Up @@ -8629,3 +8629,49 @@ func TestFailSuspendedAndPendingNodesAfterDeadlineOrShutdown(t *testing.T) {
assert.Equal(t, wfv1.NodeFailed, woc.wf.Status.Nodes[step2NodeName].Phase)
})
}

func TestWorkflowTemplateOnExitValidation(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: workflow-template-invalid-onexit-
namespace: argo
spec:
workflowTemplateRef:
name: workflow-template-invalid-onexit
`)
wft := wfv1.MustUnmarshalWorkflowTemplate("@testdata/workflow-template-invalid-onexit.yaml")
cancel, controller := newController(wf, wft)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
t.Log(woc.wf)
assert.Equal(t, woc.wf.Status.Phase, wfv1.WorkflowFailed)
assert.Contains(t, woc.wf.Status.Message, "invalid spec")
}

func TestWorkflowTemplateEntryPointValidation(t *testing.T) {
wf := wfv1.MustUnmarshalWorkflow(`
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: workflow-template-invalid-entrypoint-
namespace: argo
spec:
workflowTemplateRef:
name: workflow-template-invalid-entrypoint
`)
wft := wfv1.MustUnmarshalWorkflowTemplate("@testdata/workflow-template-invalid-entrypoint.yaml")
cancel, controller := newController(wf, wft)
defer cancel()

ctx := context.Background()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate(ctx)
t.Log(woc.wf)
assert.Equal(t, woc.wf.Status.Phase, wfv1.WorkflowFailed)
assert.Contains(t, woc.wf.Status.Message, "invalid spec")
}
@@ -0,0 +1,29 @@
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: workflow-template-invalid-entrypoint
namespace: argo
spec:
volumeClaimTemplates:
- metadata:
name: builddir
spec:
accessModes: [ "ReadWriteMany" ]
resources:
requests:
storage: 1Mi
volumeClaimGC:
strategy: OnWorkflowCompletion
entrypoint: whalesay
onExit: notify
templates:
- name: whalesay
# note this is an invalid template
- name: notify
container:
image: 'argoproj/argosay:v2'
command:
- /argosay
args:
- echo
- 'hello world'
29 changes: 29 additions & 0 deletions workflow/controller/testdata/workflow-template-invalid-onexit.yaml
@@ -0,0 +1,29 @@
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: workflow-template-invalid-onexit
namespace: argo
spec:
volumeClaimTemplates:
- metadata:
name: builddir
spec:
accessModes: [ "ReadWriteMany" ]
resources:
requests:
storage: 1Mi
volumeClaimGC:
strategy: OnWorkflowCompletion
entrypoint: whalesay
onExit: notify
templates:
- name: whalesay
container:
image: 'argoproj/argosay:v2'
command:
- /argosay
args:
- echo
- 'hello world'
- name: notify
# note this is an invalid template

0 comments on commit 17ea4bc

Please sign in to comment.