Skip to content

Commit

Permalink
ftp: Use shared thread pool for various processing threads
Browse files Browse the repository at this point in the history
This patch makes the ftp door use the shared thread pool for
the I/O thread, for the message processing thread, and the
the callback thread.

The patch also updates AbstractCell to use the Cell's message
thread. This saves one additional thread instantiation for
each cell instance.

Target: trunk
Require-notes: no
Require-book: no
Acked-by: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de>
Patch: http://rb.dcache.org/r/6331/
  • Loading branch information
gbehrmann committed Dec 18, 2013
1 parent 8642945 commit 5273cfd
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 30 deletions.
Expand Up @@ -12,6 +12,8 @@
import java.io.StringWriter;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import dmg.cells.network.PingMessage;
import dmg.cells.services.RoutingManager;
Expand Down Expand Up @@ -421,6 +423,11 @@ public Reader getDomainContextReader(String contextName)
return _nucleus.getDomainContextReader(contextName);
}

protected <T> Future<T> invokeOnMessageThread(Callable<T> task)
{
return _nucleus.invokeOnMessageThread(task);
}

/**
* sends a <code>CellMessage</code> along the specified path.
*
Expand Down
26 changes: 21 additions & 5 deletions modules/cells/src/main/java/dmg/cells/nucleus/CellNucleus.java
Expand Up @@ -18,6 +18,9 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
Expand Down Expand Up @@ -60,8 +63,9 @@ public class CellNucleus implements ThreadFactory
private final Map<UOID, CellLock> _waitHash = new HashMap<>();
private String _cellClass;

private volatile ThreadPoolExecutor _callbackExecutor;
private volatile ThreadPoolExecutor _messageExecutor;
private volatile ExecutorService _callbackExecutor;
private volatile ExecutorService _messageExecutor;
private AtomicInteger _eventQueueSize = new AtomicInteger();

private boolean _isPrivateCallbackExecutor = true;
private boolean _isPrivateMessageExecutor = true;
Expand Down Expand Up @@ -313,7 +317,7 @@ public synchronized void setAsyncCallback(boolean asyncCallback)
/**
* Executor used for message callbacks.
*/
public synchronized void setCallbackExecutor(ThreadPoolExecutor executor)
public synchronized void setCallbackExecutor(ExecutorService executor)
{
if (executor == null) {
throw new IllegalArgumentException("null is not allowed");
Expand All @@ -328,7 +332,7 @@ public synchronized void setCallbackExecutor(ThreadPoolExecutor executor)
/**
* Executor used for incoming message delivery.
*/
public synchronized void setMessageExecutor(ThreadPoolExecutor executor)
public synchronized void setMessageExecutor(ExecutorService executor)
{
if (executor == null) {
throw new IllegalArgumentException("null is not allowed");
Expand Down Expand Up @@ -686,6 +690,14 @@ public void run() {
};
}

/**
* Submits a task for execution on the message thread.
*/
<T> Future<T> invokeOnMessageThread(Callable<T> task)
{
return _messageExecutor.submit(task);
}

@Override @Nonnull
public Thread newThread(@Nonnull Runnable target)
{
Expand Down Expand Up @@ -725,7 +737,7 @@ public Thread newThread(@Nonnull Runnable target, @Nonnull String name)

int getEventQueueSize()
{
return _messageExecutor.getQueue().size();
return _eventQueueSize.get();
}

void addToEventQueue(MessageEvent ce) {
Expand Down Expand Up @@ -965,11 +977,13 @@ public CallbackTask(CellLock lock, CellMessage message)
{
_lock = lock;
_message = message;
_eventQueueSize.incrementAndGet();
}

@Override
public void innerRun()
{
_eventQueueSize.decrementAndGet();
try (CDC ignored = _lock.getCdc().restore()) {
CellMessageAnswerable callback =
_lock.getCallback();
Expand Down Expand Up @@ -1006,12 +1020,14 @@ public DeliverMessageTask(CellEvent event)
{
_event = event;
EventLogger.queueBegin(_event);
_eventQueueSize.incrementAndGet();
}

@Override
public void innerRun()
{
EventLogger.queueEnd(_event);
_eventQueueSize.decrementAndGet();

if (_event instanceof LastMessageEvent) {
LOGGER.trace("messageThread : LastMessageEvent arrived");
Expand Down
Expand Up @@ -12,8 +12,8 @@
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

import dmg.cells.nucleus.CDC;
import dmg.cells.nucleus.CellCommandListener;
Expand Down Expand Up @@ -60,20 +60,20 @@ public class LineBasedDoor
*/
private final Executor executor;

private Thread workerThread;

public LineBasedDoor(String cellName, Args args, Class<? extends LineBasedInterpreter> interpreterClass,
StreamEngine engine, ExecutorService executor)
{
super(cellName, args);

getNucleus().setCallbackExecutor(executor);
getNucleus().setMessageExecutor(new SequentialExecutor(executor));
this.interpreterClass = interpreterClass;
this.engine = engine;
this.executor = executor;

try {
doInit();
workerThread.start();
executor.execute(this);
} catch (InterruptedException e) {
shutdownGate.countDown();
} catch (ExecutionException e) {
Expand All @@ -93,8 +93,6 @@ protected void init()
LOGGER.debug("Client host: {}",
engine.getInetAddress().getHostAddress());

workerThread = new Thread(this);

interpreter = interpreterClass.newInstance();
parseOptions(interpreter);
interpreter.setWriter(engine.getWriter());
Expand Down
36 changes: 17 additions & 19 deletions modules/dcache/src/main/java/org/dcache/cells/AbstractCell.java
Expand Up @@ -14,7 +14,6 @@
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

import diskCacheV111.util.CacheException;
import diskCacheV111.vehicles.Message;
Expand Down Expand Up @@ -272,28 +271,27 @@ final protected void doInit()
/* Execute initialisation in a different thread allocated
* from the correct thread group.
*/
FutureTask<Void> task = new FutureTask<>(new Callable<Void>() {
@Override
public Void call() throws Exception {
parseOptions(AbstractCell.this);
Callable<Void> task = new Callable<Void>() {
@Override
public Void call() throws Exception {
parseOptions(AbstractCell.this);

_monitor = new MessageProcessingMonitor();
_monitor.setCellEndpoint(AbstractCell.this);
_monitor.setEnabled(_isMonitoringEnabled);
_monitor = new MessageProcessingMonitor();
_monitor.setCellEndpoint(AbstractCell.this);
_monitor.setEnabled(_isMonitoringEnabled);

if (_cellClass != null) {
getNucleus().setCellClass(_cellClass);
}
if (_cellClass != null) {
getNucleus().setCellClass(_cellClass);
}

addMessageListener(AbstractCell.this);
addCommandListener(_monitor);
addMessageListener(AbstractCell.this);
addCommandListener(_monitor);

AbstractCell.this.executeInit();
return null;
}
});
getNucleus().newThread(task, "init").start();
task.get();
AbstractCell.this.executeInit();
return null;
}
};
invokeOnMessageThread(task).get();

start();
} catch (InterruptedException e) {
Expand Down

0 comments on commit 5273cfd

Please sign in to comment.