Skip to content

Commit

Permalink
Fix a signaling bug in newDirectExecutorService() Also switch to impl…
Browse files Browse the repository at this point in the history
…icit

monitors to take advantage of biased locking.

If thread A calls awaitTermination while there are no running tasks, then
Thread B calls shutdown(). Thread A will never be woken up.
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=87280837
  • Loading branch information
lukesandberg authored and cpovirk committed Mar 2, 2015
1 parent 99a1407 commit 1a5b0b9
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 40 deletions.
Expand Up @@ -53,6 +53,7 @@
import org.mockito.InOrder; import org.mockito.InOrder;
import org.mockito.Mockito; import org.mockito.Mockito;


import java.lang.Thread.State;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
Expand Down Expand Up @@ -243,6 +244,51 @@ public Void call() throws Exception {
throwableFromOtherThread.get()); throwableFromOtherThread.get());
} }


/**
* Test for a bug where threads weren't getting signaled when shutdown was called, only when
* tasks completed.
*/

public void testDirectExecutorService_awaitTermination_missedSignal() {
final ExecutorService service = MoreExecutors.newDirectExecutorService();
Thread waiter = new Thread() {
@Override public void run() {
try {
service.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
return;
}
}
};
waiter.start();
awaitTimedWaiting(waiter);
service.shutdown();
Uninterruptibles.joinUninterruptibly(waiter, 10, TimeUnit.SECONDS);
if (waiter.isAlive()) {
waiter.interrupt();
fail("awaitTermination failed to trigger after shutdown()");
}
}

/** Wait for the given thread to reach the {@link State#TIMED_WAITING} thread state. */
void awaitTimedWaiting(Thread thread) {
while (true) {
switch (thread.getState()) {
case BLOCKED:
case NEW:
case RUNNABLE:
case WAITING:
Thread.yield();
break;
case TIMED_WAITING:
return;
case TERMINATED:
default:
throw new AssertionError();
}
}
}

public void testDirectExecutorService_shutdownNow() { public void testDirectExecutorService_shutdownNow() {
ExecutorService executor = newDirectExecutorService(); ExecutorService executor = newDirectExecutorService();
assertEquals(ImmutableList.of(), executor.shutdownNow()); assertEquals(ImmutableList.of(), executor.shutdownNow());
Expand Down
63 changes: 23 additions & 40 deletions guava/src/com/google/common/util/concurrent/MoreExecutors.java
Expand Up @@ -51,9 +51,8 @@
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock; import javax.annotation.concurrent.GuardedBy;
import java.util.concurrent.locks.ReentrantLock;


/** /**
* Factory and utility methods for {@link java.util.concurrent.Executor}, {@link * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
Expand Down Expand Up @@ -290,12 +289,9 @@ private static class DirectExecutorService
extends AbstractListeningExecutorService { extends AbstractListeningExecutorService {
/** /**
* Lock used whenever accessing the state variables * Lock used whenever accessing the state variables
* (runningTasks, shutdown, terminationCondition) of the executor * (runningTasks, shutdown) of the executor
*/ */
private final Lock lock = new ReentrantLock(); private final Object lock = new Object();

/** Signaled after the executor is shutdown and running tasks are done */
private final Condition termination = lock.newCondition();


/* /*
* Conceptually, these two variables describe the executor being in * Conceptually, these two variables describe the executor being in
Expand All @@ -304,8 +300,8 @@ private static class DirectExecutorService
* - Shutdown: runningTasks > 0 and shutdown == true * - Shutdown: runningTasks > 0 and shutdown == true
* - Terminated: runningTasks == 0 and shutdown == true * - Terminated: runningTasks == 0 and shutdown == true
*/ */
private int runningTasks = 0; @GuardedBy("lock") private int runningTasks = 0;
private boolean shutdown = false; @GuardedBy("lock") private boolean shutdown = false;


@Override @Override
public void execute(Runnable command) { public void execute(Runnable command) {
Expand All @@ -319,21 +315,18 @@ public void execute(Runnable command) {


@Override @Override
public boolean isShutdown() { public boolean isShutdown() {
lock.lock(); synchronized (lock) {
try {
return shutdown; return shutdown;
} finally {
lock.unlock();
} }
} }


@Override @Override
public void shutdown() { public void shutdown() {
lock.lock(); synchronized (lock) {
try {
shutdown = true; shutdown = true;
} finally { if (runningTasks == 0) {
lock.unlock(); lock.notifyAll();
}
} }
} }


Expand All @@ -346,31 +339,27 @@ public List<Runnable> shutdownNow() {


@Override @Override
public boolean isTerminated() { public boolean isTerminated() {
lock.lock(); synchronized (lock) {
try {
return shutdown && runningTasks == 0; return shutdown && runningTasks == 0;
} finally {
lock.unlock();
} }
} }


@Override @Override
public boolean awaitTermination(long timeout, TimeUnit unit) public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException { throws InterruptedException {
long nanos = unit.toNanos(timeout); long nanos = unit.toNanos(timeout);
lock.lock(); synchronized (lock) {
try {
for (;;) { for (;;) {
if (isTerminated()) { if (shutdown && runningTasks == 0) {
return true; return true;
} else if (nanos <= 0) { } else if (nanos <= 0) {
return false; return false;
} else { } else {
nanos = termination.awaitNanos(nanos); long now = System.nanoTime();
TimeUnit.NANOSECONDS.timedWait(lock, nanos);
nanos -= System.nanoTime() - now; // subtract the actual time we waited
} }
} }
} finally {
lock.unlock();
} }
} }


Expand All @@ -382,29 +371,23 @@ public boolean awaitTermination(long timeout, TimeUnit unit)
* shutdown * shutdown
*/ */
private void startTask() { private void startTask() {
lock.lock(); synchronized (lock) {
try { if (shutdown) {
if (isShutdown()) {
throw new RejectedExecutionException("Executor already shutdown"); throw new RejectedExecutionException("Executor already shutdown");
} }
runningTasks++; runningTasks++;
} finally {
lock.unlock();
} }
} }


/** /**
* Decrements the running task count. * Decrements the running task count.
*/ */
private void endTask() { private void endTask() {
lock.lock(); synchronized (lock) {
try { int numRunning = --runningTasks;
runningTasks--; if (numRunning == 0) {
if (isTerminated()) { lock.notifyAll();
termination.signalAll();
} }
} finally {
lock.unlock();
} }
} }
} }
Expand Down

0 comments on commit 1a5b0b9

Please sign in to comment.