From 916496266d8ad3a8ae95ca9694207a74d5fb4f0e Mon Sep 17 00:00:00 2001 From: joewitt Date: Tue, 28 Nov 2017 12:51:47 -0500 Subject: [PATCH 1/2] NIFI-4642 updated tests to be more tolerant/variable to different system speeds. Many of these should be integration tests and not unit tests --- .../scheduling/TestProcessorLifecycle.java | 140 ++++++++---------- 1 file changed, 65 insertions(+), 75 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index c544ef4bbf4e..3038fda89e77 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -86,7 +86,7 @@ public class TestProcessorLifecycle { private static final Logger logger = LoggerFactory.getLogger(TestProcessorLifecycle.class); private FlowController fc; - private Map properties = new HashMap<>(); + private Map properties = new HashMap<>(); private volatile String propsFile = TestProcessorLifecycle.class.getResource("/lifecycletest.nifi.properties").getFile(); @Before @@ -100,6 +100,36 @@ public void after() throws Exception { FileUtils.deleteDirectory(new File("./target/lifecycletest")); } + private void assertScheduledState(final ScheduledState desiredState, final ScheduledState actualState) { + assertScheduledState(desiredState, actualState, 1000L); + } + + private void assertScheduledState(final ScheduledState desiredState, final ScheduledState actualState, final long delayToleranceMillis) { + final long startTime = System.currentTimeMillis(); + while (((System.currentTimeMillis() - startTime) < delayToleranceMillis) && desiredState != actualState) { + try { + Thread.sleep(50); + } catch (InterruptedException ex) { + Thread.interrupted(); + break; + } + } + assertEquals(desiredState, actualState); + } + + private void assertOperationCount(final List actualNames, final int expectedCount, final long delayToleranceMillis) { + final long startTime = System.currentTimeMillis(); + while (((System.currentTimeMillis() - startTime) < delayToleranceMillis) && actualNames.size() < expectedCount) { + try { + Thread.sleep(50); + } catch (InterruptedException ex) { + Thread.interrupted(); + break; + } + } + assertEquals(expectedCount, actualNames.size()); + } + @Test public void validateEnableOperation() throws Exception { final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); @@ -109,17 +139,17 @@ public void validateEnableOperation() throws Exception { final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); - assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); - assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); // validates idempotency for (int i = 0; i < 2; i++) { testProcNode.enable(); } - assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); - assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); testProcNode.disable(); - assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); - assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); + assertScheduledState(ScheduledState.DISABLED, testProcNode.getScheduledState()); + assertScheduledState(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); } @Test @@ -132,18 +162,18 @@ public void validateDisableOperation() throws Exception { final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); - assertEquals(ScheduledState.STOPPED, testProcNode.getScheduledState()); - assertEquals(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); // validates idempotency for (int i = 0; i < 2; i++) { testProcNode.disable(); } - assertEquals(ScheduledState.DISABLED, testProcNode.getScheduledState()); - assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); + assertScheduledState(ScheduledState.DISABLED, testProcNode.getScheduledState()); + assertScheduledState(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - assertEquals(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); + assertScheduledState(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); } /** @@ -171,7 +201,7 @@ public void validateIdempotencyOfProcessorStartOperation() throws Exception { ps.startProcessor(testProcNode); Thread.sleep(500); - assertEquals(1, testProcessor.operationNames.size()); + assertOperationCount(testProcessor.operationNames, 1, 1000L); assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); } @@ -189,14 +219,14 @@ public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Except fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState()); // sets the scenario for the processor to run int randomDelayLimit = 3000; this.randomOnTriggerDelay(testProcessor, randomDelayLimit); final ProcessScheduler ps = fc.getProcessScheduler(); ps.stopProcessor(testProcNode); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - assertTrue(testProcessor.operationNames.size() == 0); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertTrue(testProcessor.operationNames.isEmpty()); } /** @@ -205,7 +235,7 @@ public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Except */ @Test @Ignore - public void validateSuccessfullAndOrderlyShutdown() throws Exception { + public void validateSuccessfulAndOrderlyShutdown() throws Exception { final FlowControllerAndSystemBundle fcsb = this.buildFlowControllerForTest(); fc = fcsb.getFlowController(); ProcessGroup testGroup = fc.createProcessGroup(UUID.randomUUID().toString()); @@ -226,8 +256,7 @@ public void validateSuccessfullAndOrderlyShutdown() throws Exception { testGroup.addProcessor(testProcNode); fc.startProcessGroup(testGroup.getIdentifier()); - Thread.sleep(2000); // let it run for a while - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 2000L); fc.stopAllProcessors(); @@ -235,9 +264,9 @@ public void validateSuccessfullAndOrderlyShutdown() throws Exception { // validates that regardless of how many running tasks, lifecycle // operation are invoked atomically (once each). - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 1000L); // . . . hence only 3 operations must be in the list - assertEquals(3, testProcessor.operationNames.size()); + assertOperationCount(testProcessor.operationNames, 3, 2000L); // . . . and ordered as @OnScheduled, @OnUnscheduled, @OnStopped assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); @@ -268,7 +297,7 @@ public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() thro ExecutorService executor = Executors.newFixedThreadPool(100); int startCallsCount = 10000; final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState()); final Random random = new Random(); for (int i = 0; i < startCallsCount / 2; i++) { executor.execute(new Runnable() { @@ -326,18 +355,11 @@ public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcesso ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 5000L); ps.stopProcessor(testProcNode); - Thread.sleep(100); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - Thread.sleep(1000); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - - assertEquals(2, testProcessor.operationNames.size()); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 5000L); + assertOperationCount(testProcessor.operationNames, 2, 8000L); assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); } @@ -366,15 +388,9 @@ public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledE ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - Thread.sleep(100); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 10000L); ps.stopProcessor(testProcNode); - Thread.sleep(500); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 2000L); } /** @@ -401,15 +417,9 @@ public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); + assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); - Thread.sleep(100); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); - Thread.sleep(500); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 2000L); } /** @@ -432,15 +442,9 @@ public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterr ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); + assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); - Thread.sleep(100); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); - Thread.sleep(4000); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 5000L); } /** @@ -463,20 +467,9 @@ public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninte ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - Thread.sleep(1000); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - ps.disableProcessor(testProcNode); // no effect - Thread.sleep(100); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STARTING); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 3000L); ps.stopProcessor(testProcNode); - Thread.sleep(100); - assertTrue(testProcNode.getPhysicalScheduledState() == ScheduledState.STOPPING); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); - Thread.sleep(4000); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 4000L); } /** @@ -501,14 +494,11 @@ public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws E ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - Thread.sleep(1000); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 2000L); ps.disableProcessor(testProcNode); - Thread.sleep(100); - assertTrue(testProcNode.getScheduledState() == ScheduledState.RUNNING); + assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); - Thread.sleep(500); - assertTrue(testProcNode.getScheduledState() == ScheduledState.STOPPED); + assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 2000L); } /** From 057dc5357691dc97797beed80269dcd19098d3b9 Mon Sep 17 00:00:00 2001 From: joewitt Date: Tue, 28 Nov 2017 13:32:17 -0500 Subject: [PATCH 2/2] NIFI-4642 mark pointed out some broken logic and suggested a cleaner lambda based approach --- .../scheduling/TestProcessorLifecycle.java | 88 ++++++++----------- 1 file changed, 38 insertions(+), 50 deletions(-) diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java index 3038fda89e77..f8f0426b6464 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestProcessorLifecycle.java @@ -72,6 +72,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.LockSupport; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -100,13 +101,13 @@ public void after() throws Exception { FileUtils.deleteDirectory(new File("./target/lifecycletest")); } - private void assertScheduledState(final ScheduledState desiredState, final ScheduledState actualState) { - assertScheduledState(desiredState, actualState, 1000L); + private void assertCondition(final Supplier supplier) { + assertCondition(supplier, 1000L); } - private void assertScheduledState(final ScheduledState desiredState, final ScheduledState actualState, final long delayToleranceMillis) { + private void assertCondition(final Supplier supplier, final long delayToleranceMillis) { final long startTime = System.currentTimeMillis(); - while (((System.currentTimeMillis() - startTime) < delayToleranceMillis) && desiredState != actualState) { + while (((System.currentTimeMillis() - startTime) < delayToleranceMillis) && !supplier.get()) { try { Thread.sleep(50); } catch (InterruptedException ex) { @@ -114,20 +115,7 @@ private void assertScheduledState(final ScheduledState desiredState, final Sched break; } } - assertEquals(desiredState, actualState); - } - - private void assertOperationCount(final List actualNames, final int expectedCount, final long delayToleranceMillis) { - final long startTime = System.currentTimeMillis(); - while (((System.currentTimeMillis() - startTime) < delayToleranceMillis) && actualNames.size() < expectedCount) { - try { - Thread.sleep(50); - } catch (InterruptedException ex) { - Thread.interrupted(); - break; - } - } - assertEquals(expectedCount, actualNames.size()); + assertTrue(supplier.get()); } @Test @@ -139,17 +127,17 @@ public void validateEnableOperation() throws Exception { final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState()); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState()); // validates idempotency for (int i = 0; i < 2; i++) { testProcNode.enable(); } - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState()); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState()); testProcNode.disable(); - assertScheduledState(ScheduledState.DISABLED, testProcNode.getScheduledState()); - assertScheduledState(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); + assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); } @Test @@ -162,18 +150,18 @@ public void validateDisableOperation() throws Exception { final ProcessorNode testProcNode = fc.createProcessor(TestProcessor.class.getName(), UUID.randomUUID().toString(), fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState()); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getPhysicalScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getPhysicalScheduledState()); // validates idempotency for (int i = 0; i < 2; i++) { testProcNode.disable(); } - assertScheduledState(ScheduledState.DISABLED, testProcNode.getScheduledState()); - assertScheduledState(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); + assertCondition(() -> ScheduledState.DISABLED == testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - assertScheduledState(ScheduledState.DISABLED, testProcNode.getPhysicalScheduledState()); + assertCondition(() -> ScheduledState.DISABLED == testProcNode.getPhysicalScheduledState()); } /** @@ -201,7 +189,7 @@ public void validateIdempotencyOfProcessorStartOperation() throws Exception { ps.startProcessor(testProcNode); Thread.sleep(500); - assertOperationCount(testProcessor.operationNames, 1, 1000L); + assertCondition(() -> testProcessor.operationNames.size() == 1); assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); } @@ -219,13 +207,13 @@ public void validateStopCallsAreMeaninglessIfProcessorNotStarted() throws Except fcsb.getSystemBundle().getBundleDetails().getCoordinate()); testProcNode.setProperties(properties); TestProcessor testProcessor = (TestProcessor) testProcNode.getProcessor(); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); // sets the scenario for the processor to run int randomDelayLimit = 3000; this.randomOnTriggerDelay(testProcessor, randomDelayLimit); final ProcessScheduler ps = fc.getProcessScheduler(); ps.stopProcessor(testProcNode); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); assertTrue(testProcessor.operationNames.isEmpty()); } @@ -256,7 +244,7 @@ public void validateSuccessfulAndOrderlyShutdown() throws Exception { testGroup.addProcessor(testProcNode); fc.startProcessGroup(testGroup.getIdentifier()); - assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 2000L); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); fc.stopAllProcessors(); @@ -264,9 +252,9 @@ public void validateSuccessfulAndOrderlyShutdown() throws Exception { // validates that regardless of how many running tasks, lifecycle // operation are invoked atomically (once each). - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 1000L); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 1000L); // . . . hence only 3 operations must be in the list - assertOperationCount(testProcessor.operationNames, 3, 2000L); + assertCondition(() -> testProcessor.operationNames.size() == 3, 2000L); // . . . and ordered as @OnScheduled, @OnUnscheduled, @OnStopped assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); @@ -297,7 +285,7 @@ public void validateLifecycleOperationOrderWithConcurrentCallsToStartStop() thro ExecutorService executor = Executors.newFixedThreadPool(100); int startCallsCount = 10000; final CountDownLatch countDownCounter = new CountDownLatch(startCallsCount); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState()); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState()); final Random random = new Random(); for (int i = 0; i < startCallsCount / 2; i++) { executor.execute(new Runnable() { @@ -355,11 +343,11 @@ public void validateProcessorUnscheduledAndStoppedWhenStopIsCalledBeforeProcesso ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 5000L); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 5000L); ps.stopProcessor(testProcNode); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 5000L); - assertOperationCount(testProcessor.operationNames, 2, 8000L); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L); + assertCondition(() -> testProcessor.operationNames.size() == 2, 8000L); assertEquals("@OnScheduled", testProcessor.operationNames.get(0)); assertEquals("@OnUnscheduled", testProcessor.operationNames.get(1)); } @@ -388,9 +376,9 @@ public void validateProcessScheduledAfterAdministrativeDelayDueToTheOnScheduledE ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 10000L); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 10000L); ps.stopProcessor(testProcNode); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 2000L); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L); } /** @@ -417,9 +405,9 @@ public void validateProcessorCanBeStoppedWhenOnScheduledConstantlyFails() throws ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 2000L); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 2000L); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L); } /** @@ -442,9 +430,9 @@ public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyInterr ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 2000L); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 5000L); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 5000L); } /** @@ -467,9 +455,9 @@ public void validateProcessorCanBeStoppedWhenOnScheduledBlocksIndefinitelyUninte ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 3000L); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 3000L); ps.stopProcessor(testProcNode); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 4000L); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 4000L); } /** @@ -494,11 +482,11 @@ public void validateProcessorCanBeStoppedWhenOnTriggerThrowsException() throws E ProcessScheduler ps = fc.getProcessScheduler(); ps.startProcessor(testProcNode); - assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 2000L); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.disableProcessor(testProcNode); - assertScheduledState(ScheduledState.RUNNING, testProcNode.getScheduledState(), 2000L); + assertCondition(() -> ScheduledState.RUNNING == testProcNode.getScheduledState(), 2000L); ps.stopProcessor(testProcNode); - assertScheduledState(ScheduledState.STOPPED, testProcNode.getScheduledState(), 2000L); + assertCondition(() -> ScheduledState.STOPPED == testProcNode.getScheduledState(), 2000L); } /**