Skip to content

Commit

Permalink
Plumb through unschedulable
Browse files Browse the repository at this point in the history
  • Loading branch information
severinson committed May 12, 2023
1 parent e5047e7 commit a5986b4
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 134 deletions.
2 changes: 1 addition & 1 deletion internal/scheduler/nodedb/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ func (nodeDb *NodeDb) UpsertWithTxn(txn *memdb.Txn, node *schedulerobjects.Node)
// Add a special taint to unschedulable nodes before inserting.
// Adding a corresponding toleration to evicted pods ensures they can be re-scheduled.
// To prevent scheduling new pods onto cordoned nodes, only evicted pods should have this toleration.
if node.IsUnschedulable() {
if node.Unschedulable {
node.Taints = append(node.Taints, UnschedulableTaint())
}

Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,7 +861,7 @@ func defaultPostEvictFunc(ctx context.Context, job interfaces.LegacySchedulerJob
}

// Add a toleration to allow the job to be re-scheduled even if node is unschedulable.
if node.IsUnschedulable() {
if node.Unschedulable {
req.Tolerations = append(req.Tolerations, nodedb.UnschedulableToleration())
}

Expand Down
42 changes: 42 additions & 0 deletions internal/scheduler/preempting_queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func TestPreemptingQueueScheduler(t *testing.T) {
// For each queue, indices of jobs to unbind before scheduling, to, simulate jobs terminating.
// E.g., IndicesToUnbind["A"][0] is the indices of jobs declared for queue A in round 0.
IndicesToUnbind map[string]map[int][]int
// Indices of nodes that should be cordoned before scheduling.
NodeIndicesToCordon []int
}
tests := map[string]struct {
SchedulingConfig configuration.SchedulingConfig
Expand Down Expand Up @@ -1139,6 +1141,36 @@ func TestPreemptingQueueScheduler(t *testing.T) {
"B": 1,
},
},
"Cordoning prevents scheduling new jobs but not re-scheduling running jobs": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.TestNCpuNode(1, testfixtures.TestPriorities),
Rounds: []SchedulingRound{
{
JobsByQueue: map[string][]*jobdb.Job{
"A": testfixtures.NSmallCpuJob("A", testfixtures.PriorityClass1, 1),
},
ExpectedScheduledIndices: map[string][]int{
"A": testfixtures.IntRange(0, 0),
},
},
{
JobsByQueue: map[string][]*jobdb.Job{
"B": testfixtures.NSmallCpuJob("B", testfixtures.PriorityClass1, 1),
},
NodeIndicesToCordon: []int{0},
},
{
JobsByQueue: map[string][]*jobdb.Job{
"B": testfixtures.NSmallCpuJob("B", testfixtures.PriorityClass1, 1),
},
},
{}, // Empty round to make sure nothing changes.
},
PriorityFactorByQueue: map[string]float64{
"A": 1,
"B": 1,
},
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -1200,6 +1232,16 @@ func TestPreemptingQueueScheduler(t *testing.T) {
}
}

// Cordon nodes.
for _, j := range round.NodeIndicesToCordon {
node, err := nodeDb.GetNode(tc.Nodes[j].Id)
require.NoError(t, err)
node = node.DeepCopy()
node.Unschedulable = true
err = nodeDb.Upsert(node)
require.NoError(t, err)
}

// If not provided, set total resources equal to the aggregate over tc.Nodes.
if tc.TotalResources.Resources == nil {
tc.TotalResources = nodeDb.TotalResources()
Expand Down
6 changes: 1 addition & 5 deletions internal/scheduler/schedulerobjects/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,6 @@ import (
armadamaps "github.com/armadaproject/armada/internal/common/maps"
)

func (node *Node) IsUnschedulable() bool {
// TODO: Use unschedulable flag on the job once added.
return false
}

func (node *Node) AvailableQuantityByPriorityAndResource(priority int32, resourceType string) resource.Quantity {
return AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource).Get(priority, resourceType)
}
Expand All @@ -40,6 +35,7 @@ func (node *Node) DeepCopy() *Node {
AllocatedByQueue: armadamaps.DeepCopy(node.AllocatedByQueue),
EvictedJobRunIds: maps.Clone(node.EvictedJobRunIds),
NonArmadaAllocatedResources: armadamaps.DeepCopy(node.NonArmadaAllocatedResources),
Unschedulable: node.Unschedulable,
}
}

Expand Down

0 comments on commit a5986b4

Please sign in to comment.