forked from docker-archive/classicswarm
/
aggregate.go
119 lines (99 loc) · 2.36 KB
/
aggregate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package backends
import (
"github.com/docker/libswarm/beam"
"github.com/flynn/go-shlex"
"fmt"
"log"
"sync"
)
// Aggregate returns a beam server acting as the "aggregate" backend.
//
// Each spawn request creates a fresh server instance backed by an aggregator
// built from the spawn arguments (one backend specification per argument),
// wires the instance's attach, start and ls handlers to that aggregator, and
// replies to the caller with an Ack carrying the new instance.
func Aggregate() beam.Sender {
	spawn := func(ctx *beam.Message) error {
		// New is defined elsewhere in this package; presumably it returns
		// the registry of all known backends — confirm against its definition.
		registry := New()
		inst := beam.NewServer()

		agg, err := newAggregator(registry, inst, ctx.Args)
		if err != nil {
			return err
		}

		inst.OnAttach(beam.Handler(agg.attach))
		inst.OnStart(beam.Handler(agg.start))
		inst.OnLs(beam.Handler(agg.ls))

		// Hand the ready instance back to whoever asked for the spawn.
		reply := &beam.Message{Verb: beam.Ack, Ret: inst}
		if _, err := ctx.Ret.Send(reply); err != nil {
			return err
		}
		return nil
	}

	root := beam.NewServer()
	root.OnSpawn(beam.Handler(spawn))
	return root
}
// aggregator fans requests out to a set of spawned backend instances and
// presents them through a single server.
type aggregator struct {
	// backends holds the spawned backend instances, in the order their
	// specifications were given to newAggregator.
	backends []*beam.Object
	// server is the instance returned to clients when they attach.
	server *beam.Server
}
// newAggregator builds an aggregator serving through server.
//
// Every element of args is a backend specification: it is split with shell-like
// quoting rules, the first word names the backend to attach to on allBackends,
// and the remaining words are passed to that backend's Spawn. The spawned
// instances are collected in order. Any split, attach or spawn failure, or a
// specification that splits to zero words, aborts construction with an error.
func newAggregator(allBackends *beam.Object, server *beam.Server, args []string) (*aggregator, error) {
	agg := &aggregator{server: server}

	for _, spec := range args {
		words, err := shlex.Split(spec)
		if err != nil {
			return nil, err
		}
		if len(words) == 0 {
			return nil, fmt.Errorf("empty backend string")
		}

		name, params := words[0], words[1:]
		log.Printf("aggregator: spawning %s(%#v)\n", name, params)

		_, factory, err := allBackends.Attach(name)
		if err != nil {
			return nil, err
		}

		child, err := factory.Spawn(params...)
		if err != nil {
			return nil, err
		}

		agg.backends = append(agg.backends, child)
	}

	return agg, nil
}
// attach handles an attach request on the aggregator.
//
// Only attaching to the aggregator itself (empty name) is supported; a
// non-empty name in ctx.Args[0] yields an error. A message without any
// arguments is treated like the empty name instead of panicking on the index.
//
// After acknowledging with the aggregator's server, attach attaches to every
// backend with the empty name and copies each backend's output to ctx.Ret
// concurrently. It blocks until all copies have finished. If attaching to a
// backend fails, the error is returned immediately; copies already started
// for earlier backends are not waited for.
func (a *aggregator) attach(ctx *beam.Message) error {
	if len(ctx.Args) > 0 && ctx.Args[0] != "" {
		// TODO: implement this?
		return fmt.Errorf("attaching to a child is not implemented")
	}
	if _, err := ctx.Ret.Send(&beam.Message{Verb: beam.Ack, Ret: a.server}); err != nil {
		return err
	}
	var copies sync.WaitGroup
	for _, b := range a.backends {
		r, _, err := b.Attach("")
		if err != nil {
			return err
		}
		// Per-iteration copy: before Go 1.22 the range variable is shared
		// across iterations, so the goroutine below would otherwise race
		// with the loop and log the wrong backend.
		b := b
		copies.Add(1)
		go func() {
			// Deferred so the WaitGroup is released even if the copy panics.
			defer copies.Done()
			log.Printf("copying output from %#v\n", b)
			beam.Copy(ctx.Ret, r)
			log.Printf("finished output from %#v\n", b)
		}()
	}
	copies.Wait()
	return nil
}
// start starts every backend in order and, once all of them have started,
// acknowledges the request on ctx.Ret. The first backend that fails to start
// aborts the sequence and its error is returned without sending an Ack.
func (a *aggregator) start(ctx *beam.Message) error {
	for i := range a.backends {
		if err := a.backends[i].Start(); err != nil {
			return err
		}
	}

	ack := &beam.Message{Verb: beam.Ack}
	_, err := ctx.Ret.Send(ack)
	return err
}
// ls lists the children of every backend and replies on ctx.Ret with a single
// Set message containing the concatenated names, in backend order.
//
// If any backend fails to list, that error is returned and nothing is sent.
// A failure to deliver the reply is returned to the caller as well, matching
// how start reports its Send error.
func (a *aggregator) ls(ctx *beam.Message) error {
	var children []string
	for _, b := range a.backends {
		bChildren, err := b.Ls()
		if err != nil {
			return err
		}
		children = append(children, bChildren...)
	}
	// Propagate the Send error instead of silently dropping it.
	_, err := ctx.Ret.Send(&beam.Message{Verb: beam.Set, Args: children})
	return err
}