node_manager.go
// Copyright 2019-2020 Authors of Cilium
// Copyright 2017 Lyft, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ipam

import (
	"context"
	"fmt"
	"sort"
	"time"

	"github.com/cilium/cilium/pkg/controller"
	ipamTypes "github.com/cilium/cilium/pkg/ipam/types"
	v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
	"github.com/cilium/cilium/pkg/lock"
	"github.com/cilium/cilium/pkg/trigger"

	"github.com/sirupsen/logrus"
	"golang.org/x/sync/semaphore"
)

// CiliumNodeGetterUpdater defines the interface used to interact with the k8s
// apiserver to retrieve and update the CiliumNode custom resource
type CiliumNodeGetterUpdater interface {
	Create(node *v2.CiliumNode) (*v2.CiliumNode, error)
	Update(origResource, newResource *v2.CiliumNode) (*v2.CiliumNode, error)
	UpdateStatus(origResource, newResource *v2.CiliumNode) (*v2.CiliumNode, error)
	Get(name string) (*v2.CiliumNode, error)
	Delete(name string) error
}

// NodeOperations is the interface an IPAM implementation must implement in
// order to provide IP allocation for a node. The structure implementing this
// API *must* be aware of the node connected to this implementation. This is
// achieved by considering the node context provided to the
// AllocationImplementation.CreateNode() function and returning a
// NodeOperations implementation which performs operations in the context of
// that node.
type NodeOperations interface {
	// UpdatedNode is called when an update to the CiliumNode is received.
	UpdatedNode(obj *v2.CiliumNode)

	// PopulateStatusFields is called to give the implementation a chance
	// to populate any implementation specific fields in CiliumNode.Status.
	PopulateStatusFields(resource *v2.CiliumNode)

	// CreateInterface is called to create a new interface. This is only
	// done if PrepareIPAllocation indicates that no more IPs are available
	// for allocation (AllocationAction.AvailableForAllocation == 0) but
	// interfaces are available for creation
	// (AllocationAction.AvailableInterfaces > 0). This function must
	// create the interface *and* allocate up to
	// AllocationAction.MaxIPsToAllocate.
	CreateInterface(ctx context.Context, allocation *AllocationAction, scopedLog *logrus.Entry) (int, string, error)

	// ResyncInterfacesAndIPs is called to synchronize the latest list of
	// interfaces and IPs associated with the node. This function is called
	// sparingly as this information is kept in sync based on the success
	// of the functions AllocateIPs(), ReleaseIPs() and CreateInterface().
	ResyncInterfacesAndIPs(ctx context.Context, scopedLog *logrus.Entry) (ipamTypes.AllocationMap, error)

	// PrepareIPAllocation is called to calculate the number of IPs that
	// can be allocated on the node and whether a new network interface
	// must be attached to the node.
	PrepareIPAllocation(scopedLog *logrus.Entry) (*AllocationAction, error)

	// AllocateIPs is called after invoking PrepareIPAllocation and needs
	// to perform the actual allocation.
	AllocateIPs(ctx context.Context, allocation *AllocationAction) error

	// PrepareIPRelease is called to calculate whether any IP excess needs
	// to be resolved. It behaves identically to PrepareIPAllocation but
	// indicates a need to release IPs.
	PrepareIPRelease(excessIPs int, scopedLog *logrus.Entry) *ReleaseAction

	// ReleaseIPs is called after invoking PrepareIPRelease and needs to
	// perform the release of IPs.
	ReleaseIPs(ctx context.Context, release *ReleaseAction) error

	// GetMaximumAllocatableIPv4 returns the maximum number of IPv4
	// addresses that can be allocated to the instance
	GetMaximumAllocatableIPv4() int
}
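
// The IPAM layer drives the NodeOperations contract above roughly as follows:
// PrepareIPAllocation() is consulted first, and CreateInterface() is only
// invoked when the returned action reports no allocatable IPs but spare
// interface capacity. A minimal sketch of the allocation side, where
// exampleOps and its helper methods are hypothetical and not part of this
// package:
//
//	func (o *exampleOps) PrepareIPAllocation(scopedLog *logrus.Entry) (*AllocationAction, error) {
//		return &AllocationAction{
//			// freeIPsOnInterfaces and spareInterfaceSlots are
//			// illustrative helpers of the hypothetical backend.
//			AvailableForAllocation: o.freeIPsOnInterfaces(),
//			AvailableInterfaces:    o.spareInterfaceSlots(),
//		}, nil
//	}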

// AllocationImplementation is the interface an IPAM implementation must
// provide. Unlike NodeOperations, this interface is not tied to a specific
// node.
type AllocationImplementation interface {
	// CreateNode is called when the IPAM layer has learned about a new
	// node which requires IPAM services. This function must return a
	// NodeOperations implementation which will render IPAM services to the
	// node context provided.
	CreateNode(obj *v2.CiliumNode, node *Node) NodeOperations

	// GetPoolQuota is called to retrieve the remaining IP addresses in all
	// IP pools known to the IPAM implementation.
	GetPoolQuota() ipamTypes.PoolQuotaMap

	// Resync is called periodically to give the IPAM implementation a
	// chance to resync its own state with external APIs or systems. It is
	// also called when the IPAM layer detects that state got out of sync.
	Resync(ctx context.Context) time.Time
}
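
// A zero time returned from Resync() is interpreted as a failed
// synchronization and marks the instances API as not ready (see
// instancesAPIResync below). A minimal sketch, assuming a hypothetical
// exampleAllocator type with an illustrative refreshCache helper:
//
//	func (a *exampleAllocator) Resync(ctx context.Context) time.Time {
//		if err := a.refreshCache(ctx); err != nil {
//			return time.Time{} // zero time signals failure
//		}
//		return time.Now()
//	}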

// MetricsAPI represents the metrics being maintained by a NodeManager
type MetricsAPI interface {
	IncAllocationAttempt(status, subnetID string)
	AddIPAllocation(subnetID string, allocated int64)
	AddIPRelease(subnetID string, released int64)
	SetAllocatedIPs(typ string, allocated int)
	SetAvailableInterfaces(available int)
	SetAvailableIPsPerSubnet(subnetID string, availabilityZone string, available int)
	SetNodes(category string, nodes int)
	IncResyncCount()
	PoolMaintainerTrigger() trigger.MetricsObserver
	K8sSyncTrigger() trigger.MetricsObserver
	ResyncTrigger() trigger.MetricsObserver
}
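
// Callers that do not need to export metrics can satisfy MetricsAPI with a
// no-op implementation. The sketch below is illustrative only; noOpMetrics is
// a hypothetical type, and whether a nil trigger.MetricsObserver is accepted
// by the trigger package is an assumption here:
//
//	type noOpMetrics struct{}
//
//	func (m *noOpMetrics) IncAllocationAttempt(status, subnetID string)     {}
//	func (m *noOpMetrics) AddIPAllocation(subnetID string, allocated int64) {}
//	func (m *noOpMetrics) IncResyncCount()                                  {}
//	func (m *noOpMetrics) PoolMaintainerTrigger() trigger.MetricsObserver   { return nil }
//	// ... the remaining methods are implemented the same way.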

// nodeMap is a mapping of node names to IPAM Node objects
type nodeMap map[string]*Node

// NodeManager manages all nodes which require IPAM services
type NodeManager struct {
	mutex              lock.RWMutex
	nodes              nodeMap
	instancesAPI       AllocationImplementation
	k8sAPI             CiliumNodeGetterUpdater
	metricsAPI         MetricsAPI
	resyncTrigger      *trigger.Trigger
	parallelWorkers    int64
	releaseExcessIPs   bool
	stableInstancesAPI bool
}

// NewNodeManager returns a new NodeManager
func NewNodeManager(instancesAPI AllocationImplementation, k8sAPI CiliumNodeGetterUpdater, metrics MetricsAPI, parallelWorkers int64, releaseExcessIPs bool) (*NodeManager, error) {
	if parallelWorkers < 1 {
		parallelWorkers = 1
	}

	mngr := &NodeManager{
		nodes:            nodeMap{},
		instancesAPI:     instancesAPI,
		k8sAPI:           k8sAPI,
		metricsAPI:       metrics,
		parallelWorkers:  parallelWorkers,
		releaseExcessIPs: releaseExcessIPs,
	}

	resyncTrigger, err := trigger.NewTrigger(trigger.Parameters{
		Name:            "ipam-node-manager-resync",
		MinInterval:     10 * time.Millisecond,
		MetricsObserver: metrics.ResyncTrigger(),
		TriggerFunc: func(reasons []string) {
			if syncTime, ok := mngr.instancesAPIResync(context.TODO()); ok {
				mngr.Resync(context.TODO(), syncTime)
			}
		},
	})
	if err != nil {
		return nil, fmt.Errorf("unable to initialize resync trigger: %s", err)
	}
	mngr.resyncTrigger = resyncTrigger

	// Assume readiness; the initial blocking resync in Start() will update
	// it based on the actual result.
	mngr.SetInstancesAPIReadiness(true)

	return mngr, nil
}

// instancesAPIResync triggers a resync of the instances API and reports
// whether it succeeded; a zero sync time indicates failure.
func (n *NodeManager) instancesAPIResync(ctx context.Context) (time.Time, bool) {
	syncTime := n.instancesAPI.Resync(ctx)
	success := !syncTime.IsZero()
	n.SetInstancesAPIReadiness(success)
	return syncTime, success
}

// Start kicks off the NodeManager by performing the initial state
// synchronization and starting the background sync goroutine
func (n *NodeManager) Start(ctx context.Context) error {
	// Trigger the initial resync in a blocking manner
	if _, ok := n.instancesAPIResync(ctx); !ok {
		return fmt.Errorf("Initial synchronization with instances API failed")
	}

	// Start an interval-based background resync for safety. It will
	// synchronize the state regularly and resolve an eventual deficit if
	// the event-driven trigger fails, and also release excess IP addresses
	// if release-excess-ips is enabled.
	go func() {
		time.Sleep(time.Minute)
		mngr := controller.NewManager()
		mngr.UpdateController("ipam-node-interval-refresh",
			controller.ControllerParams{
				RunInterval: time.Minute,
				DoFunc: func(ctx context.Context) error {
					if syncTime, ok := n.instancesAPIResync(ctx); ok {
						n.Resync(ctx, syncTime)
					}
					return nil
				},
			})
	}()

	return nil
}
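
// Typical wiring of a NodeManager looks roughly like the sketch below;
// allocator, k8sClient and metrics stand in for concrete
// AllocationImplementation, CiliumNodeGetterUpdater and MetricsAPI
// implementations supplied by the caller, and the worker count of 64 is an
// arbitrary illustrative value:
//
//	mngr, err := NewNodeManager(allocator, k8sClient, metrics, 64, true)
//	if err != nil {
//		return err
//	}
//	if err := mngr.Start(ctx); err != nil {
//		return err
//	}
//	// From here on, CiliumNode events are fed in via mngr.Create(),
//	// mngr.Update() and mngr.Delete().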

// SetInstancesAPIReadiness sets the readiness state of the instances API
func (n *NodeManager) SetInstancesAPIReadiness(ready bool) {
	n.mutex.Lock()
	n.stableInstancesAPI = ready
	n.mutex.Unlock()
}

// InstancesAPIIsReady returns true if the instances API is stable and ready
func (n *NodeManager) InstancesAPIIsReady() bool {
	n.mutex.Lock()
	defer n.mutex.Unlock()
	return n.stableInstancesAPI
}

// GetNames returns the list of all node names
func (n *NodeManager) GetNames() (allNodeNames []string) {
	n.mutex.RLock()
	defer n.mutex.RUnlock()

	allNodeNames = make([]string, 0, len(n.nodes))
	for name := range n.nodes {
		allNodeNames = append(allNodeNames, name)
	}

	return
}

// Create is called whenever a CiliumNode resource has been created in the
// Kubernetes apiserver. Creation is handled identically to an update.
func (n *NodeManager) Create(resource *v2.CiliumNode) bool {
	return n.Update(resource)
}

// Update is called whenever a CiliumNode resource has been updated in the
// Kubernetes apiserver
func (n *NodeManager) Update(resource *v2.CiliumNode) (nodeSynced bool) {
	nodeSynced = true
	n.mutex.Lock()
	node, ok := n.nodes[resource.Name]
	defer func() {
		n.mutex.Unlock()
		if nodeSynced {
			nodeSynced = node.UpdatedResource(resource)
		}
	}()

	if !ok {
		node = &Node{
			name:    resource.Name,
			manager: n,
		}

		node.ops = n.instancesAPI.CreateNode(resource, node)

		poolMaintainer, err := trigger.NewTrigger(trigger.Parameters{
			Name:            fmt.Sprintf("ipam-pool-maintainer-%s", resource.Name),
			MinInterval:     10 * time.Millisecond,
			MetricsObserver: n.metricsAPI.PoolMaintainerTrigger(),
			TriggerFunc: func(reasons []string) {
				if err := node.MaintainIPPool(context.TODO()); err != nil {
					node.logger().WithError(err).Warning("Unable to maintain ip pool of node")
				}
			},
		})
		if err != nil {
			node.logger().WithError(err).Error("Unable to create pool-maintainer trigger")
			return false
		}

		retry, err := trigger.NewTrigger(trigger.Parameters{
			Name:        fmt.Sprintf("ipam-pool-maintainer-%s-retry", resource.Name),
			MinInterval: time.Minute, // large minimal interval to not retry too often
			TriggerFunc: func(reasons []string) { poolMaintainer.Trigger() },
		})
		if err != nil {
			node.logger().WithError(err).Error("Unable to create pool-maintainer-retry trigger")
			return false
		}
		node.retry = retry

		k8sSync, err := trigger.NewTrigger(trigger.Parameters{
			Name:            fmt.Sprintf("ipam-node-k8s-sync-%s", resource.Name),
			MinInterval:     10 * time.Millisecond,
			MetricsObserver: n.metricsAPI.K8sSyncTrigger(),
			TriggerFunc: func(reasons []string) {
				node.syncToAPIServer()
			},
		})
		if err != nil {
			poolMaintainer.Shutdown()
			node.logger().WithError(err).Error("Unable to create k8s-sync trigger")
			return false
		}

		node.poolMaintainer = poolMaintainer
		node.k8sSync = k8sSync
		n.nodes[node.name] = node

		log.WithField(fieldName, resource.Name).Info("Discovered new CiliumNode custom resource")
	}

	return
}

// Delete is called after a CiliumNode resource has been deleted via the
// Kubernetes apiserver
func (n *NodeManager) Delete(nodeName string) {
	n.mutex.Lock()

	if node, ok := n.nodes[nodeName]; ok {
		if node.poolMaintainer != nil {
			node.poolMaintainer.Shutdown()
		}
		if node.k8sSync != nil {
			node.k8sSync.Shutdown()
		}
	}

	delete(n.nodes, nodeName)
	n.mutex.Unlock()
}

// Get returns the node with the given name
func (n *NodeManager) Get(nodeName string) *Node {
	n.mutex.RLock()
	node := n.nodes[nodeName]
	n.mutex.RUnlock()
	return node
}

// GetNodesByIPWatermark returns all nodes that require addresses to be
// allocated or released, sorted in descending order by the number of
// addresses that need to be handled. The number of addresses to be released
// is expressed as a negative value, so nodes with an IP deficit are resolved
// first.
func (n *NodeManager) GetNodesByIPWatermark() []*Node {
	n.mutex.RLock()
	list := make([]*Node, len(n.nodes))
	index := 0
	for _, node := range n.nodes {
		list[index] = node
		index++
	}
	n.mutex.RUnlock()

	sort.Slice(list, func(i, j int) bool {
		valuei := list[i].GetNeededAddresses()
		valuej := list[j].GetNeededAddresses()
		// Negative values denote addresses to be released; among such
		// nodes, the ones with the largest excess are handled first.
		if valuei < 0 && valuej < 0 {
			return valuei < valuej
		}
		return valuei > valuej
	})

	return list
}
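
// As a worked example of the ordering above: nodes reporting needed address
// counts of 10, 5, -3 and -8 are returned in the order 10, 5, -8, -3, that is,
// deficits in descending order first, followed by releases with the largest
// excess first.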

// resyncStats aggregates allocation statistics across all nodes handled
// during a single Resync run.
type resyncStats struct {
	mutex               lock.Mutex
	totalUsed           int
	totalAvailable      int
	totalNeeded         int
	remainingInterfaces int
	nodes               int
	nodesAtCapacity     int
	nodesInDeficit      int
}

func (n *NodeManager) resyncNode(ctx context.Context, node *Node, stats *resyncStats, syncTime time.Time) {
	node.updateLastResync(syncTime)
	node.recalculate()

	allocationNeeded := node.allocationNeeded()
	releaseNeeded := node.releaseNeeded()
	if allocationNeeded || releaseNeeded {
		node.requirePoolMaintenance()
		node.poolMaintainer.Trigger()
	}

	nodeStats := node.Stats()

	stats.mutex.Lock()
	stats.totalUsed += nodeStats.UsedIPs
	availableOnNode := nodeStats.AvailableIPs - nodeStats.UsedIPs
	stats.totalAvailable += availableOnNode
	stats.totalNeeded += nodeStats.NeededIPs
	stats.remainingInterfaces += nodeStats.RemainingInterfaces
	stats.nodes++

	if allocationNeeded {
		stats.nodesInDeficit++
	}

	if nodeStats.RemainingInterfaces == 0 && availableOnNode == 0 {
		stats.nodesAtCapacity++
	}
	stats.mutex.Unlock()

	node.k8sSync.Trigger()
}

// Resync attends to all nodes and resolves IP deficits. The order of
// attendance is defined by the number of IPs needed to reach the configured
// watermarks. Any updates to the node resource are synchronized to the
// Kubernetes apiserver.
func (n *NodeManager) Resync(ctx context.Context, syncTime time.Time) {
	n.metricsAPI.IncResyncCount()

	stats := resyncStats{}
	sem := semaphore.NewWeighted(n.parallelWorkers)

	for _, node := range n.GetNodesByIPWatermark() {
		err := sem.Acquire(ctx, 1)
		if err != nil {
			continue
		}
		go func(node *Node, stats *resyncStats) {
			n.resyncNode(ctx, node, stats, syncTime)
			sem.Release(1)
		}(node, &stats)
	}

	// Acquire the full semaphore; this requires all goroutines to
	// complete and thus blocks until all nodes are synced
	sem.Acquire(ctx, n.parallelWorkers)

	n.metricsAPI.SetAllocatedIPs("used", stats.totalUsed)
	n.metricsAPI.SetAllocatedIPs("available", stats.totalAvailable)
	n.metricsAPI.SetAllocatedIPs("needed", stats.totalNeeded)
	n.metricsAPI.SetAvailableInterfaces(stats.remainingInterfaces)
	n.metricsAPI.SetNodes("total", stats.nodes)
	n.metricsAPI.SetNodes("in-deficit", stats.nodesInDeficit)
	n.metricsAPI.SetNodes("at-capacity", stats.nodesAtCapacity)

	for poolID, quota := range n.instancesAPI.GetPoolQuota() {
		n.metricsAPI.SetAvailableIPsPerSubnet(string(poolID), quota.AvailabilityZone, quota.AvailableIPs)
	}
}
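
// The semaphore in Resync is used both to bound concurrency and as a
// completion barrier: acquiring the full weight can only succeed once every
// per-node goroutine has released its unit. A minimal standalone sketch of
// the same idiom, where the loop bound and worker body are illustrative
// placeholders:
//
//	sem := semaphore.NewWeighted(4)
//	for i := 0; i < 4; i++ {
//		if err := sem.Acquire(ctx, 1); err != nil {
//			break
//		}
//		go func() {
//			defer sem.Release(1)
//			// per-item work goes here
//		}()
//	}
//	sem.Acquire(ctx, 4) // blocks until all workers have called Release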