New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ExecutionStore should cycle through queued elements [BA-6487 prerequisite] #5588
Conversation
@@ -154,7 +154,7 @@ final case class SealedExecutionStore private[stores](private val statusStore: M | |||
sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, ExecutionStatus], val needsUpdate: Boolean) { | |||
// View of the statusStore more suited for lookup based on status | |||
lazy val store: Map[ExecutionStatus, List[JobKey]] = statusStore.groupBy(_._2).safeMapValues(_.keys.toList) | |||
lazy val queuedJobsAboveThreshold = queuedJobs > MaxJobsToStartPerTick | |||
lazy val queuedJobsAboveThreshold = queuedJobs >= MaxJobsToStartPerTick |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the variable should truly represent queued jobs above the threshold why would this be >=
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Subsequently replaced with a subtraction
// The store might newly need updating now if keys have changed and either: | ||
// - A job has completed -> downstream jobs might now be runnable | ||
// - The store had jobs waiting for queue space -> more queue space might have been freed up | ||
val needsNewUpdate = values.nonEmpty && (values.values.exists(_.isTerminalOrRetryable) || store.contains(WaitingForQueueSpace)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why does this need the new values.nonEmpty
check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because my test case ended by updating an empty set of values (ie changing nothing) but nevertheless being considered as newly needing an update
@@ -274,24 +281,24 @@ sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, Ex | |||
// Even if the key is runnable, if it's a call key and there's already too many queued jobs, | |||
// don't start it and mark it as WaitingForQueueSpace | |||
// TODO maybe also limit the number of expression keys to run somehow ? | |||
case callKey: CallKey if runnable && queuedJobsAboveThreshold => | |||
case callKey: CallKey if runnable && startableJobLimit > 0 => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think startableJobLimit > 0
leaves us room for starting more jobs right away? So it should be if runnable && startableJobLimit <= 0
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the test case passing, I think this is the right way round, but maybe the variable is badly named? It's supposed to mean "how many jobs am I going to be allowing myself to start this time". Which you need to be over 0 to start any new jobs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's supposed to mean "how many jobs am I going to be allowing myself to start this time". Which you need to be over 0 to start any new jobs
Exactly, and this is how I read it. But I think in this case clause we are saying that the job is not runnable this time (by returning false) and postponing the job by transitioning it into the WaitingForQueueSpace
state. But since we have startableJobLimit > 0
we should allow to start this job right away, instead of putting it into the waiting state
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh right, I see what you mean. Yes I think you're right, I'll fix that
store.store(WaitingForQueueSpace).size should be(9000) | ||
store.store.contains(Running) should be(false) | ||
|
||
while(store.store.getOrElse(Running, List.empty).size < 10000) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
updateStoreToEnqueueNewlyRunnableJobs() | ||
|
||
while(store.store.getOrElse(Running, List.empty).size < 10000) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same concern as above
Closing for now, in hopes that #5590 will work instead |
Re-opening because we think we prefer this one to #5590 after all... |
There're several comments from me regarding usage of while loops in tests (the same ones as in another PR). Otherwise LGTM. |
@@ -274,24 +281,24 @@ sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, Ex | |||
// Even if the key is runnable, if it's a call key and there's already too many queued jobs, | |||
// don't start it and mark it as WaitingForQueueSpace | |||
// TODO maybe also limit the number of expression keys to run somehow ? | |||
case callKey: CallKey if runnable && queuedJobsAboveThreshold => | |||
case callKey: CallKey if runnable && startableJobLimit <= 0 => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the code above keeps startableJobLimit
from being less than zero
case callKey: CallKey if runnable && startableJobLimit <= 0 => | |
case callKey: CallKey if runnable && startableJobLimit == 0 => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Start with keys that are waiting for queue space as we know they're runnable already. Then filter the not started ones | ||
val readyToStart = runnableWaitingForQueueSpace ++ keysWithStatus(NotStarted).toStream.filter(filterFunction) | ||
|
||
// Compute the first ExecutionStore.MaxJobsToStartPerTick + 1 runnable keys | ||
val keysToStartPlusOne = readyToStart.take(MaxJobsToStartPerTick + 1).toList | ||
val keysToStartPlusOne = readyToStart.take(startableJobLimit + 1).toList |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
comment no longer matches the code
@@ -303,6 +310,6 @@ sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, Ex | |||
} else withNeedsUpdateFalse | |||
|
|||
// Only take the first ExecutionStore.MaxJobsToStartPerTick from the above list. | |||
ExecutionStoreUpdate(keysToStartPlusOne.take(MaxJobsToStartPerTick), updated, internalUpdates) | |||
ExecutionStoreUpdate(keysToStartPlusOne.take(startableJobLimit), updated, internalUpdates) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here too
@@ -154,7 +154,10 @@ final case class SealedExecutionStore private[stores](private val statusStore: M | |||
sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, ExecutionStatus], val needsUpdate: Boolean) { | |||
// View of the statusStore more suited for lookup based on status | |||
lazy val store: Map[ExecutionStatus, List[JobKey]] = statusStore.groupBy(_._2).safeMapValues(_.keys.toList) | |||
lazy val queuedJobsAboveThreshold = queuedJobs > MaxJobsToStartPerTick | |||
lazy val startableJobLimit = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe queueableJobLimit
would be a more accurate name?
Updating keys in the ExecutionStore might leave room for previously "waiting for queue space" jobs to start.
TOL: I'm not 100% sure what this concept of "waiting for queue space" is, it seems to be distinct from the EJEA QueuedInCromwell task status since it's quite possible to have over 100,000 jobs in that status just fine... 🤔