Skip to content

Commit

Permalink
cells+dcache: fix Reply handling in batch processing
Browse files Browse the repository at this point in the history
Motivation:

A command may return a DelayedReply; such commands will (likely) return
before they have completed.  There are various places where a list of
commands is used to establish state, batch and setup files being two
examples. In some cases, these list of commands have commands that rely
on a previous command completing; for example, establishing a new mover
queue and configuring the job timeout for that queue.  Currently, if a
command relies on an earlier command that returns a DelayedReply then
there is a race-condition.

Modification:

Update list-of-command executing code so that they check whether the
returned object is a DelayedReply; if so, the execution of that command
will block until the result becomes available.

A side effect of this patch is that if a batch script calls a command
that returns a DelayedReply and that command throws a declared exception
then with this patch, that command invocation is considered a error and
the behaviour described by the "onerror" command is honoured.

Note that this patch is a stop-gap solution; the real solution would
involve refactoring Reply to support a blocking call.

Result:

Batch files are executed in strict order.  Some possible command
failures are correctly considered an error.

Target: master
Request: 3.0
Request: 2.16
Require-notes: yes
Require-book: no
Fixes: #2943
Patch: https://rb.dcache.org/r/9940/
Acked-by: Gerd Behrmann
Acked-by: Tigran Mkrtchyan
  • Loading branch information
paulmillar committed Nov 28, 2016
1 parent d082578 commit bbdd62c
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
7 changes: 7 additions & 0 deletions modules/cells/src/main/java/dmg/cells/nucleus/CellShell.java
Original file line number Diff line number Diff line change
Expand Up @@ -1697,6 +1697,10 @@ public void execute(String source, Reader in, Writer out, Writer err, Args args)
*/
Object answer = objectCommand2(line);

if (answer instanceof DelayedReply) {
answer = ((DelayedReply)answer).take();
}

/* Process result.
*/
if (!(answer instanceof Throwable)) {
Expand Down Expand Up @@ -1745,6 +1749,9 @@ public void execute(String source, Reader in, Writer out, Writer err, Args args)
}
}
}
} catch (InterruptedException e) {
throw new CommandExitException(String.format("%s: line %d: interrupted", source,
no));
} catch (IOException e) {
throw new IOException(String.format("%s: line %d: %s", source,
no, e.getMessage()), e);
Expand Down
10 changes: 10 additions & 0 deletions modules/cells/src/main/java/dmg/cells/nucleus/DelayedReply.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public synchronized void reply(Serializable msg)
if (_envelope != null) {
send();
}
notifyAll();
}

protected synchronized void send()
Expand All @@ -38,4 +39,13 @@ protected synchronized void send()
_envelope = null;
_endpoint = null;
}

public synchronized Serializable take() throws InterruptedException
{
while (_msg == null) {
wait();
}

return _msg;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@
import dmg.cells.nucleus.CellMessageReceiver;
import dmg.cells.nucleus.CellMessageSender;
import dmg.cells.nucleus.CellSetupProvider;
import dmg.cells.nucleus.DelayedReply;
import dmg.cells.nucleus.DomainContextAware;
import dmg.cells.nucleus.EnvironmentAware;
import dmg.util.CommandException;
import dmg.util.CommandExitException;
import dmg.util.CommandInterpreter;
import dmg.util.CommandPanicException;
import dmg.util.CommandThrowableException;
Expand Down Expand Up @@ -366,7 +368,12 @@ private void executeSetup(CommandInterpreter interpreter, String source, byte[]
continue;
}
try {
interpreter.command(new Args(line));
Serializable result = interpreter.command(new Args(line));
if (result instanceof DelayedReply) {
((DelayedReply)result).take();
}
} catch (InterruptedException e) {
throw new CommandExitException("Error at " + source + ":" + lineCount + ": command interrupted");
} catch (CommandPanicException e) {
throw new CommandPanicException("Error at " + source + ":" + lineCount + ": " + e.getMessage(), e);
} catch (CommandException e) {
Expand Down

0 comments on commit bbdd62c

Please sign in to comment.