Skip to content

Commit

Permalink
Merge eefde49 into cae321c
Browse files Browse the repository at this point in the history
  • Loading branch information
ttiurani committed Apr 1, 2019
2 parents cae321c + eefde49 commit 6d3c898
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
- Update libraries for nFlow Explorer. Includes fix for morgan library security issue.
- https://github.com/NitorCreations/nflow/network/alert/nflow-explorer/package-lock.json/morgan/open
- Fix travis build to actually run unit tests for nflow-explorer module.
- Fix bug with WorkflowLifecycle re-start throwing error.

## 5.4.1 (2019-03-18)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public WorkflowDispatcher(WorkflowInstanceExecutor executor, WorkflowInstanceDao
@Override
public void run() {
logger.info("Starting.");
executor.initialize();
try {
if (!autoInit) {
workflowDefinitions.postProcessWorkflowDefinitions();
Expand Down Expand Up @@ -118,6 +119,8 @@ public void shutdown() {
shutdownDone.await();
} catch (@SuppressWarnings("unused") InterruptedException e) {
logger.info("Shutdown interrupted.");
} finally {
shutdownRequested = false;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,35 @@
public class WorkflowInstanceExecutor {
private static final Logger logger = getLogger(WorkflowInstanceExecutor.class);

private volatile boolean needsInitialization = true;
private final int awaitTerminationSeconds;
private final int maxQueueSize;
private final int threadCount;
final ThreadPoolExecutor executor;
final ThresholdBlockingQueue<Runnable> queue;
private final int notifyThreshold;
private final int keepAliveSeconds;
private final ThreadFactory threadFactory;
ThreadPoolExecutor executor;
ThresholdBlockingQueue<Runnable> queue;

public WorkflowInstanceExecutor(int maxQueueSize, int threadCount, int notifyThreshold, int awaitTerminationSeconds,
int keepAliveSeconds,
ThreadFactory threadFactory) {
queue = new ThresholdBlockingQueue<>(maxQueueSize, notifyThreshold);
executor = new ThreadPoolExecutor(threadCount, threadCount, keepAliveSeconds, SECONDS, queue, threadFactory);
executor.allowCoreThreadTimeOut(keepAliveSeconds > 0);
this.awaitTerminationSeconds = awaitTerminationSeconds;
this.threadCount = threadCount;
this.maxQueueSize = maxQueueSize;
this.notifyThreshold = notifyThreshold;
this.keepAliveSeconds = keepAliveSeconds;
this.threadFactory = threadFactory;
initialize();
}

public synchronized void initialize() {
if (needsInitialization) {
queue = new ThresholdBlockingQueue<>(maxQueueSize, notifyThreshold);
executor = new ThreadPoolExecutor(threadCount, threadCount, keepAliveSeconds, SECONDS, queue, threadFactory);
executor.allowCoreThreadTimeOut(keepAliveSeconds > 0);
needsInitialization = false;
}
}

public int getThreadCount() {
Expand All @@ -48,7 +64,7 @@ public int getQueueRemainingCapacity() {
return queue.remainingCapacity();
}

public void shutdown() {
public synchronized void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(awaitTerminationSeconds, SECONDS)) {
Expand All @@ -57,6 +73,8 @@ public void shutdown() {
} catch (@SuppressWarnings("unused") InterruptedException ex) {
logger.warn("Interrupted while waiting for executor to terminate");
currentThread().interrupt();
} finally {
needsInitialization = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,22 @@ public class WorkflowLifecycle implements SmartLifecycle {

private final WorkflowDefinitionService workflowDefinitions;
private final WorkflowDispatcher dispatcher;
private final ThreadFactory nflowThreadFactory;
private final boolean autoStart;
private final Thread dispatcherThread;
private volatile Thread dispatcherThread;

@Inject
public WorkflowLifecycle(WorkflowDefinitionService workflowDefinitions, WorkflowDispatcher dispatcher,
@NFlow ThreadFactory nflowThreadFactory, Environment env) throws IOException, ReflectiveOperationException {
this.dispatcher = dispatcher;
this.nflowThreadFactory = nflowThreadFactory;
this.workflowDefinitions = workflowDefinitions;
if (env.getRequiredProperty("nflow.autoinit", Boolean.class)) {
this.workflowDefinitions.postProcessWorkflowDefinitions();
} else {
logger.info("nFlow engine autoinit disabled (system property nflow.autoinit=false)");
}
autoStart = env.getRequiredProperty("nflow.autostart", Boolean.class);
dispatcherThread = nflowThreadFactory.newThread(dispatcher);
dispatcherThread.setName("nflow-dispatcher");
if (!autoStart) {
logger.info("nFlow engine autostart disabled (system property nflow.autostart=false)");
}
}

@Override
Expand All @@ -53,23 +50,36 @@ public boolean isAutoStartup() {
}

@Override
public void start() {
dispatcherThread.start();
public synchronized void start() {
if (dispatcherThread == null) {
dispatcherThread = createDispatcherThread();
dispatcherThread.start();
}
}

@Override
public boolean isRunning() {
return dispatcherThread.isAlive();
return dispatcherThread != null && dispatcherThread.isAlive();
}

@Override
public void stop() {
public synchronized void stop() {
dispatcher.shutdown();
dispatcherThread = null;
}

@Override
public void stop(Runnable callback) {
stop();
callback.run();
}

private Thread createDispatcherThread() {
final Thread thread = nflowThreadFactory.newThread(dispatcher);
thread.setName("nflow-dispatcher");
if (!autoStart) {
logger.info("nFlow engine autostart disabled (system property nflow.autostart=false)");
}
return thread;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import static java.lang.Boolean.TRUE;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -39,7 +41,7 @@ public class WorkflowLifecycleTest {
public void setup() throws IOException, ReflectiveOperationException {
when(env.getRequiredProperty("nflow.autoinit", Boolean.class)).thenReturn(TRUE);
when(env.getRequiredProperty("nflow.autostart", Boolean.class)).thenReturn(TRUE);
when(threadFactory.newThread(dispatcher)).thenReturn(dispatcherThread);
lenient().when(threadFactory.newThread(dispatcher)).thenReturn(dispatcherThread);
lifecycle = new WorkflowLifecycle(workflowDefinitions, dispatcher, threadFactory, env);
}

Expand All @@ -65,6 +67,16 @@ public void stopStopsDispatcherThread() {
verify(dispatcher).shutdown();
}

@Test
public void restartRelaunchesDispatcherThread() {
lifecycle.start();
lifecycle.stop();
lifecycle.start();
verify(dispatcher).shutdown();
verify(threadFactory, times(2)).newThread(dispatcher);
verify(dispatcherThread, times(2)).start();
}

@Test
public void stopWithCallbackStopsDispatcherThreadAndRunsCallback() {
Runnable callback = mock(Runnable.class);
Expand Down

0 comments on commit 6d3c898

Please sign in to comment.