Commit

Merge remote-tracking branch 'origin/master' into zuqq/operate-on-jobs

zuqq committed Jun 27, 2023
2 parents 682e6a4 + d6aef28 commit fa1523e
Showing 12 changed files with 228 additions and 97 deletions.
1 change: 1 addition & 0 deletions config/armada/config.yaml
@@ -34,6 +34,7 @@ scheduling:
preemption:
nodeEvictionProbability: 1.0
nodeOversubscriptionEvictionProbability: 1.0
protectedFractionOfFairShare: 1.0
setNodeIdSelector: true
nodeIdLabel: kubernetes.io/hostname
setNodeName: false
2 changes: 2 additions & 0 deletions internal/armada/configuration/types.go
@@ -209,6 +209,8 @@ type PreemptionConfig struct {
// the probability of evicting jobs on oversubscribed nodes, i.e.,
// nodes on which the total resource requests are greater than the available resources.
NodeOversubscriptionEvictionProbability float64
// Only queues allocated more than this fraction of their fair share are considered for preemption.
ProtectedFractionOfFairShare float64
// If true, the Armada scheduler will add to scheduled pods a node selector
// NodeIdLabel: <value of label on node selected by scheduler>.
// If true, NodeIdLabel must be non-empty.
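For orientation, a minimal self-contained sketch of how the new field sits alongside the existing eviction probabilities. The struct here is an illustrative stand-in, not the actual armada configuration package; the real PreemptionConfig has more fields.

package main

import "fmt"

// preemptionConfig is a trimmed-down, illustrative stand-in for the
// PreemptionConfig struct shown in the hunk above.
type preemptionConfig struct {
	NodeEvictionProbability                 float64
	NodeOversubscriptionEvictionProbability float64
	// Only queues allocated more than this fraction of their fair share
	// are considered for preemption.
	ProtectedFractionOfFairShare float64
}

func main() {
	// Values mirror those added to config/armada/config.yaml in this commit.
	cfg := preemptionConfig{
		NodeEvictionProbability:                 1.0,
		NodeOversubscriptionEvictionProbability: 1.0,
		ProtectedFractionOfFairShare:            1.0,
	}
	fmt.Printf("%+v\n", cfg)
}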
7 changes: 6 additions & 1 deletion internal/armada/server/lease.go
@@ -467,7 +467,11 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
schedulerobjects.ResourceList{Resources: totalCapacity},
)
for queue, priorityFactor := range priorityFactorByQueue {
if err := sctx.AddQueueSchedulingContext(queue, priorityFactor, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
var weight float64 = 1
if priorityFactor > 0 {
weight = 1 / priorityFactor
}
if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
return nil, err
}
}
@@ -482,6 +486,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
constraints,
q.schedulingConfig.Preemption.NodeEvictionProbability,
q.schedulingConfig.Preemption.NodeOversubscriptionEvictionProbability,
q.schedulingConfig.Preemption.ProtectedFractionOfFairShare,
&SchedulerJobRepositoryAdapter{
r: q.jobRepository,
},
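The weight conversion in getJobs above inverts the per-queue priority factor, so a larger priority factor yields a smaller weight, with non-positive factors falling back to a weight of 1. A minimal standalone sketch of that conversion (the helper name is illustrative, not from the diff):

package main

import "fmt"

// weightFromPriorityFactor mirrors the conversion in getJobs above:
// weight = 1/priorityFactor, falling back to 1 for non-positive factors.
func weightFromPriorityFactor(priorityFactor float64) float64 {
	if priorityFactor > 0 {
		return 1 / priorityFactor
	}
	return 1
}

func main() {
	// Priority factors of 1, 2, and 4 map to weights of 1, 0.5, and 0.25.
	for _, f := range []float64{1, 2, 4, 0} {
		fmt.Printf("priorityFactor=%v -> weight=%v\n", f, weightFromPriorityFactor(f))
	}
}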
4 changes: 2 additions & 2 deletions internal/scheduler/common_test.go
@@ -132,7 +132,7 @@ func TestResourceListAsWeightedMillis(t *testing.T) {
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
assert.Equal(t, tc.expected, ResourceListAsWeightedMillis(tc.weights, tc.rl))
assert.Equal(t, tc.expected, tc.rl.AsWeightedMillis(tc.weights))
})
}
}
@@ -149,6 +149,6 @@ func BenchmarkResourceListAsWeightedMillis(b *testing.B) {
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
ResourceListAsWeightedMillis(weights, rl)
rl.AsWeightedMillis(weights)
}
}
27 changes: 23 additions & 4 deletions internal/scheduler/context/context.go
@@ -38,8 +38,12 @@ type SchedulingContext struct {
ResourceScarcity map[string]float64
// Per-queue scheduling contexts.
QueueSchedulingContexts map[string]*QueueSchedulingContext
// Sum of weights across all queues.
WeightSum float64
// Total resources across all clusters available at the start of the scheduling cycle.
TotalResources schedulerobjects.ResourceList
// = TotalResources.AsWeightedMillis(ResourceScarcity).
TotalResourcesAsWeightedMillis int64
// Resources assigned across all queues during this scheduling cycle.
ScheduledResources schedulerobjects.ResourceList
ScheduledResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]
@@ -80,6 +84,7 @@ func NewSchedulingContext(
ResourceScarcity: resourceScarcity,
QueueSchedulingContexts: make(map[string]*QueueSchedulingContext),
TotalResources: totalResources.DeepCopy(),
TotalResourcesAsWeightedMillis: totalResources.AsWeightedMillis(resourceScarcity),
ScheduledResources: schedulerobjects.NewResourceListWithDefaultSize(),
ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
EvictedResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
@@ -106,7 +111,7 @@ func (sctx *SchedulingContext) ClearUnfeasibleSchedulingKeys() {
sctx.UnfeasibleSchedulingKeys = make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext)
}

func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, priorityFactor float64, initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]) error {
func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, weight float64, initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]) error {
if _, ok := sctx.QueueSchedulingContexts[queue]; ok {
return errors.WithStack(&armadaerrors.ErrInvalidArgument{
Name: "queue",
@@ -123,12 +128,13 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, priorityF
for _, rl := range initialAllocatedByPriorityClass {
allocated.Add(rl)
}
sctx.WeightSum += weight
qctx := &QueueSchedulingContext{
SchedulingContext: sctx,
Created: time.Now(),
ExecutorId: sctx.ExecutorId,
Queue: queue,
PriorityFactor: priorityFactor,
Weight: weight,
Allocated: allocated,
AllocatedByPriorityClass: initialAllocatedByPriorityClass,
ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]),
@@ -313,8 +319,8 @@ type QueueSchedulingContext struct {
ExecutorId string
// Queue name.
Queue string
// These factors influence the fraction of resources assigned to each queue.
PriorityFactor float64
// Determines the fair share of this queue relative to other queues.
Weight float64
// Total resources assigned to the queue across all clusters by priority class priority.
// Includes jobs scheduled during this invocation of the scheduler.
Allocated schedulerobjects.ResourceList
@@ -490,6 +496,19 @@ func (qctx *QueueSchedulingContext) ClearJobSpecs() {
}
}

// FractionOfFairShare returns the fraction of its fair share that this queue is allocated; a value greater than 1 means the queue is allocated more than its fair share.
func (qctx *QueueSchedulingContext) FractionOfFairShare() float64 {
return qctx.FractionOfFairShareWithAllocation(qctx.Allocated)
}

// FractionOfFairShareWithAllocation returns what fraction of its fair share this queue is allocated
// if its total allocation is given by allocated; a value greater than 1 means the queue is allocated more than its fair share.
func (qctx *QueueSchedulingContext) FractionOfFairShareWithAllocation(allocated schedulerobjects.ResourceList) float64 {
fairShare := qctx.Weight / qctx.SchedulingContext.WeightSum
allocatedAsWeightedMillis := allocated.AsWeightedMillis(qctx.SchedulingContext.ResourceScarcity)
return (float64(allocatedAsWeightedMillis) / float64(qctx.SchedulingContext.TotalResourcesAsWeightedMillis)) / fairShare
}

type GangSchedulingContext struct {
Created time.Time
Queue string
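To make the new fair-share bookkeeping concrete, here is a minimal standalone sketch of the computation in FractionOfFairShareWithAllocation: a queue's fair share is its weight divided by the sum of all queue weights, and the result is the queue's share of total weighted resource millis divided by that fair share. All names are illustrative; the real code operates on ResourceLists and the scheduling context.

package main

import "fmt"

// fractionOfFairShare mirrors FractionOfFairShareWithAllocation above:
// fairShare = weight / weightSum, and the result is
// (allocatedWeightedMillis / totalWeightedMillis) / fairShare.
func fractionOfFairShare(weight, weightSum float64, allocatedWeightedMillis, totalWeightedMillis int64) float64 {
	fairShare := weight / weightSum
	return (float64(allocatedWeightedMillis) / float64(totalWeightedMillis)) / fairShare
}

func main() {
	// A queue with half of the total weight that holds a quarter of the
	// weighted resources is at 0.5 of its fair share.
	fmt.Println(fractionOfFairShare(1, 2, 250, 1000)) // 0.5
}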
74 changes: 32 additions & 42 deletions internal/scheduler/preempting_queue_scheduler.go
@@ -32,6 +32,7 @@ type PreemptingQueueScheduler struct {
constraints schedulerconstraints.SchedulingConstraints
nodeEvictionProbability float64
nodeOversubscriptionEvictionProbability float64
protectedFractionOfFairShare float64
jobRepo JobRepository
nodeDb *nodedb.NodeDb
// Maps job ids to the id of the node the job is associated with.
@@ -53,6 +54,7 @@ func NewPreemptingQueueScheduler(
constraints schedulerconstraints.SchedulingConstraints,
nodeEvictionProbability float64,
nodeOversubscriptionEvictionProbability float64,
protectedFractionOfFairShare float64,
jobRepo JobRepository,
nodeDb *nodedb.NodeDb,
initialNodeIdByJobId map[string]string,
@@ -77,6 +79,7 @@ func NewPreemptingQueueScheduler(
constraints: constraints,
nodeEvictionProbability: nodeEvictionProbability,
nodeOversubscriptionEvictionProbability: nodeOversubscriptionEvictionProbability,
protectedFractionOfFairShare: protectedFractionOfFairShare,
jobRepo: jobRepo,
nodeDb: nodeDb,
nodeIdByJobId: maps.Clone(initialNodeIdByJobId),
@@ -99,7 +102,7 @@ func (sch *PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() {
func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerResult, error) {
log := ctxlogrus.Extract(ctx)
log = log.WithField("service", "PreemptingQueueScheduler")
if ResourceListAsWeightedMillis(sch.schedulingContext.ResourceScarcity, sch.schedulingContext.TotalResources) == 0 {
if sch.schedulingContext.TotalResources.AsWeightedMillis(sch.schedulingContext.ResourceScarcity) == 0 {
// This refers to resources available across all clusters, i.e.,
// it may include resources not currently considered for scheduling.
log.Infof(
@@ -108,7 +111,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerRe
)
return &SchedulerResult{}, nil
}
if ResourceListAsWeightedMillis(sch.schedulingContext.ResourceScarcity, sch.nodeDb.TotalResources()) == 0 {
if rl := sch.nodeDb.TotalResources(); rl.AsWeightedMillis(sch.schedulingContext.ResourceScarcity) == 0 {
// This refers to the resources currently considered for scheduling.
log.Infof(
"no resources with non-zero weight available for scheduling in NodeDb: resource scarcity %v, total resources %v",
@@ -137,11 +140,31 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerRe
ctx,
log.WithField("stage", "evict for resource balancing"),
),
NewStochasticEvictor(
NewNodeEvictor(
sch.jobRepo,
sch.schedulingContext.PriorityClasses,
sch.schedulingContext.DefaultPriorityClass,
sch.nodeEvictionProbability,
func(ctx context.Context, job interfaces.LegacySchedulerJob) bool {
if job.GetAnnotations() == nil {
log := ctxlogrus.Extract(ctx)
log.Errorf("can't evict job %s: annotations not initialised", job.GetId())
return false
}
if job.GetNodeSelector() == nil {
log := ctxlogrus.Extract(ctx)
log.Errorf("can't evict job %s: nodeSelector not initialised", job.GetId())
return false
}
if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.GetQueue()]; ok {
if qctx.FractionOfFairShare() <= sch.protectedFractionOfFairShare {
return false
}
}
if priorityClass, ok := sch.schedulingContext.PriorityClasses[job.GetPriorityClassName()]; ok {
return priorityClass.Preemptible
}
return false
},
nil,
),
)
@@ -647,13 +670,11 @@ type EvictorResult struct {
NodeIdByJobId map[string]string
}

// NewStochasticEvictor returns a new evictor that for each node evicts
// all preemptible jobs from that node with probability perNodeEvictionProbability.
func NewStochasticEvictor(
func NewNodeEvictor(
jobRepo JobRepository,
priorityClasses map[string]configuration.PriorityClass,
defaultPriorityClass string,
perNodeEvictionProbability float64,
jobFilter func(context.Context, interfaces.LegacySchedulerJob) bool,
random *rand.Rand,
) *Evictor {
if perNodeEvictionProbability <= 0 {
@@ -662,44 +683,13 @@ func NewStochasticEvictor(
if random == nil {
random = rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
}
return NewPreemptibleEvictor(
jobRepo,
priorityClasses,
defaultPriorityClass,
func(_ context.Context, node *schedulerobjects.Node) bool {
return len(node.AllocatedByJobId) > 0 && random.Float64() < perNodeEvictionProbability
},
)
}

// NewPreemptibleEvictor returns a new evictor that evicts all preemptible jobs
// on nodes for which nodeFilter returns true.
func NewPreemptibleEvictor(
jobRepo JobRepository,
priorityClasses map[string]configuration.PriorityClass,
defaultPriorityClass string,
nodeFilter func(context.Context, *schedulerobjects.Node) bool,
) *Evictor {
return &Evictor{
jobRepo: jobRepo,
priorityClasses: priorityClasses,
nodeFilter: nodeFilter,
jobFilter: func(ctx context.Context, job interfaces.LegacySchedulerJob) bool {
if job.GetAnnotations() == nil {
log := ctxlogrus.Extract(ctx)
log.Warnf("can't evict job %s: annotations not initialised", job.GetId())
return false
}
priorityClassName := job.GetPriorityClassName()
priorityClass, ok := priorityClasses[priorityClassName]
if !ok {
priorityClass = priorityClasses[defaultPriorityClass]
}
if priorityClass.Preemptible {
return true
}
return false
nodeFilter: func(_ context.Context, node *schedulerobjects.Node) bool {
return len(node.AllocatedByJobId) > 0 && random.Float64() < perNodeEvictionProbability
},
jobFilter: jobFilter,
postEvictFunc: defaultPostEvictFunc,
}
}
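The effect of the jobFilter closure passed to NewNodeEvictor above, sketched as a standalone predicate with simplified stand-in types (not the actual scheduler interfaces): queues at or below protectedFractionOfFairShare of their fair share are never evicted, and beyond that only jobs in a preemptible priority class are eligible.

package main

import "fmt"

// Simplified stand-ins for the scheduler types used in the diff; these are
// assumptions for illustration only.
type queueContext struct {
	fractionOfFairShare float64
}

type priorityClass struct {
	preemptible bool
}

// evictable sketches the decision made by the jobFilter closure above:
// protect queues at or below the protected fraction of their fair share,
// otherwise allow eviction only for preemptible priority classes.
func evictable(qctx *queueContext, pc *priorityClass, protectedFractionOfFairShare float64) bool {
	if qctx != nil && qctx.fractionOfFairShare <= protectedFractionOfFairShare {
		return false
	}
	if pc != nil {
		return pc.preemptible
	}
	return false
}

func main() {
	protected := 1.0 // matches the value added to config/armada/config.yaml
	fmt.Println(evictable(&queueContext{fractionOfFairShare: 0.8}, &priorityClass{preemptible: true}, protected))  // false: within fair share, protected
	fmt.Println(evictable(&queueContext{fractionOfFairShare: 1.5}, &priorityClass{preemptible: true}, protected))  // true: over fair share and preemptible
	fmt.Println(evictable(&queueContext{fractionOfFairShare: 1.5}, &priorityClass{preemptible: false}, protected)) // false: not preemptible
}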