This repository has been archived by the owner on Oct 9, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 59
/
iface.go
497 lines (425 loc) · 13.4 KB
/
iface.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
package v1alpha1
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/golang/protobuf/proto"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flytestdlib/storage"
)
// The intention of these interfaces is to decouple the algorithm and its usage from the actual CRD definition.
// This makes it easier to change the underlying definition without affecting the code that uses it.
//go:generate mockery -all
// nilJSON holds the JSON encoding of a nil value, computed once at package initialization.
// The error returned by json.Marshal is deliberately discarded.
// NOTE(review): marshaling an untyped nil is expected to always succeed — confirm if this is ever changed.
var nilJSON, _ = json.Marshal(nil)
// CustomState is a free-form map from string keys to arbitrary values.
type CustomState map[string]interface{}
// WorkflowID is an alias of string (not a distinct type).
type WorkflowID = string
// TaskID is an alias of string (not a distinct type).
type TaskID = string
// NodeID is an alias of string (not a distinct type).
type NodeID = string
// LaunchPlanRefID is an alias of Identifier, which is declared outside this file.
type LaunchPlanRefID = Identifier
// ExecutionID is an alias of WorkflowExecutionIdentifier, which is declared outside this file.
type ExecutionID = WorkflowExecutionIdentifier
// NodeKind refers to the type of Node. It is a string-backed type; the values declared in this
// file are the NodeKind* constants.
type NodeKind string
// String returns the NodeKind converted to a plain string.
func (n NodeKind) String() string {
return string(n)
}
// DataReference is an alias of storage.DataReference from flytestdlib.
type DataReference = storage.DataReference
// The NodeKind values declared by this package.
const (
// TODO Should we default a NodeKindTask to empty? thus we can assume all unspecified nodetypes as task
NodeKindTask NodeKind = "task"
NodeKindBranch NodeKind = "branch" // A Branch node with conditions
NodeKindWorkflow NodeKind = "workflow" // Either an inline workflow or a remote workflow definition
NodeKindStart NodeKind = "start" // Start node is a special node
NodeKindEnd NodeKind = "end"
)
// NodePhase indicates the current state (phase) of a Node. A node progresses through these phases.
type NodePhase int

// The numeric value of each phase is assigned by iota, so the declaration order is significant.
const (
	NodePhaseNotYetStarted NodePhase = iota
	NodePhaseQueued
	NodePhaseRunning
	NodePhaseFailing
	NodePhaseSucceeding
	NodePhaseSucceeded
	NodePhaseFailed
	NodePhaseSkipped
	NodePhaseRetryableFailure
	NodePhaseTimingOut
	NodePhaseTimedOut
	NodePhaseDynamicRunning
	NodePhaseRecovered
)

// nodePhaseNames holds the text returned by NodePhase.String, indexed by phase value.
// Note that the TimingOut, TimedOut and Recovered entries carry a "NodePhase" prefix
// while the others do not; the strings are kept exactly as they have always been.
var nodePhaseNames = [...]string{
	NodePhaseNotYetStarted:    "NotYetStarted",
	NodePhaseQueued:           "Queued",
	NodePhaseRunning:          "Running",
	NodePhaseFailing:          "Failing",
	NodePhaseSucceeding:       "Succeeding",
	NodePhaseSucceeded:        "Succeeded",
	NodePhaseFailed:           "Failed",
	NodePhaseSkipped:          "Skipped",
	NodePhaseRetryableFailure: "RetryableFailure",
	NodePhaseTimingOut:        "NodePhaseTimingOut",
	NodePhaseTimedOut:         "NodePhaseTimedOut",
	NodePhaseDynamicRunning:   "DynamicRunning",
	NodePhaseRecovered:        "NodePhaseRecovered",
}

// String returns the name of the phase, or "Unknown" for any value that is not one of the
// declared NodePhase constants.
func (p NodePhase) String() string {
	if p < 0 || int(p) >= len(nodePhaseNames) {
		return "Unknown"
	}
	return nodePhaseNames[p]
}
// WorkflowPhase indicates the current state of the Workflow.
type WorkflowPhase int

// The numeric value of each phase is assigned by iota, so the declaration order is significant.
const (
	WorkflowPhaseReady WorkflowPhase = iota
	WorkflowPhaseRunning
	WorkflowPhaseSucceeding
	WorkflowPhaseSuccess
	WorkflowPhaseFailing
	WorkflowPhaseFailed
	WorkflowPhaseAborted
	// WorkflowPhaseHandlingFailureNode is the phase the workflow enters when a failure has been detected,
	// the workflow has finished cleaning up (aborted running nodes, etc.) and a failure node is declared in
	// the workflow spec. This explicit phase exists so that we do not repeatedly attempt to clean up old
	// nodes while handling a workflow event, which could lead to seemingly random failures. While in this
	// phase we handle the failure node, and only the failure node, until it finishes executing or fails itself.
	// If the failure node fails to execute (a real possibility), the final failure output of the workflow will
	// only include the failure node's reason; in other words, it masks the original workflow failure. It is
	// therefore imperative that failure nodes are very simple, very resilient and very well tested.
	WorkflowPhaseHandlingFailureNode
)

// workflowPhaseNames holds the text returned by WorkflowPhase.String, indexed by phase value.
var workflowPhaseNames = [...]string{
	WorkflowPhaseReady:               "Ready",
	WorkflowPhaseRunning:             "Running",
	WorkflowPhaseSucceeding:          "Succeeding",
	WorkflowPhaseSuccess:             "Succeeded",
	WorkflowPhaseFailing:             "Failing",
	WorkflowPhaseFailed:              "Failed",
	WorkflowPhaseAborted:             "Aborted",
	WorkflowPhaseHandlingFailureNode: "HandlingFailureNode",
}

// String returns the name of the phase, or "Unknown" for any value that is not one of the
// declared WorkflowPhase constants. WorkflowPhaseSuccess is rendered as "Succeeded".
func (p WorkflowPhase) String() string {
	if p < 0 || int(p) >= len(workflowPhaseNames) {
		return "Unknown"
	}
	return workflowPhaseNames[p]
}
// BranchNodePhase is the phase of a branch node. A branch node has its own phases, which are used
// by the child nodes to ensure that the branch node is in the right state.
type BranchNodePhase int

// The numeric value of each phase is assigned by iota, so the declaration order is significant.
const (
	BranchNodeNotYetEvaluated BranchNodePhase = iota
	BranchNodeSuccess
	BranchNodeError
)

// branchNodePhaseNames holds the text returned by BranchNodePhase.String, indexed by phase value.
var branchNodePhaseNames = [...]string{
	BranchNodeNotYetEvaluated: "NotYetEvaluated",
	BranchNodeSuccess:         "BranchEvalSuccess",
	BranchNodeError:           "BranchEvalFailed",
}

// String returns the name of the phase, or "Undefined" for any value that is not one of the
// declared BranchNodePhase constants.
func (b BranchNodePhase) String() string {
	if b < 0 || int(b) >= len(branchNodePhaseNames) {
		return "Undefined"
	}
	return branchNodePhaseNames[b]
}
// WorkflowOnFailurePolicy is the failure handling policy of a workflow. It is a distinct type whose
// underlying type is the flyteidl enum core.WorkflowMetadata_OnFailurePolicy; it has its own JSON
// marshaling methods that use the enum name as a string.
type WorkflowOnFailurePolicy core.WorkflowMetadata_OnFailurePolicy
// MarshalJSON encodes the policy as a JSON string holding the name that proto.EnumName reports
// for the value in core.WorkflowMetadata_OnFailurePolicy_name.
func (in WorkflowOnFailurePolicy) MarshalJSON() ([]byte, error) {
	name := proto.EnumName(core.WorkflowMetadata_OnFailurePolicy_name, int32(in))
	return json.Marshal(name)
}
// UnmarshalJSON decodes a JSON string holding a policy name and stores the matching
// WorkflowOnFailurePolicy in the receiver.
//
// It returns an error when data is not a JSON string (the underlying decode error is wrapped so
// callers can inspect it with errors.Is / errors.As) or when the name is not accepted by
// WorkflowOnFailurePolicyString. On error the receiver is left unmodified.
func (in *WorkflowOnFailurePolicy) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return fmt.Errorf("WorkflowOnFailurePolicy should be a string, got %s: %w", data, err)
	}

	policy, err := WorkflowOnFailurePolicyString(s)
	if err != nil {
		// Do not clobber the receiver with a zero policy when the name is unknown.
		return err
	}

	*in = policy
	return nil
}
// WorkflowOnFailurePolicyString converts a policy name into a WorkflowOnFailurePolicy by looking
// it up in core.WorkflowMetadata_OnFailurePolicy_value. When the name is not present in that map
// it returns the zero policy together with an error.
func WorkflowOnFailurePolicyString(policy string) (WorkflowOnFailurePolicy, error) {
	val, found := core.WorkflowMetadata_OnFailurePolicy_value[policy]
	if !found {
		return WorkflowOnFailurePolicy(0), fmt.Errorf("%s does not belong to WorkflowOnFailurePolicy values", policy)
	}

	return WorkflowOnFailurePolicy(val), nil
}
// TaskType is a dynamic enumeration that is defined by configuration. It is an alias of string.
type TaskType = string
// ExecutableTask is the interface for a Task that can be executed.
type ExecutableTask interface {
// TaskType returns the TaskType (a string alias) of the task.
TaskType() TaskType
// CoreTask returns the task as a flyteidl core.TaskTemplate.
CoreTask() *core.TaskTemplate
}
// ExecutableIfBlock is the interface for the executable If block: it exposes a boolean condition
// and the ID of the "then" node.
type ExecutableIfBlock interface {
// GetCondition returns the block's condition as a flyteidl core.BooleanExpression.
GetCondition() *core.BooleanExpression
// GetThenNode returns a pointer to the ID of the "then" node.
GetThenNode() *NodeID
}
// ExecutableBranchNodeStatus is the interface for branch node status. It declares only getters;
// the setters are declared on MutableBranchNodeStatus.
type ExecutableBranchNodeStatus interface {
// GetPhase returns the BranchNodePhase of the branch node.
GetPhase() BranchNodePhase
// GetFinalizedNode returns a pointer to a NodeID.
// NOTE(review): presumably the node selected by the branch evaluation — confirm against the implementation.
GetFinalizedNode() *NodeID
}
// MutableBranchNodeStatus combines Mutable and ExecutableBranchNodeStatus with the setters for a
// branch node's status.
type MutableBranchNodeStatus interface {
Mutable
ExecutableBranchNodeStatus
// NOTE(review): the two setters presumably move the status to BranchNodeError and to
// BranchNodeSuccess (recording id) respectively — confirm against the implementation.
SetBranchNodeError()
SetBranchNodeSuccess(id NodeID)
}
// ExecutableDynamicNodeStatus is the getter-only interface for dynamic node status: its phase,
// a reason string and an execution error. DynamicNodePhase is declared outside this file.
type ExecutableDynamicNodeStatus interface {
GetDynamicNodePhase() DynamicNodePhase
GetDynamicNodeReason() string
GetExecutionError() *core.ExecutionError
}
// MutableDynamicNodeStatus combines Mutable and ExecutableDynamicNodeStatus with a setter for each
// value that ExecutableDynamicNodeStatus exposes (phase, reason and execution error).
type MutableDynamicNodeStatus interface {
Mutable
ExecutableDynamicNodeStatus
SetDynamicNodePhase(phase DynamicNodePhase)
SetDynamicNodeReason(reason string)
SetExecutionError(executionError *core.ExecutionError)
}
// ExecutableBranchNode is the interface for a Branch node. It declares only getters: the "if"
// block, the list of "else if" blocks, the ID of the "else" node and the "else fail" error.
// NOTE(review): an earlier version of this comment referred to a GetExecutionStatus method that
// returns ExecutableBranchNodeStatus; no such method is declared on this interface.
type ExecutableBranchNode interface {
GetIf() ExecutableIfBlock
GetElse() *NodeID
GetElseIf() []ExecutableIfBlock
GetElseFail() *core.Error
}
// ExecutableWorkflowNodeStatus is the getter-only interface for the status of a workflow node:
// its phase and an execution error. WorkflowNodePhase is declared outside this file.
type ExecutableWorkflowNodeStatus interface {
GetWorkflowNodePhase() WorkflowNodePhase
GetExecutionError() *core.ExecutionError
}
// MutableWorkflowNodeStatus combines Mutable and ExecutableWorkflowNodeStatus with a setter for
// the phase and for the execution error.
type MutableWorkflowNodeStatus interface {
Mutable
ExecutableWorkflowNodeStatus
SetWorkflowNodePhase(phase WorkflowNodePhase)
SetExecutionError(executionError *core.ExecutionError)
}
// Mutable is embedded by every Mutable* status interface in this file.
type Mutable interface {
// NOTE(review): presumably reports whether the object has been modified since it was last
// persisted or reset — confirm against the implementations.
IsDirty() bool
}
// MutableNodeStatus is the mutation API for the status of a node. Besides the setters for the
// node's own fields it gives access to the mutable sub-statuses (branch, workflow, task and
// dynamic node) through Get*, GetOrCreate* and Clear* methods.
type MutableNodeStatus interface {
Mutable
// Mutation API's
SetDataDir(DataReference)
SetOutputDir(d DataReference)
SetParentNodeID(n *NodeID)
SetParentTaskID(t *core.TaskExecutionIdentifier)
UpdatePhase(phase NodePhase, occurredAt metav1.Time, reason string, err *core.ExecutionError)
// Both Increment* methods return a uint32.
// NOTE(review): presumably the counter value after incrementing — confirm against the implementation.
IncrementAttempts() uint32
IncrementSystemFailures() uint32
SetCached()
ResetDirty()
// Branch node sub-status.
GetBranchStatus() MutableBranchNodeStatus
GetOrCreateBranchStatus() MutableBranchNodeStatus
// Workflow node sub-status.
GetWorkflowStatus() MutableWorkflowNodeStatus
GetOrCreateWorkflowStatus() MutableWorkflowNodeStatus
ClearWorkflowStatus()
// Task node sub-status.
GetOrCreateTaskStatus() MutableTaskNodeStatus
GetTaskStatus() MutableTaskNodeStatus
ClearTaskStatus()
// Dynamic node sub-status.
GetOrCreateDynamicNodeStatus() MutableDynamicNodeStatus
GetDynamicNodeStatus() MutableDynamicNodeStatus
ClearDynamicNodeStatus()
ClearLastAttemptStartedAt()
ClearSubNodeStatus()
}
// ExecutionTimeInfo exposes the stopped-at, started-at and last-updated-at timestamps of an
// execution as pointers to metav1.Time.
type ExecutionTimeInfo interface {
GetStoppedAt() *metav1.Time
GetStartedAt() *metav1.Time
GetLastUpdatedAt() *metav1.Time
}
// ExecutableNodeStatus is the interface for the status of a Node. Because it embeds
// MutableNodeStatus it also provides the mutation API, and it additionally embeds
// NodeStatusGetter, NodeStatusVisitor and ExecutionTimeInfo.
type ExecutableNodeStatus interface {
NodeStatusGetter
MutableNodeStatus
NodeStatusVisitor
ExecutionTimeInfo
GetPhase() NodePhase
GetQueuedAt() *metav1.Time
GetLastAttemptStartedAt() *metav1.Time
GetParentNodeID() *NodeID
GetParentTaskID() *core.TaskExecutionIdentifier
GetDataDir() DataReference
GetOutputDir() DataReference
GetMessage() string
GetExecutionError() *core.ExecutionError
GetAttempts() uint32
GetSystemFailures() uint32
// The two getters below return the getter-only views of the workflow and task sub-statuses;
// the mutable views are obtained through the embedded MutableNodeStatus.
GetWorkflowNodeStatus() ExecutableWorkflowNodeStatus
GetTaskNodeStatus() ExecutableTaskNodeStatus
IsCached() bool
}
// ExecutableSubWorkflowNodeStatus is the getter-only interface for the status of a sub-workflow
// node; it exposes the WorkflowPhase.
type ExecutableSubWorkflowNodeStatus interface {
GetPhase() WorkflowPhase
}
// MutableSubWorkflowNodeStatus combines Mutable and ExecutableSubWorkflowNodeStatus with a setter
// for the WorkflowPhase.
type MutableSubWorkflowNodeStatus interface {
Mutable
ExecutableSubWorkflowNodeStatus
SetPhase(phase WorkflowPhase)
}
// ExecutableTaskNodeStatus is the getter-only interface for the status of a task node.
type ExecutableTaskNodeStatus interface {
// GetPhase returns the phase as a plain int rather than one of the typed phases declared in this file.
GetPhase() int
GetPhaseVersion() uint32
// GetPluginState returns the plugin state as raw bytes; GetPluginStateVersion returns its version.
GetPluginState() []byte
GetPluginStateVersion() uint32
GetBarrierClockTick() uint32
GetLastPhaseUpdatedAt() time.Time
}
// MutableTaskNodeStatus combines Mutable and ExecutableTaskNodeStatus with a setter for each
// value that ExecutableTaskNodeStatus exposes.
type MutableTaskNodeStatus interface {
Mutable
ExecutableTaskNodeStatus
SetPhase(phase int)
SetLastPhaseUpdatedAt(updatedAt time.Time)
SetPhaseVersion(version uint32)
SetPluginState([]byte)
SetPluginStateVersion(uint32)
SetBarrierClockTick(tick uint32)
}
// ExecutableWorkflowNode is an interface for a Child Workflow Node. It exposes a launch plan
// reference and a sub-workflow reference, both as pointers.
// NOTE(review): presumably exactly one of the two is non-nil for a given node — confirm against callers.
type ExecutableWorkflowNode interface {
GetLaunchPlanRefID() *LaunchPlanRefID
GetSubWorkflowRef() *WorkflowID
}
// BaseNode exposes the two identifying properties of a node: its NodeID and its NodeKind.
type BaseNode interface {
GetID() NodeID
GetKind() NodeKind
}
// ExecutableNode is an interface for the Executable Node. It embeds BaseNode and declares only
// getters and predicates.
type ExecutableNode interface {
BaseNode
IsStartNode() bool
IsEndNode() bool
// Kind-specific accessors: the task ID, the branch node and the workflow node.
// NOTE(review): presumably only the accessor matching GetKind returns a non-nil value — confirm.
GetTaskID() *TaskID
GetBranchNode() ExecutableBranchNode
GetWorkflowNode() ExecutableWorkflowNode
GetOutputAlias() []Alias
GetInputBindings() []*Binding
// GetResources and GetConfig return Kubernetes core/v1 types.
GetResources() *v1.ResourceRequirements
GetConfig() *v1.ConfigMap
GetRetryStrategy() *RetryStrategy
GetExecutionDeadline() *time.Duration
GetActiveDeadline() *time.Duration
// IsInterruptible returns a *bool rather than a bool.
// NOTE(review): presumably nil means "not set on this node" — confirm against callers.
IsInterruptible() *bool
GetName() string
}
// ExecutableWorkflowStatus is an interface for the status of a Workflow. This is the mutable
// portion of a Workflow: it declares both getters and mutators, and embeds NodeStatusGetter and
// ExecutionTimeInfo.
type ExecutableWorkflowStatus interface {
NodeStatusGetter
ExecutionTimeInfo
UpdatePhase(p WorkflowPhase, msg string, err *core.ExecutionError)
GetPhase() WorkflowPhase
IsTerminated() bool
GetMessage() string
GetExecutionError() *core.ExecutionError
SetDataDir(DataReference)
GetDataDir() DataReference
GetOutputReference() DataReference
SetOutputReference(reference DataReference)
IncFailedAttempts()
SetMessage(msg string)
// ConstructNodeDataDir returns a storage.DataReference for the node with the given ID, or an error.
ConstructNodeDataDir(ctx context.Context, name NodeID) (storage.DataReference, error)
}
// NodeGetter looks up a node by ID.
type NodeGetter interface {
// GetNode returns the ExecutableNode for nodeID together with a bool.
// NOTE(review): the bool presumably reports whether the node was found — confirm.
GetNode(nodeID NodeID) (ExecutableNode, bool)
}
// BaseWorkflow exposes the structure of a workflow: node lookup (via NodeGetter), the start node,
// the workflow ID and the node-to-node edges.
type BaseWorkflow interface {
NodeGetter
StartNode() ExecutableNode
GetID() WorkflowID
// FromNode returns all nodes that can be reached directly
// from the node with the given unique name.
FromNode(name NodeID) ([]NodeID, error)
// NOTE(review): ToNode is presumably the inverse of FromNode (the nodes with a direct edge into
// the named node) — confirm against the implementation.
ToNode(name NodeID) ([]NodeID, error)
}
// BaseWorkflowWithStatus is a BaseWorkflow that can also return the execution status of its
// nodes (via NodeStatusGetter).
type BaseWorkflowWithStatus interface {
BaseWorkflow
NodeStatusGetter
}
// ExecutableSubWorkflow captures the methods available on any workflow (top level or child).
// The Meta section is available only for the top level workflow.
type ExecutableSubWorkflow interface {
BaseWorkflow
GetOutputBindings() []*Binding
GetOnFailureNode() ExecutableNode
GetNodes() []NodeID
GetConnections() *Connections
GetOutputs() *OutputVarMap
GetOnFailurePolicy() WorkflowOnFailurePolicy
}
// Meta provides an interface to retrieve labels, annotations and other concepts that are declared
// only once for the top level workflow. All of its methods are getters or predicates.
type Meta interface {
GetExecutionID() ExecutionID
GetK8sWorkflowID() types.NamespacedName
GetOwnerReference() metav1.OwnerReference
GetNamespace() string
GetCreationTimestamp() metav1.Time
GetAnnotations() map[string]string
GetLabels() map[string]string
GetName() string
GetServiceAccountName() string
GetSecurityContext() core.SecurityContext
// IsInterruptible returns a plain bool here, unlike ExecutableNode.IsInterruptible which returns *bool.
IsInterruptible() bool
GetEventVersion() EventVersion
GetRawOutputDataConfig() RawOutputDataConfig
}
// TaskDetailsGetter looks up an ExecutableTask by its TaskID, returning an error on failure.
type TaskDetailsGetter interface {
GetTask(id TaskID) (ExecutableTask, error)
}
// SubWorkflowGetter looks up an ExecutableSubWorkflow by its WorkflowID.
type SubWorkflowGetter interface {
// NOTE(review): there is no error or bool result; presumably nil is returned when subID is
// unknown — confirm against the implementation.
FindSubWorkflow(subID WorkflowID) ExecutableSubWorkflow
}
// MetaExtended extends Meta with task lookup (TaskDetailsGetter), sub-workflow lookup
// (SubWorkflowGetter) and access to the workflow's execution status.
type MetaExtended interface {
Meta
TaskDetailsGetter
SubWorkflowGetter
GetExecutionStatus() ExecutableWorkflowStatus
}
// ExecutableWorkflow is a top level Workflow: the combination of an ExecutableSubWorkflow,
// MetaExtended (which includes Meta) and NodeStatusGetter, plus access to the ExecutionConfig.
type ExecutableWorkflow interface {
ExecutableSubWorkflow
MetaExtended
NodeStatusGetter
GetExecutionConfig() ExecutionConfig
}
// NodeStatusGetter returns the ExecutableNodeStatus for a node ID.
type NodeStatusGetter interface {
GetNodeExecutionStatus(ctx context.Context, id NodeID) ExecutableNodeStatus
}
// NodeStatusMap is an alias for a map from NodeID to ExecutableNodeStatus.
type NodeStatusMap = map[NodeID]ExecutableNodeStatus
// NodeStatusVisitFn is an alias for the callback signature accepted by
// NodeStatusVisitor.VisitNodeStatuses: it receives a node ID and that node's status.
type NodeStatusVisitFn = func(node NodeID, status ExecutableNodeStatus)
// NodeStatusVisitor lets a caller pass a NodeStatusVisitFn callback over node statuses.
// NOTE(review): which statuses are visited, and in what order, is not specified here — confirm
// against the implementation.
type NodeStatusVisitor interface {
VisitNodeStatuses(visitor NodeStatusVisitFn)
}
// EnqueueWorkflow is a simple callback that can be used to indicate that the workflow with the
// given WorkflowID should be re-enqueued for examination.
type EnqueueWorkflow func(workflowID WorkflowID)
// GetOutputsFile returns outputDir with "/outputs.pb" appended. The suffix is appended verbatim;
// no separator normalization is performed.
func GetOutputsFile(outputDir DataReference) DataReference {
	const outputsSuffix = "/outputs.pb"
	return outputDir + outputsSuffix
}
// GetInputsFile returns inputDir with "/inputs.pb" appended. The suffix is appended verbatim;
// no separator normalization is performed.
func GetInputsFile(inputDir DataReference) DataReference {
	const inputsSuffix = "/inputs.pb"
	return inputDir + inputsSuffix
}
// GetDeckFile returns the given directory reference with "/deck.html" appended. The suffix is
// appended verbatim; no separator normalization is performed. Although the parameter is named
// inputDir, the function applies no input-specific logic to it.
func GetDeckFile(inputDir DataReference) DataReference {
	const deckSuffix = "/deck.html"
	return inputDir + deckSuffix
}