Skip to content

Commit

Permalink
simplify step interface by consolidating step and heartbeat timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
John Roesler committed Dec 30, 2016
1 parent f506949 commit a129373
Show file tree
Hide file tree
Showing 22 changed files with 104 additions and 107 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Your AWS credentials must have access to at least the following [SWF actions](ht
- StartWorkflowExecution
- SignalWorkflowExecution
- TerminateWorkflowExecution
- RegisterActivityType
- RegisterActivityTypef
- RegisterWorkflowType
- RegisterDomain
- RequestCancelWorkflowExecution
Expand Down
3 changes: 2 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ lazy val example = (project in file("sswf-java-example"))
.settings(Commons.settings: _*)
.settings(
name := "sswf-example",
mainClass in(Compile, run) := Some("example.ExampleWorkflowService")
mainClass in(Compile, run) := Some("example.ExampleWorkflowService"),
fork in run := true
)
.settings(Commons.nopublish: _*)
.dependsOn(guava20)
Expand Down
5 changes: 2 additions & 3 deletions sswf-core/src/it/scala/com/bazaarvoice/sswf/CancelTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import com.bazaarvoice.sswf.model.history.StepsHistory
import com.bazaarvoice.sswf.model.result.{InProgress, StepResult}
import com.bazaarvoice.sswf.model.{DefinedStep, ScheduledStep, StepInput}
import com.bazaarvoice.sswf.service.{StepActionWorker, StepDecisionWorker, WorkflowManagement}
import example.StdOutLogger
import org.scalatest.FlatSpec

import scala.collection.JavaConversions._
Expand All @@ -23,7 +22,7 @@ class CancelTestWorkflowDef(rememberer: Rememberer) extends WorkflowDefinition[S
}

class Rememberer {
var toRemember: String = null
var toRemember: String = _
def remember(s: String): Unit = {
toRemember = s
}
Expand All @@ -39,7 +38,7 @@ class CancelTest extends FlatSpec {
private val domain: String = "sswf-tests"
private val wf: String = "cancel-test"
private val swf: AmazonSimpleWorkflowClient = new AmazonSimpleWorkflowClient()
private val logger: StdOutLogger = new StdOutLogger
private val logger: Logger = new SilentLogger
val manager = new WorkflowManagement[String, TestSteps](domain, wf, "0.0", wf, swf, inputParser = parser, log = logger)
val definition = new CancelTestWorkflowDef(rememberer)
val actor = new StepActionWorker[String, TestSteps](domain, wf, swf, parser, definition, log = logger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import com.bazaarvoice.sswf.model.history.StepsHistory
import com.bazaarvoice.sswf.model.result.{StepResult, Success}
import com.bazaarvoice.sswf.model.{DefinedStep, ScheduledStep, StepInput}
import com.bazaarvoice.sswf.service.{StepActionWorker, StepDecisionWorker, WorkflowManagement}
import example.StdOutLogger
import org.joda.time.DateTime
import org.scalatest.FlatSpec

Expand Down Expand Up @@ -39,7 +38,7 @@ class ListOpenExecutionTest extends FlatSpec {
private val domain: String = "sswf-tests"
private val wf: String = "list-open-executions-test"
private val swf: AmazonSimpleWorkflowClient = new AmazonSimpleWorkflowClient()
private val logger: StdOutLogger = new StdOutLogger
private val logger: Logger = new SilentLogger

val manager = new WorkflowManagement[String, ListOpenExecutionTestSteps](domain, wf, "0.0", wf, swf, inputParser = parser, log = logger)
val definition = new DummyWorkflowDefinition()
Expand Down Expand Up @@ -88,7 +87,8 @@ class ListOpenExecutionTest extends FlatSpec {
}
def waitForStepResult(): RespondActivityTaskCompletedRequest = {
val scheduleActivityDecisionTask: DecisionTask = untilNotNull(decider.pollForDecisionsToMake())
val scheduleActivityDecision: RespondDecisionTaskCompletedRequest = decider.makeDecision(scheduleActivityDecisionTask)

decider.makeDecision(scheduleActivityDecisionTask)

val activityTask: ActivityTask = untilNotNull(actor.pollForWork())
actor.doWork(activityTask)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
package com.bazaarvoice.sswf;

public enum ListOpenExecutionTestSteps implements WorkflowStep {
DUMMY_STEP1(1, 1),
DUMMY_STEP2(1, 1);
DUMMY_STEP1(1),
DUMMY_STEP2(1);

private int startToFinishTimeout;
private int startToHeartbeatTimeoutSeconds;
private int timeout;

ListOpenExecutionTestSteps(final int startToFinishTimeout, final int startToHeartbeatTimeoutSeconds) {
this.startToFinishTimeout = startToFinishTimeout;
this.startToHeartbeatTimeoutSeconds = startToHeartbeatTimeoutSeconds;
ListOpenExecutionTestSteps(final int timeout) {
this.timeout = timeout;
}

@Override public int startToFinishTimeoutSeconds() {
return startToFinishTimeout;
@Override public int timeoutSeconds() {
return timeout;
}

@Override public int startToHeartbeatTimeoutSeconds() { return startToHeartbeatTimeoutSeconds; }

@Override public InProgressTimerFunction inProgressTimerSecondsFn() {
@Override public InProgressSleepFunction inProgressSleepSecondsFn() {
return (invocationNum, cumulativeStepDurationSeconds) -> Math.min(invocationNum * 2, 4);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,21 @@


public enum StateTransitionSteps implements WorkflowStep {
FINAL_STEP(1, 1),
TIMEOUT_STEP(1, 1);
FINAL_STEP(1),
TIMEOUT_STEP(1);

private int startToFinishTimeout;
private int startToHeartbeatTimeoutSeconds;
private int timeout;

StateTransitionSteps(final int startToFinishTimeout, final int startToHeartbeatTimeoutSeconds) {
this.startToFinishTimeout = startToFinishTimeout;
this.startToHeartbeatTimeoutSeconds = startToHeartbeatTimeoutSeconds;
StateTransitionSteps(final int timeout) {
this.timeout = timeout;
}

@Override public int startToFinishTimeoutSeconds() {
return startToFinishTimeout;
}

@Override public int startToHeartbeatTimeoutSeconds() { return startToHeartbeatTimeoutSeconds; }

@Override public InProgressTimerFunction inProgressTimerSecondsFn() {
@Override public InProgressSleepFunction inProgressSleepSecondsFn() {
return (invocationNum, cumulativeStepDurationSeconds) -> Math.min(invocationNum * 2, 4);
}

@Override public int timeoutSeconds() {
return timeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import com.bazaarvoice.sswf.model.history.StepsHistory
import com.bazaarvoice.sswf.model.result.{StepResult, Success, TimedOut}
import com.bazaarvoice.sswf.model.{DefinedStep, ScheduledStep, SleepStep, StepInput}
import com.bazaarvoice.sswf.service.{StepActionWorker, StepDecisionWorker, WorkflowManagement}
import example.StdOutLogger
import org.scalatest.FlatSpec

import scala.collection.JavaConversions._
Expand Down Expand Up @@ -43,7 +42,7 @@ class StepTransitionTest extends FlatSpec {
private val domain: String = "sswf-tests"
private val wf: String = "transition-test"
private val swf: AmazonSimpleWorkflowClient = new AmazonSimpleWorkflowClient()
private val logger: StdOutLogger = new StdOutLogger
private val logger: Logger = new SilentLogger
val manager = new WorkflowManagement[String, StateTransitionSteps](domain, wf, "0.0", wf, swf, inputParser = parser, log = logger)
val definition = new StepTransitionWorkflowDefinition()
val actor = new StepActionWorker[String, StateTransitionSteps](domain, wf, swf, parser, definition, log = logger)
Expand Down Expand Up @@ -86,7 +85,8 @@ class StepTransitionTest extends FlatSpec {

def waitForStepResult(): RespondActivityTaskCompletedRequest = {
val scheduleActivityDecisionTask: DecisionTask = untilNotNull(decider.pollForDecisionsToMake())
val scheduleActivityDecision: RespondDecisionTaskCompletedRequest = decider.makeDecision(scheduleActivityDecisionTask)

decider.makeDecision(scheduleActivityDecisionTask)

val activityTask: ActivityTask = untilNotNull(actor.pollForWork())
actor.doWork(activityTask)
Expand Down
21 changes: 7 additions & 14 deletions sswf-core/src/it/scala/com/bazaarvoice/sswf/TestSteps.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,19 @@
package com.bazaarvoice.sswf;

public enum TestSteps implements WorkflowStep {
INPROGRESS_STEP(120, 120) {
@Override public InProgressTimerFunction inProgressTimerSecondsFn() {
INPROGRESS_STEP(120) {
@Override public InProgressSleepFunction inProgressSleepSecondsFn() {
return (invocationNum, cumulativeStepDurationSeconds) -> 1;
}
};

private int startToFinishTimeout;
private int startToHeartbeatTimeoutSeconds;
private int timeout;

TestSteps(final int startToFinishTimeout, final int startToHeartbeatTimeoutSeconds) {
this.startToFinishTimeout = startToFinishTimeout;
this.startToHeartbeatTimeoutSeconds = startToHeartbeatTimeoutSeconds;
TestSteps(final int timeout) {
this.timeout = timeout;
}

@Override public int startToFinishTimeoutSeconds() {
return startToFinishTimeout;
@Override public int timeoutSeconds() {
return timeout;
}

@Override public int startToHeartbeatTimeoutSeconds() { return startToHeartbeatTimeoutSeconds; }



}
23 changes: 9 additions & 14 deletions sswf-core/src/it/scala/com/bazaarvoice/sswf/WaitTimeSteps.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
package com.bazaarvoice.sswf;

public enum WaitTimeSteps implements WorkflowStep {
DUMMY_STEP(120, 120),
WAIT_STEP(120, 120);
DUMMY_STEP(120),
WAIT_STEP(120);

private int startToFinishTimeout;
private int startToHeartbeatTimeoutSeconds;
private int timeout;

WaitTimeSteps(final int startToFinishTimeout, final int startToHeartbeatTimeoutSeconds) {
this.startToFinishTimeout = startToFinishTimeout;
this.startToHeartbeatTimeoutSeconds = startToHeartbeatTimeoutSeconds;
WaitTimeSteps(final int timeout) {
this.timeout = timeout;
}

@Override public int startToFinishTimeoutSeconds() {
return startToFinishTimeout;
}

@Override public int startToHeartbeatTimeoutSeconds() { return startToHeartbeatTimeoutSeconds; }

@Override public InProgressTimerFunction inProgressTimerSecondsFn() {
@Override public InProgressSleepFunction inProgressSleepSecondsFn() {
return (invocationNum, cumulativeStepDurationSeconds) -> Math.min(invocationNum * 2, 4);
}

@Override public int timeoutSeconds() {
return timeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import com.bazaarvoice.sswf.model.history.StepsHistory
import com.bazaarvoice.sswf.model.result.{InProgress, StepResult, Success}
import com.bazaarvoice.sswf.model.{DefinedStep, ScheduledStep, StepInput}
import com.bazaarvoice.sswf.service.{StepActionWorker, StepDecisionWorker, WorkflowManagement}
import example.StdOutLogger
import org.joda.time.Duration
import org.scalatest.FlatSpec

Expand Down Expand Up @@ -44,7 +43,7 @@ class WaitTimeTest extends FlatSpec {
private val domain: String = "sswf-tests"
private val wf: String = "wait-test"
private val swf: AmazonSimpleWorkflowClient = new AmazonSimpleWorkflowClient()
private val logger: StdOutLogger = new StdOutLogger
private val logger: Logger = new SilentLogger
val manager = new WorkflowManagement[String, WaitTimeSteps](domain, wf, "0.0", wf, swf, inputParser = parser, log = logger)
val definition = new WaitTimeTestWorkflowDef()
val actor = new StepActionWorker[String, WaitTimeSteps](domain, wf, swf, parser, definition, log = logger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package com.bazaarvoice.sswf

trait HeartbeatCallback {
/**
* Report liveness and progress. Response `true` if cancellation is requested.
* @param progressMessage Report any information about your progress.
* @return `true` if cancellation is requested.
*/
* Report liveness and progress.
* Calling this method resets the timeout for the step.
*
* Response `true` if cancellation is requested.
*
* @param progressMessage Report any information about your progress.
* @return `true` if cancellation is requested.
*/
def checkIn(progressMessage: String): Boolean
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ trait WorkflowDefinition[SSWFInput, StepEnum <: (Enum[StepEnum] with WorkflowSte
* @param step The action to take next
* @param input The input to the workflow
* @param stepInput The input to this particular step
* @param heartbeatCallback A function to call to report liveness and progress. Response `true` if cancellation is requested.
* @param heartbeatCallback A function to call to report liveness and progress. Calling this method resets the timeout for the step. Response `true` if cancellation is requested.
* @return The outcome of the execution.
*/
def act(step: StepEnum, input: SSWFInput, stepInput: StepInput, heartbeatCallback: HeartbeatCallback, execution: WorkflowExecution): StepResult
Expand Down
15 changes: 5 additions & 10 deletions sswf-core/src/main/scala/com/bazaarvoice/sswf/WorkflowStep.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,16 @@ trait WorkflowStep {
/** when to consider the Step thread hung and schedule another one.
* This does not cancel the execution.
*/
def startToFinishTimeoutSeconds: Int

/** when to consider the Step thread hung and schedule another one.
* This does not cancel the execution.
*/
def startToHeartbeatTimeoutSeconds: Int
def timeoutSeconds: Int

/** How long to wait before the next attempt when the step returns InProgress. */
def inProgressTimerSecondsFn: InProgressTimerFunction
def inProgressSleepSecondsFn: InProgressSleepFunction
}

trait InProgressTimerFunction {
trait InProgressSleepFunction {
def apply(invocationNum: Int, cumulativeStepDurationSeconds: Int): Int
}

class ConstantInProgressTimerFunction(secondsToWait: Int) extends InProgressTimerFunction {
class ConstantInProgressSleepFunction(secondsToWait: Int) extends InProgressSleepFunction {
override def apply(invocationNum: Int, cumulativeStepDurationSeconds: Int): Int = secondsToWait
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class StepActionWorker[SSWFInput, StepEnum <: (Enum[StepEnum] with WorkflowStep)

val heartbeatCallback = new HeartbeatCallback {
/**
* Report liveness and progress. Response `true` if cancellation is requested.
* Report liveness and progress. Calling this method resets the timeout for the step. Response `true` if cancellation is requested.
*
* @param progressMessage Report any information about your progress.
* @return `true` if cancellation is requested.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ class StepDecisionWorker[SSWFInput, StepEnum <: (Enum[StepEnum] with WorkflowSte
new StartTimerDecisionAttributes()
.withTimerId(UUID.randomUUID().toString)
.withControl(packTimer(retry.step.name, retry.stepInput))
.withStartToFireTimeout(retry.step.inProgressTimerSecondsFn(stepInvocations, cumulativeStepDurationSeconds).toString)
.withStartToFireTimeout(retry.step.inProgressSleepSecondsFn(stepInvocations, cumulativeStepDurationSeconds).toString)
)

private[this] def waitForSignals(durationSeconds: Int, signals: List[String]) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import scala.util.Random
* @param stepScheduleToStartTimeoutSeconds The duration you expect to pass _after_ a task is scheduled, and _before_ an actionWorker picks it up. If there is always a free actionWorker, this is
* just the polling interval for actions to execute. If all the actionWorkers are busy, though, the action may time out waiting to start. This isn't
* harmful, though, since the decisionWorker will simply re-schedule it. Advice: make your actionWorker pool large enough that all scheduled work can
* execute immediately, and set this timeout to the polling interval for action work. Default: 60s
* execute immediately, and set this timeout to the polling interval for action work. Default: 5 minutes
* @param inputParser See InputParser
* @tparam StepEnum The enum containing workflow step definitions
*/
Expand Down Expand Up @@ -332,10 +332,10 @@ class WorkflowManagement[SSWFInput, StepEnum <: (Enum[StepEnum] with WorkflowSte
.withVersion(version)
.withDomain(domain)
.withDefaultTaskList(new TaskList().withName(taskList))
.withDefaultTaskHeartbeatTimeout(activity.startToHeartbeatTimeoutSeconds.toString)
.withDefaultTaskHeartbeatTimeout(activity.timeoutSeconds.toString)
.withDefaultTaskScheduleToStartTimeout(stepScheduleToStartTimeoutSeconds.toString)
.withDefaultTaskScheduleToCloseTimeout((stepScheduleToStartTimeoutSeconds + activity.startToFinishTimeoutSeconds).toString)
.withDefaultTaskStartToCloseTimeout(activity.startToFinishTimeoutSeconds.toString)
.withDefaultTaskScheduleToCloseTimeout((stepScheduleToStartTimeoutSeconds + activity.timeoutSeconds).toString)
.withDefaultTaskStartToCloseTimeout(activity.timeoutSeconds.toString)
)
} catch {
case t: Throwable =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package com.bazaarvoice.sswf.service
import com.bazaarvoice.sswf.WorkflowStep

object util {
// Observation: the version number doesn't have to be monotonically increasing, it's really just a way to
// fingerprint a certain step configuration so we can be sure we are executing the step we intend to.
// So instead of a real version number, we use the timeout. If that changes, the step is logically different.
private[service] def stepToVersion[StepEnum <: (Enum[StepEnum] with WorkflowStep)](step: StepEnum): String =
s"${step.startToFinishTimeoutSeconds}.${step.startToHeartbeatTimeoutSeconds}"
s"${step.timeoutSeconds}"

}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ class ActionService[SSWFInput, StepEnum <: (Enum[StepEnum] with WorkflowStep) :
}
override def shutDown(): Unit = {
workerPool.shutdown()
while(!workerPool.isShutdown) {
LOG.info("Waiting on worker pool to shut down...")
Thread.sleep(10);
}
super.shutDown()
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.bazaarvoice.sswf.service

import java.util.concurrent.TimeUnit
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import com.amazonaws.AbortedException
import com.amazonaws.services.simpleworkflow.model.{DecisionTask, RespondDecisionTaskCompletedRequest}
import com.bazaarvoice.sswf.WorkflowStep
import com.google.common.util.concurrent.{AbstractScheduledService, Service}
import com.google.common.util.concurrent.AbstractScheduledService.Scheduler
import com.google.common.util.concurrent.{AbstractScheduledService, MoreExecutors, Service}
import org.slf4j.LoggerFactory

import scala.reflect.ClassTag
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public void start() {
},10,10, TimeUnit.SECONDS);
}

public void stop() {
executorService.shutdownNow();
}

void addSignal(String signal) {
final boolean offer = signalsToSend.offer(signal);
if (!offer) {
Expand Down
Loading

0 comments on commit a129373

Please sign in to comment.