forked from cisco-app-networking/networkservicemesh
/
collector.go
161 lines (138 loc) · 4.09 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package prefixcollector
import (
// "context"
"os"
"strings"
// "github.com/pkg/errors"
"github.com/sirupsen/logrus"
// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes"
//"k8s.io/api/apps/v1beta2"
"cisco-app-networking.github.io/networkservicemesh/sdk/prefix_pool"
)
// ExcludedPrefixesEnv names the environment variable that carries the
// comma-separated list of prefixes to exclude.
const ExcludedPrefixesEnv = "EXCLUDED_PREFIXES"
// getExcludedPrefixesChan returns a channel that delivers the prefix pool of
// excluded prefixes.
//
// Prefixes from the environment are always included. If the kubeadm-config
// lookup succeeds, its prefixes are added, a single pool is built and placed
// on a buffered channel, and any pool construction error is returned.
// If the lookup fails, a subnet monitor is started instead and its channel is
// returned with a nil error.
func getExcludedPrefixesChan(clientset *kubernetes.Clientset) (<-chan prefix_pool.PrefixPool, error) {
	envPrefixes := getExcludedPrefixesFromEnv()

	// Try kubeadm-config first; without it, fall back to watching the cluster.
	configMapPrefixes, lookupErr := getExcludedPrefixesFromConfigMap(clientset)
	if lookupErr != nil {
		return monitorSubnets(clientset, envPrefixes...), nil
	}

	allPrefixes := append(envPrefixes, configMapPrefixes...)
	pool, err := prefix_pool.NewPrefixPool(allPrefixes...)
	if err != nil {
		return nil, err
	}

	// Capacity 1 lets the single pool be queued before any reader exists.
	result := make(chan prefix_pool.PrefixPool, 1)
	result <- pool
	return result, nil
}
// getExcludedPrefixesFromEnv reads the comma-separated prefix list from the
// environment variable named by ExcludedPrefixesEnv.
//
// It returns an empty, non-nil slice when the variable is unset. Surrounding
// whitespace is trimmed from each entry and empty entries are dropped, so
// values such as "", "a,b," or "a, b" do not produce blank or padded prefixes
// (strings.Split alone would yield [""] for an empty value).
func getExcludedPrefixesFromEnv() []string {
	excludedPrefixesEnv, ok := os.LookupEnv(ExcludedPrefixesEnv)
	if !ok {
		return []string{}
	}
	logrus.Infof("Getting excludedPrefixes from ENV: %v", excludedPrefixesEnv)

	parts := strings.Split(excludedPrefixesEnv, ",")
	prefixes := make([]string, 0, len(parts))
	for _, part := range parts {
		// Skip blanks: callers pass the result straight to NewPrefixPool,
		// which presumably rejects entries that are not valid CIDRs.
		if prefix := strings.TrimSpace(part); prefix != "" {
			prefixes = append(prefixes, prefix)
		}
	}
	return prefixes
}
// getExcludedPrefixesFromConfigMap is currently a stub: it ignores clientset
// and always returns an empty, non-nil slice with a nil error.
//
// Because the error is always nil, getExcludedPrefixesChan always takes its
// config-map branch and never falls back to monitorSubnets.
//
// The original kubeadm-config lookup is retained below in a block comment for
// reference; it is not compiled.
func getExcludedPrefixesFromConfigMap(clientset *kubernetes.Clientset) ([]string, error) {
	noPrefixes := []string{}
	return noPrefixes, nil

	/* kubeadmConfig, err := clientset.CoreV1().
		ConfigMaps("kube-system").
		Get(context.TODO(), "kubeadm-config", metav1.GetOptions{})
	if err != nil {
		logrus.Error(err)
		return nil, err
	}
	clusterConfiguration := &v1beta2.ClusterConfiguration{}
	err = yaml.NewYAMLOrJSONDecoder(strings.NewReader(kubeadmConfig.Data["ClusterConfiguration"]), 4096).
		Decode(clusterConfiguration)
	if err != nil {
		return nil, err
	}
	podSubnet := clusterConfiguration.Networking.PodSubnet
	serviceSubnet := clusterConfiguration.Networking.ServiceSubnet
	if podSubnet == "" {
		return nil, errors.New("ClusterConfiguration.Networking.PodSubnet is empty")
	}
	if serviceSubnet == "" {
		return nil, errors.New("ClusterConfiguration.Networking.ServiceSubnet is empty")
	}
	return []string{
		podSubnet,
		serviceSubnet,
	}, nil
	*/
}
// monitorSubnets starts a background supervisor that keeps a
// monitorReservedSubnets worker running and returns the channel on which
// prefix pools are published.
//
// additionalPrefixes are forwarded to every worker. The returned channel has
// capacity 1. The supervisor goroutine loops forever; nothing in this function
// stops it.
//
// The worker reports failures on errCh but returns without sending anything
// when one of its watcher result channels is closed. errCh is therefore closed
// when the worker returns, so the supervisor wakes up and restarts it in that
// case too instead of blocking on the receive forever.
func monitorSubnets(clientset *kubernetes.Clientset, additionalPrefixes ...string) <-chan prefix_pool.PrefixPool {
	logrus.Infof("Start monitoring prefixes to exclude")
	poolCh := make(chan prefix_pool.PrefixPool, 1)

	go func() {
		for {
			errCh := make(chan error)
			go func() {
				// Signals worker exit even when no error was sent.
				defer close(errCh)
				monitorReservedSubnets(poolCh, errCh, clientset, additionalPrefixes)
			}()

			// NOTE(review): the worker is restarted immediately, with no
			// backoff; consider adding a delay if failures can be persistent.
			if err, ok := <-errCh; ok {
				logrus.Error(err)
			} else {
				logrus.Warn("Prefix monitoring stopped without an error, restarting")
			}
		}
	}()

	return poolCh
}
// monitorReservedSubnets watches the pod CIDR and service address sources and
// publishes a refreshed prefix pool on poolCh after every update from either.
//
// If a watcher cannot be created, the error is sent on errCh and the function
// returns. If either watcher's result channel is closed, the function returns
// without sending anything on errCh. Both watchers are stopped on return.
func monitorReservedSubnets(poolCh chan prefix_pool.PrefixPool, errCh chan<- error, clientset *kubernetes.Clientset, additionalPrefixes []string) {
	podWatcher, err := WatchPodCIDR(clientset)
	if err != nil {
		errCh <- err
		return
	}
	defer podWatcher.Stop()

	serviceWatcher, err := WatchServiceIpAddr(clientset)
	if err != nil {
		errCh <- err
		return
	}
	defer serviceWatcher.Stop()

	// Latest known subnets; each stays empty until its watcher reports one.
	podSubnet, serviceSubnet := "", ""
	for {
		select {
		case cidr, open := <-podWatcher.ResultChan():
			if !open {
				return
			}
			podSubnet = cidr.String()
		case cidr, open := <-serviceWatcher.ResultChan():
			if !open {
				return
			}
			serviceSubnet = cidr.String()
		}

		// Either subnet changed: rebuild and publish the pool.
		sendPrefixPool(poolCh, podSubnet, serviceSubnet, additionalPrefixes)
	}
}
// sendPrefixPool builds a prefix pool from the given subnets and additional
// prefixes and publishes it on poolCh.
//
// If the pool cannot be built, the error is logged and nothing is sent.
// Before sending, one pending value (if any) is removed from poolCh without
// blocking, so a previously queued pool is replaced by the new one.
func sendPrefixPool(poolCh chan prefix_pool.PrefixPool, podSubnet, serviceSubnet string, additionalPrefixes []string) {
	freshPool, buildErr := getPrefixPool(podSubnet, serviceSubnet, additionalPrefixes)
	if buildErr != nil {
		logrus.Errorf("Failed to create a prefix pool: %v", buildErr)
		return
	}

	// Non-blocking drain of a pool that has not been consumed yet.
	select {
	case <-poolCh:
	default:
	}

	poolCh <- freshPool
}
// getPrefixPool builds a prefix pool from additionalPrefixes followed by
// podSubnet and serviceSubnet; an empty subnet string is skipped.
//
// The prefixes are collected in a freshly allocated slice instead of being
// appended to additionalPrefixes directly, so the appends can never write into
// spare capacity of the caller's backing array.
//
// It returns a nil pool and the error reported by prefix_pool.NewPrefixPool if
// the pool cannot be created.
func getPrefixPool(podSubnet, serviceSubnet string, additionalPrefixes []string) (prefix_pool.PrefixPool, error) {
	// +2 leaves room for the pod and service subnets.
	prefixes := make([]string, 0, len(additionalPrefixes)+2)
	prefixes = append(prefixes, additionalPrefixes...)

	if len(podSubnet) > 0 {
		prefixes = append(prefixes, podSubnet)
	}
	if len(serviceSubnet) > 0 {
		prefixes = append(prefixes, serviceSubnet)
	}

	pool, err := prefix_pool.NewPrefixPool(prefixes...)
	if err != nil {
		return nil, err
	}
	return pool, nil
}