diff --git a/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessor.java b/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessor.java index 22a493ade..228594885 100644 --- a/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessor.java +++ b/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessor.java @@ -19,6 +19,7 @@ import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.MDC; +import org.springframework.core.env.Environment; import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao; import com.nitorcreations.nflow.engine.internal.workflow.ObjectStringMapper; @@ -50,10 +51,11 @@ class WorkflowStateProcessor implements Runnable { private final ObjectStringMapper objectMapper; private final WorkflowInstanceDao workflowInstanceDao; private final WorkflowExecutorListener[] executorListeners; + private final String illegalStateChangeAction; DateTime lastLogged = now(); WorkflowStateProcessor(int instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions, - WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao, + WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao, Environment env, WorkflowExecutorListener... executorListeners) { this.instanceId = instanceId; this.objectMapper = objectMapper; @@ -61,6 +63,7 @@ class WorkflowStateProcessor implements Runnable { this.workflowInstances = workflowInstances; this.workflowInstanceDao = workflowInstanceDao; this.executorListeners = executorListeners; + illegalStateChangeAction = env.getRequiredProperty("nflow.illegal.state.change.action"); } @Override @@ -211,6 +214,20 @@ private NextAction processState(WorkflowInstance instance, WorkflowDefinition } else { try { nextAction = (NextAction) invokeMethod(method.method, definition, args); + if (nextAction == null) { + logger.error("State '{}' handler method returned null, proceeding to error state '{}'", instance.state, definition + .getErrorState().name()); + nextAction = moveToState(definition.getErrorState(), "State handler method returned null"); + execution.setFailed(); + } else if (!"ignore".equals(illegalStateChangeAction) && !definition.isAllowedNextAction(instance, nextAction)) { + logger.warn("State transition from '{}' to '{}' is not allowed by workflow definition.", instance.state, + nextAction.getNextState()); + if ("fail".equals(illegalStateChangeAction)) { + nextAction = moveToState(definition.getErrorState(), "Illegal state transition from " + instance.state + " to " + + nextAction.getNextState().name() + ", proceeding to error state " + definition.getErrorState().name()); + execution.setFailed(); + } + } } catch (InvalidNextActionException e) { logger.error("State '" + instance.state + "' handler method failed to create valid next action, proceeding to error state '" @@ -218,17 +235,11 @@ private NextAction processState(WorkflowInstance instance, WorkflowDefinition nextAction = moveToState(definition.getErrorState(), e.getMessage()); execution.setFailed(e); } - if (nextAction == null) { - logger.error("State '{}' handler method returned null, proceeding to error state '{}'", - instance.state, definition.getErrorState().name()); - nextAction = moveToState(definition.getErrorState(), "State handler method returned null"); - execution.setFailed(); - } } execution.setNextActivation(nextAction.getActivation()); execution.setNextStateReason(nextAction.getReason()); execution.setSaveTrace(nextAction.isSaveTrace()); - if (nextAction.getNextState() == null) { + if (nextAction.isRetry()) { execution.setNextState(currentState); execution.setRetry(true); definition.handleRetryAfter(execution, nextAction.getActivation()); diff --git a/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java b/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java index bceb3a212..9d4164eab 100644 --- a/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java +++ b/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessorFactory.java @@ -3,6 +3,7 @@ import javax.inject.Inject; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; import org.springframework.stereotype.Component; import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao; @@ -17,21 +18,23 @@ public class WorkflowStateProcessorFactory { private final WorkflowInstanceService workflowInstances; private final ObjectStringMapper objectMapper; private final WorkflowInstanceDao workflowInstanceDao; + private final Environment env; @Autowired(required = false) protected WorkflowExecutorListener[] listeners = new WorkflowExecutorListener[0]; @Inject public WorkflowStateProcessorFactory(WorkflowDefinitionService workflowDefinitions, WorkflowInstanceService workflowInstances, - ObjectStringMapper objectMapper, WorkflowInstanceDao workflowInstanceDao) { + ObjectStringMapper objectMapper, WorkflowInstanceDao workflowInstanceDao, Environment env) { this.workflowDefinitions = workflowDefinitions; this.workflowInstances = workflowInstances; this.objectMapper = objectMapper; this.workflowInstanceDao = workflowInstanceDao; + this.env = env; } public WorkflowStateProcessor createProcessor(int instanceId) { return new WorkflowStateProcessor(instanceId, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, - listeners); + env, listeners); } } diff --git a/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/workflow/definition/AbstractWorkflowDefinition.java b/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/workflow/definition/AbstractWorkflowDefinition.java index 6e8f9606b..69be5b27c 100644 --- a/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/workflow/definition/AbstractWorkflowDefinition.java +++ b/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/workflow/definition/AbstractWorkflowDefinition.java @@ -18,6 +18,7 @@ import com.nitorcreations.nflow.engine.internal.workflow.StateExecutionImpl; import com.nitorcreations.nflow.engine.internal.workflow.WorkflowDefinitionScanner; import com.nitorcreations.nflow.engine.internal.workflow.WorkflowStateMethod; +import com.nitorcreations.nflow.engine.workflow.instance.WorkflowInstance; /** * The base class for all workflow definitions. @@ -288,4 +289,24 @@ public WorkflowState getState(String state) { public boolean isStartState(String state) { return getState(state).getType() == WorkflowStateType.start; } + + /** + * Return true if the given nextAction is permitted for given instance. + * @param instance The workflow instance for which the action is checked. + * @param nextAction The action to be checked. + * @return True if the nextAction is permitted, false otherwise. + */ + public boolean isAllowedNextAction(WorkflowInstance instance, NextAction nextAction) { + if (nextAction.isRetry()) { + return true; + } + List allowedNextStates = allowedTransitions.get(instance.state); + if (allowedNextStates != null && allowedNextStates.contains(nextAction.getNextState().name())) { + return true; + } + if (nextAction.getNextState() == failureTransitions.get(instance.state)) { + return true; + } + return nextAction.getNextState() == getErrorState(); + } } diff --git a/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/workflow/definition/NextAction.java b/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/workflow/definition/NextAction.java index ef948a5f1..261b2fae7 100644 --- a/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/workflow/definition/NextAction.java +++ b/nflow-engine/src/main/java/com/nitorcreations/nflow/engine/workflow/definition/NextAction.java @@ -117,4 +117,12 @@ private static void assertNotNull(Object object, String message) { throw new InvalidNextActionException(message); } } + + /** + * Return true if this action is a retry of the current state. + * @return True if action is a retry, false otherwise. + */ + public boolean isRetry() { + return nextState == null; + } } diff --git a/nflow-engine/src/main/resources/nflow-engine.properties b/nflow-engine/src/main/resources/nflow-engine.properties index f33dcfb08..233aac669 100644 --- a/nflow-engine/src/main/resources/nflow-engine.properties +++ b/nflow-engine/src/main/resources/nflow-engine.properties @@ -7,6 +7,9 @@ nflow.dispatcher.sleep.ms=1000 nflow.dispatcher.await.termination.seconds=60 nflow.dispatcher.executor.thread.keepalive.seconds=0 +# ignore, log, fail +nflow.illegal.state.change.action=log + nflow.workflow.instance.query.max.results=10000 nflow.workflow.instance.query.max.results.default=100 diff --git a/nflow-engine/src/test/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowDispatcherTest.java b/nflow-engine/src/test/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowDispatcherTest.java index 14a4d68c1..d004f7086 100644 --- a/nflow-engine/src/test/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowDispatcherTest.java +++ b/nflow-engine/src/test/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowDispatcherTest.java @@ -50,6 +50,7 @@ public class WorkflowDispatcherTest { public void setup() { when(env.getRequiredProperty("nflow.dispatcher.sleep.ms", Long.class)).thenReturn(0l); when(env.getRequiredProperty("nflow.dispatcher.executor.queue.wait_until_threshold", Integer.class)).thenReturn(0); + when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("ignore"); when(recovery.isTransactionSupportEnabled()).thenReturn(true); executor = new WorkflowInstanceExecutor(3, 2, 0, 10, 0, new CustomizableThreadFactory("nflow-executor-")); dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, recovery, env); @@ -70,8 +71,10 @@ public void threadDispatcher() { .thenReturn(ids(1)) .thenThrow(new RuntimeException("Expected: exception during dispatcher execution")) .thenAnswer(waitForTickAndAnswer(2, ids(2), this)); - when(executorFactory.createProcessor(1)).thenReturn(fakeWorkflowExecutor(1, noOpRunnable())); - when(executorFactory.createProcessor(2)).thenReturn(fakeWorkflowExecutor(2, noOpRunnable())); + WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, noOpRunnable()); + when(executorFactory.createProcessor(1)).thenReturn(fakeWorkflowExecutor); + WorkflowStateProcessor fakeWorkflowExecutor2 = fakeWorkflowExecutor(2, noOpRunnable()); + when(executorFactory.createProcessor(2)).thenReturn(fakeWorkflowExecutor2); dispatcher.run(); } @@ -151,8 +154,8 @@ class ShutdownBlocksUntilPoolShutdown extends MultithreadedTestCase { public void threadDispatcher() { when(workflowInstances.pollNextWorkflowInstanceIds(anyInt())) .thenAnswer(waitForTickAndAnswer(2, ids(1), this)); - when(executorFactory.createProcessor(anyInt())) - .thenReturn(fakeWorkflowExecutor(1, waitForTickRunnable(3, this))); + WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this)); + when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor); dispatcher.run(); } @@ -179,8 +182,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable { return ids(1); } }); - when(executorFactory.createProcessor(anyInt())) - .thenReturn(fakeWorkflowExecutor(1, waitForTickRunnable(3, this))); + WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this)); + when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor); dispatcher.run(); } @@ -273,7 +276,7 @@ public void run() { } WorkflowStateProcessor fakeWorkflowExecutor(int instanceId, final Runnable fakeCommand) { - return new WorkflowStateProcessor(instanceId, null, null, null, null, (WorkflowExecutorListener) null) { + return new WorkflowStateProcessor(instanceId, null, null, null, null, env, (WorkflowExecutorListener) null) { @Override public void run() { fakeCommand.run(); diff --git a/nflow-engine/src/test/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java b/nflow-engine/src/test/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java index edc4506e4..ad4ff57f6 100644 --- a/nflow-engine/src/test/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java +++ b/nflow-engine/src/test/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessorFactoryTest.java @@ -5,6 +5,7 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mock; +import org.springframework.core.env.Environment; import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao; import com.nitorcreations.nflow.engine.internal.workflow.ObjectStringMapper; @@ -22,6 +23,8 @@ public class WorkflowStateProcessorFactoryTest extends BaseNflowTest { @Mock WorkflowInstanceDao workflowInstanceDao; @Mock + Environment env; + @Mock WorkflowExecutorListener listener1; @Mock WorkflowExecutorListener listener2; @@ -31,7 +34,7 @@ public class WorkflowStateProcessorFactoryTest extends BaseNflowTest { @Before public void setup() { - factory = new WorkflowStateProcessorFactory(workflowDefinitions, workflowInstances, objectMapper, workflowInstanceDao); + factory = new WorkflowStateProcessorFactory(workflowDefinitions, workflowInstances, objectMapper, workflowInstanceDao, env); } @Test diff --git a/nflow-engine/src/test/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessorTest.java b/nflow-engine/src/test/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessorTest.java index 253beefab..146c87df9 100644 --- a/nflow-engine/src/test/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessorTest.java +++ b/nflow-engine/src/test/java/com/nitorcreations/nflow/engine/internal/executor/WorkflowStateProcessorTest.java @@ -44,6 +44,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.Captor; import org.mockito.Mock; +import org.springframework.core.env.Environment; import com.fasterxml.jackson.databind.ObjectMapper; import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao; @@ -75,6 +76,9 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { @Mock WorkflowInstanceDao workflowInstanceDao; + @Mock + Environment env; + @Mock WorkflowExecutorListener listener1; @@ -99,8 +103,9 @@ public class WorkflowStateProcessorTest extends BaseNflowTest { @Before public void setup() { + when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("fail"); executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, - listener1, listener2); + env, listener1, listener2); setCurrentMillisFixed(currentTimeMillis()); doReturn(executeWf).when(workflowDefinitions).getWorkflowDefinition("execute-test"); doReturn(simpleWf).when(workflowDefinitions).getWorkflowDefinition("simple-test"); @@ -481,6 +486,75 @@ public void runUnsupportedWorkflow() { argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.start, 0, is("Unsupported workflow type")))); } + @Test + public void illegalStateChangeGoesToErrorState() { + WorkflowDefinition wf = new SimpleTestWorkflow(); + doReturn(wf).when(workflowDefinitions).getWorkflowDefinition("simple"); + WorkflowInstance instance = executingInstanceBuilder().setType("simple").setState("illegalStateChange").build(); + when(workflowInstances.getWorkflowInstance(instance.id)).thenReturn(instance); + + executor.run(); + + verify(workflowInstanceDao, times(2)).updateWorkflowInstanceAfterExecution(update.capture(), action.capture()); + assertThat( + update.getAllValues().get(0), + matchesWorkflowInstance(executing, SimpleTestWorkflow.State.error, 0, + is("Scheduled by previous state illegalStateChange"))); + assertThat(action.getAllValues().get(0), + matchesWorkflowInstanceAction(SimpleTestWorkflow.State.illegalStateChange, + is("Illegal state transition from illegalStateChange to start, proceeding to error state error"), 0, + stateExecutionFailed)); + assertThat(update.getAllValues().get(1), + matchesWorkflowInstance(finished, SimpleTestWorkflow.State.error, 0, is("Stopped in state error"), + nullValue(DateTime.class))); + assertThat(action.getAllValues().get(1), + matchesWorkflowInstanceAction(SimpleTestWorkflow.State.error, is("Stopped in final state"), 0, stateExecution)); + } + + @Test + public void illegalStateChangeGoesToIllegalStateWhenActionIsLog() { + when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("log"); + executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, env, + listener1, listener2); + + WorkflowDefinition wf = new SimpleTestWorkflow(); + doReturn(wf).when(workflowDefinitions).getWorkflowDefinition("simple"); + WorkflowInstance instance = executingInstanceBuilder().setType("simple").setState("illegalStateChange").build(); + when(workflowInstances.getWorkflowInstance(instance.id)).thenReturn(instance); + + executor.run(); + + verify(workflowInstanceDao, times(3)).updateWorkflowInstanceAfterExecution(update.capture(), action.capture()); + assertThat( + update.getAllValues().get(0), + matchesWorkflowInstance(executing, SimpleTestWorkflow.State.start, 0, + is("Scheduled by previous state illegalStateChange"))); + assertThat(action.getAllValues().get(0), + matchesWorkflowInstanceAction(SimpleTestWorkflow.State.illegalStateChange, is("illegal state change"), 0, stateExecution)); + } + + @Test + public void illegalStateChangeGoesToIllegalStateWhenActionIsIgnore() { + when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("ignore"); + executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, env, + listener1, listener2); + + WorkflowDefinition wf = new SimpleTestWorkflow(); + doReturn(wf).when(workflowDefinitions).getWorkflowDefinition("simple"); + WorkflowInstance instance = executingInstanceBuilder().setType("simple").setState("illegalStateChange").build(); + when(workflowInstances.getWorkflowInstance(instance.id)).thenReturn(instance); + + executor.run(); + + verify(workflowInstanceDao, times(3)).updateWorkflowInstanceAfterExecution(update.capture(), action.capture()); + assertThat( + update.getAllValues().get(0), + matchesWorkflowInstance(executing, SimpleTestWorkflow.State.start, 0, + is("Scheduled by previous state illegalStateChange"))); + assertThat(action.getAllValues().get(0), + matchesWorkflowInstanceAction(SimpleTestWorkflow.State.illegalStateChange, is("illegal state change"), 0, stateExecution)); + } + public static class Pojo { public String field; public boolean test; @@ -549,6 +623,7 @@ public static class FailingTestWorkflow extends protected FailingTestWorkflow() { super("failing", State.start, State.error); permit(State.start, State.process, State.failure); + permit(State.nextStateNoMethod, State.noMethodEndState); } public static enum State implements WorkflowState { @@ -619,12 +694,15 @@ public static class SimpleTestWorkflow extends WorkflowDefinition