Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEODE-5652: use unlimited ThreadPoolExecutor in ExecutorServiceRule #2395

Merged
merged 1 commit into from Aug 30, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -51,7 +51,6 @@
* will verify the functionality under distributed scenario.
*/
@SuppressWarnings("serial")

public class PartitionedRegionCreationDUnitTest extends CacheTestCase {

private VM vm0;
Expand Down
Expand Up @@ -44,15 +44,14 @@
import org.apache.geode.test.junit.categories.DLockTest;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;

@Category({DLockTest.class})
@Category(DLockTest.class)
public class DLockServiceLeakTest {

private Cache cache;
private DistributedRegion testRegion;

@Rule
public ExecutorServiceRule executorServiceRule =
ExecutorServiceRule.builder().threadCount(5).build();
public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();

@Before
public void setUp() {
Expand Down
Expand Up @@ -14,125 +14,87 @@
*/
package org.apache.geode.internal.cache;

import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_TIME_STATISTICS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static org.apache.geode.cache.RegionShortcut.REPLICATE_PERSISTENT;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_ARCHIVE_FILE;
import static org.apache.geode.distributed.ConfigurationProperties.STATISTIC_SAMPLING_ENABLED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.io.File;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.distributed.DistributedSystem;
import org.apache.geode.test.junit.rules.ExecutorServiceRule;

/**
* Test of interrupting threads doing disk writes to see the effect.
*
*/
public class InterruptDiskJUnitTest {

private static volatile Thread puttingThread;
private static final long MAX_WAIT = 60 * 1000;
private DistributedSystem ds;
private final AtomicInteger nextValue = new AtomicInteger();
private final AtomicReference<Thread> puttingThread = new AtomicReference<>();

private Cache cache;
private Region<Object, Object> region;
private AtomicLong nextValue = new AtomicLong();

@Rule
public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();

@Test
@Ignore
public void testLoop() throws Throwable {
for (int i = 0; i < 100; i++) {
System.err.println("i=" + i);
System.out.println("i=" + i);
testDRPutWithInterrupt();
tearDown();
setUp();
}
}

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

@Before
public void setUp() {
Properties props = new Properties();
props.setProperty(MCAST_PORT, "0");
props.setProperty(LOCATORS, "");
props.setProperty(LOG_LEVEL, "config"); // to keep diskPerf logs smaller
props.setProperty(STATISTIC_SAMPLING_ENABLED, "true");
props.setProperty(ENABLE_TIME_STATISTICS, "true");
props.setProperty(STATISTIC_ARCHIVE_FILE, "stats.gfs");
ds = DistributedSystem.connect(props);
cache = CacheFactory.create(ds);
File diskStore = new File("diskStore");
diskStore.mkdir();
cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {diskStore})
.create("store");
region = cache.createRegionFactory(RegionShortcut.REPLICATE_PERSISTENT)
.setDiskStoreName("store").create("region");
}
String diskStoreName = getClass().getSimpleName() + "_diskStore";
String regionName = getClass().getSimpleName() + "_region";

Properties config = new Properties();
config.setProperty(MCAST_PORT, "0");
config.setProperty(LOCATORS, "");

@After
public void tearDown() {
ds.disconnect();
}
File diskDir = temporaryFolder.getRoot();

cache = new CacheFactory(config).create();

@Test
public void testDRPutWithInterrupt() throws Throwable {
Callable doPuts = new Callable() {

@Override
public Object call() {
puttingThread = Thread.currentThread();
long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT);
while (!Thread.currentThread().isInterrupted()) {
region.put(0, nextValue.incrementAndGet());
if (System.nanoTime() > end) {
fail("Did not get interrupted in 60 seconds");
}
}
return null;
}
};
cache.createDiskStoreFactory().setMaxOplogSize(1).setDiskDirs(new File[] {diskDir})
.create(diskStoreName);

Future result = executorServiceRule.submit(doPuts);
region = cache.createRegionFactory(REPLICATE_PERSISTENT).setDiskStoreName(diskStoreName)
.create(regionName);
}

@After
public void tearDown() {
cache.close();
}

Thread.sleep(50);
long end = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(MAX_WAIT);
while (puttingThread == null) {
Thread.sleep(50);
if (System.nanoTime() > end) {
fail("Putting thread not set in 60 seconds");
@Test
public void testDRPutWithInterrupt() throws Exception {
Future<Void> doPutWhileNotInterrupted = executorServiceRule.runAsync(() -> {
puttingThread.set(Thread.currentThread());
while (!Thread.currentThread().isInterrupted()) {
region.put(0, nextValue.incrementAndGet());
}
}

puttingThread.interrupt();

result.get(60, TimeUnit.SECONDS);
});

assertEquals(nextValue.get(), region.get(0));
await().atMost(2, MINUTES).untilAsserted(() -> assertThat(puttingThread).isNotNull());
Thread.sleep(Duration.ofSeconds(1).toMillis());
puttingThread.get().interrupt();

doPutWhileNotInterrupted.get(2, MINUTES);
assertThat(region.get(0)).isEqualTo(nextValue.get());
}
}
Expand Up @@ -34,10 +34,10 @@

public class ExecutorServiceRuleIntegrationTest {

static volatile CountDownLatch hangLatch;
static volatile CountDownLatch terminateLatch;
static volatile ExecutorService executorService;
static Awaits.Invocations invocations;
private static volatile CountDownLatch hangLatch;
private static volatile CountDownLatch terminateLatch;
private static volatile ExecutorService executorService;
private static Awaits.Invocations invocations;

@Before
public void setUp() throws Exception {
Expand All @@ -64,7 +64,7 @@ public void tearDown() throws Exception {
}

@Test
public void awaitTermination() throws Exception {
public void awaitTermination() {
Result result = TestRunner.runTest(Awaits.class);
assertThat(result.wasSuccessful()).isTrue();

Expand Down
Expand Up @@ -14,8 +14,6 @@
*/
package org.apache.geode.test.junit.rules;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
Expand All @@ -32,22 +30,15 @@
* creates an {@code ExecutorService} which is terminated after the scope of the {@code Rule}. This
* {@code Rule} can be used in tests for hangs, deadlocks, and infinite loops.
*
* <p>
* By default, the {@code ExecutorService} is single-threaded. You can specify the thread count by
* using {@link Builder#threadCount(int)} or {@link #ExecutorServiceRule(int)}.
*
* <p>
* Example with default configuration (single-threaded and does not assert that tasks are done):
*
* <pre>
* private CountDownLatch hangLatch = new CountDownLatch(1);
*
* {@literal @}Rule
* public AsynchronousRule asynchronousRule = new AsynchronousRule();
* public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule();
*
* {@literal @}Test
* public void doTest() throws Exception {
* Future<Void> result = asynchronousRule.runAsync(() -> {
* Future<Void> result = executorServiceRule.runAsync(() -> {
* try {
* hangLatch.await();
* } catch (InterruptedException e) {
Expand All @@ -73,17 +64,13 @@
* private CountDownLatch hangLatch = new CountDownLatch(1);
*
* {@literal @}Rule
* public ExecutorServiceRule asynchronousRule = ExecutorServiceRule.builder().threadCount(10).awaitTermination(10, MILLISECONDS).build();
* public ExecutorServiceRule executorServiceRule = ExecutorServiceRule.builder().awaitTermination(10, SECONDS).build();
*
* {@literal @}Test
* public void doTest() throws Exception {
* for (int i = 0; i < 10; i++) {
* asynchronousRule.runAsync(() -> {
* try {
* hangLatch.await();
* } catch (InterruptedException e) {
* // do nothing
* }
* executorServiceRule.runAsync(() -> {
* hangLatch.await();
* });
* }
* }
Expand All @@ -92,7 +79,6 @@
@SuppressWarnings("unused")
public class ExecutorServiceRule extends SerializableExternalResource {

protected final int threadCount;
protected final boolean enableAwaitTermination;
protected final long awaitTerminationTimeout;
protected final TimeUnit awaitTerminationTimeUnit;
Expand All @@ -110,7 +96,6 @@ public static Builder builder() {
}

protected ExecutorServiceRule(Builder builder) {
threadCount = builder.threadCount;
enableAwaitTermination = builder.enableAwaitTermination;
awaitTerminationTimeout = builder.awaitTerminationTimeout;
awaitTerminationTimeUnit = builder.awaitTerminationTimeUnit;
Expand All @@ -120,25 +105,10 @@ protected ExecutorServiceRule(Builder builder) {
}

/**
* Constructs a new single-threaded {@code ExecutorServiceRule} which invokes
* {@code ExecutorService.shutdownNow()} during {@code tearDown}.
* Constructs a {@code ExecutorServiceRule} which invokes {@code ExecutorService.shutdownNow()}
* during {@code tearDown}.
*/
public ExecutorServiceRule() {
threadCount = 1;
enableAwaitTermination = false;
awaitTerminationTimeout = 0;
awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
awaitTerminationBeforeShutdown = false;
useShutdown = false;
useShutdownNow = true;
}

/**
* Constructs a new multi-threaded {@code ExecutorServiceRule} which invokes
* {@code ExecutorService.shutdownNow()} during {@code tearDown}.
*/
public ExecutorServiceRule(int threadCount) {
this.threadCount = threadCount;
enableAwaitTermination = false;
awaitTerminationTimeout = 0;
awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
Expand All @@ -149,11 +119,7 @@ public ExecutorServiceRule(int threadCount) {

@Override
public void before() {
if (threadCount > 1) {
executor = Executors.newFixedThreadPool(threadCount);
} else {
executor = Executors.newSingleThreadExecutor();
}
executor = Executors.newCachedThreadPool();
}

@Override
Expand Down Expand Up @@ -272,28 +238,17 @@ public <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {

public static class Builder {

protected int threadCount = 1;
protected boolean enableAwaitTermination = false;
protected long awaitTerminationTimeout = 0;
protected boolean enableAwaitTermination;
protected long awaitTerminationTimeout;
protected TimeUnit awaitTerminationTimeUnit = TimeUnit.NANOSECONDS;
protected boolean awaitTerminationBeforeShutdown = true;
protected boolean useShutdown = false;
protected boolean useShutdown;
protected boolean useShutdownNow = true;

protected Builder() {
// nothing
}

/**
* Configures the number of threads. Default is one thread.
*
* @param threadCount the number of threads in the pool
*/
public Builder threadCount(int threadCount) {
this.threadCount = threadCount;
return this;
}

/**
* Enables invocation of {@code awaitTermination} during {@code tearDown}. Default is disabled.
*
Expand Down Expand Up @@ -348,7 +303,6 @@ public Builder awaitTerminationAfterShutdown() {
* Builds the instance of {@code ExecutorServiceRule}.
*/
public ExecutorServiceRule build() {
assertThat(threadCount).isGreaterThan(0);
return new ExecutorServiceRule(this);
}
}
Expand Down