-
Notifications
You must be signed in to change notification settings - Fork 2.8k
/
service.go
1886 lines (1643 loc) · 58.9 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package service
import (
"fmt"
"net"
"strconv"
"sync/atomic"
"time"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
"github.com/cilium/cilium/pkg/bpf"
"github.com/cilium/cilium/pkg/cidr"
"github.com/cilium/cilium/pkg/counter"
datapathOpt "github.com/cilium/cilium/pkg/datapath/option"
"github.com/cilium/cilium/pkg/datapath/types"
lb "github.com/cilium/cilium/pkg/loadbalancer"
"github.com/cilium/cilium/pkg/lock"
"github.com/cilium/cilium/pkg/logging/logfields"
"github.com/cilium/cilium/pkg/maps/lbmap"
"github.com/cilium/cilium/pkg/metrics"
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
nodeTypes "github.com/cilium/cilium/pkg/node/types"
"github.com/cilium/cilium/pkg/option"
"github.com/cilium/cilium/pkg/service/healthserver"
)
// anyPort is the map key filterServiceBackends uses for the full backend list
// when no frontend-port filter is configured.
const anyPort = "*"

// Per-operation series of the ServicesCount metric. upsertService increments
// addMetric for newly created services and updateMetric otherwise.
var (
	updateMetric = metrics.ServicesCount.WithLabelValues("update")
	deleteMetric = metrics.ServicesCount.WithLabelValues("delete")
	addMetric    = metrics.ServicesCount.WithLabelValues("add")
)
// LBMap abstracts the BPF load-balancer maps that Service writes to and reads
// back from. The methods are grouped by the kind of map entry they touch.
// NewService populates the Service.lbmap field with lbmap.New.
type LBMap interface {
	// Service entries and their Maglev lookup tables.
	UpsertService(*lbmap.UpsertServiceParams) error
	DeleteService(lb.L3n4AddrID, int, bool, lb.SVCNatPolicy) error
	UpsertMaglevLookupTable(uint16, map[string]lb.BackendID, bool) error
	IsMaglevLookupTableRecreated(bool) bool

	// Backend entries.
	AddBackend(lb.Backend, bool) error
	UpdateBackendWithState(lb.Backend) error
	DeleteBackendByID(lb.BackendID) error

	// Session-affinity matches (the uint16 is presumably the service ID —
	// confirm against the lbmap implementation).
	AddAffinityMatch(uint16, lb.BackendID) error
	DeleteAffinityMatch(uint16, lb.BackendID) error

	// loadBalancerSourceRanges entries.
	UpdateSourceRanges(uint16, []*cidr.CIDR, []*cidr.CIDR, bool) error

	// Readers returning the current contents of the maps.
	DumpServiceMaps() ([]*lb.SVC, []error)
	DumpBackendMaps() ([]*lb.Backend, error)
	DumpAffinityMatches() (lbmap.BackendIDByServiceIDSet, error)
	DumpSourceRanges(bool) (lbmap.SourceRangeSetByServiceID, error)
}
// healthServer manages the HealthCheckNodePort listeners. NewService only
// creates one when option.Config.EnableHealthCheckNodePort is set.
type healthServer interface {
	// DeleteService drops the listener registered for svcID.
	DeleteService(svcID lb.ID)
	// UpsertService registers or refreshes the listener for svcID on port,
	// reporting localEndpoints as the number of node-local endpoints.
	UpsertService(svcID lb.ID, svcNS, svcName string, localEndpoints int, port uint16)
}
// monitorNotify is the sink for agent notifications (e.g. service upserts)
// destined for the monitor.
type monitorNotify interface {
	// SendNotification forwards one agent notification message.
	SendNotification(msg monitorAPI.AgentNotifyMessage) error
}
// Name identifies a service by namespace and name together, which is what
// makes the reference fully qualified.
type Name struct {
	Namespace string
	Name      string
}

// String renders the reference as "<namespace>/<name>".
func (n Name) String() string {
	const separator = "/"
	return n.Namespace + separator + n.Name
}
// envoyCache pushes service backends to the Envoy proxy as endpoint
// resources.
type envoyCache interface {
	// UpsertEnvoyEndpoints publishes the backends of serviceName.
	// backendsByPort is keyed by frontend port name/number, or by anyPort
	// when no port filter applies (see filterServiceBackends).
	UpsertEnvoyEndpoints(serviceName Name, backendsByPort map[string][]lb.Backend) error
}
// svcInfo is the Service handler's in-memory record of one service frontend
// and its backends.
type svcInfo struct {
	hash     string           // presumably frontend.Hash(); TODO confirm where it is set
	frontend lb.L3n4AddrID    // frontend address; frontend.ID is used as the service ID
	backends []lb.Backend     // backends after node-local filtering in upsertService
	// presumably indexes backends by L3n4Addr hash — verify in updateBackendsCacheLocked
	backendByHash map[string]*lb.Backend

	svcType                   lb.SVCType
	svcTrafficPolicy          lb.SVCTrafficPolicy
	svcNatPolicy              lb.SVCNatPolicy
	sessionAffinity           bool
	sessionAffinityTimeoutSec uint32
	svcHealthCheckNodePort    uint16
	svcName                   string
	svcNamespace              string
	loadBalancerSourceRanges  []*cidr.CIDR

	l7LBProxyPort     uint16   // Non-zero when traffic is redirected to an L7 LB proxy port
	l7LBFrontendPorts []string // Non-empty when the L7 LB applies only to these frontend ports
	restoredFromDatapath bool  // NOTE(review): presumably set by RestoreServices; confirm
}
// isL7LBService reports whether the service is redirected to an L7 load
// balancer, which is signalled by a non-zero proxy port.
func (svc *svcInfo) isL7LBService() bool {
	return svc.l7LBProxyPort > 0
}
// deepCopyToLBSVC returns a newly allocated lb.SVC describing svc. The
// frontend, backends, source-range CIDRs and L7 frontend-port list are copied,
// so the result does not alias svc's internal state.
//
// Session-affinity settings and loadBalancerSourceRanges are included.
// RegisterL7LBService and RemoveL7LBService re-upsert these copies, and
// omitting the fields would silently drop them from the datapath on every
// such re-upsert.
func (svc *svcInfo) deepCopyToLBSVC() *lb.SVC {
	backends := make([]lb.Backend, len(svc.backends))
	for i, backend := range svc.backends {
		backends[i] = *backend.DeepCopy()
	}

	var sourceRanges []*cidr.CIDR
	if len(svc.loadBalancerSourceRanges) > 0 {
		sourceRanges = make([]*cidr.CIDR, len(svc.loadBalancerSourceRanges))
		for i, c := range svc.loadBalancerSourceRanges {
			sourceRanges[i] = c.DeepCopy()
		}
	}

	// Preserve nil vs. empty while detaching from svc's backing array.
	var l7Ports []string
	if svc.l7LBFrontendPorts != nil {
		l7Ports = make([]string, len(svc.l7LBFrontendPorts))
		copy(l7Ports, svc.l7LBFrontendPorts)
	}

	return &lb.SVC{
		Frontend:                  *svc.frontend.DeepCopy(),
		Backends:                  backends,
		Type:                      svc.svcType,
		TrafficPolicy:             svc.svcTrafficPolicy,
		NatPolicy:                 svc.svcNatPolicy,
		SessionAffinity:           svc.sessionAffinity,
		SessionAffinityTimeoutSec: svc.sessionAffinityTimeoutSec,
		HealthCheckNodePort:       svc.svcHealthCheckNodePort,
		Name:                      svc.svcName,
		Namespace:                 svc.svcNamespace,
		LoadBalancerSourceRanges:  sourceRanges,
		L7LBProxyPort:             svc.l7LBProxyPort,
		L7LBFrontendPorts:         l7Ports,
	}
}
// requireNodeLocalBackends returns two flags for the given frontend:
//   - the first is true when the service is a NodePort, LoadBalancer or
//     ExternalIPs service with traffic policy lb.SVCTrafficPolicyLocal;
//   - the second says whether backends must be filtered down to node-local
//     ones. It is always true for LocalRedirect services, and true for
//     Local-policy services only when the frontend has external scope.
func (svc *svcInfo) requireNodeLocalBackends(frontend lb.L3n4AddrID) (bool, bool) {
	if svc.svcType == lb.SVCTypeLocalRedirect {
		return false, true
	}

	honorsTrafficPolicy := svc.svcType == lb.SVCTypeNodePort ||
		svc.svcType == lb.SVCTypeLoadBalancer ||
		svc.svcType == lb.SVCTypeExternalIPs
	if honorsTrafficPolicy && svc.svcTrafficPolicy == lb.SVCTrafficPolicyLocal {
		return true, frontend.Scope == lb.ScopeExternal
	}

	return false, false
}
// useMaglev reports whether a Maglev lookup table should be provisioned for
// this service. Maglev must be the configured NodePort algorithm, and the
// service must be of a type reachable from outside the node.
func (svc *svcInfo) useMaglev() bool {
	if option.Config.NodePortAlg != option.NodePortAlgMaglev {
		return false
	}

	switch svc.svcType {
	case lb.SVCTypeClusterIP:
		// ClusterIP is only reachable from outside with ExternalClusterIP.
		// Without it, skip the LUT to avoid the extra memory footprint.
		return option.Config.ExternalClusterIP
	case lb.SVCTypeNodePort:
		// A wildcarded NodePort frontend is not exposed for external traffic.
		return !isWildcardAddr(svc.frontend)
	case lb.SVCTypeLoadBalancer, lb.SVCTypeHostPort, lb.SVCTypeExternalIPs:
		return true
	default:
		return false
	}
}
// L7LBInfo records the L7 load-balancing registrations (made through
// RegisterL7LBService and friends) for a single service.
type L7LBInfo struct {
	// Names of the CEC resources that need this service's backends to be
	// synced to Envoy.
	envoyBackendRefs map[Name]struct{}

	// Name of the CEC resource that needs this service to be forwarded to an
	// L7 LB specified in that resource.
	// Only one CEC may do this for any given service.
	envoyListenerRef Name

	// List of front-end ports of upstream service/cluster, which will be used for
	// filtering applicable endpoints.
	//
	// If nil, all the available backends will be used.
	frontendPorts []string

	// Port number for L7 LB redirection. Can be zero if only backend sync
	// has been requested.
	proxyPort uint16
}
// checkLBSourceRange reports whether the datapath must enforce
// loadBalancerSourceRanges for this service. That requires the feature to be
// enabled via option.Config and at least one range to be configured.
func (svc *svcInfo) checkLBSourceRange() bool {
	return option.Config.EnableSVCSourceRangeCheck && len(svc.loadBalancerSourceRanges) != 0
}
// Service is a service handler. Its main responsibility is to reflect
// service-related changes into BPF maps used by datapath BPF programs.
// The changes can be triggered either by k8s_watcher or directly by
// API calls to the /services endpoint.
//
// The embedded RWMutex guards the maps below. Lower-case helpers commented
// "'s' must be locked" expect the caller to hold it.
type Service struct {
	lock.RWMutex

	svcByHash       map[string]*svcInfo    // keyed by frontend L3n4Addr hash
	svcByID         map[lb.ID]*svcInfo     // keyed by service ID
	backendRefCount counter.StringCounter  // presumably service refs per backend hash — confirm
	backendByHash   map[string]*lb.Backend // keyed by backend L3n4Addr hash

	healthServer  healthServer // nil unless EnableHealthCheckNodePort is set
	monitorNotify monitorNotify
	envoyCache    envoyCache
	lbmap         LBMap

	lastUpdatedTs atomic.Value // holds a time.Time; initialized by NewService

	l7lbSvcs map[Name]*L7LBInfo // L7 LB registrations keyed by service name
}
// NewService constructs a Service handler with empty caches and an lbmap
// configured from option.Config (Maglev on/off and table size). A
// HealthCheckNodePort server is created only when
// option.Config.EnableHealthCheckNodePort is set; otherwise it stays nil.
func NewService(monitorNotify monitorNotify, envoyCache envoyCache) *Service {
	var hcServer healthServer
	if option.Config.EnableHealthCheckNodePort {
		hcServer = healthserver.New()
	}

	useMaglev := option.Config.NodePortAlg == option.NodePortAlgMaglev
	tableSize := option.Config.MaglevTableSize

	s := &Service{
		svcByHash:       make(map[string]*svcInfo),
		svcByID:         make(map[lb.ID]*svcInfo),
		backendRefCount: counter.StringCounter{},
		backendByHash:   make(map[string]*lb.Backend),
		monitorNotify:   monitorNotify,
		envoyCache:      envoyCache,
		healthServer:    hcServer,
		lbmap:           lbmap.New(useMaglev, tableSize),
		l7lbSvcs:        make(map[Name]*L7LBInfo),
	}
	s.lastUpdatedTs.Store(time.Now())

	return s
}
// RegisterL7LBService makes the given service to be locally forwarded to the
// given proxy port.
//
// resourceName is the CiliumEnvoyConfig resource making the request. Only one
// resource may hold the redirection for a service at a time; a conflicting
// request returns an error. ports optionally restricts which frontend ports'
// backends are synced to Envoy. A proxyPort of 0 registers for backend sync
// only, with no redirection.
//
// After recording the registration, every known service with this name is
// re-upserted so the datapath and Envoy endpoints pick up the change. A
// failure there is returned wrapped (%w) so callers can inspect the cause.
func (s *Service) RegisterL7LBService(serviceName, resourceName Name, ports []string, proxyPort uint16) error {
	// Not deferred: GetDeepCopyServicesByName and UpsertService below take
	// the lock themselves.
	s.Lock()
	err := s.registerL7LBService(serviceName, resourceName, ports, proxyPort)
	s.Unlock()
	if err != nil {
		return err
	}

	log.WithFields(logrus.Fields{
		logfields.ServiceName:       serviceName.Name,
		logfields.ServiceNamespace:  serviceName.Namespace,
		logfields.L7LBFrontendPorts: ports,
		logfields.L7LBProxyPort:     proxyPort,
	}).Debug("Registering service for L7 load balancing")

	svcs := s.GetDeepCopyServicesByName(serviceName.Name, serviceName.Namespace)
	for _, svc := range svcs {
		// Upsert the existing service again after updating 'l7lbSvcs'
		// map so that the service will get the l7 flag set in bpf
		// datapath and Envoy endpoint resources are created for
		// registered services.
		if _, _, err := s.UpsertService(svc); err != nil {
			return fmt.Errorf("error while updating service in LB map: %w", err)
		}
	}
	return nil
}
// registerL7LBService records resourceName's interest in serviceName in
// s.l7lbSvcs. A non-zero proxyPort also claims the L7 LB redirection, which
// fails if a different resource already holds it. frontendPorts replaces any
// previously stored port filter.
//
// 's' must be locked
func (s *Service) registerL7LBService(serviceName, resourceName Name, frontendPorts []string, proxyPort uint16) error {
	entry := s.l7lbSvcs[serviceName]
	if entry == nil {
		entry = new(L7LBInfo)
	}

	if proxyPort != 0 {
		// Only one CEC resource for a given service may request L7 LB redirection at a time.
		owner := entry.envoyListenerRef
		if owner != (Name{}) && owner != resourceName {
			return fmt.Errorf("Service %q already registered for L7 LB redirection via CiliumEnvoyConfig %q", serviceName, owner)
		}
		entry.envoyListenerRef = resourceName
		entry.proxyPort = proxyPort
	}

	// Every registration, redirecting or not, subscribes to backend sync.
	if entry.envoyBackendRefs == nil {
		entry.envoyBackendRefs = make(map[Name]struct{}, 1)
	}
	entry.envoyBackendRefs[resourceName] = struct{}{}
	entry.frontendPorts = frontendPorts

	s.l7lbSvcs[serviceName] = entry
	return nil
}
// RegisterL7LBServiceBackendSync asks for the backends of serviceName to be
// synced to Envoy on behalf of resourceName, without L7 LB redirection.
func (s *Service) RegisterL7LBServiceBackendSync(serviceName, resourceName Name, ports []string) error {
	// A zero proxy port means "backend sync only".
	const noProxyPort uint16 = 0
	return s.RegisterL7LBService(serviceName, resourceName, ports, noProxyPort)
}
// RemoveL7LBService withdraws resourceName's L7 LB registration for
// serviceName. That covers the redirection (if resourceName held it) and the
// backend sync subscription.
//
// If the service had any registration, every known service with this name is
// re-upserted so the datapath drops the L7 redirection. A failure there is
// returned wrapped (%w) so callers can inspect the cause.
func (s *Service) RemoveL7LBService(serviceName, resourceName Name) error {
	// Not deferred: GetDeepCopyServicesByName and UpsertService below take
	// the lock themselves.
	s.Lock()
	changed := s.removeL7LBService(serviceName, resourceName)
	s.Unlock()
	if !changed {
		return nil
	}

	log.WithFields(logrus.Fields{
		logfields.ServiceName:      serviceName.Name,
		logfields.ServiceNamespace: serviceName.Namespace,
	}).Debug("Removing service from L7 load balancing")

	svcs := s.GetDeepCopyServicesByName(serviceName.Name, serviceName.Namespace)
	for _, svc := range svcs {
		if _, _, err := s.UpsertService(svc); err != nil {
			return fmt.Errorf("Error while removing service from LB map: %w", err)
		}
	}
	return nil
}
// removeL7LBService drops resourceName from serviceName's L7 LB registration.
// If resourceName held the redirection, the proxy port and frontend ports are
// cleared too. The entry is deleted once nothing references it. It returns
// false only when serviceName had no registration at all.
//
// 's' must be locked
func (s *Service) removeL7LBService(serviceName, resourceName Name) bool {
	info, ok := s.l7lbSvcs[serviceName]
	if !ok {
		return false
	}

	var none Name
	if info.envoyListenerRef == resourceName {
		info.envoyListenerRef, info.proxyPort, info.frontendPorts = none, 0, nil
	}

	// delete on a nil map is a no-op, so no nil guard is needed here.
	delete(info.envoyBackendRefs, resourceName)
	if len(info.envoyBackendRefs) == 0 {
		info.envoyBackendRefs = nil
	}

	if info.envoyListenerRef == none && len(info.envoyBackendRefs) == 0 {
		delete(s.l7lbSvcs, serviceName)
	}
	return true
}
// GetLastUpdatedTs returns the timestamp stored in lastUpdatedTs. It falls
// back to the current time when nothing (or a non-time.Time value) is stored.
func (s *Service) GetLastUpdatedTs() time.Time {
	// The comma-ok assertion is also safe when Load returns nil.
	ts, ok := s.lastUpdatedTs.Load().(time.Time)
	if !ok {
		return time.Now()
	}
	return ts
}
// GetCurrentTs returns the current wall-clock time.
func (s *Service) GetCurrentTs() time.Time {
	now := time.Now()
	return now
}
// populateBackendMapV2FromV3 copies backend entries from the v3 backend maps
// into the v2 backend maps for each enabled address family. Afterwards it
// closes and unpins each v3 map.
//
// Entries with a non-zero ClusterID are not copied (presumably because the
// v2 value cannot carry it — confirm), and their count is logged. Entries
// whose v2 value cannot be built are skipped with a warning rather than
// written. Per-entry update, close and unpin failures are logged only. The
// only returned error is a failed dump of a v3 map.
func (s *Service) populateBackendMapV2FromV3(ipv4, ipv6 bool) error {
	const (
		v4 = "ipv4"
		v6 = "ipv6"
	)

	var (
		count int
		err   error
		v3Map *bpf.Map
	)

	enabled := map[string]bool{v4: ipv4, v6: ipv6}

	for v, e := range enabled {
		if !e {
			continue
		}

		copyBackendEntries := func(key bpf.MapKey, value bpf.MapValue) {
			var (
				v2Map          *bpf.Map
				v2BackendKey   lbmap.BackendKey
				v2BackendValue lbmap.BackendValue
			)

			if v == v4 {
				backendKey := key.(*lbmap.Backend4KeyV3)
				backendValue := value.(*lbmap.Backend4ValueV3)
				if backendValue.ClusterID != 0 {
					count++
					return
				}
				v2Map = lbmap.Backend4MapV2
				v2BackendKey = lbmap.NewBackend4KeyV2(backendKey.GetID())
				val, valErr := lbmap.NewBackend4Value(
					backendValue.Address[:],
					backendValue.Port,
					backendValue.Proto,
					lb.BackendState(backendValue.Flags),
				)
				if valErr != nil {
					// Skip: writing a typed-nil value through the
					// interface would hand Update an invalid value.
					log.WithError(valErr).WithField(logfields.BPFMapName, v2Map.Name()).Warn("Failed to create value")
					return
				}
				v2BackendValue = val
			} else {
				backendKey := key.(*lbmap.Backend6KeyV3)
				backendValue := value.(*lbmap.Backend6ValueV3)
				if backendValue.ClusterID != 0 {
					count++
					return
				}
				v2Map = lbmap.Backend6MapV2
				v2BackendKey = lbmap.NewBackend6KeyV2(backendKey.GetID())
				val, valErr := lbmap.NewBackend6Value(
					backendValue.Address[:],
					backendValue.Port,
					backendValue.Proto,
					lb.BackendState(backendValue.Flags),
				)
				if valErr != nil {
					log.WithError(valErr).WithField(logfields.BPFMapName, v2Map.Name()).Warn("Failed to create value")
					return
				}
				v2BackendValue = val
			}

			if updErr := v2Map.Update(v2BackendKey, v2BackendValue); updErr != nil {
				log.WithError(updErr).WithField(logfields.BPFMapName, v2Map.Name()).Warn("Error updating backend map")
			}
		}

		if v == v4 {
			v3Map = lbmap.Backend4MapV3
		} else {
			v3Map = lbmap.Backend6MapV3
		}

		count = 0
		err = v3Map.DumpWithCallback(copyBackendEntries)
		if err != nil {
			return fmt.Errorf("Unable to populate %s: %w", v3Map.Name(), err)
		}

		err = v3Map.Close()
		if err != nil {
			log.WithError(err).WithField(logfields.BPFMapName, v3Map.Name()).Warn("Error closing map")
		}

		err = v3Map.Unpin()
		if err != nil {
			log.WithError(err).WithField(logfields.BPFMapName, v3Map.Name()).Warn("Error unpinning map")
		}

		if count > 0 {
			log.WithField(logfields.NumEntries, count).WithField(logfields.BPFMapName, v3Map.Name()).Warn("Dropped map entries from v2 map")
		}
	}
	return nil
}
// populateBackendMapV2FromV1 copies every entry of the v1 backend map into
// the v2 backend map for each enabled address family. Keys are re-encoded in
// the v2 format and values are deep-copied unchanged. Afterwards each v1 map
// is closed and unpinned. Per-entry, close and unpin failures are logged only;
// a failed dump of a v1 map is returned as an error.
func (s *Service) populateBackendMapV2FromV1(ipv4, ipv6 bool) error {
	const (
		v4 = "ipv4"
		v6 = "ipv6"
	)

	enabled := map[string]bool{v4: ipv4, v6: ipv6}
	for family, wanted := range enabled {
		if !wanted {
			continue
		}

		isV4 := family == v4
		srcMap := lbmap.Backend6Map
		if isV4 {
			srcMap = lbmap.Backend4Map
		}

		migrate := func(key bpf.MapKey, value bpf.MapValue) {
			id := key.(lbmap.BackendKey).GetID()

			var (
				dstMap *bpf.Map
				dstKey lbmap.BackendKey
			)
			if isV4 {
				dstMap, dstKey = lbmap.Backend4MapV2, lbmap.NewBackend4KeyV2(id)
			} else {
				dstMap, dstKey = lbmap.Backend6MapV2, lbmap.NewBackend6KeyV2(id)
			}

			if err := dstMap.Update(dstKey, value.DeepCopyMapValue()); err != nil {
				log.WithError(err).WithField(logfields.BPFMapName, dstMap.Name()).Warn("Error updating map")
			}
		}

		if err := srcMap.DumpWithCallback(migrate); err != nil {
			return fmt.Errorf("Unable to populate %s: %w", srcMap.Name(), err)
		}

		// Unpinning removes the v1 map from bpffs; the kernel frees it once
		// the last program referencing it is gone.
		if err := srcMap.Close(); err != nil {
			log.WithError(err).WithField(logfields.BPFMapName, srcMap.Name()).Warn("Error closing map")
		}
		if err := srcMap.Unpin(); err != nil {
			log.WithError(err).WithField(logfields.BPFMapName, srcMap.Name()).Warn("Error unpinning map")
		}
	}

	return nil
}
// InitMaps opens or creates BPF maps used by services.
//
// If restore is set to false, entries of the maps are removed.
//
// ipv6/ipv4 select the address families to set up. sockMaps additionally
// creates the socket rev-NAT map for each enabled family. Legacy v1 and v3
// backend maps that can still be opened are migrated into the v2 backend maps.
// Migration failures are only logged.
//
// NOTE(review): the migration runs after DeleteAll, so with restore == false
// the v2 backend map is still repopulated from any legacy v1/v3 maps. Confirm
// that this is intended.
func (s *Service) InitMaps(ipv6, ipv4, sockMaps, restore bool) error {
	s.Lock()
	defer s.Unlock()

	// Whether a legacy backend map exists (i.e. Open succeeded) per family.
	var (
		v1BackendMapExistsV4 bool
		v1BackendMapExistsV6 bool
		v3BackendMapExistsV4 bool
		v3BackendMapExistsV6 bool
	)

	toOpen := []*bpf.Map{}
	toDelete := []*bpf.Map{}
	if ipv6 {
		toOpen = append(toOpen, lbmap.Service6MapV2, lbmap.Backend6MapV2, lbmap.RevNat6Map)
		if !restore {
			toDelete = append(toDelete, lbmap.Service6MapV2, lbmap.Backend6MapV2, lbmap.RevNat6Map)
		}
		if sockMaps {
			if err := lbmap.CreateSockRevNat6Map(); err != nil {
				return err
			}
		}
		v1BackendMapExistsV6 = lbmap.Backend6Map.Open() == nil
		v3BackendMapExistsV6 = lbmap.Backend6MapV3.Open() == nil
	}
	if ipv4 {
		toOpen = append(toOpen, lbmap.Service4MapV2, lbmap.Backend4MapV2, lbmap.RevNat4Map)
		if !restore {
			toDelete = append(toDelete, lbmap.Service4MapV2, lbmap.Backend4MapV2, lbmap.RevNat4Map)
		}
		if sockMaps {
			if err := lbmap.CreateSockRevNat4Map(); err != nil {
				return err
			}
		}
		v1BackendMapExistsV4 = lbmap.Backend4Map.Open() == nil
		v3BackendMapExistsV4 = lbmap.Backend4MapV3.Open() == nil
	}

	// Open (creating if needed) every current-format map before any flush.
	for _, m := range toOpen {
		if _, err := m.OpenOrCreate(); err != nil {
			return err
		}
	}
	// Only non-empty when restore == false.
	for _, m := range toDelete {
		if err := m.DeleteAll(); err != nil {
			return err
		}
	}

	if v1BackendMapExistsV4 || v1BackendMapExistsV6 {
		log.Info("Backend map v1 exists. Migrating entries to backend map v2.")
		if err := s.populateBackendMapV2FromV1(v1BackendMapExistsV4, v1BackendMapExistsV6); err != nil {
			log.WithError(err).Warn("Error populating V2 map from V1 map, might interrupt existing connections during upgrade")
		}
	}

	if v3BackendMapExistsV4 || v3BackendMapExistsV6 {
		log.Info("Backend map v3 exists. Migrating entries to backend map v2.")
		if err := s.populateBackendMapV2FromV3(v3BackendMapExistsV4, v3BackendMapExistsV6); err != nil {
			log.WithError(err).Warn("Error populating V2 map from V3 map, might interrupt existing connections during upgrade")
		}
	}

	return nil
}
// UpsertService creates the service described by params, or updates it if it
// already exists, while holding the handler's write lock.
//
// It returns whether the service is new, the service ID, and any error.
// params may be modified in place (see upsertService).
func (s *Service) UpsertService(params *lb.SVC) (bool, lb.ID, error) {
	s.Lock()
	defer s.Unlock()

	isNew, id, err := s.upsertService(params)
	return isNew, id, err
}
// upsertService is the body of UpsertService, which calls it with s locked.
//
// Steps, in order:
//  1. apply (or clear) any registered L7 LB redirection on params;
//  2. validate backend IP families against the frontend and agent config and
//     derive params.NatPolicy (NAT46/NAT64);
//  3. create or fetch the svcInfo and its service ID;
//  4. filter backends for node-local policies and update the backend cache;
//  5. push Envoy endpoints if any resource registered for backend sync;
//  6. write the BPF LB maps;
//  7. maintain the HealthCheckNodePort server, metrics and monitor notification.
//
// params is mutated in place (L7LBProxyPort, L7LBFrontendPorts,
// SessionAffinity, SessionAffinityTimeoutSec, NatPolicy). It returns whether
// the service is new and its ID; on error the ID is lb.ID(0). Errors after
// step 3 leave the svcInfo registered (see TODO below).
func (s *Service) upsertService(params *lb.SVC) (bool, lb.ID, error) {
	empty := Name{}

	// Set L7 LB for this service if registered.
	name := Name{Namespace: params.Namespace, Name: params.Name}
	l7lbInfo, exists := s.l7lbSvcs[name]
	if exists && l7lbInfo.envoyListenerRef != empty {
		params.L7LBProxyPort = l7lbInfo.proxyPort
		params.L7LBFrontendPorts = l7lbInfo.frontendPorts
	} else {
		// Backend-sync-only registrations (no listener) do not redirect.
		params.L7LBProxyPort = 0
		params.L7LBFrontendPorts = nil
	}

	// L7 LB is sharing a C union in the datapath, disable session
	// affinity if L7 LB is configured for this service.
	if params.L7LBProxyPort != 0 {
		params.SessionAffinity = false
		params.SessionAffinityTimeoutSec = 0
	}

	scopedLog := log.WithFields(logrus.Fields{
		logfields.ServiceIP:                  params.Frontend.L3n4Addr,
		logfields.Backends:                   params.Backends,
		logfields.ServiceType:                params.Type,
		logfields.ServiceTrafficPolicy:       params.TrafficPolicy,
		logfields.ServiceHealthCheckNodePort: params.HealthCheckNodePort,
		logfields.ServiceName:                params.Name,
		logfields.ServiceNamespace:           params.Namespace,
		logfields.SessionAffinity:            params.SessionAffinity,
		logfields.SessionAffinityTimeout:     params.SessionAffinityTimeoutSec,
		logfields.LoadBalancerSourceRanges:   params.LoadBalancerSourceRanges,
		logfields.L7LBProxyPort:              params.L7LBProxyPort,
		logfields.L7LBFrontendPorts:          params.L7LBFrontendPorts,
	})
	scopedLog.Debug("Upserting service")

	if !option.Config.EnableSVCSourceRangeCheck &&
		len(params.LoadBalancerSourceRanges) != 0 {
		scopedLog.Warnf("--%s is disabled, ignoring loadBalancerSourceRanges",
			option.EnableSVCSourceRangeCheck)
	}

	// Backends must either be the same IP proto as the frontend, or can be of
	// a different proto for NAT46/64. However, backends must be consistently
	// either v4 or v6, but not a mix.
	v4Seen := 0
	v6Seen := 0
	for _, b := range params.Backends {
		if b.L3n4Addr.IsIPv6() {
			v6Seen++
		} else {
			v4Seen++
		}
	}
	if v4Seen > 0 && v6Seen > 0 {
		err := fmt.Errorf("Unable to upsert service %s with a mixed set of IPv4 and IPv6 backends", params.Frontend.L3n4Addr.String())
		return false, lb.ID(0), err
	}
	v6Svc := params.Frontend.IsIPv6()
	if (v6Svc || v6Seen > 0) && !option.Config.EnableIPv6 {
		err := fmt.Errorf("Unable to upsert service %s as IPv6 is disabled", params.Frontend.L3n4Addr.String())
		return false, lb.ID(0), err
	}
	if (!v6Svc || v4Seen > 0) && !option.Config.EnableIPv4 {
		err := fmt.Errorf("Unable to upsert service %s as IPv4 is disabled", params.Frontend.L3n4Addr.String())
		return false, lb.ID(0), err
	}
	// Frontend/backend family mismatch selects the NAT translation.
	params.NatPolicy = lb.SVCNatPolicyNone
	if v6Svc && v4Seen > 0 {
		params.NatPolicy = lb.SVCNatPolicyNat64
	} else if !v6Svc && v6Seen > 0 {
		params.NatPolicy = lb.SVCNatPolicyNat46
	}
	if params.NatPolicy != lb.SVCNatPolicyNone && !option.Config.NodePortNat46X64 {
		err := fmt.Errorf("Unable to upsert service %s as NAT46/64 is disabled", params.Frontend.L3n4Addr.String())
		return false, lb.ID(0), err
	}

	// If needed, create svcInfo and allocate service ID
	svc, new, prevSessionAffinity, prevLoadBalancerSourceRanges, err :=
		s.createSVCInfoIfNotExist(params)
	if err != nil {
		return false, lb.ID(0), err
	}
	// TODO(brb) defer ServiceID release after we have a lbmap "rollback"
	scopedLog = scopedLog.WithField(logfields.ServiceID, svc.frontend.ID)
	scopedLog.Debug("Acquired service ID")

	onlyLocalBackends, filterBackends := svc.requireNodeLocalBackends(params.Frontend)
	prevBackendCount := len(svc.backends)

	backendsCopy := []lb.Backend{}
	for _, b := range params.Backends {
		// Local redirect services or services with trafficPolicy=Local may
		// only use node-local backends for external scope. We implement this by
		// filtering out all backend IPs which are not a local endpoint.
		// Backends without a NodeName are kept.
		if filterBackends && len(b.NodeName) > 0 && b.NodeName != nodeTypes.GetName() {
			continue
		}
		backendsCopy = append(backendsCopy, *b.DeepCopy())
	}

	// TODO (Aditi) When we filter backends for LocalRedirect service, there
	// might be some backend pods with active connections. We may need to
	// defer filtering the backends list (thereby defer redirecting traffic)
	// in such cases. GH #12859
	// Update backends cache and allocate/release backend IDs
	newBackends, obsoleteBackendIDs, obsoleteSVCBackendIDs, err :=
		s.updateBackendsCacheLocked(svc, backendsCopy)
	if err != nil {
		return false, lb.ID(0), err
	}

	// Any registration (redirecting or backend-sync only) gets Envoy endpoints.
	if l7lbInfo != nil && l7lbInfo.envoyBackendRefs != nil && s.envoyCache != nil {
		// Filter backend based on list of port numbers, then upsert backends
		// as Envoy endpoints
		be := filterServiceBackends(svc, l7lbInfo.frontendPorts)

		scopedLog.WithField("filteredBackends", be).Debugf("Upsert envoy endpoints")
		if err = s.envoyCache.UpsertEnvoyEndpoints(name, be); err != nil {
			return false, lb.ID(0), err
		}
	}

	// Update lbmaps (BPF service maps)
	if err = s.upsertServiceIntoLBMaps(svc, onlyLocalBackends, prevBackendCount,
		newBackends, obsoleteBackendIDs, prevSessionAffinity, prevLoadBalancerSourceRanges,
		obsoleteSVCBackendIDs, scopedLog); err != nil {
		return false, lb.ID(0), err
	}

	// Only add a HealthCheckNodePort server if this is a service which may
	// only contain local backends (i.e. it has externalTrafficPolicy=Local)
	if option.Config.EnableHealthCheckNodePort {
		if onlyLocalBackends && filterBackends {
			_, activeBackends, _ := segregateBackends(backendsCopy)
			s.healthServer.UpsertService(lb.ID(svc.frontend.ID), svc.svcNamespace, svc.svcName,
				len(activeBackends), svc.svcHealthCheckNodePort)
		} else if svc.svcHealthCheckNodePort == 0 {
			// Remove the health check server in case this service used to have
			// externalTrafficPolicy=Local with HealthCheckNodePort in the previous
			// version, but not anymore.
			s.healthServer.DeleteService(lb.ID(svc.frontend.ID))
		}
	}

	if new {
		addMetric.Inc()
	} else {
		updateMetric.Inc()
	}

	s.notifyMonitorServiceUpsert(svc.frontend, svc.backends,
		svc.svcType, svc.svcTrafficPolicy, svc.svcName, svc.svcNamespace)
	return new, lb.ID(svc.frontend.ID), nil
}
// filterServiceBackends groups svc's backends by the requested frontend
// ports, keyed by the port string as given.
//
//   - With no ports requested, every backend is returned under anyPort.
//   - A requested port equal to the frontend's numeric port wins outright. The
//     result then holds only that key, with all backends, and any named-port
//     matches collected earlier are discarded.
//   - Otherwise a requested port selects the backends whose FEPortName equals it.
//
// The returned slices may alias svc.backends.
func filterServiceBackends(svc *svcInfo, onlyPorts []string) map[string][]lb.Backend {
	if len(onlyPorts) == 0 {
		return map[string][]lb.Backend{anyPort: svc.backends}
	}

	frontendPort := strconv.Itoa(int(svc.frontend.Port))
	byPort := make(map[string][]lb.Backend)
	for _, want := range onlyPorts {
		if want == frontendPort {
			return map[string][]lb.Backend{want: svc.backends}
		}
		for _, be := range svc.backends {
			if be.FEPortName == want {
				byPort[want] = append(byPort[want], be)
			}
		}
	}
	return byPort
}
// UpdateBackendsState updates all the service(s) with the updated state of
// the given backends. It also persists the updated backend states to the BPF maps.
//
// Backend state transitions are validated before processing. An invalid
// transition is added to the returned (multierr-combined) error and that
// backend is skipped. Backends unknown to the cache, or already in the
// requested state, are skipped silently.
//
// In case of duplicated backends in the list, the state will be updated to the
// last duplicate entry.
func (s *Service) UpdateBackendsState(backends []lb.Backend) error {
	if len(backends) == 0 {
		return nil
	}

	for _, b := range backends {
		log.WithFields(logrus.Fields{
			logfields.L3n4Addr:         b.L3n4Addr.String(),
			logfields.BackendState:     b.State,
			logfields.BackendPreferred: b.Preferred,
		}).Debug("Update backend states")
	}

	var (
		errs            error
		updatedBackends []*lb.Backend
	)
	updateSvcs := make(map[lb.ID]*lbmap.UpsertServiceParams)

	s.Lock()
	defer s.Unlock()
	for _, updatedB := range backends {
		hash := updatedB.L3n4Addr.Hash()

		be, exists := s.backendByHash[hash]
		if !exists {
			// Cilium service API and Kubernetes events are asynchronous, so it's
			// possible to receive an API call for a backend that's already deleted.
			continue
		}
		if be.State == updatedB.State {
			continue
		}
		if !lb.IsValidStateTransition(be.State, updatedB.State) {
			currentState, _ := be.State.String()
			newState, _ := updatedB.State.String()
			e := fmt.Errorf("invalid state transition for backend"+
				"[%s] (%s) -> (%s)", updatedB.String(), currentState, newState)
			errs = multierr.Append(errs, e)
			continue
		}
		be.State = updatedB.State
		be.Preferred = updatedB.Preferred

		// Invariant across the scan below; compute once.
		updatedAddr := updatedB.L3n4Addr.String()
		for id, info := range s.svcByID {
			for i := range info.backends {
				// Use a pointer so the log below reports the state just
				// written; a range-value copy would still hold the old one.
				b := &info.backends[i]
				if b.L3n4Addr.String() != updatedAddr {
					continue
				}
				b.State = updatedB.State
				b.Preferred = updatedB.Preferred

				p, found := updateSvcs[id]
				if !found {
					onlyLocalBackends, _ := info.requireNodeLocalBackends(info.frontend)
					p = &lbmap.UpsertServiceParams{
						ID:                        uint16(id),
						IP:                        info.frontend.L3n4Addr.IP,
						Port:                      info.frontend.L3n4Addr.L4Addr.Port,
						PrevBackendsCount:         len(info.backends),
						IPv6:                      info.frontend.IsIPv6(),
						Type:                      info.svcType,
						Local:                     onlyLocalBackends,
						Scope:                     info.frontend.L3n4Addr.Scope,
						SessionAffinity:           info.sessionAffinity,
						SessionAffinityTimeoutSec: info.sessionAffinityTimeoutSec,
						CheckSourceRange:          info.checkLBSourceRange(),
						UseMaglev:                 info.useMaglev(),
					}
				}
				p.PreferredBackends, p.ActiveBackends, p.NonActiveBackends = segregateBackends(info.backends)
				updateSvcs[id] = p
				log.WithFields(logrus.Fields{
					logfields.ServiceID:        p.ID,
					logfields.BackendID:        b.ID,
					logfields.L3n4Addr:         b.L3n4Addr.String(),
					logfields.BackendState:     b.State,
					logfields.BackendPreferred: b.Preferred,
				}).Info("Persisting service with backend state update")
			}
			s.svcByID[id] = info
			s.svcByHash[info.frontend.Hash()] = info
		}
		updatedBackends = append(updatedBackends, be)
	}

	// Update the persisted backend state in BPF maps.
	for _, b := range updatedBackends {
		log.WithFields(logrus.Fields{
			logfields.BackendID:        b.ID,
			logfields.L3n4Addr:         b.L3n4Addr.String(),
			logfields.BackendState:     b.State,
			logfields.BackendPreferred: b.Preferred,
		}).Info("Persisting updated backend state for backend")
		if err := s.lbmap.UpdateBackendWithState(*b); err != nil {
			e := fmt.Errorf("failed to update backend %+v %w", b, err)
			errs = multierr.Append(errs, e)
		}
	}

	for i := range updateSvcs {
		err := s.lbmap.UpsertService(updateSvcs[i])
		errs = multierr.Append(errs, err)
	}
	return errs
}
// DeleteServiceByID deletes the service with the given ID. It reports whether
// such a service existed, plus any error from the deletion itself.
func (s *Service) DeleteServiceByID(id lb.ServiceID) (bool, error) {
	s.Lock()
	defer s.Unlock()

	svc, found := s.svcByID[lb.ID(id)]
	if !found {
		return false, nil
	}
	return true, s.deleteServiceLocked(svc)
}
// DeleteService deletes the service whose frontend is the given address. It
// reports whether such a service existed, plus any error from the deletion.
func (s *Service) DeleteService(frontend lb.L3n4Addr) (bool, error) {
	s.Lock()
	defer s.Unlock()

	svc, found := s.svcByHash[frontend.Hash()]
	if !found {
		return false, nil
	}
	return true, s.deleteServiceLocked(svc)
}
// GetDeepCopyServiceByID returns a deep copy of the service with the given
// ID, or (nil, false) when no such service is known.
func (s *Service) GetDeepCopyServiceByID(id lb.ServiceID) (*lb.SVC, bool) {
	s.RLock()
	defer s.RUnlock()

	if svc, ok := s.svcByID[lb.ID(id)]; ok {
		return svc.deepCopyToLBSVC(), true
	}
	return nil, false
}
// GetDeepCopyServices returns deep copies of every known service, in map
// iteration (i.e. unspecified) order.
func (s *Service) GetDeepCopyServices() []*lb.SVC {
	s.RLock()
	defer s.RUnlock()

	out := make([]*lb.SVC, len(s.svcByHash))
	i := 0
	for _, svc := range s.svcByHash {
		out[i] = svc.deepCopyToLBSVC()
		i++
	}
	return out
}
// GetDeepCopyServicesByName returns deep copies of every service (one per
// frontend) with the given name and namespace. It returns nil when none match.
func (s *Service) GetDeepCopyServicesByName(name, namespace string) (svcs []*lb.SVC) {
	s.RLock()
	defer s.RUnlock()

	for _, info := range s.svcByHash {
		if info.svcName != name || info.svcNamespace != namespace {
			continue
		}
		svcs = append(svcs, info.deepCopyToLBSVC())
	}
	return svcs
}
// RestoreServices restores services from BPF maps.
//
// It first restores all the service entries, followed by backend entries.