delete.go
package nodes

import (
	"fmt"
	"strings"
	"time"

	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"

	comm "github.com/berops/claudie/internal/command"
	"github.com/berops/claudie/internal/kubectl"
	"github.com/berops/claudie/internal/utils"
	"github.com/berops/claudie/proto/pb"
)
const (
	longhornNamespace         = "longhorn-system"
	newReplicaCreationTimeout = 10 * time.Second
)

type etcdPodInfo struct {
	nodeName   string
	memberHash string
}

type Deleter struct {
	masterNodes   []string
	workerNodes   []string
	cluster       *pb.K8Scluster
	clusterPrefix string
	logger        zerolog.Logger
}
// NewDeleter returns a new Deleter struct, used for node deletion from a k8s cluster.
// masterNodes - master nodes to DELETE
// workerNodes - worker nodes to DELETE
func NewDeleter(masterNodes, workerNodes []string, cluster *pb.K8Scluster) *Deleter {
	prefix := utils.GetClusterID(cluster.ClusterInfo)

	for i := range masterNodes {
		masterNodes[i] = strings.TrimPrefix(masterNodes[i], fmt.Sprintf("%s-", prefix))
	}

	for i := range workerNodes {
		workerNodes[i] = strings.TrimPrefix(workerNodes[i], fmt.Sprintf("%s-", prefix))
	}

	return &Deleter{
		masterNodes:   masterNodes,
		workerNodes:   workerNodes,
		cluster:       cluster,
		clusterPrefix: prefix,
		logger:        utils.CreateLoggerWithClusterName(prefix),
	}
}
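// A minimal usage sketch (the node and cluster names here are hypothetical; the
// cluster ID is assumed to be "my-cluster-abc12", so the prefixes below get
// trimmed to "control-2" and "compute-1" before deletion):
//
//	deleter := NewDeleter(
//		[]string{"my-cluster-abc12-control-2"},
//		[]string{"my-cluster-abc12-compute-1"},
//		cluster,
//	)
//	updated, err := deleter.DeleteNodes()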
// DeleteNodes deletes the nodes specified in d.masterNodes and d.workerNodes.
// Returns the updated cluster if successful, an error otherwise.
func (d *Deleter) DeleteNodes() (*pb.K8Scluster, error) {
	kubectl := kubectl.Kubectl{Kubeconfig: d.cluster.Kubeconfig, MaxKubectlRetries: 3}
	if log.Logger.GetLevel() == zerolog.DebugLevel {
		kubectl.Stdout = comm.GetStdOut(d.clusterPrefix)
		kubectl.Stderr = comm.GetStdErr(d.clusterPrefix)
	}

	// Get the real node names as registered in the cluster.
	realNodeNamesBytes, err := kubectl.KubectlGetNodeNames()
	if err != nil {
		return nil, fmt.Errorf("error while getting nodes from cluster %s : %w", d.clusterPrefix, err)
	}
	realNodeNames := strings.Split(string(realNodeNamesBytes), "\n")

	etcdEpNode := d.getMainMaster()
	if etcdEpNode == nil {
		return nil, fmt.Errorf("failed to find a control plane node to reach etcd in cluster %s", d.clusterPrefix)
	}

	// Remove master nodes sequentially to minimise the risk of faults in etcd.
	for _, master := range d.masterNodes {
		// Delete master nodes from etcd.
		if err := d.deleteFromEtcd(kubectl, etcdEpNode); err != nil {
			return nil, fmt.Errorf("error while deleting nodes from etcd for %s : %w", d.clusterPrefix, err)
		}
		// Delete master nodes.
		if err := d.deleteNodesByName(kubectl, master, realNodeNames); err != nil {
			return nil, fmt.Errorf("error while deleting nodes from master nodes for %s : %w", d.clusterPrefix, err)
		}
	}

	// Cordon worker nodes to prevent any new pods/volume replicas from being scheduled there.
	if err := utils.ConcurrentExec(d.workerNodes, func(_ int, worker string) error {
		if realNodeName := utils.FindName(realNodeNames, worker); realNodeName != "" {
			return kubectl.KubectlCordon(realNodeName)
		}
		d.logger.Warn().Msgf("Node name %s not found in cluster.", worker)
		return nil
	}); err != nil {
		return nil, fmt.Errorf("error while cordoning worker nodes from cluster %s which were marked for deletion : %w", d.clusterPrefix, err)
	}

	// Remove worker nodes sequentially to minimise the risk of faults when replicating PVCs.
	for _, worker := range d.workerNodes {
		// Assure replication of storage.
		if err := d.assureReplication(kubectl, worker); err != nil {
			return nil, fmt.Errorf("error while making sure storage is replicated before deletion on cluster %s : %w", d.clusterPrefix, err)
		}
		// Delete worker nodes from nodes.longhorn.io.
		if err := d.deleteFromLonghorn(kubectl, worker); err != nil {
			return nil, fmt.Errorf("error while deleting nodes.longhorn.io for %s : %w", d.clusterPrefix, err)
		}
		// Delete worker nodes.
		if err := d.deleteNodesByName(kubectl, worker, realNodeNames); err != nil {
			return nil, fmt.Errorf("error while deleting nodes from worker nodes for %s : %w", d.clusterPrefix, err)
		}
		// NOTE: Might need to manually verify if the volume got detached.
		// https://github.com/berops/claudie/issues/784
	}

	// Update the current cluster.
	d.updateClusterData()
	return d.cluster, nil
}
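// The sequence above roughly corresponds to the following commands, shown here
// only as an illustrative sketch (the node name and member hash are hypothetical):
//
//	etcdctl member remove 3ea84f69be8336f3      # run inside an etcd pod, once per deleted master
//	kubectl drain <node-name> --ignore-daemonsets --delete-emptydir-data
//	kubectl delete node <node-name>
//	kubectl cordon <node-name>                  # workers marked for deletion
//	kubectl delete nodes.longhorn.io <node-name> -n longhorn-system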
// deleteNodesByName deletes a node from the cluster by performing
// kubectl delete node <node-name>
// Returns nil if successful, an error otherwise.
func (d *Deleter) deleteNodesByName(kc kubectl.Kubectl, nodeName string, realNodeNames []string) error {
	if realNodeName := utils.FindName(realNodeNames, nodeName); realNodeName != "" {
		d.logger.Info().Msgf("Deleting node %s from k8s cluster", realNodeName)

		// kubectl drain <node-name> --ignore-daemonsets --delete-emptydir-data
		if err := kc.KubectlDrain(realNodeName); err != nil {
			return fmt.Errorf("error while draining node %s from cluster %s : %w", nodeName, d.clusterPrefix, err)
		}

		// kubectl delete node <node-name>
		if err := kc.KubectlDeleteResource("nodes", realNodeName); err != nil {
			return fmt.Errorf("error while deleting node %s from cluster %s : %w", nodeName, d.clusterPrefix, err)
		}
		return nil
	}

	d.logger.Warn().Msgf("Node name that contains %s not found in cluster", nodeName)
	return nil
}
// deleteFromEtcd deletes members of the etcd cluster. This needs to be done in order to prevent any data corruption in etcd.
// Returns nil if successful, an error otherwise.
func (d *Deleter) deleteFromEtcd(kc kubectl.Kubectl, etcdEpNode *pb.Node) error {
	// Get etcd pods.
	etcdPods, err := getEtcdPodNames(kc, strings.TrimPrefix(etcdEpNode.Name, fmt.Sprintf("%s-", d.clusterPrefix)))
	if err != nil {
		return fmt.Errorf("cannot find etcd pods in cluster %s : %w", d.clusterPrefix, err)
	}
	etcdMembers, err := getEtcdMembers(kc, etcdPods[0])
	if err != nil {
		return fmt.Errorf("cannot find etcd members in cluster %s : %w", d.clusterPrefix, err)
	}

	// Get pod info, i.e. the name of the node where each pod is deployed and the etcd member hash.
	etcdPodInfos := getEtcdPodInfo(etcdMembers)

	// Remove the etcd members that belong to the masters marked for deletion; the member hash is required to remove a member.
	for _, nodeName := range d.masterNodes {
		for _, etcdPodInfo := range etcdPodInfos {
			if nodeName == etcdPodInfo.nodeName {
				d.logger.Debug().Msgf("Deleting etcd member %s, with hash %s", etcdPodInfo.nodeName, etcdPodInfo.memberHash)
				etcdctlCmd := fmt.Sprintf("etcdctl member remove %s", etcdPodInfo.memberHash)
				if _, err := kc.KubectlExecEtcd(etcdPods[0], etcdctlCmd); err != nil {
					return fmt.Errorf("error while executing \"etcdctl member remove\" on node %s, cluster %s: %w", etcdPodInfo.nodeName, d.clusterPrefix, err)
				}
			}
		}
	}
	return nil
}
// updateClusterData removes the deleted nodes from the nodepools.
func (d *Deleter) updateClusterData() {
nodes:
	for _, name := range append(d.masterNodes, d.workerNodes...) {
		for _, np := range d.cluster.ClusterInfo.NodePools {
			for i, node := range np.Nodes {
				if node.Name == name {
					np.Nodes = append(np.Nodes[:i], np.Nodes[i+1:]...)
					continue nodes
				}
			}
		}
	}
}
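// For example (hypothetical data), deleting "compute-1" from a nodepool holding
// ["compute-1", "compute-2"] leaves ["compute-2"]; the labeled continue moves on
// to the next deleted name as soon as a match is removed.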
// deleteFromLonghorn deletes a node from nodes.longhorn.io.
// Returns nil if successful, an error otherwise.
func (d *Deleter) deleteFromLonghorn(kc kubectl.Kubectl, worker string) error {
	// Check if the resource is present before deleting.
	if logs, err := kc.KubectlGet(fmt.Sprintf("nodes.longhorn.io %s", worker), "-n", longhornNamespace); err != nil {
		// This is not the ideal way of checking for a NotFound error; it is only done because we shell out to run kubectl.
		if strings.Contains(string(logs), "NotFound") {
			d.logger.Warn().Msgf("worker node: %s not found, assuming it was deleted.", worker)
			return nil
		}
	}

	d.logger.Info().Msgf("Deleting node %s from nodes.longhorn.io from cluster", worker)
	if err := kc.KubectlDeleteResource("nodes.longhorn.io", worker, "-n", longhornNamespace); err != nil {
		return fmt.Errorf("error while deleting node %s from nodes.longhorn.io from cluster %s : %w", worker, d.clusterPrefix, err)
	}
	return nil
}
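// The check and deletion above presumably map to the following kubectl calls
// (a sketch only; <worker> stands for the Longhorn node name):
//
//	kubectl get nodes.longhorn.io <worker> -n longhorn-system
//	kubectl delete nodes.longhorn.io <worker> -n longhorn-system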
// assureReplication tries to assure that replicas of each longhorn volume are migrated to nodes which will remain in the cluster.
func (d *Deleter) assureReplication(kc kubectl.Kubectl, worker string) error {
	// Get replicas and volumes, as they can be scheduled on the next node which will be deleted.
	replicas, err := getReplicasMap(kc)
	if err != nil {
		return fmt.Errorf("error while getting replicas from cluster %s : %w", d.clusterPrefix, err)
	}
	volumes, err := getVolumes(kc)
	if err != nil {
		return fmt.Errorf("error while getting volumes from cluster %s : %w", d.clusterPrefix, err)
	}

	if reps, ok := replicas[worker]; ok {
		for _, r := range reps {
			// Try to force creation of new replicas away from the node which will be deleted.
			if v, ok := volumes[r.Spec.VolumeName]; ok {
				// Increase the number of replicas in the volume.
				if err := increaseReplicaCount(v, kc); err != nil {
					return fmt.Errorf("error while increasing number of replicas in volume %s from cluster %s : %w", v.Metadata.Name, d.clusterPrefix, err)
				}

				// Wait newReplicaCreationTimeout for Longhorn to create the new replica.
				d.logger.Info().Msgf("Waiting %.0f seconds for new replicas to be scheduled if possible for node %s", newReplicaCreationTimeout.Seconds(), worker)
				time.Sleep(newReplicaCreationTimeout)

				// Verify all current replicas are running correctly.
				if err := verifyAllReplicasSetUp(v.Metadata.Name, kc, d.logger); err != nil {
					return fmt.Errorf("error while checking if all longhorn replicas for volume %s are running : %w", v.Metadata.Name, err)
				}
				d.logger.Info().Msgf("Replication for volume %s has been set up", v.Metadata.Name)

				// Decrease the number of replicas in the volume back to the original state.
				if err := revertReplicaCount(v, kc); err != nil {
					return fmt.Errorf("error while reverting number of replicas in volume %s cluster %s : %w", v.Metadata.Name, d.clusterPrefix, err)
				}

				// Delete the old replica on the to-be-deleted node.
				d.logger.Debug().Str("node", r.Status.OwnerID).Msgf("Deleting replica %s from node %s", r.Metadata.Name, r.Status.OwnerID)
				if err := deleteReplica(r, kc); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
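// Per volume, the migration above follows this sequence (a sketch of the flow,
// not additional behaviour):
//
//	1. increase the volume's replica count so Longhorn schedules a replica elsewhere
//	2. wait newReplicaCreationTimeout (10s) for the new replica to be created
//	3. verify all replicas of the volume are running
//	4. revert the replica count to its original value
//	5. delete the replica that still lives on the to-be-deleted node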
// getMainMaster iterates over all control nodes in the cluster and returns the API endpoint node. If none is defined
// with this type, the function returns any master node which will not be deleted.
// Returns the API endpoint node if successful, nil otherwise.
func (d *Deleter) getMainMaster() *pb.Node {
	for _, np := range d.cluster.ClusterInfo.GetNodePools() {
		for _, node := range np.Nodes {
			if node.NodeType == pb.NodeType_apiEndpoint {
				return node
			}
		}
	}

	// Choose one master which is not going to be deleted.
	for _, np := range d.cluster.ClusterInfo.GetNodePools() {
	np:
		for _, node := range np.Nodes {
			if node.NodeType != pb.NodeType_master {
				continue
			}
			// If the node will be deleted, continue with the next node.
			for _, dm := range d.masterNodes {
				if strings.Contains(node.Name, dm) {
					continue np
				}
			}
			// If the loop was not broken by the continue, return this node.
			return node
		}
	}
	return nil
}
// getEtcdPodNames returns a slice of strings containing all etcd pod names.
func getEtcdPodNames(kc kubectl.Kubectl, masterNodeName string) ([]string, error) {
	etcdPodsBytes, err := kc.KubectlGetEtcdPods(masterNodeName)
	if err != nil {
		return nil, fmt.Errorf("cannot find etcd pods in cluster with master node %s : %w", masterNodeName, err)
	}
	return strings.Split(string(etcdPodsBytes), "\n"), nil
}
// getEtcdMembers returns a slice of strings, each element containing etcd member info from "etcdctl member list".
//
// Example output:
//
//	[
//		"3ea84f69be8336f3, started, test2-cluster-name1-hetzner-control-2, https://192.168.2.2:2380, https://192.168.2.2:2379, false",
//		"56c921bc723229ec, started, test2-cluster-name1-hetzner-control-1, https://192.168.2.1:2380, https://192.168.2.1:2379, false"
//	]
func getEtcdMembers(kc kubectl.Kubectl, etcdPod string) ([]string, error) {
	// Get etcd members.
	etcdMembersBytes, err := kc.KubectlExecEtcd(etcdPod, "etcdctl member list")
	if err != nil {
		return nil, fmt.Errorf("cannot find etcd members in cluster with etcd pod %s : %w", etcdPod, err)
	}

	// Convert the output into []string; each line of output is a separate string.
	etcdMembersStrings := strings.Split(string(etcdMembersBytes), "\n")
	// Drop the last entry - an empty string left by the trailing \n.
	if len(etcdMembersStrings) > 0 {
		etcdMembersStrings = etcdMembersStrings[:len(etcdMembersStrings)-1]
	}
	return etcdMembersStrings, nil
}
// getEtcdPodInfo tokenizes each etcd member info line and extracts the node name and etcd member hash.
// Returns a slice of etcdPodInfo containing the node name and etcd member hash for all etcd members.
func getEtcdPodInfo(etcdMembersString []string) []etcdPodInfo {
	var etcdPodInfos []etcdPodInfo
	for _, etcdString := range etcdMembersString {
		etcdStringTokenized := strings.Split(etcdString, ", ")
		// A valid member line contains at least the hash, the status and the node name.
		if len(etcdStringTokenized) >= 3 {
			temp := etcdPodInfo{etcdStringTokenized[2] /*name*/, etcdStringTokenized[0] /*hash*/}
			etcdPodInfos = append(etcdPodInfos, temp)
		}
	}
	return etcdPodInfos
}
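// For example, the member line
//
//	"3ea84f69be8336f3, started, test2-cluster-name1-hetzner-control-2, https://192.168.2.2:2380, https://192.168.2.2:2379, false"
//
// tokenizes to etcdPodInfo{nodeName: "test2-cluster-name1-hetzner-control-2", memberHash: "3ea84f69be8336f3"}.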