Skip to content

Commit

Permalink
Add agent error handlers and error modes :fail and :continue. Fixes c…
Browse files Browse the repository at this point in the history
  • Loading branch information
Chouser committed Jan 13, 2010
1 parent 1186fe5 commit b63af1a
Show file tree
Hide file tree
Showing 4 changed files with 280 additions and 46 deletions.
89 changes: 81 additions & 8 deletions src/clj/clojure/core.clj
Expand Up @@ -1329,14 +1329,29 @@
:validator validate-fn
:error-handler handler-fn
:error-mode mode-keyword
If metadata-map is supplied, it will be come the metadata on the
agent. validate-fn must be nil or a side-effect-free fn of one
argument, which will be passed the intended new state on any state
change. If the new state is unacceptable, the validate-fn should
return false or throw an exception."
([state] (new clojure.lang.Agent state))
return false or throw an exception. handler-fn is called if an
action throws an exception or if validate-fn rejects a new state --
see set-error-handler! for details. The mode-keyword may be either
:continue (the default if an error-handler is given) or :fail (the
default if no error-handler is given) -- see set-error-mode! for
details."
([state & options]
(setup-reference (agent state) options)))
(let [a (new clojure.lang.Agent state)
opts (apply hash-map options)]
(setup-reference a options)
(when (:error-handler opts)
(.setErrorHandler a (:error-handler opts)))
(.setErrorMode a (or (:error-mode opts)
(if (:error-handler opts) :continue :fail)))
a)))

(defn send
"Dispatch an action to an agent. Returns the agent immediately.
Expand Down Expand Up @@ -1388,16 +1403,73 @@
[#^clojure.lang.IRef reference key]
(.removeWatch reference key))

(defn agent-error
"Returns the exception thrown during an asynchronous action of the
agent if the agent is failed. Returns nil if the agent is not
failed."
[#^clojure.lang.Agent a] (.getError a))

(defn restart-agent
"When an agent is failed, changes the agent state to new-state and
then un-fails the agent so that sends are allowed again. If
a :clear-actions true option is given, any actions queued on the
agent that were being held while it was failed will be discarded,
otherwise those held actions will proceed. The new-state must pass
the validator if any, or restart will throw an exception and the
agent will remain failed with its old state and error. Watchers, if
any, will NOT be notified of the new state. Throws an exception if
the agent is not failed."
[#^clojure.lang.Agent a, new-state & options]
(let [opts (apply hash-map options)]
(.restart a new-state (if (:clear-actions opts) true false))))

(defn set-error-handler!
"Sets the error-handler of agent a to handler-fn. If an action
being run by the agent throws an exception or doesn't pass the
validator fn, handler-fn will be called with two arguments: the
agent and the exception."
[#^clojure.lang.Agent a, handler-fn]
(.setErrorHandler a handler-fn))

(defn error-handler
"Returns the error-handler of agent a, or nil if there is none.
See set-error-handler!"
[#^clojure.lang.Agent a]
(.getErrorHandler a))

(defn set-error-mode!
"Sets the error-mode of agent a to mode-keyword, which must be
either :fail or :continue. If an action being run by the agent
throws an exception or doesn't pass the validator fn, an
error-handler may be called (see set-error-handler!), after which,
if the mode is :continue, the agent will continue as if neither the
action that caused the error nor the error itself ever happened.
If the mode is :fail, the agent will become failed and will stop
accepting new 'send' and 'send-off' actions, and any previously
queued actions will be held until a 'restart-agent'. Deref will
still work, returning the state of the agent before the error."
[#^clojure.lang.Agent a, mode-keyword]
(.setErrorMode a mode-keyword))

(defn error-mode
"Returns the error-mode of agent a. See set-error-mode!"
[#^clojure.lang.Agent a]
(.getErrorMode a))

(defn agent-errors
"Returns a sequence of the exceptions thrown during asynchronous
"DEPRECATED: Use 'agent-error' instead.
Returns a sequence of the exceptions thrown during asynchronous
actions of the agent."
[#^clojure.lang.Agent a] (. a (getErrors)))
[a]
(when-let [e (agent-error a)]
(list e)))

(defn clear-agent-errors
"Clears any exceptions thrown during asynchronous actions of the
"DEPRECATED: Use 'restart-agent' instead.
Clears any exceptions thrown during asynchronous actions of the
agent, allowing subsequent actions to occur."
[#^clojure.lang.Agent a] (. a (clearErrors)))
[#^clojure.lang.Agent a] (restart-agent a (.deref a)))

(defn shutdown-agents
"Initiates a shutdown of the thread pools that back the agent
Expand Down Expand Up @@ -2104,7 +2176,8 @@
(defn await
"Blocks the current thread (indefinitely!) until all actions
dispatched thus far, from this thread or agent, to the agent(s) have
occurred."
occurred. Will block on failed agents. Will never return if
a failed agent is restarted with :clear-actions true."
[& agents]
(io! "await in transaction"
(when *agent*
Expand Down
8 changes: 6 additions & 2 deletions src/clj/clojure/core_print.clj
Expand Up @@ -309,9 +309,13 @@
(.write w ")"))

(defmethod print-method clojure.lang.IDeref [o #^Writer w]
(print-sequential (format "#<%s@%x: "
(print-sequential (format "#<%s@%x%s: "
(.getSimpleName (class o))
(System/identityHashCode o))
(System/identityHashCode o)
(if (and (instance? clojure.lang.Agent o)
(agent-error o))
" FAILED"
""))
pr-on, "", ">", (list (if (and (future? o) (not (future-done? o))) :pending @o)), w))

(def #^{:private true} print-initialized true)
148 changes: 114 additions & 34 deletions src/jvm/clojure/lang/Agent.java
Expand Up @@ -17,10 +17,27 @@
import java.util.Map;

public class Agent extends ARef {

static class ActionQueue {
public final IPersistentStack q;
public final Throwable error; // non-null indicates fail state
static final ActionQueue EMPTY = new ActionQueue(PersistentQueue.EMPTY, null);

public ActionQueue( IPersistentStack q, Throwable error )
{
this.q = q;
this.error = error;
}
}

static final Keyword CONTINUE = Keyword.intern(null, "continue");
static final Keyword FAIL = Keyword.intern(null, "fail");

volatile Object state;
AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);
AtomicReference<ActionQueue> aq = new AtomicReference(ActionQueue.EMPTY);

volatile ISeq errors = null;
volatile Keyword errorMode = CONTINUE;
volatile IFn errorHandler = null;

final public static ExecutorService pooledExecutor =
Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
Expand Down Expand Up @@ -50,10 +67,24 @@ public Action(Agent agent, IFn fn, ISeq args, boolean solo){
}

void execute(){
if(solo)
soloExecutor.execute(this);
else
pooledExecutor.execute(this);
try
{
if(solo)
soloExecutor.execute(this);
else
pooledExecutor.execute(this);
}
catch(Throwable error)
{
if(agent.errorHandler != null)
{
try
{
agent.errorHandler.invoke(agent, error);
}
catch(Throwable e) {} // ignore errorHandler errors
}
}
}

static void doRun(Action action){
Expand All @@ -62,7 +93,7 @@ static void doRun(Action action){
Var.pushThreadBindings(RT.map(RT.AGENT, action.agent));
nested.set(PersistentVector.EMPTY);

boolean hadError = false;
Throwable error = null;
try
{
Object oldval = action.agent.state;
Expand All @@ -72,28 +103,41 @@ static void doRun(Action action){
}
catch(Throwable e)
{
//todo report/callback
action.agent.errors = RT.cons(e, action.agent.errors);
hadError = true;
error = e;
}

if(!hadError)
if(error == null)
{
releasePendingSends();
}
else
{
nested.set(PersistentVector.EMPTY);
if(action.agent.errorHandler != null)
{
try
{
action.agent.errorHandler.invoke(action.agent, error);
}
catch(Throwable e) {} // ignore errorHandler errors
}
if(action.agent.errorMode == CONTINUE)
{
error = null;
}
}

boolean popped = false;
IPersistentStack next = null;
ActionQueue next = null;
while(!popped)
{
IPersistentStack prior = action.agent.q.get();
next = prior.pop();
popped = action.agent.q.compareAndSet(prior, next);
ActionQueue prior = action.agent.aq.get();
next = new ActionQueue(prior.q.pop(), error);
popped = action.agent.aq.compareAndSet(prior, next);
}

if(next.count() > 0)
((Action) next.peek()).execute();

if(error == null && next.q.count() > 0)
((Action) next.q.peek()).execute();
}
finally
{
Expand Down Expand Up @@ -124,25 +168,61 @@ boolean setState(Object newState) throws Exception{
}

public Object deref() throws Exception{
if(errors != null)
{
throw new Exception("Agent has errors", (Exception) RT.first(errors));
}
return state;
}

public ISeq getErrors(){
return errors;
public Throwable getError(){
return aq.get().error;
}

public void clearErrors(){
errors = null;
public void setErrorMode(Keyword k){
errorMode = k;
}

public Keyword getErrorMode(){
return errorMode;
}

public void setErrorHandler(IFn f){
errorHandler = f;
}

public IFn getErrorHandler(){
return errorHandler;
}

synchronized public Object restart(Object newState, boolean clearActions){
if(getError() == null)
{
throw new RuntimeException("Agent does not need a restart");
}
validate(newState);
state = newState;

if(clearActions)
aq.set(ActionQueue.EMPTY);
else
{
boolean restarted = false;
ActionQueue prior = null;
while(!restarted)
{
prior = aq.get();
restarted = aq.compareAndSet(prior, new ActionQueue(prior.q, null));
}

if(prior.q.count() > 0)
((Action) prior.q.peek()).execute();
}

return newState;
}

public Object dispatch(IFn fn, ISeq args, boolean solo) {
if(errors != null)
Throwable error = getError();
if(error != null)
{
throw new RuntimeException("Agent has errors", (Exception) RT.first(errors));
throw new RuntimeException("Agent is failed, needs restart", error);
}
Action action = new Action(this, fn, args, solo);
dispatchAction(action);
Expand All @@ -164,22 +244,22 @@ else if(nested.get() != null)

void enqueue(Action action){
boolean queued = false;
IPersistentStack prior = null;
ActionQueue prior = null;
while(!queued)
{
prior = q.get();
queued = q.compareAndSet(prior, (IPersistentStack) prior.cons(action));
prior = aq.get();
queued = aq.compareAndSet(prior, new ActionQueue((IPersistentStack)prior.q.cons(action), prior.error));
}

if(prior.count() == 0)
if(prior.q.count() == 0 && prior.error == null)
action.execute();
}

public int getQueueCount(){
return q.get().count();
return aq.get().q.count();
}

static public int releasePendingSends(){
static public int releasePendingSends(){
IPersistentVector sends = nested.get();
if(sends == null)
return 0;
Expand Down

0 comments on commit b63af1a

Please sign in to comment.