Separate PC and PC priority limits #2457

Draft: wants to merge 3 commits into master
internal/armada/server/lease.go (48 changes: 32 additions & 16 deletions)
@@ -422,8 +422,8 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
 	if err := q.usageRepository.UpdateClusterQueueResourceUsage(req.ClusterId, executorReport); err != nil {
 		return nil, errors.WithMessagef(err, "failed to update cluster usage for cluster %s", req.ClusterId)
 	}
-	allocatedByQueueForPool := q.aggregateUsage(reportsByExecutor, req.Pool)
-	log.Infof("allocated resources per queue for pool %s before scheduling: %v", req.Pool, allocatedByQueueForPool)
+	allocatedByQueueAndPriorityClassName := q.aggregateAllocatedByQueueAndPriorityClassName(reportsByExecutor, req.Pool)
+	log.Infof("allocated resources per queue for pool %s before scheduling: %v", req.Pool, allocatedByQueueAndPriorityClassName)
 
 	// Store executor details in Redis so they can be used by submit checks and the new scheduler.
 	if err := q.executorRepository.StoreExecutor(ctx, &schedulerobjects.Executor{
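Note on the hunk above: the pre-scheduling snapshot is no longer keyed by queue and numeric priority (schedulerobjects.QuantityByPriorityAndResourceType) but by queue and priority class name. A minimal, self-contained sketch of the two shapes, using a simplified stand-in for ResourceList (the real type wraps resource.Quantity values; the priority-class name is illustrative):

package main

import "fmt"

// ResourceList is a simplified stand-in for schedulerobjects.ResourceList.
type ResourceList struct{ Resources map[string]int64 }

func main() {
	// Old shape: queue -> numeric priority -> resources
	// (schedulerobjects.QuantityByPriorityAndResourceType).
	oldShape := map[string]map[int32]ResourceList{
		"queue-a": {0: {Resources: map[string]int64{"cpu": 3000}}},
	}
	// New shape: queue -> priority class name -> resources, so limits can be
	// tracked per priority class rather than per numeric priority.
	newShape := map[string]map[string]ResourceList{
		"queue-a": {"armada-default": {Resources: map[string]int64{"cpu": 3000}}},
	}
	fmt.Println(oldShape["queue-a"][0], newShape["queue-a"]["armada-default"])
}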
@@ -472,11 +472,23 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
 		q.schedulingConfig.Preemption.PriorityClasses,
 		q.schedulingConfig.Preemption.DefaultPriorityClass,
 		q.schedulingConfig.ResourceScarcity,
-		// May need priority factors for inactive queues for rescheduling evicted jobs.
-		priorityFactorByQueue,
 		schedulerobjects.ResourceList{Resources: totalCapacity},
-		allocatedByQueueForPool,
 	)
+	// sctx := schedulercontext.NewSchedulingContext(
+	// 	req.ClusterId,
+	// 	req.Pool,
+	// 	q.schedulingConfig.Preemption.PriorityClasses,
+	// 	q.schedulingConfig.Preemption.DefaultPriorityClass,
+	// 	q.schedulingConfig.ResourceScarcity,
+	// 	// May need priority factors for inactive queues for rescheduling evicted jobs.
+	// 	priorityFactorByQueue,
+	// 	schedulerobjects.ResourceList{Resources: totalCapacity},
+	// 	allocatedByQueueForPool,
+	// )
+	for queue, priorityFactor := range priorityFactorByQueue {
+		initialAllocatedByPriorityClassName := allocatedByQueueAndPriorityClassName[queue]
+		sctx.AddQueueSchedulingContext(queue, priorityFactor, initialAllocatedByPriorityClassName)
+	}
 	constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig(
 		req.Pool,
 		schedulerobjects.ResourceList{Resources: req.MinimumJobSize},
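With the per-priority-class map in place, each queue's scheduling context is seeded with its existing allocation before scheduling runs, as the loop in this hunk shows. A rough sketch of that wiring, with SchedulingContext stubbed out (the real type in Armada's scheduler code carries far more state; the types and fields here are simplified stand-ins):

package main

import "fmt"

// Simplified stand-ins for Armada's scheduler types.
type ResourceList struct{ Resources map[string]int64 }

type QueueSchedulingContext struct {
	PriorityFactor               float64
	AllocatedByPriorityClassName map[string]ResourceList
}

type SchedulingContext struct {
	QueueSchedulingContexts map[string]*QueueSchedulingContext
}

// AddQueueSchedulingContext registers a queue with its fair-share priority
// factor and whatever it already has allocated, keyed by priority class name.
func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, priorityFactor float64, initialAllocatedByPriorityClassName map[string]ResourceList) {
	sctx.QueueSchedulingContexts[queue] = &QueueSchedulingContext{
		PriorityFactor:               priorityFactor,
		AllocatedByPriorityClassName: initialAllocatedByPriorityClassName,
	}
}

func main() {
	allocatedByQueueAndPriorityClassName := map[string]map[string]ResourceList{
		"queue-a": {"armada-default": {Resources: map[string]int64{"cpu": 3000}}},
	}
	priorityFactorByQueue := map[string]float64{"queue-a": 1.0, "queue-b": 2.0}

	sctx := &SchedulingContext{QueueSchedulingContexts: map[string]*QueueSchedulingContext{}}
	// Mirrors the loop added in this hunk: every queue with a priority factor
	// gets a context; queues with nothing allocated are seeded with a nil map.
	for queue, priorityFactor := range priorityFactorByQueue {
		sctx.AddQueueSchedulingContext(queue, priorityFactor, allocatedByQueueAndPriorityClassName[queue])
	}
	fmt.Println(len(sctx.QueueSchedulingContexts)) // 2
}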
@@ -658,8 +670,8 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
 	if err := q.usageRepository.UpdateClusterQueueResourceUsage(req.ClusterId, executorReport); err != nil {
 		logging.WithStacktrace(log, err).Errorf("failed to update cluster usage")
 	}
-	allocatedByQueueForPool = q.aggregateUsage(reportsByExecutor, req.Pool)
-	log.Infof("allocated resources per queue for pool %s after scheduling: %v", req.Pool, allocatedByQueueForPool)
+	allocatedByQueueAndPriorityClassName = q.aggregateAllocatedByQueueAndPriorityClassName(reportsByExecutor, req.Pool)
+	log.Infof("allocated resources per queue for pool %s after scheduling: %v", req.Pool, allocatedByQueueAndPriorityClassName)
 
 	// Optionally set node id selectors on scheduled jobs.
 	if q.schedulingConfig.Preemption.SetNodeIdSelector {
@@ -737,12 +749,12 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
 	return successfullyLeasedApiJobs, nil
 }
 
-// aggregateUsage Creates a map of resource usage first by cluster and then by queue.
-// Clusters in pools other than pool are excluded.
-func (q *AggregatedQueueServer) aggregateUsage(reportsByCluster map[string]*schedulerobjects.ClusterResourceUsageReport, pool string) map[string]schedulerobjects.QuantityByPriorityAndResourceType {
+// aggregateAllocatedByQueueAndPriorityClassName returns allocatedByQueueAndPriorityClassName
+// aggregated across all non-stale clusters in the provided pool.
+func (q *AggregatedQueueServer) aggregateAllocatedByQueueAndPriorityClassName(reportsByCluster map[string]*schedulerobjects.ClusterResourceUsageReport, pool string) map[string]map[string]schedulerobjects.ResourceList {
 	const activeClusterExpiry = 10 * time.Minute
 	now := q.clock.Now()
-	aggregatedUsageByQueue := make(map[string]schedulerobjects.QuantityByPriorityAndResourceType)
+	allocatedByQueueAndPriorityClassName := make(map[string]map[string]schedulerobjects.ResourceList)
 	for _, clusterReport := range reportsByCluster {
 		if clusterReport.Pool != pool {
 			// Separate resource accounting per pool.
@@ -753,15 +765,19 @@ func (q *AggregatedQueueServer) aggregateUsage(reportsByCluster map[string]*sche
 			continue
 		}
 		for queue, report := range clusterReport.ResourcesByQueue {
-			quantityByPriorityAndResourceType, ok := aggregatedUsageByQueue[queue]
+			allocatedByPriorityClassName, ok := allocatedByQueueAndPriorityClassName[queue]
 			if !ok {
-				quantityByPriorityAndResourceType = make(schedulerobjects.QuantityByPriorityAndResourceType)
-				aggregatedUsageByQueue[queue] = quantityByPriorityAndResourceType
+				allocatedByPriorityClassName = make(map[string]schedulerobjects.ResourceList)
+				allocatedByQueueAndPriorityClassName[queue] = allocatedByPriorityClassName
 			}
+			for priorityClassName, allocatedForPriorityClassName := range report.ResourcesByPriorityClassName {
+				rl := allocatedByPriorityClassName[priorityClassName]
+				rl.Add(allocatedForPriorityClassName)
+				allocatedByPriorityClassName[priorityClassName] = rl
+			}
-			quantityByPriorityAndResourceType.Add(report.ResourcesByPriority)
 		}
 	}
-	return aggregatedUsageByQueue
+	return allocatedByQueueAndPriorityClassName
 }
 
 func (q *AggregatedQueueServer) decompressJobOwnershipGroups(jobs []*api.Job) error {
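For reference, the rewritten aggregation accumulates one ResourceList per (queue, priority class) pair across all fresh cluster reports. Because Go map values are not addressable, the inner loop reads the current list out, adds to it, and writes it back. A self-contained sketch of that pattern, again with a simplified ResourceList standing in for the schedulerobjects type (the real Add operates on resource.Quantity values):

package main

import "fmt"

// ResourceList is a simplified stand-in for schedulerobjects.ResourceList.
type ResourceList struct {
	Resources map[string]int64
}

// Add accumulates other into rl, mirroring the read-modify-write pattern
// used in aggregateAllocatedByQueueAndPriorityClassName.
func (rl *ResourceList) Add(other ResourceList) {
	if rl.Resources == nil {
		rl.Resources = make(map[string]int64)
	}
	for name, q := range other.Resources {
		rl.Resources[name] += q
	}
}

func main() {
	// allocated[queue][priorityClassName] = total resources across clusters.
	allocated := make(map[string]map[string]ResourceList)
	reports := []struct {
		queue, priorityClass string
		usage                ResourceList
	}{
		{"queue-a", "armada-default", ResourceList{map[string]int64{"cpu": 2000}}},
		{"queue-a", "armada-default", ResourceList{map[string]int64{"cpu": 1000, "memory": 4096}}},
		{"queue-a", "armada-preemptible", ResourceList{map[string]int64{"cpu": 500}}},
	}
	for _, r := range reports {
		byPriorityClass, ok := allocated[r.queue]
		if !ok {
			byPriorityClass = make(map[string]ResourceList)
			allocated[r.queue] = byPriorityClass
		}
		rl := byPriorityClass[r.priorityClass] // zero value if absent
		rl.Add(r.usage)
		byPriorityClass[r.priorityClass] = rl // write the updated value back
	}
	fmt.Println(allocated["queue-a"]["armada-default"].Resources) // map[cpu:3000 memory:4096]
}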