From c9c3a25c195b8319eb2285a036a7baf1a2812773 Mon Sep 17 00:00:00 2001 From: Mikko Tiihonen Date: Fri, 17 Apr 2020 15:13:31 +0300 Subject: [PATCH 01/16] Pass information to workflow disptcher about pending shutdown to allow faster clean shutdown --- .../nflow/engine/internal/executor/WorkflowDispatcher.java | 2 +- .../engine/internal/executor/WorkflowStateProcessor.java | 7 +++++-- .../internal/executor/WorkflowStateProcessorFactory.java | 5 +++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index e8dc712aa..064a6a0e4 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -155,7 +155,7 @@ private void dispatch(List nextInstanceIds) { } logger.debug("Found {} workflow instances, dispatching executors.", nextInstanceIds.size()); for (Long instanceId : nextInstanceIds) { - executor.execute(stateProcessorFactory.createProcessor(instanceId)); + executor.execute(stateProcessorFactory.createProcessor(instanceId, () -> shutdownRequested)); } } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index d37aeea9c..2bdf2ac7c 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -68,6 +69,7 @@ class WorkflowStateProcessor implements Runnable { private final WorkflowDefinitionService workflowDefinitions; private final WorkflowInstanceService workflowInstances; private final WorkflowInstancePreProcessor workflowInstancePreProcessor; + private final Supplier shutdownRequested; final ObjectStringMapper objectMapper; private final WorkflowInstanceDao workflowInstanceDao; private final MaintenanceDao maintenanceDao; @@ -83,11 +85,12 @@ class WorkflowStateProcessor implements Runnable { private long startTimeSeconds; private Thread thread; - WorkflowStateProcessor(long instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions, + WorkflowStateProcessor(long instanceId, Supplier shutdownRequested, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions, WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao, MaintenanceDao maintenanceDao, WorkflowInstancePreProcessor workflowInstancePreProcessor, Environment env, Map processingInstances, WorkflowExecutorListener... executorListeners) { this.instanceId = instanceId; + this.shutdownRequested = shutdownRequested; this.objectMapper = objectMapper; this.workflowDefinitions = workflowDefinitions; this.workflowInstances = workflowInstances; @@ -136,7 +139,7 @@ private void runImpl() { } WorkflowSettings settings = definition.getSettings(); int subsequentStateExecutions = 0; - while (instance.status == executing) { + while (instance.status == executing && !shutdownRequested.get()) { startTimeSeconds = currentTimeMillis() / 1000; StateExecutionImpl execution = new StateExecutionImpl(instance, objectMapper, workflowInstanceDao, workflowInstancePreProcessor, workflowInstances); diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java index 70d6e0f3f..3cb8bc7bb 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java @@ -4,6 +4,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; import javax.inject.Inject; @@ -47,8 +48,8 @@ public WorkflowStateProcessorFactory(WorkflowDefinitionService workflowDefinitio this.env = env; } - public WorkflowStateProcessor createProcessor(long instanceId) { - return new WorkflowStateProcessor(instanceId, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, + public WorkflowStateProcessor createProcessor(long instanceId, Supplier shutdownRequested) { + return new WorkflowStateProcessor(instanceId, shutdownRequested, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listeners); } From 96721cb1d21ee9921575366ce85dba56e0d8df4d Mon Sep 17 00:00:00 2001 From: Mikko Tiihonen Date: Fri, 17 Apr 2020 19:49:38 +0300 Subject: [PATCH 02/16] Fix tests --- .../executor/WorkflowDispatcherTest.java | 23 +++++++++++-------- .../WorkflowStateProcessorFactoryTest.java | 5 ++-- .../executor/WorkflowStateProcessorTest.java | 13 +++++++---- 3 files changed, 24 insertions(+), 17 deletions(-) diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java index 4bf49700f..d8ff1a66b 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java @@ -1,13 +1,17 @@ package io.nflow.engine.internal.executor; import static edu.umd.cs.mtc.TestFramework.runOnce; +import static java.lang.Boolean.FALSE; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -53,7 +57,6 @@ @ExtendWith(MockitoExtension.class) public class WorkflowDispatcherTest { - WorkflowDispatcher dispatcher; WorkflowInstanceExecutor executor; MockEnvironment env = new MockEnvironment(); @@ -120,9 +123,9 @@ public void threadDispatcher() { .thenThrow(new RuntimeException("Expected: exception during dispatcher execution")) .thenAnswer(waitForTickAndAnswer(2, ids(2L), this)); WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, noOpRunnable()); - when(executorFactory.createProcessor(1)).thenReturn(fakeWorkflowExecutor); + when(executorFactory.createProcessor(eq(1L), any())).thenReturn(fakeWorkflowExecutor); WorkflowStateProcessor fakeWorkflowExecutor2 = fakeWorkflowExecutor(2, noOpRunnable()); - when(executorFactory.createProcessor(2)).thenReturn(fakeWorkflowExecutor2); + when(executorFactory.createProcessor(eq(2L), any())).thenReturn(fakeWorkflowExecutor2); dispatcher.run(); } @@ -135,8 +138,8 @@ public void threadShutdown() { public void finish() { verify(workflowInstances, times(3)).pollNextWorkflowInstanceIds(anyInt()); InOrder inOrder = inOrder(executorFactory); - inOrder.verify(executorFactory).createProcessor(1); - inOrder.verify(executorFactory).createProcessor(2); + inOrder.verify(executorFactory).createProcessor(eq(1L), any()); + inOrder.verify(executorFactory).createProcessor(eq(2L), any()); } } runOnce(new ExceptionDuringDispatcherExecutionCausesRetry()); @@ -159,7 +162,7 @@ public void threadDispatcher() { @Override public void finish() { verify(workflowInstances).pollNextWorkflowInstanceIds(anyInt()); - verify(executorFactory, never()).createProcessor(anyInt()); + verify(executorFactory, never()).createProcessor(anyLong(), any()); } } runOnce(new ErrorDuringDispatcherExecutionStopsDispatcher()); @@ -184,7 +187,7 @@ public void threadShutdown() { @Override public void finish() { verify(workflowInstances, times(3)).pollNextWorkflowInstanceIds(anyInt()); - verify(executorFactory, never()).createProcessor(anyInt()); + verify(executorFactory, never()).createProcessor(anyLong(), any()); } } runOnce(new EmptyPollResultCausesNoTasksToBeScheduled()); @@ -197,7 +200,7 @@ class ShutdownBlocksUntilPoolShutdown extends MultithreadedTestCase { public void threadDispatcher() { when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())).thenAnswer(waitForTickAndAnswer(2, ids(1L), this)); WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this)); - when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor); + when(executorFactory.createProcessor(anyLong(), any())).thenReturn(fakeWorkflowExecutor); dispatcher.run(); } @@ -238,7 +241,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable { } }); WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this)); - when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor); + when(executorFactory.createProcessor(anyLong(), any())).thenReturn(fakeWorkflowExecutor); dispatcher.run(); } @@ -369,7 +372,7 @@ public void run() { } WorkflowStateProcessor fakeWorkflowExecutor(long instanceId, final Runnable fakeCommand) { - return new WorkflowStateProcessor(instanceId, null, null, null, null, null, null, env, new ConcurrentHashMap<>(), + return new WorkflowStateProcessor(instanceId, FALSE::booleanValue, null, null, null, null, null, null, env, new ConcurrentHashMap<>(), (WorkflowExecutorListener) null) { @Override public void run() { diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java index d8cf15168..98fdaaa32 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java @@ -1,5 +1,6 @@ package io.nflow.engine.internal.executor; +import static java.lang.Boolean.FALSE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.joda.time.DateTimeUtils.currentTimeMillis; @@ -61,14 +62,14 @@ public void setup() { @Test public void factoryCreatesExecutorsWithoutListeners() { - WorkflowStateProcessor executor = factory.createProcessor(12); + WorkflowStateProcessor executor = factory.createProcessor(12, () -> FALSE); assertNotNull(executor); } @Test public void factoryCreatesExecutorsWithListeners() { factory.listeners = listeners; - WorkflowStateProcessor executor = factory.createProcessor(122); + WorkflowStateProcessor executor = factory.createProcessor(122, () -> FALSE); assertNotNull(executor); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index b5c24f383..f8c2ec12e 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -57,6 +57,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import org.hamcrest.Description; import org.hamcrest.Matcher; @@ -174,6 +175,8 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { private final Set INCLUDES = EnumSet.of(CURRENT_STATE_VARIABLES); + private final AtomicBoolean shutdownRequest = new AtomicBoolean(); + @BeforeEach public void setup() { processingInstances = new ConcurrentHashMap<>(); @@ -184,7 +187,7 @@ public void setup() { env.setProperty("nflow.executor.stateSaveRetryDelay.seconds", "1"); env.setProperty("nflow.executor.stateVariableValueTooLongRetryDelay.minutes", "60"); env.setProperty("nflow.db.workflowInstanceType.cacheSize", "10000"); - executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, + executor = new WorkflowStateProcessor(1, shutdownRequest::get, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listener1, listener2); setCurrentMillisFixed(currentTimeMillis()); lenient().doReturn(executeWf).when(workflowDefinitions).getWorkflowDefinition("execute-test"); @@ -370,7 +373,7 @@ public void skippingWorkflowWithListenerCausesProcessorToStopProcessingWorkflow( .setStateText("myStateText").build(); when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); WorkflowExecutorListener listener = mock(WorkflowExecutorListener.class); - executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, + executor = new WorkflowStateProcessor(1, shutdownRequest::get, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listener); doAnswer((Answer) invocation -> @@ -500,7 +503,7 @@ public void finishingChildWakesParentAutomaticallyWhenParentIsInWaitState() { @Test public void goToErrorStateWhenNextStateIsInvalid() { env.setProperty("nflow.illegal.state.change.action", "ignore"); - executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, + executor = new WorkflowStateProcessor(1, shutdownRequest::get, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listener1, listener2); WorkflowInstance instance = executingInstanceBuilder().setType("failing-test").setState("invalidNextState").build(); @@ -769,7 +772,7 @@ public void illegalStateChangeGoesToErrorState() { @Test public void illegalStateChangeGoesToIllegalStateWhenActionIsLog() { env.setProperty("nflow.illegal.state.change.action", "log"); - executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, + executor = new WorkflowStateProcessor(1, shutdownRequest::get, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listener1, listener2); WorkflowInstance instance = executingInstanceBuilder().setType("simple-test").setState("illegalStateChange").build(); @@ -788,7 +791,7 @@ public void illegalStateChangeGoesToIllegalStateWhenActionIsLog() { @Test public void illegalStateChangeGoesToIllegalStateWhenActionIsIgnore() { env.setProperty("nflow.illegal.state.change.action", "ignore"); - executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, + executor = new WorkflowStateProcessor(1, shutdownRequest::get, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, maintenanceDao, workflowInstancePreProcessor, env, processingInstances, listener1, listener2); WorkflowInstance instance = executingInstanceBuilder().setType("simple-test").setState("illegalStateChange").build(); From 4e550ed3f0fdbbbfb0d33ee35db2026a2712960a Mon Sep 17 00:00:00 2001 From: Mikko Tiihonen Date: Fri, 17 Apr 2020 20:31:13 +0300 Subject: [PATCH 03/16] Add test for expedited shutdown for workflow processor. Remove unneeded internalRetryEnabled variable. Tests seem to pass nicely without it --- .../executor/WorkflowStateProcessor.java | 14 +--- .../executor/WorkflowStateProcessorTest.java | 69 ++++++++++++++++--- 2 files changed, 64 insertions(+), 19 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index 2bdf2ac7c..ba04a9b43 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -80,7 +80,6 @@ class WorkflowStateProcessor implements Runnable { private final int stateProcessingRetryDelay; private final int stateSaveRetryDelay; private final int stateVariableValueTooLongRetryDelay; - private boolean internalRetryEnabled = true; private final Map processingInstances; private long startTimeSeconds; private Thread thread; @@ -123,7 +122,7 @@ public void run() { logger.error("Failed to process workflow instance, retrying after {} seconds", stateProcessingRetryDelay, ex); sleepIgnoreInterrupted(stateProcessingRetryDelay); } - } while (!stateProcessingFinished && internalRetryEnabled); + } while (!stateProcessingFinished); processingInstances.remove(instanceId); MDC.remove(MDC_KEY); } @@ -270,15 +269,8 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, ex); sleepIgnoreInterrupted(stateSaveRetryDelay); } - } while (internalRetryEnabled); - throw new IllegalStateException(format("Failed to save workflow instance %s new state", instance.id)); - } - - /** - * For unit testing only - */ - void setInternalRetryEnabled(boolean internalRetryEnabled) { - this.internalRetryEnabled = internalRetryEnabled; + } while (!shutdownRequested.get()); + throw new IllegalStateException(format("Failed to save workflow instance %s new state and shutdown requested", instance.id)); } private WorkflowInstance persistWorkflowInstanceState(StateExecutionImpl execution, Map originalStateVars, diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index f8c2ec12e..68e49e6c2 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -12,11 +12,13 @@ import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.manual; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecution; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecutionFailed; +import static java.lang.Thread.sleep; import static java.time.Duration.ofSeconds; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static java.util.concurrent.Executors.newSingleThreadExecutor; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; @@ -165,6 +167,8 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { WorkflowDefinition nonRetryableWf = new NonRetryableWorkflow(); + LoopingTestWorkflow loopingWf = new LoopingTestWorkflow(); + static WorkflowInstance newChildWorkflow = mock(WorkflowInstance.class); static WorkflowInstance newWorkflow = mock(WorkflowInstance.class); @@ -198,6 +202,7 @@ public void setup() { lenient().doReturn(wakeWf).when(workflowDefinitions).getWorkflowDefinition("wake-test"); lenient().doReturn(stateVariableWf).when(workflowDefinitions).getWorkflowDefinition("state-variable"); lenient().doReturn(nonRetryableWf).when(workflowDefinitions).getWorkflowDefinition("non-retryable"); + lenient().doReturn(loopingWf).when(workflowDefinitions).getWorkflowDefinition("looping-test"); filterChain(listener1); filterChain(listener2); lenient().when(executionMock.getRetries()).thenReturn(testWorkflowDef.getSettings().maxRetries); @@ -813,10 +818,10 @@ public void stateProcessingRetryAfterFailedGetWorkflow() throws InterruptedExcep doThrow(new RuntimeException("some failure")).when(workflowInstances).getWorkflowInstance(instance.id, INCLUDES, null); ExecutorService executorService = newSingleThreadExecutor(); - newSingleThreadExecutor().submit(executor); - Thread.sleep(1500); - executor.setInternalRetryEnabled(false); - executorService.shutdownNow(); + executorService.submit(executor); + sleep(1500); + executorService.shutdown(); + shutdownRequest.set(true); verify(workflowInstances, atLeast(2)).getWorkflowInstance(instance.id, INCLUDES, null); } @@ -829,14 +834,30 @@ public void saveStateRetryAfterFailedPersistence() throws InterruptedException { any(), any(), anyBoolean()); ExecutorService executorService = newSingleThreadExecutor(); - newSingleThreadExecutor().submit(executor); - Thread.sleep(1500); - executor.setInternalRetryEnabled(false); - executorService.shutdownNow(); + executorService.submit(executor); + sleep(1500); + executorService.shutdown(); + shutdownRequest.set(true); verify(workflowInstanceDao, atLeast(2)).updateWorkflowInstanceAfterExecution(any(), any(), any(), any(), anyBoolean()); } + @Test + public void stateProcessingSeriesStopsOnShutdown() throws InterruptedException { + WorkflowInstance instance = executingInstanceBuilder().setType("looping-test").setState("start").build(); + when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + + ExecutorService executorService = newSingleThreadExecutor(); + executorService.submit(executor); + sleep(500); + + executorService.shutdown(); + shutdownRequest.set(true); + executorService.awaitTermination(10, SECONDS); + + assertThat(loopingWf.counter, Matchers.lessThan(20)); + } + @Test public void deleteWorkflowInstanceHistoryNotExecutedWithDefaultSettings() { WorkflowInstance instance = executingInstanceBuilder().setType("simple-test").setState("start").build(); @@ -1081,6 +1102,38 @@ public NextAction start(@SuppressWarnings("unused") StateExecution execution) { } + public static class LoopingTestWorkflow extends WorkflowDefinition { + + int counter = 0; + + protected LoopingTestWorkflow() { + super("looping-test", State.start, State.error); + permit(State.start, State.start); + permit(State.start, State.done); + } + + public enum State implements WorkflowState { + start(WorkflowStateType.start), done(WorkflowStateType.end), error(WorkflowStateType.manual); + + private WorkflowStateType stateType; + + State(WorkflowStateType stateType) { + this.stateType = stateType; + } + + @Override + public WorkflowStateType getType() { + return stateType; + } + } + + public NextAction start(@SuppressWarnings("unused") StateExecution execution) throws InterruptedException { + sleep(100); + return counter++ < 100 ? moveToState(State.start, "loop") : moveToState(State.done, "done"); + } + + } + public static class SimpleTestWorkflow extends WorkflowDefinition { protected SimpleTestWorkflow() { From 32ba6cf07c7fe198c4d1adba068a8881458c3168 Mon Sep 17 00:00:00 2001 From: Mikko Tiihonen Date: Fri, 17 Apr 2020 20:55:09 +0300 Subject: [PATCH 04/16] Add changes --- CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 145cbdc1a..c80f79b8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,12 @@ ## 7.1.1-SNAPSHOT (future release) **Highlights** +- `nflow-engine` + - Expedited clean shutdown for workflows that run many steps without delays. **Details** - - `nflow-engine` + - After shutdown is requested workflows stop immediately after the in-progress step has been executed. - Dependency updates: - spring 5.2.5 - jackson 2.10.3 From 269336b5219d90afbf658990e536276968d67812 Mon Sep 17 00:00:00 2001 From: Mikko Tiihonen Date: Fri, 17 Apr 2020 23:59:52 +0300 Subject: [PATCH 05/16] Review fixes --- .../executor/WorkflowStateProcessorFactoryTest.java | 4 ++-- .../internal/executor/WorkflowStateProcessorTest.java | 6 ++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java index 98fdaaa32..f4c4ce060 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java @@ -62,14 +62,14 @@ public void setup() { @Test public void factoryCreatesExecutorsWithoutListeners() { - WorkflowStateProcessor executor = factory.createProcessor(12, () -> FALSE); + WorkflowStateProcessor executor = factory.createProcessor(12, FALSE::booleanValue); assertNotNull(executor); } @Test public void factoryCreatesExecutorsWithListeners() { factory.listeners = listeners; - WorkflowStateProcessor executor = factory.createProcessor(122, () -> FALSE); + WorkflowStateProcessor executor = factory.createProcessor(122, FALSE::booleanValue); assertNotNull(executor); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index 68e49e6c2..b6693c6b2 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -12,6 +12,7 @@ import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.manual; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecution; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecutionFailed; +import static java.lang.Boolean.FALSE; import static java.lang.Thread.sleep; import static java.time.Duration.ofSeconds; import static java.util.Arrays.asList; @@ -24,6 +25,7 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.joda.time.DateTime.now; @@ -855,7 +857,7 @@ public void stateProcessingSeriesStopsOnShutdown() throws InterruptedException { shutdownRequest.set(true); executorService.awaitTermination(10, SECONDS); - assertThat(loopingWf.counter, Matchers.lessThan(20)); + assertThat(loopingWf.counter, lessThan(20)); } @Test @@ -1045,7 +1047,7 @@ public static class ForceCleaningTestWorkflow extends WorkflowDefinition false).build()); + new WorkflowSettings.Builder().setHistoryDeletableAfter(hours(2)).setDeleteHistoryCondition(FALSE::booleanValue).build()); permit(State.start, State.done, State.error); } From 859302599966cf5b0617d9cf561867206720ed85 Mon Sep 17 00:00:00 2001 From: Mikko Tiihonen Date: Sat, 18 Apr 2020 00:20:30 +0300 Subject: [PATCH 06/16] Tune retry loops log properly on shutdown and avoid extra sleeps --- .../executor/WorkflowStateProcessor.java | 22 +++++++++++++------ 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index ba04a9b43..3318365d1 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -113,16 +113,19 @@ public void run() { startTimeSeconds = currentTimeMillis() / 1000; thread = currentThread(); processingInstances.put(instanceId, this); - boolean stateProcessingFinished = false; - do { + while (true) { try { runImpl(); - stateProcessingFinished = true; + break; } catch (Throwable ex) { + if (shutdownRequested.get()) { + logger.error("Failed to process workflow instance and shutdown requested", ex); + break; + } logger.error("Failed to process workflow instance, retrying after {} seconds", stateProcessingRetryDelay, ex); sleepIgnoreInterrupted(stateProcessingRetryDelay); } - } while (!stateProcessingFinished); + }; processingInstances.remove(instanceId); MDC.remove(MDC_KEY); } @@ -261,16 +264,21 @@ private WorkflowInstance saveWorkflowInstanceState(StateExecutionImpl execution, .setStateText(getStateText(instance, execution)) // .setState(execution.getNextState()) // .setRetries(execution.isRetry() ? execution.getRetries() + 1 : 0); - do { + while (true) { try { return persistWorkflowInstanceState(execution, instance.stateVariables, actionBuilder, instanceBuilder); } catch (Exception ex) { + if (shutdownRequested.get()) { + logger.error("Failed to save workflow instance {} new state, not retrying due to shutdown request. The state will be rerun on recovery.", + instance.id, ex); + // return the original instance since persisting failed + return instance; + } logger.error("Failed to save workflow instance {} new state, retrying after {} seconds", instance.id, stateSaveRetryDelay, ex); sleepIgnoreInterrupted(stateSaveRetryDelay); } - } while (!shutdownRequested.get()); - throw new IllegalStateException(format("Failed to save workflow instance %s new state and shutdown requested", instance.id)); + } } private WorkflowInstance persistWorkflowInstanceState(StateExecutionImpl execution, Map originalStateVars, From 2659cf4f840002ad51ce08d11a05284dd3fa83f9 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sat, 18 Apr 2020 10:18:51 +0300 Subject: [PATCH 07/16] cleanup --- .../nflow/engine/internal/executor/WorkflowStateProcessor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index 3318365d1..80ae643db 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -7,7 +7,6 @@ import static io.nflow.engine.workflow.instance.WorkflowInstance.WorkflowInstanceStatus.inProgress; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecution; import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecutionFailed; -import static java.lang.String.format; import static java.lang.Thread.currentThread; import static java.util.Arrays.asList; import static java.util.Collections.emptyList; @@ -125,7 +124,7 @@ public void run() { logger.error("Failed to process workflow instance, retrying after {} seconds", stateProcessingRetryDelay, ex); sleepIgnoreInterrupted(stateProcessingRetryDelay); } - }; + } processingInstances.remove(instanceId); MDC.remove(MDC_KEY); } From aba60d6331a9bf41c8813371646ec38e1c89525e Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sat, 18 Apr 2020 10:22:45 +0300 Subject: [PATCH 08/16] detect failure faster and get rid of unnecessary counter --- .../internal/executor/WorkflowStateProcessorTest.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index b6693c6b2..e6285879b 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -25,7 +25,6 @@ import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.joda.time.DateTime.now; @@ -33,6 +32,7 @@ import static org.joda.time.DateTimeUtils.setCurrentMillisFixed; import static org.joda.time.DateTimeUtils.setCurrentMillisSystem; import static org.joda.time.Period.hours; +import static org.junit.Assert.assertTrue; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -855,9 +855,9 @@ public void stateProcessingSeriesStopsOnShutdown() throws InterruptedException { executorService.shutdown(); shutdownRequest.set(true); - executorService.awaitTermination(10, SECONDS); + boolean wasTerminated = executorService.awaitTermination(5, SECONDS); - assertThat(loopingWf.counter, lessThan(20)); + assertTrue(wasTerminated); } @Test @@ -1106,8 +1106,6 @@ public NextAction start(@SuppressWarnings("unused") StateExecution execution) { public static class LoopingTestWorkflow extends WorkflowDefinition { - int counter = 0; - protected LoopingTestWorkflow() { super("looping-test", State.start, State.error); permit(State.start, State.start); @@ -1131,7 +1129,7 @@ public WorkflowStateType getType() { public NextAction start(@SuppressWarnings("unused") StateExecution execution) throws InterruptedException { sleep(100); - return counter++ < 100 ? moveToState(State.start, "loop") : moveToState(State.done, "done"); + return moveToState(State.start, "loop"); } } From f0a78eeb42ae74d5ab6c466d853ad28aee003c3d Mon Sep 17 00:00:00 2001 From: Mikko Tiihonen Date: Sat, 18 Apr 2020 11:11:25 +0300 Subject: [PATCH 09/16] Shut down the executor forcibly to interrupt any stuck workflows --- .../engine/internal/executor/WorkflowStateProcessorTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index e6285879b..f1e8ad9c9 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -856,6 +856,7 @@ public void stateProcessingSeriesStopsOnShutdown() throws InterruptedException { executorService.shutdown(); shutdownRequest.set(true); boolean wasTerminated = executorService.awaitTermination(5, SECONDS); + executorService.shutdownNow(); assertTrue(wasTerminated); } From 6dd9ad64e88c67b33e3eec3dcb21a5cfa5bdcbf1 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sat, 18 Apr 2020 17:25:32 +0300 Subject: [PATCH 10/16] Make dispatcher shutdown block until ongoing shutdown is finished when the shutdown was already in progress, tune logging --- .../internal/executor/WorkflowDispatcher.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index 064a6a0e4..cac9c354f 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -28,9 +28,10 @@ public class WorkflowDispatcher implements Runnable { private static final Logger logger = getLogger(WorkflowDispatcher.class); private static final PeriodicLogger periodicLogger = new PeriodicLogger(logger, 60); + private volatile boolean started; private volatile boolean shutdownRequested; - private volatile boolean running = false; - private volatile boolean paused = false; + private volatile boolean running; + private volatile boolean paused; private final CountDownLatch shutdownDone = new CountDownLatch(1); private final WorkflowInstanceExecutor executor; @@ -65,8 +66,9 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao @Override public void run() { - logger.info("Starting."); + logger.info("Dispacther started."); try { + started = true; workflowDefinitions.postProcessWorkflowDefinitions(); running = true; while (!shutdownRequested) { @@ -102,33 +104,33 @@ public void run() { running = false; shutdownPool(); executorDao.markShutdown(); - logger.info("Shutdown finished."); shutdownDone.countDown(); } } public void shutdown() { shutdownRequested = true; - if (running) { - logger.info("Shutdown requested."); + if (started && shutdownDone.getCount() > 0) { + logger.info("Shutdown initiated."); + try { + shutdownDone.await(); + logger.info("Shutdown completed."); + } catch (@SuppressWarnings("unused") InterruptedException e) { + logger.warn("Shutdown interrupted."); + } } else { - logger.info("Shutdown requested, but executor not running, exiting."); - return; - } - try { - // TODO use timeout? - shutdownDone.await(); - } catch (@SuppressWarnings("unused") InterruptedException e) { - logger.info("Shutdown interrupted."); + logger.info("Dispatcher was not started or was already shut down."); } } public void pause() { paused = true; + logger.info("Dispatcher paused."); } public void resume() { paused = false; + logger.info("Dispatcher resumed."); } public boolean isPaused() { From e8f7dd787eb5d44d49aa98272043ebe7cc8361e0 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sat, 18 Apr 2020 17:27:15 +0300 Subject: [PATCH 11/16] tune changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c80f79b8b..7b151ae50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ **Details** - `nflow-engine` - - After shutdown is requested workflows stop immediately after the in-progress step has been executed. + - When shutdown is requested, stop processing workflows immediately after the current state has been executed. - Dependency updates: - spring 5.2.5 - jackson 2.10.3 From c9a8c14175acc92e8eabb7434f548dbf122b1043 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sat, 18 Apr 2020 18:52:17 +0300 Subject: [PATCH 12/16] wip --- .../internal/executor/WorkflowDispatcher.java | 91 +++++++++++++------ 1 file changed, 61 insertions(+), 30 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index cac9c354f..64b54226a 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -1,10 +1,16 @@ package io.nflow.engine.internal.executor; +import static io.nflow.engine.internal.executor.WorkflowDispatcher.Status.finished; +import static io.nflow.engine.internal.executor.WorkflowDispatcher.Status.notStarted; +import static io.nflow.engine.internal.executor.WorkflowDispatcher.Status.running; +import static io.nflow.engine.internal.executor.WorkflowDispatcher.Status.shuttingDown; import static org.slf4j.LoggerFactory.getLogger; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.inject.Inject; @@ -28,10 +34,8 @@ public class WorkflowDispatcher implements Runnable { private static final Logger logger = getLogger(WorkflowDispatcher.class); private static final PeriodicLogger periodicLogger = new PeriodicLogger(logger, 60); - private volatile boolean started; - private volatile boolean shutdownRequested; - private volatile boolean running; - private volatile boolean paused; + private final AtomicReference status = new AtomicReference<>(notStarted); + private final AtomicBoolean paused = new AtomicBoolean(); private final CountDownLatch shutdownDone = new CountDownLatch(1); private final WorkflowInstanceExecutor executor; @@ -42,6 +46,19 @@ public class WorkflowDispatcher implements Runnable { private final long sleepTimeMillis; private final int stuckThreadThresholdSeconds; private final Random rand = new Random(); + private final Object waitObject = new Object(); + + enum Status { + notStarted(false, false), running(true, false), shuttingDown(true, true), finished(false, true); + + public final boolean isRunning; + public final boolean isShutdownRequested; + + Status(boolean isRunning, boolean isShutdownRequested) { + this.isRunning = isRunning; + this.isShutdownRequested = isShutdownRequested; + } + } @Inject @SuppressFBWarnings(value = "WEM_WEAK_EXCEPTION_MESSAGING", justification = "Transaction support exception message is fine") @@ -68,16 +85,15 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao public void run() { logger.info("Dispacther started."); try { - started = true; + status.set(running); workflowDefinitions.postProcessWorkflowDefinitions(); - running = true; - while (!shutdownRequested) { - if (paused) { + while (status.get() == running) { + if (paused.get()) { sleep(false); } else { try { executor.waitUntilQueueSizeLowerThanThreshold(executorDao.getMaxWaitUntil()); - if (!shutdownRequested) { + if (status.get() == running) { if (executorDao.tick()) { workflowInstances.recoverWorkflowInstancesFromDeadNodes(); } @@ -101,7 +117,6 @@ public void run() { } } } finally { - running = false; shutdownPool(); executorDao.markShutdown(); shutdownDone.countDown(); @@ -109,36 +124,50 @@ public void run() { } public void shutdown() { - shutdownRequested = true; - if (started && shutdownDone.getCount() > 0) { - logger.info("Shutdown initiated."); - try { - shutdownDone.await(); + switch (status.get()) { + case notStarted: + case finished: + logger.info("Dispatcher was not started or was already shut down."); + return; + case running: + if (status.compareAndSet(running, shuttingDown)) { + logger.info("Shutdown initiated."); + synchronized (waitObject) { + waitObject.notifyAll(); + } + } + break; + default: + break; + } + try { + shutdownDone.await(); + if (status.compareAndSet(shuttingDown, finished)) { logger.info("Shutdown completed."); - } catch (@SuppressWarnings("unused") InterruptedException e) { - logger.warn("Shutdown interrupted."); } - } else { - logger.info("Dispatcher was not started or was already shut down."); + } catch (@SuppressWarnings("unused") InterruptedException e) { + logger.warn("Shutdown interrupted."); } } public void pause() { - paused = true; - logger.info("Dispatcher paused."); + if (paused.compareAndSet(false, true)) { + logger.info("Dispatcher paused."); + } } public void resume() { - paused = false; - logger.info("Dispatcher resumed."); + if (paused.compareAndSet(true, false)) { + logger.info("Dispatcher resumed."); + } } public boolean isPaused() { - return paused; + return paused.get(); } public boolean isRunning() { - return running; + return status.get().isRunning; } private void shutdownPool() { @@ -157,7 +186,7 @@ private void dispatch(List nextInstanceIds) { } logger.debug("Found {} workflow instances, dispatching executors.", nextInstanceIds.size()); for (Long instanceId : nextInstanceIds) { - executor.execute(stateProcessorFactory.createProcessor(instanceId, () -> shutdownRequested)); + executor.execute(stateProcessorFactory.createProcessor(instanceId, () -> status.get().isShutdownRequested)); } } @@ -170,10 +199,12 @@ private List getNextInstanceIds() { @SuppressFBWarnings(value = "MDM_THREAD_YIELD", justification = "Intentionally masking race condition") private void sleep(boolean randomize) { try { - if (randomize) { - Thread.sleep((long) (sleepTimeMillis * rand.nextFloat())); - } else { - Thread.sleep(sleepTimeMillis); + synchronized (waitObject) { + if (randomize) { + waitObject.wait((long) (sleepTimeMillis * rand.nextFloat())); + } else { + waitObject.wait(sleepTimeMillis); + } } } catch (@SuppressWarnings("unused") InterruptedException ok) { } From 1be73cda6d11109d36be004e5dddd02e2c867a38 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sat, 18 Apr 2020 19:08:17 +0300 Subject: [PATCH 13/16] revert --- .../internal/executor/WorkflowDispatcher.java | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index 64b54226a..e580c08d7 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -46,7 +46,6 @@ public class WorkflowDispatcher implements Runnable { private final long sleepTimeMillis; private final int stuckThreadThresholdSeconds; private final Random rand = new Random(); - private final Object waitObject = new Object(); enum Status { notStarted(false, false), running(true, false), shuttingDown(true, true), finished(false, true); @@ -132,9 +131,6 @@ public void shutdown() { case running: if (status.compareAndSet(running, shuttingDown)) { logger.info("Shutdown initiated."); - synchronized (waitObject) { - waitObject.notifyAll(); - } } break; default: @@ -199,12 +195,10 @@ private List getNextInstanceIds() { @SuppressFBWarnings(value = "MDM_THREAD_YIELD", justification = "Intentionally masking race condition") private void sleep(boolean randomize) { try { - synchronized (waitObject) { - if (randomize) { - waitObject.wait((long) (sleepTimeMillis * rand.nextFloat())); - } else { - waitObject.wait(sleepTimeMillis); - } + if (randomize) { + Thread.sleep((long) (sleepTimeMillis * rand.nextFloat())); + } else { + Thread.sleep(sleepTimeMillis); } } catch (@SuppressWarnings("unused") InterruptedException ok) { } From 2930770676c58b4f7038b7932e5bbe70ca5e333c Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sat, 18 Apr 2020 19:55:38 +0300 Subject: [PATCH 14/16] remove started flag --- .../internal/executor/WorkflowDispatcher.java | 75 +++++++------------ 1 file changed, 25 insertions(+), 50 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index e580c08d7..7636425f8 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -1,16 +1,10 @@ package io.nflow.engine.internal.executor; -import static io.nflow.engine.internal.executor.WorkflowDispatcher.Status.finished; -import static io.nflow.engine.internal.executor.WorkflowDispatcher.Status.notStarted; -import static io.nflow.engine.internal.executor.WorkflowDispatcher.Status.running; -import static io.nflow.engine.internal.executor.WorkflowDispatcher.Status.shuttingDown; import static org.slf4j.LoggerFactory.getLogger; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import javax.inject.Inject; @@ -34,8 +28,9 @@ public class WorkflowDispatcher implements Runnable { private static final Logger logger = getLogger(WorkflowDispatcher.class); private static final PeriodicLogger periodicLogger = new PeriodicLogger(logger, 60); - private final AtomicReference status = new AtomicReference<>(notStarted); - private final AtomicBoolean paused = new AtomicBoolean(); + private volatile boolean shutdownRequested; + private volatile boolean running; + private volatile boolean paused; private final CountDownLatch shutdownDone = new CountDownLatch(1); private final WorkflowInstanceExecutor executor; @@ -47,18 +42,6 @@ public class WorkflowDispatcher implements Runnable { private final int stuckThreadThresholdSeconds; private final Random rand = new Random(); - enum Status { - notStarted(false, false), running(true, false), shuttingDown(true, true), finished(false, true); - - public final boolean isRunning; - public final boolean isShutdownRequested; - - Status(boolean isRunning, boolean isShutdownRequested) { - this.isRunning = isRunning; - this.isShutdownRequested = isShutdownRequested; - } - } - @Inject @SuppressFBWarnings(value = "WEM_WEAK_EXCEPTION_MESSAGING", justification = "Transaction support exception message is fine") public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao workflowInstances, @@ -84,15 +67,15 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao public void run() { logger.info("Dispacther started."); try { - status.set(running); workflowDefinitions.postProcessWorkflowDefinitions(); - while (status.get() == running) { - if (paused.get()) { + running = true; + while (!shutdownRequested) { + if (paused) { sleep(false); } else { try { executor.waitUntilQueueSizeLowerThanThreshold(executorDao.getMaxWaitUntil()); - if (status.get() == running) { + if (!shutdownRequested) { if (executorDao.tick()) { workflowInstances.recoverWorkflowInstancesFromDeadNodes(); } @@ -118,52 +101,44 @@ public void run() { } finally { shutdownPool(); executorDao.markShutdown(); + running = false; + logger.info("Shutdown completed."); shutdownDone.countDown(); } } public void shutdown() { - switch (status.get()) { - case notStarted: - case finished: - logger.info("Dispatcher was not started or was already shut down."); - return; - case running: - if (status.compareAndSet(running, shuttingDown)) { + if (running) { + if (!shutdownRequested) { + shutdownRequested = true; logger.info("Shutdown initiated."); } - break; - default: - break; - } - try { - shutdownDone.await(); - if (status.compareAndSet(shuttingDown, finished)) { - logger.info("Shutdown completed."); + try { + shutdownDone.await(); + } catch (@SuppressWarnings("unused") InterruptedException e) { + logger.warn("Shutdown interrupted."); } - } catch (@SuppressWarnings("unused") InterruptedException e) { - logger.warn("Shutdown interrupted."); + } else { + logger.info("Dispatcher was not started or was already shut down."); } } public void pause() { - if (paused.compareAndSet(false, true)) { - logger.info("Dispatcher paused."); - } + paused = true; + logger.info("Dispatcher paused."); } public void resume() { - if (paused.compareAndSet(true, false)) { - logger.info("Dispatcher resumed."); - } + paused = false; + logger.info("Dispatcher resumed."); } public boolean isPaused() { - return paused.get(); + return paused; } public boolean isRunning() { - return status.get().isRunning; + return running; } private void shutdownPool() { @@ -182,7 +157,7 @@ private void dispatch(List nextInstanceIds) { } logger.debug("Found {} workflow instances, dispatching executors.", nextInstanceIds.size()); for (Long instanceId : nextInstanceIds) { - executor.execute(stateProcessorFactory.createProcessor(instanceId, () -> status.get().isShutdownRequested)); + executor.execute(stateProcessorFactory.createProcessor(instanceId, () -> shutdownRequested)); } } From 7c402b4e5b0cc2f2dec2b35fd74c869dd262122a Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sat, 18 Apr 2020 20:01:43 +0300 Subject: [PATCH 15/16] remove started flag --- .../io/nflow/engine/internal/executor/WorkflowDispatcher.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index 807c570f5..7636425f8 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -28,7 +28,6 @@ public class WorkflowDispatcher implements Runnable { private static final Logger logger = getLogger(WorkflowDispatcher.class); private static final PeriodicLogger periodicLogger = new PeriodicLogger(logger, 60); - private volatile boolean started; private volatile boolean shutdownRequested; private volatile boolean running; private volatile boolean paused; @@ -68,7 +67,6 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao public void run() { logger.info("Dispacther started."); try { - started = true; workflowDefinitions.postProcessWorkflowDefinitions(); running = true; while (!shutdownRequested) { From 12ba985a49d9669e416083a24f4b1ed218e45ef1 Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sat, 18 Apr 2020 20:21:56 +0300 Subject: [PATCH 16/16] log before setting shutdownRequested true --- .../io/nflow/engine/internal/executor/WorkflowDispatcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index 7636425f8..ebc27ff0f 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -110,8 +110,8 @@ public void run() { public void shutdown() { if (running) { if (!shutdownRequested) { + logger.info("Initiating shutdown."); shutdownRequested = true; - logger.info("Shutdown initiated."); } try { shutdownDone.await();