Skip to content

Commit

Permalink
GEODE-5652: use newCachedThreadPool by default in ExecutorServiceRule
Browse files Browse the repository at this point in the history
Changed default behavior so that the type of thread pool created is:
* Default (threadCount == 0): Executors.newCachedThreadPool()
* ThreadCount == 1: Executors.newSingleThreadExecutor()
* ThreadCount > 1: Executors.newFixedThreadPool(threadCount)
  • Loading branch information
kirklund committed Aug 28, 2018
1 parent 7b6f5fa commit 37c35a5
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 100 deletions.
Expand Up @@ -51,7 +51,6 @@
* will verify the functionality under distributed scenario.
*/
@SuppressWarnings("serial")

public class PartitionedRegionCreationDUnitTest extends CacheTestCase {

private VM vm0;
Expand Down
Expand Up @@ -44,15 +44,14 @@
import org.apache.geode.test.junit.categories.DLockTest;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;

@Category({DLockTest.class})
@Category(DLockTest.class)
public class DLockServiceLeakTest {

private Cache cache;
private DistributedRegion testRegion;

@Rule
public ExecutorServiceRule executorServiceRule =
ExecutorServiceRule.builder().threadCount(5).build();
public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();

@Before
public void setUp() {
Expand Down
Expand Up @@ -14,125 +14,87 @@
*/
package org.apache.geode.internal.cache;

import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_TIME_STATISTICS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.geode.cache.RegionShortcut.REPLICATE_PERSISTENT;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_ARCHIVE_FILE;
import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.io.File;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;

/**
* Test of interrupting threads doing disk writes to see the effect.
*
*/
public class InterruptDiskJUnitTest {

private static volatile Thread puttingThread;
private static final long MAX_WAIT = 60 * 1000;
private DistributedSystem ds;
private final AtomicInteger nextValue = new AtomicInteger();
private final AtomicReference<Thread> puttingThread = new AtomicReference<>();

private Cache cache;
private Region<Object, Object> region;
private AtomicLong nextValue = new AtomicLong();

@Rule
public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();

@Test
@Ignore
public void testLoop() throws Throwable {
for (int i = 0; i < 100; i++) {
System.err.println("i=" + i);
System.out.println("i=" + i);
testDRPutWithInterrupt();
tearDown();
setUp();
}
}

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Before
public void setUp() {
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
props.setProperty(LOG_LEVEL, "config"); // to keep diskPerf logs smaller
props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
props.setProperty(ENABLE_TIME_STATISTICS, "true");
props.setProperty(STATISTIC_ARCHIVE_FILE, "stats.gfs");
ds = DistributedSystem.connect(props);
cache = CacheFactory.create(ds);
File diskStore = new File("diskStore");
diskStore.mkdir();
cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {diskStore})
.create("store");
region = cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT)
.setDiskStoreName("store").create("region");
}
String diskStoreName = getClass().getSimpleName() + "_diskStore";
String regionName = getClass().getSimpleName() + "_region";

Properties config = new Properties();
config.setProperty(MCAST_PORT, "0");
config.setProperty(LOCATORS, "");

@After
public void tearDown() {
ds.disconnect();
}
File diskDir = temporaryFolder.getRoot();

cache = new CacheFactory(config).create();

@Test
public void testDRPutWithInterrupt() throws Throwable {
Callable doPuts = new Callable() {

@Override
public Object call() {
puttingThread = Thread.currentThread();
long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT);
while (!Thread.currentThread().isInterrupted()) {
region.put(0, nextValue.incrementAndGet());
if (System.nanoTime() > end) {
fail("Did not get interrupted in 60 seconds");
}
}
return null;
}
};
cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {diskDir})
.create(diskStoreName);

Future result = executorServiceRule.submit(doPuts);
region = cache.createRegionFactory(REPLICATE_PERSISTENT).setDiskStoreName(diskStoreName)
.create(regionName);
}

@After
public void tearDown() {
cache.close();
}

Thread.sleep(50);
long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT);
while (puttingThread == null) {
Thread.sleep(50);
if (System.nanoTime() > end) {
fail("Putting thread not set in 60 seconds");
@Test
public void testDRPutWithInterrupt() throws Exception {
Future<Void> doPutWhileNotInterrupted = executorServiceRule.runAsync(() -> {
puttingThread.set(Thread.currentThread());
while (!Thread.currentThread().isInterrupted()) {
region.put(0, nextValue.incrementAndGet());
}
}

puttingThread.interrupt();

result.get(60, TimeUnit.SECONDS);
});

assertEquals(nextValue.get(), region.get(0));
await().atMost(2, MINUTES).untilAsserted(() -> assertThat(puttingThread).isNotNull());
Thread.sleep(Duration.ofSeconds(1).toMillis());
puttingThread.get().interrupt();

doPutWhileNotInterrupted.get(2, MINUTES);
assertThat(region.get(0)).isEqualTo(nextValue.get());
}
}
Expand Up @@ -34,10 +34,10 @@

public class ExecutorServiceRuleIntegrationTest {

static volatile CountDownLatch hangLatch;
static volatile CountDownLatch terminateLatch;
static volatile ExecutorService executorService;
static Awaits.Invocations invocations;
private static volatile CountDownLatch hangLatch;
private static volatile CountDownLatch terminateLatch;
private static volatile ExecutorService executorService;
private static Awaits.Invocations invocations;

@Before
public void setUp() throws Exception {
Expand All @@ -64,7 +64,7 @@ public void tearDown() throws Exception {
}

@Test
public void awaitTermination() throws Exception {
public void awaitTermination() {
Result result = TestRunner.runTest(Awaits.class);
assertThat(result.wasSuccessful()).isTrue();

Expand Down
Expand Up @@ -14,7 +14,6 @@
*/
package org.apache.geode.test.junit.rules;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -124,7 +123,7 @@ protected ExecutorServiceRule(Builder builder) {
* {@code ExecutorService.shutdownNow()} during {@code tearDown}.
*/
public ExecutorServiceRule() {
threadCount = 1;
threadCount = 0;
enableAwaitTermination = false;
awaitTerminationTimeout = 0;
awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
Expand All @@ -149,10 +148,12 @@ public ExecutorServiceRule(int threadCount) {

@Override
public void before() {
if (threadCount > 1) {
executor = Executors.newFixedThreadPool(threadCount);
} else {
if (threadCount == 0) {
executor = Executors.newCachedThreadPool();
} else if (threadCount == 1) {
executor = Executors.newSingleThreadExecutor();
} else {
executor = Executors.newFixedThreadPool(threadCount);
}
}

Expand Down Expand Up @@ -272,7 +273,7 @@ public <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {

public static class Builder {

protected int threadCount = 1;
protected int threadCount = 0;
protected boolean enableAwaitTermination = false;
protected long awaitTerminationTimeout = 0;
protected TimeUnit awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
Expand All @@ -285,11 +286,16 @@ protected Builder() {
}

/**
* Configures the number of threads. Default is one thread.
* Configures the number of threads. Default is zero which results in using a cached thread pool
* with unlimited (Integer.MAX_VALUE) number of threads. Specifying a value above zero results
* in using a fixed thread pool.
*
* @param threadCount the number of threads in the pool
*/
public Builder threadCount(int threadCount) {
if (threadCount < 0) {
throw new IllegalArgumentException("threadCount cannot be less than 0");
}
this.threadCount = threadCount;
return this;
}
Expand Down Expand Up @@ -348,7 +354,6 @@ public Builder awaitTerminationAfterShutdown() {
* Builds the instance of {@code ExecutorServiceRule}.
*/
public ExecutorServiceRule build() {
assertThat(threadCount).isGreaterThan(0);
return new ExecutorServiceRule(this);
}
}
Expand Down
Expand Up @@ -36,10 +36,10 @@

public class ExecutorServiceRuleTest {

static volatile AtomicIntegerWithMaxValueSeen concurrentTasks;
static volatile CountDownLatch hangLatch;
static volatile CountDownLatch terminateLatch;
static volatile ExecutorService executorService;
private static volatile AtomicIntegerWithMaxValueSeen concurrentTasks;
private static volatile CountDownLatch hangLatch;
private static volatile CountDownLatch terminateLatch;
private static volatile ExecutorService executorService;

@Before
public void setUp() throws Exception {
Expand Down

0 comments on commit 37c35a5

Please sign in to comment.