Merge branch 'master' of https://github.com/armadaproject/armada into severinson/dominant-resource-fairness
severinson committed Jun 28, 2023
2 parents 3541093 + dac1cf3 commit 0b80222
Showing 33 changed files with 587 additions and 770 deletions.
13 changes: 7 additions & 6 deletions internal/armada/server/lease.go
@@ -370,11 +370,9 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
 	// Bind pods to nodes, thus ensuring resources are marked as allocated on the node.
 	skipNode := false
 	for _, job := range jobs {
-		node, err = nodedb.BindPodToNode(
-			scheduler.PodRequirementFromLegacySchedulerJob(
-				job,
-				q.schedulingConfig.Preemption.PriorityClasses,
-			),
+		node, err = nodedb.BindJobToNode(
+			q.schedulingConfig.Preemption.PriorityClasses,
+			job,
 			node,
 		)
 		if err != nil {
@@ -472,7 +470,10 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
 		sctx.EnableDominantResourceFairness(q.schedulingConfig.DominantResourceFairnessResourcesToConsider)
 	}
 	for queue, priorityFactor := range priorityFactorByQueue {
-		weight := 1 / priorityFactor
+		var weight float64 = 1
+		if priorityFactor > 0 {
+			weight = 1 / priorityFactor
+		}
 		if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
 			return nil, err
 		}
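The second hunk above guards the queue weight used for dominant resource fairness against a zero priority factor, which would otherwise make 1/priorityFactor evaluate to +Inf. A minimal standalone sketch of the same guard, outside Armada's scheduling-context types (queueWeight is an illustrative helper, not part of the Armada API):

package sketch

// queueWeight mirrors the guarded computation above: a queue's scheduling
// weight is the inverse of its priority factor, defaulting to 1 when the
// factor is zero.
func queueWeight(priorityFactor float64) float64 {
	weight := 1.0
	if priorityFactor > 0 {
		weight = 1 / priorityFactor
	}
	return weight
}

With this guard, a queue configured with priority factor 2 gets weight 0.5, while a misconfigured factor of 0 falls back to weight 1 instead of an infinite weight.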
1 change: 1 addition & 0 deletions internal/executor/reporter/job_event_reporter.go
@@ -169,6 +169,7 @@ func (eventReporter *JobEventReporter) reportStatusUpdate(old *v1.Pod, new *v1.P
 	// Don't report status change for pods Armada is deleting
 	// This prevents reporting JobFailed when we delete a pod - for example due to cancellation
 	if util.IsMarkedForDeletion(new) {
+		log.Infof("not sending event to report pod %s moving into phase %s as pod is marked for deletion", new.Name, new.Status.Phase)
 		return
 	}
 	eventReporter.reportCurrentStatus(new)
39 changes: 23 additions & 16 deletions internal/executor/service/pod_issue_handler.go
@@ -27,6 +27,7 @@ const (
 	StuckStartingUp
 	StuckTerminating
 	ExternallyDeleted
+	ErrorDuringIssueHandling
 )
 
 type podIssue struct {
@@ -303,9 +304,7 @@ func (p *IssueHandler) handleNonRetryableJobIssue(issue *issue) {
 // - Report JobUnableToScheduleEvent
 // - Report JobReturnLeaseEvent
 //
-// Special consideration must be taken that most of these pods are somewhat "stuck" in pending.
-// So can transition to Running/Completed/Failed in the middle of this
-// We must not return the lease if the pod state changes - as likely it has become "unstuck"
+// If the pod becomes Running/Completed/Failed in the middle of being deleted - swap this issue to a nonRetryableIssue where it will be Failed
 func (p *IssueHandler) handleRetryableJobIssue(issue *issue) {
 	if !issue.RunIssue.Reported {
 		log.Infof("Retryable issue detected for job %s run %s - %s", issue.RunIssue.JobId, issue.RunIssue.RunId, issue.RunIssue.PodIssue.Message)
@@ -321,7 +320,25 @@ func (p *IssueHandler) handleRetryableJobIssue(issue *issue) {
 	}
 
 	if issue.CurrentPodState != nil {
-		// TODO consider moving this to a synchronous call - but long termination periods would need to be handled
+		if issue.CurrentPodState.Status.Phase != v1.PodPending {
+			p.markIssuesResolved(issue.RunIssue)
+			if issue.RunIssue.PodIssue.DeletionRequested {
+				p.registerIssue(&runIssue{
+					JobId: issue.RunIssue.JobId,
+					RunId: issue.RunIssue.RunId,
+					PodIssue: &podIssue{
+						OriginalPodState:  issue.RunIssue.PodIssue.OriginalPodState,
+						Message:           "Pod unexpectedly started up after delete was called",
+						Retryable:         false,
+						DeletionRequested: false,
+						Type:              ErrorDuringIssueHandling,
+						Cause:             api.Cause_Error,
+					},
+				})
+			}
+			return
+		}
+
 		err := p.clusterContext.DeletePodWithCondition(issue.CurrentPodState, func(pod *v1.Pod) bool {
 			return pod.Status.Phase == v1.PodPending
 		}, true)
@@ -359,20 +376,10 @@ func hasPodIssueSelfResolved(issue *issue) bool {
 		return false
 	}
 
-	// Pod has completed - no need to report any issues
-	if util.IsInTerminalState(issue.CurrentPodState) {
-		return true
-	}
-
-	// Pod has started running, and we haven't requested deletion - let it continue
-	if issue.CurrentPodState.Status.Phase == v1.PodRunning && !issue.RunIssue.PodIssue.DeletionRequested {
+	// Pod has started up and we haven't tried to delete the pod yet - so resolve the issue
+	if issue.CurrentPodState.Status.Phase != v1.PodPending && !issue.RunIssue.PodIssue.DeletionRequested {
 		return true
 	}
-	// TODO There is an edge case here where the pod has started running but we have requested deletion
-	// Without a proper state model, we can't easily handle this correctly
-	// Ideally we'd see if it completes or deletes first and report it accordingly
-	// If it completes first - do nothing
-	// If it deletes first - report JobFailed (as we accidentally deleted it during the run)
 	}
 
 	return false
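Taken together, the changes in this file mean a retryable issue is only acted on while the pod is still Pending; if the pod has progressed, the issue either self-resolves (when no deletion was requested) or is swapped for a non-retryable ErrorDuringIssueHandling issue that ultimately reports JobFailed. A simplified sketch of that decision, using stand-in names rather than the executor's real types:

package sketch

import v1 "k8s.io/api/core/v1"

// action enumerates the three outcomes of handling a retryable pod issue,
// as an illustrative reduction of handleRetryableJobIssue and hasPodIssueSelfResolved.
type action int

const (
	deletePendingPod   action = iota // pod still Pending: delete it and return the lease
	resolveIssue                     // pod progressed before deletion was requested: issue self-resolved
	failAsNonRetryable               // pod progressed after deletion was requested: report JobFailed
)

// decideRetryableIssue is a hypothetical helper capturing the control flow above.
func decideRetryableIssue(phase v1.PodPhase, deletionRequested bool) action {
	if phase == v1.PodPending {
		return deletePendingPod
	}
	if deletionRequested {
		return failAsNonRetryable
	}
	return resolveIssue
}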
31 changes: 31 additions & 0 deletions internal/executor/service/pod_issue_handler_test.go
@@ -97,6 +97,37 @@ func TestPodIssueService_DeletesPodAndReportsLeaseReturned_IfRetryableStuckPod(t
 	assert.True(t, ok)
 }
 
+func TestPodIssueService_DeletesPodAndReportsFailed_IfRetryableStuckPodStartsUpAfterDeletionCalled(t *testing.T) {
+	podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{})
+	retryableStuckPod := makeRetryableStuckPod(false)
+	addPod(t, fakeClusterContext, retryableStuckPod)
+
+	podIssueService.HandlePodIssues()
+
+	// Reports UnableToSchedule
+	assert.Len(t, eventsReporter.ReceivedEvents, 1)
+	_, ok := eventsReporter.ReceivedEvents[0].Event.(*api.JobUnableToScheduleEvent)
+	assert.True(t, ok)
+
+	// Reset events, and add pod back as running
+	eventsReporter.ReceivedEvents = []reporter.EventMessage{}
+	retryableStuckPod.Status.Phase = v1.PodRunning
+	addPod(t, fakeClusterContext, retryableStuckPod)
+
+	// Detects pod is now unexpectedly running and marks it non-retryable
+	podIssueService.HandlePodIssues()
+	assert.Len(t, eventsReporter.ReceivedEvents, 0)
+	assert.Len(t, getActivePods(t, fakeClusterContext), 1)
+
+	// Now processes the issue as non-retryable and fails the pod
+	podIssueService.HandlePodIssues()
+	assert.Len(t, getActivePods(t, fakeClusterContext), 0)
+
+	assert.Len(t, eventsReporter.ReceivedEvents, 1)
+	_, ok = eventsReporter.ReceivedEvents[0].Event.(*api.JobFailedEvent)
+	assert.True(t, ok)
+}
+
 func TestPodIssueService_ReportsFailed_IfDeletedExternally(t *testing.T) {
 	podIssueService, _, fakeClusterContext, eventsReporter := setupTestComponents([]*job.RunState{})
 	runningPod := makeRunningPod(false)
60 changes: 0 additions & 60 deletions internal/scheduler/common.go
@@ -3,7 +3,6 @@ package scheduler
 import (
 	"fmt"
 	"strconv"
-	"time"
 
 	"github.com/pkg/errors"
 	"golang.org/x/exp/maps"
@@ -12,7 +11,6 @@ import (
 	armadamaps "github.com/armadaproject/armada/internal/common/maps"
 	armadaslices "github.com/armadaproject/armada/internal/common/slices"
 	schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
-	schedulercontext "github.com/armadaproject/armada/internal/scheduler/context"
 	"github.com/armadaproject/armada/internal/scheduler/interfaces"
 	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
 )
@@ -111,27 +109,6 @@ func JobsSummary(jobs []interfaces.LegacySchedulerJob) string {
 	)
 }
 
-func jobSchedulingContextsFromJobs[T interfaces.LegacySchedulerJob](jobs []T, executorId string, priorityClasses map[string]configuration.PriorityClass) []*schedulercontext.JobSchedulingContext {
-	if jobs == nil {
-		return nil
-	}
-	if len(jobs) == 0 {
-		return make([]*schedulercontext.JobSchedulingContext, 0)
-	}
-	jctxs := make([]*schedulercontext.JobSchedulingContext, len(jobs))
-	timestamp := time.Now()
-	for i, job := range jobs {
-		jctxs[i] = &schedulercontext.JobSchedulingContext{
-			Created:    timestamp,
-			ExecutorId: executorId,
-			JobId:      job.GetId(),
-			Job:        job,
-			Req:        PodRequirementFromLegacySchedulerJob(job, priorityClasses),
-		}
-	}
-	return jctxs
-}
-
 func isEvictedJob(job interfaces.LegacySchedulerJob) bool {
 	return job.GetAnnotations()[schedulerconfig.IsEvictedAnnotation] == "true"
 }
@@ -168,40 +145,3 @@ func GangIdAndCardinalityFromAnnotations(annotations map[string]string) (string,
 	}
 	return gangId, gangCardinality, true, nil
 }
-
-func PodRequirementsFromLegacySchedulerJobs[S ~[]E, E interfaces.LegacySchedulerJob](jobs S, priorityClasses map[string]configuration.PriorityClass) []*schedulerobjects.PodRequirements {
-	rv := make([]*schedulerobjects.PodRequirements, len(jobs))
-	for i, job := range jobs {
-		rv[i] = PodRequirementFromLegacySchedulerJob(job, priorityClasses)
-	}
-	return rv
-}
-
-func PodRequirementFromLegacySchedulerJob[E interfaces.LegacySchedulerJob](job E, priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements {
-	annotations := make(map[string]string, len(configuration.ArmadaManagedAnnotations)+len(schedulerconfig.ArmadaSchedulerManagedAnnotations))
-	for _, key := range configuration.ArmadaManagedAnnotations {
-		if value, ok := job.GetAnnotations()[key]; ok {
-			annotations[key] = value
-		}
-	}
-	for _, key := range schedulerconfig.ArmadaSchedulerManagedAnnotations {
-		if value, ok := job.GetAnnotations()[key]; ok {
-			annotations[key] = value
-		}
-	}
-	annotations[schedulerconfig.JobIdAnnotation] = job.GetId()
-	annotations[schedulerconfig.QueueAnnotation] = job.GetQueue()
-	info := job.GetJobSchedulingInfo(priorityClasses)
-	req := PodRequirementFromJobSchedulingInfo(info)
-	req.Annotations = annotations
-	return req
-}
-
-func PodRequirementFromJobSchedulingInfo(info *schedulerobjects.JobSchedulingInfo) *schedulerobjects.PodRequirements {
-	for _, oreq := range info.ObjectRequirements {
-		if preq := oreq.GetPodRequirements(); preq != nil {
-			return preq
-		}
-	}
-	return nil
-}
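With PodRequirementFromLegacySchedulerJob and its companions deleted here, the updated test in the next file calls j.GetPodRequirements(priorityClasses) directly, which suggests the per-job requirements logic now lives on the job types themselves. A hedged sketch of an interface in that spirit; the method names and import paths come from this diff, but the exact Armada interface may differ:

package sketch

import (
	"github.com/armadaproject/armada/internal/armada/configuration"
	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
)

// podRequirementsProvider is an illustrative interface: instead of a package-level
// helper extracting PodRequirements from a job, each job produces its own.
type podRequirementsProvider interface {
	GetId() string
	GetQueue() string
	GetAnnotations() map[string]string
	GetPodRequirements(priorityClasses map[string]configuration.PriorityClass) *schedulerobjects.PodRequirements
}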
8 changes: 3 additions & 5 deletions internal/scheduler/common_test.go
@@ -10,12 +10,11 @@ import (
 
 	"github.com/armadaproject/armada/internal/armada/configuration"
 	"github.com/armadaproject/armada/internal/common/util"
-	schedulerconfig "github.com/armadaproject/armada/internal/scheduler/configuration"
 	"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
 	"github.com/armadaproject/armada/pkg/api"
 )
 
-func TestPodRequirementFromLegacySchedulerJob(t *testing.T) {
+func TestGetPodRequirements(t *testing.T) {
 	resourceLimit := v1.ResourceList{
 		"cpu":    resource.MustParse("1"),
 		"memory": resource.MustParse("128Mi"),
@@ -64,13 +63,12 @@ func TestPodRequirementFromLegacySchedulerJob(t *testing.T) {
 		PreemptionPolicy:     string(v1.PreemptLowerPriority),
 		ResourceRequirements: requirements,
 		Annotations: map[string]string{
+			"something":                             "test",
 			configuration.GangIdAnnotation:          "gang-id",
 			configuration.GangCardinalityAnnotation: "1",
-			schedulerconfig.JobIdAnnotation:         j.Id,
-			schedulerconfig.QueueAnnotation:         j.Queue,
 		},
 	}
-	actual := PodRequirementFromLegacySchedulerJob(j, map[string]configuration.PriorityClass{"armada-default": {Priority: int32(1)}})
+	actual := j.GetPodRequirements(map[string]configuration.PriorityClass{"armada-default": {Priority: int32(1)}})
 	assert.Equal(t, expected, actual)
 }
 
17 changes: 4 additions & 13 deletions internal/scheduler/configuration/configuration.go
@@ -10,23 +10,14 @@
 )
 
 const (
-	// IsEvictedAnnotation, indicates a pod was evicted in this round and is currently running.
-	// Used by the scheduler to differentiate between pods from running and queued jobs.
+	// IsEvictedAnnotation is set on evicted jobs; the scheduler uses it to differentiate between
+	// already-running and queued jobs.
 	IsEvictedAnnotation = "armadaproject.io/isEvicted"
-	// JobIdAnnotation if set on a pod, indicates which job this pod is part of.
-	JobIdAnnotation = "armadaproject.io/jobId"
-	// QueueAnnotation if set on a pod, indicates which queue this pod is part of.
-	QueueAnnotation = "armadaproject.io/queue"
-	// IdNodeLabel is automatically added to nodes in the NodeDb.
+	// NodeIdLabel is set on evicted jobs, so that the scheduler only tries to schedule them on the
+	// nodes that they are already running on; nodedb is responsible for labelling its Node objects.
 	NodeIdLabel = "armadaproject.io/nodeId"
 )
 
-var ArmadaSchedulerManagedAnnotations = []string{
-	IsEvictedAnnotation,
-	JobIdAnnotation,
-	QueueAnnotation,
-}
-
 type Configuration struct {
 	// Database configuration
 	Postgres configuration.PostgresConfig
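The rewritten NodeIdLabel comment says the label is used to pin evicted jobs to the node they were already running on. As an illustration only (not Armada's actual eviction code), such a pin can be expressed with an ordinary Kubernetes node selector on that label:

package sketch

import v1 "k8s.io/api/core/v1"

// pinPodToNode restricts a pod to the single node whose armadaproject.io/nodeId
// label matches nodeId, which is the effect the NodeIdLabel comment describes for
// evicted jobs. Illustrative helper; in Armada, nodedb labels its Node objects and
// the scheduler constrains evicted jobs accordingly.
func pinPodToNode(pod *v1.Pod, nodeId string) {
	if pod.Spec.NodeSelector == nil {
		pod.Spec.NodeSelector = map[string]string{}
	}
	pod.Spec.NodeSelector["armadaproject.io/nodeId"] = nodeId
}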
