diff --git a/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala b/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala index 75a3be386cb..5429a86e017 100644 --- a/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala +++ b/src/main/scala/cromwell/engine/workflow/WorkflowActor.scala @@ -305,7 +305,7 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) } } - private def initializeExecutionStore(startMode: StartMode): Unit = { + private def initializeExecutionStore(startMode: StartMode): Future[Unit] = { val initializationCode = startMode.runInitialization(this) val futureCaches = for { _ <- initializationCode @@ -320,6 +320,9 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) case Failure(t) => self ! AsyncFailure(t) } + + // "Lose" the actual value on purpose, the caller doesn't care/need to know about it + futureCaches map { _ => () } } private def initializeWorkflow: Try[HostInputs] = backend.initializeForWorkflow(workflow) @@ -340,7 +343,7 @@ case class WorkflowActor(workflow: WorkflowDescriptor, backend: Backend) when(WorkflowSubmitted) { case Event(startMode: StartMode, _) => logger.info(s"$startMode message received") - initializeExecutionStore(startMode) + initializeExecutionStore(startMode) pipeTo sender() stay() case Event(CachesCreated(startMode), data) => logger.info(s"ExecutionStoreCreated($startMode) message received") diff --git a/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala b/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala index 24ef523c22d..00c992e20da 100644 --- a/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala +++ b/src/main/scala/cromwell/engine/workflow/WorkflowManagerActor.scala @@ -3,7 +3,7 @@ package cromwell.engine.workflow import akka.actor.FSM.SubscribeTransitionCallBack import akka.actor.{Actor, ActorRef, Props} import akka.event.{Logging, LoggingReceive} -import akka.pattern.pipe +import akka.pattern.{pipe, ask} import com.typesafe.config.ConfigFactory import cromwell.binding import cromwell.binding._ @@ -243,15 +243,14 @@ class WorkflowManagerActor(backend: Backend) extends Actor with CromwellActor { maybeWorkflowId: Option[WorkflowId]): Future[WorkflowId] = { val workflowId: WorkflowId = maybeWorkflowId.getOrElse(WorkflowId.randomId()) log.info(s"$tag submitWorkflow input id = $maybeWorkflowId, effective id = $workflowId") + val isRestart = maybeWorkflowId.isDefined + val futureId = for { descriptor <- Future.fromTry(Try(new WorkflowDescriptor(workflowId, source))) workflowActor = context.actorOf(WorkflowActor.props(descriptor, backend), s"WorkflowActor-$workflowId") _ <- Future.fromTry(workflowStore.insert(workflowId, workflowActor)) - } yield { - val isRestart = maybeWorkflowId.isDefined - workflowActor ! (if (isRestart) Restart else Start) - workflowId - } + _ <- workflowActor ? (if (isRestart) Restart else Start) + } yield workflowId futureId onFailure { case e =>