Skip to content

Commit

Permalink
experiment with ReadWriteLock performance enhanced
Browse files Browse the repository at this point in the history
  • Loading branch information
dapengzhang0 committed Aug 7, 2017
1 parent 222c241 commit 63d7d2f
Showing 1 changed file with 83 additions and 13 deletions.
96 changes: 83 additions & 13 deletions core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -115,7 +115,7 @@ public final class ManagedChannelImpl extends ManagedChannel implements WithLogI

private final BackoffPolicy.Provider backoffPolicyProvider;

private final ReentrantReadWriteLock shutdownLock = new ReentrantReadWriteLock();
private final ReadWriteLock shutdownReadWriteLock = new ReadWriteLock();
private boolean shutdownWriteLockUsed;

/**
Expand Down Expand Up @@ -473,18 +473,23 @@ public ManagedChannelImpl shutdown() {
private void runShutdownTask() {
checkState(shutdown.get(), "shutdown must be true");

if (shutdownLock.writeLock().tryLock()) {
if (shutdownReadWriteLock.tryToAcquireWriteLock()) {
if (!shutdownWriteLockUsed) {
shutdownWriteLockUsed = true;
shutdownLock.writeLock().unlock();
shutdownReadWriteLock.releaseWriteLock();
// Continue the following shutdown process.
} else {
shutdownLock.writeLock().unlock();
shutdownReadWriteLock.releaseWriteLock();
return;
}
} else {
// Either the read or the write lock is currently acquired. Quit here, and let one of the read
// lock acquisitions or the write lock acquisition take over the following shutdown process.
return;
}

// The following is the shutdown process. It can be run only once.

// Put gotoState(SHUTDOWN) as early into the channelExecutor's queue as possible.
// delayedTransport.shutdown() may also add some tasks into the queue. But some things inside
// delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the
Expand Down Expand Up @@ -560,7 +565,7 @@ public String authority() {

private class RealChannel extends Channel {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
public <ReqT, RespT> ClientCall<ReqT, RespT> newCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions) {
Executor executor = callOptions.getExecutor();
if (executor == null) {
Expand All @@ -583,6 +588,57 @@ public String authority() {
}
}


/**
* Maintains a pair of associated locks. The read lock can be acquired many times as long as the
* write lock is not currently acquired. The write lock can be acquired only when no lock is
* currently acquired. Each acquisition of the locks must be released once and only once later.
* The lock is not reentrant. The lock is irrelevant to the threads in which you try to acquire or
* release.
*/
private static final class ReadWriteLock {
/**
* A positive number counts the times the read lock has been acquired, and Long.MIN_VALUE or a
* negative number close to Long.MIN_VALUE means a write lock is currently acquired.
*/
final AtomicLong readRef = new AtomicLong();

/**
* Try to acquire the read lock. Return true if succeeded; and false if not able to acquire it
* right now, meaning that the write lock is currently acquired. The method does not block.
*/
boolean tryToAcquireReadLock() {
return readRef.getAndIncrement() >= 0L;
}

/**
* Release an acquisition of the read lock.
*
* @throws IllegalStateException if the read lock is not currently acquired.
*/
void releaseReadLock() {
checkState(readRef.getAndDecrement() > 0L);
}

/**
* Try to acquire the write lock. Return true if succeeded; and false if not able to acquire it
* right now, meaning that either the read or the write lock is currently acquired. The method
* does not block.
*/
boolean tryToAcquireWriteLock() {
return readRef.compareAndSet(0L, Long.MIN_VALUE);
}

/**
* Release an acquisition of the write lock.
*
* @throws IllegalStateException if the write lock is not currently acquired.
*/
void releaseWriteLock() {
checkState(readRef.getAndSet(0L) < 0L);
}
}

private final class DirectFallbackExecutor implements Executor {
@Override
public void execute(final Runnable command) {
Expand All @@ -591,12 +647,18 @@ public void execute(final Runnable command) {
return;
}

shutdownLock.readLock().lock();
if (!shutdownReadWriteLock.tryToAcquireReadLock()) {
// The write lock is currently acquired, so shutdown is in progress.
command.run();
return;
}
if (shutdownWriteLockUsed) {
shutdownLock.readLock().unlock();
// The write lock had been acquired and then released, so shutdown is in progress.
shutdownReadWriteLock.releaseReadLock();
command.run();
return;
}

final boolean[] accepted = new boolean[1];
try {
final Thread thread = Thread.currentThread();
Expand All @@ -605,7 +667,7 @@ public void execute(final Runnable command) {
public void run() {
if (Thread.currentThread() == thread && !accepted[0]) {
accepted[0] = true;
shutdownLock.readLock().unlock();
shutdownReadWriteLock.releaseReadLock();
}
command.run();
}
Expand All @@ -616,9 +678,10 @@ public void run() {
} finally {
if (!accepted[0]) {
accepted[0] = true;
shutdownLock.readLock().unlock();
shutdownReadWriteLock.releaseReadLock();
}
if (shutdown.get()) {
// May take over the shutdown process.
runShutdownTask();
}
}
Expand All @@ -632,20 +695,27 @@ public Future<?> scheduleDeadlineCancellation(Runnable command, Deadline deadlin
return cancelledFuture(command);
}

shutdownLock.readLock().lock();
if (!shutdownReadWriteLock.tryToAcquireReadLock()) {
// The write lock is currently acquired, so shutdown is in progress.
command.run();
return cancelledFuture(command);
}
if (shutdownWriteLockUsed) {
shutdownLock.readLock().unlock();
// The write lock had been acquired and then released, so shutdown is in progress.
shutdownReadWriteLock.releaseReadLock();
return cancelledFuture(command);
}

try {
return transportFactory.getScheduledExecutorService()
.schedule(command, deadline.timeRemaining(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
} catch (RejectedExecutionException e) {
// TODO(zdapeng): Handle Exception properly
throw e;
} finally {
shutdownLock.readLock().unlock();
shutdownReadWriteLock.releaseReadLock();
if (shutdown.get()) {
// May take over the shutdown process.
runShutdownTask();
}
}
Expand Down

0 comments on commit 63d7d2f

Please sign in to comment.