/
converger.go
561 lines (495 loc) · 16.7 KB
/
converger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
package converge
import (
"time"
"github.com/gofrs/uuid"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
api "github.com/chef/automate/api/interservice/deployment"
"github.com/chef/automate/components/automate-deployment/pkg/depot"
"github.com/chef/automate/components/automate-deployment/pkg/habpkg"
"github.com/chef/automate/components/automate-deployment/pkg/target"
)
// Names of services known to this package. Of these, only
// deploymentServiceName is referenced in this file; the others are
// presumably used elsewhere in the package -- TODO confirm.
const (
deploymentServiceName = "deployment-service"
postgresqlServiceName = "automate-postgresql"
pgGatewayServiceName = "automate-pg-gateway"
esGatewayServiceName = "automate-es-gateway"
elasticsearchServiceName = "automate-elasticsearch"
opensearchServiceName = "automate-opensearch"
)
// Timeouts for the converger's two waiting states. They are variables
// rather than constants, presumably so tests can shorten them -- TODO
// confirm.
var (
// waitingForReconfigureTimeout is the timeout given to every new
// waitingForReconfigure state.
waitingForReconfigureTimeout = 300 * time.Second
// waitingForRestartTimeout is the timeout given to every new
// waitingForRestart state.
waitingForRestartTimeout = 600 * time.Second
)
// A Converger mutates the underlying system in response to system
// events and user commands.
//
// The three request methods take a *Task; in the implementation in
// this file the Task's channel C receives the result of the request
// once it has been processed, and the returned error is non-nil when
// the request could not be queued.
// TODO(ssd) 2019-04-22: new name??
type Converger interface {
// Converge requests that the system be brought to the given
// DesiredState.
Converge(*Task, DesiredState, EventSink) error
// StopServices requests that the services on the given target be
// stopped.
StopServices(*Task, target.Target, EventSink) error
// PrepareForShutdown requests that the services on the given
// target be stopped in preparation for a shutdown.
PrepareForShutdown(*Task, target.Target, EventSink) error
// Stop() stops the Converger, not the deployment-service or
// any other Automate services.
Stop()
}
// converger is a server that handles converge requests. It will take each request,
// compile it into a plan, and execute that plan. All fields are read
// and written by the run() goroutine.
type converger struct {
// stop signals the run loop to exit.
stop chan struct{}
// inbox is the buffered backlog of pending request messages.
inbox chan message
// debug carries debug requests; it is nil unless WithDebugChannel
// was supplied.
debug chan debugReq
// compiler turns a DesiredState into an executable plan.
compiler Compiler
// currentState is the current state of the state machine.
currentState state
}
// ConvergerOpt is a functional option for a converger. StartConverger
// applies each option, in order, to the converger it has just built
// with default values and before the run loop is started.
type ConvergerOpt func(*converger)
// Task is a future-like object that will receive a message once
// the requested task has been completed.
type Task struct {
// ID identifies the task; NewTask fills it with a random (V4) UUID.
ID uuid.UUID
// C will receive a message when the task is done. The request
// messages in this file send the resulting error (possibly nil)
// and then close the channel; if the request cannot be queued the
// channel is closed without a value being sent.
C chan error
}
// NewTask builds a Task that can be handed to Converge(),
// StopServices() or PrepareForShutdown(). The Task carries a fresh V4
// UUID and a completion channel with room for one result, so the
// converger can deliver the result without waiting for a reader.
//
// An error is returned only when a UUID cannot be generated.
func NewTask() (*Task, error) {
	taskID, err := uuid.NewV4()
	if err != nil {
		return nil, errors.Wrap(err, "could not create UUID")
	}

	task := &Task{ID: taskID, C: make(chan error, 1)}
	return task, nil
}
// WithCompiler returns an option that replaces the compiler used to
// fulfil requests. StartConverger installs the PhaseOrderedCompiler
// by default; supplying a different one is useful in unit tests.
func WithCompiler(compiler Compiler) ConvergerOpt {
	apply := func(c *converger) {
		c.compiler = compiler
	}
	return apply
}
// WithDebugChannel returns an option that creates the (unbuffered)
// debug channel, which tests use to poll and set the state of the
// converger. Without this option the channel is nil.
//
// NOTE(ssd) 2019-04-22: Ideally I think these messages would also be
// sent via the inbox, but I'm a chicken.
func WithDebugChannel() ConvergerOpt {
	apply := func(c *converger) {
		c.debug = make(chan debugReq)
	}
	return apply
}
// WithMaxInboxSize returns an option that replaces the inbox with one
// that can hold up to size pending requests. Requests that arrive
// while the inbox is full are rejected with an error.
func WithMaxInboxSize(size int) ConvergerOpt {
	apply := func(c *converger) {
		c.inbox = make(chan message, size)
	}
	return apply
}
// StartConverger builds a converger with default settings (an inbox
// of 5 requests, the PhaseOrderedCompiler, the idle state), applies
// the supplied options in order, and starts it.
// NOTE: A Converger runs in a goroutine, and this function launches it
func StartConverger(opts ...ConvergerOpt) Converger {
	c := new(converger)
	c.inbox = make(chan message, 5)
	c.stop = make(chan struct{}, 1)
	c.currentState = &idle{}
	c.compiler = NewPhaseOrderedCompiler()

	for i := range opts {
		opts[i](c)
	}

	go c.run()
	return c
}
// run is the converger's main loop; StartConverger launches it in its
// own goroutine. Each iteration first services at most one pending
// debug request without blocking, and then blocks until one of the
// following happens: a stop signal, an inbox message, a debug
// request, or the current state's timeout.
//
// NOTE(review): on stop, both the inbox and the stop channel are
// closed here, on the receiving side. Sending on a closed channel
// panics, so a request or a further Stop() issued after this loop has
// exited will panic -- confirm that callers never do that.
func (converger *converger) run() {
for {
// Non-blocking poll of the debug channel. When debug is nil this
// receive can never proceed and the default case is taken.
select {
case d := <-converger.debug:
d.Do(converger)
default:
}
select {
case <-converger.stop:
//TODO(jaym) drain inbox and fail pending requests
close(converger.inbox)
close(converger.stop)
return
case req := <-converger.inbox:
logrus.Debugf("got message %s while %s", req, converger.currentState)
nextState, stopTimer, err := converger.currentState.ProcessMessage(converger, req)
// The timer is stopped on the state being left, before the
// transition is made.
if stopTimer {
converger.currentState.StopTimer()
}
converger.setState(nextState)
// The requestor is answered only after the transition.
req.SendResponse(err)
case d := <-converger.debug:
d.Do(converger)
// The idle state returns a nil channel here, so this case can
// only fire in states that have a timer.
case <-converger.currentState.TimeoutChan():
logrus.Debugf("timed out while %s", converger.currentState)
// The stop-timer flag is deliberately ignored: the timer has
// just fired and its value has been received above. There is
// also no requestor to answer, so the error is only logged.
nextState, _, err := converger.currentState.ProcessMessage(
converger,
&timeout{})
if err != nil {
logrus.WithError(err).Error("error processing timeout event")
}
converger.setState(nextState)
}
}
}
// setState makes s the current state. A debug line is logged only
// when the transition actually changes state, which is judged by
// comparing the String() descriptions of the new and old states.
func (converger *converger) setState(s state) {
	next, prev := s.String(), converger.currentState.String()
	if next != prev {
		fields := logrus.Fields{
			"old_state": converger.currentState,
			"new_state": s,
		}
		logrus.WithFields(fields).Debug("converger state transition")
	}

	converger.currentState = s
}
// state represents the possible states of our converge
// loop. Currently, we use a small state machine to represent the
// possible "waiting" states that we might end up in as the result of
// a converge request.
type state interface {
// ProcessMessage takes the converger and the most recently
// received message. The returned state is the next state of
// the fsm. The returned error may be sent to the sender of
// the message. The returned bool asks the run loop to call
// StopTimer on the current state before moving to the next
// state; the run loop honors it for inbox messages and ignores
// it for timeout messages.
ProcessMessage(*converger, message) (state, bool, error)
// TimeoutChan returns a channel that should deliver a
// message when this state has timed out. If a message is
// received on the timeout channel a timeout message will be
// sent to the current state. A nil channel means the state
// never times out.
TimeoutChan() <-chan time.Time
// StopTimer should correctly stop any timers or resources
// associated with the TimeoutChan.
StopTimer()
// String is a convenience method used for logging. States should
// have unique string descriptors so we can more easily
// conditionally log.
String() string
}
// message represents the messages that can be sent to our
// converger. Messages move the converger from one state to another,
// possibly triggering some system change. The implementations in this
// file are timeout, convergeRequest, stopServicesRequest and
// prepareForShutdownRequest.
type message interface {
// SendResponse delivers err back to the original requestor if
// the original requestor expects a response. err may be nil.
SendResponse(err error)
// String is a convenience method used for logging.
String() string
}
// UnknownMessageError builds the error returned when state s is handed
// a message m that it has no handling for.
//
// TODO(ssd) 2019-04-21: Should we just panic here? It is a programming error.
func UnknownMessageError(m message, s state) error {
	const format = "unknown message %s received while in state %s"
	return errors.Errorf(format, m, s)
}
//
// Messages
//
// timeout is the message handed to the current state when that
// state's timer has fired. The converger creates it itself; it never
// comes from user code, so nobody is waiting for a response.
type timeout struct{}

// SendResponse is a no-op because a timeout has no requestor.
func (t *timeout) SendResponse(_ error) {
}

// String returns the name used for this message in log output.
func (t *timeout) String() string {
	const name = "timeout"
	return name
}
// convergeRequest is the user-sendable message that asks the
// converger to run the converge loop towards desiredState.
type convergeRequest struct {
	// eventSink is handed to the plan when it is executed.
	eventSink EventSink
	// desiredState is compiled into the plan to execute.
	desiredState DesiredState
	// doneChan receives the result of the request.
	doneChan chan error
}

// SendResponse delivers err (which may be nil) to the requestor and
// then closes the channel, so it must be called at most once.
func (req *convergeRequest) SendResponse(err error) {
	done := req.doneChan
	done <- err
	close(done)
}

// String returns the name used for this message in log output.
func (req *convergeRequest) String() string {
	return "convergeRequest"
}
// stopServicesRequest is the user-sendable message that asks the
// converger to stop all services.
type stopServicesRequest struct {
	// eventSink is handed to the plan when it is executed.
	eventSink EventSink
	// desiredState is compiled into the plan to execute.
	desiredState DesiredState
	// doneChan receives the result of the request.
	doneChan chan error
}

// SendResponse delivers err (which may be nil) to the requestor and
// then closes the channel, so it must be called at most once.
func (req *stopServicesRequest) SendResponse(err error) {
	done := req.doneChan
	done <- err
	close(done)
}

// String returns the name used for this message in log output.
func (req *stopServicesRequest) String() string {
	return "stopServicesRequest"
}
// prepareForShutdownRequest is the user-sendable message that asks
// the converger to stop all services and then wait for a shutdown.
type prepareForShutdownRequest struct {
	// eventSink is handed to the plan when it is executed.
	eventSink EventSink
	// desiredState is compiled into the plan to execute.
	desiredState DesiredState
	// doneChan receives the result of the request.
	doneChan chan error
}

// SendResponse delivers err (which may be nil) to the requestor and
// then closes the channel, so it must be called at most once.
func (req *prepareForShutdownRequest) SendResponse(err error) {
	done := req.doneChan
	done <- err
	close(done)
}

// String returns the name used for this message in log output.
func (req *prepareForShutdownRequest) String() string {
	return "prepareForShutdown"
}
// States
//
// We use the following state machine to handle the two "waiting"
// states that we need to track inside the converger.
//
// +-----------+ convergeRequest| prepareForShutdown +-------------------+
// | | err = ErrRestartPending | |
// | +-------------------------------------->| |
// | idle | | waitingForRestart |
// | |<--------------------------------------+ |
// | | timeout | |
// +-------+---+ +-------------------+
// ^ |
// | | convergeRequest
// timeout| | err = ErrSelfReconfigurePending
// | |
// | v
// +--+--------------------+
// | |
// | |
// | waitingForReconfigure |
// | |
// | |
// +-----------------------+
//
// idle is the default state of the converger. In this state we
// are ready to accept any user-sendable requests. The idle state
// never times out: its TimeoutChan is nil. It carries no data.
type idle struct{}
// ProcessMessage does the real work of the converger: in the idle
// state every user request is compiled into a plan and executed.
//
//   - convergeRequest: compile and execute; the execution result picks
//     the next state (see handleError). A compile failure is returned
//     as-is and leaves the converger idle.
//   - stopServicesRequest: the same, via doStopServices.
//   - prepareForShutdownRequest: the same, except that success moves
//     the converger to waitingForRestart.
//   - timeout: unexpected while idle; logged and otherwise ignored.
//
// Anything else yields an UnknownMessageError. The returned bool is
// always false because idle has no timer to stop.
func (i *idle) ProcessMessage(c *converger, msg message) (state, bool, error) {
	switch req := msg.(type) {
	case *convergeRequest:
		plan, err := c.compiler.Compile(req.desiredState)
		if err != nil {
			logrus.WithError(err).Error("Failed to compile")
			return i, false, err
		}
		return i.handleError(plan.Execute(req.eventSink))

	case *stopServicesRequest:
		return i.handleError(doStopServices(c.compiler, req.desiredState, req.eventSink))

	case *prepareForShutdownRequest:
		if err := doStopServices(c.compiler, req.desiredState, req.eventSink); err != nil {
			return i.handleError(err)
		}
		return newWaitingForRestart(), false, nil

	case *timeout:
		logrus.Warn("received timeout when in idle state.")
		return i, false, nil

	default:
		return i, false, UnknownMessageError(msg, i)
	}
}
// doStopServices compiles state with c and executes the resulting
// plan, reporting to sink. A compile failure is logged and returned
// wrapped; the result of the execution is returned unchanged.
func doStopServices(c Compiler, state DesiredState, sink EventSink) error {
	plan, compileErr := c.Compile(state)
	if compileErr != nil {
		logrus.WithError(compileErr).Error("Failed to compile")
		return errors.Wrap(compileErr, "failed to compile")
	}

	return plan.Execute(sink)
}
// handleError maps the result of executing a plan to the next state.
// The three "pending" sentinel errors move the converger to a waiting
// state; any other result, including nil, leaves it idle. err is
// always passed through to the caller and the bool is always false.
func (i *idle) handleError(err error) (state, bool, error) {
	var next state = i

	switch err {
	case api.ErrRestartPending:
		next = newWaitingForRestart()
	case api.ErrSelfReconfigurePending:
		next = newWaitingForReconfigure()
	case api.ErrSelfUpgradePending:
		// NOTE(ssd) 2019-04-19: A self-upgrade is
		// essentially just a restart from hab-sup.
		next = newWaitingForRestart()
	}

	return next, false, err
}
// TimeoutChan returns a nil channel: receiving from it never
// succeeds, so the idle state never times out.
func (i *idle) TimeoutChan() <-chan time.Time {
	return nil
}

// StopTimer does nothing because idle has no timer.
func (i *idle) StopTimer() {
}

// String returns the name used for this state in log output.
func (i *idle) String() string {
	return "idle"
}
// waitingForRestart is the state the converger is in while it expects
// to be restarted by an external signal. It times out after
// waitingForRestartTimeout.
type waitingForRestart struct {
	timeoutable
}

// newWaitingForRestart returns a waitingForRestart state whose timer
// has not been started yet.
func newWaitingForRestart() *waitingForRestart {
	return &waitingForRestart{
		timeoutable: timeoutable{timeout: waitingForRestartTimeout},
	}
}
// ProcessMessage refuses all work while a restart is pending. A
// timeout sends the converger back to idle (asking for the timer to be
// stopped); user requests are logged and answered with
// api.ErrRestartPending; anything else is logged and answered with an
// UnknownMessageError. In the last two cases the state is unchanged.
func (w *waitingForRestart) ProcessMessage(_ *converger, msg message) (state, bool, error) {
	if _, timedOut := msg.(*timeout); timedOut {
		logrus.Info("timed out waiting for restart, re-entering idle state")
		return &idle{}, true, nil
	}

	// Everything other than a timeout is ignored, and logged as such.
	logrus.Infof("ignoring message %v because we are waiting to be restarted", msg)

	switch msg.(type) {
	case *stopServicesRequest, *prepareForShutdownRequest, *convergeRequest:
		return w, false, api.ErrRestartPending
	}
	return w, false, UnknownMessageError(msg, w)
}
// String returns the name used for this state in log output.
func (w *waitingForRestart) String() string {
	return "waiting for restart"
}
// waitingForReconfigure is the state the converger is in while it
// expects a reconfiguration request from the Habitat supervisor. It
// times out after waitingForReconfigureTimeout.
type waitingForReconfigure struct {
	timeoutable
}

// newWaitingForReconfigure returns a waitingForReconfigure state whose
// timer has not been started yet.
func newWaitingForReconfigure() *waitingForReconfigure {
	w := new(waitingForReconfigure)
	w.timeoutable.timeout = waitingForReconfigureTimeout
	return w
}
// ProcessMessage refuses all work while a reconfigure is pending. A
// timeout sends the converger back to idle (asking for the timer to be
// stopped); user requests are logged and answered with
// api.ErrSelfReconfigurePending; anything else is answered with an
// UnknownMessageError. In the last two cases the state is unchanged.
func (w *waitingForReconfigure) ProcessMessage(c *converger, msg message) (state, bool, error) {
	if _, timedOut := msg.(*timeout); timedOut {
		logrus.Warnf("No restart occurred from reconfigure after %s, giving up", waitingForReconfigureTimeout)
		return &idle{}, true, nil
	}

	switch msg.(type) {
	case *stopServicesRequest, *prepareForShutdownRequest, *convergeRequest:
		logrus.Infof("ignoring message %v because we are waiting to be reconfigured", msg)
		return w, false, api.ErrSelfReconfigurePending
	}
	return w, false, UnknownMessageError(msg, w)
}
// String returns the name used for this state in log output.
func (w *waitingForReconfigure) String() string {
	return "waiting for reconfigure"
}
// timeoutable can be embedded in states that want to time out after a
// duration. The timer is created lazily, so the countdown starts at
// the first call to TimeoutChan rather than when the state is built.
type timeoutable struct {
	// timeout is how long the timer runs before it fires.
	timeout time.Duration
	// timer is nil until TimeoutChan has been called.
	timer *time.Timer
}

// TimeoutChan returns the channel on which the timeout is delivered,
// starting the timer on the first call. Later calls return the channel
// of the same timer; they do not restart it.
func (t *timeoutable) TimeoutChan() <-chan time.Time {
	if t.timer == nil {
		t.timer = time.NewTimer(t.timeout)
	}
	return t.timer.C
}

// StopTimer stops the timer if it was ever started; otherwise it is a
// no-op. It never blocks.
func (t *timeoutable) StopTimer() {
	if t.timer == nil {
		return
	}
	if !t.timer.Stop() {
		// Stop reports false when the timer has already fired. The
		// fired value may or may not still be sitting in the channel:
		// the converger's select loop normally receives it. Drain it
		// only if it is there; an unconditional receive would block
		// forever once the value has been consumed.
		select {
		case <-t.timer.C:
		default:
		}
	}
}
// Converger Message API Functions

// Converge asks the converger to try to reach desiredState. The
// request is queued, not executed, by this call: its result is
// delivered later on task.C. An error is returned if the backlog is
// full, in which case task.C is closed without a result.
func (converger *converger) Converge(task *Task, desiredState DesiredState, eventSink EventSink) error {
	req := &convergeRequest{
		eventSink:    eventSink,
		desiredState: desiredState, //TODO(jaym) make a copy
		doneChan:     task.C,
	}
	return converger.sendMessage(task, req)
}
// StopServices asks the converger to try to stop all non-deployment
// services on the given target. The request is queued by this call
// and its result is delivered later on t.C. An error is returned if
// the backlog is full.
func (converger *converger) StopServices(t *Task, tgt target.Target, eventSink EventSink) error {
	req := &stopServicesRequest{
		eventSink:    eventSink,
		desiredState: makeStoppedDesiredState(tgt), //TODO(jaym) make a copy
		doneChan:     t.C,
	}
	return converger.sendMessage(t, req)
}
// PrepareForShutdown asks the converger to get the system ready to
// shut down: all services are stopped and, when that succeeds from
// the idle state, the converger moves into waitingForRestart. The
// request is queued by this call and its result is delivered later on
// t.C. An error is returned if the backlog is full.
func (converger *converger) PrepareForShutdown(t *Task, tgt target.Target, eventSink EventSink) error {
	req := &prepareForShutdownRequest{
		eventSink:    eventSink,
		desiredState: makeStoppedDesiredState(tgt), //TODO(jaym) make a copy
		doneChan:     t.C,
	}
	return converger.sendMessage(t, req)
}
// makeStoppedDesiredState builds the DesiredState used by the stop
// requests. Its topology lists a single service for tgt, the
// deployment service, marked Skip(); the supervisor is skipped too and
// package garbage collection is disabled.
// NOTE(review): presumably the compiler stops the services that are
// absent from the topology -- confirm against the compiler.
func makeStoppedDesiredState(tgt target.Target) DesiredState {
	deploymentOnly := []Service{{
		Name:          deploymentServiceName,
		ConvergeState: Skip(),
	}}
	topology := Topology{tgt: deploymentOnly}

	return NewDesiredState(
		topology,
		NewSkipSupervisorState(),
		[]habpkg.HabPkg{}, // ignore-list is empty because cleanup mode is disabled
		depot.DisabledGC,
	)
}
// Stop signals the converger's run loop to exit. It stops the
// converger only, not the deployment-service or any other service.
func (converger *converger) Stop() {
	var signal struct{}
	converger.stop <- signal
}
// sendMessage queues msg on the inbox without blocking. When the
// inbox has no room the message is dropped, the task's channel (if a
// task was given) is closed without a result, and an error is
// returned.
func (converger *converger) sendMessage(t *Task, msg message) error {
	select {
	case converger.inbox <- msg:
		logrus.WithField("converger_inbox_len", len(converger.inbox)).Debugf("Sent %s request to converger", msg)
		return nil
	default:
	}

	// The inbox is full: wake up anybody waiting on the task.
	if t != nil {
		close(t.C)
	}
	return errors.New("Inbox full")
}
// Debug
//
// Our converger has a debug channel where we can send debug requests
// to query the current state of the converger. This is used in the
// test. An alternative approach here would be to add a mutex.
//
// NOTE(ssd) 2019-04-23: This currently does not do a deep copy of the
// states, so callers should take care not to mutate it!

// debugReq is a request sent over the debug channel. The run loop
// calls Do on its own goroutine, passing the converger.
type debugReq interface {
Do(*converger)
}
// debugResp is the reply to a debugGetReq.
type debugResp struct {
// state is the converger's current state at the time of the
// request (the state itself, not a copy).
state state
// inboxLen is the number of messages queued in the inbox at the
// time of the request.
inboxLen int
}
// debugGetReq asks the converger for a snapshot of its current state
// and inbox length.
type debugGetReq struct {
	// responseChan receives the snapshot.
	responseChan chan debugResp
}

// Do sends the snapshot on responseChan. The send blocks the run loop
// until it can proceed, so the channel should be buffered or have a
// reader waiting.
func (d debugGetReq) Do(c *converger) {
	snapshot := debugResp{
		state:    c.currentState,
		inboxLen: len(c.inbox),
	}
	d.responseChan <- snapshot
}
// debugSetReq forces the converger into newState.
type debugSetReq struct {
	// newState is the state to install.
	newState state
	// doneChan is closed once the state has been installed.
	doneChan chan struct{}
}

// Do logs the request, installs newState and then closes doneChan to
// tell the requestor that the change has been made.
func (d debugSetReq) Do(c *converger) {
	fields := logrus.Fields{
		"new_state": d.newState,
	}
	logrus.WithFields(fields).Debug("setting state via converger debug message")

	c.setState(d.newState)
	close(d.doneChan)
}
// debugGetState returns the converger's current state as seen by the
// run loop. It goes through the debug channel, so it blocks until the
// run loop picks the request up -- forever if the converger was
// started without WithDebugChannel, because the channel is then nil.
func (converger *converger) debugGetState() state {
	reply := make(chan debugResp, 1)
	converger.debug <- debugGetReq{responseChan: reply}
	return (<-reply).state
}
// debugSetState forces the converger into newState and returns once
// the run loop has made the change. It goes through the debug channel,
// so it blocks forever if the converger was started without
// WithDebugChannel, because the channel is then nil.
func (converger *converger) debugSetState(newState state) {
	ack := make(chan struct{})
	req := debugSetReq{
		newState: newState,
		doneChan: ack,
	}
	converger.debug <- req
	<-ack
}