
[armada-scheduler] Avoid retrying jobs on the same node #2343

Merged: 25 commits merged into master on Apr 16, 2023

Conversation

@JamesMurkin (Contributor) commented Apr 6, 2023

This PR adds the functionality to avoid nodes when retrying jobs.

It will only avoid a node if:

  • The lease was returned
  • The runAttempted flag was set on the lease return

This is so that if the lease was returned for reasons that didn't involve the node, we don't avoid the node.

The node avoidance is implemented using affinities, by setting a node anti-affinity for each node the job has already run on.
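As an illustration only (not necessarily the exact code in this PR), the anti-affinity amounts to appending a NotIn node-selector requirement for the avoided node; the helper name and the choice of label key below are assumptions:

import v1 "k8s.io/api/core/v1"

// addAvoidNodeTerm is a hypothetical sketch: it appends a NotIn requirement so the pod
// cannot be scheduled back onto the given node (identified by labelName, e.g. kubernetes.io/hostname).
func addAvoidNodeTerm(affinity *v1.Affinity, labelName, nodeName string) {
	if affinity.NodeAffinity == nil {
		affinity.NodeAffinity = &v1.NodeAffinity{}
	}
	if affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
		affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = &v1.NodeSelector{
			NodeSelectorTerms: []v1.NodeSelectorTerm{{}},
		}
	}
	terms := affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
	if len(terms) == 0 {
		terms = append(terms, v1.NodeSelectorTerm{})
		affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = terms
	}
	terms[0].MatchExpressions = append(terms[0].MatchExpressions, v1.NodeSelectorRequirement{
		Key:      labelName,
		Operator: v1.NodeSelectorOpNotIn,
		Values:   []string{nodeName},
	})
}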


@codecov bot commented Apr 6, 2023

Codecov Report

Patch coverage: 85.15% and project coverage change: +0.25% 🎉

Comparison is base (b405422) 58.97% compared to head (a5a9ce6) 59.23%.

❗ Current head a5a9ce6 differs from pull request most recent head 342bd8c. Consider uploading reports for the commit 342bd8c to get more accurate results

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #2343      +/-   ##
==========================================
+ Coverage   58.97%   59.23%   +0.25%     
==========================================
  Files         225      226       +1     
  Lines       28470    28781     +311     
==========================================
+ Hits        16790    17048     +258     
- Misses      10388    10426      +38     
- Partials     1292     1307      +15     
Flag Coverage Δ
armada-server 59.23% <85.15%> (+0.25%) ⬆️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
internal/scheduler/schedulerapp.go 0.00% <0.00%> (ø)
internal/scheduler/scheduler.go 77.92% <80.95%> (-0.30%) ⬇️
internal/scheduleringester/schedulerdb.go 65.96% <82.69%> (+6.25%) ⬆️
internal/scheduler/jobdb/job.go 91.20% <88.23%> (-0.36%) ⬇️
internal/scheduleringester/instructions.go 73.02% <89.09%> (+1.77%) ⬆️
internal/scheduleringester/dbops.go 83.42% <93.33%> (+1.89%) ⬆️
...ernal/armada/repository/apimessages/conversions.go 79.21% <100.00%> (+0.49%) ⬆️
internal/scheduler/database/job_repository.go 79.10% <100.00%> (+0.31%) ⬆️
internal/scheduler/jobdb/job_run.go 87.14% <100.00%> (+1.42%) ⬆️
...l/scheduler/kubernetesobjects/affinity/affinity.go 100.00% <100.00%> (ø)
... and 1 more


)

func AddNodeAntiAffinity(affinity *v1.Affinity, labelName string, labelValue string) {
	if affinity == nil {
Collaborator:

why is it ok to silently ignore a nil here?

@@ -273,13 +285,39 @@ func (s *Scheduler) syncState(ctx context.Context) ([]*jobdb.Job, error) {
jobsToUpdateById[jobId] = job
}

// Ensure queued jobs being retried have node anti affinities for any nodes they have already run on
for _, job := range jobsToUpdateById {
Collaborator:

can this be simplified? We seem to iterate over jobsToUpdateById twice and do much the same thing in each case?

It feels like we could iterate over it once and apply the anti-affinity in one place.

// Jobs with no active runs are queued.
for _, job := range jobsToUpdateById {
// Determine if the job needs to be re-queued.
// TODO: If have an explicit queued message, we can remove this code.
run := job.LatestRun()
desiredQueueState := false
requeueJob := run != nil && run.Returned() && job.NumReturned() <= s.maxLeaseReturns
requeueJob := run != nil && run.Returned() && job.NumAttempts() < s.maxAttemptedRuns
if requeueJob && !job.Queued() && run.Returned() && run.RunAttempted() {
Collaborator:

can we simplify this? To work out if the job is going to fall into this, we've got 6 bools to check!

To tie this in with my previous comment- could we not have a single loop and do something like:

if job.Queued || requeueJob {
  // Ensure affinity logic is set
}

if lastRun.Returned() {
errorMessage := fmt.Sprintf("Maximum number of attempts (%d) reached - this job will no longer be retried", s.maxAttemptedRuns)
if job.NumAttempts() < s.maxAttemptedRuns {
errorMessage = fmt.Sprintf("Job was attempeted %d times, and has been tried once on all nodes it can run on - this job will no longer be retried", job.NumAttempts())
Collaborator:

Suggested change
errorMessage = fmt.Sprintf("Job was attempeted %d times, and has been tried once on all nodes it can run on - this job will no longer be retried", job.NumAttempts())
errorMessage = fmt.Sprintf("Job was attempted %d times, and has been tried once on all nodes it can run on - this job will no longer be retried", job.NumAttempts())

if podSchedulingRequirement == nil {
return nil, false, fmt.Errorf("no pod scheduling requirement found for job %s", job.GetId())
}
isSchedulable, _ := s.submitChecker.CheckPodRequirements(podSchedulingRequirement)
Collaborator:

This is a bit dodgy- I think this function is being called from syncState which means that all replicas of the scheduler are executing this regardless of whether they are leader. Previously all the code called by this function was deterministic so we could guarantee that all replicas would end up in the same state. This code breaks that assumption which means we could lose a job. For example:

  • Leader executes this code and determines job is ok to requeue
  • Follower executes this code and determines job should fail (and marks it as such)
  • Leader leases the job to the executor
  • Follower Becomes Leader and fails the job
  • Job run is now orphaned

TBF this whole bit of code is awkward because the point of syncState is to make the internal in-memory model look like the database. The re-queuing of jobs doesn't fit nicely into that because we're not just taking the state from the db, but rather using scheduler-specific business logic to modify the state and at that point you run the risk that different scheduler instances may make different decisions. I've gone over a couple of different ways of sorting this and the best I can come up with is:

  • Always requeue in syncState, but leave the job run as failed
  • Move the requeueing logic into generateUpdateMessages, which can fail the job if it determines it shouldn't be requeued.

I think this works, because it means that only the leader will be checking whether something can be requeued.

Contributor Author:

I agree with this and we can move it around.

Is there a reason we don't send a requeue message in generateUpdateMessages and remove all the business logic from syncState?

Then only the leader can decide if we requeue or fail the job, and the others just get their state from the db.

Collaborator:

I think a requeue message is probably the best solution; it would also make life easier for e.g. lookout, which could then mark the state correctly without having to guess what the scheduler has decided.

The only slightly tricky bit is what to store in the schedulerDb, as a bool won't be enough. I think an int giving the number of times it has been requeued should be enough, but I haven't thought through this enough to be sure.

Collaborator:

I do wonder if the requeue message could include the updated scheduling requirements. That would be nice from the PoV of ensuring that all schedulers agree on which nodes are being excluded.

Contributor Author:

Are you envisioning that we'd conditionally exclude nodes?

Currently it's either exclude all nodes we've tried on or fail, so it shouldn't be ambiguous; however I'm happy to be proven wrong.

Although including the scheduling requirements on the requeue message would work and be pretty flexible.

Collaborator:

So you're right: at the moment the logic is unambiguous, so each scheduler can determine which nodes to exclude purely from the runs. My thinking behind suggesting an explicit update to the scheduling requirements is mainly based on the feeling that the fewer decisions non-leader schedulers make, the less chance there is of them getting out of sync. There are a few secondary benefits of this approach too, e.g. something like lookout could now show excluded nodes, but I'm not sure these are compelling enough to make a change.

The other thing I'm thinking of is how we integrate this with the hash-based scheduling optimization, which should go a long way toward mitigating the performance cost of scheduleMany as discussed in #2319. Ideally we don't always compute hashes in syncState, because this would involve computing len(queue) hashes every time the leader fails over. Instead, I was hoping to calculate them in the ingester, and a requeued message with updated schedulingRequirements would be useful for this I think.

Contributor Author:

I think that is all reasonable and won't be a lot more work.

In terms of implementation, would the plan be to just update the scheduling_info column on the jobs table on each requeue message that changes the scheduling requirements?

  • Or would we want to record these updates elsewhere?
  • One pain point here is working out when the scheduling_info has changed in a cheap way - maybe this would be a reason to implement the hash in this PR

Would you be happy if I updated this PR to:

  • Add requeue event
  • Move all requeue logic to generateUpdateMessages (i.e. send requeue event) and simplify syncState

Unsure if I want to also add a scheduling info hash in this PR - it depends on the answer to the above.

Collaborator:

Add requeue event
Move all requeue logic to generateUpdateMessages (i.e. send requeue event) and simplify syncState

That seems good. My suggestion for working out whether scheduling info has changed would be to add a seq no to the scheduling info (both the table and the in-memory representation). Every time you generate a new scheduling info, increment the seq no and include it in the requeue event. The ingester should only overwrite the scheduling info if the seq no is higher (I think you can do this relatively cheaply with an ON CONFLICT); likewise the in-memory representation should only be overwritten if the seq no is higher.

With regard to hashing, I think a tentative plan would be:

  • Have a hash col that lives alongside the scheduling info and is versioned with it
  • Scheduler ingester calculates and inserts the initial hash on the submit message
  • Requeue message needs to include a hash (the leader scheduler will have to calculate the hash when it requeues). The logic for updating the hash (both for the ingester and for follower schedulers) is identical to updating the scheduling info.

If you can get the hashing in that would be great, but if not it shouldn't be too hard to get it in later.
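For illustration only, a minimal sketch of the "only overwrite if the seq no is higher" rule on the database side; the column names (scheduling_info, scheduling_info_version) and the pgx import path are assumptions rather than the final schema:

import (
	"context"

	"github.com/jackc/pgx/v4"
)

// updateSchedulingInfoIfNewer overwrites the stored scheduling info only when the
// incoming version is strictly higher, so replayed or stale events can't move the
// state backwards. A no-op result means the row already held a newer version.
func updateSchedulingInfoIfNewer(ctx context.Context, tx pgx.Tx, jobID string, info []byte, version int32) error {
	_, err := tx.Exec(ctx, `
		UPDATE jobs
		SET scheduling_info = $2, scheduling_info_version = $3
		WHERE job_id = $1 AND scheduling_info_version < $3`,
		jobID, info, version)
	return err
}

The in-memory jobdb would apply the same guard, replacing a job's scheduling info only when the incoming version is strictly greater.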


@JamesMurkin (Contributor Author) commented Apr 13, 2023

Hi @d80tb7

So I've made the changes:

  • Requeue jobs with an explicit event (which can also change the scheduling info and is versioned)
  • Have an explicit queued state in the jobs table, so the scheduler doesn't need to work out the queued state each time

Functionally this works; I know there are some points that need tidying:

  • Naming could be improved
  • The update queries are not efficient. sqlc isn't helping me out here, but I can just write an efficient native query relatively easily (luckily postgres has nice support for this)
  • A few other minor points

However, before I finish it off I just wanted your overall opinion on whether I'm butchering things too much.

Also, I've included JobSchedulingInfo in the JobRequeued event; however, for now I've imported it from our internal schedulerobjects proto.

We could make a duplicate proto and convert between them? Or move JobSchedulingInfo. It is a bit awkward.

Errors: []*armadaevents.Error{runError},
requeueJob := lastRun.Returned() && job.NumAttempts() < s.maxAttemptedRuns

if requeueJob && lastRun.RunAttempted() {
Collaborator:

When we become leader (either on startup or through failover) we pass all jobs to this function. In this case we will get jobs that have already been requeued falling into this code branch. In that case I think the code will do the right thing, i.e. it'll end up with the jobs queued with the correct anti-affinity set, but it might be worth either writing a test to be sure, or seeing if we can already detect that the job is in the correct state and short circuit.

Contributor Author:

Yes, I think it'll do the correct thing; however I think it'll end up sending the event back through the loop.

We probably could just short circuit by checking if the job is already queued: if it is, skip; if it isn't, that implies it hasn't been processed.

Collaborator:

if we can avoid sending a large number of redundant messages that would be good. Is it enough to check that the job is already queued, or do we also need to check that the scheduling info is as we expect? I think you're right that rechecking the queued state is enough, but I think I'd need to stare at this a bit more to convince myself.

Contributor Author:

I'm relatively confident it will be sufficient to check it is queued; however we could also validate that the affinity is as we expect.

However, this would only matter if we managed to get into an unexpected state, but maybe that isn't a bad thing.

@@ -216,7 +248,14 @@ func (c *InstructionConverter) handleJobRunErrors(jobRunErrors *armadaevents.Job
JobID: jobId,
Error: bytes,
}
markRunsFailed[runId] = &JobRunFailed{LeaseReturned: runError.GetPodLeaseReturned() != nil}
runAttempted := false
Collaborator:

should this default to true?

Contributor Author:

Probably yes.

It is a bit awkward because the value only matters if the lease was returned, so its value is somewhat irrelevant for other cases.

Collaborator:

In practice it probably doesn't matter too much- although I'd argue that in general it's safer to assume the run was attempted if we can't determine whether it was.
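For what it's worth, a hedged sketch of defaulting to attempted when nothing says otherwise; the RunAttempted field on the lease-returned error is taken from this PR's description, and the import path and helper name are assumptions:

import "github.com/armadaproject/armada/pkg/armadaevents"

// runWasAttempted treats a run as attempted unless the lease-returned error explicitly
// reports that it was not (RunAttempted here is the flag this PR adds; assumed name).
func runWasAttempted(runError *armadaevents.Error) bool {
	if leaseReturned := runError.GetPodLeaseReturned(); leaseReturned != nil {
		return leaseReturned.RunAttempted
	}
	return true
}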

@d80tb7 (Collaborator) commented Apr 13, 2023


Yeah this all looks good; I made a few comments, none of them serious I don't think. Even the comment about the reordering of events in the ingester I don't think will cause any issues.

Re the proto, I think for now that's fine. My personal view, which admittedly isn't quite crystallized yet, is something like:

  • all the proto types in schedulerObjects should be treated as types used for serialisation only; they shouldn't be used inside the scheduler itself.
  • as internal serialised types they should live alongside the proto used for armadamessages, which means importing one from the other should be fine.
  • The Scheduler should use its own internal types, which probably largely mirror the proto types, but have the advantage that we control their code so as to store things as efficiently as possible for the scheduler.

Sadly sqlc doesn't support dynamic queries - so I had to write a dynamic query manually

This means we don't need to do N update statements
… rest client

No other proto is considered public in that folder currently
@JamesMurkin JamesMurkin marked this pull request as ready for review April 14, 2023 14:42
@@ -26,6 +30,8 @@ CREATE TABLE jobs (
submit_message bytea NOT NULL,
-- JobSchedulingInfo message stored as a proto buffer.
scheduling_info bytea NOT NULL,
-- The current version of the JobSchedulingInfo, used to prevent JobSchedulingInfo state going backwards on even replay
Collaborator:

Suggested change
-- The current version of the JobSchedulingInfo, used to prevent JobSchedulingInfo state going backwards on even replay
-- The current version of the JobSchedulingInfo, used to prevent JobSchedulingInfo state going backwards even on replay


func AddNodeAntiAffinity(affinity *v1.Affinity, labelName string, labelValue string) error {
if affinity == nil {
return fmt.Errorf("failed to add not anti affinity, as provided affinity is nil")
Collaborator:

Suggested change
return fmt.Errorf("failed to add not anti affinity, as provided affinity is nil")
return errors.Errorf("failed to add node anti affinity, as provided affinity is nil")

newSchedulingInfo.Version = job.JobSchedulingInfo().Version + 1
podSchedulingRequirement := PodRequirementFromJobSchedulingInfo(newSchedulingInfo)
if podSchedulingRequirement == nil {
return nil, fmt.Errorf("no pod scheduling requirement found for job %s", job.GetId())
Collaborator:

Suggested change
return nil, fmt.Errorf("no pod scheduling requirement found for job %s", job.GetId())
return nil, errors.Errorf("no pod scheduling requirement found for job %s", job.GetId())

}
podSchedulingRequirement := PodRequirementFromJobSchedulingInfo(schedulingInfoWithNodeAntiAffinity)
if podSchedulingRequirement == nil {
return nil, false, fmt.Errorf("no pod scheduling requirement found for job %s", job.GetId())
Collaborator:

Suggested change
return nil, false, fmt.Errorf("no pod scheduling requirement found for job %s", job.GetId())
return nil, false, errors.Errorf("no pod scheduling requirement found for job %s", job.GetId())

}

if requeueJob {
job = job.WithUpdatedRun(lastRun.WithFailed(false))
@d80tb7 (Collaborator) commented Apr 14, 2023:

do we need to do this? I previously had to set lastRun.Failed = false because I was doing this in syncState and I needed to prevent the job from being failed later. I think now we are handling the requeueing in the correct place, so we can indeed treat the run as failed.

runError := jobRunErrors[lastRun.Id()]
job = job.WithFailed(true).WithQueued(false)
if lastRun.Returned() {
errorMessage := fmt.Sprintf("Maximum number of attempts (%d) reached - this job will no longer be retried", s.maxAttemptedRuns)
Collaborator:

Suggested change
errorMessage := fmt.Sprintf("Maximum number of attempts (%d) reached - this job will no longer be retried", s.maxAttemptedRuns)
errorMessage := errors.Sprintf("Maximum number of attempts (%d) reached - this job will no longer be retried", s.maxAttemptedRuns)

if lastRun.Returned() {
errorMessage := fmt.Sprintf("Maximum number of attempts (%d) reached - this job will no longer be retried", s.maxAttemptedRuns)
if job.NumAttempts() < s.maxAttemptedRuns {
errorMessage = fmt.Sprintf("Job was attempeted %d times, and has been tried once on all nodes it can run on - this job will no longer be retried", job.NumAttempts())
Collaborator:

Suggested change
errorMessage = fmt.Sprintf("Job was attempeted %d times, and has been tried once on all nodes it can run on - this job will no longer be retried", job.NumAttempts())
errorMessage = errors.Sprintf("Job was attempeted %d times, and has been tried once on all nodes it can run on - this job will no longer be retried", job.NumAttempts())

argMarkers := make([]string, 0, len(o))

currentIndex := 1
for key, value := range o {
Collaborator:

this might be simpler as a pgx.batch query (https://github.com/jackc/pgx/blob/master/batch_test.go#L30). I suspect the performance won't be much worse either.
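For reference, a minimal sketch of the pgx.Batch approach being suggested here, queuing one UPDATE per job but sending them in a single round trip; the table/column names and the QueuedStateUpdate type are illustrative assumptions:

import (
	"context"

	"github.com/jackc/pgx/v4"
)

// QueuedStateUpdate is an illustrative value type: the new queued flag and version for one job.
type QueuedStateUpdate struct {
	Queued  bool
	Version int32
}

// batchUpdateQueuedState queues one UPDATE per job and executes them as a single batch.
func batchUpdateQueuedState(ctx context.Context, tx pgx.Tx, updates map[string]QueuedStateUpdate) error {
	batch := &pgx.Batch{}
	for jobID, u := range updates {
		batch.Queue(
			"UPDATE jobs SET queued = $1, queued_version = $2 WHERE job_id = $3",
			u.Queued, u.Version, jobID,
		)
	}
	results := tx.SendBatch(ctx, batch)
	defer results.Close()
	for range updates {
		if _, err := results.Exec(); err != nil {
			return err
		}
	}
	return nil
}

This avoids building the SQL string and argument markers by hand, and as the comment suggests the performance should not be much worse than a hand-built dynamic query.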

return errors.WithStack(err)
}
case UpdateJobQueuedState:
args := make([]interface{}, 0, len(o)*3)
Collaborator:

as above- I think this could be a pgx.batch query.

@d80tb7 (Collaborator) left a comment:

Raised some comments- but nothing super important.

@JamesMurkin JamesMurkin enabled auto-merge (squash) April 16, 2023 13:39
@JamesMurkin JamesMurkin merged commit 8aeb5f3 into master Apr 16, 2023
@JamesMurkin JamesMurkin deleted the avoid_node_on_retry branch April 16, 2023 14:10