From fbff206b55b819611714b4ad867634d85d50d300 Mon Sep 17 00:00:00 2001
From: JamesMurkin
Date: Tue, 27 Jun 2023 15:18:05 +0100
Subject: [PATCH 1/3] Fix runs getting stuck in non-terminal states (#2609)

* Fix runs getting stuck in non-terminal states

There is a very weak form of communication between pod_issue_handler and job_event_reporter - once something marks a pod for deletion, job_event_reporter stops sending phase change updates for that pod.

The issue is that pod_issue_handler may try to delete a pod, and the pod then actually runs to completion. So pod_issue_handler stops "handling" this pod, but job_event_reporter has also stopped sending updates.

The end result is that no events get sent and the pod ends up stuck forever.

Now, if a pod issue is being handled and delete has been called, pod_issue_handler is responsible for sending the terminal events for that pod.

In practice:
- Never resolve an issue early if delete has been called
- If a pod unexpectedly starts up during deletion, consider the run as failed

* Swap log to info

* Fix typo
---
 .../executor/reporter/job_event_reporter.go | 1 +
 .../executor/service/pod_issue_handler.go | 39 +++++++++++--------
 .../service/pod_issue_handler_test.go | 31 +++++++++++++++
 3 files changed, 55 insertions(+), 16 deletions(-)

diff --git a/internal/executor/reporter/job_event_reporter.go b/internal/executor/reporter/job_event_reporter.go
index 1ae228ed4c3..88c1091e002 100644
--- a/internal/executor/reporter/job_event_reporter.go
+++ b/internal/executor/reporter/job_event_reporter.go
@@ -169,6 +169,7 @@ func (eventReporter *JobEventReporter) reportStatusUpdate(old *v1.Pod, new *v1.P
 // Don't report status change for pods Armada is deleting
 // This prevents reporting JobFailed when we delete a pod - for example due to cancellation
 if util.IsMarkedForDeletion(new) {
+ log.Infof("not sending event to report pod %s moving into phase %s as pod is marked for deletion", new.Name, new.Status.Phase)
 return
 }
 eventReporter.reportCurrentStatus(new)
diff --git a/internal/executor/service/pod_issue_handler.go b/internal/executor/service/pod_issue_handler.go
index 7cf61f0007a..8d15d26bc84 100644
--- a/internal/executor/service/pod_issue_handler.go
+++ b/internal/executor/service/pod_issue_handler.go
@@ -27,6 +27,7 @@ const (
 StuckStartingUp
 StuckTerminating
 ExternallyDeleted
+ ErrorDuringIssueHandling
 )
 type podIssue struct {
@@ -303,9 +304,7 @@ func (p *IssueHandler) handleNonRetryableJobIssue(issue *issue) {
 // - Report JobUnableToScheduleEvent
 // - Report JobReturnLeaseEvent
 //
-// Special consideration must be taken that most of these pods are somewhat "stuck" in pending.
-// So can transition to Running/Completed/Failed in the middle of this -// We must not return the lease if the pod state changes - as likely it has become "unstuck" +// If the pod becomes Running/Completed/Failed in the middle of being deleted - swap this issue to a nonRetryableIssue where it will be Failed func (p *IssueHandler) handleRetryableJobIssue(issue *issue) { if !issue.RunIssue.Reported { log.Infof("Retryable issue detected for job %s run %s - %s", issue.RunIssue.JobId, issue.RunIssue.RunId, issue.RunIssue.PodIssue.Message) @@ -321,7 +320,25 @@ func (p *IssueHandler) handleRetryableJobIssue(issue *issue) { } if issue.CurrentPodState != nil { - // TODO consider moving this to a synchronous call - but long termination periods would need to be handled + if issue.CurrentPodState.Status.Phase != v1.PodPending { + p.markIssuesResolved(issue.RunIssue) + if issue.RunIssue.PodIssue.DeletionRequested { + p.registerIssue(&runIssue{ + JobId: issue.RunIssue.JobId, + RunId: issue.RunIssue.RunId, + PodIssue: &podIssue{ + OriginalPodState: issue.RunIssue.PodIssue.OriginalPodState, + Message: "Pod unexpectedly started up after delete was called", + Retryable: false, + DeletionRequested: false, + Type: ErrorDuringIssueHandling, + Cause: api.Cause_Error, + }, + }) + } + return + } + err := p.clusterContext.DeletePodWithCondition(issue.CurrentPodState, func(pod *v1.Pod) bool { return pod.Status.Phase == v1.PodPending }, true) @@ -359,20 +376,10 @@ func hasPodIssueSelfResolved(issue *issue) bool { return false } - // Pod has completed - no need to report any issues - if util.IsInTerminalState(issue.CurrentPodState) { - return true - } - - // Pod has started running, and we haven't requested deletion - let it continue - if issue.CurrentPodState.Status.Phase == v1.PodRunning && !issue.RunIssue.PodIssue.DeletionRequested { + // Pod has started up and we haven't tried to delete the pod yet - so resolve the issue + if issue.CurrentPodState.Status.Phase != v1.PodPending && !issue.RunIssue.PodIssue.DeletionRequested { return true } - // TODO There is an edge case here where the pod has started running but we have requested deletion - // Without a proper state model, we can't easily handle this correctly - // Ideally we'd see if it completes or deletes first and report it accordingly - // If it completes first - do nothing - // If it deletes first - report JobFailed (as we accidentally deleted it during the run) } return false diff --git a/internal/executor/service/pod_issue_handler_test.go b/internal/executor/service/pod_issue_handler_test.go index 45a9168cdd5..ccb8226d43d 100644 --- a/internal/executor/service/pod_issue_handler_test.go +++ b/internal/executor/service/pod_issue_handler_test.go @@ -97,6 +97,37 @@ func TestPodIssueService_DeletesPodAndReportsLeaseReturned_IfRetryableStuckPod(t assert.True(t, ok) } +func TestPodIssueService_DeletesPodAndReportsFailed_IfRetryableStuckPodStartsUpAfterDeletionCalled(t *testing.T) { + podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{}) + retryableStuckPod := makeRetryableStuckPod(false) + addPod(t, fakeClusterContext, retryableStuckPod) + + podIssueService.HandlePodIssues() + + // Reports UnableToSchedule + assert.Len(t, eventsReporter.ReceivedEvents, 1) + _, ok := eventsReporter.ReceivedEvents[0].Event.(*api.JobUnableToScheduleEvent) + assert.True(t, ok) + + // Reset events, and add pod back as running + eventsReporter.ReceivedEvents = []reporter.EventMessage{} + retryableStuckPod.Status.Phase = v1.PodRunning + 
addPod(t, fakeClusterContext, retryableStuckPod) + + // Detects pod is now unexpectedly running and marks it non-retryable + podIssueService.HandlePodIssues() + assert.Len(t, eventsReporter.ReceivedEvents, 0) + assert.Len(t, getActivePods(t, fakeClusterContext), 1) + + // Now processes the issue as non-retryable and fails the pod + podIssueService.HandlePodIssues() + assert.Len(t, getActivePods(t, fakeClusterContext), 0) + + assert.Len(t, eventsReporter.ReceivedEvents, 1) + _, ok = eventsReporter.ReceivedEvents[0].Event.(*api.JobFailedEvent) + assert.True(t, ok) +} + func TestPodIssueService_ReportsFailed_IfDeletedExternally(t *testing.T) { podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{}) runningPod := makeRunningPod(false) From d6aef288654426ac3211391bfc46154057004d28 Mon Sep 17 00:00:00 2001 From: Albin Severinson Date: Tue, 27 Jun 2023 17:30:18 +0100 Subject: [PATCH 2/3] Avoid evicting queues below their fair share (#2611) * Move fair share comp. into context * Avoid evicting queues below their fair share * Add tests * Comment * Comment * Avoid divide by zero --- config/armada/config.yaml | 1 + internal/armada/configuration/types.go | 2 + internal/armada/server/lease.go | 7 +- internal/scheduler/common.go | 12 -- internal/scheduler/common_test.go | 4 +- internal/scheduler/context/context.go | 27 +++- .../scheduler/preempting_queue_scheduler.go | 74 +++++----- .../preempting_queue_scheduler_test.go | 128 +++++++++++++++++- internal/scheduler/queue_scheduler.go | 49 ++----- internal/scheduler/queue_scheduler_test.go | 3 +- .../schedulerobjects/resourcelist.go | 12 ++ internal/scheduler/scheduling_algo.go | 7 +- .../scheduler/testfixtures/testfixtures.go | 11 +- 13 files changed, 228 insertions(+), 109 deletions(-) diff --git a/config/armada/config.yaml b/config/armada/config.yaml index 265c2d80822..90b8af17421 100644 --- a/config/armada/config.yaml +++ b/config/armada/config.yaml @@ -34,6 +34,7 @@ scheduling: preemption: nodeEvictionProbability: 1.0 nodeOversubscriptionEvictionProbability: 1.0 + protectedFractionOfFairShare: 1.0 setNodeIdSelector: true nodeIdLabel: kubernetes.io/hostname setNodeName: false diff --git a/internal/armada/configuration/types.go b/internal/armada/configuration/types.go index dc93ae7ec4d..cf9db749e82 100644 --- a/internal/armada/configuration/types.go +++ b/internal/armada/configuration/types.go @@ -209,6 +209,8 @@ type PreemptionConfig struct { // the probability of evicting jobs on oversubscribed nodes, i.e., // nodes on which the total resource requests are greater than the available resources. NodeOversubscriptionEvictionProbability float64 + // Only queues allocated more than this fraction of their fair share are considered for preemption. + ProtectedFractionOfFairShare float64 // If true, the Armada scheduler will add to scheduled pods a node selector // NodeIdLabel: . // If true, NodeIdLabel must be non-empty. 
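To make the new ProtectedFractionOfFairShare setting concrete: a queue's share of total weighted resources is divided by its fair share (its weight over the sum of all queue weights, where weight is the inverse of the priority factor), and only queues above the protected fraction have their jobs considered for eviction. The sketch below is a minimal, self-contained illustration of that comparison; the function name and numbers are illustrative, not Armada's real API.

```go
package main

import "fmt"

// fractionOfFairShare mirrors the computation this patch introduces:
// a queue's allocation (as weighted millis) over total resources,
// normalised by the queue's fair share (its weight over the sum of weights).
func fractionOfFairShare(allocatedMillis, totalMillis int64, weight, weightSum float64) float64 {
	fairShare := weight / weightSum
	return (float64(allocatedMillis) / float64(totalMillis)) / fairShare
}

func main() {
	// Two queues of equal weight sharing a 32 CPU (32000 milli) pool.
	// Queue A holds 10 CPUs, i.e. 10/32 of the pool, while its fair share
	// is 1/2; its fraction of fair share is therefore 0.625.
	frac := fractionOfFairShare(10000, 32000, 1, 2)
	fmt.Printf("fraction of fair share: %.3f\n", frac)

	// With protectedFractionOfFairShare = 1.0 (the new default in
	// config/armada/config.yaml), this queue is below its fair share,
	// so its jobs are not considered for eviction.
	protected := frac <= 1.0
	fmt.Println("protected from eviction:", protected)
}
```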
diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 3d8224cf4c4..456ba84b815 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -469,7 +469,11 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL schedulerobjects.ResourceList{Resources: totalCapacity}, ) for queue, priorityFactor := range priorityFactorByQueue { - if err := sctx.AddQueueSchedulingContext(queue, priorityFactor, allocatedByQueueAndPriorityClassForPool[queue]); err != nil { + var weight float64 = 1 + if priorityFactor > 0 { + weight = 1 / priorityFactor + } + if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue]); err != nil { return nil, err } } @@ -484,6 +488,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL constraints, q.schedulingConfig.Preemption.NodeEvictionProbability, q.schedulingConfig.Preemption.NodeOversubscriptionEvictionProbability, + q.schedulingConfig.Preemption.ProtectedFractionOfFairShare, &SchedulerJobRepositoryAdapter{ r: q.jobRepository, }, diff --git a/internal/scheduler/common.go b/internal/scheduler/common.go index 2ed57336356..2b8fdbe2abf 100644 --- a/internal/scheduler/common.go +++ b/internal/scheduler/common.go @@ -2,7 +2,6 @@ package scheduler import ( "fmt" - "math" "strconv" "time" @@ -170,17 +169,6 @@ func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string, return gangId, gangCardinality, true, nil } -// ResourceListAsWeightedMillis returns the linear combination of the milli values in rl with given weights. -// This function overflows for values that exceed MaxInt64. E.g., 1Pi is fine but not 10Pi. -func ResourceListAsWeightedMillis(weights map[string]float64, rl schedulerobjects.ResourceList) int64 { - var rv int64 - for t, f := range weights { - q := rl.Get(t) - rv += int64(math.Round(float64(q.MilliValue()) * f)) - } - return rv -} - func PodRequirementsFromLegacySchedulerJobs[S ~[]E, E interfaces.LegacySchedulerJob](jobs S, priorityClasses map[string]configuration.PriorityClass) []*schedulerobjects.PodRequirements { rv := make([]*schedulerobjects.PodRequirements, len(jobs)) for i, job := range jobs { diff --git a/internal/scheduler/common_test.go b/internal/scheduler/common_test.go index e1a87d287c1..c71cd16513b 100644 --- a/internal/scheduler/common_test.go +++ b/internal/scheduler/common_test.go @@ -134,7 +134,7 @@ func TestResourceListAsWeightedMillis(t *testing.T) { } for name, tc := range tests { t.Run(name, func(t *testing.T) { - assert.Equal(t, tc.expected, ResourceListAsWeightedMillis(tc.weights, tc.rl)) + assert.Equal(t, tc.expected, tc.rl.AsWeightedMillis(tc.weights)) }) } } @@ -151,6 +151,6 @@ func BenchmarkResourceListAsWeightedMillis(b *testing.B) { } b.ResetTimer() for n := 0; n < b.N; n++ { - ResourceListAsWeightedMillis(weights, rl) + rl.AsWeightedMillis(weights) } } diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 74dc564ac6f..85c93624345 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -38,8 +38,12 @@ type SchedulingContext struct { ResourceScarcity map[string]float64 // Per-queue scheduling contexts. QueueSchedulingContexts map[string]*QueueSchedulingContext + // Sum of weights across all queues. + WeightSum float64 // Total resources across all clusters available at the start of the scheduling cycle. 
TotalResources schedulerobjects.ResourceList + // = TotalResources.AsWeightedMillis(ResourceScarcity). + TotalResourcesAsWeightedMillis int64 // Resources assigned across all queues during this scheduling cycle. ScheduledResources schedulerobjects.ResourceList ScheduledResourcesByPriorityClass schedulerobjects.QuantityByTAndResourceType[string] @@ -80,6 +84,7 @@ func NewSchedulingContext( ResourceScarcity: resourceScarcity, QueueSchedulingContexts: make(map[string]*QueueSchedulingContext), TotalResources: totalResources.DeepCopy(), + TotalResourcesAsWeightedMillis: totalResources.AsWeightedMillis(resourceScarcity), ScheduledResources: schedulerobjects.NewResourceListWithDefaultSize(), ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]), EvictedResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]), @@ -106,7 +111,7 @@ func (sctx *SchedulingContext) ClearUnfeasibleSchedulingKeys() { sctx.UnfeasibleSchedulingKeys = make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext) } -func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, priorityFactor float64, initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]) error { +func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, weight float64, initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]) error { if _, ok := sctx.QueueSchedulingContexts[queue]; ok { return errors.WithStack(&armadaerrors.ErrInvalidArgument{ Name: "queue", @@ -123,12 +128,13 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, priorityF for _, rl := range initialAllocatedByPriorityClass { allocated.Add(rl) } + sctx.WeightSum += weight qctx := &QueueSchedulingContext{ SchedulingContext: sctx, Created: time.Now(), ExecutorId: sctx.ExecutorId, Queue: queue, - PriorityFactor: priorityFactor, + Weight: weight, Allocated: allocated, AllocatedByPriorityClass: initialAllocatedByPriorityClass, ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]), @@ -313,8 +319,8 @@ type QueueSchedulingContext struct { ExecutorId string // Queue name. Queue string - // These factors influence the fraction of resources assigned to each queue. - PriorityFactor float64 + // Determines the fair share of this queue relative to other queues. + Weight float64 // Total resources assigned to the queue across all clusters by priority class priority. // Includes jobs scheduled during this invocation of the scheduler. Allocated schedulerobjects.ResourceList @@ -490,6 +496,19 @@ func (qctx *QueueSchedulingContext) ClearJobSpecs() { } } +// FractionOfFairShare returns a number in [0, 1] indicating what fraction of its fair share this queue is allocated. +func (qctx *QueueSchedulingContext) FractionOfFairShare() float64 { + return qctx.FractionOfFairShareWithAllocation(qctx.Allocated) +} + +// FractionOfFairShareWithAllocation returns a number in [0, 1] indicating what +// fraction of its fair share this queue is allocated if the total allocation of this queue is given by allocated. 
+func (qctx *QueueSchedulingContext) FractionOfFairShareWithAllocation(allocated schedulerobjects.ResourceList) float64 { + fairShare := qctx.Weight / qctx.SchedulingContext.WeightSum + allocatedAsWeightedMillis := allocated.AsWeightedMillis(qctx.SchedulingContext.ResourceScarcity) + return (float64(allocatedAsWeightedMillis) / float64(qctx.SchedulingContext.TotalResourcesAsWeightedMillis)) / fairShare +} + type GangSchedulingContext struct { Created time.Time Queue string diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 63bcb83949c..636d8caf713 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -32,6 +32,7 @@ type PreemptingQueueScheduler struct { constraints schedulerconstraints.SchedulingConstraints nodeEvictionProbability float64 nodeOversubscriptionEvictionProbability float64 + protectedFractionOfFairShare float64 jobRepo JobRepository nodeDb *nodedb.NodeDb // Maps job ids to the id of the node the job is associated with. @@ -53,6 +54,7 @@ func NewPreemptingQueueScheduler( constraints schedulerconstraints.SchedulingConstraints, nodeEvictionProbability float64, nodeOversubscriptionEvictionProbability float64, + protectedFractionOfFairShare float64, jobRepo JobRepository, nodeDb *nodedb.NodeDb, initialNodeIdByJobId map[string]string, @@ -77,6 +79,7 @@ func NewPreemptingQueueScheduler( constraints: constraints, nodeEvictionProbability: nodeEvictionProbability, nodeOversubscriptionEvictionProbability: nodeOversubscriptionEvictionProbability, + protectedFractionOfFairShare: protectedFractionOfFairShare, jobRepo: jobRepo, nodeDb: nodeDb, nodeIdByJobId: maps.Clone(initialNodeIdByJobId), @@ -99,7 +102,7 @@ func (sch *PreemptingQueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() { func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerResult, error) { log := ctxlogrus.Extract(ctx) log = log.WithField("service", "PreemptingQueueScheduler") - if ResourceListAsWeightedMillis(sch.schedulingContext.ResourceScarcity, sch.schedulingContext.TotalResources) == 0 { + if sch.schedulingContext.TotalResources.AsWeightedMillis(sch.schedulingContext.ResourceScarcity) == 0 { // This refers to resources available across all clusters, i.e., // it may include resources not currently considered for scheduling. log.Infof( @@ -108,7 +111,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerRe ) return &SchedulerResult{}, nil } - if ResourceListAsWeightedMillis(sch.schedulingContext.ResourceScarcity, sch.nodeDb.TotalResources()) == 0 { + if rl := sch.nodeDb.TotalResources(); rl.AsWeightedMillis(sch.schedulingContext.ResourceScarcity) == 0 { // This refers to the resources currently considered for scheduling. 
log.Infof( "no resources with non-zero weight available for scheduling in NodeDb: resource scarcity %v, total resources %v", @@ -137,11 +140,31 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx context.Context) (*SchedulerRe ctx, log.WithField("stage", "evict for resource balancing"), ), - NewStochasticEvictor( + NewNodeEvictor( sch.jobRepo, sch.schedulingContext.PriorityClasses, - sch.schedulingContext.DefaultPriorityClass, sch.nodeEvictionProbability, + func(ctx context.Context, job interfaces.LegacySchedulerJob) bool { + if job.GetAnnotations() == nil { + log := ctxlogrus.Extract(ctx) + log.Errorf("can't evict job %s: annotations not initialised", job.GetId()) + return false + } + if job.GetNodeSelector() == nil { + log := ctxlogrus.Extract(ctx) + log.Errorf("can't evict job %s: nodeSelector not initialised", job.GetId()) + return false + } + if qctx, ok := sch.schedulingContext.QueueSchedulingContexts[job.GetQueue()]; ok { + if qctx.FractionOfFairShare() <= sch.protectedFractionOfFairShare { + return false + } + } + if priorityClass, ok := sch.schedulingContext.PriorityClasses[job.GetPriorityClassName()]; ok { + return priorityClass.Preemptible + } + return false + }, nil, ), ) @@ -655,13 +678,11 @@ type EvictorResult struct { NodeIdByJobId map[string]string } -// NewStochasticEvictor returns a new evictor that for each node evicts -// all preemptible jobs from that node with probability perNodeEvictionProbability. -func NewStochasticEvictor( +func NewNodeEvictor( jobRepo JobRepository, priorityClasses map[string]configuration.PriorityClass, - defaultPriorityClass string, perNodeEvictionProbability float64, + jobFilter func(context.Context, interfaces.LegacySchedulerJob) bool, random *rand.Rand, ) *Evictor { if perNodeEvictionProbability <= 0 { @@ -670,44 +691,13 @@ func NewStochasticEvictor( if random == nil { random = rand.New(rand.NewSource(int64(time.Now().Nanosecond()))) } - return NewPreemptibleEvictor( - jobRepo, - priorityClasses, - defaultPriorityClass, - func(_ context.Context, node *schedulerobjects.Node) bool { - return len(node.AllocatedByJobId) > 0 && random.Float64() < perNodeEvictionProbability - }, - ) -} - -// NewPreemptibleEvictor returns a new evictor that evicts all preemptible jobs -// on nodes for which nodeFilter returns true. 
-func NewPreemptibleEvictor( - jobRepo JobRepository, - priorityClasses map[string]configuration.PriorityClass, - defaultPriorityClass string, - nodeFilter func(context.Context, *schedulerobjects.Node) bool, -) *Evictor { return &Evictor{ jobRepo: jobRepo, priorityClasses: priorityClasses, - nodeFilter: nodeFilter, - jobFilter: func(ctx context.Context, job interfaces.LegacySchedulerJob) bool { - if job.GetAnnotations() == nil { - log := ctxlogrus.Extract(ctx) - log.Warnf("can't evict job %s: annotations not initialised", job.GetId()) - return false - } - priorityClassName := job.GetPriorityClassName() - priorityClass, ok := priorityClasses[priorityClassName] - if !ok { - priorityClass = priorityClasses[defaultPriorityClass] - } - if priorityClass.Preemptible { - return true - } - return false + nodeFilter: func(_ context.Context, node *schedulerobjects.Node) bool { + return len(node.AllocatedByJobId) > 0 && random.Float64() < perNodeEvictionProbability }, + jobFilter: jobFilter, postEvictFunc: defaultPostEvictFunc, } } diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index 7e386adcac1..7ab6ae1d3fb 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -1136,6 +1136,122 @@ func TestPreemptingQueueScheduler(t *testing.T) { "B": 1, }, }, + "ProtectedFractionOfFairShare": { + SchedulingConfig: testfixtures.WithProtectedFractionOfFairShareConfig( + 1.0, + testfixtures.TestSchedulingConfig(), + ), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Rounds: []SchedulingRound{ + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 10), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 9), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "B": testfixtures.N1CpuJobs("B", testfixtures.PriorityClass3, 22), + }, + ExpectedScheduledIndices: map[string][]int{ + "B": testfixtures.IntRange(0, 21), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "C": testfixtures.N1CpuJobs("C", testfixtures.PriorityClass0, 1), + }, + }, + {}, // Empty round to make sure nothing changes. + }, + PriorityFactorByQueue: map[string]float64{ + "A": 1, + "B": 1, + "C": 1, + }, + }, + "ProtectedFractionOfFairShare at limit": { + SchedulingConfig: testfixtures.WithProtectedFractionOfFairShareConfig( + 0.5, + testfixtures.TestSchedulingConfig(), + ), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Rounds: []SchedulingRound{ + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 8), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 7), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "B": testfixtures.N1CpuJobs("B", testfixtures.PriorityClass3, 24), + }, + ExpectedScheduledIndices: map[string][]int{ + "B": testfixtures.IntRange(0, 23), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "C": testfixtures.N1CpuJobs("C", testfixtures.PriorityClass0, 1), + }, + }, + {}, // Empty round to make sure nothing changes. 
+ }, + PriorityFactorByQueue: map[string]float64{ + "A": 0.5, + "B": 1, + "C": 1, + }, + }, + "ProtectedFractionOfFairShare above limit": { + SchedulingConfig: testfixtures.WithProtectedFractionOfFairShareConfig( + 0.5, + testfixtures.TestSchedulingConfig(), + ), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Rounds: []SchedulingRound{ + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 9), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 8), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "B": testfixtures.N1CpuJobs("B", testfixtures.PriorityClass3, 23), + }, + ExpectedScheduledIndices: map[string][]int{ + "B": testfixtures.IntRange(0, 22), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "C": testfixtures.N1CpuJobs("C", testfixtures.PriorityClass0, 1), + }, + ExpectedScheduledIndices: map[string][]int{ + "C": testfixtures.IntRange(0, 0), + }, + ExpectedPreemptedIndices: map[string]map[int][]int{ + "A": { + 0: testfixtures.IntRange(8, 8), + }, + }, + }, + {}, // Empty round to make sure nothing changes. + }, + PriorityFactorByQueue: map[string]float64{ + "A": 1, + "B": 1, + "C": 1, + }, + }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -1221,7 +1337,8 @@ func TestPreemptingQueueScheduler(t *testing.T) { tc.TotalResources, ) for queue, priorityFactor := range tc.PriorityFactorByQueue { - err := sctx.AddQueueSchedulingContext(queue, priorityFactor, allocatedByQueueAndPriorityClass[queue]) + weight := 1 / priorityFactor + err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue]) require.NoError(t, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( @@ -1235,6 +1352,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { constraints, tc.SchedulingConfig.Preemption.NodeEvictionProbability, tc.SchedulingConfig.Preemption.NodeOversubscriptionEvictionProbability, + tc.SchedulingConfig.Preemption.ProtectedFractionOfFairShare, repo, nodeDb, nodeIdByJobId, @@ -1482,7 +1600,8 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { nodeDb.TotalResources(), ) for queue, priorityFactor := range priorityFactorByQueue { - err := sctx.AddQueueSchedulingContext(queue, priorityFactor, make(schedulerobjects.QuantityByTAndResourceType[string])) + weight := 1 / priorityFactor + err := sctx.AddQueueSchedulingContext(queue, weight, make(schedulerobjects.QuantityByTAndResourceType[string])) require.NoError(b, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( @@ -1496,6 +1615,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { constraints, tc.SchedulingConfig.Preemption.NodeEvictionProbability, tc.SchedulingConfig.Preemption.NodeOversubscriptionEvictionProbability, + tc.SchedulingConfig.Preemption.ProtectedFractionOfFairShare, jobRepo, nodeDb, nil, @@ -1542,7 +1662,8 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { nodeDb.TotalResources(), ) for queue, priorityFactor := range priorityFactorByQueue { - err := sctx.AddQueueSchedulingContext(queue, priorityFactor, allocatedByQueueAndPriorityClass[queue]) + weight := 1 / priorityFactor + err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue]) require.NoError(b, err) } sch := NewPreemptingQueueScheduler( @@ -1550,6 +1671,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { constraints, tc.SchedulingConfig.Preemption.NodeEvictionProbability, 
tc.SchedulingConfig.Preemption.NodeOversubscriptionEvictionProbability, + tc.SchedulingConfig.Preemption.ProtectedFractionOfFairShare, jobRepo, nodeDb, nil, diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index 9408015ec3c..55883bb2158 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -3,7 +3,6 @@ package scheduler import ( "container/heap" "context" - "math" "reflect" "time" @@ -63,7 +62,7 @@ func (sch *QueueScheduler) SkipUnsuccessfulSchedulingKeyCheck() { func (sch *QueueScheduler) Schedule(ctx context.Context) (*SchedulerResult, error) { log := ctxlogrus.Extract(ctx) - if ResourceListAsWeightedMillis(sch.schedulingContext.ResourceScarcity, sch.schedulingContext.TotalResources) == 0 { + if sch.schedulingContext.TotalResources.AsWeightedMillis(sch.schedulingContext.ResourceScarcity) == 0 { // This refers to resources available across all clusters, i.e., // it may include resources not currently considered for scheduling. log.Infof( @@ -72,8 +71,8 @@ func (sch *QueueScheduler) Schedule(ctx context.Context) (*SchedulerResult, erro ) return &SchedulerResult{}, nil } - if ResourceListAsWeightedMillis(sch.schedulingContext.ResourceScarcity, sch.gangScheduler.nodeDb.TotalResources()) == 0 { - // This refers to the resources currently considered for schedling. + if rl := sch.gangScheduler.nodeDb.TotalResources(); rl.AsWeightedMillis(sch.schedulingContext.ResourceScarcity) == 0 { + // This refers to the resources currently considered for scheduling. log.Infof( "no resources with non-zero weight available for scheduling in NodeDb: resource scarcity %v, total resources %v", sch.schedulingContext.ResourceScarcity, sch.gangScheduler.nodeDb.TotalResources(), @@ -277,12 +276,6 @@ type CandidateGangIterator struct { SchedulingContext *schedulercontext.SchedulingContext // If true, this iterator only yields gangs where all jobs are evicted. onlyYieldEvicted bool - // For each queue, weight is the inverse of the priority factor. - weightByQueue map[string]float64 - // Sum of all weights. - weightSum float64 - // Total weighted resources. - totalResourcesAsWeightedMillis int64 // Reusable buffer to avoid allocations. buffer schedulerobjects.ResourceList // Priority queue containing per-queue iterators. 
@@ -294,28 +287,10 @@ func NewCandidateGangIterator( sctx *schedulercontext.SchedulingContext, iteratorsByQueue map[string]*QueuedGangIterator, ) (*CandidateGangIterator, error) { - weightSum := 0.0 - weightByQueue := make(map[string]float64, len(iteratorsByQueue)) - for queue := range iteratorsByQueue { - qctx := sctx.QueueSchedulingContexts[queue] - if qctx == nil { - return nil, errors.Errorf("no scheduling context for queue %s", queue) - } - weight := 1 / math.Max(qctx.PriorityFactor, 1) - weightByQueue[queue] = weight - weightSum += weight - } - totalResourcesAsWeightedMillis := ResourceListAsWeightedMillis(sctx.ResourceScarcity, sctx.TotalResources) - if totalResourcesAsWeightedMillis < 1 { - totalResourcesAsWeightedMillis = 1 - } it := &CandidateGangIterator{ - SchedulingContext: sctx, - weightByQueue: weightByQueue, - weightSum: weightSum, - totalResourcesAsWeightedMillis: totalResourcesAsWeightedMillis, - buffer: schedulerobjects.NewResourceListWithDefaultSize(), - pq: make(QueueCandidateGangIteratorPQ, 0, len(iteratorsByQueue)), + SchedulingContext: sctx, + buffer: schedulerobjects.NewResourceListWithDefaultSize(), + pq: make(QueueCandidateGangIteratorPQ, 0, len(iteratorsByQueue)), } for queue, queueIt := range iteratorsByQueue { if _, err := it.updateAndPushPQItem(it.newPQItem(queue, queueIt)); err != nil { @@ -372,17 +347,11 @@ func (it *CandidateGangIterator) updatePQItem(item *QueueCandidateGangIteratorIt // fractionOfFairShareWithGctx returns the fraction of its fair share this queue would have if the jobs in gctx were scheduled. func (it *CandidateGangIterator) fractionOfFairShareWithGctx(gctx *schedulercontext.GangSchedulingContext) float64 { + qctx := it.SchedulingContext.QueueSchedulingContexts[gctx.Queue] it.buffer.Zero() - it.buffer.Add(it.SchedulingContext.QueueSchedulingContexts[gctx.Queue].Allocated) + it.buffer.Add(qctx.Allocated) it.buffer.Add(gctx.TotalResourceRequests) - queueWeight := it.weightByQueue[gctx.Queue] - if queueWeight == 0 { - return 1 - } else { - fairShare := queueWeight / it.weightSum - used := ResourceListAsWeightedMillis(it.SchedulingContext.ResourceScarcity, it.buffer) - return (float64(used) / float64(it.totalResourcesAsWeightedMillis)) / fairShare - } + return qctx.FractionOfFairShareWithAllocation(it.buffer) } // Clear removes the first item in the iterator. 
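The refactored CandidateGangIterator above no longer keeps its own weight bookkeeping; it asks the QueueSchedulingContext what fraction of its fair share a queue would have if its next gang were scheduled, and yields gangs from the queue that would sit furthest below its fair share first. The toy model below illustrates that ordering rule; the struct and function names are stand-ins, not the real iterator, and the priority-queue machinery is elided.

```go
package main

import (
	"fmt"
	"sort"
)

// queueState is a stand-in for Armada's QueueSchedulingContext: just enough
// state to show how candidate gangs are ordered after this refactor.
type queueState struct {
	name            string
	weight          float64
	allocatedMillis int64
	nextGangMillis  int64 // resource request of the queue's next gang
}

// fractionWithNextGang is the analogue of fractionOfFairShareWithGctx: the
// fraction of fair share the queue would have if its next gang were scheduled.
func fractionWithNextGang(q queueState, weightSum float64, totalMillis int64) float64 {
	fairShare := q.weight / weightSum
	share := float64(q.allocatedMillis+q.nextGangMillis) / float64(totalMillis)
	return share / fairShare
}

func main() {
	queues := []queueState{
		{name: "A", weight: 1, allocatedMillis: 8000, nextGangMillis: 1000},
		{name: "B", weight: 1, allocatedMillis: 2000, nextGangMillis: 1000},
	}
	const weightSum, totalMillis = 2.0, 32000

	// The gang whose queue would end up furthest below its fair share is
	// considered first; here that is queue B.
	sort.Slice(queues, func(i, j int) bool {
		return fractionWithNextGang(queues[i], weightSum, totalMillis) <
			fractionWithNextGang(queues[j], weightSum, totalMillis)
	})
	fmt.Println("next gang comes from queue", queues[0].name)
}
```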
diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index bd0a8c47937..0c0348179b2 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -453,7 +453,8 @@ func TestQueueScheduler(t *testing.T) { tc.TotalResources, ) for queue, priorityFactor := range tc.PriorityFactorByQueue { - err := sctx.AddQueueSchedulingContext(queue, priorityFactor, tc.InitialAllocatedByQueueAndPriorityClass[queue]) + weight := 1 / priorityFactor + err := sctx.AddQueueSchedulingContext(queue, weight, tc.InitialAllocatedByQueueAndPriorityClass[queue]) require.NoError(t, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( diff --git a/internal/scheduler/schedulerobjects/resourcelist.go b/internal/scheduler/schedulerobjects/resourcelist.go index 7c98a95eff3..9a3c67eb5e3 100644 --- a/internal/scheduler/schedulerobjects/resourcelist.go +++ b/internal/scheduler/schedulerobjects/resourcelist.go @@ -2,6 +2,7 @@ package schedulerobjects import ( "fmt" + math "math" "strings" v1 "k8s.io/api/core/v1" @@ -303,6 +304,17 @@ func (rl ResourceList) CompactString() string { return sb.String() } +// AsWeightedMillis returns the linear combination of the milli values in rl with given weights. +// This function overflows for values greater than MaxInt64. E.g., 1Pi is fine but not 10Pi. +func (rl *ResourceList) AsWeightedMillis(weights map[string]float64) int64 { + var rv int64 + for t, w := range weights { + q := rl.Get(t) + rv += int64(math.Round(float64(q.MilliValue()) * w)) + } + return rv +} + func (rl *ResourceList) initialise() { if rl.Resources == nil { rl.Resources = make(map[string]resource.Quantity) diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index d23afd4423b..cc91f19709c 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -327,7 +327,11 @@ func (l *FairSchedulingAlgo) scheduleOnExecutor( if allocatedByQueueAndPriorityClass := accounting.allocationByPoolAndQueueAndPriorityClass[executor.Pool]; allocatedByQueueAndPriorityClass != nil { allocatedByPriorityClass = allocatedByQueueAndPriorityClass[queue] } - if err := sctx.AddQueueSchedulingContext(queue, priorityFactor, allocatedByPriorityClass); err != nil { + var weight float64 = 1 + if priorityFactor > 0 { + weight = 1 / priorityFactor + } + if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByPriorityClass); err != nil { return nil, nil, err } } @@ -342,6 +346,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutor( constraints, l.config.Preemption.NodeEvictionProbability, l.config.Preemption.NodeOversubscriptionEvictionProbability, + l.config.Preemption.ProtectedFractionOfFairShare, &schedulerJobRepositoryAdapter{ txn: txn, db: db, diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index f00ced1b4a0..07ccae0f531 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -88,7 +88,7 @@ func ContextWithDefaultLogger(ctx context.Context) context.Context { func TestSchedulingConfig() configuration.SchedulingConfig { return configuration.SchedulingConfig{ - ResourceScarcity: map[string]float64{"cpu": 1, "memory": 0}, + ResourceScarcity: map[string]float64{"cpu": 1}, Preemption: configuration.PreemptionConfig{ PriorityClasses: maps.Clone(TestPriorityClasses), DefaultPriorityClass: TestDefaultPriorityClass, @@ -101,8 +101,13 @@ 
func TestSchedulingConfig() configuration.SchedulingConfig { } } -func WithMaxUnacknowledgedJobsPerExecutor(i uint, config configuration.SchedulingConfig) configuration.SchedulingConfig { - config.MaxUnacknowledgedJobsPerExecutor = i +func WithMaxUnacknowledgedJobsPerExecutorConfig(v uint, config configuration.SchedulingConfig) configuration.SchedulingConfig { + config.MaxUnacknowledgedJobsPerExecutor = v + return config +} + +func WithProtectedFractionOfFairShareConfig(v float64, config configuration.SchedulingConfig) configuration.SchedulingConfig { + config.Preemption.ProtectedFractionOfFairShare = v return config } From dac1cf36d96ae46707451b5bc180220ee2827061 Mon Sep 17 00:00:00 2001 From: Noah Held <41909795+zuqq@users.noreply.github.com> Date: Wed, 28 Jun 2023 10:45:51 +0100 Subject: [PATCH 3/3] Make the node database operate on jobs instead of pods (#2612) --- internal/armada/server/lease.go | 8 +- internal/scheduler/common.go | 60 ---- internal/scheduler/common_test.go | 8 +- .../scheduler/configuration/configuration.go | 17 +- internal/scheduler/context/context.go | 50 +-- internal/scheduler/context/context_test.go | 8 +- internal/scheduler/gang_scheduler.go | 18 +- internal/scheduler/gang_scheduler_test.go | 2 +- internal/scheduler/interfaces/interfaces.go | 2 +- internal/scheduler/jobdb/job.go | 32 +- internal/scheduler/jobdb/job_test.go | 1 - internal/scheduler/metrics.go | 2 +- internal/scheduler/nodedb/nodedb.go | 247 ++++++--------- internal/scheduler/nodedb/nodedb_test.go | 297 ++++++++++-------- internal/scheduler/pool_assigner.go | 39 ++- .../scheduler/preempting_queue_scheduler.go | 16 +- .../preempting_queue_scheduler_test.go | 11 +- internal/scheduler/queue_scheduler.go | 11 +- internal/scheduler/reports_test.go | 11 +- internal/scheduler/scheduler.go | 19 +- internal/scheduler/scheduler_test.go | 18 +- .../schedulerobjects/requirements.go | 11 - .../schedulerobjects/schedulinginfo.go | 10 + internal/scheduler/scheduling_algo.go | 7 +- internal/scheduler/scheduling_algo_test.go | 3 +- internal/scheduler/submitcheck.go | 89 +++--- internal/scheduler/submitcheck_test.go | 23 +- .../scheduler/testfixtures/testfixtures.go | 47 +-- pkg/api/util.go | 17 +- pkg/api/util_test.go | 186 ++++------- 30 files changed, 518 insertions(+), 752 deletions(-) delete mode 100644 internal/scheduler/schedulerobjects/requirements.go create mode 100644 internal/scheduler/schedulerobjects/schedulinginfo.go diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index 456ba84b815..448068c98e7 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -370,11 +370,9 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL // Bind pods to nodes, thus ensuring resources are marked as allocated on the node. 
skipNode := false for _, job := range jobs { - node, err = nodedb.BindPodToNode( - scheduler.PodRequirementFromLegacySchedulerJob( - job, - q.schedulingConfig.Preemption.PriorityClasses, - ), + node, err = nodedb.BindJobToNode( + q.schedulingConfig.Preemption.PriorityClasses, + job, node, ) if err != nil { diff --git a/internal/scheduler/common.go b/internal/scheduler/common.go index 2b8fdbe2abf..5c27148b871 100644 --- a/internal/scheduler/common.go +++ b/internal/scheduler/common.go @@ -3,7 +3,6 @@ package scheduler import ( "fmt" "strconv" - "time" "github.com/pkg/errors" "golang.org/x/exp/maps" @@ -12,7 +11,6 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" armadaslices "github.com/armadaproject/armada/internal/common/slices" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" - schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -111,27 +109,6 @@ func JobsSummary(jobs []interfaces.LegacySchedulerJob) string { ) } -func jobSchedulingContextsFromJobs[T interfaces.LegacySchedulerJob](jobs []T, executorId string, priorityClasses map[string]configuration.PriorityClass) []*schedulercontext.JobSchedulingContext { - if jobs == nil { - return nil - } - if len(jobs) == 0 { - return make([]*schedulercontext.JobSchedulingContext, 0) - } - jctxs := make([]*schedulercontext.JobSchedulingContext, len(jobs)) - timestamp := time.Now() - for i, job := range jobs { - jctxs[i] = &schedulercontext.JobSchedulingContext{ - Created: timestamp, - ExecutorId: executorId, - JobId: job.GetId(), - Job: job, - Req: PodRequirementFromLegacySchedulerJob(job, priorityClasses), - } - } - return jctxs -} - func isEvictedJob(job interfaces.LegacySchedulerJob) bool { return job.GetAnnotations()[schedulerconfig.IsEvictedAnnotation] == "true" } @@ -168,40 +145,3 @@ func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string, } return gangId, gangCardinality, true, nil } - -func PodRequirementsFromLegacySchedulerJobs[S ~[]E, E interfaces.LegacySchedulerJob](jobs S, priorityClasses map[string]configuration.PriorityClass) []*schedulerobjects.PodRequirements { - rv := make([]*schedulerobjects.PodRequirements, len(jobs)) - for i, job := range jobs { - rv[i] = PodRequirementFromLegacySchedulerJob(job, priorityClasses) - } - return rv -} - -func PodRequirementFromLegacySchedulerJob[E interfaces.LegacySchedulerJob](job E, priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements { - annotations := make(map[string]string, len(configuration.ArmadaManagedAnnotations)+len(schedulerconfig.ArmadaSchedulerManagedAnnotations)) - for _, key := range configuration.ArmadaManagedAnnotations { - if value, ok := job.GetAnnotations()[key]; ok { - annotations[key] = value - } - } - for _, key := range schedulerconfig.ArmadaSchedulerManagedAnnotations { - if value, ok := job.GetAnnotations()[key]; ok { - annotations[key] = value - } - } - annotations[schedulerconfig.JobIdAnnotation] = job.GetId() - annotations[schedulerconfig.QueueAnnotation] = job.GetQueue() - info := job.GetJobSchedulingInfo(priorityClasses) - req := PodRequirementFromJobSchedulingInfo(info) - req.Annotations = annotations - return req -} - -func PodRequirementFromJobSchedulingInfo(info *schedulerobjects.JobSchedulingInfo) *schedulerobjects.PodRequirements { - for _, oreq := range info.ObjectRequirements { - 
if preq := oreq.GetPodRequirements(); preq != nil { - return preq - } - } - return nil -} diff --git a/internal/scheduler/common_test.go b/internal/scheduler/common_test.go index c71cd16513b..73aae7be637 100644 --- a/internal/scheduler/common_test.go +++ b/internal/scheduler/common_test.go @@ -10,12 +10,11 @@ import ( "github.com/armadaproject/armada/internal/armada/configuration" "github.com/armadaproject/armada/internal/common/util" - schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/pkg/api" ) -func TestPodRequirementFromLegacySchedulerJob(t *testing.T) { +func TestGetPodRequirements(t *testing.T) { resourceLimit := v1.ResourceList{ "cpu": resource.MustParse("1"), "memory": resource.MustParse("128Mi"), @@ -64,13 +63,12 @@ func TestPodRequirementFromLegacySchedulerJob(t *testing.T) { PreemptionPolicy: string(v1.PreemptLowerPriority), ResourceRequirements: requirements, Annotations: map[string]string{ + "something": "test", configuration.GangIdAnnotation: "gang-id", configuration.GangCardinalityAnnotation: "1", - schedulerconfig.JobIdAnnotation: j.Id, - schedulerconfig.QueueAnnotation: j.Queue, }, } - actual := PodRequirementFromLegacySchedulerJob(j, map[string]configuration.PriorityClass{"armada-default": {Priority: int32(1)}}) + actual := j.GetPodRequirements(map[string]configuration.PriorityClass{"armada-default": {Priority: int32(1)}}) assert.Equal(t, expected, actual) } diff --git a/internal/scheduler/configuration/configuration.go b/internal/scheduler/configuration/configuration.go index 2a6227501d5..c4caa35019e 100644 --- a/internal/scheduler/configuration/configuration.go +++ b/internal/scheduler/configuration/configuration.go @@ -10,23 +10,14 @@ import ( ) const ( - // IsEvictedAnnotation, indicates a pod was evicted in this round and is currently running. - // Used by the scheduler to differentiate between pods from running and queued jobs. + // IsEvictedAnnotation is set on evicted jobs; the scheduler uses it to differentiate between + // already-running and queued jobs. IsEvictedAnnotation = "armadaproject.io/isEvicted" - // JobIdAnnotation if set on a pod, indicates which job this pod is part of. - JobIdAnnotation = "armadaproject.io/jobId" - // QueueAnnotation if set on a pod, indicates which queue this pod is part of. - QueueAnnotation = "armadaproject.io/queue" - // IdNodeLabel is automatically added to nodes in the NodeDb. + // NodeIdLabel is set on evicted jobs, so that the scheduler only tries to schedule them on the + // nodes that they are already running on; nodedb is responsible for labelling its Node objects. 
NodeIdLabel = "armadaproject.io/nodeId" ) -var ArmadaSchedulerManagedAnnotations = []string{ - IsEvictedAnnotation, - JobIdAnnotation, - QueueAnnotation, -} - type Configuration struct { // Database configuration Postgres configuration.PostgresConfig diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 85c93624345..6c5372bf362 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -228,12 +228,12 @@ func (sctx *SchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingContex } if jctx.IsSuccessful() { if evictedInThisRound { - sctx.EvictedResources.SubV1ResourceList(jctx.Req.ResourceRequirements.Requests) - sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests) + sctx.EvictedResources.SubV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) + sctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) sctx.NumEvictedJobs-- } else { - sctx.ScheduledResources.AddV1ResourceList(jctx.Req.ResourceRequirements.Requests) - sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests) + sctx.ScheduledResources.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) + sctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) sctx.NumScheduledJobs++ } } @@ -440,23 +440,23 @@ func (qctx *QueueSchedulingContext) AddJobSchedulingContext(jctx *JobSchedulingC } _, evictedInThisRound := qctx.EvictedJobsById[jctx.JobId] if jctx.IsSuccessful() { - if jctx.Req == nil { + if jctx.PodRequirements == nil { return false, errors.Errorf("failed adding job %s to queue: job requirements are missing", jctx.JobId) } // Always update ResourcesByPriority. // Since ResourcesByPriority is used to order queues by fraction of fair share. - qctx.Allocated.AddV1ResourceList(jctx.Req.ResourceRequirements.Requests) - qctx.AllocatedByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests) + qctx.Allocated.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) + qctx.AllocatedByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) // Only if the job is not evicted, update ScheduledResourcesByPriority. // Since ScheduledResourcesByPriority is used to control per-round scheduling constraints. 
if evictedInThisRound { delete(qctx.EvictedJobsById, jctx.JobId) - qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests) + qctx.EvictedResourcesByPriorityClass.SubV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) } else { qctx.SuccessfulJobSchedulingContexts[jctx.JobId] = jctx - qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.Req.ResourceRequirements.Requests) + qctx.ScheduledResourcesByPriorityClass.AddV1ResourceList(jctx.Job.GetPriorityClassName(), jctx.PodRequirements.ResourceRequirements.Requests) } } else { qctx.UnsuccessfulJobSchedulingContexts[jctx.JobId] = jctx @@ -531,7 +531,7 @@ func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingCont totalResourceRequests := schedulerobjects.NewResourceList(4) for _, jctx := range jctxs { allJobsEvicted = allJobsEvicted && isEvictedJob(jctx.Job) - totalResourceRequests.AddV1ResourceList(jctx.Req.ResourceRequirements.Requests) + totalResourceRequests.AddV1ResourceList(jctx.PodRequirements.ResourceRequirements.Requests) } return &GangSchedulingContext{ Created: time.Now(), @@ -543,14 +543,6 @@ func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingCont } } -func (gctx GangSchedulingContext) PodRequirements() []*schedulerobjects.PodRequirements { - rv := make([]*schedulerobjects.PodRequirements, len(gctx.JobSchedulingContexts)) - for i, jctx := range gctx.JobSchedulingContexts { - rv[i] = jctx.Req - } - return rv -} - func isEvictedJob(job interfaces.LegacySchedulerJob) bool { return job.GetAnnotations()[schedulerconfig.IsEvictedAnnotation] == "true" } @@ -560,17 +552,13 @@ func isEvictedJob(job interfaces.LegacySchedulerJob) bool { type JobSchedulingContext struct { // Time at which this context was created. Created time.Time - // Executor this job was attempted to be assigned to. - ExecutorId string - // Total number of nodes in the cluster when trying to schedule. - NumNodes int // Id of the job this pod corresponds to. JobId string // Job spec. Job interfaces.LegacySchedulerJob // Scheduling requirements of this job. // We currently require that each job contains exactly one pod spec. - Req *schedulerobjects.PodRequirements + PodRequirements *schedulerobjects.PodRequirements // Reason for why the job could not be scheduled. // Empty if the job was scheduled successfully. 
UnschedulableReason string @@ -583,7 +571,6 @@ func (jctx *JobSchedulingContext) String() string { w := tabwriter.NewWriter(&sb, 1, 1, 1, ' ', 0) fmt.Fprintf(w, "Time:\t%s\n", jctx.Created) fmt.Fprintf(w, "Job ID:\t%s\n", jctx.JobId) - fmt.Fprintf(w, "Number of nodes in cluster:\t%d\n", jctx.NumNodes) if jctx.UnschedulableReason != "" { fmt.Fprintf(w, "UnschedulableReason:\t%s\n", jctx.UnschedulableReason) } else { @@ -600,6 +587,20 @@ func (jctx *JobSchedulingContext) IsSuccessful() bool { return jctx.UnschedulableReason == "" } +func JobSchedulingContextsFromJobs[T interfaces.LegacySchedulerJob](priorityClasses map[string]configuration.PriorityClass, jobs []T) []*JobSchedulingContext { + jctxs := make([]*JobSchedulingContext, len(jobs)) + timestamp := time.Now() + for i, job := range jobs { + jctxs[i] = &JobSchedulingContext{ + Created: timestamp, + JobId: job.GetId(), + Job: job, + PodRequirements: job.GetPodRequirements(priorityClasses), + } + } + return jctxs +} + // PodSchedulingContext is returned by SelectAndBindNodeToPod and // contains detailed information on the scheduling decision made for this pod. type PodSchedulingContext struct { @@ -626,6 +627,7 @@ func (pctx *PodSchedulingContext) String() string { } else { fmt.Fprint(w, "Node:\tnone\n") } + fmt.Fprintf(w, "Number of nodes in cluster:\t%d\n", pctx.NumNodes) if len(pctx.NumExcludedNodesByReason) == 0 { fmt.Fprint(w, "Excluded nodes:\tnone\n") } else { diff --git a/internal/scheduler/context/context_test.go b/internal/scheduler/context/context_test.go index e00a9a5d0cb..3443ad63679 100644 --- a/internal/scheduler/context/context_test.go +++ b/internal/scheduler/context/context_test.go @@ -84,10 +84,8 @@ func testNSmallCpuJobSchedulingContext(queue, priorityClassName string, n int) [ func testSmallCpuJobSchedulingContext(queue, priorityClassName string) *JobSchedulingContext { job := testfixtures.Test1CpuJob(queue, priorityClassName) return &JobSchedulingContext{ - ExecutorId: "executor", - NumNodes: 1, - JobId: job.GetId(), - Job: job, - Req: job.GetJobSchedulingInfo(nil).ObjectRequirements[0].GetPodRequirements(), + JobId: job.GetId(), + Job: job, + PodRequirements: job.GetPodRequirements(testfixtures.TestPriorityClasses), } } diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index 1ebbcb189a6..34bdf4f3b1c 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -4,8 +4,6 @@ import ( "context" "fmt" - "github.com/pkg/errors" - "github.com/armadaproject/armada/internal/common/util" schedulerconstraints "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" @@ -94,7 +92,7 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G // Check that the job is large enough for this executor. // This check needs to be here, since it relates to a specific job. // Only perform limit checks for new jobs to avoid preempting jobs if, e.g., MinimumJobSize changes. 
- if ok, unschedulableReason = requestIsLargeEnough(gctx.TotalResourceRequests, sch.constraints.MinimumJobSize); !ok { + if ok, unschedulableReason = requestsAreLargeEnough(gctx.TotalResourceRequests, sch.constraints.MinimumJobSize); !ok { return } if ok, unschedulableReason, err = sch.constraints.CheckPerQueueAndPriorityClassConstraints( @@ -112,20 +110,10 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G } func (sch *GangScheduler) trySchedule(ctx context.Context, gctx *schedulercontext.GangSchedulingContext) (bool, string, error) { - pctxs, ok, err := sch.nodeDb.ScheduleMany(gctx.PodRequirements()) + ok, err := sch.nodeDb.ScheduleMany(gctx.JobSchedulingContexts) if err != nil { return false, "", err } - if len(pctxs) > len(gctx.JobSchedulingContexts) { - return false, "", errors.Errorf( - "received %d pod scheduling context(s), but gang has cardinality %d", - len(pctxs), len(gctx.JobSchedulingContexts), - ) - } - for i, pctx := range pctxs { - gctx.JobSchedulingContexts[i].PodSchedulingContext = pctx - gctx.JobSchedulingContexts[i].NumNodes = pctx.NumNodes - } if !ok { unschedulableReason := "" if len(gctx.JobSchedulingContexts) > 1 { @@ -138,7 +126,7 @@ func (sch *GangScheduler) trySchedule(ctx context.Context, gctx *schedulercontex return true, "", nil } -func requestIsLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) { +func requestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) { if len(minRequest.Resources) == 0 { return true, "" } diff --git a/internal/scheduler/gang_scheduler_test.go b/internal/scheduler/gang_scheduler_test.go index 8a9d7f2f3f6..40b23b2d4f6 100644 --- a/internal/scheduler/gang_scheduler_test.go +++ b/internal/scheduler/gang_scheduler_test.go @@ -256,7 +256,7 @@ func TestGangScheduler(t *testing.T) { var actualScheduledIndices []int for i, gang := range tc.Gangs { - jctxs := jobSchedulingContextsFromJobs(gang, "", testfixtures.TestPriorityClasses) + jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, gang) gctx := schedulercontext.NewGangSchedulingContext(jctxs) ok, reason, err := sch.Schedule(context.Background(), gctx) require.NoError(t, err) diff --git a/internal/scheduler/interfaces/interfaces.go b/internal/scheduler/interfaces/interfaces.go index 409409f3bf0..d77432d46ab 100644 --- a/internal/scheduler/interfaces/interfaces.go +++ b/internal/scheduler/interfaces/interfaces.go @@ -17,7 +17,7 @@ type LegacySchedulerJob interface { GetPerQueuePriority() uint32 GetSubmitTime() time.Time GetAnnotations() map[string]string - GetJobSchedulingInfo(map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo + GetPodRequirements(priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements GetPriorityClassName() string GetNodeSelector() map[string]string GetAffinity() *v1.Affinity diff --git a/internal/scheduler/jobdb/job.go b/internal/scheduler/jobdb/job.go index 519fce9d495..b9ae76d27b4 100644 --- a/internal/scheduler/jobdb/job.go +++ b/internal/scheduler/jobdb/job.go @@ -180,18 +180,12 @@ func (job *Job) JobSchedulingInfo() *schedulerobjects.JobSchedulingInfo { // GetAnnotations returns the annotations on the job. 
// This is needed for compatibility with interfaces.LegacySchedulerJob func (job *Job) GetAnnotations() map[string]string { - if req := job.getPodRequirements(); req != nil { + if req := job.PodRequirements(); req != nil { return req.Annotations } return nil } -// GetRequirements returns the scheduling requirements associated with the job. -// Needed for compatibility with interfaces.LegacySchedulerJob -func (job *Job) GetJobSchedulingInfo(_ map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo { - return job.JobSchedulingInfo() -} - // Needed for compatibility with interfaces.LegacySchedulerJob func (job *Job) GetPriorityClassName() string { return job.JobSchedulingInfo().PriorityClassName @@ -199,7 +193,7 @@ func (job *Job) GetPriorityClassName() string { // Needed for compatibility with interfaces.LegacySchedulerJob func (job *Job) GetNodeSelector() map[string]string { - if req := job.getPodRequirements(); req != nil { + if req := job.PodRequirements(); req != nil { return req.NodeSelector } return nil @@ -207,7 +201,7 @@ func (job *Job) GetNodeSelector() map[string]string { // Needed for compatibility with interfaces.LegacySchedulerJob func (job *Job) GetAffinity() *v1.Affinity { - if req := job.getPodRequirements(); req != nil { + if req := job.PodRequirements(); req != nil { return req.Affinity } return nil @@ -215,7 +209,7 @@ func (job *Job) GetAffinity() *v1.Affinity { // Needed for compatibility with interfaces.LegacySchedulerJob func (job *Job) GetTolerations() []v1.Toleration { - if req := job.getPodRequirements(); req != nil { + if req := job.PodRequirements(); req != nil { return req.Tolerations } return nil @@ -223,21 +217,19 @@ func (job *Job) GetTolerations() []v1.Toleration { // Needed for compatibility with interfaces.LegacySchedulerJob func (job *Job) GetResourceRequirements() v1.ResourceRequirements { - if req := job.getPodRequirements(); req != nil { + if req := job.PodRequirements(); req != nil { return req.ResourceRequirements } return v1.ResourceRequirements{} } -func (job *Job) getPodRequirements() *schedulerobjects.PodRequirements { - requirements := job.jobSchedulingInfo.GetObjectRequirements() - if len(requirements) == 0 { - return nil - } - if podReqs := requirements[0].GetPodRequirements(); podReqs != nil { - return podReqs - } - return nil +func (job *Job) PodRequirements() *schedulerobjects.PodRequirements { + return job.jobSchedulingInfo.GetPodRequirements() +} + +// GetPodRequirements is needed for compatibility with interfaces.LegacySchedulerJob. +func (job *Job) GetPodRequirements(_ map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements { + return job.PodRequirements() } // Queued returns true if the job should be considered by the scheduler for assignment or false otherwise. 
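The shape of the change above is that pod requirements now come from a method on the job interface rather than from a package-level helper that unwrapped JobSchedulingInfo at every call site. The sketch below shows that call pattern with stand-in types; none of these names are Armada's real API, and the deleted PodRequirementsFromLegacySchedulerJobs helper is only approximated.

```go
package main

import "fmt"

// podRequirements and priorityClass are stand-ins for the schedulerobjects
// and configuration types; only the shape of the call matters here.
type podRequirements struct{ priority int32 }

type priorityClass struct{ priority int32 }

// legacySchedulerJob mirrors the slimmed-down interface after this patch:
// jobs expose their pod requirements directly.
type legacySchedulerJob interface {
	GetId() string
	GetPodRequirements(priorityClasses map[string]priorityClass) *podRequirements
}

type job struct {
	id            string
	priorityClass string
}

func (j job) GetId() string { return j.id }

func (j job) GetPodRequirements(priorityClasses map[string]priorityClass) *podRequirements {
	return &podRequirements{priority: priorityClasses[j.priorityClass].priority}
}

// podRequirementsFromJobs is the kind of small loop that replaces the deleted
// package-level helper at call sites.
func podRequirementsFromJobs(priorityClasses map[string]priorityClass, jobs []legacySchedulerJob) []*podRequirements {
	rv := make([]*podRequirements, len(jobs))
	for i, j := range jobs {
		rv[i] = j.GetPodRequirements(priorityClasses)
	}
	return rv
}

func main() {
	classes := map[string]priorityClass{"armada-default": {priority: 10}}
	jobs := []legacySchedulerJob{job{id: "job-1", priorityClass: "armada-default"}}
	for _, req := range podRequirementsFromJobs(classes, jobs) {
		fmt.Println("priority:", req.priority)
	}
}
```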
diff --git a/internal/scheduler/jobdb/job_test.go b/internal/scheduler/jobdb/job_test.go index 8ab6c17a835..f5121c107ca 100644 --- a/internal/scheduler/jobdb/job_test.go +++ b/internal/scheduler/jobdb/job_test.go @@ -54,7 +54,6 @@ func TestJob_TestGetter(t *testing.T) { assert.Equal(t, baseJob.queue, baseJob.Queue()) assert.Equal(t, baseJob.queue, baseJob.GetQueue()) assert.Equal(t, baseJob.created, baseJob.Created()) - assert.Equal(t, schedulingInfo, baseJob.GetJobSchedulingInfo(nil)) assert.Equal(t, schedulingInfo, baseJob.JobSchedulingInfo()) assert.Equal(t, baseJob.GetAnnotations(), map[string]string{ "foo": "bar", diff --git a/internal/scheduler/metrics.go b/internal/scheduler/metrics.go index 16e55f040bd..f982f8c10f1 100644 --- a/internal/scheduler/metrics.go +++ b/internal/scheduler/metrics.go @@ -259,7 +259,7 @@ func (c *MetricsCollector) updateClusterMetrics(ctx context.Context) ([]promethe } phaseCountByQueue[key]++ - podRequirements := PodRequirementFromJobSchedulingInfo(job.JobSchedulingInfo()) + podRequirements := job.PodRequirements() if podRequirements != nil { queueKey := queueMetricKey{ cluster: executor.Id, diff --git a/internal/scheduler/nodedb/nodedb.go b/internal/scheduler/nodedb/nodedb.go index 4ddd839a9d6..4a93162b71e 100644 --- a/internal/scheduler/nodedb/nodedb.go +++ b/internal/scheduler/nodedb/nodedb.go @@ -20,6 +20,7 @@ import ( "github.com/armadaproject/armada/internal/common/util" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" + "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) @@ -263,94 +264,66 @@ func NodeJobDiff(txnA, txnB *memdb.Txn) (map[string]*schedulerobjects.Node, map[ return preempted, scheduled, nil } -// ScheduleMany assigns a set of pods to nodes. -// The assignment is atomic, i.e., either all pods are successfully assigned to nodes or none are. -// The returned bool indicates whether assignment succeeded or not. +// ScheduleMany assigns a set of jobs to nodes. The assignment is atomic, i.e., either all jobs are +// successfully assigned to nodes or none are. The returned bool indicates whether assignment +// succeeded (true) or not (false). +// +// This method sets the PodSchedulingContext field on each JobSchedulingContext that it attempts to +// schedule; if it returns early (e.g., because it finds an unschedulable JobSchedulingContext), +// then this field will not be set on the remaining items. // TODO: Pass through contexts to support timeouts. -func (nodeDb *NodeDb) ScheduleMany(reqs []*schedulerobjects.PodRequirements) ([]*schedulercontext.PodSchedulingContext, bool, error) { +func (nodeDb *NodeDb) ScheduleMany(jctxs []*schedulercontext.JobSchedulingContext) (bool, error) { txn := nodeDb.db.Txn(true) defer txn.Abort() - pctxs, ok, err := nodeDb.ScheduleManyWithTxn(txn, reqs) + ok, err := nodeDb.ScheduleManyWithTxn(txn, jctxs) if ok && err == nil { // All pods can be scheduled; commit the transaction. txn.Commit() } else { // On failure, clear the node binding. 
- for _, pctx := range pctxs { + for _, jctx := range jctxs { + pctx := jctx.PodSchedulingContext + if pctx == nil { + continue + } pctx.Node = nil } } - return pctxs, ok, err + return ok, err } -func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, reqs []*schedulerobjects.PodRequirements) ([]*schedulercontext.PodSchedulingContext, bool, error) { +func (nodeDb *NodeDb) ScheduleManyWithTxn(txn *memdb.Txn, jctxs []*schedulercontext.JobSchedulingContext) (bool, error) { // Attempt to schedule pods one by one in a transaction. - pctxs := make([]*schedulercontext.PodSchedulingContext, 0, len(reqs)) - for _, req := range reqs { - pctx, err := nodeDb.SelectNodeForPodWithTxn(txn, req) - if err != nil { - return nil, false, err + for _, jctx := range jctxs { + if err := nodeDb.SelectNodeForJobWithTxn(txn, jctx); err != nil { + return false, err } - pctxs = append(pctxs, pctx) - + pctx := jctx.PodSchedulingContext // If we found a node for this pod, bind it and continue to the next pod. - // - // Otherwise, zero out the node binding for all pods and abort the transaction. - if pctx.Node != nil { - if node, err := BindPodToNode(req, pctx.Node); err != nil { - return nil, false, err + if pctx != nil && pctx.Node != nil { + if node, err := BindJobToNode(nodeDb.priorityClasses, jctx.Job, pctx.Node); err != nil { + return false, err } else { if err := nodeDb.UpsertWithTxn(txn, node); err != nil { - return nil, false, err + return false, err } pctx.Node = node } } else { - return pctxs, false, nil - } - } - return pctxs, true, nil -} - -func (nodeDb *NodeDb) SelectAndBindNodeToPod(req *schedulerobjects.PodRequirements) (*schedulercontext.PodSchedulingContext, error) { - txn := nodeDb.db.Txn(true) - defer txn.Abort() - pctx, err := nodeDb.SelectAndBindNodeToPodWithTxn(txn, req) - if err != nil { - return nil, err - } - txn.Commit() - return pctx, nil -} - -func (nodeDb *NodeDb) SelectAndBindNodeToPodWithTxn(txn *memdb.Txn, req *schedulerobjects.PodRequirements) (*schedulercontext.PodSchedulingContext, error) { - pctx, err := nodeDb.SelectNodeForPodWithTxn(txn, req) - if err != nil { - return nil, err - } - if pctx.Node != nil { - if node, err := BindPodToNode(req, pctx.Node); err != nil { - return nil, err - } else { - if err := nodeDb.UpsertWithTxn(txn, node); err != nil { - return nil, err - } - pctx.Node = node + return false, nil } } - return pctx, nil + return true, nil } -func (nodeDb *NodeDb) SelectNodeForPod(req *schedulerobjects.PodRequirements) (*schedulercontext.PodSchedulingContext, error) { - return nodeDb.SelectNodeForPodWithTxn(nodeDb.db.Txn(false), req) -} +// SelectNodeForJobWithTxn selects a node on which the job can be scheduled. +func (nodeDb *NodeDb) SelectNodeForJobWithTxn(txn *memdb.Txn, jctx *schedulercontext.JobSchedulingContext) error { + req := jctx.PodRequirements -// SelectNodeForPodWithTxn selects a node on which the pod can be scheduled. -func (nodeDb *NodeDb) SelectNodeForPodWithTxn(txn *memdb.Txn, req *schedulerobjects.PodRequirements) (*schedulercontext.PodSchedulingContext, error) { // Collect all node types that could potentially schedule the pod. matchingNodeTypes, numExcludedNodesByReason, err := nodeDb.NodeTypesMatchingPod(req) if err != nil { - return nil, err + return err } // Create a pctx to be returned to the caller. 
@@ -360,6 +333,7 @@ func (nodeDb *NodeDb) SelectNodeForPodWithTxn(txn *memdb.Txn, req *schedulerobje NumNodes: nodeDb.numNodes, NumExcludedNodesByReason: maps.Clone(numExcludedNodesByReason), } + jctx.PodSchedulingContext = pctx // For pods that failed to schedule, add an exclusion reason for implicitly excluded nodes. defer func() { @@ -380,12 +354,12 @@ func (nodeDb *NodeDb) SelectNodeForPodWithTxn(txn *memdb.Txn, req *schedulerobje // and schedule onto that node even if it requires preempting other jobs. if nodeId, ok := req.NodeSelector[schedulerconfig.NodeIdLabel]; ok { if it, err := txn.Get("nodes", "id", nodeId); err != nil { - return nil, errors.WithStack(err) + return errors.WithStack(err) } else { if _, err := nodeDb.selectNodeForPodWithIt(pctx, it, req.Priority, req, true); err != nil { - return nil, err + return err } else { - return pctx, nil + return nil } } } @@ -401,24 +375,24 @@ func (nodeDb *NodeDb) SelectNodeForPodWithTxn(txn *memdb.Txn, req *schedulerobje // (since we may consider all nodes at each priority). pctx.NumExcludedNodesByReason = maps.Clone(numExcludedNodesByReason) - // To to find a node at this priority. + // Try to find a node at this priority. node, err := nodeDb.selectNodeForPodAtPriority(txn, pctx, priority, req) if err != nil { - return nil, err + return err } if node != nil { if pctx.Node == nil { - return nil, errors.New("pctx.Node not set") + return errors.New("pctx.Node not set") } if node.Id != pctx.Node.Id { - return nil, errors.New("pctx.Node.Id does not match that of the returned node") + return errors.New("pctx.Node.Id does not match that of the returned node") } - return pctx, nil + return nil } else if pctx.Node != nil { - return nil, errors.New("pctx.Node is set, but no node was returned") + return errors.New("pctx.Node is set, but no node was returned") } } - return pctx, nil + return nil } func (nodeDb *NodeDb) selectNodeForPodAtPriority( @@ -512,20 +486,16 @@ func (nodeDb *NodeDb) selectNodeForPodWithIt( return selectedNode, nil } -// BindPodToNode returns a copy of node with req bound to it. -func BindPodToNode(req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { - jobId, err := JobIdFromPodRequirements(req) - if err != nil { - return nil, err - } - queue, err := QueueFromPodRequirements(req) - if err != nil { - return nil, err - } - _, isEvicted := node.EvictedJobRunIds[jobId] +// BindJobToNode returns a copy of node with job bound to it. 
+func BindJobToNode(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { + jobId := job.GetId() + requests := job.GetResourceRequirements().Requests node = node.DeepCopy() + _, isEvicted := node.EvictedJobRunIds[jobId] + delete(node.EvictedJobRunIds, jobId) + if !isEvicted { if node.AllocatedByJobId == nil { node.AllocatedByJobId = make(map[string]schedulerobjects.ResourceList) @@ -533,42 +503,39 @@ func BindPodToNode(req *schedulerobjects.PodRequirements, node *schedulerobjects if allocatedToJob, ok := node.AllocatedByJobId[jobId]; ok { return nil, errors.Errorf("job %s already has resources allocated on node %s", jobId, node.Id) } else { - allocatedToJob.AddV1ResourceList(req.ResourceRequirements.Requests) + allocatedToJob.AddV1ResourceList(requests) node.AllocatedByJobId[jobId] = allocatedToJob } + if node.AllocatedByQueue == nil { node.AllocatedByQueue = make(map[string]schedulerobjects.ResourceList) } + queue := job.GetQueue() allocatedToQueue := node.AllocatedByQueue[queue] - allocatedToQueue.AddV1ResourceList(req.ResourceRequirements.Requests) + allocatedToQueue.AddV1ResourceList(requests) node.AllocatedByQueue[queue] = allocatedToQueue } - delete(node.EvictedJobRunIds, jobId) + allocatable := schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource) + priority := priorityClasses[job.GetPriorityClassName()].Priority + allocatable.MarkAllocatedV1ResourceList(priority, requests) if isEvicted { - schedulerobjects.AllocatableByPriorityAndResourceType( - node.AllocatableByPriorityAndResource, - ).MarkAllocatableV1ResourceList(evictedPriority, req.ResourceRequirements.Requests) + allocatable.MarkAllocatableV1ResourceList(evictedPriority, requests) } - schedulerobjects.AllocatableByPriorityAndResourceType( - node.AllocatableByPriorityAndResource, - ).MarkAllocatedV1ResourceList(req.Priority, req.ResourceRequirements.Requests) + return node, nil } -// EvictPodFromNode returns a copy of node with req evicted from it. Specifically: +// EvictJobFromNode returns a copy of node with job evicted from it. Specifically: +// // - The job is marked as evicted on the node. // - AllocatedByJobId and AllocatedByQueue are not updated. // - Resources requested by the evicted pod are marked as allocated at priority evictedPriority. -func EvictPodFromNode(req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { - jobId, err := JobIdFromPodRequirements(req) - if err != nil { - return nil, err - } - queue, err := QueueFromPodRequirements(req) - if err != nil { - return nil, err - } +func EvictJobFromNode(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { + jobId := job.GetId() + queue := job.GetQueue() + requests := job.GetResourceRequirements().Requests + node = node.DeepCopy() // Ensure we track allocated resources at evictedPriority. 
@@ -602,99 +569,65 @@ func EvictPodFromNode(req *schedulerobjects.PodRequirements, node *schedulerobje node.EvictedJobRunIds[jobId] = true } - schedulerobjects.AllocatableByPriorityAndResourceType( - node.AllocatableByPriorityAndResource, - ).MarkAllocatableV1ResourceList(req.Priority, req.ResourceRequirements.Requests) - schedulerobjects.AllocatableByPriorityAndResourceType( - node.AllocatableByPriorityAndResource, - ).MarkAllocatedV1ResourceList(evictedPriority, req.ResourceRequirements.Requests) + allocatable := schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource) + priority := priorityClasses[job.GetPriorityClassName()].Priority + allocatable.MarkAllocatableV1ResourceList(priority, requests) + allocatable.MarkAllocatedV1ResourceList(evictedPriority, requests) return node, nil } -// UnbindPodsFromNode returns a node with all reqs unbound from it. -func UnbindPodsFromNode(reqs []*schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { +// UnbindJobsFromNode returns a node with all jobs unbound from it. +func UnbindJobsFromNode(priorityClasses map[string]configuration.PriorityClass, jobs []interfaces.LegacySchedulerJob, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { node = node.DeepCopy() - for _, req := range reqs { - if err := unbindPodFromNodeInPlace(req, node); err != nil { + for _, job := range jobs { + if err := unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil { return nil, err } } return node, nil } -// UnbindPodFromNode returns a copy of node with req unbound from it. -func UnbindPodFromNode(req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { +// UnbindJobFromNode returns a copy of node with job unbound from it. +func UnbindJobFromNode(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) (*schedulerobjects.Node, error) { node = node.DeepCopy() - if err := unbindPodFromNodeInPlace(req, node); err != nil { + if err := unbindJobFromNodeInPlace(priorityClasses, job, node); err != nil { return nil, err } return node, nil } -// unbindPodFromNodeInPlace is like UnbindPodFromNode, but doesn't make a copy of the node. -func unbindPodFromNodeInPlace(req *schedulerobjects.PodRequirements, node *schedulerobjects.Node) error { - jobId, err := JobIdFromPodRequirements(req) - if err != nil { - return err - } - queue, err := QueueFromPodRequirements(req) - if err != nil { - return err - } +// unbindJobFromNodeInPlace is like UnbindJobFromNode, but doesn't make a copy of the node.
+func unbindJobFromNodeInPlace(priorityClasses map[string]configuration.PriorityClass, job interfaces.LegacySchedulerJob, node *schedulerobjects.Node) error { + jobId := job.GetId() + requests := job.GetResourceRequirements().Requests + _, isEvicted := node.EvictedJobRunIds[jobId] + delete(node.EvictedJobRunIds, jobId) if _, ok := node.AllocatedByJobId[jobId]; !ok { return errors.Errorf("job %s has no resources allocated on node %s", jobId, node.Id) } else { delete(node.AllocatedByJobId, jobId) } + + queue := job.GetQueue() if allocatedToQueue, ok := node.AllocatedByQueue[queue]; !ok { return errors.Errorf("queue %s has no resources allocated on node %s", queue, node.Id) } else { - allocatedToQueue.SubV1ResourceList(req.ResourceRequirements.Requests) - if allocatedToQueue.Equal(schedulerobjects.ResourceList{}) { + allocatedToQueue.SubV1ResourceList(requests) + if allocatedToQueue.IsZero() { delete(node.AllocatedByQueue, queue) - } else { - node.AllocatedByQueue[queue] = allocatedToQueue } } - delete(node.EvictedJobRunIds, jobId) - priority := req.Priority + allocatable := schedulerobjects.AllocatableByPriorityAndResourceType(node.AllocatableByPriorityAndResource) + priority := priorityClasses[job.GetPriorityClassName()].Priority if isEvicted { priority = evictedPriority } - schedulerobjects.AllocatableByPriorityAndResourceType( - node.AllocatableByPriorityAndResource, - ).MarkAllocatableV1ResourceList(priority, req.ResourceRequirements.Requests) - return nil -} - -func JobIdFromPodRequirements(req *schedulerobjects.PodRequirements) (string, error) { - return valueFromPodRequirements(req, schedulerconfig.JobIdAnnotation) -} + allocatable.MarkAllocatableV1ResourceList(priority, requests) -func QueueFromPodRequirements(req *schedulerobjects.PodRequirements) (string, error) { - return valueFromPodRequirements(req, schedulerconfig.QueueAnnotation) -} - -func valueFromPodRequirements(req *schedulerobjects.PodRequirements, key string) (string, error) { - v, ok := req.Annotations[key] - if !ok { - return "", errors.WithStack(&armadaerrors.ErrInvalidArgument{ - Name: "req.Annotations", - Value: req.Annotations, - Message: fmt.Sprintf("%s annotation missing", key), - }) - } - if v == "" { - return "", errors.WithStack(&armadaerrors.ErrInvalidArgument{ - Name: key, - Value: v, - Message: fmt.Sprintf("value of %s is empty", key), - }) - } - return v, nil + return nil } // NodeTypesMatchingPod returns a slice with all node types a pod could be scheduled on. 
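Taken together, the nodedb changes above move ScheduleMany from pod requirements to JobSchedulingContexts, with results reported on each context rather than in a returned slice. A minimal sketch of the new call pattern (not part of the patch; the function name and variables are illustrative assumptions):

package example

import (
	"fmt"

	"github.com/armadaproject/armada/internal/armada/configuration"
	schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
	"github.com/armadaproject/armada/internal/scheduler/interfaces"
	"github.com/armadaproject/armada/internal/scheduler/nodedb"
)

// scheduleGang is a hypothetical helper showing the refactored API: ScheduleMany now takes
// JobSchedulingContexts and reports per-job results via jctx.PodSchedulingContext instead of
// returning a separate slice of PodSchedulingContexts.
func scheduleGang(nodeDb *nodedb.NodeDb, priorityClasses map[string]configuration.PriorityClass, gang []interfaces.LegacySchedulerJob) error {
	jctxs := schedulercontext.JobSchedulingContextsFromJobs(priorityClasses, gang)
	ok, err := nodeDb.ScheduleMany(jctxs)
	if err != nil {
		return err
	}
	if !ok {
		return fmt.Errorf("gang of %d jobs is unschedulable", len(gang))
	}
	for _, jctx := range jctxs {
		// On success each context carries the node the job was bound to.
		fmt.Printf("job %s scheduled on node %s\n", jctx.JobId, jctx.PodSchedulingContext.Node.Id)
	}
	return nil
}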
diff --git a/internal/scheduler/nodedb/nodedb_test.go b/internal/scheduler/nodedb/nodedb_test.go index 9f932aecc01..a41148ad157 100644 --- a/internal/scheduler/nodedb/nodedb_test.go +++ b/internal/scheduler/nodedb/nodedb_test.go @@ -12,6 +12,9 @@ import ( armadamaps "github.com/armadaproject/armada/internal/common/maps" schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" + schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" + "github.com/armadaproject/armada/internal/scheduler/interfaces" + "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/internal/scheduler/testfixtures" ) @@ -66,15 +69,20 @@ func TestSelectNodeForPod_NodeIdLabel_Success(t *testing.T) { require.NotEmpty(t, nodeId) db, err := createNodeDb(nodes) require.NoError(t, err) - reqs := testfixtures.WithNodeSelectorPodReqs( + jobs := testfixtures.WithNodeSelectorJobs( map[string]string{schedulerconfig.NodeIdLabel: nodeId}, - testfixtures.N1CpuPodReqs("A", 0, 1), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 1), ) - for _, req := range reqs { - pctx, err := db.SelectNodeForPod(req) + jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs) + for _, jctx := range jctxs { + txn := db.Txn(false) + err := db.SelectNodeForJobWithTxn(txn, jctx) + txn.Abort() if !assert.NoError(t, err) { continue } + pctx := jctx.PodSchedulingContext + require.NotNil(t, pctx) require.NotNil(t, pctx.Node) assert.Equal(t, nodeId, pctx.Node.Id) assert.Equal(t, 0, len(pctx.NumExcludedNodesByReason)) @@ -88,15 +96,20 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) { require.NotEmpty(t, nodeId) db, err := createNodeDb(nodes) require.NoError(t, err) - reqs := testfixtures.WithNodeSelectorPodReqs( + jobs := testfixtures.WithNodeSelectorJobs( map[string]string{schedulerconfig.NodeIdLabel: "this node does not exist"}, - testfixtures.N1CpuPodReqs("A", 0, 1), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 1), ) - for _, req := range reqs { - pctx, err := db.SelectNodeForPod(req) + jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs) + for _, jctx := range jctxs { + txn := db.Txn(false) + err := db.SelectNodeForJobWithTxn(txn, jctx) + txn.Abort() if !assert.NoError(t, err) { continue } + pctx := jctx.PodSchedulingContext + require.NotNil(t, pctx) assert.Nil(t, pctx.Node) assert.Equal(t, 1, len(pctx.NumExcludedNodesByReason)) } @@ -104,39 +117,38 @@ func TestSelectNodeForPod_NodeIdLabel_Failure(t *testing.T) { func TestNodeBindingEvictionUnbinding(t *testing.T) { node := testfixtures.Test8GpuNode(append(testfixtures.TestPriorities, evictedPriority)) - req := testfixtures.N1GpuPodReqs("A", 0, 1)[0] - request := schedulerobjects.ResourceListFromV1ResourceList(req.ResourceRequirements.Requests) - jobId, err := JobIdFromPodRequirements(req) - require.NoError(t, err) + job := testfixtures.Test1GpuJob("A", testfixtures.PriorityClass0) + request := schedulerobjects.ResourceListFromV1ResourceList(job.GetResourceRequirements().Requests) + jobId := job.GetId() - boundNode, err := BindPodToNode(req, node) + boundNode, err := BindJobToNode(testfixtures.TestPriorityClasses, job, node) require.NoError(t, err) - unboundNode, err := UnbindPodFromNode(req, boundNode) + unboundNode, err := UnbindJobFromNode(testfixtures.TestPriorityClasses, job, boundNode) require.NoError(t, err) - 
unboundMultipleNode, err := UnbindPodsFromNode([]*schedulerobjects.PodRequirements{req}, boundNode) + unboundMultipleNode, err := UnbindJobsFromNode(testfixtures.TestPriorityClasses, []interfaces.LegacySchedulerJob{job}, boundNode) require.NoError(t, err) - evictedNode, err := EvictPodFromNode(req, boundNode) + evictedNode, err := EvictJobFromNode(testfixtures.TestPriorityClasses, job, boundNode) require.NoError(t, err) - evictedUnboundNode, err := UnbindPodFromNode(req, evictedNode) + evictedUnboundNode, err := UnbindJobFromNode(testfixtures.TestPriorityClasses, job, evictedNode) require.NoError(t, err) - evictedBoundNode, err := BindPodToNode(req, evictedNode) + evictedBoundNode, err := BindJobToNode(testfixtures.TestPriorityClasses, job, evictedNode) require.NoError(t, err) - _, err = EvictPodFromNode(req, node) + _, err = EvictJobFromNode(testfixtures.TestPriorityClasses, job, node) require.Error(t, err) - _, err = UnbindPodFromNode(req, node) + _, err = UnbindJobFromNode(testfixtures.TestPriorityClasses, job, node) require.Error(t, err) - _, err = BindPodToNode(req, boundNode) + _, err = BindJobToNode(testfixtures.TestPriorityClasses, job, boundNode) require.Error(t, err) - _, err = EvictPodFromNode(req, evictedNode) + _, err = EvictJobFromNode(testfixtures.TestPriorityClasses, job, evictedNode) require.Error(t, err) assertNodeAccountingEqual(t, node, unboundNode) @@ -177,7 +189,8 @@ func TestNodeBindingEvictionUnbinding(t *testing.T) { expectedAllocatable := boundNode.TotalResources.DeepCopy() expectedAllocatable.Sub(request) - assert.True(t, expectedAllocatable.Equal(boundNode.AllocatableByPriorityAndResource[req.Priority])) + priority := testfixtures.TestPriorityClasses[job.GetPriorityClassName()].Priority + assert.True(t, expectedAllocatable.Equal(boundNode.AllocatableByPriorityAndResource[priority])) assert.Empty(t, unboundNode.AllocatedByJobId) assert.Empty(t, unboundNode.AllocatedByQueue) @@ -242,47 +255,59 @@ func assertNodeAccountingEqual(t *testing.T, node1, node2 *schedulerobjects.Node return rv } -func TestSelectAndBindNodeToPod(t *testing.T) { +func TestScheduleIndividually(t *testing.T) { tests := map[string]struct { Nodes []*schedulerobjects.Node - Reqs []*schedulerobjects.PodRequirements + Jobs []*jobdb.Job ExpectSuccess []bool }{ "all jobs fit": { Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - Reqs: testfixtures.N1CpuPodReqs("A", 0, 32), + Jobs: testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 32), ExpectSuccess: testfixtures.Repeat(true, 32), }, "not all jobs fit": { Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - Reqs: testfixtures.N1CpuPodReqs("A", 0, 33), + Jobs: testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 33), ExpectSuccess: append(testfixtures.Repeat(true, 32), testfixtures.Repeat(false, 1)...), }, "unavailable resource": { Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - Reqs: testfixtures.N1GpuPodReqs("A", 0, 1), + Jobs: testfixtures.N1GpuJobs("A", testfixtures.PriorityClass0, 1), ExpectSuccess: testfixtures.Repeat(false, 1), }, "unsupported resource": { Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - Reqs: testfixtures.WithRequestsPodReqs( + Jobs: testfixtures.WithRequestsJobs( schedulerobjects.ResourceList{ Resources: map[string]resource.Quantity{ "gibberish": resource.MustParse("1"), }, }, - testfixtures.N1CpuPodReqs("A", 0, 1), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 1), ), ExpectSuccess: testfixtures.Repeat(false, 1), }, "preemption": { - 
Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - Reqs: append(append(testfixtures.N1CpuPodReqs("A", 0, 32), testfixtures.N1CpuPodReqs("A", 1, 32)...), testfixtures.N1CpuPodReqs("A", 0, 32)...), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Jobs: append( + append( + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 32), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass1, 32)..., + ), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 32)..., + ), ExpectSuccess: append(testfixtures.Repeat(true, 64), testfixtures.Repeat(false, 32)...), }, "taints/tolerations": { - Nodes: testfixtures.NTainted32CpuNodes(1, testfixtures.TestPriorities), - Reqs: append(append(testfixtures.N1CpuPodReqs("A", 0, 1), testfixtures.N1GpuPodReqs("A", 0, 1)...), testfixtures.N32CpuPodReqs("A", 0, 1)...), + Nodes: testfixtures.NTainted32CpuNodes(1, testfixtures.TestPriorities), + Jobs: append( + append( + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 1), + testfixtures.N1GpuJobs("A", testfixtures.PriorityClass0, 1)..., + ), + testfixtures.N32CpuJobs("A", testfixtures.PriorityClass0, 1)..., + ), ExpectSuccess: []bool{false, false, true}, }, "node selector": { @@ -295,11 +320,11 @@ func TestSelectAndBindNodeToPod(t *testing.T) { testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), )..., ), - Reqs: testfixtures.WithNodeSelectorPodReqs( + Jobs: testfixtures.WithNodeSelectorJobs( map[string]string{ "key": "value", }, - testfixtures.N1CpuPodReqs("A", 0, 33), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 33), ), ExpectSuccess: append(testfixtures.Repeat(true, 32), testfixtures.Repeat(false, 1)...), }, @@ -310,21 +335,21 @@ func TestSelectAndBindNodeToPod(t *testing.T) { }, testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), ), - Reqs: testfixtures.WithNodeSelectorPodReqs( + Jobs: testfixtures.WithNodeSelectorJobs( map[string]string{ "key": "this is the wrong value", }, - testfixtures.N1CpuPodReqs("A", 0, 1), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 1), ), ExpectSuccess: testfixtures.Repeat(false, 1), }, "node selector with missing label": { Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - Reqs: testfixtures.WithNodeSelectorPodReqs( + Jobs: testfixtures.WithNodeSelectorJobs( map[string]string{ "this label does not exist": "value", }, - testfixtures.N1CpuPodReqs("A", 0, 1), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 1), ), ExpectSuccess: testfixtures.Repeat(false, 1), }, @@ -338,7 +363,7 @@ func TestSelectAndBindNodeToPod(t *testing.T) { testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), )..., ), - Reqs: testfixtures.WithNodeAffinityPodReqs( + Jobs: testfixtures.WithNodeAffinityJobs( []v1.NodeSelectorTerm{ { MatchExpressions: []v1.NodeSelectorRequirement{ @@ -350,7 +375,7 @@ func TestSelectAndBindNodeToPod(t *testing.T) { }, }, }, - testfixtures.N1CpuPodReqs("A", 0, 33), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 33), ), ExpectSuccess: append(testfixtures.Repeat(true, 32), testfixtures.Repeat(false, 1)...), }, @@ -359,21 +384,35 @@ func TestSelectAndBindNodeToPod(t *testing.T) { t.Run(name, func(t *testing.T) { nodeDb, err := createNodeDb(tc.Nodes) require.NoError(t, err) - for i, req := range tc.Reqs { - report, err := nodeDb.SelectAndBindNodeToPod(req) + + jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, tc.Jobs) + + for i, jctx := range jctxs { + ok, err := 
nodeDb.ScheduleMany([]*schedulercontext.JobSchedulingContext{jctx}) require.NoError(t, err) + pctx := jctx.PodSchedulingContext + if !tc.ExpectSuccess[i] { - assert.Nil(t, report.Node) + assert.False(t, ok) + if pctx != nil { + assert.Nil(t, pctx.Node) + } continue } - assert.NotNil(t, report.Node) - node, err := nodeDb.GetNode(report.Node.Id) - require.NoError(t, err) - jobId, err := JobIdFromPodRequirements(req) - require.NoError(t, err) - expected := schedulerobjects.ResourceListFromV1ResourceList(req.ResourceRequirements.Requests) - actual, ok := node.AllocatedByJobId[jobId] + assert.True(t, ok) + require.NotNil(t, pctx) + + node := pctx.Node + if !tc.ExpectSuccess[i] { + assert.Nil(t, node) + continue + } + require.NotNil(t, node) + + job := jctx.Job + expected := schedulerobjects.ResourceListFromV1ResourceList(job.GetResourceRequirements().Requests) + actual, ok := node.AllocatedByJobId[job.GetId()] require.True(t, ok) assert.True(t, actual.Equal(expected)) } @@ -387,34 +426,37 @@ func TestScheduleMany(t *testing.T) { Nodes []*schedulerobjects.Node // Schedule one group of jobs at a time. // Each group is composed of a slice of pods. - Reqs [][]*schedulerobjects.PodRequirements + Jobs [][]*jobdb.Job // For each group, whether we expect scheduling to succeed. ExpectSuccess []bool }{ "simple success": { Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - Reqs: [][]*schedulerobjects.PodRequirements{testfixtures.N1CpuPodReqs("A", 0, 32)}, + Jobs: [][]*jobdb.Job{testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 32)}, ExpectSuccess: []bool{true}, }, "simple failure": { Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - Reqs: [][]*schedulerobjects.PodRequirements{testfixtures.N1CpuPodReqs("A", 0, 33)}, + Jobs: [][]*jobdb.Job{testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 33)}, ExpectSuccess: []bool{false}, }, "correct rollback": { Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), - Reqs: [][]*schedulerobjects.PodRequirements{ - testfixtures.N1CpuPodReqs("A", 0, 32), - testfixtures.N1CpuPodReqs("A", 0, 33), - testfixtures.N1CpuPodReqs("A", 0, 32), + Jobs: [][]*jobdb.Job{ + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 32), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 33), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 32), }, ExpectSuccess: []bool{true, false, true}, }, "varying job size": { Nodes: testfixtures.N32CpuNodes(2, testfixtures.TestPriorities), - Reqs: [][]*schedulerobjects.PodRequirements{ - append(testfixtures.N32CpuPodReqs("A", 0, 1), testfixtures.N1CpuPodReqs("A", 0, 32)...), - testfixtures.N1CpuPodReqs("A", 0, 1), + Jobs: [][]*jobdb.Job{ + append( + testfixtures.N32CpuJobs("A", testfixtures.PriorityClass0, 1), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 32)..., + ), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 1), }, ExpectSuccess: []bool{true, false}, }, @@ -422,24 +464,18 @@ func TestScheduleMany(t *testing.T) { for name, tc := range tests { t.Run(name, func(t *testing.T) { nodeDb, err := createNodeDb(tc.Nodes) - if !assert.NoError(t, err) { - return - } - for i, reqs := range tc.Reqs { - reports, ok, err := nodeDb.ScheduleMany(reqs) - if !assert.NoError(t, err) { - return - } - if tc.ExpectSuccess[i] { - assert.Equal(t, len(reqs), len(reports)) - for _, report := range reports { - if !assert.NotNil(t, report.Node) { - return - } + require.NoError(t, err) + for i, jobs := range tc.Jobs { + jctxs := 
schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs) + ok, err := nodeDb.ScheduleMany(jctxs) + require.NoError(t, err) + assert.Equal(t, tc.ExpectSuccess[i], ok) + for _, jctx := range jctxs { + pctx := jctx.PodSchedulingContext + require.NotNil(t, pctx) + if tc.ExpectSuccess[i] { + assert.NotNil(t, pctx.Node) } - assert.True(t, ok) - } else { - assert.False(t, ok) } } }) @@ -478,7 +514,7 @@ func BenchmarkUpsert100000(b *testing.B) { benchmarkUpsert(testfixtures.N32CpuNodes(100000, testfixtures.TestPriorities), b) } -func benchmarkSelectAndBindNodeToPod(nodes []*schedulerobjects.Node, reqs []*schedulerobjects.PodRequirements, b *testing.B) { +func benchmarkScheduleMany(b *testing.B, nodes []*schedulerobjects.Node, jobs []*jobdb.Job) { nodeDb, err := NewNodeDb( testfixtures.TestPriorityClasses, testfixtures.TestMaxExtraNodesToConsider, @@ -493,109 +529,108 @@ func benchmarkSelectAndBindNodeToPod(nodes []*schedulerobjects.Node, reqs []*sch b.ResetTimer() for n := 0; n < b.N; n++ { + jctxs := schedulercontext.JobSchedulingContextsFromJobs(testfixtures.TestPriorityClasses, jobs) txn := nodeDb.Txn(true) - for _, req := range reqs { - _, err := nodeDb.SelectAndBindNodeToPodWithTxn(txn, req) - require.NoError(b, err) - } + _, err := nodeDb.ScheduleManyWithTxn(txn, jctxs) txn.Abort() + require.NoError(b, err) } } -func BenchmarkSelectAndBindNodeToPod10CpuNodes320SmallJobs(b *testing.B) { - benchmarkSelectAndBindNodeToPod( - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - testfixtures.N1CpuPodReqs("A", 0, 320), +func BenchmarkScheduleMany10CpuNodes320SmallJobs(b *testing.B) { + benchmarkScheduleMany( b, + testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 320), ) } -func BenchmarkSelectAndBindNodeToPod10CpuNodes640SmallJobs(b *testing.B) { - benchmarkSelectAndBindNodeToPod( - testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), - testfixtures.N1CpuPodReqs("A", 0, 640), +func BenchmarkScheduleMany10CpuNodes640SmallJobs(b *testing.B) { + benchmarkScheduleMany( b, + testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 640), ) } -func BenchmarkSelectAndBindNodeToPod100CpuNodes3200SmallJobs(b *testing.B) { - benchmarkSelectAndBindNodeToPod( - testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), - testfixtures.N1CpuPodReqs("A", 0, 3200), +func BenchmarkScheduleMany100CpuNodes3200SmallJobs(b *testing.B) { + benchmarkScheduleMany( b, + testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 3200), ) } -func BenchmarkSelectAndBindNodeToPod100CpuNodes6400SmallJobs(b *testing.B) { - benchmarkSelectAndBindNodeToPod( - testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), - testfixtures.N1CpuPodReqs("A", 0, 6400), +func BenchmarkScheduleMany100CpuNodes6400SmallJobs(b *testing.B) { + benchmarkScheduleMany( b, + testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 6400), ) } -func BenchmarkSelectAndBindNodeToPod1000CpuNodes32000SmallJobs(b *testing.B) { - benchmarkSelectAndBindNodeToPod( - testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), - testfixtures.N1CpuPodReqs("A", 0, 32000), +func BenchmarkScheduleMany1000CpuNodes32000SmallJobs(b *testing.B) { + benchmarkScheduleMany( b, + testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), + testfixtures.N1CpuJobs("A", 
testfixtures.PriorityClass0, 32000), ) } -func BenchmarkSelectAndBindNodeToPod1000CpuNodes64000SmallJobs(b *testing.B) { - benchmarkSelectAndBindNodeToPod( - testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), - testfixtures.N1CpuPodReqs("A", 0, 64000), +func BenchmarkScheduleMany1000CpuNodes64000SmallJobs(b *testing.B) { + benchmarkScheduleMany( b, + testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 64000), ) } -func BenchmarkSelectAndBindNodeToPod100CpuNodes1CpuUnused(b *testing.B) { - benchmarkSelectAndBindNodeToPod( +func BenchmarkScheduleMany100CpuNodes1CpuUnused(b *testing.B) { + benchmarkScheduleMany( + b, testfixtures.WithUsedResourcesNodes( 0, schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("31")}}, testfixtures.N32CpuNodes(100, testfixtures.TestPriorities), ), - testfixtures.N1CpuPodReqs("A", 0, 100), - b, + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 100), ) } -func BenchmarkSelectAndBindNodeToPod1000CpuNodes1CpuUnused(b *testing.B) { - benchmarkSelectAndBindNodeToPod( +func BenchmarkScheduleMany1000CpuNodes1CpuUnused(b *testing.B) { + benchmarkScheduleMany( + b, testfixtures.WithUsedResourcesNodes( 0, schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("31")}}, testfixtures.N32CpuNodes(1000, testfixtures.TestPriorities), ), - testfixtures.N1CpuPodReqs("A", 0, 1000), - b, + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 1000), ) } -func BenchmarkSelectAndBindNodeToPod10000CpuNodes1CpuUnused(b *testing.B) { - benchmarkSelectAndBindNodeToPod( +func BenchmarkScheduleMany10000CpuNodes1CpuUnused(b *testing.B) { + benchmarkScheduleMany( + b, testfixtures.WithUsedResourcesNodes( 0, schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("31")}}, testfixtures.N32CpuNodes(10000, testfixtures.TestPriorities), ), - testfixtures.N1CpuPodReqs("A", 0, 10000), - b, + testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 10000), ) } -func BenchmarkSelectAndBindNodeToPodResourceConstrained(b *testing.B) { +func BenchmarkScheduleManyResourceConstrained(b *testing.B) { nodes := append(append( testfixtures.N32CpuNodes(500, testfixtures.TestPriorities), testfixtures.N8GpuNodes(1, testfixtures.TestPriorities)...), testfixtures.N32CpuNodes(499, testfixtures.TestPriorities)..., ) - benchmarkSelectAndBindNodeToPod( - nodes, - testfixtures.N1GpuPodReqs("A", 0, 1), + benchmarkScheduleMany( b, + nodes, + testfixtures.N1GpuJobs("A", testfixtures.PriorityClass0, 1), ) } @@ -637,17 +672,3 @@ func randomString(n int) string { } return s } - -func GetTestNodeDb() *NodeDb { - nodeDb, err := NewNodeDb( - testfixtures.TestPriorityClasses, - 0, - testfixtures.TestResources, - testfixtures.TestIndexedTaints, - testfixtures.TestIndexedNodeLabels, - ) - if err != nil { - panic(err) - } - return nodeDb -} diff --git a/internal/scheduler/pool_assigner.go b/internal/scheduler/pool_assigner.go index 14528c619e4..0387fa3d58c 100644 --- a/internal/scheduler/pool_assigner.go +++ b/internal/scheduler/pool_assigner.go @@ -10,6 +10,7 @@ import ( "k8s.io/apimachinery/pkg/util/clock" "github.com/armadaproject/armada/internal/armada/configuration" + schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/scheduler/jobdb" 
"github.com/armadaproject/armada/internal/scheduler/nodedb" @@ -118,27 +119,33 @@ func (p *DefaultPoolAssigner) AssignPool(j *jobdb.Job) (string, error) { return cachedPool.(string), nil } - req := PodRequirementFromJobSchedulingInfo(j.JobSchedulingInfo()) + req := j.PodRequirements() req = p.clearAnnotations(req) // Otherwise iterate through each pool and detect the first one the job is potentially schedulable on for pool, executors := range p.executorsByPool { for _, e := range executors { - minReqsMet, _ := requestIsLargeEnough(schedulerobjects.ResourceListFromV1ResourceList( - req.GetResourceRequirements().Requests, - ), e.minimumJobSize) - if minReqsMet { - nodeDb := e.nodeDb - txn := nodeDb.Txn(true) - report, err := nodeDb.SelectNodeForPodWithTxn(txn, req) - txn.Abort() - if err != nil { - return "", errors.WithMessagef(err, "error selecting node for job %s", j.Id()) - } - if report.Node != nil { - p.poolCache.Add(schedulingKey, pool) - return pool, nil - } + requests := req.GetResourceRequirements().Requests + if ok, _ := requestsAreLargeEnough(schedulerobjects.ResourceListFromV1ResourceList(requests), e.minimumJobSize); !ok { + continue + } + nodeDb := e.nodeDb + txn := nodeDb.Txn(true) + jctx := &schedulercontext.JobSchedulingContext{ + Created: time.Now(), + JobId: j.GetId(), + Job: j, + PodRequirements: j.GetPodRequirements(p.priorityClasses), + } + err := nodeDb.SelectNodeForJobWithTxn(txn, jctx) + txn.Abort() + if err != nil { + return "", errors.WithMessagef(err, "error selecting node for job %s", j.Id()) + } + pctx := jctx.PodSchedulingContext + if pctx != nil && pctx.Node != nil { + p.poolCache.Add(schedulingKey, pool) + return pool, nil } } } diff --git a/internal/scheduler/preempting_queue_scheduler.go b/internal/scheduler/preempting_queue_scheduler.go index 636d8caf713..a4a08546ed3 100644 --- a/internal/scheduler/preempting_queue_scheduler.go +++ b/internal/scheduler/preempting_queue_scheduler.go @@ -544,15 +544,7 @@ func (sch *PreemptingQueueScheduler) unbindJobs(jobs []interfaces.LegacySchedule if err != nil { return err } - node, err = nodedb.UnbindPodsFromNode( - util.Map( - jobsOnNode, - func(job interfaces.LegacySchedulerJob) *schedulerobjects.PodRequirements { - return PodRequirementFromLegacySchedulerJob(job, sch.schedulingContext.PriorityClasses) - }, - ), - node, - ) + node, err = nodedb.UnbindJobsFromNode(sch.schedulingContext.PriorityClasses, jobsOnNode, node) if err != nil { return err } @@ -814,11 +806,7 @@ func (evi *Evictor) Evict(ctx context.Context, it nodedb.NodeIterator) (*Evictor if evi.jobFilter != nil && !evi.jobFilter(ctx, job) { continue } - req := PodRequirementFromLegacySchedulerJob(job, evi.priorityClasses) - if req == nil { - continue - } - node, err = nodedb.EvictPodFromNode(req, node) + node, err = nodedb.EvictJobFromNode(evi.priorityClasses, job, node) if err != nil { return nil, err } diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index 7ab6ae1d3fb..39748894585 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -34,9 +34,8 @@ func TestEvictOversubscribed(t *testing.T) { testfixtures.N1CpuJobs("A", testfixtures.PriorityClass0, 20), testfixtures.N1CpuJobs("A", testfixtures.PriorityClass1, 20)..., ) - reqs := PodRequirementsFromLegacySchedulerJobs(jobs, testfixtures.TestPriorityClasses) - for _, req := range reqs { - node, err = nodedb.BindPodToNode(req, node) + for _, job := range jobs { 
+ node, err = nodedb.BindJobToNode(testfixtures.TestPriorityClasses, job, node) require.NoError(t, err) } nodes[0] = node @@ -1297,11 +1296,10 @@ func TestPreemptingQueueScheduler(t *testing.T) { for roundIndex, reqIndices := range reqIndicesByRoundIndex { for _, reqIndex := range reqIndices { job := tc.Rounds[roundIndex].JobsByQueue[queue][reqIndex] - req := PodRequirementFromLegacySchedulerJob(job, tc.SchedulingConfig.Preemption.PriorityClasses) nodeId := nodeIdByJobId[job.GetId()] node, err := nodeDb.GetNode(nodeId) require.NoError(t, err) - node, err = nodedb.UnbindPodFromNode(req, node) + node, err = nodedb.UnbindJobFromNode(tc.SchedulingConfig.Preemption.PriorityClasses, job, node) require.NoError(t, err) err = nodeDb.Upsert(node) require.NoError(t, err) @@ -1641,8 +1639,7 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { for _, job := range result.ScheduledJobs { nodeId := result.NodeIdByJobId[job.GetId()] node := nodesById[nodeId] - podRequirements := PodRequirementFromLegacySchedulerJob(job, tc.SchedulingConfig.Preemption.PriorityClasses) - node, err = nodedb.BindPodToNode(podRequirements, node) + node, err = nodedb.BindJobToNode(tc.SchedulingConfig.Preemption.PriorityClasses, job, node) require.NoError(b, err) nodesById[nodeId] = node } diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index 55883bb2158..3e512aaf8af 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -215,7 +215,6 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e if unsuccessfulJctx, ok := it.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]; ok { jctx := &schedulercontext.JobSchedulingContext{ Created: time.Now(), - ExecutorId: it.schedulingContext.ExecutorId, JobId: job.GetId(), Job: job, UnschedulableReason: unsuccessfulJctx.UnschedulableReason, @@ -241,20 +240,18 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e if len(gang) == gangCardinality { delete(it.jobsByGangId, gangId) it.next = schedulercontext.NewGangSchedulingContext( - jobSchedulingContextsFromJobs( - gang, - it.schedulingContext.ExecutorId, + schedulercontext.JobSchedulingContextsFromJobs( it.schedulingContext.PriorityClasses, + gang, ), ) return it.next, nil } } else { it.next = schedulercontext.NewGangSchedulingContext( - jobSchedulingContextsFromJobs( - []interfaces.LegacySchedulerJob{job}, - it.schedulingContext.ExecutorId, + schedulercontext.JobSchedulingContextsFromJobs( it.schedulingContext.PriorityClasses, + []interfaces.LegacySchedulerJob{job}, ), ) return it.next, nil diff --git a/internal/scheduler/reports_test.go b/internal/scheduler/reports_test.go index a3178449ed9..fc96cc1b25e 100644 --- a/internal/scheduler/reports_test.go +++ b/internal/scheduler/reports_test.go @@ -253,10 +253,7 @@ func withSuccessfulJobSchedulingContext(sctx *schedulercontext.SchedulingContext qctx.SchedulingContext = nil qctx.Created = time.Time{} } - qctx.SuccessfulJobSchedulingContexts[jobId] = &schedulercontext.JobSchedulingContext{ - ExecutorId: sctx.ExecutorId, - JobId: jobId, - } + qctx.SuccessfulJobSchedulingContexts[jobId] = &schedulercontext.JobSchedulingContext{JobId: jobId} rl := schedulerobjects.ResourceList{Resources: map[string]resource.Quantity{"cpu": resource.MustParse("1")}} qctx.ScheduledResourcesByPriorityClass.AddResourceList("foo", rl) sctx.ScheduledResourcesByPriorityClass.AddResourceList("foo", rl) @@ -296,11 +293,7 @@ func withUnsuccessfulJobSchedulingContext(sctx 
*schedulercontext.SchedulingConte qctx.SchedulingContext = nil qctx.Created = time.Time{} } - qctx.UnsuccessfulJobSchedulingContexts[jobId] = &schedulercontext.JobSchedulingContext{ - ExecutorId: sctx.ExecutorId, - JobId: jobId, - UnschedulableReason: "unknown", - } + qctx.UnsuccessfulJobSchedulingContexts[jobId] = &schedulercontext.JobSchedulingContext{JobId: jobId, UnschedulableReason: "unknown"} return sctx } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 3a3459506a1..31003cf15c8 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -326,11 +326,11 @@ func (s *Scheduler) syncState(ctx context.Context) ([]*jobdb.Job, error) { func (s *Scheduler) createSchedulingInfoWithNodeAntiAffinityForAttemptedRuns(job *jobdb.Job) (*schedulerobjects.JobSchedulingInfo, error) { newSchedulingInfo := proto.Clone(job.JobSchedulingInfo()).(*schedulerobjects.JobSchedulingInfo) newSchedulingInfo.Version = job.JobSchedulingInfo().Version + 1 - podSchedulingRequirement := PodRequirementFromJobSchedulingInfo(newSchedulingInfo) - if podSchedulingRequirement == nil { + podRequirements := newSchedulingInfo.GetPodRequirements() + if podRequirements == nil { return nil, errors.Errorf("no pod scheduling requirement found for job %s", job.GetId()) } - newAffinity := podSchedulingRequirement.Affinity + newAffinity := podRequirements.Affinity if newAffinity == nil { newAffinity = &v1.Affinity{} } @@ -343,7 +343,7 @@ func (s *Scheduler) createSchedulingInfoWithNodeAntiAffinityForAttemptedRuns(job } } } - podSchedulingRequirement.Affinity = newAffinity + podRequirements.Affinity = newAffinity return newSchedulingInfo, nil } @@ -352,12 +352,9 @@ func (s *Scheduler) addNodeAntiAffinitiesForAttemptedRunsIfSchedulable(job *jobd if err != nil { return nil, false, err } - podSchedulingRequirement := PodRequirementFromJobSchedulingInfo(schedulingInfoWithNodeAntiAffinity) - if podSchedulingRequirement == nil { - return nil, false, errors.Errorf("no pod scheduling requirement found for job %s", job.GetId()) - } - isSchedulable, _ := s.submitChecker.CheckPodRequirements(podSchedulingRequirement) - return job.WithJobSchedulingInfo(schedulingInfoWithNodeAntiAffinity), isSchedulable, nil + job = job.WithJobSchedulingInfo(schedulingInfoWithNodeAntiAffinity) + isSchedulable, _ := s.submitChecker.CheckJobDbJobs([]*jobdb.Job{job}) + return job, isSchedulable, nil } // eventsFromSchedulerResult generates necessary EventSequences from the provided SchedulerResult. 
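addNodeAntiAffinitiesForAttemptedRunsIfSchedulable now asks the submit checker about the jobdb job directly instead of extracting pod requirements first. A minimal sketch of a caller against the amended interface (not part of the patch; the narrowed interface and helper are illustrative assumptions, and the real SubmitScheduleChecker also keeps CheckApiJobs):

package example

import "github.com/armadaproject/armada/internal/scheduler/jobdb"

// jobDbChecker captures just the method this sketch needs; the patch's SubmitScheduleChecker
// interface additionally retains CheckApiJobs for api.Job submissions.
type jobDbChecker interface {
	CheckJobDbJobs(jobs []*jobdb.Job) (bool, string)
}

// isSchedulable is a hypothetical helper mirroring the call made in scheduler.go above.
func isSchedulable(checker jobDbChecker, job *jobdb.Job) (bool, string) {
	return checker.CheckJobDbJobs([]*jobdb.Job{job})
}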
@@ -587,7 +584,7 @@ func (s *Scheduler) generateUpdateMessagesFromJob(job *jobdb.Job, jobRunErrors m if lastRun.Returned() { errorMessage := fmt.Sprintf("Maximum number of attempts (%d) reached - this job will no longer be retried", s.maxAttemptedRuns) if job.NumAttempts() < s.maxAttemptedRuns { - errorMessage = fmt.Sprintf("Job was attempeted %d times, and has been tried once on all nodes it can run on - this job will no longer be retried", job.NumAttempts()) + errorMessage = fmt.Sprintf("Job was attempted %d times, and has been tried once on all nodes it can run on - this job will no longer be retried", job.NumAttempts()) } runError = &armadaevents.Error{ Terminal: true, diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index d43dfaa89a0..35ef528c668 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -512,7 +512,7 @@ func TestScheduler_TestCycle(t *testing.T) { expectedAffinity := createAntiAffinity(t, nodeIdLabel, tc.expectedNodeAntiAffinities) assert.Equal(t, expectedAffinity, affinity) } - podRequirements := PodRequirementFromJobSchedulingInfo(job.JobSchedulingInfo()) + podRequirements := job.PodRequirements() assert.NotNil(t, podRequirements) expectedQueuedVersion := int32(1) @@ -819,12 +819,20 @@ type testSubmitChecker struct { checkSuccess bool } -func (t *testSubmitChecker) CheckPodRequirements(podRequirement *schedulerobjects.PodRequirements) (bool, string) { - return t.checkSuccess, "" +func (t *testSubmitChecker) CheckApiJobs(_ []*api.Job) (bool, string) { + reason := "" + if !t.checkSuccess { + reason = "CheckApiJobs failed" + } + return t.checkSuccess, reason } -func (t *testSubmitChecker) CheckApiJobs(jobs []*api.Job) (bool, string) { - return t.checkSuccess, "2" +func (t *testSubmitChecker) CheckJobDbJobs(_ []*jobdb.Job) (bool, string) { + reason := "" + if !t.checkSuccess { + reason = "CheckJobDbJobs failed" + } + return t.checkSuccess, reason } // Test implementations of the interfaces needed by the Scheduler diff --git a/internal/scheduler/schedulerobjects/requirements.go b/internal/scheduler/schedulerobjects/requirements.go deleted file mode 100644 index d2bd19e7ea3..00000000000 --- a/internal/scheduler/schedulerobjects/requirements.go +++ /dev/null @@ -1,11 +0,0 @@ -package schedulerobjects - -func (info *JobSchedulingInfo) GetTotalResourceRequest() ResourceList { - rv := ResourceList{} - for _, oreq := range info.ObjectRequirements { - if preq := oreq.GetPodRequirements(); preq != nil { - rv.AddV1ResourceList(preq.ResourceRequirements.Requests) - } - } - return rv -} diff --git a/internal/scheduler/schedulerobjects/schedulinginfo.go b/internal/scheduler/schedulerobjects/schedulinginfo.go new file mode 100644 index 00000000000..ef69dfd7e3d --- /dev/null +++ b/internal/scheduler/schedulerobjects/schedulinginfo.go @@ -0,0 +1,10 @@ +package schedulerobjects + +func (info *JobSchedulingInfo) GetPodRequirements() *PodRequirements { + for _, oreq := range info.ObjectRequirements { + if preq := oreq.GetPodRequirements(); preq != nil { + return preq + } + } + return nil +} diff --git a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index cc91f19709c..ba70c3bbf20 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -439,12 +439,7 @@ func (l *FairSchedulingAlgo) constructNodeDb(nodes []*schedulerobjects.Node, job ) continue } - req := PodRequirementFromLegacySchedulerJob(job, l.config.Preemption.PriorityClasses) - if req 
== nil { - log.Errorf("no pod spec found for job %s", job.Id()) - continue - } - node, err := nodedb.BindPodToNode(req, node) + node, err := nodedb.BindJobToNode(l.config.Preemption.PriorityClasses, job, node) if err != nil { return nil, err } diff --git a/internal/scheduler/scheduling_algo_test.go b/internal/scheduler/scheduling_algo_test.go index 7f07f9ded04..f1553272c74 100644 --- a/internal/scheduler/scheduling_algo_test.go +++ b/internal/scheduler/scheduling_algo_test.go @@ -373,8 +373,7 @@ func TestLegacySchedulingAlgo_TestSchedule(t *testing.T) { run := job.LatestRun() node.StateByJobRunId[run.Id().String()] = schedulerobjects.JobRunState_RUNNING - req := PodRequirementFromLegacySchedulerJob(job, tc.schedulingConfig.Preemption.PriorityClasses) - node, err = nodedb.BindPodToNode(req, node) + node, err = nodedb.BindJobToNode(tc.schedulingConfig.Preemption.PriorityClasses, job, node) require.NoError(t, err) } diff --git a/internal/scheduler/submitcheck.go b/internal/scheduler/submitcheck.go index 700cb541cb8..c80d6a17c85 100644 --- a/internal/scheduler/submitcheck.go +++ b/internal/scheduler/submitcheck.go @@ -15,7 +15,10 @@ import ( "k8s.io/apimachinery/pkg/util/clock" "github.com/armadaproject/armada/internal/armada/configuration" + armadaslices "github.com/armadaproject/armada/internal/common/slices" + schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/database" + "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/nodedb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/pkg/api" @@ -34,8 +37,8 @@ type schedulingResult struct { const maxJobSchedulingResults = 10000 type SubmitScheduleChecker interface { - CheckPodRequirements(podRequirement *schedulerobjects.PodRequirements) (bool, string) CheckApiJobs(jobs []*api.Job) (bool, string) + CheckJobDbJobs(jobs []*jobdb.Job) (bool, string) } type SubmitChecker struct { @@ -132,51 +135,41 @@ func (srv *SubmitChecker) updateExecutors(ctx context.Context) { srv.jobSchedulingResultsCache.Purge() } -func (srv *SubmitChecker) CheckPodRequirements(req *schedulerobjects.PodRequirements) (bool, string) { - schedulingResult := srv.getSchedulingResult(req) - if !schedulingResult.isSchedulable { - return schedulingResult.isSchedulable, fmt.Sprintf("requirements unschedulable:\n%s", schedulingResult.reason) - } - return true, "" +func (srv *SubmitChecker) CheckApiJobs(jobs []*api.Job) (bool, string) { + return srv.check(schedulercontext.JobSchedulingContextsFromJobs(srv.priorityClasses, jobs)) } -func (srv *SubmitChecker) CheckApiJobs(jobs []*api.Job) (bool, string) { +func (srv *SubmitChecker) CheckJobDbJobs(jobs []*jobdb.Job) (bool, string) { + return srv.check(schedulercontext.JobSchedulingContextsFromJobs(srv.priorityClasses, jobs)) +} + +func (srv *SubmitChecker) check(jctxs []*schedulercontext.JobSchedulingContext) (bool, string) { // First, check if all jobs can be scheduled individually. - for i, job := range jobs { - req := PodRequirementFromLegacySchedulerJob(job, srv.priorityClasses) - schedulingResult := srv.getSchedulingResult(req) + for i, jctx := range jctxs { + schedulingResult := srv.getIndividualSchedulingResult(jctx) if !schedulingResult.isSchedulable { return schedulingResult.isSchedulable, fmt.Sprintf("%d-th job unschedulable:\n%s", i, schedulingResult.reason) } } // Then, check if all gangs can be scheduled. 
- for gangId, jobs := range GroupJobsByAnnotation(srv.gangIdAnnotation, jobs) { + for gangId, jctxsInGang := range armadaslices.GroupByFunc( + jctxs, + func(jctx *schedulercontext.JobSchedulingContext) string { + return jctx.Job.GetAnnotations()[srv.gangIdAnnotation] + }, + ) { if gangId == "" { continue } - reqs := PodRequirementsFromLegacySchedulerJobs(jobs, srv.priorityClasses) - schedulingResult := srv.check(reqs) - if !schedulingResult.isSchedulable { + if schedulingResult := srv.getSchedulingResult(jctxsInGang); !schedulingResult.isSchedulable { return schedulingResult.isSchedulable, fmt.Sprintf("gang %s is unschedulable:\n%s", gangId, schedulingResult.reason) } } return true, "" } -func GroupJobsByAnnotation(annotation string, jobs []*api.Job) map[string][]*api.Job { - rv := make(map[string][]*api.Job) - for _, job := range jobs { - if len(job.Annotations) == 0 { - rv[""] = append(rv[""], job) - } else { - value := job.Annotations[annotation] - rv[value] = append(rv[value], job) - } - } - return rv -} - -func (srv *SubmitChecker) getSchedulingResult(req *schedulerobjects.PodRequirements) schedulingResult { +func (srv *SubmitChecker) getIndividualSchedulingResult(jctx *schedulercontext.JobSchedulingContext) schedulingResult { + req := jctx.PodRequirements srv.mu.Lock() schedulingKey := srv.schedulingKeyGenerator.Key( req.NodeSelector, @@ -190,7 +183,7 @@ func (srv *SubmitChecker) getSchedulingResult(req *schedulerobjects.PodRequireme if obj, ok := srv.jobSchedulingResultsCache.Get(schedulingKey); ok { result = obj.(schedulingResult) } else { - result = srv.check([]*schedulerobjects.PodRequirements{req}) + result = srv.getSchedulingResult([]*schedulercontext.JobSchedulingContext{jctx}) srv.jobSchedulingResultsCache.Add(schedulingKey, result) } if !result.isSchedulable { @@ -199,9 +192,9 @@ func (srv *SubmitChecker) getSchedulingResult(req *schedulerobjects.PodRequireme return schedulingResult{isSchedulable: true} } -// Check if a set of pods can be scheduled onto some cluster. -func (srv *SubmitChecker) check(reqs []*schedulerobjects.PodRequirements) schedulingResult { - if len(reqs) == 0 { +// Check if a set of jobs can be scheduled onto some cluster. 
+func (srv *SubmitChecker) getSchedulingResult(jctxs []*schedulercontext.JobSchedulingContext) schedulingResult { + if len(jctxs) == 0 { return schedulingResult{isSchedulable: true, reason: ""} } @@ -210,19 +203,21 @@ func (srv *SubmitChecker) check(reqs []*schedulerobjects.PodRequirements) schedu srv.mu.Lock() executorById := maps.Clone(srv.executorById) srv.mu.Unlock() - executorById = srv.filterStaleNodeDbs(executorById) + executorById = srv.filterStaleExecutors(executorById) if len(executorById) == 0 { return schedulingResult{isSchedulable: false, reason: "no executor clusters available"} } - canSchedule := false + isSchedulable := false var sb strings.Builder for id, executor := range executorById { nodeDb := executor.nodeDb txn := nodeDb.Txn(true) - reports, ok, err := nodeDb.ScheduleManyWithTxn(txn, reqs) + ok, err := nodeDb.ScheduleManyWithTxn(txn, jctxs) txn.Abort() + isSchedulable = isSchedulable || ok + sb.WriteString(id) if err != nil { sb.WriteString(err.Error()) @@ -230,31 +225,35 @@ func (srv *SubmitChecker) check(reqs []*schedulerobjects.PodRequirements) schedu continue } - canSchedule = canSchedule || ok numSuccessfullyScheduled := 0 - for _, report := range reports { - if report.Node != nil { + for _, jctx := range jctxs { + pctx := jctx.PodSchedulingContext + if pctx != nil && pctx.Node != nil { numSuccessfullyScheduled++ } } - if len(reqs) == 1 { + if len(jctxs) == 1 { sb.WriteString(":\n") - for _, report := range reports { - sb.WriteString(report.String()) + for _, jctx := range jctxs { + pctx := jctx.PodSchedulingContext + if pctx == nil { + continue + } + sb.WriteString(pctx.String()) sb.WriteString("\n") } sb.WriteString("---") sb.WriteString("\n") } else { sb.WriteString(":") - sb.WriteString(fmt.Sprintf(" %d out of %d pods schedulable\n", numSuccessfullyScheduled, len(reqs))) + sb.WriteString(fmt.Sprintf(" %d out of %d pods schedulable\n", numSuccessfullyScheduled, len(jctxs))) } } - return schedulingResult{isSchedulable: canSchedule, reason: sb.String()} + return schedulingResult{isSchedulable: isSchedulable, reason: sb.String()} } -func (srv *SubmitChecker) filterStaleNodeDbs(executorsById map[string]minimalExecutor) map[string]minimalExecutor { +func (srv *SubmitChecker) filterStaleExecutors(executorsById map[string]minimalExecutor) map[string]minimalExecutor { rv := make(map[string]minimalExecutor) for id, executor := range executorsById { if srv.clock.Since(executor.updateTime) < srv.executorTimeout { diff --git a/internal/scheduler/submitcheck_test.go b/internal/scheduler/submitcheck_test.go index 9949090d491..49c0dfb1bc2 100644 --- a/internal/scheduler/submitcheck_test.go +++ b/internal/scheduler/submitcheck_test.go @@ -15,13 +15,14 @@ import ( "github.com/armadaproject/armada/internal/armada/configuration" "github.com/armadaproject/armada/internal/common/util" + "github.com/armadaproject/armada/internal/scheduler/jobdb" schedulermocks "github.com/armadaproject/armada/internal/scheduler/mocks" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" "github.com/armadaproject/armada/internal/scheduler/testfixtures" "github.com/armadaproject/armada/pkg/api" ) -func TestSubmitChecker_CheckPodRequirements(t *testing.T) { +func TestSubmitChecker_CheckJobDbJobs(t *testing.T) { defaultTimeout := 15 * time.Minute baseTime := time.Now().UTC() expiredTime := baseTime.Add(-defaultTimeout).Add(-1 * time.Second) @@ -30,42 +31,42 @@ func TestSubmitChecker_CheckPodRequirements(t *testing.T) { executorTimout time.Duration config configuration.SchedulingConfig 
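// Illustrative sketch (assumed shape, using only the calls visible in the
// change above): getSchedulingResult probes each executor's node database
// with a write transaction that is always aborted, i.e. a dry run:
//
//	txn := nodeDb.Txn(true)                           // writable txn over the cached snapshot
//	ok, err := nodeDb.ScheduleManyWithTxn(txn, jctxs) // try to place every job in the set
//	txn.Abort()                                       // never commit: this is only a feasibility check
//	if err == nil && ok {
//		// every job in jctxs found a node on this executor
//	}
//
// Aborting keeps the cached executor state untouched, so repeated submit
// checks do not accumulate phantom allocations.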
executors []*schedulerobjects.Executor - podRequirement *schedulerobjects.PodRequirements + job *jobdb.Job expectPass bool }{ "one job schedules": { executorTimout: defaultTimeout, config: testfixtures.TestSchedulingConfig(), executors: []*schedulerobjects.Executor{testExecutor(baseTime)}, - podRequirement: testfixtures.Test1CpuPodReqs("queue", util.ULID(), 1), + job: testfixtures.Test1CpuJob("queue", testfixtures.PriorityClass1), expectPass: true, }, "no jobs schedule due to resources": { executorTimout: defaultTimeout, config: testfixtures.TestSchedulingConfig(), executors: []*schedulerobjects.Executor{testExecutor(baseTime)}, - podRequirement: testfixtures.Test32CpuPodReqs("queue", util.ULID(), 1), + job: testfixtures.Test32CpuJob("queue", testfixtures.PriorityClass1), expectPass: false, }, "no jobs schedule due to selector": { executorTimout: defaultTimeout, config: testfixtures.TestSchedulingConfig(), executors: []*schedulerobjects.Executor{testExecutor(baseTime)}, - podRequirement: testfixtures.WithNodeSelectorPodReq(map[string]string{"foo": "bar"}, testfixtures.Test1CpuPodReqs("queue", util.ULID(), 1)), + job: testfixtures.WithNodeSelectorJob(map[string]string{"foo": "bar"}, testfixtures.Test1CpuJob("queue", testfixtures.PriorityClass1)), expectPass: false, }, "no jobs schedule due to executor timeout": { executorTimout: defaultTimeout, config: testfixtures.TestSchedulingConfig(), executors: []*schedulerobjects.Executor{testExecutor(expiredTime)}, - podRequirement: testfixtures.Test1CpuPodReqs("queue", util.ULID(), 1), + job: testfixtures.Test1CpuJob("queue", testfixtures.PriorityClass1), expectPass: false, }, "multiple executors, 1 expired": { executorTimout: defaultTimeout, config: testfixtures.TestSchedulingConfig(), executors: []*schedulerobjects.Executor{testExecutor(expiredTime), testExecutor(baseTime)}, - podRequirement: testfixtures.Test1CpuPodReqs("queue", util.ULID(), 1), + job: testfixtures.Test1CpuJob("queue", testfixtures.PriorityClass1), expectPass: true, }, } @@ -81,12 +82,12 @@ func TestSubmitChecker_CheckPodRequirements(t *testing.T) { submitCheck := NewSubmitChecker(tc.executorTimout, tc.config, mockExecutorRepo) submitCheck.clock = fakeClock submitCheck.updateExecutors(ctx) - result, msg := submitCheck.CheckPodRequirements(tc.podRequirement) - assert.Equal(t, tc.expectPass, result) + isSchedulable, reason := submitCheck.CheckJobDbJobs([]*jobdb.Job{tc.job}) + assert.Equal(t, tc.expectPass, isSchedulable) if !tc.expectPass { - assert.NotEqual(t, "", msg) + assert.NotEqual(t, "", reason) } - logrus.Info(msg) + logrus.Info(reason) }) } } diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 07ccae0f531..c2314992cdd 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -18,7 +18,6 @@ import ( "github.com/armadaproject/armada/internal/armada/configuration" "github.com/armadaproject/armada/internal/common/util" - schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration" "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/scheduler/jobdb" "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" @@ -221,8 +220,9 @@ func WithNodeSelectorPodReq(selector map[string]string, req *schedulerobjects.Po return req } -func WithNodeAffinityPodReqs(nodeSelectorTerms []v1.NodeSelectorTerm, reqs []*schedulerobjects.PodRequirements) []*schedulerobjects.PodRequirements { - 
for _, req := range reqs { +func WithNodeAffinityJobs(nodeSelectorTerms []v1.NodeSelectorTerm, jobs []*jobdb.Job) []*jobdb.Job { + for _, job := range jobs { + req := job.PodRequirements() if req.Affinity == nil { req.Affinity = &v1.Affinity{} } @@ -237,7 +237,7 @@ func WithNodeAffinityPodReqs(nodeSelectorTerms []v1.NodeSelectorTerm, reqs []*sc nodeSelectorTerms..., ) } - return reqs + return jobs } func WithGangAnnotationsPodReqs(reqs []*schedulerobjects.PodRequirements) []*schedulerobjects.PodRequirements { @@ -259,25 +259,34 @@ func WithAnnotationsPodReqs(annotations map[string]string, reqs []*schedulerobje return reqs } -func WithRequestsPodReqs(rl schedulerobjects.ResourceList, reqs []*schedulerobjects.PodRequirements) []*schedulerobjects.PodRequirements { - for _, req := range reqs { - maps.Copy( - req.ResourceRequirements.Requests, - schedulerobjects.V1ResourceListFromResourceList(rl), - ) +func WithRequestsJobs(rl schedulerobjects.ResourceList, jobs []*jobdb.Job) []*jobdb.Job { + for _, job := range jobs { + for _, req := range job.JobSchedulingInfo().GetObjectRequirements() { + maps.Copy( + req.GetPodRequirements().ResourceRequirements.Requests, + schedulerobjects.V1ResourceListFromResourceList(rl), + ) + } } - return reqs + return jobs } func WithNodeSelectorJobs(selector map[string]string, jobs []*jobdb.Job) []*jobdb.Job { for _, job := range jobs { - for _, req := range job.GetJobSchedulingInfo(nil).GetObjectRequirements() { + for _, req := range job.JobSchedulingInfo().GetObjectRequirements() { req.GetPodRequirements().NodeSelector = maps.Clone(selector) } } return jobs } +func WithNodeSelectorJob(selector map[string]string, job *jobdb.Job) *jobdb.Job { + for _, req := range job.JobSchedulingInfo().GetObjectRequirements() { + req.GetPodRequirements().NodeSelector = maps.Clone(selector) + } + return job +} + func WithGangAnnotationsJobs(jobs []*jobdb.Job) []*jobdb.Job { gangId := uuid.NewString() gangCardinality := fmt.Sprintf("%d", len(jobs)) @@ -289,7 +298,7 @@ func WithGangAnnotationsJobs(jobs []*jobdb.Job) []*jobdb.Job { func WithAnnotationsJobs(annotations map[string]string, jobs []*jobdb.Job) []*jobdb.Job { for _, job := range jobs { - for _, req := range job.GetJobSchedulingInfo(nil).GetObjectRequirements() { + for _, req := range job.JobSchedulingInfo().GetObjectRequirements() { if req.GetPodRequirements().Annotations == nil { req.GetPodRequirements().Annotations = make(map[string]string) } @@ -424,11 +433,8 @@ func TestPodReqs(queue string, jobId ulid.ULID, priority int32, requests v1.Reso return &schedulerobjects.PodRequirements{ Priority: priority, ResourceRequirements: v1.ResourceRequirements{Requests: requests}, - Annotations: map[string]string{ - schedulerconfig.JobIdAnnotation: jobId.String(), - schedulerconfig.QueueAnnotation: queue, - }, - NodeSelector: make(map[string]string), + Annotations: make(map[string]string), + NodeSelector: make(map[string]string), } } @@ -511,10 +517,7 @@ func TestUnitReqs(priority int32) *schedulerobjects.PodRequirements { "memory": resource.MustParse("1Gi"), }, }, - Annotations: map[string]string{ - schedulerconfig.JobIdAnnotation: util.NewULID(), - schedulerconfig.QueueAnnotation: TestQueue, - }, + Annotations: make(map[string]string), NodeSelector: make(map[string]string), } } diff --git a/pkg/api/util.go b/pkg/api/util.go index 399a3461446..96fc848519e 100644 --- a/pkg/api/util.go +++ b/pkg/api/util.go @@ -117,7 +117,7 @@ func (job *Job) GetSubmitTime() time.Time { return job.Created } -func (job *Job) 
GetJobSchedulingInfo(priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.JobSchedulingInfo { +func (job *Job) GetPodRequirements(priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements { podSpec := job.GetMainPodSpec() priority, ok := PriorityFromPodSpec(podSpec, priorityClasses) @@ -132,7 +132,8 @@ func (job *Job) GetJobSchedulingInfo(priorityClasses map[string]configuration.Pr if podSpec.PreemptionPolicy != nil { preemptionPolicy = string(*podSpec.PreemptionPolicy) } - podRequirements := &schedulerobjects.PodRequirements{ + + return &schedulerobjects.PodRequirements{ NodeSelector: podSpec.NodeSelector, Affinity: podSpec.Affinity, Tolerations: podSpec.Tolerations, @@ -141,18 +142,6 @@ func (job *Job) GetJobSchedulingInfo(priorityClasses map[string]configuration.Pr PreemptionPolicy: preemptionPolicy, ResourceRequirements: job.GetResourceRequirements(), } - return &schedulerobjects.JobSchedulingInfo{ - PriorityClassName: podSpec.PriorityClassName, - Priority: job.GetPerQueuePriority(), - SubmitTime: job.GetSubmitTime(), - ObjectRequirements: []*schedulerobjects.ObjectRequirements{ - { - Requirements: &schedulerobjects.ObjectRequirements_PodRequirements{ - PodRequirements: podRequirements, - }, - }, - }, - } } // SchedulingResourceRequirementsFromPodSpec returns resource requests and limits necessary for scheduling a pod. diff --git a/pkg/api/util_test.go b/pkg/api/util_test.go index 2e9315f031c..6b49ee91c20 100644 --- a/pkg/api/util_test.go +++ b/pkg/api/util_test.go @@ -264,10 +264,10 @@ func QuantityWithMilliValue(v int64) resource.Quantity { return q } -func TestJobGetRequirements(t *testing.T) { +func TestJobGetPodRequirements(t *testing.T) { tests := map[string]struct { job *Job - expected *schedulerobjects.JobSchedulingInfo + expected *schedulerobjects.PodRequirements }{ "queue priority": { job: &Job{ @@ -277,20 +277,11 @@ func TestJobGetRequirements(t *testing.T) { // PriorityClassName: , }, }, - expected: &schedulerobjects.JobSchedulingInfo{ - Priority: 10, - ObjectRequirements: []*schedulerobjects.ObjectRequirements{ - { - &schedulerobjects.ObjectRequirements_PodRequirements{ - PodRequirements: &schedulerobjects.PodRequirements{ - PreemptionPolicy: string(v1.PreemptLowerPriority), - ResourceRequirements: v1.ResourceRequirements{ - Requests: make(v1.ResourceList), - Limits: make(v1.ResourceList), - }, - }, - }, - }, + expected: &schedulerobjects.PodRequirements{ + PreemptionPolicy: string(v1.PreemptLowerPriority), + ResourceRequirements: v1.ResourceRequirements{ + Requests: make(v1.ResourceList), + Limits: make(v1.ResourceList), }, }, }, @@ -300,21 +291,12 @@ func TestJobGetRequirements(t *testing.T) { PriorityClassName: PriorityClass1, }, }, - expected: &schedulerobjects.JobSchedulingInfo{ - PriorityClassName: PriorityClass1, - ObjectRequirements: []*schedulerobjects.ObjectRequirements{ - { - &schedulerobjects.ObjectRequirements_PodRequirements{ - PodRequirements: &schedulerobjects.PodRequirements{ - Priority: 1, - PreemptionPolicy: string(v1.PreemptLowerPriority), - ResourceRequirements: v1.ResourceRequirements{ - Requests: make(v1.ResourceList), - Limits: make(v1.ResourceList), - }, - }, - }, - }, + expected: &schedulerobjects.PodRequirements{ + Priority: 1, + PreemptionPolicy: string(v1.PreemptLowerPriority), + ResourceRequirements: v1.ResourceRequirements{ + Requests: make(v1.ResourceList), + Limits: make(v1.ResourceList), }, }, }, @@ -324,19 +306,11 @@ func TestJobGetRequirements(t *testing.T) { PreemptionPolicy: 
pointerFromValue(v1.PreemptNever), }, }, - expected: &schedulerobjects.JobSchedulingInfo{ - ObjectRequirements: []*schedulerobjects.ObjectRequirements{ - { - &schedulerobjects.ObjectRequirements_PodRequirements{ - PodRequirements: &schedulerobjects.PodRequirements{ - PreemptionPolicy: string(v1.PreemptNever), - ResourceRequirements: v1.ResourceRequirements{ - Requests: make(v1.ResourceList), - Limits: make(v1.ResourceList), - }, - }, - }, - }, + expected: &schedulerobjects.PodRequirements{ + PreemptionPolicy: string(v1.PreemptNever), + ResourceRequirements: v1.ResourceRequirements{ + Requests: make(v1.ResourceList), + Limits: make(v1.ResourceList), }, }, }, @@ -366,41 +340,33 @@ func TestJobGetRequirements(t *testing.T) { }, }, }, - expected: &schedulerobjects.JobSchedulingInfo{ - ObjectRequirements: []*schedulerobjects.ObjectRequirements{ - { - &schedulerobjects.ObjectRequirements_PodRequirements{ - PodRequirements: &schedulerobjects.PodRequirements{ - NodeSelector: map[string]string{"label": "value"}, - Affinity: &v1.Affinity{ - NodeAffinity: &v1.NodeAffinity{ - RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ - NodeSelectorTerms: []v1.NodeSelectorTerm{ - { - MatchExpressions: []v1.NodeSelectorRequirement{ - { - Key: "affinityKey", - }, - }, - }, - }, + expected: &schedulerobjects.PodRequirements{ + NodeSelector: map[string]string{"label": "value"}, + Affinity: &v1.Affinity{ + NodeAffinity: &v1.NodeAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{ + NodeSelectorTerms: []v1.NodeSelectorTerm{ + { + MatchExpressions: []v1.NodeSelectorRequirement{ + { + Key: "affinityKey", }, }, }, - Tolerations: []v1.Toleration{ - { - Key: "tolerationKey", - }, - }, - PreemptionPolicy: string(v1.PreemptLowerPriority), - ResourceRequirements: v1.ResourceRequirements{ - Requests: make(v1.ResourceList), - Limits: make(v1.ResourceList), - }, }, }, }, }, + Tolerations: []v1.Toleration{ + { + Key: "tolerationKey", + }, + }, + PreemptionPolicy: string(v1.PreemptLowerPriority), + ResourceRequirements: v1.ResourceRequirements{ + Requests: make(v1.ResourceList), + Limits: make(v1.ResourceList), + }, }, }, "annotations": { @@ -408,20 +374,12 @@ func TestJobGetRequirements(t *testing.T) { Annotations: map[string]string{"key": "value"}, PodSpec: &v1.PodSpec{}, }, - expected: &schedulerobjects.JobSchedulingInfo{ - ObjectRequirements: []*schedulerobjects.ObjectRequirements{ - { - &schedulerobjects.ObjectRequirements_PodRequirements{ - PodRequirements: &schedulerobjects.PodRequirements{ - Annotations: map[string]string{"key": "value"}, - PreemptionPolicy: string(v1.PreemptLowerPriority), - ResourceRequirements: v1.ResourceRequirements{ - Requests: make(v1.ResourceList), - Limits: make(v1.ResourceList), - }, - }, - }, - }, + expected: &schedulerobjects.PodRequirements{ + Annotations: map[string]string{"key": "value"}, + PreemptionPolicy: string(v1.PreemptLowerPriority), + ResourceRequirements: v1.ResourceRequirements{ + Requests: make(v1.ResourceList), + Limits: make(v1.ResourceList), }, }, }, @@ -438,19 +396,11 @@ func TestJobGetRequirements(t *testing.T) { }, }, }, - expected: &schedulerobjects.JobSchedulingInfo{ - ObjectRequirements: []*schedulerobjects.ObjectRequirements{ - { - &schedulerobjects.ObjectRequirements_PodRequirements{ - PodRequirements: &schedulerobjects.PodRequirements{ - PreemptionPolicy: string(v1.PreemptLowerPriority), - ResourceRequirements: v1.ResourceRequirements{ - Requests: v1.ResourceList{"foo": QuantityWithMilliValue(1000)}, - Limits: 
v1.ResourceList{"bar": QuantityWithMilliValue(2000)}, - }, - }, - }, - }, + expected: &schedulerobjects.PodRequirements{ + PreemptionPolicy: string(v1.PreemptLowerPriority), + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{"foo": QuantityWithMilliValue(1000)}, + Limits: v1.ResourceList{"bar": QuantityWithMilliValue(2000)}, }, }, }, @@ -461,19 +411,11 @@ func TestJobGetRequirements(t *testing.T) { Requests: v1.ResourceList{"foo": resource.MustParse("1")}, }, }, - expected: &schedulerobjects.JobSchedulingInfo{ - ObjectRequirements: []*schedulerobjects.ObjectRequirements{ - { - &schedulerobjects.ObjectRequirements_PodRequirements{ - PodRequirements: &schedulerobjects.PodRequirements{ - PreemptionPolicy: string(v1.PreemptLowerPriority), - ResourceRequirements: v1.ResourceRequirements{ - Requests: v1.ResourceList{"foo": resource.MustParse("1")}, - Limits: nil, - }, - }, - }, - }, + expected: &schedulerobjects.PodRequirements{ + PreemptionPolicy: string(v1.PreemptLowerPriority), + ResourceRequirements: v1.ResourceRequirements{ + Requests: v1.ResourceList{"foo": resource.MustParse("1")}, + Limits: nil, }, }, }, @@ -484,26 +426,18 @@ func TestJobGetRequirements(t *testing.T) { Limits: v1.ResourceList{"foo": resource.MustParse("1")}, }, }, - expected: &schedulerobjects.JobSchedulingInfo{ - ObjectRequirements: []*schedulerobjects.ObjectRequirements{ - { - &schedulerobjects.ObjectRequirements_PodRequirements{ - PodRequirements: &schedulerobjects.PodRequirements{ - PreemptionPolicy: string(v1.PreemptLowerPriority), - ResourceRequirements: v1.ResourceRequirements{ - Requests: nil, - Limits: v1.ResourceList{"foo": resource.MustParse("1")}, - }, - }, - }, - }, + expected: &schedulerobjects.PodRequirements{ + PreemptionPolicy: string(v1.PreemptLowerPriority), + ResourceRequirements: v1.ResourceRequirements{ + Requests: nil, + Limits: v1.ResourceList{"foo": resource.MustParse("1")}, }, }, }, } for name, tc := range tests { t.Run(name, func(t *testing.T) { - assert.Equal(t, tc.expected, tc.job.GetJobSchedulingInfo(TestPriorityClasses)) + assert.Equal(t, tc.expected, tc.job.GetPodRequirements(TestPriorityClasses)) }) } }