From ece0a3d625c1ca85d211e551305ae71ef6641b7f Mon Sep 17 00:00:00 2001 From: sergeyuttsel Date: Fri, 16 Jun 2023 01:02:32 +0300 Subject: [PATCH 1/8] IGNITE-19736 Do not cancel tasks in DistributionZoneManager#executor if they were created by immediate scaleUp/scaleDown events. Avoid concurrent executing several tasks for the same zone. --- .../DistributionZoneManager.java | 15 +-- .../DistributionZonesSchedulersTest.java | 99 +++++++++++++++++-- 2 files changed, 99 insertions(+), 15 deletions(-) diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java index 9666848b0118..d3aea12bd0f8 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.CompletableFuture.supplyAsync; +import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndUpdateTriggerKeys; @@ -71,7 +72,6 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import org.apache.ignite.configuration.ConfigurationChangeException; @@ -278,8 +278,9 @@ public DistributionZoneManager( nodesAttributes = new ConcurrentHashMap<>(); + // It must be a single thread executor to avoid concurrent executing several tasks for the same zone. executor = new ScheduledThreadPoolExecutor( - Math.min(Runtime.getRuntime().availableProcessors() * 3, 20), + 1, new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG), new ThreadPoolExecutor.DiscardPolicy() ); @@ -357,7 +358,7 @@ public void stop() throws Exception { metaStorageManager.unregisterWatch(topologyWatchListener); metaStorageManager.unregisterWatch(dataNodesWatchListener); - shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS); + shutdownAndAwaitTermination(executor, 10, SECONDS); } /** @@ -1658,11 +1659,11 @@ ConcurrentSkipListMap topologyAugmentationMap() { * @param runnable Custom logic to run. */ synchronized void rescheduleScaleUp(long delay, Runnable runnable) { - if (scaleUpTask != null) { + if (scaleUpTask != null && scaleUpTask.getDelay(SECONDS) != IMMEDIATE_TIMER_VALUE) { scaleUpTask.cancel(false); } - scaleUpTask = executor.schedule(runnable, delay, TimeUnit.SECONDS); + scaleUpTask = executor.schedule(runnable, delay, SECONDS); } /** @@ -1672,11 +1673,11 @@ synchronized void rescheduleScaleUp(long delay, Runnable runnable) { * @param runnable Custom logic to run. */ synchronized void rescheduleScaleDown(long delay, Runnable runnable) { - if (scaleDownTask != null) { + if (scaleDownTask != null && scaleDownTask.getDelay(SECONDS) != IMMEDIATE_TIMER_VALUE) { scaleDownTask.cancel(false); } - scaleDownTask = executor.schedule(runnable, delay, TimeUnit.SECONDS); + scaleDownTask = executor.schedule(runnable, delay, SECONDS); } /** diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java index 2e14cc044ee1..c286db3d4565 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState; import org.apache.ignite.internal.logger.IgniteLogger; @@ -46,7 +47,7 @@ public class DistributionZonesSchedulersTest { private static final IgniteLogger LOG = Loggers.forClass(DistributionZonesSchedulersTest.class); private static final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor( - Math.min(Runtime.getRuntime().availableProcessors() * 3, 20), + 1, new NamedThreadFactory("test-dst-zones-scheduler", LOG), new ThreadPoolExecutor.DiscardPolicy() ); @@ -83,27 +84,109 @@ private static void testSchedule(BiConsumer fn) throws Interrupt } @Test - void testScaleUpReScheduleNotStartedTask() { + void testScaleUpReScheduling() { ZoneState state = new DistributionZoneManager.ZoneState(executor); - testReScheduleNotStartedTask(state::rescheduleScaleUp); + testReScheduling(state::rescheduleScaleUp); } @Test - void testScaleDownReScheduleNotStartedTask() { + void testScaleDownReScheduling() { ZoneState state = new DistributionZoneManager.ZoneState(executor); - testReScheduleNotStartedTask(state::rescheduleScaleDown); + testReScheduling(state::rescheduleScaleDown); } - private static void testReScheduleNotStartedTask(BiConsumer fn) { + private static void testReScheduling(BiConsumer fn) { Runnable runnable = mock(Runnable.class); - fn.accept(1L, runnable); + CountDownLatch latch = new CountDownLatch(1); + + fn.accept(0L, () -> { + try { + assertTrue(latch.await(3, TimeUnit.SECONDS)); + + runnable.run(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + fn.accept(2L, runnable); + + fn.accept(2L, runnable); + + fn.accept(0L, runnable); + + fn.accept(0L, runnable); + + fn.accept(2L, runnable); + + fn.accept(2L, runnable); + + fn.accept(0L, runnable); fn.accept(0L, runnable); - verify(runnable, after(1200).times(1)).run(); + latch.countDown(); + + verify(runnable, after(2500).times(5)).run(); + } + + @Test + void testScaleUpOrdering() throws InterruptedException { + ZoneState state = new DistributionZoneManager.ZoneState(executor); + + testOrdering(state::rescheduleScaleUp); + } + + @Test + void testScaleDownOrdering() throws InterruptedException { + ZoneState state = new DistributionZoneManager.ZoneState(executor); + + testOrdering(state::rescheduleScaleDown); + } + + private static void testOrdering(BiConsumer fn) throws InterruptedException { + AtomicInteger counter = new AtomicInteger(); + + CountDownLatch latch = new CountDownLatch(1); + + AtomicBoolean rightOrder = new AtomicBoolean(true); + + fn.accept(0L, () -> { + try { + assertTrue(latch.await(3, TimeUnit.SECONDS)); + + counter.incrementAndGet(); + + if (counter.get() != 1) { + rightOrder.set(false); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + for (int i = 2; i < 11; i++) { + int j = i; + + fn.accept(0L, () -> { + counter.incrementAndGet(); + + if (counter.get() != j) { + rightOrder.set(false); + } + }); + } + + latch.countDown(); + + waitForCondition(() -> counter.get() == 10, 3000); + + assertEquals(10, counter.get()); + + assertTrue(rightOrder.get()); } @Test From 2b1fffa9092159f870d5274abe782335e3a47255 Mon Sep 17 00:00:00 2001 From: sergeyuttsel Date: Fri, 16 Jun 2023 12:19:54 +0300 Subject: [PATCH 2/8] IGNITE-19736 Removed invalid tests. --- .../DistributionZoneManagerScaleUpTest.java | 306 ------------------ 1 file changed, 306 deletions(-) diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java index 753358bccd21..2532024a2fc5 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java @@ -307,312 +307,6 @@ void testDropZoneDoNotPropagateDataNodesAfterScaleDown() throws Exception { assertZoneScaleDownChangeTriggerKey(null, ZONE_1_ID, keyValueStorage); } - @Test - void testTwoScaleUpTimersSecondTimerRunFirst() throws Exception { - preparePrerequisites(); - - NamedConfigurationTree zones = - zonesConfiguration.distributionZones(); - - DistributionZoneView zoneView = zones.value().get(0); - - CountDownLatch in1 = new CountDownLatch(1); - CountDownLatch in2 = new CountDownLatch(1); - CountDownLatch out1 = new CountDownLatch(1); - CountDownLatch out2 = new CountDownLatch(1); - - distributionZoneManager.scheduleTimers( - zoneView, - Set.of(D), - Set.of(), - prerequisiteRevision + 1, - (zoneId, revision) -> { - try { - in1.await(); - } catch (InterruptedException e) { - fail(); - } - - return testSaveDataNodesOnScaleUp(zoneId, revision).thenRun(out1::countDown); - }, - (t1, t2) -> null - ); - - // Assert that first task was run and event about adding node "D" with revision {@code prerequisiteRevision + 1} was added - // to the topologyAugmentationMap of the zone. - assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, prerequisiteRevision + 1); - - distributionZoneManager.scheduleTimers( - zoneView, - Set.of(E), - Set.of(), - prerequisiteRevision + 2, - (zoneId, revision) -> { - try { - in2.await(); - } catch (InterruptedException e) { - fail(); - } - - return testSaveDataNodesOnScaleUp(zoneId, revision).thenRun(() -> { - try { - out2.await(); - } catch (InterruptedException e) { - fail(); - } - }); - }, - (t1, t2) -> null - ); - - // Assert that second task was run and event about adding node "E" with revision {@code prerequisiteRevision + 2} was added - // to the topologyAugmentationMap of the zone. - assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, prerequisiteRevision + 2); - - //Second task is propagating data nodes first. - in2.countDown(); - - assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage); - - assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), keyValueStorage); - - out2.countDown(); - - in1.countDown(); - - //Waiting for the first scheduler ends it work. - out1.countDown(); - - // Assert that nothing has been changed. - assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage); - - assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), keyValueStorage); - } - - @Test - void testTwoScaleDownTimersSecondTimerRunFirst() throws Exception { - preparePrerequisites(); - - NamedConfigurationTree zones = - zonesConfiguration.distributionZones(); - - DistributionZoneView zoneView = zones.value().get(0); - - CountDownLatch in1 = new CountDownLatch(1); - CountDownLatch in2 = new CountDownLatch(1); - CountDownLatch out1 = new CountDownLatch(1); - CountDownLatch out2 = new CountDownLatch(1); - - distributionZoneManager.scheduleTimers( - zoneView, - Set.of(), - Set.of(B), - prerequisiteRevision + 1, - (t1, t2) -> null, - (zoneId, revision) -> { - try { - in1.await(); - } catch (InterruptedException e) { - fail(); - } - - return testSaveDataNodesOnScaleDown(zoneId, revision).thenRun(out1::countDown); - } - ); - - // Assert that first task was run and event about removing node "B" with revision {@code prerequisiteRevision + 1} was added - // to the topologyAugmentationMap of the zone. - assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, prerequisiteRevision + 1); - - distributionZoneManager.scheduleTimers( - zoneView, - Set.of(), - Set.of(C), - prerequisiteRevision + 2, - (t1, t2) -> null, - (zoneId, revision) -> { - try { - in2.await(); - } catch (InterruptedException e) { - fail(); - } - - return testSaveDataNodesOnScaleDown(zoneId, revision).thenRun(() -> { - try { - out2.await(); - } catch (InterruptedException e) { - fail(); - } - }); - } - ); - - // Assert that second task was run and event about removing node "C" with revision {@code prerequisiteRevision + 2} was added - // to the topologyAugmentationMap of the zone. - assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, prerequisiteRevision + 2); - - //Second task is propagating data nodes first. - in2.countDown(); - - assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage); - - assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), keyValueStorage); - - out2.countDown(); - - in1.countDown(); - - //Waiting for the first scheduler ends it work. - out1.countDown(); - - // Assert that nothing has been changed. - assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage); - - assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), keyValueStorage); - } - - @Test - void testTwoScaleUpTimersFirstTimerRunFirst() throws Exception { - preparePrerequisites(); - - NamedConfigurationTree zones = - zonesConfiguration.distributionZones(); - - DistributionZoneView zoneView = zones.value().get(0); - - CountDownLatch in1 = new CountDownLatch(1); - CountDownLatch in2 = new CountDownLatch(1); - CountDownLatch out1 = new CountDownLatch(1); - - distributionZoneManager.scheduleTimers( - zoneView, - Set.of(D), - Set.of(), - prerequisiteRevision + 1, - (zoneId, revision) -> { - in1.countDown(); - - return testSaveDataNodesOnScaleUp(zoneId, revision).thenRun(() -> { - try { - out1.await(); - } catch (InterruptedException e) { - fail(); - } - }); - }, - (t1, t2) -> null - ); - - // Waiting for the first task to be run. We have to do that to be sure that watch events, - // which we try to emulate, are handled sequentially. - in1.await(); - - distributionZoneManager.scheduleTimers( - zoneView, - Set.of(E), - Set.of(), - prerequisiteRevision + 2, - (zoneId, revision) -> { - try { - in2.await(); - } catch (InterruptedException e) { - fail(); - } - - return testSaveDataNodesOnScaleUp(zoneId, revision); - }, - (t1, t2) -> null - ); - - // Assert that second task was run and event about adding node "E" with revision {@code prerequisiteRevision + 2} was added - // to the topologyAugmentationMap of the zone. - assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, prerequisiteRevision + 2); - - assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 1, ZONE_1_ID, keyValueStorage); - - assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D), keyValueStorage); - - // Second task is run and we await that data nodes will be changed from ["A", "B", "C", "D"] to ["A", "B", "C", "D", "E"] - in2.countDown(); - - assertZoneScaleUpChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage); - - assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, B, C, D, E), keyValueStorage); - - out1.countDown(); - } - - @Test - void testTwoScaleDownTimersFirstTimerRunFirst() throws Exception { - preparePrerequisites(); - - NamedConfigurationTree zones = - zonesConfiguration.distributionZones(); - - DistributionZoneView zoneView = zones.value().get(0); - - CountDownLatch in1 = new CountDownLatch(1); - CountDownLatch in2 = new CountDownLatch(1); - CountDownLatch out1 = new CountDownLatch(1); - - distributionZoneManager.scheduleTimers( - zoneView, - Set.of(), - Set.of(B), - prerequisiteRevision + 1, - (t1, t2) -> null, - (zoneId, revision) -> { - in1.countDown(); - - return testSaveDataNodesOnScaleDown(zoneId, revision).thenRun(() -> { - try { - out1.await(); - } catch (InterruptedException e) { - fail(); - } - }); - } - ); - - // Waiting for the first task to be run. We have to do that to be sure that watch events, - // which we try to emulate, are handled sequentially. - in1.await(); - - distributionZoneManager.scheduleTimers( - zoneView, - Set.of(), - Set.of(C), - prerequisiteRevision + 2, - (t1, t2) -> null, - (zoneId, revision) -> { - try { - in2.await(); - } catch (InterruptedException e) { - fail(); - } - - return testSaveDataNodesOnScaleDown(zoneId, revision); - } - ); - - // Assert that second task was run and event about removing node "C" with revision {@code prerequisiteRevision + 2} was added - // to the topologyAugmentationMap of the zone. - assertThatZonesAugmentationMapContainsRevision(ZONE_1_ID, prerequisiteRevision + 2); - - assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 1, ZONE_1_ID, keyValueStorage); - - assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A, C), keyValueStorage); - - // Second task is run and we await that data nodes will be changed from ["A", "C"] to ["A"] - in2.countDown(); - - assertZoneScaleDownChangeTriggerKey(prerequisiteRevision + 2, ZONE_1_ID, keyValueStorage); - - assertDataNodesForZoneWithAttributes(ZONE_1_ID, Set.of(A), keyValueStorage); - - out1.countDown(); - } - @Test void testEmptyDataNodesOnStart() throws Exception { startDistributionZoneManager(); From b2294f4174d323b72393ac0e63cdd016f872720d Mon Sep 17 00:00:00 2001 From: sergeyuttsel Date: Fri, 16 Jun 2023 14:19:43 +0300 Subject: [PATCH 3/8] IGNITE-19736 Do not cancel tasks in DistributionZoneManager#executor if they were created by immediate scaleUp/scaleDown events. Avoid concurrent executing several tasks for the same zone. --- .../distributionzones/DistributionZoneManager.java | 8 +++++--- .../DistributionZoneManagerScaleUpTest.java | 6 ------ .../DistributionZonesSchedulersTest.java | 11 +++++------ 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java index d3aea12bd0f8..b761625ee6a8 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java @@ -278,7 +278,7 @@ public DistributionZoneManager( nodesAttributes = new ConcurrentHashMap<>(); - // It must be a single thread executor to avoid concurrent executing several tasks for the same zone. + // Use a single thread executor to avoid concurrent executing several tasks for the same zone. executor = new ScheduledThreadPoolExecutor( 1, new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG), @@ -1653,7 +1653,8 @@ ConcurrentSkipListMap topologyAugmentationMap() { } /** - * Reschedules existing scale up task, if it is not started yet, or schedules new one, if the current task cannot be canceled. + * Reschedules existing scale up task, if it is not started yet and the delay of this task is not immediate, + * or schedules new one, if the current task cannot be canceled. * * @param delay Delay to start runnable in seconds. * @param runnable Custom logic to run. @@ -1667,7 +1668,8 @@ synchronized void rescheduleScaleUp(long delay, Runnable runnable) { } /** - * Reschedules existing scale down task, if it is not started yet, or schedules new one, if the current task cannot be canceled. + * Reschedules existing scale down task, if it is not started yet and the delay of this task is not immediate, + * or schedules new one, if the current task cannot be canceled. * * @param delay Delay to start runnable in seconds. * @param runnable Custom logic to run. diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java index 2532024a2fc5..b419b3bab5f8 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java @@ -37,7 +37,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; @@ -46,15 +45,10 @@ import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledThreadPoolExecutor; -import org.apache.ignite.configuration.NamedConfigurationTree; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.distributionzones.DistributionZoneConfigurationParameters.Builder; import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState; -import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneChange; -import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration; -import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneView; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.metastorage.server.If; import org.apache.ignite.network.NetworkAddress; diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java index c286db3d4565..a324c064198e 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java @@ -152,7 +152,7 @@ private static void testOrdering(BiConsumer fn) throws Interrupt CountDownLatch latch = new CountDownLatch(1); - AtomicBoolean rightOrder = new AtomicBoolean(true); + AtomicBoolean sequentialOrder = new AtomicBoolean(true); fn.accept(0L, () -> { try { @@ -161,7 +161,7 @@ private static void testOrdering(BiConsumer fn) throws Interrupt counter.incrementAndGet(); if (counter.get() != 1) { - rightOrder.set(false); + sequentialOrder.set(false); } } catch (InterruptedException e) { throw new RuntimeException(e); @@ -175,7 +175,7 @@ private static void testOrdering(BiConsumer fn) throws Interrupt counter.incrementAndGet(); if (counter.get() != j) { - rightOrder.set(false); + sequentialOrder.set(false); } }); } @@ -183,10 +183,9 @@ private static void testOrdering(BiConsumer fn) throws Interrupt latch.countDown(); waitForCondition(() -> counter.get() == 10, 3000); + assertEquals(10, counter.get(), "Not all tasks were executed."); - assertEquals(10, counter.get()); - - assertTrue(rightOrder.get()); + assertTrue(sequentialOrder.get(), "The order of tasks execution is not sequential."); } @Test From 0a76e2757728d5b8ffda83b8b1c398e051ff197a Mon Sep 17 00:00:00 2001 From: Sergey Uttsel Date: Tue, 20 Jun 2023 18:22:19 +0300 Subject: [PATCH 4/8] IGNITE-19736 Added TODO for a striped executor. --- .../internal/distributionzones/DistributionZoneManager.java | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java index b761625ee6a8..d4dbf4c7893f 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java @@ -279,6 +279,7 @@ public DistributionZoneManager( nodesAttributes = new ConcurrentHashMap<>(); // Use a single thread executor to avoid concurrent executing several tasks for the same zone. + // TODO: IGNITE-19783 Need to use a striped executor. executor = new ScheduledThreadPoolExecutor( 1, new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG), From 292130fa1a0fec6ada6386278368a8ad949a8690 Mon Sep 17 00:00:00 2001 From: Sergey Uttsel Date: Wed, 21 Jun 2023 18:16:56 +0300 Subject: [PATCH 5/8] IGNITE-19736 Fix after reviev from Mirza Aliev. --- .../DistributionZoneManager.java | 14 +++++- .../DistributionZonesSchedulersTest.java | 48 ++++++++++--------- 2 files changed, 37 insertions(+), 25 deletions(-) diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java index ff7cdf8dbff9..308eb070bf1f 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java @@ -1612,6 +1612,12 @@ static class ZoneState { /** Schedule task for a scale down process. */ private ScheduledFuture scaleDownTask; + /** The delay for the scale up task */ + private long scaleUpTaskDelay; + + /** The delay for the scale down task */ + private long scaleDownTaskDelay; + /** * Map that stores pairs revision -> {@link Augmentation} for a zone. With this map we can track which nodes * should be added or removed in the processes of scale up or scale down. Revision helps to track visibility of the events @@ -1653,11 +1659,13 @@ ConcurrentSkipListMap topologyAugmentationMap() { * @param runnable Custom logic to run. */ synchronized void rescheduleScaleUp(long delay, Runnable runnable) { - if (scaleUpTask != null && scaleUpTask.getDelay(SECONDS) != IMMEDIATE_TIMER_VALUE) { + if (scaleUpTask != null && scaleUpTaskDelay > 0) { scaleUpTask.cancel(false); } scaleUpTask = executor.schedule(runnable, delay, SECONDS); + + scaleUpTaskDelay = delay; } /** @@ -1668,11 +1676,13 @@ synchronized void rescheduleScaleUp(long delay, Runnable runnable) { * @param runnable Custom logic to run. */ synchronized void rescheduleScaleDown(long delay, Runnable runnable) { - if (scaleDownTask != null && scaleDownTask.getDelay(SECONDS) != IMMEDIATE_TIMER_VALUE) { + if (scaleDownTask != null && scaleDownTaskDelay > 0) { scaleDownTask.cancel(false); } scaleDownTask = executor.schedule(runnable, delay, SECONDS); + + scaleDownTaskDelay = delay; } /** diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java index a324c064198e..9280320d817f 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java @@ -21,9 +21,6 @@ import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.after; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; @@ -84,53 +81,60 @@ private static void testSchedule(BiConsumer fn) throws Interrupt } @Test - void testScaleUpReScheduling() { + void testScaleUpReScheduling() throws InterruptedException { ZoneState state = new DistributionZoneManager.ZoneState(executor); testReScheduling(state::rescheduleScaleUp); } @Test - void testScaleDownReScheduling() { + void testScaleDownReScheduling() throws InterruptedException { ZoneState state = new DistributionZoneManager.ZoneState(executor); testReScheduling(state::rescheduleScaleDown); } - private static void testReScheduling(BiConsumer fn) { - Runnable runnable = mock(Runnable.class); + /** Tests that scaleUp/scaleDown tasks with a delay grater then zero will be canceled by tasks with a zero delay. */ + private static void testReScheduling(BiConsumer fn) throws InterruptedException { + AtomicInteger counter = new AtomicInteger(); - CountDownLatch latch = new CountDownLatch(1); + CountDownLatch firstTaskLatch = new CountDownLatch(1); + + CountDownLatch lastTaskLatch = new CountDownLatch(1); fn.accept(0L, () -> { try { - assertTrue(latch.await(3, TimeUnit.SECONDS)); + assertTrue(firstTaskLatch.await(3, TimeUnit.SECONDS)); - runnable.run(); + counter.incrementAndGet(); } catch (InterruptedException e) { throw new RuntimeException(e); } }); - fn.accept(2L, runnable); + fn.accept(1L, counter::incrementAndGet); - fn.accept(2L, runnable); + fn.accept(0L, counter::incrementAndGet); - fn.accept(0L, runnable); + fn.accept(0L, counter::incrementAndGet); - fn.accept(0L, runnable); + fn.accept(1L, counter::incrementAndGet); - fn.accept(2L, runnable); + fn.accept(1L, counter::incrementAndGet); - fn.accept(2L, runnable); + fn.accept(0L, counter::incrementAndGet); - fn.accept(0L, runnable); + fn.accept(0L, () -> { + counter.incrementAndGet(); - fn.accept(0L, runnable); + lastTaskLatch.countDown(); + }); - latch.countDown(); + firstTaskLatch.countDown(); + + assertTrue(lastTaskLatch.await(3, TimeUnit.SECONDS)); - verify(runnable, after(2500).times(5)).run(); + assertEquals(5, counter.get()); } @Test @@ -213,9 +217,7 @@ private static void testReScheduleWhenTaskIsEnded(BiConsumer fn) latch.await(1000, TimeUnit.MILLISECONDS); - fn.accept(0L, () -> { - flag.set(true); - }); + fn.accept(0L, () -> flag.set(true)); assertTrue(waitForCondition(() -> 0L == latch.getCount(), 1500)); assertTrue(waitForCondition(flag::get, 1500)); From 25b7dd8ba24c3512eb9604131c67cac3e24bb08b Mon Sep 17 00:00:00 2001 From: Sergey Uttsel Date: Wed, 21 Jun 2023 18:20:46 +0300 Subject: [PATCH 6/8] IGNITE-19736 Fix after reviev from Mirza Aliev. --- .../internal/distributionzones/DistributionZoneManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java index 308eb070bf1f..c96173f70465 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java @@ -1612,10 +1612,10 @@ static class ZoneState { /** Schedule task for a scale down process. */ private ScheduledFuture scaleDownTask; - /** The delay for the scale up task */ + /** The delay for the scale up task. */ private long scaleUpTaskDelay; - /** The delay for the scale down task */ + /** The delay for the scale down task. */ private long scaleDownTaskDelay; /** From eb6638df9eeb4d9b4deadf1ba9fe37186a3a7a6a Mon Sep 17 00:00:00 2001 From: Sergey Uttsel Date: Fri, 23 Jun 2023 08:22:39 +0300 Subject: [PATCH 7/8] IGNITE-19736 Fixed task cancelling on stopScaleUp, stopScaleDown and stopTimers. --- .../DistributionZoneManager.java | 28 ++-- .../DistributionZonesSchedulersTest.java | 149 ++++++++++++++++++ 2 files changed, 164 insertions(+), 13 deletions(-) diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java index c96173f70465..66a91b17e696 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java @@ -1659,9 +1659,7 @@ ConcurrentSkipListMap topologyAugmentationMap() { * @param runnable Custom logic to run. */ synchronized void rescheduleScaleUp(long delay, Runnable runnable) { - if (scaleUpTask != null && scaleUpTaskDelay > 0) { - scaleUpTask.cancel(false); - } + stopScaleUp(); scaleUpTask = executor.schedule(runnable, delay, SECONDS); @@ -1676,9 +1674,7 @@ synchronized void rescheduleScaleUp(long delay, Runnable runnable) { * @param runnable Custom logic to run. */ synchronized void rescheduleScaleDown(long delay, Runnable runnable) { - if (scaleDownTask != null && scaleDownTaskDelay > 0) { - scaleDownTask.cancel(false); - } + stopScaleDown(); scaleDownTask = executor.schedule(runnable, delay, SECONDS); @@ -1686,28 +1682,34 @@ synchronized void rescheduleScaleDown(long delay, Runnable runnable) { } /** - * Cancels task for scale up and scale down. + * Cancels task for scale up and scale down. Used on {@link ZonesConfigurationListener#onDelete(ConfigurationNotificationEvent)}. + * Not need to check {@code scaleUpTaskDelay} and {@code scaleDownTaskDelay} because after timer stopping on zone delete event + * the data nodes value will be updated. */ synchronized void stopTimers() { - stopScaleUp(); + if (scaleUpTask != null) { + scaleUpTask.cancel(false); + } - stopScaleDown(); + if (scaleDownTask != null) { + scaleDownTask.cancel(false); + } } /** - * Cancels task for scale up. + * Cancels task for scale up if it is not started yet and the delay of this task is not immediate. */ synchronized void stopScaleUp() { - if (scaleUpTask != null) { + if (scaleUpTask != null && scaleUpTaskDelay > 0) { scaleUpTask.cancel(false); } } /** - * Cancels task for scale down. + * Cancels task for scale down if it is not started yet and the delay of this task is not immediate. */ synchronized void stopScaleDown() { - if (scaleDownTask != null) { + if (scaleDownTask != null && scaleDownTaskDelay > 0) { scaleDownTask.cancel(false); } } diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java index 9280320d817f..e0cb8010975b 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java @@ -20,6 +20,7 @@ import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.concurrent.CountDownLatch; @@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; +import java.util.function.Supplier; import org.apache.ignite.internal.distributionzones.DistributionZoneManager.ZoneState; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -222,4 +224,151 @@ private static void testReScheduleWhenTaskIsEnded(BiConsumer fn) assertTrue(waitForCondition(() -> 0L == latch.getCount(), 1500)); assertTrue(waitForCondition(flag::get, 1500)); } + + @Test + void testCancelScaleUpTaskOnStopScaleUp() { + ZoneState state = new DistributionZoneManager.ZoneState(executor); + + testCancelTask(state::rescheduleScaleUp, state::stopScaleUp, () -> state.scaleUpTask().isCancelled()); + } + + @Test + void testCancelScaleDownTaskOnStopScaleDown() { + ZoneState state = new ZoneState(executor); + + testCancelTask(state::rescheduleScaleDown, state::stopScaleDown, () -> state.scaleDownTask().isCancelled()); + } + + @Test + void testCancelScaleUpTasksOnStopTimers() { + ZoneState state = new ZoneState(executor); + + testCancelTask(state::rescheduleScaleUp, state::stopTimers, () -> state.scaleUpTask().isCancelled()); + } + + @Test + void testCancelScaleDownTasksOnStopTimers() { + ZoneState state = new ZoneState(executor); + + testCancelTask(state::rescheduleScaleDown, state::stopTimers, () -> state.scaleDownTask().isCancelled()); + } + + /** + * {@link ZoneState#stopScaleUp()}, {@link ZoneState#stopScaleDown()} and {@link ZoneState#stopTimers()} cancel task + * if it is not started and has a delay greater than zero. + */ + private static void testCancelTask( + BiConsumer fn, + Runnable stopTask, + Supplier isTaskCancelled + ) { + CountDownLatch latch = new CountDownLatch(1); + + fn.accept(0L, () -> { + try { + assertTrue(latch.await(3, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + fn.accept(1L, () -> {}); + + assertFalse(isTaskCancelled.get()); + + stopTask.run(); + + assertTrue(isTaskCancelled.get()); + + latch.countDown(); + } + + @Test + void testNotCancelScaleUpTaskOnStopScaleUp() { + ZoneState state = new DistributionZoneManager.ZoneState(executor); + + testNotCancelTask(state::rescheduleScaleUp, state::stopScaleUp, () -> state.scaleUpTask().isCancelled()); + } + + @Test + void testNotCancelScaleDownTaskOnStopScaleDown() { + ZoneState state = new ZoneState(executor); + + testNotCancelTask(state::rescheduleScaleDown, state::stopScaleDown, () -> state.scaleDownTask().isCancelled()); + + } + + /** + * {@link ZoneState#stopScaleUp()} and {@link ZoneState#stopScaleDown()} doesn't cancel task + * if it is not started and has a delay equal to zero. + */ + private static void testNotCancelTask( + BiConsumer fn, + Runnable stopTask, + Supplier isTaskCancelled + ) { + CountDownLatch latch = new CountDownLatch(1); + + fn.accept(0L, () -> { + try { + assertTrue(latch.await(3, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + fn.accept(0L, () -> {}); + + assertFalse(isTaskCancelled.get()); + + stopTask.run(); + + assertFalse(isTaskCancelled.get()); + + latch.countDown(); + } + + /** + * {@link ZoneState#stopTimers()} cancel task if it is not started and has a delay equal to zero. + */ + @Test + public void testCancelTasksOnStopTimersAndImmediateTimerValues() { + ZoneState state = new DistributionZoneManager.ZoneState(executor); + + CountDownLatch scaleUpTaskLatch = new CountDownLatch(1); + + state.rescheduleScaleUp(0L, () -> { + try { + assertTrue(scaleUpTaskLatch.await(3, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + state.rescheduleScaleUp(0L, () -> {}); + + assertFalse(state.scaleUpTask().isCancelled()); + + CountDownLatch scaleDownTaskLatch = new CountDownLatch(1); + + state.rescheduleScaleDown(0L, () -> { + try { + assertTrue(scaleDownTaskLatch.await(3, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + + state.rescheduleScaleDown(0L, () -> {}); + + assertFalse(state.scaleDownTask().isCancelled()); + + state.stopTimers(); + + assertTrue(state.scaleUpTask().isCancelled()); + assertTrue(state.scaleDownTask().isCancelled()); + + scaleUpTaskLatch.countDown(); + scaleDownTaskLatch.countDown(); + } } From 65ecd6db7e0dc99d4174cc66c4c56bf0ab3b836e Mon Sep 17 00:00:00 2001 From: Sergey Uttsel Date: Mon, 26 Jun 2023 20:27:14 +0300 Subject: [PATCH 8/8] IGNITE-19736 Fix after a review by Mirza. --- .../DistributionZoneManager.java | 11 +++------- .../DistributionZonesUtil.java | 22 +++++++++++++++++++ .../DistributionZonesSchedulersTest.java | 14 ++++++------ 3 files changed, 32 insertions(+), 15 deletions(-) diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java index 66a91b17e696..c2fb15a2afa8 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java @@ -24,6 +24,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.deleteDataNodesAndUpdateTriggerKeys; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractChangeTriggerRevision; import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.extractDataNodes; @@ -74,8 +75,6 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import org.apache.ignite.configuration.ConfigurationChangeException; @@ -282,12 +281,8 @@ public DistributionZoneManager( nodesAttributes = new ConcurrentHashMap<>(); - // Use a single thread executor to avoid concurrent executing several tasks for the same zone. - // TODO: IGNITE-19783 Need to use a striped executor. - executor = new ScheduledThreadPoolExecutor( - 1, - new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG), - new ThreadPoolExecutor.DiscardPolicy() + executor = createZoneManagerExecutor( + new NamedThreadFactory(NamedThreadFactory.threadPrefix(nodeName, DISTRIBUTION_ZONE_MANAGER_POOL_NAME), LOG) ); // It's safe to leak with partially initialised object here, because rebalanceEngine is only accessible through this or by diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java index 2046629c2a17..32d357168328 100644 --- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java +++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZonesUtil.java @@ -40,6 +40,9 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode; import org.apache.ignite.internal.distributionzones.configuration.DistributionZoneConfiguration; import org.apache.ignite.internal.distributionzones.configuration.DistributionZonesConfiguration; @@ -47,6 +50,7 @@ import org.apache.ignite.internal.metastorage.dsl.CompoundCondition; import org.apache.ignite.internal.metastorage.dsl.SimpleCondition; import org.apache.ignite.internal.metastorage.dsl.Update; +import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.util.ByteUtils; import org.apache.ignite.lang.ByteArray; import org.apache.ignite.lang.DistributionZoneNotFoundException; @@ -532,4 +536,22 @@ public static Set filterDataNodes( .map(Node::nodeName) .collect(toSet()); } + + /** + * Create an executor for the zone manager. + * Used a single thread executor to avoid concurrent executing several tasks for the same zone. + * ScheduledThreadPoolExecutor guarantee that tasks scheduled for exactly the same + * execution time are enabled in first-in-first-out (FIFO) order of submission. + * // TODO: IGNITE-19783 Need to use a striped executor. + * + * @param namedThreadFactory Named thread factory. + * @return Executor. + */ + static ScheduledExecutorService createZoneManagerExecutor(NamedThreadFactory namedThreadFactory) { + return new ScheduledThreadPoolExecutor( + 1, + namedThreadFactory, + new ThreadPoolExecutor.DiscardPolicy() + ); + } } diff --git a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java index e0cb8010975b..0a6eb584e5c6 100644 --- a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java +++ b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZonesSchedulersTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.distributionzones; +import static org.apache.ignite.internal.distributionzones.DistributionZonesUtil.createZoneManagerExecutor; import static org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition; import static org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -25,8 +26,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -45,10 +44,8 @@ public class DistributionZonesSchedulersTest { private static final IgniteLogger LOG = Loggers.forClass(DistributionZonesSchedulersTest.class); - private static final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor( - 1, - new NamedThreadFactory("test-dst-zones-scheduler", LOG), - new ThreadPoolExecutor.DiscardPolicy() + private static ScheduledExecutorService executor = createZoneManagerExecutor( + new NamedThreadFactory("test-dst-zones-scheduler", LOG) ); @AfterAll @@ -96,7 +93,10 @@ void testScaleDownReScheduling() throws InterruptedException { testReScheduling(state::rescheduleScaleDown); } - /** Tests that scaleUp/scaleDown tasks with a delay grater then zero will be canceled by tasks with a zero delay. */ + /** + * Tests that scaleUp/scaleDown tasks with a zero delay will not be canceled by other tasks. + * Tests that scaleUp/scaleDown tasks with a delay grater then zero will be canceled by other tasks. + */ private static void testReScheduling(BiConsumer fn) throws InterruptedException { AtomicInteger counter = new AtomicInteger();