/
util.go
337 lines (295 loc) · 10.6 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package kubernetes
import (
"context"
"fmt"
"io/ioutil"
"os"
"strings"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/safemapstr"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"github.com/pkg/errors"
"github.com/elastic/beats/v7/libbeat/logp"
)
// namespaceFilePath is the path of the service account namespace file; it is
// the same path InClusterNamespace reads to determine the namespace when the
// beat runs in cluster.
const namespaceFilePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
// HostDiscoveryUtils abstracts the host-level lookups that node discovery
// relies on, so that they can be replaced by a different implementation.
// DefaultDiscoveryUtils is the implementation provided in this file.
type HostDiscoveryUtils interface {
// GetNamespace returns the namespace in which the in-cluster pod query is made.
GetNamespace() (string, error)
// GetPodName returns the name used to look up the pod in that namespace.
GetPodName() (string, error)
// GetMachineID returns the machine id that is compared against the machine
// id of every node; an empty string means no machine id could be obtained.
GetMachineID() string
}
// DiscoverKubernetesNodeParams bundles the inputs consumed by
// DiscoverKubernetesNode when it resolves the Kubernetes node name.
type DiscoverKubernetesNodeParams struct {
// ConfigHost is the node name taken from the configuration. When it is not
// empty it is returned as is and no other discovery method is attempted.
ConfigHost string
// Client is the Kubernetes client used to query pods and list nodes.
Client kubernetes.Interface
// IsInCluster enables the discovery step that queries the beat's own pod.
IsInCluster bool
// HostUtils supplies the namespace, pod name and machine id of the host.
HostUtils HostDiscoveryUtils
}
// DefaultDiscoveryUtils implements the HostDiscoveryUtils interface. It has no
// fields; its methods read what they need from the host (machine-id files,
// the service account namespace file and the hostname).
type DefaultDiscoveryUtils struct{}
// GetKubeConfigEnvironmentVariable returns the value of the KUBECONFIG
// environment variable, or an empty string when the path it names does not
// exist (which is also the result when the variable is empty or unset).
//
// Only a "does not exist" stat failure hides the value: any other stat error,
// such as a permission problem, still yields the configured path.
func GetKubeConfigEnvironmentVariable() string {
	path := os.Getenv("KUBECONFIG")
	_, statErr := os.Stat(path)
	if os.IsNotExist(statErr) {
		return ""
	}
	return path
}
// GetKubernetesClient returns a kubernetes client.
//
// When kubeconfig is empty, the path from the KUBECONFIG environment variable
// (see GetKubeConfigEnvironmentVariable) is used instead. The resulting path,
// which may still be empty, is handed to BuildConfig to obtain the REST
// configuration. The QPS and Burst values of opt are applied to that
// configuration before the clientset is created.
//
// An error is returned when either the configuration or the clientset cannot
// be built.
func GetKubernetesClient(kubeconfig string, opt KubeClientOptions) (kubernetes.Interface, error) {
	path := kubeconfig
	if path == "" {
		path = GetKubeConfigEnvironmentVariable()
	}

	restCfg, err := BuildConfig(path)
	if err != nil {
		return nil, fmt.Errorf("unable to build kube config due to error: %+v", err)
	}
	restCfg.QPS, restCfg.Burst = opt.QPS, opt.Burst

	clientset, err := kubernetes.NewForConfig(restCfg)
	if err != nil {
		return nil, fmt.Errorf("unable to build kubernetes clientset: %+v", err)
	}
	return clientset, nil
}
// BuildConfig builds a REST client configuration from a kubeconfig file path.
//
// With an empty kubeconfigPath the in-cluster configuration is tried first and
// returned if it can be loaded. In every other case, including a failed
// in-cluster attempt, the configuration is loaded through the deferred
// client config loader with kubeconfigPath as the explicit path.
//
// This mirrors `clientcmd.BuildConfigFromFlags` of `client-go`, minus the klog
// messages that cannot be disabled there.
func BuildConfig(kubeconfigPath string) (*restclient.Config, error) {
	if kubeconfigPath == "" {
		// The in-cluster error is dropped on purpose: failing here only means
		// that the loader below should be used.
		if inCluster, err := restclient.InClusterConfig(); err == nil {
			return inCluster, nil
		}
	}

	rules := &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath}
	overrides := &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: ""}}
	return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides).ClientConfig()
}
// IsInCluster reports whether Beats is considered to be running in cluster.
// That is the case only when no kubeconfig path is given and the KUBECONFIG
// environment variable does not provide one either.
func IsInCluster(kubeconfig string) bool {
	// The environment is consulted only when no explicit path was passed.
	return kubeconfig == "" && GetKubeConfigEnvironmentVariable() == ""
}
// DiscoverKubernetesNode resolves the name of the Kubernetes node to use.
//
// A node name provided through the configuration (nd.ConfigHost) is returned
// directly. Otherwise the following methods are tried in order:
//  1. When nd.IsInCluster is set, the pod identified by the host's namespace
//     and pod name is queried and its node name is used.
//  2. When step 1 is skipped or fails, the host machine-id is matched against
//     the machine-id of the cluster nodes.
//  3. When that fails as well, the NODE_NAME environment variable is used.
//
// Failures of steps 1 and 2 are logged at debug level only. An error is
// returned when none of the methods yields a node name.
func DiscoverKubernetesNode(log *logp.Logger, nd *DiscoverKubernetesNodeParams) (string, error) {
	// An explicitly configured node short-circuits every other lookup.
	if configured := nd.ConfigHost; configured != "" {
		log.Infof("kubernetes: Using node %s provided in the config", configured)
		return configured, nil
	}

	ctx := context.TODO()

	// Step 1: ask the API server on which node our own pod is scheduled.
	if nd.IsInCluster {
		podNode, err := discoverInCluster(nd, ctx)
		if err != nil {
			log.Debug(err)
		} else {
			log.Infof("kubernetes: Node %s discovered by in cluster pod node query", podNode)
			return podNode, nil
		}
	}

	// Step 2: find the node that reports the same machine-id as this host.
	matched, err := discoverByMachineId(nd, ctx)
	if err != nil {
		log.Debug(err)
	} else {
		log.Infof("kubernetes: Node %s discovered by machine-id matching", matched)
		return matched, nil
	}

	// Step 3: last resort, trust the NODE_NAME environment variable.
	if fromEnv := os.Getenv("NODE_NAME"); fromEnv != "" {
		log.Infof("kubernetes: Node %s discovered by NODE_NAME environment variable", fromEnv)
		return fromEnv, nil
	}

	return "", errors.New("kubernetes: Node could not be discovered with any known method. Consider setting env var NODE_NAME")
}
// discoverInCluster returns the name of the node that runs the beat's own pod.
// The pod is identified by the namespace and pod name supplied by
// nd.HostUtils and fetched through nd.Client. An error is returned when the
// namespace or the pod name cannot be obtained, or when the pod query fails;
// the node name is empty in that case.
func discoverInCluster(nd *DiscoverKubernetesNodeParams, ctx context.Context) (node string, errorMsg error) {
	namespace, nsErr := nd.HostUtils.GetNamespace()
	if nsErr != nil {
		return "", fmt.Errorf("kubernetes: Couldn't get namespace when beat is in cluster with error: %+v", nsErr.Error())
	}

	name, nameErr := nd.HostUtils.GetPodName()
	if nameErr != nil {
		return "", fmt.Errorf("kubernetes: Couldn't get hostname as beat pod name in cluster with error: %+v", nameErr.Error())
	}

	self, getErr := nd.Client.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
	if getErr != nil {
		return "", fmt.Errorf("kubernetes: Querying for pod failed with error: %+v", getErr)
	}

	return self.Spec.NodeName, nil
}
// discoverByMachineId returns the name of the first node whose reported
// machine-id equals the machine-id supplied by nd.HostUtils. An error is
// returned when no machine-id is available, when listing the nodes fails, or
// when no node matches; the node name is empty in that case.
func discoverByMachineId(nd *DiscoverKubernetesNodeParams, ctx context.Context) (nodeName string, errorMsg error) {
	machineID := nd.HostUtils.GetMachineID()
	if machineID == "" {
		return "", errors.New("kubernetes: Couldn't collect info from any of the files in /etc/machine-id /var/lib/dbus/machine-id")
	}

	list, listErr := nd.Client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if listErr != nil {
		return "", fmt.Errorf("kubernetes: Querying for nodes failed with error: %+v", listErr)
	}

	for i := range list.Items {
		candidate := &list.Items[i]
		if candidate.Status.NodeInfo.MachineID != machineID {
			continue
		}
		return candidate.GetObjectMeta().GetName(), nil
	}

	return "", fmt.Errorf("kubernetes: Couldn't discover node %s", machineID)
}
// GetMachineID returns the machine-id of the host with surrounding whitespace
// removed. The content of the first readable file among /etc/machine-id and
// /var/lib/dbus/machine-id is used; an empty string is returned when neither
// file can be read.
// Borrowed from machineID of cadvisor.
func (hd *DefaultDiscoveryUtils) GetMachineID() string {
	candidates := [...]string{
		"/etc/machine-id",
		"/var/lib/dbus/machine-id",
	}
	for i := range candidates {
		raw, err := ioutil.ReadFile(candidates[i])
		if err != nil {
			// Unreadable file: try the next location.
			continue
		}
		return strings.TrimSpace(string(raw))
	}
	return ""
}
// GetNamespace returns the namespace reported by InClusterNamespace, i.e. the
// one read from the service account namespace file when the beat is in
// cluster. The error of InClusterNamespace is passed through unchanged.
func (hd *DefaultDiscoveryUtils) GetNamespace() (string, error) {
return InClusterNamespace()
}
// GetPodName returns the hostname reported by os.Hostname, which is used as
// the pod name for the in-cluster pod query.
// NOTE(review): this presumes the pod's hostname equals its pod name — confirm
// for pods that override their hostname.
func (hd *DefaultDiscoveryUtils) GetPodName() (string, error) {
return os.Hostname()
}
// InClusterNamespace gets namespace from serviceaccount when beat is in cluster. // code borrowed from client-go with some changes.
func InClusterNamespace() (string, error) {
// get namespace associated with the service account token, if available
data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
return "", err
}
return strings.TrimSpace(string(data)), nil
}
// ContainerInPod pairs the definition of a container in a pod with its
// status, as assembled by GetContainersInPod.
type ContainerInPod struct {
// ID and Runtime are the values returned by ContainerIDWithRuntime for the
// container status; both stay empty when the pod has no status for the container.
ID string
Runtime string
// Spec is the container definition taken from the pod spec.
Spec Container
// Status is a copy of the status whose name matches Spec.Name; it is the
// zero value when the pod has no such status.
Status PodContainerStatus
}
// GetContainersInPod returns all the containers defined in a pod together with
// their statuses. Regular containers come first, followed by init containers
// and then ephemeral containers.
//
// Statuses are matched to containers by name. A container without a matching
// status keeps an empty ID, Runtime and Status. The result is nil when the pod
// defines no containers at all.
func GetContainersInPod(pod *Pod) []*ContainerInPod {
	var result []*ContainerInPod
	add := func(spec Container) {
		result = append(result, &ContainerInPod{Spec: spec})
	}
	for i := range pod.Spec.Containers {
		add(pod.Spec.Containers[i])
	}
	for i := range pod.Spec.InitContainers {
		add(pod.Spec.InitContainers[i])
	}
	for i := range pod.Spec.EphemeralContainers {
		// Ephemeral containers share the common container fields; convert
		// them so that they can be handled like any other container.
		add(Container(pod.Spec.EphemeralContainers[i].EphemeralContainerCommon))
	}

	// Index every known status by container name. Groups are visited in the
	// order regular, init, ephemeral, so a later group wins on a name clash.
	byName := make(map[string]*PodContainerStatus)
	groups := [][]PodContainerStatus{
		pod.Status.ContainerStatuses,
		pod.Status.InitContainerStatuses,
		pod.Status.EphemeralContainerStatuses,
	}
	for _, group := range groups {
		for i := range group {
			byName[group[i].Name] = &group[i]
		}
	}

	for _, entry := range result {
		status, found := byName[entry.Spec.Name]
		if !found {
			continue
		}
		entry.ID, entry.Runtime = ContainerIDWithRuntime(*status)
		entry.Status = *status
	}
	return result
}
// PodAnnotations returns the annotations of a pod as a MapStr. Every
// annotation is inserted with safemapstr.Put; the result is an empty, non-nil
// map when the pod has no annotations.
func PodAnnotations(pod *Pod) common.MapStr {
	result := common.MapStr{}
	source := pod.GetObjectMeta().GetAnnotations()
	for key, value := range source {
		// Insertion is best effort: a failed Put is deliberately ignored.
		_, _ = key, value
		safemapstr.Put(result, key, value)
	}
	return result
}
// PodNamespaceAnnotations returns the annotations of the namespace the pod
// belongs to, looked up in the store of the given namespace watcher.
//
// Nil is returned when no watcher is given, when the store lookup fails or
// finds nothing for the pod's namespace, or when the stored object is not a
// *Namespace. Otherwise the annotations are returned as a MapStr, which is
// empty when the namespace has none.
func PodNamespaceAnnotations(pod *Pod, watcher Watcher) common.MapStr {
	if watcher == nil {
		return nil
	}

	stored, found, err := watcher.Store().GetByKey(pod.Namespace)
	if err != nil || !found {
		return nil
	}

	ns, isNamespace := stored.(*Namespace)
	if !isNamespace {
		return nil
	}

	result := common.MapStr{}
	for key, value := range ns.GetAnnotations() {
		safemapstr.Put(result, key, value)
	}
	return result
}
// PodTerminating returns true if a pod is marked for deletion (it has a
// deletion timestamp) or if its phase is anything other than running or
// pending.
func PodTerminating(pod *Pod) bool {
	if pod.GetObjectMeta().GetDeletionTimestamp() != nil {
		return true
	}
	phase := pod.Status.Phase
	return phase != PodRunning && phase != PodPending
}
// PodTerminated returns true if a pod is terminated. A pod counts as
// terminated when it is terminating (see PodTerminating) and none of the given
// containers reports a running state.
func PodTerminated(pod *Pod, containers []*ContainerInPod) bool {
	// A pod that is not even terminating cannot be terminated.
	if !PodTerminating(pod) {
		return false
	}

	// A single running container keeps the pod alive.
	for i := range containers {
		if containers[i].Status.State.Running != nil {
			return false
		}
	}
	return true
}