Skip to content

Commit

Permalink
ftp: Fix race conditions caused by message reordering
Browse files Browse the repository at this point in the history
Since DoorTransferFinished and GftpTransferStarted take different
routes, it is possible for these messages to be delivered out of
order. As a result the FTP door produces a 226 reply earlier than
is allowed.

This patch introduces an abstract base class for asynchronous
redirected transfers. That class deals with out-of-order
notifications. As a side effect, a small pause during transfer
abort has been removed.

The patch also replaces several 426 replies with 451. 451 is for
internal errors while 426 is for client initiated transfer
abort.

Target: trunk
Require-notes: yes
Require-book: no
Acked-by: Tigran Mkrtchyan <tigran.mkrtchyan@desy.de>
Patch: http://rb.dcache.org/r/6322/
  • Loading branch information
gbehrmann committed Dec 9, 2013
1 parent d885e74 commit 9cf2b89
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 128 deletions.
Expand Up @@ -175,11 +175,11 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
import org.dcache.namespace.PermissionHandler;
import org.dcache.namespace.PosixPermissionHandler;
import org.dcache.services.login.RemoteLoginStrategy;
import org.dcache.util.AsynchronousRedirectedTransfer;
import org.dcache.util.Checksum;
import org.dcache.util.ChecksumType;
import org.dcache.util.Glob;
import org.dcache.util.PortRange;
import org.dcache.util.Transfer;
import org.dcache.util.TransferRetryPolicy;
import org.dcache.util.list.DirectoryEntry;
import org.dcache.util.list.DirectoryListPrinter;
Expand Down Expand Up @@ -210,6 +210,18 @@ public FTPCommandException(int code, String reply)
this(code, reply, reply);
}

/**
* Constructs a command exception with the given ftp reply code,
* message and cause. The message will be used for both the public
* FTP reply string and for the exception message.
*
* @param code the ftp reply code
* @param reply text used both as the public FTP reply string and as
*              the exception message
* @param cause the underlying exception, passed on to the superclass
*              constructor as the cause
*/
public FTPCommandException(int code, String reply, Exception cause)
{
super(reply, cause);
_code = code;
_reply = reply;
}

/**
* Constructs a command exception with the given ftp reply code,
* public and internal message.
Expand Down Expand Up @@ -636,8 +648,6 @@ private static String buildChecksumList(){
)
protected TimeUnit _performanceMarkerPeriodUnit;

protected final int _sleepAfterMoverKill = 1; // seconds

protected final int _spaceManagerTimeout = 5 * 60;

protected PortRange _passiveModePortRange;
Expand Down Expand Up @@ -711,7 +721,7 @@ private static String buildChecksumList(){
/**
* Encapsulation of an FTP transfer.
*/
protected class FtpTransfer extends Transfer
protected class FtpTransfer extends AsynchronousRedirectedTransfer<GFtpTransferStartedMessage>
{
private final Mode _mode;
private final String _xferMode;
Expand All @@ -737,16 +747,6 @@ protected class FtpTransfer extends Transfer
*/
protected PerfMarkerTask _perfMarkerTask;

/**
* True if the transfer was aborted.
*/
protected boolean _aborted;

/**
* True if the transfer completed successfully.
*/
protected boolean _completed;

public FtpTransfer(FsPath path,
long offset,
long size,
Expand Down Expand Up @@ -946,130 +946,117 @@ public void setChecksum(Checksum checksum)
}
}

protected synchronized void startTransfer()
@Override
public synchronized void startMover(String queue, long timeout)
throws CacheException, InterruptedException
{
if (_adapter != null) {
_adapter.start();
super.startMover(queue, timeout);
setStatus("Mover " + getPool() + "/" + getMoverId());
if (_version == 1) {
redirect(null);
}
}

setStatus("Mover " + getPool() + "/" + getMoverId() + ": " +
(isWrite() ? "Receiving" : "Sending"));

reply("150 Opening BINARY data connection for " + _path, false);
/**
* Aborts the transfer by wrapping the given reply code and message
* in an FTPCommandException and handing it to doAbort.
*
* NOTE(review): doAbort is not defined in the visible part of this
* file; presumably it is inherited from
* AsynchronousRedirectedTransfer - confirm.
*
* @param replyCode ftp reply code for the failure
* @param msg reply message for the failure
*/
public void abort(int replyCode, String msg)
{
doAbort(new FTPCommandException(replyCode, msg));
}

if (isWrite() && _xferMode.equals("E") && _performanceMarkerPeriod > 0) {
long period = _performanceMarkerPeriodUnit.toMillis(_performanceMarkerPeriod);
long timeout = period / 2;
_perfMarkerTask =
new PerfMarkerTask(getPoolAddress(), getMoverId(), timeout);
TIMER.schedule(_perfMarkerTask, period, period);
}
/**
* Aborts the transfer by wrapping the given reply code, message and
* causing exception in an FTPCommandException and handing it to
* doAbort.
*
* NOTE(review): doAbort is not defined in the visible part of this
* file; presumably it is inherited from
* AsynchronousRedirectedTransfer - confirm.
*
* @param replyCode ftp reply code for the failure
* @param msg reply message for the failure
* @param exception the exception recorded as the cause of the
*                  FTPCommandException
*/
public void abort(int replyCode, String msg, Exception exception)
{
doAbort(new FTPCommandException(replyCode, msg, exception));
}

@Override
public synchronized void startMover(String queue, long timeout)
throws CacheException, InterruptedException
protected void onQueued()
{
super.startMover(queue, timeout);
setStatus("Mover " + getPool() + "/" + getMoverId());
if (_version == 1) {
startTransfer();
}
}

public synchronized void transferStarted(CellMessage envelope,
GFtpTransferStartedMessage message)
@Override
protected synchronized void onRedirect(GFtpTransferStartedMessage redirect)
{
try {
if (_aborted || _completed) {
return;
}
if (redirect != null) {
if (_version != 2) {
LOGGER.error("Received unexpected GFtpTransferStartedMessage for {}", redirect.getPnfsId());
return;
}

if (_version != 2) {
LOGGER.error("Received unexpected GFtpTransferStartedMessage for {} from {}", message
.getPnfsId(), envelope.getSourcePath());
return;
}
if (!redirect.getPnfsId().equals(getPnfsId().getId())) {
LOGGER.error("GFtpTransferStartedMessage has wrong ID, expected {} but got {}", getPnfsId(), redirect.getPnfsId());
throw new FTPCommandException(451, "Transient internal failure");
}

if (!message.getPnfsId().equals(getPnfsId().getId())) {
LOGGER.error("GFtpTransferStartedMessage has wrong ID, expected {} but got {}", getPnfsId(), message
.getPnfsId());
throw new FTPCommandException(451, "Transient internal failure");
}
if (redirect.getPassive() && !_reply127) {
LOGGER.error("Pool unexpectedly volunteered to be passive");
throw new FTPCommandException(451, "Transient internal failure");
}

/* If passive X mode was requested, but the pool rejected
* it, then we have to fail for now. REVISIT: We should
* use the other adapter in this case.
*/
if (_mode == Mode.PASSIVE && !redirect.getPassive() && _xferMode.equals("X")) {
throw new FTPCommandException(504, "Cannot use passive X mode");
}

if (message.getPassive() && !_reply127) {
LOGGER.error("Pool {} unexpectedly volunteered to be passive", envelope.getSourcePath());
throw new FTPCommandException(451, "Transient internal failure");
/* Determine the 127 response address to send back to the
* client. When the pool is passive, this is the address of
* the pool (and in this case we no longer need the
* adapter). Otherwise this is the address of the adapter.
*/
if (redirect.getPassive()) {
assert _reply127;
assert _adapter != null;

reply127PORT(redirect.getPoolAddress());

LOGGER.info("Closing adapter");
_adapter.close();
_adapter = null;
} else if (_reply127) {
reply127PORT(new InetSocketAddress(_localAddress.getAddress(),
_adapter.getClientListenerPort()));
}
}

/* If passive X mode was requested, but the pool rejected
* it, then we have to fail for now. REVISIT: We should
* use the other adapter in this case.
*/
if (_mode == Mode.PASSIVE && !message.getPassive() && _xferMode.equals("X")) {
throw new FTPCommandException(504, "Cannot use passive X mode");
if (_adapter != null) {
_adapter.start();
}

/* Determine the 127 response address to send back to the
* client. When the pool is passive, this is the address of
* the pool (and in this case we no longer need the
* adapter). Otherwise this is the address of the adapter.
*/
if (message.getPassive()) {
assert _reply127;
assert _adapter != null;
setStatus("Mover " + getPool() + "/" + getMoverId() + ": " +
(isWrite() ? "Receiving" : "Sending"));

reply127PORT(message.getPoolAddress());
reply("150 Opening BINARY data connection for " + _path, false);

LOGGER.info("Closing adapter");
_adapter.close();
_adapter = null;
} else if (_reply127) {
reply127PORT(new InetSocketAddress(_localAddress.getAddress(),
_adapter.getClientListenerPort()));
if (isWrite() && _xferMode.equals("E") && _performanceMarkerPeriod > 0) {
long period = _performanceMarkerPeriodUnit.toMillis(_performanceMarkerPeriod);
long timeout = period / 2;
_perfMarkerTask =
new PerfMarkerTask(getPoolAddress(), getMoverId(), timeout);
TIMER.schedule(_perfMarkerTask, period, period);
}
startTransfer();
} catch (FTPCommandException e) {
abort(e.getCode(), e.getReply());
} catch (RuntimeException e) {
abort(426, "Transient internal error", e);
abort(451, "Transient internal error", e);
}
}

@Override
public void finished(CacheException error)
{
super.finished(error);
transferCompleted(error);
}

protected synchronized void transferCompleted(CacheException error)
protected synchronized void onFinish()
{
try {
/* It may happen the transfer has been aborted
* already. This is not a failure.
*/
if (_aborted) {
return;
}

if (_completed) {
throw new RuntimeException("DoorTransferFinished message received more than once");
}

if (error != null) {
throw error;
}

/* Wait for adapter to shut down.
*/
if (_adapter != null) {
LOGGER.info("Waiting for adapter to finish.");
_adapter.join(300000); // 5 minutes
if (_adapter.isAlive()) {
throw new FTPCommandException(426, "FTP proxy did not shut down");
throw new FTPCommandException(451, "FTP proxy did not shut down");
} else if (_adapter.hasError()) {
throw new FTPCommandException(426, "FTP proxy failed: " + _adapter.getError());
throw new FTPCommandException(451, "FTP proxy failed: " + _adapter.getError());
}

LOGGER.debug("Closing adapter");
Expand All @@ -1088,25 +1075,17 @@ protected synchronized void transferCompleted(CacheException error)
}

notifyBilling(0, "");
_completed = true;
setTransfer(null);
reply("226 Transfer complete.");
} catch (CacheException e) {
abort(426, e.getMessage());
} catch (FTPCommandException e) {
abort(e.getCode(), e.getReply());
} catch (InterruptedException e) {
abort(426, "FTP proxy was interrupted", e);
abort(451, "FTP proxy was interrupted", e);
} catch (RuntimeException e) {
abort(426, "Transient internal error", e);
abort(451, "Transient internal error", e);
}
}

public void abort(int replyCode, String msg)
{
abort(replyCode, msg, null);
}

/**
* Aborts a transfer and performs all necessary cleanup steps,
* including killing movers and removing incomplete files. A
Expand All @@ -1120,18 +1099,10 @@ public void abort(int replyCode, String msg)
* not expect to appear in normal use (potential
* bugs). Communication errors and the like should not be
* logged with an exception.
*
* @param replyCode reply code to send to the client
* @param replyMsg error message to send back to the client
* @param exception exception to log or null
*/
public synchronized void abort(int replyCode, String replyMsg,
Exception exception)
@Override
protected synchronized void onFailure(Exception exception)
{
if (_aborted || _completed) {
return;
}

if (_perfMarkerTask != null) {
_perfMarkerTask.stop();
}
Expand All @@ -1141,8 +1112,6 @@ public synchronized void abort(int replyCode, String replyMsg,
_adapter = null;
}

killMover(_sleepAfterMoverKill * 1000);

if (isWrite()) {
if (_removeFileOnIncompleteTransfer) {
LOGGER.warn("Removing incomplete file {}: {}", getPnfsId(), _path);
Expand All @@ -1154,19 +1123,26 @@ public synchronized void abort(int replyCode, String replyMsg,

/* Report errors.
*/
int replyCode;
String replyMsg;
if (exception instanceof FTPCommandException) {
replyCode = ((FTPCommandException) exception).getCode();
replyMsg = ((FTPCommandException) exception).getReply();
} else {
replyCode = 451;
replyMsg = exception.getMessage();
}

String msg = String.valueOf(replyCode) + " " + replyMsg;
notifyBilling(replyCode, replyMsg);
if (_tLog != null) {
_tLog.error(msg);
_tLog = null;
}
if (exception == null) {
LOGGER.error("Transfer error: {}", msg);
} else {
LOGGER.error("Transfer error: {} ({})", msg, exception.getMessage());
LOGGER.error("Transfer error: {}", msg);
if (!(exception instanceof FTPCommandException)) {
LOGGER.debug(exception.toString(), exception);
}
_aborted = true;
setTransfer(null);
reply(msg);
}
Expand Down Expand Up @@ -1497,7 +1473,7 @@ public void messageArrived(CellMessage envelope,
LOGGER.debug("Received TransferStarted message");
FtpTransfer transfer = getTransfer();
if (transfer != null) {
transfer.transferStarted(envelope, message);
transfer.redirect(message);
}
}

Expand Down

0 comments on commit 9cf2b89

Please sign in to comment.