forked from hashicorp/nomad
-
Notifications
You must be signed in to change notification settings - Fork 0
/
syncer.go
1016 lines (908 loc) · 31.6 KB
/
syncer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Package consul is used by Nomad to register all services both static services
// and dynamic via allocations.
//
// Consul Service IDs have the following format: ${nomadServicePrefix}-${groupName}-${serviceKey}
// groupName takes on one of the following values:
// - server
// - client
// - executor-${alloc-id}-${task-name}
//
// serviceKey should be generated by service registrators.
// If the serviceKey is being generated by the executor for a Nomad Task.Services
// the following helper should be used:
// NOTE: Executor should interpolate the service prior to calling
// func GenerateTaskServiceKey(service *structs.Service) string
//
// The Nomad Client reaps services registered from dead allocations that were
// not properly cleaned up by the executor (this is not the expected case).
//
// TODO fix this comment
// The Consul ServiceIDs generated by the executor will contain the allocation
// ID. Thus the client can generate the list of Consul ServiceIDs to keep by
// calling the following method on all running allocations the client is aware
// of:
// func GenerateExecutorServiceKeyPrefixFromAlloc(allocID string) string
package consul
import (
"fmt"
"log"
"net"
"net/url"
"strconv"
"strings"
"sync"
"time"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/nomad/types"
)
// Timing parameters used by the Syncer.
const (
	// initialSyncBuffer is the max time an initial sync will sleep
	// before syncing.
	initialSyncBuffer = 30 * time.Second

	// initialSyncDelay is the delay before an initial sync.
	initialSyncDelay = 5 * time.Second

	// defaultSyncInterval is the periodic time interval for syncing
	// services and checks with Consul.
	defaultSyncInterval = 6 * time.Second

	// defaultSyncJitter provides a little variance in the frequency at
	// which the Syncer polls Consul.
	defaultSyncJitter = time.Second

	// ttlCheckBuffer is added to a script check's interval to form the TTL
	// registered with Consul, i.e. the time Nomad has to report the result.
	ttlCheckBuffer = 31 * time.Second

	// DefaultQueryWaitDuration is the max duration the Consul Agent will
	// spend waiting for a response from a Consul Query.
	DefaultQueryWaitDuration = 2 * time.Second
)

// nomadServicePrefix is the first prefix that scopes all Nomad registered
// services.
const nomadServicePrefix = "_nomad"

// Protocol tags attached to registered services.
const (
	// ServiceTagHTTP is the tag assigned to HTTP services.
	ServiceTagHTTP = "http"
	// ServiceTagRPC is the tag assigned to RPC services.
	ServiceTagRPC = "rpc"
	// ServiceTagSerf is the tag assigned to Serf services.
	ServiceTagSerf = "serf"
)
type (
	// consulServiceID is a service ID as registered with Consul.
	consulServiceID string

	// consulCheckID is a check ID as registered with Consul.
	consulCheckID string

	// ServiceKey is the generated service key that is used to build the
	// Consul ServiceID.
	ServiceKey string

	// ServiceDomain is the domain of services registered by Nomad.
	ServiceDomain string
)

// Fixed service domains used by the Nomad client and server.
const (
	ClientDomain ServiceDomain = "client"
	ServerDomain ServiceDomain = "server"
)

// NewExecutorDomain returns a domain specific to the alloc ID and task, of
// the form "executor-<allocID>-<task>".
func NewExecutorDomain(allocID, task string) ServiceDomain {
	domain := fmt.Sprintf("executor-%s-%s", allocID, task)
	return ServiceDomain(domain)
}
// Syncer allows syncing of services and checks with Consul.
//
// Services are handed to it per ServiceDomain through SetServices; Run then
// periodically compares them with what the local Consul agent reports and
// registers or de-registers services and checks accordingly. Check types
// listed in delegateChecks are executed by Nomad through CheckRunners and
// their results reported to Consul as TTL updates (see runCheck).
type Syncer struct {
	client *consul.Client
	// consulAvailable records whether the last Consul interaction observed by
	// Run (and runCheck) succeeded; it is used so that errors are logged only
	// on the transition to unavailable.
	// NOTE(review): written from Run and from runCheck (invoked by the
	// CheckRunners, presumably on their own goroutines) without a lock —
	// confirm whether this is a data race.
	consulAvailable bool

	// servicesGroups and checkGroups are named groups of services and checks
	// respectively that will be flattened and reconciled with Consul when
	// SyncServices() is called. The key to the servicesGroups map is unique
	// per handler and is used to allow the Agent's services to be maintained
	// independently of the Client or Server's services.
	servicesGroups map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration
	checkGroups    map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration
	// groupsLock guards servicesGroups and checkGroups.
	groupsLock sync.RWMutex

	// The "Consul Registry" is a collection of Consul Services and
	// Checks all guarded by the registryLock.
	registryLock sync.RWMutex
	// checkRunners are the delegated checks being run by the Syncer, keyed
	// by their Consul check ID.
	checkRunners map[consulCheckID]*CheckRunner
	// addrFinder resolves a port label into a host and port (see
	// SetAddrFinder); NewSyncer installs a no-op default.
	addrFinder func(portLabel string) (string, int)
	// createDelegatedCheck builds the Nomad-side Check for a delegated check
	// (see SetDelegatedChecks).
	createDelegatedCheck func(*structs.ServiceCheck, string) (Check, error)
	delegateChecks       map[string]struct{} // delegateChecks are the check types that the Nomad client runs and reports to Consul
	// End registryLock guarded attributes.

	logger *log.Logger

	// shutdownCh is supplied by the creator; Run shuts the Syncer down once
	// it becomes readable (a value is sent or it is closed).
	shutdownCh chan struct{}
	// shutdown is set by Shutdown and guarded by shutdownLock.
	shutdown     bool
	shutdownLock sync.Mutex

	// notifyShutdownCh is used to notify a Syncer it needs to shutdown.
	// This can happen because there was an explicit call to the Syncer's
	// Shutdown() method, or because the calling task signaled the
	// program is going to exit by closing its shutdownCh.
	notifyShutdownCh chan struct{}

	// periodicCallbacks is walked sequentially when the timer in Run
	// fires (via SyncServices -> RunHandlers). Guarded by periodicLock.
	periodicCallbacks map[string]types.PeriodicCallback
	// notifySyncCh is used by SyncNow to make Run sync immediately.
	notifySyncCh chan struct{}
	periodicLock sync.RWMutex

	// The periodic time interval for syncing services and checks with Consul
	syncInterval time.Duration

	// syncJitter provides a little variance in the frequency at which
	// Syncer polls Consul. Run subtracts it from syncInterval.
	syncJitter time.Duration
}
// NewSyncer returns a new consul.Syncer talking to the Consul agent described
// by consulConfig.
//
// shutdownCh is owned by the caller; once it becomes readable (a value is
// sent or it is closed) Run shuts the Syncer down. The returned Syncer uses a
// no-op address finder until SetAddrFinder is called.
//
// An error is returned if the Consul API configuration or client cannot be
// built.
func NewSyncer(consulConfig *config.ConsulConfig, shutdownCh chan struct{}, logger *log.Logger) (*Syncer, error) {
	consulClientConfig, err := consulConfig.ApiConfig()
	if err != nil {
		return nil, err
	}
	consulClient, err := consul.NewClient(consulClientConfig)
	if err != nil {
		return nil, err
	}

	return &Syncer{
		client:            consulClient,
		logger:            logger,
		consulAvailable:   true,
		shutdownCh:        shutdownCh,
		servicesGroups:    make(map[ServiceDomain]map[ServiceKey]*consul.AgentServiceRegistration),
		checkGroups:       make(map[ServiceDomain]map[ServiceKey][]*consul.AgentCheckRegistration),
		checkRunners:      make(map[consulCheckID]*CheckRunner),
		periodicCallbacks: make(map[string]types.PeriodicCallback),
		notifySyncCh:      make(chan struct{}, 1),
		// notifyShutdownCh must be non-nil: signalShutdown does a
		// non-blocking send on it and Run waits on it to exit. A nil
		// channel would silently drop every signal, so Run would never
		// observe an explicit Shutdown() call. The buffer of one keeps the
		// signal pending even when Run is not currently in its select.
		notifyShutdownCh: make(chan struct{}, 1),
		// default noop implementation of addrFinder
		addrFinder:   func(string) (string, int) { return "", 0 },
		syncInterval: defaultSyncInterval,
		syncJitter:   defaultSyncJitter,
	}, nil
}
// SetDelegatedChecks configures the check types that Nomad runs itself and
// reports back to Consul, together with the factory used to build the
// Nomad-side Check for each of them. It returns c so calls can be chained.
func (c *Syncer) SetDelegatedChecks(delegateChecks map[string]struct{}, createDelegatedCheckFn func(*structs.ServiceCheck, string) (Check, error)) *Syncer {
	c.createDelegatedCheck = createDelegatedCheckFn
	c.delegateChecks = delegateChecks
	return c
}
// SetAddrFinder installs the function that resolves a port label into the
// host and port used for service and check registrations. It returns c so
// calls can be chained.
func (c *Syncer) SetAddrFinder(addrFinder func(string) (string, int)) *Syncer {
	finder := addrFinder
	c.addrFinder = finder
	return c
}
// GenerateServiceKey should be called to generate a serviceKey based on the
// Service. The key is the service name, followed by the service's tags
// joined with "-" when it has any (e.g. "web" or "web-http-v1").
func GenerateServiceKey(service *structs.Service) ServiceKey {
	if len(service.Tags) == 0 {
		return ServiceKey(service.Name)
	}
	return ServiceKey(service.Name + "-" + strings.Join(service.Tags, "-"))
}
// SetServices stores the map of Nomad Services to the provided service
// domain name.
//
// Each service is converted into a Consul service registration and each of
// its checks into a Consul check registration. For check types listed in
// delegateChecks a CheckRunner is created the first time a check ID is seen.
// If any conversion fails, the collected errors are returned and the stored
// state for domain is left unchanged. Otherwise the domain's services and
// checks are replaced by the ones built here and an immediate sync is
// requested.
func (c *Syncer) SetServices(domain ServiceDomain, services map[ServiceKey]*structs.Service) error {
	var mErr multierror.Error
	numServ := len(services)
	registeredServices := make(map[ServiceKey]*consul.AgentServiceRegistration, numServ)
	registeredChecks := make(map[ServiceKey][]*consul.AgentCheckRegistration, numServ)
	for serviceKey, service := range services {
		serviceReg, err := c.createService(service, domain, serviceKey)
		if err != nil {
			mErr.Errors = append(mErr.Errors, err)
			continue
		}
		registeredServices[serviceKey] = serviceReg

		// Register the check(s) for this service
		for _, chk := range service.Checks {
			// Create a Consul check registration
			chkReg, err := c.createCheckReg(chk, serviceReg)
			if err != nil {
				mErr.Errors = append(mErr.Errors, err)
				continue
			}

			// Nomad runs delegated check types itself; make sure a runner
			// exists for this check ID.
			c.registryLock.RLock()
			_, delegated := c.delegateChecks[chk.Type]
			_, hasRunner := c.checkRunners[consulCheckID(chkReg.ID)]
			c.registryLock.RUnlock()
			if delegated && !hasRunner {
				nc, err := c.createDelegatedCheck(chk, chkReg.ID)
				if err != nil {
					mErr.Errors = append(mErr.Errors, err)
					continue
				}
				cr := NewCheckRunner(nc, c.runCheck, c.logger)
				c.registryLock.Lock()
				// The lock was released while building cr, so re-check:
				// keep a runner stored concurrently instead of replacing
				// (and losing track of) it. cr is presumably not started
				// yet (registerCheck/ensureCheckRunning start runners).
				// TODO type the CheckRunner
				if _, exists := c.checkRunners[consulCheckID(nc.ID())]; !exists {
					c.checkRunners[consulCheckID(nc.ID())] = cr
				}
				c.registryLock.Unlock()
			}

			// Track the check even when its runner already existed.
			// Dropping it here would remove it from checkGroups, so the
			// next sync would see it as agent-only, de-register it from
			// Consul and stop its runner.
			registeredChecks[serviceKey] = append(registeredChecks[serviceKey], chkReg)
		}
	}
	if len(mErr.Errors) > 0 {
		return mErr.ErrorOrNil()
	}

	// Update the services and checks groups for this domain
	c.groupsLock.Lock()

	// Create map for service group if it doesn't exist
	serviceKeys, ok := c.servicesGroups[domain]
	if !ok {
		serviceKeys = make(map[ServiceKey]*consul.AgentServiceRegistration, len(registeredServices))
		c.servicesGroups[domain] = serviceKeys
	}
	// Remove stale services
	for existingServiceKey := range serviceKeys {
		if _, ok := registeredServices[existingServiceKey]; !ok {
			delete(serviceKeys, existingServiceKey)
		}
	}
	// Add registered services
	for serviceKey, service := range registeredServices {
		serviceKeys[serviceKey] = service
	}

	// Create map for check group if it doesn't exist
	checkKeys, ok := c.checkGroups[domain]
	if !ok {
		checkKeys = make(map[ServiceKey][]*consul.AgentCheckRegistration, len(registeredChecks))
		c.checkGroups[domain] = checkKeys
	}
	// Remove stale checks
	for existingCheckKey := range checkKeys {
		if _, ok := registeredChecks[existingCheckKey]; !ok {
			delete(checkKeys, existingCheckKey)
		}
	}
	// Add registered checks
	for checkKey, checks := range registeredChecks {
		checkKeys[checkKey] = checks
	}
	c.groupsLock.Unlock()

	// Sync immediately
	c.SyncNow()
	return nil
}
// SyncNow expires the current timer forcing the list of periodic callbacks
// to be synced immediately.
//
// The request never blocks: when it cannot be queued on notifySyncCh (for
// example because a request is already pending) it is simply dropped, since
// the pending request will trigger the same sync.
func (c *Syncer) SyncNow() {
	var wake struct{}
	select {
	case c.notifySyncCh <- wake:
	default:
		// A sync request is already pending.
	}
}
// flattenedServices collects the service registrations of every domain into
// a single slice. The result order follows map iteration and is therefore
// unspecified.
func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration {
	c.groupsLock.RLock()
	defer c.groupsLock.RUnlock()

	flat := make([]*consul.AgentServiceRegistration, 0, 8)
	for _, group := range c.servicesGroups {
		for _, reg := range group {
			flat = append(flat, reg)
		}
	}
	return flat
}
// flattenedChecks collects the check registrations of every domain and
// service key into a single slice. The result order follows map iteration
// and is therefore unspecified.
func (c *Syncer) flattenedChecks() []*consul.AgentCheckRegistration {
	c.groupsLock.RLock()
	defer c.groupsLock.RUnlock()

	flat := make([]*consul.AgentCheckRegistration, 0, 8)
	for _, group := range c.checkGroups {
		for _, regs := range group {
			flat = append(flat, regs...)
		}
	}
	return flat
}
// signalShutdown asks Run to exit through notifyShutdownCh without blocking.
// If the signal cannot be delivered immediately (no buffer space and no
// ready receiver, or a nil channel) it is dropped.
func (c *Syncer) signalShutdown() {
	var stop struct{}
	select {
	case c.notifyShutdownCh <- stop:
	default:
	}
}
// Shutdown de-registers the services and checks and shuts down periodic
// syncing.
//
// It marks the Syncer as shut down, signals Run, stops every delegated check
// runner and de-registers from the Consul agent every service owned by this
// Syncer's domains. Failures are collected and returned together; a failure
// to list the agent's services does not stop the remaining steps.
func (c *Syncer) Shutdown() error {
	c.shutdownLock.Lock()
	c.shutdown = true
	c.shutdownLock.Unlock()

	c.signalShutdown()

	// The registry read lock is held for the rest of the call, including the
	// Consul round-trips below.
	c.registryLock.RLock()
	defer c.registryLock.RUnlock()
	for _, runner := range c.checkRunners {
		runner.Stop()
	}

	var errs multierror.Error
	owned, err := c.queryAgentServices()
	if err != nil {
		errs.Errors = append(errs.Errors, err)
	}
	for id := range owned {
		serviceID := string(id)
		if err := c.client.Agent().ServiceDeregister(serviceID); err != nil {
			c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", serviceID, err)
			errs.Errors = append(errs.Errors, err)
		}
	}
	return errs.ErrorOrNil()
}
// queryChecks fetches every check known to the local Consul agent and keeps
// only those belonging to this Syncer (see filterConsulChecks).
func (c *Syncer) queryChecks() (map[consulCheckID]*consul.AgentCheck, error) {
	all, err := c.client.Agent().Checks()
	if err != nil {
		return nil, err
	}
	owned := c.filterConsulChecks(all)
	return owned, nil
}
// queryAgentServices fetches every service known to the local Consul agent
// and keeps only those belonging to this Syncer (see filterConsulServices).
func (c *Syncer) queryAgentServices() (map[consulServiceID]*consul.AgentService, error) {
	all, err := c.client.Agent().Services()
	if err != nil {
		return nil, err
	}
	owned := c.filterConsulServices(all)
	return owned, nil
}
// syncChecks synchronizes this Syncer's Consul Checks with the Consul Agent.
//
// Missing checks are registered, checks already present get their runner
// started if needed, changed checks are registered again and agent-only
// checks are de-registered. Errors from the individual steps are combined.
func (c *Syncer) syncChecks() error {
	consulChecks, err := c.queryChecks()
	if err != nil {
		return err
	}

	missing, existing, changed, stale := c.calcChecksDiff(consulChecks)

	var errs multierror.Error
	record := func(err error) {
		if err != nil {
			errs.Errors = append(errs.Errors, err)
		}
	}

	for _, chk := range missing {
		record(c.registerCheck(chk))
	}
	for _, chk := range existing {
		c.ensureCheckRunning(chk)
	}
	// Changed checks are re-registered without de-registering them first, to
	// avoid missing the TTL. The trade-off is that any drift in the check is
	// not fully reconciled.
	for _, chk := range changed {
		record(c.registerCheck(chk))
	}
	for _, chk := range stale {
		record(c.deregisterCheck(consulCheckID(chk.ID)))
	}
	return errs.ErrorOrNil()
}
// compareConsulCheck reports whether the locally built registration and the
// check reported by the Consul agent agree on ID, name, notes and service ID.
// No other fields are compared.
func compareConsulCheck(localCheck *consul.AgentCheckRegistration, consulCheck *consul.AgentCheck) bool {
	return consulCheck.CheckID == localCheck.ID &&
		consulCheck.Name == localCheck.Name &&
		consulCheck.Notes == localCheck.Notes &&
		consulCheck.ServiceID == localCheck.ServiceID
}
// calcChecksDiff classifies checks by comparing the Syncer's tracked checks
// (c.flattenedChecks()) against consulChecks reported by the agent. It
// returns, in order:
//
//  1. missingChecks: tracked locally but absent from the agent; they need to
//     be registered.
//  2. equalChecks: present in both and equal per compareConsulCheck.
//  3. changedChecks: present in both but diverged; the local version is
//     returned.
//  4. staleChecks: present only in the agent; they should be removed because
//     the agent has drifted from the Syncer. These are rebuilt from the
//     agent's data.
//
// All four slices are non-nil; their order is unspecified.
func (c *Syncer) calcChecksDiff(consulChecks map[consulCheckID]*consul.AgentCheck) (
	missingChecks []*consul.AgentCheckRegistration,
	equalChecks []*consul.AgentCheckRegistration,
	changedChecks []*consul.AgentCheckRegistration,
	staleChecks []*consul.AgentCheckRegistration) {

	// checkState records where a check ID was observed.
	type checkState int
	const (
		onlyLocal checkState = iota // tracked by the Syncer only
		identical                   // tracked and in the agent, matching
		diverged                    // tracked and in the agent, differing
		onlyAgent                   // in the agent only
	)
	type entry struct {
		reg   *consul.AgentCheckRegistration
		state checkState
	}

	tracked := c.flattenedChecks()
	merged := make(map[string]*entry, len(tracked)+len(consulChecks))
	for _, reg := range tracked {
		merged[reg.ID] = &entry{reg: reg, state: onlyLocal}
	}

	for _, agentCheck := range consulChecks {
		e, known := merged[agentCheck.CheckID]
		switch {
		case !known:
			merged[agentCheck.CheckID] = &entry{
				reg: &consul.AgentCheckRegistration{
					ID:        agentCheck.CheckID,
					Name:      agentCheck.Name,
					Notes:     agentCheck.Notes,
					ServiceID: agentCheck.ServiceID,
				},
				state: onlyAgent,
			}
		case compareConsulCheck(e.reg, agentCheck):
			e.state = identical
		default:
			e.state = diverged
		}
	}

	missingChecks = make([]*consul.AgentCheckRegistration, 0)
	equalChecks = make([]*consul.AgentCheckRegistration, 0)
	changedChecks = make([]*consul.AgentCheckRegistration, 0)
	staleChecks = make([]*consul.AgentCheckRegistration, 0)
	for _, e := range merged {
		switch e.state {
		case onlyLocal:
			missingChecks = append(missingChecks, e.reg)
		case identical:
			equalChecks = append(equalChecks, e.reg)
		case diverged:
			changedChecks = append(changedChecks, e.reg)
		case onlyAgent:
			staleChecks = append(staleChecks, e.reg)
		}
	}
	return missingChecks, equalChecks, changedChecks, staleChecks
}
// compareConsulService reports whether the locally built registration and the
// service reported by the Consul agent agree on ID, name, port, address,
// EnableTagOverride and tags. Tags are compared as sets: order and
// duplicates are ignored.
func compareConsulService(localService *consul.AgentServiceRegistration, consulService *consul.AgentService) bool {
	switch {
	case consulService.ID != localService.ID,
		consulService.Service != localService.Name,
		consulService.Port != localService.Port,
		consulService.Address != localService.Address,
		consulService.EnableTagOverride != localService.EnableTagOverride:
		return false
	}

	// seen maps each local tag to whether the agent also reported it.
	seen := make(map[string]bool, len(localService.Tags))
	for _, tag := range localService.Tags {
		seen[tag] = false
	}
	for _, tag := range consulService.Tags {
		if _, known := seen[tag]; !known {
			return false
		}
		seen[tag] = true
	}
	for _, matched := range seen {
		if !matched {
			return false
		}
	}
	return true
}
// calcServicesDiff classifies services by comparing the Syncer's tracked
// services (c.flattenedServices()) against consulServices reported by the
// agent. It returns, in order:
//
//  1. missingServices: tracked locally but absent from the agent; they need
//     to be registered.
//  2. equalServices: present in both and identical per compareConsulService.
//  3. changedServices: present in both but diverged; the local version is
//     returned.
//  4. staleServices: present only in the agent; they should be removed
//     because the agent has drifted from the Syncer. These are rebuilt from
//     the agent's data.
//
// All four slices are non-nil; their order is unspecified.
func (c *Syncer) calcServicesDiff(consulServices map[consulServiceID]*consul.AgentService) (missingServices []*consul.AgentServiceRegistration, equalServices []*consul.AgentServiceRegistration, changedServices []*consul.AgentServiceRegistration, staleServices []*consul.AgentServiceRegistration) {
	// serviceState records where a service ID was observed.
	type serviceState int
	const (
		onlyLocal serviceState = iota // tracked by the Syncer only
		identical                     // tracked and in the agent, matching
		diverged                      // tracked and in the agent, differing
		onlyAgent                     // in the agent only
	)
	type entry struct {
		reg   *consul.AgentServiceRegistration
		state serviceState
	}

	tracked := c.flattenedServices()
	merged := make(map[string]*entry, len(tracked)+len(consulServices))
	for _, reg := range tracked {
		merged[reg.ID] = &entry{reg: reg, state: onlyLocal}
	}

	for _, agentService := range consulServices {
		e, known := merged[agentService.ID]
		switch {
		case !known:
			merged[agentService.ID] = &entry{
				reg: &consul.AgentServiceRegistration{
					ID:      agentService.ID,
					Name:    agentService.Service,
					Tags:    agentService.Tags,
					Port:    agentService.Port,
					Address: agentService.Address,
				},
				state: onlyAgent,
			}
		case compareConsulService(e.reg, agentService):
			e.state = identical
		default:
			e.state = diverged
		}
	}

	missingServices = make([]*consul.AgentServiceRegistration, 0)
	equalServices = make([]*consul.AgentServiceRegistration, 0)
	changedServices = make([]*consul.AgentServiceRegistration, 0)
	staleServices = make([]*consul.AgentServiceRegistration, 0)
	for _, e := range merged {
		switch e.state {
		case onlyLocal:
			missingServices = append(missingServices, e.reg)
		case identical:
			equalServices = append(equalServices, e.reg)
		case diverged:
			changedServices = append(changedServices, e.reg)
		case onlyAgent:
			staleServices = append(staleServices, e.reg)
		}
	}
	return missingServices, equalServices, changedServices, staleServices
}
// syncServices synchronizes this Syncer's Consul Services with the Consul
// Agent: missing and changed services are registered from their local
// definition, and services known only to the agent are de-registered.
// Errors from individual calls are combined.
func (c *Syncer) syncServices() error {
	consulServices, err := c.queryAgentServices()
	if err != nil {
		return err
	}

	missing, _, changed, stale := c.calcServicesDiff(consulServices)

	var errs multierror.Error
	// Missing services first, then changed ones (re-registering the local
	// definition).
	for _, batch := range [][]*consul.AgentServiceRegistration{missing, changed} {
		for _, svc := range batch {
			if err := c.client.Agent().ServiceRegister(svc); err != nil {
				errs.Errors = append(errs.Errors, err)
			}
		}
	}
	for _, svc := range stale {
		if err := c.deregisterService(svc.ID); err != nil {
			errs.Errors = append(errs.Errors, err)
		}
	}
	return errs.ErrorOrNil()
}
// registerCheck registers a check definition with Consul. If the check is
// one Nomad runs itself (a runner exists for its ID), the runner is started
// first.
func (c *Syncer) registerCheck(chkReg *consul.AgentCheckRegistration) error {
	c.registryLock.RLock()
	runner, delegated := c.checkRunners[consulCheckID(chkReg.ID)]
	if delegated {
		runner.Start()
	}
	c.registryLock.RUnlock()

	return c.client.Agent().CheckRegister(chkReg)
}
// ensureCheckRunning starts the runner of a delegated check that is already
// registered with Consul, unless it is running already. Checks without a
// runner are ignored.
func (c *Syncer) ensureCheckRunning(chk *consul.AgentCheckRegistration) {
	c.registryLock.RLock()
	defer c.registryLock.RUnlock()

	runner, ok := c.checkRunners[consulCheckID(chk.ID)]
	if !ok || runner.Started() {
		return
	}
	c.logger.Printf("[DEBUG] consul.syncer: starting runner for existing check. %v", chk.ID)
	runner.Start()
}
// createCheckReg builds the Consul check registration for check, attached to
// the service described by serviceReg.
//
// The check ID is check.Hash(serviceReg.ID). The check targets the service's
// address and port unless check.PortLabel is set, in which case addrFinder
// resolves them. HTTP checks default to the "http" scheme and resolve
// check.Path against the target; TCP checks target host:port; script checks
// are registered as TTL checks whose TTL is the check interval plus
// ttlCheckBuffer. Any other type returns an error, as does an unparsable
// HTTP path.
//
// check is not modified.
func (c *Syncer) createCheckReg(check *structs.ServiceCheck, serviceReg *consul.AgentServiceRegistration) (*consul.AgentCheckRegistration, error) {
	chkReg := consul.AgentCheckRegistration{
		ID:        check.Hash(serviceReg.ID),
		Name:      check.Name,
		ServiceID: serviceReg.ID,
	}
	chkReg.Timeout = check.Timeout.String()
	chkReg.Interval = check.Interval.String()

	host, port := serviceReg.Address, serviceReg.Port
	if check.PortLabel != "" {
		host, port = c.addrFinder(check.PortLabel)
	}

	switch check.Type {
	case structs.ServiceCheckHTTP:
		// Default the scheme locally instead of writing it back into check:
		// mutating the caller's ServiceCheck is a hidden side effect and,
		// if check.Hash covers Protocol (NOTE(review): verify), it would
		// change the check ID computed on a later call.
		scheme := check.Protocol
		if scheme == "" {
			scheme = "http"
		}
		base := url.URL{
			Scheme: scheme,
			Host:   net.JoinHostPort(host, strconv.Itoa(port)),
		}
		relative, err := url.Parse(check.Path)
		if err != nil {
			return nil, err
		}
		chkReg.HTTP = base.ResolveReference(relative).String()
	case structs.ServiceCheckTCP:
		chkReg.TCP = net.JoinHostPort(host, strconv.Itoa(port))
	case structs.ServiceCheckScript:
		chkReg.TTL = (check.Interval + ttlCheckBuffer).String()
	default:
		return nil, fmt.Errorf("check type %+q not valid", check.Type)
	}
	chkReg.Status = check.InitialStatus
	return &chkReg, nil
}
// generateConsulServiceID builds the Consul ServiceID for a service key in a
// domain: "<nomadServicePrefix>-<domain>-<key>".
func generateConsulServiceID(domain ServiceDomain, key ServiceKey) consulServiceID {
	id := nomadServicePrefix + "-" + string(domain) + "-" + string(key)
	return consulServiceID(id)
}
// createService builds the Consul service registration for a Nomad service
// in the given domain. Address and port come from addrFinder and stay unset
// when it resolves nothing. The error result is currently always nil.
func (c *Syncer) createService(service *structs.Service, domain ServiceDomain, key ServiceKey) (*consul.AgentServiceRegistration, error) {
	c.registryLock.RLock()
	defer c.registryLock.RUnlock()

	reg := &consul.AgentServiceRegistration{
		ID:   string(generateConsulServiceID(domain, key)),
		Name: service.Name,
		Tags: service.Tags,
	}
	// Assigning "" / 0 leaves the zero values in place, so the empty
	// results of addrFinder need no special-casing.
	reg.Address, reg.Port = c.addrFinder(service.PortLabel)
	return reg, nil
}
// deregisterService asks the local Consul agent to remove the service with
// the given ID.
func (c *Syncer) deregisterService(serviceID string) error {
	agent := c.client.Agent()
	return agent.ServiceDeregister(serviceID)
}
// deregisterCheck removes a check from the Consul agent and, on success,
// stops and forgets its delegated runner if one exists. The registry write
// lock is held for the whole operation.
func (c *Syncer) deregisterCheck(id consulCheckID) error {
	c.registryLock.Lock()
	defer c.registryLock.Unlock()

	// On failure the local runner is kept; a future sync will retry the
	// de-registration.
	if err := c.client.Agent().CheckDeregister(string(id)); err != nil {
		return err
	}

	runner, found := c.checkRunners[id]
	if !found {
		return nil
	}
	runner.Stop()
	delete(c.checkRunners, id)
	return nil
}
// Run triggers periodic syncing of services and checks with Consul. This is
// a long lived go-routine which is stopped during shutdown.
//
// A sync runs immediately, then every syncInterval-syncJitter, and whenever
// SyncNow is called. Run returns once the Syncer shuts down: either
// shutdownCh becomes readable (Run then calls Shutdown itself) or Shutdown
// signals notifyShutdownCh.
func (c *Syncer) Run() {
	// Named timer (not "sync") so the sync package is not shadowed.
	timer := time.NewTimer(0)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			// NOTE(review): syncJitter is applied as a fixed offset, not a
			// random one.
			timer.Reset(c.syncInterval - c.syncJitter)
			if err := c.SyncServices(); err != nil {
				if c.consulAvailable {
					c.logger.Printf("[DEBUG] consul.syncer: error in syncing: %v", err)
				}
				c.consulAvailable = false
			} else {
				if !c.consulAvailable {
					c.logger.Printf("[DEBUG] consul.syncer: syncs succesful")
				}
				c.consulAvailable = true
			}
		case <-c.notifySyncCh:
			timer.Reset(0)
		case <-c.shutdownCh:
			// Return right away: looping again would let a timer tick
			// re-register the services Shutdown just removed, and a closed
			// shutdownCh would keep re-triggering Shutdown.
			if err := c.Shutdown(); err != nil {
				c.logger.Printf("[WARN] consul.syncer: error shutting down syncer: %v", err)
			}
			c.logger.Printf("[INFO] consul.syncer: shutting down syncer ")
			return
		case <-c.notifyShutdownCh:
			c.logger.Printf("[INFO] consul.syncer: shutting down syncer ")
			return
		}
	}
}
// RunHandlers executes every registered periodic callback once, in
// unspecified order, and returns their combined errors. The callbacks are
// snapshotted first so they run without holding periodicLock.
func (c *Syncer) RunHandlers() error {
	c.periodicLock.RLock()
	handlers := make([]types.PeriodicCallback, 0, len(c.periodicCallbacks))
	for _, fn := range c.periodicCallbacks {
		handlers = append(handlers, fn)
	}
	c.periodicLock.RUnlock()

	var errs multierror.Error
	for _, handler := range handlers {
		if err := handler(); err != nil {
			errs.Errors = append(errs.Errors, err)
		}
	}
	return errs.ErrorOrNil()
}
// SyncServices syncs the services and checks with the Consul Agent and then
// runs every registered periodic handler.
//
// All three steps always run. Their errors are combined into one
// *multierror.Error, which is nil when every step succeeded.
func (c *Syncer) SyncServices() error {
	var mErr multierror.Error
	if err := c.syncServices(); err != nil {
		mErr.Errors = append(mErr.Errors, err)
	}
	if err := c.syncChecks(); err != nil {
		mErr.Errors = append(mErr.Errors, err)
	}
	// Append rather than return, so that service/check sync errors
	// collected above are not lost when a handler fails.
	if err := c.RunHandlers(); err != nil {
		mErr.Errors = append(mErr.Errors, err)
	}
	return mErr.ErrorOrNil()
}
// filterConsulServices prunes out all the services that were not registered
// by this syncer. A service is kept only when its ID starts with
// "<nomadServicePrefix>-<domain>-" for one of the domains in servicesGroups.
//
// The trailing hyphen matters: without it the domain "executor-<alloc>-web"
// would also claim the services of "executor-<alloc>-web-api", which a sync
// would then treat as stale and de-register.
func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[consulServiceID]*consul.AgentService {
	localServices := make(map[consulServiceID]*consul.AgentService, len(consulServices))
	c.groupsLock.RLock()
	defer c.groupsLock.RUnlock()

	// Build each domain's prefix once instead of once per service.
	prefixes := make([]string, 0, len(c.servicesGroups))
	for domain := range c.servicesGroups {
		prefixes = append(prefixes, fmt.Sprintf("%s-%s-", nomadServicePrefix, domain))
	}

	for serviceID, service := range consulServices {
		for _, prefix := range prefixes {
			if strings.HasPrefix(service.ID, prefix) {
				localServices[consulServiceID(serviceID)] = service
				break
			}
		}
	}
	return localServices
}
// filterConsulChecks prunes out all the Consul checks whose service was not
// registered by this syncer. A check is kept only when its ServiceID starts
// with "<nomadServicePrefix>-<domain>-" for one of the domains in
// checkGroups.
//
// The trailing hyphen prevents a domain from claiming checks of another
// domain whose name it is a prefix of (e.g. "executor-<alloc>-web" vs
// "executor-<alloc>-web-api"), which a sync would otherwise de-register as
// stale.
func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[consulCheckID]*consul.AgentCheck {
	localChecks := make(map[consulCheckID]*consul.AgentCheck, len(consulChecks))
	c.groupsLock.RLock()
	defer c.groupsLock.RUnlock()

	// Build each domain's prefix once instead of once per check.
	prefixes := make([]string, 0, len(c.checkGroups))
	for domain := range c.checkGroups {
		prefixes = append(prefixes, fmt.Sprintf("%s-%s-", nomadServicePrefix, domain))
	}

	for checkID, check := range consulChecks {
		for _, prefix := range prefixes {
			if strings.HasPrefix(check.ServiceID, prefix) {
				localChecks[consulCheckID(checkID)] = check
				break
			}
		}
	}
	return localChecks
}
// consulPresent reports whether the local Consul agent answers a Self query
// without error.
func (c *Syncer) consulPresent() bool {
	if _, err := c.client.Agent().Self(); err != nil {
		return false
	}
	return true
}
// runCheck runs a delegated check and reports its result to Consul by
// updating the corresponding TTL check.
//
// Exit code 0 maps to passing, 1 to warning and anything else to critical.
// An execution error is always critical, and its message replaces the
// output. If the TTL update fails, consulAvailable is cleared (logging only
// on the available -> unavailable transition); it is set again by the next
// successful sync in Run.
func (c *Syncer) runCheck(check Check) {
	res := check.Run()
	if res.Duration >= check.Timeout() {
		c.logger.Printf("[DEBUG] consul.syncer: check took time: %v, timeout: %v", res.Duration, check.Timeout())
	}

	state := consul.HealthCritical
	switch res.ExitCode {
	case 0:
		state = consul.HealthPassing
	case 1:
		state = consul.HealthWarning
	}
	output := res.Output
	if res.Err != nil {
		state = consul.HealthCritical
		output = res.Err.Error()
	}

	if err := c.client.Agent().UpdateTTL(check.ID(), output, state); err != nil {
		// A failed update must never mark Consul as available; only a
		// successful sync re-enables it, as the log message states.
		if c.consulAvailable {
			c.logger.Printf("[DEBUG] consul.syncer: check %+q failed, disabling Consul checks until until next successful sync: %v", check.ID(), err)
		}
		c.consulAvailable = false
	}
}
// ReapUnmatched de-registers every Nomad-registered service (ID starting with
// nomadServicePrefix) that does not belong to one of the given domains.
// Services not registered by Nomad are left alone. De-registration errors
// are combined and returned.
func (c *Syncer) ReapUnmatched(domains []ServiceDomain) error {
	servicesInConsul, err := c.ConsulClient().Agent().Services()
	if err != nil {
		return err
	}

	// The trailing hyphen makes each prefix specific to its domain;
	// otherwise a domain could match another domain it is a prefix of.
	wanted := make([]string, len(domains))
	for i, domain := range domains {
		wanted[i] = fmt.Sprintf("%s-%s-", nomadServicePrefix, domain)
	}
	inWantedDomain := func(id string) bool {
		for _, prefix := range wanted {
			if strings.HasPrefix(id, prefix) {
				return true
			}
		}
		return false
	}

	var mErr multierror.Error
	for serviceID := range servicesInConsul {
		if !strings.HasPrefix(serviceID, nomadServicePrefix) || inWantedDomain(serviceID) {
			continue
		}
		if err := c.deregisterService(serviceID); err != nil {
			mErr.Errors = append(mErr.Errors, err)
		}
	}
	return mErr.ErrorOrNil()
}
// AddPeriodicHandler registers fn under name so it runs on every sync. It
// returns true on success and false (logging an error) if a handler with the
// same name is already registered; the existing handler is kept.
func (c *Syncer) AddPeriodicHandler(name string, fn types.PeriodicCallback) bool {
	c.periodicLock.Lock()
	defer c.periodicLock.Unlock()

	if _, exists := c.periodicCallbacks[name]; !exists {
		c.periodicCallbacks[name] = fn
		return true
	}
	c.logger.Printf("[ERROR] consul.syncer: failed adding handler %+q", name)
	return false
}
// NumHandlers returns the number of callbacks registered with the syncer
func (c *Syncer) NumHandlers() int {