-
Notifications
You must be signed in to change notification settings - Fork 0
/
helper.go
211 lines (184 loc) · 6.2 KB
/
helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
package distributedrediscluster
import (
"context"
"fmt"
"net"
"time"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
redisv1alpha1 "github.com/dinesh-murugiah/rediscluster-operator/api/v1alpha1"
config "github.com/dinesh-murugiah/rediscluster-operator/redisconfig"
utils "github.com/dinesh-murugiah/rediscluster-operator/utils/commonutils"
"github.com/dinesh-murugiah/rediscluster-operator/utils/k8sutil"
"github.com/dinesh-murugiah/rediscluster-operator/utils/redisutil"
)
var (
defaultLabels = map[string]string{
redisv1alpha1.LabelManagedByKey: redisv1alpha1.OperatorName,
}
)
func getLabels(cluster *redisv1alpha1.DistributedRedisCluster) map[string]string {
dynLabels := map[string]string{
redisv1alpha1.LabelClusterName: cluster.Name,
}
return utils.MergeLabels(defaultLabels, dynLabels, cluster.Labels)
}
// newRedisAdmin builds and returns new redis.Admin from the list of pods
func newRedisAdmin(pods []*corev1.Pod, password string, cfg *config.Redis, reqLogger logr.Logger, ctx context.Context) (redisutil.IAdmin, error) {
nodesAddrs := []string{}
for _, pod := range pods {
redisPort := redisutil.DefaultRedisPort
for _, container := range pod.Spec.Containers {
if container.Name == "redis" {
for _, port := range container.Ports {
if port.Name == "client" {
redisPort = fmt.Sprintf("%d", port.ContainerPort)
}
}
}
}
reqLogger.V(4).Info("append redis admin addr", "addr", pod.Status.PodIP, "port", redisPort)
nodesAddrs = append(nodesAddrs, net.JoinHostPort(pod.Status.PodIP, redisPort))
}
adminConfig := redisutil.AdminOptions{
ConnectionTimeout: time.Duration(cfg.DialTimeout) * time.Millisecond,
RenameCommandsFile: cfg.GetRenameCommandsFile(),
Password: password,
}
return redisutil.NewAdmin(nodesAddrs, &adminConfig, reqLogger, ctx), nil
}
func newRedisCluster(infos *redisutil.ClusterInfos, cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) (*redisutil.Cluster, redisutil.Nodes, error) {
// now we can trigger the rebalance
nodes := infos.GetNodes()
// build redis cluster vision
rCluster := &redisutil.Cluster{
Name: cluster.Name,
Namespace: cluster.Namespace,
Nodes: make(map[string]*redisutil.Node),
HaStatus: cluster.Status.HAStatus,
}
for _, node := range nodes {
rCluster.Nodes[node.ID] = node
//reqLogger.Info("newRedisCluster", "STS", node.StatefulSet, "NodeName", node.NodeName, "Zone", node.Zonename)
if node.Zonename == "" || node.Zonename == "unknown" {
err1 := fmt.Errorf("zone label not found in node %s", node.NodeName)
reqLogger.Error(err1, "Unable to find Zone label", "err", err1)
return nil, nil, err1
}
}
return rCluster, nodes, nil
}
func clusterPods(pods []corev1.Pod) []*corev1.Pod {
var podSlice []*corev1.Pod
for _, pod := range pods {
// Only work with running pods
if pod.Status.Phase == corev1.PodRunning {
podPointer := pod
podSlice = append(podSlice, &podPointer)
}
}
return podSlice
}
func needClusterOperation(cluster *redisv1alpha1.DistributedRedisCluster, reqLogger logr.Logger) bool {
if utils.CompareIntValue("NumberOfMaster", &cluster.Status.NumberOfMaster, &cluster.Spec.MasterSize, reqLogger) {
reqLogger.V(4).Info("needClusterOperation---NumberOfMaster")
return true
}
if utils.CompareIntValue("MinReplicationFactor", &cluster.Status.MinReplicationFactor, &cluster.Spec.ClusterReplicas, reqLogger) {
reqLogger.V(4).Info("needClusterOperation---MinReplicationFactor")
return true
}
if utils.CompareIntValue("MaxReplicationFactor", &cluster.Status.MaxReplicationFactor, &cluster.Spec.ClusterReplicas, reqLogger) {
reqLogger.V(4).Info("needClusterOperation---MaxReplicationFactor")
return true
}
return false
}
type IWaitHandle interface {
Name() string
Tick() time.Duration
Timeout() time.Duration
Handler() error
}
// waiting will keep trying to handler.Handler() until either
// we get a result from handler.Handler() or the timeout expires
func waiting(handler IWaitHandle, reqLogger logr.Logger) error {
timeout := time.After(handler.Timeout())
tick := time.NewTicker(time.Second)
defer tick.Stop()
// Keep trying until we're timed out or got a result or got an error
for {
select {
// Got a timeout! fail with a timeout error
case <-timeout:
return fmt.Errorf("%s timed out", handler.Name())
// Got a tick, we should check on Handler()
case <-tick.C:
err := handler.Handler()
if err == nil {
return nil
}
reqLogger.V(4).Info(err.Error())
}
}
}
type waitPodTerminating struct {
name string
statefulSet string
timeout time.Duration
tick time.Duration
statefulSetController k8sutil.IStatefulSetControl
cluster *redisv1alpha1.DistributedRedisCluster
}
func (w *waitPodTerminating) Name() string {
return w.name
}
func (w *waitPodTerminating) Tick() time.Duration {
return w.tick
}
func (w *waitPodTerminating) Timeout() time.Duration {
return w.timeout
}
func (w *waitPodTerminating) Handler() error {
labels := getLabels(w.cluster)
labels[redisv1alpha1.StatefulSetLabel] = w.statefulSet
podList, err := w.statefulSetController.GetStatefulSetPodsByLabels(w.cluster.Namespace, labels)
if err != nil {
return err
}
for _, pod := range podList.Items {
if pod.Status.Phase == corev1.PodRunning {
return fmt.Errorf("[%s] pod still runing", pod.Name)
}
}
return nil
}
type waitStatefulSetUpdating struct {
name string
timeout time.Duration
tick time.Duration
statefulSetController k8sutil.IStatefulSetControl
cluster *redisv1alpha1.DistributedRedisCluster
}
func (w *waitStatefulSetUpdating) Name() string {
return w.name
}
func (w *waitStatefulSetUpdating) Tick() time.Duration {
return w.tick
}
func (w *waitStatefulSetUpdating) Timeout() time.Duration {
return w.timeout
}
func (w *waitStatefulSetUpdating) Handler() error {
labels := getLabels(w.cluster)
stsList, err := w.statefulSetController.ListStatefulSetByLabels(w.cluster.Namespace, labels)
if err != nil {
return err
}
for _, sts := range stsList.Items {
if sts.Status.ReadyReplicas != (w.cluster.Spec.ClusterReplicas + 1) {
return nil
}
}
return fmt.Errorf("statefulSet still not updated")
}