Skip to content

Commit

Permalink
Rename WorkflowExecutor to WorkflowStateProcessor.
Browse files Browse the repository at this point in the history
Also rename WorkflowExecutorFactory to WorklfowStateProcessorFactory.
This better describes its role.
  • Loading branch information
jsyrjala committed Sep 23, 2014
1 parent c290e11 commit dcaac26
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ public class WorkflowDispatcher implements Runnable {

private final ThresholdThreadPoolTaskExecutor pool;
private final WorkflowInstanceDao workflowInstances;
private final WorkflowExecutorFactory executorFactory;
private final WorkflowStateProcessorFactory stateProcessorFactory;
private final ExecutorDao executorRecovery;
private final long sleepTime;
private final Random rand = new Random();

@Inject
public WorkflowDispatcher(@Named("nflowExecutor") ThresholdThreadPoolTaskExecutor pool, WorkflowInstanceDao workflowInstances,
WorkflowExecutorFactory executorFactory, ExecutorDao executorRecovery, Environment env) {
WorkflowStateProcessorFactory stateProcessorFactory, ExecutorDao executorRecovery, Environment env) {
this.pool = pool;
this.workflowInstances = workflowInstances;
this.executorFactory = executorFactory;
this.stateProcessorFactory = stateProcessorFactory;
this.executorRecovery = executorRecovery;
this.sleepTime = env.getProperty("nflow.dispatcher.sleep.ms", Long.class, 5000l);
}
Expand Down Expand Up @@ -98,7 +98,7 @@ private void dispatch(List<Integer> nextInstanceIds) {

logger.debug("Found {} workflow instances, dispatching executors.", nextInstanceIds.size());
for (Integer instanceId : nextInstanceIds) {
pool.submit(executorFactory.createExecutor(instanceId));
pool.submit(stateProcessorFactory.createProcessor(instanceId));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import com.nitorcreations.nflow.engine.workflow.instance.WorkflowInstance;
import com.nitorcreations.nflow.engine.workflow.instance.WorkflowInstanceAction;

class WorkflowExecutor implements Runnable {
class WorkflowStateProcessor implements Runnable {

private static final Logger logger = getLogger(WorkflowExecutor.class);
private static final Logger logger = getLogger(WorkflowStateProcessor.class);
private static final String MDC_KEY = "workflowInstanceId";

private final int MAX_SUBSEQUENT_STATE_EXECUTIONS = 100;
Expand All @@ -36,7 +36,7 @@ class WorkflowExecutor implements Runnable {
private final ObjectStringMapper objectMapper;
private final WorkflowExecutorListener[] executorListeners;

WorkflowExecutor(int instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowStateProcessor(int instanceId, ObjectStringMapper objectMapper, WorkflowDefinitionService workflowDefinitions,
WorkflowInstanceService workflowInstances, WorkflowExecutorListener... executorListeners) {
this.instanceId = instanceId;
this.objectMapper = objectMapper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@
import com.nitorcreations.nflow.engine.service.WorkflowInstanceService;

@Component
public class WorkflowExecutorFactory {
public class WorkflowStateProcessorFactory {
private final WorkflowDefinitionService workflowDefinitions;
private final WorkflowInstanceService workflowInstances;
private final ObjectStringMapper objectMapper;
@Autowired(required = false)
protected WorkflowExecutorListener[] listeners = new WorkflowExecutorListener[0];

@Inject
public WorkflowExecutorFactory(WorkflowDefinitionService workflowDefinitions, WorkflowInstanceService workflowInstances,
public WorkflowStateProcessorFactory(WorkflowDefinitionService workflowDefinitions, WorkflowInstanceService workflowInstances,
ObjectStringMapper objectMapper) {
this.workflowDefinitions = workflowDefinitions;
this.workflowInstances = workflowInstances;
this.objectMapper = objectMapper;
}

public WorkflowExecutor createExecutor(int instanceId) {
return new WorkflowExecutor(instanceId, objectMapper, workflowDefinitions, workflowInstances, listeners);
public WorkflowStateProcessor createProcessor(int instanceId) {
return new WorkflowStateProcessor(instanceId, objectMapper, workflowDefinitions, workflowInstances, listeners);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.joda.time.DateTime;
import org.junit.Test;

import com.nitorcreations.nflow.engine.internal.executor.ThresholdBlockingQueue;

public class ThresholdBlockingQueueTest {
ThresholdBlockingQueue<Integer> q = new ThresholdBlockingQueue<>(3, 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@

import com.nitorcreations.nflow.engine.internal.dao.ExecutorDao;
import com.nitorcreations.nflow.engine.internal.dao.WorkflowInstanceDao;
import com.nitorcreations.nflow.engine.internal.executor.ThresholdBlockingQueue;
import com.nitorcreations.nflow.engine.internal.executor.ThresholdThreadPoolTaskExecutor;
import com.nitorcreations.nflow.engine.internal.executor.WorkflowStateProcessor;
import com.nitorcreations.nflow.engine.internal.executor.WorkflowStateProcessorFactory;
import com.nitorcreations.nflow.engine.internal.executor.WorkflowDispatcher;
import com.nitorcreations.nflow.engine.listener.WorkflowExecutorListener;

import edu.umd.cs.mtc.MultithreadedTestCase;
Expand All @@ -41,7 +46,7 @@ public class WorkflowDispatcherTest {

@Mock WorkflowInstanceDao workflowInstances;
@Mock ExecutorDao recovery;
@Mock WorkflowExecutorFactory executorFactory;
@Mock WorkflowStateProcessorFactory executorFactory;

@Mock Environment env;

Expand All @@ -62,8 +67,8 @@ public void threadDispatcher() {
.thenReturn(ids(1))
.thenThrow(new RuntimeException("Expected: exception during dispatcher execution"))
.thenAnswer(waitForTickAndAnswer(2, ids(2), this));
when(executorFactory.createExecutor(1)).thenReturn(fakeWorkflowExecutor(1, noOpRunnable()));
when(executorFactory.createExecutor(2)).thenReturn(fakeWorkflowExecutor(2, noOpRunnable()));
when(executorFactory.createProcessor(1)).thenReturn(fakeWorkflowExecutor(1, noOpRunnable()));
when(executorFactory.createProcessor(2)).thenReturn(fakeWorkflowExecutor(2, noOpRunnable()));

dispatcher.run();
}
Expand All @@ -77,8 +82,8 @@ public void threadShutdown() {
public void finish() {
verify(workflowInstances, times(3)).pollNextWorkflowInstanceIds(anyInt());
InOrder inOrder = inOrder(executorFactory);
inOrder.verify(executorFactory).createExecutor(1);
inOrder.verify(executorFactory).createExecutor(2);
inOrder.verify(executorFactory).createProcessor(1);
inOrder.verify(executorFactory).createProcessor(2);
}
}
TestFramework.runOnce(new ExceptionDuringDispatcherExecutionCausesRetry());
Expand All @@ -104,7 +109,7 @@ public void threadDispatcher() {
@Override
public void finish() {
verify(workflowInstances).pollNextWorkflowInstanceIds(anyInt());
verify(executorFactory, never()).createExecutor(anyInt());
verify(executorFactory, never()).createProcessor(anyInt());
}
}
TestFramework.runOnce(new ErrorDuringDispatcherExecutionStopsDispatcher());
Expand All @@ -130,7 +135,7 @@ public void threadShutdown() {
@Override
public void finish() {
verify(workflowInstances, times(3)).pollNextWorkflowInstanceIds(anyInt());
verify(executorFactory, never()).createExecutor(anyInt());
verify(executorFactory, never()).createProcessor(anyInt());
}
}
TestFramework.runOnce(new EmptyPollResultCausesNoTasksToBeScheduled());
Expand All @@ -143,7 +148,7 @@ class ShutdownBlocksUntilPoolShutdown extends MultithreadedTestCase {
public void threadDispatcher() {
when(workflowInstances.pollNextWorkflowInstanceIds(anyInt()))
.thenAnswer(waitForTickAndAnswer(2, ids(1), this));
when(executorFactory.createExecutor(anyInt()))
when(executorFactory.createProcessor(anyInt()))
.thenReturn(fakeWorkflowExecutor(1, waitForTickRunnable(3, this)));

dispatcher.run();
Expand Down Expand Up @@ -171,7 +176,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
return ids(1);
}
});
when(executorFactory.createExecutor(anyInt()))
when(executorFactory.createProcessor(anyInt()))
.thenReturn(fakeWorkflowExecutor(1, waitForTickRunnable(3, this)));

dispatcher.run();
Expand Down Expand Up @@ -278,8 +283,8 @@ public void run() {
};
}

WorkflowExecutor fakeWorkflowExecutor(int instanceId, final Runnable fakeCommand) {
return new WorkflowExecutor(instanceId, null, null, null, (WorkflowExecutorListener) null) {
WorkflowStateProcessor fakeWorkflowExecutor(int instanceId, final Runnable fakeCommand) {
return new WorkflowStateProcessor(instanceId, null, null, null, (WorkflowExecutorListener) null) {
@Override
public void run() {
fakeCommand.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import org.junit.Test;
import org.mockito.Mock;

import com.nitorcreations.nflow.engine.internal.executor.WorkflowStateProcessor;
import com.nitorcreations.nflow.engine.internal.executor.WorkflowStateProcessorFactory;
import com.nitorcreations.nflow.engine.internal.workflow.ObjectStringMapper;
import com.nitorcreations.nflow.engine.listener.WorkflowExecutorListener;
import com.nitorcreations.nflow.engine.service.WorkflowDefinitionService;
import com.nitorcreations.nflow.engine.service.WorkflowInstanceService;

public class WorkflowExecutorFactoryTest extends BaseNflowTest {
public class WorkflowStateProcessorFactoryTest extends BaseNflowTest {
@Mock
WorkflowDefinitionService workflowDefinitions;
@Mock
Expand All @@ -24,23 +26,23 @@ public class WorkflowExecutorFactoryTest extends BaseNflowTest {
WorkflowExecutorListener listener2;
WorkflowExecutorListener[] listeners = new WorkflowExecutorListener[]{listener1, listener2};

WorkflowExecutorFactory factory;
WorkflowStateProcessorFactory factory;

@Before
public void setup() {
factory = new WorkflowExecutorFactory(workflowDefinitions, workflowInstances, objectMapper);
factory = new WorkflowStateProcessorFactory(workflowDefinitions, workflowInstances, objectMapper);
}

@Test
public void factoryCreatesExecutorsWithoutListeners() {
WorkflowExecutor executor = factory.createExecutor(12);
WorkflowStateProcessor executor = factory.createProcessor(12);
assertNotNull(executor);
}

@Test
public void factoryCreatesExecutorsWithListeners() {
factory.listeners = listeners;
WorkflowExecutor executor = factory.createExecutor(122);
WorkflowStateProcessor executor = factory.createProcessor(122);
assertNotNull(executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.mockito.Mockito;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.nitorcreations.nflow.engine.internal.executor.WorkflowStateProcessor;
import com.nitorcreations.nflow.engine.internal.workflow.ObjectStringMapper;
import com.nitorcreations.nflow.engine.listener.WorkflowExecutorListener;
import com.nitorcreations.nflow.engine.listener.WorkflowExecutorListener.ListenerContext;
Expand All @@ -55,7 +56,7 @@
import com.nitorcreations.nflow.engine.workflow.instance.WorkflowInstance;
import com.nitorcreations.nflow.engine.workflow.instance.WorkflowInstanceAction;

public class WorkflowExecutorTest extends BaseNflowTest {
public class WorkflowStateProcessorTest extends BaseNflowTest {

@Mock
WorkflowDefinitionService workflowDefinitions;
Expand All @@ -77,12 +78,12 @@ public class WorkflowExecutorTest extends BaseNflowTest {

ObjectStringMapper objectMapper = new ObjectStringMapper(new ObjectMapper());

WorkflowExecutor executor;
WorkflowStateProcessor executor;


@Before
public void setup() {
executor = new WorkflowExecutor(1, objectMapper, workflowDefinitions, workflowInstances, listener1, listener2);
executor = new WorkflowStateProcessor(1, objectMapper, workflowDefinitions, workflowInstances, listener1, listener2);
}

@Test
Expand Down

0 comments on commit dcaac26

Please sign in to comment.