Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ExecutionStore should cycle through queued elements [BA-6487 prerequisite] #5588

Merged
merged 8 commits into from Jul 30, 2020
Expand Up @@ -154,7 +154,7 @@ final case class SealedExecutionStore private[stores](private val statusStore: M
sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, ExecutionStatus], val needsUpdate: Boolean) {
// View of the statusStore more suited for lookup based on status
lazy val store: Map[ExecutionStatus, List[JobKey]] = statusStore.groupBy(_._2).safeMapValues(_.keys.toList)
lazy val queuedJobsAboveThreshold = queuedJobs > MaxJobsToStartPerTick
lazy val queuedJobsAboveThreshold = queuedJobs >= MaxJobsToStartPerTick
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the variable should truly represent queued jobs above the threshold why would this be >=?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subsequently replaced with a subtraction


def backendJobDescriptorKeyForNode(node: GraphNode): Option[BackendJobDescriptorKey] = {
statusStore.keys collectFirst { case k: BackendJobDescriptorKey if k.node eq node => k }
Expand All @@ -174,7 +174,11 @@ sealed abstract class ExecutionStore private[stores](statusStore: Map[JobKey, Ex
* Update key statuses
*/
def updateKeys(values: Map[JobKey, ExecutionStatus]): ExecutionStore = {
updateKeys(values, needsUpdate || values.values.exists(_.isTerminalOrRetryable))
// The store might newly need updating now if keys have changed and either:
// - A job has completed -> downstream jobs might now be runnable
// - The store had jobs waiting for queue space -> more queue space might have been freed up
val needsNewUpdate = values.nonEmpty && (values.values.exists(_.isTerminalOrRetryable) || store.contains(WaitingForQueueSpace))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why does this need the new values.nonEmpty check?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because my test case ended by updating an empty set of values (ie changing nothing) but nevertheless being considered as newly needing an update

updateKeys(values, needsUpdate || needsNewUpdate)
}

/**
Expand Down
@@ -0,0 +1,52 @@
package cromwell.engine.workflow.lifecycle.execution.stores

import cromwell.backend.BackendJobDescriptorKey
import cromwell.core.ExecutionStatus.{ExecutionStatus, _}
import cromwell.core.JobKey
import cromwell.engine.workflow.lifecycle.execution.stores.ExecutionStoreSpec._
import org.scalatest.{FlatSpec, Matchers}
import wom.graph._

class ExecutionStoreSpec extends FlatSpec with Matchers {



it should "keep requiring an update until all 10000 call keys are started" in {

def jobKeys: Map[JobKey, ExecutionStatus] = (0.until(10000).toList map {
i => BackendJobDescriptorKey(noConnectionsGraphNode, Option(i), 1) -> NotStarted })
.toMap

var store: ExecutionStore = ActiveExecutionStore(jobKeys, needsUpdate = true)

def updateStoreToEnqueueNewlyRunnableJobs(): Unit = {
while (store.needsUpdate) {
val update = store.update
store = update.updatedStore.updateKeys(update.runnableKeys.map(_ -> QueuedInCromwell).toMap)
}
}

updateStoreToEnqueueNewlyRunnableJobs()

store.store(QueuedInCromwell).size should be(1000)
store.store(WaitingForQueueSpace).size should be(9000)

while(store.store.getOrElse(Running, List.empty).size < 10000) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

store = store.updateKeys(store.store(QueuedInCromwell).map(j => j -> Running).toMap)
updateStoreToEnqueueNewlyRunnableJobs()
}
}
}

object ExecutionStoreSpec {

val noConnectionsGraphNode: CommandCallNode = CommandCallNode(
identifier = WomIdentifier("mock_task", "mock_wf.mock_task"),
callable = null,
inputPorts = Set.empty[GraphNodePort.InputPort],
inputDefinitionMappings = List.empty,
nonInputBasedPrerequisites = Set.empty[GraphNode],
outputIdentifierCompoundingFunction = (wi, _) => wi,
sourceLocation = None
)
}