Skip to content

Commit

Permalink
reformat
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.apache.org/repos/asf/cassandra/branches/cassandra-0.7@1074010 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
jbellis committed Feb 24, 2011
1 parent 7ff4006 commit e79224c
Showing 1 changed file with 172 additions and 143 deletions.
315 changes: 172 additions & 143 deletions src/java/org/apache/cassandra/thrift/CustomTThreadPoolServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,151 +39,180 @@

/**
* Slightly modified version of the Apache Thrift TThreadPoolServer.
*
* <p/>
* This allows passing an executor so you have more control over the actual
* behaviour of the tasks being run.
*
* <p/>
* Newer version of Thrift should make this obsolete.
*/
public class CustomTThreadPoolServer extends TServer {

private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());

// Executor service for handling client connections
private ExecutorService executorService_;

// Flag for stopping the server
private volatile boolean stopped_;

// Server options
private Options options_;

// Customizable server options
public static class Options {
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
public int stopTimeoutVal = 60;
public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
}


public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
TServerSocket tServerSocket,
TTransportFactory inTransportFactory,
TTransportFactory outTransportFactory,
TProtocolFactory tProtocolFactory,
TProtocolFactory tProtocolFactory2,
Options options,
ExecutorService executorService) {

super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
tProtocolFactory, tProtocolFactory2);
options_ = options;
executorService_ = executorService;
}


public void serve() {
try {
serverTransport_.listen();
} catch (TTransportException ttx) {
LOGGER.error("Error occurred during listening.", ttx);
return;
}

stopped_ = false;
while (!stopped_) {
int failureCount = 0;
try {
TTransport client = serverTransport_.accept();
WorkerProcess wp = new WorkerProcess(client);
executorService_.execute(wp);
} catch (TTransportException ttx) {
if (!stopped_) {
++failureCount;
LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
}
}
}

executorService_.shutdown();

// Loop until awaitTermination finally does return without a interrupted
// exception. If we don't do this, then we'll shut down prematurely. We want
// to let the executorService clear it's task queue, closing client sockets
// appropriately.
long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
break;
} catch (InterruptedException ix) {
long newnow = System.currentTimeMillis();
timeoutMS -= (newnow - now);
now = newnow;
}
}
}

public void stop() {
stopped_ = true;
serverTransport_.interrupt();
}

private class WorkerProcess implements Runnable {

/**
* Client that this services.
*/
private TTransport client_;

/**
* Default constructor.
*
* @param client Transport to process
*/
private WorkerProcess(TTransport client) {
client_ = client;
}

/**
* Loops on processing a client forever
*/
public void run() {
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
try {
processor = processorFactory_.getProcessor(client_);
inputTransport = inputTransportFactory_.getTransport(client_);
outputTransport = outputTransportFactory_.getTransport(client_);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
// we check stopped_ first to make sure we're not supposed to be shutting
// down. this is necessary for graceful shutdown.
while (!stopped_ && processor.process(inputProtocol, outputProtocol))
{
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
}
} catch (TTransportException ttx) {
// Assume the client died and continue silently
} catch (TException tx) {
LOGGER.error("Thrift error occurred during processing of message.", tx);
} catch (Exception x) {
LOGGER.error("Error occurred during processing of message.", x);
}

if (inputTransport != null) {
inputTransport.close();
}

if (outputTransport != null) {
outputTransport.close();
}
}
}
public class CustomTThreadPoolServer extends TServer
{

private static final Logger LOGGER = LoggerFactory.getLogger(CustomTThreadPoolServer.class.getName());

// Executor service for handling client connections
private ExecutorService executorService_;

// Flag for stopping the server
private volatile boolean stopped_;

// Server options
private Options options_;

// Customizable server options
public static class Options
{
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
public int stopTimeoutVal = 60;
public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
}


public CustomTThreadPoolServer(TProcessorFactory tProcessorFactory,
TServerSocket tServerSocket,
TTransportFactory inTransportFactory,
TTransportFactory outTransportFactory,
TProtocolFactory tProtocolFactory,
TProtocolFactory tProtocolFactory2,
Options options,
ExecutorService executorService)
{

super(tProcessorFactory, tServerSocket, inTransportFactory, outTransportFactory,
tProtocolFactory, tProtocolFactory2);
options_ = options;
executorService_ = executorService;
}


public void serve()
{
try
{
serverTransport_.listen();
}
catch (TTransportException ttx)
{
LOGGER.error("Error occurred during listening.", ttx);
return;
}

stopped_ = false;
while (!stopped_)
{
int failureCount = 0;
try
{
TTransport client = serverTransport_.accept();
WorkerProcess wp = new WorkerProcess(client);
executorService_.execute(wp);
}
catch (TTransportException ttx)
{
if (!stopped_)
{
++failureCount;
LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
}
}
}

executorService_.shutdown();

// Loop until awaitTermination finally does return without a interrupted
// exception. If we don't do this, then we'll shut down prematurely. We want
// to let the executorService clear it's task queue, closing client sockets
// appropriately.
long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0)
{
try
{
executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
break;
}
catch (InterruptedException ix)
{
long newnow = System.currentTimeMillis();
timeoutMS -= (newnow - now);
now = newnow;
}
}
}

public void stop()
{
stopped_ = true;
serverTransport_.interrupt();
}

private class WorkerProcess implements Runnable
{

/**
* Client that this services.
*/
private TTransport client_;

/**
* Default constructor.
*
* @param client Transport to process
*/
private WorkerProcess(TTransport client)
{
client_ = client;
}

/**
* Loops on processing a client forever
*/
public void run()
{
TProcessor processor = null;
TTransport inputTransport = null;
TTransport outputTransport = null;
TProtocol inputProtocol = null;
TProtocol outputProtocol = null;
try
{
processor = processorFactory_.getProcessor(client_);
inputTransport = inputTransportFactory_.getTransport(client_);
outputTransport = outputTransportFactory_.getTransport(client_);
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
// we check stopped_ first to make sure we're not supposed to be shutting
// down. this is necessary for graceful shutdown.
while (!stopped_ && processor.process(inputProtocol, outputProtocol))
{
inputProtocol = inputProtocolFactory_.getProtocol(inputTransport);
outputProtocol = outputProtocolFactory_.getProtocol(outputTransport);
}
}
catch (TTransportException ttx)
{
// Assume the client died and continue silently
}
catch (TException tx)
{
LOGGER.error("Thrift error occurred during processing of message.", tx);
}
catch (Exception x)
{
LOGGER.error("Error occurred during processing of message.", x);
}

if (inputTransport != null)
{
inputTransport.close();
}

if (outputTransport != null)
{
outputTransport.close();
}
}
}
}

0 comments on commit e79224c

Please sign in to comment.