service_ingressip_controller.go

package ingressip

import (
	"fmt"
	"net"
	"sort"
	"sync"
	"time"

	"github.com/golang/glog"

	"k8s.io/api/core/v1"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	kclientset "k8s.io/client-go/kubernetes"
	kcoreclient "k8s.io/client-go/kubernetes/typed/core/v1"
	kv1core "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/controller"
	"k8s.io/kubernetes/pkg/registry/core/service/allocator"
	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
)

const (
	// It's necessary to allocate for the initial sync in consistent
	// order rather than in the order received. This requires waiting
	// until the initial sync has been processed, and to avoid a hot
	// loop, we'll wait this long between checks.
	SyncProcessedPollPeriod = 100 * time.Millisecond

	// Backoff parameters for persisting service updates to the API server.
	clientRetryCount    = 5
	clientRetryInterval = 5 * time.Second
	clientRetryFactor   = 1.1
)

// IngressIPController is responsible for allocating ingress ip
// addresses to Service objects of type LoadBalancer.
type IngressIPController struct {
	client     kcoreclient.ServicesGetter
	controller cache.Controller
	hasSynced  cache.InformerSynced
	maxRetries int

	// Tracks ip allocation for the configured range
	ipAllocator *ipallocator.Range
	// Tracks ip -> service key to allow detection of duplicate ip
	// allocations.
	allocationMap map[string]string
	// Tracks services requeued for allocation when the range is full.
	requeuedAllocations sets.String

	// Protects the transition between initial sync and regular processing
	lock sync.Mutex

	cache cache.Store
	queue workqueue.RateLimitingInterface

	// recorder is used to record events.
	recorder record.EventRecorder

	// changeHandler does the work. It can be factored out for unit testing.
	changeHandler func(change *serviceChange) error
	// persistenceHandler persists service changes. It can be factored out for unit testing.
	persistenceHandler func(client kcoreclient.ServicesGetter, service *v1.Service, targetStatus bool) error
}
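
// serviceChange is the unit of work queued by the informer's event
// handlers. key identifies the latest state of the service in the
// cache; oldService carries the previous state for updates and
// deletes; requeuedAllocation marks a change requeued because the
// ingress range was full.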
type serviceChange struct {
	key                string
	oldService         *v1.Service
	requeuedAllocation bool
}

// NewIngressIPController creates a new IngressIPController.
// TODO this should accept a shared informer
func NewIngressIPController(services cache.SharedIndexInformer, kc kclientset.Interface, ipNet *net.IPNet, resyncInterval time.Duration) *IngressIPController {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: kc.CoreV1().Events("")})
	recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "ingressip-controller"})

	ic := &IngressIPController{
		client:     kc.Core(),
		queue:      workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		maxRetries: 10,
		recorder:   recorder,
	}

	ic.cache = services.GetStore()
	ic.controller = services.GetController()
	services.AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				ic.enqueueChange(obj, nil)
			},
			UpdateFunc: func(old, cur interface{}) {
				ic.enqueueChange(cur, old)
			},
			DeleteFunc: func(obj interface{}) {
				ic.enqueueChange(nil, obj)
			},
		},
		resyncInterval,
	)
	ic.hasSynced = ic.controller.HasSynced

	ic.changeHandler = ic.processChange
	ic.persistenceHandler = persistService

	ic.ipAllocator = ipallocator.NewAllocatorCIDRRange(ipNet, func(max int, rangeSpec string) allocator.Interface {
		return allocator.NewAllocationMap(max, rangeSpec)
	})
	ic.allocationMap = make(map[string]string)
	ic.requeuedAllocations = sets.NewString()

	return ic
}
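
// A typical wiring looks roughly like the following (hypothetical
// sketch; the client, informer, stop channel, and CIDR would come from
// the caller's configuration):
//
//	_, ipNet, _ := net.ParseCIDR("172.29.0.0/16")
//	services := informers.NewSharedInformerFactory(kc, resync).Core().V1().Services().Informer()
//	ic := NewIngressIPController(services, kc, ipNet, resync)
//	go ic.Run(stopCh)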

// enqueueChange transforms the old and new objects into a change
// object and queues it. A lock is shared with processInitialSync to
// avoid enqueueing while the changes from the initial sync are being
// processed.
func (ic *IngressIPController) enqueueChange(new interface{}, old interface{}) {
	ic.lock.Lock()
	defer ic.lock.Unlock()

	change := &serviceChange{}

	if new != nil {
		// Queue the key needed to retrieve the latest state from the
		// cache when the change is processed.
		key, err := controller.KeyFunc(new)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", new, err))
			return
		}
		change.key = key
	}

	if old != nil {
		service, ok := old.(*v1.Service)
		if !ok {
			tombstone, ok := old.(cache.DeletedFinalStateUnknown)
			if !ok {
				utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", old))
				return
			}
			service, ok = tombstone.Obj.(*v1.Service)
			if !ok {
				utilruntime.HandleError(fmt.Errorf("tombstone contained unexpected object %#v", old))
				return
			}
		}
		change.oldService = service
	}

	ic.queue.Add(change)
}

// Run begins watching and syncing.
func (ic *IngressIPController) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer ic.queue.ShutDown()

	glog.V(5).Infof("Waiting for the initial sync to be completed")
	if !cache.WaitForCacheSync(stopCh, ic.hasSynced) {
		return
	}

	if !ic.processInitialSync() {
		return
	}

	glog.V(5).Infof("Initial sync completed, starting worker")
	for ic.work() {
		// Check for shutdown without blocking so the queue keeps
		// draining between checks.
		var done bool
		select {
		case _, ok := <-stopCh:
			done = !ok
		default:
		}
		if done {
			break
		}
	}

	glog.V(1).Infof("Shutting down ingress ip controller")
}
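
// serviceAge sorts services oldest first, breaking creation-timestamp
// ties by UID, so that initial-sync allocations happen in a consistent
// order.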
type serviceAge []*v1.Service

func (s serviceAge) Len() int      { return len(s) }
func (s serviceAge) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s serviceAge) Less(i, j int) bool {
	if s[i].CreationTimestamp.Before(&s[j].CreationTimestamp) {
		return true
	}
	return (s[i].CreationTimestamp == s[j].CreationTimestamp && s[i].UID < s[j].UID)
}

// processInitialSync processes the items queued by the informer's
// initial sync. A lock is shared between this method and
// enqueueChange to ensure that queue additions are blocked while the
// sync is being processed. Returns a boolean indication of whether
// processing should continue.
func (ic *IngressIPController) processInitialSync() bool {
	ic.lock.Lock()
	defer ic.lock.Unlock()

	glog.V(5).Infof("Processing initial sync")

	// Track services that need to be processed after existing
	// allocations are recorded.
	var pendingServices []*v1.Service
	// Track post-sync changes that need to be added back to the queue
	// after allocations are recorded. These changes may end up in
	// the queue if watch events were queued before completion of the
	// initial sync was detected.
	var pendingChanges []*serviceChange

	// Drain the queue. Len() should be safe because enqueueChange
	// requires the same lock held by this method.
	for ic.queue.Len() > 0 {
		item, quit := ic.queue.Get()
		if quit {
			return false
		}
		ic.queue.Done(item)
		ic.queue.Forget(item)
		change := item.(*serviceChange)

		// The initial sync only includes additions, so if an update
		// or delete is seen (indicated by the presence of oldService),
		// it and all subsequent changes are post-sync watch events that
		// should be queued without processing.
		postSyncChange := change.oldService != nil || len(pendingChanges) > 0
		if postSyncChange {
			pendingChanges = append(pendingChanges, change)
			continue
		}

		service := ic.getCachedService(change.key)
		if service == nil {
			// Service was deleted
			continue
		}

		if service.Spec.Type == v1.ServiceTypeLoadBalancer {
			// Save for subsequent addition back to the queue to
			// ensure persistent state is updated during regular
			// processing.
			pendingServices = append(pendingServices, service)

			if len(service.Status.LoadBalancer.Ingress) > 0 {
				// The service has an existing allocation
				ipString := service.Status.LoadBalancer.Ingress[0].IP
				// Return values indicating that reallocation is
				// necessary or that an error occurred can be ignored
				// since the service will be processed again.
				ic.recordLocalAllocation(change.key, ipString)
			}
		}
	}

	// Add pending service additions back to the queue in consistent order.
	sort.Sort(serviceAge(pendingServices))
	for _, service := range pendingServices {
		if key, err := controller.KeyFunc(service); err == nil {
			glog.V(5).Infof("Adding service back to queue: %v", key)
			change := &serviceChange{key: key}
			ic.queue.Add(change)
		} else {
			// This error should have been caught by enqueueChange
			utilruntime.HandleError(fmt.Errorf("Couldn't get key for service %+v: %v", service, err))
			continue
		}
	}

	// Add watch events back to the queue
	for _, change := range pendingChanges {
		ic.queue.Add(change)
	}

	glog.V(5).Infof("Completed processing initial sync")
	return true
}

// getCachedService logs if unable to retrieve a service for the given key.
func (ic *IngressIPController) getCachedService(key string) *v1.Service {
	if len(key) == 0 {
		return nil
	}
	if obj, exists, err := ic.cache.GetByKey(key); err != nil {
		glog.V(5).Infof("Unable to retrieve service %v from store: %v", key, err)
	} else if !exists {
		glog.V(6).Infof("Service %v has been deleted", key)
	} else {
		return obj.(*v1.Service)
	}
	return nil
}

// recordLocalAllocation attempts to update local state for the given
// service key and ingress ip. Returns a boolean indication of
// whether reallocation is necessary and an error indicating the
// reason for reallocation. If reallocation is not indicated, a
// non-nil error indicates an exceptional condition.
func (ic *IngressIPController) recordLocalAllocation(key, ipString string) (reallocate bool, err error) {
	ip := net.ParseIP(ipString)
	if ip == nil {
		return true, fmt.Errorf("Service %v has an invalid ingress ip %v. A new ip will be allocated.", key, ipString)
	}

	ipKey, ok := ic.allocationMap[ipString]
	switch {
	case ok && ipKey == key:
		// Allocation exists for this service
		return false, nil
	case ok && ipKey != key:
		// TODO prefer removing the allocation from a service that does not have a matching LoadBalancerIP
		return true, fmt.Errorf("Another service is using ingress ip %v. A new ip will be allocated for %v.", ipString, key)
	}

	err = ic.ipAllocator.Allocate(ip)
	if _, ok := err.(*ipallocator.ErrNotInRange); ok {
		return true, fmt.Errorf("The ingress ip %v for service %v is not in the ingress range. A new ip will be allocated.", ipString, key)
	} else if err != nil {
		// The only other error that Allocate() can return is
		// ErrAllocated, but that should not happen after the check
		// against the allocation map.
		return false, fmt.Errorf("Unexpected error from ip allocator for service %v: %v", key, err)
	}

	ic.allocationMap[ipString] = key
	glog.V(5).Infof("Recorded allocation of ip %v for service %v", ipString, key)
	return false, nil
}

// work dispatches the next item in the queue to the change handler.
// If the change handler returns an error, the change will be added to
// the end of the queue to be processed again. Returns a boolean
// indication of whether processing should continue.
func (ic *IngressIPController) work() bool {
	item, quit := ic.queue.Get()
	if quit {
		return false
	}
	change := item.(*serviceChange)
	defer ic.queue.Done(change)

	if change.requeuedAllocation {
		// Reset the allocation state so that the change can be
		// requeued if necessary. Only additions/updates are requeued
		// for allocation so change.key should be non-empty.
		change.requeuedAllocation = false
		ic.requeuedAllocations.Delete(change.key)
	}

	if err := ic.changeHandler(change); err == nil {
		// No further processing required
		ic.queue.Forget(change)
	} else {
		if err == ipallocator.ErrFull {
			// When the range is full, avoid requeueing more than a
			// single change requiring allocation per service.
			// Otherwise the queue could grow without bounds as every
			// service update would add another change that would be
			// endlessly requeued.
			if ic.requeuedAllocations.Has(change.key) {
				return true
			}
			change.requeuedAllocation = true
			ic.requeuedAllocations.Insert(change.key)

			service := ic.getCachedService(change.key)
			if service != nil {
				ic.recorder.Eventf(service, v1.EventTypeWarning, "IngressIPRangeFull", "No available ingress ip to allocate to service %s", change.key)
			}
		}
		// Failed but can be retried
		utilruntime.HandleError(fmt.Errorf("error syncing service, it will be retried: %v", err))
		ic.queue.AddRateLimited(change)
	}

	return true
}

// processChange responds to a service change by synchronizing the
// local and persisted ingress ip allocation state of the service.
func (ic *IngressIPController) processChange(change *serviceChange) error {
	service := ic.getCachedService(change.key)

	ic.clearOldAllocation(service, change.oldService)

	if service == nil {
		// Service was deleted - no further processing required
		return nil
	}

	typeLoadBalancer := service.Spec.Type == v1.ServiceTypeLoadBalancer
	hasAllocation := len(service.Status.LoadBalancer.Ingress) > 0
	switch {
	case typeLoadBalancer && hasAllocation:
		return ic.recordAllocation(service, change.key)
	case typeLoadBalancer && !hasAllocation:
		return ic.allocate(service, change.key)
	case !typeLoadBalancer && hasAllocation:
		return ic.deallocate(service, change.key)
	default:
		return nil
	}
}

// clearOldAllocation clears the old allocation for a service if it
// differs from a new allocation. Returns a boolean indication of
// whether the old allocation was cleared.
func (ic *IngressIPController) clearOldAllocation(new, old *v1.Service) bool {
	oldIP := ""
	if old != nil && old.Spec.Type == v1.ServiceTypeLoadBalancer && len(old.Status.LoadBalancer.Ingress) > 0 {
		oldIP = old.Status.LoadBalancer.Ingress[0].IP
	}
	noOldAllocation := len(oldIP) == 0
	if noOldAllocation {
		return false
	}

	newIP := ""
	if new != nil && new.Spec.Type == v1.ServiceTypeLoadBalancer && len(new.Status.LoadBalancer.Ingress) > 0 {
		newIP = new.Status.LoadBalancer.Ingress[0].IP
	}
	allocationUnchanged := newIP == oldIP
	if allocationUnchanged {
		return false
	}

	// New allocation differs from old due to update or deletion.
	// Get the key from the old service since the new service may be nil.
	if key, err := controller.KeyFunc(old); err == nil {
		ic.clearLocalAllocation(key, oldIP)
		return true
	} else {
		// Recovery/retry not possible for this error
		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", old, err))
		return false
	}
}

// recordAllocation updates local state with the ingress ip indicated
// in a service's status and ensures that the ingress ip appears in
// the service's list of external ips. If the service's ingress ip is
// invalid for any reason, a new ip will be allocated.
func (ic *IngressIPController) recordAllocation(service *v1.Service, key string) error {
	// Only the first ingress ip is considered; any others are ignored.
	ipString := service.Status.LoadBalancer.Ingress[0].IP

	reallocate, err := ic.recordLocalAllocation(key, ipString)
	if !reallocate && err != nil {
		return err
	}
	reallocateMessage := ""
	if err != nil {
		reallocateMessage = err.Error()
	}

	// Make a copy to modify to avoid mutating cache state
	serviceCopy := service.DeepCopy()

	if reallocate {
		// TODO update the external ips but not the status since
		// allocate() will overwrite any existing allocation.
		if err = ic.clearPersistedAllocation(serviceCopy, key, reallocateMessage); err != nil {
			return err
		}
		ic.recorder.Eventf(serviceCopy, v1.EventTypeWarning, "IngressIPReallocated", reallocateMessage)
		return ic.allocate(serviceCopy, key)
	} else {
		// Ensure that the ingress ip is present in the service's spec.
		return ic.ensureExternalIP(serviceCopy, key, ipString)
	}
}

// allocate assigns an unallocated ip to a service and updates the
// service's persisted state.
func (ic *IngressIPController) allocate(service *v1.Service, key string) error {
	// Make a copy to avoid mutating cache state
	serviceCopy := service.DeepCopy()

	ip, err := ic.allocateIP(serviceCopy.Spec.LoadBalancerIP)
	if err != nil {
		return err
	}
	ipString := ip.String()
	glog.V(5).Infof("Allocating ip %v to service %v", ipString, key)

	serviceCopy.Status = v1.ServiceStatus{
		LoadBalancer: v1.LoadBalancerStatus{
			Ingress: []v1.LoadBalancerIngress{
				{
					IP: ipString,
				},
			},
		},
	}
	if err = ic.persistServiceStatus(serviceCopy); err != nil {
		if releaseErr := ic.ipAllocator.Release(ip); releaseErr != nil {
			// Release from contiguous allocator should never return an error, but just in case...
			utilruntime.HandleError(fmt.Errorf("Error releasing ip %v for service %v: %v", ipString, key, releaseErr))
		}
		return err
	}
	ic.allocationMap[ipString] = key

	return ic.ensureExternalIP(serviceCopy, key, ipString)
}

// deallocate ensures that the ip currently allocated to a service is
// removed and that its load balancer status is cleared.
func (ic *IngressIPController) deallocate(service *v1.Service, key string) error {
	glog.V(5).Infof("Clearing allocation state for %v", key)

	// Make a copy to modify to avoid mutating cache state
	serviceCopy := service.DeepCopy()

	// Get the ingress ip to remove from local allocation state before
	// it is removed from the service.
	ipString := serviceCopy.Status.LoadBalancer.Ingress[0].IP

	if err := ic.clearPersistedAllocation(serviceCopy, key, ""); err != nil {
		return err
	}

	ic.clearLocalAllocation(key, ipString)
	return nil
}

// clearLocalAllocation clears an in-memory allocation if it belongs
// to the specified service key.
func (ic *IngressIPController) clearLocalAllocation(key, ipString string) bool {
	glog.V(5).Infof("Attempting to clear local allocation of ip %v for service %v", ipString, key)

	ip := net.ParseIP(ipString)
	if ip == nil {
		// An invalid ip address cannot be deallocated
		utilruntime.HandleError(fmt.Errorf("Error parsing ip: %v", ipString))
		return false
	}

	ipKey, ok := ic.allocationMap[ipString]
	switch {
	case !ok:
		glog.V(6).Infof("IP address %v is not currently allocated", ipString)
		return false
	case key != ipKey:
		glog.V(6).Infof("IP address %v is not allocated to service %v", ipString, key)
		return false
	}

	// Remove allocation
	if err := ic.ipAllocator.Release(ip); err != nil {
		// Release from contiguous allocator should never return an error.
		utilruntime.HandleError(fmt.Errorf("Error releasing ip %v for service %v: %v", ipString, key, err))
		return false
	}
	delete(ic.allocationMap, ipString)

	glog.V(5).Infof("IP address %v is now available for allocation", ipString)
	return true
}

// clearPersistedAllocation ensures there is no ingress ip in the
// service's spec and that the service's status is cleared.
func (ic *IngressIPController) clearPersistedAllocation(service *v1.Service, key, errMessage string) error {
	// Assume it is safe to modify the service without worrying about changing the local cache
	if len(errMessage) > 0 {
		utilruntime.HandleError(fmt.Errorf(errMessage))
	} else {
		glog.V(5).Infof("Attempting to clear persisted allocation for service: %v", key)
	}

	// An ingress ip is only allowed in ExternalIPs when a
	// corresponding status exists, so update the spec first to avoid
	// failing admission control.
	ingressIP := service.Status.LoadBalancer.Ingress[0].IP
	for i, ip := range service.Spec.ExternalIPs {
		if ip == ingressIP {
			glog.V(5).Infof("Removing ip %v from the external ips of service %v", ingressIP, key)
			service.Spec.ExternalIPs = append(service.Spec.ExternalIPs[:i], service.Spec.ExternalIPs[i+1:]...)
			if err := ic.persistServiceSpec(service); err != nil {
				return err
			}
			break
		}
	}

	service.Status.LoadBalancer = v1.LoadBalancerStatus{}
	glog.V(5).Infof("Clearing the load balancer status of service: %v", key)
	return ic.persistServiceStatus(service)
}

// ensureExternalIP ensures that the provided service has the ingress
// ip persisted as an external ip.
func (ic *IngressIPController) ensureExternalIP(service *v1.Service, key, ingressIP string) error {
	// Assume it is safe to modify the service without worrying about changing the local cache
	ipExists := false
	for _, ip := range service.Spec.ExternalIPs {
		if ip == ingressIP {
			ipExists = true
			glog.V(6).Infof("Service %v already has ip %v as an external ip", key, ingressIP)
			break
		}
	}
	if !ipExists {
		service.Spec.ExternalIPs = append(service.Spec.ExternalIPs, ingressIP)
		glog.V(5).Infof("Adding ip %v to service %v as an external ip", ingressIP, key)
		return ic.persistServiceSpec(service)
	}
	return nil
}

// allocateIP attempts to allocate the requested ip, and if that is
// not possible, allocates the next available address.
func (ic *IngressIPController) allocateIP(requestedIP string) (net.IP, error) {
	if len(requestedIP) == 0 {
		// Specific ip not requested
		return ic.ipAllocator.AllocateNext()
	}

	var ip net.IP
	if ip = net.ParseIP(requestedIP); ip == nil {
		// Invalid ip
		return ic.ipAllocator.AllocateNext()
	}

	if err := ic.ipAllocator.Allocate(ip); err != nil {
		// Unable to allocate requested ip
		return ic.ipAllocator.AllocateNext()
	}

	// Allocated requested ip
	return ip, nil
}
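
// persistServiceSpec persists a change to a service's spec (for
// example, an updated ExternalIPs list).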
func (ic *IngressIPController) persistServiceSpec(service *v1.Service) error {
	return ic.persistenceHandler(ic.client, service, false)
}
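
// persistServiceStatus persists a change to a service's load
// balancer status.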
func (ic *IngressIPController) persistServiceStatus(service *v1.Service) error {
	return ic.persistenceHandler(ic.client, service, true)
}
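
// persistService updates a service via the API with exponential
// backoff, targeting either the status or the spec depending on
// targetStatus. Conflict and not-found errors are treated as success,
// since the change that invalidated this update will itself be
// processed.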
func persistService(client kcoreclient.ServicesGetter, service *v1.Service, targetStatus bool) error {
	backoff := wait.Backoff{
		Steps:    clientRetryCount,
		Duration: clientRetryInterval,
		Factor:   clientRetryFactor,
	}
	return wait.ExponentialBackoff(backoff, func() (bool, error) {
		var err error
		if targetStatus {
			_, err = client.Services(service.Namespace).UpdateStatus(service)
		} else {
			_, err = client.Services(service.Namespace).Update(service)
		}
		switch {
		case err == nil:
			return true, nil
		case kerrors.IsNotFound(err):
			// If the service no longer exists, we don't want to recreate
			// it. Just bail out so that we can process the delete, which
			// we should soon be receiving if we haven't already.
			glog.V(5).Infof("Not persisting update to service '%s/%s' that no longer exists: %v",
				service.Namespace, service.Name, err)
			return true, nil
		case kerrors.IsConflict(err):
			// TODO: Try to resolve the conflict if the change was
			// unrelated to load balancer status. For now, just rely on
			// the fact that we'll also process the update that caused the
			// resource version to change.
			glog.V(5).Infof("Not persisting update to service '%s/%s' that has been changed since we received it: %v",
				service.Namespace, service.Name, err)
			return true, nil
		default:
			err = fmt.Errorf("Failed to persist updated LoadBalancerStatus to service '%s/%s': %v",
				service.Namespace, service.Name, err)
			return false, err
		}
	})
}
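
// As the struct comments note, changeHandler and persistenceHandler
// exist so they can be factored out for unit testing. A hypothetical
// stub might look like:
//
//	ic.persistenceHandler = func(client kcoreclient.ServicesGetter, service *v1.Service, targetStatus bool) error {
//		return nil // record the call instead of hitting the API
//	}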