This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 60
/
interface.go
128 lines (114 loc) · 4.57 KB
/
interface.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package validators
import (
"fmt"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
c "github.com/lyft/flytepropeller/pkg/compiler/common"
"github.com/lyft/flytepropeller/pkg/compiler/errors"
)
// Validate interface has its required attributes set
func ValidateInterface(nodeID c.NodeID, iface *core.TypedInterface, errs errors.CompileErrors) (
typedInterface *core.TypedInterface, ok bool) {
if iface == nil {
iface = &core.TypedInterface{}
}
// validate InputsRef/OutputsRef parameters required attributes are set
if iface.Inputs != nil && iface.Inputs.Variables != nil {
validateVariables(nodeID, iface.Inputs, errs.NewScope())
} else {
iface.Inputs = &core.VariableMap{Variables: map[string]*core.Variable{}}
}
if iface.Outputs != nil && iface.Outputs.Variables != nil {
validateVariables(nodeID, iface.Outputs, errs.NewScope())
} else {
iface.Outputs = &core.VariableMap{Variables: map[string]*core.Variable{}}
}
return iface, !errs.HasErrors()
}
// Validates underlying interface of a node and returns the effective Typed Interface.
func ValidateUnderlyingInterface(w c.WorkflowBuilder, node c.NodeBuilder, errs errors.CompileErrors) (iface *core.TypedInterface, ok bool) {
switch node.GetCoreNode().GetTarget().(type) {
case *core.Node_TaskNode:
if node.GetTaskNode().GetReferenceId() == nil {
errs.Collect(errors.NewValueRequiredErr(node.GetId(), "TaskNode.ReferenceId"))
} else if task, taskOk := w.GetTask(*node.GetTaskNode().GetReferenceId()); taskOk {
iface = task.GetInterface()
if iface == nil {
// Default value for no interface is nil, initialize an empty interface
iface = &core.TypedInterface{
Inputs: &core.VariableMap{Variables: map[string]*core.Variable{}},
Outputs: &core.VariableMap{Variables: map[string]*core.Variable{}},
}
}
} else {
errs.Collect(errors.NewTaskReferenceNotFoundErr(node.GetId(), node.GetTaskNode().GetReferenceId().String()))
}
case *core.Node_WorkflowNode:
if node.GetWorkflowNode().GetLaunchplanRef().String() == w.GetCoreWorkflow().Template.Id.String() {
iface = w.GetCoreWorkflow().Template.Interface
if iface == nil {
errs.Collect(errors.NewValueRequiredErr(node.GetId(), "WorkflowNode.Interface"))
}
} else if node.GetWorkflowNode().GetLaunchplanRef() != nil {
if launchPlan, launchPlanOk := w.GetLaunchPlan(*node.GetWorkflowNode().GetLaunchplanRef()); launchPlanOk {
inputs := launchPlan.GetExpectedInputs()
if inputs == nil {
errs.Collect(errors.NewValueRequiredErr(node.GetId(), "WorkflowNode.ExpectedInputs"))
}
outputs := launchPlan.GetExpectedOutputs()
if outputs == nil {
errs.Collect(errors.NewValueRequiredErr(node.GetId(), "WorkflowNode.ExpectedOutputs"))
}
// Compute exposed inputs as the union of all required inputs and any input overwritten by the node.
exposedInputs := map[string]*core.Variable{}
for name, p := range inputs.Parameters {
if p.GetRequired() {
exposedInputs[name] = p.Var
} else if _, found := findBindingByVariableName(node.GetInputs(), name); found {
exposedInputs[name] = p.Var
}
// else, the param has a default value and is not being overwritten by the node
}
iface = &core.TypedInterface{
Inputs: &core.VariableMap{
Variables: exposedInputs,
},
Outputs: outputs,
}
} else {
errs.Collect(errors.NewWorkflowReferenceNotFoundErr(
node.GetId(),
fmt.Sprintf("%v", node.GetWorkflowNode().GetLaunchplanRef())))
}
} else if node.GetWorkflowNode().GetSubWorkflowRef() != nil {
if wf, wfOk := w.GetSubWorkflow(*node.GetWorkflowNode().GetSubWorkflowRef()); wfOk {
if wf.Template == nil {
errs.Collect(errors.NewValueRequiredErr(node.GetId(), "WorkflowNode.Template"))
} else {
iface = wf.Template.Interface
if iface == nil {
errs.Collect(errors.NewValueRequiredErr(node.GetId(), "WorkflowNode.Template.Interface"))
}
}
} else {
errs.Collect(errors.NewWorkflowReferenceNotFoundErr(
node.GetId(),
fmt.Sprintf("%v", node.GetWorkflowNode().GetSubWorkflowRef())))
}
} else {
errs.Collect(errors.NewWorkflowReferenceNotFoundErr(
node.GetId(),
fmt.Sprintf("%v/%v", node.GetWorkflowNode().GetLaunchplanRef(), node.GetWorkflowNode().GetSubWorkflowRef())))
}
case *core.Node_BranchNode:
iface, _ = validateBranchInterface(w, node, errs.NewScope())
default:
errs.Collect(errors.NewValueRequiredErr(node.GetId(), "Target"))
}
if iface != nil {
ValidateInterface(node.GetId(), iface, errs.NewScope())
}
if !errs.HasErrors() {
node.SetInterface(iface)
}
return iface, !errs.HasErrors()
}