Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YUNIKORN-2151 Report resource used by placeholder pods in the app su… #715

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 45 additions & 24 deletions pkg/scheduler/objects/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@
user security.UserGroup // owner of the application
allocatedResource *resources.Resource // total allocated resources

usedResource *resources.TrackedResource // keep track of resource usage of the application
preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application
usedResource *resources.TrackedResource // keep track of resource usage of the application
preemptedResource *resources.TrackedResource // keep track of preempted resource usage of the application
placeholderResource *resources.TrackedResource // keep track of placeholder resource usage of the application

maxAllocatedResource *resources.Resource // max allocated resources
allocatedPlaceholder *resources.Resource // total allocated placeholder resources
Expand Down Expand Up @@ -118,16 +119,17 @@
}

type ApplicationSummary struct {
ApplicationID string
SubmissionTime time.Time
StartTime time.Time
FinishTime time.Time
User string
Queue string
State string
RmID string
ResourceUsage *resources.TrackedResource
PreemptedResource *resources.TrackedResource
ApplicationID string
SubmissionTime time.Time
StartTime time.Time
FinishTime time.Time
User string
Queue string
State string
RmID string
ResourceUsage *resources.TrackedResource
PreemptedResource *resources.TrackedResource
PlaceholderResource *resources.TrackedResource
}

func (as *ApplicationSummary) DoLogging() {
Expand All @@ -142,26 +144,29 @@
zap.String("rmID", as.RmID),
zap.Any("resourceUsage", as.ResourceUsage.TrackedResourceMap),
zap.Any("preemptedResource", as.PreemptedResource.TrackedResourceMap),
zap.Any("placeHolderResource", as.PlaceholderResource.TrackedResourceMap),
)
}

func (sa *Application) GetApplicationSummary(rmID string) *ApplicationSummary {
sa.RLock()
defer sa.RUnlock()
state := sa.stateMachine.Current()
ru := sa.usedResource.Clone()
pu := sa.preemptedResource.Clone()
resourceUsage := sa.usedResource.Clone()
preemptedUsage := sa.preemptedResource.Clone()
placeHolderUsage := sa.placeholderResource.Clone()
appSummary := &ApplicationSummary{
ApplicationID: sa.ApplicationID,
SubmissionTime: sa.SubmissionTime,
StartTime: sa.startTime,
FinishTime: sa.finishedTime,
User: sa.user.User,
Queue: sa.queuePath,
State: state,
RmID: rmID,
ResourceUsage: ru,
PreemptedResource: pu,
ApplicationID: sa.ApplicationID,
SubmissionTime: sa.SubmissionTime,
StartTime: sa.startTime,
FinishTime: sa.finishedTime,
User: sa.user.User,
Queue: sa.queuePath,
State: state,
RmID: rmID,
ResourceUsage: resourceUsage,
PreemptedResource: preemptedUsage,
PlaceholderResource: placeHolderUsage,
}
return appSummary
}
Expand All @@ -177,6 +182,7 @@
allocatedResource: resources.NewResource(),
usedResource: resources.NewTrackedResource(),
preemptedResource: resources.NewTrackedResource(),
placeholderResource: resources.NewTrackedResource(),
maxAllocatedResource: resources.NewResource(),
allocatedPlaceholder: resources.NewResource(),
requests: make(map[string]*AllocationAsk),
Expand Down Expand Up @@ -1684,6 +1690,8 @@
func (sa *Application) trackCompletedResource(info *Allocation) {
if info.IsPreempted() {
sa.updatePreemptedResource(info)
} else if info.IsPlaceholder() {
sa.updatePlaceholderResource(info)
} else {
sa.updateUsedResource(info)
}
Expand All @@ -1696,6 +1704,13 @@
info.GetAllocatedResource(), info.GetBindTime())
}

// When the placeholder allocated with this allocation is to be removed,
// have the placeholderResource to aggregate the resource used by this allocation
func (sa *Application) updatePlaceholderResource(info *Allocation) {
sa.placeholderResource.AggregateTrackedResource(info.GetInstanceType(),
info.GetAllocatedResource(), info.GetBindTime())
}

// When the resource allocated with this allocation is to be preempted,
// have the preemptedResource to aggregate the resource used by this allocation
func (sa *Application) updatePreemptedResource(info *Allocation) {
Expand Down Expand Up @@ -1791,6 +1806,9 @@
eventWarning = "Application state not changed while removing a placeholder allocation"
}
}
// Aggregate the resources used by this alloc to the application's resource tracker
sa.trackCompletedResource(alloc)

sa.decUserResourceUsage(alloc.GetAllocatedResource(), removeApp)
} else {
sa.allocatedResource = resources.Sub(sa.allocatedResource, alloc.GetAllocatedResource())
Expand Down Expand Up @@ -2033,6 +2051,7 @@

func (sa *Application) cleanupTrackedResource() {
sa.usedResource = nil
sa.placeholderResource = nil
sa.preemptedResource = nil
}

Expand All @@ -2049,6 +2068,8 @@
appSummary := sa.GetApplicationSummary(rmID)
appSummary.DoLogging()
appSummary.ResourceUsage = nil
appSummary.PreemptedResource = nil
appSummary.PlaceholderResource = nil

Check warning on line 2072 in pkg/scheduler/objects/application.go

View check run for this annotation

Codecov / codecov/patch

pkg/scheduler/objects/application.go#L2071-L2072

Added lines #L2071 - L2072 were not covered by tests
}

func (sa *Application) HasPlaceholderAllocation() bool {
Expand Down
37 changes: 26 additions & 11 deletions pkg/scheduler/objects/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,13 @@ func assertResourceUsage(t *testing.T, appSummary *ApplicationSummary, memorySec
assert.Equal(t, vcoresSecconds, detailedResource["vcores"])
}

func assertPlaceHolderResource(t *testing.T, appSummary *ApplicationSummary, memorySeconds int64,
vcoresSecconds int64) {
detailedResource := appSummary.PlaceholderResource.TrackedResourceMap[instType1]
assert.Equal(t, memorySeconds, detailedResource["memory"])
assert.Equal(t, vcoresSecconds, detailedResource["vcores"])
}

func TestResourceUsageAggregation(t *testing.T) {
setupUGM()

Expand All @@ -1082,7 +1089,10 @@ func TestResourceUsageAggregation(t *testing.T) {
assert.NilError(t, err, "failed to create resource with error")
alloc := newAllocation(appID1, "uuid-1", nodeID1, res)
alloc.SetInstanceType(instType1)
// Mock the time to be 3 seconds before
alloc.SetBindTime(time.Now().Add(-3 * time.Second))
app.AddAllocation(alloc)

if !resources.Equals(app.allocatedResource, res) {
t.Errorf("allocated resources is not updated correctly: %v", app.allocatedResource)
}
Expand All @@ -1094,20 +1104,19 @@ func TestResourceUsageAggregation(t *testing.T) {
err = app.HandleApplicationEvent(RunApplication)
assert.NilError(t, err, "no error expected new to accepted (completed test)")

time.Sleep(3 * time.Second)

appSummary := app.GetApplicationSummary("default")
appSummary.DoLogging()
assertResourceUsage(t, appSummary, 0, 0)

// add more allocations to test the removals
alloc = newAllocation(appID1, "uuid-2", nodeID1, res)
alloc.SetInstanceType(instType1)

// Mock the time to be 3 seconds before
alloc.SetBindTime(time.Now().Add(-3 * time.Second))
app.AddAllocation(alloc)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))

time.Sleep(3 * time.Second)

// remove one of the 2
if alloc = app.RemoveAllocation("uuid-2", si.TerminationType_UNKNOWN_TERMINATION_TYPE); alloc == nil {
t.Error("returned allocations was nil allocation was not removed")
Expand All @@ -1125,8 +1134,6 @@ func TestResourceUsageAggregation(t *testing.T) {
assert.Equal(t, len(allocs), 2)
assertUserGroupResource(t, getTestUserGroup(), resources.Multiply(res, 2))

time.Sleep(3 * time.Second)

appSummary = app.GetApplicationSummary("default")
appSummary.DoLogging()
assertResourceUsage(t, appSummary, 300, 30)
Expand All @@ -1136,8 +1143,6 @@ func TestResourceUsageAggregation(t *testing.T) {
t.Errorf("returned allocations was not allocation was incorrectly removed: %v", alloc)
}

time.Sleep(3 * time.Second)

// remove all left over allocations
if allocs = app.RemoveAllAllocations(); allocs == nil || len(allocs) != 2 {
t.Errorf("returned number of allocations was incorrect: %v", allocs)
Expand All @@ -1151,7 +1156,7 @@ func TestResourceUsageAggregation(t *testing.T) {

appSummary = app.GetApplicationSummary("default")
appSummary.DoLogging()
assertResourceUsage(t, appSummary, 2100, 210)
assertResourceUsage(t, appSummary, 600, 60)
}

func TestRejected(t *testing.T) {
Expand Down Expand Up @@ -1332,6 +1337,9 @@ func TestReplaceAllocationTracking(t *testing.T) {
ph1 := newPlaceholderAlloc(appID1, "uuid-1", nodeID1, res)
ph2 := newPlaceholderAlloc(appID1, "uuid-2", nodeID1, res)
ph3 := newPlaceholderAlloc(appID1, "uuid-3", nodeID1, res)
ph1.SetInstanceType(instType1)
ph2.SetInstanceType(instType1)
ph3.SetInstanceType(instType1)
app.AddAllocation(ph1)
assert.NilError(t, err, "could not add ask")
app.addPlaceholderDataWithLocking(ph1.GetAsk())
Expand All @@ -1343,6 +1351,10 @@ func TestReplaceAllocationTracking(t *testing.T) {
assert.NilError(t, err, "could not add ask")
app.addPlaceholderDataWithLocking(ph3.GetAsk())

ph1.SetBindTime(time.Now().Add(-10 * time.Second))
ph2.SetBindTime(time.Now().Add(-10 * time.Second))
ph3.SetBindTime(time.Now().Add(-10 * time.Second))

// replace placeholders
realAlloc1 := newAllocation(appID1, "uuid-100", nodeID1, res)
realAlloc1.SetResult(Replaced)
Expand All @@ -1365,6 +1377,10 @@ func TestReplaceAllocationTracking(t *testing.T) {
app.RemoveAllocation("uuid-3", si.TerminationType_PLACEHOLDER_REPLACED)
assert.Equal(t, "uuid-3", alloc.uuid)
assert.Equal(t, false, app.HasPlaceholderAllocation())

// check placeholder resource usage
appSummary := app.GetApplicationSummary("default")
assertPlaceHolderResource(t, appSummary, 3000, 300)
}

func TestTimeoutPlaceholderSoftStyle(t *testing.T) {
Expand Down Expand Up @@ -1704,8 +1720,7 @@ func TestFinishedTime(t *testing.T) {
assert.Assert(t, app.finishedTime.IsZero())
assert.Assert(t, app.FinishedTime().IsZero())

// sleep 1 second to make finished time bigger than zero
time.Sleep(1 * time.Second)
// Don't need sleep here, anytime finished, we will set finishedTime for now
app.UnSetQueue()
assert.Assert(t, !app.finishedTime.IsZero())
assert.Assert(t, !app.FinishedTime().IsZero())
Expand Down