-
Notifications
You must be signed in to change notification settings - Fork 7
/
containers.go
590 lines (468 loc) · 18.5 KB
/
containers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
// Package container allows to run and manage multiple containers across multiple hosts,
// by talking directly to the container runtime on local or remote hosts.
package container
import (
"fmt"
"reflect"
"github.com/google/go-cmp/cmp"
"sigs.k8s.io/yaml"
"github.com/flexkube/libflexkube/internal/util"
"github.com/flexkube/libflexkube/pkg/container/types"
)
// ContainersInterface represents capabilities of containers struct.
type ContainersInterface interface {
// CheckCurrentState iterates over containers defined in the state, checks if they exist, are
// running etc and writes to containers current state. This allows then to compare current state // of the containers with desired state, using Containers() method, to check if there are any
// pending changes to cluster configuration.
//
// Calling CheckCurrentState is required before calling Deploy(), to ensure, that Deploy() executes
// correct actions.
CheckCurrentState() error
// Deploy creates configured containers.
//
// CheckCurrentState() must be called before calling Deploy(), otherwise error will be returned.
Deploy() error
// StateToYaml converts resource's containers state into YAML format and returns it to the user,
// so it can be persisted, e.g. to the file.
StateToYaml() ([]byte, error)
// ToExported converts unexported containers struct into exported one, which can be then
// serialized and persisted.
ToExported() *Containers
// DesiredState returns desired state of configured containers.
//
// Desired state differs from
// exported or user-defined desired state, as it will have container IDs filled from the
// previous state.
//
// All returned containers will also have status set to running, as this is always the desired
// state of the container.
//
// Having those fields modified allows to minimize the difference when comparing previous state
// and desired state.
DesiredState() ContainersState
}
// Containers allow to orchestrate and update multiple containers spread
// across multiple hosts and update their configurations.
type Containers struct {
// PreviousState stores previous state of the containers, which should be obtained and persisted
// after containers modifications.
PreviousState ContainersState `json:"previousState,omitempty"`
// DesiredState is a user-defined desired containers configuration.
DesiredState ContainersState `json:"desiredState,omitempty"`
}
// containers is a validated version of the Containers, which allows user to perform operations on them
// like planning, getting status etc.
type containers struct {
// previousState is a previous state of the containers, given by user.
previousState containersState
// currentState stores current state of the containers. It is fed by calling Refresh() function.
currentState containersState
// resiredState is a user-defined desired containers configuration after validation.
desiredState containersState
}
// New validates Containers configuration and returns container object, which can be
// deployed.
func (c *Containers) New() (ContainersInterface, error) {
if err := c.Validate(); err != nil {
return nil, fmt.Errorf("failed to validate containers configuration: %w", err)
}
// Validate already checks for errors, so we can skip checking here.
previousState, _ := c.PreviousState.New()
desiredState, _ := c.DesiredState.New()
return &containers{
previousState: previousState.(containersState),
desiredState: desiredState.(containersState),
}, nil
}
// Validate validates Containers struct and all structs used underneath.
func (c *Containers) Validate() error {
var errors util.ValidateError
if c == nil {
errors = append(errors, fmt.Errorf("containers must be defined"))
return errors.Return()
}
if (c.PreviousState == nil && c.DesiredState == nil) || (len(c.PreviousState) == 0 && len(c.DesiredState) == 0) {
errors = append(errors, fmt.Errorf("either current state or desired state must be defined"))
}
if _, err := c.PreviousState.New(); err != nil {
errors = append(errors, fmt.Errorf("validating previous state failed: %w", err))
}
if _, err := c.DesiredState.New(); err != nil {
errors = append(errors, fmt.Errorf("validating desired state failed: %w", err))
}
return errors.Return()
}
// CheckCurrentState iterates over containers defined in the state, checks if they exist, are
// running etc and writes to containers current state. This allows then to compare current state
// of the containers with desired state, using Containers() method, to check if there are any
// pending changes to cluster configuration.
//
// Calling CheckCurrentState is required before calling Deploy(), to ensure, that Deploy() executes
// correct actions.
func (c *Containers) CheckCurrentState() error {
containers, err := c.New()
if err != nil {
return fmt.Errorf("failed creating containers configuration: %w", err)
}
if err := containers.CheckCurrentState(); err != nil {
return fmt.Errorf("failed checking current state of the containers: %w", err)
}
*c = *containers.ToExported()
return nil
}
// Deploy creates configured containers.
//
// CheckCurrentState() must be called before calling Deploy(), otherwise error will be returned.
func (c *Containers) Deploy() error {
containers, err := c.New()
if err != nil {
return err
}
// TODO Deploy shouldn't refresh the state. However, due to how we handle exported/unexported
// structs to enforce validation of objects, we lose current state, as we want it to be computed.
// On the other hand, maybe it's a good thing to call it once we execute. This way we could compare
// the plan user agreed to execute with plan calculated right before the execution and fail early if they
// differ.
// This is similar to what terraform is doing and may cause planning to run several times, so it may require
// some optimization.
// Alternatively we can have serializable plan and a knob in execute command to control whether we should
// make additional validation or not.
if err := containers.CheckCurrentState(); err != nil {
return err
}
if err := containers.Deploy(); err != nil {
return err
}
*c = *containers.ToExported()
return nil
}
// CheckCurrentState copies previous state to current state, to mark, that it has been called at least once
// and then updates state of all containers.
func (c *containers) CheckCurrentState() error {
if c.currentState == nil {
// We just assign the pointer, but it's fine, since we don't need previous
// state anyway.
// TODO we could keep previous state to inform user, that some external changes happened since last run
c.currentState = c.previousState
}
return c.currentState.CheckState()
}
// filesToUpdate returns list of files, which needs to be updated, based on the current state of the container.
// If the file is missing or it's content is not the same as desired content, it will be added to the list.
func filesToUpdate(d hostConfiguredContainer, c *hostConfiguredContainer) []string {
// If current state does not exist, just return all files.
if c == nil {
return util.KeysStringMap(d.configFiles)
}
files := []string{}
// Loop over desired config files and check if they exist.
for p, content := range d.configFiles {
if currentContent, exists := c.configFiles[p]; !exists || content != currentContent {
// TODO convert all prints to logging, so we can add more verbose information too
fmt.Printf("Detected configuration drift for file '%s'\n", p)
fmt.Printf(" current: \n%+v\n", currentContent)
fmt.Printf(" desired: \n%+v\n", content)
files = append(files, p)
}
}
return files
}
// ensureConfigured makes sure that all desired configuration files are correct.
func (c *containers) ensureConfigured(n string) error {
d := c.desiredState[n]
// Container won't be needed anyway, so skip everything.
if d == nil {
return nil
}
r := c.currentState[n]
f := filesToUpdate(*d, r)
if len(f) == 0 {
return nil
}
err := d.Configure(f)
if err != nil && reflect.DeepEqual(f, filesToUpdate(*d, r)) {
return err
}
// If current state does not exist, simply replace it with desired state.
if r == nil {
c.currentState[n] = d
r = d
}
// Update current state config files map.
r.configFiles = d.configFiles
return err
}
// ensureRunning makes sure that given container is running.
func ensureRunning(c *hostConfiguredContainer) error {
if c == nil {
return fmt.Errorf("can't start non-existing container")
}
if c.container.Status().Running() {
return nil
}
return c.Start()
}
func (c *containers) ensureExists(n string) error {
r := c.currentState[n]
if r != nil && r.container.Status().Exists() {
return nil
}
fmt.Printf("Creating new container '%s'\n", n)
d := c.desiredState[n]
err := c.desiredState.CreateAndStart(n)
// Container creation failed and it does not exist, meaning state is clean.
if err != nil && !d.container.Status().Exists() {
return err
}
// Even if CreateAndStart failed, update current state. This makes the process more robust,
// for example when creation succeeded (so container ID got assigned), but starting failed (as
// for example requested port is already in use), so on the next run, engine won't try to create
// the container again (as that would fail, because container of the same name already exists).
// If current state does not exist, simply replace it with desired state.
if r == nil {
c.currentState[n] = d
r = d
}
// After new container is created, add it to current state, so it can be returned to the user.
*r.container.Status() = *d.container.Status()
return err
}
// isUpdatable determines if given container can be updated.
func (c *containers) isUpdatable(n string) error {
// Container which currently does not exist can't be updated, only created.
if _, ok := c.currentState[n]; !ok {
return fmt.Errorf("can't update non-existing container '%s'", n)
}
// Container which is suppose to be removed shouldn't be updated.
if _, ok := c.desiredState[n]; !ok {
return fmt.Errorf("can't update container '%s', which is scheduler for removal", n)
}
return nil
}
// diffHost compares host fields of the container and returns it's diff.
//
// If the container cannot be updated, error is returned.
func (c *containers) diffHost(n string) (string, error) {
if err := c.isUpdatable(n); err != nil {
return "", fmt.Errorf("can't diff container: %w", err)
}
return cmp.Diff(c.currentState[n].host, c.desiredState[n].host), nil
}
// recreate is a helper, which removes container from current state and creates new one from
// desired state.
func (c *containers) recreate(n string) error {
if err := c.currentState.RemoveContainer(n); err != nil {
return fmt.Errorf("failed removing old container: %w", err)
}
return c.desiredState.CreateAndStart(n)
}
// ensureHost makes sure container is running on the right host.
//
// If host configuration changes, existing container will be removed and new one will be created.
//
// TODO This might be an overkill. e.g. changing SSH key for deployment will re-create all containers.
func (c *containers) ensureHost(n string) error {
diff, err := c.diffHost(n)
if err != nil {
return fmt.Errorf("failed to check host diff: %w", err)
}
if diff == "" {
return nil
}
fmt.Printf("Detected host configuration drift '%s'\n", n)
fmt.Printf(" Diff: %v\n", util.ColorizeDiff(diff))
// recreate is 2 step process, it removes old container and creates new one.
// If process fails in the middle, we still want to save the progress.
defer func() {
c.currentState[n] = c.desiredState[n]
}()
return c.recreate(n)
}
// diffContainer compares container fields of the container and returns it's diff.
//
// If the container cannot be updated, error is returned.
func (c *containers) diffContainer(n string) (string, error) {
if err := c.isUpdatable(n); err != nil {
return "", fmt.Errorf("can't diff container: %w", err)
}
cd := cmp.Diff(c.currentState[n].container.Config(), c.desiredState[n].container.Config())
rcd := cmp.Diff(c.currentState[n].container.RuntimeConfig(), c.desiredState[n].container.RuntimeConfig())
return cd + rcd, nil
}
// ensureContainer makes sure container configuration is up to date.
//
// If container configuration changes, existing container will be removed and new one will be created.
func (c *containers) ensureContainer(n string) error {
diff, err := c.diffContainer(n)
if err != nil {
return fmt.Errorf("failed to check container diff: %w", err)
}
if diff == "" {
return nil
}
fmt.Printf("Detected container configuration drift '%s'\n", n)
fmt.Printf(" Diff: %v\n", util.ColorizeDiff(diff))
// Reconfiguring container is 2 step process. If we fail in the middle, we still want to
// return updated state to the user.
defer func() {
c.currentState[n] = c.desiredState[n]
}()
return c.recreate(n)
}
// hasUpdates return bool if there are any pending configuration changes to the container.
func (c *containers) hasUpdates(n string) (bool, error) {
diffHost, err := c.diffHost(n)
if err != nil {
return false, fmt.Errorf("failed to check host diff: %w", err)
}
f := filesToUpdate(*c.desiredState[n], c.currentState[n])
diffContainer, err := c.diffContainer(n)
if err != nil {
return false, fmt.Errorf("failed to check container diff: %w", err)
}
return diffHost != "" || len(f) != 0 || diffContainer != "", nil
}
func (c *containers) ensureCurrentContainer(n string, r hostConfiguredContainer) (hostConfiguredContainer, error) {
// Gather facts about the container..
exists := r.container.Status().Exists()
_, isDesired := c.desiredState[n]
hasUpdates := false
if err := c.isUpdatable(n); err == nil {
// If container is updatable, check if it has any updates pending.
u, err := c.hasUpdates(n)
if err != nil {
return r, fmt.Errorf("failed checking if container has pending updates: %w", err)
}
hasUpdates = u
}
// Container is gone, remove it from current state, so it will be scheduled for recreation.
if !exists {
delete(c.currentState, n)
}
// If container exist, is desired or has no pending updates, make sure it's running.
if exists && isDesired && !hasUpdates {
return r, ensureRunning(&r)
}
return r, nil
}
// ensureNewContainer handles configuring and creating new containers.
func (c *containers) ensureNewContainer(i string) error {
if err := c.ensureConfigured(i); err != nil {
return fmt.Errorf("failed configuring container %s: %w", i, err)
}
if err := c.ensureExists(i); err != nil {
return fmt.Errorf("failed creating new container %s: %w", i, err)
}
return nil
}
func (c *containers) ensureUpToDate(i string) error {
// Update containers on hosts.
// This can move containers between hosts, but NOT the data.
if err := c.ensureHost(i); err != nil {
return fmt.Errorf("failed updating host configuration of container %s: %w", i, err)
}
if err := c.ensureConfigured(i); err != nil {
return fmt.Errorf("failed updating configuration for container %s: %w", i, err)
}
if err := c.ensureContainer(i); err != nil {
return fmt.Errorf("failed updating container %s: %w", i, err)
}
return nil
}
// updateExistingContainer handles updating existing containers. It either removes them
// if they are not needed anymore or makes sure that their configuration is up to date.
func (c *containers) updateExistingContainers() error {
for i := range c.currentState {
if _, exists := c.desiredState[i]; !exists {
if err := c.currentState.RemoveContainer(i); err != nil {
return fmt.Errorf("failed removing old container: %w", err)
}
continue
}
if err := c.ensureUpToDate(i); err != nil {
return fmt.Errorf("failed ensuring, that container %s is up to date: %w", i, err)
}
}
return nil
}
// Deploy checks for containers configuration drifts and tries to reach desired state.
//
// TODO we should break down this function into smaller functions
// TODO add planning, so it is possible to inspect what will be done
// TODO currently we only compare previous configuration with new configuration.
// We should also read runtime parameters and confirm that everything is according
// to the spec.
func (c *containers) Deploy() error {
if c.currentState == nil {
return fmt.Errorf("can't execute without knowing current state of the containers")
}
fmt.Println("Checking for stopped and missing containers")
for n, r := range c.currentState {
d, err := c.ensureCurrentContainer(n, *r)
c.currentState[n] = &d
if err != nil {
return fmt.Errorf("failed to handle existing container %s: %w", n, err)
}
}
fmt.Println("Configuring and creating new containers")
for i := range c.desiredState {
if err := c.ensureNewContainer(i); err != nil {
return fmt.Errorf("failed creating new container %s: %w", i, err)
}
}
fmt.Println("Updating existing containers")
return c.updateExistingContainers()
}
// FromYaml allows to load containers configuration and state from YAML format.
func FromYaml(c []byte) (ContainersInterface, error) {
containers := &Containers{}
if err := yaml.Unmarshal(c, &containers); err != nil {
return nil, fmt.Errorf("failed to parse input yaml: %w", err)
}
cl, err := containers.New()
if err != nil {
return nil, fmt.Errorf("failed to create containers object: %w", err)
}
return cl, nil
}
// StateToYaml dumps current state as previousState in exported format,
// which can be serialized and stored.
func (c *containers) StateToYaml() ([]byte, error) {
containers := &Containers{
PreviousState: c.previousState.Export(),
}
return yaml.Marshal(containers)
}
// ToExported converts containers struct to exported Containers.
func (c *containers) ToExported() *Containers {
return &Containers{
PreviousState: c.previousState.Export(),
DesiredState: c.desiredState.Export(),
}
}
// DesiredState returns desired state enhanced with current state, to highlight
// important configuration changes from user perspective.
func (c *containers) DesiredState() ContainersState {
d := c.desiredState.Export()
for h := range d {
// If container already exist, append it's ID to desired state to reduce the diff.
id := ""
cs, ok := c.previousState[h]
if ok && cs.container.Status().ID != "" {
id = cs.container.Status().ID
}
// Make sure, that desired state has correct status. Container should always be running
// and optionally, we also set the ID of already existing container. If there are changes
// to the container, it will get new ID anyway, but user does not care about this change,
// so we can hide it this way from the diff.
d[h].Container.Status = &types.ContainerStatus{
Status: "running",
ID: id,
}
}
return d
}
// Containers implement types.Resource interface.
func (c *containers) Containers() ContainersInterface {
return c
}