Skip to content
Permalink
Browse files
Use ReadWriteLock for receiver threads / shutdown
  • Loading branch information
ngmr authored and djencks committed May 26, 2015
1 parent 7ddcd03 commit 31052189df9c44be0411dbfea2f49ad009d9818f
Showing 1 changed file with 36 additions and 31 deletions.
@@ -17,10 +17,10 @@

package org.apache.yoko.orb.OB;

import java.util.concurrent.Callable;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

@@ -44,18 +44,17 @@ public void run() {
}

public final class Receiver implements Runnable {
Receiver() {
receiverLock.readLock().lock();
}

public void run() {
try {
execReceive();
} catch (RuntimeException ex) {
Assert._OB_assert(ex);
} finally {
if (receiverCount.decrementAndGet() ==0) {
synchronized(receiverCount) {
receiverCount.notifyAll();
}
}
receiverLock.readLock().unlock();
}
}
}
@@ -80,7 +79,7 @@ public void run() {
protected java.lang.Object sendMutex_ = new java.lang.Object();

private boolean shuttingDown;
private AtomicInteger receiverCount = new AtomicInteger();
private final ReentrantReadWriteLock receiverLock = new ReentrantReadWriteLock(true);

// ----------------------------------------------------------------
// Protected Methods
@@ -299,27 +298,27 @@ public void execShutdown() {
// Shutdown the receiver threads. There may not be a receiver
// thread if the transport is SendOnly.
//
synchronized (receiverCount) {
while (receiverCount.get() > 0) {
try {
receiverCount.wait(shutdownTimeout_);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
try {
receiverLock.writeLock().tryLock(shutdownTimeout_, SECONDS);
} catch (InterruptedException e) {
}

//
// We now close the connection actively, since it may still be
// open under certain circumstances. For example, the receiver
// thread may not have terminated yet or the receive thread might
// set the state to GIOPState::Error before termination.
//
processException(State.Closed, new org.omg.CORBA.TRANSIENT(org.apache.yoko.orb.OB.MinorCodes
.describeTransient(org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown),
org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown,
org.omg.CORBA.CompletionStatus.COMPLETED_MAYBE), false);
try {
//
// We now close the connection actively, since it may still be
// open under certain circumstances. For example, the receiver
// thread may not have terminated yet or the receive thread might
// set the state to GIOPState::Error before termination.
//
processException(State.Closed, new org.omg.CORBA.TRANSIENT(org.apache.yoko.orb.OB.MinorCodes
.describeTransient(org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown),
org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown,
org.omg.CORBA.CompletionStatus.COMPLETED_MAYBE), false);
} finally {
if (receiverLock.isWriteLockedByCurrentThread()) {
receiverLock.writeLock().unlock();
}
}
}

//
@@ -440,7 +439,6 @@ public void execReceive() {
// case)
//
if (haveBidirSCL) {
receiverCount.incrementAndGet();
addReceiverThread();
}

@@ -739,8 +737,15 @@ public void start() {
//
if (transport_.mode() != org.apache.yoko.orb.OCI.SendReceiveMode.SendOnly) {
try {
if (receiverCount.compareAndSet(0, 1)) {
addReceiverThread();
// If the write lock is obtainable there are no receivers outstanding.
// We can then add a receiver, which implicitly obtains a read lock.
// ReentrantReadWriteLock explicitly allows downgrading a write lock to a read lock.
if(receiverLock.writeLock().tryLock()) {
try {
addReceiverThread();
} finally {
receiverLock.writeLock().unlock();
}
}
} catch (OutOfMemoryError ex) {
synchronized (this) {

0 comments on commit 3105218

Please sign in to comment.