-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathkubernetes.go
181 lines (154 loc) · 5.39 KB
/
kubernetes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
package fipcontroller
import (
"context"
"fmt"
"k8s.io/client-go/util/retry"
"net"
"strings"
"github.com/cbeneke/hcloud-fip-controller/internal/pkg/configuration"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
func newKubernetesClient() (*kubernetes.Clientset, error) {
kubeConfig, err := rest.InClusterConfig()
if err != nil {
return nil, fmt.Errorf("could not get kubeconfig: %v", err)
}
kubernetesClient, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return nil, fmt.Errorf("could not get kubernetes client: %v", err)
}
return kubernetesClient, nil
}
// Search and return the IP address of a given kubernetes node name.
// Will return first found internal or external IP depending on nodeAddressType parameter
func (controller *Controller) nodeAddressList(ctx context.Context, nodeAddressType configuration.NodeAddressType) (addressList [][]net.IP, err error) {
podLabelSelector, err := controller.createPodLabelSelector(ctx)
if err != nil {
return nil, fmt.Errorf("could not get information about pod: %v", err)
}
// Try to get deployment pods if certain label is specified
listOptions := metav1.ListOptions{}
listOptions.LabelSelector = podLabelSelector
var pods *corev1.PodList
err = retry.OnError(controller.Backoff, alwaysRetry, func() error {
pods, err = controller.KubernetesClient.CoreV1().Pods(controller.Configuration.Namespace).List(ctx, listOptions)
return err
})
if err != nil {
return nil, fmt.Errorf("could not list nodes: %v", err)
}
controller.Logger.Debugf("Found %d pods", len(pods.Items))
var nodeNames []string
for _, pod := range pods.Items {
nodeNames = append(nodeNames, pod.Spec.NodeName)
}
if len(nodeNames) > 0 {
var nodes *corev1.NodeList
err = retry.OnError(controller.Backoff, alwaysRetry, func() error {
nodes, err = controller.KubernetesClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
return err
})
if err != nil {
return nil, fmt.Errorf("could not list nodes: %v", err)
}
for _, node := range nodes.Items {
if hasNodeName(nodeNames, node.Name) {
addressList = append(addressList, searchForAddresses(node.Status.Addresses))
}
}
}
if len(addressList) > 0 {
controller.Logger.Debugf("Found %d ips from pods", len(addressList))
return
}
// Create list options with optional labelSelector
listOptions = metav1.ListOptions{}
if controller.Configuration.NodeLabelSelector != "" {
listOptions.LabelSelector = controller.Configuration.NodeLabelSelector
}
var nodes *corev1.NodeList
err = retry.OnError(controller.Backoff, alwaysRetry, func() error {
nodes, err = controller.KubernetesClient.CoreV1().Nodes().List(ctx, listOptions)
return err
})
if err != nil {
return nil, fmt.Errorf("could not list nodes: %v", err)
}
controller.Logger.Debugf("Found %d nodes", len(nodes.Items))
for _, node := range nodes.Items {
// Skip unhealthy nodes
if !isNodeHealthy(node) {
continue
}
addresses := node.Status.Addresses
controller.Logger.Debugf("Found %d addresses for node %s", len(addresses), node.Name)
checkAddressType := corev1.NodeExternalIP
if nodeAddressType == configuration.NodeAddressTypeInternal {
checkAddressType = corev1.NodeInternalIP
}
controller.Logger.Debugf("Using address type '%s' for node %s", checkAddressType, node.Name)
addressList = append(addressList, searchForAddresses(addresses))
}
if len(addressList) == 0 {
return nil, fmt.Errorf("could not find any healthy nodes")
}
return
}
// Check if node is healthy
func isNodeHealthy(node corev1.Node) bool {
for _, condition := range node.Status.Conditions {
if condition.Type == corev1.NodeReady {
return condition.Status == corev1.ConditionTrue
}
}
return false
}
func searchForAddresses(addresses []corev1.NodeAddress) (possibleIPs []net.IP) {
for _, address := range addresses {
if address.Type == corev1.NodeExternalIP || address.Type == corev1.NodeInternalIP {
possibleIPs = append(possibleIPs, net.ParseIP(address.Address))
}
}
return possibleIPs
}
func hasNodeName(nodeNames []string, nodeName string) bool {
for _, name := range nodeNames {
if name == nodeName {
return true
}
}
return false
}
func (controller *Controller) createPodLabelSelector(ctx context.Context) (string, error) {
if controller.Configuration.PodLabelSelector != "" {
return controller.Configuration.PodLabelSelector, nil
}
if controller.Configuration.PodName == "" {
controller.Logger.Warn("no pod name specified in configuration, all pods in namespace will be used")
return "", nil
}
var pod *corev1.Pod
var err error
err = retry.OnError(controller.Backoff, alwaysRetry, func() error {
pod, err = controller.KubernetesClient.CoreV1().Pods(controller.Configuration.Namespace).Get(ctx, controller.Configuration.PodName, metav1.GetOptions{})
return err
})
if err != nil {
return "", fmt.Errorf("Could not get pod information: %v", err)
}
if len(pod.Labels) < 1 {
controller.Logger.Warnf("fip-controller pod has no labels, all pods in namespace will be used")
return "", nil
}
var stringBuilder strings.Builder
for key, value := range pod.Labels {
fmt.Fprintf(&stringBuilder, "%s=%s,", key, value)
}
labelSelector := stringBuilder.String()
labelSelector = labelSelector[:stringBuilder.Len()-1] // remove trailing ,
controller.Logger.Debugf("pod label selector created: %s", labelSelector)
return labelSelector, nil
}