Skip to content

Commit

Permalink
fix in tests: Thread.yield() scheduled locked threads + improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Tibor Digana committed Dec 4, 2012
1 parent 99b436e commit 784d1d6
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 190 deletions.
107 changes: 48 additions & 59 deletions src/main/java/org/junit/experimental/ParallelComputer.java
Expand Up @@ -19,7 +19,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;

import org.junit.runner.Computer;
import org.junit.runner.Description;
import org.junit.runner.Runner;
Expand Down Expand Up @@ -102,12 +101,12 @@ public class ParallelComputer extends Computer {
//set if a pool is shut down externally
private final AtomicBoolean fIsShutDown;

// disables the callers Thread for scheduling purposes until all classes finished
// Used if fHasSinglePoll is set. Disables this Thread for scheduling purposes until all classes finished.
private volatile CountDownLatch fClassesFinisher;

// prevents thread resources exhaustion on classes when used with single pool => gives a chance to parallelize methods
// see fSinglePoolMinConcurrentMethods above
// used in #parallelize(): allows a number of parallel classes and methods
// used in #tryParallelize(): allows a number of parallel classes and methods
private volatile Semaphore fSinglePoolBalancer;

// fClassesFinisher is initialized with this value, see #getSuite() and #getParent()
Expand Down Expand Up @@ -175,9 +174,9 @@ private ParallelComputer(ExecutorService poolClasses, ExecutorService poolMethod
throw new IllegalArgumentException("min concurrent methods " + fSinglePoolMinConcurrentMethods + " should be >= 1");
if (fHasSinglePoll && fSinglePoolMinConcurrentMethods >= fSinglePoolMaxSize)
throw new IllegalArgumentException("min methods pool size should be less than max pool size");
fBeforeShutdown= new ConcurrentLinkedQueue<Description>();
fIsShutDown= new AtomicBoolean();
addDefaultShutdownHandler();
fBeforeShutdown= new ConcurrentLinkedQueue<Description>();
fIsShutDown= new AtomicBoolean(false);
}

public static Computer classes() {
Expand Down Expand Up @@ -265,56 +264,50 @@ public static ParallelComputer classesAndMethods(ExecutorService poolClasses, Ex
return new ParallelComputer(poolClasses, poolMethods);
}

private Runner sequencer(final ParentRunner runner) {
runner.setScheduler(new RunnerScheduler() {
public void schedule(Runnable childStatement) {
if (fIsShutDown.get()) return;
childStatement.run();
fBeforeShutdown.add(runner.getDescription());
}

public void finished() {
if (fClassesFinisher != null) fClassesFinisher.countDown();
}
});
return runner;
}

private Runner parallelize(final ParentRunner runner, final ExecutorService service, final boolean isClassPool) {
/**
* @return <tt>true</tt> if a parallel scheduler is set in the <tt>runner</tt>
*/
private boolean tryParallelize(final ParentRunner runner, final ExecutorService service, final boolean isClasses) {
runner.setScheduler(new RunnerScheduler() {
private final ConcurrentLinkedQueue<Future<?>> fTaskFutures
= !isClassPool & fProvidedPools ? new ConcurrentLinkedQueue<Future<?>>() : null;
private final ConcurrentLinkedQueue<Future<?>> fMethodsFutures
= !isClasses & fProvidedPools & fParallelMethods ? new ConcurrentLinkedQueue<Future<?>>() : null;

public void schedule(Runnable childStatement) {
if (fIsShutDown.get()) return;
if (isClassPool & fHasSinglePoll) fSinglePoolBalancer.acquireUninterruptibly();
if (isClasses & fHasSinglePoll) fSinglePoolBalancer.acquireUninterruptibly();
if (fIsShutDown.get()) return;
try {
Future<?> f= service.submit(childStatement);
fBeforeShutdown.add(runner.getDescription());
if (!isClassPool & fProvidedPools) fTaskFutures.add(f);
if (service == null) {
fBeforeShutdown.add(runner.getDescription());
childStatement.run();
} else {
Future<?> f= service.submit(childStatement);
if (!isClasses & fProvidedPools & fParallelMethods) fMethodsFutures.add(f);
fBeforeShutdown.add(runner.getDescription());
}
} catch (RejectedExecutionException e) {
/*after external shut down*/
shutdownQuietly(false);
}
}

public void finished() {
if (isClassPool) { //wait until all test cases finished
if (fProvidedPools) awaitClassesFinished();
if (isClasses) { //wait until all test cases finished
awaitClassesFinished();
tryFinish(service);
} else { //wait until the test case finished
if (fProvidedPools) {
awaitClassFinished(fTaskFutures);
awaitClassFinished(fMethodsFutures);
if (fHasSinglePoll) fSinglePoolBalancer.release();
} else tryFinish(service);
}
}
});
return runner;
return service != null;
}

private static void tryFinish(ExecutorService pool) {
if (pool == null) return;
try {
pool.shutdown();
pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
Expand All @@ -324,8 +317,7 @@ private static void tryFinish(ExecutorService pool) {
}

@Override
public Runner getSuite(RunnerBuilder builder, Class<?>[] classes)
throws InitializationError {
public Runner getSuite(RunnerBuilder builder, Class<?>[] classes) throws InitializationError {
Runner suite= super.getSuite(builder, classes);
if (canSchedule(suite)) {
if (fHasSinglePoll) {
Expand All @@ -337,20 +329,18 @@ public Runner getSuite(RunnerBuilder builder, Class<?>[] classes)
maxConcurrentClasses= Math.min(maxConcurrentClasses, fCountClasses);
fSinglePoolBalancer= new Semaphore(maxConcurrentClasses);
}
fClassesFinisher= fProvidedPools ? new CountDownLatch(fCountClasses) : null;
if (fParallelClasses) parallelize((ParentRunner) suite, threadPoolClasses(), true);
fClassesFinisher= fHasSinglePoll ? new CountDownLatch(fCountClasses) : null;
tryParallelize((ParentRunner) suite, threadPoolClasses(), true);
}
return suite;
}

@Override
protected Runner getRunner(RunnerBuilder builder, Class<?> testClass)
throws Throwable {
protected Runner getRunner(RunnerBuilder builder, Class<?> testClass) throws Throwable {
Runner runner= super.getRunner(builder, testClass);
if (canSchedule(runner)) {
++fCountClasses;//incremented without been AtomicInteger because not yet concurrent access
ParentRunner child= (ParentRunner) runner;
runner= fParallelMethods ? parallelize(child, threadPoolMethods(), false) : sequencer(child);
tryParallelize((ParentRunner) runner, threadPoolMethods(), false);
}
return runner;
}
Expand Down Expand Up @@ -380,32 +370,34 @@ private boolean canSchedule(Runner runner) {
}

private ExecutorService threadPoolClasses() {
return fProvidedPools ? fPoolClasses : newThreadPoolClasses();
return !fProvidedPools && fParallelClasses ? newThreadPoolClasses() : fPoolClasses;
}

private ExecutorService threadPoolMethods() {
return fProvidedPools ? fPoolMethods : newThreadPoolMethods();
return !fProvidedPools && fParallelMethods ? newThreadPoolMethods() : fPoolMethods;
}

private void awaitClassFinished(Queue<Future<?>> children) {
for (Future<?> child : children) {
try {
child.get();
} catch (InterruptedException e) {
/*after called external ExecutorService#shutdownNow()*/
shutdownQuietly(true);
} catch (ExecutionException e) {
/*cause fired in run-notifier*/
} catch (CancellationException e) {
/*cannot happen because not calling Future#cancel(boolean)*/
private void awaitClassFinished(Queue<Future<?>> methodsFutures) {
if (methodsFutures != null) {//fParallelMethods is false
for (Future<?> methodFuture : methodsFutures) {
try {
methodFuture.get();
} catch (InterruptedException e) {
/*after called external ExecutorService#shutdownNow()*/
shutdownQuietly(true);
} catch (ExecutionException e) {
/*cause fired in run-notifier*/
} catch (CancellationException e) {
/*cannot happen because not calling Future#cancel(boolean)*/
}
}
}
fClassesFinisher.countDown();
if (fHasSinglePoll) fClassesFinisher.countDown();
}

private void awaitClassesFinished() {
try {
fClassesFinisher.await();
if (fHasSinglePoll) fClassesFinisher.await();
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
Expand All @@ -431,12 +423,10 @@ private void setDefaultShutdownHandler(Executor executor) {
*/
private void shutdownQuietly(boolean shutdownNow) {
try {
if (fIsShutDown.compareAndSet(false, true) && fCountClasses > 0) {//let other threads avoid submitting new tasks
if (fHasSinglePoll) {
if (fIsShutDown.compareAndSet(false, true)) {//let other threads avoid submitting new tasks
if (fHasSinglePoll && fCountClasses > 0) {
//let dormant class-threads wake up and escape from balancer
fSinglePoolBalancer.release(fCountClasses);
}
if (fProvidedPools) {
//cached a value in volatile field to the stack as a constant for faster reads
final CountDownLatch classesFinisher= fClassesFinisher;
//signals that the total num classes could not be reached
Expand Down Expand Up @@ -485,7 +475,6 @@ private final class ShutdownHandler implements RejectedExecutionHandler {

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (executor.isShutdown()) {
//keep using false in order to avoid calling pool's methods
shutdownQuietly(false);
}
fPoolHandler.rejectedExecution(r, executor);
Expand Down

0 comments on commit 784d1d6

Please sign in to comment.