-
Notifications
You must be signed in to change notification settings - Fork 212
/
application.go
1764 lines (1628 loc) · 63.9 KB
/
application.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package objects
import (
"fmt"
"math"
"strings"
"sync"
"time"
"github.com/looplab/fsm"
"go.uber.org/zap"
"github.com/apache/yunikorn-core/pkg/common"
"github.com/apache/yunikorn-core/pkg/common/resources"
"github.com/apache/yunikorn-core/pkg/common/security"
"github.com/apache/yunikorn-core/pkg/events"
"github.com/apache/yunikorn-core/pkg/handler"
"github.com/apache/yunikorn-core/pkg/log"
"github.com/apache/yunikorn-core/pkg/metrics"
"github.com/apache/yunikorn-core/pkg/rmproxy/rmevent"
"github.com/apache/yunikorn-core/pkg/scheduler/ugm"
siCommon "github.com/apache/yunikorn-scheduler-interface/lib/go/common"
"github.com/apache/yunikorn-scheduler-interface/lib/go/si"
)
// Package-level tunables for application lifecycle timing. These are variables
// (not constants) so tests and SetReservationDelay can adjust them.
var (
	reservationDelay          = 2 * time.Second   // delay before an unschedulable ask may reserve a node
	startingTimeout           = 5 * time.Minute   // max time an app may stay in Starting before auto-progressing
	completingTimeout         = 30 * time.Second  // time spent in Completing before moving to Completed
	terminatedTimeout         = 3 * 24 * time.Hour // retention time for terminated applications
	defaultPlaceholderTimeout = 15 * time.Minute  // placeholder timeout used when the request does not set one
)
// Gang scheduling styles: Hard fails the application when placeholders time
// out, Soft falls back to scheduling it as a normal application.
const (
	Soft string = "Soft"
	Hard string = "Hard"
)
// PlaceholderData tracks placeholder usage for a single task group of an
// application: how many were requested, replaced by real allocations, or
// timed out.
type PlaceholderData struct {
	TaskGroupName string              // task group the placeholders belong to
	Count         int64               // number of placeholders seen for this task group
	MinResource   *resources.Resource // resource size of a single placeholder
	Replaced      int64               // placeholders replaced by real allocations
	TimedOut      int64               // placeholders released due to timeout
}
// StateLogEntry records one state transition of an application and when it
// happened; entries are appended to Application.stateLog.
type StateLogEntry struct {
	Time             time.Time
	ApplicationState string
}
// Application tracks a single application in the scheduler: its asks,
// allocations, reservations, gang-scheduling placeholders and lifecycle state.
// The embedded RWMutex guards all private fields; exported fields are set at
// creation and never change afterwards.
type Application struct {
	ApplicationID  string    // unique ID of the application, immutable after creation
	Partition      string    // partition the application belongs to, immutable after creation
	SubmissionTime time.Time // time the application was submitted to the core

	// Private fields need protection
	queuePath            string
	queue                *Queue                  // queue the application is running in
	pending              *resources.Resource     // pending resources from asks for the app
	reservations         map[string]*reservation // a map of reservations
	requests             map[string]*AllocationAsk // a map of asks
	sortedRequests       []*AllocationAsk        // requests with outstanding repeat, sorted by priority
	user                 security.UserGroup      // owner of the application
	tags                 map[string]string       // application tags used in scheduling
	allocatedResource    *resources.Resource     // total allocated resources
	maxAllocatedResource *resources.Resource     // max allocated resources
	allocatedPlaceholder *resources.Resource     // total allocated placeholder resources
	allocations          map[string]*Allocation  // list of all allocations
	placeholderAsk       *resources.Resource     // total placeholder request for the app (all task groups)
	stateMachine         *fsm.FSM                // application state machine
	stateTimer           *time.Timer             // timer for state time
	startTimeout         time.Duration           // timeout for the application starting state
	execTimeout          time.Duration           // execTimeout for the application run
	placeholderTimer     *time.Timer             // placeholder replace timer
	gangSchedulingStyle  string                  // gang scheduling style can be hard (after timeout we fail the application), or soft (after timeeout we schedule it as a normal application)
	finishedTime         time.Time               // the time of finishing this application. the default value is zero time
	rejectedMessage      string                  // If the application is rejected, save the rejected message
	stateLog             []*StateLogEntry        // state log for this application
	placeholderData      map[string]*PlaceholderData // track placeholder and gang related info
	rmEventHandler       handler.EventHandler    // handler used to push updates back to the RM/shim
	rmID                 string                  // ID of the RM this application came from
	terminatedCallback   func(appID string)      // invoked when the application reaches a terminal state

	sync.RWMutex
}
// NewApplication builds an application object from the shim's add request.
// The gang scheduling style falls back to Soft when the request carries an
// unknown value, and the Starting-state timeout is disabled when the
// state-aware-scheduling tag is set on the application.
func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eventHandler handler.EventHandler, rmID string) *Application {
	style := siApp.GetGangSchedulingStyle()
	if style != Soft && style != Hard {
		log.Logger().Info("Unknown gang scheduling style, using soft style as default",
			zap.String("gang scheduling style", style))
		style = Soft
	}
	app := &Application{
		ApplicationID:        siApp.ApplicationID,
		Partition:            siApp.PartitionName,
		SubmissionTime:       time.Now(),
		queuePath:            siApp.QueueName,
		tags:                 siApp.Tags,
		pending:              resources.NewResource(),
		allocatedResource:    resources.NewResource(),
		maxAllocatedResource: resources.NewResource(),
		allocatedPlaceholder: resources.NewResource(),
		requests:             make(map[string]*AllocationAsk),
		reservations:         make(map[string]*reservation),
		allocations:          make(map[string]*Allocation),
		stateMachine:         NewAppState(),
		placeholderAsk:       resources.NewResourceFromProto(siApp.PlaceholderAsk),
		finishedTime:         time.Time{},
		rejectedMessage:      "",
		stateLog:             make([]*StateLogEntry, 0),
		gangSchedulingStyle:  style,
		execTimeout:          common.ConvertSITimeoutWithAdjustment(siApp, defaultPlaceholderTimeout),
		user:                 ugi,
		rmEventHandler:       eventHandler,
		rmID:                 rmID,
	}
	// disabling state-aware scheduling transitions the app to Running immediately
	if app.GetTag(siCommon.AppTagStateAwareDisable) != "" {
		app.startTimeout = 0
	} else {
		app.startTimeout = startingTimeout
	}
	return app
}
// String returns a human-readable representation of the application: ID,
// partition, submission time and current state. Safe to call on a nil receiver.
func (sa *Application) String() string {
	if sa == nil {
		return "application is nil"
	}
	// %s formats the time via time.Time's Stringer; the previous %x verb
	// produced an unreadable hex dump of the struct fields.
	return fmt.Sprintf("applicationID: %s, Partition: %s, SubmissionTime: %s, State: %s",
		sa.ApplicationID, sa.Partition, sa.SubmissionTime, sa.stateMachine.Current())
}
// SetState forces the state machine into the given state without running any
// transition callbacks. Visible for testing and recovery only.
func (sa *Application) SetState(state string) {
	sa.stateMachine.SetState(state)
}

// recordState appends the new state with the current time to the state log.
func (sa *Application) recordState(appState string) {
	// lock not acquired here as it is already held during HandleApplicationEvent() / OnStateChange()
	sa.stateLog = append(sa.stateLog, &StateLogEntry{
		Time:             time.Now(),
		ApplicationState: appState,
	})
}
// GetStateLog returns a snapshot of the state transition log of the
// application.
func (sa *Application) GetStateLog() []*StateLogEntry {
	sa.RLock()
	defer sa.RUnlock()
	// Return a copy: recordState appends under the write lock and an append
	// can write into the same backing array a caller is still reading, which
	// would be a data race on the shared slice.
	logCopy := make([]*StateLogEntry, len(sa.stateLog))
	copy(logCopy, sa.stateLog)
	return logCopy
}
// SetReservationDelay sets the package-level reservation delay.
// Set when the cluster context is created to disable reservation.
// NOTE(review): the package variable is written without synchronization;
// presumably this is only called during startup — confirm with callers.
func SetReservationDelay(delay time.Duration) {
	log.Logger().Debug("Set reservation delay",
		zap.Duration("delay", delay))
	reservationDelay = delay
}
// Return the current state or a checked specific state for the application.
// The state machine handles the locking.
func (sa *Application) CurrentState() string {
	return sa.stateMachine.Current()
}

// State predicates: each returns true if and only if the state machine is
// currently in the named state. Locking is handled inside the fsm.
func (sa *Application) IsStarting() bool {
	return sa.stateMachine.Is(Starting.String())
}
func (sa *Application) IsAccepted() bool {
	return sa.stateMachine.Is(Accepted.String())
}
func (sa *Application) IsNew() bool {
	return sa.stateMachine.Is(New.String())
}
func (sa *Application) IsRunning() bool {
	return sa.stateMachine.Is(Running.String())
}
func (sa *Application) IsCompleting() bool {
	return sa.stateMachine.Is(Completing.String())
}
func (sa *Application) IsCompleted() bool {
	return sa.stateMachine.Is(Completed.String())
}
func (sa *Application) IsRejected() bool {
	return sa.stateMachine.Is(Rejected.String())
}
func (sa *Application) IsExpired() bool {
	return sa.stateMachine.Is(Expired.String())
}
func (sa *Application) IsFailing() bool {
	return sa.stateMachine.Is(Failing.String())
}
func (sa *Application) IsFailed() bool {
	return sa.stateMachine.Is(Failed.String())
}
func (sa *Application) IsResuming() bool {
	return sa.stateMachine.Is(Resuming.String())
}
// HandleApplicationEvent drives the application state machine with the given
// event. The application lock is expected to be held by the caller.
func (sa *Application) HandleApplicationEvent(event applicationEvent) error {
	err := sa.stateMachine.Event(event.String(), sa)
	if err == nil {
		return nil
	}
	// the fsm library reports a same-state transition as an error: ignore it
	if err.Error() == noTransition {
		return nil
	}
	return err
}
// HandleApplicationEventWithInfo drives the application state machine with the
// given event and an extra info object passed to the transition callbacks.
// The application lock is expected to be held by the caller.
func (sa *Application) HandleApplicationEventWithInfo(event applicationEvent, eventInfo string) error {
	err := sa.stateMachine.Event(event.String(), sa, eventInfo)
	if err == nil {
		return nil
	}
	// the fsm library reports a same-state transition as an error: ignore it
	if err.Error() == noTransition {
		return nil
	}
	return err
}
// OnStateChange is called every time the application enters a new state.
// It sends an event about the state change to the shim as an application update.
// The only state that does not generate an event is Rejected.
func (sa *Application) OnStateChange(event *fsm.Event, eventInfo string) {
	sa.recordState(event.Dst)
	// no update is sent for a rejection or when no handler is registered
	if event.Dst == Rejected.String() || sa.rmEventHandler == nil {
		return
	}
	// prefer the caller supplied info text, fall back to the event name
	var message string
	if len(eventInfo) == 0 {
		message = event.Event
	} else {
		message = eventInfo
	}
	sa.rmEventHandler.HandleEvent(
		&rmevent.RMApplicationUpdateEvent{
			RmID:                 sa.rmID,
			AcceptedApplications: make([]*si.AcceptedApplication, 0),
			RejectedApplications: make([]*si.RejectedApplication, 0),
			UpdatedApplications: []*si.UpdatedApplication{{
				ApplicationID:            sa.ApplicationID,
				State:                    sa.stateMachine.Current(),
				StateTransitionTimestamp: time.Now().UnixNano(),
				Message:                  message,
			}},
		})
}
// Set the starting timer to make sure the application will not get stuck in a starting state too long.
// This prevents an app from not progressing to Running when it only has 1 allocation.
// Called when entering the Starting state by the state machine.
func (sa *Application) setStateTimer(timeout time.Duration, currentState string, event applicationEvent) {
	log.Logger().Debug("Application state timer initiated",
		zap.String("appID", sa.ApplicationID),
		zap.String("state", sa.stateMachine.Current()),
		zap.Duration("timeout", timeout))
	// the timer fires on its own goroutine; timeoutStateTimer re-checks the
	// state under the lock before acting
	sa.stateTimer = time.AfterFunc(timeout, sa.timeoutStateTimer(currentState, event))
}
// timeoutStateTimer returns the callback executed when the state timer fires.
// It progresses the application with the given event, unless the application
// left the expected state while the timer was pending, in which case it is a
// no-op. A completing app with placeholders left first releases those instead.
func (sa *Application) timeoutStateTimer(expectedState string, event applicationEvent) func() {
	return func() {
		sa.Lock()
		defer sa.Unlock()
		// make sure we are still in the right state
		// we could have been failed or something might have happened while waiting for a lock
		if expectedState == sa.stateMachine.Current() {
			log.Logger().Debug("Application state: auto progress",
				zap.String("applicationID", sa.ApplicationID),
				zap.String("state", sa.stateMachine.Current()))
			// if the app is completing, but there are placeholders left, first do the cleanup
			if sa.IsCompleting() && !resources.IsZero(sa.allocatedPlaceholder) {
				var toRelease []*Allocation
				for _, alloc := range sa.getPlaceholderAllocations() {
					// skip over the allocations that are already marked for release
					if alloc.IsReleased() {
						continue
					}
					alloc.SetReleased(true)
					toRelease = append(toRelease, alloc)
				}
				sa.notifyRMAllocationReleased(sa.rmID, toRelease, si.TerminationType_TIMEOUT, "releasing placeholders on app complete")
				sa.clearStateTimer()
			} else {
				//nolint: errcheck
				_ = sa.HandleApplicationEvent(event)
			}
		}
	}
}
// Clear the starting timer. If the application has progressed out of the starting state we need to stop the
// timer and clean up.
// Called when leaving the Starting state by the state machine.
// Safe to call on a nil receiver or when no timer is running.
func (sa *Application) clearStateTimer() {
	if sa == nil || sa.stateTimer == nil {
		return
	}
	sa.stateTimer.Stop()
	sa.stateTimer = nil
	log.Logger().Debug("Application state timer cleared",
		zap.String("appID", sa.ApplicationID),
		zap.String("state", sa.stateMachine.Current()))
}
// initPlaceholderTimer starts the placeholder timeout timer. It is a no-op
// unless the application is Accepted, has a positive execTimeout and no timer
// is already running.
func (sa *Application) initPlaceholderTimer() {
	if sa.placeholderTimer != nil || !sa.IsAccepted() || sa.execTimeout <= 0 {
		return
	}
	log.Logger().Debug("Application placeholder timer initiated",
		zap.String("AppID", sa.ApplicationID),
		zap.Duration("Timeout", sa.execTimeout))
	sa.placeholderTimer = time.AfterFunc(sa.execTimeout, sa.timeoutPlaceholderProcessing)
}

// clearPlaceholderTimer stops and removes the placeholder timer.
// Safe to call on a nil receiver or when no timer is running.
func (sa *Application) clearPlaceholderTimer() {
	if sa == nil || sa.placeholderTimer == nil {
		return
	}
	sa.placeholderTimer.Stop()
	sa.placeholderTimer = nil
	log.Logger().Debug("Application placeholder timer cleared",
		zap.String("AppID", sa.ApplicationID),
		zap.Duration("Timeout", sa.execTimeout))
}
// timeoutPlaceholderProcessing cleans up all placeholder asks and allocations that are not used after the timeout.
// If the application has started processing, Starting state or further, the application keeps on processing without
// being able to use the placeholders.
// If the application is in New or Accepted state we clean up and take followup action based on the gang scheduling
// style.
func (sa *Application) timeoutPlaceholderProcessing() {
	sa.Lock()
	defer sa.Unlock()
	switch {
	// Case 1: if all app's placeholders are allocated, only part of them gets replaced, just delete the remaining placeholders
	case (sa.IsRunning() || sa.IsStarting() || sa.IsCompleting()) && !resources.IsZero(sa.allocatedPlaceholder):
		var toRelease []*Allocation
		replacing := 0
		for _, alloc := range sa.getPlaceholderAllocations() {
			// skip over the allocations that are already marked for release, they will be replaced soon
			if alloc.IsReleased() {
				replacing++
				continue
			}
			alloc.SetReleased(true)
			toRelease = append(toRelease, alloc)
			// mark as timeout out in the tracking data
			if _, ok := sa.placeholderData[alloc.GetTaskGroup()]; ok {
				sa.placeholderData[alloc.GetTaskGroup()].TimedOut++
			}
		}
		log.Logger().Info("Placeholder timeout, releasing placeholders",
			zap.String("AppID", sa.ApplicationID),
			zap.Int("placeholders being replaced", replacing),
			zap.Int("releasing placeholders", len(toRelease)))
		sa.notifyRMAllocationReleased(sa.rmID, toRelease, si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
	// Case 2: in every other case fail the application, and notify the context about the expired placeholder asks
	default:
		log.Logger().Info("Placeholder timeout, releasing asks and placeholders",
			zap.String("AppID", sa.ApplicationID),
			zap.Int("releasing placeholders", len(sa.allocations)),
			zap.Int("releasing asks", len(sa.requests)),
			zap.String("gang scheduling style", sa.gangSchedulingStyle))
		// change the status of the app based on gang style: soft resume normal allocations, hard fail the app
		event := ResumeApplication
		if sa.gangSchedulingStyle == Hard {
			event = FailApplication
		}
		if err := sa.HandleApplicationEventWithInfo(event, "ResourceReservationTimeout"); err != nil {
			log.Logger().Debug("Application state change failed when placeholder timed out",
				zap.String("AppID", sa.ApplicationID),
				zap.String("currentState", sa.CurrentState()),
				zap.Error(err))
		}
		sa.notifyRMAllocationAskReleased(sa.rmID, sa.getAllRequestsInternal(), si.TerminationType_TIMEOUT, "releasing placeholders asks on placeholder timeout")
		sa.removeAsksInternal("")
		// all allocations are placeholders but GetAllAllocations is locked and cannot be used
		sa.notifyRMAllocationReleased(sa.rmID, sa.getPlaceholderAllocations(), si.TerminationType_TIMEOUT, "releasing allocated placeholders on placeholder timeout")
		// we are in an accepted or new state so nothing can be replaced yet: mark everything as timedout
		for _, phData := range sa.placeholderData {
			phData.TimedOut = phData.Count
		}
	}
	sa.clearPlaceholderTimer()
}
// GetReservations returns all reservation keys held by the application.
// An empty (never nil) slice is returned when there are no reservations.
// Visible for tests
func (sa *Application) GetReservations() []string {
	sa.RLock()
	defer sa.RUnlock()
	reservationKeys := make([]string, 0, len(sa.reservations))
	for resKey := range sa.reservations {
		reservationKeys = append(reservationKeys, resKey)
	}
	return reservationKeys
}
// Return the allocation ask for the key, nil if not found
func (sa *Application) GetAllocationAsk(allocationKey string) *AllocationAsk {
	sa.RLock()
	defer sa.RUnlock()
	return sa.requests[allocationKey]
}

// Return the allocated resources for this application (a clone, safe to modify)
func (sa *Application) GetAllocatedResource() *resources.Resource {
	sa.RLock()
	defer sa.RUnlock()
	return sa.allocatedResource.Clone()
}

// GetMaxAllocatedResource returns a clone of the peak allocated resources seen.
func (sa *Application) GetMaxAllocatedResource() *resources.Resource {
	sa.RLock()
	defer sa.RUnlock()
	return sa.maxAllocatedResource.Clone()
}

// Return the allocated placeholder resources for this application (a clone)
func (sa *Application) GetPlaceholderResource() *resources.Resource {
	sa.RLock()
	defer sa.RUnlock()
	return sa.allocatedPlaceholder.Clone()
}

// Return the total placeholder ask for this application
// Is only set on app creation and used when app is added to a queue
func (sa *Application) GetPlaceholderAsk() *resources.Resource {
	sa.RLock()
	defer sa.RUnlock()
	return sa.placeholderAsk
}

// Return the pending resources for this application
// NOTE(review): unlike the getters above this returns the internal object, not
// a clone — callers must not modify it; confirm this is intentional.
func (sa *Application) GetPendingResource() *resources.Resource {
	sa.RLock()
	defer sa.RUnlock()
	return sa.pending
}
// Remove one or more allocation asks from this application.
// This also removes any reservations that are linked to the ask.
// The return value is the number of reservations released
func (sa *Application) RemoveAllocationAsk(allocKey string) int {
	sa.Lock()
	defer sa.Unlock()
	return sa.removeAsksInternal(allocKey)
}
// unlocked version of the allocation ask removal
// An empty allocKey removes all asks and their reservations; otherwise only
// the given ask is removed. Returns the number of reservations released.
// Must only be called while holding the application lock.
func (sa *Application) removeAsksInternal(allocKey string) int {
	// shortcut no need to do anything
	if len(sa.requests) == 0 {
		return 0
	}
	var deltaPendingResource *resources.Resource = nil
	// when allocation key not specified, cleanup all allocation ask
	var toRelease int
	if allocKey == "" {
		// cleanup all reservations
		for key, reserve := range sa.reservations {
			releases, err := sa.unReserveInternal(reserve.node, reserve.ask)
			if err != nil {
				log.Logger().Warn("Removal of reservation failed while removing all allocation asks",
					zap.String("appID", sa.ApplicationID),
					zap.String("reservationKey", key),
					zap.Error(err))
				continue
			}
			// clean up the queue reservation (one at a time)
			sa.queue.UnReserve(sa.ApplicationID, releases)
			toRelease += releases
		}
		// Cleanup total pending resource
		deltaPendingResource = sa.pending
		sa.pending = resources.NewResource()
		sa.requests = make(map[string]*AllocationAsk)
	} else {
		// cleanup the reservation for this allocation
		for _, key := range sa.GetAskReservations(allocKey) {
			reserve := sa.reservations[key]
			releases, err := sa.unReserveInternal(reserve.node, reserve.ask)
			if err != nil {
				log.Logger().Warn("Removal of reservation failed while removing allocation ask",
					zap.String("appID", sa.ApplicationID),
					zap.String("reservationKey", key),
					zap.Error(err))
				continue
			}
			// clean up the queue reservation
			sa.queue.UnReserve(sa.ApplicationID, releases)
			toRelease += releases
		}
		if ask := sa.requests[allocKey]; ask != nil {
			// pending delta is the ask resource times its outstanding repeat
			deltaPendingResource = resources.MultiplyBy(ask.GetAllocatedResource(), float64(ask.GetPendingAskRepeat()))
			sa.pending = resources.Sub(sa.pending, deltaPendingResource)
			delete(sa.requests, allocKey)
		}
	}
	// clean up the queue pending resources
	sa.queue.decPendingResource(deltaPendingResource)
	// Check if we need to change state based on the ask removal:
	// 1) if pending is zero (no more asks left)
	// 2) if confirmed allocations is zero (no real tasks running)
	// Change the state to completing.
	// When the resource trackers are zero we should not expect anything to come in later.
	hasPlaceHolderAllocations := len(sa.getPlaceholderAllocations()) > 0
	if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) && !sa.IsFailing() && !sa.IsCompleting() && !hasPlaceHolderAllocations {
		if err := sa.HandleApplicationEvent(CompleteApplication); err != nil {
			log.Logger().Warn("Application state not changed to Completing while updating ask(s)",
				zap.String("currentState", sa.CurrentState()),
				zap.Error(err))
		}
	}
	log.Logger().Info("ask removed successfully from application",
		zap.String("appID", sa.ApplicationID),
		zap.String("ask", allocKey),
		zap.String("pendingDelta", deltaPendingResource.String()))
	return toRelease
}
// Add an allocation ask to this application.
// If the ask already exists the existing info is replaced and the pending
// resource tracking is adjusted by the difference. Returns an error for a nil
// ask or an ask with zero repeat or zero resources.
func (sa *Application) AddAllocationAsk(ask *AllocationAsk) error {
	sa.Lock()
	defer sa.Unlock()
	if ask == nil {
		return fmt.Errorf("ask cannot be nil when added to app %s", sa.ApplicationID)
	}
	if ask.GetPendingAskRepeat() == 0 || resources.IsZero(ask.GetAllocatedResource()) {
		return fmt.Errorf("invalid ask added to app %s: %v", sa.ApplicationID, ask)
	}
	delta := resources.Multiply(ask.GetAllocatedResource(), int64(ask.GetPendingAskRepeat()))
	var oldAskResource *resources.Resource = nil
	if oldAsk := sa.requests[ask.GetAllocationKey()]; oldAsk != nil {
		oldAskResource = resources.Multiply(oldAsk.GetAllocatedResource(), int64(oldAsk.GetPendingAskRepeat()))
	}
	// Check if we need to change state based on the ask added, there are two cases:
	// 1) first ask added on a new app: state is New
	// 2) all asks and allocation have been removed: state is Completing
	// Move the state and get it scheduling (again)
	if sa.stateMachine.Is(New.String()) || sa.stateMachine.Is(Completing.String()) {
		if err := sa.HandleApplicationEvent(RunApplication); err != nil {
			log.Logger().Debug("Application state change failed while adding new ask",
				zap.String("currentState", sa.CurrentState()),
				zap.Error(err))
		}
	}
	sa.requests[ask.GetAllocationKey()] = ask
	// Update total pending resource: new total minus whatever the old ask counted for
	delta.SubFrom(oldAskResource)
	sa.pending = resources.Add(sa.pending, delta)
	sa.queue.incPendingResource(delta)
	if ask.IsPlaceholder() {
		sa.addPlaceholderData(ask)
	}
	log.Logger().Info("ask added successfully to application",
		zap.String("appID", sa.ApplicationID),
		zap.String("ask", ask.GetAllocationKey()),
		zap.Bool("placeholder", ask.IsPlaceholder()),
		zap.String("pendingDelta", delta.String()))
	return nil
}
// RecoverAllocationAsk adds the ask when a node allocation is recovered,
// maintaining the rule that an Allocation always has a link to an
// AllocationAsk. Does not touch pending resources: the allocation already
// exists, so nothing is outstanding.
// Safeguarded against a nil ask but the recovery generates the ask and should never be nil.
func (sa *Application) RecoverAllocationAsk(ask *AllocationAsk) {
	sa.Lock()
	defer sa.Unlock()
	if ask == nil {
		return
	}
	sa.requests[ask.GetAllocationKey()] = ask
	// progress the application from New to Accepted.
	if sa.IsNew() {
		if err := sa.HandleApplicationEvent(RunApplication); err != nil {
			log.Logger().Debug("Application state change failed while recovering allocation ask",
				zap.Error(err))
		}
	}
}
// UpdateAskRepeat adjusts the pending repeat of the ask identified by allocKey
// by delta (may be negative). Returns the pending resource change applied, or
// an error when the ask cannot be found.
func (sa *Application) UpdateAskRepeat(allocKey string, delta int32) (*resources.Resource, error) {
	sa.Lock()
	defer sa.Unlock()
	ask := sa.requests[allocKey]
	if ask == nil {
		return nil, fmt.Errorf("failed to locate ask with key %s", allocKey)
	}
	return sa.updateAskRepeatInternal(ask, delta)
}
// updateAskRepeatInternal adjusts the ask's pending repeat and updates the
// application and queue pending resources by the matching delta.
// Must only be called while holding the application lock.
func (sa *Application) updateAskRepeatInternal(ask *AllocationAsk, delta int32) (*resources.Resource, error) {
	// updating with delta does error checking internally
	if !ask.updatePendingAskRepeat(delta) {
		// fixed typo in the error message: "repaeat" -> "repeat"
		return nil, fmt.Errorf("ask repeat not updated resulting repeat less than zero for ask %s on app %s", ask.GetAllocationKey(), sa.ApplicationID)
	}
	deltaPendingResource := resources.Multiply(ask.GetAllocatedResource(), int64(delta))
	sa.pending = resources.Add(sa.pending, deltaPendingResource)
	// update the pending of the queue with the same delta
	sa.queue.incPendingResource(deltaPendingResource)
	return deltaPendingResource, nil
}
// HasReserved returns true if the application has any reservations.
func (sa *Application) HasReserved() bool {
	sa.RLock()
	defer sa.RUnlock()
	return len(sa.reservations) > 0
}

// IsReservedOnNode returns true if and only if the node has been reserved by the application
// An empty nodeID is never reserved.
func (sa *Application) IsReservedOnNode(nodeID string) bool {
	if nodeID == "" {
		return false
	}
	sa.RLock()
	defer sa.RUnlock()
	// make sure matches only for the whole nodeID: reservation keys are
	// "<nodeID>|<allocKey>", so the separator avoids prefix collisions
	separator := nodeID + "|"
	for key := range sa.reservations {
		if strings.HasPrefix(key, separator) {
			return true
		}
	}
	return false
}
// Reserve the application for this node and ask combination.
// If the reservation fails the function returns false, if the reservation is made it returns true.
// If the node and ask combination was already reserved for the application this is a noop and returns true.
func (sa *Application) Reserve(node *Node, ask *AllocationAsk) error {
	sa.Lock()
	defer sa.Unlock()
	return sa.reserveInternal(node, ask)
}
// Unlocked version for Reserve that really does the work.
// Validates the ask is registered and within its repeat limit, reserves the
// node first, then records the reservation on the application.
// Must only be called while holding the application lock.
func (sa *Application) reserveInternal(node *Node, ask *AllocationAsk) error {
	// create the reservation (includes nil checks)
	nodeReservation := newReservation(node, sa, ask, true)
	if nodeReservation == nil {
		log.Logger().Debug("reservation creation failed unexpectedly",
			zap.String("app", sa.ApplicationID),
			zap.Any("node", node),
			zap.Any("ask", ask))
		return fmt.Errorf("reservation creation failed node or ask are nil on appID %s", sa.ApplicationID)
	}
	allocKey := ask.GetAllocationKey()
	if sa.requests[allocKey] == nil {
		log.Logger().Debug("ask is not registered to this app",
			zap.String("app", sa.ApplicationID),
			zap.String("allocKey", allocKey))
		return fmt.Errorf("reservation creation failed ask %s not found on appID %s", allocKey, sa.ApplicationID)
	}
	if !sa.canAskReserve(ask) {
		return fmt.Errorf("reservation of ask exceeds pending repeat, pending ask repeat %d", ask.GetPendingAskRepeat())
	}
	// check if we can reserve the node before reserving on the app
	if err := node.Reserve(sa, ask); err != nil {
		return err
	}
	sa.reservations[nodeReservation.getKey()] = nodeReservation
	log.Logger().Info("reservation added successfully",
		zap.String("app", sa.ApplicationID),
		zap.String("node", node.NodeID),
		zap.String("ask", allocKey))
	return nil
}
// UnReserve the application for this node and ask combination.
// This first removes the reservation from the node.
// If the reservation does not exist it returns 0 for reservations removed, if the reservation is removed it returns 1.
// The error is set if the reservation key cannot be removed from the app or node.
func (sa *Application) UnReserve(node *Node, ask *AllocationAsk) (int, error) {
	sa.Lock()
	defer sa.Unlock()
	return sa.unReserveInternal(node, ask)
}
// Unlocked version for UnReserve that really does the work.
// Removes the reservation from the node first, then from the application map;
// logs (but tolerates) the node and app views disagreeing.
// Must only be called while holding the application lock.
func (sa *Application) unReserveInternal(node *Node, ask *AllocationAsk) (int, error) {
	resKey := reservationKey(node, nil, ask)
	if resKey == "" {
		log.Logger().Debug("unreserve reservation key create failed unexpectedly",
			zap.String("appID", sa.ApplicationID),
			zap.String("node", node.String()),
			zap.String("ask", ask.String()))
		return 0, fmt.Errorf("reservation key failed node or ask are nil for appID %s", sa.ApplicationID)
	}
	// unReserve the node before removing from the app
	var num int
	var err error
	if num, err = node.unReserve(sa, ask); err != nil {
		return 0, err
	}
	// if the unreserve worked on the node check the app
	if _, found := sa.reservations[resKey]; found {
		// worked on the node means either found or not but no error, log difference here
		if num == 0 {
			log.Logger().Info("reservation not found while removing from node, app has reservation",
				zap.String("appID", sa.ApplicationID),
				zap.String("nodeID", node.NodeID),
				zap.String("ask", ask.GetAllocationKey()))
		}
		delete(sa.reservations, resKey)
		log.Logger().Info("reservation removed successfully", zap.String("node", node.NodeID),
			zap.String("app", ask.GetApplicationID()), zap.String("ask", ask.GetAllocationKey()))
		return 1, nil
	}
	// reservation was not found
	log.Logger().Info("reservation not found while removing from app",
		zap.String("appID", sa.ApplicationID),
		zap.String("nodeID", node.NodeID),
		zap.String("ask", ask.GetAllocationKey()),
		zap.Int("nodeReservationsRemoved", num))
	return 0, nil
}
// GetAskReservations returns the reservation keys, on any node, that belong to
// the given allocation key. The result is 0 or more keys into the reservations
// map; an empty allocKey yields an empty slice.
// No locking must be called while holding the lock
func (sa *Application) GetAskReservations(allocKey string) []string {
	matched := make([]string, 0)
	if allocKey == "" {
		return matched
	}
	for resKey := range sa.reservations {
		if strings.HasSuffix(resKey, allocKey) {
			matched = append(matched, resKey)
		}
	}
	return matched
}
// Check if the allocation has already been reserved. An ask can reserve multiple nodes if the request has a repeat set
// larger than 1. It can never reserve more than the repeat number of nodes.
// No locking must be called while holding the lock
func (sa *Application) canAskReserve(ask *AllocationAsk) bool {
	allocKey := ask.GetAllocationKey()
	pending := int(ask.GetPendingAskRepeat())
	resNumber := sa.GetAskReservations(allocKey)
	// log only: the boolean below is the actual decision
	if len(resNumber) >= pending {
		log.Logger().Debug("reservation exceeds repeats",
			zap.String("askKey", allocKey),
			zap.Int("askPending", pending),
			zap.Int("askReserved", len(resNumber)))
	}
	return pending > len(resNumber)
}
// sortRequests rebuilds the sorted request list ordered by request priority.
// Only asks that still have an outstanding (non-zero) repeat are included.
// This function performs no locking: the caller must hold the Application lock.
func (sa *Application) sortRequests(ascending bool) {
	sa.sortedRequests = nil
	for _, ask := range sa.requests {
		// skip fully satisfied asks
		if ask.GetPendingAskRepeat() != 0 {
			sa.sortedRequests = append(sa.sortedRequests, ask)
		}
	}
	// nothing to sort when no ask is pending
	if len(sa.sortedRequests) != 0 {
		sortAskByPriority(sa.sortedRequests, ascending)
	}
}
// getOutstandingRequests collects all pending asks that still fit inside the
// given headroom and appends them to total. The headroom is reduced as asks
// are collected so that later (lower priority) asks only see the space left.
// headRoom may be nil: the resource helpers are nil safe.
func (sa *Application) getOutstandingRequests(headRoom *resources.Resource, total *[]*AllocationAsk) {
	// Hold the write lock for the whole call: sortRequests mutates
	// sa.sortedRequests, and dropping the lock between sorting and iterating
	// (as a Lock/Unlock followed by RLock would) leaves a window in which
	// another routine could rebuild the slice underneath the iteration.
	sa.Lock()
	defer sa.Unlock()
	// make sure the requests are sorted
	sa.sortRequests(false)
	for _, request := range sa.sortedRequests {
		// ignore nil checks: resource function calls are nil safe
		if headRoom.FitInMaxUndef(request.GetAllocatedResource()) {
			// headroom is still large enough for this request
			*total = append(*total, request)
			headRoom.SubFrom(request.GetAllocatedResource())
		}
	}
}
// canReplace returns true if there is a placeholder for the task group available for the request.
// False for all other cases. Placeholder replacements are handled separately from normal allocations.
func (sa *Application) canReplace(request *AllocationAsk) bool {
	// a placeholder, or a request without a task group, can never replace a placeholder
	if request == nil || request.IsPlaceholder() || request.GetTaskGroup() == "" {
		return false
	}
	// look up the tracked placeholder data for this task group
	phData, tracked := sa.placeholderData[request.GetTaskGroup()]
	if !tracked {
		return false
	}
	// replaceable only while some placeholders are neither replaced nor timed out
	return phData.Count > phData.Replaced+phData.TimedOut
}
// tryAllocate will perform a regular allocation of a pending request, includes placeholders.
// headRoom is the remaining queue headroom the allocation must fit in. nodeIterator supplies
// candidate nodes; getNodeFn resolves a node by ID for asks pinned to a required node.
// Returns the first allocation found (which may be a reservation proposal for a required
// node), or nil when no pending request can be placed.
// Takes and holds the Application write lock for the full duration of the call.
func (sa *Application) tryAllocate(headRoom *resources.Resource, nodeIterator func() NodeIterator, getNodeFn func(string) *Node) *Allocation {
	sa.Lock()
	defer sa.Unlock()
	// make sure the request are sorted
	sa.sortRequests(false)
	// get all the requests from the app sorted in order
	for _, request := range sa.sortedRequests {
		// check if there is a replacement possible: placeholder swaps are handled
		// elsewhere, so skip the request here
		if sa.canReplace(request) {
			continue
		}
		// resource must fit in headroom otherwise skip the request
		if !headRoom.FitInMaxUndef(request.GetAllocatedResource()) {
			// post scheduling events via the event plugin
			if eventCache := events.GetEventCache(); eventCache != nil {
				message := fmt.Sprintf("Application %s does not fit into %s queue", request.GetApplicationID(), sa.queuePath)
				if event, err := events.CreateRequestEventRecord(request.GetAllocationKey(), request.GetApplicationID(), "InsufficientQueueResources", message); err != nil {
					log.Logger().Warn("Event creation failed",
						zap.String("event message", message),
						zap.Error(err))
				} else {
					eventCache.AddEvent(event)
				}
			}
			continue
		}
		requiredNode := request.GetRequiredNode()
		// does request have any constraint to run on specific node?
		if requiredNode != "" {
			// the iterator might not have the node we need as it could be reserved, or we have not added it yet
			node := getNodeFn(requiredNode)
			if node == nil {
				log.Logger().Warn("required node is not found (could be transient)",
					zap.String("application ID", sa.ApplicationID),
					zap.String("allocationKey", request.GetAllocationKey()),
					zap.String("required node", requiredNode))
				// abort the whole cycle: the node may appear later
				return nil
			}
			alloc := sa.tryNode(node, request)
			if alloc != nil {
				// check if the node was reserved and we allocated after a release
				if _, ok := sa.reservations[reservationKey(node, nil, request)]; ok {
					log.Logger().Debug("allocation on required node after release",
						zap.String("appID", sa.ApplicationID),
						zap.String("nodeID", requiredNode),
						zap.String("allocationKey", request.GetAllocationKey()))
					alloc.SetResult(AllocatedReserved)
					return alloc
				}
				log.Logger().Debug("allocation on required node is completed",
					zap.String("nodeID", node.NodeID),
					zap.String("allocationKey", request.GetAllocationKey()),
					zap.String("AllocationResult", alloc.GetResult().String()))
				return alloc
			}
			// required node could not satisfy the ask now: propose a reservation on it
			return newReservedAllocation(Reserved, node.NodeID, request)
		}
		// no node constraint: walk the candidate nodes from the iterator
		iterator := nodeIterator()
		if iterator != nil {
			alloc := sa.tryNodes(request, iterator)
			// have a candidate return it
			if alloc != nil {
				return alloc
			}
		}
	}
	// no requests fit, skip to next app
	return nil
}
// tryPlaceholderAllocate tries to replace a placeholder that is allocated with a real allocation
//nolint:funlen
func (sa *Application) tryPlaceholderAllocate(nodeIterator func() NodeIterator, getNodeFn func(string) *Node) *Allocation {
sa.Lock()
defer sa.Unlock()
// nothing to do if we have no placeholders allocated
if resources.IsZero(sa.allocatedPlaceholder) {
return nil
}
// make sure the request are sorted
sa.sortRequests(false)
// keep the first fits for later
var phFit *Allocation
var reqFit *AllocationAsk
// get all the requests from the app sorted in order
for _, request := range sa.sortedRequests {
// skip placeholders they follow standard allocation
// this should also be part of a task group just make sure it is
if request.IsPlaceholder() || request.GetTaskGroup() == "" {
continue
}
// walk over the placeholders, allow for processing all as we can have multiple task groups
phAllocs := sa.getPlaceholderAllocations()
for _, ph := range phAllocs {
// we could have already released this placeholder and are waiting for the shim to confirm
// and check that we have the correct task group before trying to swap
if ph.IsReleased() || request.GetTaskGroup() != ph.GetTaskGroup() {
continue
}
// before we check anything we need to check the resources equality
delta := resources.Sub(ph.GetAllocatedResource(), request.GetAllocatedResource())
// Any negative value in the delta means that at least one of the requested resource in the real
// allocation is larger than the placeholder. We need to cancel this placeholder and check the next
// placeholder. This should trigger the removal of all the placeholder that are part of this task group.
// All placeholders in the same task group are always the same size.
if delta.HasNegativeValue() {
log.Logger().Warn("releasing placeholder: real allocation is larger than placeholder",
zap.String("requested resource", request.GetAllocatedResource().String()),
zap.String("placeholderID", ph.GetUUID()),
zap.String("placeholder resource", ph.GetAllocatedResource().String()))
// release the placeholder and tell the RM
ph.SetReleased(true)
sa.notifyRMAllocationReleased(sa.rmID, []*Allocation{ph}, si.TerminationType_TIMEOUT, "cancel placeholder: resource incompatible")
// add an event on the app to show the release
if eventCache := events.GetEventCache(); eventCache != nil {
message := fmt.Sprintf("Task group '%s' in application '%s': allocation resources '%s' are not matching placeholder '%s' allocation with ID '%s'", ph.GetTaskGroup(), sa.ApplicationID, request.GetAllocatedResource().String(), ph.GetAllocatedResource().String(), ph.GetAllocationKey())
if event, err := events.CreateRequestEventRecord(ph.GetAllocationKey(), sa.ApplicationID, "releasing placeholder: real allocation is larger than placeholder", message); err != nil {
log.Logger().Warn("Event creation failed",
zap.String("event message", message),
zap.Error(err))
} else {
eventCache.AddEvent(event)
}
}
continue
}
// placeholder is the same or larger continue processing and difference is handled when the placeholder
// is swapped with the real one.
if phFit == nil && reqFit == nil {
phFit = ph
reqFit = request
}
node := getNodeFn(ph.GetNodeID())
// got the node run same checks as for reservation (all but fits)
// resource usage should not change anyway between placeholder and real one at this point
if node != nil && node.preReserveConditions(request) {
alloc := NewAllocation(common.GetNewUUID(), node.NodeID, request)
// double link to make it easier to find
// alloc (the real one) releases points to the placeholder in the releases list
alloc.SetRelease(ph)
// placeholder point to the real one in the releases list
ph.SetRelease(alloc)
alloc.SetResult(Replaced)
// mark placeholder as released
ph.SetReleased(true)
_, err := sa.updateAskRepeatInternal(request, -1)
if err != nil {
log.Logger().Warn("ask repeat update failed unexpectedly",