/
k8sapi.go
125 lines (108 loc) · 3.16 KB
/
k8sapi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package cache
import (
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
clog "github.com/coredns/coredns/plugin/pkg/log"
)
// k8sAPI bundles a client-side cache of Kubernetes objects with the settings
// used to reach the Kubernetes API server.
type k8sAPI struct {
	// store is the local cache filled by reflector; reflectorChan is the
	// stop channel handed to reflector.Run.
	store         kcache.Store
	reflector     *kcache.Reflector
	reflectorChan chan struct{}

	// APIServerList holds the API server endpoints (copied from the
	// Kubernetes plugin). When it is empty getClientConfig uses the
	// in-cluster configuration; otherwise only the first entry is used.
	APIServerList []string

	// Certificate authority, client certificate and client key settings that
	// getClientConfig copies into the clientcmd overrides.
	APICertAuth, APIClientCert, APIClientKey string
}
// newK8sAPI creates a Kubernetes client and starts a reflector that mirrors
// every pod, in all namespaces, that carries the label
// "k8s-cache.coredns.io/early-refresh=true" into k.store, resyncing every
// 10 seconds.
//
// The k8sAPI is allocated here and the client is built immediately, so its
// API* fields are still empty at that point and getClientConfig takes its
// in-cluster branch.
//
// The reflector runs in its own goroutine with k.reflectorChan as its stop
// channel. Nothing in this function closes that channel, so stopping the
// reflector is up to whoever owns the returned value.
//
// On failure it returns a nil *k8sAPI together with the error, rather than a
// half-initialised value whose store and reflector are nil.
func newK8sAPI() (*k8sAPI, error) {
	// Only pods matching this label selector are listed and watched.
	const earlyRefreshSelector = "k8s-cache.coredns.io/early-refresh=true"

	k := new(k8sAPI)
	clientset, err := k.getKubernetesClient()
	if err != nil {
		return nil, err
	}

	optionsModifier := func(options *metav1.ListOptions) {
		options.LabelSelector = earlyRefreshSelector
	}
	lw := kcache.NewFilteredListWatchFromClient(
		clientset.CoreV1().RESTClient(),
		"pods",
		metav1.NamespaceAll,
		optionsModifier,
	)

	k.store, k.reflector = kcache.NewNamespaceKeyedIndexerAndReflector(lw, &v1.Pod{}, 10*time.Second)
	k.reflectorChan = make(chan struct{})
	go k.reflector.Run(k.reflectorChan)

	return k, nil
}
// getKubernetesClient builds a Kubernetes clientset from the REST
// configuration produced by getClientConfig. An error from either step is
// passed back to the caller unchanged.
func (k *k8sAPI) getKubernetesClient() (*kubernetes.Clientset, error) {
	restConfig, err := k.getClientConfig()
	if err != nil {
		return nil, err
	}
	// NewForConfig already yields a nil clientset alongside any error, so its
	// result can be returned as is.
	return kubernetes.NewForConfig(restConfig)
}
// getClientConfig returns the REST configuration used to talk to the
// Kubernetes API, with the content type set to Kubernetes protobuf.
//
// When APIServerList is empty the in-cluster configuration is used. Otherwise
// a configuration is assembled for the first server in the list (any further
// entries are ignored) from APICertAuth, APIClientCert and APIClientKey.
//
// Copied from the getClientConfig method of the kubernetes plugin.
func (k *k8sAPI) getClientConfig() (*rest.Config, error) {
	const protobufContentType = "application/vnd.kubernetes.protobuf"

	// Connect to API from in cluster
	if len(k.APIServerList) == 0 {
		inCluster, err := rest.InClusterConfig()
		if err != nil {
			return nil, err
		}
		inCluster.ContentType = protobufContentType
		return inCluster, nil
	}

	// Connect to API from out of cluster
	// Only the first one is used. We will deprecate multiple endpoints later.
	//
	// An empty credential field equals the zero value of the override it is
	// copied into, so assigning all of them unconditionally is the same as
	// assigning only the non-empty ones.
	overrides := &clientcmd.ConfigOverrides{
		ClusterInfo: clientcmdapi.Cluster{
			Server:               k.APIServerList[0],
			CertificateAuthority: k.APICertAuth,
		},
		AuthInfo: clientcmdapi.AuthInfo{
			ClientCertificate: k.APIClientCert,
			ClientKey:         k.APIClientKey,
		},
	}
	loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		&clientcmd.ClientConfigLoadingRules{},
		overrides,
	)

	outOfCluster, err := loader.ClientConfig()
	if err != nil {
		return nil, err
	}
	outOfCluster.ContentType = protobufContentType
	return outOfCluster, nil
}
// getEarlyRefreshIPs returns every IP address of every pod currently held in
// k.store, i.e. the pods selected by k.reflector that should receive early
// cache refreshes.
//
// If the store contains an item that is not a *v1.Pod, an error is logged and
// nil is returned, discarding any addresses collected up to that point.
func (k *k8sAPI) getEarlyRefreshIPs() []string {
	cached := k.store.List()
	addrs := make([]string, 0, len(cached))

	for _, obj := range cached {
		pod, isPod := obj.(*v1.Pod)
		if !isPod {
			logger := clog.NewWithPlugin("k8s_cache")
			logger.Errorf("Cache item is not a *v1.Pod")
			return nil
		}
		for _, podIP := range pod.Status.PodIPs {
			addrs = append(addrs, podIP.IP)
		}
	}

	return addrs
}