forked from argoproj/argo-rollouts
/
replicaset.go
150 lines (134 loc) · 6.17 KB
/
replicaset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package rollout
import (
"fmt"
"sort"
"time"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
patchtypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/controller"
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/defaults"
replicasetutil "github.com/argoproj/argo-rollouts/utils/replicaset"
)
// controllerKind is the GroupVersionKind of the Rollout resource. It is used
// as the owner reference kind when claiming/adopting ReplicaSets.
var controllerKind = v1alpha1.SchemeGroupVersion.WithKind("Rollout")

const (
	// JSON-patch (RFC 6902) templates used to add/remove the
	// scale-down-deadline annotation on a ReplicaSet. The %s placeholders
	// are the annotation key and, for the add patch, the deadline value.
	addScaleDownAtAnnotationsPatch    = `[{ "op": "add", "path": "/metadata/annotations/%s", "value": "%s"}]`
	removeScaleDownAtAnnotationsPatch = `[{ "op": "remove", "path": "/metadata/annotations/%s"}]`
)
// removeScaleDownDelay strips the scale-down-deadline annotation from the
// given ReplicaSet with a JSON patch, so the ReplicaSet is no longer
// scheduled for a delayed scale-down.
func (c *RolloutController) removeScaleDownDelay(roCtx rolloutContext, rs *appsv1.ReplicaSet) error {
	key := v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey
	roCtx.Log().Infof("Removing '%s' annotation on RS '%s'", key, rs.Name)
	patch := []byte(fmt.Sprintf(removeScaleDownAtAnnotationsPatch, key))
	_, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(rs.Name, patchtypes.JSONPatchType, patch)
	return err
}
// addScaleDownDelay stamps the given ReplicaSet with the scale-down-deadline
// annotation via a JSON patch. The deadline is now plus the rollout's
// scaleDownDelaySeconds (or its default), formatted as UTC RFC3339, and
// marks when the controller may scale this ReplicaSet down.
func (c *RolloutController) addScaleDownDelay(roCtx rolloutContext, rs *appsv1.ReplicaSet) error {
	key := v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey
	roCtx.Log().Infof("Adding '%s' annotation to RS '%s'", key, rs.Name)
	delay := time.Duration(defaults.GetScaleDownDelaySecondsOrDefault(roCtx.Rollout())) * time.Second
	deadline := metav1.Now().Add(delay).UTC().Format(time.RFC3339)
	patch := []byte(fmt.Sprintf(addScaleDownAtAnnotationsPatch, key, deadline))
	_, err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Patch(rs.Name, patchtypes.JSONPatchType, patch)
	return err
}
// getReplicaSetsForRollouts returns the ReplicaSets controlled by the given
// Rollout. It lists every ReplicaSet in the namespace and hands them to a
// controller-ref manager, which adopts orphans matching the Rollout's
// selector and releases owned ReplicaSets that no longer match.
func (c *RolloutController) getReplicaSetsForRollouts(r *v1alpha1.Rollout) ([]*appsv1.ReplicaSet, error) {
	// List all ReplicaSets to find those we own but that no longer match our
	// selector. They will be orphaned by ClaimReplicaSets().
	rsList, err := c.replicaSetLister.ReplicaSets(r.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}
	replicaSetSelector, err := metav1.LabelSelectorAsSelector(r.Spec.Selector)
	if err != nil {
		return nil, fmt.Errorf("rollout %s/%s has invalid label selector: %v", r.Namespace, r.Name, err)
	}
	// If any adoptions are attempted, we should first recheck for deletion with
	// an uncached quorum read sometime after listing ReplicaSets (see #42639).
	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
		// Uncached read of the Rollout, to make sure we are not adopting on
		// behalf of a Rollout that was deleted and recreated (same name,
		// different UID) since the informer cache was populated.
		fresh, err := c.argoprojclientset.ArgoprojV1alpha1().Rollouts(r.Namespace).Get(r.Name, metav1.GetOptions{})
		if err != nil {
			return nil, err
		}
		if fresh.UID != r.UID {
			return nil, fmt.Errorf("original Rollout %v/%v is gone: got uid %v, wanted %v", r.Namespace, r.Name, fresh.UID, r.UID)
		}
		return fresh, nil
	})
	cm := controller.NewReplicaSetControllerRefManager(c.replicaSetControl, r, replicaSetSelector, controllerKind, canAdoptFunc)
	return cm.ClaimReplicaSets(rsList)
}
// reconcileNewReplicaSet scales the rollout's new ReplicaSet toward the
// replica count computed for the current rollout state. It reports whether
// a scaling operation was performed.
func (c *RolloutController) reconcileNewReplicaSet(roCtx rolloutContext) (bool, error) {
	newRS := roCtx.NewRS()
	// Nothing to reconcile until the new ReplicaSet exists.
	if newRS == nil {
		return false, nil
	}
	roCtx.Log().Infof("Reconciling new ReplicaSet '%s'", newRS.Name)
	rollout := roCtx.Rollout()
	desiredReplicas, err := replicasetutil.NewRSNewReplicas(rollout, roCtx.AllRSs(), newRS)
	if err != nil {
		return false, err
	}
	scaled, _, err := c.scaleReplicaSetAndRecordEvent(newRS, desiredReplicas, rollout)
	return scaled, err
}
// reconcileOldReplicaSets scales down the rollout's old ReplicaSets. For a
// canary strategy it first removes unhealthy replicas so they cannot block
// the rollout; for a blue-green strategy it scales down old ReplicaSets that
// are eligible. It reports whether any scaling operation was performed.
func (c *RolloutController) reconcileOldReplicaSets(oldRSs []*appsv1.ReplicaSet, roCtx rolloutContext) (bool, error) {
	rollout := roCtx.Rollout()
	oldPodsCount := replicasetutil.GetReplicaCountForReplicaSets(oldRSs)
	if oldPodsCount == 0 {
		// Can't scale down further
		return false, nil
	}
	var err error
	hasScaled := false
	if rollout.Spec.Strategy.Canary != nil {
		// Clean up unhealthy replicas first, otherwise unhealthy replicas will block rollout
		// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
		oldRSs, hasScaled, err = c.cleanupUnhealthyReplicas(oldRSs, roCtx)
		if err != nil {
			// Propagate the error (previously it was silently discarded with
			// `return false, nil`, hiding API failures from the caller and
			// preventing an error-driven requeue).
			return false, err
		}
	}
	// Scale down old replica sets
	if rollout.Spec.Strategy.BlueGreen != nil {
		hasScaled, err = c.scaleDownOldReplicaSetsForBlueGreen(oldRSs, rollout)
		if err != nil {
			// Same fix as above: surface the error instead of dropping it.
			return false, err
		}
	}
	return hasScaled, nil
}
// cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
// It mutates oldRSs in place (replacing entries with the updated ReplicaSets)
// and returns the slice, whether any replicas were scaled down, and an error.
func (c *RolloutController) cleanupUnhealthyReplicas(oldRSs []*appsv1.ReplicaSet, roCtx rolloutContext) ([]*appsv1.ReplicaSet, bool, error) {
	logCtx := roCtx.Log()
	// Process oldest ReplicaSets first so scale-down order is deterministic.
	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
	// Safely scale down all old replica sets with unhealthy replicas. Replica set will sort the pods in the order
	// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
	// been deleted first and won't increase unavailability.
	totalScaledDown := int32(0)
	for i, targetRS := range oldRSs {
		if *(targetRS.Spec.Replicas) == 0 {
			// cannot scale down this replica set.
			continue
		}
		logCtx.Infof("Found %d available pods in old RS %s/%s", targetRS.Status.AvailableReplicas, targetRS.Namespace, targetRS.Name)
		if *(targetRS.Spec.Replicas) == targetRS.Status.AvailableReplicas {
			// no unhealthy replicas found, no scaling required.
			continue
		}
		// Scale down to exactly the number of available (healthy) replicas;
		// the difference is the count of unhealthy replicas being removed.
		scaledDownCount := *(targetRS.Spec.Replicas) - targetRS.Status.AvailableReplicas
		newReplicasCount := targetRS.Status.AvailableReplicas
		if newReplicasCount > *(targetRS.Spec.Replicas) {
			// Available can transiently exceed Spec.Replicas; scaling "down"
			// to a larger count would be a logic error, so bail out.
			return nil, false, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, *(targetRS.Spec.Replicas), newReplicasCount)
		}
		_, updatedOldRS, err := c.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, roCtx.Rollout())
		if err != nil {
			// Report whether any earlier iteration already scaled something.
			return nil, totalScaledDown > 0, err
		}
		totalScaledDown += scaledDownCount
		// Keep the slice entry in sync with the server-side object.
		oldRSs[i] = updatedOldRS
	}
	return oldRSs, totalScaledDown > 0, nil
}