diff --git a/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/AbstractFtpDoorV1.java b/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/AbstractFtpDoorV1.java index 91cb7669051..d4af9d928f0 100644 --- a/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/AbstractFtpDoorV1.java +++ b/modules/dcache-ftp/src/main/java/org/dcache/ftp/door/AbstractFtpDoorV1.java @@ -175,11 +175,11 @@ LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING import org.dcache.namespace.PermissionHandler; import org.dcache.namespace.PosixPermissionHandler; import org.dcache.services.login.RemoteLoginStrategy; +import org.dcache.util.AsynchronousRedirectedTransfer; import org.dcache.util.Checksum; import org.dcache.util.ChecksumType; import org.dcache.util.Glob; import org.dcache.util.PortRange; -import org.dcache.util.Transfer; import org.dcache.util.TransferRetryPolicy; import org.dcache.util.list.DirectoryEntry; import org.dcache.util.list.DirectoryListPrinter; @@ -210,6 +210,18 @@ public FTPCommandException(int code, String reply) this(code, reply, reply); } + /** + * Constructs a command exception with the given ftp reply code and + * message. The message will be used for both the public FTP reply + * string and for the exception message. + */ + public FTPCommandException(int code, String reply, Exception cause) + { + super(reply, cause); + _code = code; + _reply = reply; + } + /** * Constructs a command exception with the given ftp reply code, * public and internal message. @@ -636,8 +648,6 @@ private static String buildChecksumList(){ ) protected TimeUnit _performanceMarkerPeriodUnit; - protected final int _sleepAfterMoverKill = 1; // seconds - protected final int _spaceManagerTimeout = 5 * 60; protected PortRange _passiveModePortRange; @@ -711,7 +721,7 @@ private static String buildChecksumList(){ /** * Encapsulation of an FTP transfer. */ - protected class FtpTransfer extends Transfer + protected class FtpTransfer extends AsynchronousRedirectedTransfer { private final Mode _mode; private final String _xferMode; @@ -737,16 +747,6 @@ protected class FtpTransfer extends Transfer */ protected PerfMarkerTask _perfMarkerTask; - /** - * True if the transfer was aborted. - */ - protected boolean _aborted; - - /** - * True if the transfer completed successfully. - */ - protected boolean _completed; - public FtpTransfer(FsPath path, long offset, long size, @@ -946,130 +946,117 @@ public void setChecksum(Checksum checksum) } } - protected synchronized void startTransfer() + @Override + public synchronized void startMover(String queue, long timeout) + throws CacheException, InterruptedException { - if (_adapter != null) { - _adapter.start(); + super.startMover(queue, timeout); + setStatus("Mover " + getPool() + "/" + getMoverId()); + if (_version == 1) { + redirect(null); } + } - setStatus("Mover " + getPool() + "/" + getMoverId() + ": " + - (isWrite() ? "Receiving" : "Sending")); - - reply("150 Opening BINARY data connection for " + _path, false); + public void abort(int replyCode, String msg) + { + doAbort(new FTPCommandException(replyCode, msg)); + } - if (isWrite() && _xferMode.equals("E") && _performanceMarkerPeriod > 0) { - long period = _performanceMarkerPeriodUnit.toMillis(_performanceMarkerPeriod); - long timeout = period / 2; - _perfMarkerTask = - new PerfMarkerTask(getPoolAddress(), getMoverId(), timeout); - TIMER.schedule(_perfMarkerTask, period, period); - } + public void abort(int replyCode, String msg, Exception exception) + { + doAbort(new FTPCommandException(replyCode, msg, exception)); } @Override - public synchronized void startMover(String queue, long timeout) - throws CacheException, InterruptedException + protected void onQueued() { - super.startMover(queue, timeout); setStatus("Mover " + getPool() + "/" + getMoverId()); - if (_version == 1) { - startTransfer(); - } } - public synchronized void transferStarted(CellMessage envelope, - GFtpTransferStartedMessage message) + @Override + protected synchronized void onRedirect(GFtpTransferStartedMessage redirect) { try { - if (_aborted || _completed) { - return; - } + if (redirect != null) { + if (_version != 2) { + LOGGER.error("Received unexpected GFtpTransferStartedMessage for {}", redirect.getPnfsId()); + return; + } - if (_version != 2) { - LOGGER.error("Received unexpected GFtpTransferStartedMessage for {} from {}", message - .getPnfsId(), envelope.getSourcePath()); - return; - } + if (!redirect.getPnfsId().equals(getPnfsId().getId())) { + LOGGER.error("GFtpTransferStartedMessage has wrong ID, expected {} but got {}", getPnfsId(), redirect.getPnfsId()); + throw new FTPCommandException(451, "Transient internal failure"); + } - if (!message.getPnfsId().equals(getPnfsId().getId())) { - LOGGER.error("GFtpTransferStartedMessage has wrong ID, expected {} but got {}", getPnfsId(), message - .getPnfsId()); - throw new FTPCommandException(451, "Transient internal failure"); - } + if (redirect.getPassive() && !_reply127) { + LOGGER.error("Pool unexpectedly volunteered to be passive"); + throw new FTPCommandException(451, "Transient internal failure"); + } + + /* If passive X mode was requested, but the pool rejected + * it, then we have to fail for now. REVISIT: We should + * use the other adapter in this case. + */ + if (_mode == Mode.PASSIVE && !redirect.getPassive() && _xferMode.equals("X")) { + throw new FTPCommandException(504, "Cannot use passive X mode"); + } - if (message.getPassive() && !_reply127) { - LOGGER.error("Pool {} unexpectedly volunteered to be passive", envelope.getSourcePath()); - throw new FTPCommandException(451, "Transient internal failure"); + /* Determine the 127 response address to send back to the + * client. When the pool is passive, this is the address of + * the pool (and in this case we no longer need the + * adapter). Otherwise this is the address of the adapter. + */ + if (redirect.getPassive()) { + assert _reply127; + assert _adapter != null; + + reply127PORT(redirect.getPoolAddress()); + + LOGGER.info("Closing adapter"); + _adapter.close(); + _adapter = null; + } else if (_reply127) { + reply127PORT(new InetSocketAddress(_localAddress.getAddress(), + _adapter.getClientListenerPort())); + } } - /* If passive X mode was requested, but the pool rejected - * it, then we have to fail for now. REVISIT: We should - * use the other adapter in this case. - */ - if (_mode == Mode.PASSIVE && !message.getPassive() && _xferMode.equals("X")) { - throw new FTPCommandException(504, "Cannot use passive X mode"); + if (_adapter != null) { + _adapter.start(); } - /* Determine the 127 response address to send back to the - * client. When the pool is passive, this is the address of - * the pool (and in this case we no longer need the - * adapter). Otherwise this is the address of the adapter. - */ - if (message.getPassive()) { - assert _reply127; - assert _adapter != null; + setStatus("Mover " + getPool() + "/" + getMoverId() + ": " + + (isWrite() ? "Receiving" : "Sending")); - reply127PORT(message.getPoolAddress()); + reply("150 Opening BINARY data connection for " + _path, false); - LOGGER.info("Closing adapter"); - _adapter.close(); - _adapter = null; - } else if (_reply127) { - reply127PORT(new InetSocketAddress(_localAddress.getAddress(), - _adapter.getClientListenerPort())); + if (isWrite() && _xferMode.equals("E") && _performanceMarkerPeriod > 0) { + long period = _performanceMarkerPeriodUnit.toMillis(_performanceMarkerPeriod); + long timeout = period / 2; + _perfMarkerTask = + new PerfMarkerTask(getPoolAddress(), getMoverId(), timeout); + TIMER.schedule(_perfMarkerTask, period, period); } - startTransfer(); } catch (FTPCommandException e) { abort(e.getCode(), e.getReply()); } catch (RuntimeException e) { - abort(426, "Transient internal error", e); + abort(451, "Transient internal error", e); } } @Override - public void finished(CacheException error) - { - super.finished(error); - transferCompleted(error); - } - - protected synchronized void transferCompleted(CacheException error) + protected synchronized void onFinish() { try { - /* It may happen the transfer has been aborted - * already. This is not a failure. - */ - if (_aborted) { - return; - } - - if (_completed) { - throw new RuntimeException("DoorTransferFinished message received more than once"); - } - - if (error != null) { - throw error; - } - /* Wait for adapter to shut down. */ if (_adapter != null) { LOGGER.info("Waiting for adapter to finish."); _adapter.join(300000); // 5 minutes if (_adapter.isAlive()) { - throw new FTPCommandException(426, "FTP proxy did not shut down"); + throw new FTPCommandException(451, "FTP proxy did not shut down"); } else if (_adapter.hasError()) { - throw new FTPCommandException(426, "FTP proxy failed: " + _adapter.getError()); + throw new FTPCommandException(451, "FTP proxy failed: " + _adapter.getError()); } LOGGER.debug("Closing adapter"); @@ -1088,25 +1075,17 @@ protected synchronized void transferCompleted(CacheException error) } notifyBilling(0, ""); - _completed = true; setTransfer(null); reply("226 Transfer complete."); - } catch (CacheException e) { - abort(426, e.getMessage()); } catch (FTPCommandException e) { abort(e.getCode(), e.getReply()); } catch (InterruptedException e) { - abort(426, "FTP proxy was interrupted", e); + abort(451, "FTP proxy was interrupted", e); } catch (RuntimeException e) { - abort(426, "Transient internal error", e); + abort(451, "Transient internal error", e); } } - public void abort(int replyCode, String msg) - { - abort(replyCode, msg, null); - } - /** * Aborts a transfer and performs all necessary cleanup steps, * including killing movers and removing incomplete files. A @@ -1120,18 +1099,10 @@ public void abort(int replyCode, String msg) * not expect to appear in normal use (potential * bugs). Communication errors and the like should not be * logged with an exception. - * - * @param replyCode reply code to send the the client - * @param replyMsg error message to send back to the client - * @param exception exception to log or null */ - public synchronized void abort(int replyCode, String replyMsg, - Exception exception) + @Override + protected synchronized void onFailure(Exception exception) { - if (_aborted || _completed) { - return; - } - if (_perfMarkerTask != null) { _perfMarkerTask.stop(); } @@ -1141,8 +1112,6 @@ public synchronized void abort(int replyCode, String replyMsg, _adapter = null; } - killMover(_sleepAfterMoverKill * 1000); - if (isWrite()) { if (_removeFileOnIncompleteTransfer) { LOGGER.warn("Removing incomplete file {}: {}", getPnfsId(), _path); @@ -1154,19 +1123,26 @@ public synchronized void abort(int replyCode, String replyMsg, /* Report errors. */ + int replyCode; + String replyMsg; + if (exception instanceof FTPCommandException) { + replyCode = ((FTPCommandException) exception).getCode(); + replyMsg = ((FTPCommandException) exception).getReply(); + } else { + replyCode = 451; + replyMsg = exception.getMessage(); + } + String msg = String.valueOf(replyCode) + " " + replyMsg; notifyBilling(replyCode, replyMsg); if (_tLog != null) { _tLog.error(msg); _tLog = null; } - if (exception == null) { - LOGGER.error("Transfer error: {}", msg); - } else { - LOGGER.error("Transfer error: {} ({})", msg, exception.getMessage()); + LOGGER.error("Transfer error: {}", msg); + if (!(exception instanceof FTPCommandException)) { LOGGER.debug(exception.toString(), exception); } - _aborted = true; setTransfer(null); reply(msg); } @@ -1497,7 +1473,7 @@ public void messageArrived(CellMessage envelope, LOGGER.debug("Received TransferStarted message"); FtpTransfer transfer = getTransfer(); if (transfer != null) { - transfer.transferStarted(envelope, message); + transfer.redirect(message); } } diff --git a/modules/dcache/src/main/java/org/dcache/util/AsynchronousRedirectedTransfer.java b/modules/dcache/src/main/java/org/dcache/util/AsynchronousRedirectedTransfer.java new file mode 100644 index 00000000000..3c9bb335264 --- /dev/null +++ b/modules/dcache/src/main/java/org/dcache/util/AsynchronousRedirectedTransfer.java @@ -0,0 +1,136 @@ +package org.dcache.util; + +import javax.security.auth.Subject; + +import diskCacheV111.util.CacheException; +import diskCacheV111.util.FsPath; +import diskCacheV111.util.PnfsHandler; +import diskCacheV111.vehicles.PoolMoverKillMessage; + +import dmg.cells.nucleus.CellAddressCore; +import dmg.cells.nucleus.CellPath; +import dmg.cells.nucleus.NoRouteToCellException; + +/** + * A transfer where the mover can send a redirect message to the door + * asynchronously. + * + * The transfer startup phase is blocking and identical to a regular Transfer, + * however notification of redirect and transfer completion is done + * asynchronously. Subclasses are to implement onQueued, onRedirect, onFinish and + * onFailure. The class deals with out of order notifications and guarantees + * that: + * + * - a mover ID is known before onQueued is called + * - onQueued is always called before onRedirect + * - onRedirect is always called before onFinish + * - that onRedirect and onFinish are not called once onFailure was called + * + * The class implements automatic killing of the mover in case the transfer is + * aborted. + */ +public abstract class AsynchronousRedirectedTransfer extends Transfer +{ + private T _redirectObject; + private boolean _isRedirected; + private boolean _isFinished; + private boolean _isDone; + + public AsynchronousRedirectedTransfer(PnfsHandler pnfs, Subject namespaceSubject, Subject subject, FsPath path) { + super(pnfs, namespaceSubject, subject, path); + } + + public AsynchronousRedirectedTransfer(PnfsHandler pnfs, Subject subject, FsPath path) { + super(pnfs, subject, path); + } + + @Override + public void startMover(String queue, long timeout) throws CacheException, InterruptedException + { + super.startMover(queue, timeout); + doQueued(); + } + + protected synchronized void doQueued() + { + if (_isDone) { + doKill(); + } else { + onQueued(); + doRedirect(); + } + } + + protected synchronized void doRedirect() + { + if (!_isDone && getMoverId() != null && _isRedirected) { + onRedirect(_redirectObject); + doFinish(); + } + } + + protected synchronized void doFinish() + { + if (!_isDone && getMoverId() != null && _isRedirected && _isFinished) { + onFinish(); + _isDone = true; + } + } + + protected synchronized void doAbort(Exception exception) + { + if (!_isDone) { + doKill(); + onFailure(exception); + _isDone = true; + } + } + + protected synchronized void doKill() + { + if (hasMover()) { + Integer moverId = getMoverId(); + String pool = getPool(); + CellAddressCore poolAddress = getPoolAddress(); + try { + PoolMoverKillMessage message = + new PoolMoverKillMessage(pool, moverId); + message.setReplyRequired(false); + _pool.send(new CellPath(poolAddress), message); + } catch (NoRouteToCellException e) { + _log.error("Failed to kill mover " + pool + "/" + moverId + ": " + e.getMessage()); + } + } + } + + /** + * Signals that the transfer is redirected. + */ + public synchronized void redirect(T object) + { + _redirectObject = object; + _isRedirected = true; + doRedirect(); + } + + @Override + public synchronized void finished(CacheException error) + { + super.finished(error); + if (error != null) { + doAbort(error); + } else { + _isFinished = true; + doFinish(); + } + } + + protected abstract void onQueued(); + + protected abstract void onRedirect(T object); + + protected abstract void onFinish(); + + protected abstract void onFailure(Exception exception); + +} diff --git a/modules/dcache/src/main/java/org/dcache/util/Transfer.java b/modules/dcache/src/main/java/org/dcache/util/Transfer.java index 16b6dacc7b1..846907e6532 100644 --- a/modules/dcache/src/main/java/org/dcache/util/Transfer.java +++ b/modules/dcache/src/main/java/org/dcache/util/Transfer.java @@ -64,7 +64,7 @@ */ public class Transfer implements Comparable { - private static final Logger _log = LoggerFactory.getLogger(Transfer.class); + protected static final Logger _log = LoggerFactory.getLogger(Transfer.class); private static final TimebasedCounter _sessionCounter = new TimebasedCounter();