Skip to content

Commit

Permalink
make unknow state and type delays configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
Edvard Fonsell committed Apr 14, 2015
1 parent 8e3eb02 commit 5922d0a
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class WorkflowStateProcessor implements Runnable {
private final WorkflowExecutorListener[] executorListeners;
private final String illegalStateChangeAction;
DateTime lastLogged = now();
private final int unknownWorkflowTypeRetryDelay;
private final int unknownWorkflowStateRetryDelay;

WorkflowStateProcessor(int instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowInstanceService workflowInstances, WorkflowInstanceDao workflowInstanceDao, Environment env,
Expand All @@ -64,6 +66,9 @@ class WorkflowStateProcessor implements Runnable {
this.workflowInstanceDao = workflowInstanceDao;
this.executorListeners = executorListeners;
illegalStateChangeAction = env.getRequiredProperty("nflow.illegal.state.change.action");
unknownWorkflowTypeRetryDelay = env.getRequiredProperty("nflow.unknown.workflow.type.retry.delay.minutes", Integer.class);
unknownWorkflowStateRetryDelay = env.getRequiredProperty("nflow.unknown.workflow.state.retry.delay.minutes", Integer.class);

}

@Override
Expand Down Expand Up @@ -140,17 +145,17 @@ void logIfLagging(WorkflowInstance instance) {

private void rescheduleUnknownWorkflowType(WorkflowInstance instance) {
logger.warn("Workflow type {} not configured to this nFlow instance - rescheduling workflow instance", instance.type);
instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusHours(1)).setStatus(inProgress)
.setStateText("Unsupported workflow type").build();
instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusMinutes(unknownWorkflowTypeRetryDelay))
.setStatus(inProgress).setStateText("Unsupported workflow type").build();
workflowInstanceDao.updateWorkflowInstance(instance);
logger.debug("Finished.");
}

private void rescheduleUnknownWorkflowState(WorkflowInstance instance) {
logger.warn("Workflow state {} not configured to workflow type {} - rescheduling workflow instance", instance.state,
instance.type);
instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusHours(1)).setStatus(inProgress)
.setStateText("Unsupported workflow state").build();
instance = new WorkflowInstance.Builder(instance).setNextActivation(now().plusMinutes(unknownWorkflowStateRetryDelay))
.setStatus(inProgress).setStateText("Unsupported workflow state").build();
workflowInstanceDao.updateWorkflowInstance(instance);
logger.debug("Finished.");
}
Expand Down
3 changes: 3 additions & 0 deletions nflow-engine/src/main/resources/nflow-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ nflow.illegal.state.change.action=log
nflow.workflow.instance.query.max.results=10000
nflow.workflow.instance.query.max.results.default=100

nflow.unknown.workflow.type.retry.delay.minutes=60
nflow.unknown.workflow.state.retry.delay.minutes=60

nflow.db.user=nflow
nflow.db.password=nflow

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.springframework.beans.factory.BeanCreationException;
import org.springframework.core.env.Environment;
import org.springframework.mock.env.MockEnvironment;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

Expand All @@ -44,13 +44,15 @@ public class WorkflowDispatcherTest {
@Mock ExecutorDao recovery;
@Mock WorkflowStateProcessorFactory executorFactory;

@Mock Environment env;
MockEnvironment env = new MockEnvironment();

@Before
public void setup() {
when(env.getRequiredProperty("nflow.dispatcher.sleep.ms", Long.class)).thenReturn(0l);
when(env.getRequiredProperty("nflow.dispatcher.executor.queue.wait_until_threshold", Integer.class)).thenReturn(0);
when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("ignore");
env.setProperty("nflow.dispatcher.sleep.ms", "0");
env.setProperty("nflow.dispatcher.executor.queue.wait_until_threshold", "0");
env.setProperty("nflow.illegal.state.change.action", "ignore");
env.setProperty("nflow.unknown.workflow.type.retry.delay.minutes", "60");
env.setProperty("nflow.unknown.workflow.state.retry.delay.minutes", "60");
when(recovery.isTransactionSupportEnabled()).thenReturn(true);
executor = new WorkflowInstanceExecutor(3, 2, 0, 10, 0, new CustomizableThreadFactory("nflow-executor-"));
dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, recovery, env);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.springframework.core.env.Environment;
import org.springframework.mock.env.MockEnvironment;

import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
import com.nitorcreations.nflow.engine.internal.workflow.ObjectStringMapper;
Expand All @@ -22,8 +22,7 @@ public class WorkflowStateProcessorFactoryTest extends BaseNflowTest {
ObjectStringMapper objectMapper;
@Mock
WorkflowInstanceDao workflowInstanceDao;
@Mock
Environment env;
MockEnvironment env = new MockEnvironment();
@Mock
WorkflowExecutorListener listener1;
@Mock
Expand All @@ -34,6 +33,9 @@ public class WorkflowStateProcessorFactoryTest extends BaseNflowTest {

@Before
public void setup() {
env.setProperty("nflow.illegal.state.change.action", "ignore");
env.setProperty("nflow.unknown.workflow.type.retry.delay.minutes", "60");
env.setProperty("nflow.unknown.workflow.state.retry.delay.minutes", "60");
factory = new WorkflowStateProcessorFactory(workflowDefinitions, workflowInstances, objectMapper, workflowInstanceDao, env);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import org.mockito.ArgumentMatcher;
import org.mockito.Captor;
import org.mockito.Mock;
import org.springframework.core.env.Environment;
import org.springframework.mock.env.MockEnvironment;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
Expand Down Expand Up @@ -76,8 +76,7 @@ public class WorkflowStateProcessorTest extends BaseNflowTest {
@Mock
WorkflowInstanceDao workflowInstanceDao;

@Mock
Environment env;
MockEnvironment env = new MockEnvironment();

@Mock
WorkflowExecutorListener listener1;
Expand All @@ -103,7 +102,9 @@ public class WorkflowStateProcessorTest extends BaseNflowTest {

@Before
public void setup() {
when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("fail");
env.setProperty("nflow.illegal.state.change.action", "fail");
env.setProperty("nflow.unknown.workflow.type.retry.delay.minutes", "60");
env.setProperty("nflow.unknown.workflow.state.retry.delay.minutes", "60");
executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao,
env, listener1, listener2);
setCurrentMillisFixed(currentTimeMillis());
Expand Down Expand Up @@ -292,7 +293,7 @@ public void goToErrorStateWhenNextStateIsNull() {

@Test
public void goToErrorStateWhenNextStateIsInvalid() {
when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("ignore");
env.setProperty("nflow.illegal.state.change.action", "ignore");
executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, env,
listener1, listener2);

Expand Down Expand Up @@ -553,7 +554,7 @@ public void illegalStateChangeGoesToErrorState() {

@Test
public void illegalStateChangeGoesToIllegalStateWhenActionIsLog() {
when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("log");
env.setProperty("nflow.illegal.state.change.action", "log");
executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, env,
listener1, listener2);

Expand All @@ -573,7 +574,7 @@ public void illegalStateChangeGoesToIllegalStateWhenActionIsLog() {

@Test
public void illegalStateChangeGoesToIllegalStateWhenActionIsIgnore() {
when(env.getRequiredProperty("nflow.illegal.state.change.action")).thenReturn("ignore");
env.setProperty("nflow.illegal.state.change.action", "ignore");
executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, workflowInstanceDao, env,
listener1, listener2);

Expand Down

0 comments on commit 5922d0a

Please sign in to comment.