This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 59
/
requirements.go
executable file
·86 lines (69 loc) · 3.28 KB
/
requirements.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package compiler
import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytepropeller/pkg/compiler/common"
"github.com/lyft/flytepropeller/pkg/compiler/errors"
)
// TaskIdentifier is an alias of common.Identifier. In this file it is the element type of the required-task id list.
type TaskIdentifier = common.Identifier
// LaunchPlanRefIdentifier is an alias of common.Identifier. In this file it is the element type of the required
// launch plan id list.
type LaunchPlanRefIdentifier = common.Identifier
// WorkflowExecutionRequirements holds the ids of the resources a Workflow needs for its execution. Callers are
// expected to load every listed resource beforehand and pass them to the compiler.
type WorkflowExecutionRequirements struct {
	taskIds       []TaskIdentifier
	launchPlanIds []LaunchPlanRefIdentifier
}

// GetRequiredTaskIds returns the ids of the Tasks that have to be loaded. The stored slice is returned as-is, not a
// copy.
func (r WorkflowExecutionRequirements) GetRequiredTaskIds() []TaskIdentifier {
	return r.taskIds
}

// GetRequiredLaunchPlanIds returns the ids of the Launch Plans that have to be loaded. The stored slice is returned
// as-is, not a copy.
func (r WorkflowExecutionRequirements) GetRequiredLaunchPlanIds() []LaunchPlanRefIdentifier {
	return r.launchPlanIds
}
// GetRequirements computes the Task and Launch Plan ids required by the Workflow fg, following references into the
// provided sub-workflows.
//
// subWfs are indexed through common.NewWorkflowIndex; if indexing fails, the collected compile errors are returned.
// Errors collected while walking the workflow (for example a sub-workflow reference that is not present in subWfs)
// are also returned instead of being silently dropped, since the requirements would otherwise be incomplete.
//
// On a non-nil error the returned requirements are the zero value.
func GetRequirements(fg *core.WorkflowTemplate, subWfs []*core.WorkflowTemplate) (reqs WorkflowExecutionRequirements, err error) {
	errs := errors.NewCompileErrors()
	compiledSubWfs := toCompiledWorkflows(subWfs...)

	index, ok := common.NewWorkflowIndex(compiledSubWfs, errs)
	if !ok {
		return WorkflowExecutionRequirements{}, errs
	}

	reqs = getRequirements(fg, index, true, errs)

	// getRequirements reports problems only through errs, so it must be inspected before declaring success.
	if errs.HasErrors() {
		return WorkflowExecutionRequirements{}, errs
	}

	return reqs, nil
}
// getRequirements gathers the Task and Launch Plan ids referenced by fg into two fresh identifier sets and returns
// them as lists. Sub-workflow references are resolved against subWfs only when followSubworkflows is true. Problems
// found during the walk are collected in errs rather than returned.
func getRequirements(fg *core.WorkflowTemplate, subWfs common.WorkflowIndex, followSubworkflows bool,
	errs errors.CompileErrors) (reqs WorkflowExecutionRequirements) {

	requiredTasks := common.NewIdentifierSet()
	requiredLaunchPlans := common.NewIdentifierSet()

	updateWorkflowRequirements(fg, subWfs, requiredTasks, requiredLaunchPlans, followSubworkflows, errs)

	return WorkflowExecutionRequirements{
		taskIds:       requiredTasks.List(),
		launchPlanIds: requiredLaunchPlans.List(),
	}
}
// updateWorkflowRequirements augments taskIds and workflowIds with the tasks and launch plans referenced by the
// nodes of workflow. Note that workflowIds receives launch plan references (see updateNodeRequirements).
//
// A nil workflow is treated as having no nodes.
func updateWorkflowRequirements(workflow *core.WorkflowTemplate, subWfs common.WorkflowIndex,
	taskIds, workflowIds common.IdentifierSet, followSubworkflows bool, errs errors.CompileErrors) {

	// The generated getter tolerates a nil receiver, unlike direct field access.
	for _, node := range workflow.GetNodes() {
		updateNodeRequirements(node, subWfs, taskIds, workflowIds, followSubworkflows, errs)
	}
}
// updateNodeRequirements records the resources referenced by a single node:
//
//   - a task node that carries a reference id adds that id to taskIds;
//   - a workflow node with a launch plan ref adds that ref to workflowIds;
//   - a workflow node with a sub-workflow ref is, when followSubworkflows is true, looked up in subWfs and its
//     template is walked recursively; a missing entry is collected in errs;
//   - a branch node has the node of every branch walked recursively: the first case, all other cases, and the else
//     node when one is set.
//
// A nil node is ignored.
//
// NOTE(review): the recursion keeps no visited set, so a reference cycle between sub-workflows in subWfs would
// recurse without bound — presumably cycles are rejected elsewhere; confirm.
func updateNodeRequirements(node *flyteNode, subWfs common.WorkflowIndex, taskIds, workflowIds common.IdentifierSet,
	followSubworkflows bool, errs errors.CompileErrors) {

	if node == nil {
		return
	}

	if taskN := node.GetTaskNode(); taskN != nil && taskN.GetReferenceId() != nil {
		taskIds.Insert(*taskN.GetReferenceId())
		return
	}

	if workflowNode := node.GetWorkflowNode(); workflowNode != nil {
		switch {
		case workflowNode.GetLaunchplanRef() != nil:
			workflowIds.Insert(*workflowNode.GetLaunchplanRef())
		case workflowNode.GetSubWorkflowRef() != nil && followSubworkflows:
			ref := workflowNode.GetSubWorkflowRef().String()
			subWf, found := subWfs[ref]
			if !found {
				errs.Collect(errors.NewWorkflowReferenceNotFoundErr(node.Id, ref))
				return
			}

			updateWorkflowRequirements(subWf.Template, subWfs, taskIds, workflowIds, followSubworkflows, errs)
		}

		return
	}

	if branchN := node.GetBranchNode(); branchN != nil {
		// Getters are used throughout so that a partially populated branch does not cause a nil dereference.
		ifElse := branchN.GetIfElse()

		updateNodeRequirements(ifElse.GetCase().GetThenNode(), subWfs, taskIds, workflowIds, followSubworkflows, errs)

		for _, otherCase := range ifElse.GetOther() {
			updateNodeRequirements(otherCase.GetThenNode(), subWfs, taskIds, workflowIds, followSubworkflows, errs)
		}

		// The else branch can reference tasks and launch plans too; skipping it would under-report requirements.
		if elseNode := ifElse.GetElseNode(); elseNode != nil {
			updateNodeRequirements(elseNode, subWfs, taskIds, workflowIds, followSubworkflows, errs)
		}
	}
}