From 1acfdb7a5e2b1d9d69ba88ce76f0783f91914cd4 Mon Sep 17 00:00:00 2001 From: qzhu Date: Tue, 14 Nov 2023 21:33:47 +0800 Subject: [PATCH 1/6] YUNIKORN-2151 Report resource used by placeholder pods in the app summary --- pkg/scheduler/objects/application.go | 66 ++++++++++++++--------- pkg/scheduler/objects/application_test.go | 17 ++++++ 2 files changed, 59 insertions(+), 24 deletions(-) diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index da3ca668a..247559c15 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -87,8 +87,9 @@ type Application struct { user security.UserGroup // owner of the application allocatedResource *resources.Resource // total allocated resources - usedResource *resources.TrackedResource // keep track of resource usage of the application - preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application + usedResource *resources.TrackedResource // keep track of resource usage of the application + preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application + placeholderResource *resources.TrackedResource // keep track of placeholder resource usage of the application maxAllocatedResource *resources.Resource // max allocated resources allocatedPlaceholder *resources.Resource // total allocated placeholder resources @@ -118,16 +119,17 @@ type Application struct { } type ApplicationSummary struct { - ApplicationID string - SubmissionTime time.Time - StartTime time.Time - FinishTime time.Time - User string - Queue string - State string - RmID string - ResourceUsage *resources.TrackedResource - PreemptedResource *resources.TrackedResource + ApplicationID string + SubmissionTime time.Time + StartTime time.Time + FinishTime time.Time + User string + Queue string + State string + RmID string + ResourceUsage *resources.TrackedResource + PreemptedResource *resources.TrackedResource + PlaceHolderResource *resources.TrackedResource } func (as *ApplicationSummary) DoLogging() { @@ -149,19 +151,21 @@ func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary { sa.RLock() defer sa.RUnlock() state := sa.stateMachine.Current() - ru := sa.usedResource.Clone() - pu := sa.preemptedResource.Clone() + resourceUsage := sa.usedResource.Clone() + preemptedUsage := sa.preemptedResource.Clone() + placeHolderUsage := sa.placeholderResource.Clone() appSummary := &ApplicationSummary{ - ApplicationID: sa.ApplicationID, - SubmissionTime: sa.SubmissionTime, - StartTime: sa.startTime, - FinishTime: sa.finishedTime, - User: sa.user.User, - Queue: sa.queuePath, - State: state, - RmID: rmID, - ResourceUsage: ru, - PreemptedResource: pu, + ApplicationID: sa.ApplicationID, + SubmissionTime: sa.SubmissionTime, + StartTime: sa.startTime, + FinishTime: sa.finishedTime, + User: sa.user.User, + Queue: sa.queuePath, + State: state, + RmID: rmID, + ResourceUsage: resourceUsage, + PreemptedResource: preemptedUsage, + PlaceHolderResource: placeHolderUsage, } return appSummary } @@ -177,6 +181,7 @@ func NewApplication(siApp *si.AddApplicationRequest, ugi security.UserGroup, eve allocatedResource: resources.NewResource(), usedResource: resources.NewTrackedResource(), preemptedResource: resources.NewTrackedResource(), + placeholderResource: resources.NewTrackedResource(), maxAllocatedResource: resources.NewResource(), allocatedPlaceholder: resources.NewResource(), requests: make(map[string]*AllocationAsk), @@ -1684,6 +1689,8 @@ func (sa *Application) decUserResourceUsage(resource *resources.Resource, remove func (sa *Application) trackCompletedResource(info *Allocation) { if info.IsPreempted() { sa.updatePreemptedResource(info) + } else if info.IsPlaceholder() { + sa.updatePlaceholderResource(info) } else { sa.updateUsedResource(info) } @@ -1696,6 +1703,13 @@ func (sa *Application) updateUsedResource(info *Allocation) { info.GetAllocatedResource(), info.GetBindTime()) } +// When the placeholder allocated with this allocation is to be removed, +// have the placeholderResource to aggregate the resource used by this allocation +func (sa *Application) updatePlaceholderResource(info *Allocation) { + sa.placeholderResource.AggregateTrackedResource(info.GetInstanceType(), + info.GetAllocatedResource(), info.GetBindTime()) +} + // When the resource allocated with this allocation is to be preempted, // have the preemptedResource to aggregate the resource used by this allocation func (sa *Application) updatePreemptedResource(info *Allocation) { @@ -1791,6 +1805,9 @@ func (sa *Application) removeAllocationInternal(uuid string, releaseType si.Term eventWarning = "Application state not changed while removing a placeholder allocation" } } + // Aggregate the resources used by this alloc to the application's resource tracker + sa.trackCompletedResource(alloc) + sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp) } else { sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource()) @@ -2033,6 +2050,7 @@ func (sa *Application) cleanupAsks() { func (sa *Application) cleanupTrackedResource() { sa.usedResource = nil + sa.placeholderResource = nil sa.preemptedResource = nil } diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 2dae255f6..45903fc76 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -1067,6 +1067,13 @@ func assertResourceUsage(t *testing.T, appSummary *ApplicationSummary, memorySec assert.Equal(t, vcoresSecconds, detailedResource["vcores"]) } +func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, memorySeconds int64, + vcoresSecconds int64) { + detailedResource := appSummary.PlaceHolderResource.TrackedResourceMap[instType1] + assert.Equal(t, memorySeconds, detailedResource["memory"]) + assert.Equal(t, vcoresSecconds, detailedResource["vcores"]) +} + func TestResourceUsageAggregation(t *testing.T) { setupUGM() @@ -1332,6 +1339,9 @@ func TestReplaceAllocationTracking(t *testing.T) { ph1 := newPlaceholderAlloc(appID1, "uuid-1", nodeID1, res) ph2 := newPlaceholderAlloc(appID1, "uuid-2", nodeID1, res) ph3 := newPlaceholderAlloc(appID1, "uuid-3", nodeID1, res) + ph1.SetInstanceType(instType1) + ph2.SetInstanceType(instType1) + ph3.SetInstanceType(instType1) app.AddAllocation(ph1) assert.NilError(t, err, "could not add ask") app.addPlaceholderDataWithLocking(ph1.GetAsk()) @@ -1343,6 +1353,9 @@ func TestReplaceAllocationTracking(t *testing.T) { assert.NilError(t, err, "could not add ask") app.addPlaceholderDataWithLocking(ph3.GetAsk()) + // set time to 1 second, in order to test placeholder resource usage + time.Sleep(1 * time.Second) + // replace placeholders realAlloc1 := newAllocation(appID1, "uuid-100", nodeID1, res) realAlloc1.SetResult(Replaced) @@ -1365,6 +1378,10 @@ func TestReplaceAllocationTracking(t *testing.T) { app.RemoveAllocation("uuid-3", si.TerminationType_PLACEHOLDER_REPLACED) assert.Equal(t, "uuid-3", alloc.uuid) assert.Equal(t, false, app.HasPlaceholderAllocation()) + + // check placeholder resource usage + appSummary := app.GetApplicationSummary("default") + assertPlaceHolderResource(t, appSummary, 300, 30) } func TestTimeoutPlaceholderSoftStyle(t *testing.T) { From 4f486b02cf5e2eb662d463e1f596003430c0a27e Mon Sep 17 00:00:00 2001 From: qzhu Date: Wed, 15 Nov 2023 15:24:14 +0800 Subject: [PATCH 2/6] Add clean logic --- pkg/scheduler/objects/application.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index 247559c15..dc6de77a9 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -144,6 +144,7 @@ func (as *ApplicationSummary) DoLogging() { zap.String("rmID", as.RmID), zap.Any("resourceUsage", as.ResourceUsage.TrackedResourceMap), zap.Any("preemptedResource", as.PreemptedResource.TrackedResourceMap), + zap.Any("placeHolderResource", as.PlaceHolderResource.TrackedResourceMap), ) } @@ -2067,6 +2068,8 @@ func (sa *Application) LogAppSummary(rmID string) { appSummary := sa.GetApplicationSummary(rmID) appSummary.DoLogging() appSummary.ResourceUsage = nil + appSummary.PreemptedResource = nil + appSummary.PlaceHolderResource = nil } func (sa *Application) HasPlaceholderAllocation() bool { From 7ea83f85bb4f9ce203f74430e6d33d483eb3c37f Mon Sep 17 00:00:00 2001 From: qzhu Date: Wed, 15 Nov 2023 17:41:58 +0800 Subject: [PATCH 3/6] Address new comments --- pkg/scheduler/objects/application.go | 8 +++--- pkg/scheduler/objects/application_test.go | 31 +++++++++++++++-------- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/pkg/scheduler/objects/application.go b/pkg/scheduler/objects/application.go index dc6de77a9..3e27613df 100644 --- a/pkg/scheduler/objects/application.go +++ b/pkg/scheduler/objects/application.go @@ -129,7 +129,7 @@ type ApplicationSummary struct { RmID string ResourceUsage *resources.TrackedResource PreemptedResource *resources.TrackedResource - PlaceHolderResource *resources.TrackedResource + PlaceholderResource *resources.TrackedResource } func (as *ApplicationSummary) DoLogging() { @@ -144,7 +144,7 @@ func (as *ApplicationSummary) DoLogging() { zap.String("rmID", as.RmID), zap.Any("resourceUsage", as.ResourceUsage.TrackedResourceMap), zap.Any("preemptedResource", as.PreemptedResource.TrackedResourceMap), - zap.Any("placeHolderResource", as.PlaceHolderResource.TrackedResourceMap), + zap.Any("placeHolderResource", as.PlaceholderResource.TrackedResourceMap), ) } @@ -166,7 +166,7 @@ func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary { RmID: rmID, ResourceUsage: resourceUsage, PreemptedResource: preemptedUsage, - PlaceHolderResource: placeHolderUsage, + PlaceholderResource: placeHolderUsage, } return appSummary } @@ -2069,7 +2069,7 @@ func (sa *Application) LogAppSummary(rmID string) { appSummary.DoLogging() appSummary.ResourceUsage = nil appSummary.PreemptedResource = nil - appSummary.PlaceHolderResource = nil + appSummary.PlaceholderResource = nil } func (sa *Application) HasPlaceholderAllocation() bool { diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 45903fc76..89f76173a 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -1069,7 +1069,7 @@ func assertResourceUsage(t *testing.T, appSummary *ApplicationSummary, memorySec func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, memorySeconds int64, vcoresSecconds int64) { - detailedResource := appSummary.PlaceHolderResource.TrackedResourceMap[instType1] + detailedResource := appSummary.PlaceholderResource.TrackedResourceMap[instType1] assert.Equal(t, memorySeconds, detailedResource["memory"]) assert.Equal(t, vcoresSecconds, detailedResource["vcores"]) } @@ -1090,6 +1090,7 @@ func TestResourceUsageAggregation(t *testing.T) { alloc := newAllocation(appID1, "uuid-1", nodeID1, res) alloc.SetInstanceType(instType1) app.AddAllocation(alloc) + if !resources.Equals(app.allocatedResource, res) { t.Errorf("allocated resources is not updated correctly: %v", app.allocatedResource) } @@ -1101,8 +1102,6 @@ func TestResourceUsageAggregation(t *testing.T) { err = app.HandleApplicationEvent(RunApplication) assert.NilError(t, err, "no error expected new to accepted (completed test)") - time.Sleep(3 * time.Second) - appSummary := app.GetApplicationSummary("default") appSummary.DoLogging() assertResourceUsage(t, appSummary, 0, 0) @@ -1113,7 +1112,10 @@ func TestResourceUsageAggregation(t *testing.T) { app.AddAllocation(alloc) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) - time.Sleep(3 * time.Second) + // Mock the time to be 3 seconds before + start := time.Now() + start = start.Add(-3 * time.Second) + alloc.SetBindTime(start) // remove one of the 2 if alloc = app.RemoveAllocation("uuid-2", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc == nil { @@ -1132,19 +1134,25 @@ func TestResourceUsageAggregation(t *testing.T) { assert.Equal(t, len(allocs), 2) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) - time.Sleep(3 * time.Second) + // Mock the time to be 12 seconds before + start = time.Now() + start = start.Add(-12 * time.Second) + alloc.SetBindTime(start) appSummary = app.GetApplicationSummary("default") appSummary.DoLogging() assertResourceUsage(t, appSummary, 300, 30) + // Mock the time to be 18 seconds before + start = time.Now() + start = start.Add(-18 * time.Second) + alloc.SetBindTime(start) + // try to remove a non existing alloc if alloc = app.RemoveAllocation("does-not-exist", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc != nil { t.Errorf("returned allocations was not allocation was incorrectly removed: %v", alloc) } - time.Sleep(3 * time.Second) - // remove all left over allocations if allocs = app.RemoveAllAllocations(); allocs == nil || len(allocs) != 2 { t.Errorf("returned number of allocations was incorrect: %v", allocs) @@ -1353,8 +1361,11 @@ func TestReplaceAllocationTracking(t *testing.T) { assert.NilError(t, err, "could not add ask") app.addPlaceholderDataWithLocking(ph3.GetAsk()) - // set time to 1 second, in order to test placeholder resource usage - time.Sleep(1 * time.Second) + start := time.Now() + start = start.Add(-10 * time.Second) + ph1.SetBindTime(start) + ph2.SetBindTime(start) + ph3.SetBindTime(start) // replace placeholders realAlloc1 := newAllocation(appID1, "uuid-100", nodeID1, res) @@ -1381,7 +1392,7 @@ func TestReplaceAllocationTracking(t *testing.T) { // check placeholder resource usage appSummary := app.GetApplicationSummary("default") - assertPlaceHolderResource(t, appSummary, 300, 30) + assertPlaceHolderResource(t, appSummary, 3000, 300) } func TestTimeoutPlaceholderSoftStyle(t *testing.T) { From 7d8dfd25f95e121dd95d89e35560254e77957bdb Mon Sep 17 00:00:00 2001 From: qzhu Date: Wed, 15 Nov 2023 18:30:03 +0800 Subject: [PATCH 4/6] Make time gap more accurate --- pkg/scheduler/objects/application_test.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 89f76173a..27c8b0f59 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -1089,6 +1089,10 @@ func TestResourceUsageAggregation(t *testing.T) { assert.NilError(t, err, "failed to create resource with error") alloc := newAllocation(appID1, "uuid-1", nodeID1, res) alloc.SetInstanceType(instType1) + // Mock the time to be 3 seconds before + start := time.Now() + start = start.Add(-3 * time.Second) + alloc.SetBindTime(start) app.AddAllocation(alloc) if !resources.Equals(app.allocatedResource, res) { @@ -1109,13 +1113,13 @@ func TestResourceUsageAggregation(t *testing.T) { // add more allocations to test the removals alloc = newAllocation(appID1, "uuid-2", nodeID1, res) alloc.SetInstanceType(instType1) - app.AddAllocation(alloc) - assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) // Mock the time to be 3 seconds before - start := time.Now() + start = time.Now() start = start.Add(-3 * time.Second) alloc.SetBindTime(start) + app.AddAllocation(alloc) + assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) // remove one of the 2 if alloc = app.RemoveAllocation("uuid-2", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc == nil { @@ -1134,20 +1138,10 @@ func TestResourceUsageAggregation(t *testing.T) { assert.Equal(t, len(allocs), 2) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) - // Mock the time to be 12 seconds before - start = time.Now() - start = start.Add(-12 * time.Second) - alloc.SetBindTime(start) - appSummary = app.GetApplicationSummary("default") appSummary.DoLogging() assertResourceUsage(t, appSummary, 300, 30) - // Mock the time to be 18 seconds before - start = time.Now() - start = start.Add(-18 * time.Second) - alloc.SetBindTime(start) - // try to remove a non existing alloc if alloc = app.RemoveAllocation("does-not-exist", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc != nil { t.Errorf("returned allocations was not allocation was incorrectly removed: %v", alloc) @@ -1166,7 +1160,7 @@ func TestResourceUsageAggregation(t *testing.T) { appSummary = app.GetApplicationSummary("default") appSummary.DoLogging() - assertResourceUsage(t, appSummary, 2100, 210) + assertResourceUsage(t, appSummary, 600, 60) } func TestRejected(t *testing.T) { From 63fa20c29bafc469d269eb5e572dc18d1233629f Mon Sep 17 00:00:00 2001 From: qzhu Date: Thu, 16 Nov 2023 09:17:24 +0800 Subject: [PATCH 5/6] Remove more sleep --- pkg/scheduler/objects/application_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 27c8b0f59..31eccd409 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -1726,8 +1726,7 @@ func TestFinishedTime(t *testing.T) { assert.Assert(t, app.finishedTime.IsZero()) assert.Assert(t, app.FinishedTime().IsZero()) - // sleep 1 second to make finished time bigger than zero - time.Sleep(1 * time.Second) + // Don't need sleep here, anytime finished, we will set finishedTime for now app.UnSetQueue() assert.Assert(t, !app.finishedTime.IsZero()) assert.Assert(t, !app.FinishedTime().IsZero()) From 64429706d65235fe3313fcace5232ad24dba7625 Mon Sep 17 00:00:00 2001 From: qzhu Date: Thu, 16 Nov 2023 13:40:43 +0800 Subject: [PATCH 6/6] Nit remove start value --- pkg/scheduler/objects/application_test.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/pkg/scheduler/objects/application_test.go b/pkg/scheduler/objects/application_test.go index 31eccd409..d52b86cd9 100644 --- a/pkg/scheduler/objects/application_test.go +++ b/pkg/scheduler/objects/application_test.go @@ -1090,9 +1090,7 @@ func TestResourceUsageAggregation(t *testing.T) { alloc := newAllocation(appID1, "uuid-1", nodeID1, res) alloc.SetInstanceType(instType1) // Mock the time to be 3 seconds before - start := time.Now() - start = start.Add(-3 * time.Second) - alloc.SetBindTime(start) + alloc.SetBindTime(time.Now().Add(-3 * time.Second)) app.AddAllocation(alloc) if !resources.Equals(app.allocatedResource, res) { @@ -1115,9 +1113,7 @@ func TestResourceUsageAggregation(t *testing.T) { alloc.SetInstanceType(instType1) // Mock the time to be 3 seconds before - start = time.Now() - start = start.Add(-3 * time.Second) - alloc.SetBindTime(start) + alloc.SetBindTime(time.Now().Add(-3 * time.Second)) app.AddAllocation(alloc) assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2)) @@ -1355,11 +1351,9 @@ func TestReplaceAllocationTracking(t *testing.T) { assert.NilError(t, err, "could not add ask") app.addPlaceholderDataWithLocking(ph3.GetAsk()) - start := time.Now() - start = start.Add(-10 * time.Second) - ph1.SetBindTime(start) - ph2.SetBindTime(start) - ph3.SetBindTime(start) + ph1.SetBindTime(time.Now().Add(-10 * time.Second)) + ph2.SetBindTime(time.Now().Add(-10 * time.Second)) + ph3.SetBindTime(time.Now().Add(-10 * time.Second)) // replace placeholders realAlloc1 := newAllocation(appID1, "uuid-100", nodeID1, res)