-
Notifications
You must be signed in to change notification settings - Fork 4
/
workflow.go
392 lines (353 loc) · 14.4 KB
/
workflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
package domain
import (
"context"
"fmt"
"time"
)
const (
// ErrWorkflowExists describes the error message returned when trying to create a workflow with that already exists.
ErrWorkflowExists = WorkflowErr("workflow already exists")
// ErrWorkflowNotFound describes the error message returned when trying to get a workflow that does not exist.
ErrWorkflowNotFound = WorkflowErr("could not find a workflow with the specified name")
// ErrWorkflowNotAssignedToCodeset describes the error message returned when trying to unassign a workflow from a codeset
// but it is not assigned to the codeset.
ErrWorkflowNotAssignedToCodeset = WorkflowErr("workflow not assigned to codeset")
// ErrCannotDeleteAssignedWorkflow describes the error message returned when trying to delete a workflow that is assigned to a codeset.
ErrCannotDeleteAssignedWorkflow = WorkflowErr("cannot delete workflow, there are codesets assigned to it")
)
const (
// WorkflowIOTypeString represents a workflow input that is of a string type.
WorkflowIOTypeString WorkflowIOType = "string"
// WorkflowIOTypeCodeset represents a workflow input that is a of codeset type.
WorkflowIOTypeCodeset WorkflowIOType = "codeset"
)
// Workflow represents a FuseML workflow.
type Workflow struct {
// CreatedAt is the time the workflow was created.
Created time.Time
// Name is the name of the workflow.
Name string
// Description is the description of the workflow.
Description string
// Inputs is the list of workflow inputs.
Inputs []*WorkflowInput
// Outputs is the list of workflow outputs.
Outputs []*WorkflowOutput
// Steps is the list of workflow steps.
Steps []*WorkflowStep
// AssignedTo is the assignments of the workflow.
AssignedTo *WorkflowAssignment
}
// WorkflowInput represents a input for a FuseML workflow.
type WorkflowInput struct {
// Name is the name of the input.
Name string
// Description is the description of the input.
Description string
// Type is the type of the input.
Type WorkflowIOType
// Default is the default value of the input.
Default string
// Labels is the list of labels for the input.
Labels []string
}
// WorkflowIOType is the type of a workflow input/output.
type WorkflowIOType string
// WorkflowOutput represents the output for a FuseML workflow.
type WorkflowOutput struct {
// Name is the name of the output.
Name string
// Description is the description of the output.
Description string
// Type is the type of the output.
Type WorkflowIOType
}
// WorkflowStep represents a step in a FuseML workflow.
type WorkflowStep struct {
// Name is the name of the step.
Name string
// Image is the name of the image to use for the step.
Image string
// Inputs is the list of inputs for the step.
Inputs []*WorkflowStepInput
// Outputs is the list of outputs for the step.
Outputs []*WorkflowStepOutput
// Extensions is the list of extensions required for the step.
Extensions []*WorkflowStepExtension
// Env is the list of environment variables for the step.
Env []*WorkflowStepEnv
// Resources specify the resources requests and limits for the step.
Resources WorkflowStepResources
}
// WorkflowStepInput represents a input for a FuseML workflow step.
type WorkflowStepInput struct {
// Name is the name of the input.
Name string
// Value is the value of the input.
Value string
// Codeset is the codeset to use for the input.
Codeset *WorkflowStepInputCodeset
}
// WorkflowStepInputCodeset represents a codeset for a FuseML workflow step input.
type WorkflowStepInputCodeset struct {
// Name is the name of the codeset.
Name string
// Path is the path where the codeset will be mounted
Path string
}
// WorkflowStepOutput represents a output for a FuseML workflow step.
type WorkflowStepOutput struct {
// Name is the name of the output.
Name string
// Image is the image generated by the step
Image *WorkflowStepOutputImage
}
// WorkflowStepOutputImage represents a image generated by a FuseML workflow step.
type WorkflowStepOutputImage struct {
// Dockerfile is the path to the Dockerfile used to build the image.
Dockerfile string
// Image is the name of the image.
Name string
}
// WorkflowStepExtension represents the extension requirements for a FuseML workflow step.
type WorkflowStepExtension struct {
// Unique name used to reference this extension requirement
Name string
// Reference extension explicitly by ID
ExtensionID string
// Reference extension by product name
Product string
// Filter extension by version or by semantic version constraints
VersionConstraints string
// Match extensions installed in a given zone
Zone string
// Reference service explicitly by ID
ServiceID string
// Filter by service resource type
ServiceResource string
// Filter by service category
ServiceCategory string
// Extension access - points to the extension endpoint and credentials that
// the extension requirements are (currently) resolved to
ExtensionAccess *ExtensionAccessDescriptor
}
// WorkflowStepEnv represents an environment variable for a FuseML workflow step.
type WorkflowStepEnv struct {
// Name is the name of the environment variable.
Name string
// Value is the value of the environment variable.
Value string
}
// WorkflowStepResources represents the resources requests and limits for a FuseML workflow step.
type WorkflowStepResources struct {
// Requests is a map of resource and its request value.
Requests map[string]string
// Limits is a map of resource and its limit value.
Limits map[string]string
}
// WorkflowRun represents a FuseML workflow run.
type WorkflowRun struct {
// Name is the name of the workflow run.
Name string
// WorkflowRef is the reference to the workflow.
WorkflowRef string
// Inputs is the list of workflow inputs used on a run.
Inputs []*WorkflowRunInput
// Outputs is the list of workflow outputs from a run.
Outputs []*WorkflowRunOutput
// StartTime is the time the workflow run started.
StartTime time.Time
// CompletionTime is the time the workflow run completed.
CompletionTime time.Time
// Status is the status of the workflow run.
Status string
// URL is the URL to the workflow run.
URL string
}
// WorkflowRunInput represents a input from a FuseML workflow run.
type WorkflowRunInput struct {
// Input is the input from the workflow.
Input *WorkflowInput
// Value is the value of the input for a run.
Value string
}
// WorkflowRunOutput represents a output from a FuseML workflow run.
type WorkflowRunOutput struct {
// Output is the output from the workflow.
Output *WorkflowOutput
// Value is the value of the output for a run.
Value string
}
// WorkflowAssignment represents a workflow assignment.
type WorkflowAssignment struct {
// Codesets is the list of codesets that the workflow is assigned to.
Codesets []*CodesetAssignment
}
// WorkflowAssignmentStatus represents the status of a workflow assignment.
type WorkflowAssignmentStatus struct {
// Available is weather the assignment is available.
Available bool
// URL is the URL to the assignment.
URL string
}
// WorkflowRunFilter defines the available filter when listing workflow runs.
type WorkflowRunFilter struct {
// WorkflowName is the name of the workflow to filter by.
WorkflowName *string
// CodesetName is the name of the codeset to filter by.
CodesetName string
// CodesetProject is the name of the codeset project to filter by.
CodesetProject string
// Status is the status of the workflow run to filter by.
Status []string
}
// WorkflowListener defines a listener for a workflow
type WorkflowListener struct {
// Name is the name of the listener.
Name string
// Available is weather the listener is available.
Available bool
// URL is the URL to the listener.
URL string
// DashboardURL is the URL to the dashboard for the listener.
DashboardURL string
}
// CodesetAssignment describes a codeset that has a workflow assigned to it through its webhook ID.
type CodesetAssignment struct {
// Codeset is a reference to the codeset.
Codeset *Codeset
// WebhookID is the ID of the webhook that is used by the workflow assignment.
WebhookID *int64
}
// WorkflowErr are expected errors returned when performing operations on workflows,
type WorkflowErr string
// WorkflowManager describes the interface for a Workflow Manager
type WorkflowManager interface {
// CreateWorkflow creates a new workflow.
CreateWorkflow(ctx context.Context, workflow *Workflow) (*Workflow, error)
// GetWorkflow retrieves a workflow.
GetWorkflow(ctx context.Context, name string) (*Workflow, error)
// GetWorkflows returns a list of workflows.
GetWorkflows(ctx context.Context, name *string) []*Workflow
// DeleteWorkflow deletes a workflow.
DeleteWorkflow(ctx context.Context, name string) error
// AssignToCodeset assigns a workflow to a codeset.
AssignToCodeset(ctx context.Context, name, codesetProject, codesetName string) (*WorkflowListener, *int64, error)
// UnassignFromCodeset removes a workflow assignment from a codeset.
UnassignFromCodeset(ctx context.Context, name, codesetProject, codesetName string) error
// GetAllCodesetAssignments returns all the codeset assignments from all workflows, or a specific one.
GetAllCodesetAssignments(ctx context.Context, name *string) map[string][]*CodesetAssignment
// GetAssignmentStatus returns the status of a workflow assignment.
GetAssignmentStatus(ctx context.Context, name string) *WorkflowAssignmentStatus
// GetWorkflowRuns returns all the workflow runs for a workflow.
GetWorkflowRuns(ctx context.Context, filter *WorkflowRunFilter) ([]*WorkflowRun, error)
}
// WorkflowStore is an interface for workflow stores.
type WorkflowStore interface {
// AddWorkflow adds a workflow to the store.
AddWorkflow(ctx context.Context, w *Workflow) (*Workflow, error)
// GetWorkflow returns a workflow.
GetWorkflow(ctx context.Context, name string) (*Workflow, error)
// ListWorkflows returns a list of workflows.
GetWorkflows(ctx context.Context, name *string) []*Workflow
// DeleteWorkflow deletes a workflow from the store.
DeleteWorkflow(ctx context.Context, name string) error
// AddCodesetAssignment adds a codeset assignment to the store.
AddCodesetAssignment(ctx context.Context, workflowName string, codeset *Codeset, webhook *int64) ([]*CodesetAssignment, error)
// GetCodesetAssignment returns the assignment for a workflow and a codeset.
GetCodesetAssignment(ctx context.Context, workflowName string, codeset *Codeset) (*CodesetAssignment, error)
// GetCodesetAssignments returns the codeset assignments for a workflow.
GetCodesetAssignments(ctx context.Context, workflowName string) []*CodesetAssignment
// GetAllCodesetAssignments returns all the codeset assignments from all workflows, or a specific one.
GetAllCodesetAssignments(ctx context.Context, workflowName *string) map[string][]*CodesetAssignment
// DeleteCodesetAssignment deletes a codeset assignment from the store.
DeleteCodesetAssignment(ctx context.Context, workflowName string, codeset *Codeset) ([]*CodesetAssignment, error)
}
// WorkflowBackend is the interface for the FuseML workflows
type WorkflowBackend interface {
// CreateWorkflow creates a new workflow.
CreateWorkflow(ctx context.Context, workflow *Workflow) error
// DeleteWorkflow deletes a workflow.
DeleteWorkflow(ctx context.Context, workflowName string) error
// CreateWorkflowRun creates a new workflow run.
CreateWorkflowRun(ctx context.Context, workflowName string, codeset *Codeset) error
// GetWorkflowRuns returns a list of workflow runs.
GetWorkflowRuns(ctx context.Context, workflow *Workflow, filter *WorkflowRunFilter) ([]*WorkflowRun, error)
// CreateWorkflowListener creates a new workflow listener.
CreateWorkflowListener(ctx context.Context, workflowName string, timeout time.Duration) (*WorkflowListener, error)
// DeleteWorkflowListener deletes a workflow listener.
DeleteWorkflowListener(ctx context.Context, workflowName string) error
// GetWorkflowListener returns a workflow listener for a workflow.
GetWorkflowListener(ctx context.Context, workflowName string) (*WorkflowListener, error)
}
// AssignToCodeset assigns a workflow to a codeset.
func (w *Workflow) AssignToCodeset(ctx context.Context, codeset *Codeset, webhookID *int64) error {
if codeset == nil {
return fmt.Errorf("codeset is nil")
}
if w.AssignedTo == nil {
w.AssignedTo = &WorkflowAssignment{}
}
if w.AssignedTo.Codesets == nil {
w.AssignedTo.Codesets = []*CodesetAssignment{{Codeset: codeset, WebhookID: webhookID}}
return nil
}
codesetAssignments := w.AssignedTo.Codesets
for _, assignment := range codesetAssignments {
if assignment.Codeset.Name == codeset.Name && assignment.Codeset.Project == codeset.Project {
return nil
}
}
codesetAssignments = append(codesetAssignments, &CodesetAssignment{Codeset: codeset, WebhookID: webhookID})
w.AssignedTo.Codesets = codesetAssignments
return nil
}
// UnassignFromCodeset removes a workflow assignment from a codeset.
func (w *Workflow) UnassignFromCodeset(ctx context.Context, codeset *Codeset) error {
if codeset == nil {
return fmt.Errorf("codeset is nil")
}
if w.AssignedTo == nil || w.AssignedTo.Codesets == nil {
return nil
}
codesetAssignments := w.AssignedTo.Codesets
for i, assignment := range codesetAssignments {
// TODO: Maybe we should implement an equals function for Codeset?
if assignment.Codeset.Name == codeset.Name && assignment.Codeset.Project == codeset.Project {
codesetAssignments = append(codesetAssignments[:i], codesetAssignments[i+1:]...)
break
}
}
if len(codesetAssignments) == 0 {
w.AssignedTo.Codesets = nil
} else {
w.AssignedTo.Codesets = codesetAssignments
}
return nil
}
// GetCodesetAssignments returns the codeset assignments for a workflow.
func (w *Workflow) GetCodesetAssignments(ctx context.Context) []*CodesetAssignment {
if w.AssignedTo == nil || w.AssignedTo.Codesets == nil {
return []*CodesetAssignment{}
}
return w.AssignedTo.Codesets
}
// GetCodesetAssignment returns the assignment for a workflow and a codeset.
func (w *Workflow) GetCodesetAssignment(ctx context.Context, codeset *Codeset) (*CodesetAssignment, error) {
if w.AssignedTo != nil {
for _, assignment := range w.AssignedTo.Codesets {
if assignment.Codeset.Name == codeset.Name && assignment.Codeset.Project == codeset.Project {
return assignment, nil
}
}
}
return nil, ErrWorkflowNotAssignedToCodeset
}
// Error returns the error message
func (e WorkflowErr) Error() string {
return string(e)
}
// String returns the string representation of the Workflow Input/Output type
func (t WorkflowIOType) String() string {
return string(t)
}