Skip to content

Commit

Permalink
Add parallelism control at the steps template level
Browse files Browse the repository at this point in the history
  • Loading branch information
jessesuen committed Feb 8, 2018
1 parent c788484 commit deae4c6
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 8 deletions.
2 changes: 2 additions & 0 deletions Makefile
Expand Up @@ -114,6 +114,8 @@ ui-image:
docker build -t $(IMAGE_PREFIX)argoui:$(IMAGE_TAG) -f ui/Dockerfile ui
@if [ "$(DOCKER_PUSH)" = "true" ] ; then docker push $(IMAGE_PREFIX)argoui:$(IMAGE_TAG) ; fi

precheckin: test lint verify-codegen

release-precheck:
@if [ "$(GIT_TREE_STATE)" != "clean" ]; then echo 'git tree state is $(GIT_TREE_STATE)' ; exit 1; fi
@if [ -z "$(GIT_TAG)" ]; then echo 'commit must be tagged to perform release' ; exit 1; fi
Expand Down
5 changes: 5 additions & 0 deletions api/openapi-spec/swagger.json
Expand Up @@ -700,6 +700,11 @@
"description": "Outputs describe the parameters and artifacts that this template produces",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.Outputs"
},
"parallelism": {
"description": "Parallelism limits the max total parallel pods that can execute at the same time within the boundaries of this template invocation. If additional steps/dag templates are invoked, the pods created by those templates will not be counted towards this total.",
"type": "integer",
"format": "int64"
},
"resource": {
"description": "Resource template subtype which can run k8s resources",
"$ref": "#/definitions/io.argoproj.workflow.v1alpha1.ResourceTemplate"
Expand Down
28 changes: 28 additions & 0 deletions examples/parallelism-template-limit.yaml
@@ -0,0 +1,28 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: parallelism-template-limit-
spec:
entrypoint: parallelism-template-limit
templates:
- name: parallelism-template-limit
parallelism: 2
steps:
- - name: sleep
template: sleep
withItems:
- this
- workflow
- should
- take
- at
- least
- 60
- seconds
- to
- complete

- name: sleep
container:
image: alpine:latest
command: [sh, -c, sleep 10]
7 changes: 7 additions & 0 deletions pkg/apis/workflow/v1alpha1/openapi_generated.go
Expand Up @@ -1237,6 +1237,13 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA
Ref: ref("github.com/argoproj/argo/pkg/apis/workflow/v1alpha1.RetryStrategy"),
},
},
"parallelism": {
SchemaProps: spec.SchemaProps{
Description: "Parallelism limits the max total parallel pods that can execute at the same time within the boundaries of this template invocation. If additional steps/dag templates are invoked, the pods created by those templates will not be counted towards this total.",
Type: []string{"integer"},
Format: "int64",
},
},
},
Required: []string{"name"},
},
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/workflow/v1alpha1/types.go
Expand Up @@ -167,6 +167,11 @@ type Template struct {

// RetryStrategy describes how to retry a template when it fails
RetryStrategy *RetryStrategy `json:"retryStrategy,omitempty"`

// Parallelism limits the max total parallel pods that can execute at the same time within the
// boundaries of this template invocation. If additional steps/dag templates are invoked, the
// pods created by those templates will not be counted towards this total.
Parallelism *int64 `json:"parallelism,omitempty"`
}

// Inputs are the mechanism for passing parameters, artifacts, volumes from one template to another
Expand Down
9 changes: 9 additions & 0 deletions pkg/apis/workflow/v1alpha1/zz_generated.deepcopy.go
Expand Up @@ -696,6 +696,15 @@ func (in *Template) DeepCopyInto(out *Template) {
(*in).DeepCopyInto(*out)
}
}
if in.Parallelism != nil {
in, out := &in.Parallelism, &out.Parallelism
if *in == nil {
*out = nil
} else {
*out = new(int64)
**out = **in
}
}
return
}

Expand Down
8 changes: 8 additions & 0 deletions workflow/common/util.go
Expand Up @@ -308,3 +308,11 @@ func IsValidWorkflowFieldName(name string) []string {
}
return errs
}

// IsPodTemplate reports whether tmpl corresponds to a pod, which is the case
// when its Container, Script, or Resource field is set.
func IsPodTemplate(tmpl *wfv1.Template) bool {
	return tmpl.Container != nil || tmpl.Script != nil || tmpl.Resource != nil
}
19 changes: 14 additions & 5 deletions workflow/common/validate.go
Expand Up @@ -215,29 +215,32 @@ func validateLeaf(scope map[string]interface{}, tmpl *wfv1.Template) error {
}
err = resolveAllVariables(scope, string(tmplBytes))
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "template.%s: %s", tmpl.Name, err.Error())
return errors.Errorf(errors.CodeBadRequest, "templates.%s: %s", tmpl.Name, err.Error())
}
if tmpl.Container != nil {
// Ensure there are no collisions with volume mountPaths and artifact load paths
mountPaths := make(map[string]string)
for i, volMount := range tmpl.Container.VolumeMounts {
if prev, ok := mountPaths[volMount.MountPath]; ok {
return errors.Errorf(errors.CodeBadRequest, "template '%s' container.volumeMounts[%d].mountPath '%s' already mounted in %s", tmpl.Name, i, volMount.MountPath, prev)
return errors.Errorf(errors.CodeBadRequest, "templates.%s.container.volumeMounts[%d].mountPath '%s' already mounted in %s", tmpl.Name, i, volMount.MountPath, prev)
}
mountPaths[volMount.MountPath] = fmt.Sprintf("container.volumeMounts.%s", volMount.Name)
}
for i, art := range tmpl.Inputs.Artifacts {
if prev, ok := mountPaths[art.Path]; ok {
return errors.Errorf(errors.CodeBadRequest, "template '%s' inputs.artifacts[%d].path '%s' already mounted in %s", tmpl.Name, i, art.Path, prev)
return errors.Errorf(errors.CodeBadRequest, "templates.%s.inputs.artifacts[%d].path '%s' already mounted in %s", tmpl.Name, i, art.Path, prev)
}
mountPaths[art.Path] = fmt.Sprintf("inputs.artifacts.%s", art.Name)
}
}
if tmpl.ActiveDeadlineSeconds != nil {
if *tmpl.ActiveDeadlineSeconds <= 0 {
return errors.Errorf(errors.CodeBadRequest, "template '%s' activeDeadlineSeconds must be a positive integer > 0", tmpl.Name)
return errors.Errorf(errors.CodeBadRequest, "templates.%s.activeDeadlineSeconds must be a positive integer > 0", tmpl.Name)
}
}
if tmpl.Parallelism != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.parallelism is only valid for steps and dag templates ", tmpl.Name)
}
return nil
}

Expand All @@ -256,6 +259,9 @@ func validateArguments(prefix string, arguments wfv1.Arguments) error {
}

func (ctx *wfValidationCtx) validateSteps(scope map[string]interface{}, tmpl *wfv1.Template) error {
if tmpl.RetryStrategy != nil {
return errors.Errorf(errors.CodeBadRequest, "template.%s.retryStrategy is only valid for container templates", tmpl.Name)
}
stepNames := make(map[string]bool)
for i, stepGroup := range tmpl.Steps {
for _, step := range stepGroup {
Expand Down Expand Up @@ -355,7 +361,7 @@ func validateOutputs(scope map[string]interface{}, tmpl *wfv1.Template) error {
}
err = resolveAllVariables(scope, string(outputBytes))
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "template '%s' outputs %s", tmpl.Name, err.Error())
return errors.Errorf(errors.CodeBadRequest, "templates.%s.outputs %s", tmpl.Name, err.Error())
}

isLeaf := tmpl.Container != nil || tmpl.Script != nil
Expand Down Expand Up @@ -425,6 +431,9 @@ func (ctx *wfValidationCtx) validateDAG(scope map[string]interface{}, tmpl *wfv1
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks%s", tmpl.Name, err.Error())
}
if tmpl.RetryStrategy != nil {
return errors.Errorf(errors.CodeBadRequest, "template.%s.retryStrategy is only valid for container templates", tmpl.Name)
}
nameToTask := make(map[string]wfv1.DAGTask)
for _, task := range tmpl.DAG.Tasks {
nameToTask[task.Name] = task
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Expand Up @@ -914,7 +914,7 @@ func (woc *wfOperationCtx) markNodeError(nodeName string, err error) *wfv1.NodeS

// checkParallism checks the workflow-level parallelism limit. It returns
// ErrParallismReached when Spec.Parallelism is set and the operation context's
// activePods count has reached or exceeded it; otherwise it returns nil.
func (woc *wfOperationCtx) checkParallism() error {
// A nil Spec.Parallelism skips the check entirely.
if woc.wf.Spec.Parallelism != nil && woc.activePods >= *woc.wf.Spec.Parallelism {
woc.log.Infof("active pod parallism reached %d/%d", woc.activePods, *woc.wf.Spec.Parallelism)
woc.log.Infof("workflow active pod parallism reached %d/%d", woc.activePods, *woc.wf.Spec.Parallelism)
return ErrParallismReached
}
return nil
}
Expand Down
49 changes: 48 additions & 1 deletion workflow/controller/operator_test.go
Expand Up @@ -210,7 +210,7 @@ spec:
command: [sh, -c, sleep 10]
`

// TestWorkflowParallismLimit verifies parallism is honored.
// TestWorkflowParallismLimit verifies parallism at a workflow level is honored.
func TestWorkflowParallismLimit(t *testing.T) {
controller := newController()
wfcset := controller.wfclientset.ArgoprojV1alpha1().Workflows("")
Expand All @@ -226,6 +226,53 @@ func TestWorkflowParallismLimit(t *testing.T) {
assert.Equal(t, 2, len(pods.Items))
}

// stepsTemplateParallismLimit is a workflow manifest whose entrypoint is a
// steps template with parallelism set to 2. Its single step expands the
// "sleep" container template over ten withItems entries.
var stepsTemplateParallismLimit = `
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: parallelism-limit-
spec:
entrypoint: parallelism-limit
templates:
- name: parallelism-limit
parallelism: 2
steps:
- - name: sleep
template: sleep
withItems:
- this
- workflow
- should
- take
- at
- least
- 60
- seconds
- to
- complete
- name: sleep
container:
image: alpine:latest
command: [sh, -c, sleep 10]
`

// TestStepsTemplateParallismLimit verifies that the parallelism set on a steps
// template is honored: one operate() pass over stepsTemplateParallismLimit
// must leave exactly two pods.
func TestStepsTemplateParallismLimit(t *testing.T) {
	ctrl := newController()
	workflows := ctrl.wfclientset.ArgoprojV1alpha1().Workflows("")

	// Submit the workflow, then read it back before operating on it.
	submitted, err := workflows.Create(unmarshalWF(stepsTemplateParallismLimit))
	assert.Nil(t, err)
	stored, err := workflows.Get(submitted.ObjectMeta.Name, metav1.GetOptions{})
	assert.Nil(t, err)

	woc := newWorkflowOperationCtx(stored, ctrl)
	woc.operate()

	// The steps template declares parallelism: 2, so two pods are expected.
	podList, err := ctrl.kubeclientset.CoreV1().Pods("").List(metav1.ListOptions{})
	assert.Nil(t, err)
	assert.Equal(t, 2, len(podList.Items))
}

// TestSidecarResourceLimits verifies resource limits on the sidecar can be set in the controller config
func TestSidecarResourceLimits(t *testing.T) {
controller := newController()
Expand Down
31 changes: 31 additions & 0 deletions workflow/controller/steps.go
Expand Up @@ -22,6 +22,12 @@ type stepsContext struct {

// scope holds parameter and artifacts which are referenceable in scope during execution
scope *wfScope

// parallelism is the max number of parallel containers that should execute within this template
parallelism *int64

// activePods tracks the number of active (Running/Pending) pods within this template for controlling parallelism
activePods int64
}

func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template, boundaryID string) error {
Expand All @@ -40,6 +46,20 @@ func (woc *wfOperationCtx) executeSteps(nodeName string, tmpl *wfv1.Template, bo
tmpl: tmpl,
scope: make(map[string]interface{}),
},
parallelism: tmpl.Parallelism,
}
if stepsCtx.parallelism != nil {
// if we care about parallelism, count the active pods at the template level
for _, node := range woc.wf.Status.Nodes {
if node.BoundaryID == stepsCtx.boundaryID && node.Type == wfv1.NodeTypePod && node.Phase == wfv1.NodeRunning {
stepsCtx.activePods++
}
}
woc.log.Debugf("counted %d active pods in boundary %s", stepsCtx.activePods, stepsCtx.boundaryID)
if stepsCtx.activePods >= *stepsCtx.parallelism {
woc.log.Infof("template active pod parallelism reached %d/%d", stepsCtx.activePods, *stepsCtx.parallelism)
return ErrParallismReached
}
}
for i, stepGroup := range tmpl.Steps {
sgNodeName := fmt.Sprintf("%s[%d]", nodeName, i)
Expand Down Expand Up @@ -177,6 +197,17 @@ func (woc *wfOperationCtx) executeStepGroup(stepGroup []wfv1.WorkflowStep, sgNod
}
return err
}
// Check if we reached max pod parallelism for the template and return if we did
if stepsCtx.parallelism != nil {
childNode := woc.getNodeByName(childNodeName)
if childNode.Type == wfv1.NodeTypePod && childNode.Phase == wfv1.NodeRunning {
stepsCtx.activePods++
}
if stepsCtx.activePods >= *stepsCtx.parallelism {
woc.log.Infof("template active pod parallelism reached %d/%d", stepsCtx.activePods, *stepsCtx.parallelism)
return ErrParallismReached
}
}
}

node = woc.getNodeByName(sgNodeName)
Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/workflowpod.go
Expand Up @@ -213,7 +213,7 @@ func (woc *wfOperationCtx) createWorkflowPod(nodeName string, mainCtr apiv1.Cont
return nil, errors.InternalWrapError(err)
}
woc.log.Infof("Created pod: %s (%s)", nodeName, created.Name)
woc.activePods = woc.activePods + 1
woc.activePods++
return created, nil
}

Expand Down

0 comments on commit deae4c6

Please sign in to comment.