scale.go
package controllers

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"strings"
	"time"

	etcdv1 "github.com/aws/etcdadm-controller/api/v1beta1"
	"github.com/pkg/errors"
	clientv3 "go.etcd.io/etcd/client/v3"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/cluster-api/util/collections"
	"sigs.k8s.io/cluster-api/util/conditions"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/etcdadm/constants"
)

const etcdClientTimeout = 5 * time.Second
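
// intializeEtcdCluster bootstraps a new etcd cluster: it generates the etcd CA and
// client certificate secrets, marks the certificates-available condition, and creates
// the first etcd Machine in the next failure domain eligible for scale up.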
func (r *EtcdadmClusterReconciler) intializeEtcdCluster(ctx context.Context, ec *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster, ep *EtcdPlane) (ctrl.Result, error) {
	if err := r.generateCAandClientCertSecrets(ctx, cluster, ec); err != nil {
		r.Log.Error(err, "error generating etcd CA certs")
		return ctrl.Result{}, err
	}
	conditions.MarkTrue(ec, etcdv1.EtcdCertificatesAvailableCondition)
	fd := ep.NextFailureDomainForScaleUp()
	return r.cloneConfigsAndGenerateMachine(ctx, ec, cluster, fd)
}
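
// scaleUpEtcdCluster adds an etcd member by creating a new Machine in the next
// failure domain eligible for scale up.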
func (r *EtcdadmClusterReconciler) scaleUpEtcdCluster(ctx context.Context, ec *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster, ep *EtcdPlane) (ctrl.Result, error) {
	fd := ep.NextFailureDomainForScaleUp()
	return r.cloneConfigsAndGenerateMachine(ctx, ec, cluster, fd)
}
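
// scaleDownEtcdCluster picks one of the outdated Machines, removes the corresponding
// etcd member from the cluster, and deletes the Machine.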
func (r *EtcdadmClusterReconciler) scaleDownEtcdCluster(ctx context.Context, ec *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster, ep *EtcdPlane, outdatedMachines collections.Machines) (ctrl.Result, error) {
	// Pick the Machine that we should scale down.
	machineToDelete, err := selectMachineForScaleDown(ep, outdatedMachines)
	if err != nil || machineToDelete == nil {
		return ctrl.Result{}, errors.Wrap(err, "failed to select machine for scale down")
	}
	machineAddress := getEtcdMachineAddress(machineToDelete)
	return ctrl.Result{}, r.removeEtcdMachine(ctx, ec, cluster, machineToDelete, machineAddress)
}
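
// removeEtcdMachine builds an etcd client against the cluster's current endpoints,
// removes the member whose peer URL corresponds to the given machine address, and
// deletes the backing Machine.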
func (r *EtcdadmClusterReconciler) removeEtcdMachine(ctx context.Context, ec *etcdv1.EtcdadmCluster, cluster *clusterv1.Cluster, machineToDelete *clusterv1.Machine, machineAddress string) error {
	peerURL := fmt.Sprintf("https://%s:2380", machineAddress)
	etcdClient, err := r.GetEtcdClient(ctx, cluster, ec.Status.Endpoints)
	if err != nil {
		return fmt.Errorf("error creating etcd client, err: %v", err)
	}
	if etcdClient == nil {
		return fmt.Errorf("could not create etcd client")
	}
	defer etcdClient.Close()
	return r.removeEtcdMemberAndDeleteMachine(ctx, etcdClient, peerURL, machineToDelete)
}
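
// generateEtcdClient creates an etcd v3 client that trusts the cluster's etcd CA,
// authenticates with the managed client certificate, and dials the comma-separated
// list of endpoints.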
func (r *EtcdadmClusterReconciler) generateEtcdClient(ctx context.Context, cluster *clusterv1.Cluster, endpoints string) (EtcdClient, error) {
	caCertPool := x509.NewCertPool()
	caCert, err := r.getCACert(ctx, cluster)
	if err != nil {
		return nil, err
	}
	caCertPool.AppendCertsFromPEM(caCert)

	clientCert, err := r.getClientCerts(ctx, cluster)
	if err != nil {
		return nil, errors.Wrap(err, "error getting client cert for healthcheck")
	}

	etcdClient, err := clientv3.New(clientv3.Config{
		Endpoints:   strings.Split(endpoints, ","),
		DialTimeout: etcdClientTimeout,
		TLS: &tls.Config{
			RootCAs:      caCertPool,
			Certificates: []tls.Certificate{clientCert},
		},
	})
	return etcdClient, err
}
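
// removeEtcdMemberAndDeleteMachine removes the etcd member identified by peerURL, unless it
// is the last remaining member, and then deletes the backing Machine. If the member is
// already gone, the Machine is still deleted so membership and Machines stay in sync.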
func (r *EtcdadmClusterReconciler) removeEtcdMemberAndDeleteMachine(ctx context.Context, etcdClient EtcdClient, peerURL string, machineToDelete *clusterv1.Machine) error {
	log := r.Log
	// Etcdadm has a "reset" command to remove an etcd member, but we can't run that command on the CAPI Machine object after it's provisioned,
	// so the following logic is based on how etcdadm performs "reset": https://github.com/kubernetes-sigs/etcdadm/blob/master/cmd/reset.go#L65
	etcdCtx, cancel := context.WithTimeout(ctx, constants.DefaultEtcdRequestTimeout)
	mresp, err := etcdClient.MemberList(etcdCtx)
	cancel()
	if err != nil {
		return fmt.Errorf("error listing members: %v", err)
	}
	localMember, ok := memberForPeerURLs(mresp, []string{peerURL})
	if ok {
		if len(mresp.Members) > 1 {
			log.Info("Removing", "member", localMember.Name)
			etcdCtx, cancel = context.WithTimeout(ctx, constants.DefaultEtcdRequestTimeout)
			_, err = etcdClient.MemberRemove(etcdCtx, localMember.ID)
			cancel()
			if err != nil {
				return fmt.Errorf("failed to remove etcd member %s with error %v", localMember.Name, err)
			}
			if machineToDelete != nil {
				if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) && !apierrors.IsGone(err) {
					return fmt.Errorf("failed to delete etcd machine %s with error %v", machineToDelete.Name, err)
				}
			}
		} else {
			log.Info("Not removing last member in the cluster", "member", localMember.Name)
		}
	} else {
		log.Info("Member was already removed")
		if machineToDelete != nil {
			// This can happen if the etcd member was removed through etcdctl calls; ensure that the machine gets deleted too.
			if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) && !apierrors.IsGone(err) {
				return fmt.Errorf("failed to delete etcd machine %s with error %v", machineToDelete.Name, err)
			}
		}
	}
	return nil
}