Skip to content

Commit

Permalink
fix: Exceeding quota with volumeClaimTemplates (#3490)
Browse files Browse the repository at this point in the history
  • Loading branch information
sarabala1979 authored and alexec committed Jul 23, 2020
1 parent 94b2012 commit 802c18e
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 12 deletions.
19 changes: 19 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type When struct {
cronWorkflowName string
kubeClient kubernetes.Interface
resourceQuota *corev1.ResourceQuota
storageQuota *corev1.ResourceQuota
configMap *corev1.ConfigMap
}

Expand Down Expand Up @@ -227,6 +228,24 @@ func (w *When) MemoryQuota(quota string) *When {
return w
}

func (w *When) StorageQuota(quota string) *When {
obj, err := util.CreateHardStorageQuota(w.kubeClient, "argo", "storage-quota", quota)
if err != nil {
w.t.Fatal(err)
}
w.storageQuota = obj
return w
}

func (w *When) DeleteStorageQuota() *When {
err := util.DeleteQuota(w.kubeClient, w.storageQuota)
if err != nil {
w.t.Fatal(err)
}
w.storageQuota = nil
return w
}

func (w *When) DeleteQuota() *When {
err := util.DeleteQuota(w.kubeClient, w.resourceQuota)
if err != nil {
Expand Down
19 changes: 19 additions & 0 deletions test/e2e/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package e2e

import (
"regexp"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -639,6 +640,24 @@ spec:
})
}

func (s *FunctionalSuite) TestStorageQuotaLimit() {
s.Given().
Workflow("@testdata/storage-limit.yaml").
When().
StorageQuota("5Mi").
SubmitWorkflow().
WaitForWorkflowToStart(5*time.Second).
WaitForWorkflowCondition(func(wf *wfv1.Workflow) bool {
return strings.Contains(wf.Status.Message, "Waiting for a PVC to be created")
}, "PVC pending", 10*time.Second).
DeleteStorageQuota().
WaitForWorkflow(30 * time.Second).
Then().
ExpectWorkflow(func(t *testing.T, _ *metav1.ObjectMeta, status *wfv1.WorkflowStatus) {
assert.Equal(t, wfv1.NodeSucceeded, status.Phase)
})
}

func TestFunctionalSuite(t *testing.T) {
suite.Run(t, new(FunctionalSuite))
}
22 changes: 22 additions & 0 deletions test/e2e/testdata/storage-limit.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
name: storage-quota-limit
labels:
argo-e2e: true
spec:
entrypoint: wait
volumeClaimTemplates: # define volume, same syntax as k8s Pod spec
- metadata:
name: workdir1 # name of volume claim
spec:
accessModes: [ "ReadWriteMany" ]
resources:
requests:
storage: 20Mi

templates:
- name: wait
script:
image: argoproj/argosay:v2
args: [echo, ":) Hello Argo!"]
18 changes: 15 additions & 3 deletions test/util/resourcequota.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,27 @@ import (
)

func CreateHardMemoryQuota(clientset kubernetes.Interface, namespace, name, memoryLimit string) (*corev1.ResourceQuota, error) {
resourceList := corev1.ResourceList{
corev1.ResourceLimitsMemory: resource.MustParse(memoryLimit),
}
return CreateResourceQuota(clientset, namespace, name, resourceList)
}

func CreateHardStorageQuota(clientset kubernetes.Interface, namespace, name, storageLimit string) (*corev1.ResourceQuota, error) {
resourceList := corev1.ResourceList{
"requests.storage": resource.MustParse(storageLimit),
}
return CreateResourceQuota(clientset, namespace, name, resourceList)
}

func CreateResourceQuota(clientset kubernetes.Interface, namespace, name string, rl corev1.ResourceList) (*corev1.ResourceQuota, error) {
return clientset.CoreV1().ResourceQuotas(namespace).Create(&corev1.ResourceQuota{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: map[string]string{"argo-e2e": "true"},
},
Spec: corev1.ResourceQuotaSpec{
Hard: corev1.ResourceList{
corev1.ResourceLimitsMemory: resource.MustParse(memoryLimit),
},
Hard: rl,
},
})
}
Expand Down
21 changes: 14 additions & 7 deletions workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,21 @@ func (woc *wfOperationCtx) operate() {

err = woc.createPVCs()
if err != nil {
if apierr.IsForbidden(err) {
// Error was most likely caused by a lack of resources.
// In this case, Workflow will be in pending state and requeue.
woc.markWorkflowPhase(wfv1.NodePending, false, fmt.Sprintf("Waiting for a PVC to be created. %v", err))
woc.requeue(10)
return
}
msg := "pvc create error"
woc.log.WithError(err).Error(msg)
woc.markWorkflowError(err, true)
woc.eventRecorder.Event(woc.wf, apiv1.EventTypeWarning, "WorkflowFailed", fmt.Sprintf("%s %s: %+v", woc.wf.ObjectMeta.Name, msg, err))
return
} else if woc.wf.Status.Phase == wfv1.NodePending {
// Workflow might be in pending state if previous PVC creation is forbidden
woc.markWorkflowRunning()
}

node, err := woc.executeTemplate(woc.wf.ObjectMeta.Name, execTmplRef, tmplCtx, execArgs, &executeTemplateOpts{})
Expand Down Expand Up @@ -1225,8 +1235,8 @@ func inferFailedReason(pod *apiv1.Pod) (wfv1.NodePhase, string) {
}

func (woc *wfOperationCtx) createPVCs() error {
if woc.wf.Status.Phase != wfv1.NodeRunning {
// Only attempt to create PVCs if workflow transitioned to Running state
if !(woc.wf.Status.Phase == wfv1.NodePending || woc.wf.Status.Phase == wfv1.NodeRunning) {
// Only attempt to create PVCs if workflow is in Pending or Running state
// (e.g. passed validation, or didn't already complete)
return nil
}
Expand All @@ -1235,9 +1245,6 @@ func (woc *wfOperationCtx) createPVCs() error {
// This will also handle the case where workflow has no volumeClaimTemplates.
return nil
}
if len(woc.wf.Status.PersistentVolumeClaims) == 0 {
woc.wf.Status.PersistentVolumeClaims = make([]apiv1.Volume, len(woc.wfSpec.VolumeClaimTemplates))
}
pvcClient := woc.controller.kubeclientset.CoreV1().PersistentVolumeClaims(woc.wf.ObjectMeta.Namespace)
for i, pvcTmpl := range woc.wfSpec.VolumeClaimTemplates {
if pvcTmpl.ObjectMeta.Name == "" {
Expand Down Expand Up @@ -1289,7 +1296,7 @@ func (woc *wfOperationCtx) createPVCs() error {
},
},
}
woc.wf.Status.PersistentVolumeClaims[i] = vol
woc.wf.Status.PersistentVolumeClaims = append(woc.wf.Status.PersistentVolumeClaims, vol)
woc.updated = true
}
return nil
Expand Down Expand Up @@ -1672,7 +1679,7 @@ func (woc *wfOperationCtx) hasDaemonNodes() bool {
}

func (woc *wfOperationCtx) markWorkflowRunning() {
woc.markWorkflowPhase(wfv1.NodeRunning, false)
woc.markWorkflowPhase(wfv1.NodeRunning, false, "")
}

func (woc *wfOperationCtx) markWorkflowSuccess() {
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4002,7 +4002,7 @@ status:
defer cancel()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
assert.Equal(t, wfv1.NodePending, woc.wf.Status.Phase)
assert.Equal(t, wfv1.NodeRunning, woc.wf.Status.Phase)
for _, node := range woc.wf.Status.Nodes {
switch node.TemplateName {
case "main":
Expand Down
2 changes: 1 addition & 1 deletion workflow/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func FormulateResubmitWorkflow(wf *wfv1.Workflow, memoized bool) (*wfv1.Workflow
}

newWF.Status.Conditions = wfv1.Conditions{{Status: metav1.ConditionFalse, Type: wfv1.ConditionTypeCompleted}}
newWF.Status.Phase = wfv1.NodePending
newWF.Status.Phase = ""

return &newWF, nil
}
Expand Down

0 comments on commit 802c18e

Please sign in to comment.