updater.go
// Copyright 2013 Canonical Ltd.
// Licensed under the AGPLv3, see LICENCE file for details.

package instancepoller

import (
"fmt"
"time"
"github.com/juju/loggo"
"github.com/juju/names"
"github.com/juju/juju/apiserver/params"
"github.com/juju/juju/instance"
"github.com/juju/juju/network"
"github.com/juju/juju/state/watcher"
)

var logger = loggo.GetLogger("juju.worker.instanceupdater")

// ShortPoll and LongPoll hold the polling intervals for the instance
// updater. When a machine has no address or is not started, it is
// polled at ShortPoll intervals, backing off exponentially by a factor
// of ShortPollBackoff until the interval reaches a maximum(ish) of
// LongPoll. With the defaults below, that gives intervals of 1s, 2s,
// 4s, and so on, topping out just above 15m.
//
// When a machine has an address and is started, LongPoll is used to
// check that the instance address or status has not changed.
var (
ShortPoll = 1 * time.Second
ShortPollBackoff = 2.0
LongPoll = 15 * time.Minute
)
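
// machine defines the interface the updater requires of a machine.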
type machine interface {
Id() string
Tag() names.MachineTag
InstanceId() (instance.Id, error)
ProviderAddresses() ([]network.Address, error)
SetProviderAddresses(...network.Address) error
InstanceStatus() (string, error)
SetInstanceStatus(status string) error
String() string
Refresh() error
Life() params.Life
Status() (params.StatusResult, error)
IsManual() (bool, error)
}
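
// instanceInfo holds an instance's provider addresses and status, as
// returned by machineContext.instanceInfo.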
type instanceInfo struct {
addresses []network.Address
status string
}
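
// machineContext gives each machine goroutine access to instance
// information and to worker lifecycle signals.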
type machineContext interface {
killAll(err error)
instanceInfo(id instance.Id) (instanceInfo, error)
dying() <-chan struct{}
}
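
// machineAddress associates a machine with its known addresses.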
type machineAddress struct {
machine machine
addresses []network.Address
}
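
// machinesWatcher reports the ids of machines that have changed.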
type machinesWatcher interface {
Changes() <-chan []string
Err() error
Stop() error
}
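
// updaterContext provides what watchMachinesLoop needs: per-machine
// contexts, machine lookup by tag, and a dying signal.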
type updaterContext interface {
newMachineContext() machineContext
getMachine(tag names.MachineTag) (machine, error)
dying() <-chan struct{}
}
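
// updater holds the state of watchMachinesLoop: a notification channel
// per known machine, and a channel on which the per-machine goroutines
// announce their exit.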
type updater struct {
context updaterContext
machines map[names.MachineTag]chan struct{}
machineDead chan machine
}

// watchMachinesLoop watches for changes delivered by the given
// machinesWatcher and starts machine goroutines to deal with them,
// using the context's newMachineContext method to create the
// appropriate context for each new machine tag.
func watchMachinesLoop(context updaterContext, w machinesWatcher) (err error) {
p := &updater{
context: context,
machines: make(map[names.MachineTag]chan struct{}),
machineDead: make(chan machine),
}
defer func() {
if stopErr := w.Stop(); stopErr != nil {
if err == nil {
err = fmt.Errorf("error stopping watcher: %v", stopErr)
} else {
logger.Warningf("ignoring error when stopping watcher: %v", stopErr)
}
}
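// Wait for all the machine goroutines to exit; each one
// sends itself on machineDead when it finishes.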
for len(p.machines) > 0 {
delete(p.machines, (<-p.machineDead).Tag())
}
}()
for {
select {
case ids, ok := <-w.Changes():
if !ok {
return watcher.EnsureErr(w)
}
tags := make([]names.MachineTag, len(ids))
for i := range ids {
tags[i] = names.NewMachineTag(ids[i])
}
if err := p.startMachines(tags); err != nil {
return err
}
case m := <-p.machineDead:
delete(p.machines, m.Tag())
case <-p.context.dying():
return nil
}
}
}
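
// startMachines starts a polling goroutine for every machine tag that
// is not already known, and notifies the existing goroutine otherwise.
// Manual machines are not polled and are skipped.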
func (p *updater) startMachines(tags []names.MachineTag) error {
for _, tag := range tags {
if c := p.machines[tag]; c == nil {
// We don't know about the machine - start
// a goroutine to deal with it.
m, err := p.context.getMachine(tag)
if params.IsCodeNotFound(err) {
logger.Warningf("watcher gave notification of non-existent machine %q", tag.Id())
continue
}
if err != nil {
return err
}
// We don't poll manual machines.
isManual, err := m.IsManual()
if err != nil {
return err
}
if isManual {
continue
}
c = make(chan struct{})
p.machines[tag] = c
go runMachine(p.context.newMachineContext(), m, c, p.machineDead)
} else {
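// We already have a goroutine for this machine;
// notify it of the change.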
c <- struct{}{}
}
}
return nil
}

// runMachine processes the address and status publishing for a given
// machine. We assume that the machine is alive when this is first
// called.
func runMachine(context machineContext, m machine, changed <-chan struct{}, died chan<- machine) {
defer func() {
// We can't just send on the died channel because the
// central loop might be trying to write to us on the
// changed channel.
for {
select {
case died <- m:
return
case <-changed:
}
}
}()
if err := machineLoop(context, m, changed); err != nil {
context.killAll(err)
}
}
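
// machineLoop polls the machine's instance for address and status
// changes, adjusting the poll interval as described in the ShortPoll
// and LongPoll documentation, until the machine is dead or the context
// is dying.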
func machineLoop(context machineContext, m machine, changed <-chan struct{}) error {
// Use a short poll interval when initially waiting for a machine's
// address and its agent to start, and a long one when it already has
// an address and the agent is started.
pollInterval := ShortPoll
pollInstance := true
for {
if pollInstance {
instInfo, err := pollInstanceInfo(context, m)
if err != nil && !params.IsCodeNotProvisioned(err) {
// If the provider doesn't implement Addresses/Status now,
// it never will until we're upgraded, so don't bother
// asking any more. We could use fewer resources by taking
// down the entire worker, but this is easier for now (and
// hopefully the local provider will implement
// Addresses/Status in the not-too-distant future), so we
// won't need to worry about this case at all.
if params.IsCodeNotImplemented(err) {
pollInterval = 365 * 24 * time.Hour
} else {
return err
}
}
machineStatus := params.StatusPending
if err == nil {
if statusInfo, err := m.Status(); err != nil {
logger.Warningf("cannot get current machine status for machine %v: %v", m.Id(), err)
} else {
machineStatus = statusInfo.Status
}
}
if len(instInfo.addresses) > 0 && instInfo.status != "" && machineStatus == params.StatusStarted {
// We've got at least one address and a status, and the machine agent
// is started, so poll infrequently.
pollInterval = LongPoll
} else if pollInterval < LongPoll {
// We have no addresses yet, or the machine agent has not started;
// poll increasingly rarely until both conditions hold.
pollInterval = time.Duration(float64(pollInterval) * ShortPollBackoff)
}
pollInstance = false
}
select {
case <-time.After(pollInterval):
pollInstance = true
case <-context.dying():
return nil
case <-changed:
if err := m.Refresh(); err != nil {
return err
}
if m.Life() == params.Dead {
return nil
}
}
}
}

// pollInstanceInfo checks the current provider addresses and status
// for the given machine's instance, and sets them on the machine if
// they've changed.
func pollInstanceInfo(context machineContext, m machine) (instInfo instanceInfo, err error) {
instInfo = instanceInfo{}
instId, err := m.InstanceId()
// We can't ask the machine for its addresses if it isn't provisioned yet.
if params.IsCodeNotProvisioned(err) {
return instInfo, err
}
if err != nil {
return instInfo, fmt.Errorf("cannot get machine's instance id: %v", err)
}
instInfo, err = context.instanceInfo(instId)
if err != nil {
if params.IsCodeNotImplemented(err) {
return instInfo, err
}
logger.Warningf("cannot get instance info for instance %q: %v", instId, err)
return instInfo, nil
}
currentInstStatus, err := m.InstanceStatus()
if err != nil {
// This should never occur since the machine is provisioned.
// But just in case, reset the polled status so we try again next time.
logger.Warningf("cannot get current instance status for machine %v: %v", m.Id(), err)
instInfo.status = ""
} else {
if instInfo.status != currentInstStatus {
logger.Infof("machine %q instance status changed from %q to %q", m.Id(), currentInstStatus, instInfo.status)
if err = m.SetInstanceStatus(instInfo.status); err != nil {
logger.Errorf("cannot set instance status on %q: %v", m, err)
}
}
}
providerAddresses, err := m.ProviderAddresses()
if err != nil {
return instInfo, err
}
if !addressesEqual(providerAddresses, instInfo.addresses) {
logger.Infof("machine %q has new addresses: %v", m.Id(), instInfo.addresses)
if err = m.SetProviderAddresses(instInfo.addresses...); err != nil {
logger.Errorf("cannot set addresses on %q: %v", m, err)
}
}
return instInfo, err
}

// addressesEqual reports whether the two address lists contain the
// same addresses, ignoring order.
func addressesEqual(a0, a1 []network.Address) bool {
if len(a0) != len(a1) {
logger.Tracef("address lists have different lengths %d != %d for %v != %v",
len(a0), len(a1), a0, a1)
return false
}
ca0 := make([]network.Address, len(a0))
copy(ca0, a0)
network.SortAddresses(ca0, true)
ca1 := make([]network.Address, len(a1))
copy(ca1, a1)
network.SortAddresses(ca1, true)
for i := range ca0 {
if ca0[i] != ca1[i] {
logger.Tracef("address entry at offset %d has a different value for %v != %v",
i, ca0, ca1)
return false
}
}
return true
}