forked from moby/swarmkit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
allocator.go
221 lines (184 loc) · 4.99 KB
/
allocator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package allocator
import (
"sync"
"github.com/docker/go-events"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"golang.org/x/net/context"
)
// Allocator controls how the allocation stage in the manager is handled.
// It owns the shared task ballot and the lifecycle channels used by Run
// and Stop; one Allocator drives all per-resource allocator goroutines.
type Allocator struct {
	// store is the manager's in-memory store; all allocation state is
	// read from and committed back to it.
	store *store.MemoryStore

	// taskBallot is used to synchronize across all allocators to ensure
	// all of them have completed their respective allocations so that the
	// task can be moved to ALLOCATED state.
	taskBallot *taskBallot

	// netCtx holds the state needed by the network allocator; it is
	// populated during network allocator initialization.
	netCtx *networkContext

	// stopChan is closed by Stop to signal Run to return.
	stopChan chan struct{}

	// doneChan is closed when Run has finished and every allocator
	// goroutine has exited; Stop blocks on it.
	doneChan chan struct{}
}
// taskBallot controls how the voting for task allocation is
// coordinated between different allocators. This is the only structure
// that is written by all allocator goroutines concurrently, hence the
// embedded mutex: every access to voters/votes must hold the lock.
type taskBallot struct {
	sync.Mutex

	// voters is the list of registered voters, each of which has to cast
	// its vote for a task's allocation to be considered complete.
	voters []string

	// votes maps a task ID to the voters that have voted for it so far.
	votes map[string][]string
}
// allocActor controls the various phases in the lifecycle of one kind of
// allocator (e.g. the network allocator): its event source, its init and
// per-event action routines, and its voter identity in the task ballot.
type allocActor struct {
	// ch delivers every store event this allocator is interested in.
	ch chan events.Event

	// cancel unregisters the watcher feeding ch.
	cancel func()

	// taskVoter is this allocator's identity in the task ballot; empty
	// means the allocator does not participate in task voting.
	taskVoter string

	// action is invoked for every event received on ch.
	action func(context.Context, events.Event)

	// init is called once during startup, before action is ever invoked.
	init func(ctx context.Context) error
}
// New returns a new instance of Allocator for use during the allocation
// stage of the manager. The returned Allocator is ready for Run to be
// called; the error return is always nil today and exists for interface
// stability.
func New(store *store.MemoryStore) (*Allocator, error) {
	return &Allocator{
		store: store,
		taskBallot: &taskBallot{
			votes: make(map[string][]string),
		},
		stopChan: make(chan struct{}),
		doneChan: make(chan struct{}),
	}, nil
}
// Run starts all allocator goroutines and blocks until Stop is called.
//
// It wraps ctx in a cancelable child context; the deferred cleanup
// cancels that context, waits for every allocator goroutine via the
// WaitGroup, and only then closes doneChan — guaranteeing Stop does not
// return while any allocator goroutine is still running. If any actor's
// init fails, Run returns that error (after the deferred cleanup runs).
func (a *Allocator) Run(ctx context.Context) error {
	// Setup cancel context for all goroutines to use.
	ctx, cancel := context.WithCancel(ctx)
	var wg sync.WaitGroup

	defer func() {
		cancel()
		wg.Wait()
		close(a.doneChan)
	}()

	var actors []func() error
	watch, watchCancel := state.Watch(a.store.WatchQueue(),
		state.EventCreateNetwork{},
		state.EventDeleteNetwork{},
		state.EventCreateService{},
		state.EventUpdateService{},
		state.EventDeleteService{},
		state.EventCreateTask{},
		state.EventUpdateTask{},
		state.EventDeleteTask{},
		state.EventCreateNode{},
		state.EventUpdateNode{},
		state.EventDeleteNode{},
		state.EventCommit{},
	)

	for _, aa := range []allocActor{
		{
			ch:        watch,
			cancel:    watchCancel,
			taskVoter: networkVoter,
			init:      a.doNetworkInit,
			action:    a.doNetworkAlloc,
		},
	} {
		if aa.taskVoter != "" {
			a.registerToVote(aa.taskVoter)
		}

		// Copy the iterated value: the actor closures below execute after
		// the loop has advanced, so capturing the loop variable directly
		// would make every closure observe the last iteration's value.
		aaCopy := aa
		actor := func() error {
			wg.Add(1)
			// init might return an allocator specific context
			// which is a child of the passed in context to hold
			// allocator specific state
			if err := aaCopy.init(ctx); err != nil {
				// Stop the watches for this allocator if its init
				// failed. Must use aaCopy here: referencing the loop
				// variable aa would cancel the wrong watcher once more
				// than one allocActor is registered.
				aaCopy.cancel()
				wg.Done()
				return err
			}

			go func() {
				defer wg.Done()
				a.run(ctx, aaCopy)
			}()
			return nil
		}

		actors = append(actors, actor)
	}

	for _, actor := range actors {
		if err := actor(); err != nil {
			return err
		}
	}

	<-a.stopChan
	return nil
}
// Stop signals the allocator to stop and blocks until every allocator
// goroutine started by Run has exited.
//
// NOTE(review): Stop must be called at most once — closing an
// already-closed channel panics.
func (a *Allocator) Stop() {
	close(a.stopChan)
	// Wait for Run's deferred cleanup to close doneChan, which happens
	// only after all allocator goroutines have returned.
	<-a.doneChan
}
// run is the per-allocator event loop: it forwards every event received
// on aa.ch to aa.action until the watch channel is closed or ctx is
// canceled, whichever comes first.
func (a *Allocator) run(ctx context.Context, aa allocActor) {
	for {
		select {
		case <-ctx.Done():
			return
		case event, open := <-aa.ch:
			if !open {
				// The watch channel was closed; nothing more to deliver.
				return
			}
			aa.action(ctx, event)
		}
	}
}
// registerToVote adds name to the set of voters whose vote is required
// before any task's allocation is considered complete.
func (a *Allocator) registerToVote(name string) {
	ballot := a.taskBallot
	ballot.Lock()
	defer ballot.Unlock()

	ballot.voters = append(ballot.voters, name)
}
// taskAllocateVote records a vote from voter for the task identified by
// id. It returns true only when, after recording this vote, every
// registered voter has voted for that task — meaning allocation is
// complete across all allocators. A duplicate vote from the same voter
// is ignored and returns false.
func (a *Allocator) taskAllocateVote(voter string, id string) bool {
	a.taskBallot.Lock()
	defer a.taskBallot.Unlock()

	recorded := a.taskBallot.votes[id]

	// A voter may cast at most one vote per task.
	for _, existing := range recorded {
		if existing == voter {
			return false
		}
	}

	recorded = append(recorded, voter)
	a.taskBallot.votes[id] = recorded

	// Fast path: fewer votes than registered voters means at least one
	// voter is still missing.
	if len(a.taskBallot.voters) > len(recorded) {
		return false
	}

	// Confirm that every registered voter appears among the votes.
	for _, registered := range a.taskBallot.voters {
		found := false
		for _, vote := range recorded {
			if vote == registered {
				found = true
				break
			}
		}
		if !found {
			// Not every registered voter has cast a vote yet.
			return false
		}
	}
	return true
}