-
Notifications
You must be signed in to change notification settings - Fork 17
/
status.go
144 lines (121 loc) · 5.19 KB
/
status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package controllers
import (
"context"
"fmt"
"sort"
"strings"
etcdv1 "github.com/aws/etcdadm-controller/api/v1beta1"
"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// updateStatus refreshes the EtcdadmCluster's status subresource from the current
// set of owned machines: replica counts, resize/endpoint conditions, and — when
// endpoints change — the Secret holding the etcdadm join address.
// It is a no-op (after recording replicas) while the cluster is being deleted.
func (r *EtcdadmClusterReconciler) updateStatus(ctx context.Context, ec *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster) error {
	log := r.Log.WithName(ec.Name)
	selector := EtcdMachinesSelectorForCluster(cluster.Name, ec.Name)
	// Copy label selector to its status counterpart in string format.
	// This is necessary for CRDs including scale subresources.
	ec.Status.Selector = selector.String()
	var etcdMachines collections.Machines
	var err error
	if conditions.IsFalse(ec, etcdv1.EtcdMachinesSpecUpToDateCondition) {
		// During upgrade with current logic, outdated machines don't get deleted right away.
		// the controller removes their etcdadmCluster ownerRef and updates the Machine. So using uncachedClient here will fetch those changes
		etcdMachines, err = collections.GetFilteredMachinesForCluster(ctx, r.uncachedClient, cluster, EtcdClusterMachines(cluster.Name, ec.Name))
	} else {
		etcdMachines, err = collections.GetFilteredMachinesForCluster(ctx, r.Client, cluster, EtcdClusterMachines(cluster.Name, ec.Name))
	}
	if err != nil {
		return errors.Wrap(err, "Error filtering machines for etcd cluster")
	}
	ownedMachines := etcdMachines.Filter(collections.OwnedMachines(ec))
	// Log owned machines through the structured logger (previously a raw
	// fmt.Printf to stdout, which bypassed the controller's logging).
	ownedNames := make([]string, 0, len(ownedMachines))
	for _, machine := range ownedMachines {
		ownedNames = append(ownedNames, machine.Name)
	}
	log.Info(fmt.Sprintf("following machines owned by this etcd cluster: %s", strings.Join(ownedNames, " ")))
	desiredReplicas := *ec.Spec.Replicas
	ec.Status.ReadyReplicas = int32(len(ownedMachines))
	if !ec.DeletionTimestamp.IsZero() {
		return nil
	}
	readyReplicas := ec.Status.ReadyReplicas
	if readyReplicas < desiredReplicas {
		conditions.MarkFalse(ec, etcdv1.EtcdClusterResizeCompleted, etcdv1.EtcdScaleUpInProgressReason, clusterv1.ConditionSeverityWarning, "Scaling up etcd cluster to %d replicas (actual %d)", desiredReplicas, readyReplicas)
		return nil
	}
	if readyReplicas > desiredReplicas {
		// BUGFIX: this branch previously said "Scaling up" even though it marks
		// EtcdScaleDownInProgressReason; the message now matches the reason.
		conditions.MarkFalse(ec, etcdv1.EtcdClusterResizeCompleted, etcdv1.EtcdScaleDownInProgressReason, clusterv1.ConditionSeverityWarning, "Scaling down etcd cluster to %d replicas (actual %d)", desiredReplicas, readyReplicas)
		return nil
	}
	conditions.MarkTrue(ec, etcdv1.EtcdClusterResizeCompleted)
	endpoints := getMachinesEndpoints(log, ownedMachines)
	// An empty list means at least one machine has no address yet; retry later.
	if len(endpoints) == 0 {
		return nil
	}
	log.Info("Running healthcheck on machines", "endpoints", endpoints)
	if machinesReady, err := r.performMachinesHealthCheck(ctx, log, endpoints, cluster); err != nil {
		ec.Status.Ready = false
		return err
	} else if !machinesReady {
		return nil
	}
	// etcd ready when all machines have address set
	ec.Status.Ready = true
	conditions.MarkTrue(ec, etcdv1.EtcdEndpointsAvailable)
	sort.Strings(endpoints)
	currEndpoints := strings.Join(endpoints, ",")
	log.Info("Comparing current and previous endpoints")
	// Checking if endpoints have changed. This avoids unnecessary client calls
	// to get and update the Secret containing the endpoints
	if ec.Status.Endpoints != currEndpoints {
		log.Info("Updating endpoints annotation, and the Secret containing etcdadm join address")
		ec.Status.Endpoints = currEndpoints
		secretNameNs := client.ObjectKey{Name: ec.Status.InitMachineAddress, Namespace: cluster.Namespace}
		secretInitAddress := &corev1.Secret{}
		if err := r.Client.Get(ctx, secretNameNs, secretInitAddress); err != nil {
			return err
		}
		// endpoints is guaranteed non-empty here (checked above), so the previous
		// len(endpoints) > 0 branch was dead code.
		secretInitAddress.Data["address"] = []byte(getEtcdMachineAddressFromClientURL(endpoints[0]))
		secretInitAddress.Data["clientUrls"] = []byte(ec.Status.Endpoints)
		r.Log.Info("Updating init secret with endpoints")
		if err := r.Client.Update(ctx, secretInitAddress); err != nil {
			return err
		}
	}
	// set creationComplete to true, this is only set once after the first set of endpoints are ready and never unset, to indicate that the cluster has been created
	ec.Status.CreationComplete = true
	return nil
}
// getMachinesEndpoints builds the list of etcd client URLs for the given
// machines. If any machine has no address recorded in its status yet, it
// returns nil so the caller retries once addresses are populated.
func getMachinesEndpoints(log logr.Logger, machines collections.Machines) []string {
	urls := make([]string, 0, len(machines))
	for _, machine := range machines {
		log.Info("Checking if machine has address set for healthcheck", "machine", machine.Name)
		if len(machine.Status.Addresses) == 0 {
			// Bail out entirely: a partial endpoint list would be misleading.
			log.Info("No address set in machine yet", "machine", machine.Name)
			return nil
		}
		urls = append(urls, getMemberClientURL(getEtcdMachineAddress(machine)))
	}
	return urls
}
// performMachinesHealthCheck probes every endpoint. A closed port is treated
// as a transient condition (returns healthy=false, nil error); any other
// failure is surfaced to the caller. All endpoints must pass for healthy=true.
func (r *EtcdadmClusterReconciler) performMachinesHealthCheck(ctx context.Context, log logr.Logger, endpoints []string, cluster *clusterv1.Cluster) (healthy bool, err error) {
	for _, ep := range endpoints {
		checkErr := r.performEndpointHealthCheck(ctx, cluster, ep, true)
		switch {
		case errors.Is(checkErr, portNotOpenErr):
			log.Info("Machine is not listening yet, this is probably transient, while etcd starts", "endpoint", ep)
			return false, nil
		case checkErr != nil:
			return false, checkErr
		}
	}
	return true, nil
}