From 37c35a544e6ef012046eabf8149f5750a745fa14 Mon Sep 17 00:00:00 2001 From: Kirk Lund Date: Tue, 28 Aug 2018 14:57:52 -0700 Subject: [PATCH] GEODE-5652: use newCachedThreadPool by default in ExecutorServiceRule Changed default behavior so that the type of thread pool created is: * Default (threadCount == 0): Executors.newCachedThreadPool() * ThreadCount == 1: Executors.newSingleThreadExecutor() * ThreadCount > 1: Executors.newFixedThreadPool(threadCount) --- .../PartitionedRegionCreationDUnitTest.java | 1 - .../internal/locks/DLockServiceLeakTest.java | 5 +- .../cache/InterruptDiskJUnitTest.java | 120 ++++++------------ .../ExecutorServiceRuleIntegrationTest.java | 10 +- .../test/junit/rules/ExecutorServiceRule.java | 21 +-- .../junit/rules/ExecutorServiceRuleTest.java | 8 +- 6 files changed, 65 insertions(+), 100 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java index 5c4a54631005..e28df99226e1 100755 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/PartitionedRegionCreationDUnitTest.java @@ -51,7 +51,6 @@ * will verify the functionality under distributed scenario. */ @SuppressWarnings("serial") - public class PartitionedRegionCreationDUnitTest extends CacheTestCase { private VM vm0; diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java index f95f8e7ee38e..61fc97c24794 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/locks/DLockServiceLeakTest.java @@ -44,15 +44,14 @@ import org.apache.geode.test.junit.categories.DLockTest; import org.apache.geode.test.junit.rules.ExecutorServiceRule; -@Category({DLockTest.class}) +@Category(DLockTest.class) public class DLockServiceLeakTest { private Cache cache; private DistributedRegion testRegion; @Rule - public ExecutorServiceRule executorServiceRule = - ExecutorServiceRule.builder().threadCount(5).build(); + public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); @Before public void setUp() { diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java index f68e615a6a6d..bdc2f338bd86 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/InterruptDiskJUnitTest.java @@ -14,125 +14,87 @@ */ package org.apache.geode.internal.cache; -import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_TIME_STATISTICS; +import static java.util.concurrent.TimeUnit.MINUTES; +import static org.apache.geode.cache.RegionShortcut.REPLICATE_PERSISTENT; import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; -import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; -import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_ARCHIVE_FILE; -import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import java.io.File; +import java.time.Duration; import java.util.Properties; -import java.util.concurrent.Callable; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionShortcut; -import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.test.junit.rules.ExecutorServiceRule; /** * Test of interrupting threads doing disk writes to see the effect. - * */ public class InterruptDiskJUnitTest { - private static volatile Thread puttingThread; - private static final long MAX_WAIT = 60 * 1000; - private DistributedSystem ds; + private final AtomicInteger nextValue = new AtomicInteger(); + private final AtomicReference puttingThread = new AtomicReference<>(); + private Cache cache; private Region region; - private AtomicLong nextValue = new AtomicLong(); @Rule public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); - @Test - @Ignore - public void testLoop() throws Throwable { - for (int i = 0; i < 100; i++) { - System.err.println("i=" + i); - System.out.println("i=" + i); - testDRPutWithInterrupt(); - tearDown(); - setUp(); - } - } - + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); @Before public void setUp() { - Properties props = new Properties(); - props.setProperty(MCAST_PORT, "0"); - props.setProperty(LOCATORS, ""); - props.setProperty(LOG_LEVEL, "config"); // to keep diskPerf logs smaller - props.setProperty(STATISTIC_SAMPLING_ENABLED, "true"); - props.setProperty(ENABLE_TIME_STATISTICS, "true"); - props.setProperty(STATISTIC_ARCHIVE_FILE, "stats.gfs"); - ds = DistributedSystem.connect(props); - cache = CacheFactory.create(ds); - File diskStore = new File("diskStore"); - diskStore.mkdir(); - cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {diskStore}) - .create("store"); - region = cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT) - .setDiskStoreName("store").create("region"); - } + String diskStoreName = getClass().getSimpleName() + "_diskStore"; + String regionName = getClass().getSimpleName() + "_region"; + Properties config = new Properties(); + config.setProperty(MCAST_PORT, "0"); + config.setProperty(LOCATORS, ""); - @After - public void tearDown() { - ds.disconnect(); - } + File diskDir = temporaryFolder.getRoot(); + cache = new CacheFactory(config).create(); - @Test - public void testDRPutWithInterrupt() throws Throwable { - Callable doPuts = new Callable() { - - @Override - public Object call() { - puttingThread = Thread.currentThread(); - long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT); - while (!Thread.currentThread().isInterrupted()) { - region.put(0, nextValue.incrementAndGet()); - if (System.nanoTime() > end) { - fail("Did not get interrupted in 60 seconds"); - } - } - return null; - } - }; + cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {diskDir}) + .create(diskStoreName); - Future result = executorServiceRule.submit(doPuts); + region = cache.createRegionFactory(REPLICATE_PERSISTENT).setDiskStoreName(diskStoreName) + .create(regionName); + } + @After + public void tearDown() { + cache.close(); + } - Thread.sleep(50); - long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT); - while (puttingThread == null) { - Thread.sleep(50); - if (System.nanoTime() > end) { - fail("Putting thread not set in 60 seconds"); + @Test + public void testDRPutWithInterrupt() throws Exception { + Future doPutWhileNotInterrupted = executorServiceRule.runAsync(() -> { + puttingThread.set(Thread.currentThread()); + while (!Thread.currentThread().isInterrupted()) { + region.put(0, nextValue.incrementAndGet()); } - } - - puttingThread.interrupt(); - - result.get(60, TimeUnit.SECONDS); + }); - assertEquals(nextValue.get(), region.get(0)); + await().atMost(2, MINUTES).untilAsserted(() -> assertThat(puttingThread).isNotNull()); + Thread.sleep(Duration.ofSeconds(1).toMillis()); + puttingThread.get().interrupt(); + doPutWhileNotInterrupted.get(2, MINUTES); + assertThat(region.get(0)).isEqualTo(nextValue.get()); } } diff --git a/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java b/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java index d2b2d6b05773..05e3bd606610 100644 --- a/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java +++ b/geode-junit/src/integrationTest/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleIntegrationTest.java @@ -34,10 +34,10 @@ public class ExecutorServiceRuleIntegrationTest { - static volatile CountDownLatch hangLatch; - static volatile CountDownLatch terminateLatch; - static volatile ExecutorService executorService; - static Awaits.Invocations invocations; + private static volatile CountDownLatch hangLatch; + private static volatile CountDownLatch terminateLatch; + private static volatile ExecutorService executorService; + private static Awaits.Invocations invocations; @Before public void setUp() throws Exception { @@ -64,7 +64,7 @@ public void tearDown() throws Exception { } @Test - public void awaitTermination() throws Exception { + public void awaitTermination() { Result result = TestRunner.runTest(Awaits.class); assertThat(result.wasSuccessful()).isTrue(); diff --git a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java index 934c3c12ca43..6782e1165e6a 100644 --- a/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java +++ b/geode-junit/src/main/java/org/apache/geode/test/junit/rules/ExecutorServiceRule.java @@ -14,7 +14,6 @@ */ package org.apache.geode.test.junit.rules; -import static org.assertj.core.api.Assertions.assertThat; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; @@ -124,7 +123,7 @@ protected ExecutorServiceRule(Builder builder) { * {@code ExecutorService.shutdownNow()} during {@code tearDown}. */ public ExecutorServiceRule() { - threadCount = 1; + threadCount = 0; enableAwaitTermination = false; awaitTerminationTimeout = 0; awaitTerminationTimeUnit = TimeUnit.NANOSECONDS; @@ -149,10 +148,12 @@ public ExecutorServiceRule(int threadCount) { @Override public void before() { - if (threadCount > 1) { - executor = Executors.newFixedThreadPool(threadCount); - } else { + if (threadCount == 0) { + executor = Executors.newCachedThreadPool(); + } else if (threadCount == 1) { executor = Executors.newSingleThreadExecutor(); + } else { + executor = Executors.newFixedThreadPool(threadCount); } } @@ -272,7 +273,7 @@ public CompletableFuture supplyAsync(Supplier supplier) { public static class Builder { - protected int threadCount = 1; + protected int threadCount = 0; protected boolean enableAwaitTermination = false; protected long awaitTerminationTimeout = 0; protected TimeUnit awaitTerminationTimeUnit = TimeUnit.NANOSECONDS; @@ -285,11 +286,16 @@ protected Builder() { } /** - * Configures the number of threads. Default is one thread. + * Configures the number of threads. Default is zero which results in using a cached thread pool + * with unlimited (Integer.MAX_VALUE) number of threads. Specifying a value above zero results + * in using a fixed thread pool. * * @param threadCount the number of threads in the pool */ public Builder threadCount(int threadCount) { + if (threadCount < 0) { + throw new IllegalArgumentException("threadCount cannot be less than 0"); + } this.threadCount = threadCount; return this; } @@ -348,7 +354,6 @@ public Builder awaitTerminationAfterShutdown() { * Builds the instance of {@code ExecutorServiceRule}. */ public ExecutorServiceRule build() { - assertThat(threadCount).isGreaterThan(0); return new ExecutorServiceRule(this); } } diff --git a/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleTest.java b/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleTest.java index 7a8b5e5f3bc0..dc8381940798 100644 --- a/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleTest.java +++ b/geode-junit/src/test/java/org/apache/geode/test/junit/rules/ExecutorServiceRuleTest.java @@ -36,10 +36,10 @@ public class ExecutorServiceRuleTest { - static volatile AtomicIntegerWithMaxValueSeen concurrentTasks; - static volatile CountDownLatch hangLatch; - static volatile CountDownLatch terminateLatch; - static volatile ExecutorService executorService; + private static volatile AtomicIntegerWithMaxValueSeen concurrentTasks; + private static volatile CountDownLatch hangLatch; + private static volatile CountDownLatch terminateLatch; + private static volatile ExecutorService executorService; @Before public void setUp() throws Exception {