Skip to content
Permalink
Browse files
use internal ExecutorServices instead of thread groups in ORBInstance…
… to remove client deadlock due to expecting threads ending to notify their thread group
  • Loading branch information
djencks committed May 26, 2015
1 parent f6e0625 commit d33611df9211e59e8bb282ce1bd29ad7d22f814e
Showing 4 changed files with 108 additions and 241 deletions.
@@ -17,80 +17,53 @@

package org.apache.yoko.orb.OB;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

public final class GIOPConnectionThreaded extends GIOPConnection {
static final Logger logger = Logger.getLogger(GIOPConnectionThreaded.class.getName());

// ----------------------------------------------------------------
// Inner helper classes
// ----------------------------------------------------------------

//
// thread to handle connection shutdown
//
public final class ShutdownThread extends Thread {
private GIOPConnectionThreaded parent_;

ShutdownThread(ThreadGroup group, GIOPConnectionThreaded parent) {
super(group, "Yoko:GIOPConnectionThreaded:ShutdownThread");
parent_ = parent;
}
public final class Shutdown implements Runnable {

public void run() {
try {
parent_.execShutdown();
execShutdown();
} catch (RuntimeException ex) {
Assert._OB_assert(ex);
}

//
// break cyclic dependency with parent
//
parent_ = null;
}
}

//
// thread to handle reception of messages
//
public final class ReceiverThread extends Thread {
private GIOPConnectionThreaded parent_;

ReceiverThread(ThreadGroup group, GIOPConnectionThreaded parent) {
super(group, "Yoko:GIOPConnectionThreaded:ReceiverThread");
parent_ = parent;
}

public final class Receiver implements Runnable {

public void run() {
try {
parent_.execReceive();
execReceive();
} catch (RuntimeException ex) {
Assert._OB_assert(ex);
} finally {
if (receiverCount.decrementAndGet() ==0) {
synchronized(receiverCount) {
receiverCount.notifyAll();
}
}
}

//
// break cyclic dependency with parent
//
parent_ = null;
}
}

// ----------------------------------------------------------------
// Member data
// ----------------------------------------------------------------
//

//
// the shutdown thread handle
//
protected Thread shutdownThread_ = null;

//
// the list of receiver threads
//
protected java.util.LinkedList receiverThreads_ = new java.util.LinkedList();

//
// the holding monitor to pause the receiver threads
//
@@ -105,6 +78,9 @@ public void run() {
// sending mutex to prevent multiple threads from sending at once
//
protected java.lang.Object sendMutex_ = new java.lang.Object();

private boolean shuttingDown;
private AtomicInteger receiverCount = new AtomicInteger();

// ----------------------------------------------------------------
// Protected Methods
@@ -115,45 +91,9 @@ public void run() {
// Assumes 'this' is synchronized on entry
//
protected void addReceiverThread() {
//
// Retrieve the thread group
//
ThreadGroup group;
if ((properties_ & Property.CreatedByClient) != 0) {
group = orbInstance_.getClientWorkerGroup();
}
else {
group = orbInstance_.getServerWorkerGroup();
}

//
// Start receiver thread
//
Thread thr = new ReceiverThread(group, this);
thr.setDaemon(true);
thr.start();

//
// add the thread to our list of threads
//
receiverThreads_.addLast(thr);
getExecutor().submit(new Receiver());
}

//
// clean up any dead receiver threads
// assumes 'this' is synchronized on entry
//
protected void cleanupDeadReceiverThreads() {
java.util.ListIterator i = receiverThreads_.listIterator(0);

while (i.hasNext()) {
Thread thr = (Thread) i.next();

if (!thr.isAlive()) {
i.remove();
}
}
}

//
// pause a thread on a holding monitor if turned on
@@ -272,24 +212,14 @@ synchronized protected void gracefulShutdown() {
// now create the startup thread
//
try {
if (shutdownThread_ != null)
if (shuttingDown)
return;

//
// Retrieve the thread group
//
ThreadGroup group;
if ((properties_ & Property.CreatedByClient) != 0)
group = orbInstance_.getClientWorkerGroup();
else
group = orbInstance_.getServerWorkerGroup();

shuttingDown = true;
//
// start the shutdown thread
//
shutdownThread_ = new ShutdownThread(group, this);
shutdownThread_.setDaemon(true);
shutdownThread_.start();
getExecutor().submit(new Shutdown());
} catch (OutOfMemoryError ex) {
processException(State.Closed, new org.omg.CORBA.IMP_LIMIT(
org.apache.yoko.orb.OB.MinorCodes.describeImpLimit(org.apache.yoko.orb.OB.MinorCodes.MinorThreadLimit),
@@ -318,6 +248,13 @@ public GIOPConnectionThreaded(ORBInstance orbInstance,
org.apache.yoko.orb.OCI.Transport transport, OAInterface oa) {
super(orbInstance, transport, oa);
}

private ExecutorService getExecutor() {
if ((properties_ & Property.CreatedByClient) != 0)
return orbInstance_.getClientExecutor();
else
return orbInstance_.getServerExecutor();
}

//
// called from the shutdown thread to initiate shutdown
@@ -358,35 +295,20 @@ public void execShutdown() {
// Shutdown the receiver threads. There may not be a receiver
// thread if the transport is SendOnly.
//
if (transport_.mode() == org.apache.yoko.orb.OCI.SendReceiveMode.SendReceive
|| transport_.mode() == org.apache.yoko.orb.OCI.SendReceiveMode.ReceiveOnly) {
int timeout = shutdownTimeout_ * 1000;

synchronized (this) {
java.util.ListIterator i = receiverThreads_.listIterator();

while (i.hasNext()) {
Thread t = (Thread) i.next();

try {
if (timeout > 0) {
t.join(timeout);
}
else {
t.join();
}
} catch (InterruptedException ex) {
continue;
}

i.remove();
synchronized (receiverCount) {
while (receiverCount.get() > 0) {
try {
receiverCount.wait(shutdownTimeout_);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}

//
// We now close the connection actively, since it may still be
// open under certain circumstances. For example, the reciver
// open under certain circumstances. For example, the receiver
// thread may not have terminated yet or the receive thread might
// set the state to GIOPState::Error before termination.
//
@@ -514,9 +436,8 @@ public void execReceive() {
// case)
//
if (haveBidirSCL) {
synchronized (this) {
addReceiverThread();
}
receiverCount.incrementAndGet();
addReceiverThread();
}

upcall.invoke();
@@ -814,11 +735,7 @@ public void start() {
//
if (transport_.mode() != org.apache.yoko.orb.OCI.SendReceiveMode.SendOnly) {
try {
synchronized (this) {
if (receiverThreads_.size() > 0) {
return;
}

if (receiverCount.compareAndSet(0, 1)) {
addReceiverThread();
}
} catch (OutOfMemoryError ex) {
@@ -851,10 +768,6 @@ public void refresh() {
}

synchronized (this) {
//
// cleanup any defunct receiver threads now
//
cleanupDeadReceiverThreads();

//
// if we can't write messages then don't bother to proceed
@@ -17,21 +17,17 @@

package org.apache.yoko.orb.OB;

import java.util.concurrent.ExecutorService;

final class GIOPServerStarterThreaded extends GIOPServerStarter {
//
// The starter thread
//
protected final class StarterThread extends Thread {
private GIOPServerStarterThreaded starter_;

StarterThread(ThreadGroup group, GIOPServerStarterThreaded starter) {
super(group, "Yoko:Server:StarterThread");
starter_ = starter;
}
protected final class Starter implements Runnable {

public void run() {
try {
starter_.starterRun();
starterRun();
} catch (RuntimeException ex) {
Assert._OB_assert(ex);
}
@@ -41,8 +37,8 @@ public void run() {
// Shutdown the acceptor so that no further connections are
// accepted
//
starter_.logCloseAcceptor();
starter_.acceptor_.shutdown();
logCloseAcceptor();
acceptor_.shutdown();

//
// Accept all connections which might have queued up in the
@@ -51,7 +47,7 @@ public void run() {
org.apache.yoko.orb.OCI.Transport transport = null;

try {
transport = starter_.acceptor_.accept(false);
transport = acceptor_.accept(false);
} catch (org.omg.CORBA.SystemException ex) {
}

@@ -62,8 +58,8 @@ public void run() {

try {
GIOPConnection connection = new GIOPConnectionThreaded(
starter_.orbInstance_, transport,
starter_.oaInterface_);
orbInstance_, transport,
oaInterface_);

connection.setState(GIOPConnection.State.Closing);
} catch (org.omg.CORBA.SystemException ex) {
@@ -74,17 +70,11 @@ public void run() {
//
// Close the acceptor
//
starter_.acceptor_.close();
acceptor_.close();

//
// Break cyclic object dependency
//
starter_ = null;
}
}

protected Thread starterThread_;

// ----------------------------------------------------------------------
// GIOPServerStarterThreaded package member implementation
// ----------------------------------------------------------------------
@@ -98,14 +88,12 @@ public void run() {
//
// Retrieve the thread group for the servers
//
ThreadGroup group = orbInstance_.getServerWorkerGroup();
ExecutorService executor = orbInstance_.getServerExecutor();

//
// Start starter thread
//
starterThread_ = new StarterThread(group, this);
starterThread_.setDaemon(true);
starterThread_.start();
executor.submit(new Starter());
} catch (OutOfMemoryError ex) {
acceptor_.close();
state_ = StateClosed;

0 comments on commit d33611d

Please sign in to comment.