From a6bce76107d59251c4d7c0ec1beca9291ef4a30e Mon Sep 17 00:00:00 2001 From: Edvard Fonsell Date: Sun, 19 Apr 2020 00:06:06 +0300 Subject: [PATCH] Support of WorkflowExecutorListener.handlePotentiallyStuck (#391) * make test more robust * Support of WorkflowExecutorListener.handlePotentiallyStuck * refactor * Unwrap the real exception from UndeclaredThrowableException * Test reliably that the interruption of a workflow worked correctly * method ref and assertFalse * add entry to changelog * fix changelog * remove unnecessary lenient * Update CHANGELOG.md * pass ListenerContext to handlePotentiallyStuck Co-authored-by: Edvard Fonsell Co-authored-by: Mikko Tiihonen --- CHANGELOG.md | 2 + .../executor/WorkflowStateProcessor.java | 53 +++++++---- .../WorkflowStateProcessorFactory.java | 13 +-- .../listener/WorkflowExecutorListener.java | 11 +++ .../WorkflowStateProcessorFactoryTest.java | 11 ++- .../executor/WorkflowStateProcessorTest.java | 89 ++++++++++++++++--- .../java/io/nflow/tests/CronWorkflowTest.java | 25 +++--- 7 files changed, 157 insertions(+), 47 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b151ae50..b934b12f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,10 +3,12 @@ **Highlights** - `nflow-engine` - Expedited clean shutdown for workflows that run many steps without delays. + - Add support for custom logic when nFlow engine thinks the workflow state processing may be stuck. **Details** - `nflow-engine` - When shutdown is requested, stop processing workflows immediately after the current state has been executed. + - Add `WorkflowExecutorLister.handlePotentiallyStuck(Duration processingTime)` to support custom handling when nFlow engine thinks the workflow state processing may be stuck. If any registered listener implementation returns true from this method, nFlow will interrupt the processing thread. The default implementation returns false. - Dependency updates: - spring 5.2.5 - jackson 2.10.3 diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java index 80ae643db..28c5e5487 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -13,11 +13,11 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.exception.ExceptionUtils.getStackTrace; import static org.joda.time.DateTime.now; -import static org.joda.time.DateTimeUtils.currentTimeMillis; import static org.joda.time.Duration.standardMinutes; import static org.slf4j.LoggerFactory.getLogger; import static org.springframework.util.ReflectionUtils.invokeMethod; +import java.lang.reflect.UndeclaredThrowableException; import java.util.ArrayList; import java.util.Collection; import java.util.EnumSet; @@ -80,8 +80,9 @@ class WorkflowStateProcessor implements Runnable { private final int stateSaveRetryDelay; private final int stateVariableValueTooLongRetryDelay; private final Map processingInstances; - private long startTimeSeconds; + private DateTime startTime; private Thread thread; + private ListenerContext listenerContext; WorkflowStateProcessor(long instanceId, Supplier shutdownRequested, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions, WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao, MaintenanceDao maintenanceDao, @@ -109,7 +110,7 @@ class WorkflowStateProcessor implements Runnable { @Override public void run() { MDC.put(MDC_KEY, String.valueOf(instanceId)); - startTimeSeconds = currentTimeMillis() / 1000; + startTime = now(); thread = currentThread(); processingInstances.put(instanceId, this); while (true) { @@ -141,10 +142,10 @@ private void runImpl() { WorkflowSettings settings = definition.getSettings(); int subsequentStateExecutions = 0; while (instance.status == executing && !shutdownRequested.get()) { - startTimeSeconds = currentTimeMillis() / 1000; + startTime = now(); StateExecutionImpl execution = new StateExecutionImpl(instance, objectMapper, workflowInstanceDao, workflowInstancePreProcessor, workflowInstances); - ListenerContext listenerContext = new ListenerContext(definition, instance, execution); + listenerContext = new ListenerContext(definition, instance, execution); WorkflowInstanceAction.Builder actionBuilder = new WorkflowInstanceAction.Builder(instance); WorkflowState state; try { @@ -155,12 +156,15 @@ private void runImpl() { } boolean saveInstanceState = true; try { - processBeforeListeners(listenerContext); - listenerContext.nextAction = processWithListeners(listenerContext, instance, definition, execution, state); + processBeforeListeners(); + listenerContext.nextAction = processWithListeners(instance, definition, execution, state); } catch (StateVariableValueTooLongException e) { instance = rescheduleStateVariableValueTooLong(e, instance); saveInstanceState = false; } catch (Throwable t) { + if (t instanceof UndeclaredThrowableException) { + t = t.getCause(); + } execution.setFailed(t); if (state.isRetryAllowed(t)) { logger.error("Handler threw a retryable exception, trying again later.", t); @@ -175,9 +179,9 @@ private void runImpl() { } finally { if (saveInstanceState) { if (execution.isFailed()) { - processAfterFailureListeners(listenerContext, execution.getThrown()); + processAfterFailureListeners(execution.getThrown()); } else { - processAfterListeners(listenerContext); + processAfterListeners(); optionallyCleanupWorkflowInstanceHistory(definition.getSettings(), execution); } subsequentStateExecutions = busyLoopPrevention(state, settings, subsequentStateExecutions, execution); @@ -337,7 +341,7 @@ private boolean isNextActivationImmediately(StateExecutionImpl execution) { && !execution.getNextActivation().isAfterNow(); } - private NextAction processWithListeners(ListenerContext listenerContext, WorkflowInstance instance, + private NextAction processWithListeners(WorkflowInstance instance, AbstractWorkflowDefinition definition, StateExecutionImpl execution, WorkflowState state) { ProcessingExecutorListener processingListener = new ProcessingExecutorListener(instance, definition, execution, state); List chain = new ArrayList<>(executorListeners.size() + 1); @@ -365,7 +369,7 @@ private void optionallyCleanupWorkflowInstanceHistory(WorkflowSettings settings, private void sleepIgnoreInterrupted(int seconds) { try { - Thread.sleep(SECONDS.toMillis(seconds)); + SECONDS.sleep(seconds); } catch (@SuppressWarnings("unused") InterruptedException ok) { } } @@ -400,7 +404,7 @@ public ProcessingExecutorListener(WorkflowInstance instance, AbstractWorkflowDef } @Override - public NextAction process(ListenerContext listenerContext, ListenerChain chain) { + public NextAction process(ListenerContext context, ListenerChain chain) { return new NormalStateHandler(instance, definition, execution, state).processState(); } } @@ -511,7 +515,7 @@ public NextAction processState() { } - private void processBeforeListeners(ListenerContext listenerContext) { + private void processBeforeListeners() { for (WorkflowExecutorListener listener : executorListeners) { try { listener.beforeProcessing(listenerContext); @@ -521,7 +525,7 @@ private void processBeforeListeners(ListenerContext listenerContext) { } } - private void processAfterListeners(ListenerContext listenerContext) { + private void processAfterListeners() { for (WorkflowExecutorListener listener : executorListeners) { try { listener.afterProcessing(listenerContext); @@ -531,7 +535,7 @@ private void processAfterListeners(ListenerContext listenerContext) { } } - private void processAfterFailureListeners(ListenerContext listenerContext, Throwable ex) { + private void processAfterFailureListeners(Throwable ex) { for (WorkflowExecutorListener listener : executorListeners) { try { listener.afterFailure(listenerContext, ex); @@ -541,8 +545,8 @@ private void processAfterFailureListeners(ListenerContext listenerContext, Throw } } - public long getStartTimeSeconds() { - return startTimeSeconds; + public DateTime getStartTime() { + return startTime; } public void logPotentiallyStuck(long processingTimeSeconds) { @@ -558,4 +562,19 @@ private StringBuilder getStackTraceAsString() { return sb; } + public void handlePotentiallyStuck(Duration processingTime) { + boolean interrupt = false; + for (WorkflowExecutorListener listener : executorListeners) { + try { + if (listener.handlePotentiallyStuck(listenerContext, processingTime)) { + interrupt = true; + } + } catch (Throwable t) { + logger.error("Error in " + listener.getClass().getName() + ".handleStuck (" + t.getMessage() + ")", t); + } + } + if (interrupt) { + thread.interrupt(); + } + } } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java index 3cb8bc7bb..5130bd656 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java @@ -1,6 +1,6 @@ package io.nflow.engine.internal.executor; -import static org.joda.time.DateTimeUtils.currentTimeMillis; +import static org.joda.time.DateTime.now; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -8,6 +8,8 @@ import javax.inject.Inject; +import org.joda.time.DateTime; +import org.joda.time.Duration; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; @@ -54,13 +56,14 @@ public WorkflowStateProcessor createProcessor(long instanceId, Supplier } public int getPotentiallyStuckProcessors() { - long currentTimeSeconds = currentTimeMillis() / 1000; + DateTime currentTime = now(); int potentiallyStuck = 0; for (WorkflowStateProcessor processor : processingInstances.values()) { - long processingTimeSeconds = currentTimeSeconds - processor.getStartTimeSeconds(); - if (processingTimeSeconds > stuckThreadThresholdSeconds) { + Duration processingTime = new Duration(processor.getStartTime(), currentTime); + if (processingTime.getStandardSeconds() > stuckThreadThresholdSeconds) { potentiallyStuck++; - processor.logPotentiallyStuck(processingTimeSeconds); + processor.logPotentiallyStuck(processingTime.getStandardSeconds()); + processor.handlePotentiallyStuck(processingTime); } } return potentiallyStuck; diff --git a/nflow-engine/src/main/java/io/nflow/engine/listener/WorkflowExecutorListener.java b/nflow-engine/src/main/java/io/nflow/engine/listener/WorkflowExecutorListener.java index bffa8a7bf..b325a7c9f 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/listener/WorkflowExecutorListener.java +++ b/nflow-engine/src/main/java/io/nflow/engine/listener/WorkflowExecutorListener.java @@ -6,6 +6,7 @@ import java.util.Map; import org.joda.time.DateTime; +import org.joda.time.Duration; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.nflow.engine.model.ModelObject; @@ -129,4 +130,14 @@ default void afterFailure(ListenerContext listenerContext, Throwable throwable) // no-op } + /** + * Called when instance processing is potentially stuck. Return true to interrupt the processing thread. Default implementation + * returns false. + * @param listenerContext The listener context. + * @param processingTime How long the instances has been processed. + * @return True if processing should be interruped, false otherwise. + */ + default boolean handlePotentiallyStuck(ListenerContext listenerContext, Duration processingTime) { + return false; + } } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java index f4c4ce060..9ee47d8ae 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java @@ -3,14 +3,16 @@ import static java.lang.Boolean.FALSE; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -import static org.joda.time.DateTimeUtils.currentTimeMillis; +import static org.joda.time.DateTime.now; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import org.joda.time.Duration; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -77,15 +79,18 @@ public void factoryCreatesExecutorsWithListeners() { public void checkIfStateProcessorsAreStuckLogsLongRunningInstance() { WorkflowStateProcessor executor1 = mock(WorkflowStateProcessor.class); WorkflowStateProcessor executor2 = mock(WorkflowStateProcessor.class); - when(executor1.getStartTimeSeconds()).thenReturn(currentTimeMillis() / 1000 - STUCK_THREAD_THRESHOLD - 1); - when(executor2.getStartTimeSeconds()).thenReturn(currentTimeMillis() / 1000 - STUCK_THREAD_THRESHOLD); + when(executor1.getStartTime()).thenReturn(now().minusSeconds(STUCK_THREAD_THRESHOLD + 1)); + when(executor2.getStartTime()).thenReturn(now().minusSeconds(STUCK_THREAD_THRESHOLD)); factory.processingInstances.put(111L, executor1); factory.processingInstances.put(222L, executor2); + factory.listeners = listeners; int potentiallyStuckProcessors = factory.getPotentiallyStuckProcessors(); assertThat(potentiallyStuckProcessors, is(1)); verify(executor1).logPotentiallyStuck(anyLong()); verify(executor2, never()).logPotentiallyStuck(anyLong()); + verify(executor1).handlePotentiallyStuck(any(Duration.class)); + verify(executor2, never()).handlePotentiallyStuck(any(Duration.class)); } } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index f1e8ad9c9..b591b4e53 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -31,9 +31,11 @@ import static org.joda.time.DateTimeUtils.currentTimeMillis; import static org.joda.time.DateTimeUtils.setCurrentMillisFixed; import static org.joda.time.DateTimeUtils.setCurrentMillisSystem; +import static org.joda.time.Duration.standardHours; import static org.joda.time.Period.hours; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.junit.jupiter.api.Assertions.assertTimeoutPreemptively; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; @@ -68,6 +70,7 @@ import org.hamcrest.Matchers; import org.hamcrest.TypeSafeMatcher; import org.joda.time.DateTime; +import org.joda.time.Duration; import org.joda.time.Period; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -153,24 +156,26 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { WorkflowStateProcessor executor; - WorkflowDefinition executeWf = new ExecuteTestWorkflow(); + ExecuteTestWorkflow executeWf = new ExecuteTestWorkflow(); - WorkflowDefinition forceWf = new ForceCleaningTestWorkflow(); + ForceCleaningTestWorkflow forceWf = new ForceCleaningTestWorkflow(); - WorkflowDefinition failCleaningWf = new FailCleaningTestWorkflow(); + FailCleaningTestWorkflow failCleaningWf = new FailCleaningTestWorkflow(); - WorkflowDefinition simpleWf = new SimpleTestWorkflow(); + SimpleTestWorkflow simpleWf = new SimpleTestWorkflow(); - WorkflowDefinition failingWf = new FailingTestWorkflow(); + FailingTestWorkflow failingWf = new FailingTestWorkflow(); - WorkflowDefinition wakeWf = new NotifyTestWorkflow(); + NotifyTestWorkflow wakeWf = new NotifyTestWorkflow(); - WorkflowDefinition stateVariableWf = new StateVariableWorkflow(); + StateVariableWorkflow stateVariableWf = new StateVariableWorkflow(); - WorkflowDefinition nonRetryableWf = new NonRetryableWorkflow(); + NonRetryableWorkflow nonRetryableWf = new NonRetryableWorkflow(); LoopingTestWorkflow loopingWf = new LoopingTestWorkflow(); + StuckWorkflow stuckWf = new StuckWorkflow(); + static WorkflowInstance newChildWorkflow = mock(WorkflowInstance.class); static WorkflowInstance newWorkflow = mock(WorkflowInstance.class); @@ -205,6 +210,7 @@ public void setup() { lenient().doReturn(stateVariableWf).when(workflowDefinitions).getWorkflowDefinition("state-variable"); lenient().doReturn(nonRetryableWf).when(workflowDefinitions).getWorkflowDefinition("non-retryable"); lenient().doReturn(loopingWf).when(workflowDefinitions).getWorkflowDefinition("looping-test"); + lenient().doReturn(stuckWf).when(workflowDefinitions).getWorkflowDefinition("stuck"); filterChain(listener1); filterChain(listener2); lenient().when(executionMock.getRetries()).thenReturn(testWorkflowDef.getSettings().maxRetries); @@ -914,7 +920,43 @@ public void goToErrorStateWhenRetryIsNotAllowed() { } private void runExecutorWithTimeout() { - assertTimeoutPreemptively(ofSeconds(5), () -> executor.run()); + assertTimeoutPreemptively(ofSeconds(5), executor::run); + } + + @Test + public void handlePotentiallyStuckCallsListeners() { + Duration processingTime = standardHours(1); + + executor.handlePotentiallyStuck(processingTime); + + verify(listener1).handlePotentiallyStuck(null, processingTime); + verify(listener2).handlePotentiallyStuck(null, processingTime); + } + + @Test + public void handlePotentiallyStuckInterruptsThreadWhenListenerReturnsTrue() throws InterruptedException { + Duration processingTime = standardHours(1); + when(listener1.handlePotentiallyStuck(any(ListenerContext.class), eq(processingTime))).thenReturn(true); + WorkflowInstance instance = executingInstanceBuilder().setType("stuck").setState("start").build(); + when(workflowInstances.getWorkflowInstance(instance.id, INCLUDES, null)).thenReturn(instance); + Thread thread = new Thread(executor::run); + thread.start(); + + // let the workflow process until it is in stuck in the sleep + sleep(500); + + executor.handlePotentiallyStuck(processingTime); + + thread.join(1000); + assertFalse("Processing thread did not die after interruption", thread.isAlive()); + + verify(listener1).handlePotentiallyStuck(any(ListenerContext.class), eq(processingTime)); + verify(listener2).handlePotentiallyStuck(any(ListenerContext.class), eq(processingTime)); + + verify(workflowInstanceDao).updateWorkflowInstanceAfterExecution(update.capture(), action.capture(), childWorkflows.capture(), + workflows.capture(), eq(true)); + assertThat(action.getValue().type, is(stateExecutionFailed)); + assertThat(action.getValue().stateText, containsString("InterruptedException")); } public static class Pojo { @@ -1275,4 +1317,31 @@ public NextAction start(@SuppressWarnings("unused") StateExecution execution) { throw new RuntimeException(); } } + + public static class StuckWorkflow extends WorkflowDefinition { + + protected StuckWorkflow() { + super("stuck", State.start, State.end); + } + + public static enum State implements WorkflowState { + start(WorkflowStateType.start), end(WorkflowStateType.end); + + private final WorkflowStateType stateType; + + private State(WorkflowStateType stateType) { + this.stateType = stateType; + } + + @Override + public WorkflowStateType getType() { + return stateType; + } + } + + public NextAction start(@SuppressWarnings("unused") StateExecution execution) throws InterruptedException { + SECONDS.sleep(10); + return stopInState(State.end, "Done"); + } + } } diff --git a/nflow-tests/src/test/java/io/nflow/tests/CronWorkflowTest.java b/nflow-tests/src/test/java/io/nflow/tests/CronWorkflowTest.java index f6308bea1..475bb817c 100644 --- a/nflow-tests/src/test/java/io/nflow/tests/CronWorkflowTest.java +++ b/nflow-tests/src/test/java/io/nflow/tests/CronWorkflowTest.java @@ -1,14 +1,16 @@ package io.nflow.tests; import static io.nflow.engine.workflow.curated.CronWorkflow.State.failed; -import static io.nflow.engine.workflow.instance.WorkflowInstanceAction.WorkflowActionType.stateExecution; import static io.nflow.tests.demo.workflow.TestCronWorkflow.TYPE; -import static java.util.Arrays.asList; import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.cxf.jaxrs.client.WebClient.fromClient; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.List; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; @@ -16,6 +18,7 @@ import org.junit.jupiter.api.TestMethodOrder; import org.springframework.context.annotation.ComponentScan; +import io.nflow.engine.workflow.curated.CronWorkflow; import io.nflow.rest.v1.msg.Action; import io.nflow.rest.v1.msg.CreateWorkflowInstanceRequest; import io.nflow.rest.v1.msg.CreateWorkflowInstanceResponse; @@ -47,7 +50,7 @@ public void startCronWorkflow() { req.type = TYPE; req.stateVariables = singletonMap("cron", "*/3 * * * * *"); resp = fromClient(workflowInstanceResource, true).put(req, CreateWorkflowInstanceResponse.class); - assertThat(resp.id, notNullValue()); + assertNotNull(resp.id); } @Test @@ -58,14 +61,12 @@ public void letItRunFor5Seconds() throws InterruptedException { @Test @Order(3) - public void verifyThatIsHasRunPeriodically() { - int i = 1; - assertWorkflowInstance(resp.id, - actionHistoryValidator(asList(new Action(i++, stateExecution.name(), "schedule", "", 0, null, null, 0), - new Action(i++, stateExecution.name(), "doWork", "", 0, null, null, 0), - new Action(i++, stateExecution.name(), "schedule", "", 0, null, null, 0), - new Action(i++, stateExecution.name(), "doWork", "", 0, null, null, 0), - new Action(i++, stateExecution.name(), "schedule", "", 0, null, null, 0)))); + public void verifyItHasRunPeriodically() { + List actions = getWorkflowInstance(resp.id).actions; + long scheduleActions = actions.stream().filter(a -> CronWorkflow.State.schedule.name().equals(a.state)).count(); + long doWorkActions = actions.stream().filter(a -> CronWorkflow.State.doWork.name().equals(a.state)).count(); + assertThat(scheduleActions, is(greaterThanOrEqualTo(1L))); + assertThat(doWorkActions, is(greaterThanOrEqualTo(1L))); } @BeforeServerStop