Skip to content
Permalink
Browse files
Merge branch 'bidir-deadlock' into 'ibm-trunk'
Bidir deadlock

See merge request !49
  • Loading branch information
ngmr committed May 27, 2015
2 parents f007558 + d48c200 commit 3d11ccd94ce456617efb42fed3dc32ea4a2b7f01
Showing 7 changed files with 188 additions and 259 deletions.
@@ -23,6 +23,7 @@
import org.omg.SendingContext.CodeBase;

abstract public class GIOPConnection implements DowncallEmitter, UpcallReturn {
static final java.util.logging.Logger logger = java.util.logging.Logger.getLogger(GIOPConnection.class.getName());
// ----------------------------------------------------------------
// Inner classes
// ----------------------------------------------------------------
@@ -1440,8 +1441,10 @@ synchronized public boolean bidirConnection() {
public void setState(int newState) {
synchronized (this) {
if (state_ == newState
|| (state_ != State.Holding && newState < state_))
|| (state_ != State.Holding && newState < state_)) {
logger.fine("No state change from " +state_ + " to " + newState);
return;
}

//
// make sure to update the state since some of the actions
@@ -17,80 +17,54 @@

package org.apache.yoko.orb.OB;

import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class GIOPConnectionThreaded extends GIOPConnection {
static final Logger logger = Logger.getLogger(GIOPConnectionThreaded.class.getName());

// ----------------------------------------------------------------
// Inner helper classes
// ----------------------------------------------------------------

//
// thread to handle connection shutdown
//
public final class ShutdownThread extends Thread {
private GIOPConnectionThreaded parent_;

ShutdownThread(ThreadGroup group, GIOPConnectionThreaded parent) {
super(group, "Yoko:GIOPConnectionThreaded:ShutdownThread");
parent_ = parent;
}
public final class Shutdown implements Runnable {

public void run() {
try {
parent_.execShutdown();
execShutdown();
} catch (RuntimeException ex) {
Assert._OB_assert(ex);
}

//
// break cyclic dependency with parent
//
parent_ = null;
}
}

//
// thread to handle reception of messages
//
public final class ReceiverThread extends Thread {
private GIOPConnectionThreaded parent_;

ReceiverThread(ThreadGroup group, GIOPConnectionThreaded parent) {
super(group, "Yoko:GIOPConnectionThreaded:ReceiverThread");
parent_ = parent;
public final class Receiver implements Runnable {
Receiver() {
receiverLock.readLock().lock();
}

public void run() {
try {
parent_.execReceive();
execReceive();
} catch (RuntimeException ex) {
Assert._OB_assert(ex);
} finally {
receiverLock.readLock().unlock();
}

//
// break cyclic dependency with parent
//
parent_ = null;
}
}

// ----------------------------------------------------------------
// Member data
// ----------------------------------------------------------------
//

//
// the shutdown thread handle
//
protected Thread shutdownThread_ = null;

//
// the list of receiver threads
//
protected java.util.LinkedList receiverThreads_ = new java.util.LinkedList();

//
// the holding monitor to pause the receiver threads
//
@@ -105,6 +79,9 @@ public void run() {
// sending mutex to prevent multiple threads from sending at once
//
protected java.lang.Object sendMutex_ = new java.lang.Object();

private boolean shuttingDown;
private final ReentrantReadWriteLock receiverLock = new ReentrantReadWriteLock(true);

// ----------------------------------------------------------------
// Protected Methods
@@ -115,45 +92,9 @@ public void run() {
// Assumes 'this' is synchronized on entry
//
protected void addReceiverThread() {
//
// Retrieve the thread group
//
ThreadGroup group;
if ((properties_ & Property.CreatedByClient) != 0) {
group = orbInstance_.getClientWorkerGroup();
}
else {
group = orbInstance_.getServerWorkerGroup();
}

//
// Start receiver thread
//
Thread thr = new ReceiverThread(group, this);
thr.setDaemon(true);
thr.start();

//
// add the thread to our list of threads
//
receiverThreads_.addLast(thr);
getExecutor().submit(new Receiver());
}

//
// clean up any dead receiver threads
// assumes 'this' is synchronized on entry
//
protected void cleanupDeadReceiverThreads() {
java.util.ListIterator i = receiverThreads_.listIterator(0);

while (i.hasNext()) {
Thread thr = (Thread) i.next();

if (!thr.isAlive()) {
i.remove();
}
}
}

//
// pause a thread on a holding monitor if turned on
@@ -229,6 +170,7 @@ protected void abortiveShutdown() {
.describeTransient(org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown),
org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown,
org.omg.CORBA.CompletionStatus.COMPLETED_MAYBE), false);
arrive();

}

@@ -244,8 +186,11 @@ synchronized protected void gracefulShutdown() {
//
// don't shutdown if there are pending upcalls
//
if (upcallsInProgress_ > 0 || state_ != State.Closing)
if (upcallsInProgress_ > 0 || state_ != State.Closing) {
logger.fine("pending upcalls: " + upcallsInProgress_ + " state: " + state_);

return;
}

//
// send a CloseConnection if we can
@@ -266,38 +211,44 @@ synchronized protected void gracefulShutdown() {
org.omg.GIOP.MsgType_1_1.CloseConnection, false, 0);

messageQueue_.add(orbInstance_, out._OB_buffer());
} else {
logger.fine("could not send close connection message");
}

//
// now create the startup thread
// now create the shutdown thread
//
try {
if (shutdownThread_ != null)
if (shuttingDown)
return;

//
// Retrieve the thread group
//
ThreadGroup group;
if ((properties_ & Property.CreatedByClient) != 0)
group = orbInstance_.getClientWorkerGroup();
else
group = orbInstance_.getServerWorkerGroup();

shuttingDown = true;
//
// start the shutdown thread
//
shutdownThread_ = new ShutdownThread(group, this);
shutdownThread_.setDaemon(true);
shutdownThread_.start();
try {
getExecutor().submit(new Shutdown());
} catch (RejectedExecutionException ree) {
logger.log(Level.WARNING, "Could not submit shutdown task", ree);
}
} catch (OutOfMemoryError ex) {
processException(State.Closed, new org.omg.CORBA.IMP_LIMIT(
org.apache.yoko.orb.OB.MinorCodes.describeImpLimit(org.apache.yoko.orb.OB.MinorCodes.MinorThreadLimit),
org.apache.yoko.orb.OB.MinorCodes.MinorThreadLimit,
org.omg.CORBA.CompletionStatus.COMPLETED_NO), false);
} finally {
arrive();
}
}


private void arrive() {
if ((properties_ & Property.CreatedByClient) != 0)
orbInstance_.getClientPhaser().arrive();
else
orbInstance_.getServerPhaser().arrive();
}

// ----------------------------------------------------------------
// Public Methods
// ----------------------------------------------------------------
@@ -308,6 +259,7 @@ synchronized protected void gracefulShutdown() {
public GIOPConnectionThreaded(ORBInstance orbInstance,
org.apache.yoko.orb.OCI.Transport transport, GIOPClient client) {
super(orbInstance, transport, client);
orbInstance.getClientPhaser().register();
start();
}

@@ -317,6 +269,14 @@ public GIOPConnectionThreaded(ORBInstance orbInstance,
public GIOPConnectionThreaded(ORBInstance orbInstance,
org.apache.yoko.orb.OCI.Transport transport, OAInterface oa) {
super(orbInstance, transport, oa);
orbInstance.getServerPhaser().register();
}

private ExecutorService getExecutor() {
if ((properties_ & Property.CreatedByClient) != 0)
return orbInstance_.getClientExecutor();
else
return orbInstance_.getServerExecutor();
}

//
@@ -351,49 +311,38 @@ public void execShutdown() {

//
// shutdown the transport
// synchronization on sendMutex_ is needed to avoid a deadlock in some oracle and ibm jdks between send and shutdown
// https://bugs.openjdk.java.net/browse/JDK-8013809 deadlock in SSLSocketImpl between between write and close
//
transport_.shutdown();
synchronized (sendMutex_) {
transport_.shutdown();
}

//
// Shutdown the receiver threads. There may not be a receiver
// thread if the transport is SendOnly.
//
if (transport_.mode() == org.apache.yoko.orb.OCI.SendReceiveMode.SendReceive
|| transport_.mode() == org.apache.yoko.orb.OCI.SendReceiveMode.ReceiveOnly) {
int timeout = shutdownTimeout_ * 1000;

synchronized (this) {
java.util.ListIterator i = receiverThreads_.listIterator();

while (i.hasNext()) {
Thread t = (Thread) i.next();

try {
if (timeout > 0) {
t.join(timeout);
}
else {
t.join();
}
} catch (InterruptedException ex) {
continue;
}
try {
receiverLock.writeLock().tryLock(shutdownTimeout_, SECONDS);
} catch (InterruptedException e) {
}

i.remove();
}
try {
//
// We now close the connection actively, since it may still be
// open under certain circumstances. For example, the receiver
// thread may not have terminated yet or the receive thread might
// set the state to GIOPState::Error before termination.
//
processException(State.Closed, new org.omg.CORBA.TRANSIENT(org.apache.yoko.orb.OB.MinorCodes
.describeTransient(org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown),
org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown,
org.omg.CORBA.CompletionStatus.COMPLETED_MAYBE), false);
} finally {
if (receiverLock.isWriteLockedByCurrentThread()) {
receiverLock.writeLock().unlock();
}
}

//
// We now close the connection actively, since it may still be
// open under certain circumstances. For example, the reciver
// thread may not have terminated yet or the receive thread might
// set the state to GIOPState::Error before termination.
//
processException(State.Closed, new org.omg.CORBA.TRANSIENT(org.apache.yoko.orb.OB.MinorCodes
.describeTransient(org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown),
org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown,
org.omg.CORBA.CompletionStatus.COMPLETED_MAYBE), false);
}

//
@@ -514,9 +463,7 @@ public void execReceive() {
// case)
//
if (haveBidirSCL) {
synchronized (this) {
addReceiverThread();
}
addReceiverThread();
}

upcall.invoke();
@@ -814,12 +761,15 @@ public void start() {
//
if (transport_.mode() != org.apache.yoko.orb.OCI.SendReceiveMode.SendOnly) {
try {
synchronized (this) {
if (receiverThreads_.size() > 0) {
return;
// If the write lock is obtainable there are no receivers outstanding.
// We can then add a receiver, which implicitly obtains a read lock.
// ReentrantReadWriteLock explicitly allows downgrading a write lock to a read lock.
if(receiverLock.writeLock().tryLock()) {
try {
addReceiverThread();
} finally {
receiverLock.writeLock().unlock();
}

addReceiverThread();
}
} catch (OutOfMemoryError ex) {
synchronized (this) {
@@ -851,10 +801,6 @@ public void refresh() {
}

synchronized (this) {
//
// cleanup any defunct receiver threads now
//
cleanupDeadReceiverThreads();

//
// if we can't write messages then don't bother to proceed

0 comments on commit 3d11ccd

Please sign in to comment.