/
machines.go
129 lines (108 loc) · 4.39 KB
/
machines.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package controllers
import (
"context"
etcdv1 "github.com/aws/etcdadm-controller/api/v1beta1"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/klog/v2"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// TODO(g-gaston): remove this once we have a stable CAPI repo that contains this.

// MachineEtcdReadyLabelName is the label set on machines that have successfully joined the etcd cluster.
const MachineEtcdReadyLabelName = "cluster.x-k8s.io/etcd-ready"
// etcdMachines holds etcdMachine values keyed by the same keys as the
// collections.Machines they are built from in checkOwnedMachines.
type etcdMachines map[string]etcdMachine

// endpoints collects the etcd endpoint of every machine that has one set,
// skipping machines whose endpoint is still empty. The result follows map
// iteration order, so its ordering is unspecified.
func (e etcdMachines) endpoints() []string {
	out := make([]string, 0, len(e))
	for key := range e {
		ep := e[key].endpoint
		if ep == "" {
			continue
		}
		out = append(out, ep)
	}
	return out
}
// etcdMachine pairs a Machine that should be a member of an etcd cluster with
// the outcome of probing its etcd endpoint.
type etcdMachine struct {
	*clusterv1.Machine
	// endpoint is the etcd client URL of the machine. It stays empty when the
	// machine has no address yet or when the health check reported the port
	// as not open.
	endpoint string
	// listening is set once the health check did not fail with portNotOpenErr.
	listening bool
	// healthError is the error returned by the endpoint health check, if any.
	healthError error
}

// healthy reports whether the machine is listening and its health check
// returned no error.
func (e etcdMachine) healthy() bool {
	if !e.listening {
		return false
	}
	return e.healthError == nil
}
// updateMachinesEtcdReadyLabel adds the etcd-ready label to the machines that have joined the etcd cluster.
// Machines that already carry the label are skipped; machines that are not healthy yet are logged and left
// untouched. It stops and returns a wrapped error on the first Machine update that fails.
func (r *EtcdadmClusterReconciler) updateMachinesEtcdReadyLabel(ctx context.Context, log logr.Logger, machines etcdMachines) error {
	for _, m := range machines {
		if _, ok := m.Labels[MachineEtcdReadyLabelName]; ok {
			continue
		}
		if !m.healthy() {
			log.Info("Machine not healthy yet", "machine", klog.KObj(m.Machine), "listening", m.listening, "healthError", m.healthError, "endpoint", m.endpoint)
			continue
		}
		// Reading a nil map is fine, but writing to one panics: a Machine is not
		// guaranteed to have any labels, so allocate the map before setting ours.
		if m.Labels == nil {
			m.Labels = map[string]string{}
		}
		m.Labels[MachineEtcdReadyLabelName] = "true"
		if err := r.Client.Update(ctx, m.Machine); err != nil {
			return errors.Wrapf(err, "adding etcd ready label to machine %s", m.Name)
		}
	}
	return nil
}
// checkOwnedMachines verifies the health of all etcd members.
// It lists the Machines owned by etcdadmCluster and, for each one that already has an etcd
// endpoint, runs an endpoint health check. The returned map uses the same keys as the owned
// Machines collection. Machines without an address are returned with no endpoint and not
// listening. A failed health check does not abort the scan; it is recorded in healthError.
// The only error returned is a failure to list the owned Machines.
func (r *EtcdadmClusterReconciler) checkOwnedMachines(ctx context.Context, log logr.Logger, etcdadmCluster *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster) (etcdMachines, error) {
	ownedMachines, err := r.getCurrentOwnedMachines(ctx, etcdadmCluster, cluster)
	if err != nil {
		return nil, err
	}

	machines := make(etcdMachines, len(ownedMachines))
	for k, machine := range ownedMachines {
		m := etcdMachine{Machine: machine}
		endpoint := getMachineEtcdEndpoint(machine)
		if endpoint == "" {
			machines[k] = m
			continue
		}

		healthErr := r.performEndpointHealthCheck(ctx, cluster, endpoint, true)
		// This is not ideal: performEndpointHealthCheck uses an error to signal both a not ready/unhealthy
		// member and transient errors while performing the check.
		// Ideally we would separate these two so we can abort on error and mark as unhealthy separately.
		m.healthError = healthErr
		if errors.Is(healthErr, portNotOpenErr) {
			log.Info("Machine is not listening yet, this is probably transient, while etcd starts", "machine", klog.KObj(machine), "endpoint", endpoint)
		} else {
			m.endpoint = endpoint
			m.listening = true
		}
		machines[k] = m
	}
	return machines, nil
}
// getCurrentOwnedMachines lists the cluster's Machines that match EtcdClusterMachines for this
// etcdadm cluster and are owned by etcdadmCluster.
// The listing error is wrapped with context before being returned.
func (r *EtcdadmClusterReconciler) getCurrentOwnedMachines(ctx context.Context, etcdadmCluster *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster) (collections.Machines, error) {
	// During an upgrade with the current logic, outdated machines don't get deleted right away:
	// the controller removes their etcdadmCluster ownerRef and updates the Machine. Reading through
	// the uncached client while the machine specs are not up to date fetches those changes.
	var reader client.Reader = r.Client
	if conditions.IsFalse(etcdadmCluster, etcdv1.EtcdMachinesSpecUpToDateCondition) {
		reader = r.uncachedClient
	}

	clusterMachines, err := collections.GetFilteredMachinesForCluster(ctx, reader, cluster, EtcdClusterMachines(cluster.Name, etcdadmCluster.Name))
	if err != nil {
		return nil, errors.Wrap(err, "reading machines for etcd cluster")
	}
	return clusterMachines.Filter(collections.OwnedMachines(etcdadmCluster)), nil
}
// getMachineEtcdEndpoint builds the etcd member client URL for a Machine from its
// etcd address. While the Machine has no such address yet, it returns "".
func getMachineEtcdEndpoint(machine *clusterv1.Machine) string {
	if addr := getEtcdMachineAddress(machine); addr != "" {
		return getMemberClientURL(addr)
	}
	return ""
}