Skip to content

Commit

Permalink
Add retry to workflow instance state saving
Browse files Browse the repository at this point in the history
  • Loading branch information
eputtone committed Aug 24, 2017
1 parent 8eb3c86 commit a953f49
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecutionFailed;
import static java.lang.Thread.currentThread;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace;
import static org.joda.time.DateTime.now;
import static org.joda.time.DateTimeUtils.currentTimeMillis;
Expand Down Expand Up @@ -68,6 +69,8 @@ class WorkflowStateProcessor implements Runnable {
final String illegalStateChangeAction;
private final int unknownWorkflowTypeRetryDelay;
private final int unknownWorkflowStateRetryDelay;
private final int stateSaveRetryDelay;
private boolean stateSaveRetryEnabled = true;
private final Map<Integer, WorkflowStateProcessor> processingInstances;
private long startTimeSeconds;
private Thread thread;
Expand All @@ -87,6 +90,7 @@ class WorkflowStateProcessor implements Runnable {
illegalStateChangeAction = env.getRequiredProperty("nflow.illegal.state.change.action");
unknownWorkflowTypeRetryDelay = env.getRequiredProperty("nflow.unknown.workflow.type.retry.delay.minutes", Integer.class);
unknownWorkflowStateRetryDelay = env.getRequiredProperty("nflow.unknown.workflow.state.retry.delay.minutes", Integer.class);
stateSaveRetryDelay = env.getRequiredProperty("nflow.executor.stateSaveRetryDelay.seconds", Integer.class);
}

@Override
Expand Down Expand Up @@ -205,6 +209,29 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution,
.setStateText(getStateText(instance, execution)) //
.setState(execution.getNextState()) //
.setRetries(execution.isRetry() ? execution.getRetries() + 1 : 0);
do {
try {
return persistWorkflowInstanceState(execution, instance, actionBuilder, builder);
} catch (Exception ex) {
logger.error("Failed to save workflow instance new state, retrying after {} seconds", stateSaveRetryDelay, ex);
try {
Thread.sleep(SECONDS.toMillis(stateSaveRetryDelay));
} catch (@SuppressWarnings("unused") InterruptedException ok) {
}
}
} while (stateSaveRetryEnabled);
throw new IllegalStateException("Failed to save workflow instance new state");
}

/**
* For unit testing only
*/
void setStateSaveRetryEnabled(boolean stateSaveRetryEnabled) {
this.stateSaveRetryEnabled = stateSaveRetryEnabled;
}

private WorkflowInstance persistWorkflowInstanceState(StateExecutionImpl execution, WorkflowInstance instance, WorkflowInstanceAction.Builder actionBuilder,
WorkflowInstance.Builder builder) {
if (execution.isStateProcessInvoked()) {
actionBuilder.setExecutionEnd(now()).setType(getActionType(execution)).setStateText(execution.getNextStateReason());
if (execution.isFailed()) {
Expand Down
1 change: 1 addition & 0 deletions nflow-engine/src/main/resources/nflow-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ nflow.executor.group=nflow
nflow.executor.timeout.seconds=900
nflow.executor.keepalive.seconds=60
nflow.executor.stuckThreadThreshold.seconds=60
nflow.executor.stateSaveRetryDelay.seconds=60

nflow.dispatcher.sleep.ms=1000
nflow.dispatcher.await.termination.seconds=60
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public void setup() {
env.setProperty("nflow.unknown.workflow.type.retry.delay.minutes", "60");
env.setProperty("nflow.unknown.workflow.state.retry.delay.minutes", "60");
env.setProperty("nflow.executor.stuckThreadThreshold.seconds", "60");
env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "60");
when(executorDao.isTransactionSupportEnabled()).thenReturn(true);
executor = new WorkflowInstanceExecutor(3, 2, 0, 10, 0, new CustomizableThreadFactory("nflow-executor-"));
dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, workflowDefinitions, executorDao, env);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public void setup() {
env.setProperty("nflow.unknown.workflow.type.retry.delay.minutes", "60");
env.setProperty("nflow.unknown.workflow.state.retry.delay.minutes", "60");
env.setProperty("nflow.executor.stuckThreadThreshold.seconds", Integer.toString(STUCK_THREAD_THRESHOLD));
env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "60");
factory = new WorkflowStateProcessorFactory(workflowDefinitions, workflowInstances, objectMapper, workflowInstanceDao,
workflowInstancePreProcessor, env);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecution;
import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecutionFailed;
import static java.util.Arrays.asList;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
Expand All @@ -26,6 +27,7 @@
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
Expand All @@ -43,6 +45,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

import org.hamcrest.Description;
import org.hamcrest.Matcher;
Expand Down Expand Up @@ -160,6 +163,8 @@ public void setup() {
env.setProperty("nflow.illegal.state.change.action", "fail");
env.setProperty("nflow.unknown.workflow.type.retry.delay.minutes", "60");
env.setProperty("nflow.unknown.workflow.state.retry.delay.minutes", "60");
env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "1");

executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao,
workflowInstancePreProcessor, env, processingInstances, listener1, listener2);
setCurrentMillisFixed(currentTimeMillis());
Expand Down Expand Up @@ -832,6 +837,21 @@ public void handleRetryAfterSetsActivationWhenMaxRetriesIsNotExceeded() {
verify(executionMock).setNextActivation(tomorrow);
}

@Test
public void saveStateRetryAfterFailedPersistence() throws InterruptedException {
WorkflowInstance instance = executingInstanceBuilder().setType("execute-test").setState("start").build();
when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance);
doThrow(new RuntimeException("some failure")).when(workflowInstanceDao).updateWorkflowInstanceAfterExecution(any(), any(), any(), any(), anyBoolean());

ExecutorService executorService = newSingleThreadExecutor();
newSingleThreadExecutor().submit(executor);
Thread.sleep(1500);
executor.setStateSaveRetryEnabled(false);
executorService.shutdownNow();

verify(workflowInstanceDao, atLeast(2)).updateWorkflowInstanceAfterExecution(any(), any(), any(), any(), anyBoolean());
}

public static class Pojo {
public String field;
public boolean test;
Expand Down
1 change: 1 addition & 0 deletions nflow-engine/src/test/resources/junit.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
nflow.executor.group=junit
nflow.executor.timeout.seconds=900
nflow.executor.keepalive.seconds=60
nflow.executor.stateSaveRetryDelay.seconds=60

nflow.workflow.instance.query.max.results=10000
nflow.workflow.instance.query.max.results.default=100
Expand Down

0 comments on commit a953f49

Please sign in to comment.