This repository has been archived by the owner on Apr 29, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
/
ensure.go
123 lines (104 loc) · 3.32 KB
/
ensure.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package static
import (
"sync"
"github.com/caos/orbos/internal/api"
"github.com/caos/orbos/internal/helpers"
"github.com/caos/orbos/internal/operator/orbiter/kinds/loadbalancers/dynamic/wrap"
"github.com/caos/orbos/internal/operator/orbiter/kinds/providers/core"
"github.com/pkg/errors"
"github.com/caos/orbos/internal/operator/common"
"github.com/caos/orbos/internal/operator/orbiter"
"github.com/caos/orbos/internal/operator/orbiter/kinds/clusters/core/infra"
dynamiclbmodel "github.com/caos/orbos/internal/operator/orbiter/kinds/loadbalancers/dynamic"
"github.com/caos/orbos/mntr"
)
// query prepares the ensure step of the static provider.
//
// It registers a per-machine ensure routine as the onCreate hook of
// internalMachinesService, lists the pools, resets current.Current.Ingresses
// and fills it from the VIPs of the passed load balancer, and finally returns
// the function that ensures all machines of all pools.
//
// lb must be a *dynamiclbmodel.Current; any other type yields an error.
//
// The second return value is whatever addPools returns for the (possibly
// wrapped) machines service.
func query(
	desired *DesiredV0,
	current *Current,
	nodeAgentsDesired *common.DesiredNodeAgents,
	nodeAgentsCurrent *common.CurrentNodeAgents,
	lb interface{},

	monitor mntr.Monitor,
	internalMachinesService *machinesService,
	naFuncs core.IterateNodeAgentFuncs,
	orbiterCommit string,
) (ensureFunc orbiter.EnsureFunc, err error) {

	// TODO: Allow Changes
	desireHostnameFunc := desireHostname(desired.Spec.Pools, nodeAgentsDesired, nodeAgentsCurrent, monitor)

	queryNA, installNA := naFuncs(nodeAgentsCurrent)

	// ensureNodeFunc installs the node agent on a machine if queryNA reports
	// it as not running, then desires the machine's hostname.
	ensureNodeFunc := func(machine infra.Machine, pool string) error {
		running, err := queryNA(machine, orbiterCommit)
		if err != nil {
			return err
		}
		if !running {
			if err := installNA(machine); err != nil {
				return err
			}
		}
		_, err = desireHostnameFunc(machine, pool)
		return err
	}
	internalMachinesService.onCreate = ensureNodeFunc

	var externalMachinesService core.MachinesService = internalMachinesService

	pools, err := internalMachinesService.ListPools()
	if err != nil {
		return nil, err
	}

	current.Current.Ingresses = make(map[string]*infra.Address)

	// Default load balancer step: nothing to do, report done.
	ensureLBFunc := func() *orbiter.EnsureResult {
		return &orbiter.EnsureResult{
			Err:  nil,
			Done: true,
		}
	}

	switch lbCurrent := lb.(type) {
	case *dynamiclbmodel.Current:

		mapVIP := func(vip *dynamiclbmodel.VIP) string {
			return vip.IP
		}

		wrappedMachinesService := wrap.MachinesService(internalMachinesService, *lbCurrent, true, nil, mapVIP)
		externalMachinesService = wrappedMachinesService
		ensureLBFunc = func() *orbiter.EnsureResult {
			return orbiter.ToEnsureResult(wrappedMachinesService.InitializeDesiredNodeAgents())
		}

		// Publish one ingress address per transport of every VIP.
		for _, pool := range lbCurrent.Current.Spec {
			for _, vip := range pool {
				for _, src := range vip.Transport {
					current.Current.Ingresses[src.Name] = &infra.Address{
						Location:     vip.IP,
						FrontendPort: uint16(src.FrontendPort),
						BackendPort:  uint16(src.BackendPort),
					}
				}
			}
		}
		//	case *externallbmodel.Current:
		//		for name, address := range lbCurrent.Current.Addresses {
		//			current.Current.Ingresses[name] = address
		//		}
	default:
		return nil, errors.Errorf("Unknown load balancer of type %T", lb)
	}

	return func(pdf api.PushDesiredFunc) *orbiter.EnsureResult {
		// Errors are collected in a variable local to this invocation so that
		// a failure in one run does not leak into later runs, and every write
		// is guarded by a mutex because the machines are ensured concurrently.
		var (
			wg        sync.WaitGroup
			mux       sync.Mutex
			ensureErr error
		)

		collect := func(e error) {
			if e == nil {
				return
			}
			mux.Lock()
			defer mux.Unlock()
			ensureErr = helpers.Concat(ensureErr, e)
		}

		for _, pool := range pools {
			machines, listErr := internalMachinesService.List(pool)
			// A failed listing is recorded, but the remaining pools are still processed.
			collect(listErr)

			for _, machine := range machines {
				wg.Add(1)
				go func(m infra.Machine, p string) {
					// Deferred so the WaitGroup is released even if ensuring panics.
					defer wg.Done()
					collect(ensureNodeFunc(m, p))
				}(machine, pool)
			}
		}

		wg.Wait()
		if ensureErr != nil {
			return orbiter.ToEnsureResult(false, ensureErr)
		}

		return ensureLBFunc()
	}, addPools(current, desired, externalMachinesService)
}