Skip to content

Commit

Permalink
doors: include explanation when killing mover
Browse files Browse the repository at this point in the history
Motivation:

Many components outside of the pool, not just the door, may kill a
mover.  These components may kill a mover for a variety of reasons: from
normal operational activity through to bugs, door termination and
incorrect client behaviour.  Currently such activity is logged simply as
"killed by door"; however, this is not always accurate and, in any case,
omits potentially useful information about why the mover was killed.

Modification:

Update the message that kills a mover to include an explanation as to
why this was done.  All components that can kill a mover are updated to
include this information when killing the mover.

Result:

More accurate information is logged when a mover is killed by some
component outside of the pool.

Target: master
Requires-notes: yes
Requires-book: no
Patch: https://rb.dcache.org/r/9690/
Acked-by: Gerd Behrmann
  • Loading branch information
paulmillar committed Aug 31, 2016
1 parent 249cb99 commit 7318a65
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 31 deletions.
Expand Up @@ -656,6 +656,7 @@ protected class SessionHandler {
protected Subject _subject;
protected Origin _origin;
protected Restriction _authz = Restrictions.denyAll();
protected String _explanation = "unspecified problem";

protected SessionHandler(int sessionId, int commandId, VspArgs args)
{
Expand Down Expand Up @@ -928,6 +929,7 @@ public void keepAlive(){
if( ( _timeout > 0L ) && ( _timeout < System.currentTimeMillis() ) ){
_log.warn("User timeout triggered") ;
sendReply("keepAlive" , 112 , "User timeout canceled session" ) ;
_explanation = "session cancelled: user timed out";
removeUs();
return ;
}
Expand All @@ -938,6 +940,7 @@ public void keepAlive(){
again(true);
} catch (RuntimeException e) {
sendReply("keepAlive", 111, e.getMessage());
_explanation = "bug detected resending messages: " + e.toString();
removeUs() ;
}
}
Expand Down Expand Up @@ -2123,7 +2126,8 @@ public void poolPassiveIoFileMessage( PoolPassiveIoFileMessage<byte[]> reply) {
public void removeUs() {
Integer moverId = _moverId;
if (moverId != null) {
PoolMoverKillMessage message = new PoolMoverKillMessage(_pool, moverId);
PoolMoverKillMessage message = new PoolMoverKillMessage(_pool,
moverId, "killed by door: " + _explanation);
message.setReplyRequired(false);

_cell.sendMessage(new CellMessage(new CellPath(_pool), message));
Expand Down
Expand Up @@ -483,7 +483,7 @@ public Layout layoutGet(CompoundContext context, Inode nfsInode, int layoutType,
nfsState.addDisposeListener((NFS4State state) -> {
Transfer t = _ioMessages.remove(stateid);
if (t != null) {
t.killMover(0);
t.killMover(0, "killed by door: disposed of LAYOUTGET state");
}
});

Expand Down Expand Up @@ -588,7 +588,7 @@ public void layoutReturn(CompoundContext context, stateid4 stateid) throws IOExc
}

_log.debug("Sending KILL to {}@{}", transfer.getMoverId(), transfer.getPool());
transfer.killMover(0);
transfer.killMover(0, "killed by door: returning layout");

try {
if(transfer.hasMover() && !transfer.waitForMover(500)) {
Expand Down Expand Up @@ -646,7 +646,8 @@ public class KillMoverCmd implements Callable<String> {

@Override
public String call() throws Exception {
PoolMoverKillMessage message = new PoolMoverKillMessage(pool, mover);
PoolMoverKillMessage message = new PoolMoverKillMessage(pool, mover,
"killed by door 'kill mover' command");
message.setReplyRequired(false);
_poolStub.notify(new CellPath(pool), message);
return "Done.";
Expand Down
Expand Up @@ -2,15 +2,26 @@

package diskCacheV111.vehicles;

import static java.util.Objects.requireNonNull;

public class PoolMoverKillMessage extends PoolMessage {

private static final long serialVersionUID = -8654307136745044047L;

private final String explanation;
public int moverId;
public PoolMoverKillMessage(String poolName, int moverId){

public PoolMoverKillMessage(String poolName, int moverId,
String explanation){
super(poolName);
this.moverId = moverId ;
setReplyRequired(true);
this.explanation = requireNonNull(explanation);
}

/**
 * Returns the explanation of why the mover is being killed.
 *
 * @return the explanation given to the constructor; never null, because
 *         the constructor rejects a null explanation via requireNonNull
 */
public String getExplanation()
{
return explanation;
}

/** Returns the id of the mover that this message asks the pool to kill. */
public int getMoverId(){ return moverId ; }
Expand Down
Expand Up @@ -68,7 +68,8 @@ public void killMovers(Iterable<ActiveTransfersBean> transfers)
futures.put(transfer.getKey(),
_cellStub.send(new CellPath(transfer.getPool()),
new PoolMoverKillMessage(transfer.getPool(),
Ints.checkedCast(transfer.getMoverId()))));
Ints.checkedCast(transfer.getMoverId()),
"killed through webadmin")));
}

Collection<Long> failed = new ArrayList<>();
Expand Down
Expand Up @@ -675,7 +675,8 @@ public DcacheResource createFile(FsPath path, InputStream inputStream, Long leng
}
transfer.relayData(inputStream);
} finally {
transfer.killMover(_killTimeout, _killTimeoutUnit);
transfer.killMover(_killTimeout, _killTimeoutUnit,
"killed by door: proxy transfer complete");
}
success = true;
} finally {
Expand Down Expand Up @@ -725,7 +726,8 @@ public String getWriteUrl(FsPath path, Long length)
transfer.getMoverId() + ": Waiting for completion");
} finally {
if (uri == null) {
transfer.killMover(_killTimeout, _killTimeoutUnit);
transfer.killMover(_killTimeout, _killTimeoutUnit,
"killed by door: problem creating file");
transfer.deleteNameSpaceEntry();
}
}
Expand Down Expand Up @@ -759,21 +761,26 @@ public void readFile(FsPath path, PnfsId pnfsid,
URISyntaxException
{
ReadTransfer transfer = beginRead(path, pnfsid, true, null);
String explanation = "transfer completed";
try {
transfer.relayData(outputStream, range);
} catch (CacheException e) {
transfer.notifyBilling(e.getRc(), e.getMessage());
explanation = e.getMessage();
throw e;
} catch (InterruptedException e) {
transfer.notifyBilling(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
"Transfer interrupted");
explanation = "transfer interrupted";
throw e;
} catch (IOException | RuntimeException e) {
transfer.notifyBilling(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
e.toString());
explanation = "bug detected: " + e.toString();
throw e;
} finally {
transfer.killMover(_killTimeout, _killTimeoutUnit);
transfer.killMover(_killTimeout, _killTimeoutUnit,
"killed by door: " + explanation);
_transfers.remove((int) transfer.getId());
}
}
Expand Down Expand Up @@ -1038,6 +1045,7 @@ private ReadTransfer beginRead(FsPath path, PnfsId pnfsid, boolean isProxyTransf
{
Subject subject = getSubject();
Restriction restriction = getRestriction();
String explanation = "transfer complete";

String uri = null;
ReadTransfer transfer = new ReadTransfer(_pnfs, subject, restriction,
Expand All @@ -1060,18 +1068,22 @@ private ReadTransfer beginRead(FsPath path, PnfsId pnfsid, boolean isProxyTransf
transfer.getMoverId() + ": Waiting for completion");
} catch (CacheException e) {
transfer.notifyBilling(e.getRc(), e.getMessage());
explanation = e.getMessage();
throw e;
} catch (InterruptedException e) {
transfer.notifyBilling(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
"Transfer interrupted");
explanation = "transfer interrupted";
throw e;
} catch (RuntimeException e) {
transfer.notifyBilling(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
e.toString());
explanation = "bug detected: " + e.toString();
throw e;
} finally {
if (uri == null) {
transfer.killMover(_killTimeout, _killTimeoutUnit);
transfer.killMover(_killTimeout, _killTimeoutUnit,
"killed by door: " + explanation);
_transfers.remove((int) transfer.getId());
}
}
Expand Down Expand Up @@ -1110,7 +1122,8 @@ public void messageArrived(PoolIoFileMessage message)
{
if (message.getReturnCode() == 0) {
String pool = message.getPoolName();
_poolStub.notify(new CellPath(pool), new PoolMoverKillMessage(pool, message.getMoverId()));
_poolStub.notify(new CellPath(pool), new PoolMoverKillMessage(pool,
message.getMoverId(), "door timed out before pool"));
}
}

Expand Down
Expand Up @@ -363,6 +363,7 @@ public synchronized void finished(CacheException error)

InetSocketAddress address = null;
_transfers.put(handle, transfer);
String explanation = "unspecified problem";
try {
transfer.readNameSpaceEntry(false);
transfer.selectPoolAndStartMover(ioQueue == null ? _ioQueue : ioQueue, RETRY_POLICY);
Expand All @@ -374,19 +375,22 @@ public synchronized void finished(CacheException error)
transfer.setStatus("Mover " + transfer.getPool() + "/" +
transfer.getMoverId() + ": Sending");
} catch (CacheException e) {
explanation = e.getMessage();
transfer.notifyBilling(e.getRc(), e.getMessage());
throw e;
} catch (InterruptedException e) {
explanation = "transfer interrupted";
transfer.notifyBilling(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
"Transfer interrupted");
throw e;
} catch (RuntimeException e) {
explanation = "bug found: " + e.toString();
transfer.notifyBilling(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
e.toString());
throw e;
} finally {
if (address == null) {
transfer.killMover(0);
transfer.killMover(0, "killed by door: " + explanation);
_transfers.remove(handle);
}
}
Expand All @@ -409,6 +413,7 @@ public synchronized void finished(CacheException error)
int handle = transfer.getFileHandle();
InetSocketAddress address = null;
_transfers.put(handle, transfer);
String explanation = "problem within door";
try {
if (createDir) {
transfer.createNameSpaceEntryWithParents();
Expand All @@ -434,19 +439,22 @@ public synchronized void finished(CacheException error)
}
}
} catch (CacheException e) {
explanation = e.getMessage();
transfer.notifyBilling(e.getRc(), e.getMessage());
throw e;
} catch (InterruptedException e) {
explanation = "transfer interrupted";
transfer.notifyBilling(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
"Transfer interrupted");
throw e;
} catch (RuntimeException e) {
explanation = "bug found: " + e.toString();
transfer.notifyBilling(CacheException.UNEXPECTED_SYSTEM_EXCEPTION,
e.toString());
throw e;
} finally {
if (address == null) {
transfer.killMover(0);
transfer.killMover(0, "killed by door: " + explanation);
_transfers.remove(handle);
}
}
Expand Down Expand Up @@ -745,7 +753,8 @@ public void messageArrived(PoolIoFileMessage message)
{
if (message.getReturnCode() == 0) {
String pool = message.getPoolName();
_poolStub.notify(new CellPath(pool), new PoolMoverKillMessage(pool, message.getMoverId()));
_poolStub.notify(new CellPath(pool), new PoolMoverKillMessage(pool,
message.getMoverId(), "door timed out before pool"));
}
}

Expand Down Expand Up @@ -971,7 +980,7 @@ public Object ac_get_door_info(Args args)
for (Transfer transfer : _transfers.values()) {
if (transfer.getMoverId() == mover && transfer.getPool() != null && transfer.getPool().equals(pool)) {

transfer.killMover(0);
transfer.killMover(0, "killed by door 'kill mover' command");
return "Kill request to the mover " + mover + " has been submitted";
}
}
Expand Down
Expand Up @@ -344,6 +344,7 @@ private void copy(Subject subject,
// _target.setBillingStub();

boolean success = false;
String explanation = "copy failed";
_activeTransfers.put(_target.getId(), this);
_activeTransfers.put(_source.getId(), this);

Expand Down Expand Up @@ -372,15 +373,17 @@ private void copy(Subject subject,
} catch (CacheException e) {
_source.setStatus("Failed: " + e.toString());
_log.warn(e.toString());
explanation = "copy failed: " + e.getMessage();
throw e;
} catch (InterruptedException e) {
_source.setStatus("Failed: " + e.toString());
explanation = "interrupted";
throw e;
} finally {
if (!success) {
String status = _source.getStatus();
_source.killMover(0);
_target.killMover(1000);
_source.killMover(0, "killed by CopyManager: " + explanation);
_target.killMover(1000, "killed by CopyManager: " + explanation);
_target.deleteNameSpaceEntry();
_source.setStatus(status);
} else {
Expand Down
Expand Up @@ -638,7 +638,7 @@ public void timeout()
{
log.error(" transfer timed out");
if (moverId != null) {
killMover(moverId);
killMover(moverId, "timed out");
}
sendErrorReply(24, new IOException("timed out while waiting for mover reply"), false);
}
Expand All @@ -648,7 +648,7 @@ public void cancel(String explanation)
log.debug("transfer cancelled: {}", explanation);

if (moverId != null) {
killMover(moverId);
killMover(moverId, explanation);
}

// FIXME: sending the reply here removes the TransferManagerHandler
Expand Down Expand Up @@ -754,10 +754,11 @@ public void setPoolAddress(CellAddressCore poolAddress)
this.poolAddress = poolAddress;
}

public void killMover(int moverId)
public void killMover(int moverId, String explanation)
{
log.debug("sending mover kill to pool {} for moverId={}", pool, moverId);
PoolMoverKillMessage killMessage = new PoolMoverKillMessage(pool, moverId);
PoolMoverKillMessage killMessage = new PoolMoverKillMessage(pool, moverId,
"killed by TransferManagerHandler: " + explanation);
killMessage.setReplyRequired(false);
manager.getPoolStub().notify(new CellPath(poolAddress), killMessage);
}
Expand Down
Expand Up @@ -983,8 +983,9 @@ public PoolMoverKillMessage messageArrived(PoolMoverKillMessage kill)
try {
int id = kill.getMoverId();
MoverRequestScheduler js = _ioQueue.getQueueByJobId(id);
_log.info("Killing mover " + id);
js.cancel(id, "killed by door");
String explanation = kill.getExplanation();
_log.info("Killing mover {}: {}", id, explanation);
js.cancel(id, explanation);
kill.setSucceeded();
} catch (NoSuchElementException e) {
_log.info(e.toString());
Expand Down
Expand Up @@ -139,7 +139,7 @@ private synchronized void doQueued()
try {
isQueued = true;
if (isDone) {
doKill();
doKill("transfer aborted");
} else {
onQueued();
doRedirect();
Expand Down Expand Up @@ -176,18 +176,23 @@ private synchronized void doFinish()
private synchronized void doAbort(Throwable t)
{
if (!isDone) {
doKill();
doKill(explain(t));
onFailure(t);
isDone = true;
}
}

private synchronized void doKill()
/**
 * Converts the failure that aborted the transfer into the explanation
 * text that doAbort passes on when killing the mover.
 *
 * @param t the failure handed to doAbort
 * @return String.valueOf(t): the result of t.toString(), or the text
 *         "null" when t is null
 */
protected String explain(Throwable t)
{
return String.valueOf(t);
}

private synchronized void doKill(String explanation)
{
if (queueFuture != null) {
queueFuture.cancel(true);
}
killMover(0);
killMover(0, "killed by door: " + explanation);
}

/**
Expand Down

0 comments on commit 7318a65

Please sign in to comment.