Skip to content

Commit

Permalink
make illegal state change action configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Edvard Fonsell committed Mar 14, 2015
1 parent 0a1b9bd commit abd5a82
Show file tree
Hide file tree
Showing 10 changed files with 215 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.MDC;
import org.springframework.core.env.Environment;

import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
import com.nitorcreations.nflow.engine.internal.workflow.ObjectStringMapper;
Expand Down Expand Up @@ -50,17 +51,19 @@ class WorkflowStateProcessor implements Runnable {
private final ObjectStringMapper objectMapper;
private final WorkflowInstanceDao workflowInstanceDao;
private final WorkflowExecutorListener[] executorListeners;
private final String illegalStateChangeAction;
DateTime lastLogged = now();

WorkflowStateProcessor(int instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao,
WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao, Environment env,
WorkflowExecutorListener... executorListeners) {
this.instanceId = instanceId;
this.objectMapper = objectMapper;
this.workflowDefinitions = workflowDefinitions;
this.workflowInstances = workflowInstances;
this.workflowInstanceDao = workflowInstanceDao;
this.executorListeners = executorListeners;
illegalStateChangeAction = env.getRequiredProperty("nflow.illegal.state.change.action");
}

@Override
Expand Down Expand Up @@ -211,24 +214,32 @@ private NextAction processState(WorkflowInstance instance, WorkflowDefinition<?>
} else {
try {
nextAction = (NextAction) invokeMethod(method.method, definition, args);
if (nextAction == null) {
logger.error("State '{}' handler method returned null, proceeding to error state '{}'", instance.state, definition
.getErrorState().name());
nextAction = moveToState(definition.getErrorState(), "State handler method returned null");
execution.setFailed();
} else if (!"ignore".equals(illegalStateChangeAction) && !definition.isAllowedNextAction(instance, nextAction)) {
logger.warn("State transition from '{}' to '{}' is not allowed by workflow definition.", instance.state,
nextAction.getNextState());
if ("fail".equals(illegalStateChangeAction)) {
nextAction = moveToState(definition.getErrorState(), "Illegal state transition from " + instance.state + " to "
+ nextAction.getNextState().name() + ", proceeding to error state " + definition.getErrorState().name());
execution.setFailed();
}
}
} catch (InvalidNextActionException e) {
logger.error("State '" + instance.state
+ "' handler method failed to create valid next action, proceeding to error state '"
+ definition.getErrorState().name() + "'", e);
nextAction = moveToState(definition.getErrorState(), e.getMessage());
execution.setFailed(e);
}
if (nextAction == null) {
logger.error("State '{}' handler method returned null, proceeding to error state '{}'",
instance.state, definition.getErrorState().name());
nextAction = moveToState(definition.getErrorState(), "State handler method returned null");
execution.setFailed();
}
}
execution.setNextActivation(nextAction.getActivation());
execution.setNextStateReason(nextAction.getReason());
execution.setSaveTrace(nextAction.isSaveTrace());
if (nextAction.getNextState() == null) {
if (nextAction.isRetry()) {
execution.setNextState(currentState);
execution.setRetry(true);
definition.handleRetryAfter(execution, nextAction.getActivation());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import javax.inject.Inject;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
Expand All @@ -17,21 +18,23 @@ public class WorkflowStateProcessorFactory {
private final WorkflowInstanceService workflowInstances;
private final ObjectStringMapper objectMapper;
private final WorkflowInstanceDao workflowInstanceDao;
private final Environment env;
@Autowired(required = false)
protected WorkflowExecutorListener[] listeners = new WorkflowExecutorListener[0];

@Inject
public WorkflowStateProcessorFactory(WorkflowDefinitionService workflowDefinitions, WorkflowInstanceService workflowInstances,
ObjectStringMapper objectMapper, WorkflowInstanceDao workflowInstanceDao) {
ObjectStringMapper objectMapper, WorkflowInstanceDao workflowInstanceDao, Environment env) {
this.workflowDefinitions = workflowDefinitions;
this.workflowInstances = workflowInstances;
this.objectMapper = objectMapper;
this.workflowInstanceDao = workflowInstanceDao;
this.env = env;
}

public WorkflowStateProcessor createProcessor(int instanceId) {
return new WorkflowStateProcessor(instanceId, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao,
listeners);
env, listeners);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.nitorcreations.nflow.engine.internal.workflow.StateExecutionImpl;
import com.nitorcreations.nflow.engine.internal.workflow.WorkflowDefinitionScanner;
import com.nitorcreations.nflow.engine.internal.workflow.WorkflowStateMethod;
import com.nitorcreations.nflow.engine.workflow.instance.WorkflowInstance;

/**
* The base class for all workflow definitions.
Expand Down Expand Up @@ -288,4 +289,24 @@ public WorkflowState getState(String state) {
public boolean isStartState(String state) {
return getState(state).getType() == WorkflowStateType.start;
}

/**
* Return true if the given nextAction is permitted for given instance.
* @param instance The workflow instance for which the action is checked.
* @param nextAction The action to be checked.
* @return True if the nextAction is permitted, false otherwise.
*/
public boolean isAllowedNextAction(WorkflowInstance instance, NextAction nextAction) {
if (nextAction.isRetry()) {
return true;
}
List<String> allowedNextStates = allowedTransitions.get(instance.state);
if (allowedNextStates != null && allowedNextStates.contains(nextAction.getNextState().name())) {
return true;
}
if (nextAction.getNextState() == failureTransitions.get(instance.state)) {
return true;
}
return nextAction.getNextState() == getErrorState();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,12 @@ private static void assertNotNull(Object object, String message) {
throw new InvalidNextActionException(message);
}
}

/**
* Return true if this action is a retry of the current state.
* @return True if action is a retry, false otherwise.
*/
public boolean isRetry() {
return nextState == null;
}
}
3 changes: 3 additions & 0 deletions nflow-engine/src/main/resources/nflow-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ nflow.dispatcher.sleep.ms=1000
nflow.dispatcher.await.termination.seconds=60
nflow.dispatcher.executor.thread.keepalive.seconds=0

# ignore, log, fail
nflow.illegal.state.change.action=log

nflow.workflow.instance.query.max.results=10000
nflow.workflow.instance.query.max.results.default=100

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class WorkflowDispatcherTest {
public void setup() {
when(env.getRequiredProperty("nflow.dispatcher.sleep.ms", Long.class)).thenReturn(0l);
when(env.getRequiredProperty("nflow.dispatcher.executor.queue.wait_until_threshold", Integer.class)).thenReturn(0);
when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("ignore");
when(recovery.isTransactionSupportEnabled()).thenReturn(true);
executor = new WorkflowInstanceExecutor(3, 2, 0, 10, 0, new CustomizableThreadFactory("nflow-executor-"));
dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, recovery, env);
Expand All @@ -70,8 +71,10 @@ public void threadDispatcher() {
.thenReturn(ids(1))
.thenThrow(new RuntimeException("Expected: exception during dispatcher execution"))
.thenAnswer(waitForTickAndAnswer(2, ids(2), this));
when(executorFactory.createProcessor(1)).thenReturn(fakeWorkflowExecutor(1, noOpRunnable()));
when(executorFactory.createProcessor(2)).thenReturn(fakeWorkflowExecutor(2, noOpRunnable()));
WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, noOpRunnable());
when(executorFactory.createProcessor(1)).thenReturn(fakeWorkflowExecutor);
WorkflowStateProcessor fakeWorkflowExecutor2 = fakeWorkflowExecutor(2, noOpRunnable());
when(executorFactory.createProcessor(2)).thenReturn(fakeWorkflowExecutor2);

dispatcher.run();
}
Expand Down Expand Up @@ -151,8 +154,8 @@ class ShutdownBlocksUntilPoolShutdown extends MultithreadedTestCase {
public void threadDispatcher() {
when(workflowInstances.pollNextWorkflowInstanceIds(anyInt()))
.thenAnswer(waitForTickAndAnswer(2, ids(1), this));
when(executorFactory.createProcessor(anyInt()))
.thenReturn(fakeWorkflowExecutor(1, waitForTickRunnable(3, this)));
WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this));
when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor);

dispatcher.run();
}
Expand All @@ -179,8 +182,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
return ids(1);
}
});
when(executorFactory.createProcessor(anyInt()))
.thenReturn(fakeWorkflowExecutor(1, waitForTickRunnable(3, this)));
WorkflowStateProcessor fakeWorkflowExecutor = fakeWorkflowExecutor(1, waitForTickRunnable(3, this));
when(executorFactory.createProcessor(anyInt())).thenReturn(fakeWorkflowExecutor);

dispatcher.run();
}
Expand Down Expand Up @@ -273,7 +276,7 @@ public void run() {
}

WorkflowStateProcessor fakeWorkflowExecutor(int instanceId, final Runnable fakeCommand) {
return new WorkflowStateProcessor(instanceId, null, null, null, null, (WorkflowExecutorListener) null) {
return new WorkflowStateProcessor(instanceId, null, null, null, null, env, (WorkflowExecutorListener) null) {
@Override
public void run() {
fakeCommand.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.springframework.core.env.Environment;

import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
import com.nitorcreations.nflow.engine.internal.workflow.ObjectStringMapper;
Expand All @@ -22,6 +23,8 @@ public class WorkflowStateProcessorFactoryTest extends BaseNflowTest {
@Mock
WorkflowInstanceDao workflowInstanceDao;
@Mock
Environment env;
@Mock
WorkflowExecutorListener listener1;
@Mock
WorkflowExecutorListener listener2;
Expand All @@ -31,7 +34,7 @@ public class WorkflowStateProcessorFactoryTest extends BaseNflowTest {

@Before
public void setup() {
factory = new WorkflowStateProcessorFactory(workflowDefinitions, workflowInstances, objectMapper, workflowInstanceDao);
factory = new WorkflowStateProcessorFactory(workflowDefinitions, workflowInstances, objectMapper, workflowInstanceDao, env);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.mockito.ArgumentMatcher;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.core.env.Environment;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
Expand Down Expand Up @@ -75,6 +76,9 @@ public class WorkflowStateProcessorTest extends BaseNflowTest {
@Mock
WorkflowInstanceDao workflowInstanceDao;

@Mock
Environment env;

@Mock
WorkflowExecutorListener listener1;

Expand All @@ -99,8 +103,9 @@ public class WorkflowStateProcessorTest extends BaseNflowTest {

@Before
public void setup() {
when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("fail");
executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao,
listener1, listener2);
env, listener1, listener2);
setCurrentMillisFixed(currentTimeMillis());
doReturn(executeWf).when(workflowDefinitions).getWorkflowDefinition("execute-test");
doReturn(simpleWf).when(workflowDefinitions).getWorkflowDefinition("simple-test");
Expand Down Expand Up @@ -481,6 +486,75 @@ public void runUnsupportedWorkflow() {
argThat(matchesWorkflowInstance(inProgress, FailingTestWorkflow.State.start, 0, is("Unsupported workflow type"))));
}

@Test
public void illegalStateChangeGoesToErrorState() {
WorkflowDefinition<SimpleTestWorkflow.State> wf = new SimpleTestWorkflow();
doReturn(wf).when(workflowDefinitions).getWorkflowDefinition("simple");
WorkflowInstance instance = executingInstanceBuilder().setType("simple").setState("illegalStateChange").build();
when(workflowInstances.getWorkflowInstance(instance.id)).thenReturn(instance);

executor.run();

verify(workflowInstanceDao, times(2)).updateWorkflowInstanceAfterExecution(update.capture(), action.capture());
assertThat(
update.getAllValues().get(0),
matchesWorkflowInstance(executing, SimpleTestWorkflow.State.error, 0,
is("Scheduled by previous state illegalStateChange")));
assertThat(action.getAllValues().get(0),
matchesWorkflowInstanceAction(SimpleTestWorkflow.State.illegalStateChange,
is("Illegal state transition from illegalStateChange to start, proceeding to error state error"), 0,
stateExecutionFailed));
assertThat(update.getAllValues().get(1),
matchesWorkflowInstance(finished, SimpleTestWorkflow.State.error, 0, is("Stopped in state error"),
nullValue(DateTime.class)));
assertThat(action.getAllValues().get(1),
matchesWorkflowInstanceAction(SimpleTestWorkflow.State.error, is("Stopped in final state"), 0, stateExecution));
}

@Test
public void illegalStateChangeGoesToIllegalStateWhenActionIsLog() {
when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("log");
executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, env,
listener1, listener2);

WorkflowDefinition<SimpleTestWorkflow.State> wf = new SimpleTestWorkflow();
doReturn(wf).when(workflowDefinitions).getWorkflowDefinition("simple");
WorkflowInstance instance = executingInstanceBuilder().setType("simple").setState("illegalStateChange").build();
when(workflowInstances.getWorkflowInstance(instance.id)).thenReturn(instance);

executor.run();

verify(workflowInstanceDao, times(3)).updateWorkflowInstanceAfterExecution(update.capture(), action.capture());
assertThat(
update.getAllValues().get(0),
matchesWorkflowInstance(executing, SimpleTestWorkflow.State.start, 0,
is("Scheduled by previous state illegalStateChange")));
assertThat(action.getAllValues().get(0),
matchesWorkflowInstanceAction(SimpleTestWorkflow.State.illegalStateChange, is("illegal state change"), 0, stateExecution));
}

@Test
public void illegalStateChangeGoesToIllegalStateWhenActionIsIgnore() {
when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("ignore");
executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, env,
listener1, listener2);

WorkflowDefinition<SimpleTestWorkflow.State> wf = new SimpleTestWorkflow();
doReturn(wf).when(workflowDefinitions).getWorkflowDefinition("simple");
WorkflowInstance instance = executingInstanceBuilder().setType("simple").setState("illegalStateChange").build();
when(workflowInstances.getWorkflowInstance(instance.id)).thenReturn(instance);

executor.run();

verify(workflowInstanceDao, times(3)).updateWorkflowInstanceAfterExecution(update.capture(), action.capture());
assertThat(
update.getAllValues().get(0),
matchesWorkflowInstance(executing, SimpleTestWorkflow.State.start, 0,
is("Scheduled by previous state illegalStateChange")));
assertThat(action.getAllValues().get(0),
matchesWorkflowInstanceAction(SimpleTestWorkflow.State.illegalStateChange, is("illegal state change"), 0, stateExecution));
}

public static class Pojo {
public String field;
public boolean test;
Expand Down Expand Up @@ -549,6 +623,7 @@ public static class FailingTestWorkflow extends
protected FailingTestWorkflow() {
super("failing", State.start, State.error);
permit(State.start, State.process, State.failure);
permit(State.nextStateNoMethod, State.noMethodEndState);
}

public static enum State implements WorkflowState {
Expand Down Expand Up @@ -619,12 +694,15 @@ public static class SimpleTestWorkflow extends WorkflowDefinition<SimpleTestWork

protected SimpleTestWorkflow() {
super("simple", State.start, State.error);
permit(State.start, State.end);
permit(State.start, State.processing);
permit(State.processing, State.end);
permit(State.beforeManual, State.manualState);
}

public static enum State implements WorkflowState {
start(WorkflowStateType.start), beforeManual(WorkflowStateType.normal), end(WorkflowStateType.end), manualState(
WorkflowStateType.manual), error(WorkflowStateType.end), processing(WorkflowStateType.normal);
WorkflowStateType.manual), error(WorkflowStateType.end), processing(WorkflowStateType.normal), illegalStateChange(
WorkflowStateType.normal);

private final WorkflowStateType stateType;

Expand Down Expand Up @@ -660,6 +738,12 @@ public NextAction processing(StateExecution execution) {
return stopInState(State.end, "Finished.");
}

public void error(StateExecution execution) {}
public NextAction illegalStateChange(StateExecution execution) {
return moveToState(State.start, "illegal state change");
}

public void error(StateExecution execution) {
System.err.println("Executing error state");
}
}
}
Loading

0 comments on commit abd5a82

Please sign in to comment.