Skip to content

Commit

Permalink
fix: Fail DAG templates with variables with invalid dependencies (#4992)
Browse files Browse the repository at this point in the history
Signed-off-by: Huan-Cheng Chang <changhc84@gmail.com>
  • Loading branch information
changhc committed Feb 3, 2021
1 parent a730b4f commit 5047f07
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
36 changes: 36 additions & 0 deletions workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -1304,6 +1304,10 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error())
}
err = validateDAGTaskArgumentDependency(task.Arguments, ancestry)
if err != nil {
return errors.Errorf(errors.CodeBadRequest, "templates.%s.tasks.%s %s", tmpl.Name, task.Name, err.Error())
}
err = validateArguments(fmt.Sprintf("templates.%s.tasks.%s.arguments.", tmpl.Name, task.Name), task.Arguments)
if err != nil {
return err
Expand All @@ -1318,6 +1322,38 @@ func (ctx *templateValidationCtx) validateDAG(scope map[string]interface{}, tmpl
return nil
}

func validateDAGTaskArgumentDependency(arguments wfv1.Arguments, ancestry []string) error {
ancestryMap := make(map[string]struct{}, len(ancestry))
for _, a := range ancestry {
ancestryMap[a] = struct{}{}
}

for _, param := range arguments.Parameters {
if strings.HasPrefix(param.Value.String(), "{{tasks.") {
// All parameter values should have been validated, so
// index 1 should exist.
refTaskName := strings.Split(param.Value.String(), ".")[1]

if _, dependencyExists := ancestryMap[refTaskName]; !dependencyExists {
return errors.Errorf(errors.CodeBadRequest, "missing dependency '%s' for parameter '%s'", refTaskName, param.Name)
}
}
}

for _, artifact := range arguments.Artifacts {
if strings.HasPrefix(artifact.From, "{{tasks.") {
// All parameter values should have been validated, so
// index 1 should exist.
refTaskName := strings.Split(artifact.From, ".")[1]

if _, dependencyExists := ancestryMap[refTaskName]; !dependencyExists {
return errors.Errorf(errors.CodeBadRequest, "missing dependency '%s' for artifact '%s'", refTaskName, artifact.Name)
}
}
}
return nil
}

func validateDAGTargets(tmpl *wfv1.Template, nameToTask map[string]wfv1.DAGTask) error {
if tmpl.DAG.Target == "" {
return nil
Expand Down
4 changes: 2 additions & 2 deletions workflow/validate/validate_dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ spec:
arguments:
parameters:
- name: message
value: "{{tasks.B.outputs.parameters.unresolvable}}"
value: "{{tasks.B.outputs.parameters.hosts}}"
`

var dagResolvedGlobalVar = `
Expand Down Expand Up @@ -290,7 +290,7 @@ func TestDAGVariableResolution(t *testing.T) {

_, err = validate(dagResolvedVarNotAncestor)
if assert.NotNil(t, err) {
assert.Contains(t, err.Error(), "failed to resolve {{tasks.B.outputs.parameters.unresolvable}}")
assert.Contains(t, err.Error(), "templates.unresolved.tasks.C missing dependency 'B' for parameter 'message'")
}

_, err = validate(dagResolvedGlobalVar)
Expand Down

0 comments on commit 5047f07

Please sign in to comment.