/
workflow.go
85 lines (72 loc) · 1.81 KB
/
workflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package modules
import (
"context"
"github.com/pkg/errors"
)
// errors
var (
ErrUnknownModule = errors.New("unknown module")
)
// Workflow -
type Workflow struct {
modules map[string]Module
}
// NewWorkflow -
func NewWorkflow(modules ...Module) *Workflow {
names := make(map[string]Module)
for i := range modules {
names[modules[i].Name()] = modules[i]
}
return &Workflow{
modules: names,
}
}
// Add - adds module to workflow
func (wf *Workflow) Add(module Module) error {
return wf.AddWithName(module, module.Name())
}
// AddWithName - adds module to workflow with custom name
func (wf *Workflow) AddWithName(module Module, name string) error {
if _, ok := wf.modules[name]; ok {
return errors.Errorf("module with name '%s' is already in the workflow", module.Name())
}
wf.modules[name] = module
return nil
}
// Get - gets module from the workflow by name
func (wf *Workflow) Get(name string) (Module, error) {
module, ok := wf.modules[name]
if !ok {
return nil, errors.Wrap(ErrUnknownModule, name)
}
return module, nil
}
// Connect - connect destination nodule input to source module output
func (wf *Workflow) Connect(srcModule, srcOutput, destModule, destInput string) error {
src, ok := wf.modules[srcModule]
if !ok {
return errors.Wrap(ErrUnknownModule, srcModule)
}
output, err := src.Output(srcOutput)
if err != nil {
return err
}
dest, ok := wf.modules[destModule]
if !ok {
return errors.Wrap(ErrUnknownModule, destModule)
}
input, err := dest.Input(destInput)
if err != nil {
return err
}
output.Attach(input)
return nil
}
// Start - starts workflow
func (wf *Workflow) Start(ctx context.Context) {
// TODO: check on cyclic dependencies.
// TODO: detect starting order by connections: leafs are first, root is last.
for _, module := range wf.modules {
module.Start(ctx)
}
}