diff --git a/CHANGELOG.md b/CHANGELOG.md index 145cbdc1a..7b151ae50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,12 @@ ## 7.1.1-SNAPSHOT (future release) **Highlights** +- `nflow-engine` + - Expedited clean shutdown for workflows that run many steps without delays. **Details** - - `nflow-engine` + - When shutdown is requested, stop processing workflows immediately after the current state has been executed. - Dependency updates: - spring 5.2.5 - jackson 2.10.3 diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index e8dc712aa..cac9c354f 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -28,9 +28,10 @@ public class WorkflowDispatcher implements Runnable { private static final Logger logger = getLogger(WorkflowDispatcher.class); private static final PeriodicLogger periodicLogger = new PeriodicLogger(logger, 60); + private volatile boolean started; private volatile boolean shutdownRequested; - private volatile boolean running = false; - private volatile boolean paused = false; + private volatile boolean running; + private volatile boolean paused; private final CountDownLatch shutdownDone = new CountDownLatch(1); private final WorkflowInstanceExecutor executor; @@ -65,8 +66,9 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao @Override public void run() { - logger.info("Starting."); + logger.info("Dispacther started."); try { + started = true; workflowDefinitions.postProcessWorkflowDefinitions(); running = true; while (!shutdownRequested) { @@ -102,33 +104,33 @@ public void run() { running = false; shutdownPool(); executorDao.markShutdown(); - logger.info("Shutdown finished."); shutdownDone.countDown(); } } public void shutdown() { shutdownRequested = true; - if (running) { - logger.info("Shutdown requested."); + if (started && shutdownDone.getCount() > 0) { + logger.info("Shutdown initiated."); + try { + shutdownDone.await(); + logger.info("Shutdown completed."); + } catch (@SuppressWarnings("unused") InterruptedException e) { + logger.warn("Shutdown interrupted."); + } } else { - logger.info("Shutdown requested, but executor not running, exiting."); - return; - } - try { - // TODO use timeout? - shutdownDone.await(); - } catch (@SuppressWarnings("unused") InterruptedException e) { - logger.info("Shutdown interrupted."); + logger.info("Dispatcher was not started or was already shut down."); } } public void pause() { paused = true; + logger.info("Dispatcher paused."); } public void resume() { paused = false; + logger.info("Dispatcher resumed."); } public boolean isPaused() { @@ -155,7 +157,7 @@ private void dispatch(List nextInstanceIds) { } logger.debug("Found {} workflow instances, dispatching executors.", nextInstanceIds.size()); for (Long instanceId : nextInstanceIds) { - executor.execute(stateProcessorFactory.createProcessor(instanceId)); + executor.execute(stateProcessorFactory.createProcessor(instanceId, () -> shutdownRequested)); } } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index d37aeea9c..80ae643db 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -7,7 +7,6 @@ import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecution; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecutionFailed; -import static java.lang.String.format; import static java.lang.Thread.currentThread; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; @@ -25,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -68,6 +68,7 @@ class WorkflowStateProcessor implements Runnable { private final WorkflowDefinitionService workflowDefinitions; private final WorkflowInstanceService workflowInstances; private final WorkflowInstancePreProcessor workflowInstancePreProcessor; + private final Supplier shutdownRequested; final ObjectStringMapper objectMapper; private final WorkflowInstanceDao workflowInstanceDao; private final MaintenanceDao maintenanceDao; @@ -78,16 +79,16 @@ class WorkflowStateProcessor implements Runnable { private final int stateProcessingRetryDelay; private final int stateSaveRetryDelay; private final int stateVariableValueTooLongRetryDelay; - private boolean internalRetryEnabled = true; private final Map processingInstances; private long startTimeSeconds; private Thread thread; - WorkflowStateProcessor(long instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions, + WorkflowStateProcessor(long instanceId, Supplier shutdownRequested, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions, WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao, MaintenanceDao maintenanceDao, WorkflowInstancePreProcessor workflowInstancePreProcessor, Environment env, Map processingInstances, WorkflowExecutorListener... executorListeners) { this.instanceId = instanceId; + this.shutdownRequested = shutdownRequested; this.objectMapper = objectMapper; this.workflowDefinitions = workflowDefinitions; this.workflowInstances = workflowInstances; @@ -111,16 +112,19 @@ public void run() { startTimeSeconds = currentTimeMillis() / 1000; thread = currentThread(); processingInstances.put(instanceId, this); - boolean stateProcessingFinished = false; - do { + while (true) { try { runImpl(); - stateProcessingFinished = true; + break; } catch (Throwable ex) { + if (shutdownRequested.get()) { + logger.error("Failed to process workflow instance and shutdown requested", ex); + break; + } logger.error("Failed to process workflow instance, retrying after {} seconds", stateProcessingRetryDelay, ex); sleepIgnoreInterrupted(stateProcessingRetryDelay); } - } while (!stateProcessingFinished && internalRetryEnabled); + } processingInstances.remove(instanceId); MDC.remove(MDC_KEY); } @@ -136,7 +140,7 @@ private void runImpl() { } WorkflowSettings settings = definition.getSettings(); int subsequentStateExecutions = 0; - while (instance.status == executing) { + while (instance.status == executing && !shutdownRequested.get()) { startTimeSeconds = currentTimeMillis() / 1000; StateExecutionImpl execution = new StateExecutionImpl(instance, objectMapper, workflowInstanceDao, workflowInstancePreProcessor, workflowInstances); @@ -259,23 +263,21 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, .setStateText(getStateText(instance, execution)) // .setState(execution.getNextState()) // .setRetries(execution.isRetry() ? execution.getRetries() + 1 : 0); - do { + while (true) { try { return persistWorkflowInstanceState(execution, instance.stateVariables, actionBuilder, instanceBuilder); } catch (Exception ex) { + if (shutdownRequested.get()) { + logger.error("Failed to save workflow instance {} new state, not retrying due to shutdown request. The state will be rerun on recovery.", + instance.id, ex); + // return the original instance since persisting failed + return instance; + } logger.error("Failed to save workflow instance {} new state, retrying after {} seconds", instance.id, stateSaveRetryDelay, ex); sleepIgnoreInterrupted(stateSaveRetryDelay); } - } while (internalRetryEnabled); - throw new IllegalStateException(format("Failed to save workflow instance %s new state", instance.id)); - } - - /** - * For unit testing only - */ - void setInternalRetryEnabled(boolean internalRetryEnabled) { - this.internalRetryEnabled = internalRetryEnabled; + } } private WorkflowInstance persistWorkflowInstanceState(StateExecutionImpl execution, Map originalStateVars, diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java index 70d6e0f3f..3cb8bc7bb 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java @@ -4,6 +4,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; import javax.inject.Inject; @@ -47,8 +48,8 @@ public WorkflowStateProcessorFactory(WorkflowDefinitionService workflowDefinitio this.env = env; } - public WorkflowStateProcessor createProcessor(long instanceId) { - return new WorkflowStateProcessor(instanceId, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, + public WorkflowStateProcessor createProcessor(long instanceId, Supplier shutdownRequested) { + return new WorkflowStateProcessor(instanceId, shutdownRequested, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listeners); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java index 4bf49700f..d8ff1a66b 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java @@ -1,13 +1,17 @@ package io.nflow.engine.internal.executor; import static edu.umd.cs.mtc.TestFramework.runOnce; +import static java.lang.Boolean.FALSE; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -53,7 +57,6 @@ @ExtendWith(MockitoExtension.class) public class WorkflowDispatcherTest { - WorkflowDispatcher dispatcher; WorkflowInstanceExecutor executor; MockEnvironment env = new MockEnvironment(); @@ -120,9 +123,9 @@ public void threadDispatcher() { .thenThrow(new RuntimeException("Expected: exception during dispatcher execution")) .thenAnswer(waitForTickAndAnswer(2, ids(2L), this)); WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, noOpRunnable()); - when(executorFactory.createProcessor(1)).thenReturn(fakeWorkflowExecutor); + when(executorFactory.createProcessor(eq(1L), any())).thenReturn(fakeWorkflowExecutor); WorkflowStateProcessor fakeWorkflowExecutor2 = fakeWorkflowExecutor(2, noOpRunnable()); - when(executorFactory.createProcessor(2)).thenReturn(fakeWorkflowExecutor2); + when(executorFactory.createProcessor(eq(2L), any())).thenReturn(fakeWorkflowExecutor2); dispatcher.run(); } @@ -135,8 +138,8 @@ public void threadShutdown() { public void finish() { verify(workflowInstances, times(3)).pollNextWorkflowInstanceIds(anyInt()); InOrder inOrder = inOrder(executorFactory); - inOrder.verify(executorFactory).createProcessor(1); - inOrder.verify(executorFactory).createProcessor(2); + inOrder.verify(executorFactory).createProcessor(eq(1L), any()); + inOrder.verify(executorFactory).createProcessor(eq(2L), any()); } } runOnce(new ExceptionDuringDispatcherExecutionCausesRetry()); @@ -159,7 +162,7 @@ public void threadDispatcher() { @Override public void finish() { verify(workflowInstances).pollNextWorkflowInstanceIds(anyInt()); - verify(executorFactory, never()).createProcessor(anyInt()); + verify(executorFactory, never()).createProcessor(anyLong(), any()); } } runOnce(new ErrorDuringDispatcherExecutionStopsDispatcher()); @@ -184,7 +187,7 @@ public void threadShutdown() { @Override public void finish() { verify(workflowInstances, times(3)).pollNextWorkflowInstanceIds(anyInt()); - verify(executorFactory, never()).createProcessor(anyInt()); + verify(executorFactory, never()).createProcessor(anyLong(), any()); } } runOnce(new EmptyPollResultCausesNoTasksToBeScheduled()); @@ -197,7 +200,7 @@ class ShutdownBlocksUntilPoolShutdown extends MultithreadedTestCase { public void threadDispatcher() { when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenAnswer(waitForTickAndAnswer(2, ids(1L), this)); WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this)); - when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor); + when(executorFactory.createProcessor(anyLong(), any())).thenReturn(fakeWorkflowExecutor); dispatcher.run(); } @@ -238,7 +241,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { } }); WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this)); - when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor); + when(executorFactory.createProcessor(anyLong(), any())).thenReturn(fakeWorkflowExecutor); dispatcher.run(); } @@ -369,7 +372,7 @@ public void run() { } WorkflowStateProcessor fakeWorkflowExecutor(long instanceId, final Runnable fakeCommand) { - return new WorkflowStateProcessor(instanceId, null, null, null, null, null, null, env, new ConcurrentHashMap<>(), + return new WorkflowStateProcessor(instanceId, FALSE::booleanValue, null, null, null, null, null, null, env, new ConcurrentHashMap<>(), (WorkflowExecutorListener) null) { @Override public void run() { diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java index d8cf15168..f4c4ce060 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java @@ -1,5 +1,6 @@ package io.nflow.engine.internal.executor; +import static java.lang.Boolean.FALSE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.joda.time.DateTimeUtils.currentTimeMillis; @@ -61,14 +62,14 @@ public void setup() { @Test public void factoryCreatesExecutorsWithoutListeners() { - WorkflowStateProcessor executor = factory.createProcessor(12); + WorkflowStateProcessor executor = factory.createProcessor(12, FALSE::booleanValue); assertNotNull(executor); } @Test public void factoryCreatesExecutorsWithListeners() { factory.listeners = listeners; - WorkflowStateProcessor executor = factory.createProcessor(122); + WorkflowStateProcessor executor = factory.createProcessor(122, FALSE::booleanValue); assertNotNull(executor); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index b5c24f383..f1e8ad9c9 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -12,11 +12,14 @@ import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.manual; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecution; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecutionFailed; +import static java.lang.Boolean.FALSE; +import static java.lang.Thread.sleep; import static java.time.Duration.ofSeconds; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -29,6 +32,7 @@ import static org.joda.time.DateTimeUtils.setCurrentMillisFixed; import static org.joda.time.DateTimeUtils.setCurrentMillisSystem; import static org.joda.time.Period.hours; +import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -57,6 +61,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -164,6 +169,8 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { WorkflowDefinition nonRetryableWf = new NonRetryableWorkflow(); + LoopingTestWorkflow loopingWf = new LoopingTestWorkflow(); + static WorkflowInstance newChildWorkflow = mock(WorkflowInstance.class); static WorkflowInstance newWorkflow = mock(WorkflowInstance.class); @@ -174,6 +181,8 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { private final Set INCLUDES = EnumSet.of(CURRENT_STATE_VARIABLES); + private final AtomicBoolean shutdownRequest = new AtomicBoolean(); + @BeforeEach public void setup() { processingInstances = new ConcurrentHashMap<>(); @@ -184,7 +193,7 @@ public void setup() { env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "1"); env.setProperty("nflow.executor.stateVariableValueTooLongRetryDelay.minutes", "60"); env.setProperty("nflow.db.workflowInstanceType.cacheSize", "10000"); - executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, + executor = new WorkflowStateProcessor(1, shutdownRequest::get, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listener1, listener2); setCurrentMillisFixed(currentTimeMillis()); lenient().doReturn(executeWf).when(workflowDefinitions).getWorkflowDefinition("execute-test"); @@ -195,6 +204,7 @@ public void setup() { lenient().doReturn(wakeWf).when(workflowDefinitions).getWorkflowDefinition("wake-test"); lenient().doReturn(stateVariableWf).when(workflowDefinitions).getWorkflowDefinition("state-variable"); lenient().doReturn(nonRetryableWf).when(workflowDefinitions).getWorkflowDefinition("non-retryable"); + lenient().doReturn(loopingWf).when(workflowDefinitions).getWorkflowDefinition("looping-test"); filterChain(listener1); filterChain(listener2); lenient().when(executionMock.getRetries()).thenReturn(testWorkflowDef.getSettings().maxRetries); @@ -370,7 +380,7 @@ public void skippingWorkflowWithListenerCausesProcessorToStopProcessingWorkflow( .setStateText("myStateText").build(); when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); WorkflowExecutorListener listener = mock(WorkflowExecutorListener.class); - executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, + executor = new WorkflowStateProcessor(1, shutdownRequest::get, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listener); doAnswer((Answer) invocation -> @@ -500,7 +510,7 @@ public void finishingChildWakesParentAutomaticallyWhenParentIsInWaitState() { @Test public void goToErrorStateWhenNextStateIsInvalid() { env.setProperty("nflow.illegal.state.change.action", "ignore"); - executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, + executor = new WorkflowStateProcessor(1, shutdownRequest::get, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listener1, listener2); WorkflowInstance instance = executingInstanceBuilder().setType("failing-test").setState("invalidNextState").build(); @@ -769,7 +779,7 @@ public void illegalStateChangeGoesToErrorState() { @Test public void illegalStateChangeGoesToIllegalStateWhenActionIsLog() { env.setProperty("nflow.illegal.state.change.action", "log"); - executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, + executor = new WorkflowStateProcessor(1, shutdownRequest::get, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listener1, listener2); WorkflowInstance instance = executingInstanceBuilder().setType("simple-test").setState("illegalStateChange").build(); @@ -788,7 +798,7 @@ public void illegalStateChangeGoesToIllegalStateWhenActionIsLog() { @Test public void illegalStateChangeGoesToIllegalStateWhenActionIsIgnore() { env.setProperty("nflow.illegal.state.change.action", "ignore"); - executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, + executor = new WorkflowStateProcessor(1, shutdownRequest::get, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listener1, listener2); WorkflowInstance instance = executingInstanceBuilder().setType("simple-test").setState("illegalStateChange").build(); @@ -810,10 +820,10 @@ public void stateProcessingRetryAfterFailedGetWorkflow() throws InterruptedExcep doThrow(new RuntimeException("some failure")).when(workflowInstances).getWorkflowInstance(instance.id, INCLUDES, null); ExecutorService executorService = newSingleThreadExecutor(); - newSingleThreadExecutor().submit(executor); - Thread.sleep(1500); - executor.setInternalRetryEnabled(false); - executorService.shutdownNow(); + executorService.submit(executor); + sleep(1500); + executorService.shutdown(); + shutdownRequest.set(true); verify(workflowInstances, atLeast(2)).getWorkflowInstance(instance.id, INCLUDES, null); } @@ -826,14 +836,31 @@ public void saveStateRetryAfterFailedPersistence() throws InterruptedException { any(), any(), anyBoolean()); ExecutorService executorService = newSingleThreadExecutor(); - newSingleThreadExecutor().submit(executor); - Thread.sleep(1500); - executor.setInternalRetryEnabled(false); - executorService.shutdownNow(); + executorService.submit(executor); + sleep(1500); + executorService.shutdown(); + shutdownRequest.set(true); verify(workflowInstanceDao, atLeast(2)).updateWorkflowInstanceAfterExecution(any(), any(), any(), any(), anyBoolean()); } + @Test + public void stateProcessingSeriesStopsOnShutdown() throws InterruptedException { + WorkflowInstance instance = executingInstanceBuilder().setType("looping-test").setState("start").build(); + when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + + ExecutorService executorService = newSingleThreadExecutor(); + executorService.submit(executor); + sleep(500); + + executorService.shutdown(); + shutdownRequest.set(true); + boolean wasTerminated = executorService.awaitTermination(5, SECONDS); + executorService.shutdownNow(); + + assertTrue(wasTerminated); + } + @Test public void deleteWorkflowInstanceHistoryNotExecutedWithDefaultSettings() { WorkflowInstance instance = executingInstanceBuilder().setType("simple-test").setState("start").build(); @@ -1021,7 +1048,7 @@ public static class ForceCleaningTestWorkflow extends WorkflowDefinition false).build()); + new WorkflowSettings.Builder().setHistoryDeletableAfter(hours(2)).setDeleteHistoryCondition(FALSE::booleanValue).build()); permit(State.start, State.done, State.error); } @@ -1078,6 +1105,36 @@ public NextAction start(@SuppressWarnings("unused") StateExecution execution) { } + public static class LoopingTestWorkflow extends WorkflowDefinition { + + protected LoopingTestWorkflow() { + super("looping-test", State.start, State.error); + permit(State.start, State.start); + permit(State.start, State.done); + } + + public enum State implements WorkflowState { + start(WorkflowStateType.start), done(WorkflowStateType.end), error(WorkflowStateType.manual); + + private WorkflowStateType stateType; + + State(WorkflowStateType stateType) { + this.stateType = stateType; + } + + @Override + public WorkflowStateType getType() { + return stateType; + } + } + + public NextAction start(@SuppressWarnings("unused") StateExecution execution) throws InterruptedException { + sleep(100); + return moveToState(State.start, "loop"); + } + + } + public static class SimpleTestWorkflow extends WorkflowDefinition { protected SimpleTestWorkflow() {