forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gce.go
2425 lines (2165 loc) · 80.6 KB
/
gce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"encoding/json"
"fmt"
"io"
"net/http"
"path"
"regexp"
"sort"
"strconv"
"strings"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/service"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/types"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/flowcontrol"
netsets "k8s.io/kubernetes/pkg/util/net/sets"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
compute "google.golang.org/api/compute/v1"
container "google.golang.org/api/container/v1"
"google.golang.org/api/googleapi"
"google.golang.org/cloud/compute/metadata"
"gopkg.in/gcfg.v1"
)
const (
ProviderName = "gce"
k8sNodeRouteTag = "k8s-node-route"
// AffinityTypeNone - no session affinity.
gceAffinityTypeNone = "NONE"
// AffinityTypeClientIP - affinity based on Client IP.
gceAffinityTypeClientIP = "CLIENT_IP"
// AffinityTypeClientIPProto - affinity based on Client IP and port.
gceAffinityTypeClientIPProto = "CLIENT_IP_PROTO"
operationPollInterval = 3 * time.Second
operationPollTimeoutDuration = 30 * time.Minute
// Each page can have 500 results, but we cap how many pages
// are iterated through to prevent infinite loops if the API
// were to continuously return a nextPageToken.
maxPages = 25
)
// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
type GCECloud struct {
service *compute.Service
containerService *container.Service
projectID string
region string
localZone string // The zone in which we are running
managedZones []string // List of zones we are spanning (for Ubernetes-Lite, primarily when running on master)
networkURL string
useMetadataServer bool
operationPollRateLimiter flowcontrol.RateLimiter
}
type Config struct {
Global struct {
TokenURL string `gcfg:"token-url"`
TokenBody string `gcfg:"token-body"`
ProjectID string `gcfg:"project-id"`
NetworkName string `gcfg:"network-name"`
Multizone bool `gcfg:"multizone"`
}
}
func init() {
cloudprovider.RegisterCloudProvider(ProviderName, func(config io.Reader) (cloudprovider.Interface, error) { return newGCECloud(config) })
}
// Raw access to the underlying GCE service, probably should only be used for e2e tests
func (g *GCECloud) GetComputeService() *compute.Service {
return g.service
}
func getProjectAndZone() (string, string, error) {
result, err := metadata.Get("instance/zone")
if err != nil {
return "", "", err
}
parts := strings.Split(result, "/")
if len(parts) != 4 {
return "", "", fmt.Errorf("unexpected response: %s", result)
}
zone := parts[3]
projectID, err := metadata.ProjectID()
if err != nil {
return "", "", err
}
return projectID, zone, nil
}
func getInstanceIDViaMetadata() (string, error) {
result, err := metadata.Get("instance/hostname")
if err != nil {
return "", err
}
parts := strings.Split(result, ".")
if len(parts) == 0 {
return "", fmt.Errorf("unexpected response: %s", result)
}
return parts[0], nil
}
func getCurrentExternalIDViaMetadata() (string, error) {
externalID, err := metadata.Get("instance/id")
if err != nil {
return "", fmt.Errorf("couldn't get external ID: %v", err)
}
return externalID, nil
}
func getCurrentMachineTypeViaMetadata() (string, error) {
mType, err := metadata.Get("instance/machine-type")
if err != nil {
return "", fmt.Errorf("couldn't get machine type: %v", err)
}
parts := strings.Split(mType, "/")
if len(parts) != 4 {
return "", fmt.Errorf("unexpected response for machine type: %s", mType)
}
return parts[3], nil
}
func getNetworkNameViaMetadata() (string, error) {
result, err := metadata.Get("instance/network-interfaces/0/network")
if err != nil {
return "", err
}
parts := strings.Split(result, "/")
if len(parts) != 4 {
return "", fmt.Errorf("unexpected response: %s", result)
}
return parts[3], nil
}
func getNetworkNameViaAPICall(svc *compute.Service, projectID string) (string, error) {
// TODO: use PageToken to list all not just the first 500
networkList, err := svc.Networks.List(projectID).Do()
if err != nil {
return "", err
}
if networkList == nil || len(networkList.Items) <= 0 {
return "", fmt.Errorf("GCE Network List call returned no networks for project %q.", projectID)
}
return networkList.Items[0].Name, nil
}
func getZonesForRegion(svc *compute.Service, projectID, region string) ([]string, error) {
// TODO: use PageToken to list all not just the first 500
listCall := svc.Zones.List(projectID)
// Filtering by region doesn't seem to work
// (tested in https://cloud.google.com/compute/docs/reference/latest/zones/list)
// listCall = listCall.Filter("region eq " + region)
res, err := listCall.Do()
if err != nil {
return nil, fmt.Errorf("unexpected response listing zones: %v", err)
}
zones := []string{}
for _, zone := range res.Items {
regionName := lastComponent(zone.Region)
if regionName == region {
zones = append(zones, zone.Name)
}
}
return zones, nil
}
// newGCECloud creates a new instance of GCECloud.
func newGCECloud(config io.Reader) (*GCECloud, error) {
projectID, zone, err := getProjectAndZone()
if err != nil {
return nil, err
}
region, err := GetGCERegion(zone)
if err != nil {
return nil, err
}
networkName, err := getNetworkNameViaMetadata()
if err != nil {
return nil, err
}
networkURL := gceNetworkURL(projectID, networkName)
// By default, Kubernetes clusters only run against one zone
managedZones := []string{zone}
tokenSource := google.ComputeTokenSource("")
if config != nil {
var cfg Config
if err := gcfg.ReadInto(&cfg, config); err != nil {
glog.Errorf("Couldn't read config: %v", err)
return nil, err
}
if cfg.Global.ProjectID != "" {
projectID = cfg.Global.ProjectID
}
if cfg.Global.NetworkName != "" {
if strings.Contains(cfg.Global.NetworkName, "/") {
networkURL = cfg.Global.NetworkName
} else {
networkURL = gceNetworkURL(cfg.Global.ProjectID, cfg.Global.NetworkName)
}
}
if cfg.Global.TokenURL != "" {
tokenSource = newAltTokenSource(cfg.Global.TokenURL, cfg.Global.TokenBody)
}
if cfg.Global.Multizone {
managedZones = nil // Use all zones in region
}
}
return CreateGCECloud(projectID, region, zone, managedZones, networkURL, tokenSource, true /* useMetadataServer */)
}
// Creates a GCECloud object using the specified parameters.
// If no networkUrl is specified, loads networkName via rest call.
// If no tokenSource is specified, uses oauth2.DefaultTokenSource.
// If managedZones is nil / empty all zones in the region will be managed.
func CreateGCECloud(projectID, region, zone string, managedZones []string, networkURL string, tokenSource oauth2.TokenSource, useMetadataServer bool) (*GCECloud, error) {
if tokenSource == nil {
var err error
tokenSource, err = google.DefaultTokenSource(
oauth2.NoContext,
compute.CloudPlatformScope,
compute.ComputeScope)
glog.Infof("Using DefaultTokenSource %#v", tokenSource)
if err != nil {
return nil, err
}
} else {
glog.Infof("Using existing Token Source %#v", tokenSource)
}
client := oauth2.NewClient(oauth2.NoContext, tokenSource)
svc, err := compute.New(client)
if err != nil {
return nil, err
}
containerSvc, err := container.New(client)
if err != nil {
return nil, err
}
if networkURL == "" {
networkName, err := getNetworkNameViaAPICall(svc, projectID)
if err != nil {
return nil, err
}
networkURL = gceNetworkURL(projectID, networkName)
}
if len(managedZones) == 0 {
managedZones, err = getZonesForRegion(svc, projectID, region)
if err != nil {
return nil, err
}
}
if len(managedZones) != 1 {
glog.Infof("managing multiple zones: %v", managedZones)
}
operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size.
return &GCECloud{
service: svc,
containerService: containerSvc,
projectID: projectID,
region: region,
localZone: zone,
managedZones: managedZones,
networkURL: networkURL,
useMetadataServer: useMetadataServer,
operationPollRateLimiter: operationPollRateLimiter,
}, nil
}
func (gce *GCECloud) Clusters() (cloudprovider.Clusters, bool) {
return gce, true
}
// ProviderName returns the cloud provider ID.
func (gce *GCECloud) ProviderName() string {
return ProviderName
}
// Known-useless DNS search path.
var uselessDNSSearchRE = regexp.MustCompile(`^[0-9]+.google.internal.$`)
// ScrubDNS filters DNS settings for pods.
func (gce *GCECloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []string) {
// GCE has too many search paths by default. Filter the ones we know are useless.
for _, s := range searches {
if !uselessDNSSearchRE.MatchString(s) {
srchOut = append(srchOut, s)
}
}
return nameservers, srchOut
}
// LoadBalancer returns an implementation of LoadBalancer for Google Compute Engine.
func (gce *GCECloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
return gce, true
}
// Instances returns an implementation of Instances for Google Compute Engine.
func (gce *GCECloud) Instances() (cloudprovider.Instances, bool) {
return gce, true
}
// Zones returns an implementation of Zones for Google Compute Engine.
func (gce *GCECloud) Zones() (cloudprovider.Zones, bool) {
return gce, true
}
// Routes returns an implementation of Routes for Google Compute Engine.
func (gce *GCECloud) Routes() (cloudprovider.Routes, bool) {
return gce, true
}
func makeHostURL(projectID, zone, host string) string {
host = canonicalizeInstanceName(host)
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/zones/%s/instances/%s",
projectID, zone, host)
}
func (h *gceInstance) makeComparableHostPath() string {
return fmt.Sprintf("/zones/%s/instances/%s", h.Zone, h.Name)
}
func hostURLToComparablePath(hostURL string) string {
idx := strings.Index(hostURL, "/zones/")
if idx < 0 {
return ""
}
return hostURL[idx:]
}
func (gce *GCECloud) targetPoolURL(name, region string) string {
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
}
func (gce *GCECloud) waitForOp(op *compute.Operation, getOperation func(operationName string) (*compute.Operation, error)) error {
if op == nil {
return fmt.Errorf("operation must not be nil")
}
if opIsDone(op) {
return getErrorFromOp(op)
}
opName := op.Name
return wait.Poll(operationPollInterval, operationPollTimeoutDuration, func() (bool, error) {
start := time.Now()
gce.operationPollRateLimiter.Accept()
duration := time.Now().Sub(start)
if duration > 5*time.Second {
glog.Infof("pollOperation: waited %v for %v", duration, opName)
}
pollOp, err := getOperation(opName)
if err != nil {
glog.Warningf("GCE poll operation %s failed: pollOp: [%v] err: [%v] getErrorFromOp: [%v]", opName, pollOp, err, getErrorFromOp(pollOp))
}
return opIsDone(pollOp), getErrorFromOp(pollOp)
})
}
func opIsDone(op *compute.Operation) bool {
return op != nil && op.Status == "DONE"
}
func getErrorFromOp(op *compute.Operation) error {
if op != nil && op.Error != nil && len(op.Error.Errors) > 0 {
err := &googleapi.Error{
Code: int(op.HttpErrorStatusCode),
Message: op.Error.Errors[0].Message,
}
glog.Errorf("GCE operation failed: %v", err)
return err
}
return nil
}
func (gce *GCECloud) waitForGlobalOp(op *compute.Operation) error {
return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.GlobalOperations.Get(gce.projectID, operationName).Do()
})
}
func (gce *GCECloud) waitForRegionOp(op *compute.Operation, region string) error {
return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.RegionOperations.Get(gce.projectID, region, operationName).Do()
})
}
func (gce *GCECloud) waitForZoneOp(op *compute.Operation, zone string) error {
return gce.waitForOp(op, func(operationName string) (*compute.Operation, error) {
return gce.service.ZoneOperations.Get(gce.projectID, zone, operationName).Do()
})
}
// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (gce *GCECloud) GetLoadBalancer(service *api.Service) (*api.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, gce.region, loadBalancerName).Do()
if err == nil {
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: fwd.IPAddress}}
return status, true, nil
}
if isHTTPErrorCode(err, http.StatusNotFound) {
return nil, false, nil
}
return nil, false, err
}
func isHTTPErrorCode(err error, code int) bool {
apiErr, ok := err.(*googleapi.Error)
return ok && apiErr.Code == code
}
// EnsureLoadBalancer is an implementation of LoadBalancer.EnsureLoadBalancer.
// Our load balancers in GCE consist of four separate GCE resources - a static
// IP address, a firewall rule, a target pool, and a forwarding rule. This
// function has to manage all of them.
// Due to an interesting series of design decisions, this handles both creating
// new load balancers and updating existing load balancers, recognizing when
// each is needed.
func (gce *GCECloud) EnsureLoadBalancer(apiService *api.Service, hostNames []string, annotations map[string]string) (*api.LoadBalancerStatus, error) {
if len(hostNames) == 0 {
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
}
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
return nil, err
}
loadBalancerName := cloudprovider.GetLoadBalancerName(apiService)
loadBalancerIP := apiService.Spec.LoadBalancerIP
ports := apiService.Spec.Ports
portStr := []string{}
for _, p := range apiService.Spec.Ports {
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
}
affinityType := apiService.Spec.SessionAffinity
serviceName := types.NamespacedName{Namespace: apiService.Namespace, Name: apiService.Name}
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", loadBalancerName, gce.region, loadBalancerIP, portStr, hosts, serviceName, annotations)
// Check if the forwarding rule exists, and if so, what its IP is.
fwdRuleExists, fwdRuleNeedsUpdate, fwdRuleIP, err := gce.forwardingRuleNeedsUpdate(loadBalancerName, gce.region, loadBalancerIP, ports)
if err != nil {
return nil, err
}
// Make sure we know which IP address will be used and have properly reserved
// it as static before moving forward with the rest of our operations.
//
// We use static IP addresses when updating a load balancer to ensure that we
// can replace the load balancer's other components without changing the
// address its service is reachable on. We do it this way rather than always
// keeping the static IP around even though this is more complicated because
// it makes it less likely that we'll run into quota issues. Only 7 static
// IP addresses are allowed per region by default.
//
// We could let an IP be allocated for us when the forwarding rule is created,
// but we need the IP to set up the firewall rule, and we want to keep the
// forwarding rule creation as the last thing that needs to be done in this
// function in order to maintain the invariant that "if the forwarding rule
// exists, the LB has been fully created".
ipAddress := ""
// Through this process we try to keep track of whether it is safe to
// release the IP that was allocated. If the user specifically asked for
// an IP, we assume they are managing it themselves. Otherwise, we will
// release the IP in case of early-terminating failure or upon successful
// creating of the LB.
isUserOwnedIP := false // if this is set, we never release the IP
isSafeToReleaseIP := false
defer func() {
if isUserOwnedIP {
return
}
if isSafeToReleaseIP {
if err := gce.deleteStaticIP(loadBalancerName, gce.region); err != nil {
glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
}
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", loadBalancerName, serviceName, ipAddress)
} else {
glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, loadBalancerName, serviceName, gce.region, err)
}
}()
if loadBalancerIP != "" {
// If a specific IP address has been requested, we have to respect the
// user's request and use that IP. If the forwarding rule was already using
// a different IP, it will be harmlessly abandoned because it was only an
// ephemeral IP (or it was a different static IP owned by the user, in which
// case we shouldn't delete it anyway).
if isStatic, err := gce.projectOwnsStaticIP(loadBalancerName, gce.region, loadBalancerIP); err != nil {
return nil, fmt.Errorf("failed to test if this GCE project owns the static IP %s: %v", loadBalancerIP, err)
} else if isStatic {
// The requested IP is a static IP, owned and managed by the user.
isUserOwnedIP = true
isSafeToReleaseIP = false
ipAddress = loadBalancerIP
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", loadBalancerName, serviceName, ipAddress)
} else if loadBalancerIP == fwdRuleIP {
// The requested IP is not a static IP, but is currently assigned
// to this forwarding rule, so we can keep it.
isUserOwnedIP = false
isSafeToReleaseIP = true
ipAddress, _, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP)
if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", loadBalancerName, serviceName, ipAddress)
} else {
// The requested IP is not static and it is not assigned to the
// current forwarding rule. It might be attached to a different
// rule or it might not be part of this project at all. Either
// way, we can't use it.
return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", loadBalancerIP, loadBalancerName, serviceName, err)
}
} else {
// The user did not request a specific IP.
isUserOwnedIP = false
// This will either allocate a new static IP if the forwarding rule didn't
// already have an IP, or it will promote the forwarding rule's current
// IP from ephemeral to static, or it will just get the IP if it is
// already static.
existed := false
ipAddress, existed, err = gce.ensureStaticIP(loadBalancerName, serviceName.String(), gce.region, fwdRuleIP)
if err != nil {
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
}
if existed {
// If the IP was not specifically requested by the user, but it
// already existed, it seems to be a failed update cycle. We can
// use this IP and try to run through the process again, but we
// should not release the IP unless it is explicitly flagged as OK.
isSafeToReleaseIP = false
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", loadBalancerName, serviceName, ipAddress)
} else {
// For total clarity. The IP did not pre-exist and the user did
// not ask for a particular one, so we can release the IP in case
// of failure or success.
isSafeToReleaseIP = true
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", loadBalancerName, serviceName, ipAddress)
}
}
// Deal with the firewall next. The reason we do this here rather than last
// is because the forwarding rule is used as the indicator that the load
// balancer is fully created - it's what getLoadBalancer checks for.
// Check if user specified the allow source range
sourceRanges, err := service.GetLoadBalancerSourceRanges(annotations)
if err != nil {
return nil, err
}
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports, sourceRanges)
if err != nil {
return nil, err
}
if firewallNeedsUpdate {
desc := makeFirewallDescription(serviceName.String(), ipAddress)
// Unlike forwarding rules and target pools, firewalls can be updated
// without needing to be deleted and recreated.
if firewallExists {
if err := gce.updateFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): updated firewall", loadBalancerName, serviceName)
} else {
if err := gce.createFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created firewall", loadBalancerName, serviceName)
}
}
tpExists, tpNeedsUpdate, err := gce.targetPoolNeedsUpdate(loadBalancerName, gce.region, affinityType)
if err != nil {
return nil, err
}
// Now we get to some slightly more interesting logic.
// First, neither target pools nor forwarding rules can be updated in place -
// they have to be deleted and recreated.
// Second, forwarding rules are layered on top of target pools in that you
// can't delete a target pool that's currently in use by a forwarding rule.
// Thus, we have to tear down the forwarding rule if either it or the target
// pool needs to be updated.
if fwdRuleExists && (fwdRuleNeedsUpdate || tpNeedsUpdate) {
// Begin critical section. If we have to delete the forwarding rule,
// and something should fail before we recreate it, don't release the
// IP. That way we can come back to it later.
isSafeToReleaseIP = false
if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil {
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", loadBalancerName, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName)
}
if tpExists && tpNeedsUpdate {
if err := gce.deleteTargetPool(loadBalancerName, gce.region); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
}
// Once we've deleted the resources (if necessary), build them back up (or for
// the first time if they're new).
if tpNeedsUpdate {
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, hosts, affinityType); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
}
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName)
}
if tpNeedsUpdate || fwdRuleNeedsUpdate {
if err := gce.createForwardingRule(loadBalancerName, serviceName.String(), gce.region, ipAddress, ports); err != nil {
return nil, fmt.Errorf("failed to create forwarding rule %s: %v", loadBalancerName, err)
}
// End critical section. It is safe to release the static IP (which
// just demotes it to ephemeral) now that it is attached. In the case
// of a user-requested IP, the "is user-owned" flag will be set,
// preventing it from actually being released.
isSafeToReleaseIP = true
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", loadBalancerName, serviceName, ipAddress)
}
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: ipAddress}}
return status, nil
}
// Passing nil for requested IP is perfectly fine - it just means that no specific
// IP is being requested.
// Returns whether the forwarding rule exists, whether it needs to be updated,
// what its IP address is (if it exists), and any error we encountered.
func (gce *GCECloud) forwardingRuleNeedsUpdate(name, region string, loadBalancerIP string, ports []api.ServicePort) (exists bool, needsUpdate bool, ipAddress string, err error) {
fwd, err := gce.service.ForwardingRules.Get(gce.projectID, region, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, "", nil
}
return false, false, "", fmt.Errorf("error getting load balancer's forwarding rule: %v", err)
}
if loadBalancerIP != fwd.IPAddress {
return true, true, fwd.IPAddress, nil
}
portRange, err := loadBalancerPortRange(ports)
if err != nil {
return false, false, "", err
}
if portRange != fwd.PortRange {
return true, true, fwd.IPAddress, nil
}
// The service controller verified all the protocols match on the ports, just check the first one
if string(ports[0].Protocol) != fwd.IPProtocol {
return true, true, fwd.IPAddress, nil
}
return true, false, fwd.IPAddress, nil
}
func loadBalancerPortRange(ports []api.ServicePort) (string, error) {
if len(ports) == 0 {
return "", fmt.Errorf("no ports specified for GCE load balancer")
}
// The service controller verified all the protocols match on the ports, just check and use the first one
if ports[0].Protocol != api.ProtocolTCP && ports[0].Protocol != api.ProtocolUDP {
return "", fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(ports[0].Protocol))
}
minPort := 65536
maxPort := 0
for i := range ports {
if ports[i].Port < minPort {
minPort = ports[i].Port
}
if ports[i].Port > maxPort {
maxPort = ports[i].Port
}
}
return fmt.Sprintf("%d-%d", minPort, maxPort), nil
}
// Doesn't check whether the hosts have changed, since host updating is handled
// separately.
func (gce *GCECloud) targetPoolNeedsUpdate(name, region string, affinityType api.ServiceAffinity) (exists bool, needsUpdate bool, err error) {
tp, err := gce.service.TargetPools.Get(gce.projectID, region, name).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, nil
}
return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err)
}
if translateAffinityType(affinityType) != tp.SessionAffinity {
return true, true, nil
}
return true, false, nil
}
// translate from what K8s supports to what the cloud provider supports for session affinity.
func translateAffinityType(affinityType api.ServiceAffinity) string {
switch affinityType {
case api.ServiceAffinityClientIP:
return gceAffinityTypeClientIP
case api.ServiceAffinityNone:
return gceAffinityTypeNone
default:
glog.Errorf("Unexpected affinity type: %v", affinityType)
return gceAffinityTypeNone
}
}
func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []api.ServicePort, sourceRanges netsets.IPNet) (exists bool, needsUpdate bool, err error) {
fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do()
if err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, nil
}
return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err)
}
if fw.Description != makeFirewallDescription(serviceName, ipAddress) {
return true, true, nil
}
if len(fw.Allowed) != 1 || (fw.Allowed[0].IPProtocol != "tcp" && fw.Allowed[0].IPProtocol != "udp") {
return true, true, nil
}
// Make sure the allowed ports match.
allowedPorts := make([]string, len(ports))
for ix := range ports {
allowedPorts[ix] = strconv.Itoa(ports[ix].Port)
}
if !slicesEqual(allowedPorts, fw.Allowed[0].Ports) {
return true, true, nil
}
// The service controller already verified that the protocol matches on all ports, no need to check.
actualSourceRanges, err := netsets.ParseIPNets(fw.SourceRanges...)
if err != nil {
// This really shouldn't happen... GCE has returned something unexpected
glog.Warningf("Error parsing firewall SourceRanges: %v", fw.SourceRanges)
// We don't return the error, because we can hopefully recover from this by reconfiguring the firewall
return true, true, nil
}
if !sourceRanges.Equal(actualSourceRanges) {
return true, true, nil
}
return true, false, nil
}
func makeFirewallName(name string) string {
return fmt.Sprintf("k8s-fw-%s", name)
}
func makeFirewallDescription(serviceName, ipAddress string) string {
return fmt.Sprintf(`{"kubernetes.io/service-ip":"%s", "kubernetes.io/service-name":"%s"}`,
ipAddress, serviceName)
}
func slicesEqual(x, y []string) bool {
if len(x) != len(y) {
return false
}
sort.Strings(x)
sort.Strings(y)
for i := range x {
if x[i] != y[i] {
return false
}
}
return true
}
func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []api.ServicePort) error {
portRange, err := loadBalancerPortRange(ports)
if err != nil {
return err
}
req := &compute.ForwardingRule{
Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
IPAddress: ipAddress,
IPProtocol: string(ports[0].Protocol),
PortRange: portRange,
Target: gce.targetPoolURL(name, region),
}
op, err := gce.service.ForwardingRules.Insert(gce.projectID, region, req).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if op != nil {
err = gce.waitForRegionOp(op, region)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
}
return nil
}
func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType api.ServiceAffinity) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
}
pool := &compute.TargetPool{
Name: name,
Description: fmt.Sprintf(`{"kubernetes.io/service-name":"%s"}`, serviceName),
Instances: instances,
SessionAffinity: translateAffinityType(affinityType),
}
op, err := gce.service.TargetPools.Insert(gce.projectID, region, pool).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if op != nil {
err = gce.waitForRegionOp(op, region)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
}
return nil
}
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []api.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
}
op, err := gce.service.Firewalls.Insert(gce.projectID, firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if op != nil {
err = gce.waitForGlobalOp(op)
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
}
return nil
}
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges netsets.IPNet, ports []api.ServicePort, hosts []*gceInstance) error {
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
if err != nil {
return err
}
op, err := gce.service.Firewalls.Update(gce.projectID, makeFirewallName(name), firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return err
}
if op != nil {
err = gce.waitForGlobalOp(op)
if err != nil {
return err
}
}
return nil
}
func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges netsets.IPNet, ports []api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
allowedPorts := make([]string, len(ports))
for ix := range ports {
allowedPorts[ix] = strconv.Itoa(ports[ix].Port)
}
hostTags, err := gce.computeHostTags(hosts)
if err != nil {
return nil, err
}
firewall := &compute.Firewall{
Name: makeFirewallName(name),
Description: desc,
Network: gce.networkURL,
SourceRanges: sourceRanges.StringSlice(),
TargetTags: hostTags,
Allowed: []*compute.FirewallAllowed{
{
IPProtocol: strings.ToLower(string(ports[0].Protocol)),
Ports: allowedPorts,
},
},
}
return firewall, nil
}
// We grab all tags from all instances being added to the pool.
// * The longest tag that is a prefix of the instance name is used
// * If any instance has a prefix tag, all instances must
// * If no instances have a prefix tag, no tags are used
func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
// TODO: We could store the tags in gceInstance, so we could have already fetched it
hostNamesByZone := make(map[string][]string)
for _, host := range hosts {
hostNamesByZone[host.Zone] = append(hostNamesByZone[host.Zone], host.Name)
}
tags := sets.NewString()
for zone, hostNames := range hostNamesByZone {
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Instances.List(gce.projectID, zone)
// Add the filter for hosts
listCall = listCall.Filter("name eq (" + strings.Join(hostNames, "|") + ")")
// Add the fields we want
listCall = listCall.Fields("items(name,tags)")
if pageToken != "" {
listCall = listCall.PageToken(pageToken)
}
res, err := listCall.Do()
if err != nil {
return nil, err
}
pageToken = res.NextPageToken
for _, instance := range res.Items {
longest_tag := ""
for _, tag := range instance.Tags.Items {
if strings.HasPrefix(instance.Name, tag) && len(tag) > len(longest_tag) {
longest_tag = tag
}
}
if len(longest_tag) > 0 {
tags.Insert(longest_tag)
} else if len(tags) > 0 {
return nil, fmt.Errorf("Some, but not all, instances have prefix tags (%s is missing)", instance.Name)
}
}
}
if page >= maxPages {
glog.Errorf("computeHostTags exceeded maxPages=%d for Instances.List: truncating.", maxPages)
}
}
if len(tags) == 0 {
glog.V(2).Info("No instances had tags, creating rule without target tags")
}
return tags.List(), nil
}
func (gce *GCECloud) projectOwnsStaticIP(name, region string, ipAddress string) (bool, error) {
pageToken := ""
page := 0
for ; page == 0 || (pageToken != "" && page < maxPages); page++ {
listCall := gce.service.Addresses.List(gce.projectID, region)
if pageToken != "" {