Skip to content

Commit

Permalink
swap running jobs to aborting in the workflow store if abort-on-termi…
Browse files Browse the repository at this point in the history
…nate is true (#2819)
  • Loading branch information
Horneth committed Nov 6, 2017
1 parent 1c00c83 commit b296fc4
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class SingleWorkflowRunnerActor(source: WorkflowSourceFilesCollection,
private val backoff = SimpleExponentialBackoff(1 second, 1 minute, 1.2)

override lazy val workflowStore = new InMemoryWorkflowStore()
override lazy val jobStoreActor = context.actorOf(EmptyJobStoreActor.props)
override lazy val subWorkflowStoreActor = context.actorOf(EmptySubWorkflowStoreActor.props)
override lazy val jobStoreActor = context.actorOf(EmptyJobStoreActor.props, "JobStoreActor")
override lazy val subWorkflowStoreActor = context.actorOf(EmptySubWorkflowStoreActor.props, "SubWorkflowStoreActor")

startWith(NotStarted, EmptySwraData)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cromwell.engine.workflow.workflowstore

import cats.data.NonEmptyList
import cromwell.core.{WorkflowId, WorkflowSourceFilesCollection}
import cromwell.engine.workflow.workflowstore.WorkflowStoreState.{Aborting, StartableState}
import cromwell.engine.workflow.workflowstore.WorkflowStoreState.{Aborting, Running, StartableState}

import scala.concurrent.{ExecutionContext, Future}

Expand Down Expand Up @@ -50,6 +50,14 @@ class InMemoryWorkflowStore extends WorkflowStore {

override def stats(implicit ec: ExecutionContext): Future[Map[String, Int]] = Future.successful(Map("Submitted" -> workflowStore.size))

override def abortAllRunning()(implicit ec: ExecutionContext): Future[Unit] = {
workflowStore = workflowStore.map({
case (workflow, Running) => workflow -> Aborting
case (workflow, state) => workflow -> state
})
Future.successful(())
}

override def aborting(id: WorkflowId)(implicit ec: ExecutionContext): Future[Boolean] = {
if (workflowStore.exists(_._1.id == id)) {
workflowStore = workflowStore ++ workflowStore.find(_._1.id == id).map({ _._1 -> Aborting }).toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ case class SqlWorkflowStore(sqlDatabase: WorkflowStoreSqlDatabase) extends Workf
id.toString, WorkflowStoreState.Aborting.toString
).map(_ > 0)
}

override def abortAllRunning()(implicit ec: ExecutionContext): Future[Unit] = {
sqlDatabase.updateWorkflowsInState(
List(
WorkflowStoreState.Running.toString -> WorkflowStoreState.Aborting.toString
)
)
}

override def stats(implicit ec: ExecutionContext): Future[Map[String, Int]] = sqlDatabase.stats

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ trait WorkflowStore {
def initialize(implicit ec: ExecutionContext): Future[Unit]

def aborting(id: WorkflowId)(implicit ec: ExecutionContext): Future[Boolean]

def abortAllRunning()(implicit ec: ExecutionContext): Future[Unit]

def stats(implicit ec: ExecutionContext): Future[Map[String, Int]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ import cromwell.core._
import cromwell.util.GracefulShutdownHelper
import cromwell.util.GracefulShutdownHelper.ShutdownCommand

final case class WorkflowStoreActor private(store: WorkflowStore, serviceRegistryActor: ActorRef)
final case class WorkflowStoreActor private(store: WorkflowStore, serviceRegistryActor: ActorRef, abortAllJobsOnTerminate: Boolean)
extends Actor with ActorLogging with GracefulShutdownHelper {
import WorkflowStoreActor._

lazy val workflowStoreSubmitActor: ActorRef = context.actorOf(WorkflowStoreSubmitActor.props(store, serviceRegistryActor), "WorkflowStoreSubmitActor")
lazy val workflowStoreEngineActor: ActorRef = context.actorOf(
WorkflowStoreEngineActor.props(store, serviceRegistryActor), "WorkflowStoreEngineActor")
WorkflowStoreEngineActor.props(store, serviceRegistryActor, abortAllJobsOnTerminate), "WorkflowStoreEngineActor")

override def receive = {
case ShutdownCommand => waitForActorsAndShutdown(NonEmptyList.of(workflowStoreSubmitActor))
case ShutdownCommand => waitForActorsAndShutdown(NonEmptyList.of(workflowStoreSubmitActor, workflowStoreEngineActor))
case cmd: WorkflowStoreActorSubmitCommand => workflowStoreSubmitActor forward cmd
case cmd: WorkflowStoreActorEngineCommand => workflowStoreEngineActor forward cmd
}
Expand All @@ -28,12 +28,13 @@ object WorkflowStoreActor {
final case class AbortWorkflowCommand(id: WorkflowId) extends WorkflowStoreActorEngineCommand
case object InitializerCommand extends WorkflowStoreActorEngineCommand
case object WorkDone extends WorkflowStoreActorEngineCommand
case object AbortAllRunningWorkflowsCommandAndStop extends WorkflowStoreActorEngineCommand

sealed trait WorkflowStoreActorSubmitCommand
final case class SubmitWorkflow(source: WorkflowSourceFilesCollection) extends WorkflowStoreActorSubmitCommand
final case class BatchSubmitWorkflows(sources: NonEmptyList[WorkflowSourceFilesCollection]) extends WorkflowStoreActorSubmitCommand

def props(workflowStoreDatabase: WorkflowStore, serviceRegistryActor: ActorRef) = {
Props(WorkflowStoreActor(workflowStoreDatabase, serviceRegistryActor)).withDispatcher(EngineDispatcher)
def props(workflowStoreDatabase: WorkflowStore, serviceRegistryActor: ActorRef, abortAllJobsOnTerminate: Boolean) = {
Props(WorkflowStoreActor(workflowStoreDatabase, serviceRegistryActor, abortAllJobsOnTerminate)).withDispatcher(EngineDispatcher)
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package cromwell.engine.workflow.workflowstore

import akka.actor.{ActorLogging, ActorRef, LoggingFSM, Props}
import akka.actor.{ActorLogging, ActorRef, LoggingFSM, PoisonPill, Props}
import cats.data.NonEmptyList
import cromwell.core.Dispatcher._
import cromwell.core.abort.{WorkflowAbortFailureResponse, WorkflowAbortingResponse}
Expand All @@ -10,12 +10,13 @@ import cromwell.engine.workflow.workflowstore.WorkflowStoreActor._
import cromwell.engine.workflow.workflowstore.WorkflowStoreEngineActor.{WorkflowStoreActorState, _}
import cromwell.engine.workflow.workflowstore.WorkflowStoreState.StartableState
import cromwell.services.instrumentation.CromwellInstrumentationScheduler
import cromwell.util.GracefulShutdownHelper.ShutdownCommand
import org.apache.commons.lang3.exception.ExceptionUtils

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

final case class WorkflowStoreEngineActor private(store: WorkflowStore, serviceRegistryActor: ActorRef)
final case class WorkflowStoreEngineActor private(store: WorkflowStore, serviceRegistryActor: ActorRef, abortAllJobsOnTerminate: Boolean)
extends LoggingFSM[WorkflowStoreActorState, WorkflowStoreActorData] with ActorLogging with WorkflowInstrumentation with CromwellInstrumentationScheduler {

implicit val ec: ExecutionContext = context.dispatcher
Expand Down Expand Up @@ -63,6 +64,12 @@ final case class WorkflowStoreEngineActor private(store: WorkflowStore, serviceR
}

whenUnhandled {
case Event(ShutdownCommand, _) if abortAllJobsOnTerminate =>
self ! AbortAllRunningWorkflowsCommandAndStop
stay()
case Event(ShutdownCommand, _) =>
context stop self
stay()
case Event(msg, _) =>
log.warning("Unexpected message to WorkflowStoreActor in state {} with data {}: {}", stateName, stateData, msg)
stay
Expand Down Expand Up @@ -100,6 +107,11 @@ final case class WorkflowStoreEngineActor private(store: WorkflowStore, serviceR
// given we don't know much about what went wrong here. `t.getMessage` so the cause propagates to the client.
sndr ! WorkflowAbortFailureResponse(id, new RuntimeException(s"$message: ${t.getMessage}", t))
}
case AbortAllRunningWorkflowsCommandAndStop =>
store.abortAllRunning() map { _ =>
log.info(s"Aborting all running workflows.")
self ! PoisonPill
}
case oops =>
log.error("Unexpected type of start work command: {}", oops.getClass.getSimpleName)
Future.successful(self ! WorkDone)
Expand Down Expand Up @@ -149,8 +161,8 @@ final case class WorkflowStoreEngineActor private(store: WorkflowStore, serviceR
}

object WorkflowStoreEngineActor {
def props(workflowStoreDatabase: WorkflowStore, serviceRegistryActor: ActorRef) = {
Props(WorkflowStoreEngineActor(workflowStoreDatabase, serviceRegistryActor)).withDispatcher(EngineDispatcher)
def props(workflowStoreDatabase: WorkflowStore, serviceRegistryActor: ActorRef, abortAllJobsOnTerminate: Boolean) = {
Props(WorkflowStoreEngineActor(workflowStoreDatabase, serviceRegistryActor, abortAllJobsOnTerminate)).withDispatcher(EngineDispatcher)
}

sealed trait WorkflowStoreEngineActorResponse
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package cromwell.jobstore
import akka.actor.{Actor, Props}
import cromwell.jobstore.JobStoreActor._
import cromwell.core.Dispatcher.EngineDispatcher
import cromwell.util.GracefulShutdownHelper.ShutdownCommand

class EmptyJobStoreActor extends Actor {
override def receive: Receive = {
case w: JobStoreWriterCommand => sender ! JobStoreWriteSuccess(w)
case _: QueryJobCompletion => sender ! JobNotComplete
case ShutdownCommand => context stop self
}
}

Expand Down
16 changes: 13 additions & 3 deletions engine/src/main/scala/cromwell/server/CromwellRootActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import cromwell.jobstore.{JobStore, JobStoreActor, SqlJobStore}
import cromwell.services.{EngineServicesStore, ServiceRegistryActor}
import cromwell.subworkflowstore.{SqlSubWorkflowStore, SubWorkflowStoreActor}
import cromwell.util.GracefulShutdownHelper
import cromwell.util.GracefulShutdownHelper.ShutdownCommand
import net.ceedubs.ficus.Ficus._

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -65,7 +66,7 @@ abstract class CromwellRootActor(gracefulShutdown: Boolean, abortJobsOnTerminate

lazy val workflowStore: WorkflowStore = SqlWorkflowStore(EngineServicesStore.engineDatabaseInterface)
lazy val workflowStoreActor =
context.actorOf(WorkflowStoreActor.props(workflowStore, serviceRegistryActor), "WorkflowStoreActor")
context.actorOf(WorkflowStoreActor.props(workflowStore, serviceRegistryActor, abortJobsOnTerminate), "WorkflowStoreActor")

lazy val jobStore: JobStore = new SqlJobStore(EngineServicesStore.engineDatabaseInterface)
lazy val jobStoreActor = context.actorOf(JobStoreActor.props(jobStore), "JobStoreActor")
Expand Down Expand Up @@ -144,7 +145,16 @@ abstract class CromwellRootActor(gracefulShutdown: Boolean, abortJobsOnTerminate
} else if (abortJobsOnTerminate) {
// If gracefulShutdown is false but abortJobsOnTerminate is true, set up a classic JVM shutdown hook
sys.addShutdownHook {
Try(Await.result(gracefulStop(workflowManagerActor, AbortTimeout, AbortAllWorkflowsCommand), AbortTimeout)) match {
implicit val ec = context.system.dispatcher

val abortFuture: Future[Unit] = for {
// Give 30 seconds to the workflow store to switch all running workflows to aborting and shutdown. Should be more than enough
_ <- gracefulStop(workflowStoreActor, 30.seconds, ShutdownCommand)
// Once all workflows are "Aborting" in the workflow store, ask the WMA to effectively abort all of them
_ <- gracefulStop(workflowManagerActor, AbortTimeout, AbortAllWorkflowsCommand)
} yield ()

Try(Await.result(abortFuture, AbortTimeout)) match {
case Success(_) => logger.info("All workflows aborted")
case Failure(f) => logger.error("Failed to abort workflows", f)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package cromwell.subworkflowstore
import akka.actor.{Actor, ActorLogging, Props}
import cromwell.subworkflowstore.SubWorkflowStoreActor._
import cromwell.core.Dispatcher.EngineDispatcher
import cromwell.util.GracefulShutdownHelper.ShutdownCommand

class EmptySubWorkflowStoreActor extends Actor with ActorLogging {
override def receive: Receive = {
case register: RegisterSubWorkflow => sender() ! SubWorkflowStoreRegisterSuccess(register)
case query: QuerySubWorkflow => sender() ! SubWorkflowNotFound(query)
case _: WorkflowComplete => // No-op!
case ShutdownCommand => context stop self
case unknown => log.error(s"SubWorkflowStoreActor received unknown message: $unknown")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with Matchers with
"The WorkflowStoreActor" should {
"return an ID for a submitted workflow" in {
val store = new InMemoryWorkflowStore
val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance))
val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance, abortAllJobsOnTerminate = false))
storeActor ! SubmitWorkflow(helloWorldSourceFiles)
expectMsgType[WorkflowSubmittedToStore](10 seconds)
}

"return 3 IDs for a batch submission of 3" in {
val store = new InMemoryWorkflowStore
val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance))
val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance, abortAllJobsOnTerminate = false))
storeActor ! BatchSubmitWorkflows(NonEmptyList.of(helloWorldSourceFiles, helloWorldSourceFiles, helloWorldSourceFiles))
expectMsgPF(10 seconds) {
case WorkflowsBatchSubmittedToStore(ids) => ids.toList.size shouldBe 3
Expand All @@ -64,7 +64,7 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with Matchers with

"fetch exactly N workflows" in {
val store = new InMemoryWorkflowStore
val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance))
val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance, abortAllJobsOnTerminate = false))
storeActor ! BatchSubmitWorkflows(NonEmptyList.of(helloWorldSourceFiles, helloWorldSourceFiles, helloCwlWorldSourceFiles))
val insertedIds = expectMsgType[WorkflowsBatchSubmittedToStore](10 seconds).workflowIds.toList

Expand Down Expand Up @@ -107,7 +107,7 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with Matchers with


val store = new InMemoryWorkflowStore
val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance))
val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance, abortAllJobsOnTerminate = false))
val readMetadataActor = system.actorOf(ReadMetadataActor.props())
storeActor ! BatchSubmitWorkflows(NonEmptyList.of(optionedSourceFiles))
val insertedIds = expectMsgType[WorkflowsBatchSubmittedToStore](10 seconds).workflowIds.toList
Expand Down Expand Up @@ -150,7 +150,7 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with Matchers with

"return only the remaining workflows if N is larger than size" in {
val store = new InMemoryWorkflowStore
val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance))
val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance, abortAllJobsOnTerminate = false))
storeActor ! BatchSubmitWorkflows(NonEmptyList.of(helloWorldSourceFiles, helloWorldSourceFiles, helloWorldSourceFiles))
val insertedIds = expectMsgType[WorkflowsBatchSubmittedToStore](10 seconds).workflowIds.toList

Expand All @@ -170,7 +170,7 @@ class WorkflowStoreActorSpec extends CromwellTestKitWordSpec with Matchers with

"remain responsive if you ask to remove a workflow it doesn't have" in {
val store = new InMemoryWorkflowStore
val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance))
val storeActor = system.actorOf(WorkflowStoreActor.props(store, CromwellTestKitSpec.ServiceRegistryActorInstance, abortAllJobsOnTerminate = false))

storeActor ! FetchRunnableWorkflows(100)
expectMsgPF(10 seconds) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object SingleWorkflowRunnerActorSpec {

abstract class SingleWorkflowRunnerActorSpec extends CromwellTestKitWordSpec with Mockito {
private val workflowStore =
system.actorOf(WorkflowStoreActor.props(new InMemoryWorkflowStore, dummyServiceRegistryActor))
system.actorOf(WorkflowStoreActor.props(new InMemoryWorkflowStore, dummyServiceRegistryActor, abortAllJobsOnTerminate = false))
private val serviceRegistry = TestProbe().ref
private val jobStore = system.actorOf(AlwaysHappyJobStoreActor.props)
private val ioActor = system.actorOf(SimpleIoActor.props)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class SubWorkflowStoreSpec extends CromwellTestKitWordSpec with Matchers with Mo
val subWorkflowStoreService = system.actorOf(SubWorkflowStoreActor.props(subWorkflowStore))

lazy val workflowStore = SqlWorkflowStore(EngineServicesStore.engineDatabaseInterface)
val workflowStoreService = system.actorOf(WorkflowStoreActor.props(workflowStore, TestProbe().ref))
val workflowStoreService = system.actorOf(WorkflowStoreActor.props(workflowStore, TestProbe().ref, abortAllJobsOnTerminate = false))

val parentWorkflowId = WorkflowId.randomId()
val subWorkflowId = WorkflowId.randomId()
Expand Down

0 comments on commit b296fc4

Please sign in to comment.