Skip to content

Commit

Permalink
Merge pull request #2821 from broadinstitute/ks_centaur_cwl_runner
Browse files Browse the repository at this point in the history
Added CentaurCwlRunner.
  • Loading branch information
kshakir committed Nov 7, 2017
2 parents b296fc4 + 043afe2 commit 14f0e70
Show file tree
Hide file tree
Showing 45 changed files with 1,460 additions and 1,311 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ env:
- BUILD_TYPE=centaurJes
- BUILD_TYPE=centaurLocal
- BUILD_TYPE=centaurTes
- BUILD_TYPE=centaurCwlConformance
script:
- pip install 'requests[security]'
- sudo pip install --ignore-installed cwltool
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package cromwell.backend.async


import java.util.concurrent.ExecutionException

import akka.actor.{Actor, ActorLogging, ActorRef}
import cromwell.backend.BackendJobDescriptor
import cromwell.backend.BackendJobExecutionActor._
Expand Down Expand Up @@ -49,6 +51,7 @@ trait AsyncBackendJobExecutionActor { this: Actor with ActorLogging =>
case _: RuntimeException => true
case _: InterruptedException => true
case _: CromwellFatalExceptionMarker => true
case e: ExecutionException => Option(e.getCause).exists(isFatal)
case _ => false
}

Expand Down
163 changes: 71 additions & 92 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,78 +1,64 @@
import Dependencies._
import Settings._
import Testing._

lazy val common = (project in file("common"))
.settings(commonSettings:_*)
.withTestSettings
// Libraries

lazy val wom = (project in file("wom"))
.settings(womSettings:_*)
lazy val common = project
.withLibrarySettings("cromwell-common", commonDependencies, crossCompile = true)

lazy val wom = project
.withLibrarySettings("cromwell-wom", womDependencies, crossCompile = true)
.dependsOn(common)
.withTestSettings

lazy val wdl = (project in file("wdl"))
.settings(wdlSettings:_*)
lazy val wdl = project
.withLibrarySettings("cromwell-wdl", wdlDependencies, crossCompile = true)
.dependsOn(wom)
.withTestSettings

lazy val cwl = (project in file("cwl"))
.settings(cwlSettings:_*)
lazy val cwl = project
.withLibrarySettings("cromwell-cwl", cwlDependencies, crossCompile = true)
.dependsOn(wom)
.dependsOn(wom % "test->test")
.withTestSettings

lazy val womtool = (project in file("womtool"))
.settings(womtoolSettings:_ *)
.dependsOn(wdl)
.dependsOn(cwl)
.dependsOn(wom % "test->test")
.withTestSettings

lazy val core = (project in file("core"))
.settings(coreSettings:_*)
lazy val core = project
.withLibrarySettings("cromwell-core", coreDependencies)
.dependsOn(wom)
.dependsOn(wom % "test->test")
.withTestSettings

lazy val gcsFileSystem = (project in file("filesystems/gcs"))
.settings(gcsFileSystemSettings:_*)
.withTestSettings
lazy val cloudSupport = project
.withLibrarySettings("cromwell-cloud-support", cloudSupportDependencies)
.dependsOn(core)
.dependsOn(core % "test->test")

lazy val gcsFileSystem = (project in file("filesystems/gcs"))
.withLibrarySettings("cromwell-gcsfilesystem")
.dependsOn(core)
.dependsOn(cloudSupport)
.dependsOn(core % "test->test")
.dependsOn(cloudSupport % "test->test")

lazy val databaseSql = (project in file("database/sql"))
.settings(databaseSqlSettings:_*)
.withTestSettings
.withLibrarySettings("cromwell-database-sql", databaseSqlDependencies)

lazy val databaseMigration = (project in file("database/migration"))
.settings(databaseMigrationSettings: _*)
.withLibrarySettings("cromwell-database-migration", databaseMigrationDependencies)
.dependsOn(core)
.dependsOn(wdl)
.withTestSettings

lazy val dockerHashing = (project in file("dockerHashing"))
.settings(dockerHashingSettings: _*)
lazy val dockerHashing = project
.withLibrarySettings("cromwell-docker-hashing")
.dependsOn(core)
.dependsOn(core % "test->test")
.withTestSettings

lazy val cromwellApiClient = (project in file("cromwellApiClient"))
.settings(cromwellApiClientSettings: _*)
.withTestSettings
lazy val cromwellApiClient = project
.withLibrarySettings("cromwell-api-client", cromwellApiClientDependencies)

lazy val centaur = (project in file("centaur"))
.configs(IntegrationTest)
.settings(Defaults.itSettings ++ centaurSettings: _*)
.withTestSettings
lazy val centaur = project
.withLibrarySettings("centaur", centaurDependencies, integrationTests = true)
.dependsOn(common)
.dependsOn(cromwellApiClient)

lazy val services = (project in file("services"))
.settings(servicesSettings:_*)
.withTestSettings
.dependsOn(core)
lazy val services = project
.withLibrarySettings("cromwell-services")
.dependsOn(databaseSql)
.dependsOn(databaseMigration)
.dependsOn(cloudSupport)
Expand All @@ -81,49 +67,38 @@ lazy val services = (project in file("services"))

lazy val backendRoot = Path("supportedBackends")

lazy val backend = (project in file("backend"))
.settings(backendSettings:_*)
.withTestSettings
.dependsOn(core)
lazy val backend = project
.withLibrarySettings("cromwell-backend")
.dependsOn(services)
.dependsOn(core % "test->test")

lazy val sfsBackend = (project in backendRoot / "sfs")
.settings(sfsBackendSettings:_*)
.withTestSettings
.withLibrarySettings("cromwell-sfs-backend")
.dependsOn(backend)
.dependsOn(gcsFileSystem)
.dependsOn(backend % "test->test")
.dependsOn(services % "test->test")

lazy val tesBackend = (project in backendRoot / "tes")
.settings(tesBackendSettings:_*)
.withTestSettings
.withLibrarySettings("cromwell-tes-backend", tesBackendDependencies)
.dependsOn(sfsBackend)
.dependsOn(backend % "test->test")

lazy val sparkBackend = (project in backendRoot / "spark")
.settings(sparkBackendSettings:_*)
.withTestSettings
.withLibrarySettings("cromwell-spark-backend", sparkBackendDependencies)
.dependsOn(sfsBackend)
.dependsOn(backend % "test->test")

lazy val jesBackend = (project in backendRoot / "jes")
.settings(jesBackendSettings:_*)
.withTestSettings
.withLibrarySettings("cromwell-jes-backend")
.dependsOn(backend)
.dependsOn(gcsFileSystem)
.dependsOn(cloudSupport)
.dependsOn(backend % "test->test")
.dependsOn(gcsFileSystem % "test->test")
.dependsOn(services % "test->test")

lazy val engine = (project in file("engine"))
.settings(engineSettings: _*)
.withTestSettings
.dependsOn(core)
.dependsOn(dockerHashing)
.dependsOn(services)
lazy val engine = project
.withLibrarySettings("cromwell-engine", engineDependencies, engineSettings)
.dependsOn(backend)
.dependsOn(gcsFileSystem)
.dependsOn(wdl)
Expand All @@ -135,43 +110,47 @@ lazy val engine = (project in file("engine"))
.dependsOn(sfsBackend % "test->compile")
.dependsOn(gcsFileSystem % "test->test")

lazy val cloudSupport = (project in file("cloudSupport"))
.settings(cloudSupportSettings: _*)
.withTestSettings
.dependsOn(core)
.dependsOn(core % "test->test")
// Executables

lazy val centaurCwlRunner = project
.withExecutableSettings("centaur-cwl-runner", centaurCwlRunnerDependencies, centaurCwlRunnerSettings)
.dependsOn(cwl)
.dependsOn(centaur)

lazy val womtool = project
.withExecutableSettings("womtool", womtoolDependencies, buildDocker = false)
.dependsOn(wdl)
.dependsOn(cwl)
.dependsOn(wom % "test->test")

lazy val root = (project in file("."))
.settings(rootSettings: _*)
.enablePlugins(DockerPlugin)
.withTestSettings
.withExecutableSettings("cromwell", rootDependencies)
// Next level of projects to include in the fat jar (their dependsOn will be transitively included)
.dependsOn(engine)
.dependsOn(jesBackend)
.dependsOn(tesBackend)
.dependsOn(sparkBackend)
.dependsOn(engine % "test->test")
// Full list of all sub-projects to build with the root (ex: include in `sbt test`)
.aggregate(backend)
.aggregate(centaur)
.aggregate(centaurCwlRunner)
.aggregate(common)
.aggregate(wom)
.aggregate(wdl)
.aggregate(cwl)
.aggregate(womtool)
.aggregate(core)
.aggregate(cromwellApiClient)
.aggregate(cwl)
.aggregate(databaseMigration)
.aggregate(databaseSql)
.aggregate(dockerHashing)
.aggregate(engine)
.aggregate(gcsFileSystem)
.aggregate(databaseSql)
.aggregate(databaseMigration)
.aggregate(jesBackend)
.aggregate(services)
.aggregate(backend)
.aggregate(sfsBackend)
.aggregate(sparkBackend)
.aggregate(jesBackend)
.aggregate(tesBackend)
.aggregate(engine)
.aggregate(cromwellApiClient)
.aggregate(centaur)
// Next level of projects to include in the fat jar (their dependsOn will be transitively included)
.dependsOn(engine)
.dependsOn(jesBackend)
.dependsOn(tesBackend)
.dependsOn(sparkBackend)
.dependsOn(wdl)
.dependsOn(cwl)
.dependsOn(womtool)
// Dependencies for tests
.dependsOn(engine % "test->test")
.aggregate(wdl)
.aggregate(wom)
.aggregate(womtool)
// TODO: See comment in plugins.sbt regarding SBT 1.x
.enablePlugins(CrossPerProjectPlugin)
4 changes: 2 additions & 2 deletions centaur/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ Tag names are all lower case, so a test named "tagFoo" has a tag "tagfoo".

To run only those tests which have been tagged with a specified tag `tagFoo`:
```
sbt "project centaur" "it:testOnly * -- -n tagfoo"
sbt "centaur/it:testOnly * -- -n tagfoo"
```

Or to instead exclude all tests which have been tagged with a specified tag `tagFoo`:
```
sbt "project centaur" "it:testOnly * -- -l tagfoo"
sbt "centaur/it:testOnly * -- -l tagfoo"
```

# Adding custom tests
Expand Down
1 change: 1 addition & 0 deletions centaur/src/it/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
akka.http.host-connection-pool.max-open-requests: 128
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ abstract class AbstractCentaurTestCaseSpec(cromwellBackends: List[String]) exten

def executeStandardTest(testCase: CentaurTestCase): Unit = {
def nameTest = s"${testCase.testFormat.testSpecString} ${testCase.workflow.testName}"
def runTest(): Unit = testCase.testFunction.run.get
def runTest(): Unit = {
testCase.testFunction.run.get
()
}

// Make tags, but enforce lowercase:
val tags = (testCase.testOptions.tags :+ testCase.workflow.testName :+ testCase.testFormat.name) map { x => Tag(x.toLowerCase) }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
akka.http.host-connection-pool.max-open-requests: 128

centaur {

cromwell {
Expand Down
13 changes: 9 additions & 4 deletions centaur/src/main/scala/centaur/api/CentaurCromwellClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,18 @@ import centaur.test.metadata.WorkflowMetadata
import centaur.test.workflow.Workflow
import centaur.{CentaurConfig, CromwellManager}
import cromwell.api.CromwellClient
import cromwell.api.model.{CromwellBackends, SubmittedWorkflow, WorkflowStatus}
import cromwell.api.model.{CromwellBackends, SubmittedWorkflow, WorkflowOutputs, WorkflowStatus}

import scala.concurrent._
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException}
import scala.util.Try

object CentaurCromwellClient {
// Do not use scala.concurrent.ExecutionContext.Implicits.global as long as this is using Await.result
// See https://github.com/akka/akka-http/issues/602
// And https://github.com/viktorklang/blog/blob/master/Futures-in-Scala-2.12-part-7.md
final implicit val blockingEc = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
final implicit val blockingEc: ExecutionContextExecutor = ExecutionContext.fromExecutor(
Executors.newCachedThreadPool(DaemonizedDefaultThreadFactory))

// Akka HTTP needs both the actor system and a materializer
final implicit val system = ActorSystem("centaur-acting-like-a-system")
Expand All @@ -37,11 +38,15 @@ object CentaurCromwellClient {
def status(workflow: SubmittedWorkflow): Try[WorkflowStatus] = {
sendReceiveFutureCompletion(() => cromwellClient.status(workflow.id))
}

def abort(workflow: SubmittedWorkflow): Try[WorkflowStatus] = {
sendReceiveFutureCompletion(() => cromwellClient.abort(workflow.id))
}

def outputs(workflow: SubmittedWorkflow): Try[WorkflowOutputs] = {
sendReceiveFutureCompletion(() => cromwellClient.outputs(workflow.id))
}

/*
Sends a quick ping to the Cromwell query endpoint. The query endpoint is the only one which both hits the
database w/o requiring a workflow id and does not modify server state. Not using CromwellClient here as it
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package centaur.api

import java.util.concurrent.ThreadFactory
import java.util.concurrent.atomic.AtomicInteger

/**
* A static version of java.util.concurrent.Executors.DefaultThreadFactory that creates daemon threads that exit when
* the application exits.
*
* See also: java.util.concurrent.Executors.DefaultThreadFactory
* See also: http://dev.bizo.com/2014/06/cached-thread-pool-considered-harmlful.html
* > Always provide your own ThreadFactory so all your threads are named appropriately and have the daemon flag set.
* > Your thread pool shouldn't keep the application alive. That's the responsibility of the application itself (i.e.,
* > main thread).
*/
object DaemonizedDefaultThreadFactory extends ThreadFactory {
private val s = System.getSecurityManager
private val group = if (s != null) s.getThreadGroup else Thread.currentThread.getThreadGroup
private val threadNumber = new AtomicInteger(1)
private val namePrefix = "daemonpool-thread-"

override def newThread(r: Runnable): Thread = {
val t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement, 0)
if (!t.isDaemon) t.setDaemon(true)
if (t.getPriority != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY)
t
}
}

0 comments on commit 14f0e70

Please sign in to comment.