-
Notifications
You must be signed in to change notification settings - Fork 362
/
antrea_ipam.go
364 lines (317 loc) · 11.4 KB
/
antrea_ipam.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
// Copyright 2021 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ipam
import (
"fmt"
"net"
"sync"
"time"
"github.com/containernetworking/cni/pkg/invoke"
cnitypes "github.com/containernetworking/cni/pkg/types"
current "github.com/containernetworking/cni/pkg/types/100"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"antrea.io/antrea/pkg/agent/cniserver/types"
crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
"antrea.io/antrea/pkg/ipam/poolallocator"
)
const (
// AntreaIPAMType is the IPAM type name under which the drivers in this
// file are registered by init.
AntreaIPAMType = "antrea"
)
// AntreaIPAM is the Antrea IPAM driver. It allocates IP addresses according
// to the IPAM annotation of an object, if present. If the annotation is not
// present, the driver delegates the request to the traditional IPAM driver.
type AntreaIPAM struct {
// controller stays nil until setController is called;
// waitForControllerReady polls for it to become non-nil.
controller *AntreaIPAMController
// controllerMutex is taken by setController (write lock) and by
// waitForControllerReady (read lock) around their accesses to controller.
controllerMutex sync.RWMutex
}
// antreaIPAMDriver is a global variable needed to work around the order of
// initialization: the driver is created and registered in init, and the
// controller is assigned to the driver after it is initialized by agent init.
var antreaIPAMDriver *AntreaIPAM
// mineType tells whether an IPAM request is owned by the Antrea IPAM driver.
// It is the first value returned by owns.
type mineType uint8
const (
// mineUnknown is the zero value; per the owns comment it is returned
// together with an error (Pod not found, invalid IP annotation).
mineUnknown mineType = iota
// mineFalse means the request is not owned by this driver; Add and Check
// then pass the request to the next driver.
mineFalse
// mineTrue means the request is owned by this driver.
mineTrue
)
// getAllocationOwner builds the IPAddressOwner recorded for an allocation.
//
// The owner must be unique because it is used as the identifier in Del, so
// the container ID is included; the Pod name and Namespace are there for
// visibility. When secondary is true the interface name is recorded as well,
// to uniquely identify the secondary network interface. When reservedOwner
// is non-nil, the result is a copy of it with the Pod field replaced.
func getAllocationOwner(args *invoke.Args, k8sArgs *types.K8sArgs, reservedOwner *crdv1a2.IPAddressOwner, secondary bool) crdv1a2.IPAddressOwner {
	var owner crdv1a2.IPAddressOwner
	if reservedOwner != nil {
		// Start from a copy so the caller's reserved owner is not modified.
		owner = *reservedOwner
	}

	owner.Pod = &crdv1a2.PodOwner{
		Name:        string(k8sArgs.K8S_POD_NAME),
		Namespace:   string(k8sArgs.K8S_POD_NAMESPACE),
		ContainerID: args.ContainerID,
	}
	if secondary {
		owner.Pod.IFName = args.IfName
	}
	return owner
}
// generateIPConfig returns the CNI IP configuration for ip/prefixLength with
// gateway gwIP, together with a default route via gwIP. The address family
// (and therefore the mask width and the all-zero destination) is chosen from
// ip: anything that is not an IPv4 address is treated as IPv6.
func generateIPConfig(ip net.IP, prefixLength int, gwIP net.IP) (*current.IPConfig, *cnitypes.Route) {
	bits, anyAddr := 32, "0.0.0.0"
	if ip.To4() == nil {
		bits, anyAddr = 128, "::0"
	}

	route := &cnitypes.Route{
		Dst: net.IPNet{
			IP:   net.ParseIP(anyAddr),
			Mask: net.CIDRMask(0, bits),
		},
		GW: gwIP,
	}
	config := &current.IPConfig{
		Address: net.IPNet{IP: ip, Mask: net.CIDRMask(prefixLength, bits)},
		Gateway: gwIP,
	}
	return config, route
}
// parseStaticAddresses parses the CIDR string of every static address in
// ipamConfig and stores the result back into the same entry: IPNet holds the
// parsed network with its IP set to the address itself (not the masked
// network address), and Version is "4" or "6". It returns an error for the
// first address that is not valid CIDR notation.
func parseStaticAddresses(ipamConfig *types.IPAMConfig) error {
	for idx, entry := range ipamConfig.Addresses {
		hostIP, network, err := net.ParseCIDR(entry.Address)
		if err != nil {
			return fmt.Errorf("invalid address %s", entry.Address)
		}

		// Keep the host address rather than the network base address.
		network.IP = hostIP
		ipamConfig.Addresses[idx].IPNet = *network

		if hostIP.To4() == nil {
			ipamConfig.Addresses[idx].Version = "6"
		} else {
			ipamConfig.Addresses[idx].Version = "4"
		}
	}
	return nil
}
// setController stores controller on the driver while holding the write lock.
func (d *AntreaIPAM) setController(controller *AntreaIPAMController) {
	d.controllerMutex.Lock()
	d.controller = controller
	d.controllerMutex.Unlock()
}
// Add allocates an IP address from the IP Pool associated with the request.
// The allocated IP and the owning resource are stored in the IP Pool status.
//
// The first return value reports whether this driver owns the request; when
// it is false the request should be passed to the next driver. Depending on
// what owns returns, the IP is the reserved one (or the next available), the
// next available one, or the first explicitly requested IP.
func (d *AntreaIPAM) Add(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig []byte) (bool, *IPAMResult, error) {
	mine, allocator, ips, reservedOwner, err := d.owns(k8sArgs)
	switch {
	case err != nil:
		return true, nil, err
	case mine == mineFalse:
		// Not ours: pass this request to the next driver.
		return false, nil, nil
	}

	owner := getAllocationOwner(args, k8sArgs, reservedOwner, false)
	var (
		ip         net.IP
		subnetInfo *crdv1a2.SubnetInfo
	)
	switch {
	case reservedOwner != nil:
		ip, subnetInfo, err = allocator.AllocateReservedOrNext(crdv1a2.IPAddressPhaseAllocated, owner)
	case len(ips) == 0:
		ip, subnetInfo, err = allocator.AllocateNext(crdv1a2.IPAddressPhaseAllocated, owner)
	default:
		// Only the first requested IP is used.
		ip = ips[0]
		subnetInfo, err = allocator.AllocateIP(ip, crdv1a2.IPAddressPhaseAllocated, owner)
	}
	if err != nil {
		return true, nil, err
	}
	klog.V(4).InfoS("IP allocation successful", "IP", ip.String(), "Pod", string(k8sArgs.K8S_POD_NAME))

	ipConfig, defaultRoute := generateIPConfig(ip, int(subnetInfo.PrefixLength), net.ParseIP(subnetInfo.Gateway))
	result := &IPAMResult{
		Result: current.Result{CNIVersion: current.ImplementedSpecVersion},
		VLANID: subnetInfo.VLAN,
	}
	result.Routes = append(result.Routes, defaultRoute)
	result.IPs = append(result.IPs, ipConfig)
	return true, result, nil
}
// Del releases the IP associated with the resource from the IP Pool status.
// The returned bool is true when an allocation was found or when an error
// occurred (so the invoker retries); when it is false and the error is nil,
// no allocation was found and CNI DEL should be passed to the next driver.
func (d *AntreaIPAM) Del(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig []byte) (bool, error) {
	podOwner := getAllocationOwner(args, k8sArgs, nil, false).Pod
	found, err := d.del(podOwner)
	if err == nil {
		return found, nil
	}
	// Let the invoker retry on error.
	return true, err
}
// Check verifies that an IP associated with the container is tracked in the
// IP Pool status. The returned bool is false only when the request is not
// owned by this driver and should be passed to the next driver.
func (d *AntreaIPAM) Check(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig []byte) (bool, error) {
	mine, allocator, _, _, err := d.owns(k8sArgs)
	switch {
	case err != nil:
		return true, err
	case mine == mineFalse:
		// Not ours: pass this request to the next driver.
		return false, nil
	}

	containerIP, lookupErr := allocator.GetContainerIP(args.ContainerID, "")
	if lookupErr != nil {
		return true, lookupErr
	}
	if containerIP != nil {
		return true, nil
	}
	// NOTE(review): the message says "container" but is formatted with the Pod name.
	return true, fmt.Errorf("no IP Address association found for container %s", string(k8sArgs.K8S_POD_NAME))
}
func (d *AntreaIPAM) secondaryNetworkAdd(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) (*current.Result, error) {
ipamConf := networkConfig.IPAM
numPools := len(ipamConf.IPPools)
if err := parseStaticAddresses(ipamConf); err != nil {
return nil, fmt.Errorf("failed to parse static addresses in the IPAM config: %v", err)
}
if numPools == 0 && len(ipamConf.Addresses) == 0 {
return nil, fmt.Errorf("at least one Antrea IPPool or static address must be specified")
}
result := ¤t.Result{}
if numPools > 0 {
if err := d.waitForControllerReady(); err != nil {
// Return error to let the invoker retry.
return nil, err
}
owner := getAllocationOwner(args, k8sArgs, nil, true)
var allocatorsToRelease []*poolallocator.IPPoolAllocator
defer func() {
for _, allocator := range allocatorsToRelease {
// Try to release the allocated IPs after an error.
allocator.ReleaseContainer(owner.Pod.ContainerID, owner.Pod.IFName)
}
}()
for _, p := range ipamConf.IPPools {
allocator, err := d.controller.getPoolAllocatorByName(p)
if err != nil {
return nil, err
}
var ip net.IP
var subnetInfo *crdv1a2.SubnetInfo
ip, subnetInfo, err = allocator.AllocateNext(crdv1a2.IPAddressPhaseAllocated, owner)
if err != nil {
return nil, err
}
if numPools > 1 {
allocatorsToRelease = append(allocatorsToRelease, allocator)
}
gwIP := net.ParseIP(subnetInfo.Gateway)
ipConfig, _ := generateIPConfig(ip, int(subnetInfo.PrefixLength), gwIP)
// CNI spec 0.2.0 and below support only one v4 and one v6 address. But we
// assume the CNI version >= 0.3.0, and so do not check the number of
// addresses.
result.IPs = append(result.IPs, ipConfig)
}
// No failed allocation, so do not release allocated IPs.
allocatorsToRelease = nil
}
// Add static addresses.
for _, a := range ipamConf.Addresses {
result.IPs = append(result.IPs, ¤t.IPConfig{
Address: a.IPNet,
Gateway: a.Gateway})
}
// Copy routes and DNS from the input IPAM configuration.
result.Routes = ipamConf.Routes
result.DNS = ipamConf.DNS
return result, nil
}
// secondaryNetworkDel releases the IPs allocated to a secondary network
// interface. The owner is identified by container ID and interface name;
// whether an allocation was actually found is not reported to the caller.
func (d *AntreaIPAM) secondaryNetworkDel(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) error {
	podOwner := getAllocationOwner(args, k8sArgs, nil, true).Pod
	if _, err := d.del(podOwner); err != nil {
		return err
	}
	return nil
}
// secondaryNetworkCheck always returns an error: CNI CHECK is not implemented
// for secondary networks.
func (d *AntreaIPAM) secondaryNetworkCheck(args *invoke.Args, k8sArgs *types.K8sArgs, networkConfig *types.NetworkConfig) error {
	const reason = "CNI CHECK is not implemented for secondary network"
	return fmt.Errorf(reason)
}
// del releases every IP allocated to podOwner. foundAllocation is false when
// the controller is not ready, the allocator lookup fails, or no IPPool holds
// an allocation for the owner; it is true once at least one matching
// allocator was found, even if releasing from it fails.
func (d *AntreaIPAM) del(podOwner *crdv1a2.PodOwner) (foundAllocation bool, err error) {
	if err = d.waitForControllerReady(); err != nil {
		// Return error to let the invoker retry.
		return false, err
	}

	// The Pod resource might already be gone, and for a secondary network we
	// would otherwise depend on the IPPool passed in CNI DEL. So look up the
	// IPPools by the matching PodOwner instead.
	allocators, err := d.controller.getPoolAllocatorsByOwner(podOwner)
	if err != nil || len(allocators) == 0 {
		return false, err
	}

	// More than one allocator is returned when the network interface has IPs
	// allocated from several IPPools.
	for _, allocator := range allocators {
		if err = allocator.ReleaseContainer(podOwner.ContainerID, podOwner.IFName); err != nil {
			return true, err
		}
	}
	return true, nil
}
// owns reports whether this driver owns the incoming IPAM request. The
// decision is based on the Antrea IPAM annotation of the resource (only the
// Namespace annotation is supported as of today). If the annotation is not
// present, or the annotated IP Pool is not found, the driver does not own the
// request and it falls back to the next IPAM driver.
// Possible results:
//
//	mineUnknown + PodNotFound error
//	mineUnknown + InvalidIPAnnotation error
//	mineFalse + nil error
//	mineTrue + timeout error
//	mineTrue + IPPoolNotFound error
//	mineTrue + nil error
func (d *AntreaIPAM) owns(k8sArgs *types.K8sArgs) (mineType, *poolallocator.IPPoolAllocator, []net.IP, *crdv1a2.IPAddressOwner, error) {
	// Wait for the controller to avoid inappropriate behavior on a CNI request.
	if readyErr := d.waitForControllerReady(); readyErr != nil {
		// mineTrue makes this request fail, so that kubelet retries it.
		return mineTrue, nil, nil, nil, readyErr
	}

	// As of today, only the Namespace annotation is supported. In the future,
	// Deployment, StatefulSet and Pod annotations will be supported as well.
	podNamespace, podName := string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)
	klog.V(2).InfoS("Inspecting IPAM annotation", "Namespace", podNamespace, "Pod", podName)
	return d.controller.getPoolAllocatorByPod(podNamespace, podName)
}
// waitForControllerReady blocks until the controller has been assigned to the
// driver. It checks immediately and then every 500ms, for at most 5 seconds.
// On failure the returned error wraps the polling error, so callers can
// inspect the cause with errors.Is / errors.As.
func (d *AntreaIPAM) waitForControllerReady() error {
	err := wait.PollImmediate(500*time.Millisecond, 5*time.Second, func() (bool, error) {
		d.controllerMutex.RLock()
		defer d.controllerMutex.RUnlock()
		if d.controller == nil {
			klog.Warningf("Antrea IPAM driver is not ready.")
			return false, nil
		}
		return true, nil
	})
	if err != nil {
		return fmt.Errorf("Antrea IPAM driver not ready: %w", err)
	}
	return nil
}
// init creates the global Antrea IPAM driver and registers it, followed by
// the host-local delegator, under AntreaIPAMType. The order of the two
// registrations is significant.
func init() {
// Antrea driver must come first.
// NOTE: this is a global variable that requires follow-up setup post agent initialization.
antreaIPAMDriver = &AntreaIPAM{}
RegisterIPAMDriver(AntreaIPAMType, antreaIPAMDriver)
// Host local plugin is the fallback driver. It is registered under the same
// type; presumably RegisterIPAMDriver keeps an ordered list of drivers per
// type rather than overwriting — verify against its definition.
RegisterIPAMDriver(AntreaIPAMType, &IPAMDelegator{pluginType: ipamHostLocal})
}