-
Notifications
You must be signed in to change notification settings - Fork 850
/
sync.go
694 lines (631 loc) · 30.7 KB
/
sync.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
package rollout
import (
"fmt"
"sort"
"strconv"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
patchtypes "k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/controller"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
"github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
analysisutil "github.com/argoproj/argo-rollouts/utils/analysis"
"github.com/argoproj/argo-rollouts/utils/annotations"
"github.com/argoproj/argo-rollouts/utils/conditions"
"github.com/argoproj/argo-rollouts/utils/defaults"
"github.com/argoproj/argo-rollouts/utils/diff"
experimentutil "github.com/argoproj/argo-rollouts/utils/experiment"
logutil "github.com/argoproj/argo-rollouts/utils/log"
replicasetutil "github.com/argoproj/argo-rollouts/utils/replicaset"
)
// getAllReplicaSetsAndSyncRevision returns all the replica sets for the provided rollout
// (new and all old), with the new RS's and the rollout's revision updated.
//
// rsList should come from getReplicaSetsForRollout(r).
//
// 1. Get all old RSes this rollout targets, and calculate the max revision number among them (maxOldV).
// 2. Get new RS this rollout targets (whose pod template matches rollout's), and update new RS's revision number to (maxOldV + 1),
//    only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next rollout sync loop.
// 3. Copy new RS's revision number to rollout (update rollout's revision). If this step failed, we'll update it in the next rollout sync loop.
//
// Note that currently the rollout controller is using caches to avoid querying the server for reads.
// This may lead to stale reads of replica sets, thus incorrect revision status.
func (c *RolloutController) getAllReplicaSetsAndSyncRevision(rollout *v1alpha1.Rollout, rsList []*appsv1.ReplicaSet, createIfNotExisted bool) (*appsv1.ReplicaSet, []*appsv1.ReplicaSet, error) {
	oldRSs := replicasetutil.FindOldReplicaSets(rollout, rsList)
	// Resolve (and optionally create) the new replica set, syncing its revision number.
	newRS, err := c.getNewReplicaSet(rollout, rsList, oldRSs, createIfNotExisted)
	if err != nil {
		return nil, nil, err
	}
	return newRS, oldRSs, nil
}
// getNewReplicaSet returns a replica set that matches the intent of the given rollout.
// Returns nil if the new replica set doesn't exist yet.
// 1. Get existing new RS (the RS that the given rollout targets, whose pod template is the same as rollout's).
// 2. If there's existing new RS, update its revision number if it's smaller than (maxOldRevision + 1), where maxOldRevision is the max revision number among all old RSes.
// 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
// Note that the pod-template-hash will be added to adopted RSes and pods.
func (c *RolloutController) getNewReplicaSet(rollout *v1alpha1.Rollout, rsList, oldRSs []*appsv1.ReplicaSet, createIfNotExisted bool) (*appsv1.ReplicaSet, error) {
	logCtx := logutil.WithRollout(rollout)
	existingNewRS := replicasetutil.FindNewReplicaSet(rollout, rsList)
	// Calculate the max revision number among all old RSes
	maxOldRevision := replicasetutil.MaxRevision(oldRSs)
	// Calculate revision number for this new replica set
	newRevision := strconv.FormatInt(maxOldRevision+1, 10)
	// Latest replica set exists. We need to sync its annotations (includes copying all but
	// annotationsToSkip from the parent rollout, and update revision and desiredReplicas)
	// and also update the revision annotation in the rollout with the
	// latest revision.
	if existingNewRS != nil {
		rsCopy := existingNewRS.DeepCopy()
		// Set existing new replica set's annotation
		annotationsUpdated := annotations.SetNewReplicaSetAnnotations(rollout, rsCopy, newRevision, true)
		minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != rollout.Spec.MinReadySeconds
		if annotationsUpdated || minReadySecondsNeedsUpdate {
			rsCopy.Spec.MinReadySeconds = rollout.Spec.MinReadySeconds
			// NOTE(review): this early return skips the rollout-revision sync below;
			// it will be picked up on the next sync loop once the RS update lands.
			return c.kubeclientset.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(rsCopy)
		}
		// Use the revision already recorded in existingNewRS's annotation, since it was
		// set by the annotation sync above (or a previous sync loop).
		needsUpdate := annotations.SetRolloutRevision(rollout, rsCopy.Annotations[annotations.RevisionAnnotation])
		// If no other Progressing condition has been recorded and we need to estimate the progress
		// of this rollout then it is likely that old users started caring about progress. In that
		// case we need to take into account the first time we noticed their new replica set.
		cond := conditions.GetRolloutCondition(rollout.Status, v1alpha1.RolloutProgressing)
		if cond == nil {
			msg := fmt.Sprintf(conditions.FoundNewRSMessage, rsCopy.Name)
			condition := conditions.NewRolloutCondition(v1alpha1.RolloutProgressing, corev1.ConditionTrue, conditions.FoundNewRSReason, msg)
			conditions.SetRolloutCondition(&rollout.Status, *condition)
			needsUpdate = true
		}
		if needsUpdate {
			var err error
			logCtx.Info("Setting revision annotation after creating a new replicaset")
			if rollout, err = c.argoprojclientset.ArgoprojV1alpha1().Rollouts(rollout.Namespace).Update(rollout); err != nil {
				logCtx.WithError(err).Errorf("Error: Setting rollout revision annotation after creating a new replicaset")
				return nil, err
			}
		}
		return rsCopy, nil
	}
	// No matching RS exists; bail out unless the caller asked us to create one.
	if !createIfNotExisted {
		return nil, nil
	}
	// new ReplicaSet does not exist, create one.
	newRSTemplate := *rollout.Spec.Template.DeepCopy()
	// The pod-template hash incorporates the collision count so a retry after a
	// hash collision produces a different name.
	podTemplateSpecHash := controller.ComputeHash(&newRSTemplate, rollout.Status.CollisionCount)
	newRSTemplate.Labels = labelsutil.CloneAndAddLabel(rollout.Spec.Template.Labels, v1alpha1.DefaultRolloutUniqueLabelKey, podTemplateSpecHash)
	// Add podTemplateHash label to selector.
	newRSSelector := labelsutil.CloneSelectorAndAddLabel(rollout.Spec.Selector, v1alpha1.DefaultRolloutUniqueLabelKey, podTemplateSpecHash)
	// Create new ReplicaSet
	newRS := appsv1.ReplicaSet{
		ObjectMeta: metav1.ObjectMeta{
			// Make the name deterministic, to ensure idempotence
			Name:            rollout.Name + "-" + podTemplateSpecHash,
			Namespace:       rollout.Namespace,
			OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(rollout, controllerKind)},
			Labels:          newRSTemplate.Labels,
		},
		Spec: appsv1.ReplicaSetSpec{
			Replicas:        new(int32),
			MinReadySeconds: rollout.Spec.MinReadySeconds,
			Selector:        newRSSelector,
			Template:        newRSTemplate,
		},
	}
	allRSs := append(oldRSs, &newRS)
	// Compute the initial replica count based on the strategy and existing RSes.
	newReplicasCount, err := replicasetutil.NewRSNewReplicas(rollout, allRSs, &newRS)
	if err != nil {
		return nil, err
	}
	*(newRS.Spec.Replicas) = newReplicasCount
	// Set new replica set's annotation
	annotations.SetNewReplicaSetAnnotations(rollout, &newRS, newRevision, false)
	// Create the new ReplicaSet. If it already exists, then we need to check for possible
	// hash collisions. If there is any other error, we need to report it in the status of
	// the Rollout.
	alreadyExists := false
	createdRS, err := c.kubeclientset.AppsV1().ReplicaSets(rollout.Namespace).Create(&newRS)
	switch {
	// We may end up hitting this due to a slow cache or a fast resync of the Rollout.
	case errors.IsAlreadyExists(err):
		alreadyExists = true
		// Fetch a copy of the ReplicaSet.
		rs, rsErr := c.replicaSetLister.ReplicaSets(newRS.Namespace).Get(newRS.Name)
		if rsErr != nil {
			return nil, rsErr
		}
		// If the Rollout owns the ReplicaSet and the ReplicaSet's PodTemplateSpec is semantically
		// deep equal to the PodTemplateSpec of the Rollout, it's the Rollout's new ReplicaSet.
		// Otherwise, this is a hash collision and we need to increment the collisionCount field in
		// the status of the Rollout and requeue to try the creation in the next sync.
		controllerRef := metav1.GetControllerOf(rs)
		if controllerRef != nil && controllerRef.UID == rollout.UID && replicasetutil.PodTemplateEqualIgnoreHash(&rs.Spec.Template, &rollout.Spec.Template) {
			createdRS = rs
			err = nil
			break
		}
		// Matching ReplicaSet is not equal - increment the collisionCount in the RolloutStatus
		// and requeue the Rollout.
		if rollout.Status.CollisionCount == nil {
			rollout.Status.CollisionCount = new(int32)
		}
		preCollisionCount := *rollout.Status.CollisionCount
		*rollout.Status.CollisionCount++
		// Update the collisionCount for the Rollout and let it requeue by returning the original
		// error.
		_, roErr := c.argoprojclientset.ArgoprojV1alpha1().Rollouts(rollout.Namespace).Update(rollout)
		if roErr == nil {
			logCtx.Warnf("Found a hash collision - bumped collisionCount (%d->%d) to resolve it", preCollisionCount, *rollout.Status.CollisionCount)
		}
		return nil, err
	case err != nil:
		// Record the failure both as an event and as a Progressing=False condition.
		msg := fmt.Sprintf(conditions.FailedRSCreateMessage, newRS.Name, err)
		c.recorder.Event(rollout, corev1.EventTypeWarning, conditions.FailedRSCreateReason, msg)
		newStatus := rollout.Status.DeepCopy()
		cond := conditions.NewRolloutCondition(v1alpha1.RolloutProgressing, corev1.ConditionFalse, conditions.FailedRSCreateReason, msg)
		patchErr := c.patchCondition(rollout, newStatus, cond)
		if patchErr != nil {
			logCtx.Warnf("Error Patching Rollout: %s", patchErr.Error())
		}
		return nil, err
	}
	// Only emit a scaling event for a freshly created, non-empty RS.
	if !alreadyExists && newReplicasCount > 0 {
		c.recorder.Eventf(rollout, corev1.EventTypeNormal, "ScalingReplicaSet", "Scaled up replica set %s to %d", createdRS.Name, newReplicasCount)
	}
	// Sync the rollout's revision annotation with the new RS's revision.
	needsUpdate := annotations.SetRolloutRevision(rollout, newRevision)
	if !alreadyExists {
		msg := fmt.Sprintf(conditions.NewReplicaSetMessage, createdRS.Name)
		condition := conditions.NewRolloutCondition(v1alpha1.RolloutProgressing, corev1.ConditionTrue, conditions.NewReplicaSetReason, msg)
		conditions.SetRolloutCondition(&rollout.Status, *condition)
		needsUpdate = true
	}
	if needsUpdate {
		_, err = c.argoprojclientset.ArgoprojV1alpha1().Rollouts(rollout.Namespace).Update(rollout)
	}
	return createdRS, err
}
// syncReplicasOnly is responsible for reconciling rollouts on scaling events,
// i.e. when only the replica count changed (not the pod template). It scales the
// replica sets appropriately for the active strategy and then syncs the rollout
// status, without advancing the rollout through its normal progression.
func (c *RolloutController) syncReplicasOnly(r *v1alpha1.Rollout, rsList []*appsv1.ReplicaSet, isScaling bool) error {
	logCtx := logutil.WithRollout(r)
	isControllerPaused := len(r.Status.PauseConditions) > 0
	logCtx.Infof("Syncing replicas only (controllerPaused: %v, userPaused %v, isScaling: %v)", isControllerPaused, r.Spec.Paused, isScaling)
	newRS, oldRSs, err := c.getAllReplicaSetsAndSyncRevision(r, rsList, false)
	if err != nil {
		return err
	}
	// NOTE: it is possible for newRS to be nil (e.g. when template and replicas changed at same time)
	if r.Spec.Strategy.BlueGreen != nil {
		roCtx := newBlueGreenCtx(r, newRS, oldRSs)
		previewSvc, activeSvc, err := c.getPreviewAndActiveServices(r)
		if err != nil {
			// BUGFIX: previously this returned nil, silently swallowing the error and
			// aborting the sync without requeueing. Return the error so the failure is
			// surfaced and the rollout is retried.
			return err
		}
		if err := c.scaleBlueGreen(r, newRS, oldRSs, previewSvc, activeSvc); err != nil {
			// If we get an error while trying to scale, the rollout will be requeued
			// so we can abort this resync
			return err
		}
		c.reconcileBlueGreenPause(previewSvc, activeSvc, roCtx)
		return c.syncRolloutStatusBlueGreen(previewSvc, activeSvc, roCtx)
	}
	// The controller wants to use the rolloutCanary method to reconcile the rollout if the rollout is not paused.
	// If there are no scaling events, the rollout should only sync its status
	if r.Spec.Strategy.Canary != nil {
		exList, err := c.getExperimentsForRollout(r)
		if err != nil {
			return err
		}
		arList, err := c.getAnalysisRunsForRollout(r)
		if err != nil {
			return err
		}
		roCtx := newCanaryCtx(r, newRS, oldRSs, exList, arList)
		if isScaling {
			if _, err := c.reconcileCanaryReplicaSets(roCtx); err != nil {
				// If we get an error while trying to scale, the rollout will be requeued
				// so we can abort this resync
				return err
			}
		}
		// Reconciling AnalysisRuns to manage Background AnalysisRun if necessary
		err = c.reconcileAnalysisRuns(roCtx)
		if err != nil {
			return err
		}
		// reconcileCanaryPause will ensure we will requeue this rollout at the appropriate time
		// if we are at a pause step with a duration.
		c.reconcileCanaryPause(roCtx)
		return c.syncRolloutStatusCanary(roCtx)
	}
	return fmt.Errorf("no rollout strategy provided")
}
// isScalingEvent checks whether the provided rollout has been updated with a scaling event
// by looking at the desired-replicas annotation in the active replica sets of the rollout.
//
// rsList should come from getReplicaSetsForRollout(r).
func (c *RolloutController) isScalingEvent(rollout *v1alpha1.Rollout, rsList []*appsv1.ReplicaSet) (bool, error) {
	newRS, previousRSs, err := c.getAllReplicaSetsAndSyncRevision(rollout, rsList, false)
	if err != nil {
		return false, err
	}
	expectedReplicas := defaults.GetReplicasOrDefault(rollout.Spec.Replicas)
	// A scaling event happened iff any active replica set carries a desired-replicas
	// annotation that disagrees with the rollout's current spec.
	for _, rs := range controller.FilterActiveReplicaSets(append(previousRSs, newRS)) {
		if desired, ok := annotations.GetDesiredReplicasAnnotation(rs); ok && desired != expectedReplicas {
			return true, nil
		}
	}
	return false, nil
}
// scaleReplicaSetAndRecordEvent scales rs to newScale (if it differs from the current
// size) and emits a scaling event attributed to the rollout. Returns whether a scale
// happened, the (possibly updated) replica set, and any API error.
func (c *RolloutController) scaleReplicaSetAndRecordEvent(rs *appsv1.ReplicaSet, newScale int32, rollout *v1alpha1.Rollout) (bool, *appsv1.ReplicaSet, error) {
	current := *(rs.Spec.Replicas)
	if current == newScale {
		// Already at the desired size; nothing to do.
		return false, rs, nil
	}
	direction := "down"
	if current < newScale {
		direction = "up"
	}
	return c.scaleReplicaSet(rs, newScale, rollout, direction)
}
// scaleReplicaSet updates rs's replica count and desired-replicas annotations to match
// the rollout, persisting the change via the API. On a scale to zero, any pending
// scale-down-deadline annotation is removed. Returns whether the size actually changed,
// the updated replica set, and any API error.
func (c *RolloutController) scaleReplicaSet(rs *appsv1.ReplicaSet, newScale int32, rollout *v1alpha1.Rollout, scalingOperation string) (bool, *appsv1.ReplicaSet, error) {
	rolloutReplicas := defaults.GetReplicasOrDefault(rollout.Spec.Replicas)
	sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale
	annotationsNeedUpdate := annotations.ReplicasAnnotationsNeedUpdate(rs, rolloutReplicas)
	if !sizeNeedsUpdate && !annotationsNeedUpdate {
		// Nothing to persist.
		return false, rs, nil
	}
	rsCopy := rs.DeepCopy()
	*(rsCopy.Spec.Replicas) = newScale
	annotations.SetReplicasAnnotations(rsCopy, rolloutReplicas)
	if newScale == int32(0) {
		// A full scale down invalidates any pending scale-down deadline.
		delete(rsCopy.Annotations, v1alpha1.DefaultReplicaSetScaleDownDeadlineAnnotationKey)
	}
	updatedRS, err := c.kubeclientset.AppsV1().ReplicaSets(rsCopy.Namespace).Update(rsCopy)
	scaled := false
	if err == nil && sizeNeedsUpdate {
		scaled = true
		c.recorder.Eventf(rollout, corev1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, updatedRS.Name, newScale)
	}
	return scaled, updatedRS, err
}
// calculateBaseStatus computes the status fields common to all rollout strategies
// (current pod hash, replica counts, collision count, conditions) from the replica
// sets tracked by the given rollout context.
func (c *RolloutController) calculateBaseStatus(roCtx rolloutContext) v1alpha1.RolloutStatus {
	rollout := roCtx.Rollout()
	newRS := roCtx.NewRS()
	allRSs := roCtx.AllRSs()

	// Drop a stale InvalidSpec condition if the spec has since become valid.
	prevStatus := rollout.Status
	prevCond := conditions.GetRolloutCondition(prevStatus, v1alpha1.InvalidSpec)
	if invalidSpecCond := conditions.VerifyRolloutSpec(rollout, prevCond); prevCond != nil && invalidSpecCond == nil {
		conditions.RemoveRolloutCondition(&prevStatus, v1alpha1.InvalidSpec)
	}

	var currentPodHash string
	if newRS != nil {
		currentPodHash = newRS.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]
	} else {
		// newRS potentially might be nil when called by Controller::syncReplicasOnly().
		// For this to happen, the user would have had to simultaneously change the
		// number of replicas and the pod template spec at the same time.
		currentPodHash = controller.ComputeHash(&rollout.Spec.Template, rollout.Status.CollisionCount)
		logutil.WithRollout(rollout).Warnf("Assuming %s for new replicaset pod hash", currentPodHash)
	}

	newStatus := roCtx.NewStatus()
	newStatus.CurrentPodHash = currentPodHash
	newStatus.Replicas = replicasetutil.GetActualReplicaCountForReplicaSets(allRSs)
	newStatus.UpdatedReplicas = replicasetutil.GetActualReplicaCountForReplicaSets([]*appsv1.ReplicaSet{newRS})
	newStatus.ReadyReplicas = replicasetutil.GetReadyReplicaCountForReplicaSets(allRSs)
	newStatus.CollisionCount = rollout.Status.CollisionCount
	newStatus.Conditions = prevStatus.Conditions
	return newStatus
}
// cleanupRollouts deletes old replica sets so that at most the latest N are retained,
// where N = rollout.Spec.RevisionHistoryLimit (defaulted when unset). Old replica sets
// are prior versions of the rollout's pod template, kept around for history/rollback.
// Analysis runs and experiments associated with a deleted replica set (matched by
// pod-template-hash) are cleaned up alongside it.
func (c *RolloutController) cleanupRollouts(oldRSs []*appsv1.ReplicaSet, roCtx rolloutContext) error {
	rollout := roCtx.Rollout()
	logCtx := roCtx.Log()
	revHistoryLimit := defaults.GetRevisionHistoryLimitOrDefault(rollout)
	// Avoid deleting replica set with deletion timestamp set
	aliveFilter := func(rs *appsv1.ReplicaSet) bool {
		return rs != nil && rs.ObjectMeta.DeletionTimestamp == nil
	}
	cleanableRSes := controller.FilterReplicaSets(oldRSs, aliveFilter)
	// diff = how many candidates exceed the history limit; nothing to do if within limit.
	diff := int32(len(cleanableRSes)) - revHistoryLimit
	if diff <= 0 {
		return nil
	}
	// Sort oldest first so the `diff` oldest replica sets are the ones removed.
	sort.Sort(controller.ReplicaSetsByCreationTimestamp(cleanableRSes))
	// Index associated analysis runs and experiments by pod-template-hash for cleanup.
	podHashToArList := analysisutil.SortAnalysisRunByPodHash(roCtx.OtherAnalysisRuns())
	podHashToExList := experimentutil.SortExperimentsByPodHash(roCtx.OtherExperiments())
	logCtx.Info("Looking to cleanup old replica sets")
	for i := int32(0); i < diff; i++ {
		rs := cleanableRSes[i]
		// Avoid delete replica set with non-zero replica counts
		if rs.Status.Replicas != 0 || *(rs.Spec.Replicas) != 0 || rs.Generation > rs.Status.ObservedGeneration || rs.DeletionTimestamp != nil {
			continue
		}
		logCtx.Infof("Trying to cleanup replica set %q", rs.Name)
		if err := c.kubeclientset.AppsV1().ReplicaSets(rs.Namespace).Delete(rs.Name, nil); err != nil && !errors.IsNotFound(err) {
			// Return error instead of aggregating and continuing DELETEs on the theory
			// that we may be overloading the api server.
			return err
		}
		// Cascade the cleanup to analysis runs / experiments keyed by this RS's pod hash.
		if podHash, ok := rs.Labels[v1alpha1.DefaultRolloutUniqueLabelKey]; ok {
			if ars, ok := podHashToArList[podHash]; ok {
				logCtx.Infof("Cleaning up associated analysis runs with ReplicaSet '%s'", rs.Name)
				err := c.deleteAnalysisRuns(roCtx, ars)
				if err != nil {
					return err
				}
			}
			if exs, ok := podHashToExList[podHash]; ok {
				logCtx.Infof("Cleaning up associated experiments with ReplicaSet '%s'", rs.Name)
				err := c.deleteExperiments(roCtx, exs)
				if err != nil {
					return err
				}
			}
		}
	}
	return nil
}
// checkPausedConditions checks if the given rollout is paused or not and adds an
// appropriate Progressing condition. These conditions are needed so that we won't
// accidentally report lack of progress for resumed rollouts that were paused for
// longer than progressDeadlineSeconds.
func (c *RolloutController) checkPausedConditions(r *v1alpha1.Rollout) error {
	progCond := conditions.GetRolloutCondition(r.Status, v1alpha1.RolloutProgressing)
	if progCond != nil && progCond.Reason == conditions.TimedOutReason {
		// Lack of progress has already been reported; never overwrite it with
		// a paused/resumed condition.
		return nil
	}
	hasPausedCond := progCond != nil && progCond.Reason == conditions.PausedRolloutReason
	isPaused := len(r.Status.PauseConditions) > 0 || r.Spec.Paused

	var newCond *v1alpha1.RolloutCondition
	switch {
	case isPaused && !hasPausedCond:
		newCond = conditions.NewRolloutCondition(v1alpha1.RolloutProgressing, corev1.ConditionUnknown, conditions.PausedRolloutReason, conditions.PausedRolloutMessage)
	case !isPaused && hasPausedCond:
		newCond = conditions.NewRolloutCondition(v1alpha1.RolloutProgressing, corev1.ConditionUnknown, conditions.ResumedRolloutReason, conditions.ResumeRolloutMessage)
	}
	// A retried (no longer aborted) rollout takes precedence over pause/resume.
	if progCond != nil && progCond.Reason == conditions.RolloutAbortedReason && !r.Status.Abort {
		newCond = conditions.NewRolloutCondition(v1alpha1.RolloutProgressing, corev1.ConditionUnknown, conditions.RolloutRetryReason, conditions.RolloutRetryMessage)
	}
	if newCond == nil {
		return nil
	}
	return c.patchCondition(r, r.Status.DeepCopy(), newCond)
}
// patchCondition sets the given condition on newStatus and persists the status delta
// to the rollout via a two-way merge patch. No-op if nothing actually changed.
func (c *RolloutController) patchCondition(r *v1alpha1.Rollout, newStatus *v1alpha1.RolloutStatus, condition *v1alpha1.RolloutCondition) error {
	logCtx := logutil.WithRollout(r)
	conditions.SetRolloutCondition(newStatus, *condition)
	newStatus.ObservedGeneration = conditions.ComputeGenerationHash(r.Spec)

	// Diff only the status subtrees of the old and new rollout.
	before := &v1alpha1.Rollout{Status: r.Status}
	after := &v1alpha1.Rollout{Status: *newStatus}
	patch, modified, err := diff.CreateTwoWayMergePatch(before, after, v1alpha1.Rollout{})
	if err != nil {
		logCtx.Errorf("Error constructing app status patch: %v", err)
		return err
	}
	if !modified {
		logCtx.Info("No status changes. Skipping patch")
		return nil
	}
	logCtx.Debugf("Rollout Condition Patch: %s", patch)
	if _, err = c.argoprojclientset.ArgoprojV1alpha1().Rollouts(r.Namespace).Patch(r.Name, patchtypes.MergePatchType, patch); err != nil {
		logCtx.Warningf("Error patching rollout: %v", err)
		return err
	}
	logCtx.Info("Condition Patch status successfully")
	return nil
}
// isIndefiniteStep returns whether or not the rollout is at an Experiment or Analysis
// step, which should not count against progressDeadlineSeconds.
func isIndefiniteStep(r *v1alpha1.Rollout) bool {
	step, _ := replicasetutil.GetCurrentCanaryStep(r)
	if step == nil {
		return false
	}
	return step.Experiment != nil || step.Analysis != nil
}
// calculateRolloutConditions updates newStatus with the Progressing, Available, and
// ReplicaFailure conditions derived from the rollout's replica sets. Paused rollouts
// are returned unchanged so pause time does not count toward the progress deadline.
func (c *RolloutController) calculateRolloutConditions(roCtx rolloutContext, newStatus v1alpha1.RolloutStatus) v1alpha1.RolloutStatus {
	r := roCtx.Rollout()
	allRSs := roCtx.AllRSs()
	newRS := roCtx.NewRS()
	// While paused, skip all condition estimation.
	if len(r.Status.PauseConditions) > 0 || r.Spec.Paused {
		return newStatus
	}
	// If there is only one replica set that is active then that means we are not running
	// a new rollout and this is a resync where we don't need to estimate any progress.
	// In such a case, we should simply not estimate any progress for this rollout.
	currentCond := conditions.GetRolloutCondition(r.Status, v1alpha1.RolloutProgressing)
	isCompleteRollout := newStatus.Replicas == newStatus.AvailableReplicas && currentCond != nil && currentCond.Reason == conditions.NewRSAvailableReason
	// Check for progress only if the latest rollout hasn't completed yet.
	if !isCompleteRollout {
		// NOTE: case order matters — aborted > complete > progressing > timed out.
		switch {
		case roCtx.PauseContext().IsAborted():
			condition := conditions.NewRolloutCondition(v1alpha1.RolloutProgressing, corev1.ConditionFalse, conditions.RolloutAbortedReason, conditions.RolloutAbortedMessage)
			conditions.SetRolloutCondition(&newStatus, *condition)
		case conditions.RolloutComplete(r, &newStatus):
			// Update the rollout conditions with a message for the new replica set that
			// was successfully deployed. If the condition already exists, we ignore this update.
			msg := fmt.Sprintf(conditions.RolloutCompletedMessage, r.Name)
			if newRS != nil {
				msg = fmt.Sprintf(conditions.ReplicaSetCompletedMessage, newRS.Name)
			}
			condition := conditions.NewRolloutCondition(v1alpha1.RolloutProgressing, corev1.ConditionTrue, conditions.NewRSAvailableReason, msg)
			conditions.SetRolloutCondition(&newStatus, *condition)
		case conditions.RolloutProgressing(r, &newStatus):
			// If there is any progress made, continue by not checking if the rollout failed. This
			// behavior emulates the rolling updater progressDeadline check.
			msg := fmt.Sprintf(conditions.RolloutProgressingMessage, r.Name)
			if newRS != nil {
				msg = fmt.Sprintf(conditions.ReplicaSetProgressingMessage, newRS.Name)
			}
			condition := conditions.NewRolloutCondition(v1alpha1.RolloutProgressing, corev1.ConditionTrue, conditions.ReplicaSetUpdatedReason, msg)
			// Update the current Progressing condition or add a new one if it doesn't exist.
			// If a Progressing condition with status=true already exists, we should update
			// everything but lastTransitionTime. SetRolloutCondition already does that but
			// it also is not updating conditions when the reason of the new condition is the
			// same as the old. The Progressing condition is a special case because we want to
			// update with the same reason and change just lastUpdateTime iff we notice any
			// progress. That's why we handle it here.
			if currentCond != nil {
				if currentCond.Status == corev1.ConditionTrue {
					condition.LastTransitionTime = currentCond.LastTransitionTime
				}
				conditions.RemoveRolloutCondition(&newStatus, v1alpha1.RolloutProgressing)
			}
			conditions.SetRolloutCondition(&newStatus, *condition)
		case !isIndefiniteStep(r) && conditions.RolloutTimedOut(r, &newStatus):
			// Update the rollout with a timeout condition. If the condition already exists,
			// we ignore this update.
			msg := fmt.Sprintf(conditions.RolloutTimeOutMessage, r.Name)
			if newRS != nil {
				msg = fmt.Sprintf(conditions.ReplicaSetTimeOutMessage, newRS.Name)
			}
			condition := conditions.NewRolloutCondition(v1alpha1.RolloutProgressing, corev1.ConditionFalse, conditions.TimedOutReason, msg)
			conditions.SetRolloutCondition(&newStatus, *condition)
		}
	}
	// Compute the Available condition, whose criteria depend on the strategy:
	// blue-green is available once the active RS is saturated; canary once the
	// total available replica count reaches the desired replica count.
	activeRS, _ := replicasetutil.GetReplicaSetByTemplateHash(allRSs, newStatus.BlueGreen.ActiveSelector)
	if r.Spec.Strategy.BlueGreen != nil && activeRS != nil && annotations.IsSaturated(r, activeRS) {
		availability := conditions.NewRolloutCondition(v1alpha1.RolloutAvailable, corev1.ConditionTrue, conditions.AvailableReason, conditions.AvailableMessage)
		conditions.SetRolloutCondition(&newStatus, *availability)
	} else if r.Spec.Strategy.Canary != nil && replicasetutil.GetAvailableReplicaCountForReplicaSets(allRSs) >= defaults.GetReplicasOrDefault(r.Spec.Replicas) {
		availability := conditions.NewRolloutCondition(v1alpha1.RolloutAvailable, corev1.ConditionTrue, conditions.AvailableReason, conditions.AvailableMessage)
		conditions.SetRolloutCondition(&newStatus, *availability)
	} else {
		noAvailability := conditions.NewRolloutCondition(v1alpha1.RolloutAvailable, corev1.ConditionFalse, conditions.AvailableReason, conditions.NotAvailableMessage)
		conditions.SetRolloutCondition(&newStatus, *noAvailability)
	}
	// Move failure conditions of all replica sets in rollout conditions. For now,
	// only one failure condition is returned from getReplicaFailures.
	if replicaFailureCond := c.getReplicaFailures(allRSs, newRS); len(replicaFailureCond) > 0 {
		// There will be only one ReplicaFailure condition on the replica set.
		conditions.SetRolloutCondition(&newStatus, replicaFailureCond[0])
	} else {
		conditions.RemoveRolloutCondition(&newStatus, v1alpha1.RolloutReplicaFailure)
	}
	return newStatus
}
// persistRolloutStatus persists updates to rollout status. If no changes were made,
// it is a no-op aside from requeueing a potentially stuck rollout.
func (c *RolloutController) persistRolloutStatus(roCtx rolloutContext, newStatus *v1alpha1.RolloutStatus) error {
	orig := roCtx.Rollout()
	logCtx := logutil.WithRollout(orig)
	roCtx.PauseContext().CalculatePauseStatus(newStatus)
	newStatus.ObservedGeneration = conditions.ComputeGenerationHash(orig.Spec)

	// Diff only the status subtrees of the old and new rollout.
	before := &v1alpha1.Rollout{Status: orig.Status}
	after := &v1alpha1.Rollout{Status: *newStatus}
	patch, modified, err := diff.CreateTwoWayMergePatch(before, after, v1alpha1.Rollout{})
	if err != nil {
		logCtx.Errorf("Error constructing app status patch: %v", err)
		return err
	}
	if !modified {
		logCtx.Info("No status changes. Skipping patch")
		// Even without changes, a stalled rollout may need a future progress check.
		c.requeueStuckRollout(orig, *newStatus)
		return nil
	}
	logCtx.Debugf("Rollout Patch: %s", patch)
	if _, err = c.argoprojclientset.ArgoprojV1alpha1().Rollouts(orig.Namespace).Patch(orig.Name, patchtypes.MergePatchType, patch); err != nil {
		logCtx.Warningf("Error updating application: %v", err)
		return err
	}
	logCtx.Info("Patch status successfully")
	return nil
}
// nowFn returns the current time; indirected through a variable so unit tests can stub it.
var nowFn = time.Now
// requeueStuckRollout checks whether the provided rollout needs to be synced for a progress
// check. It returns the time after the rollout will be requeued for the progress check, 0 if it
// will be requeued now, or -1 if it does not need to be requeued.
func (c *RolloutController) requeueStuckRollout(r *v1alpha1.Rollout, newStatus v1alpha1.RolloutStatus) time.Duration {
	logctx := logutil.WithRollout(r)
	currentCond := conditions.GetRolloutCondition(r.Status, v1alpha1.RolloutProgressing)
	// Can't estimate progress if there is no deadline in the spec or progressing condition in the current status.
	if currentCond == nil {
		return time.Duration(-1)
	}
	// No need to estimate progress if the rollout is complete or already timed out,
	// is paused/aborted, or is sitting at an indefinite (experiment/analysis) step.
	isPaused := len(r.Status.PauseConditions) > 0 || r.Spec.Paused
	if conditions.RolloutComplete(r, &newStatus) || currentCond.Reason == conditions.TimedOutReason || isPaused || r.Status.Abort || isIndefiniteStep(r) {
		return time.Duration(-1)
	}
	// If there is no sign of progress at this point then there is a high chance that the
	// rollout is stuck. We should resync this rollout at some point in the future[1]
	// and check whether it has timed out. We definitely need this, otherwise we depend on the
	// controller resync interval. See https://github.com/kubernetes/kubernetes/issues/34458.
	//
	// [1] ProgressingCondition.LastUpdatedTime + progressDeadlineSeconds - time.Now()
	//
	// For example, if a Rollout updated its Progressing condition 3 minutes ago and has a
	// deadline of 10 minutes, it would need to be resynced for a progress check after 7 minutes.
	//
	// lastUpdated: 00:00:00
	// now: 00:03:00
	// progressDeadlineSeconds: 600 (10 minutes)
	//
	// lastUpdated + progressDeadlineSeconds - now => 00:00:00 + 00:10:00 - 00:03:00 => 07:00
	progressDeadlineSeconds := defaults.GetProgressDeadlineSecondsOrDefault(r)
	// nowFn (instead of time.Now directly) keeps this testable.
	after := currentCond.LastUpdateTime.Time.Add(time.Duration(progressDeadlineSeconds) * time.Second).Sub(nowFn())
	// If the remaining time is less than a second, then requeue the deployment immediately.
	// Make it ratelimited so we stay on the safe side, eventually the Deployment should
	// transition either to a Complete or to a TimedOut condition.
	if after < time.Second {
		logctx.Infof("Queueing up Rollout for a progress check now")
		c.enqueueRollout(r)
		return time.Duration(0)
	}
	logctx.Infof("Queueing up rollout for a progress after %ds", int(after.Seconds()))
	// Add a second to avoid milliseconds skew in AddAfter.
	// See https://github.com/kubernetes/kubernetes/issues/39785#issuecomment-279959133 for more info.
	c.enqueueRolloutAfter(r, after+time.Second)
	return after
}
// getReplicaFailures converts ReplicaFailure conditions from the replica sets into
// rollout conditions. Failures on the new replica set take precedence: old replica
// sets are only consulted when the new one has no failures.
func (c *RolloutController) getReplicaFailures(allRSs []*appsv1.ReplicaSet, newRS *appsv1.ReplicaSet) []v1alpha1.RolloutCondition {
	var failures []v1alpha1.RolloutCondition
	// collect appends every ReplicaFailure condition of rs, converted to a rollout condition.
	collect := func(rs *appsv1.ReplicaSet) {
		if rs == nil {
			return
		}
		for _, cond := range rs.Status.Conditions {
			if cond.Type == appsv1.ReplicaSetReplicaFailure {
				failures = append(failures, conditions.ReplicaSetToRolloutCondition(cond))
			}
		}
	}
	collect(newRS)
	// Return failures for the new replica set over failures from old replica sets.
	if len(failures) > 0 {
		return failures
	}
	for _, rs := range allRSs {
		collect(rs)
	}
	return failures
}