-
Notifications
You must be signed in to change notification settings - Fork 0
/
kubernetes_workload.go
439 lines (354 loc) · 13.9 KB
/
kubernetes_workload.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
package k8s
import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"strings"
"time"
"github.com/anynines/a9s-cli-v2/makeup"
"github.com/kr/pretty"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
m1u "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
)
// CertManagerNamespace is the namespace cert-manager is expected to live in.
const CertManagerNamespace = "cert-manager"

// CertManagerManifestUrl points to the cert-manager release manifest that is applied.
// TODO Make version configurable
const CertManagerManifestUrl = "https://github.com/cert-manager/cert-manager/releases/download/v1.12.0/cert-manager.yaml"

// kubectlWaitTimeoutOption bounds how long "kubectl wait" invocations block.
// TODO Make configurable
const kubectlWaitTimeoutOption = "--timeout=120s"

/*
PodExpectationState represents the state of a Pod which is expected to be running at some point.
The attribute "Running" is meant to be updated by a control loop.
*/
type PodExpectationState struct {
	Name    string // Pod name (or name prefix) to look for.
	Running bool   // Updated once the Pod has been observed running.
}
/*
WaitForSystemToBecomeReady waits for a set of Pods known by name (prefix) to
enter the status "Running". Blocks, polling every 5 seconds, until every
expected Pod is running.
*/
func WaitForSystemToBecomeReady(namespace, systemName string, expectedPods []PodExpectationState) {
	makeup.PrintH1("Waiting for " + systemName + " to become ready...")

	allGood := true

	makeup.Print(fmt.Sprintf("Checking the existence of the following %d Pods: ", len(expectedPods)))

out:
	for {
		// We start optimistically that all pods are running.
		allGood = true

		// Bug fix: the previous code ranged by value, so "Running = true" only
		// mutated a loop-local copy. Index the slice so the element itself is
		// updated.
		for i := range expectedPods {
			expectedPod := &expectedPods[i]

			makeup.Print("Checking the " + expectedPod.Name + "...")

			if checkIfPodHasStatusRunningInNamespace(expectedPod.Name, namespace) {
				makeup.PrintCheckmark("The " + expectedPod.Name + " appears to be running.")
				expectedPod.Running = true
			} else {
				// Sadly, at least one pod isn't running so we need another loop iteration.
				makeup.PrintWait("The " + expectedPod.Name + " is not ready (yet).")
				allGood = false
			}
		}

		if allGood {
			makeup.PrintSuccessSummary("The " + systemName + " appears to be ready. All expected pods are running.")
			break out
		}

		makeup.PrintWait("The " + systemName + " is not ready (yet), let's try again in 5s ...")
		time.Sleep(5 * time.Second)
	}
}
/*
KubectlWaitForSystemToBecomeReady uses kubectl wait to wait for each expected
pod to become ready. Pods are identified by label and namespace.
*/
func KubectlWaitForSystemToBecomeReady(namespace string, expectedPodsByLabels []string) {
	for _, label := range expectedPodsByLabels {
		KubectlWaitForPod(namespace, label)
	}
}
/*
KubectlWaitForPod blocks until the pod selected by podLabel in the given
namespace reports condition Ready, using "kubectl wait" with the package-wide
timeout. Terminates the process if the pod does not become ready in time or
if kubectl's output lacks the expected confirmation.

Example invocation:

	kubectl wait --for=condition=Ready pod -l "app.kubernetes.io/name=backup-manager" -n a8s-system

Outcome 1: error: timed out waiting for the condition on pods/a8s-backup-controller-manager-788fcd578d-kzb4f
Outcome 2: pod/postgresql-controller-manager-7f8c7758d-28lc2 condition met
*/
func KubectlWaitForPod(namespace, podLabel string) {
	waitCmd := exec.Command("kubectl", "wait", "--for=condition=Ready", "pod", "-l", podLabel, "-n", namespace, kubectlWaitTimeoutOption)

	combined, waitErr := waitCmd.CombinedOutput()
	outputText := string(combined)

	if waitErr != nil {
		makeup.ExitDueToFatalError(waitErr, fmt.Sprintf("Pod with label %s in namespace %s has not become ready on time", podLabel, namespace))
	}

	if !strings.Contains(outputText, "condition met") {
		makeup.ExitDueToFatalError(nil, fmt.Sprintf("Pod with label %s in namespace %s has not become ready but conditions haven't been met. Got: %s", podLabel, namespace, outputText))
	}
}
/*
checkIfPodHasStatusRunningInNamespace reports whether a Pod whose name starts
with podNameStartsWith is in phase Running in the given namespace.

Returns true if a matching Pod is Running; false if it is Pending, Succeeded,
Unknown, in any other phase, or not found at all. Exits the process if a
matching Pod has Failed. Panics if the Pod list cannot be retrieved.

TODO This method did not work when the backup-manager went into a CrashLoopBackOff. There is likely a bug here.
*/
func checkIfPodHasStatusRunningInNamespace(podNameStartsWith string, namespace string) bool {
	clientset := GetKubernetesClientSet()

	pods, err := clientset.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}

	for _, pod := range pods.Items {
		if !strings.HasPrefix(pod.Name, podNameStartsWith) {
			continue
		}

		makeup.Print("Found pod with prefix " + podNameStartsWith)

		switch pod.Status.Phase {
		case v1.PodRunning:
			makeup.PrintCheckmark("The Pod " + pod.Name + " is running as expected.")
			return true
		case v1.PodFailed:
			// Typo fix: message previously read "...h has failed...".
			makeup.PrintFail("The Pod " + pod.Name + " has failed but should be running.")
			os.Exit(1)
		case v1.PodPending:
			// Typo fix: message previously read "in pending".
			makeup.Print("The Pod " + pod.Name + " is pending but should be running.")
			return false
		case v1.PodSucceeded:
			makeup.Print("The Pod " + pod.Name + " has succeeded but should be running.")
			return false
		case v1.PodUnknown:
			makeup.Print("The Pod " + pod.Name + " has an unknown status but should be running.")
			return false
		default:
			return false
		}
	}
	return false
}
// CountPodsInNamespace returns the number of Pods currently present in the
// given namespace. Panics if the Pod list cannot be retrieved.
//
// See: https://github.com/kubernetes/client-go/blob/master/examples/in-cluster-client-configuration/main.go
func CountPodsInNamespace(namespace string) int {
	makeup.PrintH2("Checking whether there are pods in the cluster...")

	client := GetKubernetesClientSet()

	podList, listErr := client.CoreV1().Pods(namespace).List(context.TODO(), metav1.ListOptions{})
	if listErr != nil {
		panic(listErr.Error())
	}

	return len(podList.Items)
}
/*
GetDynamicKubernetesClient builds a dynamic Kubernetes client from the current
kubeconfig context. A dynamic client can perform generic operations on
arbitrary Kubernetes API objects.

See: https://github.com/kubernetes/client-go > dynamic.
*/
func GetDynamicKubernetesClient() *dynamic.DynamicClient {
	// Use the current context in kubeconfig.
	cfg := GetKubernetesConfig()

	client, creationErr := dynamic.NewForConfig(cfg)
	if creationErr != nil {
		makeup.ExitDueToFatalError(creationErr, "Can't create dynamic Kubernetes client.")
	}

	return client
}
/*
WaitForKubernetesResource blocks until the Kubernetes resource with the given
metadata.name in the given namespace reaches either a desired or a failed
state, as determined by its status.conditions.

name: name of the object to wait for, e.g. name of the backup.
The desiredConditionsMap contains the conditions to indicate success while
the failedConditionsMap contains the conditions to indicate failure
(failedConditionsMap may be nil, in which case failure is never detected).

Returns nil on success, an error if the failed state was reached or the watch
channel closed before any terminal state was observed. Exits the process on
watch setup failure or on a watch.Error event.

Example:

	gvr := schema.GroupVersionResource{Group: "backups.anynines.com", Version: "v1beta3", Resource: "backups"}
	desiredConditionsMap := make(map[string]interface{})
	desiredConditionsMap["reason"] = "Complete"
	desiredConditionsMap["status"] = "True"
	failedConditionsMap := make(map[string]interface{})
	failedConditionsMap["reason"] = "PermanentlyFailed"
	failedConditionsMap["status"] = "True"

TODO
- Refactor using WaitForKubernetesResourceWithFunction
- Rename WaitForKubernetesResourceWithFunction to WaitForKubernetesResource
*/
func WaitForKubernetesResource(namespace, name string, gvr schema.GroupVersionResource, desiredConditionsMap map[string]interface{}, failedConditionsMap map[string]interface{}) error {
	// Restrict the watch to the single object of interest.
	listOptions := metav1.ListOptions{
		FieldSelector: fmt.Sprintf("metadata.name=%s", name),
	}

	dynamicClient := GetDynamicKubernetesClient()

	// Watch for changes in Backup resources.
	watchInterface, err := dynamicClient.Resource(gvr).Namespace(namespace).Watch(context.TODO(), listOptions)
	if err != nil {
		makeup.ExitDueToFatalError(err, fmt.Sprintf("Can't create dynamic WatchInterface to watch Kubernetes resource %v.", gvr))
	}

	makeup.PrintVerbose(fmt.Sprintf("Watching for resources: %v", gvr))

	// Blocks until the watch channel closes or a terminal state is observed.
	for event := range watchInterface.ResultChan() {
		switch event.Type {
		case watch.Error:
			makeup.PrintFail(fmt.Sprintf("It was all for nothing! %v", event))
			os.Exit(1)
		case watch.Added, watch.Modified:
			backup, ok := event.Object.(*m1u.Unstructured)
			if !ok {
				makeup.ExitDueToFatalError(nil, "Could not cast to Unstructured")
			}

			if makeup.Verbose {
				fmt.Print("Event object:")
				pretty.Print(backup)
			}

			// Check the status.conditions for the desired status.
			status, exists, err := m1u.NestedFieldCopy(backup.Object, "status", "conditions")

			// NOTE(review): this skips only when err != nil AND !exists;
			// presumably "err != nil || !exists" was intended. The type
			// assertion below catches the remaining cases (nil status fails
			// the assertion), so the observable behavior is the same —
			// TODO confirm before "fixing".
			if err != nil && !exists {
				if makeup.Verbose {
					makeup.PrintWait("There is not status, yet.")
				}
				continue
			}

			/*
				Conditions is a list of condition maps.
				One of the condition maps in conditions has the "Status" => "True".
				This is the current condition.

				Conditions change over time so that this section is part of a loop and
				will be executed when conditions change.

				We are waiting for the circumstance when there's a condition map with
				"Reason" => "Complete" and "Status" => "True".

				TODO There are also other cases which represent a final state, for
				example when a backup has permanently failed. They should also be captured
				to indicate the user that the backup/restore has failed instead of
				keeping the loop running while blocking the cli.
			*/
			conditions, ok := status.([]interface{})
			if !ok {
				if makeup.Verbose {
					makeup.PrintWait(".")
				}
				continue
			}

			if makeup.Verbose {
				makeup.PrintWait("Status is now available. Checking conditions...")
			}

			conditionsAreMet := false
			failedConditionsAreMet := false

			for _, condition := range conditions {
				makeup.PrintVerbose(fmt.Sprintf("Investigating condition %v of conditions\n", condition))

				condMap, ok := condition.(map[string]interface{})
				if !ok {
					makeup.PrintWarning("Condition is not a map")
					continue
				}

				makeup.PrintVerbose(fmt.Sprintf("%v\n", condMap))

				// There are several condition fields only the condition with Status => true matters
				// hence: if one of the condition maps has Status => true and has the desired "reason",
				// we are ready to proceed.
				if ConditionsAreMet(condMap, desiredConditionsMap) {
					conditionsAreMet = true
					break
				}

				if failedConditionsMap != nil && ConditionsAreMet(condMap, failedConditionsMap) {
					failedConditionsAreMet = true
					break
				}
			}

			//TODO The conditionsAreMet variable is not necessary but increases readability. Does it?
			// No it doesn't. Code here could also be put in the above if clause (if ConditionsAreMet ...)
			if conditionsAreMet {
				//makeup.PrintCheckmark("Operation complete for resource: " + backup.GetName())
				return nil
			} else {
				if makeup.Verbose {
					makeup.PrintWait("Desired conditions are not met, yet...")
				}
			}

			if failedConditionsAreMet {
				errorMessage := fmt.Sprintf("waiting for Kubernetes resource %v in namespace %s has failed. Resource reached failed state", gvr, namespace)
				if makeup.Verbose {
					makeup.PrintWarning(errorMessage)
				}
				return errors.New(errorMessage)
			}

			continue
		}
	}
	return errors.New("expected conditions have not been met")
}
/*
WaitForKubernetesResourceWithFunction watches the resource whose
metadata.name equals name in the given namespace and invokes waitLonger on
every Added/Modified event.

waitLonger is a function describing what to wait for, covering both success
and failure scenarios. It returns true if waiting shall go on and false if
the awaited event has happened.

Returns nil once waitLonger returns false; returns an error if the watch
channel closes first. Exits the process on watch setup failure, on a
watch.Error event, or when an event object cannot be cast to Unstructured.
*/
func WaitForKubernetesResourceWithFunction(namespace, name string, gvr schema.GroupVersionResource, waitLonger func(object *m1u.Unstructured) bool) error {
	// Restrict the watch to the single object of interest.
	listOptions := metav1.ListOptions{
		FieldSelector: fmt.Sprintf("metadata.name=%s", name),
	}

	dynamicClient := GetDynamicKubernetesClient()

	watchInterface, err := dynamicClient.Resource(gvr).Namespace(namespace).Watch(context.TODO(), listOptions)
	if err != nil {
		makeup.ExitDueToFatalError(err, fmt.Sprintf("Can't create dynamic WatchInterface to watch Kubernetes resource %v.", gvr))
	}

	makeup.PrintVerbose(fmt.Sprintf("Watching for resources: %v", gvr))

	for event := range watchInterface.ResultChan() {
		switch event.Type {
		case watch.Error:
			// Bug fix: the previous code reported err, which is provably nil
			// here (a non-nil Watch error exits above). Report the actual
			// error event instead.
			makeup.ExitDueToFatalError(fmt.Errorf("watch error event: %v", event), "A watch.Error occurred watching the resource.")
		case watch.Added, watch.Modified:
			object, ok := event.Object.(*m1u.Unstructured)
			if !ok {
				makeup.ExitDueToFatalError(nil, "Could not cast to Unstructured")
			}

			if makeup.Verbose {
				fmt.Print("Event object:")
				pretty.Print(object)
			}

			if !waitLonger(object) {
				// The awaited event has happened.
				return nil
			}
		}
	}
	return errors.New("expected conditions have not been met")
}
/*
ConditionsAreMet verifies whether every key-value pair of
expectedConditionsMap is contained in actualConditionsMap.

actualConditionsMap is a single record of a conditions array similar to:

	map[lastTransitionTime:2024-01-03T07:04:28Z message:Restore object has been created reason:Initialized status:False type:PermanentlyFailed]

ConditionsAreMet has to be applied against all condition entries, each being
a condition map.
*/
func ConditionsAreMet(actualConditionsMap, expectedConditionsMap map[string]interface{}) bool {
	for wantedKey, wantedValue := range expectedConditionsMap {
		makeup.PrintVerbose(fmt.Sprintf("\nConditionsAreMet? Checking whether %v / %v is in %v\n", wantedKey, wantedValue, actualConditionsMap))

		foundValue, present := actualConditionsMap[wantedKey]
		if !present {
			return false
		}
		if foundValue != wantedValue {
			return false
		}
	}

	return true
}
/*
WaitForServiceAccount waits for the given service account in the given
namespace to become ready. Blocks during wait, polling kubectl every 2s for
up to 601 attempts; terminates the process on timeout or unexpected errors.
*/
func WaitForServiceAccount(unattendedMode bool, namespace, serviceAccountName string) {
	for nrAttempts := 0; nrAttempts <= 600; nrAttempts++ {
		// Wait for the service account to show up.
		_, output, err := Kubectl(unattendedMode, "get", "serviceaccount", "-n", namespace, serviceAccountName)
		if err == nil {
			// Found the service account.
			return
		}

		if strings.Contains(string(output), "serviceaccounts \""+serviceAccountName+"\" not found") {
			// Did not find the service account (yet); keep polling.
			makeup.Print(fmt.Sprintf("The service account %s does not exist (yet) in namespace %s.", serviceAccountName, namespace))
		} else {
			// Some other error occurred.
			makeup.ExitDueToFatalError(err, "Can't get service account "+serviceAccountName+" in namespace "+namespace)
		}

		time.Sleep(2 * time.Second)
	}

	// Bug fix: the previous code passed a non-constant concatenated string as
	// the Sprintf format with no arguments (go vet: printf); a '%' in the
	// account or namespace name would garble the message.
	makeup.ExitDueToFatalError(nil, fmt.Sprintf("Timeout. Can't get service account %s in namespace %s", serviceAccountName, namespace))
}
// CreateNamespace creates the given namespace using kubectl. Terminates the
// process if the namespace cannot be created.
func CreateNamespace(unattendedMode bool, namespace string) {
	if _, cmdOutput, cmdErr := Kubectl(unattendedMode, "create", "namespace", namespace); cmdErr != nil {
		makeup.ExitDueToFatalError(cmdErr, fmt.Sprintf("Couldn't create namespace %s. Output was: %s", namespace, string(cmdOutput)))
	}
}