diff --git a/modules/cells/src/main/java/dmg/cells/nucleus/CellAdapter.java b/modules/cells/src/main/java/dmg/cells/nucleus/CellAdapter.java index 6257a5b2648..80c9893c530 100644 --- a/modules/cells/src/main/java/dmg/cells/nucleus/CellAdapter.java +++ b/modules/cells/src/main/java/dmg/cells/nucleus/CellAdapter.java @@ -12,6 +12,8 @@ import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; import dmg.cells.network.PingMessage; import dmg.cells.services.RoutingManager; @@ -421,6 +423,11 @@ public Reader getDomainContextReader(String contextName) return _nucleus.getDomainContextReader(contextName); } + protected Future invokeOnMessageThread(Callable task) + { + return _nucleus.invokeOnMessageThread(task); + } + /** * sends a CellMessage along the specified path. * diff --git a/modules/cells/src/main/java/dmg/cells/nucleus/CellNucleus.java b/modules/cells/src/main/java/dmg/cells/nucleus/CellNucleus.java index 3dc45ebb427..ab2ee29e273 100644 --- a/modules/cells/src/main/java/dmg/cells/nucleus/CellNucleus.java +++ b/modules/cells/src/main/java/dmg/cells/nucleus/CellNucleus.java @@ -18,6 +18,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; @@ -60,8 +63,9 @@ public class CellNucleus implements ThreadFactory private final Map _waitHash = new HashMap<>(); private String _cellClass; - private volatile ThreadPoolExecutor _callbackExecutor; - private volatile ThreadPoolExecutor _messageExecutor; + private volatile ExecutorService _callbackExecutor; + private volatile ExecutorService _messageExecutor; + private AtomicInteger _eventQueueSize = new AtomicInteger(); private boolean _isPrivateCallbackExecutor = true; private boolean _isPrivateMessageExecutor = true; @@ -313,7 +317,7 @@ public synchronized void setAsyncCallback(boolean asyncCallback) /** * Executor used for message callbacks. */ - public synchronized void setCallbackExecutor(ThreadPoolExecutor executor) + public synchronized void setCallbackExecutor(ExecutorService executor) { if (executor == null) { throw new IllegalArgumentException("null is not allowed"); @@ -328,7 +332,7 @@ public synchronized void setCallbackExecutor(ThreadPoolExecutor executor) /** * Executor used for incoming message delivery. */ - public synchronized void setMessageExecutor(ThreadPoolExecutor executor) + public synchronized void setMessageExecutor(ExecutorService executor) { if (executor == null) { throw new IllegalArgumentException("null is not allowed"); @@ -686,6 +690,14 @@ public void run() { }; } + /** + * Submits a task for execution on the message thread. + */ + Future invokeOnMessageThread(Callable task) + { + return _messageExecutor.submit(task); + } + @Override @Nonnull public Thread newThread(@Nonnull Runnable target) { @@ -725,7 +737,7 @@ public Thread newThread(@Nonnull Runnable target, @Nonnull String name) int getEventQueueSize() { - return _messageExecutor.getQueue().size(); + return _eventQueueSize.get(); } void addToEventQueue(MessageEvent ce) { @@ -965,11 +977,13 @@ public CallbackTask(CellLock lock, CellMessage message) { _lock = lock; _message = message; + _eventQueueSize.incrementAndGet(); } @Override public void innerRun() { + _eventQueueSize.decrementAndGet(); try (CDC ignored = _lock.getCdc().restore()) { CellMessageAnswerable callback = _lock.getCallback(); @@ -1006,12 +1020,14 @@ public DeliverMessageTask(CellEvent event) { _event = event; EventLogger.queueBegin(_event); + _eventQueueSize.incrementAndGet(); } @Override public void innerRun() { EventLogger.queueEnd(_event); + _eventQueueSize.decrementAndGet(); if (_event instanceof LastMessageEvent) { LOGGER.trace("messageThread : LastMessageEvent arrived"); diff --git a/modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java b/modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java index d71f4d75494..b78265bff27 100644 --- a/modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java +++ b/modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java @@ -12,8 +12,8 @@ import java.net.Socket; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import dmg.cells.nucleus.CDC; import dmg.cells.nucleus.CellCommandListener; @@ -60,20 +60,20 @@ public class LineBasedDoor */ private final Executor executor; - private Thread workerThread; - public LineBasedDoor(String cellName, Args args, Class interpreterClass, StreamEngine engine, ExecutorService executor) { super(cellName, args); + getNucleus().setCallbackExecutor(executor); + getNucleus().setMessageExecutor(new SequentialExecutor(executor)); this.interpreterClass = interpreterClass; this.engine = engine; this.executor = executor; try { doInit(); - workerThread.start(); + executor.execute(this); } catch (InterruptedException e) { shutdownGate.countDown(); } catch (ExecutionException e) { @@ -93,8 +93,6 @@ protected void init() LOGGER.debug("Client host: {}", engine.getInetAddress().getHostAddress()); - workerThread = new Thread(this); - interpreter = interpreterClass.newInstance(); parseOptions(interpreter); interpreter.setWriter(engine.getWriter()); diff --git a/modules/dcache/src/main/java/org/dcache/cells/AbstractCell.java b/modules/dcache/src/main/java/org/dcache/cells/AbstractCell.java index 7fe14cf5d8b..1d6c225a566 100644 --- a/modules/dcache/src/main/java/org/dcache/cells/AbstractCell.java +++ b/modules/dcache/src/main/java/org/dcache/cells/AbstractCell.java @@ -14,7 +14,6 @@ import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.FutureTask; import diskCacheV111.util.CacheException; import diskCacheV111.vehicles.Message; @@ -272,28 +271,27 @@ final protected void doInit() /* Execute initialisation in a different thread allocated * from the correct thread group. */ - FutureTask task = new FutureTask<>(new Callable() { - @Override - public Void call() throws Exception { - parseOptions(AbstractCell.this); + Callable task = new Callable() { + @Override + public Void call() throws Exception { + parseOptions(AbstractCell.this); - _monitor = new MessageProcessingMonitor(); - _monitor.setCellEndpoint(AbstractCell.this); - _monitor.setEnabled(_isMonitoringEnabled); + _monitor = new MessageProcessingMonitor(); + _monitor.setCellEndpoint(AbstractCell.this); + _monitor.setEnabled(_isMonitoringEnabled); - if (_cellClass != null) { - getNucleus().setCellClass(_cellClass); - } + if (_cellClass != null) { + getNucleus().setCellClass(_cellClass); + } - addMessageListener(AbstractCell.this); - addCommandListener(_monitor); + addMessageListener(AbstractCell.this); + addCommandListener(_monitor); - AbstractCell.this.executeInit(); - return null; - } - }); - getNucleus().newThread(task, "init").start(); - task.get(); + AbstractCell.this.executeInit(); + return null; + } + }; + invokeOnMessageThread(task).get(); start(); } catch (InterruptedException e) {