Skip to content

Commit

Permalink
ftp: Refactor command queue into sequential executor
Browse files Browse the repository at this point in the history
The FTP door uses a custom CommandQueue to provide the
ability to sequentially process commands using a shared
thread pool. This patch refactors this code into a more
generic SequntialExecutor class. It implements the
ExecutorService interface and is semantically similar to
a single thread executor. However the thread used for
processing tasks is allocated on demands from a shared
Executor.

The selective enabling of interrupts has been removed
from the FTP door. This was a workaround for bad exception
handling in old versions of the door. The workaround is
not easily ported to SequentialExecutor and is therefore
removed.

Target: trunk
Require-notes: no
Require-book: no
Acked-by: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de>
Patch: http://rb.dcache.org/r/6325/
  • Loading branch information
gbehrmann committed Dec 9, 2013
1 parent 1193536 commit 90e584f
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 168 deletions.
141 changes: 33 additions & 108 deletions modules/dcache-ftp/src/main/java/diskCacheV111/doors/LineBasedDoor.java
Expand Up @@ -10,11 +10,10 @@
import java.io.Writer;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;

import dmg.cells.nucleus.CDC;
import dmg.cells.nucleus.CellCommandListener;
Expand All @@ -26,7 +25,7 @@
import dmg.util.StreamEngine;

import org.dcache.cells.AbstractCell;
import org.dcache.util.FireAndForgetTask;
import org.dcache.util.SequentialExecutor;
import org.dcache.util.Transfer;

/**
Expand Down Expand Up @@ -54,13 +53,12 @@ public class LineBasedDoor

private LineBasedInterpreter interpreter;

private final CommandQueue commandQueue = new CommandQueue();
private final CountDownLatch shutdownGate = new CountDownLatch(1);

/**
* Shared executor for processing commands.
* Executor for processing commands.
*/
private final ExecutorService executor;
private final Executor executor;

private Thread workerThread;

Expand Down Expand Up @@ -146,6 +144,7 @@ private synchronized void shutdownInputStream()
public void run()
{
try {
SequentialExecutor executor = new SequentialExecutor(this.executor);
try {
/* Notice that we do not close the input stream, as
* doing so would close the socket as well. We don't
Expand All @@ -160,17 +159,16 @@ public void run()

String s = in.readLine();
while (s != null) {
commandQueue.add(s);
executor.execute(new Command(s));
s = in.readLine();
}
} catch (IOException e) {
LOGGER.error("Got error reading data: {}", e.getMessage());
} finally {
/* This will block until command processing has
* finished.
*/
try {
commandQueue.stop();
executor.shutdownNow();
interpreter.shutdown();
executor.awaitTermination();
} catch (InterruptedException e) {
LOGGER.error("Failed to shut down command processing: {}",
e.getMessage());
Expand Down Expand Up @@ -247,112 +245,39 @@ public void getInfo(PrintWriter pw)
}
}

/**
* Support class to implement FTP command processing on shared
* worker threads. Commands on the same queue are executed
* sequentially.
*/
class CommandQueue
public interface LineBasedInterpreter
{
/** Queue of FTP commands to execute.
*/
private final Queue<String> _commands = new LinkedList<>();

/**
* True iff the command queue has been stopped.
*/
private boolean isStopped;

/**
* True iff the command processing task is in the
* ExecutorService queue or is currently running.
*/
private boolean isRunning;
void setWriter(Writer out);
void execute(String cmd) throws CommandExitException;
void init();
void shutdown();
void setRemoteAddress(InetSocketAddress remoteAddress);
void setLocalAddress(InetSocketAddress localAddress);
}

/**
* Adds a command to the command queue.
*/
public synchronized void add(String command)
{
if (!isStopped) {
_commands.add(command);
if (!isRunning) {
final CDC cdc = new CDC();
isRunning = true;
executor.submit(new FireAndForgetTask(new Runnable()
{
@Override
public void run()
{
try (CDC ignored = cdc.restore()) {
String command = getOrDone();
while (command != null) {
try {
interpreter.execute(command);
} catch (CommandExitException e) {
shutdownInputStream();
} catch (RuntimeException e) {
LOGGER.error("Bug detected", e);
}
command = getOrDone();
}
}
}
}));
}
}
}
private class Command implements Runnable
{
private final CDC cdc;
private final String command;

/**
* Returns the next command.
*
* Returns null and signals that the command processing loop
* was left if the CommandQueue was stopped or the queue is
* empty.
*/
private synchronized String getOrDone()
public Command(String command)
{
if (isStopped || _commands.isEmpty()) {
isRunning = false;
notifyAll();
return null;
} else {
return _commands.remove();
}
this.cdc = new CDC();
this.command = command;
}

/**
* Stops the command queue.
*
* The interpreter is shut down and the method blocks until command
* processing has ceased.
*
* After a call to this method, {@code getOrDone} returns null.
*
* Does nothing if the command queue is already stopped.
*/
public synchronized void stop()
throws InterruptedException
@Override
public void run()
{
if (!isStopped) {
isStopped = true;

interpreter.shutdown();

while (isRunning) {
wait();
try (CDC ignored = cdc.restore()) {
try {
interpreter.execute(command);
} catch (CommandExitException e) {
shutdownInputStream();
} catch (RuntimeException e) {
LOGGER.error("Bug detected", e);
}
}
}
}

public interface LineBasedInterpreter
{
void setWriter(Writer out);
void execute(String cmd) throws CommandExitException;
void init();
void shutdown();
void setRemoteAddress(InetSocketAddress remoteAddress);
void setLocalAddress(InetSocketAddress localAddress);
}
}
Expand Up @@ -117,7 +117,6 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import diskCacheV111.doors.AbstractInterruptibleLineBasedInterpreter;
import diskCacheV111.doors.FTPTransactionLog;
import diskCacheV111.util.CacheException;
import diskCacheV111.util.CheckStagePermission;
Expand Down Expand Up @@ -187,6 +186,7 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.vehicles.FileAttributes;
import org.dcache.vehicles.PnfsListDirectoryMessage;

import static diskCacheV111.doors.LineBasedDoor.LineBasedInterpreter;
import static org.dcache.namespace.FileAttribute.*;

/**
Expand Down Expand Up @@ -247,8 +247,7 @@ public String getReply()
}

public abstract class AbstractFtpDoorV1
extends AbstractInterruptibleLineBasedInterpreter
implements CellMessageReceiver, CellCommandListener, CellInfoProvider, CellMessageSender
implements LineBasedInterpreter, CellMessageReceiver, CellCommandListener, CellInfoProvider, CellMessageSender
{
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractFtpDoorV1.class);
private static final Timer TIMER = new Timer("Performance marker timer", true);
Expand Down Expand Up @@ -1396,8 +1395,6 @@ private synchronized void closePassiveModeServerSocket()
@Override
public void shutdown()
{
super.shutdown();

/* In case of failure, we may have a transfer hanging around.
*/
FtpTransfer transfer = getTransfer();
Expand Down Expand Up @@ -2595,13 +2592,8 @@ private void retrieve(String file, long offset, long size,
* manager updates its state, we will retry failed
* transfer a few times.
*/
enableInterrupt();
try {
transfer.createAdapter();
transfer.selectPoolAndStartMover(_ioQueueName, _readRetryPolicy);
} finally {
disableInterrupt();
}
transfer.createAdapter();
transfer.selectPoolAndStartMover(_ioQueueName, _readRetryPolicy);
} catch (PermissionDeniedCacheException e) {
transfer.abort(550, "Permission denied");
} catch (CacheException e) {
Expand Down Expand Up @@ -2702,13 +2694,8 @@ private void store(String file, Mode mode, String xferMode,
transfer.createTransactionLog();
transfer.setChecksum(_checkSum);

enableInterrupt();
try {
transfer.createAdapter();
transfer.selectPoolAndStartMover(_ioQueueName, _writeRetryPolicy);
} finally {
disableInterrupt();
}
transfer.createAdapter();
transfer.selectPoolAndStartMover(_ioQueueName, _writeRetryPolicy);
} catch (InterruptedException e) {
transfer.abort(451, "Operation cancelled");
} catch (IOException e) {
Expand Down Expand Up @@ -2853,7 +2840,6 @@ public void ftp_list(String arg)
FsPath path = absolutePath(arg);

try {
enableInterrupt();
reply("150 Opening ASCII data connection for file list", false);
try {
openDataSocket();
Expand Down Expand Up @@ -2908,8 +2894,6 @@ public void ftp_list(String arg)
} catch (CacheException | IOException e){
reply("451 Local error in processing");
LOGGER.warn("Error in LIST: {}", e.getMessage());
} finally {
disableInterrupt();
}
}

Expand All @@ -2927,8 +2911,6 @@ public void ftp_nlst(String arg)
}

try {
enableInterrupt();

FsPath path = absolutePath(arg);

/* RFC 3659 seems to imply that we have to report on
Expand Down Expand Up @@ -2987,8 +2969,6 @@ public void ftp_nlst(String arg)
} catch (CacheException | IOException e) {
reply("451 Local error in processing");
LOGGER.warn("Error in NLST: {}", e.getMessage());
} finally {
disableInterrupt();
}
}

Expand Down Expand Up @@ -3029,8 +3009,6 @@ public void ftp_mlsd(String arg)
checkLoggedIn();

try {
enableInterrupt();

FsPath path;
if (arg.length() == 0) {
path = absolutePath(".");
Expand Down Expand Up @@ -3079,17 +3057,13 @@ public void ftp_mlsd(String arg)
} catch (CacheException | IOException e) {
reply("451 Local error in processing");
LOGGER.warn("Error in MLSD: {}", e.getMessage());
} finally {
disableInterrupt();
}
}

public void ftp_rnfr(String arg) throws FTPCommandException {

checkLoggedIn();

try {
enableInterrupt();
_filepath = null;

if (Strings.isNullOrEmpty(arg)) {
Expand All @@ -3101,24 +3075,17 @@ public void ftp_rnfr(String arg) throws FTPCommandException {
_filepath = path;

reply("350 File exists, ready for destination name RNTO");
}
catch (InterruptedException e) {
throw new FTPCommandException(451,"Operation cancelled");
}
catch (CacheException e) {
} catch (FileNotFoundCacheException e) {
throw new FTPCommandException(550, "File not found");
}
finally {
disableInterrupt();
} catch (CacheException e) {
throw new FTPCommandException(451, "Transient error: " + e.getMessage());
}
}

public void ftp_rnto(String arg) throws FTPCommandException {

checkLoggedIn();

try {
enableInterrupt();
if (_filepath == null) {
throw new FTPCommandException(503, "RNTO must be preceeded by RNFR");
}
Expand All @@ -3130,16 +3097,12 @@ public void ftp_rnto(String arg) throws FTPCommandException {
_pnfs.renameEntry(_filepath.toString(), newName.toString(), true);

reply("250 File renamed");
}
catch (InterruptedException e) {
throw new FTPCommandException(451, "Operation cancelled");
}
catch (CacheException e) {
} catch (PermissionDeniedCacheException e) {
throw new FTPCommandException(550, "Permission denied");
}
finally {
} catch (CacheException e) {
throw new FTPCommandException(451, "Transient error: " + e.getMessage());
} finally {
_filepath = null;
disableInterrupt();
}
}
//----------------------------------------------
Expand Down Expand Up @@ -3178,12 +3141,9 @@ public void ftp_quit(String arg)
* transfers have completed.
*/
try {
enableInterrupt();
joinTransfer();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
disableInterrupt();
}

throw new CommandExitException("", 0);
Expand Down

0 comments on commit 90e584f

Please sign in to comment.