From a7b777b8bebedb2f5795746b909c07c2acd819f0 Mon Sep 17 00:00:00 2001 From: Timo Tiuraniemi Date: Mon, 1 Apr 2019 15:32:12 +0300 Subject: [PATCH 1/4] Initial implementation of pause and unpause --- .../internal/executor/WorkflowDispatcher.java | 51 ++++++++++++------- .../internal/executor/WorkflowLifecycle.java | 8 +++ 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index 0fde69214..47224642e 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -30,6 +30,7 @@ public class WorkflowDispatcher implements Runnable { private volatile boolean shutdownRequested; private volatile boolean running = false; + private volatile boolean paused = false; private final CountDownLatch shutdownDone = new CountDownLatch(1); private final WorkflowInstanceExecutor executor; @@ -70,27 +71,31 @@ public void run() { } running = true; while (!shutdownRequested) { - try { - executor.waitUntilQueueSizeLowerThanThreshold(executorDao.getMaxWaitUntil()); - - if (!shutdownRequested) { - if (executorDao.tick()) { - workflowInstances.recoverWorkflowInstancesFromDeadNodes(); - } - int potentiallyStuckProcessors = stateProcessorFactory.getPotentiallyStuckProcessors(); - if (potentiallyStuckProcessors > 0) { - periodicLogger.warn("{} of {} state processor threads are potentially stuck (processing longer than {} seconds)", - potentiallyStuckProcessors, executor.getThreadCount(), stuckThreadThresholdSeconds); + if (paused) { + sleep(false); + } else { + try { + executor.waitUntilQueueSizeLowerThanThreshold(executorDao.getMaxWaitUntil()); + + if (!shutdownRequested) { + if (executorDao.tick()) { + workflowInstances.recoverWorkflowInstancesFromDeadNodes(); + } + int potentiallyStuckProcessors = stateProcessorFactory.getPotentiallyStuckProcessors(); + if (potentiallyStuckProcessors > 0) { + periodicLogger.warn("{} of {} state processor threads are potentially stuck (processing longer than {} seconds)", + potentiallyStuckProcessors, executor.getThreadCount(), stuckThreadThresholdSeconds); + } + dispatch(getNextInstanceIds()); } - dispatch(getNextInstanceIds()); + } catch (PollingRaceConditionException pex) { + logger.info(pex.getMessage()); + sleep(true); + } catch (@SuppressWarnings("unused") InterruptedException dropThrough) { + } catch (Exception e) { + logger.error("Exception in executing dispatcher - retrying after sleep period (" + e.getMessage() + ")", e); + sleep(false); } - } catch (PollingRaceConditionException pex) { - logger.info(pex.getMessage()); - sleep(true); - } catch (@SuppressWarnings("unused") InterruptedException dropThrough) { - } catch (Exception e) { - logger.error("Exception in executing dispatcher - retrying after sleep period (" + e.getMessage() + ")", e); - sleep(false); } } @@ -121,6 +126,14 @@ public void shutdown() { } } + public void pause() { + paused = true; + } + + public void unpause() { + paused = false; + } + public boolean isRunning() { return running; } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowLifecycle.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowLifecycle.java index 0bf105a9c..280b4379a 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowLifecycle.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowLifecycle.java @@ -57,6 +57,14 @@ public void start() { dispatcherThread.start(); } + public void pause() { + dispatcher.pause(); + } + + public void unpause() { + dispatcher.unpause(); + } + @Override public boolean isRunning() { return dispatcherThread.isAlive(); From 80c8974b36d7aba3b1912281c115624da8ca00c3 Mon Sep 17 00:00:00 2001 From: Timo Tiuraniemi Date: Tue, 2 Apr 2019 08:18:00 +0300 Subject: [PATCH 2/4] Changed unpause to resume and added tests --- .../engine/internal/executor/WorkflowDispatcher.java | 6 +++++- .../engine/internal/executor/WorkflowLifecycle.java | 8 ++++++-- .../internal/executor/WorkflowDispatcherTest.java | 9 +++++++++ .../internal/executor/WorkflowLifecycleTest.java | 10 ++++++++++ 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java index 47224642e..e0680828f 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowDispatcher.java @@ -130,10 +130,14 @@ public void pause() { paused = true; } - public void unpause() { + public void resume() { paused = false; } + public boolean isPaused() { + return paused; + } + public boolean isRunning() { return running; } diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowLifecycle.java b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowLifecycle.java index 280b4379a..739a4a247 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowLifecycle.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/executor/WorkflowLifecycle.java @@ -61,8 +61,12 @@ public void pause() { dispatcher.pause(); } - public void unpause() { - dispatcher.unpause(); + public void resume() { + dispatcher.resume(); + } + + public boolean isPaused() { + return dispatcher.isPaused(); } @Override diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java index c037df2d7..9d5d2ccfe 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowDispatcherTest.java @@ -325,6 +325,15 @@ public void finish() { runOnce(new DispatcherLogsWarning()); } + @Test + public void pauseAndResumeWorks() { + assertEquals(dispatcher.isPaused(), false); + dispatcher.pause(); + assertEquals(dispatcher.isPaused(), true); + dispatcher.resume(); + assertEquals(dispatcher.isPaused(), false); + } + void assertPoolIsShutdown(boolean isTrue) { assertEquals(isTrue, executor.executor.isShutdown()); } diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowLifecycleTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowLifecycleTest.java index fde908edd..cd1c4ddd3 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowLifecycleTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/WorkflowLifecycleTest.java @@ -73,6 +73,16 @@ public void stopWithCallbackStopsDispatcherThreadAndRunsCallback() { verify(callback).run(); } + @Test + public void pauseAndResumeWorks() { + lifecycle.pause(); + verify(dispatcher).pause(); + lifecycle.resume(); + verify(dispatcher).resume(); + lifecycle.isPaused(); + verify(dispatcher).isPaused(); + } + // @Test // public void isRunningReturnsDispatcherThreadStatus() { // when(dispatcherThread.isAlive()).thenReturn(true); // native method mocking would require PowerMock From 2c044758027b53e7f8c7d78a5d9c232d66af9ec6 Mon Sep 17 00:00:00 2001 From: Timo Tiuraniemi Date: Tue, 2 Apr 2019 09:01:28 +0300 Subject: [PATCH 3/4] Added changelog entry# --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index eac6943d7..a29cf9b3b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - Update libraries for nFlow Explorer. Includes fix for morgan library security issue. - https://github.com/NitorCreations/nflow/network/alert/nflow-explorer/package-lock.json/morgan/open - Fix travis build to actually run unit tests for nflow-explorer module. +- Add possibility to pause and resume WorkflowLifecycle. ## 5.4.1 (2019-03-18) From 1f2cf7f6194bfe5564a3482aa6da5469b61305e4 Mon Sep 17 00:00:00 2001 From: Timo Tiuraniemi Date: Tue, 2 Apr 2019 09:16:33 +0300 Subject: [PATCH 4/4] Added more thorough explanation about pause and resume --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a29cf9b3b..473c4be29 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ - Update libraries for nFlow Explorer. Includes fix for morgan library security issue. - https://github.com/NitorCreations/nflow/network/alert/nflow-explorer/package-lock.json/morgan/open - Fix travis build to actually run unit tests for nflow-explorer module. -- Add possibility to pause and resume WorkflowLifecycle. +- Add possibility for an executor to temporarily stop polling for new workflow instances by invoking pause() on WorkflowLifecycle, and continue polling with resume(). ## 5.4.1 (2019-03-18)