Permalink
Browse files

support shut-down; performance tests; javadoc

  • Loading branch information...
Tibor17 committed Nov 23, 2012
1 parent 9ff646e commit f8af09762a31ad9cf503fec62d18b5c1775c3584
@@ -11,6 +11,9 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Queue;
import org.junit.runner.Computer;
@@ -23,6 +26,43 @@
/**
* Represents a factory of Computers scheduling concurrent JUnit classes and methods in several variants.
+ * <p><pre>
+ * public static class Slow2 extends Slow1 {}
+ *
+ * public static class Slow1 {
+ * &#064;Test
+ * public void slow() throws InterruptedException {
+ * Thread.sleep(900);
+ * }
+ *
+ * &#064;Test
+ * public void slower() throws InterruptedException {
+ * Thread.sleep(1100);
+ * }
+ * }
+ * </pre>
+ * <p>
+ * <h3>Executing parallel classes</h3>
+ * This executes parallel classes in one infinite-size thread pool.
+ * <p><pre>
+ * JUnitCore.runClasses(ParallelComputer.classes(), Slow1.class, Slow2.class);
+ * </pre>
+ *
+ * <h3>Executing parallel methods</h3>
+ * This creates a new infinite-size thread pool per class and executes its methods in parallel.
+ * <p><pre>
+ * JUnitCore.runClasses(ParallelComputer.methods(), Slow1.class, Slow2.class);
+ * </pre>
+ *
+ * <h3>Executing parallel classes and methods</h3>
+ * <p><pre>
+ * Executes parallel classes and methods in one infinite-size thread pool.
+ * JUnitCore.runClasses(ParallelComputer.classesAndMethodsUnbounded(), Slow1.class, Slow2.class);
+ * </pre>
+ *
+ * @author tibor17
+ * @since 4.12
+ * @see <a href="https://github.com/KentBeck/junit/wiki/ParallelComputer">ParallelComputer at JUnit wiki</a>
*/
public class ParallelComputer extends Computer {
// true: parallelized classes if #classes(), #classesAndMethods() or ParallelComputer(true, x), ParallelComputer(pool[,...])
@@ -60,10 +100,13 @@
// used in #parallelize(): allows a number of parallel classes and methods
private volatile Semaphore fSinglePoolBalancer;
+ //set if a pool is shut down externally
+ private final AtomicBoolean fIsShutDown= new AtomicBoolean();
+
// fClassesFinisher is initialized with this value, see #getSuite() and #getParent()
- private int countClasses;
+ private volatile int fCountClasses;
- @Deprecated
+ @Deprecated//should be private
public ParallelComputer(boolean classes, boolean methods) {
fParallelClasses= classes;
fParallelMethods= methods;
@@ -84,10 +127,6 @@ private ParallelComputer(ThreadPoolExecutor pool, int minConcurrentMethods) {
this(pool, pool, pool.getCorePoolSize(), pool.getMaximumPoolSize(), minConcurrentMethods);
}
- private ParallelComputer(ThreadPoolExecutor pool) {
- this(pool, 1);
- }
-
private ParallelComputer(ExecutorService poolClasses, ExecutorService poolMethods,
int singlePoolCoreSize, int singlePoolMaxSize,
int minConcurrentMethods) {
@@ -108,8 +147,8 @@ private ParallelComputer(ExecutorService poolClasses, ExecutorService poolMethod
fSinglePoolCoreSize= fHasSinglePoll ? singlePoolCoreSize : -1;
fSinglePoolMaxSize= fHasSinglePoll ? singlePoolMaxSize : -1;
fSinglePoolMinConcurrentMethods= fHasSinglePoll ? minConcurrentMethods : -1;
- boolean isUnbounded= fSinglePoolCoreSize == 0 & fSinglePoolMaxSize > 1;
- if (fHasSinglePoll && !isUnbounded && fSinglePoolCoreSize <= 1)
+ final boolean isFixedSize= fSinglePoolCoreSize == fSinglePoolMaxSize;
+ if (fHasSinglePoll && isFixedSize && fSinglePoolCoreSize <= 1)
throw new IllegalArgumentException("core pool size " + fSinglePoolCoreSize + " should be > 1");
if (fHasSinglePoll && fSinglePoolMaxSize <= 1)
throw new IllegalArgumentException("max pool size " + fSinglePoolMaxSize + " should be > 1");
@@ -149,10 +188,10 @@ public static Computer classesAndMethodsUnbounded() {
}
/**
- * Parallel computer with fixed-size thred pool of concurrent classes and methods.
+ * Parallel computer with fixed-size thread pool of concurrent classes and methods.
* @param nThreads two threads at least
- * @return parallel computer with nThreads
- * @throws IllegalArgumentException if nThreads < 2
+ * @return parallel computer with <tt>nThreads</tt>
+ * @throws IllegalArgumentException if <tt>nThreads &lt; 2</tt>
*/
public static Computer classesAndMethodsBounded(int nThreads) {
ThreadPoolExecutor pool= new ThreadPoolExecutor(nThreads, nThreads,
@@ -163,31 +202,39 @@ public static Computer classesAndMethodsBounded(int nThreads) {
/**
* Parallel computer for concurrent classes and methods.
- * Zero corePoolSize and maximumPoolSize greater than one is accepted.
+ * Zero <tt>corePoolSize</tt> and <tt>maximumPoolSize</tt> greater than one is accepted.
* @param pool unbounded or fixed-size thread pool.
* @return computer parallelized by given pool
- * @throws IllegalArgumentException if maximumPoolSize < 2; or corePoolSize < 2 for fixed-size pool
- * ; or the pool is shut down
- * @throws NullPointerException pool is null
+ * @throws IllegalArgumentException if <tt>maximumPoolSize &lt; 2</tt>;
+ * or <tt>corePoolSize &lt; 2</tt> for fixed-size pool
+ * @throws IllegalStateException if the pool is already shut down
+ * @throws NullPointerException <tt>pool</tt> is null
*/
public static Computer classesAndMethods(ThreadPoolExecutor pool) {
return classesAndMethods(pool, 1);
}
/**
* Parallel computer for concurrent classes and methods.
- * Zero corePoolSize and maximumPoolSize greater than one is accepted.
+ * Zero <tt>corePoolSize</tt> and <tt>maximumPoolSize</tt> greater than one is accepted.
* @param pool unbounded or fixed-size thread pool.
* @param minConcurrentMethods 1 to (pool capacity - 1)
* @return computer parallelized by given pool
- * @throws IllegalArgumentException if maximumPoolSize < 2; or corePoolSize < 2 for fixed-size pool
- * ; or minConcurrentMethods < 1; or the pool is shut down
- * @throws NullPointerException pool is null
+ * @throws IllegalArgumentException if <tt>maximumPoolSize &lt; 2</tt>;
+ * or <tt>corePoolSize &lt; 2</tt> for fixed-size pool;
+ * or <tt>minConcurrentMethods &lt; 1<tt>
+ * @throws IllegalStateException if the pool is already shut down
+ * @throws NullPointerException <tt>pool</tt> is null
*/
public static Computer classesAndMethods(ThreadPoolExecutor pool, int minConcurrentMethods) {
return new ParallelComputer(pool, minConcurrentMethods);
}
+ /**
+ * Parallel computer for concurrent classes and methods.
+ * Zero <tt>corePoolSize</tt> and <tt>maximumPoolSize</tt> greater than one is accepted.
+ * @throws NullPointerException if a pool is null
+ */
public static Computer classesAndMethods(ExecutorService poolClasses, ExecutorService poolMethods) {
if (poolClasses == null) throw new NullPointerException("null classes executor");
if (poolMethods == null) throw new NullPointerException("null methods executor");
@@ -215,9 +262,26 @@ private Runner parallelize(final ParentRunner runner, final ExecutorService serv
= !isClassPool & fProvidedPools ? new ConcurrentLinkedQueue<Future<?>>() : null;
public void schedule(Runnable childStatement) {
+ if (fIsShutDown.get()) return;
if (isClassPool & fHasSinglePoll) fSinglePoolBalancer.acquireUninterruptibly();
- Future<?> f= service.submit(childStatement);
- if (!isClassPool & fProvidedPools) fTaskFutures.add(f);
+ if (fIsShutDown.get()) return;
+ try {
+ Future<?> f= service.submit(childStatement);
+ if (!isClassPool & fProvidedPools) fTaskFutures.add(f);
+ } catch (RejectedExecutionException e) {
+ /*external shut down*/
+ if (fIsShutDown.compareAndSet(false, true)) {//let other threads avoid submitting new tasks
+ if (fHasSinglePoll) {
+ //let dormant class-threads wake up and escape from balancer
+ fSinglePoolBalancer.release(fCountClasses);
+ }
+ if (fProvidedPools) {
+ //signal that the total num classes will not be reached
+ while (fClassesFinisher.getCount() > 0)
+ fClassesFinisher.countDown();
+ }
+ }
+ }
}
public void finished() {
@@ -255,10 +319,10 @@ public Runner getSuite(RunnerBuilder builder, java.lang.Class<?>[] classes)
//is unbounded or fixed-size single pool
maxConcurrentClasses= fSinglePoolMaxSize - fSinglePoolMinConcurrentMethods;
else maxConcurrentClasses= Math.max(1, fSinglePoolCoreSize - fSinglePoolMinConcurrentMethods);
- maxConcurrentClasses= Math.min(maxConcurrentClasses, countClasses);
+ maxConcurrentClasses= Math.min(maxConcurrentClasses, fCountClasses);
fSinglePoolBalancer= new Semaphore(maxConcurrentClasses);
}
- fClassesFinisher= fProvidedPools ? new CountDownLatch(countClasses) : null;
+ fClassesFinisher= fProvidedPools ? new CountDownLatch(fCountClasses) : null;
if (fParallelClasses) parallelize((ParentRunner) suite, threadPoolClasses(), true);
}
return suite;
@@ -269,7 +333,7 @@ protected Runner getRunner(RunnerBuilder builder, Class<?> testClass)
throws Throwable {
Runner runner = super.getRunner(builder, testClass);
if (canSchedule(runner)) {
- ++countClasses;
+ ++fCountClasses;//incremented without been AtomicInteger because not yet concurrent access
ParentRunner child= (ParentRunner) runner;
runner= fParallelMethods ? parallelize(child, threadPoolMethods(), false) : sequencer(child);
}
@@ -314,7 +378,11 @@ private void awaitClassFinished(Queue<Future<?>> children) {
child.get();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
- } catch (ExecutionException e) {/*fired in run-notifier*/}
+ } catch (ExecutionException e) {
+ /*cause fired in run-notifier*/
+ } catch (CancellationException e) {
+ /*cannot happen because not calling Future#cancel(boolean)*/
+ }
}
fClassesFinisher.countDown();
}
@@ -24,6 +24,7 @@
import org.junit.tests.experimental.parallel.ParallelClassTest;
import org.junit.tests.experimental.parallel.ParallelMethodTest;
import org.junit.tests.experimental.parallel.ParallelClassesAndMethodsTest;
+import org.junit.tests.experimental.parallel.ParallelComputerShutDownTest;
import org.junit.tests.experimental.rules.BlockJUnit4ClassRunnerOverrideTest;
import org.junit.tests.experimental.rules.ClassRulesTest;
import org.junit.tests.experimental.rules.ExpectedExceptionTest;
@@ -145,6 +146,7 @@
ParallelClassTest.class,
ParallelMethodTest.class,
ParallelClassesAndMethodsTest.class,
+ ParallelComputerShutDownTest.class,
ParentRunnerTest.class,
NameRulesTest.class,
ClassRulesTest.class,
Oops, something went wrong.

0 comments on commit f8af097

Please sign in to comment.