forked from argoproj/argo-rollouts
/
replicaset.go
243 lines (218 loc) · 9.75 KB
/
replicaset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package experiments
import (
"encoding/json"
"fmt"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
patchtypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/controller"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/argoproj/argo-rollouts/utils/conditions"
experimentutil "github.com/argoproj/argo-rollouts/utils/experiment"
logutil "github.com/argoproj/argo-rollouts/utils/log"
replicasetutil "github.com/argoproj/argo-rollouts/utils/replicaset"
log "github.com/sirupsen/logrus"
)
// CollisionCountPatch is a JSON merge-patch template for an Experiment's status. Its single %s
// verb is filled with the JSON-encoded list of template statuses; createReplicaSet uses it to
// persist a bumped collisionCount after detecting a ReplicaSet hash collision.
const CollisionCountPatch = `{
"status" : {
"templateStatuses" : %s
}
}`
// controllerKind is the GroupVersionKind of the Experiment resource. newReplicaSetFromTemplate
// uses it to build the controller OwnerReference placed on each ReplicaSet it creates.
var controllerKind = v1alpha1.SchemeGroupVersion.WithKind("Experiment")
// getReplicaSetsForExperiment returns the ReplicaSets belonging to experiment, keyed by the name
// of the experiment template each one was created for.
//
// A ReplicaSet from the experiment's namespace is considered only when its controller
// reference UID equals the experiment UID and its experiment-name annotation equals the
// experiment name. It is then claimed for the template named in its template-name annotation,
// provided that template is listed in experiment.Spec.Templates. ReplicaSets with an empty
// template-name annotation or an undefined template are ignored.
//
// An error is returned when the lister fails, or when two ReplicaSets claim the same template.
func (c *ExperimentController) getReplicaSetsForExperiment(experiment *v1alpha1.Experiment) (map[string]*appsv1.ReplicaSet, error) {
	candidates, err := c.replicaSetLister.ReplicaSets(experiment.Namespace).List(labels.Everything())
	if err != nil {
		return nil, err
	}

	// Set of template names declared in the spec, so membership is a single lookup.
	declared := make(map[string]bool, len(experiment.Spec.Templates))
	for _, tmpl := range experiment.Spec.Templates {
		declared[tmpl.Name] = true
	}
	logCtx := log.WithField(logutil.ExperimentKey, experiment.Name).WithField(logutil.NamespaceKey, experiment.Namespace)

	claimed := make(map[string]*appsv1.ReplicaSet)
	for _, rs := range candidates {
		owner := metav1.GetControllerOf(rs)
		if owner == nil || owner.UID != experiment.UID {
			continue
		}
		if rs.Annotations == nil || rs.Annotations[v1alpha1.ExperimentNameAnnotationKey] != experiment.Name {
			continue
		}

		templateName := rs.Annotations[v1alpha1.ExperimentTemplateNameAnnotationKey]
		if templateName == "" {
			continue
		}
		if _, taken := claimed[templateName]; taken {
			return nil, fmt.Errorf("multiple ReplicaSets match single experiment template: %s", templateName)
		}
		if !declared[templateName] {
			continue
		}

		claimed[templateName] = rs
		logCtx.Infof("Claimed ReplicaSet '%s' for template '%s'", rs.Name, templateName)
	}
	return claimed, nil
}
// createReplicaSet creates the ReplicaSet for one experiment template.
//
// The ReplicaSet is built by newReplicaSetFromTemplate using collisionCount as the hash
// collision counter. Its replica count comes from experimentutil.CalculateTemplateReplicasCount.
//
// Outcomes:
//   - Create succeeds: the created ReplicaSet is returned. A "ScalingReplicaSet" event is
//     recorded when the replica count is greater than zero.
//   - Create reports AlreadyExists and the existing ReplicaSet (read from the lister) is
//     semantically equal to the desired one: the existing ReplicaSet is claimed and returned.
//   - Create reports AlreadyExists and the existing ReplicaSet differs (a hash collision): the
//     template's collisionCount in the experiment status is set to collisionCount+1 via a JSON
//     merge patch. The AlreadyExists error is then returned, presumably so the caller requeues
//     and retries with the new count (NOTE(review): confirm at the call site).
//   - Any other Create error: a FailedRSCreate warning event is recorded and the error is
//     returned.
func (ec *experimentContext) createReplicaSet(template v1alpha1.TemplateSpec, collisionCount *int32) (*appsv1.ReplicaSet, error) {
	newRS := newReplicaSetFromTemplate(ec.ex, template, collisionCount)
	newReplicasCount := experimentutil.CalculateTemplateReplicasCount(ec.ex, template)
	*(newRS.Spec.Replicas) = newReplicasCount

	// Create the new ReplicaSet. If it already exists, then we need to check for possible
	// hash collisions. If there is any other error, we need to report it in the status of
	// the Experiment.
	alreadyExists := false
	createdRS, err := ec.kubeclientset.AppsV1().ReplicaSets(ec.ex.Namespace).Create(&newRS)
	switch {
	// We may end up hitting this due to a slow cache or a fast resync of the Experiment.
	case errors.IsAlreadyExists(err):
		alreadyExists = true

		// Fetch a copy of the ReplicaSet.
		rs, rsErr := ec.replicaSetLister.ReplicaSets(newRS.Namespace).Get(newRS.Name)
		if rsErr != nil {
			return nil, rsErr
		}

		// If the Experiment owns the ReplicaSet and it is semantically equal to the one we tried to
		// create, it is the Experiment's own ReplicaSet. Otherwise, this is a hash collision and
		// we need to bump the collisionCount for this template and requeue.
		if ec.isReplicaSetSemanticallyEqual(&newRS, rs) {
			// NOTE: we should only get here when the informer cache is stale and we already
			// succeeded in creating this replicaset
			createdRS = rs
			err = nil
			ec.log.Warnf("Claimed existing ReplicaSet %s with equivalent template spec", createdRS.Name)
			break
		}

		// The hash that collided was computed from collisionCount, so the next attempt must use
		// one more than that value (nil means zero).
		var preCollisionCount int32
		if collisionCount != nil {
			preCollisionCount = *collisionCount
		}
		newCollisionCount := preCollisionCount + 1

		// The merge patch replaces templateStatuses wholesale, so send the full list. Update this
		// template's existing entry in place rather than appending a duplicate, and append a new
		// entry only when none exists yet.
		statusCpy := ec.ex.Status.DeepCopy()
		updated := false
		for i := range statusCpy.TemplateStatuses {
			if statusCpy.TemplateStatuses[i].Name == template.Name {
				statusCpy.TemplateStatuses[i].CollisionCount = &newCollisionCount
				updated = true
				break
			}
		}
		if !updated {
			statusCpy.TemplateStatuses = append(statusCpy.TemplateStatuses, v1alpha1.TemplateStatus{
				Name:           template.Name,
				CollisionCount: &newCollisionCount,
			})
		}

		templateStatusBytes, marshalErr := json.Marshal(statusCpy.TemplateStatuses)
		if marshalErr != nil {
			return nil, marshalErr
		}
		patch := fmt.Sprintf(CollisionCountPatch, string(templateStatusBytes))
		_, patchErr := ec.argoProjClientset.ArgoprojV1alpha1().Experiments(ec.ex.Namespace).Patch(ec.ex.Name, patchtypes.MergePatchType, []byte(patch))
		if patchErr != nil {
			// Report the patch failure itself, not the AlreadyExists error from Create.
			ec.log.WithField("patch", patch).Errorf("Error patching experiment %s", patchErr.Error())
			return nil, patchErr
		}
		ec.log.WithField("patch", patch).Debug("Applied Patch")
		ec.log.Warnf("Found a hash collision - bumped collisionCount (%d->%d) to resolve it", preCollisionCount, newCollisionCount)
		return nil, err
	case err != nil:
		msg := fmt.Sprintf(conditions.FailedRSCreateMessage, newRS.Name, err)
		ec.recorder.Event(ec.ex, corev1.EventTypeWarning, conditions.FailedRSCreateReason, msg)
		return nil, err
	default:
		ec.log.Infof("Created ReplicaSet %s", createdRS.Name)
	}

	// Only a freshly created ReplicaSet with a non-zero size counts as a scale-up event.
	if !alreadyExists && newReplicasCount > int32(0) {
		ec.recorder.Eventf(ec.ex, corev1.EventTypeNormal, "ScalingReplicaSet", "Scaled up replica set %s to %d", createdRS.Name, newReplicasCount)
	}
	return createdRS, nil
}
// newReplicaSetFromTemplate builds, but does not create, the ReplicaSet for one experiment
// template.
//
// How it is built:
//   - Pod template: deep-copied from template.Template, with any pre-existing
//     DefaultRolloutUniqueLabelKey label removed.
//   - Pod hash: computed with controller.ComputeHash over the cleaned template and
//     collisionCount, then added as the DefaultRolloutUniqueLabelKey label on the pod template,
//     the selector, and the ReplicaSet's own labels.
//   - Metadata: the ReplicaSet is named "<experiment>-<template>", is controlled by the
//     experiment, and carries the experiment/template name annotations.
//   - Replicas: left pointing at zero; callers set the desired count.
//
// NOTE(review): the name does not include the pod hash, so bumping collisionCount changes the
// hash labels but not the name. Confirm that a name conflict can actually be resolved that way.
func newReplicaSetFromTemplate(experiment *v1alpha1.Experiment, template v1alpha1.TemplateSpec, collisionCount *int32) appsv1.ReplicaSet {
	podTemplate := *template.Template.DeepCopy()
	// delete is a no-op on a nil map or a missing key, so no guard is needed here.
	delete(podTemplate.Labels, v1alpha1.DefaultRolloutUniqueLabelKey)

	hash := controller.ComputeHash(&podTemplate, collisionCount)
	podTemplate.Labels = labelsutil.CloneAndAddLabel(podTemplate.Labels, v1alpha1.DefaultRolloutUniqueLabelKey, hash)
	selector := labelsutil.CloneSelectorAndAddLabel(template.Selector, v1alpha1.DefaultRolloutUniqueLabelKey, hash)

	// The annotations differ per template because they are what ReplicaSets are matched to
	// templates by; they carry the experiment and template names.
	meta := metav1.ObjectMeta{
		Name:            fmt.Sprintf("%s-%s", experiment.Name, template.Name),
		Namespace:       experiment.Namespace,
		Labels:          map[string]string{v1alpha1.DefaultRolloutUniqueLabelKey: hash},
		OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(experiment, controllerKind)},
		Annotations:     newReplicaSetAnnotations(experiment.Name, template.Name),
	}
	spec := appsv1.ReplicaSetSpec{
		Replicas:        new(int32),
		MinReadySeconds: template.MinReadySeconds,
		Selector:        selector,
		Template:        podTemplate,
	}
	return appsv1.ReplicaSet{ObjectMeta: meta, Spec: spec}
}
// isReplicaSetSemanticallyEqual reports whether existingRS can be treated as the ReplicaSet
// newRS describes. It returns true only when all of the following hold:
//   - existingRS is controlled by this experiment (the controller reference UID matches).
//   - The pod templates are equal according to replicasetutil.PodTemplateEqualIgnoreHash.
//   - existingRS has annotations, and both its experiment-name and template-name annotations
//     match those of newRS.
func (ec *experimentContext) isReplicaSetSemanticallyEqual(newRS, existingRS *appsv1.ReplicaSet) bool {
	owner := metav1.GetControllerOf(existingRS)
	sameTemplate := replicasetutil.PodTemplateEqualIgnoreHash(&existingRS.Spec.Template, &newRS.Spec.Template)
	if owner == nil || owner.UID != ec.ex.UID || !sameTemplate {
		return false
	}

	live := existingRS.GetAnnotations()
	desired := newRS.GetAnnotations()
	if live == nil {
		return false
	}
	for _, key := range []string{v1alpha1.ExperimentNameAnnotationKey, v1alpha1.ExperimentTemplateNameAnnotationKey} {
		if live[key] != desired[key] {
			return false
		}
	}
	return true
}
// scaleReplicaSetAndRecordEvent scales rs to newScale via scaleReplicaSet and reports the
// outcome.
//
// Behavior:
//   - When rs already has newScale replicas, it returns (false, rs, nil) and makes no API call.
//   - Otherwise the direction ("up" or "down") is derived from the current count.
//   - On failure, a "ReplicaSetUpdateError" warning event is recorded on the experiment.
//   - On success, an info line is logged.
//
// The results of scaleReplicaSet are returned unchanged.
func (ec *experimentContext) scaleReplicaSetAndRecordEvent(rs *appsv1.ReplicaSet, newScale int32) (bool, *appsv1.ReplicaSet, error) {
	current := *(rs.Spec.Replicas)
	if current == newScale {
		return false, rs, nil
	}

	direction := "down"
	if current < newScale {
		direction = "up"
	}

	scaled, resultRS, err := ec.scaleReplicaSet(rs, newScale, direction)
	if err != nil {
		// TODO(jessesuen): gracefully handle conflict issues
		msg := fmt.Sprintf("Failed to scale %s %s: %v", rs.Name, direction, err)
		ec.recorder.Event(ec.ex, corev1.EventTypeWarning, "ReplicaSetUpdateError", msg)
		return scaled, resultRS, err
	}
	ec.log.Infof("Scaled %s ReplicaSet %s from %d to %d", direction, rs.Name, current, newScale)
	return scaled, resultRS, nil
}
// scaleReplicaSet sets rs's replica count to newScale through the Kubernetes API. On success it
// records a "ScalingReplicaSet" event whose message includes scalingOperation.
//
// Returns:
//   - (false, rs, nil) when rs already has newScale replicas; no API call is made.
//   - (true, updated, nil) when the update succeeds; updated is the object the API returned.
//   - (false, rs, err) when the update fails. The caller's original rs is returned, because the
//     object a client hands back alongside an error is not guaranteed to be a meaningful
//     ReplicaSet.
//
// rs itself is never mutated; the update is applied to a deep copy.
func (ec *experimentContext) scaleReplicaSet(rs *appsv1.ReplicaSet, newScale int32, scalingOperation string) (bool, *appsv1.ReplicaSet, error) {
	if *(rs.Spec.Replicas) == newScale {
		return false, rs, nil
	}

	rsCopy := rs.DeepCopy()
	*(rsCopy.Spec.Replicas) = newScale
	updatedRS, err := ec.kubeclientset.AppsV1().ReplicaSets(rsCopy.Namespace).Update(rsCopy)
	if err != nil {
		return false, rs, err
	}

	ec.recorder.Eventf(ec.ex, corev1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, updatedRS.Name, newScale)
	return true, updatedRS, nil
}
// newReplicaSetAnnotations returns a fresh annotation map recording which experiment and which
// template a ReplicaSet was created for. getReplicaSetsForExperiment and
// isReplicaSetSemanticallyEqual read these two keys back to match ReplicaSets to templates.
func newReplicaSetAnnotations(experimentName, templateName string) map[string]string {
	annotations := make(map[string]string, 2)
	annotations[v1alpha1.ExperimentNameAnnotationKey] = experimentName
	annotations[v1alpha1.ExperimentTemplateNameAnnotationKey] = templateName
	return annotations
}