Skip to content

Commit

Permalink
ExecutionStore should cycle through queued elements [BA-6487 prerequi…
Browse files Browse the repository at this point in the history
…site] (#5588)
  • Loading branch information
cjllanwarne committed Jul 30, 2020
1 parent 2df2563 commit 3da6198
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 12 deletions.
Expand Up @@ -22,7 +22,7 @@ object ExecutionStore {

type StatusTable = Table[GraphNode, ExecutionIndex.ExecutionIndex, JobKey]

val MaxJobsToStartPerTick = 1000
val MaxJobsAllowedInQueuedState = 1000

implicit class EnhancedJobKey(val key: JobKey) extends AnyVal {
/**
Expand Down Expand Up @@ -154,7 +154,10 @@ final case class SealedExecutionStore private[stores](private val statusStore: M
sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, ExecutionStatus], val needsUpdate: Boolean) {
// View of the statusStore more suited for lookup based on status
lazy val store: Map[ExecutionStatus, List[JobKey]] = statusStore.groupBy(_._2).safeMapValues(_.keys.toList)
lazy val queuedJobsAboveThreshold = queuedJobs > MaxJobsToStartPerTick
lazy val queueableJobLimit = {
val diff = MaxJobsAllowedInQueuedState - queuedJobs
if (diff < 0) 0 else diff
}

def backendJobDescriptorKeyForNode(node: GraphNode): Option[BackendJobDescriptorKey] = {
statusStore.keys collectFirst { case k: BackendJobDescriptorKey if k.node eq node => k }
Expand All @@ -174,7 +177,11 @@ sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, Ex
* Update key statuses
*/
def updateKeys(values: Map[JobKey, ExecutionStatus]): ExecutionStore = {
updateKeys(values, needsUpdate || values.values.exists(_.isTerminalOrRetryable))
// The store might newly need updating now if keys have changed and either:
// - A job has completed -> downstream jobs might now be runnable
// - The store had jobs waiting for queue space -> more queue space might have been freed up
val needsNewUpdate = values.nonEmpty && (values.values.exists(_.isTerminalOrRetryable) || store.contains(WaitingForQueueSpace))
updateKeys(values, needsUpdate || needsNewUpdate)
}

/**
Expand All @@ -196,7 +203,7 @@ sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, Ex
* Create 2 Tables, one for keys in done status and one for keys in terminal status.
* A Table is nothing more than a Map[R, Map[C, V]], see Table trait for more details
* In this case, rows are GraphNodes, columns are ExecutionIndexes, and values are JobKeys
* This allows for quick lookup of all shards for a node, as well as accessing a specific key with a
* This allows for quick lookup of all shards for a node, as well as accessing a specific key with a
* (node, index) pair
*/
lazy val (doneStatus, terminalStatus) = {
Expand Down Expand Up @@ -274,24 +281,24 @@ sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, Ex
// Even if the key is runnable, if it's a call key and there's already too many queued jobs,
// don't start it and mark it as WaitingForQueueSpace
// TODO maybe also limit the number of expression keys to run somehow ?
case callKey: CallKey if runnable && queuedJobsAboveThreshold =>
case callKey: CallKey if runnable && queueableJobLimit <= 0 =>
internalUpdates = internalUpdates + (callKey -> WaitingForQueueSpace)
false
case _ => runnable
}
}

// If the queued jobs are not above the threshold, use the nodes that are already waiting for queue space
val runnableWaitingForQueueSpace = if (!queuedJobsAboveThreshold) keysWithStatus(WaitingForQueueSpace).toStream else Stream.empty[JobKey]
val runnableWaitingForQueueSpace = if (queueableJobLimit > 0) keysWithStatus(WaitingForQueueSpace).toStream else Stream.empty[JobKey]

// Start with keys that are waiting for queue space as we know they're runnable already. Then filter the not started ones
val readyToStart = runnableWaitingForQueueSpace ++ keysWithStatus(NotStarted).toStream.filter(filterFunction)

// Compute the first ExecutionStore.MaxJobsToStartPerTick + 1 runnable keys
val keysToStartPlusOne = readyToStart.take(MaxJobsToStartPerTick + 1).toList
// Take up to queueableJobLimit + 1 runnable keys
val keysToStartPlusOne = readyToStart.take(queueableJobLimit + 1).toList

// Will be true if the result is truncated, in which case we'll need to do another pass later
val truncated = keysToStartPlusOne.size > MaxJobsToStartPerTick
val truncated = keysToStartPlusOne.size > queueableJobLimit

// If we found unstartable keys, update their status, and set needsUpdate to true (it might unblock other keys)
val updated = if (internalUpdates.nonEmpty) {
Expand All @@ -302,7 +309,7 @@ sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, Ex
// Otherwise we can reset it, nothing else will be runnable / unstartable until some new keys become terminal
} else withNeedsUpdateFalse

// Only take the first ExecutionStore.MaxJobsToStartPerTick from the above list.
ExecutionStoreUpdate(keysToStartPlusOne.take(MaxJobsToStartPerTick), updated, internalUpdates)
// Only take the first queueableJobLimit keys from the list.
ExecutionStoreUpdate(keysToStartPlusOne.take(queueableJobLimit), updated, internalUpdates)
} else ExecutionStoreUpdate(List.empty, this, Map.empty)
}
@@ -0,0 +1,92 @@
package cromwell.engine.workflow.lifecycle.execution.stores

import cromwell.backend.BackendJobDescriptorKey
import cromwell.core.ExecutionStatus.{ExecutionStatus, _}
import cromwell.core.JobKey
import cromwell.engine.workflow.lifecycle.execution.stores.ExecutionStoreSpec._
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
import wom.graph._

import scala.util.Random

class ExecutionStoreSpec extends FlatSpec with Matchers with BeforeAndAfter {

var store: ExecutionStore = _

before {
def jobKeys: Map[JobKey, ExecutionStatus] = (0.until(10000).toList map {
i => BackendJobDescriptorKey(noConnectionsGraphNode, Option(i), 1) -> NotStarted })
.toMap

store = ActiveExecutionStore(jobKeys, needsUpdate = true)
}

def updateStoreToEnqueueNewlyRunnableJobs(): Unit = {
while (store.needsUpdate) {
val update = store.update
store = update.updatedStore.updateKeys(update.runnableKeys.map(_ -> QueuedInCromwell).toMap)
}
}

it should "keep allowing updates while 10000 call keys are enqueued and then started" in {

updateStoreToEnqueueNewlyRunnableJobs()

store.store(QueuedInCromwell).size should be(1000)
store.store(WaitingForQueueSpace).size should be(9000)
store.store.contains(Running) should be(false)

var iteration = 0
while(store.store.getOrElse(Running, List.empty).size < 10000) {
store = store.updateKeys(store.store(QueuedInCromwell).map(j => j -> Running).toMap)
updateStoreToEnqueueNewlyRunnableJobs()

// In all situations, the queue should never go above the queue limit:
(store.store.getOrElse(QueuedInCromwell, List.empty).size <= 1000) should be(true)

// Additionally, as long as elements are waiting for queue space, the queue should be exactly 1000 long
if (store.store.contains(WaitingForQueueSpace)) { store.store(QueuedInCromwell).size should be(1000) }

iteration = iteration + 1
store.store.getOrElse(Running, List.empty).size should be(iteration * 1000)
}
}

it should "never enqueue more than the total allowed queue space" in {

updateStoreToEnqueueNewlyRunnableJobs()

store.store(QueuedInCromwell).size should be(1000)
store.store(WaitingForQueueSpace).size should be(9000)
store.store.contains(Running) should be(false)
var currentlyRunning = 0

while(store.store.getOrElse(Running, List.empty).size < 10000) {
val newlyRunning: Iterable[JobKey] = store.store(QueuedInCromwell).take(Random.nextInt(1000))
store = store.updateKeys(newlyRunning.map(j => j -> Running).toMap)
updateStoreToEnqueueNewlyRunnableJobs()

// In all situations, the queue should never go above the queue limit:
(store.store.getOrElse(QueuedInCromwell, List.empty).size <= 1000) should be(true)

// Additionally, as long as elements are waiting for queue space, the queue should be exactly 1000 long
if (store.store.contains(WaitingForQueueSpace)) { store.store(QueuedInCromwell).size should be(1000) }

store.store.getOrElse(Running, List.empty).size should be(currentlyRunning + newlyRunning.size)
currentlyRunning = currentlyRunning + newlyRunning.size
}
}
}

object ExecutionStoreSpec {

val noConnectionsGraphNode: CommandCallNode = CommandCallNode(
identifier = WomIdentifier("mock_task", "mock_wf.mock_task"),
callable = null,
inputPorts = Set.empty[GraphNodePort.InputPort],
inputDefinitionMappings = List.empty,
nonInputBasedPrerequisites = Set.empty[GraphNode],
outputIdentifierCompoundingFunction = (wi, _) => wi,
sourceLocation = None
)
}

0 comments on commit 3da6198

Please sign in to comment.