forked from spotahome/kooper
/
operator.go
172 lines (147 loc) · 4.15 KB
/
operator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package operator
import (
"fmt"
"sync"
"time"
"golang.org/x/sync/errgroup"
"github.com/spotahome/kooper/log"
"github.com/spotahome/kooper/operator/controller"
"github.com/spotahome/kooper/operator/resource"
)
// tiemoutInitialization is the maximum time Initialize waits for the CRDs to
// finish initializing before it gives up with a timeout error.
const tiemoutInitialization = time.Minute
// Operator is a controller with an extra initialization step. At the code level
// it honours almost the same contract as a controller, but at a higher level it
// needs to initialize some resources (usually CRDs) before it starts running.
type Operator interface {
	controller.Controller

	// Initialize knows how to initialize the resources.
	Initialize() error
}
// simpleOperator is an Operator implementation that initializes its CRDs
// before it starts executing its controllers.
type simpleOperator struct {
	// Resources and controllers managed by the operator.
	crds        []resource.CRD
	controllers []controller.Controller
	logger      log.Logger

	// stateMu guards the initialized and running flags.
	stateMu     sync.Mutex
	initialized bool
	running     bool
}
// NewOperator returns an operator that manages exactly one CRD and one
// controller. It is a convenience wrapper around NewMultiOperator.
func NewOperator(crd resource.CRD, ctrlr controller.Controller, logger log.Logger) Operator {
	singleCRD := []resource.CRD{crd}
	singleCtrlr := []controller.Controller{ctrlr}
	return NewMultiOperator(singleCRD, singleCtrlr, logger)
}
// NewMultiOperator returns an operator that manages multiple CRDs and multiple
// controllers. The received slices are stored as given, they are not copied.
func NewMultiOperator(crds []resource.CRD, ctrlrs []controller.Controller, logger log.Logger) Operator {
	op := new(simpleOperator)
	op.crds = crds
	op.controllers = ctrlrs
	op.logger = logger
	return op
}
// Initialize initializes all the CRDs concurrently and returns when all of them
// have finished or when the initialization timeout expires, whichever happens
// first. It is a no-op when the operator is already initialized. Satisfies
// Operator interface.
//
// It returns the error reported by the CRD initialization group, or a timeout
// error when the CRDs are not ready within tiemoutInitialization.
func (s *simpleOperator) Initialize() error {
	if s.isInitialized() {
		return nil
	}

	// Initialize CRDs, each one in its own goroutine.
	var g errgroup.Group
	for _, crd := range s.crds {
		crd := crd // Per-iteration copy captured by the closure.
		g.Go(func() error {
			return crd.Initialize()
		})
	}

	// Buffered so the waiting goroutine can always deliver its result and exit,
	// even when the timeout below fires first and nobody receives anymore.
	errC := make(chan error, 1)
	go func() {
		errC <- g.Wait()
	}()

	// Explicit timer so it is released as soon as the initialization ends
	// instead of lingering until the timeout elapses.
	timeout := time.NewTimer(tiemoutInitialization)
	defer timeout.Stop()

	// Wait until everything is initialized.
	select {
	case err := <-errC:
		if err != nil {
			return err
		}
	case <-timeout.C:
		return fmt.Errorf("timeout initializing operator")
	}

	// All ok, we are ready to run.
	s.logger.Infof("operator initialized")
	s.setInitialized(true)
	return nil
}
// Run initializes the operator (when needed) and runs all its controllers. It's
// a blocking operation that returns when a stop signal is received on stopC or
// when any of the controllers ends its execution. Satisfies Operator interface.
//
// It returns an error when the operator is already running, when the
// initialization fails or when a controller ends with an error. The client that
// uses an operator has the responsibility of closing the stop channel if the
// operator ends execution unexpectedly so all the goroutines (controllers
// running) end their execution.
func (s *simpleOperator) Run(stopC <-chan struct{}) error {
	// Check and set the running flag inside a single critical section so two
	// concurrent calls can't both pass the "already running" check.
	s.stateMu.Lock()
	if s.running {
		s.stateMu.Unlock()
		return fmt.Errorf("operator already running")
	}
	s.running = true
	s.stateMu.Unlock()
	defer s.setRunning(false)

	s.logger.Infof("starting operator")

	if err := s.Initialize(); err != nil {
		return err
	}

	// Buffered so the goroutine can always hand over its result and finish, even
	// when the stop signal wins the select below and nobody receives anymore.
	errC := make(chan error, 1)
	go func() {
		errC <- s.runAllControllers(stopC)
	}()

	// When stop signal is received we must stop or when an error is received from
	// one of the controllers.
	select {
	case err := <-errC:
		if err != nil {
			return err
		}
	case <-stopC:
	}

	s.logger.Infof("stopping operator")
	return nil
}
// runAllControllers starts every controller in its own goroutine and blocks
// until a stop signal is received on stopC or any of the controllers ends.
//
// It returns nil on the stop signal, and also when a controller ends without
// error before the stop signal (that case is only logged as a warning). When a
// controller ends with an error, an error describing that failure is returned.
func (s *simpleOperator) runAllControllers(stopC <-chan struct{}) error {
	// One slot per controller: only the first result is consumed below, so the
	// buffer lets the remaining controllers deliver theirs and exit instead of
	// blocking forever on the send.
	errC := make(chan error, len(s.controllers))
	for _, ctrl := range s.controllers {
		ctrl := ctrl // Per-iteration copy captured by the closure.
		go func() {
			errC <- ctrl.Run(stopC)
		}()
	}

	// Wait until any of the controllers ends execution. All the controllers
	// should keep executing, so the first result received (error or not) makes
	// this function return.
	select {
	case <-stopC:
		return nil
	case err := <-errC:
		if err != nil {
			return fmt.Errorf("a controller ended with an error: %s", err)
		}
		s.logger.Warningf("a controller stopped execution without error before operator received stop signal, stopping operator.")
		return nil
	}
}
// isInitialized reports the initialized flag, read while holding stateMu.
func (s *simpleOperator) isInitialized() bool {
	s.stateMu.Lock()
	initialized := s.initialized
	s.stateMu.Unlock()
	return initialized
}
// setInitialized stores value in the initialized flag while holding stateMu.
func (s *simpleOperator) setInitialized(value bool) {
	s.stateMu.Lock()
	s.initialized = value
	s.stateMu.Unlock()
}
// isRunning reports the running flag, read while holding stateMu.
func (s *simpleOperator) isRunning() bool {
	s.stateMu.Lock()
	running := s.running
	s.stateMu.Unlock()
	return running
}
// setRunning stores value in the running flag while holding stateMu.
func (s *simpleOperator) setRunning(value bool) {
	s.stateMu.Lock()
	s.running = value
	s.stateMu.Unlock()
}