forked from coreos/fleet
/
registrymux.go
347 lines (300 loc) · 10.4 KB
/
registrymux.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
// Copyright 2016 The fleet Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package rpc
import (
"errors"
"fmt"
"net"
"sync"
"time"
"github.com/coreos/go-semver/semver"
"github.com/coreos/fleet/engine"
"github.com/coreos/fleet/job"
"github.com/coreos/fleet/log"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/pkg/lease"
"github.com/coreos/fleet/registry"
"github.com/coreos/fleet/unit"
)
// RegistryMux routes registry calls either to etcd or to a gRPC-based
// registry. Unit-related calls go through getRegistry (the currently selected
// registry); machine-state, machine-metadata and engine/daemon-version calls
// always go straight to the etcd registry.
type RegistryMux struct {
	// etcdRegistry handles the etcd-only calls and is what getRegistry
	// returns when no registry has been selected yet.
	etcdRegistry *registry.EtcdRegistry
	// localMachine is this host; EngineChanged checks its gRPC capability and
	// compares its ID with the new engine leader's ID.
	localMachine machine.Machine
	// rpcserver is the local gRPC server started by EngineChanged when this
	// machine is the engine leader; nil until one is started.
	rpcserver *rpcserver
	// currentRegistry is the registry getRegistry returns (nil means
	// etcdRegistry). getRegistry reads it under handlingEngineChange.
	currentRegistry registry.Registry
	// rpcRegistry is the gRPC registry client, reused while its connection
	// reports ready.
	rpcRegistry *RPCRegistry
	// currentEngine is the engine leader whose PublicIP the gRPC dialers
	// connect to.
	currentEngine machine.MachineState
	// leaseManager is used by rpcDialerNoEngine to read the engine-leader
	// lease.
	leaseManager lease.Manager
	// handlingEngineChange is held for writing by EngineChanged and for
	// reading by getRegistry.
	handlingEngineChange *sync.RWMutex
}
const (
	// dialRegistryReconnectTimeout is the pause between consecutive TCP
	// dial attempts made by the gRPC dialers.
	dialRegistryReconnectTimeout time.Duration = 200 * time.Millisecond

	// engineLeaderKeyPath is the lease name looked up to find the current
	// engine leader.
	engineLeaderKeyPath = "engine-leader"
)
// NewRegistryMux returns a RegistryMux that has no registry selected yet, so
// getRegistry serves every delegated call from etcdRegistry until one is
// chosen. localMachine is consulted on engine changes (gRPC capability and
// identity) and leaseManager is used to look up the engine leader.
func NewRegistryMux(etcdRegistry *registry.EtcdRegistry, localMachine machine.Machine, leaseManager lease.Manager) *RegistryMux {
	mux := new(RegistryMux)
	mux.etcdRegistry = etcdRegistry
	mux.localMachine = localMachine
	mux.leaseManager = leaseManager
	mux.handlingEngineChange = &sync.RWMutex{}
	return mux
}
// ConnectToRegistry lets a fleet agent running with the engine disabled adapt
// its registry to engine-leader changes, whether the new leader is etcd- or
// gRPC-based.
//
// Every 5 seconds it asks e whether the leader is gRPC-based:
//   - gRPC leader: reuse the existing gRPC registry if its connection is
//     ready; otherwise close it and connect a new one that dials through
//     rpcDialerNoEngine.
//   - etcd leader: switch to the etcd registry and close any gRPC registry.
//   - lookup error: keep the current registry and try again next round.
//
// currentRegistry and rpcRegistry are only assigned while holding
// handlingEngineChange, the lock getRegistry takes to read currentRegistry.
// The loop never returns.
func (r *RegistryMux) ConnectToRegistry(e *engine.Engine) {
	for {
		isGrpc, err := e.IsGrpcLeader()
		switch {
		case err != nil:
			// Leader state unknown: keep the current registry and wait.
		case isGrpc:
			// NOTE(review): r.rpcRegistry is read here without the lock; this
			// assumes ConnectToRegistry is its only writer in this mode — confirm.
			if r.rpcRegistry != nil && r.rpcRegistry.IsRegistryReady() {
				log.Infof("Reusing gRPC engine, connection is READY\n")
				r.handlingEngineChange.Lock()
				r.currentRegistry = r.rpcRegistry
				r.handlingEngineChange.Unlock()
				break
			}
			if r.rpcRegistry != nil {
				r.rpcRegistry.Close()
			}
			log.Infof("New engine supports gRPC, connecting\n")
			fresh := NewRPCRegistry(r.rpcDialerNoEngine)
			// Connect without holding the lock so getRegistry callers are not
			// blocked while the dialer is retrying.
			fresh.Connect()
			r.handlingEngineChange.Lock()
			r.rpcRegistry = fresh
			r.currentRegistry = fresh
			r.handlingEngineChange.Unlock()
		default:
			// New leader is etcd-based. Swap first so readers never pick up a
			// closed registry, and clear rpcRegistry so later rounds do not
			// close the same connection again.
			r.handlingEngineChange.Lock()
			stale := r.rpcRegistry
			r.rpcRegistry = nil
			r.currentRegistry = r.etcdRegistry
			r.handlingEngineChange.Unlock()
			if stale != nil {
				stale.Close()
			}
		}
		time.Sleep(5 * time.Second)
	}
}
// rpcDialerNoEngine is the gRPC dialer used by ConnectToRegistry. Every
// dialRegistryReconnectTimeout it tries to open a TCP connection to
// r.currentEngine on rpcServerPort. Whenever a leader check comes due (5
// seconds after start or after finding the leader, 2 seconds after a failed
// lookup) it reads the engine-leader lease. If the lease holder is among the
// cluster machines, it becomes r.currentEngine.
//
// The timeout requested by the caller is ignored; the dialer keeps retrying
// until a connection succeeds. It returns an error only when the cluster
// machines cannot be listed, or when the leader lacks gRPC support.
func (r *RegistryMux) rpcDialerNoEngine(_ string, _ time.Duration) (net.Conn, error) {
	const (
		leaderCheckInterval = 5 * time.Second
		leaderRetryInterval = 2 * time.Second
	)

	// Use a stoppable ticker: time.Tick never releases its ticker, which
	// leaked one per call of this dialer.
	ticker := time.NewTicker(dialRegistryReconnectTimeout)
	defer ticker.Stop()

	check := time.After(leaderCheckInterval)
	for {
		select {
		case <-check:
			log.Errorf("Unable to connect to engine %s\n", r.currentEngine.PublicIP)
			// Get the new engine leader of the cluster out of etcd.
			leaderLease, err := r.leaseManager.GetLease(engineLeaderKeyPath)
			if err != nil || leaderLease == nil {
				log.Errorf("Unable to get the leader engine, retrying in %v...", leaderRetryInterval)
				check = time.After(leaderRetryInterval)
				continue
			}
			machines, err := r.etcdRegistry.Machines()
			if err != nil {
				log.Errorf("Unable to get the machines of the cluster %v\n", err)
				return nil, errors.New("Unable to get the machines of the cluster")
			}
			leaderID := leaderLease.MachineID()
			found := false
			for _, ms := range machines {
				if ms.ID != leaderID {
					continue
				}
				// New leader has not gRPC capabilities enabled.
				if !ms.Capabilities.Has(machine.CapGRPC) {
					log.Error("New leader engine has not gRPC enabled!")
					return nil, errors.New("New leader engine has not gRPC enabled!")
				}
				r.currentEngine = ms
				log.Infof("Found a new engine to connect to: %s\n", r.currentEngine.PublicIP)
				found = true
				break
			}
			// Always re-arm the check: time.After fires only once, so leaving
			// it unset would stop all further leader lookups and keep dialing
			// a stale address forever.
			if found {
				check = time.After(leaderCheckInterval)
			} else {
				log.Errorf("Leader engine %s is not among the cluster machines, retrying in %v...", leaderID, leaderRetryInterval)
				check = time.After(leaderRetryInterval)
			}
		case <-ticker.C:
			addr := fmt.Sprintf("%s:%d", r.currentEngine.PublicIP, rpcServerPort)
			conn, err := net.Dial("tcp", addr)
			if err == nil {
				log.Infof("Connected to engine on %s\n", r.currentEngine.PublicIP)
				return conn, nil
			}
			log.Errorf("Retry to connect to new engine: %+v", err)
		}
	}
}
// rpcDialer is the gRPC dialer installed by EngineChanged. Every
// dialRegistryReconnectTimeout it tries to open a TCP connection to
// r.currentEngine on rpcServerPort. It gives up with an error once timeout
// has elapsed without a successful connection.
func (r *RegistryMux) rpcDialer(_ string, timeout time.Duration) (net.Conn, error) {
	// Use a stoppable ticker: time.Tick never releases its ticker, which
	// leaked one per call of this dialer.
	ticker := time.NewTicker(dialRegistryReconnectTimeout)
	defer ticker.Stop()

	alert := time.After(timeout)
	for {
		select {
		case <-alert:
			log.Errorf("Unable to connect to engine %s\n", r.currentEngine.PublicIP)
			return nil, errors.New("Unable to connect to new engine, the client connection is closing")
		case <-ticker.C:
			addr := fmt.Sprintf("%s:%d", r.currentEngine.PublicIP, rpcServerPort)
			conn, err := net.Dial("tcp", addr)
			if err == nil {
				log.Infof("Connected to engine on %s\n", r.currentEngine.PublicIP)
				return conn, nil
			}
			log.Errorf("Retry to connect to new engine: %+v", err)
		}
	}
}
// EngineChanged reacts to a change of the cluster's engine leader.
//
// It records newEngine as the current engine. If the local machine lacks
// gRPC support, it simply uses the etcd registry. Otherwise it:
//   - stops the local gRPC server if the leader changed or the gRPC registry
//     connection is not ready;
//   - starts a gRPC server if this machine is the new leader and none is
//     running;
//   - uses the gRPC registry when newEngine supports gRPC, reusing a ready
//     connection or replacing it with one that dials through rpcDialer;
//   - otherwise stops the local server and falls back to the etcd registry.
//
// The whole update runs while holding handlingEngineChange for writing.
func (r *RegistryMux) EngineChanged(newEngine machine.MachineState) {
	r.handlingEngineChange.Lock()
	defer r.handlingEngineChange.Unlock()

	stopServer := r.currentEngine.ID != newEngine.ID
	r.currentEngine = newEngine
	log.Infof("Engine changed, checking capabilities %+v", newEngine)

	if !r.localMachine.State().Capabilities.Has(machine.CapGRPC) {
		log.Infof("Falling back to etcd registry\n")
		r.currentRegistry = r.etcdRegistry
		return
	}

	if r.rpcserver != nil && ((r.rpcRegistry != nil && !r.rpcRegistry.IsRegistryReady()) || stopServer) {
		// If the engine changed, we need to stop the rpc server.
		r.rpcserver.Stop()
		r.rpcserver = nil
	}

	if newEngine.ID == r.localMachine.State().ID && r.rpcserver == nil {
		log.Infof("Starting rpc server...\n")
		var err error
		r.rpcserver, err = NewRPCServer(r.etcdRegistry, newEngine.PublicIP)
		if err != nil {
			log.Fatalf("Unable to create rpc server %+v", err)
		}
		// Pass the server explicitly: a later EngineChanged may stop it and
		// reset r.rpcserver to nil before Start returns.
		go r.serveRPC(r.rpcserver)
	}

	if newEngine.Capabilities.Has(machine.CapGRPC) {
		if r.rpcRegistry != nil && r.rpcRegistry.IsRegistryReady() {
			log.Infof("Reusing gRPC engine, connection is READY\n")
			r.currentRegistry = r.rpcRegistry
			return
		}
		if r.rpcRegistry != nil {
			// Release the stale connection instead of leaking it.
			r.rpcRegistry.Close()
		}
		log.Infof("New engine supports gRPC, connecting\n")
		r.rpcRegistry = NewRPCRegistry(r.rpcDialer)
		r.rpcRegistry.Connect()
		r.currentRegistry = r.rpcRegistry
		return
	}

	log.Infof("Falling back to etcd registry\n")
	if r.rpcserver != nil {
		// If the engine changed to a non gRPC leader, we need to stop the
		// server; clear it so it can be started again later.
		r.rpcserver.Stop()
		r.rpcserver = nil
	}
	r.currentRegistry = r.etcdRegistry
}

// serveRPC runs srv until Start returns. An error from Start is fatal only
// while srv is still the mux's active server. When EngineChanged has already
// stopped and cleared or replaced it, the error is only logged, since
// (depending on the gRPC version) stopping a server can make Start return
// an error.
func (r *RegistryMux) serveRPC(srv *rpcserver) {
	err := srv.Start()
	if err == nil {
		return
	}
	r.handlingEngineChange.RLock()
	active := r.rpcserver == srv
	r.handlingEngineChange.RUnlock()
	if active {
		log.Fatalf("Failed to serve gRPC requests on listener: %v", err)
	}
	log.Infof("gRPC server stopped: %v", err)
}
// getRegistry returns the registry that the delegating methods forward to.
// This is whatever EngineChanged or ConnectToRegistry last selected, or the
// etcd registry when nothing has been selected yet. It is read while holding
// handlingEngineChange for reading.
func (r *RegistryMux) getRegistry() registry.Registry {
	r.handlingEngineChange.RLock()
	defer r.handlingEngineChange.RUnlock()
	if selected := r.currentRegistry; selected != nil {
		return selected
	}
	return r.etcdRegistry
}
// IsRegistryReady reports whether the currently selected registry is ready.
func (r *RegistryMux) IsRegistryReady() bool {
	reg := r.getRegistry()
	return reg.IsRegistryReady()
}

// UseEtcdRegistry forwards to the currently selected registry.
func (r *RegistryMux) UseEtcdRegistry() bool {
	reg := r.getRegistry()
	return reg.UseEtcdRegistry()
}

// ClearUnitHeartbeat forwards to the currently selected registry.
func (r *RegistryMux) ClearUnitHeartbeat(name string) {
	reg := r.getRegistry()
	reg.ClearUnitHeartbeat(name)
}

// CreateUnit forwards to the currently selected registry.
func (r *RegistryMux) CreateUnit(unit *job.Unit) error {
	reg := r.getRegistry()
	return reg.CreateUnit(unit)
}

// CreateMachineState always goes to the etcd registry, whichever registry is
// currently selected.
func (r *RegistryMux) CreateMachineState(ms machine.MachineState, ttl time.Duration) (uint64, error) {
	return r.etcdRegistry.CreateMachineState(ms, ttl)
}

// DestroyUnit forwards to the currently selected registry.
func (r *RegistryMux) DestroyUnit(unit string) error {
	reg := r.getRegistry()
	return reg.DestroyUnit(unit)
}

// UnitHeartbeat forwards to the currently selected registry.
func (r *RegistryMux) UnitHeartbeat(name string, machID string, ttl time.Duration) error {
	reg := r.getRegistry()
	return reg.UnitHeartbeat(name, machID, ttl)
}

// Machines always goes to the etcd registry.
func (r *RegistryMux) Machines() ([]machine.MachineState, error) {
	return r.etcdRegistry.Machines()
}

// RemoveMachineState always goes to the etcd registry.
func (r *RegistryMux) RemoveMachineState(machID string) error {
	return r.etcdRegistry.RemoveMachineState(machID)
}

// RemoveUnitState forwards to the currently selected registry.
func (r *RegistryMux) RemoveUnitState(jobName string) error {
	reg := r.getRegistry()
	return reg.RemoveUnitState(jobName)
}

// SaveUnitState forwards to the currently selected registry.
func (r *RegistryMux) SaveUnitState(jobName string, unitState *unit.UnitState, ttl time.Duration) {
	reg := r.getRegistry()
	reg.SaveUnitState(jobName, unitState, ttl)
}

// ScheduleUnit forwards to the currently selected registry.
func (r *RegistryMux) ScheduleUnit(name string, machID string) error {
	reg := r.getRegistry()
	return reg.ScheduleUnit(name, machID)
}

// SetUnitTargetState forwards to the currently selected registry.
func (r *RegistryMux) SetUnitTargetState(name string, state job.JobState) error {
	reg := r.getRegistry()
	return reg.SetUnitTargetState(name, state)
}

// MachineState always goes to the etcd registry.
func (r *RegistryMux) MachineState(machID string) (machine.MachineState, error) {
	return r.etcdRegistry.MachineState(machID)
}

// SetMachineState always goes to the etcd registry.
func (r *RegistryMux) SetMachineState(ms machine.MachineState, ttl time.Duration) (uint64, error) {
	return r.etcdRegistry.SetMachineState(ms, ttl)
}
// UnscheduleUnit forwards to the currently selected registry.
func (r *RegistryMux) UnscheduleUnit(name string, machID string) error {
	reg := r.getRegistry()
	return reg.UnscheduleUnit(name, machID)
}

// Schedule forwards to the currently selected registry.
func (r *RegistryMux) Schedule() ([]job.ScheduledUnit, error) {
	reg := r.getRegistry()
	return reg.Schedule()
}

// ScheduledUnit forwards to the currently selected registry.
func (r *RegistryMux) ScheduledUnit(name string) (*job.ScheduledUnit, error) {
	reg := r.getRegistry()
	return reg.ScheduledUnit(name)
}

// Unit forwards to the currently selected registry.
func (r *RegistryMux) Unit(name string) (*job.Unit, error) {
	reg := r.getRegistry()
	return reg.Unit(name)
}

// Units forwards to the currently selected registry.
func (r *RegistryMux) Units() ([]job.Unit, error) {
	reg := r.getRegistry()
	return reg.Units()
}

// UnitState forwards to the currently selected registry.
func (r *RegistryMux) UnitState(name string) (*unit.UnitState, error) {
	reg := r.getRegistry()
	return reg.UnitState(name)
}

// UnitStates forwards to the currently selected registry.
func (r *RegistryMux) UnitStates() ([]*unit.UnitState, error) {
	reg := r.getRegistry()
	return reg.UnitStates()
}

// LatestDaemonVersion always goes to the etcd registry.
func (r *RegistryMux) LatestDaemonVersion() (*semver.Version, error) {
	return r.etcdRegistry.LatestDaemonVersion()
}

// EngineVersion always goes to the etcd registry.
func (r *RegistryMux) EngineVersion() (int, error) {
	return r.etcdRegistry.EngineVersion()
}

// UpdateEngineVersion always goes to the etcd registry.
func (r *RegistryMux) UpdateEngineVersion(from int, to int) error {
	return r.etcdRegistry.UpdateEngineVersion(from, to)
}

// SetMachineMetadata always goes to the etcd registry.
func (r *RegistryMux) SetMachineMetadata(machID string, key string, value string) error {
	return r.etcdRegistry.SetMachineMetadata(machID, key, value)
}

// DeleteMachineMetadata always goes to the etcd registry.
func (r *RegistryMux) DeleteMachineMetadata(machID string, key string) error {
	return r.etcdRegistry.DeleteMachineMetadata(machID, key)
}