-
Notifications
You must be signed in to change notification settings - Fork 367
/
interface_cache.go
298 lines (266 loc) · 10 KB
/
interface_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
// Copyright 2019 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package interfacestore
import (
"fmt"
"sync"
"k8s.io/client-go/tools/cache"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/util/k8s"
)
// Names of the indexes registered on the interface cache's Indexer.
const (
	// interfaceNameIndex is the index built with InterfaceConfig.InterfaceName.
	interfaceNameIndex = "interfaceName"
	// interfaceTypeIndex is the index built with InterfaceConfig.Type.
	interfaceTypeIndex = "interfaceType"
	// containerIDIndex is the index built with InterfaceConfig.ContainerID.
	// Only container interfaces will be indexed.
	// One containerID should get at most one interface in theory.
	containerIDIndex = "containerID"
	// podIndex is the index built with InterfaceConfig.PodNamespace + PodName.
	// Only container interfaces will be indexed.
	// One Pod may get more than one interface.
	podIndex = "pod"
	// interfaceIPIndex is the index built with InterfaceConfig.IPs; an interface
	// is indexed under each of its IPs.
	// Only the interfaces with IP get indexed.
	interfaceIPIndex = "ip"
	// ofPortIndex is the index built with InterfaceConfig.OFPort.
	ofPortIndex = "ofPort"
)
// interfaceCache is the local cache for interfaces created on node, including container, host
// gateway, and tunnel ports; the `Type` field is used to differentiate interface category.
// 1) For container interface, the fields should include: containerID, podName, Namespace,
// IP, MAC, and OVS Port configurations.
// 2) For host gateway port, the fields should include: name, IP, MAC, and OVS port
// configurations.
// 3) For tunnel port, the fields include: name and tunnel type; and for an IPsec tunnel,
// additionally: remoteIP, PSK and remote Node name.
// OVS Port configurations include PortUUID and OFPort.
// Container interface is added into cache after invocation of cniserver.CmdAdd, and removed
// from cache after invocation of cniserver.CmdDel. For cniserver.CmdCheck, the server would
// check previousResult with local cache.
// Host gateway and the default tunnel interfaces are added into cache in node initialization
// phase or retrieved from existing OVS ports.
// An IPsec tunnel interface is added into the cache when IPsec encryption is enabled and
// NodeRouteController watches a new remote Node from K8s API, and is removed when the remote
// Node is deleted.
// TODO: add periodic task to sync local cache with container veth pair.
type interfaceCache struct {
	// RWMutex guards cache: AddInterface and DeleteInterface take the write
	// lock, the getters take the read lock.
	sync.RWMutex
	// cache stores *InterfaceConfig objects keyed by getInterfaceKey and
	// indexed by interface name, type, container ID, Pod, IP and OFPort.
	cache cache.Indexer
}
// Initialize seeds the cache with the given interfaces and increments the Pod
// count metric once for every container interface in the list.
// It does not acquire c's mutex.
func (c *interfaceCache) Initialize(interfaces []*InterfaceConfig) {
	for i := range interfaces {
		cfg := interfaces[i]
		c.cache.Add(cfg)
		if cfg.Type != ContainerInterface {
			// Only container interfaces contribute to the Pod count.
			continue
		}
		metrics.PodCount.Inc()
	}
}
// getInterfaceKey computes the cache key of an *InterfaceConfig. It implements
// cache.KeyFunc.
//
// Container interfaces are keyed by their container ID, tunnel interfaces bound
// to a Node are keyed by the Node name, and every other interface is keyed by
// its interface name. The returned error is always nil; obj must be an
// *InterfaceConfig, otherwise the type assertion panics.
func getInterfaceKey(obj interface{}) (string, error) {
	cfg := obj.(*InterfaceConfig)
	switch {
	case cfg.Type == ContainerInterface:
		return util.GenerateContainerInterfaceKey(cfg.ContainerID), nil
	case cfg.Type == TunnelInterface && cfg.NodeName != "":
		// Tunnel interface for a Node.
		return util.GenerateNodeTunnelInterfaceKey(cfg.NodeName), nil
	default:
		// Fall back to the interface name.
		return cfg.InterfaceName, nil
	}
}
// AddInterface adds interfaceConfig into local cache, replacing any entry that
// is already stored under the same key.
//
// The Pod count metric is incremented only when a container interface is stored
// under a key that was not present before, so re-adding (updating) an existing
// container interface does not inflate the metric.
func (c *interfaceCache) AddInterface(interfaceConfig *InterfaceConfig) {
	c.Lock()
	defer c.Unlock()

	// Get derives the key with the store's own key function, i.e. the same key
	// Add is about to write to. A lookup error is treated as "not present".
	_, existed, _ := c.cache.Get(interfaceConfig)
	if err := c.cache.Add(interfaceConfig); err != nil {
		// Nothing was stored, so there is nothing to count.
		return
	}
	if !existed && interfaceConfig.Type == ContainerInterface {
		metrics.PodCount.Inc()
	}
}
// DeleteInterface deletes interface from local cache.
//
// The Pod count metric is decremented only when a container interface was
// actually present in the cache and has been removed, so deleting an unknown
// interface (or deleting the same one twice) does not push the metric below
// the real number of cached container interfaces.
func (c *interfaceCache) DeleteInterface(interfaceConfig *InterfaceConfig) {
	c.Lock()
	defer c.Unlock()

	// Get derives the key with the store's own key function, i.e. the same key
	// Delete is about to remove. A lookup error is treated as "not present".
	_, existed, _ := c.cache.Get(interfaceConfig)
	err := c.cache.Delete(interfaceConfig)
	if err == nil && existed && interfaceConfig.Type == ContainerInterface {
		metrics.PodCount.Dec()
	}
}
// GetInterface looks up an interface in the local cache by its cache key.
// The boolean result reports whether an entry was found; the lookup error from
// the underlying store is ignored.
func (c *interfaceCache) GetInterface(interfaceKey string) (*InterfaceConfig, bool) {
	c.RLock()
	defer c.RUnlock()
	if obj, ok, _ := c.cache.GetByKey(interfaceKey); ok {
		return obj.(*InterfaceConfig), true
	}
	return nil, false
}
// GetInterfaceByName looks up an interface in the local cache by its interface
// name. When several entries share the name, the first one returned by the
// name index is used. The boolean result reports whether a match exists.
func (c *interfaceCache) GetInterfaceByName(interfaceName string) (*InterfaceConfig, bool) {
	c.RLock()
	defer c.RUnlock()
	matches, _ := c.cache.ByIndex(interfaceNameIndex, interfaceName)
	if len(matches) > 0 {
		return matches[0].(*InterfaceConfig), true
	}
	return nil, false
}
// GetInterfaceByIP looks up an interface in the local cache by one of its IPs,
// given in string form. The first entry returned by the IP index is used; the
// boolean result reports whether a match exists.
func (c *interfaceCache) GetInterfaceByIP(interfaceIP string) (*InterfaceConfig, bool) {
	c.RLock()
	defer c.RUnlock()
	matches, _ := c.cache.ByIndex(interfaceIPIndex, interfaceIP)
	if len(matches) > 0 {
		return matches[0].(*InterfaceConfig), true
	}
	return nil, false
}
// GetContainerInterfaceNum returns the number of container interfaces currently
// held in the cache, counted through the interface type index.
func (c *interfaceCache) GetContainerInterfaceNum() int {
	c.RLock()
	defer c.RUnlock()
	containerKeys, _ := c.cache.IndexKeys(interfaceTypeIndex, ContainerInterface.String())
	count := len(containerKeys)
	return count
}
// GetInterfacesByType returns every cached interface of the given type. The
// result is a non-nil slice, empty when no interface matches.
func (c *interfaceCache) GetInterfacesByType(interfaceType InterfaceType) []*InterfaceConfig {
	c.RLock()
	defer c.RUnlock()
	matches, _ := c.cache.ByIndex(interfaceTypeIndex, interfaceType.String())
	result := make([]*InterfaceConfig, 0, len(matches))
	for _, obj := range matches {
		result = append(result, obj.(*InterfaceConfig))
	}
	return result
}
// Len returns the total number of interfaces held in the cache.
func (c *interfaceCache) Len() int {
	c.RLock()
	defer c.RUnlock()
	allKeys := c.cache.ListKeys()
	return len(allKeys)
}
// GetInterfaceKeysByType returns the cache keys of every interface of the given
// type, as reported by the interface type index. The index lookup error is
// ignored.
func (c *interfaceCache) GetInterfaceKeysByType(interfaceType InterfaceType) []string {
	c.RLock()
	defer c.RUnlock()
	typeName := interfaceType.String()
	matchingKeys, _ := c.cache.IndexKeys(interfaceTypeIndex, typeName)
	return matchingKeys
}
// GetContainerInterface looks up the InterfaceConfig indexed under the given
// container ID. The first entry returned by the container ID index is used;
// the boolean result reports whether a match exists.
func (c *interfaceCache) GetContainerInterface(containerID string) (*InterfaceConfig, bool) {
	c.RLock()
	defer c.RUnlock()
	matches, _ := c.cache.ByIndex(containerIDIndex, containerID)
	if len(matches) > 0 {
		return matches[0].(*InterfaceConfig), true
	}
	return nil, false
}
// GetInterfacesByEntity returns the interfaces of the entity identified by name
// and namespace. In this implementation the entity is a Pod, so the call is
// forwarded to GetContainerInterfacesByPod.
func (c *interfaceCache) GetInterfacesByEntity(name, namespace string) []*InterfaceConfig {
	podInterfaces := c.GetContainerInterfacesByPod(name, namespace)
	return podInterfaces
}
// GetContainerInterfacesByPod returns the InterfaceConfigs indexed under the
// given Pod namespace and name. The result is a non-nil slice, empty when the
// Pod has no cached interface.
// More than one container interface (with different containerIDs) can share the
// same Pod namespace and name temporarily, when the previous Pod is being
// deleted and the new Pod is being created almost simultaneously.
// https://github.com/antrea-io/antrea/issues/785#issuecomment-642051884
func (c *interfaceCache) GetContainerInterfacesByPod(podName string, podNamespace string) []*InterfaceConfig {
	c.RLock()
	defer c.RUnlock()
	matches, _ := c.cache.ByIndex(podIndex, k8s.NamespacedName(podNamespace, podName))
	result := make([]*InterfaceConfig, 0, len(matches))
	for _, obj := range matches {
		result = append(result, obj.(*InterfaceConfig))
	}
	return result
}
// GetNodeTunnelInterface looks up the InterfaceConfig of the tunnel to the
// given Node, using the Node tunnel key derived from nodeName. The boolean
// result reports whether an entry was found.
func (c *interfaceCache) GetNodeTunnelInterface(nodeName string) (*InterfaceConfig, bool) {
	tunnelKey := util.GenerateNodeTunnelInterfaceKey(nodeName)
	c.RLock()
	defer c.RUnlock()
	if obj, found, _ := c.cache.GetByKey(tunnelKey); found {
		return obj.(*InterfaceConfig), true
	}
	return nil, false
}
// GetInterfaceByOFPort looks up the InterfaceConfig indexed under the given
// ofPort number. The OFPort index is keyed by the decimal string form of the
// port. The first entry returned by the index is used; the boolean result
// reports whether a match exists.
func (c *interfaceCache) GetInterfaceByOFPort(ofPort uint32) (*InterfaceConfig, bool) {
	c.RLock()
	defer c.RUnlock()
	matches, _ := c.cache.ByIndex(ofPortIndex, fmt.Sprintf("%d", ofPort))
	if len(matches) > 0 {
		return matches[0].(*InterfaceConfig), true
	}
	return nil, false
}
// interfaceNameIndexFunc is the index function for interfaceNameIndex: it
// indexes an *InterfaceConfig under its InterfaceName.
func interfaceNameIndexFunc(obj interface{}) ([]string, error) {
	cfg := obj.(*InterfaceConfig)
	names := []string{cfg.InterfaceName}
	return names, nil
}
// interfaceTypeIndexFunc is the index function for interfaceTypeIndex: it
// indexes an *InterfaceConfig under the string form of its Type.
func interfaceTypeIndexFunc(obj interface{}) ([]string, error) {
	cfg := obj.(*InterfaceConfig)
	typeName := cfg.Type.String()
	return []string{typeName}, nil
}
// containerIDIndexFunc is the index function for containerIDIndex: it indexes a
// container interface under its ContainerID. Interfaces of any other type
// produce no index entry.
func containerIDIndexFunc(obj interface{}) ([]string, error) {
	cfg := obj.(*InterfaceConfig)
	if cfg.Type == ContainerInterface {
		return []string{cfg.ContainerID}, nil
	}
	return []string{}, nil
}
// podIndexFunc is the index function for podIndex: it indexes a container
// interface under the namespaced name built from its PodNamespace and PodName.
// Interfaces of any other type produce no index entry.
func podIndexFunc(obj interface{}) ([]string, error) {
	cfg := obj.(*InterfaceConfig)
	if cfg.Type == ContainerInterface {
		podKey := k8s.NamespacedName(cfg.PodNamespace, cfg.PodName)
		return []string{podKey}, nil
	}
	return []string{}, nil
}
// interfaceIPIndexFunc is the index function for interfaceIPIndex: it indexes
// an *InterfaceConfig under the string form of each of its IPs. An interface
// whose IPs field is nil produces no index entry.
func interfaceIPIndexFunc(obj interface{}) ([]string, error) {
	cfg := obj.(*InterfaceConfig)
	if cfg.IPs == nil {
		// No IP configured: nothing to index.
		return []string{}, nil
	}
	var keys []string
	for _, addr := range cfg.IPs {
		keys = append(keys, addr.String())
	}
	return keys, nil
}
// interfaceOFPortIndexFunc is the index function for ofPortIndex: it indexes an
// *InterfaceConfig under the decimal string form of its OFPort. A negative
// OFPort is treated as invalid and produces no index entry.
func interfaceOFPortIndexFunc(obj interface{}) ([]string, error) {
	cfg := obj.(*InterfaceConfig)
	if cfg.OFPort >= 0 {
		return []string{fmt.Sprintf("%d", cfg.OFPort)}, nil
	}
	// Invalid OFPort: nothing to index.
	return []string{}, nil
}
// NewInterfaceStore returns an empty InterfaceStore backed by an interfaceCache.
// Entries are keyed by getInterfaceKey and indexed by interface name, interface
// type, container ID, Pod, IP and OFPort.
func NewInterfaceStore() InterfaceStore {
	indexers := cache.Indexers{
		interfaceNameIndex: interfaceNameIndexFunc,
		interfaceTypeIndex: interfaceTypeIndexFunc,
		containerIDIndex:   containerIDIndexFunc,
		podIndex:           podIndexFunc,
		interfaceIPIndex:   interfaceIPIndexFunc,
		ofPortIndex:        interfaceOFPortIndexFunc,
	}
	store := &interfaceCache{
		cache: cache.NewIndexer(getInterfaceKey, indexers),
	}
	return store
}