Skip to content

Commit

Permalink
make MAX_SUBSEQUENT_STATE_EXECUTIONS configurable in workflow settings
Browse files Browse the repository at this point in the history
  • Loading branch information
Edvard Fonsell committed Nov 11, 2016
1 parent 6914ee0 commit 2bf7829
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,6 @@ class WorkflowStateProcessor implements Runnable {
private static final PeriodicLogger threadStuckLogger = new PeriodicLogger(logger, 60);
private static final String MDC_KEY = "workflowInstanceId";

private static final int MAX_SUBSEQUENT_STATE_EXECUTIONS = 100;

private final int instanceId;
private final WorkflowDefinitionService workflowDefinitions;
private final WorkflowInstanceService workflowInstances;
Expand Down Expand Up @@ -145,7 +143,7 @@ private void runImpl() {
} else {
processAfterListeners(listenerContext);
}
subsequentStateExecutions = busyLoopPrevention(settings, subsequentStateExecutions, execution);
subsequentStateExecutions = busyLoopPrevention(state, settings, subsequentStateExecutions, execution);
instance = saveWorkflowInstanceState(execution, instance, definition, actionBuilder);
}
}
Expand Down Expand Up @@ -176,10 +174,12 @@ private void rescheduleUnknownWorkflowState(WorkflowInstance instance) {
logger.debug("Finished.");
}

private int busyLoopPrevention(WorkflowSettings settings, int subsequentStateExecutions, StateExecutionImpl execution) {
private int busyLoopPrevention(WorkflowState state, WorkflowSettings settings, int subsequentStateExecutions,
StateExecutionImpl execution) {
DateTime nextActivation = execution.getNextActivation();
if (subsequentStateExecutions++ >= MAX_SUBSEQUENT_STATE_EXECUTIONS && nextActivation != null) {
logger.warn("Executed {} times without delay, forcing short transition delay", MAX_SUBSEQUENT_STATE_EXECUTIONS);
int maxSubsequentStateExecutions = settings.getMaxSubsequentStateExecutions(state);
if (subsequentStateExecutions++ >= maxSubsequentStateExecutions && nextActivation != null) {
logger.warn("Executed {} times without delay, forcing short transition delay", maxSubsequentStateExecutions);
DateTime shortTransitionActivation = settings.getShortTransitionActivation();
if (nextActivation.isBefore(shortTransitionActivation)) {
execution.setNextActivation(shortTransitionActivation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import static org.joda.time.DateTime.now;

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

import org.joda.time.DateTime;

Expand Down Expand Up @@ -39,13 +41,23 @@ public class WorkflowSettings extends ModelObject {
* Maximum retry attempts.
*/
public final int maxRetries;
/**
* Maximum number of subsequent state executions before forcing a short transition delay.
*/
public final int maxSubsequentStateExecutions;
/**
* Maximum number of subsequent state executions before forcing a short transition delay, per state.
*/
public final Map<WorkflowState, Integer> maxSubsequentStateExecutionsPerState;

WorkflowSettings(Builder builder) {
this.minErrorTransitionDelay = builder.minErrorTransitionDelay;
this.maxErrorTransitionDelay = builder.maxErrorTransitionDelay;
this.shortTransitionDelay = builder.shortTransitionDelay;
this.immediateTransitionDelay = builder.immediateTransitionDelay;
this.maxRetries = builder.maxRetries;
this.maxSubsequentStateExecutions = builder.maxSubsequentStateExecutions;
this.maxSubsequentStateExecutionsPerState = new HashMap<>(builder.maxSubsequentStateExecutionsPerState);
}

/**
Expand All @@ -58,6 +70,8 @@ public static class Builder {
int shortTransitionDelay = (int) SECONDS.toMillis(30);
int immediateTransitionDelay = 0;
int maxRetries = 17;
int maxSubsequentStateExecutions = 100;
Map<WorkflowState, Integer> maxSubsequentStateExecutionsPerState = new HashMap<>();

/**
* Set the maximum delay on execution retry after an error.
Expand Down Expand Up @@ -119,6 +133,32 @@ public Builder setMaxRetries(int maxRetries) {
return this;
}

/**
* Set maximum number of subsequent state executions before forcing a short transition delay.
*
* @param maxSubsequentStateExecutions
* Maximum number of subsequent state executions.
* @return this.
*/
public Builder setMaxSubsequentStateExecutions(int maxSubsequentStateExecutions) {
this.maxSubsequentStateExecutions = maxSubsequentStateExecutions;
return this;
}

/**
* Set maximum number of subsequent state executions before forcing a short transition delay for given state.
*
* @param state
* The state for which the limit is applied.
* @param maxSubsequentStateExecutions
* Maximum number of subsequent state executions.
* @return this.
*/
public Builder setMaxSubsequentStateExecutions(WorkflowState state, int maxSubsequentStateExecutions) {
this.maxSubsequentStateExecutionsPerState.put(state, maxSubsequentStateExecutions);
return this;
}

/**
* Create workflow settings object.
*
Expand Down Expand Up @@ -169,4 +209,15 @@ protected long calculateBinaryBackoffDelay(int retryCount, long minDelay, long m
public DateTime getShortTransitionActivation() {
return now().plusMillis(shortTransitionDelay);
}

/**
* Return the maximum number of subsequent state executions before forcing a short transition delay.
* @param state The state for which the limit is checked.
*
* @return The maximum number of subsequent state executions.
*/
public int getMaxSubsequentStateExecutions(WorkflowState state) {
return maxSubsequentStateExecutionsPerState.getOrDefault(state, maxSubsequentStateExecutions);
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.nflow.engine.workflow.definition;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
Expand Down Expand Up @@ -49,4 +50,26 @@ public void errorTransitionDelayIsBetweenMinAndMaxDelay() {
prevDelay = delay;
}
}

@Test
public void getMaxSubsequentStateExecutionsReturns100ByDefault() {
WorkflowSettings s = new WorkflowSettings.Builder().build();
assertThat(s.getMaxSubsequentStateExecutions(TestWorkflow.State.begin), is(equalTo(100)));
}

@Test
public void getMaxSubsequentStateExecutionsReturnsValueDefinedForTheState() {
int executionsDefault = 200;
int executionsForBegin = 300;
WorkflowSettings s = new WorkflowSettings.Builder().setMaxSubsequentStateExecutions(executionsDefault)
.setMaxSubsequentStateExecutions(TestWorkflow.State.begin, executionsForBegin).build();
assertThat(s.getMaxSubsequentStateExecutions(TestWorkflow.State.begin), is(equalTo(executionsForBegin)));
}

@Test
public void getMaxSubsequentStateExecutionsReturnsGivenDefaultValueWhenNotDefinedForState() {
int executionsDefault = 200;
WorkflowSettings s = new WorkflowSettings.Builder().setMaxSubsequentStateExecutions(executionsDefault).build();
assertThat(s.getMaxSubsequentStateExecutions(TestWorkflow.State.begin), is(equalTo(executionsDefault)));
}
}

0 comments on commit 2bf7829

Please sign in to comment.