partition.go
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"
"github.com/looplab/fsm"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/configs"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/scheduler/objects"
"github.com/apache/yunikorn-core/pkg/scheduler/placement"
"github.com/apache/yunikorn-core/pkg/scheduler/policies"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
"github.com/apache/yunikorn-core/pkg/webservice/dao"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
type PartitionContext struct {
RmID string // the RM the partition belongs to
Name string // name of the partition (logging mainly)
// Private fields need protection
root *objects.Queue // start of the queue hierarchy
applications map[string]*objects.Application // applications assigned to this partition
completedApplications map[string]*objects.Application // completed applications from this partition
rejectedApplications map[string]*objects.Application // rejected applications from this partition
reservedApps map[string]int // applications reserved within this partition, with reservation count
nodes objects.NodeCollection // nodes assigned to this partition
placementManager *placement.AppPlacementManager // placement manager for this partition
partitionManager *partitionManager // manager for this partition
stateMachine *fsm.FSM // the state of the partition for scheduling
stateTime time.Time // last time the state was updated (needed for cleanup)
isPreemptable bool // can allocations be preempted
rules *[]configs.PlacementRule // placement rules to be loaded by the scheduler
userGroupCache *security.UserGroupCache // user cache per partition
totalPartitionResource *resources.Resource // Total node resources
allocations int // Number of allocations on the partition
stateDumpFilePath string // Path of output file for state dumps
ugm *ugm.Manager // User group manager
// The partition write lock must not be held while manipulating an application.
// Scheduling is running continuously as a lock free background task. Scheduling an application
// acquires a write lock of the application object. While holding the write lock a list of nodes is
// requested from the partition. This requires a read lock on the partition.
// If the partition write lock is held while manipulating an application, a deadlock could occur.
// Since application objects handle their own locks there is no requirement to hold the partition lock
// while manipulating the application.
// Similarly adding, updating or removing a node or a queue should only hold the partition write lock
// while manipulating the partition information not while manipulating the underlying objects.
sync.RWMutex
}
func newPartitionContext(conf configs.PartitionConfig, rmID string, cc *ClusterContext) (*PartitionContext, error) {
if conf.Name == "" || rmID == "" {
log.Logger().Info("partition cannot be created",
zap.String("partition name", conf.Name),
zap.String("rmID", rmID),
zap.Any("cluster context", cc))
return nil, fmt.Errorf("partition cannot be created without name or RM, one is not set")
}
pc := &PartitionContext{
Name: conf.Name,
RmID: rmID,
stateMachine: objects.NewObjectState(),
stateTime: time.Now(),
applications: make(map[string]*objects.Application),
completedApplications: make(map[string]*objects.Application),
reservedApps: make(map[string]int),
nodes: objects.NewNodeCollection(conf.Name),
}
pc.partitionManager = newPartitionManager(pc, cc)
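// make sure the shared user group manager is initialised before it is fetched for this partition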
ugm.Init()
pc.ugm = ugm.GetUserManager()
if err := pc.initialPartitionFromConfig(conf); err != nil {
return nil, err
}
return pc, nil
}
// Initialise the partition
func (pc *PartitionContext) initialPartitionFromConfig(conf configs.PartitionConfig) error {
if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
return fmt.Errorf("partition cannot be created without root queue")
}
// Set up the queue structure: root first, it should be the only queue at this level
// Add the rest of the queue structure recursively
queueConf := conf.Queues[0]
var err error
if pc.root, err = objects.NewConfiguredQueue(queueConf, nil); err != nil {
return err
}
// recursively add the queues to the root
if err = pc.addQueue(queueConf.Queues, pc.root); err != nil {
return err
}
log.Logger().Info("root queue added",
zap.String("partitionName", pc.Name),
zap.String("rmID", pc.RmID))
// set preemption needed flag
pc.isPreemptable = conf.Preemption.Enabled
pc.rules = &conf.PlacementRules
// We need to pass in the locked version of the GetQueue function.
// Placing an application will not have a lock on the partition context.
pc.placementManager = placement.NewPlacementManager(*pc.rules, pc.GetQueue)
// get the user group cache for the partition
// TODO get the resolver from the config
pc.userGroupCache = security.GetUserGroupCache("")
pc.updateNodeSortingPolicy(conf)
pc.updateStateDumpFilePath(conf)
return nil
}
// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
func (pc *PartitionContext) updateNodeSortingPolicy(conf configs.PartitionConfig) {
var configuredPolicy policies.SortingPolicy
configuredPolicy, err := policies.SortingPolicyFromString(conf.NodeSortPolicy.Type)
if err != nil {
log.Logger().Debug("NodeSorting policy incorrectly set or unknown",
zap.Error(err))
log.Logger().Info(fmt.Sprintf("NodeSorting policy not set using '%s' as default", configuredPolicy))
} else {
log.Logger().Info("NodeSorting policy set from config",
zap.String("policyName", configuredPolicy.String()))
}
pc.nodes.SetNodeSortingPolicy(objects.NewNodeSortingPolicy(conf.NodeSortPolicy.Type, conf.NodeSortPolicy.ResourceWeights))
}
// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
func (pc *PartitionContext) updateStateDumpFilePath(conf configs.PartitionConfig) {
stateDumpFilePath := pc.GetStateDumpFilePath()
if stateDumpFilePath != conf.StateDumpFilePath {
log.Logger().Info(fmt.Sprintf("State dump file path was %s, changing to %s", stateDumpFilePath, conf.StateDumpFilePath))
pc.stateDumpFilePath = conf.StateDumpFilePath
}
}
func (pc *PartitionContext) updatePartitionDetails(conf configs.PartitionConfig) error {
pc.Lock()
defer pc.Unlock()
if len(conf.Queues) == 0 || conf.Queues[0].Name != configs.RootQueue {
return fmt.Errorf("partition cannot be created without root queue")
}
if pc.placementManager.IsInitialised() {
log.Logger().Info("Updating placement manager rules on config reload")
err := pc.placementManager.UpdateRules(conf.PlacementRules)
if err != nil {
log.Logger().Info("New placement rules not activated, config reload failed", zap.Error(err))
return err
}
pc.rules = &conf.PlacementRules
} else {
log.Logger().Info("Creating new placement manager on config reload")
pc.rules = &conf.PlacementRules
// We need to pass in the locked version of the GetQueue function.
// Placing an application will not have a lock on the partition context.
pc.placementManager = placement.NewPlacementManager(*pc.rules, pc.GetQueue)
}
pc.updateNodeSortingPolicy(conf)
pc.updateStateDumpFilePath(conf)
// start at the root: there is only one queue
queueConf := conf.Queues[0]
root := pc.root
// update the root queue
if err := root.ApplyConf(queueConf); err != nil {
return err
}
root.UpdateSortType()
// update the rest of the queues recursively
return pc.updateQueues(queueConf.Queues, root)
}
// Process the config structure and create a queue info tree for this partition
func (pc *PartitionContext) addQueue(conf []configs.QueueConfig, parent *objects.Queue) error {
// create the queue at this level
for _, queueConf := range conf {
thisQueue, err := objects.NewConfiguredQueue(queueConf, parent)
if err != nil {
return err
}
// recursively create the queues below
if len(queueConf.Queues) > 0 {
err = pc.addQueue(queueConf.Queues, thisQueue)
if err != nil {
return err
}
}
}
return nil
}
// Update the passed in queues and then do this recursively for the children
//
// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
func (pc *PartitionContext) updateQueues(config []configs.QueueConfig, parent *objects.Queue) error {
// get the name of the passed in queue
parentPath := parent.QueuePath + configs.DOT
// keep track of which children we have updated
visited := map[string]bool{}
// walk over the queues recursively
for _, queueConfig := range config {
pathName := parentPath + queueConfig.Name
queue := pc.getQueueInternal(pathName)
var err error
if queue == nil {
queue, err = objects.NewConfiguredQueue(queueConfig, parent)
} else {
err = queue.ApplyConf(queueConfig)
}
if err != nil {
return err
}
// special call to convert to a real policy from the property
queue.UpdateSortType()
if err = pc.updateQueues(queueConfig.Queues, queue); err != nil {
return err
}
visited[queue.Name] = true
}
// remove all children that were not visited
for childName, childQueue := range parent.GetCopyOfChildren() {
if !visited[childName] {
childQueue.MarkQueueForRemoval()
}
}
return nil
}
// Mark the partition for removal from the system.
// This can be executed multiple times and is only effective the first time.
// The current cleanup sequence is "immediate". This is implemented to allow a graceful cleanup.
func (pc *PartitionContext) markPartitionForRemoval() {
if err := pc.handlePartitionEvent(objects.Remove); err != nil {
log.Logger().Error("failed to mark partition for deletion",
zap.String("partitionName", pc.Name),
zap.Error(err))
}
}
// Get the state of the partition.
// No new nodes and applications will be accepted if stopped or being removed.
func (pc *PartitionContext) isDraining() bool {
return pc.stateMachine.Current() == objects.Draining.String()
}
func (pc *PartitionContext) isRunning() bool {
return pc.stateMachine.Current() == objects.Active.String()
}
func (pc *PartitionContext) isStopped() bool {
return pc.stateMachine.Current() == objects.Stopped.String()
}
// Handle the state event for the partition.
// The state machine handles the locking.
func (pc *PartitionContext) handlePartitionEvent(event objects.ObjectEvent) error {
err := pc.stateMachine.Event(event.String(), pc.Name)
if err == nil {
pc.stateTime = time.Now()
return nil
}
// handle the same state transition not nil error (limit of fsm).
if err.Error() == "no transition" {
return nil
}
return err
}
// Get the placement manager. The manager could change when we process configuration changes,
// so we need to lock.
func (pc *PartitionContext) getPlacementManager() *placement.AppPlacementManager {
pc.RLock()
defer pc.RUnlock()
return pc.placementManager
}
// Add a new application to the partition.
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (pc *PartitionContext) AddApplication(app *objects.Application) error {
if pc.isDraining() || pc.isStopped() {
return fmt.Errorf("partition %s is stopped cannot add a new application %s", pc.Name, app.ApplicationID)
}
// Check if the app exists
appID := app.ApplicationID
if pc.getApplication(appID) != nil {
return fmt.Errorf("adding application %s to partition %s, but application already existed", appID, pc.Name)
}
// Put app under the queue
queueName := app.GetQueuePath()
pm := pc.getPlacementManager()
if pm.IsInitialised() {
err := pm.PlaceApplication(app)
if err != nil {
return fmt.Errorf("failed to place application %s: %v", appID, err)
}
queueName = app.GetQueuePath()
if queueName == "" {
return fmt.Errorf("application rejected by placement rules: %s", appID)
}
}
// lock the partition and make the last change: we need to do this before creating the queues.
// queue cleanup might otherwise remove the queue again before we can add the application
pc.Lock()
defer pc.Unlock()
// we have a queue name either from placement or direct, get the queue
queue := pc.getQueueInternal(queueName)
if queue == nil {
// queue must exist if not using placement rules
if !pm.IsInitialised() {
return fmt.Errorf("application '%s' rejected, cannot create queue '%s' without placement rules", appID, queueName)
}
// with placement rules the hierarchy might not exist so try and create it
var err error
queue, err = pc.createQueue(queueName, app.GetUser())
if err != nil {
return fmt.Errorf("failed to create rule based queue %s for application %s", queueName, appID)
}
}
// check the queue: is a leaf queue with submit access
if !queue.IsLeafQueue() || !queue.CheckSubmitAccess(app.GetUser()) {
return fmt.Errorf("failed to find queue %s for application %s", queueName, appID)
}
// add the app to the queue to set the quota on the queue if needed
queue.AddApplication(app)
// check only for gang request
// - make sure the taskgroup request fits in the maximum set for the queue hierarchy
// - task groups should only be used in FIFO or StateAware queues
// if the check fails remove the app from the queue again
if placeHolder := app.GetPlaceholderAsk(); !resources.IsZero(placeHolder) {
// check the queue sorting
if !queue.SupportTaskGroup() {
queue.RemoveApplication(app)
return fmt.Errorf("queue %s cannot run application %s with task group request: unsupported sort type", queueName, appID)
}
// retrieve the max set
if maxQueue := queue.GetMaxQueueSet(); maxQueue != nil {
if !maxQueue.FitInMaxUndef(placeHolder) {
queue.RemoveApplication(app)
return fmt.Errorf("queue %s cannot fit application %s: task group request %s larger than max queue allocation %s", queueName, appID, placeHolder.String(), maxQueue.String())
}
}
}
// all is OK update the app and add it to the partition
app.SetQueue(queue)
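// register the callback that moves the application out of the active tracking once it terminates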
app.SetTerminatedCallback(pc.moveTerminatedApp)
pc.applications[appID] = app
return nil
}
// Remove the application from the partition.
// This does not fail and handles missing app/queue/node/allocations internally
func (pc *PartitionContext) removeApplication(appID string) []*objects.Allocation {
// update the partition details, must be locked but all other updates should not hold partition lock
app := pc.removeAppInternal(appID)
if app == nil {
return nil
}
// Remove all asks and thus all reservations and pending resources (queue included)
_ = app.RemoveAllocationAsk("")
// Remove app from queue
if queue := app.GetQueue(); queue != nil {
queue.RemoveApplication(app)
}
// Remove all allocations
allocations := app.RemoveAllAllocations()
// Remove all allocations from node(s) (queues have been updated already)
if len(allocations) != 0 {
// track the number of allocations
pc.updateAllocationCount(-len(allocations))
for _, alloc := range allocations {
currentUUID := alloc.GetUUID()
node := pc.GetNode(alloc.GetNodeID())
if node == nil {
log.Logger().Warn("unknown node: not found in active node list",
zap.String("appID", appID),
zap.String("nodeID", alloc.GetNodeID()))
continue
}
if nodeAlloc := node.RemoveAllocation(currentUUID); nodeAlloc == nil {
log.Logger().Warn("unknown allocation: not found on the node",
zap.String("appID", appID),
zap.String("allocationId", currentUUID),
zap.String("nodeID", alloc.GetNodeID()))
}
}
}
return allocations
}
// Locked updates of the partition tracking info
func (pc *PartitionContext) removeAppInternal(appID string) *objects.Application {
pc.Lock()
defer pc.Unlock()
// Remove from applications map
app := pc.applications[appID]
if app == nil {
return nil
}
// remove from partition then cleanup underlying objects
delete(pc.applications, appID)
delete(pc.reservedApps, appID)
return app
}
func (pc *PartitionContext) getApplication(appID string) *objects.Application {
pc.RLock()
defer pc.RUnlock()
return pc.applications[appID]
}
func (pc *PartitionContext) getRejectedApplication(appID string) *objects.Application {
pc.RLock()
defer pc.RUnlock()
return pc.rejectedApplications[appID]
}
// Return a copy of the map of all reservations for the partition.
// This will return an empty map if there are no reservations.
// Visible for tests
func (pc *PartitionContext) getReservations() map[string]int {
pc.RLock()
defer pc.RUnlock()
reserve := make(map[string]int)
for key, num := range pc.reservedApps {
reserve[key] = num
}
return reserve
}
// Get the queue from the structure based on the fully qualified name.
// Wrapper around the unlocked version getQueueInternal()
// Visible by tests
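// For example GetQueue("root.default") returns the root.default queue object, or nil when that queue does not exist.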
func (pc *PartitionContext) GetQueue(name string) *objects.Queue {
pc.RLock()
defer pc.RUnlock()
return pc.getQueueInternal(name)
}
// Get the queue from the structure based on the fully qualified name.
// The name is not syntax checked and must be valid.
// Returns nil if the queue is not found otherwise the queue object.
//
// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
func (pc *PartitionContext) getQueueInternal(name string) *objects.Queue {
// start at the root
queue := pc.root
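// the full path is lower cased before it is matched against the queue hierarchy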
part := strings.Split(strings.ToLower(name), configs.DOT)
// no input or the path does not start at the root queue
if len(part) == 0 || part[0] != configs.RootQueue {
return nil
}
// walk over the parts going down towards the requested queue
for i := 1; i < len(part); i++ {
// if child not found break out and return
if queue = queue.GetChildQueue(part[i]); queue == nil {
break
}
}
return queue
}
// Get the queue info for the whole queue structure to pass to the webservice
func (pc *PartitionContext) GetQueueInfo() dao.QueueDAOInfo {
return pc.root.GetQueueInfo()
}
// Get the queue info for the whole queue structure to pass to the webservice
func (pc *PartitionContext) GetPartitionQueues() dao.PartitionQueueDAOInfo {
var PartitionQueueDAOInfo = dao.PartitionQueueDAOInfo{}
PartitionQueueDAOInfo = pc.root.GetPartitionQueueDAOInfo()
PartitionQueueDAOInfo.Partition = common.GetPartitionNameWithoutClusterID(pc.Name)
return PartitionQueueDAOInfo
}
// Create a queue with full hierarchy. This is called when a new queue is created from a placement rule.
// The final leaf queue does not exist, otherwise we would not get here.
// This means that at least 1 queue (a leaf queue) will be created
func (pc *PartitionContext) createQueue(name string, user security.UserGroup) (*objects.Queue, error) {
// find the queue furthest down the hierarchy that exists
var toCreate []string
if !strings.HasPrefix(name, configs.RootQueue) || !strings.Contains(name, configs.DOT) {
return nil, fmt.Errorf("illegal queue name passed in: %s", name)
}
current := name
queue := pc.getQueueInternal(current)
log.Logger().Debug("Checking queue creation")
for queue == nil {
toCreate = append(toCreate, current[strings.LastIndex(current, configs.DOT)+1:])
current = current[0:strings.LastIndex(current, configs.DOT)]
queue = pc.getQueueInternal(current)
}
// Check the ACL before we really create
// The existing parent queue is the lowest we need to look at
if !queue.CheckSubmitAccess(user) {
return nil, fmt.Errorf("submit access to queue %s denied during create of: %s", current, name)
}
if queue.IsLeafQueue() {
return nil, fmt.Errorf("creation of queue %s failed parent is already a leaf: %s", name, current)
}
log.Logger().Debug("Creating queue(s)",
zap.String("parent", current),
zap.String("fullPath", name))
for i := len(toCreate) - 1; i >= 0; i-- {
// everything is checked and there should be no errors
var err error
queue, err = objects.NewDynamicQueue(toCreate[i], i == 0, queue)
if err != nil {
log.Logger().Warn("Queue auto create failed unexpected",
zap.String("queueName", toCreate[i]),
zap.Error(err))
return nil, err
}
}
return queue, nil
}
// Get the state dump output file path from the partition.
// NOTE: this is a lock free call. It should only be called holding the PartitionContext lock.
func (pc *PartitionContext) GetStateDumpFilePath() string {
return pc.stateDumpFilePath
}
// Get a node from the partition by nodeID.
func (pc *PartitionContext) GetNode(nodeID string) *objects.Node {
pc.RLock()
defer pc.RUnlock()
return pc.nodes.GetNode(nodeID)
}
// Add the node to the partition and process the allocations that are reported by the node.
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (pc *PartitionContext) AddNode(node *objects.Node, existingAllocations []*objects.Allocation) error {
if node == nil {
return fmt.Errorf("cannot add 'nil' node to partition %s", pc.Name)
}
log.Logger().Info("adding node to partition",
zap.String("partition", pc.Name),
zap.String("nodeID", node.NodeID))
if pc.isDraining() || pc.isStopped() {
return fmt.Errorf("partition %s is stopped cannot add a new node %s", pc.Name, node.NodeID)
}
if err := pc.addNodeToList(node); err != nil {
return err
}
// Add allocations that exist on the node when added
if len(existingAllocations) > 0 {
for current, alloc := range existingAllocations {
if err := pc.addAllocation(alloc); err != nil {
// not expecting any inflight replacements on node recovery
released, _ := pc.removeNode(node.NodeID)
log.Logger().Info("Failed to add existing allocations, changes reversed",
zap.String("nodeID", node.NodeID),
zap.Int("existingAllocations", len(existingAllocations)),
zap.Int("releasedAllocations", len(released)),
zap.Int("processingAlloc", current),
zap.String("allocation", alloc.String()),
zap.Error(err))
// update failed metric, active metrics are tracked in add/remove from list
metrics.GetSchedulerMetrics().IncFailedNodes()
return err
}
}
}
return nil
}
// Update the partition resources based on the change of the node information
func (pc *PartitionContext) updatePartitionResource(delta *resources.Resource) {
pc.Lock()
defer pc.Unlock()
if delta != nil {
if pc.totalPartitionResource == nil {
pc.totalPartitionResource = delta.Clone()
} else {
pc.totalPartitionResource.AddTo(delta)
}
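// the root queue maximum always tracks the total partition resource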
pc.root.SetMaxResource(pc.totalPartitionResource)
}
}
// Update the partition details when adding a node.
// This locks the partition. The partition may not be locked when we process the allocation
// additions to the node as that takes further app, queue or node locks
func (pc *PartitionContext) addNodeToList(node *objects.Node) error {
pc.Lock()
defer pc.Unlock()
// Node can be added to the system to allow processing of the allocations
if err := pc.nodes.AddNode(node); err != nil {
return fmt.Errorf("failed to add node %s to partition %s, error: %v", node.NodeID, pc.Name, err)
}
metrics.GetSchedulerMetrics().IncActiveNodes()
// update/set the resources available in the cluster
if pc.totalPartitionResource == nil {
pc.totalPartitionResource = node.GetCapacity().Clone()
} else {
pc.totalPartitionResource.AddTo(node.GetCapacity())
}
pc.root.SetMaxResource(pc.totalPartitionResource)
log.Logger().Info("Updated available resources from added node",
zap.String("partitionName", pc.Name),
zap.String("nodeID", node.NodeID),
zap.String("partitionResource", pc.totalPartitionResource.String()))
return nil
}
// Update the partition details when removing a node.
// This locks the partition. The partition may not be locked when we process the allocation
// removal from the node as that takes further app, queue or node locks
func (pc *PartitionContext) removeNodeFromList(nodeID string) *objects.Node {
pc.Lock()
defer pc.Unlock()
node := pc.nodes.RemoveNode(nodeID)
if node == nil {
log.Logger().Debug("node was not found, node already removed",
zap.String("nodeID", nodeID),
zap.String("partitionName", pc.Name))
return nil
}
// Remove node from list of tracked nodes
metrics.GetSchedulerMetrics().DecActiveNodes()
// found the node: clean up the available resources, partition resources cannot be nil at this point
pc.totalPartitionResource.SubFrom(node.GetCapacity())
pc.root.SetMaxResource(pc.totalPartitionResource)
log.Logger().Info("Updated available resources from removed node",
zap.String("partitionName", pc.Name),
zap.String("nodeID", node.NodeID),
zap.String("partitionResource", pc.totalPartitionResource.String()))
return node
}
// Remove a node from the partition. It returns all removed and confirmed allocations.
// The removed allocations are all linked to the current node.
// The confirmed allocations are real allocations that are linked to placeholders on the current node and are linked to
// other nodes.
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (pc *PartitionContext) removeNode(nodeID string) ([]*objects.Allocation, []*objects.Allocation) {
log.Logger().Info("removing node from partition",
zap.String("partition", pc.Name),
zap.String("nodeID", nodeID))
node := pc.removeNodeFromList(nodeID)
if node == nil {
return nil, nil
}
// found the node: clean up the allocations linked to the node
released, confirmed := pc.removeNodeAllocations(node)
// unreserve all the apps that were reserved on the node
reservedKeys, releasedAsks := node.UnReserveApps()
// update the partition reservations based on the node clean up
for i, appID := range reservedKeys {
pc.unReserveCount(appID, releasedAsks[i])
}
return released, confirmed
}
// Remove all allocations that are assigned to a node as part of the node removal. This is not part of the node object
// as updating the applications and queues is the only goal. Applications and queues are not accessible from the node.
// The removed and confirmed allocations are returned.
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (pc *PartitionContext) removeNodeAllocations(node *objects.Node) ([]*objects.Allocation, []*objects.Allocation) {
released := make([]*objects.Allocation, 0)
confirmed := make([]*objects.Allocation, 0)
// walk over all allocations still registered for this node
for _, alloc := range node.GetAllAllocations() {
allocID := alloc.GetUUID()
// since we are not locking the node and/or application we could have had an update while processing
// note that we do not return the allocation if the app or allocation is not found and assume that it
// was already removed
app := pc.getApplication(alloc.GetApplicationID())
if app == nil {
log.Logger().Info("app is not found, skipping while removing the node",
zap.String("appID", alloc.GetApplicationID()),
zap.String("nodeID", node.NodeID))
continue
}
// check for an inflight replacement.
if alloc.GetReleaseCount() != 0 {
release := alloc.GetFirstRelease()
// allocation to update the ask on: this needs to happen on the real alloc never the placeholder
askAlloc := alloc
// placeholder gets handled differently from normal
if alloc.IsPlaceholder() {
// Check if the real allocation is made on the same node if not we should trigger a confirmation of
// the replacement. Trigger the replacement only if it is NOT on the same node.
// If it is on the same node we just keep going as the real allocation will be unlinked as a result of
// the removal of this placeholder. The ask update will trigger rescheduling later for the real alloc.
if alloc.GetNodeID() != release.GetNodeID() {
// ignore the return as that is the same as alloc, the alloc is gone after this call
_ = app.ReplaceAllocation(allocID)
// we need to check the resources equality
delta := resources.Sub(release.GetAllocatedResource(), alloc.GetAllocatedResource())
// Any negative value in the delta means that at least one of the requested resources in the
// placeholder is larger than the real allocation. The nodes are correct, the queue needs adjusting.
// The reverse case is handled during allocation.
if delta.HasNegativeValue() {
// this looks incorrect but the delta is negative and the result will be a real decrease
err := app.GetQueue().IncAllocatedResource(delta, false)
// this should not happen as we really decrease the value
if err != nil {
log.Logger().Warn("unexpected failure during queue update: replacing placeholder",
zap.String("appID", alloc.GetApplicationID()),
zap.String("placeholderID", alloc.GetUUID()),
zap.String("allocationID", release.GetUUID()),
zap.Error(err))
}
log.Logger().Warn("replacing placeholder: placeholder is larger than real allocation",
zap.String("allocationID", release.GetUUID()),
zap.String("requested resource", release.GetAllocatedResource().String()),
zap.String("placeholderID", alloc.GetUUID()),
zap.String("placeholder resource", alloc.GetAllocatedResource().String()))
}
// track what we confirm on the other node to confirm it in the shim and get it bound
confirmed = append(confirmed, release)
// the allocation is removed so add it to the list that we return
released = append(released, alloc)
log.Logger().Info("allocation removed from node and replacement confirmed",
zap.String("nodeID", node.NodeID),
zap.String("allocationId", allocID),
zap.String("replacement nodeID", release.GetNodeID()),
zap.String("replacement allocationId", release.GetUUID()))
continue
}
askAlloc = release
}
// unlink the placeholder and allocation
release.ClearReleases()
alloc.ClearReleases()
// update the repeat on the real alloc to get it re-scheduled
_, err := app.UpdateAskRepeat(askAlloc.GetAsk().GetAllocationKey(), 1)
if err == nil {
log.Logger().Info("inflight placeholder replacement reversed due to node removal",
zap.String("appID", askAlloc.GetApplicationID()),
zap.String("allocationKey", askAlloc.GetAsk().GetAllocationKey()),
zap.String("nodeID", node.NodeID),
zap.String("replacement allocationId", askAlloc.GetUUID()))
} else {
log.Logger().Error("node removal: repeat update failure for inflight replacement",
zap.String("appID", askAlloc.GetApplicationID()),
zap.String("allocationKey", askAlloc.GetAsk().GetAllocationKey()),
zap.String("nodeID", node.NodeID),
zap.Error(err))
}
}
// check allocations on the app
if app.RemoveAllocation(allocID) == nil {
log.Logger().Info("allocation is not found, skipping while removing the node",
zap.String("allocationId", allocID),
zap.String("appID", app.ApplicationID),
zap.String("nodeID", node.NodeID))
continue
}
if err := app.GetQueue().DecAllocatedResource(alloc.GetAllocatedResource()); err != nil {
log.Logger().Warn("failed to release resources from queue",
zap.String("appID", alloc.GetApplicationID()),
zap.Error(err))
} else {
metrics.GetQueueMetrics(app.GetQueuePath()).IncReleasedContainer()
}
// the allocation is removed so add it to the list that we return
released = append(released, alloc)
log.Logger().Info("allocation removed from node",
zap.String("nodeID", node.NodeID),
zap.String("allocationId", allocID))
}
// track the number of allocations: decrement the released allocation AND increment with the confirmed
pc.updateAllocationCount(len(confirmed) - len(released))
return released, confirmed
}
func (pc *PartitionContext) calculateOutstandingRequests() []*objects.AllocationAsk {
if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
return nil
}
outstanding := make([]*objects.AllocationAsk, 0)
pc.root.GetQueueOutstandingRequests(&outstanding)
return outstanding
}
// Try regular allocation for the partition
// Lock free call: all locks are taken when needed in called functions
func (pc *PartitionContext) tryAllocate() *objects.Allocation {
if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
// nothing to do just return
return nil
}
// try allocating from the root down
alloc := pc.root.TryAllocate(pc.GetNodeIterator, pc.GetNode)
if alloc != nil {
return pc.allocate(alloc)
}
return nil
}
// Try process reservations for the partition
// Lock free call: all locks are taken when needed in called functions
func (pc *PartitionContext) tryReservedAllocate() *objects.Allocation {
if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
// nothing to do just return
return nil
}
// try allocating from the root down
alloc := pc.root.TryReservedAllocate(pc.GetNodeIterator)
if alloc != nil {
return pc.allocate(alloc)
}
return nil
}
// Try process placeholder for the partition
// Lock free call: all locks are taken when needed in called functions
func (pc *PartitionContext) tryPlaceholderAllocate() *objects.Allocation {
if !resources.StrictlyGreaterThanZero(pc.root.GetPendingResource()) {
// nothing to do just return
return nil
}
// try allocating from the root down
alloc := pc.root.TryPlaceholderAllocate(pc.GetNodeIterator, pc.GetNode)
if alloc != nil {
log.Logger().Info("scheduler replace placeholder processed",
zap.String("appID", alloc.GetApplicationID()),
zap.String("allocationKey", alloc.GetAllocationKey()),
zap.String("uuid", alloc.GetUUID()),
zap.String("placeholder released uuid", alloc.GetFirstRelease().GetUUID()))
// pass the release back to the RM via the cluster context
return alloc
}
return nil
}
// Process the allocation and make the left over changes in the partition.
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (pc *PartitionContext) allocate(alloc *objects.Allocation) *objects.Allocation {
// find the app make sure it still exists
appID := alloc.GetApplicationID()
app := pc.getApplication(appID)
if app == nil {
log.Logger().Info("Application was removed while allocating",
zap.String("appID", appID))
return nil
}
// find the node make sure it still exists
// if the node was passed in use that ID instead of the one from the allocation
// the node ID is set when a reservation is allocated on a non-reserved node
var nodeID string
if alloc.GetReservedNodeID() == "" {
nodeID = alloc.GetNodeID()
} else {
nodeID = alloc.GetReservedNodeID()
log.Logger().Debug("Reservation allocated on different node",
zap.String("current node", alloc.GetNodeID()),
zap.String("reserved node", nodeID),
zap.String("appID", appID))
}
node := pc.GetNode(nodeID)
if node == nil {
log.Logger().Info("Node was removed while allocating",
zap.String("nodeID", nodeID),
zap.String("appID", appID))
return nil
}
// reservation
if alloc.GetResult() == objects.Reserved {
pc.reserve(app, node, alloc.GetAsk())
return nil
}
// unreserve
if alloc.GetResult() == objects.Unreserved || alloc.GetResult() == objects.AllocatedReserved {
pc.unReserve(app, node, alloc.GetAsk())
if alloc.GetResult() == objects.Unreserved {
return nil
}
// remove the link to the reserved node
alloc.SetReservedNodeID("")
}
// track the number of allocations
pc.updateAllocationCount(1)
log.Logger().Info("scheduler allocation processed",
zap.String("appID", alloc.GetApplicationID()),
zap.String("allocationKey", alloc.GetAllocationKey()),
zap.String("uuid", alloc.GetUUID()),
zap.String("allocatedResource", alloc.GetAllocatedResource().String()),
zap.Bool("placeholder", alloc.IsPlaceholder()),
zap.String("targetNode", alloc.GetNodeID()))
// pass the allocation back to the RM via the cluster context
return alloc
}
// Process the reservation in the scheduler
// Lock free call: this must be called holding the context lock
func (pc *PartitionContext) reserve(app *objects.Application, node *objects.Node, ask *objects.AllocationAsk) {
appID := app.ApplicationID
// app has node already reserved cannot reserve again
if app.IsReservedOnNode(node.NodeID) {
log.Logger().Info("Application is already reserved on node",
zap.String("appID", appID),
zap.String("nodeID", node.NodeID))
return
}
// all ok, add the reservation to the app, this will also reserve the node
if err := app.Reserve(node, ask); err != nil {
log.Logger().Debug("Failed to handle reservation, error during update of app",
zap.Error(err))
return
}
// add the reservation to the queue list
app.GetQueue().Reserve(appID)
// increase the number of reservations for this app
pc.reservedApps[appID]++
log.Logger().Info("allocation ask is reserved",
zap.String("appID", appID),
zap.String("queue", app.GetQueuePath()),
zap.String("allocationKey", ask.GetAllocationKey()),
zap.String("node", node.NodeID))
}
// Process the unreservation in the scheduler
// NOTE: this is a lock free call. It must NOT be called holding the PartitionContext lock.
func (pc *PartitionContext) unReserve(app *objects.Application, node *objects.Node, ask *objects.AllocationAsk) {
appID := app.ApplicationID
if pc.reservedApps[appID] == 0 {
log.Logger().Info("Application is not reserved in partition",
zap.String("appID", appID))
return
}
// all ok, remove the reservation of the app, this will also unReserve the node
var err error
var num int
if num, err = app.UnReserve(node, ask); err != nil {
log.Logger().Info("Failed to unreserve, error during allocate on the app",
zap.Error(err))
return
}
// remove the reservation of the queue
app.GetQueue().UnReserve(appID, num)
// make sure we cannot go below 0
pc.unReserveCount(appID, num)
log.Logger().Info("allocation ask is unreserved",
zap.String("appID", appID),
zap.String("queue", app.GetQueuePath()),
zap.String("allocationKey", ask.GetAllocationKey()),
zap.String("node", node.NodeID),
zap.Int("reservationsRemoved", num))
}
// Create an ordered node iterator based on the node sort policy set for this partition.
// The iterator is nil if there are no unreserved nodes available.
func (pc *PartitionContext) GetNodeIterator() objects.NodeIterator {