diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index da3ca668a..3e27613df 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -87,8 +87,9 @@ type Application struct { user security.UserGroup // owner of the application allocatedResource *resources.Resource // total allocated resources - usedResource *resources.TrackedResource // keep track of resource usage of the application - preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application + usedResource *resources.TrackedResource // keep track of resource usage of the application + preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application + placeholderResource *resources.TrackedResource // keep track of placeholder resource usage of the application maxAllocatedResource *resources.Resource // max allocated resources allocatedPlaceholder *resources.Resource // total allocated placeholder resources @@ -118,16 +119,17 @@ type Application struct { } type ApplicationSummary struct { - ApplicationID string - SubmissionTime time.Time - StartTime time.Time - FinishTime time.Time - User string - Queue string - State string - RmID string - ResourceUsage *resources.TrackedResource - PreemptedResource *resources.TrackedResource + ApplicationID string + SubmissionTime time.Time + StartTime time.Time + FinishTime time.Time + User string + Queue string + State string + RmID string + ResourceUsage *resources.TrackedResource + PreemptedResource *resources.TrackedResource + PlaceholderResource *resources.TrackedResource } func (as *ApplicationSummary) DoLogging() { @@ -142,6 +144,7 @@ func (as *ApplicationSummary) DoLogging() { zap.String("rmID", as.RmID), zap.Any("resourceUsage", as.ResourceUsage.TrackedResourceMap), zap.Any("preemptedResource", as.PreemptedResource.TrackedResourceMap), + zap.Any("placeHolderResource", 
as.PlaceholderResource.TrackedResourceMap), ) } @@ -149,19 +152,21 @@ func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary { sa.RLock() defer sa.RUnlock() state := sa.stateMachine.Current() - ru := sa.usedResource.Clone() - pu := sa.preemptedResource.Clone() + resourceUsage := sa.usedResource.Clone() + preemptedUsage := sa.preemptedResource.Clone() + placeHolderUsage := sa.placeholderResource.Clone() appSummary := &ApplicationSummary{ - ApplicationID: sa.ApplicationID, - SubmissionTime: sa.SubmissionTime, - StartTime: sa.startTime, - FinishTime: sa.finishedTime, - User: sa.user.User, - Queue: sa.queuePath, - State: state, - RmID: rmID, - ResourceUsage: ru, - PreemptedResource: pu, + ApplicationID: sa.ApplicationID, + SubmissionTime: sa.SubmissionTime, + StartTime: sa.startTime, + FinishTime: sa.finishedTime, + User: sa.user.User, + Queue: sa.queuePath, + State: state, + RmID: rmID, + ResourceUsage: resourceUsage, + PreemptedResource: preemptedUsage, + PlaceholderResource: placeHolderUsage, } return appSummary } @@ -177,6 +182,7 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve allocatedResource: resources.NewResource(), usedResource: resources.NewTrackedResource(), preemptedResource: resources.NewTrackedResource(), + placeholderResource: resources.NewTrackedResource(), maxAllocatedResource: resources.NewResource(), allocatedPlaceholder: resources.NewResource(), requests: make(map[string]*AllocationAsk), @@ -1684,6 +1690,8 @@ func (sa *Application) decUserResourceUsage(resource *resources.Resource, remove func (sa *Application) trackCompletedResource(info *Allocation) { if info.IsPreempted() { sa.updatePreemptedResource(info) + } else if info.IsPlaceholder() { + sa.updatePlaceholderResource(info) } else { sa.updateUsedResource(info) } @@ -1696,6 +1704,13 @@ func (sa *Application) updateUsedResource(info *Allocation) { info.GetAllocatedResource(), info.GetBindTime()) } +// When the placeholder allocated with 
this allocation is to be removed, +// have the placeholderResource to aggregate the resource used by this allocation +func (sa *Application) updatePlaceholderResource(info *Allocation) { + sa.placeholderResource.AggregateTrackedResource(info.GetInstanceType(), + info.GetAllocatedResource(), info.GetBindTime()) +} + // When the resource allocated with this allocation is to be preempted, // have the preemptedResource to aggregate the resource used by this allocation func (sa *Application) updatePreemptedResource(info *Allocation) { @@ -1791,6 +1806,9 @@ func (sa *Application) removeAllocationInternal(uuid string, releaseType si.Term eventWarning = "Application state not changed while removing a placeholder allocation" } } + // Aggregate the resources used by this alloc to the application's resource tracker + sa.trackCompletedResource(alloc) + sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp) } else { sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource()) @@ -2033,6 +2051,7 @@ func (sa *Application) cleanupAsks() { func (sa *Application) cleanupTrackedResource() { sa.usedResource = nil + sa.placeholderResource = nil sa.preemptedResource = nil } @@ -2049,6 +2068,8 @@ func (sa *Application) LogAppSummary(rmID string) { appSummary := sa.GetApplicationSummary(rmID) appSummary.DoLogging() appSummary.ResourceUsage = nil + appSummary.PreemptedResource = nil + appSummary.PlaceholderResource = nil } func (sa *Application) HasPlaceholderAllocation() bool { diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 2dae255f6..d52b86cd9 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -1067,6 +1067,13 @@ func assertResourceUsage(t *testing.T, appSummary *ApplicationSummary, memorySec assert.Equal(t, vcoresSecconds, detailedResource["vcores"]) } +func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, 
memorySeconds int64, + vcoresSecconds int64) { + detailedResource := appSummary.PlaceholderResource.TrackedResourceMap[instType1] + assert.Equal(t, memorySeconds, detailedResource["memory"]) + assert.Equal(t, vcoresSecconds, detailedResource["vcores"]) +} + func TestResourceUsageAggregation(t *testing.T) { setupUGM() @@ -1082,7 +1089,10 @@ func TestResourceUsageAggregation(t *testing.T) { assert.NilError(t, err, "failed to create resource with error") alloc := newAllocation(appID1, "uuid-1", nodeID1, res) alloc.SetInstanceType(instType1) + // Mock the time to be 3 seconds before + alloc.SetBindTime(time.Now().Add(-3 * time.Second)) app.AddAllocation(alloc) + if !resources.Equals(app.allocatedResource, res) { t.Errorf("allocated resources is not updated correctly: %v", app.allocatedResource) } @@ -1094,8 +1104,6 @@ func TestResourceUsageAggregation(t *testing.T) { err = app.HandleApplicationEvent(RunApplication) assert.NilError(t, err, "no error expected new to accepted (completed test)") - time.Sleep(3 * time.Second) - appSummary := app.GetApplicationSummary("default") appSummary.DoLogging() assertResourceUsage(t, appSummary, 0, 0) @@ -1103,11 +1111,12 @@ func TestResourceUsageAggregation(t *testing.T) { // add more allocations to test the removals alloc = newAllocation(appID1, "uuid-2", nodeID1, res) alloc.SetInstanceType(instType1) + + // Mock the time to be 3 seconds before + alloc.SetBindTime(time.Now().Add(-3 * time.Second)) app.AddAllocation(alloc) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) - time.Sleep(3 * time.Second) - // remove one of the 2 if alloc = app.RemoveAllocation("uuid-2", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc == nil { t.Error("returned allocations was nil allocation was not removed") @@ -1125,8 +1134,6 @@ func TestResourceUsageAggregation(t *testing.T) { assert.Equal(t, len(allocs), 2) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) - time.Sleep(3 * time.Second) - 
appSummary = app.GetApplicationSummary("default") appSummary.DoLogging() assertResourceUsage(t, appSummary, 300, 30) @@ -1136,8 +1143,6 @@ func TestResourceUsageAggregation(t *testing.T) { t.Errorf("returned allocations was not allocation was incorrectly removed: %v", alloc) } - time.Sleep(3 * time.Second) - // remove all left over allocations if allocs = app.RemoveAllAllocations(); allocs == nil || len(allocs) != 2 { t.Errorf("returned number of allocations was incorrect: %v", allocs) @@ -1151,7 +1156,7 @@ func TestResourceUsageAggregation(t *testing.T) { appSummary = app.GetApplicationSummary("default") appSummary.DoLogging() - assertResourceUsage(t, appSummary, 2100, 210) + assertResourceUsage(t, appSummary, 600, 60) } func TestRejected(t *testing.T) { @@ -1332,6 +1337,9 @@ func TestReplaceAllocationTracking(t *testing.T) { ph1 := newPlaceholderAlloc(appID1, "uuid-1", nodeID1, res) ph2 := newPlaceholderAlloc(appID1, "uuid-2", nodeID1, res) ph3 := newPlaceholderAlloc(appID1, "uuid-3", nodeID1, res) + ph1.SetInstanceType(instType1) + ph2.SetInstanceType(instType1) + ph3.SetInstanceType(instType1) app.AddAllocation(ph1) assert.NilError(t, err, "could not add ask") app.addPlaceholderDataWithLocking(ph1.GetAsk()) @@ -1343,6 +1351,10 @@ func TestReplaceAllocationTracking(t *testing.T) { assert.NilError(t, err, "could not add ask") app.addPlaceholderDataWithLocking(ph3.GetAsk()) + ph1.SetBindTime(time.Now().Add(-10 * time.Second)) + ph2.SetBindTime(time.Now().Add(-10 * time.Second)) + ph3.SetBindTime(time.Now().Add(-10 * time.Second)) + // replace placeholders realAlloc1 := newAllocation(appID1, "uuid-100", nodeID1, res) realAlloc1.SetResult(Replaced) @@ -1365,6 +1377,10 @@ func TestReplaceAllocationTracking(t *testing.T) { app.RemoveAllocation("uuid-3", si.TerminationType_PLACEHOLDER_REPLACED) assert.Equal(t, "uuid-3", alloc.uuid) assert.Equal(t, false, app.HasPlaceholderAllocation()) + + // check placeholder resource usage + appSummary := 
app.GetApplicationSummary("default") + assertPlaceHolderResource(t, appSummary, 3000, 300) } func TestTimeoutPlaceholderSoftStyle(t *testing.T) { @@ -1704,8 +1720,7 @@ func TestFinishedTime(t *testing.T) { assert.Assert(t, app.finishedTime.IsZero()) assert.Assert(t, app.FinishedTime().IsZero()) - // sleep 1 second to make finished time bigger than zero - time.Sleep(1 * time.Second) + // No sleep is needed here: finishedTime is set whenever the application finishes app.UnSetQueue() assert.Assert(t, !app.finishedTime.IsZero()) assert.Assert(t, !app.FinishedTime().IsZero())