Pre-emption Based On Adjusted Fair Share #3714

Merged · 11 commits · Jun 24, 2024
Changes from all commits
1 change: 1 addition & 0 deletions config/scheduler/config.yaml
@@ -87,6 +87,7 @@ scheduling:
disableScheduling: false
enableAssertions: false
protectedFractionOfFairShare: 1.0
useAdjustedFairShareProtection: true
nodeIdLabel: "kubernetes.io/hostname"
priorityClasses:
armada-default:
2 changes: 2 additions & 0 deletions internal/scheduler/configuration/configuration.go
@@ -153,6 +153,8 @@ type SchedulingConfig struct {
EnableAssertions bool
// Only queues allocated more than this fraction of their fair share are considered for preemption.
ProtectedFractionOfFairShare float64 `validate:"gte=0"`
// Use Max(AdjustedFairShare, FairShare) for fair share protection. If false, FairShare alone is used.
UseAdjustedFairShareProtection bool
// Armada adds a node selector term to every scheduled pod using this label with the node name as value.
// This is to force kube-scheduler to schedule pods on the node chosen by Armada.
// For example, if NodeIdLabel is "kubernetes.io/hostname" and armada schedules a pod on node "myNode",
76 changes: 76 additions & 0 deletions internal/scheduler/context/context.go
@@ -110,6 +110,7 @@ func (sctx *SchedulingContext) ClearUnfeasibleSchedulingKeys() {
func (sctx *SchedulingContext) AddQueueSchedulingContext(
queue string, weight float64,
initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string],
demand schedulerobjects.ResourceList,
limiter *rate.Limiter,
) error {
if _, ok := sctx.QueueSchedulingContexts[queue]; ok {
@@ -137,6 +138,7 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext(
Weight: weight,
Limiter: limiter,
Allocated: allocated,
Demand: demand,
AllocatedByPriorityClass: initialAllocatedByPriorityClass,
ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
EvictedResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
@@ -167,6 +169,73 @@ func (sctx *SchedulingContext) TotalCost() float64 {
return rv
}

// UpdateFairShares updates FairShare and AdjustedFairShare for every QueueSchedulingContext associated with the
// SchedulingContext. FairShare is calculated as queue_weight/sum_of_all_queue_weights, and AdjustedFairShare is
// derived from it by iteratively resharing any capacity a queue does not need (as determined by its demand).
func (sctx *SchedulingContext) UpdateFairShares() {
const maxIterations = 5

type queueInfo struct {
queueName string
adjustedShare float64
fairShare float64
weight float64
cappedShare float64
}

queueInfos := make([]*queueInfo, 0, len(sctx.QueueSchedulingContexts))
for queueName, qctx := range sctx.QueueSchedulingContexts {
cappedShare := 1.0
if !sctx.TotalResources.IsZero() {
cappedShare = sctx.FairnessCostProvider.CostFromAllocationAndWeight(qctx.Demand, qctx.Weight) * qctx.Weight
}
queueInfos = append(queueInfos, &queueInfo{
queueName: queueName,
adjustedShare: 0,
fairShare: qctx.Weight / sctx.WeightSum,
weight: qctx.Weight,
cappedShare: cappedShare,
})
}

// We do this so that we get deterministic output
slices.SortFunc(queueInfos, func(a, b *queueInfo) int {
return strings.Compare(a.queueName, b.queueName)
})

unallocated := 1.0 // the proportion of the cluster still available to share out on each iteration

// Reshare unused capacity until at least 99% of all capacity has been allocated or maxIterations is reached
for i := 0; i < maxIterations && unallocated > 0.01; i++ {
totalWeight := 0.0
for _, q := range queueInfos {
totalWeight += q.weight
}

for _, q := range queueInfos {
if q.weight > 0 {
share := (q.weight / totalWeight) * unallocated
q.adjustedShare += share
}
}
unallocated = 0.0
for _, q := range queueInfos {
excessShare := q.adjustedShare - q.cappedShare
if excessShare > 0 {
q.adjustedShare = q.cappedShare
q.weight = 0.0
unallocated += excessShare
}
}
}

for _, q := range queueInfos {
qtx := sctx.QueueSchedulingContexts[q.queueName]
qtx.FairShare = q.fairShare
qtx.AdjustedFairShare = q.adjustedShare
}
}

func (sctx *SchedulingContext) ReportString(verbosity int32) string {
var sb strings.Builder
w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0)
@@ -343,6 +412,13 @@ type QueueSchedulingContext struct {
// Total resources assigned to the queue across all clusters by priority class priority.
// Includes jobs scheduled during this invocation of the scheduler.
Allocated schedulerobjects.ResourceList
// Total demand from this queue. This is essentially the sum of the resources of all non-terminal jobs at the
// start of the scheduling cycle.
Demand schedulerobjects.ResourceList
// FairShare is the weight of this queue divided by the sum of the weights of all queues.
FairShare float64
// AdjustedFairShare modifies FairShare such that any capacity left unused by queues whose demand cost is below
// their fair share is reallocated to the remaining queues.
AdjustedFairShare float64
// Total resources assigned to the queue across all clusters by priority class.
// Includes jobs scheduled during this invocation of the scheduler.
AllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
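To see how the resharing loop in UpdateFairShares behaves on concrete numbers, here is a minimal standalone sketch of the same iteration over plain floats. The `queue` struct, the `adjustShares` helper, and the capped-share numbers are illustrative, not part of this PR; the 1% cutoff, the five-iteration cap, and the resharing steps mirror the diff above. With 100 CPU of capacity and demands of 1, 40, and 1000 CPU, the capped shares are 0.01, 0.40, and 10.0, and the loop converges to adjusted shares of 0.01, 0.40, and 0.59, matching the "three queues, equal weights" case in context_test.go below.

```go
package main

import (
	"fmt"
	"sort"
)

// queue carries the per-queue fields the resharing loop works with (illustrative names).
type queue struct {
	name          string
	weight        float64
	cappedShare   float64 // demand expressed as a cost fraction of total capacity
	adjustedShare float64
}

// adjustShares reshares capacity that queues do not need, following the loop in UpdateFairShares.
func adjustShares(queues []*queue) {
	const maxIterations = 5
	// Sort by name so the result is deterministic, as in the diff.
	sort.Slice(queues, func(i, j int) bool { return queues[i].name < queues[j].name })

	unallocated := 1.0 // proportion of the cluster still available to share out
	for i := 0; i < maxIterations && unallocated > 0.01; i++ {
		totalWeight := 0.0
		for _, q := range queues {
			totalWeight += q.weight
		}
		// Hand out the remaining capacity in proportion to the remaining weights.
		for _, q := range queues {
			if q.weight > 0 {
				q.adjustedShare += (q.weight / totalWeight) * unallocated
			}
		}
		// Claw back anything a queue received beyond its demand; reshare it next iteration.
		unallocated = 0.0
		for _, q := range queues {
			if excess := q.adjustedShare - q.cappedShare; excess > 0 {
				q.adjustedShare = q.cappedShare
				q.weight = 0.0
				unallocated += excess
			}
		}
	}
}

func main() {
	// 100 CPU of capacity; demands of 1, 40 and 1000 CPU give capped shares of 0.01, 0.40 and 10.0.
	queues := []*queue{
		{name: "queueA", weight: 1.0, cappedShare: 0.01},
		{name: "queueB", weight: 1.0, cappedShare: 0.40},
		{name: "queueC", weight: 1.0, cappedShare: 10.0},
	}
	adjustShares(queues)
	for _, q := range queues {
		fmt.Printf("%s: adjustedShare=%.2f\n", q.name, q.adjustedShare) // 0.01, 0.40, 0.59
	}
}
```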
156 changes: 155 additions & 1 deletion internal/scheduler/context/context_test.go
@@ -54,7 +54,7 @@ func TestSchedulingContextAccounting(t *testing.T) {
},
}
for _, queue := range []string{"A", "B"} {
err := sctx.AddQueueSchedulingContext(queue, priorityFactorByQueue[queue], allocatedByQueueAndPriorityClass[queue], nil)
err := sctx.AddQueueSchedulingContext(queue, priorityFactorByQueue[queue], allocatedByQueueAndPriorityClass[queue], schedulerobjects.ResourceList{}, nil)
require.NoError(t, err)
}

@@ -114,3 +114,157 @@ func TestJobSchedulingContext_SetAssignedNodeId(t *testing.T) {
assert.Len(t, jctx.AdditionalNodeSelectors, 1)
assert.Equal(t, map[string]string{configuration.NodeIdLabel: "node1"}, jctx.AdditionalNodeSelectors)
}

func TestCalculateFairShares(t *testing.T) {
zeroCpu := schedulerobjects.ResourceList{
Resources: map[string]resource.Quantity{"cpu": resource.MustParse("0")},
}
oneCpu := schedulerobjects.ResourceList{
Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")},
}
fortyCpu := schedulerobjects.ResourceList{
Resources: map[string]resource.Quantity{"cpu": resource.MustParse("40")},
}
oneHundredCpu := schedulerobjects.ResourceList{
Resources: map[string]resource.Quantity{"cpu": resource.MustParse("100")},
}
oneThousandCpu := schedulerobjects.ResourceList{
Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1000")},
}
tests := map[string]struct {
availableResources schedulerobjects.ResourceList
queueCtxs map[string]*QueueSchedulingContext
expectedFairShares map[string]float64
expectedAdjustedFairShares map[string]float64
}{
"one queue, demand exceeds capacity": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneThousandCpu},
},
expectedFairShares: map[string]float64{"queueA": 1.0},
expectedAdjustedFairShares: map[string]float64{"queueA": 1.0},
},
"one queue, demand less than capacity": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneCpu},
},
expectedFairShares: map[string]float64{"queueA": 1.0},
expectedAdjustedFairShares: map[string]float64{"queueA": 0.01},
},
"two queues, equal weights, demand exceeds capacity": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneThousandCpu},
"queueB": {Weight: 1.0, Demand: oneThousandCpu},
},
expectedFairShares: map[string]float64{"queueA": 0.5, "queueB": 0.5},
expectedAdjustedFairShares: map[string]float64{"queueA": 0.5, "queueB": 0.5},
},
"two queues, equal weights, demand less than capacity for both queues": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneCpu},
"queueB": {Weight: 1.0, Demand: oneCpu},
},
expectedFairShares: map[string]float64{"queueA": 0.5, "queueB": 0.5},
expectedAdjustedFairShares: map[string]float64{"queueA": 0.01, "queueB": 0.01},
},
"two queues, equal weights, demand less than capacity for one queue": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneCpu},
"queueB": {Weight: 1.0, Demand: oneThousandCpu},
},
expectedFairShares: map[string]float64{"queueA": 0.5, "queueB": 0.5},
expectedAdjustedFairShares: map[string]float64{"queueA": 0.01, "queueB": 0.99},
},
"two queues, non equal weights, demand exceeds capacity for both queues": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneThousandCpu},
"queueB": {Weight: 3.0, Demand: oneThousandCpu},
},
expectedFairShares: map[string]float64{"queueA": 0.25, "queueB": 0.75},
expectedAdjustedFairShares: map[string]float64{"queueA": 0.25, "queueB": 0.75},
},
"two queues, non equal weights, demand exceeds capacity for higher priority queue only": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneCpu},
"queueB": {Weight: 3.0, Demand: oneThousandCpu},
},
expectedFairShares: map[string]float64{"queueA": 0.25, "queueB": 0.75},
expectedAdjustedFairShares: map[string]float64{"queueA": 0.01, "queueB": 0.99},
},
"two queues, non equal weights, demand exceeds capacity for lower priority queue only": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneThousandCpu},
"queueB": {Weight: 3.0, Demand: oneCpu},
},
expectedFairShares: map[string]float64{"queueA": 0.25, "queueB": 0.75},
expectedAdjustedFairShares: map[string]float64{"queueA": 0.99, "queueB": 0.01},
},
"three queues, equal weights. Adjusted fair share requires multiple iterations": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneCpu},
"queueB": {Weight: 1.0, Demand: fortyCpu},
"queueC": {Weight: 1.0, Demand: oneThousandCpu},
},
expectedFairShares: map[string]float64{"queueA": 1.0 / 3, "queueB": 1.0 / 3, "queueC": 1.0 / 3},
expectedAdjustedFairShares: map[string]float64{"queueA": 0.01, "queueB": 0.4, "queueC": 0.59},
},
"No demand": {
availableResources: oneHundredCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: zeroCpu},
"queueB": {Weight: 1.0, Demand: zeroCpu},
"queueC": {Weight: 1.0, Demand: zeroCpu},
},
expectedFairShares: map[string]float64{"queueA": 1.0 / 3, "queueB": 1.0 / 3, "queueC": 1.0 / 3},
expectedAdjustedFairShares: map[string]float64{"queueA": 0.0, "queueB": 0.0, "queueC": 0.0},
},
"No capacity": {
availableResources: zeroCpu,
queueCtxs: map[string]*QueueSchedulingContext{
"queueA": {Weight: 1.0, Demand: oneCpu},
"queueB": {Weight: 1.0, Demand: oneCpu},
"queueC": {Weight: 1.0, Demand: oneCpu},
},
expectedFairShares: map[string]float64{"queueA": 1.0 / 3, "queueB": 1.0 / 3, "queueC": 1.0 / 3},
expectedAdjustedFairShares: map[string]float64{"queueA": 1.0 / 3, "queueB": 1.0 / 3, "queueC": 1.0 / 3},
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
fairnessCostProvider, err := fairness.NewDominantResourceFairness(tc.availableResources, []string{"cpu"})
require.NoError(t, err)
sctx := NewSchedulingContext(
"executor",
"pool",
testfixtures.TestPriorityClasses,
testfixtures.TestDefaultPriorityClass,
fairnessCostProvider,
nil,
tc.availableResources,
)
for qName, q := range tc.queueCtxs {
err = sctx.AddQueueSchedulingContext(
qName, q.Weight, schedulerobjects.QuantityByTAndResourceType[string]{}, q.Demand, nil)
require.NoError(t, err)
}
sctx.UpdateFairShares()
for qName, qctx := range sctx.QueueSchedulingContexts {
expectedFairShare, ok := tc.expectedFairShares[qName]
require.True(t, ok, "Expected fair share for queue %s not found", qName)
expectedAdjustedFairShare, ok := tc.expectedAdjustedFairShares[qName]
require.True(t, ok, "Expected adjusted fair share for queue %s not found", qName)
assert.Equal(t, expectedFairShare, qctx.FairShare, "Fair share for queue %s", qName)
assert.Equal(t, expectedAdjustedFairShare, qctx.AdjustedFairShare, "Adjusted Fair share for queue %s", qName)
}
})
}
}
1 change: 1 addition & 0 deletions internal/scheduler/gang_scheduler_test.go
@@ -556,6 +556,7 @@ func TestGangScheduler(t *testing.T) {
queue,
priorityFactor,
nil,
schedulerobjects.NewResourceList(0),
rate.NewLimiter(
rate.Limit(tc.SchedulingConfig.MaximumPerQueueSchedulingRate),
tc.SchedulingConfig.MaximumPerQueueSchedulingBurst,
35 changes: 21 additions & 14 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -2,6 +2,7 @@ package scheduler

import (
"fmt"
"math"
"reflect"
"time"

@@ -26,11 +27,12 @@ import (
// PreemptingQueueScheduler is a scheduler that makes unified decisions on which jobs to preempt and schedule.
// Uses QueueScheduler as a building block.
type PreemptingQueueScheduler struct {
schedulingContext *schedulercontext.SchedulingContext
constraints schedulerconstraints.SchedulingConstraints
protectedFractionOfFairShare float64
jobRepo JobRepository
nodeDb *nodedb.NodeDb
schedulingContext *schedulercontext.SchedulingContext
constraints schedulerconstraints.SchedulingConstraints
protectedFractionOfFairShare float64
useAdjustedFairShareProtection bool
jobRepo JobRepository
nodeDb *nodedb.NodeDb
// Maps job ids to the id of the node the job is associated with.
// For scheduled or running jobs, that is the node the job is assigned to.
// For preempted jobs, that is the node the job was preempted from.
@@ -49,6 +51,7 @@ func NewPreemptingQueueScheduler(
sctx *schedulercontext.SchedulingContext,
constraints schedulerconstraints.SchedulingConstraints,
protectedFractionOfFairShare float64,
useAdjustedFairShareProtection bool,
jobRepo JobRepository,
nodeDb *nodedb.NodeDb,
initialNodeIdByJobId map[string]string,
@@ -69,14 +72,15 @@ func NewPreemptingQueueScheduler(
initialJobIdsByGangId[gangId] = maps.Clone(jobIds)
}
return &PreemptingQueueScheduler{
schedulingContext: sctx,
constraints: constraints,
protectedFractionOfFairShare: protectedFractionOfFairShare,
jobRepo: jobRepo,
nodeDb: nodeDb,
nodeIdByJobId: maps.Clone(initialNodeIdByJobId),
jobIdsByGangId: initialJobIdsByGangId,
gangIdByJobId: maps.Clone(initialGangIdByJobId),
schedulingContext: sctx,
constraints: constraints,
protectedFractionOfFairShare: protectedFractionOfFairShare,
useAdjustedFairShareProtection: useAdjustedFairShareProtection,
jobRepo: jobRepo,
nodeDb: nodeDb,
nodeIdByJobId: maps.Clone(initialNodeIdByJobId),
jobIdsByGangId: initialJobIdsByGangId,
gangIdByJobId: maps.Clone(initialGangIdByJobId),
}
}

@@ -127,8 +131,11 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
return false
}
if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.Queue()]; ok {
fairShare := qctx.Weight / sch.schedulingContext.WeightSum
actualShare := sch.schedulingContext.FairnessCostProvider.CostFromQueue(qctx) / totalCost
fairShare := qctx.FairShare
if sch.useAdjustedFairShareProtection {
fairShare = math.Max(qctx.AdjustedFairShare, fairShare)
}
fractionOfFairShare := actualShare / fairShare
if fractionOfFairShare <= sch.protectedFractionOfFairShare {
return false
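The interplay between protectedFractionOfFairShare and useAdjustedFairShareProtection can be summarised by a small predicate. This is a hedged sketch rather than the exact code path: isProtected and its parameters are illustrative names, but the comparison follows the check in Schedule above and the config comment in configuration.go ("Only queues allocated more than this fraction of their fair share are considered for preemption").

```go
package main

import (
	"fmt"
	"math"
)

// isProtected reports whether a queue is shielded from preemption, mirroring the check in Schedule above:
// a queue is only considered for preemption once its actual share exceeds the protected fraction of its
// (possibly adjusted) fair share.
func isProtected(actualShare, fairShare, adjustedFairShare, protectedFraction float64, useAdjusted bool) bool {
	protected := fairShare
	if useAdjusted {
		protected = math.Max(adjustedFairShare, fairShare)
	}
	return actualShare/protected <= protectedFraction
}

func main() {
	// A queue with FairShare 0.25 whose AdjustedFairShare rose to 0.40 because other queues have little demand,
	// currently holding 30% of the cluster, with protectedFractionOfFairShare = 1.0.
	fmt.Println(isProtected(0.30, 0.25, 0.40, 1.0, true))  // true: 0.30/0.40 <= 1.0, so its jobs are not preempted
	fmt.Println(isProtected(0.30, 0.25, 0.40, 1.0, false)) // false: 0.30/0.25 > 1.0, so it can be preempted
}
```

The new flag therefore only strengthens protection: when a queue's AdjustedFairShare exceeds its plain FairShare because other queues leave capacity unused, the queue keeps its extra allocation instead of being preempted back down to its weight-based share.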