-
Notifications
You must be signed in to change notification settings - Fork 0
/
k8s.go
235 lines (197 loc) · 6.58 KB
/
k8s.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
package cmd
import (
"context"
"encoding/json"
"fmt"
"os"
"path"
"time"
kafka "github.com/RedHatInsights/strimzi-client-go/apis/kafka.strimzi.io/v1beta2"
"github.com/astrolabsoftware/finkctl/v3/resources"
"github.com/spf13/cobra"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
applyv1 "k8s.io/client-go/applyconfigurations/core/v1"
applymetav1 "k8s.io/client-go/applyconfigurations/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
podv1 "k8s.io/kubernetes/pkg/api/v1/pod"
)
// Kubernetes-related constants used by the helpers in this file.
const (
// kubeConfigEnvName is the environment variable read to locate the kubeconfig file.
kubeConfigEnvName = "KUBECONFIG"
// kubeConfigDefaultFilename is the kubeconfig path, relative to the user's
// home directory, used when the environment variable is empty.
kubeConfigDefaultFilename = ".kube/config"
// configMapNameKafkaJaas is the name of the ConfigMap holding the Kafka JAAS configuration.
configMapNameKafkaJaas = "fink-kafka-jaas"
// configMapPathKafkaJaas is the path associated with that ConfigMap
// (presumably where it is mounted in the container; TODO confirm against callers).
configMapPathKafkaJaas = "/etc/fink-broker"
)
// kubeVars exposes the Kafka JAAS ConfigMap constants as exported struct fields.
// NOTE(review): presumably consumed as template data; confirm against callers.
type kubeVars struct {
// ConfigMapNameKafkaJaas is the name of the Kafka JAAS ConfigMap.
ConfigMapNameKafkaJaas string
// ConfigMapPathKafkaJaas is the path associated with the Kafka JAAS ConfigMap.
ConfigMapPathKafkaJaas string
}
// getKubeVars returns a kubeVars value populated with the Kafka JAAS
// ConfigMap name and path constants declared in this file.
func getKubeVars() kubeVars {
	return kubeVars{
		ConfigMapNameKafkaJaas: configMapNameKafkaJaas,
		ConfigMapPathKafkaJaas: configMapPathKafkaJaas,
	}
}
// getKubeConfig returns the path of the kubeconfig file to use: the value of
// the KUBECONFIG environment variable when it is non-empty, otherwise
// ".kube/config" under the current user's home directory. A failure to
// resolve the home directory is handed to cobra.CheckErr.
func getKubeConfig() string {
	if fromEnv := os.Getenv(kubeConfigEnvName); fromEnv != "" {
		return fromEnv
	}
	// No environment override: fall back to the default kubeconfig location.
	homeDir, err := os.UserHomeDir()
	cobra.CheckErr(err)
	return path.Join(homeDir, kubeConfigDefaultFilename)
}
// setKubeClient builds a Kubernetes clientset and its REST configuration from
// the kubeconfig file returned by getKubeConfig, using that file's current
// context. It panics if the configuration cannot be loaded or the clientset
// cannot be created.
func setKubeClient() (*kubernetes.Clientset, *rest.Config) {
	restConfig, err := clientcmd.BuildConfigFromFlags("", getKubeConfig())
	if err != nil {
		panic(err.Error())
	}

	client, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		panic(err.Error())
	}
	return client, restConfig
}
// createKafkaJaasConfigMap renders the Kafka JAAS configuration from c and
// applies it as the "fink-kafka-jaas" ConfigMap (a single entry keyed by
// resources.KafkaJaasConfFile) in the namespace returned by
// getCurrentNamespace. It panics if the apply request fails.
func createKafkaJaasConfigMap(c *DistributionConfig) {
	jaasConf := format(resources.KafkaJaasConf, &c)
	client, _ := setKubeClient()

	// The apply configuration takes pointers, hence the addressable locals.
	var (
		kind       = "ConfigMap"
		apiVersion = "v1"
		cmName     = configMapNameKafkaJaas
	)
	configMap := &applyv1.ConfigMapApplyConfiguration{
		TypeMetaApplyConfiguration: applymetav1.TypeMetaApplyConfiguration{
			Kind:       &kind,
			APIVersion: &apiVersion,
		},
		ObjectMetaApplyConfiguration: &applymetav1.ObjectMetaApplyConfiguration{
			Name: &cmName,
		},
		Data: map[string]string{resources.KafkaJaasConfFile: jaasConf},
	}

	applyOpts := metav1.ApplyOptions{FieldManager: "application/apply-patch"}
	configMaps := client.CoreV1().ConfigMaps(getCurrentNamespace())
	if _, err := configMaps.Apply(context.TODO(), configMap, applyOpts); err != nil {
		panic(err.Error())
	}
}
// getKafkaPasswordFromSecret reads the secret kafkaSecretName in kafkaNamespace
// and returns the value stored under its "password" key as a string.
// It panics if the secret cannot be fetched.
// equivalent to "kubectl get -n kafka secrets/fink-producer --template={{.data.password}} | base64 --decode"
func getKafkaPasswordFromSecret() string {
	client, _ := setKubeClient()
	secrets := client.CoreV1().Secrets(kafkaNamespace)

	kafkaSecret, err := secrets.Get(context.TODO(), kafkaSecretName, metav1.GetOptions{})
	if err != nil {
		panic(err.Error())
	}

	password := kafkaSecret.Data["password"]
	return string(password)
}
// getKafkaTopics returns the names of the Strimzi KafkaTopic resources found
// in kafkaNamespace, in the order the API server lists them.
// equivalent to "kubectl get -n kafka kafkatopics.kafka.strimzi.io --template='{{range .items}}{{.metadata.name}}{{"\n"}}{{end}}'"
//
// It returns an error if the request fails or the response cannot be decoded.
func getKafkaTopics() ([]string, error) {
	clientSet, _ := setKubeClient()

	url := fmt.Sprintf("/apis/kafka.strimzi.io/v1beta2/namespaces/%s/kafkatopics", kafkaNamespace)
	logger.Debugf("Get Kafka topics from %s", url)
	d, err := clientSet.RESTClient().Get().AbsPath(url).DoRaw(context.TODO())
	if err != nil {
		return nil, fmt.Errorf("failed to get kafka topics. %w", err)
	}

	topics := &kafka.KafkaTopicList{}
	if err := json.Unmarshal(d, topics); err != nil {
		return nil, fmt.Errorf("failed to unmarshal kafka topics. %w", err)
	}

	// Length 0 with preallocated capacity: a non-zero length here would leave
	// one empty string per topic in front of the appended names.
	topicNames := make([]string, 0, len(topics.Items))
	for i := range topics.Items {
		topicNames = append(topicNames, topics.Items[i].Name)
	}
	return topicNames, nil
}
// getCurrentNamespace returns the namespace set on the current context of the
// kubeconfig file, or "default" when that context does not set one.
//
// It panics if the kubeconfig file cannot be loaded, or if its current
// context is unset or does not match any declared context.
func getCurrentNamespace() string {
	kubeconfig := getKubeConfig()
	config, err := clientcmd.LoadFromFile(kubeconfig)
	if err != nil {
		panic(err.Error())
	}

	// Contexts maps names to pointers: a missing entry yields nil, and
	// dereferencing it would crash with an uninformative nil-pointer panic.
	current, ok := config.Contexts[config.CurrentContext]
	if !ok || current == nil {
		panic(fmt.Sprintf("current context %q not found in kubeconfig %s", config.CurrentContext, kubeconfig))
	}

	if len(current.Namespace) == 0 {
		return "default"
	}
	return current.Namespace
}
// waitForPodReady polls the API server every 5 seconds, starting immediately,
// until the given pod is ready or timeout elapses.
//
// It returns nil once the pod is ready. It returns an error if the pod cannot
// be fetched, if the pod reaches a terminal state, or if ctx is cancelled or
// the timeout expires first.
func waitForPodReady(ctx context.Context, clientset *kubernetes.Clientset, pod *v1.Pod, timeout time.Duration) error {
	logger.Infof("waiting for pod %s to be running...", pod.Name)
	return wait.PollUntilContextTimeout(ctx, 5*time.Second, timeout, true, func(pollCtx context.Context) (bool, error) {
		// Use the poll's context so an in-flight request is bounded by timeout,
		// and keep the fetched object in its own variable so the requested
		// pod's name is still available for the error message.
		current, err := clientset.CoreV1().Pods(pod.Namespace).Get(pollCtx, pod.Name, metav1.GetOptions{})
		if err != nil {
			return false, fmt.Errorf("failed to detect pod %s. %w", pod.Name, err)
		}
		if podv1.IsPodTerminal(current) {
			return false, fmt.Errorf("job %s failed", current.Name)
		}
		if podv1.IsPodReady(current) {
			return true, nil
		}
		logger.Debugf("pod is still initializing")
		return false, nil
	})
}
// listPods returns the pods in namespace whose labels match selector.
// On failure it returns a nil list together with the error.
func listPods(c *kubernetes.Clientset, namespace, selector string) (*v1.PodList, error) {
	pods, err := c.CoreV1().Pods(namespace).List(
		context.TODO(),
		metav1.ListOptions{LabelSelector: selector},
	)
	if err == nil {
		return pods, nil
	}
	return nil, err
}
// waitForPodReadyBySelector lists the pods in namespace matching selector and
// waits for each of them in turn to become ready, allowing up to timeout per
// pod. It returns an error if listing fails, if no pod matches, or as soon as
// one pod fails to become ready.
func waitForPodReadyBySelector(c *kubernetes.Clientset, namespace, selector string, timeout time.Duration) error {
	matching, err := listPods(c, namespace, selector)
	switch {
	case err != nil:
		return err
	case len(matching.Items) == 0:
		return fmt.Errorf("no pods in %s with selector %s", namespace, selector)
	}

	for i := range matching.Items {
		waitErr := waitForPodReady(context.TODO(), c, &matching.Items[i], timeout)
		if waitErr != nil {
			return waitErr
		}
	}
	return nil
}
// waitForPodExistsBySelector polls every 500ms, starting immediately, until
// exactly expected pods matching selector exist in namespace or timeout
// elapses.
//
// It returns nil once the expected count is observed and an error on timeout.
// A failed list request is logged and retried on the next tick rather than
// aborting the wait. Polling runs in the calling goroutine and stops when the
// function returns, so nothing keeps running after a timeout.
func waitForPodExistsBySelector(c *kubernetes.Clientset, namespace, selector string, timeout time.Duration, expected int) error {
	err := wait.PollUntilContextTimeout(context.TODO(), 500*time.Millisecond, timeout, true,
		func(ctx context.Context) (bool, error) {
			// List with the poll's context so a slow request cannot outlive the timeout.
			podList, err := c.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
			if err != nil {
				logger.Debugf("Unable to list pods with label %s: %v", selector, err)
				return false, nil
			}
			podCount := len(podList.Items)
			logger.Debugf("Found %d pods with label %s", podCount, selector)
			return podCount == expected, nil
		})
	if err != nil {
		// The condition never returns an error, so this is a timeout or cancellation.
		return fmt.Errorf("error: timed out waiting for pods with label %s", selector)
	}
	logger.Debugf("Condition met: Found %d pods with label %s\n", expected, selector)
	return nil
}