roles.go
// Copyright 2019 Hewlett Packard Enterprise Development LP
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package kubedirectorcluster
import (
"fmt"
"strconv"
"sync/atomic"
"github.com/go-logr/logr"
kdv1 "github.com/bluek8s/kubedirector/pkg/apis/kubedirector/v1beta1"
"github.com/bluek8s/kubedirector/pkg/executor"
"github.com/bluek8s/kubedirector/pkg/observer"
"github.com/bluek8s/kubedirector/pkg/shared"
"k8s.io/apimachinery/pkg/api/errors"
)
// syncClusterRoles is responsible for dealing with roles being changed, added,
// or removed. It is the only function in this file that is invoked from another
// file (from the syncCluster function in cluster.go). Managing role changes
// may result in operations on k8s statefulsets. This function will also
// modify the role status data structures, and create a role info slice that
// can be referenced by the later syncs for other concerns.
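// As a rough illustration only (a minimal sketch, not the literal code in
// cluster.go), a caller is expected to consume it along these lines:
//
//	roles, state, err := syncClusterRoles(reqLogger, cr)
//	if err != nil {
//		return err
//	}
//	// "roles" feeds the later per-concern syncs; "state" summarizes whether
//	// membership is stable and ready, stable but not ready, or changing.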
func syncClusterRoles(
reqLogger logr.Logger,
cr *kdv1.KubeDirectorCluster,
) ([]*roleInfo, clusterStateInternal, error) {
// Construct the role info slice. Bail out now if that fails.
roles, rolesErr := initRoleInfo(reqLogger, cr)
if rolesErr != nil {
return nil, clusterMembersUnknown, rolesErr
}
// Role changes will be postponed if any members are currently in the
// creating state. Such members may have been informed of the current
// member set, and they are not yet ready to receive updates about
// changes to the member set.
for _, r := range roles {
if len(r.membersByState[memberCreating]) != 0 {
return roles, clusterMembersStableUnready, nil
}
}
// Assume cluster is stable until found otherwise.
allMembersReady := true
anyMembersChanged := false
// Reconcile each role as necessary.
for _, r := range roles {
switch {
case r.statefulSet == nil && r.roleStatus == nil:
// Role did not previously exist. Create it now.
createErr := handleRoleCreate(
reqLogger, cr, r, &anyMembersChanged)
if createErr != nil {
return nil, clusterMembersUnknown, createErr
}
case r.statefulSet == nil && r.roleStatus != nil:
// Role exists but there is no statefulset for it in k8s.
// Hmm, weird. Statefulset was deleted out-of-band? Let's fix.
reCreateErr := handleRoleReCreate(reqLogger, cr, r, &anyMembersChanged)
if reCreateErr != nil {
return nil, clusterMembersUnknown, reCreateErr
}
case r.statefulSet != nil && r.roleStatus != nil:
// Deal with an existing role and statefulset.
// First see if we need to reconcile any out-of-band statefulset
// changes.
handleRoleConfig(reqLogger, cr, r)
// Now check for desired changes in role population.
if len(r.roleStatus.Members) == 0 && r.desiredPop == 0 {
// Role is going away and we have finished removing pods.
handleRoleDelete(reqLogger, cr, r)
} else {
// Might need to change role population.
handleRoleResize(reqLogger, cr, r, &anyMembersChanged)
}
case r.statefulSet != nil && r.roleStatus == nil:
// "Can't happen" ... there should be no way to find the
// statefulset unless we have a role status.
panicMsg := fmt.Sprintf(
"StatefulSet{%s} for KubeDirectorCluster{%s/%s} has no role status",
r.statefulSet.Name,
cr.Namespace,
cr.Name,
)
panic(panicMsg)
}
if !allRoleMembersReadyOrError(cr, r) {
allMembersReady = false
}
}
// Let the caller know about significant changes that happened.
var returnState clusterStateInternal
if anyMembersChanged {
returnState = clusterMembersChangedUnready
} else {
if allMembersReady {
returnState = clusterMembersStableReady
} else {
returnState = clusterMembersStableUnready
}
}
return roles, returnState, nil
}
// initRoleInfo constructs a slice of elements representing all current or
// desired roles. Each element contains useful information about the role
// spec and status that will be used not only in syncClusterRoles but also
// by the sync logic for other concerns.
func initRoleInfo(
reqLogger logr.Logger,
cr *kdv1.KubeDirectorCluster,
) ([]*roleInfo, error) {
roles := make(map[string]*roleInfo)
numRoleSpecs := len(cr.Spec.Roles)
numRoleStatuses := len(cr.Status.Roles)
// Capture the desired roles and member count in the spec. The fields
// statefulSet, roleStatus, and membersByState may be populated later
// in this function.
for i := 0; i < numRoleSpecs; i++ {
roleSpec := &(cr.Spec.Roles[i])
roles[roleSpec.Name] = &roleInfo{
statefulSet: nil,
roleSpec: roleSpec,
roleStatus: nil,
membersByState: make(map[memberState][]*kdv1.MemberStatus),
desiredPop: int(*(roleSpec.Members)),
}
}
// We're about to start grabbing pointers into the role status slice,
// and we may have to add to that slice later. So let's grow its capacity
// now. We know that at most we will need to add a number of role statuses
// equal to the number of role specs.
newRoleStatuses := make(
[]kdv1.RoleStatus,
numRoleStatuses,
numRoleStatuses+numRoleSpecs,
)
copy(newRoleStatuses, cr.Status.Roles)
cr.Status.Roles = newRoleStatuses
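	// (Recall that appending past a slice's capacity reallocates the backing
	// array; pointers previously taken to its elements -- such as the
	// roleStatus pointers captured below -- would then refer to a stale copy.)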
// Now look at the existing roles we have status for. Update or add to
// the role info accordingly.
for i := 0; i < numRoleStatuses; i++ {
roleStatus := &(cr.Status.Roles[i])
statefulSet, statefulSetErr := observer.GetStatefulSet(
cr.Namespace,
roleStatus.StatefulSet,
)
if statefulSetErr != nil {
if errors.IsNotFound(statefulSetErr) {
statefulSet = nil
} else {
shared.LogErrorf(
reqLogger,
statefulSetErr,
cr,
shared.EventReasonRole,
"failed to query StatefulSet{%s} for role{%s}",
roleStatus.StatefulSet,
roleStatus.Name,
)
return nil, statefulSetErr
}
}
if role, ok := roles[roleStatus.Name]; ok {
// This role is in the spec. Update the roleinfo with the
// statefulset pointer (if any) and the role status pointer.
role.statefulSet = statefulSet
role.roleStatus = roleStatus
// If we might add to the role status members slice later,
// increase its capacity. Similarly to the overall role status
// slice, we want to make sure we can have stable pointers into
// this slice.
numMembers := len(roleStatus.Members)
if role.desiredPop > numMembers {
newMembers := make(
[]kdv1.MemberStatus,
numMembers,
role.desiredPop,
)
copy(newMembers, roleStatus.Members)
roleStatus.Members = newMembers
}
} else {
// This is not a role desired in the spec. Create a new info
// entry with desired member count at zero.
roles[roleStatus.Name] = &roleInfo{
statefulSet: statefulSet,
roleSpec: nil,
roleStatus: roleStatus,
membersByState: make(map[memberState][]*kdv1.MemberStatus),
desiredPop: 0,
}
}
}
// Return a slice of roleinfo made from the map values, and with the
// membersByState maps populated.
var result []*roleInfo
for _, info := range roles {
calcRoleMembersByState(info)
result = append(result, info)
}
return result, nil
}
// calcRoleMembersByState builds the members-by-state map based on the current
// member statuses in the role.
func calcRoleMembersByState(
role *roleInfo,
) {
if role.roleStatus == nil {
return
}
numMembers := len(role.roleStatus.Members)
for i := 0; i < numMembers; i++ {
member := &(role.roleStatus.Members[i])
role.membersByState[memberState(member.State)] = append(
role.membersByState[memberState(member.State)],
member)
}
}
// handleRoleCreate deals with a newly specified role. If the desired population
// is nonzero then it will create an associated statefulset and create the
// role status and its member statuses (initially as create pending). Failure
// to create a statefulset will be a reconciler-stopping error.
func handleRoleCreate(
reqLogger logr.Logger,
cr *kdv1.KubeDirectorCluster,
role *roleInfo,
anyMembersChanged *bool,
) error {
if role.desiredPop == 0 {
// Nothing to do if zero members desired... we won't even create
// the statefulset or the role status.
return nil
}
shared.LogInfof(
reqLogger,
cr,
shared.EventReasonRole,
"creating role{%s}",
role.roleSpec.Name,
)
nativeSystemdSupport := shared.GetNativeSystemdSupport()
// Create the associated statefulset.
statefulSet, createErr := executor.CreateStatefulSet(
reqLogger,
cr,
nativeSystemdSupport,
role.roleSpec,
role.roleStatus,
)
if createErr != nil {
// Not much to do if we can't create it... we'll just keep trying
// on every run through the reconciler.
shared.LogErrorf(
reqLogger,
createErr,
cr,
shared.EventReasonRole,
"failed to create StatefulSet for role{%s}",
role.roleSpec.Name,
)
return createErr
}
// OK we have the statefulset, so set up the role and member status.
*anyMembersChanged = true
role.statefulSet = statefulSet
if role.roleStatus == nil {
newRoleStatus := kdv1.RoleStatus{
Name: role.roleSpec.Name,
StatefulSet: statefulSet.Name,
Members: make([]kdv1.MemberStatus, 0, role.desiredPop),
}
// cr.Status.Roles was created with enough capacity to avoid
// realloc, so we can safely grow it w/o disturbing our
// pointers to its elements.
cr.Status.Roles = append(cr.Status.Roles, newRoleStatus)
role.roleStatus = &(cr.Status.Roles[len(cr.Status.Roles)-1])
} else {
role.roleStatus.StatefulSet = statefulSet.Name
}
addMemberStatuses(cr, role)
return nil
}
// handleRoleReCreate deals with the unusual-but-possible case of the role
// status existing but the statefulset gone missing. It may need to clean up
// the role status or re-create the statefulset. Failure to create a
// statefulset will be a reconciler-stopping error.
func handleRoleReCreate(
reqLogger logr.Logger,
cr *kdv1.KubeDirectorCluster,
role *roleInfo,
anyMembersChanged *bool,
) error {
if len(role.roleStatus.Members) == 0 {
// No lingering pod status to deal with.
if role.desiredPop == 0 {
// Looks like the role should be gone anyway, so mark it for removal.
role.roleStatus.StatefulSet = ""
} else {
// Create a new statefulset for the role.
return handleRoleCreate(reqLogger, cr, role, anyMembersChanged)
}
} else {
// Need to clean up from the old status before we make a new
// statefulset. For any pods that had reached "ready" or "error" state,
// we should mark them first as "delete pending" -- we know we have notified
// other pods of these creations, so we should now notify of deletions.
// For other pods we can just move them straight into deleting state.
numMembers := len(role.roleStatus.Members)
for i := 0; i < numMembers; i++ {
member := &(role.roleStatus.Members[i])
switch memberState(member.State) {
case memberDeletePending:
fallthrough
case memberReady:
fallthrough
case memberConfigError:
if member.State != string(memberDeletePending) {
member.State = string(memberDeletePending)
*anyMembersChanged = true
}
default:
if member.State != string(memberDeleting) {
member.State = string(memberDeleting)
*anyMembersChanged = true
}
}
}
if *anyMembersChanged {
shared.LogInfof(
reqLogger,
cr,
shared.EventReasonRole,
"restoring role{%s}",
role.roleStatus.Name,
)
}
// This should be quite unusual so we won't try to be clever about
// updating the membersByState map. Just nuke and re-create it.
role.membersByState = make(map[memberState][]*kdv1.MemberStatus)
calcRoleMembersByState(role)
}
return nil
}
// handleRoleConfig checks an existing statefulset to see if any of its
// important properties (other than replicas count) need to be reconciled.
// Failure to reconcile will not be treated as a reconciler-stopping error; we'll
// just try again next time.
func handleRoleConfig(
reqLogger logr.Logger,
cr *kdv1.KubeDirectorCluster,
role *roleInfo,
) {
updateErr := executor.UpdateStatefulSetNonReplicas(
reqLogger,
cr,
role.roleSpec,
role.statefulSet)
if updateErr != nil {
shared.LogErrorf(
reqLogger,
updateErr,
cr,
shared.EventReasonRole,
"failed to update StatefulSet{%s}",
role.statefulSet.Name,
)
}
}
// handleRoleDelete takes care of deleting the associated statefulset after
// the role members have been cleaned up. Failure to delete will not be
// treated as a reconciler-stopping error; we'll just try again next time.
func handleRoleDelete(
reqLogger logr.Logger,
cr *kdv1.KubeDirectorCluster,
role *roleInfo,
) {
shared.LogInfof(
reqLogger,
cr,
shared.EventReasonRole,
"finishing cleanup on role{%s}",
role.roleStatus.Name,
)
deleteErr := executor.DeleteStatefulSet(cr.Namespace, role.statefulSet.Name)
if deleteErr == nil || errors.IsNotFound(deleteErr) {
// Mark the role status for removal.
role.roleStatus.StatefulSet = ""
} else {
shared.LogErrorf(
reqLogger,
deleteErr,
cr,
shared.EventReasonRole,
"failed to delete StatefulSet{%s}",
role.statefulSet.Name,
)
}
}
// handleRoleResize deals with roles that already have corresponding
// statefulsets in k8s. If the desired population is different from the
// current member count, it may need to adjust the role/member status to
// start the resize process.
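// As an illustration (made-up counts): with 3 ready members, 1 member in
// config error, and 1 create pending, the currently requested population is
// 5; a spec asking for 7 members triggers an expand, and a spec asking for 2
// triggers a shrink.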
func handleRoleResize(
reqLogger logr.Logger,
cr *kdv1.KubeDirectorCluster,
role *roleInfo,
anyMembersChanged *bool,
) {
	// We won't even be attempting a resize if there are any creating-state
	// members, so the current set of "requested members" is just ready (or
	// config error) plus create pending.
prevDesiredPop :=
len(role.membersByState[memberReady]) +
len(role.membersByState[memberConfigError]) +
len(role.membersByState[memberCreatePending])
if role.desiredPop == prevDesiredPop {
return
}
if role.desiredPop > prevDesiredPop {
// Only expand if no members are in delete pending or deleting states;
// we can't use expand to "rescue" a member that is currently being
// deleted. (The way statefulsets reuse FQDNs, we might be able to get
// away with that actually, but let's not complicate things.)
if len(role.roleStatus.Members) == prevDesiredPop {
shared.LogInfof(
reqLogger,
cr,
shared.EventReasonRole,
"expanding role{%s}",
role.roleStatus.Name,
)
*anyMembersChanged = true
addMemberStatuses(cr, role)
}
} else {
// We can shrink in any state. This is a helpful thing to allow when
// the expand was overambitious and is waiting for resources.
shared.LogInfof(
reqLogger,
cr,
shared.EventReasonRole,
"shrinking role{%s}",
role.roleStatus.Name,
)
*anyMembersChanged = true
deleteMemberStatuses(role)
}
}
// addMemberStatuses adds member statuses to a role, in create pending state,
// to bring it up to the desired number of members. It also updates the
// members-by-state map accordingly.
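// As an illustration (hypothetical names and values): for a role whose
// StatefulSet is named "kdss-abcd1", the member at index 2 gets pod name
// "kdss-abcd1-2" and, when persistent storage is requested, a PVC name of
// executor.PvcNamePrefix + "-kdss-abcd1-2". With BlockStorage Path
// "/dev/kddisk" and NumDevices 2, the block device paths would be
// "/dev/kddisk0" and "/dev/kddisk1".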
func addMemberStatuses(
cr *kdv1.KubeDirectorCluster,
role *roleInfo,
) {
lastNodeID := &cr.Status.LastNodeID
currentPop := len(role.roleStatus.Members)
for i := currentPop; i < role.desiredPop; i++ {
indexString := strconv.Itoa(i)
// Pod name and PVC name will be generated by K8s in a predictable
// way, so go ahead and populate those here.
memberName := role.roleStatus.StatefulSet + "-" + indexString
var pvcName string
if role.roleSpec.Storage == nil {
pvcName = ""
} else {
pvcName = executor.PvcNamePrefix + "-" + memberName
}
		// Check whether block devices are to be mounted in the member.
		// If so, generate a path for each device; otherwise the list of
		// paths stays empty.
var blockDevPaths []string
if role.roleSpec.BlockStorage != nil {
numDevices := *role.roleSpec.BlockStorage.NumDevices
pathPrefix := *role.roleSpec.BlockStorage.Path
for i := int32(0); i < numDevices; i++ {
blockDevPath := pathPrefix + strconv.FormatInt(int64(i), 10)
blockDevPaths = append(blockDevPaths, blockDevPath)
}
}
// role.roleStatus.Members was created with enough capacity to
// avoid realloc, so we can safely grow it w/o disturbing our
// pointers to its elements.
role.roleStatus.Members = append(
role.roleStatus.Members,
kdv1.MemberStatus{
Pod: memberName,
Service: "",
PVC: pvcName,
NodeID: atomic.AddInt64(lastNodeID, 1),
State: string(memberCreatePending),
BlockDevicePaths: blockDevPaths,
},
)
role.membersByState[memberCreatePending] = append(
role.membersByState[memberCreatePending],
&(role.roleStatus.Members[i]))
}
}
// deleteMemberStatuses changes member statuses in a role by moving them to
// delete pending state (if currently ready or in config error) or deleting
// state (if currently create pending), to prepare to shrink the role to the
// desired number of members. It also updates the members-by-state map
// accordingly.
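// As an illustration (made-up counts): shrinking a role from 5 members to a
// desired population of 3 walks Members[3] and Members[4]; a ready or
// config-error member becomes delete pending, while a create-pending member
// moves straight to deleting.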
func deleteMemberStatuses(
role *roleInfo,
) {
currentPop := len(role.roleStatus.Members)
createPendingPop := len(role.membersByState[memberCreatePending])
readyPop := len(role.membersByState[memberReady])
errorPop := len(role.membersByState[memberConfigError])
// Don't need to worry about creating-state members, since if any existed
// we wouldn't be able to make role changes.
for i := role.desiredPop; i < currentPop; i++ {
member := &(role.roleStatus.Members[i])
switch memberState(member.State) {
case memberCreatePending:
member.State = string(memberDeleting)
role.membersByState[memberDeleting] = append(
role.membersByState[memberDeleting],
member,
)
createPendingPop--
case memberReady:
member.State = string(memberDeletePending)
role.membersByState[memberDeletePending] = append(
role.membersByState[memberDeletePending],
member,
)
readyPop--
case memberConfigError:
member.State = string(memberDeletePending)
role.membersByState[memberDeletePending] = append(
role.membersByState[memberDeletePending],
member,
)
errorPop--
default:
}
}
if createPendingPop > 0 {
role.membersByState[memberCreatePending] =
role.membersByState[memberCreatePending][:createPendingPop]
} else {
delete(role.membersByState, memberCreatePending)
}
if readyPop > 0 {
role.membersByState[memberReady] =
role.membersByState[memberReady][:readyPop]
} else {
delete(role.membersByState, memberReady)
}
if errorPop > 0 {
role.membersByState[memberConfigError] =
role.membersByState[memberConfigError][:errorPop]
} else {
delete(role.membersByState, memberConfigError)
}
}
// allRoleMembersReadyOrError examines the members-by-state map and returns
// whether all existing members are in the ready-state or error-state bucket.
// (The situation of "no members" will also return true.) Ready members are
// also checked to make sure they have processed all updates.
func allRoleMembersReadyOrError(
cr *kdv1.KubeDirectorCluster,
role *roleInfo,
) bool {
switch len(role.membersByState) {
case 0:
return true
default:
for state, members := range role.membersByState {
if state != memberReady && state != memberConfigError {
return false
}
if state == memberReady {
for _, m := range members {
if (m.StateDetail.LastSetupGeneration != nil) &&
(cr.Status.SpecGenerationToProcess != nil) {
if *m.StateDetail.LastSetupGeneration !=
*cr.Status.SpecGenerationToProcess {
return false
}
}
if len(m.StateDetail.PendingNotifyCmds) != 0 {
return false
}
}
}
}
return true
}
}