Skip to content
This repository was archived by the owner on Feb 8, 2021. It is now read-only.

Commit ee81e5e

Browse files
committed
Retry Pod/RC updates in kubectl rolling-update
1 parent 62ce669 commit ee81e5e

File tree

5 files changed

+168
-113
lines changed

5 files changed

+168
-113
lines changed

pkg/client/unversioned/conditions.go

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package unversioned
1818

1919
import (
2020
"fmt"
21-
"time"
2221

2322
"k8s.io/kubernetes/pkg/api"
2423
"k8s.io/kubernetes/pkg/api/errors"
@@ -29,61 +28,6 @@ import (
2928
"k8s.io/kubernetes/pkg/watch"
3029
)
3130

32-
// DefaultRetry is the recommended retry for a conflict where multiple clients
33-
// are making changes to the same resource.
34-
var DefaultRetry = wait.Backoff{
35-
Steps: 5,
36-
Duration: 10 * time.Millisecond,
37-
Factor: 1.0,
38-
Jitter: 0.1,
39-
}
40-
41-
// DefaultBackoff is the recommended backoff for a conflict where a client
42-
// may be attempting to make an unrelated modification to a resource under
43-
// active management by one or more controllers.
44-
var DefaultBackoff = wait.Backoff{
45-
Steps: 4,
46-
Duration: 10 * time.Millisecond,
47-
Factor: 5.0,
48-
Jitter: 0.1,
49-
}
50-
51-
// RetryConflict executes the provided function repeatedly, retrying if the server returns a conflicting
52-
// write. Callers should preserve previous executions if they wish to retry changes. It performs an
53-
// exponential backoff.
54-
//
55-
// var pod *api.Pod
56-
// err := RetryOnConflict(DefaultBackoff, func() (err error) {
57-
// pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
58-
// return
59-
// })
60-
// if err != nil {
61-
// // may be conflict if max retries were hit
62-
// return err
63-
// }
64-
// ...
65-
//
66-
// TODO: Make Backoff an interface?
67-
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
68-
var lastConflictErr error
69-
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
70-
err := fn()
71-
switch {
72-
case err == nil:
73-
return true, nil
74-
case errors.IsConflict(err):
75-
lastConflictErr = err
76-
return false, nil
77-
default:
78-
return false, err
79-
}
80-
})
81-
if err == wait.ErrWaitTimeout {
82-
err = lastConflictErr
83-
}
84-
return err
85-
}
86-
8731
// ControllerHasDesiredReplicas returns a condition that will be true if and only if
8832
// the desired replica count for a controller's ReplicaSelector equals the Replicas count.
8933
func ControllerHasDesiredReplicas(c Interface, controller *api.ReplicationController) wait.ConditionFunc {

pkg/client/unversioned/util.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
Copyright 2016 The Kubernetes Authors All rights reserved.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package unversioned
18+
19+
import (
20+
"time"
21+
22+
"k8s.io/kubernetes/pkg/api/errors"
23+
"k8s.io/kubernetes/pkg/util/wait"
24+
)
25+
26+
// DefaultRetry is the recommended retry for a conflict where multiple clients
27+
// are making changes to the same resource.
28+
var DefaultRetry = wait.Backoff{
29+
Steps: 5,
30+
Duration: 10 * time.Millisecond,
31+
Factor: 1.0,
32+
Jitter: 0.1,
33+
}
34+
35+
// DefaultBackoff is the recommended backoff for a conflict where a client
36+
// may be attempting to make an unrelated modification to a resource under
37+
// active management by one or more controllers.
38+
var DefaultBackoff = wait.Backoff{
39+
Steps: 4,
40+
Duration: 10 * time.Millisecond,
41+
Factor: 5.0,
42+
Jitter: 0.1,
43+
}
44+
45+
// RetryOnConflict executes the provided function repeatedly, retrying if the server returns a conflicting
46+
// write. Callers should preserve previous executions if they wish to retry changes. It performs an
47+
// exponential backoff.
48+
//
49+
// var pod *api.Pod
50+
// err := RetryOnConflict(DefaultBackoff, func() (err error) {
51+
// pod, err = c.Pods("mynamespace").UpdateStatus(podStatus)
52+
// return
53+
// })
54+
// if err != nil {
55+
// // may be conflict if max retries were hit
56+
// return err
57+
// }
58+
// ...
59+
//
60+
// TODO: Make Backoff an interface?
61+
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
62+
var lastConflictErr error
63+
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
64+
err := fn()
65+
switch {
66+
case err == nil:
67+
return true, nil
68+
case errors.IsConflict(err):
69+
lastConflictErr = err
70+
return false, nil
71+
default:
72+
return false, err
73+
}
74+
})
75+
if err == wait.ErrWaitTimeout {
76+
err = lastConflictErr
77+
}
78+
return err
79+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright 2014 The Kubernetes Authors All rights reserved.
2+
Copyright 2016 The Kubernetes Authors All rights reserved.
33
44
Licensed under the Apache License, Version 2.0 (the "License");
55
you may not use this file except in compliance with the License.

pkg/kubectl/rolling_updater.go

Lines changed: 82 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -187,15 +187,16 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error {
187187
if err != nil {
188188
return err
189189
}
190-
if existing.Annotations == nil {
191-
existing.Annotations = map[string]string{}
190+
originReplicas := strconv.Itoa(int(existing.Spec.Replicas))
191+
applyUpdate := func(rc *api.ReplicationController) {
192+
if rc.Annotations == nil {
193+
rc.Annotations = map[string]string{}
194+
}
195+
rc.Annotations[originalReplicasAnnotation] = originReplicas
192196
}
193-
existing.Annotations[originalReplicasAnnotation] = strconv.Itoa(int(existing.Spec.Replicas))
194-
updated, err := r.c.ReplicationControllers(existing.Namespace).Update(existing)
195-
if err != nil {
197+
if oldRc, err = updateRcWithRetries(r.c, existing.Namespace, existing, applyUpdate); err != nil {
196198
return err
197199
}
198-
oldRc = updated
199200
}
200201
// maxSurge is the maximum scaling increment and maxUnavailable are the maximum pods
201202
// that can be unavailable during a rollout.
@@ -482,13 +483,14 @@ func (r *RollingUpdater) cleanupWithClients(oldRc, newRc *api.ReplicationControl
482483
if err != nil {
483484
return err
484485
}
485-
delete(newRc.Annotations, sourceIdAnnotation)
486-
delete(newRc.Annotations, desiredReplicasAnnotation)
487-
488-
newRc, err = r.c.ReplicationControllers(r.ns).Update(newRc)
489-
if err != nil {
486+
applyUpdate := func(rc *api.ReplicationController) {
487+
delete(rc.Annotations, sourceIdAnnotation)
488+
delete(rc.Annotations, desiredReplicasAnnotation)
489+
}
490+
if newRc, err = updateRcWithRetries(r.c, r.ns, newRc, applyUpdate); err != nil {
490491
return err
491492
}
493+
492494
if err = wait.Poll(config.Interval, config.Timeout, client.ControllerHasDesiredReplicas(r.c, newRc)); err != nil {
493495
return err
494496
}
@@ -643,27 +645,29 @@ func SetNextControllerAnnotation(rc *api.ReplicationController, name string) {
643645
}
644646

645647
func UpdateExistingReplicationController(c client.Interface, oldRc *api.ReplicationController, namespace, newName, deploymentKey, deploymentValue string, out io.Writer) (*api.ReplicationController, error) {
646-
SetNextControllerAnnotation(oldRc, newName)
647648
if _, found := oldRc.Spec.Selector[deploymentKey]; !found {
649+
SetNextControllerAnnotation(oldRc, newName)
648650
return AddDeploymentKeyToReplicationController(oldRc, c, deploymentKey, deploymentValue, namespace, out)
649651
} else {
650652
// If we didn't need to update the controller for the deployment key, we still need to write
651653
// the "next" controller.
652-
return c.ReplicationControllers(namespace).Update(oldRc)
654+
applyUpdate := func(rc *api.ReplicationController) {
655+
SetNextControllerAnnotation(rc, newName)
656+
}
657+
return updateRcWithRetries(c, namespace, oldRc, applyUpdate)
653658
}
654659
}
655660

656-
const MaxRetries = 3
657-
658661
func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, client client.Interface, deploymentKey, deploymentValue, namespace string, out io.Writer) (*api.ReplicationController, error) {
659662
var err error
660663
// First, update the template label. This ensures that any newly created pods will have the new label
661-
if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
664+
applyUpdate := func(rc *api.ReplicationController) {
662665
if rc.Spec.Template.Labels == nil {
663666
rc.Spec.Template.Labels = map[string]string{}
664667
}
665668
rc.Spec.Template.Labels[deploymentKey] = deploymentValue
666-
}); err != nil {
669+
}
670+
if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil {
667671
return nil, err
668672
}
669673

@@ -677,26 +681,16 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
677681
}
678682
for ix := range podList.Items {
679683
pod := &podList.Items[ix]
680-
if pod.Labels == nil {
681-
pod.Labels = map[string]string{
682-
deploymentKey: deploymentValue,
683-
}
684-
} else {
685-
pod.Labels[deploymentKey] = deploymentValue
686-
}
687-
err = nil
688-
delay := 3
689-
for i := 0; i < MaxRetries; i++ {
690-
_, err = client.Pods(namespace).Update(pod)
691-
if err != nil {
692-
fmt.Fprintf(out, "Error updating pod (%v), retrying after %d seconds", err, delay)
693-
time.Sleep(time.Second * time.Duration(delay))
694-
delay *= delay
684+
applyUpdate := func(p *api.Pod) {
685+
if p.Labels == nil {
686+
p.Labels = map[string]string{
687+
deploymentKey: deploymentValue,
688+
}
695689
} else {
696-
break
690+
p.Labels[deploymentKey] = deploymentValue
697691
}
698692
}
699-
if err != nil {
693+
if pod, err = updatePodWithRetries(client, namespace, pod, applyUpdate); err != nil {
700694
return nil, err
701695
}
702696
}
@@ -709,12 +703,11 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
709703
for k, v := range oldRc.Spec.Selector {
710704
selectorCopy[k] = v
711705
}
712-
oldRc.Spec.Selector[deploymentKey] = deploymentValue
713-
714-
// Update the selector of the rc so it manages all the pods we updated above
715-
if oldRc, err = updateWithRetries(client.ReplicationControllers(namespace), oldRc, func(rc *api.ReplicationController) {
706+
applyUpdate = func(rc *api.ReplicationController) {
716707
rc.Spec.Selector[deploymentKey] = deploymentValue
717-
}); err != nil {
708+
}
709+
// Update the selector of the rc so it manages all the pods we updated above
710+
if oldRc, err = updateRcWithRetries(client, namespace, oldRc, applyUpdate); err != nil {
718711
return nil, err
719712
}
720713

@@ -736,33 +729,72 @@ func AddDeploymentKeyToReplicationController(oldRc *api.ReplicationController, c
736729
return oldRc, nil
737730
}
738731

739-
type updateFunc func(controller *api.ReplicationController)
732+
type updateRcFunc func(controller *api.ReplicationController)
740733

741-
// updateWithRetries updates applies the given rc as an update.
742-
func updateWithRetries(rcClient client.ReplicationControllerInterface, rc *api.ReplicationController, applyUpdate updateFunc) (*api.ReplicationController, error) {
743-
var err error
744-
oldRc := rc
745-
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
734+
// updateRcWithRetries retries updating the given rc on conflict with the following steps:
735+
// 1. Get latest resource
736+
// 2. applyUpdate
737+
// 3. Update the resource
738+
func updateRcWithRetries(c client.Interface, namespace string, rc *api.ReplicationController, applyUpdate updateRcFunc) (*api.ReplicationController, error) {
739+
// Deep copy the rc so we can fall back to it if the Get fails during the retry loop
740+
obj, err := api.Scheme.Copy(rc)
741+
if err != nil {
742+
return nil, fmt.Errorf("failed to deep copy rc before updating it: %v", err)
743+
}
744+
oldRc := obj.(*api.ReplicationController)
745+
err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
746746
// Apply the update, then attempt to push it to the apiserver.
747747
applyUpdate(rc)
748-
if rc, err = rcClient.Update(rc); err == nil {
748+
if rc, e = c.ReplicationControllers(namespace).Update(rc); e == nil {
749749
// rc contains the latest controller post update
750-
return true, nil
750+
return
751751
}
752+
updateErr := e
752753
// Update the controller with the latest resource version, if the update failed we
753754
// can't trust rc so use oldRc.Name.
754-
if rc, err = rcClient.Get(oldRc.Name); err != nil {
755+
if rc, e = c.ReplicationControllers(namespace).Get(oldRc.Name); e != nil {
755756
// The Get failed: Value in rc cannot be trusted.
756757
rc = oldRc
757758
}
758-
// The Get passed: rc contains the latest controller, expect a poll for the update.
759-
return false, nil
759+
// Only return the error from update
760+
return updateErr
760761
})
761762
// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
762763
// controller contains the applied update.
763764
return rc, err
764765
}
765766

767+
type updatePodFunc func(controller *api.Pod)
768+
769+
// updatePodWithRetries retries updating the given pod on conflict with the following steps:
770+
// 1. Get latest resource
771+
// 2. applyUpdate
772+
// 3. Update the resource
773+
func updatePodWithRetries(c client.Interface, namespace string, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) {
774+
// Deep copy the pod so we can fall back to it if the Get fails during the retry loop
775+
obj, err := api.Scheme.Copy(pod)
776+
if err != nil {
777+
return nil, fmt.Errorf("failed to deep copy pod before updating it: %v", err)
778+
}
779+
oldPod := obj.(*api.Pod)
780+
err = client.RetryOnConflict(client.DefaultBackoff, func() (e error) {
781+
// Apply the update, then attempt to push it to the apiserver.
782+
applyUpdate(pod)
783+
if pod, e = c.Pods(namespace).Update(pod); e == nil {
784+
return
785+
}
786+
updateErr := e
787+
if pod, e = c.Pods(namespace).Get(oldPod.Name); e != nil {
788+
pod = oldPod
789+
}
790+
// Only return the error from update
791+
return updateErr
792+
})
793+
// If the error is non-nil the returned pod cannot be trusted, if it is nil, the returned
794+
// pod contains the applied update.
795+
return pod, err
796+
}
797+
766798
func FindSourceController(r client.ReplicationControllersNamespacer, namespace, name string) (*api.ReplicationController, error) {
767799
list, err := r.ReplicationControllers(namespace).List(api.ListOptions{})
768800
if err != nil {

0 commit comments

Comments
 (0)