-
Notifications
You must be signed in to change notification settings - Fork 350
/
WorkflowStoreEngineActor.scala
168 lines (147 loc) · 7.42 KB
/
WorkflowStoreEngineActor.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package cromwell.engine.workflow.workflowstore
import akka.actor.{ActorLogging, ActorRef, LoggingFSM, Props}
import cats.data.NonEmptyList
import cromwell.core.Dispatcher._
import cromwell.engine.workflow.WorkflowManagerActor
import cromwell.engine.workflow.WorkflowManagerActor.WorkflowNotFoundException
import cromwell.engine.workflow.workflowstore.WorkflowStoreEngineActor.WorkflowStoreActorState
import cromwell.engine.workflow.workflowstore.WorkflowStoreState.StartableState
import WorkflowStoreEngineActor._
import cromwell.core.WorkflowId
import cromwell.engine.workflow.workflowstore.WorkflowStoreActor._
import org.apache.commons.lang3.exception.ExceptionUtils
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}
final case class WorkflowStoreEngineActor(store: WorkflowStore, serviceRegistryActor: ActorRef)
extends LoggingFSM[WorkflowStoreActorState, WorkflowStoreActorData] with ActorLogging {
implicit val ec: ExecutionContext = context.dispatcher
startWith(Unstarted, WorkflowStoreActorData(None, List.empty))
self ! InitializerCommand
when(Unstarted) {
case Event(InitializerCommand, _) =>
val work = store.initialize map { _ =>
log.debug("Workflow store initialization successful")
}
addWorkCompletionHooks(InitializerCommand, work)
goto(Working) using stateData.withCurrentCommand(InitializerCommand, sender)
case Event(x: WorkflowStoreActorEngineCommand, _) =>
stay using stateData.withPendingCommand(x, sender)
}
when(Idle) {
case Event(cmd: WorkflowStoreActorEngineCommand, _) =>
if (stateData.currentOperation.nonEmpty || stateData.pendingOperations.nonEmpty) {
log.error("Non-empty WorkflowStoreActorData when in Idle state: {}", stateData)
}
startNewWork(cmd, sender, stateData.withCurrentCommand(cmd, sender))
}
when(Working) {
case Event(WorkDone, data) =>
val newData = data.pop
newData.currentOperation match {
case None => goto(Idle) using newData
case Some(WorkflowStoreActorCommandWithSender(cmd, sndr)) => startNewWork(cmd, sndr, newData)
}
case Event(cmd: WorkflowStoreActorEngineCommand, data) => stay using data.withPendingCommand(cmd, sender)
}
whenUnhandled {
case Event(msg, _) =>
log.warning("Unexpected message to WorkflowStoreActor in state {} with data {}: {}", stateName, stateData, msg)
stay
}
onTransition {
case fromState -> toState =>
log.debug("WorkflowStore moving from {} (using {}) to {} (using {})", fromState, stateData, toState, nextStateData)
}
private def startNewWork(command: WorkflowStoreActorEngineCommand, sndr: ActorRef, nextData: WorkflowStoreActorData) = {
val work: Future[Any] = command match {
case cmd @ FetchRunnableWorkflows(n) =>
newWorkflowMessage(n) map { nwm =>
nwm match {
case NewWorkflowsToStart(workflows) => log.info("{} new workflows fetched", workflows.toList.size)
case NoNewWorkflowsToStart => log.debug("No workflows fetched")
case _ => log.error("Unexpected response from newWorkflowMessage({}): {}", n, nwm)
}
sndr ! nwm
}
case cmd @ AbortWorkflow(id, manager) =>
store.remove(id) map { removed =>
if (removed) {
log.debug(s"Workflow $id aborted and removed from the workflow store.")
manager ! WorkflowManagerActor.AbortWorkflowCommand(id, sndr)
} else {
sndr ! WorkflowAbortFailed(id, new WorkflowNotFoundException(s"Couldn't abort $id because no workflow with that ID is in progress"))
}
}
case cmd @ RemoveWorkflow(id) =>
store.remove(id) map { removed =>
if (removed) {
log.debug("Workflow {} removed from store successfully.", id)
} else {
log.warning(s"Attempted to remove ID {} from the WorkflowStore but it didn't exist", id)
}
}
case oops =>
log.error("Unexpected type of start work command: {}", oops.getClass.getSimpleName)
Future.successful(self ! WorkDone)
}
addWorkCompletionHooks(command, work)
goto(Working) using nextData
}
private def addWorkCompletionHooks[A](command: WorkflowStoreActorEngineCommand, work: Future[A]) = {
work.onComplete {
case Success(_) =>
self ! WorkDone
case Failure(t) =>
log.error("Error occurred during {}: {} because {}", command.getClass.getSimpleName, t.toString, ExceptionUtils.getStackTrace(t))
self ! WorkDone
}
}
/**
* Fetches at most n workflows, and builds the correct response message based on if there were any workflows or not
*/
private def newWorkflowMessage(maxWorkflows: Int): Future[WorkflowStoreEngineActorResponse] = {
def fetchRunnableWorkflowsIfNeeded(maxWorkflowsInner: Int, state: StartableState) = {
if (maxWorkflows > 0) {
store.fetchRunnableWorkflows(maxWorkflowsInner, state)
} else {
Future.successful(List.empty[WorkflowToStart])
}
}
val runnableWorkflows = for {
restartableWorkflows <- fetchRunnableWorkflowsIfNeeded(maxWorkflows, WorkflowStoreState.Restartable)
submittedWorkflows <- fetchRunnableWorkflowsIfNeeded(maxWorkflows - restartableWorkflows.size, WorkflowStoreState.Submitted)
} yield restartableWorkflows ++ submittedWorkflows
runnableWorkflows map {
case x :: xs => NewWorkflowsToStart(NonEmptyList.of(x, xs: _*))
case _ => NoNewWorkflowsToStart
} recover {
case e =>
// Log the error but return a successful Future so as not to hang future workflow store polls.
log.error(e, "Error trying to fetch new workflows")
NoNewWorkflowsToStart
}
}
}
object WorkflowStoreEngineActor {
def props(workflowStoreDatabase: WorkflowStore, serviceRegistryActor: ActorRef) = {
Props(WorkflowStoreEngineActor(workflowStoreDatabase, serviceRegistryActor)).withDispatcher(EngineDispatcher)
}
sealed trait WorkflowStoreEngineActorResponse
case object NoNewWorkflowsToStart extends WorkflowStoreEngineActorResponse
final case class NewWorkflowsToStart(workflows: NonEmptyList[WorkflowToStart]) extends WorkflowStoreEngineActorResponse
final case class WorkflowAborted(workflowId: WorkflowId) extends WorkflowStoreEngineActorResponse
final case class WorkflowAbortFailed(workflowId: WorkflowId, reason: Throwable) extends WorkflowStoreEngineActorResponse
final case class WorkflowStoreActorCommandWithSender(command: WorkflowStoreActorEngineCommand, sender: ActorRef)
final case class WorkflowStoreActorData(currentOperation: Option[WorkflowStoreActorCommandWithSender], pendingOperations: List[WorkflowStoreActorCommandWithSender]) {
def withCurrentCommand(command: WorkflowStoreActorEngineCommand, sender: ActorRef) = this.copy(currentOperation = Option(WorkflowStoreActorCommandWithSender(command, sender)))
def withPendingCommand(newCommand: WorkflowStoreActorEngineCommand, sender: ActorRef) = this.copy(pendingOperations = this.pendingOperations :+ WorkflowStoreActorCommandWithSender(newCommand, sender))
def pop = {
if (pendingOperations.isEmpty) { WorkflowStoreActorData(None, List.empty) }
else { WorkflowStoreActorData(Option(pendingOperations.head), pendingOperations.tail) }
}
}
sealed trait WorkflowStoreActorState
case object Unstarted extends WorkflowStoreActorState
case object Working extends WorkflowStoreActorState
case object Idle extends WorkflowStoreActorState
}