This repository has been archived by the owner on Apr 29, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
/
ensure.go
83 lines (74 loc) · 2.6 KB
/
ensure.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package cs
import (
"github.com/pkg/errors"
"github.com/caos/orbos/internal/api"
"github.com/caos/orbos/internal/helpers"
"github.com/caos/orbos/internal/operator/common"
"github.com/caos/orbos/internal/operator/orbiter"
"github.com/caos/orbos/internal/operator/orbiter/kinds/clusters/core/infra"
dynamiclbmodel "github.com/caos/orbos/internal/operator/orbiter/kinds/loadbalancers/dynamic"
"github.com/caos/orbos/internal/operator/orbiter/kinds/loadbalancers/dynamic/wrap"
"github.com/caos/orbos/internal/operator/orbiter/kinds/providers/core"
)
// query inspects the provider's current state and assembles the function that
// converges it towards the desired state.
//
// It type-asserts lb to *dynamiclbmodel.Current, derives host pools and auth
// checks from it, and prepares the floating IP, server and node agent steps.
// As a side effect it installs an onCreate hook on context.machinesService so
// that newly created machines are passed through ensureServer, and it wraps
// the machines service with the load balancer's VRRP configuration.
//
// The returned EnsureFunc runs the token, floating IP, server and
// desired-node-agent steps through helpers.Fanout and reports, via
// orbiter.ToEnsureResult, whether InitializeDesiredNodeAgents signalled
// completion together with any error from those steps. The returned error is
// the first failure hit while querying, otherwise the result of addPools.
//
// nodeAgentsDesired and the EnsureFunc's PushDesiredFunc argument are accepted
// but not used by this function.
//
// query panics if lb is not a *dynamiclbmodel.Current.
func query(
	desired *Spec,
	current *Current,
	lb interface{},
	context *context,
	nodeAgentsCurrent *common.CurrentNodeAgents,
	nodeAgentsDesired *common.DesiredNodeAgents,
	naFuncs core.IterateNodeAgentFuncs,
	orbiterCommit string,
) (ensureFunc orbiter.EnsureFunc, err error) {

	// Only the dynamic load balancer model is handled here; any other type
	// aborts with a panic rather than an error return.
	lbCurrent, ok := lb.(*dynamiclbmodel.Current)
	if !ok {
		panic(errors.Errorf("Unknown or unsupported load balancing of type %T", lb))
	}

	hostPools, authChecks, err := lbCurrent.Current.Spec(context.machinesService)
	if err != nil {
		return nil, err
	}

	ensureFIPs, removeFIPs, poolsWithUnassignedVIPs, err := queryFloatingIPs(context, hostPools, current)
	if err != nil {
		return nil, err
	}

	queryNA, installNA := naFuncs(nodeAgentsCurrent)
	// ensureNodeAgent installs the node agent on m only when queryNA reports
	// that it is not already running for orbiterCommit.
	ensureNodeAgent := func(m infra.Machine) error {
		running, err := queryNA(m, orbiterCommit)
		if err != nil {
			return err
		}
		if !running {
			return installNA(m)
		}
		return nil
	}

	ensureServers, err := queryServers(context, current, hostPools, ensureNodeAgent)
	if err != nil {
		return nil, err
	}

	// Machines created later through the machines service get the same server
	// treatment as the ones queried above.
	context.machinesService.onCreate = func(pool string, m infra.Machine) error {
		return ensureServer(context, current, hostPools, pool, m.(*machine), ensureNodeAgent)
	}

	wrappedMachines := wrap.MachinesService(context.machinesService, *lbCurrent, &dynamiclbmodel.VRRP{
		// NOTE(review): the VRRP interface name is hard-coded; presumably it
		// matches the private network interface on the machines — confirm.
		VRRPInterface: "eth1",
		NotifyMaster:  notifyMaster(hostPools, current, poolsWithUnassignedVIPs),
		AuthCheck:     checkAuth,
	}, desiredToCurrentVIP(current))

	return func(pdf api.PushDesiredFunc) *orbiter.EnsureResult {
		var done bool

		// Run all steps in a separate statement BEFORE reading done. The Go
		// spec does not define the order of a plain variable read relative to
		// a function call within the same argument list, so passing done and
		// the fan-out call together could hand a stale false to
		// ToEnsureResult.
		// NOTE(review): this assumes the function returned by helpers.Fanout
		// only returns after every step has finished — confirm.
		ensureErr := helpers.Fanout([]func() error{
			func() error {
				return helpers.Fanout(ensureTokens(context.monitor, []byte(desired.APIToken.Value), authChecks))()
			},
			func() error { return helpers.Fanout(ensureFIPs)() },
			func() error { return helpers.Fanout(removeFIPs)() },
			func() error { return helpers.Fanout(ensureServers)() },
			func() error {
				var err error
				done, err = wrappedMachines.InitializeDesiredNodeAgents()
				return err
			},
		})()

		return orbiter.ToEnsureResult(done, ensureErr)
	}, addPools(current, desired, wrappedMachines)
}