Skip to content

Commit

Permalink
fix(controller): Respect the volumes of a workflowTemplateRef. Fixes … (
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec committed Jul 14, 2020
1 parent 584cb40 commit 194a213
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 21 deletions.
17 changes: 13 additions & 4 deletions workflow/controller/controller_test.go
Expand Up @@ -316,7 +316,8 @@ func TestNamespacedController(t *testing.T) {
}, nil
})

_, controller := newController()
cancel, controller := newController()
defer cancel()
controller.kubeclientset = kubernetes.Interface(&kubeClient)
controller.cwftmplInformer = nil
controller.createClusterWorkflowTemplateInformer(context.TODO())
Expand All @@ -332,7 +333,8 @@ func TestClusterController(t *testing.T) {
}, nil
})

_, controller := newController()
cancel, controller := newController()
defer cancel()
controller.kubeclientset = kubernetes.Interface(&kubeClient)
controller.cwftmplInformer = nil
controller.createClusterWorkflowTemplateInformer(context.TODO())
Expand Down Expand Up @@ -368,6 +370,8 @@ metadata:
name: workflow-template-whalesay-template
namespace: default
spec:
serviceAccountName: my-sa
priority: 77
templates:
- name: whalesay-template
inputs:
Expand All @@ -377,12 +381,16 @@ spec:
image: docker/whalesay
command: [cowsay]
args: ["{{inputs.parameters.message}}"]
volumes:
- name: data
empty: {}
`

func TestCheckAndInitWorkflowTmplRef(t *testing.T) {
wf := unmarshalWF(wfWithTmplRef)
wftmpl := unmarshalWFTmpl(wfTmpl)
_, controller := newController(wf, wftmpl)
cancel, controller := newController(wf, wftmpl)
defer cancel()
woc := wfOperationCtx{controller: controller,
wf: wf}
_, _, err := woc.loadExecutionSpec()
Expand All @@ -391,7 +399,8 @@ func TestCheckAndInitWorkflowTmplRef(t *testing.T) {
}

func TestIsArchivable(t *testing.T) {
_, controller := newController()
cancel, controller := newController()
defer cancel()
var lblSelector metav1.LabelSelector
lblSelector.MatchLabels = make(map[string]string)
lblSelector.MatchLabels["workflows.argoproj.io/archive-strategy"] = "true"
Expand Down
14 changes: 10 additions & 4 deletions workflow/controller/operator.go
Expand Up @@ -85,7 +85,10 @@ type wfOperationCtx struct {
// preExecutionNodePhases contains the phases of all the nodes before the current operation. Necessary to infer
// changes in phase for metric emission
preExecutionNodePhases map[string]wfv1.NodePhase
// wfSpec holds the WorkflowSpec for use in execution
// wfSpec holds the WorkflowSpec for use in execution.
// This should usually be used instead `wf.Spec`, with two exceptions for user editable fields:
// 1. `wf.Spec.Suspend`
// 2. `wf.Spec.Shutdown`
wfSpec *wfv1.WorkflowSpec
}

Expand Down Expand Up @@ -420,11 +423,11 @@ func (woc *wfOperationCtx) getWorkflowDeadline() *time.Time {
func (woc *wfOperationCtx) setGlobalParameters(executionParameters wfv1.Arguments) {
woc.globalParams[common.GlobalVarWorkflowName] = woc.wf.ObjectMeta.Name
woc.globalParams[common.GlobalVarWorkflowNamespace] = woc.wf.ObjectMeta.Namespace
woc.globalParams[common.GlobalVarWorkflowServiceAccountName] = woc.wf.Spec.ServiceAccountName
woc.globalParams[common.GlobalVarWorkflowServiceAccountName] = woc.wfSpec.ServiceAccountName
woc.globalParams[common.GlobalVarWorkflowUID] = string(woc.wf.ObjectMeta.UID)
woc.globalParams[common.GlobalVarWorkflowCreationTimestamp] = woc.wf.ObjectMeta.CreationTimestamp.Format(time.RFC3339)
if woc.wf.Spec.Priority != nil {
woc.globalParams[common.GlobalVarWorkflowPriority] = strconv.Itoa(int(*woc.wf.Spec.Priority))
if woc.wfSpec.Priority != nil {
woc.globalParams[common.GlobalVarWorkflowPriority] = strconv.Itoa(int(*woc.wfSpec.Priority))
}
for char := range strftime.FormatChars {
cTimeVar := fmt.Sprintf("%s.%s", common.GlobalVarWorkflowCreationTimestamp, string(char))
Expand Down Expand Up @@ -2778,6 +2781,9 @@ func (woc *wfOperationCtx) loadExecutionSpec() (wfv1.TemplateReferenceHolder, wf
if entrypoint == "" {
entrypoint = woc.wfSpec.Entrypoint
}

woc.volumes = woc.wfSpec.DeepCopy().Volumes

tmplRef := &wfv1.WorkflowStep{TemplateRef: woc.wf.Spec.WorkflowTemplateRef.ToTemplateRef(entrypoint)}

if len(woc.wfSpec.Arguments.Parameters) > 0 {
Expand Down
31 changes: 18 additions & 13 deletions workflow/controller/operator_workflow_template_ref_test.go
Expand Up @@ -10,15 +10,16 @@ import (
)

func TestWorkflowTemplateRef(t *testing.T) {
wf := unmarshalWF(wfWithTmplRef)
wftmpl := unmarshalWFTmpl(wfTmpl)

t.Run("ExecuteWorkflowWithTmplRef", func(t *testing.T) {
_, controller := newController(wf, wftmpl)
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
assert.Equal(t, &wftmpl.Spec.WorkflowSpec, woc.wfSpec)
})
cancel, controller := newController(unmarshalWF(wfWithTmplRef), unmarshalWFTmpl(wfTmpl))
defer cancel()
woc := newWorkflowOperationCtx(unmarshalWF(wfWithTmplRef), controller)
woc.operate()
assert.Equal(t, &unmarshalWFTmpl(wfTmpl).Spec.WorkflowSpec, woc.wfSpec)
// verify we copy these values
assert.Len(t, woc.volumes, 1, "volumes from workflow template")
// and these
assert.Equal(t, "my-sa", woc.globalParams["workflow.serviceAccountName"])
assert.Equal(t, "77", woc.globalParams["workflow.priority"])
}

func TestWorkflowTemplateRefWithArgs(t *testing.T) {
Expand All @@ -34,7 +35,8 @@ func TestWorkflowTemplateRefWithArgs(t *testing.T) {
},
}
wf.Spec.Arguments.Parameters = util.MergeParameters(wf.Spec.Arguments.Parameters, args)
_, controller := newController(wf, wftmpl)
cancel, controller := newController(wf, wftmpl)
defer cancel()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
assert.Equal(t, "test", woc.globalParams["workflow.parameters.param1"])
Expand All @@ -54,7 +56,8 @@ func TestWorkflowTemplateRefWithWorkflowTemplateArgs(t *testing.T) {
},
}
wftmpl.Spec.Arguments.Parameters = util.MergeParameters(wf.Spec.Arguments.Parameters, args)
_, controller := newController(wf, wftmpl)
cancel, controller := newController(wf, wftmpl)
defer cancel()
woc := newWorkflowOperationCtx(wf, controller)
woc.operate()
assert.Equal(t, "test", woc.globalParams["workflow.parameters.param1"])
Expand Down Expand Up @@ -121,7 +124,8 @@ status:
func TestWorkflowTemplateRefGetFromStored(t *testing.T) {
wf := unmarshalWF(wfWithStatus)
t.Run("ProcessWFWithStoredWFT", func(t *testing.T) {
_, controller := newController(wf)
cancel, controller := newController(wf)
defer cancel()
woc := newWorkflowOperationCtx(wf, controller)
_, execArgs, err := woc.loadExecutionSpec()
assert.NoError(t, err)
Expand All @@ -146,7 +150,8 @@ spec:
func TestWorkflowTemplateRefInvalidWF(t *testing.T) {
wf := unmarshalWF(invalidWF)
t.Run("ProcessWFWithStoredWFT", func(t *testing.T) {
_, controller := newController(wf)
cancel, controller := newController(wf)
defer cancel()
woc := newWorkflowOperationCtx(wf, controller)
_, _, err := woc.loadExecutionSpec()
assert.Error(t, err)
Expand Down

0 comments on commit 194a213

Please sign in to comment.