Skip to content

Commit

Permalink
Improved test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
manirajv06 committed Nov 11, 2022
1 parent 6c00bd9 commit 88daab9
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 23 deletions.
22 changes: 16 additions & 6 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func (sa *Application) timeoutPlaceholderProcessing() {
if sa.gangSchedulingStyle == Hard {
event = FailApplication
}

if err := sa.HandleApplicationEventWithInfo(event, "ResourceReservationTimeout"); err != nil {
log.Logger().Debug("Application state change failed when placeholder timed out",
zap.String("AppID", sa.ApplicationID),
Expand Down Expand Up @@ -541,6 +542,8 @@ func (sa *Application) removeAsksInternal(allocKey string) int {
// When the resource trackers are zero we should not expect anything to come in later.
hasPlaceHolderAllocations := len(sa.getPlaceholderAllocations()) > 0
if resources.IsZero(sa.pending) && resources.IsZero(sa.allocatedResource) && !sa.IsFailing() && !sa.IsCompleting() && !hasPlaceHolderAllocations {
//sa.decUserResourceUsage(sa.allocatedResource, true)
log.Logger().Debug("step 1", zap.String("dd", sa.allocatedResource.String()))
if err := sa.HandleApplicationEvent(CompleteApplication); err != nil {
log.Logger().Warn("Application state not changed to Completing while updating ask(s)",
zap.String("currentState", sa.CurrentState()),
Expand Down Expand Up @@ -1530,6 +1533,8 @@ func (sa *Application) removeAllocationInternal(uuid string, releaseType si.Term

var event applicationEvent = EventNotNeeded
var eventWarning string
removeApp := false

// update correct allocation tracker
if alloc.IsPlaceholder() {
// make sure we account for the placeholders being removed in the tracking data
Expand All @@ -1540,34 +1545,38 @@ func (sa *Application) removeAllocationInternal(uuid string, releaseType si.Term
}
// as and when every ph gets removed (for replacement), resource usage would be reduced.
// When real allocation happens as part of replacement, usage would be increased again with real alloc resource
removeApp := false
sa.allocatedPlaceholder = resources.Sub(sa.allocatedPlaceholder, alloc.GetAllocatedResource())
if resources.IsZero(sa.allocatedPlaceholder) {
removeApp = true
}
sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
// if all the placeholders are replaced, clear the placeholder timer
if resources.IsZero(sa.allocatedPlaceholder) {
log.Logger().Debug("step 3")
sa.clearPlaceholderTimer()
if (sa.IsCompleting() && sa.stateTimer == nil) || sa.IsFailing() || sa.IsResuming() || sa.hasZeroAllocations() {
removeApp = true
log.Logger().Debug("step 7")
event = CompleteApplication
if sa.IsFailing() {
event = FailApplication
}
if sa.IsResuming() {
log.Logger().Debug("step 6")
event = RunApplication
removeApp = false
}
eventWarning = "Application state not changed while removing a placeholder allocation"
}
}
sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
} else {
sa.decUserResourceUsage(alloc.GetAllocatedResource(), false)
log.Logger().Debug("step 4")
sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource())
// When the resource trackers are zero we should not expect anything to come in later.
if sa.hasZeroAllocations() {
log.Logger().Debug("step 5")
removeApp = true
event = CompleteApplication
eventWarning = "Application state not changed to Waiting while removing an allocation"
}
sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
}
if event != EventNotNeeded {
if err := sa.HandleApplicationEvent(event); err != nil {
Expand Down Expand Up @@ -1601,6 +1610,7 @@ func (sa *Application) RemoveAllAllocations() []*Allocation {
sa.allocations = make(map[string]*Allocation)
// When the resource trackers are zero we should not expect anything to come in later.
if resources.IsZero(sa.pending) {
sa.decUserResourceUsage(resources.Add(sa.allocatedResource, sa.allocatedPlaceholder), true)
if err := sa.HandleApplicationEvent(CompleteApplication); err != nil {
log.Logger().Warn("Application state not changed to Waiting while removing all allocations",
zap.String("currentState", sa.CurrentState()),
Expand Down
29 changes: 18 additions & 11 deletions pkg/scheduler/objects/application_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,24 +152,31 @@ func NewAppState() *fsm.FSM {
},
fmt.Sprintf("enter_%s", Resuming.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.decUserResourceUsage(app.allocatedPlaceholder, true)
//app.decUserResourceUsage(app.allocatedPlaceholder, true)
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
},
fmt.Sprintf("enter_%s", Completing.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.setStateTimer(completingTimeout, app.stateMachine.Current(), CompleteApplication)
log.Logger().Debug("step 1", zap.Int("ss", len(app.placeholderData)))
// log.Logger().Debug("step 2", zap.String("ss", resources.Zero)
//if app.placeholderData == nil {
// Usage needs to be tracked (decreased) only for the app which crossed starting state.
starting := false
for _, stateLog := range app.stateLog {
if stateLog.ApplicationState == Starting.String() {
starting = true
break
/*
starting := false
for _, stateLog := range app.stateLog {
if stateLog.ApplicationState == Starting.String() {
starting = true
break
}
}
}
if starting {
app.decUserResourceUsage(app.allocatedResource, true)
}
if starting {
app.decUserResourceUsage(app.allocatedResource, true)
}*/
//} else {
//app.decUserResourceUsage(app.allocatedPlaceholder, true)
//}
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
},
Expand Down Expand Up @@ -199,7 +206,7 @@ func NewAppState() *fsm.FSM {
},
fmt.Sprintf("enter_%s", Failing.String()): func(event *fsm.Event) {
app := event.Args[0].(*Application) //nolint:errcheck
app.decUserResourceUsage(app.allocatedPlaceholder, true)
//app.decUserResourceUsage(app.allocatedPlaceholder, true)
metrics.GetQueueMetrics(app.queuePath).DecQueueApplicationsRunning()
metrics.GetQueueMetrics(app.queuePath).IncQueueApplicationsFailed()
metrics.GetSchedulerMetrics().DecTotalApplicationsRunning()
Expand Down
16 changes: 13 additions & 3 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,8 @@ func TestStateChangeOnPlaceholderAdd(t *testing.T) {
released = app.RemoveAllocationAsk(askID)
assert.Equal(t, released, 0, "allocation ask should not have been reserved")
assert.Assert(t, app.IsCompleting(), "Application should have stayed same, changed unexpectedly: %s", app.CurrentState())
assertUserGroupNilResourceWithError(t)
//assertUserGroupNilResourceWithError(t)
//assertUserGroupResource(t, res)

log := app.GetStateLog()
assert.Equal(t, len(log), 2, "wrong number of app events")
Expand All @@ -822,6 +823,10 @@ func TestStateChangeOnPlaceholderAdd(t *testing.T) {
}

func TestAllocations(t *testing.T) {
ugm := ugm.GetUserManager()
ugm.ClearUserTrackers()
ugm.ClearGroupTrackers()

app := newApplication(appID1, "default", "root.a")

// nothing allocated
Expand Down Expand Up @@ -1202,7 +1207,7 @@ func TestReplaceAllocation(t *testing.T) {
if _, ok := app.allocations["not-added"]; ok {
t.Fatalf("real allocation added which shouldn't have been added")
}
assertUserGroupResource(t, resources.Multiply(res, 1))
assertUserGroupResource(t, resources.Multiply(res, 2))
}

func TestTimeoutPlaceholderSoftStyle(t *testing.T) {
Expand Down Expand Up @@ -1267,7 +1272,7 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin
assert.Equal(t, app.placeholderData[tg1].TimedOut, app.placeholderData[tg1].Count, "When the app is in an accepted state, timeout should equal to count")
assert.NilError(t, err, "Placeholder timeout cleanup did not trigger unexpectedly")
assert.Equal(t, app.stateMachine.Current(), expectedState, "Application did not progress into expected state")
assertUserGroupNilResourceWithError(t)
assertUserGroupResource(t, resources.Multiply(res, 2))

events := testHandler.GetEvents()
var found int
Expand All @@ -1291,6 +1296,7 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin
assert.Assert(t, resources.IsZero(app.GetPendingResource()), "pending placeholder resources should be zero")
// a released placeholder still holds the resource until release confirmed by the RM
assert.Assert(t, resources.Equals(app.GetPlaceholderResource(), resources.Multiply(res, 2)), "Unexpected placeholder resources for the app")
assertUserGroupResource(t, resources.Multiply(res, 2))

log := app.GetStateLog()
assert.Equal(t, len(log), 2, "wrong number of app events")
Expand All @@ -1299,6 +1305,10 @@ func runTimeoutPlaceholderTest(t *testing.T, expectedState string, gangSchedulin
}

func TestTimeoutPlaceholderAllocReleased(t *testing.T) {
ugm := ugm.GetUserManager()
ugm.ClearUserTrackers()
ugm.ClearGroupTrackers()

originalPhTimeout := defaultPlaceholderTimeout
defaultPlaceholderTimeout = 5 * time.Millisecond
defer func() { defaultPlaceholderTimeout = originalPhTimeout }()
Expand Down

0 comments on commit 88daab9

Please sign in to comment.