diff --git a/modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java b/modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java index 109fde23eb7..d71f4d75494 100644 --- a/modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java +++ b/modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java @@ -10,11 +10,10 @@ import java.io.Writer; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.LinkedList; -import java.util.Queue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import dmg.cells.nucleus.CDC; import dmg.cells.nucleus.CellCommandListener; @@ -26,7 +25,7 @@ import dmg.util.StreamEngine; import org.dcache.cells.AbstractCell; -import org.dcache.util.FireAndForgetTask; +import org.dcache.util.SequentialExecutor; import org.dcache.util.Transfer; /** @@ -54,13 +53,12 @@ public class LineBasedDoor private LineBasedInterpreter interpreter; - private final CommandQueue commandQueue = new CommandQueue(); private final CountDownLatch shutdownGate = new CountDownLatch(1); /** - * Shared executor for processing commands. + * Executor for processing commands. */ - private final ExecutorService executor; + private final Executor executor; private Thread workerThread; @@ -146,6 +144,7 @@ private synchronized void shutdownInputStream() public void run() { try { + SequentialExecutor executor = new SequentialExecutor(this.executor); try { /* Notice that we do not close the input stream, as * doing so would close the socket as well. We don't @@ -160,17 +159,16 @@ public void run() String s = in.readLine(); while (s != null) { - commandQueue.add(s); + executor.execute(new Command(s)); s = in.readLine(); } } catch (IOException e) { LOGGER.error("Got error reading data: {}", e.getMessage()); } finally { - /* This will block until command processing has - * finished. - */ try { - commandQueue.stop(); + executor.shutdownNow(); + interpreter.shutdown(); + executor.awaitTermination(); } catch (InterruptedException e) { LOGGER.error("Failed to shut down command processing: {}", e.getMessage()); @@ -247,112 +245,39 @@ public void getInfo(PrintWriter pw) } } - /** - * Support class to implement FTP command processing on shared - * worker threads. Commands on the same queue are executed - * sequentially. - */ - class CommandQueue + public interface LineBasedInterpreter { - /** Queue of FTP commands to execute. - */ - private final Queue _commands = new LinkedList<>(); - - /** - * True iff the command queue has been stopped. - */ - private boolean isStopped; - - /** - * True iff the command processing task is in the - * ExecutorService queue or is currently running. - */ - private boolean isRunning; + void setWriter(Writer out); + void execute(String cmd) throws CommandExitException; + void init(); + void shutdown(); + void setRemoteAddress(InetSocketAddress remoteAddress); + void setLocalAddress(InetSocketAddress localAddress); + } - /** - * Adds a command to the command queue. - */ - public synchronized void add(String command) - { - if (!isStopped) { - _commands.add(command); - if (!isRunning) { - final CDC cdc = new CDC(); - isRunning = true; - executor.submit(new FireAndForgetTask(new Runnable() - { - @Override - public void run() - { - try (CDC ignored = cdc.restore()) { - String command = getOrDone(); - while (command != null) { - try { - interpreter.execute(command); - } catch (CommandExitException e) { - shutdownInputStream(); - } catch (RuntimeException e) { - LOGGER.error("Bug detected", e); - } - command = getOrDone(); - } - } - } - })); - } - } - } + private class Command implements Runnable + { + private final CDC cdc; + private final String command; - /** - * Returns the next command. - * - * Returns null and signals that the command processing loop - * was left if the CommandQueue was stopped or the queue is - * empty. - */ - private synchronized String getOrDone() + public Command(String command) { - if (isStopped || _commands.isEmpty()) { - isRunning = false; - notifyAll(); - return null; - } else { - return _commands.remove(); - } + this.cdc = new CDC(); + this.command = command; } - /** - * Stops the command queue. - * - * The interpreter is shut down and the method blocks until command - * processing has ceased. - * - * After a call to this method, {@code getOrDone} returns null. - * - * Does nothing if the command queue is already stopped. - */ - public synchronized void stop() - throws InterruptedException + @Override + public void run() { - if (!isStopped) { - isStopped = true; - - interpreter.shutdown(); - - while (isRunning) { - wait(); + try (CDC ignored = cdc.restore()) { + try { + interpreter.execute(command); + } catch (CommandExitException e) { + shutdownInputStream(); + } catch (RuntimeException e) { + LOGGER.error("Bug detected", e); } } } } - - public interface LineBasedInterpreter - { - void setWriter(Writer out); - void execute(String cmd) throws CommandExitException; - void init(); - void shutdown(); - void setRemoteAddress(InetSocketAddress remoteAddress); - void setLocalAddress(InetSocketAddress localAddress); - } } diff --git a/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/AbstractFtpDoorV1.java b/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/AbstractFtpDoorV1.java index d4af9d928f0..fae6146a660 100644 --- a/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/AbstractFtpDoorV1.java +++ b/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/AbstractFtpDoorV1.java @@ -117,7 +117,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import java.util.regex.Matcher; import java.util.regex.Pattern; -import diskCacheV111.doors.AbstractInterruptibleLineBasedInterpreter; import diskCacheV111.doors.FTPTransactionLog; import diskCacheV111.util.CacheException; import diskCacheV111.util.CheckStagePermission; @@ -187,6 +186,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.vehicles.FileAttributes; import org.dcache.vehicles.PnfsListDirectoryMessage; +import static diskCacheV111.doors.LineBasedDoor.LineBasedInterpreter; import static org.dcache.namespace.FileAttribute.*; /** @@ -247,8 +247,7 @@ public String getReply() } public abstract class AbstractFtpDoorV1 - extends AbstractInterruptibleLineBasedInterpreter - implements CellMessageReceiver, CellCommandListener, CellInfoProvider, CellMessageSender + implements LineBasedInterpreter, CellMessageReceiver, CellCommandListener, CellInfoProvider, CellMessageSender { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFtpDoorV1.class); private static final Timer TIMER = new Timer("Performance marker timer", true); @@ -1396,8 +1395,6 @@ private synchronized void closePassiveModeServerSocket() @Override public void shutdown() { - super.shutdown(); - /* In case of failure, we may have a transfer hanging around. */ FtpTransfer transfer = getTransfer(); @@ -2595,13 +2592,8 @@ private void retrieve(String file, long offset, long size, * manager updates its state, we will retry failed * transfer a few times. */ - enableInterrupt(); - try { - transfer.createAdapter(); - transfer.selectPoolAndStartMover(_ioQueueName, _readRetryPolicy); - } finally { - disableInterrupt(); - } + transfer.createAdapter(); + transfer.selectPoolAndStartMover(_ioQueueName, _readRetryPolicy); } catch (PermissionDeniedCacheException e) { transfer.abort(550, "Permission denied"); } catch (CacheException e) { @@ -2702,13 +2694,8 @@ private void store(String file, Mode mode, String xferMode, transfer.createTransactionLog(); transfer.setChecksum(_checkSum); - enableInterrupt(); - try { - transfer.createAdapter(); - transfer.selectPoolAndStartMover(_ioQueueName, _writeRetryPolicy); - } finally { - disableInterrupt(); - } + transfer.createAdapter(); + transfer.selectPoolAndStartMover(_ioQueueName, _writeRetryPolicy); } catch (InterruptedException e) { transfer.abort(451, "Operation cancelled"); } catch (IOException e) { @@ -2853,7 +2840,6 @@ public void ftp_list(String arg) FsPath path = absolutePath(arg); try { - enableInterrupt(); reply("150 Opening ASCII data connection for file list", false); try { openDataSocket(); @@ -2908,8 +2894,6 @@ public void ftp_list(String arg) } catch (CacheException | IOException e){ reply("451 Local error in processing"); LOGGER.warn("Error in LIST: {}", e.getMessage()); - } finally { - disableInterrupt(); } } @@ -2927,8 +2911,6 @@ public void ftp_nlst(String arg) } try { - enableInterrupt(); - FsPath path = absolutePath(arg); /* RFC 3659 seems to imply that we have to report on @@ -2987,8 +2969,6 @@ public void ftp_nlst(String arg) } catch (CacheException | IOException e) { reply("451 Local error in processing"); LOGGER.warn("Error in NLST: {}", e.getMessage()); - } finally { - disableInterrupt(); } } @@ -3029,8 +3009,6 @@ public void ftp_mlsd(String arg) checkLoggedIn(); try { - enableInterrupt(); - FsPath path; if (arg.length() == 0) { path = absolutePath("."); @@ -3079,17 +3057,13 @@ public void ftp_mlsd(String arg) } catch (CacheException | IOException e) { reply("451 Local error in processing"); LOGGER.warn("Error in MLSD: {}", e.getMessage()); - } finally { - disableInterrupt(); } } public void ftp_rnfr(String arg) throws FTPCommandException { - checkLoggedIn(); try { - enableInterrupt(); _filepath = null; if (Strings.isNullOrEmpty(arg)) { @@ -3101,24 +3075,17 @@ public void ftp_rnfr(String arg) throws FTPCommandException { _filepath = path; reply("350 File exists, ready for destination name RNTO"); - } - catch (InterruptedException e) { - throw new FTPCommandException(451,"Operation cancelled"); - } - catch (CacheException e) { + } catch (FileNotFoundCacheException e) { throw new FTPCommandException(550, "File not found"); - } - finally { - disableInterrupt(); + } catch (CacheException e) { + throw new FTPCommandException(451, "Transient error: " + e.getMessage()); } } public void ftp_rnto(String arg) throws FTPCommandException { - checkLoggedIn(); try { - enableInterrupt(); if (_filepath == null) { throw new FTPCommandException(503, "RNTO must be preceeded by RNFR"); } @@ -3130,16 +3097,12 @@ public void ftp_rnto(String arg) throws FTPCommandException { _pnfs.renameEntry(_filepath.toString(), newName.toString(), true); reply("250 File renamed"); - } - catch (InterruptedException e) { - throw new FTPCommandException(451, "Operation cancelled"); - } - catch (CacheException e) { + } catch (PermissionDeniedCacheException e) { throw new FTPCommandException(550, "Permission denied"); - } - finally { + } catch (CacheException e) { + throw new FTPCommandException(451, "Transient error: " + e.getMessage()); + } finally { _filepath = null; - disableInterrupt(); } } //---------------------------------------------- @@ -3178,12 +3141,9 @@ public void ftp_quit(String arg) * transfers have completed. */ try { - enableInterrupt(); joinTransfer(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - } finally { - disableInterrupt(); } throw new CommandExitException("", 0); diff --git a/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/GssFtpDoorV1.java b/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/GssFtpDoorV1.java index ec0f5f0bc87..4c056e1eb3b 100644 --- a/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/GssFtpDoorV1.java +++ b/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/GssFtpDoorV1.java @@ -101,16 +101,12 @@ public void ftp_adat(String arg) { } try { - enableInterrupt(); //_serviceContext.setChannelBinding(cb); //debug("GssFtpDoorV1::ftp_adat: CB set"); token = _serviceContext.acceptSecContext(token, 0, token.length); //debug("GssFtpDoorV1::ftp_adat: Token created"); _gssIdentity = _serviceContext.getSrcName(); //debug("GssFtpDoorV1::ftp_adat: User principal: " + UserPrincipal); - } catch (InterruptedException e) { - reply("421 Service unavailable"); - return; } catch (GSSException e) { CertPathValidatorException cpve = getFirst(filter(Throwables.getCausalChain(e), CertPathValidatorException.class), null); @@ -123,8 +119,6 @@ public void ftp_adat(String arg) { LOGGER.trace("Authentication failed", e); reply("535 Authentication failed: " + e.getMessage()); return; - } finally { - disableInterrupt(); } if (token != null) { if (!_serviceContext.isEstablished()) { diff --git a/modules/dcache-ftp/src/test/java/org/dcache/ftp/door/AbstractFtpDoorV1Test.java b/modules/dcache-ftp/src/test/java/org/dcache/ftp/door/AbstractFtpDoorV1Test.java index 21e06ca4935..f0c9213070d 100644 --- a/modules/dcache-ftp/src/test/java/org/dcache/ftp/door/AbstractFtpDoorV1Test.java +++ b/modules/dcache-ftp/src/test/java/org/dcache/ftp/door/AbstractFtpDoorV1Test.java @@ -120,7 +120,7 @@ public void whenRnfrIsCalledWithEmptyFilenameReplyError500() throws FTPCommandEx public void whenRnfrIsCalledForNonExistingFilenameReplyFileNotFound550() throws FTPCommandException, CacheException { doCallRealMethod().when(door).ftp_rnfr(anyString()); - when(pnfs.getPnfsIdByPath("/pathRoot/cwd/"+INVALID_FILE)).thenThrow(CacheException.class); + when(pnfs.getPnfsIdByPath("/pathRoot/cwd/"+INVALID_FILE)).thenThrow(FileNotFoundCacheException.class); thrown.expectCode(550); door.ftp_rnfr(INVALID_FILE); diff --git a/modules/dcache/src/main/java/org/dcache/util/SequentialExecutor.java b/modules/dcache/src/main/java/org/dcache/util/SequentialExecutor.java new file mode 100644 index 00000000000..dec32a0eb46 --- /dev/null +++ b/modules/dcache/src/main/java/org/dcache/util/SequentialExecutor.java @@ -0,0 +1,176 @@ +package org.dcache.util; + +import com.google.common.util.concurrent.Monitor; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.AbstractExecutorService; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.TimeUnit; + +/** + * An executor which executes its tasks sequentially. + * + * Differs from Executors#newSingleThreadExecutor by sourcing a thread + * from a shared executor when needed. + */ +public class SequentialExecutor extends AbstractExecutorService +{ + private final Queue tasks = new ArrayDeque<>(); + + private final Executor executor; + + private final Monitor monitor = new Monitor(); + private final Monitor.Guard isTerminated = new Monitor.Guard(monitor) { + @Override + public boolean isSatisfied() + { + return isShutdown && !isRunning; + } + }; + + private final RunnableFuture worker = + new FutureTask(new Worker()) { + public void run() + { + runAndReset(); + } + }; + private boolean isShutdown; + private boolean isRunning; + + public SequentialExecutor(Executor executor) + { + this.executor = executor; + } + + @Override + public void shutdown() + { + monitor.enter(); + try { + isShutdown = true; + } finally { + monitor.leave(); + } + } + + @Override + public List shutdownNow() + { + monitor.enter(); + try { + isShutdown = true; + List unexecutedTasks = new ArrayList<>(); + unexecutedTasks.addAll(tasks); + tasks.clear(); + + // Kill runnable + + return unexecutedTasks; + } finally { + monitor.leave(); + } + } + + @Override + public boolean isShutdown() + { + monitor.enter(); + try { + return isShutdown; + } finally { + monitor.leave(); + } + } + + @Override + public boolean isTerminated() + { + monitor.enter(); + try { + return isShutdown && !isRunning; + } finally { + monitor.leave(); + } + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException + { + monitor.enter(); + try { + return monitor.waitFor(isTerminated, timeout, unit); + } finally { + monitor.leave(); + } + } + + public void awaitTermination() throws InterruptedException + { + monitor.enter(); + try { + monitor.waitFor(isTerminated); + } finally { + monitor.leave(); + } + } + + @Override + public void execute(Runnable task) + { + monitor.enter(); + try { + if (isShutdown) { + throw new RejectedExecutionException("Executor has been shut down."); + } + tasks.add(task); + if (!isRunning) { + isRunning = true; + executor.execute(worker); + } + } finally { + monitor.leave(); + } + } + + private Runnable getTask() + { + monitor.enter(); + try { + if (tasks.isEmpty()) { + isRunning = false; + return null; + } else { + return tasks.remove(); + } + } finally { + monitor.leave(); + } + } + + private class Worker implements Callable + { + @Override + public Void call() + { + Runnable task = getTask(); + while (task != null) { + try { + task.run(); + } catch (Throwable t) { + Thread thread = Thread.currentThread(); + thread.getUncaughtExceptionHandler().uncaughtException(thread, t); + } + task = getTask(); + } + return null; + } + } +}