Permalink
Browse files

completed shutdown/shutdownNow

  • Loading branch information...
1 parent f8af097 commit 99b436e5f64d8cc04463fe06599b46d18a9dac32 @Tibor17 committed Nov 28, 2012
@@ -14,9 +14,14 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Queue;
import org.junit.runner.Computer;
+import org.junit.runner.Description;
import org.junit.runner.Runner;
import org.junit.runners.ParentRunner;
import org.junit.runners.model.InitializationError;
@@ -92,6 +97,11 @@
private final ExecutorService fPoolClasses;
private final ExecutorService fPoolMethods;
+ private final ConcurrentLinkedQueue<Description> fBeforeShutdown;
+
+ //set if a pool is shut down externally
+ private final AtomicBoolean fIsShutDown;
+
// disables the callers Thread for scheduling purposes until all classes finished
private volatile CountDownLatch fClassesFinisher;
@@ -100,14 +110,18 @@
// used in #parallelize(): allows a number of parallel classes and methods
private volatile Semaphore fSinglePoolBalancer;
- //set if a pool is shut down externally
- private final AtomicBoolean fIsShutDown= new AtomicBoolean();
-
// fClassesFinisher is initialized with this value, see #getSuite() and #getParent()
private volatile int fCountClasses;
+ /**
+ * @deprecated As of JUnit 4.12, replace by {@link #methods()}, {@link #classes()}
+ * and {@link #classesAndMethodsUnbounded()}.
+ */
@Deprecated//should be private
public ParallelComputer(boolean classes, boolean methods) {
+ fClassesFinisher= null;
+ fSinglePoolBalancer= null;
+ fCountClasses= 0;//to satisfy JVM spec -write operation first on volatile fields
fParallelClasses= classes;
fParallelMethods= methods;
fPoolClasses= null;
@@ -117,6 +131,8 @@ public ParallelComputer(boolean classes, boolean methods) {
fSinglePoolCoreSize= -1;
fSinglePoolMaxSize= -1;
fSinglePoolMinConcurrentMethods= -1;
+ fBeforeShutdown= null;
+ fIsShutDown= null;
}
private ParallelComputer(ExecutorService poolClasses, ExecutorService poolMethods) {
@@ -130,6 +146,9 @@ private ParallelComputer(ThreadPoolExecutor pool, int minConcurrentMethods) {
private ParallelComputer(ExecutorService poolClasses, ExecutorService poolMethods,
int singlePoolCoreSize, int singlePoolMaxSize,
int minConcurrentMethods) {
+ fClassesFinisher= null;
+ fSinglePoolBalancer= null;
+ fCountClasses= 0;//to satisfy JVM spec -write operation first on volatile fields
if (poolClasses == null && poolMethods == null)
throw new NullPointerException("null classes/methods executor");
if (poolClasses != null && poolClasses.isShutdown())
@@ -156,6 +175,9 @@ private ParallelComputer(ExecutorService poolClasses, ExecutorService poolMethod
throw new IllegalArgumentException("min concurrent methods " + fSinglePoolMinConcurrentMethods + " should be >= 1");
if (fHasSinglePoll && fSinglePoolMinConcurrentMethods >= fSinglePoolMaxSize)
throw new IllegalArgumentException("min methods pool size should be less than max pool size");
+ fBeforeShutdown= new ConcurrentLinkedQueue<Description>();
+ fIsShutDown= new AtomicBoolean();
+ addDefaultShutdownHandler();
}
public static Computer classes() {
@@ -166,12 +188,12 @@ public static Computer methods() {
return new ParallelComputer(false, true);
}
- public static Computer methods(ExecutorService pool) {
+ public static ParallelComputer methods(ExecutorService pool) {
if (pool == null) throw new NullPointerException("null methods executor");
return new ParallelComputer(null, pool);
}
- public static Computer classes(ExecutorService pool) {
+ public static ParallelComputer classes(ExecutorService pool) {
if (pool == null) throw new NullPointerException("null classes executor");
return new ParallelComputer(pool, null);
}
@@ -180,7 +202,7 @@ public static Computer classes(ExecutorService pool) {
* Parallel computer with infinitive thread pool size.
* @return parallel computer
*/
- public static Computer classesAndMethodsUnbounded() {
+ public static ParallelComputer classesAndMethodsUnbounded() {
ThreadPoolExecutor pool= new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
@@ -193,7 +215,7 @@ public static Computer classesAndMethodsUnbounded() {
* @return parallel computer with <tt>nThreads</tt>
* @throws IllegalArgumentException if <tt>nThreads &lt; 2</tt>
*/
- public static Computer classesAndMethodsBounded(int nThreads) {
+ public static ParallelComputer classesAndMethodsBounded(int nThreads) {
ThreadPoolExecutor pool= new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
@@ -210,7 +232,7 @@ public static Computer classesAndMethodsBounded(int nThreads) {
* @throws IllegalStateException if the pool is already shut down
* @throws NullPointerException <tt>pool</tt> is null
*/
- public static Computer classesAndMethods(ThreadPoolExecutor pool) {
+ public static ParallelComputer classesAndMethods(ThreadPoolExecutor pool) {
return classesAndMethods(pool, 1);
}
@@ -226,7 +248,7 @@ public static Computer classesAndMethods(ThreadPoolExecutor pool) {
* @throws IllegalStateException if the pool is already shut down
* @throws NullPointerException <tt>pool</tt> is null
*/
- public static Computer classesAndMethods(ThreadPoolExecutor pool, int minConcurrentMethods) {
+ public static ParallelComputer classesAndMethods(ThreadPoolExecutor pool, int minConcurrentMethods) {
return new ParallelComputer(pool, minConcurrentMethods);
}
@@ -235,18 +257,20 @@ public static Computer classesAndMethods(ThreadPoolExecutor pool, int minConcurr
* Zero <tt>corePoolSize</tt> and <tt>maximumPoolSize</tt> greater than one is accepted.
* @throws NullPointerException if a pool is null
*/
- public static Computer classesAndMethods(ExecutorService poolClasses, ExecutorService poolMethods) {
+ public static ParallelComputer classesAndMethods(ExecutorService poolClasses, ExecutorService poolMethods) {
if (poolClasses == null) throw new NullPointerException("null classes executor");
if (poolMethods == null) throw new NullPointerException("null methods executor");
if (poolClasses == poolMethods)
throw new IllegalArgumentException("instead call #classesAndMethods(ThreadPoolExecutor, int)");
return new ParallelComputer(poolClasses, poolMethods);
}
- private Runner sequencer(ParentRunner runner) {
+ private Runner sequencer(final ParentRunner runner) {
runner.setScheduler(new RunnerScheduler() {
public void schedule(Runnable childStatement) {
+ if (fIsShutDown.get()) return;
childStatement.run();
+ fBeforeShutdown.add(runner.getDescription());
}
public void finished() {
@@ -267,20 +291,11 @@ public void schedule(Runnable childStatement) {
if (fIsShutDown.get()) return;
try {
Future<?> f= service.submit(childStatement);
+ fBeforeShutdown.add(runner.getDescription());
if (!isClassPool & fProvidedPools) fTaskFutures.add(f);
} catch (RejectedExecutionException e) {
- /*external shut down*/
- if (fIsShutDown.compareAndSet(false, true)) {//let other threads avoid submitting new tasks
- if (fHasSinglePoll) {
- //let dormant class-threads wake up and escape from balancer
- fSinglePoolBalancer.release(fCountClasses);
- }
- if (fProvidedPools) {
- //signal that the total num classes will not be reached
- while (fClassesFinisher.getCount() > 0)
- fClassesFinisher.countDown();
- }
- }
+ /*after external shut down*/
+ shutdownQuietly(false);
}
}
@@ -309,9 +324,9 @@ private static void tryFinish(ExecutorService pool) {
}
@Override
- public Runner getSuite(RunnerBuilder builder, java.lang.Class<?>[] classes)
+ public Runner getSuite(RunnerBuilder builder, Class<?>[] classes)
throws InitializationError {
- Runner suite = super.getSuite(builder, classes);
+ Runner suite= super.getSuite(builder, classes);
if (canSchedule(suite)) {
if (fHasSinglePoll) {
int maxConcurrentClasses;
@@ -331,7 +346,7 @@ public Runner getSuite(RunnerBuilder builder, java.lang.Class<?>[] classes)
@Override
protected Runner getRunner(RunnerBuilder builder, Class<?> testClass)
throws Throwable {
- Runner runner = super.getRunner(builder, testClass);
+ Runner runner= super.getRunner(builder, testClass);
if (canSchedule(runner)) {
++fCountClasses;//incremented without been AtomicInteger because not yet concurrent access
ParentRunner child= (ParentRunner) runner;
@@ -377,7 +392,8 @@ private void awaitClassFinished(Queue<Future<?>> children) {
try {
child.get();
} catch (InterruptedException e) {
- e.printStackTrace(System.err);
+ /*after called external ExecutorService#shutdownNow()*/
+ shutdownQuietly(true);
} catch (ExecutionException e) {
/*cause fired in run-notifier*/
} catch (CancellationException e) {
@@ -394,4 +410,85 @@ private void awaitClassesFinished() {
e.printStackTrace(System.err);
}
}
+
+ private void addDefaultShutdownHandler() {
+ if (fProvidedPools) {
+ if (fParallelClasses) setDefaultShutdownHandler(fPoolClasses);
+ if (!fHasSinglePoll && fParallelMethods) setDefaultShutdownHandler(fPoolMethods);
+ }
+ }
+
+ private void setDefaultShutdownHandler(Executor executor) {
+ if (executor instanceof ThreadPoolExecutor) {
+ ThreadPoolExecutor pool= (ThreadPoolExecutor) executor;
+ RejectedExecutionHandler poolHandler= pool.getRejectedExecutionHandler();
+ pool.setRejectedExecutionHandler(new ShutdownHandler(poolHandler));
+ }
+ }
+
+ /**
+ * @param shutdownNow if <tt>false</tt> don't call pool's methods
+ */
+ private void shutdownQuietly(boolean shutdownNow) {
+ try {
+ if (fIsShutDown.compareAndSet(false, true) && fCountClasses > 0) {//let other threads avoid submitting new tasks
+ if (fHasSinglePoll) {
+ //let dormant class-threads wake up and escape from balancer
+ fSinglePoolBalancer.release(fCountClasses);
+ }
+ if (fProvidedPools) {
+ //cached a value in volatile field to the stack as a constant for faster reads
+ final CountDownLatch classesFinisher= fClassesFinisher;
+ //signals that the total num classes could not be reached
+ while (classesFinisher.getCount() > 0)
+ classesFinisher.countDown();
+ }
+ }
+
+ if (fProvidedPools) {
+ if (fParallelClasses) shutdown(fPoolClasses, shutdownNow);
+ if (!fHasSinglePoll && fParallelMethods) shutdown(fPoolMethods, shutdownNow);
+ }
+ } catch (Throwable t) {
+ //may be only OOM, security and permission exceptions
+ t.printStackTrace(System.err);
+ }
+ }
+
+ private static void shutdown(ExecutorService pool, boolean shutdownNow) {
+ if (shutdownNow) pool.shutdownNow();
+ else pool.shutdown();
+ }
+
+ /**
+ * Attempts to stop all actively executing tasks and immediately returns a collection
+ * of descriptions of those tasks which have completed prior to this call.
+ * <p>
+ * If <tt>shutdownNow</tt> is set, waiting methods will cancel via {@link Thread#interrupt}.
+ *
+ * @param shutdownNow if <tt>true</tt> interrupts waiting methods
+ * @return collection of recent descriptions
+ * @throws IllegalStateException if created by {@link #classes()} or {@link #methods()}
+ */
+ public final Collection<Description> shutdown(boolean shutdownNow) {
+ if (!fProvidedPools) throw new IllegalStateException();
+ shutdownQuietly(shutdownNow);
+ return new ArrayList<Description>(fBeforeShutdown);
+ }
+
+ private final class ShutdownHandler implements RejectedExecutionHandler {
+ private final RejectedExecutionHandler fPoolHandler;
+
+ ShutdownHandler(RejectedExecutionHandler poolHandler) {
+ fPoolHandler= poolHandler;
+ }
+
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ if (executor.isShutdown()) {
+ //keep using false in order to avoid calling pool's methods
+ shutdownQuietly(false);
+ }
+ fPoolHandler.rejectedExecution(r, executor);
+ }
+ }
}
Oops, something went wrong.

0 comments on commit 99b436e

Please sign in to comment.