Skip to content
Permalink
Browse files
work on shutdown logic; wait to close down pool until GIOPConnectionT…
…hreaded shutdowns have been run
  • Loading branch information
djencks committed May 26, 2015
1 parent 3105218 commit 4d93d897fa1cdb751727a9d2ac659c9782957ab1
Showing 7 changed files with 71 additions and 18 deletions.
@@ -23,6 +23,7 @@
import org.omg.SendingContext.CodeBase;

abstract public class GIOPConnection implements DowncallEmitter, UpcallReturn {
static final java.util.logging.Logger logger = java.util.logging.Logger.getLogger(GIOPConnection.class.getName());
// ----------------------------------------------------------------
// Inner classes
// ----------------------------------------------------------------
@@ -1440,8 +1441,10 @@ synchronized public boolean bidirConnection() {
public void setState(int newState) {
synchronized (this) {
if (state_ == newState
|| (state_ != State.Holding && newState < state_))
|| (state_ != State.Holding && newState < state_)) {
logger.fine("No state change from " +state_ + " to " + newState);
return;
}

//
// make sure to update the state since some of the actions
@@ -20,6 +20,8 @@
import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -168,6 +170,7 @@ protected void abortiveShutdown() {
.describeTransient(org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown),
org.apache.yoko.orb.OB.MinorCodes.MinorForcedShutdown,
org.omg.CORBA.CompletionStatus.COMPLETED_MAYBE), false);
arrive();

}

@@ -183,8 +186,11 @@ synchronized protected void gracefulShutdown() {
//
// don't shutdown if there are pending upcalls
//
if (upcallsInProgress_ > 0 || state_ != State.Closing)
if (upcallsInProgress_ > 0 || state_ != State.Closing) {
logger.fine("pending upcalls: " + upcallsInProgress_ + " state: " + state_);

return;
}

//
// send a CloseConnection if we can
@@ -205,10 +211,12 @@ synchronized protected void gracefulShutdown() {
org.omg.GIOP.MsgType_1_1.CloseConnection, false, 0);

messageQueue_.add(orbInstance_, out._OB_buffer());
} else {
logger.fine("could not send close connection message");
}

//
// now create the startup thread
// now create the shutdown thread
//
try {
if (shuttingDown)
@@ -218,15 +226,29 @@ synchronized protected void gracefulShutdown() {
//
// start the shutdown thread
//
getExecutor().submit(new Shutdown());
try {
getExecutor().submit(new Shutdown());
} catch (RejectedExecutionException ree) {
logger.log(Level.WARNING, "Could not submit shutdown task", ree);
}
} catch (OutOfMemoryError ex) {
processException(State.Closed, new org.omg.CORBA.IMP_LIMIT(
org.apache.yoko.orb.OB.MinorCodes.describeImpLimit(org.apache.yoko.orb.OB.MinorCodes.MinorThreadLimit),
org.apache.yoko.orb.OB.MinorCodes.MinorThreadLimit,
org.omg.CORBA.CompletionStatus.COMPLETED_NO), false);
} finally {
arrive();
}
}


private void arrive() {
if ((properties_ & Property.CreatedByClient) != 0)
orbInstance_.getClientPhaser().arrive();
else
orbInstance_.getServerPhaser().arrive();
}

// ----------------------------------------------------------------
// Public Methods
// ----------------------------------------------------------------
@@ -237,6 +259,7 @@ synchronized protected void gracefulShutdown() {
public GIOPConnectionThreaded(ORBInstance orbInstance,
org.apache.yoko.orb.OCI.Transport transport, GIOPClient client) {
super(orbInstance, transport, client);
orbInstance.getClientPhaser().register();
start();
}

@@ -246,13 +269,14 @@ public GIOPConnectionThreaded(ORBInstance orbInstance,
public GIOPConnectionThreaded(ORBInstance orbInstance,
org.apache.yoko.orb.OCI.Transport transport, OAInterface oa) {
super(orbInstance, transport, oa);
orbInstance.getServerPhaser().register();
}

private ExecutorService getExecutor() {
if ((properties_ & Property.CreatedByClient) != 0)
return orbInstance_.getClientExecutor();
else
return orbInstance_.getServerExecutor();
return orbInstance_.getClientExecutor();
else
return orbInstance_.getServerExecutor();
}

//
@@ -17,11 +17,8 @@

package org.apache.yoko.orb.OB;

import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.yoko.orb.PortableServer.*;

abstract class GIOPServerStarter {
static final Logger logger = Logger.getLogger(GIOPServerStarter.class.getName());

@@ -18,6 +18,7 @@
package org.apache.yoko.orb.OB;

import java.util.concurrent.ExecutorService;
import java.util.logging.Level;

final class GIOPServerStarterThreaded extends GIOPServerStarter {
//
@@ -237,11 +238,13 @@ public void starterRun() {
// StateClosing for proper connection shutdown
//
Assert._OB_assert(state_ == StateClosed);

logger.fine("Processing an inbound connection because state is closed");
GIOPConnection connection = new GIOPConnectionThreaded(
orbInstance_, transport, oaInterface_);
logger.fine("Created connection " + connection);

connection.setState(GIOPConnection.State.Closing);
logger.fine("set connection state to closing");
}
} catch (org.omg.CORBA.SystemException ex) {
String msg = "can't accept connection\n" + ex.getMessage();
@@ -26,7 +26,10 @@

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;

import org.apache.yoko.orb.OBPortableServer.POAManagerFactory_impl;
import org.apache.yoko.orb.OBPortableServer.POA_impl;
@@ -110,7 +113,7 @@ private synchronized void completeServerShutdown() {
}

private void waitForServerThreads() {
shutdownExecutor(orbInstance_.getServerExecutor());
shutdownExecutor(orbInstance_.getServerPhaser(), orbInstance_.getServerExecutor());

//
// Get the DispatchStrategyFactory implementation and
@@ -417,7 +420,7 @@ public synchronized void shutdownServerClient() {
// Wait for all the threads in the client worker group to
// terminate
//
shutdownExecutor(orbInstance_.getClientExecutor());
shutdownExecutor(orbInstance_.getClientPhaser(), orbInstance_.getClientExecutor());
}

//
@@ -428,12 +431,16 @@ public synchronized void shutdownServerClient() {
notifyAll();
}

private void shutdownExecutor(ExecutorService executor) {
executor.shutdown();
private void shutdownExecutor(Phaser phaser, ExecutorService executor) {
int phase = phaser.arrive();//release the system's "lock"
//phaser advances after all GIOPConnectionThreaded have shut down (gracefully or abort)
try {
executor.awaitTermination(shutdownTimeout_, TimeUnit.SECONDS);
} catch (InterruptedException e) {
phaser.awaitAdvanceInterruptibly(phase, shutdownTimeout_, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
} catch (TimeoutException e) {
} finally {
phaser.forceTermination();
}
executor.shutdownNow();
}
@@ -17,9 +17,15 @@

package org.apache.yoko.orb.OB;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.yoko.orb.OB.BootManager;
import org.apache.yoko.orb.OB.DispatchStrategyFactory;
@@ -93,8 +99,10 @@ public final class ORBInstance {
private RecursiveMutex orbSyncMutex_ = new RecursiveMutex();

private ExecutorService serverExecutor_;
private Phaser serverPhaser = new Phaser(1);

private ExecutorService clientExecutor_;
private Phaser clientPhaser = new Phaser(1);

private org.apache.yoko.orb.OCI.ConFactoryRegistry conFactoryRegistry_;

@@ -418,9 +426,17 @@ public ExecutorService getServerExecutor() {
return serverExecutor_;
}

public Phaser getServerPhaser() {
return serverPhaser;
}

public ExecutorService getClientExecutor() {
return clientExecutor_;
}

public Phaser getClientPhaser() {
return clientPhaser;
}

public org.apache.yoko.orb.OCI.ConFactoryRegistry getConFactoryRegistry() {
return conFactoryRegistry_;
@@ -468,4 +484,5 @@ public boolean extendedWchar() {
public OrbAsyncHandler getAsyncHandler() {
return asyncHandler_;
}

}
@@ -71,7 +71,7 @@ public int handle() {
}

public void close() {
logger.fine("Closing connection to host=" + localAddress_ + ", port=" + port_);
logger.log(Level.FINE, "Closing server socket with host=" + localAddress_ + ", port=" + port_, new Exception("Stack trace"));
//
// Destroy the info object
//
@@ -83,7 +83,9 @@ public void close() {
try {
socket_.close();
socket_ = null;
logger.log(Level.FINE, "Closed server socket with host=" + localAddress_ + ", port=" + port_);
} catch (java.io.IOException ex) {
logger.log(Level.FINE, "Exception closing server socket with host=" + localAddress_ + ", port=" + port_, ex);
}
}

0 comments on commit 4d93d89

Please sign in to comment.