forked from moby/swarmkit
/
manager.go
1264 lines (1101 loc) · 40.8 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package manager
import (
"context"
"crypto/tls"
"fmt"
"math"
"net"
"os"
"path/filepath"
"runtime"
"sync"
"syscall"
"time"
"github.com/docker/docker/pkg/plugingetter"
"github.com/docker/go-events"
gmetrics "github.com/docker/go-metrics"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/connectionbroker"
"github.com/docker/swarmkit/identity"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/allocator"
"github.com/docker/swarmkit/manager/allocator/cnmallocator"
"github.com/docker/swarmkit/manager/allocator/networkallocator"
"github.com/docker/swarmkit/manager/controlapi"
"github.com/docker/swarmkit/manager/dispatcher"
"github.com/docker/swarmkit/manager/drivers"
"github.com/docker/swarmkit/manager/health"
"github.com/docker/swarmkit/manager/keymanager"
"github.com/docker/swarmkit/manager/logbroker"
"github.com/docker/swarmkit/manager/metrics"
"github.com/docker/swarmkit/manager/orchestrator/constraintenforcer"
"github.com/docker/swarmkit/manager/orchestrator/global"
"github.com/docker/swarmkit/manager/orchestrator/jobs"
"github.com/docker/swarmkit/manager/orchestrator/replicated"
"github.com/docker/swarmkit/manager/orchestrator/taskreaper"
"github.com/docker/swarmkit/manager/resourceapi"
"github.com/docker/swarmkit/manager/scheduler"
"github.com/docker/swarmkit/manager/state/raft"
"github.com/docker/swarmkit/manager/state/raft/transport"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/manager/watchapi"
"github.com/docker/swarmkit/remotes"
"github.com/docker/swarmkit/xnet"
gogotypes "github.com/gogo/protobuf/types"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// defaultTaskHistoryRetentionLimit is the number of tasks to keep by default.
const defaultTaskHistoryRetentionLimit = 5
// RemoteAddrs provides a listening address and an optional advertise address
// for serving the remote API.
type RemoteAddrs struct {
	// ListenAddr is the address to bind. BindRemote passes it to
	// net.Listen("tcp", ...), so it is expected in host:port form.
	ListenAddr string

	// AdvertiseAddr is the address to advertise to remote nodes (optional).
	// When empty, BindRemote derives a wildcard (0.0.0.0) advertise address
	// from ListenAddr's port.
	AdvertiseAddr string
}
// Config is used to tune the Manager.
type Config struct {
	// SecurityConfig supplies the node's TLS credentials (client and
	// server), its key writer, and its root CA.
	SecurityConfig *ca.SecurityConfig

	// RootCAPaths is the path to which new root certs should be saved.
	RootCAPaths ca.CertPaths

	// ExternalCAs is a list of initial CAs to which a manager node
	// will make certificate signing requests for node certificates.
	ExternalCAs []*api.ExternalCA

	// ControlAPI is an address for serving the control API. When set, New
	// binds it immediately through BindControl.
	ControlAPI string

	// RemoteAPI is a listening address for serving the remote API, and
	// an optional advertise address. When set, New binds it immediately
	// through BindRemote.
	RemoteAPI *RemoteAddrs

	// JoinRaft is an optional address of a node in an existing raft
	// cluster to join.
	JoinRaft string

	// ForceJoin causes us to invoke raft's Join RPC even if already part
	// of a cluster.
	ForceJoin bool

	// StateDir is the top-level state directory. New creates it, plus a
	// "raft" subdirectory for raft state, if they do not exist.
	StateDir string

	// ForceNewCluster defines if we have to force a new cluster
	// because we are recovering from a backup data directory.
	ForceNewCluster bool

	// ElectionTick defines the number of ticks without a leader needed to
	// trigger a new election. Zero keeps the raft default.
	ElectionTick uint32

	// HeartbeatTick defines the number of ticks between heartbeats sent to
	// other members for health-check purposes. Zero keeps the raft default.
	HeartbeatTick uint32

	// AutoLockManagers determines whether or not managers require an unlock key
	// when starting from a stopped state. This configuration parameter is only
	// applicable when bootstrapping a new cluster for the first time.
	AutoLockManagers bool

	// UnlockKey is the key to unlock a node - used for decrypting manager TLS keys
	// as well as the raft data encryption key (DEK). It is applicable when
	// bootstrapping a cluster for the first time (it's a cluster-wide setting),
	// and also when loading up any raft data on disk (as a KEK for the raft DEK).
	UnlockKey []byte

	// Availability allows a user to control the current scheduling status of a node
	Availability api.NodeSpec_Availability

	// PluginGetter provides access to docker's plugin inventory.
	PluginGetter plugingetter.PluginGetter

	// FIPS is a boolean stating whether the node is FIPS enabled - if this is the
	// first node in the cluster, this setting is used to set the cluster-wide mandatory
	// FIPS setting.
	FIPS bool

	// NetworkConfig stores network related config for the cluster. It may be
	// nil. When set, its DefaultAddrPool/SubnetSize and a non-zero
	// VXLANUDPPort are applied to the default cluster object this manager
	// tries to create on becoming leader.
	NetworkConfig *cnmallocator.NetworkConfig
}
// Manager is the cluster manager for Swarm.
// This is the high-level object holding and initializing all the manager
// subsystems.
type Manager struct {
	// config is a private copy of the Config passed to New.
	config Config

	// collector gathers store metrics; created by Run once raft has started.
	collector *metrics.Collector

	// Services registered on the gRPC servers when Run starts. Stop stops
	// them but does not nil them out.
	caserver    *ca.Server
	dispatcher  *dispatcher.Dispatcher
	logbroker   *logbroker.LogBroker
	watchServer *watchapi.Server

	// Leader-only subsystems. Stop checks each for nil before stopping it.
	replicatedOrchestrator *replicated.Orchestrator
	globalOrchestrator     *global.Orchestrator
	jobsOrchestrator       *jobs.Orchestrator
	taskReaper             *taskreaper.TaskReaper
	constraintEnforcer     *constraintenforcer.ConstraintEnforcer
	scheduler              *scheduler.Scheduler
	allocator              *allocator.Allocator
	keyManager             *keymanager.KeyManager

	// server serves TCP (remote API) listeners; localserver serves the local
	// control socket.
	server      *grpc.Server
	localserver *grpc.Server

	raftNode    *raft.Node
	dekRotator  *RaftDEKManager
	roleManager *roleManager

	// cancelFunc cancels the context created by Run; Stop calls it.
	cancelFunc context.CancelFunc

	// mu is a general mutex used to coordinate starting/stopping and
	// leadership events.
	mu sync.Mutex

	// addrMu is a mutex that protects config.ControlAPI and config.RemoteAPI
	addrMu sync.Mutex

	// started is closed by Run once startup has finished (or raft failed to
	// start); Stop waits on it before shutting anything down.
	started chan struct{}

	// stopped is set by Stop (under mu) so shutdown happens only once.
	stopped bool

	// remoteListener and controlListener hand listeners bound by
	// BindRemote/BindControl to the serveListener goroutines started by Run.
	// Both are created with a buffer of 1.
	remoteListener  chan net.Listener
	controlListener chan net.Listener

	// errServe receives the return value of each gRPC Serve call; Run waits
	// on it. Created with a buffer of 2, one per listener.
	errServe chan error
}
// leaderMetric is the swarm_manager_leader gauge. handleLeadershipEvents sets
// it to 1 when this node becomes leader and to 0 when it becomes a follower.
var leaderMetric gmetrics.Gauge

// init registers the "swarm"/"manager" metrics namespace holding leaderMetric.
func init() {
	namespace := gmetrics.NewNamespace("swarm", "manager", nil)
	leaderMetric = namespace.NewGauge("leader", "Indicates if this manager node is a leader", "")
	gmetrics.Register(namespace)
}
// closeOnceListener wraps a net.Listener so that only the first call to Close
// reaches the underlying listener. grpc closes listeners more than once, and a
// repeated close of a unix listener could remove a newer listener's socket
// file.
type closeOnceListener struct {
	once sync.Once
	net.Listener
}

// Close closes the wrapped listener on the first call and returns that call's
// error. Every later call is a no-op that returns nil.
func (l *closeOnceListener) Close() (err error) {
	l.once.Do(func() { err = l.Listener.Close() })
	return err
}
// New creates a Manager which has not started to accept requests yet.
//
// It creates config.StateDir and its "raft" subdirectory (mode 0700), builds
// the raft node and two gRPC servers (m.server for remote connections,
// m.localserver for the local control socket), and, when config.ControlAPI
// and/or config.RemoteAPI are set, binds those listeners right away so bind
// errors surface here. The listeners are not served until Run is called.
func New(config *Config) (*Manager, error) {
	err := os.MkdirAll(config.StateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create state directory")
	}

	raftStateDir := filepath.Join(config.StateDir, "raft")
	err = os.MkdirAll(raftStateDir, 0700)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create raft state directory")
	}

	// Start from the raft defaults; only non-zero tick settings override them.
	raftCfg := raft.DefaultNodeConfig()

	if config.ElectionTick > 0 {
		raftCfg.ElectionTick = int(config.ElectionTick)
	}
	if config.HeartbeatTick > 0 {
		raftCfg.HeartbeatTick = int(config.HeartbeatTick)
	}

	dekRotator, err := NewRaftDEKManager(config.SecurityConfig.KeyWriter(), config.FIPS)
	if err != nil {
		return nil, err
	}

	newNodeOpts := raft.NodeOptions{
		ID:              config.SecurityConfig.ClientTLSCreds.NodeID(),
		JoinAddr:        config.JoinRaft,
		ForceJoin:       config.ForceJoin,
		Config:          raftCfg,
		StateDir:        raftStateDir,
		ForceNewCluster: config.ForceNewCluster,
		TLSCredentials:  config.SecurityConfig.ClientTLSCreds,
		KeyRotator:      dekRotator,
		FIPS:            config.FIPS,
	}
	raftNode := raft.NewNode(newNodeOpts)

	// the interceptorWrappers are functions that wrap the prometheus grpc
	// interceptor, and add some of code to log errors locally. one for stream
	// and one for unary. this is needed because the grpc unary interceptor
	// doesn't natively do chaining, you have to implement it in the caller.
	// note that even though these are logging errors, we're still using
	// debug level. returning errors from GRPC methods is common and expected,
	// and logging an ERROR every time a user mistypes a service name would
	// pollute the logs really fast.
	//
	// NOTE(dperny): Because of the fact that these functions are very simple
	// in their operation and have no side effects other than the log output,
	// they are not automatically tested. If you modify them later, make _sure_
	// that they are correct. If you add substantial side effects, abstract
	// these out and test them!
	unaryInterceptorWrapper := func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// pass the call down into the grpc_prometheus interceptor
		resp, err := grpc_prometheus.UnaryServerInterceptor(ctx, req, info, handler)
		if err != nil {
			log.G(ctx).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling rpc")
		}
		return resp, err
	}

	streamInterceptorWrapper := func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		// we can't re-write a stream context, so don't bother creating a
		// sub-context like in unary methods
		// pass the call down into the grpc_prometheus interceptor
		err := grpc_prometheus.StreamServerInterceptor(srv, ss, info, handler)
		if err != nil {
			log.G(ss.Context()).WithField("rpc", info.FullMethod).WithError(err).Debug("error handling streaming rpc")
		}
		return err
	}

	// Both servers share the same TLS credentials, interceptors and
	// receive-size limit.
	opts := []grpc.ServerOption{
		grpc.Creds(config.SecurityConfig.ServerTLSCreds),
		grpc.StreamInterceptor(streamInterceptorWrapper),
		grpc.UnaryInterceptor(unaryInterceptorWrapper),
		grpc.MaxRecvMsgSize(transport.GRPCMaxMsgSize),
	}

	m := &Manager{
		config:          *config,
		caserver:        ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
		dispatcher:      dispatcher.New(),
		logbroker:       logbroker.New(raftNode.MemoryStore()),
		watchServer:     watchapi.NewServer(raftNode.MemoryStore()),
		server:          grpc.NewServer(opts...),
		localserver:     grpc.NewServer(opts...),
		raftNode:        raftNode,
		started:         make(chan struct{}),
		dekRotator:      dekRotator,
		remoteListener:  make(chan net.Listener, 1),
		controlListener: make(chan net.Listener, 1),
		errServe:        make(chan error, 2),
	}

	// BindControl refuses to run when m.config.ControlAPI is already set, so
	// clear the copied value and let BindControl record it on success.
	if config.ControlAPI != "" {
		m.config.ControlAPI = ""
		if err := m.BindControl(config.ControlAPI); err != nil {
			return nil, err
		}
	}

	// Likewise for the remote API and BindRemote.
	if config.RemoteAPI != nil {
		m.config.RemoteAPI = nil
		// The context isn't used in this case (before (*Manager).Run).
		if err := m.BindRemote(context.Background(), *config.RemoteAPI); err != nil {
			// New is failing, so nothing will ever serve the control
			// listener bound above; take it back out of the channel and
			// close it.
			if config.ControlAPI != "" {
				l := <-m.controlListener
				l.Close()
			}
			return nil, err
		}
	}

	return m, nil
}
// BindControl binds a local socket for the control API at addr.
//
// Only one control address may be bound per Manager; later calls fail. On
// success the address is recorded in m.config.ControlAPI and the listener is
// handed to the serving goroutine through m.controlListener.
func (m *Manager) BindControl(addr string) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.ControlAPI != "" {
		return errors.New("manager already has a control API address")
	}

	isWindows := runtime.GOOS == "windows"

	// Windows uses a named pipe, so there is no socket directory to create.
	if !isWindows {
		if mkErr := os.MkdirAll(filepath.Dir(addr), 0700); mkErr != nil {
			return errors.Wrap(mkErr, "failed to create socket directory")
		}
	}

	listener, err := xnet.ListenLocal(addr)
	if !isWindows {
		// A unix socket may fail to bind if its file already exists.
		// Peel the error down to the errno; on EADDRINUSE, replace the
		// stale file and retry once.
		cause := err
		if opErr, isOpErr := cause.(*net.OpError); isOpErr {
			cause = opErr.Err
		}
		if sysErr, isSysErr := cause.(*os.SyscallError); isSysErr {
			cause = sysErr.Err
		}
		if cause == syscall.EADDRINUSE {
			os.Remove(addr)
			listener, err = xnet.ListenLocal(addr)
		}
	}
	if err != nil {
		return errors.Wrap(err, "failed to listen on control API address")
	}

	m.config.ControlAPI = addr
	m.controlListener <- listener
	return nil
}
// BindRemote binds a TCP port for the remote API and records addrs as the
// manager's remote API addresses; it fails if they were already set.
//
// The advertise address given to the raft node is addrs.AdvertiseAddr when
// present. Otherwise it is a wildcard address on the listen port. If that
// port is "0", the address actually bound is used both as the advertise
// address and as the recorded listen address.
func (m *Manager) BindRemote(ctx context.Context, addrs RemoteAddrs) error {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if m.config.RemoteAPI != nil {
		return errors.New("manager already has remote API address")
	}

	// An explicit AdvertiseAddr is used as our externally-reachable address
	// as-is, and listenPort stays empty.
	advertiseAddr, listenPort := addrs.AdvertiseAddr, ""
	if advertiseAddr == "" {
		// Otherwise we are joining an existing swarm: advertise a wildcard
		// host so the remote side substitutes our actual source address.
		// 0.0.0.0 works even for an IPv6 listener, since any unspecified
		// IP is substituted.
		var splitErr error
		if _, listenPort, splitErr = net.SplitHostPort(addrs.ListenAddr); splitErr != nil {
			return fmt.Errorf("missing or invalid listen address %s", addrs.ListenAddr)
		}
		advertiseAddr = net.JoinHostPort("0.0.0.0", listenPort)
	}

	listener, err := net.Listen("tcp", addrs.ListenAddr)
	if err != nil {
		return errors.Wrap(err, "failed to listen on remote API address")
	}

	// Port 0 lets the OS choose a port; report the address really bound.
	if listenPort == "0" {
		bound := listener.Addr().String()
		advertiseAddr = bound
		addrs.ListenAddr = bound
	}

	m.config.RemoteAPI = &addrs
	m.raftNode.SetAddr(ctx, advertiseAddr)
	m.remoteListener <- listener
	return nil
}
// RemovedFromRaft returns a channel that is closed if the manager is removed
// from the raft cluster. Use it to trigger a manager shutdown.
func (m *Manager) RemovedFromRaft() <-chan struct{} {
	removed := m.raftNode.RemovedFromRaft
	return removed
}

// Addr returns the TCP address the remote API listens on, or "" if no remote
// API address has been bound yet.
func (m *Manager) Addr() string {
	m.addrMu.Lock()
	defer m.addrMu.Unlock()

	if remote := m.config.RemoteAPI; remote != nil {
		return remote.ListenAddr
	}
	return ""
}
// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.
//
// Order of operations:
//  1. subscribe to raft leadership changes;
//  2. register every service on m.server (authenticated wrappers / raft
//     proxies) and on m.localserver (local-socket proxies that skip
//     authorization);
//  3. start serving whatever listeners BindControl/BindRemote provide;
//  4. join/start raft and close m.started (Stop waits on it);
//  5. wait for a leader and the cluster object, then watch the cluster for
//     KEK changes;
//  6. block until a gRPC Serve call returns.
func (m *Manager) Run(parent context.Context) error {
	ctx, ctxCancel := context.WithCancel(parent)
	defer ctxCancel()

	// Stop calls this to cancel everything derived from ctx.
	m.cancelFunc = ctxCancel

	leadershipCh, cancel := m.raftNode.SubscribeLeadership()
	defer cancel()

	go m.handleLeadershipEvents(ctx, leadershipCh)

	// authorize is the hook given to every authenticated wrapper below. It
	// loads the certificate blacklist from the default cluster object (when
	// one exists) and delegates to ca.AuthorizeForwardedRoleAndOrg.
	authorize := func(ctx context.Context, roles []string) error {
		var (
			blacklistedCerts map[string]*api.BlacklistedCertificate
			clusters         []*api.Cluster
			err              error
		)

		m.raftNode.MemoryStore().View(func(readTx store.ReadTx) {
			clusters, err = store.FindClusters(readTx, store.ByName(store.DefaultClusterName))

		})

		// Not having a cluster object yet means we can't check
		// the blacklist.
		if err == nil && len(clusters) == 1 {
			blacklistedCerts = clusters[0].BlacklistedCertificates
		}

		// Authorize the remote roles, ensure they can only be forwarded by managers
		_, err = ca.AuthorizeForwardedRoleAndOrg(ctx, roles, []string{ca.ManagerRole}, m.config.SecurityConfig.ClientTLSCreds.Organization(), blacklistedCerts)
		return err
	}

	baseControlAPI := controlapi.NewServer(m.raftNode.MemoryStore(), m.raftNode, m.config.SecurityConfig, m.config.PluginGetter, drivers.New(m.config.PluginGetter))
	baseResourceAPI := resourceapi.New(m.raftNode.MemoryStore())
	healthServer := health.NewHealthServer()
	localHealthServer := health.NewHealthServer()

	// Authenticated wrappers: every call is checked with authorize first.
	authenticatedControlAPI := api.NewAuthenticatedWrapperControlServer(baseControlAPI, authorize)
	authenticatedWatchAPI := api.NewAuthenticatedWrapperWatchServer(m.watchServer, authorize)
	authenticatedResourceAPI := api.NewAuthenticatedWrapperResourceAllocatorServer(baseResourceAPI, authorize)
	authenticatedLogsServerAPI := api.NewAuthenticatedWrapperLogsServer(m.logbroker, authorize)
	authenticatedLogBrokerAPI := api.NewAuthenticatedWrapperLogBrokerServer(m.logbroker, authorize)
	authenticatedDispatcherAPI := api.NewAuthenticatedWrapperDispatcherServer(m.dispatcher, authorize)
	authenticatedCAAPI := api.NewAuthenticatedWrapperCAServer(m.caserver, authorize)
	authenticatedNodeCAAPI := api.NewAuthenticatedWrapperNodeCAServer(m.caserver, authorize)
	authenticatedRaftAPI := api.NewAuthenticatedWrapperRaftServer(m.raftNode, authorize)
	authenticatedHealthAPI := api.NewAuthenticatedWrapperHealthServer(healthServer, authorize)
	authenticatedRaftMembershipAPI := api.NewAuthenticatedWrapperRaftMembershipServer(m.raftNode, authorize)

	// Raft proxies around the authenticated wrappers; proxied requests carry
	// the caller's TLS info via ca.WithMetadataForwardTLSInfo.
	proxyDispatcherAPI := api.NewRaftProxyDispatcherServer(authenticatedDispatcherAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyCAAPI := api.NewRaftProxyCAServer(authenticatedCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyNodeCAAPI := api.NewRaftProxyNodeCAServer(authenticatedNodeCAAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyRaftMembershipAPI := api.NewRaftProxyRaftMembershipServer(authenticatedRaftMembershipAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(authenticatedResourceAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)
	proxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(authenticatedLogBrokerAPI, m.raftNode, nil, ca.WithMetadataForwardTLSInfo)

	// The following local proxies are only wired up to receive requests
	// from a trusted local socket, and these requests don't use TLS,
	// therefore the requests they handle locally should bypass
	// authorization. When requests are proxied from these servers, they
	// are sent as requests from this manager rather than forwarded
	// requests (it has no TLS information to put in the metadata map).
	forwardAsOwnRequest := func(ctx context.Context) (context.Context, error) { return ctx, nil }

	// handleRequestLocally tags the context with this node's own identity.
	// The remote address is the advertise address, else the listen address,
	// else "127.0.0.1:0" when no remote API is bound.
	handleRequestLocally := func(ctx context.Context) (context.Context, error) {
		remoteAddr := "127.0.0.1:0"

		m.addrMu.Lock()
		if m.config.RemoteAPI != nil {
			if m.config.RemoteAPI.AdvertiseAddr != "" {
				remoteAddr = m.config.RemoteAPI.AdvertiseAddr
			} else {
				remoteAddr = m.config.RemoteAPI.ListenAddr
			}
		}
		m.addrMu.Unlock()

		creds := m.config.SecurityConfig.ClientTLSCreds

		nodeInfo := ca.RemoteNodeInfo{
			Roles:        []string{creds.Role()},
			Organization: creds.Organization(),
			NodeID:       creds.NodeID(),
			RemoteAddr:   remoteAddr,
		}

		return context.WithValue(ctx, ca.LocalRequestKey, nodeInfo), nil
	}
	localProxyControlAPI := api.NewRaftProxyControlServer(baseControlAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogsAPI := api.NewRaftProxyLogsServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyDispatcherAPI := api.NewRaftProxyDispatcherServer(m.dispatcher, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyCAAPI := api.NewRaftProxyCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyNodeCAAPI := api.NewRaftProxyNodeCAServer(m.caserver, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyResourceAPI := api.NewRaftProxyResourceAllocatorServer(baseResourceAPI, m.raftNode, handleRequestLocally, forwardAsOwnRequest)
	localProxyLogBrokerAPI := api.NewRaftProxyLogBrokerServer(m.logbroker, m.raftNode, handleRequestLocally, forwardAsOwnRequest)

	// Everything registered on m.server should be an authenticated
	// wrapper, or a proxy wrapping an authenticated wrapper!
	api.RegisterCAServer(m.server, proxyCAAPI)
	api.RegisterNodeCAServer(m.server, proxyNodeCAAPI)
	api.RegisterRaftServer(m.server, authenticatedRaftAPI)
	api.RegisterHealthServer(m.server, authenticatedHealthAPI)
	api.RegisterRaftMembershipServer(m.server, proxyRaftMembershipAPI)
	api.RegisterControlServer(m.server, authenticatedControlAPI)
	api.RegisterWatchServer(m.server, authenticatedWatchAPI)
	api.RegisterLogsServer(m.server, authenticatedLogsServerAPI)
	api.RegisterLogBrokerServer(m.server, proxyLogBrokerAPI)
	api.RegisterResourceAllocatorServer(m.server, proxyResourceAPI)
	api.RegisterDispatcherServer(m.server, proxyDispatcherAPI)
	grpc_prometheus.Register(m.server)

	// The local server gets the unauthenticated local proxies.
	api.RegisterControlServer(m.localserver, localProxyControlAPI)
	api.RegisterWatchServer(m.localserver, m.watchServer)
	api.RegisterLogsServer(m.localserver, localProxyLogsAPI)
	api.RegisterHealthServer(m.localserver, localHealthServer)
	api.RegisterDispatcherServer(m.localserver, localProxyDispatcherAPI)
	api.RegisterCAServer(m.localserver, localProxyCAAPI)
	api.RegisterNodeCAServer(m.localserver, localProxyNodeCAAPI)
	api.RegisterResourceAllocatorServer(m.localserver, localProxyResourceAPI)
	api.RegisterLogBrokerServer(m.localserver, localProxyLogBrokerAPI)
	grpc_prometheus.Register(m.localserver)

	// Report NOT_SERVING until raft / the control API are actually up.
	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_NOT_SERVING)
	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_NOT_SERVING)

	if err := m.watchServer.Start(ctx); err != nil {
		log.G(ctx).WithError(err).Error("watch server failed to start")
	}

	// Each goroutine waits for one listener (from BindRemote / BindControl)
	// and reports its Serve result on m.errServe.
	go m.serveListener(ctx, m.remoteListener)
	go m.serveListener(ctx, m.controlListener)

	defer func() {
		m.server.Stop()
		m.localserver.Stop()
	}()

	// Set the raft server as serving for the health server
	healthServer.SetServingStatus("Raft", api.HealthCheckResponse_SERVING)

	if err := m.raftNode.JoinAndStart(ctx); err != nil {
		// Don't block future calls to Stop.
		close(m.started)
		return errors.Wrap(err, "can't initialize raft node")
	}

	localHealthServer.SetServingStatus("ControlAPI", api.HealthCheckResponse_SERVING)

	// Start metrics collection.

	m.collector = metrics.NewCollector(m.raftNode.MemoryStore())
	go func(collector *metrics.Collector) {
		if err := collector.Run(ctx); err != nil {
			log.G(ctx).WithError(err).Error("collector failed with an error")
		}
	}(m.collector)

	// Startup is complete; Stop may now proceed.
	close(m.started)

	// If the raft loop exits with an error, shut the whole manager down.
	go func() {
		err := m.raftNode.Run(ctx)
		if err != nil {
			log.G(ctx).WithError(err).Error("raft node stopped")
			m.Stop(ctx, false)
		}
	}()

	if err := raft.WaitForLeader(ctx, m.raftNode); err != nil {
		return err
	}

	c, err := raft.WaitForCluster(ctx, m.raftNode)
	if err != nil {
		return err
	}
	raftConfig := c.Spec.Raft

	if err := m.watchForClusterChanges(ctx); err != nil {
		return err
	}

	// Local tick settings that disagree with the cluster spec are only
	// warned about, not corrected.
	if int(raftConfig.ElectionTick) != m.raftNode.Config.ElectionTick {
		log.G(ctx).Warningf("election tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.ElectionTick, raftConfig.ElectionTick)
	}
	if int(raftConfig.HeartbeatTick) != m.raftNode.Config.HeartbeatTick {
		log.G(ctx).Warningf("heartbeat tick value (%ds) is different from the one defined in the cluster config (%vs), the cluster may be unstable", m.raftNode.Config.HeartbeatTick, raftConfig.HeartbeatTick)
	}

	// wait for an error in serving.
	err = <-m.errServe
	m.mu.Lock()
	if m.stopped {
		// Serve returned because Stop shut the servers down: not an error.
		m.mu.Unlock()
		return nil
	}
	m.mu.Unlock()
	m.Stop(ctx, false)

	return err
}
// stopTimeout is how long Stop lets the gRPC servers drain gracefully,
// counted from when the raft node reports done, before force-stopping them.
const stopTimeout = 8 * time.Second

// Stop stops the manager. It immediately closes all open connections and
// active RPCs as well as stopping the manager's subsystems. If clearData is
// set, the raft logs, snapshots, and keys will be erased.
//
// NOTE(review): "immediately" overstates it. The gRPC servers are stopped
// with GracefulStop, and are only force-stopped once stopTimeout has elapsed
// after the raft node is done. Stop also blocks until m.started is closed,
// which only Run does here. If Run was never called this presumably blocks
// forever; confirm.
//
// Calling Stop more than once is safe: later calls return immediately.
func (m *Manager) Stop(ctx context.Context, clearData bool) {
	log.G(ctx).Info("Stopping manager")
	// It's not safe to start shutting down while the manager is still
	// starting up.
	<-m.started

	// the mutex stops us from trying to stop while we're already stopping, or
	// from returning before we've finished stopping.
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.stopped {
		return
	}
	m.stopped = true

	// Begin graceful shutdown of both servers in the background; the
	// channels are closed when each GracefulStop returns.
	srvDone, localSrvDone := make(chan struct{}), make(chan struct{})
	go func() {
		m.server.GracefulStop()
		close(srvDone)
	}()
	go func() {
		m.localserver.GracefulStop()
		close(localSrvDone)
	}()

	m.raftNode.Cancel()

	if m.collector != nil {
		m.collector.Stop()
	}

	// The following components are gRPC services that are
	// registered when creating the manager and will need
	// to be re-registered if they are recreated.
	// For simplicity, they are not nilled out.
	m.dispatcher.Stop()
	m.logbroker.Stop()
	m.watchServer.Stop()
	m.caserver.Stop()

	// Leader-only subsystems may never have been started on this node.
	if m.allocator != nil {
		m.allocator.Stop()
	}
	if m.replicatedOrchestrator != nil {
		m.replicatedOrchestrator.Stop()
	}
	if m.globalOrchestrator != nil {
		m.globalOrchestrator.Stop()
	}
	if m.jobsOrchestrator != nil {
		m.jobsOrchestrator.Stop()
	}
	if m.taskReaper != nil {
		m.taskReaper.Stop()
	}
	if m.constraintEnforcer != nil {
		m.constraintEnforcer.Stop()
	}
	if m.scheduler != nil {
		m.scheduler.Stop()
	}
	if m.roleManager != nil {
		m.roleManager.Stop()
	}
	if m.keyManager != nil {
		m.keyManager.Stop()
	}

	if clearData {
		m.raftNode.ClearData()
	}
	// Cancel Run's context and wait for the raft node to finish.
	m.cancelFunc()
	<-m.raftNode.Done()

	// Force-stop the servers if graceful shutdown takes too long.
	timer := time.AfterFunc(stopTimeout, func() {
		m.server.Stop()
		m.localserver.Stop()
	})
	defer timer.Stop()
	// TODO: we're not waiting on ctx because it very well could be passed from Run,
	// which is already cancelled here. We need to refactor that.
	select {
	case <-srvDone:
		<-localSrvDone
	case <-localSrvDone:
		<-srvDone
	}

	log.G(ctx).Info("Manager shut down")
	// mutex is released and Run can return now
}
// updateKEK hands the cluster's current manager unlock key (the KEK) to the
// raft DEK manager, which re-encrypts key material when it has changed.
//
// The KEK is taken from the cluster's UnlockKeys entry for the manager
// subsystem. It is left at its zero value when there is no such entry, and
// versioned by the cluster object's version index. When the change moves the
// node from unlocked to locked, a background best-effort request renews this
// node's TLS certificate over the local control socket.
//
// It returns (and logs) any error from MaybeUpdateKEK.
func (m *Manager) updateKEK(ctx context.Context, cluster *api.Cluster) error {
	securityConfig := m.config.SecurityConfig
	nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()
	logger := log.G(ctx).WithFields(logrus.Fields{
		"node.id":   nodeID,
		"node.role": ca.ManagerRole,
	})

	kekData := ca.KEKData{Version: cluster.Meta.Version.Index}
	for _, encryptionKey := range cluster.UnlockKeys {
		if encryptionKey.Subsystem == ca.ManagerRole {
			kekData.KEK = encryptionKey.Key
			break
		}
	}

	updated, unlockedToLocked, err := m.dekRotator.MaybeUpdateKEK(kekData)
	if err != nil {
		logger.WithError(err).Error("failed to re-encrypt TLS key with a new KEK")
		return err
	}
	if updated {
		logger.Debug("successfully rotated KEK")
	}

	if unlockedToLocked {
		// config.ControlAPI is guarded by addrMu (BindControl may set it
		// concurrently), so snapshot it here rather than reading it
		// unsynchronized from the goroutine below.
		m.addrMu.Lock()
		controlAddr := m.config.ControlAPI
		m.addrMu.Unlock()

		// a best effort attempt to update the TLS certificate - if it fails, it'll be updated the next time it renews;
		// don't wait because it might take a bit
		go func() {
			// The local control socket is trusted and does not use real
			// TLS verification.
			insecureCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})

			conn, err := grpc.Dial(
				controlAddr,
				grpc.WithUnaryInterceptor(grpc_prometheus.UnaryClientInterceptor),
				grpc.WithStreamInterceptor(grpc_prometheus.StreamClientInterceptor),
				grpc.WithTransportCredentials(insecureCreds),
				grpc.WithDialer(
					func(addr string, timeout time.Duration) (net.Conn, error) {
						return xnet.DialTimeoutLocal(addr, timeout)
					}),
				grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(math.MaxInt32)),
			)
			if err != nil {
				logger.WithError(err).Error("failed to connect to local manager socket after locking the cluster")
				return
			}

			defer conn.Close()

			connBroker := connectionbroker.New(remotes.NewRemotes())
			connBroker.SetLocalConn(conn)
			if err := ca.RenewTLSConfigNow(ctx, securityConfig, connBroker, m.config.RootCAPaths); err != nil {
				logger.WithError(err).Error("failed to download new TLS certificate after locking the cluster")
			}
		}()
	}

	return nil
}
// watchForClusterChanges applies the cluster's current KEK once and then keeps
// applying it, via updateKEK, every time the cluster object (identified by
// this node's organization) is updated.
//
// It returns an error if the cluster object cannot be read or the initial
// updateKEK fails; in both cases no background watch is left running. On
// success a goroutine keeps watching until ctx is cancelled or the watch
// channel is closed, and cancels the watch when it exits.
func (m *Manager) watchForClusterChanges(ctx context.Context) error {
	clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()
	var cluster *api.Cluster
	clusterWatch, clusterWatchCancel, err := store.ViewAndWatch(m.raftNode.MemoryStore(),
		func(tx store.ReadTx) error {
			cluster = store.GetCluster(tx, clusterID)
			if cluster == nil {
				return fmt.Errorf("unable to get current cluster")
			}
			return nil
		},
		api.EventUpdateCluster{
			Cluster: &api.Cluster{ID: clusterID},
			Checks:  []api.ClusterCheckFunc{api.ClusterCheckID},
		},
	)
	if err != nil {
		return err
	}
	if err := m.updateKEK(ctx, cluster); err != nil {
		// The goroutine below that would normally cancel the watch is never
		// started on this path, so release the watch here.
		clusterWatchCancel()
		return err
	}

	go func() {
		defer clusterWatchCancel()
		for {
			select {
			case event, ok := <-clusterWatch:
				if !ok {
					// A closed channel would yield nil events forever
					// (and a nil event would panic the assertion below).
					return
				}
				clusterEvent, ok := event.(api.EventUpdateCluster)
				if !ok {
					continue
				}
				// updateKEK logs its own failures; keep watching.
				_ = m.updateKEK(ctx, clusterEvent.Cluster)
			case <-ctx.Done():
				return
			}
		}
	}()

	return nil
}
// getLeaderNodeID describes the current raft leader for log messages only.
// It returns the leader's node ID when that can be resolved, and otherwise a
// short human-readable phrase explaining why not. Because failures come back
// as decorated strings, never use the result for operational decisions.
func (m *Manager) getLeaderNodeID() string {
	leader, leaderErr := m.raftNode.Leader()

	if leaderErr == raft.ErrNoRaftMember {
		// Unlikely: this node is not a member of the raft quorum. The
		// resulting log line reads a little oddly, but it is rare.
		return "not yet part of a raft cluster"
	}
	if leaderErr == raft.ErrNoClusterLeader {
		return "no cluster leader"
	}

	// Any other result (including an unexpected error) falls through to
	// resolving the raft ID to a node ID.
	id, lookupErr := m.raftNode.GetNodeIDByRaftID(leader)
	if lookupErr != nil {
		// the only possible error here is "ErrMemberUnknown"
		return "an unknown node"
	}
	return id
}
// handleLeadershipEvents handles the is leader event or is follower event.
//
// For each event received on leadershipCh while holding m.mu:
//   - IsLeader calls becomeLeader and sets the leader metric to 1;
//   - IsFollower calls becomeFollower and sets it to 0.
//
// It then logs the leadership transition. The loop exits when ctx is
// cancelled, when the manager has been stopped, or if leadershipCh is closed.
func (m *Manager) handleLeadershipEvents(ctx context.Context, leadershipCh chan events.Event) {
	// oldLeader is the leader reported in the previous log line. It is only
	// used for logging and must be advanced after each event so the message
	// describes the actual transition.
	oldLeader := m.getLeaderNodeID()
	for {
		select {
		case leadershipEvent, ok := <-leadershipCh:
			if !ok {
				// A closed channel yields nil events, which would panic the
				// type assertion below.
				return
			}
			m.mu.Lock()
			if m.stopped {
				m.mu.Unlock()
				return
			}
			newState := leadershipEvent.(raft.LeadershipState)

			switch newState {
			case raft.IsLeader:
				m.becomeLeader(ctx)
				leaderMetric.Set(1)
			case raft.IsFollower:
				m.becomeFollower()
				leaderMetric.Set(0)
			}
			m.mu.Unlock()

			newLeader := m.getLeaderNodeID()
			// maybe we should use logrus fields for old and new leader, so
			// that users are better able to ingest leadership changes into log
			// aggregators?
			log.G(ctx).Infof("leadership changed from %v to %v", oldLeader, newLeader)
			oldLeader = newLeader
		case <-ctx.Done():
			return
		}
	}
}
// serveListener waits for a listener on lCh (returning early if ctx is
// cancelled first) and serves gRPC on it, sending Serve's result to
// m.errServe. TCP listeners are served by m.server; any other listener is
// treated as a local socket and served by m.localserver.
func (m *Manager) serveListener(ctx context.Context, lCh <-chan net.Listener) {
	var l net.Listener
	select {
	case l = <-lCh:
	case <-ctx.Done():
		return
	}

	addr := l.Addr()
	ctx = log.WithLogger(ctx, log.G(ctx).WithFields(logrus.Fields{
		"proto": addr.Network(),
		"addr":  addr.String(),
	}))

	if _, isTCP := l.(*net.TCPListener); isTCP {
		log.G(ctx).Info("Listening for connections")
		m.errServe <- m.server.Serve(l)
		return
	}

	log.G(ctx).Info("Listening for local connections")
	// grpc closes the listener twice (in Serve and in Stop), and a second
	// UnixListener.Close can delete the socket file of a newer listener,
	// so wrap it to make Close idempotent.
	m.errServe <- m.localserver.Serve(&closeOnceListener{Listener: l})
}
// becomeLeader starts the subsystems that are run on the leader.
func (m *Manager) becomeLeader(ctx context.Context) {
s := m.raftNode.MemoryStore()
rootCA := m.config.SecurityConfig.RootCA()
nodeID := m.config.SecurityConfig.ClientTLSCreds.NodeID()
raftCfg := raft.DefaultRaftConfig()
raftCfg.ElectionTick = uint32(m.raftNode.Config.ElectionTick)
raftCfg.HeartbeatTick = uint32(m.raftNode.Config.HeartbeatTick)
clusterID := m.config.SecurityConfig.ClientTLSCreds.Organization()
initialCAConfig := ca.DefaultCAConfig()
initialCAConfig.ExternalCAs = m.config.ExternalCAs
var (
unlockKeys []*api.EncryptionKey
err error
)
if m.config.AutoLockManagers {
unlockKeys = []*api.EncryptionKey{{
Subsystem: ca.ManagerRole,
Key: m.config.UnlockKey,
}}
}
s.Update(func(tx store.Tx) error {
// Add a default cluster object to the
// store. Don't check the error because
// we expect this to fail unless this
// is a brand new cluster.
clusterObj := defaultClusterObject(
clusterID,
initialCAConfig,
raftCfg,
api.EncryptionConfig{AutoLockManagers: m.config.AutoLockManagers},
unlockKeys,
rootCA,
m.config.FIPS,
nil,
0,
0)
// If defaultAddrPool is valid we update cluster object with new value
// If VXLANUDPPort is not 0 then we call update cluster object with new value
if m.config.NetworkConfig != nil {
if m.config.NetworkConfig.DefaultAddrPool != nil {
clusterObj.DefaultAddressPool = m.config.NetworkConfig.DefaultAddrPool
clusterObj.SubnetSize = m.config.NetworkConfig.SubnetSize
}
if m.config.NetworkConfig.VXLANUDPPort != 0 {
clusterObj.VXLANUDPPort = m.config.NetworkConfig.VXLANUDPPort
}
}
err := store.CreateCluster(tx, clusterObj)
if err != nil && err != store.ErrExist {
log.G(ctx).WithError(err).Errorf("error creating cluster object")
}
// Add Node entry for ourself, if one
// doesn't exist already.
freshCluster := nil == store.CreateNode(tx, managerNode(nodeID, m.config.Availability, clusterObj.VXLANUDPPort))
if freshCluster {
// This is a fresh swarm cluster. Add to store now any initial
// cluster resource, like the default ingress network which
// provides the routing mesh for this cluster.
log.G(ctx).Info("Creating default ingress network")
if err := store.CreateNetwork(tx, newIngressNetwork()); err != nil {
log.G(ctx).WithError(err).Error("failed to create default ingress network")
}
}
// Create now the static predefined if the store does not contain predefined
// networks like bridge/host node-local networks which
// are known to be present in each cluster node. This is needed
// in order to allow running services on the predefined docker
// networks like `bridge` and `host`.
for _, p := range allocator.PredefinedNetworks() {
if err := store.CreateNetwork(tx, newPredefinedNetwork(p.Name, p.Driver)); err != nil && err != store.ErrNameConflict {
log.G(ctx).WithError(err).Error("failed to create predefined network " + p.Name)
}
}
return nil
})
m.replicatedOrchestrator = replicated.NewReplicatedOrchestrator(s)
m.constraintEnforcer = constraintenforcer.New(s)