This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 59
/
node_state_manager.go
95 lines (80 loc) · 2.35 KB
/
node_state_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package nodes
import (
"context"
pluginCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/nodes/handler"
)
type nodeStateManager struct {
nodeStatus v1alpha1.ExecutableNodeStatus
t *handler.TaskNodeState
b *handler.BranchNodeState
d *handler.DynamicNodeState
w *handler.WorkflowNodeState
}
func (n *nodeStateManager) PutTaskNodeState(s handler.TaskNodeState) error {
n.t = &s
return nil
}
func (n *nodeStateManager) PutBranchNode(s handler.BranchNodeState) error {
n.b = &s
return nil
}
func (n *nodeStateManager) PutDynamicNodeState(s handler.DynamicNodeState) error {
n.d = &s
return nil
}
func (n *nodeStateManager) PutWorkflowNodeState(s handler.WorkflowNodeState) error {
n.w = &s
return nil
}
func (n nodeStateManager) GetTaskNodeState() handler.TaskNodeState {
tn := n.nodeStatus.GetTaskNodeStatus()
if tn != nil {
return handler.TaskNodeState{
PluginPhase: pluginCore.Phase(tn.GetPhase()),
PluginPhaseVersion: tn.GetPhaseVersion(),
PluginStateVersion: tn.GetPluginStateVersion(),
PluginState: tn.GetPluginState(),
BarrierClockTick: tn.GetBarrierClockTick(),
LastPhaseUpdatedAt: tn.GetLastPhaseUpdatedAt(),
}
}
return handler.TaskNodeState{}
}
func (n nodeStateManager) GetBranchNode() handler.BranchNodeState {
bn := n.nodeStatus.GetBranchStatus()
bs := handler.BranchNodeState{}
if bn != nil {
bs.Phase = bn.GetPhase()
bs.FinalizedNodeID = bn.GetFinalizedNode()
}
return bs
}
func (n nodeStateManager) GetDynamicNodeState() handler.DynamicNodeState {
dn := n.nodeStatus.GetDynamicNodeStatus()
ds := handler.DynamicNodeState{}
if dn != nil {
ds.Phase = dn.GetDynamicNodePhase()
ds.Reason = dn.GetDynamicNodeReason()
}
return ds
}
func (n nodeStateManager) GetWorkflowNodeState() handler.WorkflowNodeState {
wn := n.nodeStatus.GetWorkflowNodeStatus()
ws := handler.WorkflowNodeState{}
if wn != nil {
ws.Phase = wn.GetWorkflowNodePhase()
}
return ws
}
func (n nodeStateManager) clearNodeStatus() {
n.t = nil
n.b = nil
n.d = nil
n.w = nil
n.nodeStatus.ClearLastAttemptStartedAt()
}
func newNodeStateManager(_ context.Context, status v1alpha1.ExecutableNodeStatus) *nodeStateManager {
return &nodeStateManager{nodeStatus: status}
}