From 4327ca61b91fa48852c41362493c9a04a6f59e8e Mon Sep 17 00:00:00 2001 From: Thibault Jeandet Date: Tue, 30 May 2017 14:07:33 -0400 Subject: [PATCH 01/74] Update cromwell version from 27 to 28 --- project/Version.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Version.scala b/project/Version.scala index 0d9839c3d..251a4c7d6 100644 --- a/project/Version.scala +++ b/project/Version.scala @@ -4,7 +4,7 @@ import sbt._ object Version { // Upcoming release, or current if we're on a master / hotfix branch - val cromwellVersion = "27" + val cromwellVersion = "28" // Adapted from SbtGit.versionWithGit def cromwellVersionWithGit: Seq[Setting[_]] = From 98d766b7754251aec15e30f6e4519445437ec355 Mon Sep 17 00:00:00 2001 From: Ruchi Date: Thu, 1 Jun 2017 09:26:22 -0400 Subject: [PATCH 02/74] Preliminary validation of workflow options (#2295) preliminary validation of wf options before WorkflowID creation --- core/src/main/scala/cromwell/core/package.scala | 19 ++++ .../src/main/scala/cromwell/api/model/Label.scala | 7 +- .../MaterializeWorkflowDescriptorActor.scala | 2 +- .../cromwell/webservice/CromwellApiService.scala | 47 ++++++--- .../webservice/CromwellApiServiceSpec.scala | 112 ++++++++++++++------- 5 files changed, 131 insertions(+), 56 deletions(-) diff --git a/core/src/main/scala/cromwell/core/package.scala b/core/src/main/scala/cromwell/core/package.scala index def878003..12f2e0e44 100644 --- a/core/src/main/scala/cromwell/core/package.scala +++ b/core/src/main/scala/cromwell/core/package.scala @@ -1,7 +1,12 @@ package cromwell +import cats.data.Validated._ +import cats.syntax.validated._ +import lenthall.validation.ErrorOr.ErrorOr import wdl4s.values.WdlValue +import scala.util.{Failure, Success, Try} + package object core { type LocallyQualifiedName = String type FullyQualifiedName = String @@ -10,4 +15,18 @@ package object core { type CallOutputs = Map[LocallyQualifiedName, JobOutput] type HostInputs = Map[String, WdlValue] type EvaluatedRuntimeAttributes = Map[String, WdlValue] + + implicit class toErrorOr[A](val trySomething: Try[A]) { + def tryToErrorOr: ErrorOr[A] = trySomething match { + case Success(options) => options.validNel + case Failure(err) => err.getMessage.invalidNel + } + } + + implicit class toTry[A](val validatedSomething: ErrorOr[A]) { + def errorOrToTry: Try[A] = validatedSomething match { + case Valid(options) => Success(options) + case Invalid(err) => Failure(new RuntimeException(s"Error(s): ${err.toList.mkString(",")}")) + } + } } diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala index 7cb72dd94..fb5e97669 100644 --- a/cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/Label.scala @@ -5,9 +5,10 @@ import spray.json.{DefaultJsonProtocol, JsObject, JsString, JsValue, RootJsonFor object LabelsJsonFormatter extends DefaultJsonProtocol { implicit object LabelJsonFormat extends RootJsonFormat[List[Label]] { def write(l: List[Label]) = JsObject(l map { label => label.key -> JsString(label.value)} :_* ) - def read(value: JsValue) = value match { - case JsObject(x) => x map { case (k, JsString(v)) => Label(k, v) } toList - } + def read(value: JsValue) = value.asJsObject.fields map { + case (k, JsString(v)) => Label(k, v) + case other => throw new UnsupportedOperationException(s"Cannot deserialize $other to a Label") + } toList } } diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/MaterializeWorkflowDescriptorActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/MaterializeWorkflowDescriptorActor.scala index 1e8c21378..aaae5e6c6 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/MaterializeWorkflowDescriptorActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/MaterializeWorkflowDescriptorActor.scala @@ -12,8 +12,8 @@ import com.typesafe.config.Config import com.typesafe.scalalogging.LazyLogging import cromwell.backend.BackendWorkflowDescriptor import cromwell.core.Dispatcher.EngineDispatcher -import cromwell.core.WorkflowOptions.{ReadFromCache, WorkflowOption, WriteToCache} import cromwell.core._ +import cromwell.core.WorkflowOptions.{ReadFromCache, WorkflowOption, WriteToCache} import cromwell.core.callcaching._ import cromwell.core.labels.{Label, Labels} import cromwell.core.logging.WorkflowLogging diff --git a/engine/src/main/scala/cromwell/webservice/CromwellApiService.scala b/engine/src/main/scala/cromwell/webservice/CromwellApiService.scala index 38624b853..a575e4970 100644 --- a/engine/src/main/scala/cromwell/webservice/CromwellApiService.scala +++ b/engine/src/main/scala/cromwell/webservice/CromwellApiService.scala @@ -1,13 +1,18 @@ package cromwell.webservice import akka.actor._ +import cats.data.Validated.{Invalid, Valid} import cats.data.NonEmptyList +import cats.syntax.validated._ +import cats.syntax.cartesian._ import com.typesafe.config.{Config, ConfigFactory} -import cromwell.core.{WorkflowId, WorkflowOptionsJson, WorkflowSourceFilesCollection} +import cromwell.core.{WorkflowId, WorkflowOptions, WorkflowOptionsJson, WorkflowSourceFilesCollection} import cromwell.engine.backend.BackendConfiguration import cromwell.services.metadata.MetadataService._ import cromwell.webservice.WorkflowJsonSupport._ import cromwell.webservice.metadata.MetadataBuilderActor +import cromwell.core._ +import lenthall.validation.ErrorOr.ErrorOr import spray.http.MediaTypes._ import spray.http._ import spray.httpx.SprayJsonSupport._ @@ -147,18 +152,32 @@ trait CromwellApiService extends HttpService with PerRequestCreator { } } - def partialSourcesToSourceCollections(partialSources: Try[PartialWorkflowSources], allowNoInputs: Boolean): Try[Seq[WorkflowSourceFilesCollection]] = { - partialSources flatMap { - case PartialWorkflowSources(Some(wdlSource), workflowInputs, workflowInputsAux, workflowOptions, labels, wdlDependencies) => - //The order of addition allows for the expected override of colliding keys. - val sortedInputAuxes = workflowInputsAux.toSeq.sortBy(_._1).map(x => Option(x._2)) - val wfInputs: Try[Seq[WdlJson]] = if (workflowInputs.isEmpty) { - if (allowNoInputs) Success(Vector("{}")) else Failure(new IllegalArgumentException("No inputs were provided")) - } else Success(workflowInputs map { workflowInputSet => - mergeMaps(Seq(Option(workflowInputSet)) ++ sortedInputAuxes).toString - }) - wfInputs.map(_.map(x => WorkflowSourceFilesCollection(wdlSource, x, workflowOptions.getOrElse("{}"), labels.getOrElse("{}"), wdlDependencies))) - case other => Failure(new IllegalArgumentException(s"Incomplete workflow submission: $other")) + def partialSourcesToSourceCollections(partialSources: ErrorOr[PartialWorkflowSources], allowNoInputs: Boolean): ErrorOr[Seq[WorkflowSourceFilesCollection]] = { + + def validateInputs(pws: PartialWorkflowSources): ErrorOr[Seq[WdlJson]] = + (pws.workflowInputs.isEmpty, allowNoInputs) match { + case (true, true) => Vector("{}").validNel + case (true, false) => "No inputs were provided".invalidNel + case _ => + val sortedInputAuxes = pws.workflowInputsAux.toSeq.sortBy { case (index, _) => index } map { case(_, inputJson) => Option(inputJson) } + (pws.workflowInputs map { workflowInputSet: WdlJson => mergeMaps(Seq(Option(workflowInputSet)) ++ sortedInputAuxes).toString }).validNel + } + + def validateOptions(options: Option[WorkflowOptionsJson]): ErrorOr[WorkflowOptions] = + WorkflowOptions.fromJsonString(options.getOrElse("{}")).tryToErrorOr leftMap { _ map { i => s"Invalid workflow options provided: $i" } } + + def validateWdlSource(partialSource: PartialWorkflowSources): ErrorOr[WdlJson] = partialSource.wdlSource match { + case Some(src) => src.validNel + case _ => s"Incomplete workflow submission: $partialSource".invalidNel + } + + partialSources match { + case Valid(partialSource) => + (validateWdlSource(partialSource) |@| validateInputs(partialSource) |@| validateOptions(partialSource.workflowOptions)) map { + case (wdlSource, wfInputs, wfOptions) => + wfInputs.map(x => WorkflowSourceFilesCollection(wdlSource, x, wfOptions.asPrettyJson, partialSource.customLabels.getOrElse("{}"), partialSource.zippedImports)) + } + case Invalid(err) => err.invalid } } @@ -181,7 +200,7 @@ trait CromwellApiService extends HttpService with PerRequestCreator { throw new IllegalArgumentException(s"Unexpected body part name: ${bodyPart.name.getOrElse("None")}") } }) - partialSourcesToSourceCollections(partialSources, allowNoInputs) + partialSourcesToSourceCollections(partialSources.tryToErrorOr, allowNoInputs).errorOrToTry } } diff --git a/engine/src/test/scala/cromwell/webservice/CromwellApiServiceSpec.scala b/engine/src/test/scala/cromwell/webservice/CromwellApiServiceSpec.scala index 0cd2220e1..bbd926fc9 100644 --- a/engine/src/test/scala/cromwell/webservice/CromwellApiServiceSpec.scala +++ b/engine/src/test/scala/cromwell/webservice/CromwellApiServiceSpec.scala @@ -28,16 +28,16 @@ class CromwellApiServiceSpec extends FlatSpec with ScalatestRouteTest with Match val version = "v1" behavior of "REST API /status endpoint" - it should "return 200 for get of a known workflow id" in { - val workflowId = MockApiService.ExistingWorkflowId - Get(s"/workflows/$version/$workflowId/status") ~> - cromwellApiService.statusRoute ~> - check { - status should be(StatusCodes.OK) - val result = responseAs[JsObject] - result.fields(WorkflowMetadataKeys.Status) should be(JsString("Submitted")) - } - } + it should "return 200 for get of a known workflow id" in { + val workflowId = MockApiService.ExistingWorkflowId + Get(s"/workflows/$version/$workflowId/status") ~> + cromwellApiService.statusRoute ~> + check { + status should be(StatusCodes.OK) + val result = responseAs[JsObject] + result.fields(WorkflowMetadataKeys.Status) should be(JsString("Submitted")) + } + } it should "return 404 for get of unknown workflow" in { val workflowId = MockApiService.UnrecognizedWorkflowId @@ -51,7 +51,7 @@ class CromwellApiServiceSpec extends FlatSpec with ScalatestRouteTest with Match } } - it should "return 400 for get of a malformed workflow id's status" in { + it should "return 400 for get of a malformed workflow id's status" in { Get(s"/workflows/$version/foobar/status") ~> cromwellApiService.statusRoute ~> check { @@ -70,35 +70,35 @@ class CromwellApiServiceSpec extends FlatSpec with ScalatestRouteTest with Match } behavior of "REST API /abort endpoint" - it should "return 404 for abort of unknown workflow" in { - val workflowId = MockApiService.UnrecognizedWorkflowId + it should "return 404 for abort of unknown workflow" in { + val workflowId = MockApiService.UnrecognizedWorkflowId - Post(s"/workflows/$version/$workflowId/abort") ~> - cromwellApiService.abortRoute ~> - check { - assertResult(StatusCodes.NotFound) { - status + Post(s"/workflows/$version/$workflowId/abort") ~> + cromwellApiService.abortRoute ~> + check { + assertResult(StatusCodes.NotFound) { + status + } } - } - } + } - it should "return 400 for abort of a malformed workflow id" in { - Post(s"/workflows/$version/foobar/abort") ~> - cromwellApiService.abortRoute ~> - check { - assertResult(StatusCodes.BadRequest) { - status - } - assertResult( - """{ - | "status": "fail", - | "message": "Invalid workflow ID: 'foobar'." - |}""".stripMargin - ) { - responseAs[String] + it should "return 400 for abort of a malformed workflow id" in { + Post(s"/workflows/$version/foobar/abort") ~> + cromwellApiService.abortRoute ~> + check { + assertResult(StatusCodes.BadRequest) { + status + } + assertResult( + """{ + | "status": "fail", + | "message": "Invalid workflow ID: 'foobar'." + |}""".stripMargin + ) { + responseAs[String] + } } - } - } + } it should "return 403 for abort of a workflow in a terminal state" in { Post(s"/workflows/$version/${MockApiService.AbortedWorkflowId}/abort") ~> @@ -163,7 +163,7 @@ class CromwellApiServiceSpec extends FlatSpec with ScalatestRouteTest with Match assertResult( s"""{ | "status": "fail", - | "message": "Unexpected body part name: incorrectParameter" + | "message": "Error(s): Unexpected body part name: incorrectParameter" |}""".stripMargin) { responseAs[String] } @@ -173,6 +173,42 @@ class CromwellApiServiceSpec extends FlatSpec with ScalatestRouteTest with Match } } + it should "return 400 for a workflow submission with unsupported workflow option keys" in { + val options = """ + |{ + | "defaultRuntimeOptions": { + | "cpu":1 + | } + |} + |""".stripMargin + + val bodyParts = Map("wdlSource" -> BodyPart(HelloWorld.wdlSource()), "workflowOptions" -> BodyPart(options)) + + Post(s"/workflows/$version", MultipartFormData(bodyParts)) ~> + cromwellApiService.submitRoute ~> + check { + assertResult(StatusCodes.BadRequest) { + status + } + } + } + + it should "return 400 for a workflow submission with malformed workflow options json" in { + val options = s""" + |{"read_from_cache": "true" + |""".stripMargin + + val bodyParts = Map("wdlSource" -> BodyPart(HelloWorld.wdlSource()), "workflowOptions" -> BodyPart(options)) + + Post(s"/workflows/$version", MultipartFormData(bodyParts)) ~> + cromwellApiService.submitRoute ~> + check { + assertResult(StatusCodes.BadRequest) { + status + } + } + } + it should "succesfully merge and override multiple input files" in { val input1 = Map("wf.a1" -> "hello", "wf.a2" -> "world").toJson.toString @@ -219,7 +255,7 @@ class CromwellApiServiceSpec extends FlatSpec with ScalatestRouteTest with Match assertResult( s"""{ | "status": "fail", - | "message": "No inputs were provided" + | "message": "Error(s): No inputs were provided" |}""".stripMargin) { responseAs[String] } From bb96682db9039f9066ae861c6bdcc1d719bbc960 Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Tue, 30 May 2017 18:44:44 -0400 Subject: [PATCH 03/74] Purge workflow store entry before job entries. --- .../engine/workflow/WorkflowManagerActor.scala | 2 -- .../workflowstore/WorkflowStoreActor.scala | 1 - .../workflowstore/WorkflowStoreEngineActor.scala | 10 ---------- .../scala/cromwell/jobstore/JobStoreActor.scala | 6 +++--- .../main/scala/cromwell/jobstore/SqlJobStore.scala | 16 ++++++++++++--- .../cromwell/engine/WorkflowStoreActorSpec.scala | 23 ++++------------------ 6 files changed, 20 insertions(+), 38 deletions(-) diff --git a/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala b/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala index 2202aed28..5ee462622 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala @@ -224,8 +224,6 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams) val replyTo = abortingWorkflowToReplyTo(workflowId) replyTo ! WorkflowStoreEngineActor.WorkflowAborted(workflowId) abortingWorkflowToReplyTo -= workflowId - } else { - params.workflowStore ! WorkflowStoreActor.RemoveWorkflow(workflowId) } } stay using data.without(workflowActor) diff --git a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreActor.scala b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreActor.scala index b45700824..ed231cb1a 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreActor.scala @@ -21,7 +21,6 @@ final case class WorkflowStoreActor private(store: WorkflowStore, serviceRegistr object WorkflowStoreActor { sealed trait WorkflowStoreActorEngineCommand final case class FetchRunnableWorkflows(n: Int) extends WorkflowStoreActorEngineCommand - final case class RemoveWorkflow(id: WorkflowId) extends WorkflowStoreActorEngineCommand final case class AbortWorkflow(id: WorkflowId, manager: ActorRef) extends WorkflowStoreActorEngineCommand case object InitializerCommand extends WorkflowStoreActorEngineCommand case object WorkDone extends WorkflowStoreActorEngineCommand diff --git a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreEngineActor.scala b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreEngineActor.scala index 955ad8033..b5767a926 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreEngineActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/workflowstore/WorkflowStoreEngineActor.scala @@ -91,16 +91,6 @@ final case class WorkflowStoreEngineActor private(store: WorkflowStore, serviceR val e = new RuntimeException(s"$message: ${t.getMessage}", t) sndr ! WorkflowAbortFailed(id, e) } - case RemoveWorkflow(id) => - val cleanup = for { - removed <- store.remove(id) - _ = if (!removed) log.warning(s"Attempted to remove ID {} from the WorkflowStore but it didn't exist", id) - _ <- database.removeDockerHashStoreEntries(id.toString) - } yield () - - cleanup recover { - case t => log.error(t, s"Unable to remove workflow $id from workflow store or clean up docker hash entries") - } case oops => log.error("Unexpected type of start work command: {}", oops.getClass.getSimpleName) Future.successful(self ! WorkDone) diff --git a/engine/src/main/scala/cromwell/jobstore/JobStoreActor.scala b/engine/src/main/scala/cromwell/jobstore/JobStoreActor.scala index 1c308244c..5365ca5ad 100644 --- a/engine/src/main/scala/cromwell/jobstore/JobStoreActor.scala +++ b/engine/src/main/scala/cromwell/jobstore/JobStoreActor.scala @@ -13,10 +13,10 @@ import scala.language.postfixOps * * This level of indirection is a tiny bit awkward but allows the database to be injected. */ -class JobStoreActor(database: JobStore, dbBatchSize: Int, dbFlushRate: FiniteDuration) extends Actor { +class JobStoreActor(jobStore: JobStore, dbBatchSize: Int, dbFlushRate: FiniteDuration) extends Actor { import JobStoreActor._ - val jobStoreWriterActor = context.actorOf(JobStoreWriterActor.props(database, dbBatchSize, dbFlushRate)) - val jobStoreReaderActor = context.actorOf(JobStoreReaderActor.props(database)) + val jobStoreWriterActor = context.actorOf(JobStoreWriterActor.props(jobStore, dbBatchSize, dbFlushRate)) + val jobStoreReaderActor = context.actorOf(JobStoreReaderActor.props(jobStore)) override def receive: Receive = { case command: JobStoreWriterCommand => jobStoreWriterActor.tell(command, sender()) diff --git a/engine/src/main/scala/cromwell/jobstore/SqlJobStore.scala b/engine/src/main/scala/cromwell/jobstore/SqlJobStore.scala index 2df3515cf..8e185acc2 100644 --- a/engine/src/main/scala/cromwell/jobstore/SqlJobStore.scala +++ b/engine/src/main/scala/cromwell/jobstore/SqlJobStore.scala @@ -1,24 +1,34 @@ package cromwell.jobstore +import cats.instances.future._ +import cats.instances.list._ +import cats.syntax.traverse._ + import cromwell.Simpletons._ import cromwell.backend.async.JobAlreadyFailedInJobStore import cromwell.core.ExecutionIndex._ import cromwell.core.simpleton.WdlValueBuilder import cromwell.core.simpleton.WdlValueSimpleton._ -import cromwell.database.sql.JobStoreSqlDatabase import cromwell.database.sql.SqlConverters._ +import cromwell.database.sql.SqlDatabase import cromwell.database.sql.joins.JobStoreJoin import cromwell.database.sql.tables.{JobStoreEntry, JobStoreSimpletonEntry} import cromwell.jobstore.JobStore.{JobCompletion, WorkflowCompletion} +import org.slf4j.LoggerFactory import wdl4s.TaskOutput import scala.concurrent.{ExecutionContext, Future} -class SqlJobStore(sqlDatabase: JobStoreSqlDatabase) extends JobStore { +class SqlJobStore(sqlDatabase: SqlDatabase) extends JobStore { + val log = LoggerFactory.getLogger(classOf[SqlJobStore]) + override def writeToDatabase(workflowCompletions: Seq[WorkflowCompletion], jobCompletions: Seq[JobCompletion], batchSize: Int)(implicit ec: ExecutionContext): Future[Unit] = { + val completedWorkflowIds = workflowCompletions.toList.map(_.workflowId.toString) for { _ <- sqlDatabase.addJobStores(jobCompletions map toDatabase, batchSize) - _ <- sqlDatabase.removeJobStores(workflowCompletions.map(_.workflowId.toString)) + _ <- completedWorkflowIds traverse sqlDatabase.removeWorkflowStoreEntry + _ <- completedWorkflowIds traverse sqlDatabase.removeDockerHashStoreEntries + _ <- sqlDatabase.removeJobStores(completedWorkflowIds) } yield () } diff --git a/engine/src/test/scala/cromwell/engine/WorkflowStoreActorSpec.scala b/engine/src/test/scala/cromwell/engine/WorkflowStoreActorSpec.scala index 9a68c60ef..d16bbe3bc 100644 --- a/engine/src/test/scala/cromwell/engine/WorkflowStoreActorSpec.scala +++ b/engine/src/test/scala/cromwell/engine/WorkflowStoreActorSpec.scala @@ -1,8 +1,7 @@ package cromwell.engine import cats.data.NonEmptyList -import cromwell.{CromwellTestKitSpec, CromwellTestKitWordSpec} -import cromwell.core.{WorkflowId, WorkflowSourceFilesCollection} +import cromwell.core.WorkflowSourceFilesCollection import cromwell.database.sql.SqlDatabase import cromwell.engine.workflow.workflowstore.WorkflowStoreActor._ import cromwell.engine.workflow.workflowstore.WorkflowStoreEngineActor.{NewWorkflowsToStart, NoNewWorkflowsToStart} @@ -13,12 +12,13 @@ import cromwell.services.metadata.MetadataService.{GetMetadataQueryAction, Metad import cromwell.services.metadata.impl.ReadMetadataActor import cromwell.util.EncryptionSpec import cromwell.util.SampleWdl.HelloWorld +import cromwell.{CromwellTestKitSpec, CromwellTestKitWordSpec} +import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfter, Matchers} import org.specs2.mock.Mockito -import org.mockito.Mockito._ -import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContext, Future} import scala.language.postfixOps class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with Matchers with BeforeAndAfter with Mockito { @@ -158,24 +158,9 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with Matchers with } } - "remove workflows which exist" in { - val store = new InMemoryWorkflowStore - val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance, database)) - storeActor ! SubmitWorkflow(helloWorldSourceFiles) - val id = expectMsgType[WorkflowSubmittedToStore](10 seconds).workflowId - storeActor ! RemoveWorkflow(id) - storeActor ! FetchRunnableWorkflows(100) - expectMsgPF(10 seconds) { - case NoNewWorkflowsToStart => // Great - case x => fail(s"Unexpected response from supposedly empty WorkflowStore: $x") - } - } - "remain responsive if you ask to remove a workflow it doesn't have" in { val store = new InMemoryWorkflowStore val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance, database)) - val id = WorkflowId.randomId() - storeActor ! RemoveWorkflow(id) storeActor ! FetchRunnableWorkflows(100) expectMsgPF(10 seconds) { From 44ba3dfb2d61de84b8c84bdf53a9ff3dc20cfc39 Mon Sep 17 00:00:00 2001 From: Adam Struck Date: Mon, 5 Jun 2017 07:43:14 -0700 Subject: [PATCH 04/74] catching up backend to the TES schema (#2293) added google storage support, let TES handle creating the command script, updated funnel commit for centaur tests --- README.md | 13 +- .../backend/validation/CpuValidation.scala | 1 + .../backend/validation/MemoryValidation.scala | 30 ++-- .../validation/RuntimeAttributesValidation.scala | 8 +- src/bin/travis/resources/funnel.conf | 10 ++ src/bin/travis/resources/tes.conf | 19 --- src/bin/travis/resources/tes_centaur.conf | 2 +- src/bin/travis/testCentaurTes.sh | 42 +++-- .../tes/TesAsyncBackendJobExecutionActor.scala | 49 +++--- .../backend/impl/tes/TesBackendFileHashing.scala | 21 --- .../backend/impl/tes/TesInitializationActor.scala | 28 ++++ .../cromwell/backend/impl/tes/TesJobPaths.scala | 24 ++- .../impl/tes/TesResponseJsonFormatter.scala | 26 ++-- .../backend/impl/tes/TesRuntimeAttributes.scala | 87 ++--------- .../scala/cromwell/backend/impl/tes/TesTask.scala | 171 ++++++++++----------- .../impl/tes/TesRuntimeAttributesSpec.scala | 19 ++- 16 files changed, 244 insertions(+), 306 deletions(-) create mode 100644 src/bin/travis/resources/funnel.conf delete mode 100644 src/bin/travis/resources/tes.conf delete mode 100644 supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesBackendFileHashing.scala diff --git a/README.md b/README.md index 1c228c7af..f79ced333 100644 --- a/README.md +++ b/README.md @@ -841,7 +841,7 @@ backend { TES { actor-factory = "cromwell.backend.impl.tes.TesBackendLifecycleActorFactory" config { - endpoint = "https:///v1/jobs" + endpoint = "https:///v1/tasks" root = "cromwell-executions" dockerRoot = "/cromwell-executions" concurrent-job-limit = 1000 @@ -859,16 +859,13 @@ This backend supports the following optional runtime attributes / workflow optio * docker: Docker image to use such as "Ubuntu". * dockerWorkingDir: defines the working directory in the container. -Outputs: -It will use `dockerOutputDir` runtime attribute / workflow option to resolve the folder in which the execution results will placed. If there is no `dockerWorkingDir` defined it will use `/cromwell-executions//call-/execution`. - ### CPU, Memory and Disk This backend supports CPU, memory and disk size configuration through the use of the following runtime attributes / workflow options: -* cpu: defines the amount of CPU to use. Default value: 1. Type: Integer. Ex: 4. -* memory: defines the amount of memory to use. Default value: "2 GB". Type: String. Ex: "4 GB" or "4096 MB" -* disk: defines the amount of disk to use. Default value: "2 GB". Type: String. Ex: "1 GB" or "1024 MB" +* cpu: defines the amount of CPU to use. Type: Integer. Ex: 4. +* memory: defines the amount of memory to use. Type: String. Ex: "4 GB" or "4096 MB" +* disk: defines the amount of disk to use. Type: String. Ex: "1 GB" or "1024 MB" -It they are not set, the TES backend will use default values. +If they are not set, the TES backend may use default values. ## Sun GridEngine Backend diff --git a/backend/src/main/scala/cromwell/backend/validation/CpuValidation.scala b/backend/src/main/scala/cromwell/backend/validation/CpuValidation.scala index 81cce8d0d..0d398a324 100644 --- a/backend/src/main/scala/cromwell/backend/validation/CpuValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/CpuValidation.scala @@ -18,6 +18,7 @@ import wdl4s.values.{WdlInteger, WdlValue} */ object CpuValidation { lazy val instance: RuntimeAttributesValidation[Int] = new CpuValidation + lazy val optional: OptionalRuntimeAttributesValidation[Int] = instance.optional lazy val default: WdlValue = WdlInteger(1) def configDefaultWdlValue(config: Option[Config]): Option[WdlValue] = instance.configDefaultWdlValue(config) } diff --git a/backend/src/main/scala/cromwell/backend/validation/MemoryValidation.scala b/backend/src/main/scala/cromwell/backend/validation/MemoryValidation.scala index bc0fb32da..142e541d9 100644 --- a/backend/src/main/scala/cromwell/backend/validation/MemoryValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/MemoryValidation.scala @@ -39,31 +39,31 @@ object MemoryValidation { } private[validation] val wrongAmountFormat = - s"Expecting ${RuntimeAttributesKeys.MemoryKey} runtime attribute value greater than 0 but got %s" + "Expecting %s runtime attribute value greater than 0 but got %s" private[validation] val wrongTypeFormat = - s"Expecting ${RuntimeAttributesKeys.MemoryKey} runtime attribute to be an Integer or String with format '8 GB'." + - s" Exception: %s" + "Expecting %s runtime attribute to be an Integer or String with format '8 GB'." + + " Exception: %s" - private[validation] def validateMemoryString(wdlString: WdlString): ErrorOr[MemorySize] = - validateMemoryString(wdlString.value) + private[validation] def validateMemoryString(attributeName: String, wdlString: WdlString): ErrorOr[MemorySize] = + validateMemoryString(attributeName, wdlString.value) - private[validation] def validateMemoryString(value: String): ErrorOr[MemorySize] = { + private[validation] def validateMemoryString(attributeName: String, value: String): ErrorOr[MemorySize] = { MemorySize.parse(value) match { case scala.util.Success(memorySize: MemorySize) if memorySize.amount > 0 => memorySize.to(MemoryUnit.GB).validNel case scala.util.Success(memorySize: MemorySize) => - wrongAmountFormat.format(memorySize.amount).invalidNel + wrongAmountFormat.format(attributeName, memorySize.amount).invalidNel case scala.util.Failure(throwable) => - wrongTypeFormat.format(throwable.getMessage).invalidNel + wrongTypeFormat.format(attributeName, throwable.getMessage).invalidNel } } - private[validation] def validateMemoryInteger(wdlInteger: WdlInteger): ErrorOr[MemorySize] = - validateMemoryInteger(wdlInteger.value) + private[validation] def validateMemoryInteger(attributeName: String, wdlInteger: WdlInteger): ErrorOr[MemorySize] = + validateMemoryInteger(attributeName, wdlInteger.value) - private[validation] def validateMemoryInteger(value: Int): ErrorOr[MemorySize] = { + private[validation] def validateMemoryInteger(attributeName: String, value: Int): ErrorOr[MemorySize] = { if (value <= 0) - wrongAmountFormat.format(value).invalidNel + wrongAmountFormat.format(attributeName, value).invalidNel else MemorySize(value.toDouble, MemoryUnit.Bytes).to(MemoryUnit.GB).validNel } @@ -78,9 +78,9 @@ class MemoryValidation(attributeName: String = RuntimeAttributesKeys.MemoryKey) override def coercion = Seq(WdlIntegerType, WdlStringType) override protected def validateValue: PartialFunction[WdlValue, ErrorOr[MemorySize]] = { - case WdlInteger(value) => MemoryValidation.validateMemoryInteger(value) - case WdlString(value) => MemoryValidation.validateMemoryString(value) + case WdlInteger(value) => MemoryValidation.validateMemoryInteger(key, value) + case WdlString(value) => MemoryValidation.validateMemoryString(key, value) } - override def missingValueMessage: String = wrongTypeFormat.format("Not supported WDL type value") + override def missingValueMessage: String = wrongTypeFormat.format(key, "Not supported WDL type value") } diff --git a/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesValidation.scala b/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesValidation.scala index 6635747a1..907cf6ea9 100644 --- a/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesValidation.scala +++ b/backend/src/main/scala/cromwell/backend/validation/RuntimeAttributesValidation.scala @@ -65,12 +65,12 @@ object RuntimeAttributesValidation { } } - def parseMemoryString(s: WdlString): ErrorOr[MemorySize] = { - MemoryValidation.validateMemoryString(s) + def parseMemoryString(k: String, s: WdlString): ErrorOr[MemorySize] = { + MemoryValidation.validateMemoryString(k, s) } - def parseMemoryInteger(i: WdlInteger): ErrorOr[MemorySize] = { - MemoryValidation.validateMemoryInteger(i) + def parseMemoryInteger(k: String, i: WdlInteger): ErrorOr[MemorySize] = { + MemoryValidation.validateMemoryInteger(k, i) } def withDefault[ValidatedType](validation: RuntimeAttributesValidation[ValidatedType], diff --git a/src/bin/travis/resources/funnel.conf b/src/bin/travis/resources/funnel.conf new file mode 100644 index 000000000..08dbab623 --- /dev/null +++ b/src/bin/travis/resources/funnel.conf @@ -0,0 +1,10 @@ +HttpPort: 9000 +Storage: + - Local: + AllowedDirs: + - /home/ + - /cromwell-executions + - /tmp/ +DBPath: /tmp/tes_task.db +Scheduler: local +LogLevel: info diff --git a/src/bin/travis/resources/tes.conf b/src/bin/travis/resources/tes.conf deleted file mode 100644 index f1a0ff157..000000000 --- a/src/bin/travis/resources/tes.conf +++ /dev/null @@ -1,19 +0,0 @@ -HttpPort: 9000 -LogLevel: info -Storage: - - Local: - AllowedDirs: - - /home/ - - /cromwell-executions - - /tmp/ -DBPath: /tmp/tes_task.db -Scheduler: local -Worker: - LogLevel: info - Timeout: -1 - # Funnel (TES implementation) respects resource reqs - # Defaults 1 cpu 2 GB ram would make centaur take too long to run - Resources: - cpus: 100 - ram: 200 - disk: 1000 diff --git a/src/bin/travis/resources/tes_centaur.conf b/src/bin/travis/resources/tes_centaur.conf index 4254c1cb4..32ddd27ae 100644 --- a/src/bin/travis/resources/tes_centaur.conf +++ b/src/bin/travis/resources/tes_centaur.conf @@ -25,7 +25,7 @@ backend { config { root = "cromwell-executions" dockerRoot = "/cromwell-executions" - endpoint = "http://127.0.0.1:9000/v1/jobs" + endpoint = "http://127.0.0.1:9000/v1/tasks" } } } diff --git a/src/bin/travis/testCentaurTes.sh b/src/bin/travis/testCentaurTes.sh index 3775e27ed..f2976a758 100755 --- a/src/bin/travis/testCentaurTes.sh +++ b/src/bin/travis/testCentaurTes.sh @@ -16,8 +16,8 @@ killTravisHeartbeat() { } exitScript() { - echo "TES LOG" - cat logs/tes.log + echo "FUNNEL LOG" + cat logs/funnel.log echo "CROMWELL LOG" cat logs/cromwell.log echo "CENTAUR LOG" @@ -31,26 +31,48 @@ printTravisHeartbeat set -x set -e +WORKDIR=$(pwd) + sbt assembly CROMWELL_JAR=$(find "$(pwd)/target/scala-2.11" -name "cromwell-*.jar") TES_CENTAUR_CONF="$(pwd)/src/bin/travis/resources/tes_centaur.conf" git clone https://github.com/broadinstitute/centaur.git cd centaur git checkout ${CENTAUR_BRANCH} -cd .. +cd $WORKDIR + -TES_CONF="$(pwd)/src/bin/travis/resources/tes.conf" -git clone https://github.com/ohsu-comp-bio/funnel.git -cd funnel -git checkout 7adbf0b +FUNNEL_CONF="$(pwd)/src/bin/travis/resources/funnel.conf" +wget https://storage.googleapis.com/golang/go1.8.1.linux-amd64.tar.gz +tar xfz go1.8.1.linux-amd64.tar.gz +export GOROOT=$WORKDIR/go +mkdir go-lib +export GOPATH=$WORKDIR/go-lib +go get github.com/ohsu-comp-bio/funnel +cd $GOPATH/src/github.com/ohsu-comp-bio/funnel +git checkout 04c5e03 make -cd .. +cd $WORKDIR mkdir logs -nohup funnel/bin/tes-server -config ${TES_CONF} > logs/tes.log 2>&1 & +nohup $GOPATH/bin/funnel server --config ${FUNNEL_CONF} > logs/funnel.log 2>&1 & # All tests use ubuntu:latest - make sure it's there before starting the tests # because pulling the image during some of the tests would cause them to fail # (specifically output_redirection which expects a specific value in stderr) docker pull ubuntu:latest -centaur/test_cromwell.sh -j"${CROMWELL_JAR}" -c${TES_CENTAUR_CONF} -elocaldockertest + +# The following tests are skipped: +# +# non_root_specified_user: TES doesn't support switching users in the image +# write_lines_files: all inputs are read-only in TES +# lots_of_inputs: Funnel mounts in each input separately, this task surpasses the docker limit for volumes +# call_cache_capoeira_local: fails on task 'read_files_without_docker' since the 'docker' runtime key is required for this backend +# +centaur/test_cromwell.sh \ +-j ${CROMWELL_JAR} \ +-c ${TES_CENTAUR_CONF} \ +-e non_root_specified_user \ +-e write_lines_files \ +-e lots_of_inputs \ +-e call_cache_capoeira_local diff --git a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala index cfc3a4df9..723b781b1 100644 --- a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala @@ -92,20 +92,24 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn } } - def createTaskMessage(): TesTaskMessage = { - val task = TesTask(jobDescriptor, configurationDescriptor, jobLogger, tesJobPaths, runtimeAttributes, commandDirectory, - backendEngineFunctions, realDockerImageUsed) - - tesJobPaths.script.write(commandScriptContents) - - TesTaskMessage( + def createTaskMessage(): Task = { + val task = TesTask(jobDescriptor, configurationDescriptor, jobLogger, tesJobPaths, + runtimeAttributes, commandDirectory, commandScriptContents, backendEngineFunctions, + realDockerImageUsed) + + Task( + None, + None, Option(task.name), Option(task.description), Option(task.project), Option(task.inputs(commandLineValueMapper)), Option(task.outputs), - task.resources, - task.dockerExecutor + Option(task.resources), + task.executors, + None, + None, + None ) } @@ -114,12 +118,12 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn tesJobPaths.callExecutionRoot.createPermissionedDirectories() val taskMessage = createTaskMessage() - val submitTask = pipeline[TesPostResponse] + val submitTask = pipeline[CreateTaskResponse] .apply(Post(tesEndpoint, taskMessage)) submitTask.map { response => - val jobID = response.value + val jobID = response.id PendingExecutionHandle(jobDescriptor, StandardAsyncJob(jobID), None, previousStatus = None) } } @@ -136,8 +140,8 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn returnCodeTmp.delete(true) } - val abortRequest = pipeline[TesGetResponse] - .apply(Delete(s"$tesEndpoint/${job.jobId}")) + val abortRequest = pipeline[CancelTaskResponse] + .apply(Post(s"$tesEndpoint/${job.jobId}:cancel")) abortRequest onComplete { case Success(_) => jobLogger.info("{} Aborted {}", tag: Any, job.jobId) case Failure(ex) => jobLogger.warn("{} Failed to abort {}: {}", tag, job.jobId, ex.getMessage) @@ -146,21 +150,21 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn } override def pollStatusAsync(handle: StandardAsyncPendingExecutionHandle)(implicit ec: ExecutionContext): Future[TesRunStatus] = { - val pollTask = pipeline[TesGetResponse].apply(Get(s"$tesEndpoint/${handle.pendingJob.jobId}")) + val pollTask = pipeline[MinimalTaskView].apply(Get(s"$tesEndpoint/${handle.pendingJob.jobId}?view=MINIMAL")) pollTask.map { response => val state = response.state state match { - case s if s.contains("Complete") => + case s if s.contains("COMPLETE") => jobLogger.info(s"Job ${handle.pendingJob.jobId} is complete") Complete - case s if s.contains("Cancel") => + case s if s.contains("CANCELED") => jobLogger.info(s"Job ${handle.pendingJob.jobId} was canceled") FailedOrError - case s if s.contains("Error") => + case s if s.contains("ERROR") => jobLogger.info(s"TES reported an error for Job ${handle.pendingJob.jobId}") FailedOrError @@ -185,21 +189,18 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn case _ => false } } - - private def hostAbsoluteFilePath(wdlFile: WdlFile): Path = { - tesJobPaths.callExecutionRoot.resolve(wdlFile.value) - } private val outputWdlFiles: Seq[WdlFile] = jobDescriptor.call.task .findOutputFiles(jobDescriptor.fullyQualifiedInputs, NoFunctions) .filter(o => !DefaultPathBuilder.get(o.valueString).isAbsolute) override def mapOutputWdlFile(wdlFile: WdlFile): WdlFile = { + val absPath: Path = tesJobPaths.callExecutionRoot.resolve(wdlFile.valueString) wdlFile match { - case fileNotFound if !hostAbsoluteFilePath(fileNotFound).exists && outputWdlFiles.contains(fileNotFound) => + case fileNotFound if !absPath.exists && outputWdlFiles.contains(fileNotFound) => throw new RuntimeException("Could not process output, file not found: " + - s"${hostAbsoluteFilePath(wdlFile).pathAsString}") - case _ => WdlFile(hostAbsoluteFilePath(wdlFile).pathAsString) + s"${absPath.pathAsString}") + case _ => WdlFile(absPath.pathAsString) } } } diff --git a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesBackendFileHashing.scala b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesBackendFileHashing.scala deleted file mode 100644 index 8fcb1f055..000000000 --- a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesBackendFileHashing.scala +++ /dev/null @@ -1,21 +0,0 @@ -package cromwell.backend.impl.tes - -import akka.event.LoggingAdapter -import cromwell.backend.standard.callcaching.StandardFileHashingActor.SingleFileHashRequest -import cromwell.core.path.DefaultPathBuilder -import cromwell.util.TryWithResource._ - -import scala.language.postfixOps -import scala.util.Try - -private[tes] object TesBackendFileHashing { - def getMd5Result(request: SingleFileHashRequest, log: LoggingAdapter): Try[String] = { - val path = DefaultPathBuilder.build(request.file.valueString) recover { - case failure => throw new RuntimeException("Failed to construct path to hash", failure) - } get - - tryWithResource(() => path.newInputStream) { inputStream => - org.apache.commons.codec.digest.DigestUtils.md5Hex(inputStream) - } - } -} diff --git a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesInitializationActor.scala b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesInitializationActor.scala index 7da53c5c3..7ee1efdf6 100644 --- a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesInitializationActor.scala +++ b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesInitializationActor.scala @@ -1,9 +1,14 @@ package cromwell.backend.impl.tes import akka.actor.ActorRef +import cats.data.Validated.{Invalid, Valid} import cromwell.backend.standard._ import cromwell.backend.{BackendConfigurationDescriptor, BackendInitializationData, BackendWorkflowDescriptor} +import cromwell.core.path.{DefaultPathBuilderFactory, PathBuilder, PathBuilderFactory} +import cromwell.filesystems.gcs.{GcsPathBuilderFactory, GoogleConfiguration} +import lenthall.exception.MessageAggregation import wdl4s.TaskCall +import net.ceedubs.ficus.Ficus._ import scala.concurrent.Future import scala.util.Try @@ -23,6 +28,29 @@ class TesInitializationActor(params: TesInitializationActorParams) private val tesConfiguration = params.tesConfiguration + /** + * If the backend sets a gcs authentication mode, try to create a PathBuilderFactory with it. + */ + lazy val gcsPathBuilderFactory: Option[GcsPathBuilderFactory] = { + configurationDescriptor.backendConfig.as[Option[String]]("filesystems.gcs.auth") map { configAuth => + val googleConfiguration = GoogleConfiguration(configurationDescriptor.globalConfig) + googleConfiguration.auth(configAuth) match { + case Valid(auth) => GcsPathBuilderFactory(auth, googleConfiguration.applicationName) + case Invalid(error) => throw new MessageAggregation { + override def exceptionContext: String = "Failed to parse gcs auth configuration" + + override def errorMessages: Traversable[String] = error.toList + } + } + } + } + + lazy val pathBuilderFactories: List[PathBuilderFactory] = + List(gcsPathBuilderFactory, Option(DefaultPathBuilderFactory)).flatten + + override lazy val pathBuilders: List[PathBuilder] = + pathBuilderFactories map { _.withOptions(workflowDescriptor.workflowOptions)(context.system) } + override lazy val workflowPaths: TesWorkflowPaths = new TesWorkflowPaths(workflowDescriptor, tesConfiguration.configurationDescriptor.backendConfig, pathBuilders) diff --git a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesJobPaths.scala b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesJobPaths.scala index 2091c85c0..b270d0864 100644 --- a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesJobPaths.scala +++ b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesJobPaths.scala @@ -3,7 +3,7 @@ package cromwell.backend.impl.tes import com.typesafe.config.Config import cromwell.backend.{BackendJobDescriptorKey, BackendWorkflowDescriptor} import cromwell.backend.io.{JobPaths, WorkflowPaths} -import cromwell.core.path.{DefaultPathBuilder, Path, PathBuilder} +import cromwell.core.path._ object TesJobPaths { def apply(jobKey: BackendJobDescriptorKey, @@ -28,18 +28,13 @@ case class TesJobPaths private[tes] (override val workflowPaths: TesWorkflowPath val callInputsDockerRoot = callDockerRoot.resolve("inputs") val callInputsRoot = callRoot.resolve("inputs") - //TODO move to TesConfiguration - private def prefixScheme(path: String): String = "file://" + path - - def storageInput(path: String): String = prefixScheme(path) - // Given an output path, return a path localized to the storage file system def storageOutput(path: String): String = { - prefixScheme(callExecutionRoot.resolve(path).toString) + callExecutionRoot.resolve(path).toString } def containerInput(path: String): String = { - cleanContainerInputPath(callInputsDockerRoot, DefaultPathBuilder.get(path)) + cleanContainerInputPath(callInputsDockerRoot, path) } // Given an output path, return a path localized to the container file system @@ -53,12 +48,15 @@ case class TesJobPaths private[tes] (override val workflowPaths: TesWorkflowPath cwd.resolve(path).toString } - private def cleanContainerInputPath(inputDir: Path, path: Path): String = { - path.toAbsolutePath match { - case p if p.startsWith(callExecutionRoot) => - callExecutionDockerRoot.resolve(p.getFileName.toString).toString + private def cleanContainerInputPath(inputDir: Path, path: String): String = { + path match { + case p if p.startsWith("gs:") => + inputDir.resolve(p.replaceFirst("gs:/?/?", "")).pathAsString + case p if p.startsWith(callExecutionRoot.pathAsString) => + val f = DefaultPathBuilder.get(p) + callExecutionDockerRoot.resolve(f.name).pathAsString case p => - inputDir + p.toString + inputDir.resolve(p).pathAsString } } } diff --git a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesResponseJsonFormatter.scala b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesResponseJsonFormatter.scala index 111f7e198..79a2be702 100644 --- a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesResponseJsonFormatter.scala +++ b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesResponseJsonFormatter.scala @@ -2,24 +2,20 @@ package cromwell.backend.impl.tes import spray.json._ -final case class TesPostResponse(value: String) - -final case class TesGetResponse( - jobID: String, - task: TesTaskMessage, - state: String, - logs: Option[Seq[JobLogs]] - ) - +final case class CreateTaskResponse(id: String) +final case class MinimalTaskView(id: String, state: String) +final case class CancelTaskResponse() object TesResponseJsonFormatter extends DefaultJsonProtocol { - implicit val volumeFormat = jsonFormat5(Volume) implicit val resourcesFormat = jsonFormat5(Resources) implicit val taskParameterFormat = jsonFormat6(TaskParameter) implicit val portsFormat = jsonFormat2(Ports) - implicit val dockerExecutorFormat = jsonFormat7(DockerExecutor) - implicit val tesTaskMessageFormat = jsonFormat7(TesTaskMessage) - implicit val jobLogsFormat = jsonFormat8(JobLogs) - implicit val tesPostResponseFormat = jsonFormat1(TesPostResponse) - implicit val tesGetResponseFormat = jsonFormat4(TesGetResponse) + implicit val executorFormat = jsonFormat8(Executor) + implicit val executorLogFormat = jsonFormat7(ExecutorLog) + implicit val outputFileLogFormat = jsonFormat3(OutputFileLog) + implicit val taskLogFormat = jsonFormat5(TaskLog) + implicit val taskFormat = jsonFormat12(Task) + implicit val minimalTaskView = jsonFormat2(MinimalTaskView) + implicit val createTaskResponseFormat = jsonFormat1(CreateTaskResponse) + implicit val cancelTaskResponseFormat = jsonFormat0(CancelTaskResponse) } diff --git a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesRuntimeAttributes.scala b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesRuntimeAttributes.scala index 2d28e6212..dfd8747b9 100644 --- a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesRuntimeAttributes.scala +++ b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesRuntimeAttributes.scala @@ -6,43 +6,30 @@ import cromwell.backend.MemorySize import cromwell.backend.standard.StandardValidatedRuntimeAttributesBuilder import cromwell.backend.validation._ import lenthall.validation.ErrorOr.ErrorOr -import wdl4s.parser.MemoryUnit -import wdl4s.values.{WdlInteger, WdlString, WdlValue} - -import scala.util.{Failure, Success} +import wdl4s.values.{WdlString, WdlValue} case class TesRuntimeAttributes(continueOnReturnCode: ContinueOnReturnCode, dockerImage: String, dockerWorkingDir: Option[String], failOnStderr: Boolean, - cpu: Int, - memory: MemorySize, - disk: MemorySize) + cpu: Option[Int], + memory: Option[MemorySize], + disk: Option[MemorySize]) object TesRuntimeAttributes { val DockerWorkingDirKey = "dockerWorkingDir" - - private val MemoryDefaultValue = "2 GB" - val DiskSizeKey = "disk" - private val DiskSizeDefaultValue = "2 GB" - private def cpuValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[Int] = CpuValidation.instance - .withDefault(CpuValidation.configDefaultWdlValue(runtimeConfig) getOrElse CpuValidation.default) + private def cpuValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[Int] = CpuValidation.optional private def failOnStderrValidation(runtimeConfig: Option[Config]) = FailOnStderrValidation.default(runtimeConfig) private def continueOnReturnCodeValidation(runtimeConfig: Option[Config]) = ContinueOnReturnCodeValidation.default(runtimeConfig) - private def diskSizeValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[MemorySize] = DiskSizeValidation - .withDefaultDiskSize(DiskSizeValidation.configDefaultString(runtimeConfig) getOrElse DiskSizeDefaultValue) + private def diskSizeValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[MemorySize] = MemoryValidation.optional(DiskSizeKey) - private def memoryValidation(runtimeConfig: Option[Config]): RuntimeAttributesValidation[MemorySize] = { - MemoryValidation.withDefaultMemory( - RuntimeAttributesKeys.MemoryKey, - MemoryValidation.configDefaultString(RuntimeAttributesKeys.MemoryKey, runtimeConfig) getOrElse MemoryDefaultValue) - } + private def memoryValidation(runtimeConfig: Option[Config]): OptionalRuntimeAttributesValidation[MemorySize] = MemoryValidation.optional(RuntimeAttributesKeys.MemoryKey) private val dockerValidation: RuntimeAttributesValidation[String] = DockerValidation.instance @@ -60,9 +47,9 @@ object TesRuntimeAttributes { def apply(validatedRuntimeAttributes: ValidatedRuntimeAttributes, backendRuntimeConfig: Option[Config]): TesRuntimeAttributes = { val docker: String = RuntimeAttributesValidation.extract(dockerValidation, validatedRuntimeAttributes) val dockerWorkingDir: Option[String] = RuntimeAttributesValidation.extractOption(dockerWorkingDirValidation.key, validatedRuntimeAttributes) - val cpu: Int = RuntimeAttributesValidation.extract(cpuValidation(backendRuntimeConfig), validatedRuntimeAttributes) - val memory: MemorySize = RuntimeAttributesValidation.extract(memoryValidation(backendRuntimeConfig), validatedRuntimeAttributes) - val disk: MemorySize = RuntimeAttributesValidation.extract(diskSizeValidation(backendRuntimeConfig), validatedRuntimeAttributes) + val cpu: Option[Int] = RuntimeAttributesValidation.extractOption(cpuValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) + val memory: Option[MemorySize] = RuntimeAttributesValidation.extractOption(memoryValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) + val disk: Option[MemorySize] = RuntimeAttributesValidation.extractOption(diskSizeValidation(backendRuntimeConfig).key, validatedRuntimeAttributes) val failOnStderr: Boolean = RuntimeAttributesValidation.extract(failOnStderrValidation(backendRuntimeConfig), validatedRuntimeAttributes) val continueOnReturnCode: ContinueOnReturnCode = @@ -92,57 +79,3 @@ class DockerWorkingDirValidation extends StringRuntimeAttributesValidation(TesRu } } -object DiskSizeValidation { - lazy val instance: RuntimeAttributesValidation[MemorySize] = new DiskSizeValidation - lazy val optional: OptionalRuntimeAttributesValidation[MemorySize] = instance.optional - def configDefaultString(runtimeConfig: Option[Config]): Option[String] = instance.configDefaultValue(runtimeConfig) - def withDefaultDiskSize(memorySize: String): RuntimeAttributesValidation[MemorySize] = { - MemorySize.parse(memorySize) match { - case Success(memory) => instance.withDefault(WdlInteger(memory.bytes.toInt)) - case Failure(_) => instance.withDefault(BadDefaultAttribute(WdlString(memorySize.toString))) - } - } - - private val wrongAmountFormat = - s"Expecting ${TesRuntimeAttributes.DiskSizeKey} runtime attribute value greater than 0 but got %s" - private val wrongTypeFormat = - s"Expecting ${TesRuntimeAttributes.DiskSizeKey} runtime attribute to be an Integer or String with format '8 GB'." + - s" Exception: %s" - - def validateDiskSizeString(wdlString: WdlString): ErrorOr[MemorySize] = - validateDiskSizeString(wdlString.value) - - def validateDiskSizeString(value: String): ErrorOr[MemorySize] = { - MemorySize.parse(value) match { - case scala.util.Success(memorySize: MemorySize) if memorySize.amount > 0 => - memorySize.to(MemoryUnit.GB).validNel - case scala.util.Success(memorySize: MemorySize) => - wrongAmountFormat.format(memorySize.amount).invalidNel - case scala.util.Failure(throwable) => - wrongTypeFormat.format(throwable.getMessage).invalidNel - } - } - - def validateDiskSizeInteger(wdlInteger: WdlInteger): ErrorOr[MemorySize] = - validateDiskSizeInteger(wdlInteger.value) - - def validateDiskSizeInteger(value: Int): ErrorOr[MemorySize] = { - if (value <= 0) - wrongAmountFormat.format(value).invalidNel - else - MemorySize(value.toDouble, MemoryUnit.Bytes).to(MemoryUnit.GB).validNel - } -} - -class DiskSizeValidation extends MemoryValidation { - override def key = TesRuntimeAttributes.DiskSizeKey - - override protected def validateValue: PartialFunction[WdlValue, ErrorOr[MemorySize]] = { - case WdlInteger(value) => DiskSizeValidation.validateDiskSizeInteger(value) - case WdlString(value) => DiskSizeValidation.validateDiskSizeString(value) - } - - override def missingValueMessage: String = DiskSizeValidation.wrongTypeFormat.format("Not supported WDL type value") -} - - diff --git a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala index 6aa535eb6..17e3fb037 100644 --- a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala +++ b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala @@ -15,6 +15,7 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor, tesPaths: TesJobPaths, runtimeAttributes: TesRuntimeAttributes, containerWorkDir: Path, + commandScriptContents: String, backendEngineFunctions: StandardExpressionFunctions, dockerImageUsed: String) { @@ -26,17 +27,22 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor, // TODO validate "project" field of workflowOptions val project = { - workflowDescriptor.workflowOptions.getOrElse("project", workflowName) + workflowDescriptor.workflowOptions.getOrElse("project", "") } // contains the script to be executed private val commandScript = TaskParameter( Option("commandScript"), Option(fullyQualifiedTaskName + ".commandScript"), - tesPaths.storageInput(tesPaths.script.toString), + None, tesPaths.callExecutionDockerRoot.resolve("script").toString, - "File", - Option(false) + Option("FILE"), + Option(commandScriptContents) + ) + + private val commandScriptOut = commandScript.copy( + url = Option(tesPaths.script.toString), + contents = None ) private def writeFunctionFiles(commandLineValueMapper: WdlValue => WdlValue): Map[FullyQualifiedName, Seq[WdlFile]] = { @@ -65,10 +71,10 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor, case (f, index) => TaskParameter( Option(fullyQualifiedName + "." + index), Option(workflowName + "." + fullyQualifiedName + "." + index), - tesPaths.storageInput(f.value), + Option(f.value), tesPaths.containerInput(f.value), - "File", - Option(false) + Option("FILE"), + None ) } }.toList ++ Seq(commandScript) @@ -79,10 +85,10 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor, TaskParameter( Option(f), Option(fullyQualifiedTaskName + "." + f), - tesPaths.storageOutput(f), + Option(tesPaths.storageOutput(f)), tesPaths.containerOutput(containerWorkDir, f), - "File", - Option(false) + Option("FILE"), + None ) } @@ -109,10 +115,10 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor, TaskParameter( Option(fullyQualifiedTaskName + ".output." + index), Option(fullyQualifiedTaskName + ".output." + index), - tesPaths.storageOutput(outputFile), + Option(tesPaths.storageOutput(outputFile)), tesPaths.containerOutput(containerWorkDir, outputFile), - "File", - Option(false) + Option("FILE"), + None ) ) case (g: WdlGlobFile, index) => @@ -125,117 +131,104 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor, TaskParameter( Option(globDirName), Option(fullyQualifiedTaskName + "." + globDirName), - tesPaths.storageOutput(globDirectory), + Option(tesPaths.storageOutput(globDirectory)), tesPaths.containerOutput(containerWorkDir, globDirectory), - "Directory", - Option(false) + Option("DIRECTORY"), + None ), TaskParameter( Option(globListName), Option(fullyQualifiedTaskName + "." + globListName), - tesPaths.storageOutput(globListFile), + Option(tesPaths.storageOutput(globListFile)), tesPaths.containerOutput(containerWorkDir, globListFile), - "File", - Option(false) + Option("FILE"), + None ) ) } - val outputs: Seq[TaskParameter] = wdlOutputs ++ standardOutputs - - // TODO all volumes currently get the same disk requirements - private val workingDirVolume = runtimeAttributes - .dockerWorkingDir - .map(path => Volume( - Option(path), - runtimeAttributes.disk.to(MemoryUnit.GB).amount.toInt, - None, - path, - Option(false) - )) - - val volumes = Seq( - Volume( - Option(tesPaths.callInputsDockerRoot.toString), - runtimeAttributes.disk.to(MemoryUnit.GB).amount.toInt, - None, - tesPaths.callInputsDockerRoot.toString, - // inputs in read-only volume - Option(true) - ), - Volume( - Option(tesPaths.callExecutionDockerRoot.toString), - runtimeAttributes.disk.to(MemoryUnit.GB).amount.toInt, - None, - tesPaths.callExecutionDockerRoot.toString, - Option(false) - ) - ) ++ workingDirVolume + val outputs: Seq[TaskParameter] = wdlOutputs ++ standardOutputs ++ Seq(commandScriptOut) + + private val disk :: ram :: _ = Seq(runtimeAttributes.disk, runtimeAttributes.memory) map { + case Some(x) => + Option(x.to(MemoryUnit.GB).amount) + case None => + None + } val resources = Resources( runtimeAttributes.cpu, - runtimeAttributes.memory.to(MemoryUnit.GB).amount.toInt, + ram, + disk, Option(false), - volumes, None ) - val dockerExecutor = Seq(DockerExecutor( + val executors = Seq(Executor( dockerImageUsed, Seq("/bin/bash", commandScript.path), runtimeAttributes.dockerWorkingDir, Option(tesPaths.containerOutput(containerWorkDir, "stdout")), Option(tesPaths.containerOutput(containerWorkDir, "stderr")), None, + None, None )) } // Field requirements in classes below based off GA4GH schema - -final case class TesTaskMessage(name: Option[String], - description: Option[String], - projectId: Option[String], - inputs: Option[Seq[TaskParameter]], - outputs: Option[Seq[TaskParameter]], - resources: Resources, - docker: Seq[DockerExecutor]) - -final case class DockerExecutor(imageName: String, - cmd: Seq[String], - workdir: Option[String], - stdout: Option[String], - stderr: Option[String], - stdin: Option[String], - ports: Option[Seq[Ports]]) +final case class Task(id: Option[String], + state: Option[String], + name: Option[String], + description: Option[String], + project: Option[String], + inputs: Option[Seq[TaskParameter]], + outputs: Option[Seq[TaskParameter]], + resources: Option[Resources], + executors: Seq[Executor], + volumes: Option[Seq[String]], + tags: Option[Map[String, String]], + logs: Option[Seq[TaskLog]]) + +final case class Executor(image_name: String, + cmd: Seq[String], + workdir: Option[String], + stdout: Option[String], + stderr: Option[String], + stdin: Option[String], + environ: Option[Map[String, String]], + ports: Option[Seq[Ports]]) final case class TaskParameter(name: Option[String], description: Option[String], - location: String, + url: Option[String], path: String, - `class`: String, - create: Option[Boolean]) + `type`: Option[String], + contents: Option[String]) -final case class Resources(minimumCpuCores: Int, - minimumRamGb: Int, +final case class Resources(cpu_cores: Option[Int], + ram_gb: Option[Double], + size_gb: Option[Double], preemptible: Option[Boolean], - volumes: Seq[Volume], zones: Option[Seq[String]]) -final case class Volume(name: Option[String], - sizeGb: Int, - source: Option[String], - mountPoint: String, - readonly: Option[Boolean]) - -final case class JobLogs(cmd: Option[Seq[String]], - startTime: Option[String], - endTime: Option[String], - stdout: Option[String], - stderr: Option[String], - exitCode: Option[Int], - hostIP: Option[String], - ports: Option[Seq[Ports]]) +final case class OutputFileLog(url: String, + path: String, + size_bytes: Int) + +final case class TaskLog(start_time: Option[String], + end_time: Option[String], + metadata: Option[Map[String, String]], + logs: Option[Seq[ExecutorLog]], + outputs: Option[Seq[OutputFileLog]]) + +final case class ExecutorLog(start_time: Option[String], + end_time: Option[String], + stdout: Option[String], + stderr: Option[String], + exit_code: Option[Int], + host_ip: Option[String], + ports: Option[Seq[Ports]]) final case class Ports(host: Option[String], - container: String) \ No newline at end of file + container: String) diff --git a/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesRuntimeAttributesSpec.scala b/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesRuntimeAttributesSpec.scala index ee21eef66..09a76fd86 100644 --- a/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesRuntimeAttributesSpec.scala +++ b/supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesRuntimeAttributesSpec.scala @@ -6,7 +6,6 @@ import cromwell.core.WorkflowOptions import org.scalatest.{Matchers, WordSpecLike} import org.slf4j.helpers.NOPLogger import spray.json._ -import wdl4s.parser.MemoryUnit import wdl4s.types.{WdlArrayType, WdlIntegerType, WdlStringType} import wdl4s.values.{WdlArray, WdlBoolean, WdlInteger, WdlString, WdlValue} @@ -17,9 +16,9 @@ class TesRuntimeAttributesSpec extends WordSpecLike with Matchers { "ubuntu:latest", None, false, - 1, - MemorySize(2, MemoryUnit.GB), - MemorySize(2, MemoryUnit.GB) + None, + None, + None ) val expectedDefaultsPlusUbuntuDocker = expectedDefaults.copy(dockerImage = "ubuntu:latest") @@ -84,12 +83,12 @@ class TesRuntimeAttributesSpec extends WordSpecLike with Matchers { "validate a valid cpu entry" in assertSuccess( Map("docker" -> WdlString("ubuntu:latest"), "cpu" -> WdlInteger(2)), - expectedDefaultsPlusUbuntuDocker.copy(cpu = 2) + expectedDefaultsPlusUbuntuDocker.copy(cpu = Option(2)) ) "validate a valid cpu string entry" in { val runtimeAttributes = Map("docker" -> WdlString("ubuntu:latest"), "cpu" -> WdlString("2")) - val expectedRuntimeAttributes = expectedDefaultsPlusUbuntuDocker.copy(cpu = 2) + val expectedRuntimeAttributes = expectedDefaultsPlusUbuntuDocker.copy(cpu = Option(2)) assertSuccess(runtimeAttributes, expectedRuntimeAttributes) } @@ -100,7 +99,7 @@ class TesRuntimeAttributesSpec extends WordSpecLike with Matchers { "validate a valid memory entry" in { val runtimeAttributes = Map("docker" -> WdlString("ubuntu:latest"), "memory" -> WdlString("1 GB")) - val expectedRuntimeAttributes = expectedDefaults.copy(memory = MemorySize.parse("1 GB").get) + val expectedRuntimeAttributes = expectedDefaults.copy(memory = Option(MemorySize.parse("1 GB").get)) assertSuccess(runtimeAttributes, expectedRuntimeAttributes) } @@ -109,13 +108,13 @@ class TesRuntimeAttributesSpec extends WordSpecLike with Matchers { assertFailure(runtimeAttributes, "Expecting memory runtime attribute to be an Integer or String with format '8 GB'") } - "validate a valid disks entry" in { + "validate a valid disk entry" in { val runtimeAttributes = Map("docker" -> WdlString("ubuntu:latest"), "disk" -> WdlString("1 GB")) - val expectedRuntimeAttributes = expectedDefaults.copy(disk = MemorySize.parse("1 GB").get) + val expectedRuntimeAttributes = expectedDefaults.copy(disk = Option(MemorySize.parse("1 GB").get)) assertSuccess(runtimeAttributes, expectedRuntimeAttributes) } - "fail to validate an invalid disks entry" in { + "fail to validate an invalid disk entry" in { val runtimeAttributes = Map("docker" -> WdlString("ubuntu:latest"), "disk" -> WdlString("blah")) assertFailure(runtimeAttributes, "Expecting disk runtime attribute to be an Integer or String with format '8 GB'") } From 6fbeadce7def9968a688f36a893e2c53cf3edd16 Mon Sep 17 00:00:00 2001 From: Thib Date: Tue, 6 Jun 2017 17:57:01 -0400 Subject: [PATCH 05/74] Add retries to credential validation (#2314) * works? * PR comments * rebase * oops --- .../standard/StandardInitializationActor.scala | 12 +-- .../core/path/DefaultPathBuilderFactory.scala | 4 +- .../cromwell/core/path/PathBuilderFactory.scala | 4 +- .../scala/cromwell/engine/EngineFilesystems.scala | 18 ++-- .../cromwell/engine/workflow/WorkflowActor.scala | 9 +- .../MaterializeWorkflowDescriptorActor.scala | 89 ++++++++++++------ .../cromwell/engine/io/IoActorGcsBatchSpec.scala | 4 +- .../cromwell/filesystems/gcs/GcsPathBuilder.scala | 100 ++++++++++++--------- .../filesystems/gcs/GcsPathBuilderFactory.scala | 15 +++- .../filesystems/gcs/auth/GoogleAuthMode.scala | 51 ++++++++--- .../filesystems/gcs/GcsPathBuilderSpec.scala | 13 +-- .../filesystems/gcs/auth/GoogleAuthModeSpec.scala | 6 +- .../backend/impl/jes/GenomicsFactory.scala | 8 +- .../backend/impl/jes/JesInitializationActor.scala | 60 +++++++++---- .../cromwell/backend/impl/jes/JesJobPaths.scala | 8 +- .../backend/impl/jes/JesWorkflowPaths.scala | 27 ++++-- .../jes/JesAsyncBackendJobExecutionActorSpec.scala | 9 +- .../backend/impl/jes/JesCallPathsSpec.scala | 16 ++-- .../backend/impl/jes/JesWorkflowPathsSpec.scala | 3 +- .../sfs/config/ConfigInitializationActor.scala | 8 +- .../sfs/SharedFileSystemInitializationActor.scala | 30 ++++--- .../backend/impl/tes/TesInitializationActor.scala | 32 +++---- 22 files changed, 332 insertions(+), 194 deletions(-) diff --git a/backend/src/main/scala/cromwell/backend/standard/StandardInitializationActor.scala b/backend/src/main/scala/cromwell/backend/standard/StandardInitializationActor.scala index 89e3c48b2..73dd47925 100644 --- a/backend/src/main/scala/cromwell/backend/standard/StandardInitializationActor.scala +++ b/backend/src/main/scala/cromwell/backend/standard/StandardInitializationActor.scala @@ -47,18 +47,18 @@ class StandardInitializationActor(val standardParams: StandardInitializationActo override lazy val calls: Set[TaskCall] = standardParams.calls override def beforeAll(): Future[Option[BackendInitializationData]] = { - Future.fromTry(Try(Option(initializationData))) + initializationData map Option.apply } - lazy val initializationData: StandardInitializationData = - new StandardInitializationData(workflowPaths, runtimeAttributesBuilder, classOf[StandardExpressionFunctions]) + lazy val initializationData: Future[StandardInitializationData] = + workflowPaths map { new StandardInitializationData(_, runtimeAttributesBuilder, classOf[StandardExpressionFunctions]) } lazy val expressionFunctions: Class[_ <: StandardExpressionFunctions] = classOf[StandardExpressionFunctions] - lazy val pathBuilders: List[PathBuilder] = List(DefaultPathBuilder) + lazy val pathBuilders: Future[List[PathBuilder]] = Future.successful(List(DefaultPathBuilder)) - lazy val workflowPaths: WorkflowPaths = - WorkflowPathBuilder.workflowPaths(configurationDescriptor, workflowDescriptor, pathBuilders) + lazy val workflowPaths: Future[WorkflowPaths] = + pathBuilders map { WorkflowPathBuilder.workflowPaths(configurationDescriptor, workflowDescriptor, _) } /** * Returns the runtime attribute builder for this backend. diff --git a/core/src/main/scala/cromwell/core/path/DefaultPathBuilderFactory.scala b/core/src/main/scala/cromwell/core/path/DefaultPathBuilderFactory.scala index 5339fae3c..234a19d79 100644 --- a/core/src/main/scala/cromwell/core/path/DefaultPathBuilderFactory.scala +++ b/core/src/main/scala/cromwell/core/path/DefaultPathBuilderFactory.scala @@ -3,6 +3,8 @@ package cromwell.core.path import akka.actor.ActorSystem import cromwell.core.WorkflowOptions +import scala.concurrent.{ExecutionContext, Future} + case object DefaultPathBuilderFactory extends PathBuilderFactory { - override def withOptions(options: WorkflowOptions)(implicit actorSystem: ActorSystem) = DefaultPathBuilder + override def withOptions(options: WorkflowOptions)(implicit actorSystem: ActorSystem, ec: ExecutionContext) = Future.successful(DefaultPathBuilder) } diff --git a/core/src/main/scala/cromwell/core/path/PathBuilderFactory.scala b/core/src/main/scala/cromwell/core/path/PathBuilderFactory.scala index 7ee20eb2d..63a91e02b 100644 --- a/core/src/main/scala/cromwell/core/path/PathBuilderFactory.scala +++ b/core/src/main/scala/cromwell/core/path/PathBuilderFactory.scala @@ -3,9 +3,11 @@ package cromwell.core.path import akka.actor.ActorSystem import cromwell.core.WorkflowOptions +import scala.concurrent.{ExecutionContext, Future} + /** * Provide a method that can instantiate a path builder with the specified workflow options. */ trait PathBuilderFactory { - def withOptions(options: WorkflowOptions)(implicit actorSystem: ActorSystem): PathBuilder + def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[PathBuilder] } diff --git a/engine/src/main/scala/cromwell/engine/EngineFilesystems.scala b/engine/src/main/scala/cromwell/engine/EngineFilesystems.scala index 6e9236d21..6a7a2cec0 100644 --- a/engine/src/main/scala/cromwell/engine/EngineFilesystems.scala +++ b/engine/src/main/scala/cromwell/engine/EngineFilesystems.scala @@ -8,9 +8,13 @@ import cromwell.core.path.{DefaultPathBuilder, PathBuilder} import cromwell.filesystems.gcs.{GcsPathBuilderFactory, GoogleConfiguration} import lenthall.exception.MessageAggregation import net.ceedubs.ficus.Ficus._ +import cats.instances.future._ +import cats.instances.list._ +import cats.syntax.traverse._ -case class EngineFilesystems(actorSystem: ActorSystem) { +import scala.concurrent.{ExecutionContext, Future} +object EngineFilesystems { private val config = ConfigFactory.load private val googleConf: GoogleConfiguration = GoogleConfiguration(config) private val googleAuthMode = config.as[Option[String]]("engine.filesystems.gcs.auth") map { confMode => @@ -26,12 +30,10 @@ case class EngineFilesystems(actorSystem: ActorSystem) { private val gcsPathBuilderFactory = googleAuthMode map { mode => GcsPathBuilderFactory(mode, googleConf.applicationName) } - - private val defaultFileSystem = if (config.as[Boolean]("engine.filesystems.local.enabled")) { - Option(DefaultPathBuilder) - } else None - def pathBuildersForWorkflow(workflowOptions: WorkflowOptions): List[PathBuilder] = { - List(gcsPathBuilderFactory map { _.withOptions(workflowOptions)(actorSystem) }, defaultFileSystem).flatten - } + private val defaultFileSystem = + Option(DefaultPathBuilder).filter(_ => config.as[Boolean]("engine.filesystems.local.enabled")) + + def pathBuildersForWorkflow(workflowOptions: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[List[PathBuilder]] = + gcsPathBuilderFactory.toList.traverse(_.withOptions(workflowOptions)).map(_ ++ defaultFileSystem) } diff --git a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala index 811623eec..d2dcc7f5b 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala @@ -22,6 +22,7 @@ import cromwell.subworkflowstore.SubWorkflowStoreActor.WorkflowComplete import cromwell.webservice.EngineStatsActor import wdl4s.{LocallyQualifiedName => _} +import scala.concurrent.Future import scala.util.Failure object WorkflowActor { @@ -307,16 +308,18 @@ class WorkflowActor(val workflowId: WorkflowId, * be copied by accessing the workflow options outside of the EngineWorkflowDescriptor. */ def bruteForceWorkflowOptions: WorkflowOptions = WorkflowOptions.fromJsonString(workflowSources.workflowOptionsJson).getOrElse(WorkflowOptions.fromJsonString("{}").get) - def bruteForcePathBuilders: List[PathBuilder] = EngineFilesystems(context.system).pathBuildersForWorkflow(bruteForceWorkflowOptions) + val system = context.system + val ec = context.system.dispatcher + def bruteForcePathBuilders: Future[List[PathBuilder]] = EngineFilesystems.pathBuildersForWorkflow(bruteForceWorkflowOptions)(system, ec) val (workflowOptions, pathBuilders) = stateData.workflowDescriptor match { - case Some(wd) => (wd.backendDescriptor.workflowOptions, wd.pathBuilders) + case Some(wd) => (wd.backendDescriptor.workflowOptions, Future.successful(wd.pathBuilders)) case None => (bruteForceWorkflowOptions, bruteForcePathBuilders) } workflowOptions.get(FinalWorkflowLogDir).toOption match { case Some(destinationDir) => - workflowLogCopyRouter ! CopyWorkflowLogsActor.Copy(workflowId, PathFactory.buildPath(destinationDir, pathBuilders)) + pathBuilders.map(pb => workflowLogCopyRouter ! CopyWorkflowLogsActor.Copy(workflowId, PathFactory.buildPath(destinationDir, pb)))(ec) case None if WorkflowLogger.isTemporary => workflowLogger.deleteLogFile() match { case Failure(f) => log.error(f, "Failed to delete workflow log") case _ => diff --git a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/MaterializeWorkflowDescriptorActor.scala b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/MaterializeWorkflowDescriptorActor.scala index aaae5e6c6..b09df18ef 100644 --- a/engine/src/main/scala/cromwell/engine/workflow/lifecycle/MaterializeWorkflowDescriptorActor.scala +++ b/engine/src/main/scala/cromwell/engine/workflow/lifecycle/MaterializeWorkflowDescriptorActor.scala @@ -1,6 +1,7 @@ package cromwell.engine.workflow.lifecycle -import akka.actor.{ActorRef, FSM, LoggingFSM, Props} +import akka.actor.{ActorRef, FSM, LoggingFSM, Props, Status} +import akka.pattern.pipe import cats.data.NonEmptyList import cats.data.Validated._ import cats.instances.list._ @@ -21,7 +22,7 @@ import cromwell.core.path.BetterFileMethods.OpenOptions import cromwell.core.path.{DefaultPathBuilder, Path, PathBuilder} import cromwell.engine._ import cromwell.engine.backend.CromwellBackends -import cromwell.engine.workflow.lifecycle.MaterializeWorkflowDescriptorActor.{MaterializeWorkflowDescriptorActorData, MaterializeWorkflowDescriptorActorState} +import cromwell.engine.workflow.lifecycle.MaterializeWorkflowDescriptorActor.MaterializeWorkflowDescriptorActorState import cromwell.services.metadata.MetadataService._ import cromwell.services.metadata.{MetadataEvent, MetadataKey, MetadataValue} import lenthall.exception.MessageAggregation @@ -32,6 +33,7 @@ import wdl4s._ import wdl4s.expression.NoFunctions import wdl4s.values.{WdlSingleFile, WdlString, WdlValue} +import scala.concurrent.Future import scala.language.postfixOps import scala.util.{Failure, Success, Try} @@ -72,15 +74,11 @@ object MaterializeWorkflowDescriptorActor { override val terminal = true } case object ReadyToMaterializeState extends MaterializeWorkflowDescriptorActorState + case object MaterializingState extends MaterializeWorkflowDescriptorActorState case object MaterializationSuccessfulState extends MaterializeWorkflowDescriptorActorTerminalState case object MaterializationFailedState extends MaterializeWorkflowDescriptorActorTerminalState case object MaterializationAbortedState extends MaterializeWorkflowDescriptorActorTerminalState - /* - Data - */ - case class MaterializeWorkflowDescriptorActorData() - private val DefaultWorkflowFailureMode = NoNewCalls.toString private[lifecycle] def validateCallCachingMode(workflowOptions: WorkflowOptions, conf: Config): ErrorOr[CallCachingMode] = { @@ -116,34 +114,52 @@ object MaterializeWorkflowDescriptorActor { class MaterializeWorkflowDescriptorActor(serviceRegistryActor: ActorRef, val workflowIdForLogging: WorkflowId, cromwellBackends: => CromwellBackends, - importLocalFilesystem: Boolean) extends LoggingFSM[MaterializeWorkflowDescriptorActorState, MaterializeWorkflowDescriptorActorData] with LazyLogging with WorkflowLogging { + importLocalFilesystem: Boolean) extends LoggingFSM[MaterializeWorkflowDescriptorActorState, Unit] with LazyLogging with WorkflowLogging { import MaterializeWorkflowDescriptorActor._ val tag = self.path.name val iOExecutionContext = context.system.dispatchers.lookup("akka.dispatchers.io-dispatcher") - - startWith(ReadyToMaterializeState, MaterializeWorkflowDescriptorActorData()) + implicit val ec = context.dispatcher + + startWith(ReadyToMaterializeState, ()) when(ReadyToMaterializeState) { case Event(MaterializeWorkflowDescriptorCommand(workflowSourceFiles, conf), _) => - buildWorkflowDescriptor(workflowIdForLogging, workflowSourceFiles, conf) match { - case Valid(descriptor) => - sender() ! MaterializeWorkflowDescriptorSuccessResponse(descriptor) - goto(MaterializationSuccessfulState) - case Invalid(error) => - sender() ! MaterializeWorkflowDescriptorFailureResponse( - new IllegalArgumentException with MessageAggregation { - val exceptionContext = s"Workflow input processing failed" - val errorMessages = error.toList - }) + val replyTo = sender() + + workflowOptionsAndPathBuilders(workflowSourceFiles) match { + case Valid((workflowOptions, pathBuilders)) => + val futureDescriptor = pathBuilders map { + buildWorkflowDescriptor(workflowIdForLogging, workflowSourceFiles, conf, workflowOptions, _) + } + + // Pipe the response to self, but make it look like it comes from the sender of the command + // This way we can access it through sender() in the next state and don't have to store the value + // of replyTo in the data + pipe(futureDescriptor).to(self, replyTo) + goto(MaterializingState) + case Invalid(error) => + workflowInitializationFailed(error, replyTo) goto(MaterializationFailedState) } case Event(MaterializeWorkflowDescriptorAbortCommand, _) => goto(MaterializationAbortedState) } + when(MaterializingState) { + case Event(Valid(descriptor: EngineWorkflowDescriptor), _) => + sender() ! MaterializeWorkflowDescriptorSuccessResponse(descriptor) + goto(MaterializationSuccessfulState) + case Event(Invalid(error: NonEmptyList[String]@unchecked), _) => + workflowInitializationFailed(error, sender()) + goto(MaterializationFailedState) + case Event(Status.Failure(failure), _) => + workflowInitializationFailed(NonEmptyList.of(failure.getMessage), sender()) + goto(MaterializationFailedState) + } + // Let these fall through to the whenUnhandled handler: when(MaterializationSuccessfulState) { FSM.NullFunction } when(MaterializationFailedState) { FSM.NullFunction } @@ -165,18 +181,33 @@ class MaterializeWorkflowDescriptorActor(serviceRegistryActor: ActorRef, stay } + private def workflowInitializationFailed(errors: NonEmptyList[String], replyTo: ActorRef) = { + sender() ! MaterializeWorkflowDescriptorFailureResponse( + new IllegalArgumentException with MessageAggregation { + val exceptionContext = "Workflow input processing failed" + val errorMessages = errors.toList + }) + } + + private def workflowOptionsAndPathBuilders(sourceFiles: WorkflowSourceFilesCollection): ErrorOr[(WorkflowOptions, Future[List[PathBuilder]])] = { + val workflowOptionsValidation = validateWorkflowOptions(sourceFiles.workflowOptionsJson) + workflowOptionsValidation map { workflowOptions => + val pathBuilders = EngineFilesystems.pathBuildersForWorkflow(workflowOptions)(context.system, context.dispatcher) + (workflowOptions, pathBuilders) + } + } + private def buildWorkflowDescriptor(id: WorkflowId, sourceFiles: WorkflowSourceFilesCollection, - conf: Config): ErrorOr[EngineWorkflowDescriptor] = { + conf: Config, + workflowOptions: WorkflowOptions, + pathBuilders: List[PathBuilder]): ErrorOr[EngineWorkflowDescriptor] = { val namespaceValidation = validateNamespace(sourceFiles) - val workflowOptionsValidation = validateWorkflowOptions(sourceFiles.workflowOptionsJson) val labelsValidation = validateLabels(sourceFiles.labelsJson) - (namespaceValidation |@| workflowOptionsValidation |@| labelsValidation) map { - (_, _, _) - } flatMap { case (namespace, workflowOptions, labels) => + + (namespaceValidation |@| labelsValidation).tupled flatMap { case (namespace, labels) => pushWfNameMetadataService(namespace.workflow.unqualifiedName) publishLabelsToMetadata(id, namespace.workflow.unqualifiedName, labels) - val pathBuilders = EngineFilesystems(context.system).pathBuildersForWorkflow(workflowOptions) buildWorkflowDescriptor(id, sourceFiles, namespace, workflowOptions, labels, conf, pathBuilders) } } @@ -200,7 +231,7 @@ class MaterializeWorkflowDescriptorActor(serviceRegistryActor: ActorRef, protected def labelsToMetadata(labels: Map[String, String], workflowId: WorkflowId): Unit = { labels foreach { case (k, v) => - serviceRegistryActor ! PutMetadataAction(MetadataEvent(MetadataKey(workflowId, None, s"${WorkflowMetadataKeys.Labels}:${k}"), MetadataValue(v))) + serviceRegistryActor ! PutMetadataAction(MetadataEvent(MetadataKey(workflowId, None, s"${WorkflowMetadataKeys.Labels}:$k"), MetadataValue(v))) } } @@ -471,8 +502,8 @@ class MaterializeWorkflowDescriptorActor(serviceRegistryActor: ActorRef, } modeString flatMap WorkflowFailureMode.tryParse match { - case Success(mode) => mode.validNel - case Failure(t) => t.getMessage.invalidNel + case Success(mode) => mode.validNel + case Failure(t) => t.getMessage.invalidNel } } } diff --git a/engine/src/test/scala/cromwell/engine/io/IoActorGcsBatchSpec.scala b/engine/src/test/scala/cromwell/engine/io/IoActorGcsBatchSpec.scala index e3f2dd3a1..fa611133b 100644 --- a/engine/src/test/scala/cromwell/engine/io/IoActorGcsBatchSpec.scala +++ b/engine/src/test/scala/cromwell/engine/io/IoActorGcsBatchSpec.scala @@ -13,7 +13,7 @@ import cromwell.filesystems.gcs.{GcsPathBuilder, GcsPathBuilderFactory} import org.scalatest.concurrent.Eventually import org.scalatest.{FlatSpecLike, Matchers} -import scala.concurrent.ExecutionContext +import scala.concurrent.{Await, ExecutionContext} import scala.concurrent.duration._ import scala.language.postfixOps @@ -34,7 +34,7 @@ class IoActorGcsBatchSpec extends TestKitSuite with FlatSpecLike with Matchers w } lazy val gcsPathBuilder = GcsPathBuilderFactory(ApplicationDefaultMode("default"), "cromwell-test") - lazy val pathBuilder: GcsPathBuilder = gcsPathBuilder.withOptions(WorkflowOptions.empty) + lazy val pathBuilder: GcsPathBuilder = Await.result(gcsPathBuilder.withOptions(WorkflowOptions.empty), 1 second) lazy val randomUUID = UUID.randomUUID().toString diff --git a/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GcsPathBuilder.scala b/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GcsPathBuilder.scala index 3c3c208f9..8f6ec4afa 100644 --- a/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GcsPathBuilder.scala +++ b/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GcsPathBuilder.scala @@ -1,13 +1,14 @@ package cromwell.filesystems.gcs import java.net.URI -import java.nio.file.spi.FileSystemProvider +import akka.actor.ActorSystem import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport import com.google.api.client.json.jackson2.JacksonFactory import com.google.api.gax.retrying.RetrySettings +import com.google.auth.Credentials import com.google.cloud.http.HttpTransportOptions -import com.google.cloud.storage.contrib.nio.{CloudStorageConfiguration, CloudStorageFileSystem, CloudStoragePath} +import com.google.cloud.storage.contrib.nio.{CloudStorageConfiguration, CloudStorageFileSystem, CloudStorageFileSystemProvider, CloudStoragePath} import com.google.cloud.storage.{BlobId, StorageOptions} import com.google.common.base.Preconditions._ import com.google.common.net.UrlEscapers @@ -16,6 +17,7 @@ import cromwell.core.path.{NioPath, Path, PathBuilder} import cromwell.filesystems.gcs.GcsPathBuilder._ import cromwell.filesystems.gcs.auth.GoogleAuthMode +import scala.concurrent.{ExecutionContext, Future} import scala.concurrent.duration._ import scala.util.Try @@ -44,55 +46,71 @@ object GcsPathBuilder { } def getUri(string: String) = URI.create(UrlEscapers.urlFragmentEscaper().escape(string)) -} + + def fromAuthMode(authMode: GoogleAuthMode, + applicationName: String, + retrySettings: Option[RetrySettings], + cloudStorageConfiguration: CloudStorageConfiguration, + options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[GcsPathBuilder] = { + authMode.credential(options) map { credentials => + fromCredentials(credentials, + applicationName, + retrySettings, + cloudStorageConfiguration, + options + ) + } + } + + def fromCredentials(credentials: Credentials, + applicationName: String, + retrySettings: Option[RetrySettings], + cloudStorageConfiguration: CloudStorageConfiguration, + options: WorkflowOptions): GcsPathBuilder = { + val transportOptions = HttpTransportOptions.newBuilder() + .setReadTimeout(3.minutes.toMillis.toInt) + .build() -class GcsPathBuilder(authMode: GoogleAuthMode, - applicationName: String, - retrySettings: Option[RetrySettings], - cloudStorageConfiguration: CloudStorageConfiguration, - options: WorkflowOptions) extends PathBuilder { - authMode.validate(options) + val storageOptionsBuilder = StorageOptions.newBuilder() + .setTransportOptions(transportOptions) + .setCredentials(credentials) - protected val transportOptions = HttpTransportOptions.newBuilder() - .setReadTimeout(3.minutes.toMillis.toInt) - .build() + retrySettings foreach storageOptionsBuilder.setRetrySettings - protected val storageOptionsBuilder = StorageOptions.newBuilder() - .setTransportOptions(transportOptions) - .setCredentials(authMode.credential(options)) + // Grab the google project from Workflow Options if specified and set + // that to be the project used by the StorageOptions Builder + options.get("google_project") map storageOptionsBuilder.setProjectId - retrySettings foreach storageOptionsBuilder.setRetrySettings - // Grab the google project from Workflow Options if specified and set - // that to be the project used by the StorageOptions Builder - options.get("google_project") map storageOptionsBuilder.setProjectId + val storageOptions = storageOptionsBuilder.build() + // Create a com.google.api.services.storage.Storage + // This is the underlying api used by com.google.cloud.storage + // By bypassing com.google.cloud.storage, we can create low level requests that can be batched + val apiStorage: com.google.api.services.storage.Storage = { + new com.google.api.services.storage.Storage + .Builder(HttpTransport, JsonFactory, GoogleConfiguration.withCustomTimeouts(transportOptions.getHttpRequestInitializer(storageOptions))) + .setApplicationName(applicationName) + .build() + } - protected val storageOptions = storageOptionsBuilder.build() + // Create a com.google.cloud.storage.Storage + // This is the "relatively" high level API, and recommended one. The nio implementation sits on top of this. + val cloudStorage: com.google.cloud.storage.Storage = storageOptions.getService - // Create a com.google.api.services.storage.Storage - // This is the underlying api used by com.google.cloud.storage - // By bypassing com.google.cloud.storage, we can create low level requests that can be batched - val apiStorage: com.google.api.services.storage.Storage = { - new com.google.api.services.storage.Storage - .Builder(HttpTransport, JsonFactory, GoogleConfiguration.withCustomTimeouts(transportOptions.getHttpRequestInitializer(storageOptions))) - .setApplicationName(applicationName) - .build() + // The CloudStorageFileSystemProvider constructor is not public. Currently the only way to obtain one is through a CloudStorageFileSystem + // Moreover at this point we can use the same provider for all operations as we have usable credentials + // In order to avoid recreating a provider with every getPath call, create a dummy FileSystem just to get its provider + val provider: CloudStorageFileSystemProvider = CloudStorageFileSystem.forBucket("dummy", cloudStorageConfiguration, storageOptions).provider() + + new GcsPathBuilder(apiStorage, cloudStorage, provider, storageOptions.getProjectId) } +} - // Create a com.google.cloud.storage.Storage - // This is the "relatively" high level API, and recommended one. The nio implementation sits on top of this. - val cloudStorage: com.google.cloud.storage.Storage = storageOptions.getService - - // The CloudStorageFileSystemProvider constructor is not public. Currently the only way to obtain one is through a CloudStorageFileSystem - // Moreover at this point we can use the same provider for all operations as we have usable credentials - // In order to avoid recreating a provider with every getPath call, create a dummy FileSystem just to get its provider - protected val _provider = CloudStorageFileSystem.forBucket("dummy", cloudStorageConfiguration, storageOptions).provider() - - protected def provider: FileSystemProvider = _provider - - def getProjectId = storageOptions.getProjectId - +class GcsPathBuilder(val apiStorage: com.google.api.services.storage.Storage, + val cloudStorage: com.google.cloud.storage.Storage, + provider: CloudStorageFileSystemProvider, + val projectId: String) extends PathBuilder { def build(string: String): Try[GcsPath] = { Try { val uri = getUri(string) diff --git a/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GcsPathBuilderFactory.scala b/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GcsPathBuilderFactory.scala index ca23c395b..01659e42f 100644 --- a/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GcsPathBuilderFactory.scala +++ b/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/GcsPathBuilderFactory.scala @@ -3,6 +3,7 @@ package cromwell.filesystems.gcs import akka.actor.ActorSystem import com.google.api.client.googleapis.media.MediaHttpUploader import com.google.api.gax.retrying.RetrySettings +import com.google.auth.Credentials import com.google.cloud.storage.contrib.nio.CloudStorageConfiguration import com.typesafe.config.ConfigFactory import cromwell.core.WorkflowOptions @@ -10,6 +11,8 @@ import cromwell.core.path.PathBuilderFactory import cromwell.filesystems.gcs.auth.GoogleAuthMode import net.ceedubs.ficus.Ficus._ +import scala.concurrent.ExecutionContext + object GcsPathBuilderFactory { private[this] lazy val UploadBufferBytes = { @@ -33,5 +36,15 @@ case class GcsPathBuilderFactory(authMode: GoogleAuthMode, extends PathBuilderFactory { - def withOptions(options: WorkflowOptions)(implicit actorSystem: ActorSystem) = new GcsPathBuilder(authMode, applicationName, retrySettings, cloudStorageConfiguration, options) + def withOptions(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext) = { + GcsPathBuilder.fromAuthMode(authMode, applicationName, retrySettings, cloudStorageConfiguration, options) + } + + /** + * Ignores the authMode and creates a GcsPathBuilder using the passed credentials directly. + * Can be used when the Credentials are already available. + */ + def fromCredentials(options: WorkflowOptions, credentials: Credentials) = { + GcsPathBuilder.fromCredentials(credentials, applicationName, retrySettings, cloudStorageConfiguration, options) + } } diff --git a/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/auth/GoogleAuthMode.scala b/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/auth/GoogleAuthMode.scala index 5ea07a8a0..e9ba4eb8a 100644 --- a/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/auth/GoogleAuthMode.scala +++ b/filesystems/gcs/src/main/scala/cromwell/filesystems/gcs/auth/GoogleAuthMode.scala @@ -2,8 +2,11 @@ package cromwell.filesystems.gcs.auth import java.io.FileNotFoundException +import akka.actor.ActorSystem +import akka.http.scaladsl.model.StatusCodes import better.files._ import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport +import com.google.api.client.http.HttpResponseException import com.google.api.client.json.jackson2.JacksonFactory import com.google.api.services.storage.StorageScopes import com.google.auth.Credentials @@ -11,11 +14,13 @@ import com.google.auth.http.HttpTransportFactory import com.google.auth.oauth2.{GoogleCredentials, ServiceAccountCredentials, UserCredentials} import com.google.cloud.NoCredentials import cromwell.core.WorkflowOptions +import cromwell.core.retry.Retry import cromwell.filesystems.gcs.auth.GoogleAuthMode._ import cromwell.filesystems.gcs.auth.ServiceAccountMode.{CredentialFileFormat, JsonFileFormat, PemFileFormat} import org.slf4j.LoggerFactory import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success, Try} object GoogleAuthMode { @@ -40,7 +45,20 @@ object GoogleAuthMode { case object MockAuthMode extends GoogleAuthMode { override def name = "no_auth" - override def credential(options: WorkflowOptions): Credentials = NoCredentials.getInstance() + override def credential(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[Credentials] = { + Future.successful(NoCredentials.getInstance()) + } + } + + def isFatal(ex: Throwable) = { + // We wrap the actual exception in a RuntimeException so get the cause + ex.getCause match { + case http: HttpResponseException => + http.getStatusCode == StatusCodes.Unauthorized.intValue || + http.getStatusCode == StatusCodes.Forbidden.intValue || + http.getStatusCode == StatusCodes.BadRequest.intValue + case _ => false + } } } @@ -55,14 +73,12 @@ sealed trait GoogleAuthMode { def name: String // Create a Credential object from the google.api.client.auth library (https://github.com/google/google-api-java-client) - def credential(options: WorkflowOptions): Credentials + def credential(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[Credentials] def requiresAuthFile: Boolean = false - protected def validateCredential(credential: Credentials) = validate(credential, () => credential.refresh()) - - private def validate[T](credential: T, validation: () => Any): T = { - Try(validation()) match { + protected def validateCredential(credential: Credentials) = { + Try(credential.refresh()) match { case Failure(ex) => throw new RuntimeException(s"Google credentials are invalid: ${ex.getMessage}", ex) case Success(_) => credential } @@ -91,10 +107,15 @@ final case class ServiceAccountMode(override val name: String, case _: JsonFileFormat => ServiceAccountCredentials.fromStream(credentialsFile.newInputStream).createScoped(scopes) } + // Validate credentials synchronously here, without retry. + // It's very unlikely to fail as it should not happen more than a few times + // (one for the engine and for each backend using it) per Cromwell instance. validateCredential(serviceAccount) } - override def credential(options: WorkflowOptions): Credentials = _credential + override def credential(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[Credentials] = { + Future.successful(_credential) + } } final case class UserMode(override val name: String, @@ -113,7 +134,7 @@ final case class UserMode(override val name: String, validateCredential(UserCredentials.fromStream(secretsStream)) } - override def credential(options: WorkflowOptions) = _credential + override def credential(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[Credentials] = Future.successful(_credential) } private object ApplicationDefault { @@ -121,7 +142,9 @@ private object ApplicationDefault { } final case class ApplicationDefaultMode(name: String) extends GoogleAuthMode { - override def credential(options: WorkflowOptions) = ApplicationDefault._Credential + override def credential(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[Credentials] = { + Future.successful(ApplicationDefault._Credential) + } } final case class RefreshTokenMode(name: String, @@ -142,10 +165,14 @@ final case class RefreshTokenMode(name: String, () } - override def credential(options: WorkflowOptions): Credentials = { + override def credential(options: WorkflowOptions)(implicit as: ActorSystem, ec: ExecutionContext): Future[Credentials] = { val refreshToken = extractRefreshToken(options) - validateCredential( - new UserCredentials(clientId, clientSecret, refreshToken, null, GoogleAuthMode.HttpTransportFactory, null) + Retry.withRetry( + () => Future(validateCredential( + new UserCredentials(clientId, clientSecret, refreshToken, null, GoogleAuthMode.HttpTransportFactory, null) + )), + isFatal = isFatal, + maxRetries = Option(3) ) } } diff --git a/filesystems/gcs/src/test/scala/cromwell/filesystems/gcs/GcsPathBuilderSpec.scala b/filesystems/gcs/src/test/scala/cromwell/filesystems/gcs/GcsPathBuilderSpec.scala index b91fec505..fbfbf10ef 100644 --- a/filesystems/gcs/src/test/scala/cromwell/filesystems/gcs/GcsPathBuilderSpec.scala +++ b/filesystems/gcs/src/test/scala/cromwell/filesystems/gcs/GcsPathBuilderSpec.scala @@ -1,9 +1,10 @@ package cromwell.filesystems.gcs +import com.google.cloud.NoCredentials import com.google.cloud.storage.contrib.nio.CloudStorageConfiguration import cromwell.core.path._ import cromwell.core.{TestKitSuite, WorkflowOptions} -import cromwell.filesystems.gcs.auth.{GoogleAuthMode, GoogleAuthModeSpec} +import cromwell.filesystems.gcs.auth.GoogleAuthModeSpec import org.scalatest.prop.Tables.Table import org.scalatest.{FlatSpecLike, Matchers} @@ -16,15 +17,15 @@ class GcsPathBuilderSpec extends TestKitSuite with FlatSpecLike with Matchers wi val wfOptionsWithProject = WorkflowOptions.fromMap(Map("google_project" -> "my_project")).get - val gcsPathBuilderWithProjectInfo = new GcsPathBuilder( - GoogleAuthMode.MockAuthMode, + val gcsPathBuilderWithProjectInfo = GcsPathBuilder.fromCredentials( + NoCredentials.getInstance(), "cromwell-test", None, CloudStorageConfiguration.DEFAULT, wfOptionsWithProject ) - gcsPathBuilderWithProjectInfo.getProjectId shouldBe "my_project" + gcsPathBuilderWithProjectInfo.projectId shouldBe "my_project" } it should behave like truncateCommonRoots(pathBuilder, pathsToTruncate) @@ -357,8 +358,8 @@ class GcsPathBuilderSpec extends TestKitSuite with FlatSpecLike with Matchers wi private lazy val pathBuilder = { GoogleAuthModeSpec.assumeHasApplicationDefaultCredentials() - new GcsPathBuilder( - GoogleAuthMode.MockAuthMode, + GcsPathBuilder.fromCredentials( + NoCredentials.getInstance(), "cromwell-test", None, CloudStorageConfiguration.DEFAULT, diff --git a/filesystems/gcs/src/test/scala/cromwell/filesystems/gcs/auth/GoogleAuthModeSpec.scala b/filesystems/gcs/src/test/scala/cromwell/filesystems/gcs/auth/GoogleAuthModeSpec.scala index cda1c12e2..1ce3d5986 100644 --- a/filesystems/gcs/src/test/scala/cromwell/filesystems/gcs/auth/GoogleAuthModeSpec.scala +++ b/filesystems/gcs/src/test/scala/cromwell/filesystems/gcs/auth/GoogleAuthModeSpec.scala @@ -1,6 +1,6 @@ package cromwell.filesystems.gcs.auth -import cromwell.core.WorkflowOptions +import com.google.auth.oauth2.GoogleCredentials import org.scalatest.Assertions._ import scala.util.{Failure, Try} @@ -15,9 +15,7 @@ object GoogleAuthModeSpec { } private lazy val tryApplicationDefaultCredentials: Try[Unit] = Try { - val authMode = ApplicationDefaultMode("application-default") - val workflowOptions = WorkflowOptions.empty - authMode.credential(workflowOptions) + GoogleCredentials.getApplicationDefault () } } diff --git a/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/GenomicsFactory.scala b/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/GenomicsFactory.scala index 2c3a097e6..edbdac0f6 100644 --- a/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/GenomicsFactory.scala +++ b/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/GenomicsFactory.scala @@ -4,18 +4,16 @@ import java.net.URL import com.google.api.client.http.{HttpRequest, HttpRequestInitializer} import com.google.api.services.genomics.Genomics +import com.google.auth.Credentials import com.google.auth.http.HttpCredentialsAdapter -import cromwell.core.WorkflowOptions import cromwell.filesystems.gcs.auth.GoogleAuthMode case class GenomicsFactory(applicationName: String, authMode: GoogleAuthMode, endpointUrl: URL) { - def withOptions(options: WorkflowOptions) = { - val scopedCredentials = authMode.credential(options) - + def fromCredentials(credentials: Credentials) = { val httpRequestInitializer = { - val delegate = new HttpCredentialsAdapter(scopedCredentials) + val delegate = new HttpCredentialsAdapter(credentials) new HttpRequestInitializer() { def initialize(httpRequest: HttpRequest) = { delegate.initialize(httpRequest) diff --git a/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesInitializationActor.scala b/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesInitializationActor.scala index 4619f6e3c..890bea568 100644 --- a/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesInitializationActor.scala +++ b/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesInitializationActor.scala @@ -3,8 +3,10 @@ package cromwell.backend.impl.jes import java.io.IOException import akka.actor.ActorRef +import com.google.api.services.genomics.Genomics +import com.google.auth.Credentials import com.google.cloud.storage.contrib.nio.CloudStorageOptions -import cromwell.backend.impl.jes.authentication.{GcsLocalizing, JesAuthInformation} +import cromwell.backend.impl.jes.authentication.{GcsLocalizing, JesAuthInformation, JesDockerCredentials} import cromwell.backend.standard.{StandardInitializationActor, StandardInitializationActorParams, StandardValidatedRuntimeAttributesBuilder} import cromwell.backend.{BackendConfigurationDescriptor, BackendInitializationData, BackendWorkflowDescriptor} import cromwell.core.io.AsyncIo @@ -31,12 +33,16 @@ class JesInitializationActor(jesParams: JesInitializationActorParams) override lazy val ioActor = jesParams.ioActor private val jesConfiguration = jesParams.jesConfiguration - + private val workflowOptions = workflowDescriptor.workflowOptions + implicit private val system = context.system + context.become(ioReceive orElse receive) override lazy val runtimeAttributesBuilder: StandardValidatedRuntimeAttributesBuilder = JesRuntimeAttributes.runtimeAttributesBuilder(jesConfiguration) + // From the gcs auth and the workflow options, optionally builds a GcsLocalizing that contains + // the information (client Id/Secrets + refresh token) that will be uploaded to Gcs before the workflow start private[jes] lazy val refreshTokenAuth: Option[JesAuthInformation] = { for { clientSecrets <- List(jesConfiguration.jesAttributes.auths.gcs) collectFirst { case s: ClientSecrets => s } @@ -44,31 +50,47 @@ class JesInitializationActor(jesParams: JesInitializationActorParams) } yield GcsLocalizing(clientSecrets, token) } - private lazy val genomics = jesConfiguration.genomicsFactory.withOptions(workflowDescriptor.workflowOptions) - // FIXME: workflow paths indirectly re create part of those credentials via the GcsPathBuilder - // This is unnecessary duplication of credentials. They are needed here so they can be added to the initialization data - // and used to retrieve docker hashes - private lazy val gcsCredentials = jesConfiguration.jesAuths.gcs.credential(workflowDescriptor.workflowOptions) + // Credentials object for the GCS API + private lazy val gcsCredentials: Future[Credentials] = + jesConfiguration.jesAttributes.auths.gcs.credential(workflowOptions) + + // Credentials object for the Genomics API + private lazy val genomicsCredentials: Future[Credentials] = + jesConfiguration.jesAttributes.auths.genomics.credential(workflowOptions) + + // Genomics object to access the Genomics API + private lazy val genomics: Future[Genomics] = { + genomicsCredentials map jesConfiguration.genomicsFactory.fromCredentials + } - override lazy val workflowPaths: JesWorkflowPaths = - new JesWorkflowPaths(workflowDescriptor, jesConfiguration)(context.system) + override lazy val workflowPaths: Future[JesWorkflowPaths] = for { + gcsCred <- gcsCredentials + genomicsCred <- genomicsCredentials + } yield new JesWorkflowPaths(workflowDescriptor, gcsCred, genomicsCred, jesConfiguration) - override lazy val initializationData: JesBackendInitializationData = - JesBackendInitializationData(workflowPaths, runtimeAttributesBuilder, jesConfiguration, gcsCredentials, genomics) + override lazy val initializationData: Future[JesBackendInitializationData] = for { + jesWorkflowPaths <- workflowPaths + gcsCreds <- gcsCredentials + genomicsFactory <- genomics + } yield JesBackendInitializationData(jesWorkflowPaths, runtimeAttributesBuilder, jesConfiguration, gcsCreds, genomicsFactory) override def beforeAll(): Future[Option[BackendInitializationData]] = { - publishWorkflowRoot(workflowPaths.workflowRoot.pathAsString) - if (jesConfiguration.needAuthFileUpload) { - writeAuthenticationFile(workflowPaths) map { _ => Option(initializationData) } recoverWith { - case failure => Future.failed(new IOException("Failed to upload authentication file", failure)) + def fileUpload(paths: JesWorkflowPaths, dockerCredentials: Option[JesDockerCredentials]): Future[Unit] = { + writeAuthenticationFile(paths, dockerCredentials) recoverWith { + case failure => Future.failed(new IOException("Failed to upload authentication file", failure)) } - } else { - Future.successful(Option(initializationData)) } + + for { + paths <- workflowPaths + _ = publishWorkflowRoot(paths.workflowRoot.pathAsString) + _ <- if (jesConfiguration.needAuthFileUpload) fileUpload(paths, jesConfiguration.dockerCredentials) else Future.successful(()) + data <- initializationData + } yield Option(data) } - private def writeAuthenticationFile(workflowPath: JesWorkflowPaths): Future[Unit] = { - generateAuthJson(jesConfiguration.dockerCredentials, refreshTokenAuth) map { content => + private def writeAuthenticationFile(workflowPath: JesWorkflowPaths, dockerCredentials: Option[JesDockerCredentials]): Future[Unit] = { + generateAuthJson(dockerCredentials, refreshTokenAuth) map { content => val path = workflowPath.gcsAuthFilePath workflowLogger.info(s"Creating authentication file for workflow ${workflowDescriptor.id} at \n $path") writeAsync(path, content, Seq(CloudStorageOptions.withMimeType("application/json"))) diff --git a/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesJobPaths.scala b/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesJobPaths.scala index cd24aa3a8..3ce09f9e4 100644 --- a/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesJobPaths.scala +++ b/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesJobPaths.scala @@ -1,18 +1,12 @@ package cromwell.backend.impl.jes import akka.actor.ActorSystem -import cromwell.backend.{BackendJobDescriptorKey, BackendWorkflowDescriptor} +import cromwell.backend.BackendJobDescriptorKey import cromwell.backend.io.JobPaths import cromwell.core.path.Path import cromwell.services.metadata.CallMetadataKeys object JesJobPaths { - def apply(jobKey: BackendJobDescriptorKey, workflowDescriptor: BackendWorkflowDescriptor, - jesConfiguration: JesConfiguration)(implicit actorSystem: ActorSystem): JesJobPaths = { - val workflowPath = new JesWorkflowPaths(workflowDescriptor, jesConfiguration) - new JesJobPaths(workflowPath, jobKey) - } - val JesLogPathKey = "jesLog" val GcsExecPathKey = "gcsExec" } diff --git a/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesWorkflowPaths.scala b/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesWorkflowPaths.scala index 454c06a7d..3d9d669ec 100644 --- a/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesWorkflowPaths.scala +++ b/supportedBackends/jes/src/main/scala/cromwell/backend/impl/jes/JesWorkflowPaths.scala @@ -1,6 +1,7 @@ package cromwell.backend.impl.jes import akka.actor.ActorSystem +import com.google.auth.Credentials import com.typesafe.config.Config import cromwell.backend.impl.jes.JesAsyncBackendJobExecutionActor.WorkflowOptionKeys import cromwell.backend.io.WorkflowPaths @@ -17,25 +18,35 @@ object JesWorkflowPaths { } case class JesWorkflowPaths(workflowDescriptor: BackendWorkflowDescriptor, - jesConfiguration: JesConfiguration)(implicit actorSystem: ActorSystem) extends WorkflowPaths { + gcsCredentials: Credentials, + genomicsCredentials: Credentials, + jesConfiguration: JesConfiguration)(implicit actorSystem: ActorSystem) extends WorkflowPaths { override lazy val executionRootString: String = workflowDescriptor.workflowOptions.getOrElse(JesWorkflowPaths.GcsRootOptionKey, jesConfiguration.root) + private val workflowOptions: WorkflowOptions = workflowDescriptor.workflowOptions - val gcsPathBuilder: GcsPathBuilder = jesConfiguration.gcsPathBuilderFactory.withOptions(workflowOptions) + + private val gcsPathBuilder: GcsPathBuilder = jesConfiguration.gcsPathBuilderFactory.fromCredentials(workflowOptions, gcsCredentials) val gcsAuthFilePath: Path = { + // The default auth file bucket is always at the root of the root workflow + val defaultBucket = executionRoot.resolve(workflowDescriptor.rootWorkflow.unqualifiedName).resolve(workflowDescriptor.rootWorkflowId.toString) + val bucket = workflowDescriptor.workflowOptions.get(JesWorkflowPaths.AuthFilePathOptionKey) getOrElse defaultBucket.pathAsString + /* * This is an "exception". The filesystem used here is built from genomicsAuth * unlike everywhere else where the filesystem used is built from gcsFileSystemAuth */ - val genomicsCredentials = jesConfiguration.jesAuths.genomics + val pathBuilderWithGenomicsAuth = GcsPathBuilder.fromCredentials( + genomicsCredentials, + jesConfiguration.googleConfig.applicationName, + None, + GcsPathBuilderFactory.DefaultCloudStorageConfiguration, + workflowOptions + ) - // The default auth file bucket is always at the root of the root workflow - val defaultBucket = executionRoot.resolve(workflowDescriptor.rootWorkflow.unqualifiedName).resolve(workflowDescriptor.rootWorkflowId.toString) - - val bucket = workflowDescriptor.workflowOptions.get(JesWorkflowPaths.AuthFilePathOptionKey) getOrElse defaultBucket.pathAsString - val authBucket = GcsPathBuilderFactory(genomicsCredentials, jesConfiguration.googleConfig.applicationName).withOptions(workflowOptions).build(bucket) recover { + val authBucket = pathBuilderWithGenomicsAuth.build(bucket) recover { case ex => throw new Exception(s"Invalid gcs auth_bucket path $bucket", ex) } get diff --git a/supportedBackends/jes/src/test/scala/cromwell/backend/impl/jes/JesAsyncBackendJobExecutionActorSpec.scala b/supportedBackends/jes/src/test/scala/cromwell/backend/impl/jes/JesAsyncBackendJobExecutionActorSpec.scala index 38fd141ad..903794cfa 100644 --- a/supportedBackends/jes/src/test/scala/cromwell/backend/impl/jes/JesAsyncBackendJobExecutionActorSpec.scala +++ b/supportedBackends/jes/src/test/scala/cromwell/backend/impl/jes/JesAsyncBackendJobExecutionActorSpec.scala @@ -4,6 +4,7 @@ import java.util.UUID import akka.actor.{ActorRef, Props} import akka.testkit.{ImplicitSender, TestActorRef, TestDuration, TestProbe} +import com.google.cloud.NoCredentials import cromwell.backend.BackendJobExecutionActor.{BackendJobExecutionResponse, JobFailedNonRetryableResponse, JobFailedRetryableResponse} import cromwell.backend._ import cromwell.backend.async.AsyncBackendJobExecutionActor.{Execute, ExecutionMode} @@ -19,7 +20,6 @@ import cromwell.core.callcaching.NoDocker import cromwell.core.labels.Labels import cromwell.core.logging.JobLogger import cromwell.core.path.{DefaultPathBuilder, PathBuilder} -import cromwell.filesystems.gcs.auth.GoogleAuthMode.MockAuthMode import cromwell.filesystems.gcs.{GcsPath, GcsPathBuilder, GcsPathBuilderFactory} import cromwell.services.keyvalue.InMemoryKvServiceActor import cromwell.services.keyvalue.KeyValueServiceActor._ @@ -41,8 +41,9 @@ import scala.util.{Success, Try} class JesAsyncBackendJobExecutionActorSpec extends TestKitSuite("JesAsyncBackendJobExecutionActorSpec") with FlatSpecLike with Matchers with ImplicitSender with Mockito with BackendSpec with BeforeAndAfter { - val mockPathBuilder: GcsPathBuilder = GcsPathBuilderFactory(MockAuthMode, "cromwell-test").withOptions(WorkflowOptions.empty) - + val mockPathBuilder: GcsPathBuilder = GcsPathBuilder.fromCredentials(NoCredentials.getInstance(), + "test-cromwell", None, GcsPathBuilderFactory.DefaultCloudStorageConfiguration, WorkflowOptions.empty) + var kvService: ActorRef = system.actorOf(Props(new InMemoryKvServiceActor)) import JesTestConfig._ @@ -86,7 +87,7 @@ class JesAsyncBackendJobExecutionActorSpec extends TestKitSuite("JesAsyncBackend } private def buildInitializationData(jobDescriptor: BackendJobDescriptor, configuration: JesConfiguration) = { - val workflowPaths = JesWorkflowPaths(jobDescriptor.workflowDescriptor, configuration)(system) + val workflowPaths = JesWorkflowPaths(jobDescriptor.workflowDescriptor, NoCredentials.getInstance(), NoCredentials.getInstance(), configuration)(system) val runtimeAttributesBuilder = JesRuntimeAttributes.runtimeAttributesBuilder(configuration) JesBackendInitializationData(workflowPaths, runtimeAttributesBuilder, configuration, null, null) } diff --git a/supportedBackends/jes/src/test/scala/cromwell/backend/impl/jes/JesCallPathsSpec.scala b/supportedBackends/jes/src/test/scala/cromwell/backend/impl/jes/JesCallPathsSpec.scala index b8859d6cb..ca93bec61 100644 --- a/supportedBackends/jes/src/test/scala/cromwell/backend/impl/jes/JesCallPathsSpec.scala +++ b/supportedBackends/jes/src/test/scala/cromwell/backend/impl/jes/JesCallPathsSpec.scala @@ -1,5 +1,6 @@ package cromwell.backend.impl.jes +import com.google.cloud.NoCredentials import cromwell.backend.BackendSpec import cromwell.core.TestKitSuite import cromwell.filesystems.gcs.auth.GoogleAuthModeSpec @@ -20,9 +21,10 @@ class JesCallPathsSpec extends TestKitSuite with FlatSpecLike with Matchers with val workflowDescriptor = buildWorkflowDescriptor(SampleWdl.HelloWorld.wdlSource()) val jobDescriptorKey = firstJobDescriptorKey(workflowDescriptor) val jesConfiguration = new JesConfiguration(JesBackendConfigurationDescriptor) - - val callPaths = JesJobPaths(jobDescriptorKey, workflowDescriptor, - jesConfiguration) + val workflowPaths = JesWorkflowPaths(workflowDescriptor, NoCredentials.getInstance(), NoCredentials.getInstance(), jesConfiguration) + + val callPaths = JesJobPaths(workflowPaths, jobDescriptorKey) + callPaths.returnCodeFilename should be("hello-rc.txt") callPaths.stderrFilename should be("hello-stderr.log") callPaths.stdoutFilename should be("hello-stdout.log") @@ -35,8 +37,10 @@ class JesCallPathsSpec extends TestKitSuite with FlatSpecLike with Matchers with val workflowDescriptor = buildWorkflowDescriptor(SampleWdl.HelloWorld.wdlSource()) val jobDescriptorKey = firstJobDescriptorKey(workflowDescriptor) val jesConfiguration = new JesConfiguration(JesBackendConfigurationDescriptor) + val workflowPaths = JesWorkflowPaths(workflowDescriptor, NoCredentials.getInstance(), NoCredentials.getInstance(), jesConfiguration) - val callPaths = JesJobPaths(jobDescriptorKey, workflowDescriptor, jesConfiguration) + val callPaths = JesJobPaths(workflowPaths, jobDescriptorKey) + callPaths.returnCode.pathAsString should be(s"gs://my-cromwell-workflows-bucket/wf_hello/${workflowDescriptor.id}/call-hello/hello-rc.txt") callPaths.stdout.pathAsString should @@ -53,8 +57,10 @@ class JesCallPathsSpec extends TestKitSuite with FlatSpecLike with Matchers with val workflowDescriptor = buildWorkflowDescriptor(SampleWdl.HelloWorld.wdlSource()) val jobDescriptorKey = firstJobDescriptorKey(workflowDescriptor) val jesConfiguration = new JesConfiguration(JesBackendConfigurationDescriptor) + val workflowPaths = JesWorkflowPaths(workflowDescriptor, NoCredentials.getInstance(), NoCredentials.getInstance(), jesConfiguration) - val callPaths = JesJobPaths(jobDescriptorKey, workflowDescriptor, jesConfiguration) + val callPaths = JesJobPaths(workflowPaths, jobDescriptorKey) + callPaths.callContext.root.pathAsString should be(s"gs://my-cromwell-workflows-bucket/wf_hello/${workflowDescriptor.id}/call-hello") callPaths.callContext.stdout should diff --git a/supportedBackends/jes/src/test/scala/cromwell/backend/impl/jes/JesWorkflowPathsSpec.scala b/supportedBackends/jes/src/test/scala/cromwell/backend/impl/jes/JesWorkflowPathsSpec.scala index 8cc12fd4b..df529491e 100644 --- a/supportedBackends/jes/src/test/scala/cromwell/backend/impl/jes/JesWorkflowPathsSpec.scala +++ b/supportedBackends/jes/src/test/scala/cromwell/backend/impl/jes/JesWorkflowPathsSpec.scala @@ -1,5 +1,6 @@ package cromwell.backend.impl.jes +import com.google.cloud.NoCredentials import cromwell.backend.BackendSpec import cromwell.core.TestKitSuite import cromwell.filesystems.gcs.auth.GoogleAuthModeSpec @@ -19,7 +20,7 @@ class JesWorkflowPathsSpec extends TestKitSuite with FlatSpecLike with Matchers val workflowDescriptor = buildWorkflowDescriptor(SampleWdl.HelloWorld.wdlSource()) val jesConfiguration = new JesConfiguration(JesBackendConfigurationDescriptor) - val workflowPaths = JesWorkflowPaths(workflowDescriptor, jesConfiguration)(system) + val workflowPaths = JesWorkflowPaths(workflowDescriptor, NoCredentials.getInstance(), NoCredentials.getInstance(), jesConfiguration)(system) workflowPaths.executionRoot.pathAsString should be("gs://my-cromwell-workflows-bucket/") workflowPaths.workflowRoot.pathAsString should be(s"gs://my-cromwell-workflows-bucket/wf_hello/${workflowDescriptor.id}/") diff --git a/supportedBackends/sfs/src/main/scala/cromwell/backend/impl/sfs/config/ConfigInitializationActor.scala b/supportedBackends/sfs/src/main/scala/cromwell/backend/impl/sfs/config/ConfigInitializationActor.scala index 34f93b0ff..2b9751b5b 100644 --- a/supportedBackends/sfs/src/main/scala/cromwell/backend/impl/sfs/config/ConfigInitializationActor.scala +++ b/supportedBackends/sfs/src/main/scala/cromwell/backend/impl/sfs/config/ConfigInitializationActor.scala @@ -5,6 +5,8 @@ import cromwell.backend.sfs._ import cromwell.backend.standard.{StandardInitializationActorParams, StandardInitializationData, StandardValidatedRuntimeAttributesBuilder} import wdl4s.WdlNamespace +import scala.concurrent.Future + /** * Extension of the SharedFileSystemBackendInitializationData with declarations of extra runtime attributes, and a * wdl namespace containing various tasks for submitting, killing, etc. @@ -40,9 +42,11 @@ class ConfigInitializationActor(params: StandardInitializationActorParams) DeclarationValidation.fromDeclarations(configWdlNamespace.runtimeDeclarations) } - override lazy val initializationData: ConfigInitializationData = { + override lazy val initializationData: Future[ConfigInitializationData] = { val wdlNamespace = configWdlNamespace.wdlNamespace - new ConfigInitializationData(workflowPaths, runtimeAttributesBuilder, declarationValidations, wdlNamespace) + workflowPaths map { + new ConfigInitializationData(_, runtimeAttributesBuilder, declarationValidations, wdlNamespace) + } } override lazy val runtimeAttributesBuilder: StandardValidatedRuntimeAttributesBuilder = { diff --git a/supportedBackends/sfs/src/main/scala/cromwell/backend/sfs/SharedFileSystemInitializationActor.scala b/supportedBackends/sfs/src/main/scala/cromwell/backend/sfs/SharedFileSystemInitializationActor.scala index 937ee317f..dbbca727c 100644 --- a/supportedBackends/sfs/src/main/scala/cromwell/backend/sfs/SharedFileSystemInitializationActor.scala +++ b/supportedBackends/sfs/src/main/scala/cromwell/backend/sfs/SharedFileSystemInitializationActor.scala @@ -1,21 +1,25 @@ package cromwell.backend.sfs import cats.data.Validated.{Invalid, Valid} +import cats.instances.future._ +import cats.instances.list._ +import cats.syntax.traverse._ import cromwell.backend.BackendInitializationData import cromwell.backend.io.WorkflowPaths import cromwell.backend.standard.{StandardExpressionFunctions, StandardInitializationActor, StandardInitializationActorParams} import cromwell.backend.wfs.WorkflowPathBuilder -import cromwell.core.path.{DefaultPathBuilderFactory, PathBuilder, PathBuilderFactory} +import cromwell.core.path.{DefaultPathBuilder, PathBuilder} import cromwell.filesystems.gcs.{GcsPathBuilderFactory, GoogleConfiguration} import lenthall.exception.MessageAggregation import net.ceedubs.ficus.Ficus._ import scala.concurrent.Future -import scala.util.Try class SharedFileSystemInitializationActor(standardParams: StandardInitializationActorParams) extends StandardInitializationActor(standardParams) { + private implicit val system = context.system + /** * If the backend sets a gcs authentication mode, try to create a PathBuilderFactory with it. */ @@ -33,23 +37,21 @@ class SharedFileSystemInitializationActor(standardParams: StandardInitialization } } - lazy val pathBuilderFactories: List[PathBuilderFactory] = - List(gcsPathBuilderFactory, Option(DefaultPathBuilderFactory)).flatten - - override lazy val pathBuilders: List[PathBuilder] = - pathBuilderFactories map { _.withOptions(workflowDescriptor.workflowOptions)(context.system) } + override lazy val pathBuilders: Future[List[PathBuilder]] = + gcsPathBuilderFactory.toList.traverse(_.withOptions(workflowDescriptor.workflowOptions)).map(_ ++ Option(DefaultPathBuilder)) - override lazy val workflowPaths: WorkflowPaths = - WorkflowPathBuilder.workflowPaths(configurationDescriptor, workflowDescriptor, pathBuilders) + override lazy val workflowPaths: Future[WorkflowPaths] = pathBuilders map { + WorkflowPathBuilder.workflowPaths(configurationDescriptor, workflowDescriptor, _) + } override lazy val expressionFunctions: Class[_ <: StandardExpressionFunctions] = classOf[SharedFileSystemExpressionFunctions] override def beforeAll(): Future[Option[BackendInitializationData]] = { - Future.fromTry(Try { - publishWorkflowRoot(workflowPaths.workflowRoot.pathAsString) - workflowPaths.workflowRoot.createPermissionedDirectories() - Option(initializationData) - }) + initializationData map { data => + publishWorkflowRoot(data.workflowPaths.workflowRoot.pathAsString) + data.workflowPaths.workflowRoot.createPermissionedDirectories() + Option(data) + } } } diff --git a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesInitializationActor.scala b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesInitializationActor.scala index 7ee1efdf6..7570576ed 100644 --- a/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesInitializationActor.scala +++ b/supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesInitializationActor.scala @@ -2,16 +2,18 @@ package cromwell.backend.impl.tes import akka.actor.ActorRef import cats.data.Validated.{Invalid, Valid} +import cats.instances.future._ +import cats.instances.list._ +import cats.syntax.traverse._ import cromwell.backend.standard._ import cromwell.backend.{BackendConfigurationDescriptor, BackendInitializationData, BackendWorkflowDescriptor} -import cromwell.core.path.{DefaultPathBuilderFactory, PathBuilder, PathBuilderFactory} +import cromwell.core.path.{DefaultPathBuilder, PathBuilder} import cromwell.filesystems.gcs.{GcsPathBuilderFactory, GoogleConfiguration} import lenthall.exception.MessageAggregation -import wdl4s.TaskCall import net.ceedubs.ficus.Ficus._ +import wdl4s.TaskCall import scala.concurrent.Future -import scala.util.Try case class TesInitializationActorParams ( @@ -27,6 +29,8 @@ class TesInitializationActor(params: TesInitializationActorParams) extends StandardInitializationActor(params) { private val tesConfiguration = params.tesConfiguration + + private implicit val system = context.system /** * If the backend sets a gcs authentication mode, try to create a PathBuilderFactory with it. @@ -45,23 +49,21 @@ class TesInitializationActor(params: TesInitializationActorParams) } } - lazy val pathBuilderFactories: List[PathBuilderFactory] = - List(gcsPathBuilderFactory, Option(DefaultPathBuilderFactory)).flatten - - override lazy val pathBuilders: List[PathBuilder] = - pathBuilderFactories map { _.withOptions(workflowDescriptor.workflowOptions)(context.system) } + override lazy val pathBuilders: Future[List[PathBuilder]] = + gcsPathBuilderFactory.toList.traverse(_.withOptions(workflowDescriptor.workflowOptions)).map(_ ++ Option(DefaultPathBuilder)) - override lazy val workflowPaths: TesWorkflowPaths = - new TesWorkflowPaths(workflowDescriptor, tesConfiguration.configurationDescriptor.backendConfig, pathBuilders) + override lazy val workflowPaths: Future[TesWorkflowPaths] = pathBuilders map { + new TesWorkflowPaths(workflowDescriptor, tesConfiguration.configurationDescriptor.backendConfig, _) + } override lazy val runtimeAttributesBuilder: StandardValidatedRuntimeAttributesBuilder = TesRuntimeAttributes.runtimeAttributesBuilder(tesConfiguration.runtimeConfig) override def beforeAll(): Future[Option[BackendInitializationData]] = { - Future.fromTry(Try { - publishWorkflowRoot(workflowPaths.workflowRoot.toString) - workflowPaths.workflowRoot.createPermissionedDirectories() - Option(TesBackendInitializationData(workflowPaths, runtimeAttributesBuilder, tesConfiguration)) - }) + workflowPaths map { paths => + publishWorkflowRoot(paths.workflowRoot.toString) + paths.workflowRoot.createPermissionedDirectories() + Option(TesBackendInitializationData(paths, runtimeAttributesBuilder, tesConfiguration)) + } } } From 4487a5107722005d68725d49b5b003e34dcbed59 Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Wed, 7 Jun 2017 17:08:02 -0400 Subject: [PATCH 06/74] jfrog --- project/Dependencies.scala | 4 ++-- project/Publishing.scala | 4 ++-- project/Settings.scala | 4 ++-- release/release_workflow.wdl | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c2ae8c2b9..0f580c78c 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,8 +1,8 @@ import sbt._ object Dependencies { - lazy val lenthallV = "0.24" - lazy val wdl4sV = "0.12" + lazy val lenthallV = "0.25-903b3c0-SNAP" + lazy val wdl4sV = "0.13-4804734-SNAP" lazy val sprayV = "1.3.3" /* spray-json is an independent project from the "spray suite" diff --git a/project/Publishing.scala b/project/Publishing.scala index 244ece80b..56301c899 100644 --- a/project/Publishing.scala +++ b/project/Publishing.scala @@ -7,7 +7,7 @@ object Publishing { private def artifactoryResolver(isSnapshot: Boolean): Resolver = { val repoType = if (isSnapshot) "snapshot" else "release" val repoUrl = - s"https://artifactory.broadinstitute.org/artifactory/libs-$repoType-local;build.timestamp=$buildTimestamp" + s"https://broadinstitute.jfrog.io/broadinstitute/libs-$repoType-local;build.timestamp=$buildTimestamp" val repoName = "artifactory-publish" repoName at repoUrl } @@ -15,7 +15,7 @@ object Publishing { private val artifactoryCredentials: Credentials = { val username = sys.env.getOrElse("ARTIFACTORY_USERNAME", "") val password = sys.env.getOrElse("ARTIFACTORY_PASSWORD", "") - Credentials("Artifactory Realm", "artifactory.broadinstitute.org", username, password) + Credentials("Artifactory Realm", "broadinstitute.jfrog.io", username, password) } def publishingSettings: Seq[Setting[_]] = diff --git a/project/Settings.scala b/project/Settings.scala index 56303da63..3a1dc01b6 100644 --- a/project/Settings.scala +++ b/project/Settings.scala @@ -13,8 +13,8 @@ object Settings { val commonResolvers = List( Resolver.jcenterRepo, - "Broad Artifactory Releases" at "https://artifactory.broadinstitute.org/artifactory/libs-release/", - "Broad Artifactory Snapshots" at "https://artifactory.broadinstitute.org/artifactory/libs-snapshot/" + "Broad Artifactory Releases" at "https://broadinstitute.jfrog.io/broadinstitute/libs-release/", + "Broad Artifactory Snapshots" at "https://broadinstitute.jfrog.io/broadinstitute/libs-snapshot/" ) /* diff --git a/release/release_workflow.wdl b/release/release_workflow.wdl index b29a85b60..44ad55531 100644 --- a/release/release_workflow.wdl +++ b/release/release_workflow.wdl @@ -122,7 +122,7 @@ task wait_for_artifactory { command <<< checkIfPresent() { - isPresent=$(curl -s --head https://artifactory.broadinstitute.org/artifactory/simple/libs-release-local/org/broadinstitute/${repo}/${version}/ | head -n 1 | grep -q "HTTP/1.[01] [23]..") + isPresent=$(curl -s --head https://broadinstitute.jfrog.io/broadinstitute/libs-release-local/org/broadinstitute/${repo}/${version}/ | head -n 1 | grep -q "HTTP/1.[01] [23]..") } elapsedTime=0 From ac3d65eed2ba05e46fe41bbf0d13bbb975b7e6a3 Mon Sep 17 00:00:00 2001 From: Thib Date: Fri, 9 Jun 2017 11:34:23 -0400 Subject: [PATCH 07/74] honor allow result reuse (#2341) (#2349) (cherry picked from commit 461cff2) --- .../CallCachingAggregationEntryComponent.scala | 15 ++-- .../callcaching/CallCachingSlickDatabaseSpec.scala | 82 ++++++++++++++++++++++ 2 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 engine/src/test/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCachingSlickDatabaseSpec.scala diff --git a/database/sql/src/main/scala/cromwell/database/slick/tables/CallCachingAggregationEntryComponent.scala b/database/sql/src/main/scala/cromwell/database/slick/tables/CallCachingAggregationEntryComponent.scala index edb0bb861..b3636cd65 100644 --- a/database/sql/src/main/scala/cromwell/database/slick/tables/CallCachingAggregationEntryComponent.scala +++ b/database/sql/src/main/scala/cromwell/database/slick/tables/CallCachingAggregationEntryComponent.scala @@ -34,16 +34,21 @@ trait CallCachingAggregationEntryComponent { callCachingAggregationEntries.map(_.callCachingAggregationEntryId) val existsCallCachingEntriesForBaseAggregationHash = Compiled( - (baseAggregation: Rep[String]) => { - callCachingAggregationEntries.filter(callCachingAggregationEntry => - callCachingAggregationEntry.baseAggregation === baseAggregation - ).exists - } + (baseAggregation: Rep[String]) => (for { + callCachingEntry <- callCachingEntries + if callCachingEntry.allowResultReuse + callCachingAggregationEntry <- callCachingAggregationEntries + if callCachingEntry.callCachingEntryId === callCachingAggregationEntry.callCachingEntryId + if callCachingAggregationEntry.baseAggregation === baseAggregation + } yield ()).exists ) def callCachingEntriesForAggregatedHashes(baseAggregation: Rep[String], inputFilesAggregation: Rep[Option[String]], number: Int) = { (for { + callCachingEntry <- callCachingEntries + if callCachingEntry.allowResultReuse callCachingAggregationEntry <- callCachingAggregationEntries + if callCachingEntry.callCachingEntryId === callCachingAggregationEntry.callCachingEntryId if callCachingAggregationEntry.baseAggregation === baseAggregation if (callCachingAggregationEntry.inputFilesAggregation.isEmpty && inputFilesAggregation.isEmpty) || (callCachingAggregationEntry.inputFilesAggregation === inputFilesAggregation) diff --git a/engine/src/test/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCachingSlickDatabaseSpec.scala b/engine/src/test/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCachingSlickDatabaseSpec.scala new file mode 100644 index 000000000..72ea41700 --- /dev/null +++ b/engine/src/test/scala/cromwell/engine/workflow/lifecycle/execution/callcaching/CallCachingSlickDatabaseSpec.scala @@ -0,0 +1,82 @@ +package cromwell.engine.workflow.lifecycle.execution.callcaching + +import cats.data.NonEmptyList +import com.typesafe.config.ConfigFactory +import cromwell.core.Tags.DbmsTest +import cromwell.core.WorkflowId +import cromwell.database.slick.SlickDatabase +import cromwell.database.sql.joins.CallCachingJoin +import cromwell.database.sql.tables.{CallCachingAggregationEntry, CallCachingEntry, CallCachingHashEntry} +import cromwell.services.ServicesStore +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time.{Millis, Seconds, Span} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import org.specs2.mock.Mockito + +import scala.concurrent.ExecutionContext + +class CallCachingSlickDatabaseSpec extends FlatSpec with Matchers with ScalaFutures with BeforeAndAfterAll with Mockito { + + implicit val ec = ExecutionContext.global + implicit val defaultPatience = PatienceConfig(scaled(Span(5, Seconds)), scaled(Span(100, Millis))) + + "SlickDatabase (hsqldb)" should behave like testWith("database") + + "SlickDatabase (mysql)" should behave like testWith("database-test-mysql") + + def testWith(configPath: String): Unit = { + import ServicesStore.EnhancedSqlDatabase + + lazy val databaseConfig = ConfigFactory.load.getConfig(configPath) + lazy val dataAccess = new SlickDatabase(databaseConfig).initialized + + val callCachingEntryA = CallCachingEntry( + WorkflowId.randomId().toString, + "AwesomeWorkflow.GoodJob", + 1, + None, + None, + allowResultReuse = false + ) + + val callCachingHashEntriesA = Seq( + CallCachingHashEntry( + hashKey = "input: String s1", + hashValue = "HASH_S1" + ), + CallCachingHashEntry( + hashKey = "input: String s2", + hashValue = "HASH_S2" + ), + CallCachingHashEntry( + hashKey = "input: String s4", + hashValue = "HASH_S4" + ) + ) + + val aggregation = Option(CallCachingAggregationEntry("BASE_AGGREGATION", Option("FILE_AGGREGATION"))) + + it should "honor allowResultReuse" taggedAs DbmsTest in { + (for { + _ <- dataAccess.addCallCaching(Seq( + CallCachingJoin( + callCachingEntryA, + callCachingHashEntriesA, + aggregation, + Seq.empty, Seq.empty + ) + ), + 100 + ) + hasBaseAggregation <- dataAccess.hasMatchingCallCachingEntriesForBaseAggregation("BASE_AGGREGATION") + _ = hasBaseAggregation shouldBe false + hasHashPairMatch <- dataAccess.hasMatchingCallCachingEntriesForHashKeyValues( + NonEmptyList.of("input: String s1" -> "HASH_S1") + ) + _ = hasHashPairMatch shouldBe false + hit <- dataAccess.findCacheHitForAggregation("BASE_AGGREGATION", Option("FILE_AGGREGATION"), 1) + _ = hit shouldBe empty + } yield ()).futureValue + } + } +} From ef012a655e2f58f2746caa3ffd3119bcb2d3f79d Mon Sep 17 00:00:00 2001 From: Miguel Covarrubias Date: Mon, 5 Jun 2017 15:16:38 -0400 Subject: [PATCH 08/74] First pass of workflow type and version. This takes parameters on REST endpoints, roundtrips to the workflow store, and publishes to metadata. Currently hardcodes type and version for the command line. --- .travis.yml | 2 +- .../scala/cromwell/core/WorkflowMetadataKeys.scala | 3 + .../core/WorkflowSourceFilesCollection.scala | 31 ++-- core/src/main/scala/cromwell/core/package.scala | 2 + core/src/test/scala/cromwell/util/SampleWdl.scala | 16 +- .../main/scala/cromwell/api/CromwellClient.scala | 12 +- .../cromwell/api/model/WorkflowSubmission.scala | 18 ++- .../migration/src/main/resources/changelog.xml | 1 + .../changesets/workflow_store_type_and_version.xml | 23 +++ .../slick/tables/WorkflowStoreEntryComponent.scala | 6 +- .../database/sql/tables/WorkflowStoreEntry.scala | 2 + .../MaterializeWorkflowDescriptorActor.scala | 9 +- .../workflow/workflowstore/SqlWorkflowStore.scala | 15 +- .../workflowstore/WorkflowStoreSubmitActor.scala | 10 +- .../cromwell/webservice/CromwellApiHandler.scala | 10 +- .../cromwell/webservice/CromwellApiService.scala | 58 +++++-- .../cromwell/webservice/WorkflowJsonSupport.scala | 4 +- .../test/scala/cromwell/CromwellTestKitSpec.scala | 8 +- .../test/scala/cromwell/RestartWorkflowSpec.scala | 21 +-- .../scala/cromwell/SimpleWorkflowActorSpec.scala | 15 +- .../cromwell/engine/WorkflowStoreActorSpec.scala | 18 ++- .../MaterializeWorkflowDescriptorActorSpec.scala | 168 +++++++++++++++------ .../subworkflowstore/SubWorkflowStoreSpec.scala | 8 +- .../cromwell/services/ServicesStoreSpec.scala | 56 +++++-- src/main/scala/cromwell/CromwellCommandLine.scala | 21 ++- 25 files changed, 408 insertions(+), 129 deletions(-) create mode 100644 database/migration/src/main/resources/changesets/workflow_store_type_and_version.xml diff --git a/.travis.yml b/.travis.yml index 1be54097a..1f20afb56 100644 --- a/.travis.yml +++ b/.travis.yml @@ -19,7 +19,7 @@ before_install: - openssl aes-256-cbc -K "$encrypted_5ebd3ff04788_key" -iv "$encrypted_5ebd3ff04788_iv" -in src/bin/travis/resources/jesConf.tar.enc -out jesConf.tar -d || true env: global: - - CENTAUR_BRANCH=develop + - CENTAUR_BRANCH=mlc_workflow_type matrix: # Setting this variable twice will cause the 'script' section to run twice with the respective env var invoked - BUILD_TYPE=sbt diff --git a/core/src/main/scala/cromwell/core/WorkflowMetadataKeys.scala b/core/src/main/scala/cromwell/core/WorkflowMetadataKeys.scala index 1a7843e6d..b98699053 100644 --- a/core/src/main/scala/cromwell/core/WorkflowMetadataKeys.scala +++ b/core/src/main/scala/cromwell/core/WorkflowMetadataKeys.scala @@ -20,5 +20,8 @@ object WorkflowMetadataKeys { val SubmissionSection_Inputs = "inputs" val SubmissionSection_Options = "options" val SubmissionSection_Imports = "imports" + val SubmissionSection_WorkflowType = "workflowType" + val SubmissionSection_WorkflowTypeVersion = "workflowTypeVersion" + val Labels = "labels" } diff --git a/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala b/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala index 55fa68b47..1b9b2743e 100644 --- a/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala +++ b/core/src/main/scala/cromwell/core/WorkflowSourceFilesCollection.scala @@ -11,46 +11,45 @@ sealed trait WorkflowSourceFilesCollection { def inputsJson: WdlJson def workflowOptionsJson: WorkflowOptionsJson def labelsJson: WdlJson - + def workflowType: Option[WorkflowType] + def workflowTypeVersion: Option[WorkflowTypeVersion] def importsZipFileOption: Option[Array[Byte]] = this match { case _: WorkflowSourceFilesWithoutImports => None - case WorkflowSourceFilesWithDependenciesZip(_, _, _, _, importsZip) => Option(importsZip) // i.e. Some(importsZip) if our wiring is correct + case w: WorkflowSourceFilesWithDependenciesZip => Option(w.importsZip) // i.e. Some(importsZip) if our wiring is correct } def copyOptions(workflowOptions: WorkflowOptionsJson) = this match { - case w: WorkflowSourceFilesWithoutImports => WorkflowSourceFilesWithoutImports( - wdlSource = w.wdlSource, - inputsJson = w.inputsJson, - workflowOptionsJson = workflowOptions, - labelsJson = w.labelsJson) - - case w: WorkflowSourceFilesWithDependenciesZip => WorkflowSourceFilesWithDependenciesZip( - wdlSource = w.wdlSource, - inputsJson = w.inputsJson, - workflowOptionsJson = workflowOptions, - labelsJson = w.labelsJson, - importsZip = w.importsZip) + case w: WorkflowSourceFilesWithoutImports => w.copy(workflowOptionsJson = workflowOptions) + case w: WorkflowSourceFilesWithDependenciesZip => w.copy(workflowOptionsJson = workflowOptions) } } object WorkflowSourceFilesCollection { def apply(wdlSource: WdlSource, + workflowType: Option[WorkflowType], + workflowTypeVersion: Option[WorkflowTypeVersion], inputsJson: WdlJson, workflowOptionsJson: WorkflowOptionsJson, labelsJson: WdlJson, importsFile: Option[Array[Byte]]): WorkflowSourceFilesCollection = importsFile match { - case Some(imports) => WorkflowSourceFilesWithDependenciesZip(wdlSource, inputsJson, workflowOptionsJson, labelsJson, imports) - case None => WorkflowSourceFilesWithoutImports(wdlSource, inputsJson, workflowOptionsJson, labelsJson) + case Some(imports) => + WorkflowSourceFilesWithDependenciesZip(wdlSource, workflowType, workflowTypeVersion, inputsJson, workflowOptionsJson, labelsJson, imports) + case None => + WorkflowSourceFilesWithoutImports(wdlSource, workflowType, workflowTypeVersion, inputsJson, workflowOptionsJson, labelsJson) } } final case class WorkflowSourceFilesWithoutImports(wdlSource: WdlSource, + workflowType: Option[WorkflowType], + workflowTypeVersion: Option[WorkflowTypeVersion], inputsJson: WdlJson, workflowOptionsJson: WorkflowOptionsJson, labelsJson: WdlJson) extends WorkflowSourceFilesCollection final case class WorkflowSourceFilesWithDependenciesZip(wdlSource: WdlSource, + workflowType: Option[WorkflowType], + workflowTypeVersion: Option[WorkflowTypeVersion], inputsJson: WdlJson, workflowOptionsJson: WorkflowOptionsJson, labelsJson: WdlJson, diff --git a/core/src/main/scala/cromwell/core/package.scala b/core/src/main/scala/cromwell/core/package.scala index 12f2e0e44..625fab0d5 100644 --- a/core/src/main/scala/cromwell/core/package.scala +++ b/core/src/main/scala/cromwell/core/package.scala @@ -12,6 +12,8 @@ package object core { type FullyQualifiedName = String type WorkflowOutputs = Map[FullyQualifiedName, JobOutput] type WorkflowOptionsJson = String + type WorkflowType = String + type WorkflowTypeVersion = String type CallOutputs = Map[LocallyQualifiedName, JobOutput] type HostInputs = Map[String, WdlValue] type EvaluatedRuntimeAttributes = Map[String, WdlValue] diff --git a/core/src/test/scala/cromwell/util/SampleWdl.scala b/core/src/test/scala/cromwell/util/SampleWdl.scala index 5a3940b81..7d20ecdf5 100644 --- a/core/src/test/scala/cromwell/util/SampleWdl.scala +++ b/core/src/test/scala/cromwell/util/SampleWdl.scala @@ -14,8 +14,20 @@ import scala.language.postfixOps trait SampleWdl extends TestFileUtil { def wdlSource(runtime: String = ""): WdlSource - def asWorkflowSources(runtime: String = "", workflowOptions: String = "{}", labels: String = "{}") = - WorkflowSourceFilesWithoutImports(wdlSource = wdlSource(runtime), inputsJson = wdlJson, workflowOptionsJson = workflowOptions, labelsJson = labels) + def asWorkflowSources(runtime: String = "", + workflowOptions: String = "{}", + labels: String = "{}", + workflowType: Option[String] = Option("WDL"), + workflowTypeVersion: Option[String] = None) = { + WorkflowSourceFilesWithoutImports( + wdlSource = wdlSource(runtime), + inputsJson = wdlJson, + workflowOptionsJson = workflowOptions, + labelsJson = labels, + workflowType = workflowType, + workflowTypeVersion = workflowTypeVersion) + } + val rawInputs: WorkflowRawInputs def name = getClass.getSimpleName.stripSuffix("$") diff --git a/cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala b/cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala index c2a0c536b..c96cbe6b9 100644 --- a/cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala +++ b/cromwellApiClient/src/main/scala/cromwell/api/CromwellClient.scala @@ -38,6 +38,8 @@ class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit acto val sourceBodyParts = Map( "wdlSource" -> Option(workflowSubmission.wdl), + "workflowType" -> workflowSubmission.workflowType, + "workflowTypeVersion" -> workflowSubmission.workflowTypeVersion, "workflowInputs" -> workflowSubmission.inputsJson, "workflowOptions" -> insertSecrets(workflowSubmission.options, workflowSubmission.refreshToken), "customLabels" -> Option(workflowSubmission.customLabels.toJson.toString) @@ -64,7 +66,15 @@ class CromwellClient(val cromwellUrl: URL, val apiVersion: String)(implicit acto val requestEntity = requestEntityForSubmit(workflow) // Make a set of submissions that represent the batch (so we can zip with the results later): - val submissionSet = workflow.inputsBatch.map(inputs => WorkflowSingleSubmission(workflow.wdl, Option(inputs), workflow.options, workflow.customLabels, workflow.zippedImports, workflow.refreshToken)) + val submissionSet = workflow.inputsBatch.map(inputs => WorkflowSingleSubmission( + wdl = workflow.wdl, + workflowType = workflow.workflowType, + workflowTypeVersion = workflow.workflowTypeVersion, + inputsJson = Option(inputs), + options = workflow.options, + customLabels = workflow.customLabels, + zippedImports = workflow.zippedImports, + refreshToken = workflow.refreshToken)) makeRequest[List[CromwellStatus]](HttpRequest(HttpMethods.POST, batchSubmitEndpoint, List.empty[HttpHeader], requestEntity)) map { statuses => val zipped = submissionSet.zip(statuses) diff --git a/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowSubmission.scala b/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowSubmission.scala index 5a0b69bc6..f20a7aa15 100644 --- a/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowSubmission.scala +++ b/cromwellApiClient/src/main/scala/cromwell/api/model/WorkflowSubmission.scala @@ -4,6 +4,8 @@ import better.files.File sealed trait WorkflowSubmission { val wdl: String + val workflowType: Option[String] + val workflowTypeVersion: Option[String] val inputsJson: Option[String] val options: Option[String] val customLabels: Option[List[Label]] @@ -12,6 +14,8 @@ sealed trait WorkflowSubmission { } final case class WorkflowSingleSubmission(wdl: String, + workflowType: Option[String], + workflowTypeVersion: Option[String], inputsJson: Option[String], options: Option[String], customLabels: Option[List[Label]], @@ -19,11 +23,13 @@ final case class WorkflowSingleSubmission(wdl: String, refreshToken: Option[String]) extends WorkflowSubmission final case class WorkflowBatchSubmission(wdl: String, - inputsBatch: List[String], - options: Option[String], - customLabels: Option[List[Label]], - zippedImports: Option[File], - refreshToken: Option[String]) extends WorkflowSubmission { + workflowType: Option[String], + workflowTypeVersion: Option[String], + inputsBatch: List[String], + options: Option[String], + customLabels: Option[List[Label]], + zippedImports: Option[File], + refreshToken: Option[String]) extends WorkflowSubmission { - override val inputsJson: Option[String] = Option(inputsBatch.mkString(start="[", sep=",", end="]")) + override val inputsJson: Option[String] = Option(inputsBatch.mkString(start = "[", sep = ",", end = "]")) } diff --git a/database/migration/src/main/resources/changelog.xml b/database/migration/src/main/resources/changelog.xml index fc864a848..03883abb7 100644 --- a/database/migration/src/main/resources/changelog.xml +++ b/database/migration/src/main/resources/changelog.xml @@ -65,6 +65,7 @@ +