Skip to content

Commit

Permalink
Abort harder (#2829)
Browse files Browse the repository at this point in the history
* Abort on restart even in early workflow stages
  • Loading branch information
Horneth committed Nov 9, 2017
1 parent 05c8dfd commit 9942d52
Show file tree
Hide file tree
Showing 38 changed files with 443 additions and 253 deletions.
2 changes: 1 addition & 1 deletion centaur/src/main/scala/centaur/test/Test.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ object Operations {
// as soon as it has requested cancellation, not when PAPI says it's cancelled
Try(checkPAPIAborted())
// Give some time to the VM to actually die (PAPI says it's cancelled before the VM is actually killed)
eventually(OffsetDateTime.now(), 3.minutes) {
eventually(OffsetDateTime.now(), 5.minutes) {
Try(checkVMTerminated())
}
} else Success(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package cromwell.core.abort
import cromwell.core.WorkflowId

sealed trait AbortResponse
case class WorkflowAbortingResponse(workflowId: WorkflowId) extends AbortResponse
case class WorkflowAbortingResponse(workflowId: WorkflowId, restarted: Boolean) extends AbortResponse
case class WorkflowAbortFailureResponse(workflowId: WorkflowId, failure: Throwable) extends AbortResponse
1 change: 1 addition & 0 deletions database/migration/src/main/resources/changelog.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
<include file="changesets/remove_pre_pbe_tables.xml" relativeToChangelogFile="true" />
<include file="changesets/move_sql_metadata_changelog.xml" relativeToChangelogFile="true" />
<include file="changesets/workflow_store_state_widening.xml" relativeToChangelogFile="true" />
<include file="changesets/workflow_store_restarted_column.xml" relativeToChangelogFile="true" />
</databaseChangeLog>
<!--
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Liquibase changelog for WORKFLOW_STORE_ENTRY:
  1. adds the RESTARTED boolean column,
  2. rewrites the 'RestartableRunning' / 'RestartableAborting' workflow states to 'Running' / 'Aborting'.
-->
<databaseChangeLog xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.3.xsd">

<!--Adds the non-nullable RESTARTED column; valueBoolean="false" is the value given to the rows that already exist-->
<changeSet author="tjeandet" id="workflow-store-restarted-column">
<addColumn
tableName="WORKFLOW_STORE_ENTRY">
<column name="RESTARTED" type="BOOLEAN" valueBoolean = "false">
<constraints nullable="false"/>
</column>
</addColumn>
</changeSet>
<!--Collapses the two "Restartable" states into their plain counterparts-->
<changeSet author="tjeandet" id="update-restartable">
<!--RestartableRunning to Running-->
<update
tableName="WORKFLOW_STORE_ENTRY">
<column name="WORKFLOW_STATE" value="Running" />
<where>WORKFLOW_STATE = 'RestartableRunning'</where>
</update>

<!--RestartableAborting to Aborting-->
<update
tableName="WORKFLOW_STORE_ENTRY">
<column name="WORKFLOW_STATE" value="Aborting" />
<where>WORKFLOW_STATE = 'RestartableAborting'</where>
</update>

<!--Note: there's no need to set the flags to true because that's the first thing Cromwell will do when it starts-->
</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,74 @@ import cats.instances.future._
import cats.syntax.functor._
import cromwell.database.sql.WorkflowStoreSqlDatabase
import cromwell.database.sql.tables.WorkflowStoreEntry
import cromwell.database.sql.tables.WorkflowStoreEntry.WorkflowStoreState
import cromwell.database.sql.tables.WorkflowStoreEntry.WorkflowStoreState.WorkflowStoreState

import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps

trait WorkflowStoreSlickDatabase extends WorkflowStoreSqlDatabase {
this: EngineSlickDatabase =>

import dataAccess.driver.api._

override def updateWorkflowsInState(updates: List[(String, String)])
(implicit ec: ExecutionContext): Future[Unit] = {
val updateQueries = updates.map({
case (oldState, newState) => dataAccess.workflowStateForWorkflowState(oldState).update(newState)
})
override def setAllRunningToAborting()(implicit ec: ExecutionContext): Future[Unit] = {
val action = dataAccess
.workflowStateForWorkflowState(WorkflowStoreState.Running)
.update(WorkflowStoreState.Aborting)

val action = DBIO.sequence(updateQueries)
runTransaction(action) void
runTransaction(action).void
}

override def updateWorkflowState(workflowId: String, newWorkflowState: String)
(implicit ec: ExecutionContext): Future[Int] = {
val action = dataAccess.workflowStateForId(workflowId).update(newWorkflowState)
/**
  * Sets the restarted flag to true on every workflow store entry that is currently
  * in the Running or Aborting state, in a single transaction.
  */
override def markRunningAndAbortingAsRestarted()(implicit ec: ExecutionContext): Future[Unit] =
  runTransaction(dataAccess.restartedFlagForRunningAndAborting.update(true)).void

/**
  * Moves the workflow with the given id to the Aborting state.
  *
  * The restarted flag is read first and the state update is issued afterwards, both inside
  * the same transaction. The update is issued whether or not a row was found.
  *
  * @param workflowId id of the workflow to abort
  * @return Some(restarted) if the workflow exists in the store, where restarted is the value of its restarted flag
  *         None if the workflow does not exist in the store
  */
override def setToAborting(workflowId: String)
                          (implicit ec: ExecutionContext): Future[Option[Boolean]] = {
  val readFlagThenAbort = dataAccess.workflowRestartedForId(workflowId).result.headOption.flatMap { restartedFlag =>
    dataAccess.workflowStateForId(workflowId)
      .update(WorkflowStoreState.Aborting)
      .map(_ => restartedFlag)
  }

  runTransaction(readFlagThenAbort)
}

override def addWorkflowStoreEntries(workflowStoreEntries: Iterable[WorkflowStoreEntry])
(implicit ec: ExecutionContext): Future[Unit] = {
val action = dataAccess.workflowStoreEntryIdsAutoInc ++= workflowStoreEntries
runTransaction(action) void
runTransaction(action).void
}

override def queryWorkflowStoreEntries(limit: Int, queryWorkflowState: String, updateWorkflowState: String)
(implicit ec: ExecutionContext): Future[Seq[WorkflowStoreEntry]] = {
override def fetchStartableWorkflows(limit: Int)
(implicit ec: ExecutionContext): Future[Seq[WorkflowStoreEntry]] = {
val action = for {
workflowStoreEntries <- dataAccess.workflowStoreEntriesForWorkflowState((queryWorkflowState, limit.toLong)).result
_ <- DBIO.sequence(workflowStoreEntries map updateWorkflowStateForWorkflowExecutionUuid(updateWorkflowState))
workflowStoreEntries <- dataAccess.fetchStartableWorkflows(limit.toLong).result
_ <- DBIO.sequence(workflowStoreEntries map updateWorkflowStateAndRestartedForWorkflowExecutionUuid)
} yield workflowStoreEntries

runTransaction(action)
}

private def updateWorkflowStateForWorkflowExecutionUuid(updateWorkflowState: String)
(workflowStoreEntry: WorkflowStoreEntry)
private def updateWorkflowStateAndRestartedForWorkflowExecutionUuid(workflowStoreEntry: WorkflowStoreEntry)
(implicit ec: ExecutionContext): DBIO[Unit] = {
val workflowExecutionUuid = workflowStoreEntry.workflowExecutionUuid
val updateState = workflowStoreEntry.workflowState match {
// Submitted workflows become running when fetched
case WorkflowStoreState.Submitted => WorkflowStoreState.Running
// Running or Aborting stay as is
case other => other
}
for {
updateCount <- dataAccess.workflowStateForWorkflowExecutionUuid(workflowExecutionUuid).update(updateWorkflowState)
_ <- assertUpdateCount(s"Update $workflowExecutionUuid to $updateWorkflowState", updateCount, 1)
// When fetched, the restarted flag is set back to false so we don't pick it up next time.
updateCount <- dataAccess.workflowStateAndRestartedForWorkflowExecutionUuid(workflowExecutionUuid).update((updateState, false))
_ <- assertUpdateCount(s"Update $workflowExecutionUuid to $updateState", updateCount, 1)
} yield ()
}

Expand All @@ -59,7 +80,7 @@ trait WorkflowStoreSlickDatabase extends WorkflowStoreSqlDatabase {
runTransaction(action)
}

override def stats(implicit ec: ExecutionContext): Future[Map[String, Int]] = {
override def stats(implicit ec: ExecutionContext): Future[Map[WorkflowStoreState, Int]] = {
val action = dataAccess.workflowStoreStats.result
runTransaction(action) map { _.toMap }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,24 @@ package cromwell.database.slick.tables
import java.sql.{Blob, Clob, Timestamp}

import cromwell.database.sql.tables.WorkflowStoreEntry
import cromwell.database.sql.tables.WorkflowStoreEntry.WorkflowStoreState
import cromwell.database.sql.tables.WorkflowStoreEntry.WorkflowStoreState.WorkflowStoreState

trait WorkflowStoreEntryComponent {

this: DriverComponent =>

import driver.api._

object WorkflowStoreEntries {
  /**
    * Column mapping that persists a WorkflowStoreState as its string name and reads it back by name.
    *
    * The type is declared explicitly: an implicit whose type is left to inference can fail to
    * resolve depending on where it is used, and it hides the public type of the member.
    */
  implicit val workflowStoreStateMapper: BaseColumnType[WorkflowStoreState] =
    MappedColumnType.base[WorkflowStoreState, String](
      state => state.toString,
      // Enumeration.withName throws NoSuchElementException if the stored string is not a declared state
      name => WorkflowStoreState.withName(name)
    )
}

import WorkflowStoreEntries._

class WorkflowStoreEntries(tag: Tag) extends Table[WorkflowStoreEntry](tag, "WORKFLOW_STORE_ENTRY") {
def workflowStoreEntryId = column[Int]("WORKFLOW_STORE_ENTRY_ID", O.PrimaryKey, O.AutoInc)

Expand All @@ -27,14 +38,16 @@ trait WorkflowStoreEntryComponent {

def customLabels = column[Clob]("CUSTOM_LABELS")

def workflowState = column[String]("WORKFLOW_STATE", O.Length(20))
def workflowState = column[WorkflowStoreState]("WORKFLOW_STATE", O.Length(20))

def restarted = column[Boolean]("RESTARTED")

def submissionTime = column[Timestamp]("SUBMISSION_TIME")

def importsZip = column[Option[Blob]]("IMPORTS_ZIP")

override def * = (workflowExecutionUuid, workflowDefinition, workflowType, workflowTypeVersion, workflowInputs, workflowOptions, workflowState,
submissionTime, importsZip, customLabels, workflowStoreEntryId.?) <> (WorkflowStoreEntry.tupled, WorkflowStoreEntry.unapply)
restarted, submissionTime, importsZip, customLabels, workflowStoreEntryId.?) <> ((WorkflowStoreEntry.apply _).tupled, WorkflowStoreEntry.unapply)

def ucWorkflowStoreEntryWeu = index("UC_WORKFLOW_STORE_ENTRY_WEU", workflowExecutionUuid, unique = true)

Expand All @@ -56,13 +69,15 @@ trait WorkflowStoreEntryComponent {
)

/**
* Useful for selecting workflow stores with a given state.
* Returns up to "limit" startable workflows, sorted by submission time.
*/
val workflowStoreEntriesForWorkflowState = Compiled(
(workflowState: Rep[String], limit: ConstColumn[Long]) => {
val fetchStartableWorkflows = Compiled(
(limit: ConstColumn[Long]) => {
val query = for {
workflowStoreEntryRow <- workflowStoreEntries
if workflowStoreEntryRow.workflowState === workflowState
if (workflowStoreEntryRow.workflowState === WorkflowStoreState.Aborting && workflowStoreEntryRow.restarted === true) ||
(workflowStoreEntryRow.workflowState === WorkflowStoreState.Running && workflowStoreEntryRow.restarted === true) ||
(workflowStoreEntryRow.workflowState === WorkflowStoreState.Submitted && workflowStoreEntryRow.restarted === false)
} yield workflowStoreEntryRow
query.sortBy(_.submissionTime.asc).take(limit)
}
Expand All @@ -80,23 +95,33 @@ trait WorkflowStoreEntryComponent {
/**
* Useful for updating the state and restarted flag for all entries matching a given UUID
*/
val workflowStateForWorkflowExecutionUuid = Compiled(
val workflowStateAndRestartedForWorkflowExecutionUuid = Compiled(
(workflowExecutionUuid: Rep[String]) => for {
workflowStoreEntry <- workflowStoreEntries
if workflowStoreEntry.workflowExecutionUuid === workflowExecutionUuid
} yield workflowStoreEntry.workflowState
} yield (workflowStoreEntry.workflowState, workflowStoreEntry.restarted)
)

/**
* Useful for updating state for all entries matching a given state
*/
val workflowStateForWorkflowState = Compiled(
(workflowState: Rep[String]) => for {
(workflowState: Rep[WorkflowStoreState]) => for {
workflowStoreEntry <- workflowStoreEntries
if workflowStoreEntry.workflowState === workflowState
} yield workflowStoreEntry.workflowState
)

/**
  * Selects the restarted flag of every entry whose state is Running or Aborting.
  * Used to flip those flags in bulk on server restart.
  */
val restartedFlagForRunningAndAborting = Compiled(
  workflowStoreEntries
    .filter { entry =>
      entry.workflowState === WorkflowStoreState.Running || entry.workflowState === WorkflowStoreState.Aborting
    }
    .map(_.restarted)
)

/**
* Useful for updating a given workflow to a new state
*/
Expand All @@ -106,4 +131,14 @@ trait WorkflowStoreEntryComponent {
if workflowStoreEntry.workflowExecutionUuid === workflowId
} yield workflowStoreEntry.workflowState
)

/**
  * Selects the restarted flag of the entry whose workflow execution UUID equals the given id.
  * Can be used to read or to update that flag.
  */
val workflowRestartedForId = Compiled(
  (workflowId: Rep[String]) =>
    workflowStoreEntries
      .filter(_.workflowExecutionUuid === workflowId)
      .map(_.restarted)
)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cromwell.database.sql

import cromwell.database.sql.tables.WorkflowStoreEntry
import cromwell.database.sql.tables.WorkflowStoreEntry.WorkflowStoreState.WorkflowStoreState

import scala.concurrent.{ExecutionContext, Future}

Expand All @@ -24,11 +25,25 @@ ____ __ ____ ______ .______ __ ___ _______ __ ______
*/

def updateWorkflowsInState(updates: List[(String, String)])
(implicit ec: ExecutionContext): Future[Unit]

def updateWorkflowState(workflowId: String, newWorkflowState: String)
(implicit ec: ExecutionContext): Future[Int]
/**
* Set all running workflows to aborting state.
*
* @param ec execution context for the asynchronous work behind the returned Future
* @return a Future of Unit; the number of updated workflows is not reported
*/
def setAllRunningToAborting()
(implicit ec: ExecutionContext): Future[Unit]

/**
* Set restarted flags for all running and aborting workflows to true.
*
* @param ec execution context for the asynchronous work behind the returned Future
* @return a Future of Unit; the number of flagged workflows is not reported
*/
def markRunningAndAbortingAsRestarted()
(implicit ec: ExecutionContext): Future[Unit]

/**
* Set the workflow to aborting state.
* @param workflowId id of the workflow to set to aborting
* @param ec execution context for the asynchronous work behind the returned Future
* @return Some(restarted) if the workflow exists in the store, where restarted is the value of its restarted flag
* None if the workflow does not exist in the store
*/
def setToAborting(workflowId: String)
(implicit ec: ExecutionContext): Future[Option[Boolean]]

/**
* Adds the requested WorkflowSourceFiles to the store.
Expand All @@ -40,13 +55,13 @@ ____ __ ____ ______ .______ __ ___ _______ __ ______
* Retrieves up to limit workflows which have not already been pulled into the engine and updates their state.
* NOTE: Rows are returned with the query state, NOT the update state.
*/
def queryWorkflowStoreEntries(limit: Int, queryWorkflowState: String, updateWorkflowState: String)
(implicit ec: ExecutionContext): Future[Seq[WorkflowStoreEntry]]
def fetchStartableWorkflows(limit: Int)
(implicit ec: ExecutionContext): Future[Seq[WorkflowStoreEntry]]

/**
* Deletes a workflow from the database, returning the number of rows affected.
*/
def removeWorkflowStoreEntry(workflowExecutionUuid: String)(implicit ec: ExecutionContext): Future[Int]

def stats(implicit ec: ExecutionContext): Future[Map[String, Int]]
def stats(implicit ec: ExecutionContext): Future[Map[WorkflowStoreState, Int]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,17 @@ package cromwell.database.sql.tables

import java.sql.{Blob, Clob, Timestamp}

import cromwell.database.sql.tables.WorkflowStoreEntry.WorkflowStoreState.WorkflowStoreState

object WorkflowStoreEntry {
  /**
    * States a workflow can have while it sits in the workflow store.
    * Each value's name is the exact string produced by toString and accepted by withName.
    */
  object WorkflowStoreState extends Enumeration {
    /** Alias so the state type can be imported and referenced by a readable name. */
    type WorkflowStoreState = Value

    val Submitted: WorkflowStoreState = Value("Submitted")
    val Running: WorkflowStoreState = Value("Running")
    val Aborting: WorkflowStoreState = Value("Aborting")
  }
}

case class WorkflowStoreEntry
(
workflowExecutionUuid: String,
Expand All @@ -10,7 +21,8 @@ case class WorkflowStoreEntry
workflowTypeVersion: Option[String],
workflowInputs: Option[Clob],
workflowOptions: Option[Clob],
workflowState: String,
workflowState: WorkflowStoreState,
restarted: Boolean,
submissionTime: Timestamp,
importsZip: Option[Blob],
customLabels: Clob,
Expand Down
30 changes: 0 additions & 30 deletions docs/Abort.md

This file was deleted.

Binary file added docs/execution/ABdependency.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/execution/CWP_B_fail.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/execution/CWP_B_fail_A_retryable.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/execution/CWP_B_retryable_fail.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 9942d52

Please sign in to comment.