Skip to content
Browse files

extended ParallelComputer with new factory methods

  • Loading branch information...
1 parent 30f2b16 commit f2fd08ee94c8ba72f8e6bcd552ecee04c2c4f891 @Tibor17 committed Oct 10, 2012
View
282 src/main/java/org/junit/experimental/ParallelComputer.java
@@ -3,22 +3,96 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Queue;
import org.junit.runner.Computer;
import org.junit.runner.Runner;
import org.junit.runners.ParentRunner;
import org.junit.runners.model.InitializationError;
import org.junit.runners.model.RunnerBuilder;
import org.junit.runners.model.RunnerScheduler;
+import org.junit.internal.runners.ErrorReportingRunner;
public class ParallelComputer extends Computer {
- private final boolean fClasses;
+ private final boolean fParallelClasses;
+ private final boolean fParallelMethods;
+ private final boolean fProvidedPools;
+ private final boolean fHasSinglePoll;
+ private final int fSinglePoolCoreSize;
+ private final int fSinglePoolMaxSize;
+ private final int fSinglePoolMinConcurrentMethods;
+ private final ExecutorService fPoolClasses;
+ private final ExecutorService fPoolMethods;
- private final boolean fMethods;
+ //disables the callers Thread for scheduling purposes until all classes finished
+ private volatile CountDownLatch fClassesFinisher;
+
+ //prevents resource exhaustion on classes when used with single Thread pool
+ private volatile Semaphore fSinglePoolBalancer;
+
+ private int countClasses;
public ParallelComputer(boolean classes, boolean methods) {
- fClasses = classes;
- fMethods = methods;
+ fParallelClasses= classes;
+ fParallelMethods= methods;
+ fPoolClasses= null;
+ fPoolMethods= null;
+ fProvidedPools= false;
+ fHasSinglePoll= fProvidedPools;
+ fSinglePoolCoreSize= -1;
+ fSinglePoolMaxSize= -1;
+ fSinglePoolMinConcurrentMethods= -1;
+ }
+
+ public ParallelComputer(ExecutorService poolClasses, ExecutorService poolMethods) {
+ this(poolClasses, poolMethods, -1, -1, -1);
+ }
+
+ public ParallelComputer(ThreadPoolExecutor pool, int minConcurrentMethods) {
+ this(pool, pool, pool.getCorePoolSize(), pool.getMaximumPoolSize(), minConcurrentMethods);
+ }
+
+ public ParallelComputer(ThreadPoolExecutor pool) {
+ this(pool, 1);
+ }
+
+ private ParallelComputer(ExecutorService poolClasses, ExecutorService poolMethods,
+ int singlePoolCoreSize, int singlePoolMaxSize,
+ int minConcurrentMethods) {
+ if (poolClasses == null && poolMethods == null)
+ throw new NullPointerException("null classes/methods executor");
+ if (poolClasses != null && poolClasses.isShutdown())
+ throw new IllegalStateException(poolClasses
+ + " provided classes executor is in shutdown state and cannot restart");
+ if (poolMethods != null && poolMethods.isShutdown())
+ throw new IllegalStateException(poolMethods
+ + " provided methods executor is in shutdown state and cannot restart");
+ fParallelClasses= poolClasses != null;
+ fParallelMethods= poolMethods != null;
+ fPoolClasses= poolClasses;
+ fPoolMethods= poolMethods;
+ fProvidedPools= true;
+ fHasSinglePoll= fPoolClasses == fPoolMethods;
+ fSinglePoolCoreSize= fHasSinglePoll ? singlePoolCoreSize : -1;
+ fSinglePoolMaxSize= fHasSinglePoll ? singlePoolMaxSize : -1;
+ fSinglePoolMinConcurrentMethods= fHasSinglePoll ? minConcurrentMethods : -1;
+ boolean isUnbounded= fSinglePoolCoreSize == 0 & fSinglePoolMaxSize > 1;
+ if (fHasSinglePoll && !isUnbounded && fSinglePoolCoreSize <= 1)
+ throw new IllegalArgumentException("core pool size " + fSinglePoolCoreSize + " should be > 1");
+ if (fHasSinglePoll && fSinglePoolMaxSize <= 1)
+ throw new IllegalArgumentException("max pool size " + fSinglePoolMaxSize + " should be > 1");
+ if (fHasSinglePoll && fSinglePoolMinConcurrentMethods < 1)
+ throw new IllegalArgumentException("min concurrent methods " + fSinglePoolMinConcurrentMethods + " should be >= 1");
+ if (fHasSinglePoll && fSinglePoolMinConcurrentMethods >= fSinglePoolMaxSize)
+ throw new IllegalArgumentException("min methods pool size should be less than max pool size");
}
public static Computer classes() {
@@ -29,39 +103,203 @@ public static Computer methods() {
return new ParallelComputer(false, true);
}
- private static Runner parallelize(Runner runner) {
- if (runner instanceof ParentRunner) {
- ((ParentRunner<?>) runner).setScheduler(new RunnerScheduler() {
- private final ExecutorService fService = Executors.newCachedThreadPool();
+ public static Computer methods(ExecutorService pool) {
+ if (pool == null) throw new NullPointerException("null methods executor");
+ return new ParallelComputer(null, pool);
+ }
- public void schedule(Runnable childStatement) {
- fService.submit(childStatement);
- }
+ public static Computer classes(ExecutorService pool) {
+ if (pool == null) throw new NullPointerException("null classes executor");
+ return new ParallelComputer(pool, null);
+ }
+
+ /**
+ * Parallel computer with infinitive thread pool size.
+ * @return parallel computer
+ */
+ public static Computer classesAndMethodsUnbounded() {
+ ThreadPoolExecutor pool= new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>());
+ return classesAndMethods(pool);
+ }
+
+ /**
+ * Parallel computer with fixed-size thred pool of concurrent classes and methods.
+ * @param nThreads two threads at least
+ * @return parallel computer with nThreads
+ * @throws IllegalArgumentException if nThreads < 2
+ */
+ public static Computer classesAndMethodsBounded(int nThreads) {
+ ThreadPoolExecutor pool= new ThreadPoolExecutor(nThreads, nThreads,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>());
+ return classesAndMethods(pool);
+ }
+
+ /**
+ * Parallel computer for concurrent classes and methods.
+ * Zero corePoolSize and maximumPoolSize greater than one is accepted.
+ * @param pool unbounded or fixed-size thread pool.
+ * @return computer parallelized by given pool
+ * @throws IllegalArgumentException if maximumPoolSize < 2; or corePoolSize < 2 for fixed-size pool
+ * ; or the pool is shut down
+ * @throws NullPointerException pool is null
+ */
+ public static Computer classesAndMethods(ThreadPoolExecutor pool) {
+ return classesAndMethods(pool, 1);
+ }
+
+ /**
+ * Parallel computer for concurrent classes and methods.
+ * Zero corePoolSize and maximumPoolSize greater than one is accepted.
+ * @param pool unbounded or fixed-size thread pool.
+ * @param minConcurrentMethods 1 to (pool capacity - 1)
+ * @return computer parallelized by given pool
+ * @throws IllegalArgumentException if maximumPoolSize < 2; or corePoolSize < 2 for fixed-size pool
+ * ; or minConcurrentMethods < 1; or the pool is shut down
+ * @throws NullPointerException pool is null
+ */
+ public static Computer classesAndMethods(ThreadPoolExecutor pool, int minConcurrentMethods) {
+ return new ParallelComputer(pool, minConcurrentMethods);
+ }
+
+ public static Computer classesAndMethods(ExecutorService poolClasses, ExecutorService poolMethods) {
+ if (poolClasses == null) throw new NullPointerException("null classes executor");
+ if (poolMethods == null) throw new NullPointerException("null methods executor");
+ if (poolClasses == poolMethods)
+ throw new IllegalArgumentException("instead call #classesAndMethods(ThreadPoolExecutor, int)");
+ return new ParallelComputer(poolClasses, poolMethods);
+ }
+
+ private Runner sequencer(ParentRunner runner) {
+ runner.setScheduler(new RunnerScheduler() {
+ public void schedule(Runnable childStatement) {
+ childStatement.run();
+ }
+
+ public void finished() {
+ fClassesFinisher.countDown();
+ }
+ });
+ return runner;
+ }
+
+ private Runner parallelize(final ParentRunner runner, final ExecutorService service, final boolean isClassPool) {
+ runner.setScheduler(new RunnerScheduler() {
+ private final ConcurrentLinkedQueue<Future<?>> fTaskFutures
+ = !isClassPool & fProvidedPools ? new ConcurrentLinkedQueue<Future<?>>() : null;
- public void finished() {
- try {
- fService.shutdown();
- fService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
- } catch (InterruptedException e) {
- e.printStackTrace(System.err);
- }
+ public void schedule(Runnable childStatement) {
+ if (isClassPool & fHasSinglePoll) fSinglePoolBalancer.acquireUninterruptibly();
+ Future<?> f= service.submit(childStatement);
+ if (!isClassPool & fProvidedPools) fTaskFutures.add(f);
+ }
+
+ public void finished() {
+ if (isClassPool) { //wait until all test cases finished
+ awaitClassesFinished();
+ tryFinish(service);
+ } else { //wait until the test case finished
+ if (fProvidedPools) {
+ awaitClassFinished(fTaskFutures);
+ if (fHasSinglePoll) fSinglePoolBalancer.release();
+ } else tryFinish(service);
}
- });
- }
+ }
+ });
return runner;
}
+ private static void tryFinish(ExecutorService pool) {
+ try {
+ pool.shutdown();
+ pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace(System.err);
+ }
+ }
+
@Override
public Runner getSuite(RunnerBuilder builder, java.lang.Class<?>[] classes)
throws InitializationError {
Runner suite = super.getSuite(builder, classes);
- return fClasses ? parallelize(suite) : suite;
+ if (canSchedule(suite)) {
+ if (fHasSinglePoll) {
+ int maxConcurrentClasses;
+ if (fSinglePoolCoreSize == 0 || fSinglePoolCoreSize == fSinglePoolMaxSize)
+ //is unbounded or fixed-size single pool
+ maxConcurrentClasses= fSinglePoolMaxSize - fSinglePoolMinConcurrentMethods;
+ else maxConcurrentClasses= Math.max(1, fSinglePoolCoreSize - fSinglePoolMinConcurrentMethods);
+ maxConcurrentClasses= Math.min(maxConcurrentClasses, countClasses);
+ fSinglePoolBalancer= new Semaphore(maxConcurrentClasses);
+ }
+ fClassesFinisher= new CountDownLatch(countClasses);
+ if (fParallelClasses) parallelize((ParentRunner) suite, threadPoolClasses(), true);
+ }
+ return suite;
}
@Override
protected Runner getRunner(RunnerBuilder builder, Class<?> testClass)
throws Throwable {
Runner runner = super.getRunner(builder, testClass);
- return fMethods ? parallelize(runner) : runner;
+ if (canSchedule(runner)) {
+ ++countClasses;
+ ParentRunner child= (ParentRunner) runner;
+ runner= fParallelMethods ? parallelize(child, threadPoolMethods(), false) : sequencer(child);
+ }
+ return runner;
+ }
+
+ /**
+ * Creates a New Cached Thread Pool to schedule methods in a test class.
+ * This method is called if ParallelComputer is instantiate by {@link #ParallelComputer(boolean, boolean)}.
+ * @return new Thread pool
+ */
+ protected ExecutorService newThreadPoolClasses() {
+ return Executors.newCachedThreadPool();
+ }
+
+ /**
+ * Creates a New Cached Thread Pool to schedule a test case.
+ * This method is called if ParallelComputer is instantiate by {@link #ParallelComputer(boolean, boolean)}.
+ * @return new Thread pool
+ */
+ protected ExecutorService newThreadPoolMethods() {
+ return Executors.newCachedThreadPool();
+ }
+
+ private boolean canSchedule(Runner runner) {
+ return !(runner instanceof ErrorReportingRunner) &&
+ runner instanceof ParentRunner &&
+ (fParallelClasses || fParallelMethods);
+ }
+
+ private ExecutorService threadPoolClasses() {
+ return fProvidedPools ? fPoolClasses : newThreadPoolClasses();
+ }
+
+ private ExecutorService threadPoolMethods() {
+ return fProvidedPools ? fPoolMethods : newThreadPoolMethods();
+ }
+
+ private void awaitClassFinished(Queue<Future<?>> children) {
+ for (Future<?> child : children) {
+ try {
+ child.get();
+ } catch (InterruptedException e) {
+ e.printStackTrace(System.err);
+ } catch (ExecutionException e) {/*fired in run-notifier*/}
+ }
+ fClassesFinisher.countDown();
+ }
+
+ private void awaitClassesFinished() {
+ try {
+ fClassesFinisher.await();
+ } catch (InterruptedException e) {
+ e.printStackTrace(System.err);
+ }
}
}
View
85 src/test/java/org/junit/tests/experimental/parallel/ParallelClassTest.java
@@ -6,8 +6,13 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.Executors;
+import java.util.HashSet;
+import java.util.Collections;
import org.junit.Before;
import org.junit.Test;
@@ -21,51 +26,41 @@
private static volatile Thread fExample1Two = null;
private static volatile Thread fExample2One = null;
private static volatile Thread fExample2Two = null;
- private static volatile CountDownLatch fSynchronizer;
+ private static volatile CyclicBarrier fSynchronizer;
public static class Example1 {
- @Test
- public void one() throws InterruptedException {
- fSynchronizer.countDown();
- assertTrue(fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS));
- fExample1One = Thread.currentThread();
+ @Test public void one() throws InterruptedException, BrokenBarrierException, TimeoutException {
+ if (fSynchronizer != null) fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS);
+ fExample1One= Thread.currentThread();
}
- @Test
- public void two() throws InterruptedException {
- fSynchronizer.countDown();
- assertTrue(fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS));
- fExample1Two = Thread.currentThread();
+ @Test public void two() throws InterruptedException, BrokenBarrierException, TimeoutException {
+ if (fSynchronizer != null) fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS);
+ fExample1Two= Thread.currentThread();
}
}
public static class Example2 {
- @Test
- public void one() throws InterruptedException {
- fSynchronizer.countDown();
- assertTrue(fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS));
- fExample2One = Thread.currentThread();
+ @Test public void one() throws InterruptedException, BrokenBarrierException, TimeoutException {
+ if (fSynchronizer != null) fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS);
+ fExample2One= Thread.currentThread();
}
- @Test
- public void two() throws InterruptedException {
- fSynchronizer.countDown();
- assertTrue(fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS));
- fExample2Two = Thread.currentThread();
+ @Test public void two() throws InterruptedException, BrokenBarrierException, TimeoutException {
+ if (fSynchronizer != null) fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS);
+ fExample2Two= Thread.currentThread();
}
}
- @Before
- public void init() {
+ @Before public void init() {
fExample1One = null;
fExample1Two = null;
fExample2One = null;
fExample2Two = null;
- fSynchronizer = new CountDownLatch(2);
}
- @Test
- public void testsRunInParallel() {
+ @Test public void infinitivePool() {
+ fSynchronizer= new CyclicBarrier(2);
Result result = JUnitCore.runClasses(ParallelComputer.classes(), Example1.class, Example2.class);
assertTrue(result.wasSuccessful());
assertNotNull(fExample1One);
@@ -75,5 +70,41 @@ public void testsRunInParallel() {
assertThat(fExample1One, is(fExample1Two));
assertThat(fExample2One, is(fExample2Two));
assertThat(fExample1One, is(not(fExample2One)));
+ HashSet<Thread> threads= new HashSet<Thread>();
+ Collections.addAll(threads, fExample1One, fExample1Two, fExample2One, fExample2Two);
+ assertThat(threads.size(), is(2));
+ }
+
+ @Test public void singleThread() throws InterruptedException {
+ fSynchronizer= null;
+ Result result= JUnitCore.runClasses(ParallelComputer.classes(Executors.newSingleThreadExecutor()),
+ Example1.class, Example2.class);
+ assertTrue(result.wasSuccessful());
+ assertNotNull(fExample1One);
+ assertNotNull(fExample1Two);
+ assertNotNull(fExample2One);
+ assertNotNull(fExample2Two);
+ HashSet<Thread> threads= new HashSet<Thread>();
+ Collections.addAll(threads, fExample1One, fExample1Two, fExample2One, fExample2Two);
+ assertThat(threads.size(), is(1));
+ assertThat(threads.iterator().next(), is(not(Thread.currentThread())));
+ }
+
+ @Test public void fixedSizePool() throws InterruptedException {
+ fSynchronizer= new CyclicBarrier(2);
+ Class<?>[] classes= {Example1.class, Example2.class};
+ Result result= JUnitCore.runClasses(ParallelComputer.classes(Executors.newFixedThreadPool(classes.length)),
+ classes);
+ assertTrue(result.wasSuccessful());
+ assertNotNull(fExample1One);
+ assertNotNull(fExample1Two);
+ assertNotNull(fExample2One);
+ assertNotNull(fExample2Two);
+ assertThat(fExample1One, is(fExample1Two));
+ assertThat(fExample2One, is(fExample2Two));
+ assertThat(fExample1One, is(not(fExample2One)));
+ HashSet<Thread> threads= new HashSet<Thread>();
+ Collections.addAll(threads, fExample1One, fExample1Two, fExample2One, fExample2Two);
+ assertThat(threads.size(), is(2));
}
}
View
301 src/test/java/org/junit/tests/experimental/parallel/ParallelClassesAndMethodsTest.java
@@ -0,0 +1,301 @@
+package org.junit.tests.experimental.parallel;
+
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.experimental.ParallelComputer;
+import org.junit.rules.TestName;
+import org.junit.runner.Computer;
+import org.junit.runner.JUnitCore;
+import org.junit.runner.Result;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import java.util.HashSet;
+import java.util.Collections;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Arrays;
+
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.AnyOf.anyOf;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Testing ParallelComputer.
+ * <p/>
+ *
+ * @author tibor17
+ * @since 4.11, 29.9.2012, 14:58
+ */
+public final class ParallelClassesAndMethodsTest {
+ private static final long TIMEOUT= 15;
+
+ private static volatile Thread fExample1One= null;
+ private static volatile Thread fExample1Two= null;
+ private static volatile Thread fExample2One= null;
+ private static volatile Thread fExample2Two= null;
+ private static volatile Thread fExample3One= null;
+ private static volatile Thread fExample3Two= null;
+ private static volatile CyclicBarrier fSynchronizer;
+
+ public static class Example1 {
+ @Test public void one() throws InterruptedException, BrokenBarrierException, TimeoutException {
+ if (fSynchronizer != null) fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS);
+ fExample1One= Thread.currentThread();
+ }
+ @Test public void two() throws InterruptedException, BrokenBarrierException, TimeoutException {
+ if (fSynchronizer != null) fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS);
+ fExample1Two= Thread.currentThread();
+ }
+ }
+
+ public static class Example2 {
+ @Test public void one() throws InterruptedException, BrokenBarrierException, TimeoutException {
+ if (fSynchronizer != null) fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS);
+ fExample2One= Thread.currentThread();
+ }
+ @Test public void two() throws InterruptedException, BrokenBarrierException, TimeoutException {
+ if (fSynchronizer != null) fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS);
+ fExample2Two= Thread.currentThread();
+ }
+ }
+
+ public static class Example3 {
+ @Test public void one() throws InterruptedException, BrokenBarrierException, TimeoutException {
+ if (fSynchronizer != null) fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS);
+ fExample3One= Thread.currentThread();
+ }
+ @Test public void two() throws InterruptedException, BrokenBarrierException, TimeoutException {
+ if (fSynchronizer != null) fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS);
+ fExample3Two= Thread.currentThread();
+ }
+ }
+
+ public static class Erroneous {
+ @Rule static final TestName testName= new TestName();
+ //intended error in rule -no public instance member
+ @Test public void test() {}
+ }
+
+ @RunWith(Parameterized.class) public static class FibonacciTest {
+ @Parameters(name = "{index}: fib({0})={1}")
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][] {{ 0, 0 }, { 1, 1 }, { 2, 1 }, { 3, 2 }, { 4, 3 }, { 5, 5 }, { 6, 8 }});
+ }
+
+ private int fibonacci(int length) {
+ int n2= 0, n1= 0, f;
+ do {
+ f= n2 + n1;
+ n2= n1;
+ n1= f;
+ if (n1 == 0) n2= 1;
+ } while (length-- != 0);
+ return f;
+ }
+
+ private final int fInput;
+ private final int fExpected;
+
+ public FibonacciTest(int input, int expected) {
+ fInput= input;
+ fExpected= expected;
+ }
+
+ @Test public void test1() {
+ assertEquals(fExpected, fibonacci(fInput));
+ }
+ @Test public void test2() {
+ assertEquals(fExpected, fibonacci(fInput));
+ }
+ }
+
+ @Before
+ public void init() {
+ fExample1One= null;
+ fExample1Two= null;
+ fExample2One= null;
+ fExample2Two= null;
+ fExample3One= null;
+ fExample3Two= null;
+ }
+
+ @Test public void negativeTest() {
+ fSynchronizer= null;
+ Class<?>[] classes= {Example1.class, Example2.class, Example3.class, Erroneous.class};
+
+ Computer comp= ParallelComputer.methods();
+ Result result= JUnitCore.runClasses(comp, classes);
+ assertFalse(result.wasSuccessful());
+
+ comp= ParallelComputer.classes();
+ result= JUnitCore.runClasses(comp, classes);
+ assertFalse(result.wasSuccessful());
+
+ comp= ParallelComputer.classesAndMethodsUnbounded();
+ result= JUnitCore.runClasses(comp, classes);
+ assertFalse(result.wasSuccessful());
+
+ comp= ParallelComputer.classesAndMethodsBounded(10);
+ result= JUnitCore.runClasses(comp, classes);
+ assertFalse(result.wasSuccessful());
+ }
+
+ @Test public void runParallelOversize() throws InterruptedException {
+ fSynchronizer= new CyclicBarrier(6);//continue with tests if 6 threads for methods are scheduled until timeout; otherwise fail
+ ThreadPoolExecutor pool= new ThreadPoolExecutor(0, 11,// 11 => more than (six thread parties in barrier + three concurrent classes)
+ Long.MAX_VALUE, TimeUnit.NANOSECONDS,//don't reuse threads
+ new SynchronousQueue<Runnable>());//default like for unbounded pools
+ Computer comp= ParallelComputer.classesAndMethods(pool, 7);//capacity: 7 concurrent methods and 4 concurrent classes
+ Result result= JUnitCore.runClasses(comp, Example1.class, Example2.class, Example3.class);
+ assertTrue(result.wasSuccessful());
+ assertNotNull(fExample1One);
+ assertNotNull(fExample1Two);
+ assertNotNull(fExample2One);
+ assertNotNull(fExample2Two);
+ assertNotNull(fExample3One);
+ assertNotNull(fExample3Two);
+ HashSet<Thread> threads= new HashSet<Thread>();
+ Collections.addAll(threads, fExample1One, fExample1Two,
+ fExample2One, fExample2Two,
+ fExample3One, fExample3Two);
+ assertThat(threads.size(), is(6));
+ //should be 6 == number of thread parties in CyclicBarrier == number of concurrent methods
+ //7 == capacity, we give the Computer a chance to excess the number of concurrent methods if fails
+ }
+
+ @Test public void runParallel() throws InterruptedException {
+ fSynchronizer= new CyclicBarrier(6);//continue with tests if 6 threads for methods are scheduled until timeout; otherwise fail
+ ThreadPoolExecutor pool= new ThreadPoolExecutor(0, 9,// 9 == (six thread parties in barrier + three concurrent classes)
+ Long.MAX_VALUE, TimeUnit.NANOSECONDS,//don't reuse threads
+ new SynchronousQueue<Runnable>());//default like for unbounded pools
+ Computer comp= ParallelComputer.classesAndMethods(pool, 6);//capacity: 6 concurrent methods and 3 concurrent classes
+ Result result= JUnitCore.runClasses(comp, Example1.class, Example2.class, Example3.class);
+ assertTrue(result.wasSuccessful());
+ assertNotNull(fExample1One);
+ assertNotNull(fExample1Two);
+ assertNotNull(fExample2One);
+ assertNotNull(fExample2Two);
+ assertNotNull(fExample3One);
+ assertNotNull(fExample3Two);
+ HashSet<Thread> threads= new HashSet<Thread>();
+ Collections.addAll(threads, fExample1One, fExample1Two,
+ fExample2One, fExample2Two,
+ fExample3One, fExample3Two);
+ assertThat(threads.size(), is(6));
+ //should be 6 == number of thread parties in CyclicBarrier == number of concurrent methods
+ }
+
+ @Test public void runParallelUndersized() throws InterruptedException {
+ fSynchronizer= null;
+ ThreadPoolExecutor pool= new ThreadPoolExecutor(3, 4,
+ 0, TimeUnit.NANOSECONDS,//reuse threads
+ new LinkedBlockingQueue<Runnable>());//default like for unbounded pools
+ Computer comp= ParallelComputer.classesAndMethods(pool, 2);//capacity: 2 concurrent methods at least, and 1 or 2 concurrent classes
+ Result result= JUnitCore.runClasses(comp, Example1.class, Example2.class, Example3.class);
+ assertTrue(result.wasSuccessful());
+ assertNotNull(fExample1One);
+ assertNotNull(fExample1Two);
+ assertNotNull(fExample2One);
+ assertNotNull(fExample2Two);
+ assertNotNull(fExample3One);
+ assertNotNull(fExample3Two);
+ HashSet<Thread> threads= new HashSet<Thread>();
+ Collections.addAll(threads, fExample1One, fExample1Two,
+ fExample2One, fExample2Two,
+ fExample3One, fExample3Two);
+ assertThat(threads.size(), anyOf(is(2), is(3)));
+ }
+
+ @Test public void runParallelMinimum() throws InterruptedException {
+ fSynchronizer= null;
+ Computer comp= ParallelComputer.classesAndMethodsBounded(2);//capacity: 1 concurrent method, and 1 concurrent class
+ Result result= JUnitCore.runClasses(comp, Example1.class, Example2.class, Example3.class);
+ assertTrue(result.wasSuccessful());
+ assertNotNull(fExample1One);
+ assertNotNull(fExample1Two);
+ assertNotNull(fExample2One);
+ assertNotNull(fExample2Two);
+ assertNotNull(fExample3One);
+ assertNotNull(fExample3Two);
+ HashSet<Thread> threads= new HashSet<Thread>();
+ Collections.addAll(threads, fExample1One, fExample1Two,
+ fExample2One, fExample2Two,
+ fExample3One, fExample3Two);
+ assertThat(threads.size(), anyOf(is(1), is(2)));
+ // Might be two method Threads in entire run time, but cannot be concurrent.
+ // e.g. class Thread might be used for Method, and vice versa at some later time.
+ // Thus the Threads may interchange, but still one Thread per class, and one Thread per method.
+ }
+
+ @Test public void classesAndMethodsUnbounded() {
+ fSynchronizer= new CyclicBarrier(6);//continue with tests if 6 threads for methods are scheduled until timeout; otherwise fail
+ Computer comp= ParallelComputer.classesAndMethodsUnbounded();
+ Result result= JUnitCore.runClasses(comp, Example1.class, Example2.class, Example3.class);
+ assertTrue(result.wasSuccessful());
+ assertNotNull(fExample1One);
+ assertNotNull(fExample1Two);
+ assertNotNull(fExample2One);
+ assertNotNull(fExample2Two);
+ assertNotNull(fExample3One);
+ assertNotNull(fExample3Two);
+ HashSet<Thread> threads= new HashSet<Thread>();
+ Collections.addAll(threads, fExample1One, fExample1Two,
+ fExample2One, fExample2Two,
+ fExample3One, fExample3Two);
+ assertThat(threads.size(), is(6));
+ }
+
+ @Test public void classesAndMethodsBounded() throws InterruptedException {
+ fSynchronizer= new CyclicBarrier(6);//continue with tests if 6 threads for methods are scheduled until timeout; otherwise fail
+ Computer comp= ParallelComputer.classesAndMethodsBounded(10);//min capacity: 6 concurrent methods, and 3 concurrent classes
+ Result result= JUnitCore.runClasses(comp, Example1.class, Example2.class, Example3.class);
+ assertTrue(result.wasSuccessful());
+ assertNotNull(fExample1One);
+ assertNotNull(fExample1Two);
+ assertNotNull(fExample2One);
+ assertNotNull(fExample2Two);
+ assertNotNull(fExample3One);
+ assertNotNull(fExample3Two);
+ HashSet<Thread> threads= new HashSet<Thread>();
+ Collections.addAll(threads, fExample1One, fExample1Two,
+ fExample2One, fExample2Two,
+ fExample3One, fExample3Two);
+ assertThat(threads.size(), is(6));
+ }
+
+ @Test public void classesAndMethodsBoundedUndersized() throws InterruptedException {
+ fSynchronizer= null;
+ Computer comp= ParallelComputer.classesAndMethodsBounded(4);//1 - 3 concurrent methods
+ Result result= JUnitCore.runClasses(comp, Example1.class, Example2.class, Example3.class);
+ assertTrue(result.wasSuccessful());
+ assertNotNull(fExample1One);
+ assertNotNull(fExample1Two);
+ assertNotNull(fExample2One);
+ assertNotNull(fExample2Two);
+ assertNotNull(fExample3One);
+ assertNotNull(fExample3Two);
+ HashSet<Thread> threads= new HashSet<Thread>();
+ Collections.addAll(threads, fExample1One, fExample1Two,
+ fExample2One, fExample2Two,
+ fExample3One, fExample3Two);
+ assertThat(threads.size(), anyOf(is(1), is(2), is(3)));
+ }
+
+ @Test public void fibonacci() {
+ Computer comp= ParallelComputer.classesAndMethodsUnbounded();
+ Result result= JUnitCore.runClasses(comp, FibonacciTest.class);
+ assertTrue(result.wasSuccessful());
+ }
+}
View
58 src/test/java/org/junit/tests/experimental/parallel/ParallelMethodTest.java
@@ -6,11 +6,14 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.Executors;
+import java.util.HashSet;
import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.ParallelComputer;
import org.junit.runner.JUnitCore;
@@ -20,42 +23,55 @@
private static final long TIMEOUT = 15;
private static volatile Thread fOne = null;
private static volatile Thread fTwo = null;
+ private static volatile CyclicBarrier fSynchronizer;
public static class Example {
- private static volatile CountDownLatch fSynchronizer;
-
- @BeforeClass
- public static void init() {
- fSynchronizer = new CountDownLatch(2);
- }
-
- @Test
- public void one() throws InterruptedException {
- fSynchronizer.countDown();
- assertTrue(fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS));
+ @Test public void one() throws InterruptedException, BrokenBarrierException, TimeoutException {
+ if (fSynchronizer != null) fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS);
fOne = Thread.currentThread();
}
- @Test
- public void two() throws InterruptedException {
- fSynchronizer.countDown();
- assertTrue(fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS));
+ @Test public void two() throws InterruptedException, BrokenBarrierException, TimeoutException {
+ if (fSynchronizer != null) fSynchronizer.await(TIMEOUT, TimeUnit.SECONDS);
fTwo = Thread.currentThread();
}
}
- @Before
- public void init() {
+ @Before public void init() {
fOne = null;
fTwo = null;
}
- @Test
- public void testsRunInParallel() {
+ @Test public void infinitivePool() {
+ fSynchronizer = new CyclicBarrier(2);
Result result = JUnitCore.runClasses(ParallelComputer.methods(), Example.class);
assertTrue(result.wasSuccessful());
assertNotNull(fOne);
assertNotNull(fTwo);
assertThat(fOne, is(not(fTwo)));
}
+
+ @Test public void singleThread() throws InterruptedException {
+ fSynchronizer= null;
+ Result result= JUnitCore.runClasses(ParallelComputer.methods(Executors.newSingleThreadExecutor()),
+ Example.class);
+ assertTrue(result.wasSuccessful());
+ assertNotNull(fOne);
+ assertNotNull(fTwo);
+ HashSet<Thread> threads= new HashSet<Thread>();
+ threads.add(fOne);
+ threads.add(fTwo);
+ assertThat(threads.size(), is(1));
+ assertThat(threads.iterator().next(), is(not(Thread.currentThread())));
+ }
+
+ @Test public void fixedSizePool() throws InterruptedException {
+ fSynchronizer= new CyclicBarrier(2);
+ Result result= JUnitCore.runClasses(ParallelComputer.methods(Executors.newFixedThreadPool(2)),
+ Example.class);
+ assertTrue(result.wasSuccessful());
+ assertNotNull(fOne);
+ assertNotNull(fTwo);
+ assertThat(fOne, is(not(fTwo)));
+ }
}

0 comments on commit f2fd08e

Please sign in to comment.
Something went wrong with that request. Please try again.