This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 59
/
predicate.go
130 lines (107 loc) · 4.54 KB
/
predicate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package nodes
import (
"context"
"time"
"github.com/lyft/flytestdlib/logger"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/nodes/errors"
)
// PredicatePhase is a special enum that indicates whether the node under
// consideration is ready to be executed or should be skipped.
type PredicatePhase int

const (
	// PredicatePhaseNotReady indicates the node is not yet ready to be executed.
	PredicatePhaseNotReady PredicatePhase = iota
	// PredicatePhaseReady indicates the node is ready to be executed - execution should proceed.
	PredicatePhaseReady
	// PredicatePhaseSkip indicates that the node execution should be skipped, as one of its
	// parents was skipped or the branch was not taken.
	PredicatePhaseSkip
	// PredicatePhaseUndefined indicates a failure during the predicate check.
	PredicatePhaseUndefined
)

// String returns the display name of the phase. PredicatePhaseUndefined, and
// any value that is not one of the declared constants, is rendered as
// "undefined".
func (p PredicatePhase) String() string {
	// Start from the fallback and override it only for the named phases.
	name := "undefined"
	switch p {
	case PredicatePhaseNotReady:
		name = "NotReady"
	case PredicatePhaseReady:
		name = "Ready"
	case PredicatePhaseSkip:
		name = "Skip"
	}
	return name
}
// CanExecute decides whether node may be executed now by inspecting the
// execution status of each of its upstream nodes, as listed in the workflow's
// UpstreamEdges.
//
// It returns:
//   - PredicatePhaseReady if node is the start node, or if every upstream node
//     other than the parent branch node has succeeded;
//   - PredicatePhaseSkip if no upstream node blocks execution and at least one
//     of them was skipped;
//   - PredicatePhaseNotReady if an upstream status is dirty, or an upstream
//     node is neither skipped nor succeeded;
//   - PredicatePhaseUndefined together with an error if the node has no entry
//     in UpstreamEdges, if the upstream node recorded as its parent is not
//     defined in the workflow, or if that parent is not a branch node whose
//     branch status is BranchNodeSuccess.
func CanExecute(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.BaseNode) (PredicatePhase, error) {
	id := node.GetID()
	if id == v1alpha1.StartNodeID {
		logger.Debugf(ctx, "Start Node id is assumed to be ready.")
		return PredicatePhaseReady, nil
	}

	parentID := w.GetNodeExecutionStatus(ctx, id).GetParentNodeID()

	upstreamIDs, found := w.GetConnections().UpstreamEdges[id]
	if !found {
		return PredicatePhaseUndefined, errors.Errorf(errors.BadSpecificationError, id, "Unable to find upstream nodes for Node")
	}

	anySkipped := false
	for _, upID := range upstreamIDs {
		upStatus := w.GetNodeExecutionStatus(ctx, upID)
		if upStatus.IsDirty() {
			return PredicatePhaseNotReady, nil
		}

		isParent := parentID != nil && *parentID == upID
		if !isParent {
			// Regular upstream node: it must have succeeded or been skipped.
			switch {
			case upStatus.GetPhase() == v1alpha1.NodePhaseSkipped:
				anySkipped = true
			case upStatus.GetPhase() != v1alpha1.NodePhaseSucceeded:
				return PredicatePhaseNotReady, nil
			}
			continue
		}

		// This only happens if current node is the child node of a branch node.
		upNode, defined := w.GetNode(upID)
		if !defined {
			return PredicatePhaseUndefined, errors.Errorf(errors.BadSpecificationError, id, "Upstream node [%v] of node [%v] not defined", upID, id)
		}
		if upNode.GetBranchNode() == nil || upStatus.GetOrCreateBranchStatus().GetPhase() != v1alpha1.BranchNodeSuccess {
			logger.Debugf(ctx, "Branch sub node is expected to have parent branch node in succeeded state")
			return PredicatePhaseUndefined, errors.Errorf(errors.IllegalStateError, id, "Upstream node [%v] is set as parent, but is not a branch node of [%v] or in illegal state.", upID, id)
		}
	}

	if anySkipped {
		return PredicatePhaseSkip, nil
	}
	return PredicatePhaseReady, nil
}
// GetParentNodeMaxEndTime returns the latest stop time recorded among the
// upstream nodes of node, as listed in the workflow's UpstreamEdges.
//
// The upstream node recorded as the node's parent is excluded from the
// maximum; it is only validated to be a branch node whose branch status is
// BranchNodeSuccess. Upstream nodes without a stop time are ignored.
//
// It returns the zero time when node is the start node or when no upstream
// node has a stop time. It returns the zero time and an error if the node has
// no entry in UpstreamEdges, if the parent upstream node is not defined in the
// workflow, or if the parent fails the branch-node validation.
func GetParentNodeMaxEndTime(ctx context.Context, w v1alpha1.ExecutableWorkflow, node v1alpha1.BaseNode) (t v1.Time, err error) {
	zeroTime := v1.NewTime(time.Time{})
	nodeID := node.GetID()
	if nodeID == v1alpha1.StartNodeID {
		logger.Debugf(ctx, "Start Node: returning zero time as parent end time.")
		return zeroTime, nil
	}

	nodeStatus := w.GetNodeExecutionStatus(ctx, nodeID)
	parentNodeID := nodeStatus.GetParentNodeID()
	upstreamNodes, ok := w.GetConnections().UpstreamEdges[nodeID]
	if !ok {
		return zeroTime, errors.Errorf(errors.BadSpecificationError, nodeID, "Unable to find upstream nodes for Node")
	}

	var latest v1.Time
	for _, upstreamNodeID := range upstreamNodes {
		upstreamNodeStatus := w.GetNodeExecutionStatus(ctx, upstreamNodeID)
		if parentNodeID != nil && *parentNodeID == upstreamNodeID {
			upstreamNode, ok := w.GetNode(upstreamNodeID)
			if !ok {
				return zeroTime, errors.Errorf(errors.BadSpecificationError, nodeID, "Upstream node [%v] of node [%v] not defined", upstreamNodeID, nodeID)
			}

			// This only happens if current node is the child node of a branch node
			if upstreamNode.GetBranchNode() == nil || upstreamNodeStatus.GetOrCreateBranchStatus().GetPhase() != v1alpha1.BranchNodeSuccess {
				logger.Debugf(ctx, "Branch sub node is expected to have parent branch node in succeeded state")
				return zeroTime, errors.Errorf(errors.IllegalStateError, nodeID, "Upstream node [%v] is set as parent, but is not a branch node of [%v] or in illegal state.", upstreamNodeID, nodeID)
			}

			continue
		}

		// Compare the full timestamps rather than truncated Unix seconds, so a
		// later stop within the same second still wins, and reuse the pointer
		// that was just nil-checked instead of fetching it a second time.
		if stoppedAt := upstreamNodeStatus.GetStoppedAt(); stoppedAt != nil && stoppedAt.Time.After(latest.Time) {
			latest = *stoppedAt
		}
	}

	return latest, nil
}