This repository has been archived by the owner on Oct 22, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
statefulset_rollout_util.go
71 lines (64 loc) · 2.94 KB
/
statefulset_rollout_util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package statefulset
import (
"context"
"fmt"
"strconv"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
crc "sigs.k8s.io/controller-runtime/pkg/client"
"code.cloudfoundry.org/quarks-utils/pkg/ctxlog"
podutil "code.cloudfoundry.org/quarks-utils/pkg/pod"
"code.cloudfoundry.org/quarks-utils/pkg/pointers"
"code.cloudfoundry.org/quarks-utils/pkg/util"
)
// ConfigureStatefulSetForRollout configures a stateful set for canarying and rollout.
//
// It switches the update strategy to RollingUpdate and sets the partition to
// the smaller of the desired replica count (Spec.Replicas) and the currently
// observed replica count (Status.Replicas). It then marks the canary rollout
// as pending and records the current Unix time (in seconds) as the update
// start time, both as annotations on the stateful set.
//
// The stateful set is modified in place. A nil Spec.Replicas is treated as 1,
// and a nil annotation map is initialised before it is written to.
func ConfigureStatefulSetForRollout(statefulSet *appsv1.StatefulSet) {
	// Spec.Replicas is an optional pointer; the Kubernetes API documents 1 as
	// its default, so fall back to that instead of dereferencing nil.
	replicas := int32(1)
	if statefulSet.Spec.Replicas != nil {
		replicas = *statefulSet.Spec.Replicas
	}

	statefulSet.Spec.UpdateStrategy.Type = appsv1.RollingUpdateStatefulSetStrategyType
	// The canary rollout is, for now, started directly; this might move to a
	// webhook instead.
	statefulSet.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{
		Partition: pointers.Int32(util.MinInt32(replicas, statefulSet.Status.Replicas)),
	}

	// Assigning into a nil map panics, and objects without annotations carry
	// a nil map.
	if statefulSet.Annotations == nil {
		statefulSet.Annotations = map[string]string{}
	}
	statefulSet.Annotations[AnnotationCanaryRollout] = rolloutStatePending
	statefulSet.Annotations[AnnotationUpdateStartTime] = strconv.FormatInt(time.Now().Unix(), 10)
}
// ConfigureStatefulSetForInitialRollout initially configures a stateful set
// for canarying and rollout.
//
// It switches the update strategy to RollingUpdate with a partition of 0,
// sets the canary rollout state to rolloutStateCanaryUpscale and records the
// current Unix time (in seconds) as the update start time, both as
// annotations on the stateful set.
//
// The stateful set is modified in place. A nil annotation map is initialised
// before it is written to.
func ConfigureStatefulSetForInitialRollout(statefulSet *appsv1.StatefulSet) {
	statefulSet.Spec.UpdateStrategy.Type = appsv1.RollingUpdateStatefulSetStrategyType
	// The canary rollout is, for now, started directly; this might move to a
	// webhook instead.
	statefulSet.Spec.UpdateStrategy.RollingUpdate = &appsv1.RollingUpdateStatefulSetStrategy{
		Partition: pointers.Int32(0),
	}

	// Assigning into a nil map panics, and a freshly built object may not
	// have any annotations yet.
	if statefulSet.Annotations == nil {
		statefulSet.Annotations = map[string]string{}
	}
	statefulSet.Annotations[AnnotationCanaryRollout] = rolloutStateCanaryUpscale
	statefulSet.Annotations[AnnotationUpdateStartTime] = strconv.FormatInt(time.Now().Unix(), 10)
}
// CleanupNonReadyPod deletes the pod with the given index of the stateful set
// if that pod exists and is not ready.
//
// It returns nil without deleting anything when the pod cannot be found or is
// reported ready. A pod that disappears between the lookup and the delete
// call is treated as already cleaned up. Any other error from looking up or
// deleting the pod is returned unchanged.
func CleanupNonReadyPod(ctx context.Context, client crc.Client, statefulSet *appsv1.StatefulSet, index int32) error {
	ctxlog.Debug(ctx, "Cleaning up non ready pod for StatefulSet ", statefulSet.Namespace, "/", statefulSet.Name, "-", index)
	pod, ready, err := getPodWithIndex(ctx, client, statefulSet, index)
	if err != nil {
		return err
	}
	if ready || pod == nil {
		return nil
	}

	ctxlog.Debug(ctx, "Deleting pod ", pod.Name)
	// A NotFound here means the pod was removed after the lookup above, which
	// is the outcome we want; the lookup already treats a missing pod as
	// "nothing to do", so stay consistent with that.
	if err := crc.IgnoreNotFound(client.Delete(ctx, pod)); err != nil {
		ctxlog.Error(ctx, "Error deleting non-ready pod ", err)
		return err
	}
	return nil
}
// getPodWithIndex looks up the pod named "<statefulset name>-<index>" in the
// stateful set's namespace.
//
// It returns the pod together with the readiness reported by
// podutil.IsPodReady. If the pod does not exist, this is logged and
// (nil, false, nil) is returned; any other lookup error is returned as is.
func getPodWithIndex(ctx context.Context, client crc.Client, statefulSet *appsv1.StatefulSet, index int32) (*corev1.Pod, bool, error) {
	key := crc.ObjectKey{
		Namespace: statefulSet.Namespace,
		Name:      fmt.Sprintf("%s-%d", statefulSet.Name, index),
	}

	found := &corev1.Pod{}
	switch err := client.Get(ctx, key, found); {
	case err == nil:
		return found, podutil.IsPodReady(found), nil
	case crc.IgnoreNotFound(err) == nil:
		// IgnoreNotFound maps a NotFound error to nil, so this branch is the
		// "pod does not exist" case; it is not reported as an error.
		ctxlog.Error(ctx, "Pods ", key.Name, " belonging to StatefulSet not found", statefulSet.Namespace, "/", statefulSet.Name, ":", err)
		return nil, false, nil
	default:
		return nil, false, err
	}
}