From 5273cfddd71f93534f5c198988a6aa0d56003951 Mon Sep 17 00:00:00 2001 From: Gerd Behrmann Date: Wed, 4 Dec 2013 17:08:15 +0100 Subject: [PATCH] ftp: Use shared thread pool for various processing threads This patch makes the ftp door use the shared thread pool for the I/O thread, for the message processing thread, and the the callback thread. The patch also updates AbstractCell to use the Cell's message thread. This saves one additional thread instantiation for each cell instance. Target: trunk Require-notes: no Require-book: no Acked-by: Tigran Mkrtchyan Patch: http://rb.dcache.org/r/6331/ --- .../java/dmg/cells/nucleus/CellAdapter.java | 7 ++++ .../java/dmg/cells/nucleus/CellNucleus.java | 26 +++++++++++--- .../diskCacheV111/doors/LineBasedDoor.java | 10 +++--- .../java/org/dcache/cells/AbstractCell.java | 36 +++++++++---------- 4 files changed, 49 insertions(+), 30 deletions(-) diff --git a/modules/cells/src/main/java/dmg/cells/nucleus/CellAdapter.java b/modules/cells/src/main/java/dmg/cells/nucleus/CellAdapter.java index 6257a5b2648..80c9893c530 100644 --- a/modules/cells/src/main/java/dmg/cells/nucleus/CellAdapter.java +++ b/modules/cells/src/main/java/dmg/cells/nucleus/CellAdapter.java @@ -12,6 +12,8 @@ import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Future; import dmg.cells.network.PingMessage; import dmg.cells.services.RoutingManager; @@ -421,6 +423,11 @@ public Reader getDomainContextReader(String contextName) return _nucleus.getDomainContextReader(contextName); } + protected Future invokeOnMessageThread(Callable task) + { + return _nucleus.invokeOnMessageThread(task); + } + /** * sends a CellMessage along the specified path. * diff --git a/modules/cells/src/main/java/dmg/cells/nucleus/CellNucleus.java b/modules/cells/src/main/java/dmg/cells/nucleus/CellNucleus.java index 3dc45ebb427..ab2ee29e273 100644 --- a/modules/cells/src/main/java/dmg/cells/nucleus/CellNucleus.java +++ b/modules/cells/src/main/java/dmg/cells/nucleus/CellNucleus.java @@ -18,6 +18,9 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; @@ -60,8 +63,9 @@ public class CellNucleus implements ThreadFactory private final Map _waitHash = new HashMap<>(); private String _cellClass; - private volatile ThreadPoolExecutor _callbackExecutor; - private volatile ThreadPoolExecutor _messageExecutor; + private volatile ExecutorService _callbackExecutor; + private volatile ExecutorService _messageExecutor; + private AtomicInteger _eventQueueSize = new AtomicInteger(); private boolean _isPrivateCallbackExecutor = true; private boolean _isPrivateMessageExecutor = true; @@ -313,7 +317,7 @@ public synchronized void setAsyncCallback(boolean asyncCallback) /** * Executor used for message callbacks. */ - public synchronized void setCallbackExecutor(ThreadPoolExecutor executor) + public synchronized void setCallbackExecutor(ExecutorService executor) { if (executor == null) { throw new IllegalArgumentException("null is not allowed"); @@ -328,7 +332,7 @@ public synchronized void setCallbackExecutor(ThreadPoolExecutor executor) /** * Executor used for incoming message delivery. */ - public synchronized void setMessageExecutor(ThreadPoolExecutor executor) + public synchronized void setMessageExecutor(ExecutorService executor) { if (executor == null) { throw new IllegalArgumentException("null is not allowed"); @@ -686,6 +690,14 @@ public void run() { }; } + /** + * Submits a task for execution on the message thread. + */ + Future invokeOnMessageThread(Callable task) + { + return _messageExecutor.submit(task); + } + @Override @Nonnull public Thread newThread(@Nonnull Runnable target) { @@ -725,7 +737,7 @@ public Thread newThread(@Nonnull Runnable target, @Nonnull String name) int getEventQueueSize() { - return _messageExecutor.getQueue().size(); + return _eventQueueSize.get(); } void addToEventQueue(MessageEvent ce) { @@ -965,11 +977,13 @@ public CallbackTask(CellLock lock, CellMessage message) { _lock = lock; _message = message; + _eventQueueSize.incrementAndGet(); } @Override public void innerRun() { + _eventQueueSize.decrementAndGet(); try (CDC ignored = _lock.getCdc().restore()) { CellMessageAnswerable callback = _lock.getCallback(); @@ -1006,12 +1020,14 @@ public DeliverMessageTask(CellEvent event) { _event = event; EventLogger.queueBegin(_event); + _eventQueueSize.incrementAndGet(); } @Override public void innerRun() { EventLogger.queueEnd(_event); + _eventQueueSize.decrementAndGet(); if (_event instanceof LastMessageEvent) { LOGGER.trace("messageThread : LastMessageEvent arrived"); diff --git a/modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java b/modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java index d71f4d75494..b78265bff27 100644 --- a/modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java +++ b/modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java @@ -12,8 +12,8 @@ import java.net.Socket; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import dmg.cells.nucleus.CDC; import dmg.cells.nucleus.CellCommandListener; @@ -60,20 +60,20 @@ public class LineBasedDoor */ private final Executor executor; - private Thread workerThread; - public LineBasedDoor(String cellName, Args args, Class interpreterClass, StreamEngine engine, ExecutorService executor) { super(cellName, args); + getNucleus().setCallbackExecutor(executor); + getNucleus().setMessageExecutor(new SequentialExecutor(executor)); this.interpreterClass = interpreterClass; this.engine = engine; this.executor = executor; try { doInit(); - workerThread.start(); + executor.execute(this); } catch (InterruptedException e) { shutdownGate.countDown(); } catch (ExecutionException e) { @@ -93,8 +93,6 @@ protected void init() LOGGER.debug("Client host: {}", engine.getInetAddress().getHostAddress()); - workerThread = new Thread(this); - interpreter = interpreterClass.newInstance(); parseOptions(interpreter); interpreter.setWriter(engine.getWriter()); diff --git a/modules/dcache/src/main/java/org/dcache/cells/AbstractCell.java b/modules/dcache/src/main/java/org/dcache/cells/AbstractCell.java index 7fe14cf5d8b..1d6c225a566 100644 --- a/modules/dcache/src/main/java/org/dcache/cells/AbstractCell.java +++ b/modules/dcache/src/main/java/org/dcache/cells/AbstractCell.java @@ -14,7 +14,6 @@ import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.FutureTask; import diskCacheV111.util.CacheException; import diskCacheV111.vehicles.Message; @@ -272,28 +271,27 @@ final protected void doInit() /* Execute initialisation in a different thread allocated * from the correct thread group. */ - FutureTask task = new FutureTask<>(new Callable() { - @Override - public Void call() throws Exception { - parseOptions(AbstractCell.this); + Callable task = new Callable() { + @Override + public Void call() throws Exception { + parseOptions(AbstractCell.this); - _monitor = new MessageProcessingMonitor(); - _monitor.setCellEndpoint(AbstractCell.this); - _monitor.setEnabled(_isMonitoringEnabled); + _monitor = new MessageProcessingMonitor(); + _monitor.setCellEndpoint(AbstractCell.this); + _monitor.setEnabled(_isMonitoringEnabled); - if (_cellClass != null) { - getNucleus().setCellClass(_cellClass); - } + if (_cellClass != null) { + getNucleus().setCellClass(_cellClass); + } - addMessageListener(AbstractCell.this); - addCommandListener(_monitor); + addMessageListener(AbstractCell.this); + addCommandListener(_monitor); - AbstractCell.this.executeInit(); - return null; - } - }); - getNucleus().newThread(task, "init").start(); - task.get(); + AbstractCell.this.executeInit(); + return null; + } + }; + invokeOnMessageThread(task).get(); start(); } catch (InterruptedException e) {