Skip to content

Commit

Permalink
Revert "ExecutionStore should cycle through queued elements [BA-6487
Browse files Browse the repository at this point in the history
…prerequisite] (#5588)" (#5918)
  • Loading branch information
mcovarr committed Oct 7, 2020
1 parent bac1ca3 commit 171f12c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 113 deletions.
Expand Up @@ -22,7 +22,7 @@ object ExecutionStore {

type StatusTable = Table[GraphNode, ExecutionIndex.ExecutionIndex, JobKey]

val MaxJobsAllowedInQueuedState = 1000
val MaxJobsToStartPerTick = 1000

implicit class EnhancedJobKey(val key: JobKey) extends AnyVal {
/**
Expand Down Expand Up @@ -154,10 +154,7 @@ final case class SealedExecutionStore private[stores](private val statusStore: M
sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, ExecutionStatus], val needsUpdate: Boolean) {
// View of the statusStore more suited for lookup based on status
lazy val store: Map[ExecutionStatus, List[JobKey]] = statusStore.groupBy(_._2).safeMapValues(_.keys.toList)
lazy val queueableJobLimit = {
val diff = MaxJobsAllowedInQueuedState - queuedJobs
if (diff < 0) 0 else diff
}
lazy val queuedJobsAboveThreshold = queuedJobs > MaxJobsToStartPerTick

def backendJobDescriptorKeyForNode(node: GraphNode): Option[BackendJobDescriptorKey] = {
statusStore.keys collectFirst { case k: BackendJobDescriptorKey if k.node eq node => k }
Expand All @@ -177,11 +174,7 @@ sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, Ex
* Update key statuses
*/
def updateKeys(values: Map[JobKey, ExecutionStatus]): ExecutionStore = {
// The store might newly need updating now if keys have changed and either:
// - A job has completed -> downstream jobs might now be runnable
// - The store had jobs waiting for queue space -> more queue space might have been freed up
val needsNewUpdate = values.nonEmpty && (values.values.exists(_.isTerminalOrRetryable) || store.contains(WaitingForQueueSpace))
updateKeys(values, needsUpdate || needsNewUpdate)
updateKeys(values, needsUpdate || values.values.exists(_.isTerminalOrRetryable))
}

/**
Expand All @@ -203,7 +196,7 @@ sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, Ex
* Create 2 Tables, one for keys in done status and one for keys in terminal status.
* A Table is nothing more than a Map[R, Map[C, V]], see Table trait for more details
* In this case, rows are GraphNodes, columns are ExecutionIndexes, and values are JobKeys
* This allows for quick lookup of all shards for a node, as well as accessing a specific key with a
* This allows for quick lookup of all shards for a node, as well as accessing a specific key with a
* (node, index) pair
*/
lazy val (doneStatus, terminalStatus) = {
Expand Down Expand Up @@ -281,24 +274,24 @@ sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, Ex
// Even if the key is runnable, if it's a call key and there's already too many queued jobs,
// don't start it and mark it as WaitingForQueueSpace
// TODO maybe also limit the number of expression keys to run somehow ?
case callKey: CallKey if runnable && queueableJobLimit <= 0 =>
case callKey: CallKey if runnable && queuedJobsAboveThreshold =>
internalUpdates = internalUpdates + (callKey -> WaitingForQueueSpace)
false
case _ => runnable
}
}

// If the queued jobs are not above the threshold, use the nodes that are already waiting for queue space
val runnableWaitingForQueueSpace = if (queueableJobLimit > 0) keysWithStatus(WaitingForQueueSpace).toStream else Stream.empty[JobKey]

val runnableWaitingForQueueSpace = if (!queuedJobsAboveThreshold) keysWithStatus(WaitingForQueueSpace).toStream else Stream.empty[JobKey]
// Start with keys that are waiting for queue space as we know they're runnable already. Then filter the not started ones
val readyToStart = runnableWaitingForQueueSpace ++ keysWithStatus(NotStarted).toStream.filter(filterFunction)

// Take up to queueableJobLimit + 1 runnable keys
val keysToStartPlusOne = readyToStart.take(queueableJobLimit + 1).toList
// Compute the first ExecutionStore.MaxJobsToStartPerTick + 1 runnable keys
val keysToStartPlusOne = readyToStart.take(MaxJobsToStartPerTick + 1).toList

// Will be true if the result is truncated, in which case we'll need to do another pass later
val truncated = keysToStartPlusOne.size > queueableJobLimit
val truncated = keysToStartPlusOne.size > MaxJobsToStartPerTick

// If we found unstartable keys, update their status, and set needsUpdate to true (it might unblock other keys)
val updated = if (internalUpdates.nonEmpty) {
Expand All @@ -309,7 +302,7 @@ sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, Ex
// Otherwise we can reset it, nothing else will be runnable / unstartable until some new keys become terminal
} else withNeedsUpdateFalse

// Only take the first queueableJobLimit keys from the list.
ExecutionStoreUpdate(keysToStartPlusOne.take(queueableJobLimit), updated, internalUpdates)
// Only take the first ExecutionStore.MaxJobsToStartPerTick from the above list.
ExecutionStoreUpdate(keysToStartPlusOne.take(MaxJobsToStartPerTick), updated, internalUpdates)
} else ExecutionStoreUpdate(List.empty, this, Map.empty)
}

This file was deleted.

0 comments on commit 171f12c

Please sign in to comment.