-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
manager.go
325 lines (295 loc) · 11.4 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package ciliumendpointslice
import (
"github.com/sirupsen/logrus"
cilium_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
"github.com/cilium/cilium/pkg/k8s/resource"
"github.com/cilium/cilium/pkg/logging/logfields"
)
var (
// sequentialLetters contains lower case alphabets without vowels and few numbers.
// skipped vowels and numbers [0, 1] to avoid generating controversial names.
sequentialLetters = []rune("bcdfghjklmnpqrstvwxyz2456789")
)
// operations is an interface to all operations that a CES manager can perform.
type operations interface {
// External APIs to Insert/Remove CEP in local dataStore
UpdateCEPMapping(cep *cilium_v2.CoreCiliumEndpoint, ns string) []CESName
RemoveCEPMapping(cep *cilium_v2.CoreCiliumEndpoint, ns string) CESName
initializeMappingForCES(ces *cilium_v2.CiliumEndpointSlice) CESName
initializeMappingCEPtoCES(cep *cilium_v2.CoreCiliumEndpoint, ns string, ces CESName)
getCEPCountInCES(ces CESName) int
getCEPinCES(ces CESName) []CEPName
getCESData(ces CESName) CESData
isCEPinCES(cep CEPName, ces CESName) bool
}
// cesMgr is used to batch CEP into a CES, based on FirstComeFirstServe. If a new CEP
// is inserted, then the CEP is queued in any one of the available CES. CEPs are
// inserted into CESs without any preference or any priority.
type cesMgr struct {
logger logrus.FieldLogger
// mapping is used to map CESName to CESTracker[i.e. list of CEPs],
// as well as CEPName to CESName.
mapping *CESToCEPMapping
// maxCEPsInCES is the maximum number of CiliumCoreEndpoint(s) packed in
// a CiliumEndpointSlice Resource.
maxCEPsInCES int
}
// cesManagerFcfs use cesMgr by design, it inherits all the methods from the base cesMgr and there is no
// special handling required for cesManagerFcfs.
// cesManagerFcfs indicates ciliumEndpoints are batched based on FirstComeFirtServe algorithm.
// refer cesMgr comments for more information.
type cesManagerFcfs struct {
cesMgr
}
// cesManagerIdentity is used to batch CEPs in CES based on CEP identity.
type cesManagerIdentity struct {
cesMgr
// CEP identity to cesTracker map
identityToCES map[int64][]CESName
// reverse map of identityToCES i.e. cesName to CEP identity
cesToIdentity map[CESName]int64
}
// newCESManagerFcfs creates and initializes a new FirstComeFirstServe based CES
// manager, in this mode CEPs are batched based on FirstComeFirtServe algorithm.
func newCESManagerFcfs(maxCEPsInCES int, logger logrus.FieldLogger) operations {
return &cesManagerFcfs{
cesMgr{
logger: logger,
mapping: newCESToCEPMapping(),
maxCEPsInCES: maxCEPsInCES,
},
}
}
// newCESManagerIdentity creates and initializes a new Identity based manager.
func newCESManagerIdentity(maxCEPsInCES int, logger logrus.FieldLogger) operations {
return &cesManagerIdentity{
cesMgr: cesMgr{
logger: logger,
mapping: newCESToCEPMapping(),
maxCEPsInCES: maxCEPsInCES,
},
identityToCES: make(map[int64][]CESName),
cesToIdentity: make(map[CESName]int64),
}
}
// This function create a new ces and capacity to hold maximum ceps in a CES.
// This is called in 2 different scenarios:
// 1. During runtime, when ces manager decides to create a new ces, it calls
// with an empty name, it generates a random unique name and assign it to the CES.
// 2. During operator warm boot [after crash or software upgrade], slicing manager
// creates a CES, by passing unique name.
func (c *cesMgr) createCES(name, ns string) CESName {
if name == "" {
name = uniqueCESliceName(c.mapping)
}
cesName := NewCESName(name)
c.mapping.insertCES(cesName, ns)
c.logger.WithFields(logrus.Fields{
logfields.CESName: cesName.string(),
}).Debug("Generated CES")
return cesName
}
// UpdateCEPMapping is used to insert CEP in local cache, this may result in creating a new
// CES object or updating an existing CES object.
func (c *cesManagerFcfs) UpdateCEPMapping(cep *cilium_v2.CoreCiliumEndpoint, ns string) []CESName {
cepName := GetCEPNameFromCCEP(cep, ns)
c.logger.WithFields(logrus.Fields{
logfields.CEPName: cepName.string(),
}).Debug("Insert CEP in local cache")
// check the given cep is already exists in any of the CES.
// if yes, Update a ces with the given cep object.
cesName, exists := c.mapping.getCESName(cepName)
if exists {
c.logger.WithFields(logrus.Fields{
logfields.CEPName: cepName.string(),
logfields.CESName: cesName.string(),
}).Debug("CEP already mapped to CES")
return []CESName{cesName}
}
// Get the largest available CES.
// This ensures the minimum number of CES updates, as the CESs will be
// consistently filled up in order.
cesName = c.getLargestAvailableCESForNamespace(ns)
if cesName.Name == "" {
cesName = c.createCES("", ns)
}
c.mapping.insertCEP(cepName, cesName)
c.logger.WithFields(logrus.Fields{
logfields.CEPName: cepName.string(),
logfields.CESName: cesName.string(),
}).Debug("CEP mapped to CES")
return []CESName{cesName}
}
func (c *cesManagerFcfs) RemoveCEPMapping(cep *cilium_v2.CoreCiliumEndpoint, ns string) CESName {
cepName := GetCEPNameFromCCEP(cep, ns)
c.logger.WithFields(logrus.Fields{
logfields.CEPName: cepName.string(),
}).Debug("Removing CEP from local cache")
cesName, exists := c.mapping.getCESName(cepName)
if exists {
c.logger.WithFields(logrus.Fields{
logfields.CEPName: cepName.string(),
logfields.CESName: cesName.string(),
}).Debug("Removing CEP from CES")
c.mapping.deleteCEP(cepName)
if c.mapping.countCEPsInCES(cesName) == 0 {
c.mapping.deleteCES(cesName)
}
return cesName
}
return CESName(resource.Key{})
}
// getLargestAvailableCESForNamespace returns the largest CES from cache for the
// specified namespace that has at least 1 CEP and 1 available spot (less than
// maximum CEPs). If it is not found, a nil is returned.
func (c *cesManagerFcfs) getLargestAvailableCESForNamespace(ns string) CESName {
largestCEPCount := 0
selectedCES := CESName(resource.Key{})
for _, ces := range c.mapping.getAllCESs() {
cepCount := c.mapping.countCEPsInCES(ces)
if cepCount < c.maxCEPsInCES && cepCount > largestCEPCount && c.mapping.getCESData(ces).ns == ns {
selectedCES = ces
largestCEPCount = cepCount
if largestCEPCount == c.maxCEPsInCES-1 {
break
}
}
}
return selectedCES
}
// UpdateCEPMapping is used to insert CEP in local cache, this may result in creating a new
// CES object or updating an existing CES object. CEPs are grouped based on CEP identity.
func (c *cesManagerIdentity) UpdateCEPMapping(cep *cilium_v2.CoreCiliumEndpoint, ns string) []CESName {
// check the given cep is already exists in any of the CES.
// if yes, compare the given CEP Identity with the CEPs stored in CES.
// If they are same UPDATE the CEP in the CES. This will trigger CES UPDATE to k8s-apiserver.
// If the Identities differ, remove the CEP from the existing CES
// and find a new CES to batch the given CEP in a CES. This will trigger following actions,
// 1) CES UPDATE to k8s-apiserver, removing CEP in old CES
// 2) CES CREATE to k8s-apiserver, inserting the given CEP in a new CES or
// 3) CES UPDATE to k8s-apiserver, inserting the given CEP in existing CES
cepName := GetCEPNameFromCCEP(cep, ns)
c.logger.WithFields(logrus.Fields{
logfields.CEPName: cepName.string(),
}).Debug("Insert CEP in local cache")
var cesName CESName
var exists bool
removedFromCES := CESName(resource.Key{})
if cesName, exists = c.mapping.getCESName(cepName); exists {
if c.cesToIdentity[cesName] != cep.IdentityID {
c.logger.WithFields(logrus.Fields{
logfields.CEPName: cepName.string(),
logfields.CESName: cesName.string(),
logfields.OldIdentity: c.cesToIdentity[cesName],
logfields.Identity: cep.IdentityID,
}).Debug("CEP already mapped to CES but identity has changed")
removedFromCES = cesName
c.mapping.deleteCEP(cepName)
} else {
c.logger.WithFields(logrus.Fields{
logfields.CEPName: cepName.string(),
logfields.CESName: cesName.string(),
}).Debug("CEP already mapped to CES")
return []CESName{cesName}
}
}
// If given cep object isn't packed in any of the CES. find a new ces
// to pack this cep.
cesName = c.getLargestAvailableCESForIdentity(cep.IdentityID, ns)
if cesName.Name == "" {
cesName = c.createCES("", ns)
// Update the identityToCES and cesToIdentity maps respectively.
c.identityToCES[cep.IdentityID] = append(c.identityToCES[cep.IdentityID], cesName)
c.cesToIdentity[cesName] = cep.IdentityID
}
c.mapping.insertCEP(cepName, cesName)
c.logger.WithFields(logrus.Fields{
logfields.CEPName: cepName.string(),
logfields.CESName: cesName.string(),
}).Debug("CEP mapped to CES")
return []CESName{removedFromCES, cesName}
}
func (c *cesManagerIdentity) getLargestAvailableCESForIdentity(id int64, ns string) CESName {
largestCEPCount := 0
selectedCES := CESName(resource.Key{})
if cess, exist := c.identityToCES[id]; exist {
for _, ces := range cess {
cepCount := c.mapping.countCEPsInCES(ces)
if cepCount < c.maxCEPsInCES && cepCount > largestCEPCount && c.mapping.getCESData(ces).ns == ns {
selectedCES = ces
largestCEPCount = cepCount
if largestCEPCount == c.maxCEPsInCES-1 {
break
}
}
}
}
return selectedCES
}
func (c *cesManagerIdentity) RemoveCEPMapping(cep *cilium_v2.CoreCiliumEndpoint, ns string) CESName {
cepName := GetCEPNameFromCCEP(cep, ns)
c.logger.WithFields(logrus.Fields{
logfields.CEPName: cepName.string(),
}).Debug("Removing CEP from local cache")
cesName, exists := c.mapping.getCESName(cepName)
if exists {
c.logger.WithFields(logrus.Fields{
logfields.CEPName: cepName.string(),
logfields.CESName: cesName.string(),
}).Debug("Removing CEP from CES")
c.mapping.deleteCEP(cepName)
if c.mapping.countCEPsInCES(cesName) == 0 {
c.removeCESToIdentity(cep.IdentityID, cesName)
c.mapping.deleteCES(cesName)
}
return cesName
}
return CESName(resource.Key{})
}
func (c *cesManagerIdentity) removeCESToIdentity(id int64, cesName CESName) {
cesSlice := c.identityToCES[id]
removed := 0
for i, ces := range cesSlice {
if ces == cesName {
cesSlice[i] = cesSlice[len(cesSlice)-1]
removed = removed + 1
}
}
if removed < len(cesSlice) {
c.identityToCES[id] = cesSlice[:len(cesSlice)-removed]
} else {
delete(c.identityToCES, id)
}
delete(c.cesToIdentity, cesName)
}
// initializeMappingCEPtoCES overrides the same method on cesMgr and is used to
// populate the local cache for the given CEP, including identity-related maps
// specific to the cesManagerIdentity.
func (c *cesManagerIdentity) initializeMappingCEPtoCES(cep *cilium_v2.CoreCiliumEndpoint, ns string, ces CESName) {
cepName := GetCEPNameFromCCEP(cep, ns)
c.mapping.insertCEP(cepName, ces)
c.identityToCES[cep.IdentityID] = append(c.identityToCES[cep.IdentityID], ces)
c.cesToIdentity[ces] = cep.IdentityID
}
func (c *cesMgr) initializeMappingForCES(ces *cilium_v2.CiliumEndpointSlice) CESName {
return c.createCES(ces.Name, ces.Namespace)
}
func (c *cesMgr) initializeMappingCEPtoCES(cep *cilium_v2.CoreCiliumEndpoint, ns string, ces CESName) {
cepName := GetCEPNameFromCCEP(cep, ns)
c.mapping.insertCEP(cepName, ces)
}
func (c *cesMgr) getCEPCountInCES(ces CESName) int {
return c.mapping.countCEPsInCES(ces)
}
func (c *cesMgr) getCESData(ces CESName) CESData {
return c.mapping.getCESData(ces)
}
func (c *cesMgr) getCEPinCES(ces CESName) []CEPName {
return c.mapping.getCEPsInCES(ces)
}
func (c *cesMgr) isCEPinCES(cep CEPName, ces CESName) bool {
mappedCES, exists := c.mapping.getCESName(cep)
return exists && mappedCES.Name == ces.Name
}