/
workflow.go
472 lines (421 loc) · 13.5 KB
/
workflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package workflow declaratively defines computation graphs that support
// automatic parallelization, persistence, and monitoring.
//
// Workflows are a set of tasks that produce and consume Values. Tasks don't
// run until the workflow is started, so Values represent data that doesn't
// exist yet, and can't be used directly. Each value has a dynamic type, which
// must match its uses.
//
// To wrap an existing Go object in a Value, use Constant. To define a
// parameter that will be set when the workflow is started, use Parameter.
// To read a task's return value, register it as an Output, and it will be
// returned from Run. An arbitrary number of Values of the same type can
// be combined with Slice.
//
// Each task has a set of input Values, and returns a single output Value.
// Calling Task defines a task that will run a Go function when it runs. That
// function must take a *TaskContext or context.Context, followed by arguments
// corresponding to the dynamic type of the Values passed to it. The TaskContext
// can be used as a normal Context, and also supports unstructured logging.
//
// Once a Definition is complete, call Start to set its parameters and
// instantiate it into a Workflow. Call Run to execute the workflow until
// completion.
package workflow
import (
"context"
"encoding/json"
"fmt"
"reflect"
"sync"
"github.com/google/uuid"
)
// New creates a new workflow definition.
func New() *Definition {
	d := &Definition{
		parameterNames: make(map[string]struct{}),
		tasks:          make(map[string]*taskDefinition),
		outputs:        make(map[string]*taskResult),
	}
	return d
}
// A Definition defines the structure of a workflow.
type Definition struct {
	parameterNames map[string]struct{}        // names registered via Parameter
	tasks          map[string]*taskDefinition // task name -> its definition
	outputs        map[string]*taskResult     // output name -> the task result registered via Output
}
// A Value is a piece of data that will be produced or consumed when a task
// runs. It cannot be read directly.
type Value interface {
	typ() reflect.Type             // the dynamic type of the data
	value(*Workflow) reflect.Value // the concrete data, read from a running workflow
	deps() []*taskDefinition       // tasks that must finish before value may be called
}
// Parameter creates a Value that is filled in at workflow creation time.
func (d *Definition) Parameter(name string) Value {
	p := &workflowParameter{name: name}
	d.parameterNames[name] = struct{}{}
	return p
}
// ParameterNames returns the names of all parameters associated with
// the Definition. The order of the returned names is unspecified, as
// they are collected from a map. It returns nil if no parameters have
// been registered.
func (d *Definition) ParameterNames() []string {
	if len(d.parameterNames) == 0 {
		return nil // preserve nil result for an empty definition
	}
	// Pre-size to avoid repeated growth while appending.
	names := make([]string, 0, len(d.parameterNames))
	for n := range d.parameterNames {
		names = append(names, n)
	}
	return names
}
// A workflowParameter is a Value that reads a string parameter supplied
// when the workflow is started.
type workflowParameter struct {
	name string
}

// typ reports the parameter's type; parameters are always strings.
func (wp *workflowParameter) typ() reflect.Type {
	return reflect.TypeOf("")
}

// value looks the parameter up in the running workflow's parameter map.
func (wp *workflowParameter) value(w *Workflow) reflect.Value {
	return reflect.ValueOf(w.params[wp.name])
}

// deps reports no dependencies: parameters are available immediately.
func (wp *workflowParameter) deps() []*taskDefinition {
	return nil
}
// Constant creates a Value from an existing object.
func (d *Definition) Constant(value interface{}) Value {
	return &constant{reflect.ValueOf(value)}
}

// A constant is a Value wrapping a fixed object known at definition time.
type constant struct {
	v reflect.Value
}

func (c *constant) typ() reflect.Type               { return c.v.Type() }
func (c *constant) value(_ *Workflow) reflect.Value { return c.v }
func (c *constant) deps() []*taskDefinition         { return nil }
// Slice combines multiple Values of the same type into a Value containing
// a slice of that type. All elements must share one dynamic type; a
// mismatch is a definition error and panics.
func (d *Definition) Slice(vs []Value) Value {
	if len(vs) == 0 {
		return &slice{}
	}
	elt := vs[0].typ()
	for i := 1; i < len(vs); i++ {
		if t := vs[i].typ(); t != elt {
			panic(fmt.Errorf("mismatched value types in Slice: %v vs. %v", t, elt))
		}
	}
	return &slice{elt: elt, vals: vs}
}
// A slice is a Value that aggregates several same-typed Values into a
// []elt at run time.
type slice struct {
	elt  reflect.Type // element type; nil when built from an empty Slice call
	vals []Value
}

// NOTE(review): with a nil elt (empty Slice), reflect.SliceOf will panic —
// confirm empty slices are never used where typ is called.
func (s *slice) typ() reflect.Type {
	return reflect.SliceOf(s.elt)
}

// value builds the concrete slice by evaluating each element Value against
// the running workflow.
func (s *slice) value(w *Workflow) reflect.Value {
	value := reflect.MakeSlice(reflect.SliceOf(s.elt), len(s.vals), len(s.vals))
	for i, v := range s.vals {
		value.Index(i).Set(v.value(w))
	}
	return value
}

// deps returns the union of all element dependencies.
func (s *slice) deps() []*taskDefinition {
	var result []*taskDefinition
	for _, v := range s.vals {
		result = append(result, v.deps()...)
	}
	return result
}
// Output registers a Value as a workflow output which will be returned when
// the workflow finishes. v must be the result of a Task call; other Values
// (parameters, constants, slices) cannot be outputs.
func (d *Definition) Output(name string, v Value) {
	tr, ok := v.(*taskResult)
	if !ok {
		// Name the output and the offending type so the caller can
		// locate the bad registration.
		panic(fmt.Errorf("output %q must be a task result, is %T", name, v))
	}
	d.outputs[name] = tr
}
// Task adds a task to the workflow definition. It can take any number of
// arguments, and returns one output. name must uniquely identify the task in
// the workflow.
// f must be a function that takes a context.Context argument, followed by one
// argument for each of args, corresponding to the Value's dynamic type.
// It must return two values, the first of which will be returned as its Value,
// and an error that will be used by the workflow engine. See the package
// documentation for examples.
func (d *Definition) Task(name string, f interface{}, args ...Value) Value {
	if d.tasks[name] != nil {
		panic(fmt.Errorf("task %q already exists in the workflow", name))
	}
	// Validate f's shape via reflection up front so mistakes surface at
	// definition time rather than when the workflow runs.
	ftyp := reflect.ValueOf(f).Type()
	if ftyp.Kind() != reflect.Func {
		panic(fmt.Errorf("%v is not a function", f))
	}
	// The first parameter is the context, so f needs len(args)+1 inputs.
	if ftyp.NumIn()-1 != len(args) {
		panic(fmt.Errorf("%v takes %v non-Context arguments, but was passed %v", f, ftyp.NumIn()-1, len(args)))
	}
	// *TaskContext must be assignable to the first parameter. That accepts
	// either context.Context or *TaskContext, since *TaskContext embeds one.
	if !reflect.TypeOf((*TaskContext)(nil)).AssignableTo(ftyp.In(0)) {
		panic(fmt.Errorf("the first argument of %v must be a context.Context or *TaskContext, is %v", f, ftyp.In(0)))
	}
	// Each arg's dynamic type must be assignable to the corresponding
	// parameter of f, offset by one for the context.
	for i, arg := range args {
		if !arg.typ().AssignableTo(ftyp.In(i + 1)) {
			panic(fmt.Errorf("argument %v to %v is %v, but was passed %v", i, f, ftyp.In(i+1), arg.typ()))
		}
	}
	// f must return exactly (result, error).
	if ftyp.NumOut() != 2 {
		panic(fmt.Errorf("%v returns %v results, must return 2", f, ftyp.NumOut()))
	}
	if ftyp.Out(1) != reflect.TypeOf((*error)(nil)).Elem() {
		panic(fmt.Errorf("%v's second return value must be error, is %v", f, ftyp.Out(1)))
	}
	td := &taskDefinition{name: name, args: args, f: f}
	d.tasks[name] = td
	return &taskResult{task: td}
}
// A TaskContext is a context.Context, plus workflow-related features.
type TaskContext struct {
	context.Context
	Logger // task-scoped debug logger, obtained from the Listener
}

// A Listener is used to notify the workflow host of state changes, for display
// and persistence.
type Listener interface {
	// TaskStateChanged is called when the state of a task changes.
	// state is safe to store or modify.
	TaskStateChanged(workflowID uuid.UUID, taskID string, state *TaskState) error
	// Logger is called to obtain a Logger for a particular task.
	Logger(workflowID uuid.UUID, taskID string) Logger
}

// TaskState contains the state of a task in a running workflow. Once Finished
// is true, either Result or Error will be populated.
type TaskState struct {
	Name             string
	Finished         bool
	Result           interface{} // the task function's first return value
	SerializedResult []byte      // Result as JSON, for persistence
	Error            string      // the task error's message, if any
}

// WorkflowState contains the shallow state of a running workflow.
type WorkflowState struct {
	ID     uuid.UUID
	Params map[string]string
}

// A Logger is a debug logger passed to a task implementation.
type Logger interface {
	Printf(format string, v ...interface{})
}
// A taskDefinition is the static definition of a single task: its unique
// name, its input Values, and the function to call.
type taskDefinition struct {
	name string
	args []Value
	f    interface{}
}

// A taskResult is a Value representing the first return value of a task.
type taskResult struct {
	task *taskDefinition
}

// typ reports the first return type of the task's function.
func (tr *taskResult) typ() reflect.Type {
	return reflect.ValueOf(tr.task.f).Type().Out(0)
}

// value reads the finished task's stored result from the running workflow.
func (tr *taskResult) value(w *Workflow) reflect.Value {
	return reflect.ValueOf(w.tasks[tr.task].result)
}

// deps reports the producing task as the sole dependency.
func (tr *taskResult) deps() []*taskDefinition {
	return []*taskDefinition{tr.task}
}
// A Workflow is an instantiated workflow instance, ready to run.
type Workflow struct {
	ID     uuid.UUID
	def    *Definition
	params map[string]string
	tasks  map[*taskDefinition]*taskState
}

// taskState is the internal, mutable state of one task in a Workflow.
type taskState struct {
	def              *taskDefinition
	w                *Workflow
	started          bool // a goroutine was launched, or the task was restored as finished
	finished         bool
	result           interface{} // the function's first return value, once finished
	serializedResult []byte      // result as JSON, for persistence
	err              error
}
// args gathers the task's input values, reporting false if any dependency
// has not yet finished successfully.
func (t *taskState) args() ([]reflect.Value, bool) {
	in := make([]reflect.Value, 0, len(t.def.args))
	for _, arg := range t.def.args {
		for _, dep := range arg.deps() {
			depState, ok := t.w.tasks[dep]
			if !ok || !depState.finished || depState.err != nil {
				return nil, false
			}
		}
		in = append(in, arg.value(t.w))
	}
	return in, true
}
// toExported converts the internal task state into a TaskState that is safe
// for the host to store or modify; the serialized result is copied.
func (t *taskState) toExported() *TaskState {
	var errMsg string
	if t.err != nil {
		errMsg = t.err.Error()
	}
	return &TaskState{
		Name:             t.def.name,
		Finished:         t.finished,
		Result:           t.result,
		SerializedResult: append([]byte(nil), t.serializedResult...),
		Error:            errMsg,
	}
}
// Start instantiates a workflow with the given parameters.
func Start(def *Definition, params map[string]string) (*Workflow, error) {
	w := &Workflow{
		ID:     uuid.New(),
		def:    def,
		params: params,
		tasks:  make(map[*taskDefinition]*taskState),
	}
	if err := w.validate(); err != nil {
		return nil, err
	}
	// Every defined task starts out unstarted and unfinished.
	for _, td := range def.tasks {
		w.tasks[td] = &taskState{def: td, w: w}
	}
	return w, nil
}
// validate checks that every defined task is referenced, either as an
// argument to another task or as a registered workflow output.
func (w *Workflow) validate() error {
	referenced := make(map[*taskDefinition]bool)
	for _, td := range w.def.tasks {
		for _, arg := range td.args {
			for _, dep := range arg.deps() {
				referenced[dep] = true
			}
		}
	}
	for _, out := range w.def.outputs {
		referenced[out.task] = true
	}
	for _, td := range w.def.tasks {
		if !referenced[td] {
			return fmt.Errorf("task %v is not referenced and should be deleted", td.name)
		}
	}
	return nil
}
// Resume restores a workflow from stored state. Tasks that had not finished
// will be restarted, but tasks that finished in errors will not be retried.
//
// The host must create the WorkflowState. TaskStates should be saved from
// listener callbacks, but for ease of storage, their Result field does not
// need to be populated.
func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error) {
	w := &Workflow{
		ID:     state.ID,
		def:    def,
		params: state.Params,
		tasks:  map[*taskDefinition]*taskState{},
	}
	if err := w.validate(); err != nil {
		return nil, err
	}
	// Rebuild per-task state. Every task in the definition must have a
	// saved TaskState, or the stored state doesn't match def.
	for _, taskDef := range def.tasks {
		tState, ok := taskStates[taskDef.name]
		if !ok {
			return nil, fmt.Errorf("task state for %q not found", taskDef.name)
		}
		state := &taskState{
			def:              taskDef,
			w:                w,
			started:          tState.Finished, // Can't resume tasks, so either it's new or done.
			finished:         tState.Finished,
			serializedResult: tState.SerializedResult,
		}
		// Reconstruct the in-memory Result from its JSON form by
		// unmarshalling into a pointer to the function's result type.
		if state.serializedResult != nil {
			ptr := reflect.New(reflect.ValueOf(taskDef.f).Type().Out(0))
			if err := json.Unmarshal(tState.SerializedResult, ptr.Interface()); err != nil {
				return nil, fmt.Errorf("failed to unmarshal result of %v: %v", taskDef.name, err)
			}
			state.result = ptr.Elem().Interface()
		}
		if tState.Error != "" {
			state.err = fmt.Errorf("serialized error: %v", tState.Error) // untyped, but hopefully that doesn't matter.
		}
		w.tasks[taskDef] = state
	}
	return w, nil
}
// Run runs a workflow to completion or quiescence and returns its outputs.
// listener.TaskStateChanged will be called when each task starts and
// finishes. It should be used only for monitoring and persistence purposes -
// to read task results, register Outputs.
func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]interface{}, error) {
	if listener == nil {
		listener = &defaultListener{}
	}
	var running sync.WaitGroup
	// Don't return until every in-flight task goroutine has finished.
	defer running.Wait()
	// Buffered with more capacity than tasks (each sends at most once),
	// so task goroutines can always deliver their final state without
	// blocking, even after Run has returned.
	stateChan := make(chan taskState, 2*len(w.def.tasks))
	for {
		// If we have all the outputs, the workflow is done.
		outValues := map[string]interface{}{}
		for outName, outDef := range w.def.outputs {
			if task := w.tasks[outDef.task]; task.finished && task.err == nil {
				outValues[outName] = task.result
			}
		}
		if len(outValues) == len(w.def.outputs) {
			return outValues, nil
		}
		// Start any idle tasks whose dependencies are all done.
		for _, task := range w.tasks {
			if task.started {
				continue
			}
			in, ready := task.args()
			if !ready {
				continue
			}
			task.started = true
			// NOTE(review): TaskStateChanged's error return is ignored
			// here and below — confirm that's intentional.
			listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
			running.Add(1)
			// Run on a copy of the task state; the result is sent back
			// on stateChan and installed into w.tasks by the receiver.
			go func(task taskState) {
				stateChan <- w.runTask(ctx, listener, task, in)
				running.Done()
			}(*task)
		}
		// Exit if we've run everything we can given errors.
		// (This int shadows the WaitGroup named "running" above.)
		running := 0
		for _, task := range w.tasks {
			if task.started && !task.finished {
				running++
			}
		}
		if running == 0 {
			return nil, fmt.Errorf("workflow has progressed as far as it can")
		}
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case state := <-stateChan:
			w.tasks[state.def] = &state
			listener.TaskStateChanged(w.ID, state.def.name, state.toExported())
		}
	}
}
// runTask invokes the task's function with the given arguments, records the
// outcome in a copy of its state, and returns that state. Successful results
// are also serialized to JSON for persistence.
func (w *Workflow) runTask(ctx context.Context, listener Listener, state taskState, args []reflect.Value) taskState {
	tctx := &TaskContext{Context: ctx, Logger: listener.Logger(w.ID, state.def.name)}
	callArgs := append([]reflect.Value{reflect.ValueOf(tctx)}, args...)
	results := reflect.ValueOf(state.def.f).Call(callArgs)
	state.finished = true
	state.result = results[0].Interface()
	if results[1].IsNil() {
		// Success: persist the result. A marshalling failure becomes
		// the task's error.
		state.serializedResult, state.err = json.Marshal(state.result)
	} else {
		state.err = results[1].Interface().(error)
	}
	return state
}
// defaultListener is used when Run is called with a nil Listener; it ignores
// all state changes and discards all task logging.
type defaultListener struct{}

func (s *defaultListener) TaskStateChanged(_ uuid.UUID, _ string, _ *TaskState) error {
	return nil
}

func (s *defaultListener) Logger(_ uuid.UUID, task string) Logger {
	return &defaultLogger{}
}

// defaultLogger discards all output.
type defaultLogger struct{}

func (l *defaultLogger) Printf(format string, v ...interface{}) {}