Skip to content

Commit

Permalink
rename Threshold...Executor to WorkflowInstanceExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
Edvard Fonsell committed Dec 3, 2014
1 parent 08a6eae commit 61b422a
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.joda.JodaModule;
import com.nitorcreations.nflow.engine.internal.executor.ThresholdThreadPoolExecutor;
import com.nitorcreations.nflow.engine.internal.executor.WorkflowInstanceExecutor;

@Configuration
@ComponentScan("com.nitorcreations.nflow.engine")
public class EngineConfiguration {

@Bean
public ThresholdThreadPoolExecutor nflowExecutor(@NFlow ThreadFactory nflowThreadFactory, Environment env) {
public WorkflowInstanceExecutor nflowExecutor(@NFlow ThreadFactory nflowThreadFactory, Environment env) {
int threadCount = env.getProperty("nflow.executor.thread.count", Integer.class, 2 * getRuntime().availableProcessors());
int awaitTerminationSeconds = env.getProperty("nflow.dispatcher.await.termination.seconds", Integer.class, 60);
int notifyThreshold = env.getProperty("nflow.dispatcher.executor.queue.wait_until_threshold", Integer.class, 0);
int keepAliveSeconds = env.getProperty("nflow.dispatcher.executor.thread.keepalive.seconds", Integer.class, 0);
return new ThresholdThreadPoolExecutor(threadCount, notifyThreshold, awaitTerminationSeconds, keepAliveSeconds,
return new WorkflowInstanceExecutor(threadCount, notifyThreshold, awaitTerminationSeconds, keepAliveSeconds,
nflowThreadFactory);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,6 @@ public void setSQLVariants(SQLVariants sqlVariants) {
this.sqlVariants = sqlVariants;
}

/**
* @param nflowJdbcTemplate The JDBC template for accessing the nFlow data source.
*/
@Inject
public void setJdbcTemplate(@NFlow JdbcTemplate nflowJdbcTemplate) {
this.jdbc = nflowJdbcTemplate;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ public class WorkflowDispatcher implements Runnable {
private volatile boolean shutdownRequested;
private final CountDownLatch shutdownDone = new CountDownLatch(1);

private final ThresholdThreadPoolExecutor pool;
private final WorkflowInstanceExecutor executor;
private final WorkflowInstanceDao workflowInstances;
private final WorkflowStateProcessorFactory stateProcessorFactory;
private final ExecutorDao executorRecovery;
private final long sleepTime;
private final Random rand = new Random();

@Inject
public WorkflowDispatcher(ThresholdThreadPoolExecutor nflowExecutor, WorkflowInstanceDao workflowInstances,
public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao workflowInstances,
WorkflowStateProcessorFactory stateProcessorFactory, ExecutorDao executorRecovery, Environment env) {
this.pool = nflowExecutor;
this.executor = executor;
this.workflowInstances = workflowInstances;
this.stateProcessorFactory = stateProcessorFactory;
this.executorRecovery = executorRecovery;
Expand All @@ -52,7 +52,7 @@ public void run() {
try {
while (!shutdownRequested) {
try {
pool.waitUntilQueueSizeLowerThanThreshold(executorRecovery.getMaxWaitUntil());
executor.waitUntilQueueSizeLowerThanThreshold(executorRecovery.getMaxWaitUntil());

if (!shutdownRequested) {
executorRecovery.tick();
Expand Down Expand Up @@ -87,7 +87,7 @@ public void shutdown() {

private void shutdownPool() {
try {
pool.shutdown();
executor.shutdown();
} catch (Exception e) {
logger.error("Error in shutting down thread pool.", e);
}
Expand All @@ -102,12 +102,12 @@ private void dispatch(List<Integer> nextInstanceIds) {

logger.debug("Found {} workflow instances, dispatching executors.", nextInstanceIds.size());
for (Integer instanceId : nextInstanceIds) {
pool.execute(stateProcessorFactory.createProcessor(instanceId));
executor.execute(stateProcessorFactory.createProcessor(instanceId));
}
}

private List<Integer> getNextInstanceIds() {
int nextBatchSize = max(0, 2 * pool.getMaximumPoolSize() - pool.getActiveCount());
int nextBatchSize = max(0, 2 * executor.getMaximumPoolSize() - executor.getActiveCount());
logger.debug("Polling next {} workflow instances.", nextBatchSize);
return workflowInstances.pollNextWorkflowInstanceIds(nextBatchSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
import org.joda.time.DateTime;
import org.slf4j.Logger;

public class ThresholdThreadPoolExecutor {
private static final Logger logger = getLogger(ThresholdThreadPoolExecutor.class);
public class WorkflowInstanceExecutor {
private static final Logger logger = getLogger(WorkflowInstanceExecutor.class);

private final int awaitTerminationSeconds;
final ThreadPoolExecutor executor;
final ThresholdBlockingQueue<Runnable> queue;

public ThresholdThreadPoolExecutor(int threadCount, int notifyThreshold, int awaitTerminationSeconds, int keepAliveSeconds,
public WorkflowInstanceExecutor(int threadCount, int notifyThreshold, int awaitTerminationSeconds, int keepAliveSeconds,
ThreadFactory threadFactory) {
queue = new ThresholdBlockingQueue<>(MAX_VALUE, notifyThreshold);
executor = new ThreadPoolExecutor(threadCount, threadCount, keepAliveSeconds, SECONDS, queue, threadFactory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.mockito.runners.MockitoJUnitRunner;
import org.springframework.mock.env.MockEnvironment;

import com.nitorcreations.nflow.engine.internal.executor.ThresholdThreadPoolExecutor;
import com.nitorcreations.nflow.engine.internal.executor.WorkflowInstanceExecutor;

@RunWith(MockitoJUnitRunner.class)
public class EngineConfigurationTest {
Expand All @@ -28,7 +28,7 @@ public class EngineConfigurationTest {
private final EngineConfiguration configuration = new EngineConfiguration();

public void dispatcherPoolExecutorInstantiation() {
ThresholdThreadPoolExecutor executor = configuration.nflowExecutor(threadFactory, environment);
WorkflowInstanceExecutor executor = configuration.nflowExecutor(threadFactory, environment);
assertThat(executor.getMaximumPoolSize(), is(100));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.nitorcreations.nflow.engine.internal.executor;

import static java.lang.Runtime.getRuntime;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyInt;
Expand Down Expand Up @@ -39,7 +38,7 @@
@RunWith(MockitoJUnitRunner.class)
public class WorkflowDispatcherTest {
WorkflowDispatcher dispatcher;
ThresholdThreadPoolExecutor pool;
WorkflowInstanceExecutor executor;

@Mock WorkflowInstanceDao workflowInstances;
@Mock ExecutorDao recovery;
Expand All @@ -52,14 +51,14 @@ public void setup() {
when(env.getProperty("nflow.dispatcher.sleep.ms", Long.class, 5000l)).thenReturn(0l);
when(env.getProperty("nflow.dispatcher.executor.queue.wait_until_threshold", Integer.class, 0)).thenReturn(0);
when(recovery.isTransactionSupportEnabled()).thenReturn(true);
pool = dispatcherPoolExecutor();
dispatcher = new WorkflowDispatcher(pool, workflowInstances, executorFactory, recovery, env);
executor = new WorkflowInstanceExecutor(2, 0, 10, 0, new CustomizableThreadFactory("nflow-executor-"));
dispatcher = new WorkflowDispatcher(executor, workflowInstances, executorFactory, recovery, env);
}

@Test(expected = BeanCreationException.class)
public void workflowDispatcherCreationFailsWithoutTransactionSupport() {
when(recovery.isTransactionSupportEnabled()).thenReturn(false);
new WorkflowDispatcher(pool, workflowInstances, executorFactory, recovery, env);
new WorkflowDispatcher(executor, workflowInstances, executorFactory, recovery, env);
}

@Test
Expand Down Expand Up @@ -201,11 +200,11 @@ public void threadShutdown() {
public void exceptionOnPoolShutdownIsNotPropagated() throws Throwable {
@SuppressWarnings("unused")
class ExceptionOnPoolShutdownIsNotPropagated extends MultithreadedTestCase {
private ThresholdThreadPoolExecutor poolSpy;
private WorkflowInstanceExecutor poolSpy;

@Override
public void initialize() {
poolSpy = Mockito.spy(pool);
poolSpy = Mockito.spy(executor);
dispatcher = new WorkflowDispatcher(poolSpy, workflowInstances, executorFactory, recovery, env);
}

Expand Down Expand Up @@ -252,15 +251,8 @@ public void finish() {
TestFramework.runOnce(new ShutdownCanBeCalledMultipleTimes());
}

private static ThresholdThreadPoolExecutor dispatcherPoolExecutor() {
int threadCount = 2 * getRuntime().availableProcessors();
ThresholdThreadPoolExecutor executor = new ThresholdThreadPoolExecutor(threadCount, 0, 10, 0, new CustomizableThreadFactory(
"nflow-executor-"));
return executor;
}

void assertPoolIsShutdown(boolean isTrue) {
assertEquals(isTrue, pool.executor.isShutdown());
assertEquals(isTrue, executor.executor.isShutdown());
}

Runnable noOpRunnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

@RunWith(MockitoJUnitRunner.class)
public class ThresholdThreadPoolExecutorTest {
public class WorkflowInstanceExecutorTest {

@Mock
ThreadFactory threadFactory;
Expand All @@ -27,7 +27,7 @@ public class ThresholdThreadPoolExecutorTest {

@Test
public void testThreadPoolCreateWithCorrectParameters() {
ThresholdThreadPoolExecutor t = new ThresholdThreadPoolExecutor(2, 1, 3, 4, threadFactory);
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(2, 1, 3, 4, threadFactory);
assertThat(t.executor.getCorePoolSize(), is(2));
assertThat(t.executor.getMaximumPoolSize(), is(2));
assertThat(t.executor.getKeepAliveTime(SECONDS), is(4L));
Expand All @@ -38,28 +38,29 @@ public void testThreadPoolCreateWithCorrectParameters() {

@Test
public void testDummyGetters() {
ThresholdThreadPoolExecutor t = new ThresholdThreadPoolExecutor(2, 1, 3, 4, threadFactory);
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(2, 1, 3, 4, threadFactory);
assertThat(t.getActiveCount(), is(0));
assertThat(t.getMaximumPoolSize(), is(2));
}

@Test
public void testExecute() {
ThresholdThreadPoolExecutor t = new ThresholdThreadPoolExecutor(2, 1, 3, 4, new CustomizableThreadFactory("test"));
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(2, 1, 3, 4, new CustomizableThreadFactory("test"));
t.execute(runnable);
verify(runnable, timeout(1000)).run();
}

@Test
public void testWait() throws InterruptedException {
ThresholdThreadPoolExecutor t = new ThresholdThreadPoolExecutor(2, 1, 3, 4, new CustomizableThreadFactory("test"));
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(2, 1, 3, 4, new CustomizableThreadFactory("test"));
t.execute(runnable);
t.waitUntilQueueSizeLowerThanThreshold(new DateTime().plusSeconds(5));
}

@Test
public void testShutdown() {
ThresholdThreadPoolExecutor t = new ThresholdThreadPoolExecutor(2, 1, 3, 4, new CustomizableThreadFactory("test"));
WorkflowInstanceExecutor t = new WorkflowInstanceExecutor(2, 1, 3, 4, new CustomizableThreadFactory("test"));
t.shutdown();
assertThat(t.executor.isShutdown(), is(true));
}
}

0 comments on commit 61b422a

Please sign in to comment.