Add EvictJobsFromNode
zuqq committed Jun 28, 2023
1 parent b3ac7c4 commit bd5a56f
Showing 4 changed files with 117 additions and 67 deletions.
85 changes: 57 additions & 28 deletions internal/scheduler/nodedb/nodedb.go
@@ -544,54 +544,83 @@ func bindJobToNodeInPlace(priorityClasses map[string]configuration.PriorityClass
 	return nil
 }
 
-// EvictJobFromNode returns a copy of node with job evicted from it. Specifically:
-//
-// - The job is marked as evicted on the node.
-// - AllocatedByJobId and AllocatedByQueue are not updated.
-// - Resources requested by the evicted pod are marked as allocated at priority evictedPriority.
+// EvictJobsFromNode returns a copy of node with all elements of jobs for which jobFilter returns
+// true evicted from it, together with a slice containing exactly those jobs.
+//
+// Specifically:
+//
+// - The jobs that jobFilter returns true for are marked as evicted on the node.
+// - Within AllocatableByPriorityAndResource, the resources allocated to these jobs are moved from
+//   the jobs' priorities to evictedPriority; they are not subtracted from AllocatedByJobId and
+//   AllocatedByQueue.
-func EvictJobFromNode(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) (*schedulerobjects.Node, error) {
-	jobId := job.GetId()
-	queue := job.GetQueue()
-	requests := job.GetResourceRequirements().Requests
-
+func EvictJobsFromNode(
+	priorityClasses map[string]configuration.PriorityClass,
+	jobFilter func(interfaces.LegacySchedulerJob) bool,
+	jobs []interfaces.LegacySchedulerJob,
+	node *schedulerobjects.Node,
+) ([]interfaces.LegacySchedulerJob, *schedulerobjects.Node, error) {
+	evicted := make([]interfaces.LegacySchedulerJob, 0)
 	node = node.DeepCopy()
-
-	// Ensure we track allocated resources at evictedPriority.
-	if _, ok := node.AllocatableByPriorityAndResource[evictedPriority]; !ok {
-		pMin := int32(math.MaxInt32)
-		ok := false
-		for p := range node.AllocatableByPriorityAndResource {
-			if p < pMin {
-				pMin = p
-				ok = true
-			}
-		}
-		if ok {
-			node.AllocatableByPriorityAndResource[evictedPriority] = node.AllocatableByPriorityAndResource[pMin].DeepCopy()
+	if jobFilter == nil {
+		return evicted, node, nil
+	}
+	for _, job := range jobs {
+		if !jobFilter(job) {
+			continue
+		}
+		evicted = append(evicted, job)
+		if err := evictJobFromNodeInPlace(priorityClasses, job, node); err != nil {
+			return nil, nil, err
 		}
 	}
+	return evicted, node, nil
+}
 
+// evictJobFromNodeInPlace is the in-place operation backing EvictJobsFromNode.
+func evictJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) error {
+	jobId := job.GetId()
 	if _, ok := node.AllocatedByJobId[jobId]; !ok {
-		return nil, errors.Errorf("job %s has no resources allocated on node %s", jobId, node.Id)
+		return errors.Errorf("job %s has no resources allocated on node %s", jobId, node.Id)
 	}
+
+	queue := job.GetQueue()
 	if _, ok := node.AllocatedByQueue[queue]; !ok {
-		return nil, errors.Errorf("queue %s has no resources allocated on node %s", queue, node.Id)
+		return errors.Errorf("queue %s has no resources allocated on node %s", queue, node.Id)
 	}
+
 	if node.EvictedJobRunIds == nil {
 		node.EvictedJobRunIds = make(map[string]bool)
 	}
 	if _, ok := node.EvictedJobRunIds[jobId]; ok {
 		// TODO: We're using run ids instead of job ids for now.
-		return nil, errors.Errorf("job %s is already evicted from node %s", jobId, node.Id)
-	} else {
-		node.EvictedJobRunIds[jobId] = true
+		return errors.Errorf("job %s is already evicted from node %s", jobId, node.Id)
 	}
+	node.EvictedJobRunIds[jobId] = true
+
+	if _, ok := node.AllocatableByPriorityAndResource[evictedPriority]; !ok {
+		minimumPriority := int32(math.MaxInt32)
+		foundPriority := false
+		for p := range node.AllocatableByPriorityAndResource {
+			if p < minimumPriority {
+				minimumPriority = p
+				foundPriority = true
+			}
+		}
+		if foundPriority {
+			node.AllocatableByPriorityAndResource[evictedPriority] = node.AllocatableByPriorityAndResource[minimumPriority].DeepCopy()
+		} else {
+			// We do not expect to hit this branch; however, if we do, then we need
+			// to make sure that evictedPriority is in this map so that the call to
+			// MarkAllocatedV1ResourceList() below knows about it.
+			node.AllocatableByPriorityAndResource[evictedPriority] = schedulerobjects.ResourceList{}
+		}
+	}
 	allocatable := schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource)
 	priority := priorityClasses[job.GetPriorityClassName()].Priority
+	requests := job.GetResourceRequirements().Requests
 	allocatable.MarkAllocatableV1ResourceList(priority, requests)
 	allocatable.MarkAllocatedV1ResourceList(evictedPriority, requests)
-	return node, nil
+
+	return nil
 }
 
 // UnbindJobsFromNode returns a node with all elements of jobs unbound from it.
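As a usage sketch (not part of this commit): a caller that evicts every preemptible job from a node might look like the following. The helper name evictPreemptible is hypothetical, and the import paths are assumed from the files touched in this diff; the Preemptible field on configuration.PriorityClass is the one exercised by TestEviction below.

package example

import (
	"github.com/armadaproject/armada/internal/armada/configuration"
	"github.com/armadaproject/armada/internal/scheduler/interfaces"
	"github.com/armadaproject/armada/internal/scheduler/nodedb"
	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

// evictPreemptible (hypothetical) evicts every preemptible job from node.
// EvictJobsFromNode deep-copies node, so the input node is left unchanged.
func evictPreemptible(
	priorityClasses map[string]configuration.PriorityClass,
	jobs []interfaces.LegacySchedulerJob,
	node *schedulerobjects.Node,
) ([]interfaces.LegacySchedulerJob, *schedulerobjects.Node, error) {
	jobFilter := func(job interfaces.LegacySchedulerJob) bool {
		return priorityClasses[job.GetPriorityClassName()].Preemptible
	}
	return nodedb.EvictJobsFromNode(priorityClasses, jobFilter, jobs, node)
}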
28 changes: 25 additions & 3 deletions internal/scheduler/nodedb/nodedb_test.go
@@ -116,6 +116,7 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) {
 }
 
 func TestNodeBindingEvictionUnbinding(t *testing.T) {
+	jobFilter := func(job interfaces.LegacySchedulerJob) bool { return true }
 	node := testfixtures.Test8GpuNode(append(testfixtures.TestPriorities, evictedPriority))
 	job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0)
 	request := schedulerobjects.ResourceListFromV1ResourceList(job.GetResourceRequirements().Requests)
@@ -130,16 +131,17 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) {
 	unboundMultipleNode, err := UnbindJobsFromNode(testfixtures.TestPriorityClasses, []interfaces.LegacySchedulerJob{job}, boundNode)
 	require.NoError(t, err)
 
-	evictedNode, err := EvictJobFromNode(testfixtures.TestPriorityClasses, job, boundNode)
+	evictedJobs, evictedNode, err := EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job}, boundNode)
 	require.NoError(t, err)
+	assert.Equal(t, []interfaces.LegacySchedulerJob{job}, evictedJobs)
 
 	evictedUnboundNode, err := UnbindJobFromNode(testfixtures.TestPriorityClasses, job, evictedNode)
 	require.NoError(t, err)
 
 	evictedBoundNode, err := BindJobToNode(testfixtures.TestPriorityClasses, job, evictedNode)
 	require.NoError(t, err)
 
-	_, err = EvictJobFromNode(testfixtures.TestPriorityClasses, job, node)
+	_, _, err = EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job}, node)
 	require.Error(t, err)
 
 	_, err = UnbindJobFromNode(testfixtures.TestPriorityClasses, job, node)
@@ -148,7 +150,7 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) {
 	_, err = BindJobToNode(testfixtures.TestPriorityClasses, job, boundNode)
 	require.Error(t, err)
 
-	_, err = EvictJobFromNode(testfixtures.TestPriorityClasses, job, evictedNode)
+	_, _, err = EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job}, evictedNode)
 	require.Error(t, err)
 
 	assertNodeAccountingEqual(t, node, unboundNode)
@@ -255,6 +257,26 @@ func assertNodeAccountingEqual(t *testing.T, node1, node2 *schedulerobjects.Node
 	return rv
 }
 
+func TestEviction(t *testing.T) {
+	jobFilter := func(job interfaces.LegacySchedulerJob) bool {
+		priorityClassName := job.GetPriorityClassName()
+		priorityClass := testfixtures.TestPriorityClasses[priorityClassName]
+		return priorityClass.Preemptible
+	}
+	node := testfixtures.Test32CpuNode(testfixtures.TestPriorities)
+	job0 := testfixtures.Test1CpuJob("queue-alice", testfixtures.PriorityClass0)
+	job3 := testfixtures.Test1CpuJob("queue-alice", testfixtures.PriorityClass3)
+	var err error
+	node, err = BindJobToNode(testfixtures.TestPriorityClasses, job0, node)
+	require.NoError(t, err)
+	node, err = BindJobToNode(testfixtures.TestPriorityClasses, job3, node)
+	require.NoError(t, err)
+
+	evictedJobs, _, err := EvictJobsFromNode(testfixtures.TestPriorityClasses, jobFilter, []interfaces.LegacySchedulerJob{job0, job3}, node)
+	require.NoError(t, err)
+	assert.Equal(t, []interfaces.LegacySchedulerJob{job0}, evictedJobs)
+}
+
 func TestScheduleIndividually(t *testing.T) {
 	tests := map[string]struct {
 		Nodes []*schedulerobjects.Node
53 changes: 27 additions & 26 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -16,7 +16,6 @@ import (
 	"github.com/armadaproject/armada/internal/armada/configuration"
 	armadamaps "github.com/armadaproject/armada/internal/common/maps"
 	armadaslices "github.com/armadaproject/armada/internal/common/slices"
-	"github.com/armadaproject/armada/internal/common/util"
 	schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
 	schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints"
 	schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
@@ -791,43 +790,45 @@ func (evi *Evictor) Evict(ctx context.Context, it nodedb.NodeIterator) (*Evictor
 	evictedJobsById := make(map[string]interfaces.LegacySchedulerJob)
 	affectedNodesById := make(map[string]*schedulerobjects.Node)
 	nodeIdByJobId := make(map[string]string)
+	result := &EvictorResult{
+		EvictedJobsById:   evictedJobsById,
+		AffectedNodesById: affectedNodesById,
+		NodeIdByJobId:     nodeIdByJobId,
+	}
+	if evi.jobFilter == nil || evi.nodeFilter == nil {
+		return result, nil
+	}
+	jobFilter := func(job interfaces.LegacySchedulerJob) bool { return evi.jobFilter(ctx, job) }
 	for node := it.NextNode(); node != nil; node = it.NextNode() {
-		if evi.nodeFilter != nil && !evi.nodeFilter(ctx, node) {
+		if !evi.nodeFilter(ctx, node) {
 			continue
 		}
-		jobIds := util.Filter(
-			maps.Keys(node.AllocatedByJobId),
-			func(jobId string) bool {
-				_, ok := node.EvictedJobRunIds[jobId]
-				return !ok
-			},
-		)
+		jobIds := make([]string, 0)
+		for jobId := range node.AllocatedByJobId {
+			if _, ok := node.EvictedJobRunIds[jobId]; !ok {
+				jobIds = append(jobIds, jobId)
+			}
+		}
 		jobs, err := evi.jobRepo.GetExistingJobsByIds(jobIds)
 		if err != nil {
 			return nil, err
 		}
-		for _, job := range jobs {
-			if evi.jobFilter != nil && !evi.jobFilter(ctx, job) {
-				continue
-			}
-			node, err = nodedb.EvictJobFromNode(evi.priorityClasses, job, node)
-			if err != nil {
-				return nil, err
-			}
+		evictedJobs, node, err := nodedb.EvictJobsFromNode(evi.priorityClasses, jobFilter, jobs, node)
+		if err != nil {
+			return nil, err
+		}
+		for _, job := range evictedJobs {
+			evictedJobsById[job.GetId()] = job
+			nodeIdByJobId[job.GetId()] = node.Id
 			if evi.postEvictFunc != nil {
 				evi.postEvictFunc(ctx, job, node)
 			}
-
-			evictedJobsById[job.GetId()] = job
-			nodeIdByJobId[job.GetId()] = node.Id
 		}
-		affectedNodesById[node.Id] = node
+		if len(evictedJobs) > 0 {
+			affectedNodesById[node.Id] = node
+		}
 	}
-	return &EvictorResult{
-		EvictedJobsById:   evictedJobsById,
-		AffectedNodesById: affectedNodesById,
-		NodeIdByJobId:     nodeIdByJobId,
-	}, nil
+	return result, nil
 }
 
 // TODO: This is only necessary for jobs not scheduled in this cycle.
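A side note on the hunk above: evi.jobFilter takes a context.Context, while the new nodedb.EvictJobsFromNode expects a context-free predicate, so Evict bridges the two by closing over ctx. A self-contained sketch of that adapter pattern, with types simplified for illustration:

package main

import (
	"context"
	"fmt"
)

type job struct{ id string }

// evictAll stands in for nodedb.EvictJobsFromNode: it only ever sees the
// context-free predicate.
func evictAll(filter func(job) bool, jobs []job) []job {
	evicted := make([]job, 0)
	for _, j := range jobs {
		if filter(j) {
			evicted = append(evicted, j)
		}
	}
	return evicted
}

func main() {
	ctx := context.Background()
	// The context-aware filter, as stored on the Evictor.
	ctxFilter := func(ctx context.Context, j job) bool { return j.id != "keep" }
	// Closing over ctx adapts it to the context-free signature that
	// the eviction helper expects.
	filter := func(j job) bool { return ctxFilter(ctx, j) }
	fmt.Println(evictAll(filter, []job{{"a"}, {"keep"}, {"b"}})) // [{a} {b}]
}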
18 changes: 8 additions & 10 deletions internal/scheduler/scheduling_algo.go
@@ -432,18 +432,18 @@ func (repo *schedulerJobRepositoryAdapter) GetExistingJobsByIds(ids []string) ([
 }
 
 // constructNodeDb constructs a node db with all jobs bound to it.
-func (l *FairSchedulingAlgo) constructNodeDb(priorityClasses map[string]configuration.PriorityClass, jobs []*jobdb.Job, originalNodes []*schedulerobjects.Node) (*nodedb.NodeDb, error) {
-	originalNodesByName := make(map[string]*schedulerobjects.Node, len(originalNodes))
-	for _, node := range originalNodes {
-		originalNodesByName[node.Name] = node
+func (l *FairSchedulingAlgo) constructNodeDb(priorityClasses map[string]configuration.PriorityClass, jobs []*jobdb.Job, nodes []*schedulerobjects.Node) (*nodedb.NodeDb, error) {
+	nodesByName := make(map[string]*schedulerobjects.Node, len(nodes))
+	for _, node := range nodes {
+		nodesByName[node.Name] = node
 	}
 	jobsByNodeName := make(map[string][]*jobdb.Job)
 	for _, job := range jobs {
 		if job.InTerminalState() || !job.HasRuns() {
 			continue
 		}
 		nodeName := job.LatestRun().Node()
-		if _, ok := originalNodesByName[nodeName]; !ok {
+		if _, ok := nodesByName[nodeName]; !ok {
 			log.Warnf(
 				"job %s assigned to node %s on executor %s, but no such node found",
 				job.Id(), nodeName, job.LatestRun().Executor(),
@@ -452,14 +452,12 @@ func (l *FairSchedulingAlgo) constructNodeDb(priorityClasses map[string]configur
 		}
 		jobsByNodeName[nodeName] = append(jobsByNodeName[nodeName], job)
 	}
-	updatedNodes := make([]*schedulerobjects.Node, 0, len(originalNodes))
 	for nodeName, jobsOnNode := range jobsByNodeName {
-		node := originalNodesByName[nodeName]
-		node, err := nodedb.BindJobsToNode(priorityClasses, jobsOnNode, node)
+		node, err := nodedb.BindJobsToNode(priorityClasses, jobsOnNode, nodesByName[nodeName])
 		if err != nil {
 			return nil, err
 		}
-		updatedNodes = append(updatedNodes, node)
+		nodesByName[nodeName] = node
 	}
 	nodeDb, err := nodedb.NewNodeDb(
 		priorityClasses,
@@ -471,7 +469,7 @@ func (l *FairSchedulingAlgo) constructNodeDb(priorityClasses map[string]configur
 	if err != nil {
 		return nil, err
 	}
-	if err := nodeDb.UpsertMany(updatedNodes); err != nil {
+	if err := nodeDb.UpsertMany(maps.Values(nodesByName)); err != nil {
		return nil, err
 	}
 	return nodeDb, nil
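The final UpsertMany call turns the map back into a slice with maps.Values, so nodes with no bound jobs are upserted too. A runnable sketch of that helper, assuming the maps identifier in this file refers to golang.org/x/exp/maps:

package main

import (
	"fmt"
	"sort"

	"golang.org/x/exp/maps"
)

func main() {
	nodesByName := map[string]string{"node-a": "bound", "node-b": "unchanged"}
	// maps.Values returns the values in unspecified order; UpsertMany does
	// not depend on ordering, so that is fine in constructNodeDb. Sorting
	// here is only to make the printed output deterministic.
	values := maps.Values(nodesByName)
	sort.Strings(values)
	fmt.Println(values) // [bound unchanged]
}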
