From b63af1ad6ce38b50552be3c424ea166cb063ee7c Mon Sep 17 00:00:00 2001 From: Chouser Date: Mon, 11 Jan 2010 02:04:32 -0500 Subject: [PATCH] Add agent error handlers and error modes :fail and :continue. Fixes #30 --- src/clj/clojure/core.clj | 89 ++++++++++++++-- src/clj/clojure/core_print.clj | 8 +- src/jvm/clojure/lang/Agent.java | 148 +++++++++++++++++++++------ test/clojure/test_clojure/agents.clj | 81 ++++++++++++++- 4 files changed, 280 insertions(+), 46 deletions(-) diff --git a/src/clj/clojure/core.clj b/src/clj/clojure/core.clj index 245c36430c..c253fc1b35 100644 --- a/src/clj/clojure/core.clj +++ b/src/clj/clojure/core.clj @@ -1329,14 +1329,29 @@ :validator validate-fn + :error-handler handler-fn + + :error-mode mode-keyword + If metadata-map is supplied, it will be come the metadata on the agent. validate-fn must be nil or a side-effect-free fn of one argument, which will be passed the intended new state on any state change. If the new state is unacceptable, the validate-fn should - return false or throw an exception." - ([state] (new clojure.lang.Agent state)) + return false or throw an exception. handler-fn is called if an + action throws an exception or if validate-fn rejects a new state -- + see set-error-handler! for details. The mode-keyword may be either + :continue (the default if an error-handler is given) or :fail (the + default if no error-handler is given) -- see set-error-mode! for + details." ([state & options] - (setup-reference (agent state) options))) + (let [a (new clojure.lang.Agent state) + opts (apply hash-map options)] + (setup-reference a options) + (when (:error-handler opts) + (.setErrorHandler a (:error-handler opts))) + (.setErrorMode a (or (:error-mode opts) + (if (:error-handler opts) :continue :fail))) + a))) (defn send "Dispatch an action to an agent. Returns the agent immediately. @@ -1388,16 +1403,73 @@ [#^clojure.lang.IRef reference key] (.removeWatch reference key)) +(defn agent-error + "Returns the exception thrown during an asynchronous action of the + agent if the agent is failed. Returns nil if the agent is not + failed." + [#^clojure.lang.Agent a] (.getError a)) + +(defn restart-agent + "When an agent is failed, changes the agent state to new-state and + then un-fails the agent so that sends are allowed again. If + a :clear-actions true option is given, any actions queued on the + agent that were being held while it was failed will be discarded, + otherwise those held actions will proceed. The new-state must pass + the validator if any, or restart will throw an exception and the + agent will remain failed with its old state and error. Watchers, if + any, will NOT be notified of the new state. Throws an exception if + the agent is not failed." + [#^clojure.lang.Agent a, new-state & options] + (let [opts (apply hash-map options)] + (.restart a new-state (if (:clear-actions opts) true false)))) + +(defn set-error-handler! + "Sets the error-handler of agent a to handler-fn. If an action + being run by the agent throws an exception or doesn't pass the + validator fn, handler-fn will be called with two arguments: the + agent and the exception." + [#^clojure.lang.Agent a, handler-fn] + (.setErrorHandler a handler-fn)) + +(defn error-handler + "Returns the error-handler of agent a, or nil if there is none. + See set-error-handler!" + [#^clojure.lang.Agent a] + (.getErrorHandler a)) + +(defn set-error-mode! + "Sets the error-mode of agent a to mode-keyword, which must be + either :fail or :continue. If an action being run by the agent + throws an exception or doesn't pass the validator fn, an + error-handler may be called (see set-error-handler!), after which, + if the mode is :continue, the agent will continue as if neither the + action that caused the error nor the error itself ever happened. + + If the mode is :fail, the agent will become failed and will stop + accepting new 'send' and 'send-off' actions, and any previously + queued actions will be held until a 'restart-agent'. Deref will + still work, returning the state of the agent before the error." + [#^clojure.lang.Agent a, mode-keyword] + (.setErrorMode a mode-keyword)) + +(defn error-mode + "Returns the error-mode of agent a. See set-error-mode!" + [#^clojure.lang.Agent a] + (.getErrorMode a)) (defn agent-errors - "Returns a sequence of the exceptions thrown during asynchronous + "DEPRECATED: Use 'agent-error' instead. + Returns a sequence of the exceptions thrown during asynchronous actions of the agent." - [#^clojure.lang.Agent a] (. a (getErrors))) + [a] + (when-let [e (agent-error a)] + (list e))) (defn clear-agent-errors - "Clears any exceptions thrown during asynchronous actions of the + "DEPRECATED: Use 'restart-agent' instead. + Clears any exceptions thrown during asynchronous actions of the agent, allowing subsequent actions to occur." - [#^clojure.lang.Agent a] (. a (clearErrors))) + [#^clojure.lang.Agent a] (restart-agent a (.deref a))) (defn shutdown-agents "Initiates a shutdown of the thread pools that back the agent @@ -2104,7 +2176,8 @@ (defn await "Blocks the current thread (indefinitely!) until all actions dispatched thus far, from this thread or agent, to the agent(s) have - occurred." + occurred. Will block on failed agents. Will never return if + a failed agent is restarted with :clear-actions true." [& agents] (io! "await in transaction" (when *agent* diff --git a/src/clj/clojure/core_print.clj b/src/clj/clojure/core_print.clj index d2b1612e12..0ab1ee52e4 100644 --- a/src/clj/clojure/core_print.clj +++ b/src/clj/clojure/core_print.clj @@ -309,9 +309,13 @@ (.write w ")")) (defmethod print-method clojure.lang.IDeref [o #^Writer w] - (print-sequential (format "#<%s@%x: " + (print-sequential (format "#<%s@%x%s: " (.getSimpleName (class o)) - (System/identityHashCode o)) + (System/identityHashCode o) + (if (and (instance? clojure.lang.Agent o) + (agent-error o)) + " FAILED" + "")) pr-on, "", ">", (list (if (and (future? o) (not (future-done? o))) :pending @o)), w)) (def #^{:private true} print-initialized true) diff --git a/src/jvm/clojure/lang/Agent.java b/src/jvm/clojure/lang/Agent.java index 310c826ba5..7d40ce77f6 100644 --- a/src/jvm/clojure/lang/Agent.java +++ b/src/jvm/clojure/lang/Agent.java @@ -17,10 +17,27 @@ import java.util.Map; public class Agent extends ARef { + +static class ActionQueue { + public final IPersistentStack q; + public final Throwable error; // non-null indicates fail state + static final ActionQueue EMPTY = new ActionQueue(PersistentQueue.EMPTY, null); + + public ActionQueue( IPersistentStack q, Throwable error ) + { + this.q = q; + this.error = error; + } +} + +static final Keyword CONTINUE = Keyword.intern(null, "continue"); +static final Keyword FAIL = Keyword.intern(null, "fail"); + volatile Object state; - AtomicReference q = new AtomicReference(PersistentQueue.EMPTY); + AtomicReference aq = new AtomicReference(ActionQueue.EMPTY); - volatile ISeq errors = null; + volatile Keyword errorMode = CONTINUE; + volatile IFn errorHandler = null; final public static ExecutorService pooledExecutor = Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors()); @@ -50,10 +67,24 @@ public Action(Agent agent, IFn fn, ISeq args, boolean solo){ } void execute(){ - if(solo) - soloExecutor.execute(this); - else - pooledExecutor.execute(this); + try + { + if(solo) + soloExecutor.execute(this); + else + pooledExecutor.execute(this); + } + catch(Throwable error) + { + if(agent.errorHandler != null) + { + try + { + agent.errorHandler.invoke(agent, error); + } + catch(Throwable e) {} // ignore errorHandler errors + } + } } static void doRun(Action action){ @@ -62,7 +93,7 @@ static void doRun(Action action){ Var.pushThreadBindings(RT.map(RT.AGENT, action.agent)); nested.set(PersistentVector.EMPTY); - boolean hadError = false; + Throwable error = null; try { Object oldval = action.agent.state; @@ -72,28 +103,41 @@ static void doRun(Action action){ } catch(Throwable e) { - //todo report/callback - action.agent.errors = RT.cons(e, action.agent.errors); - hadError = true; + error = e; } - if(!hadError) + if(error == null) { releasePendingSends(); } + else + { + nested.set(PersistentVector.EMPTY); + if(action.agent.errorHandler != null) + { + try + { + action.agent.errorHandler.invoke(action.agent, error); + } + catch(Throwable e) {} // ignore errorHandler errors + } + if(action.agent.errorMode == CONTINUE) + { + error = null; + } + } boolean popped = false; - IPersistentStack next = null; + ActionQueue next = null; while(!popped) { - IPersistentStack prior = action.agent.q.get(); - next = prior.pop(); - popped = action.agent.q.compareAndSet(prior, next); + ActionQueue prior = action.agent.aq.get(); + next = new ActionQueue(prior.q.pop(), error); + popped = action.agent.aq.compareAndSet(prior, next); } - if(next.count() > 0) - ((Action) next.peek()).execute(); - + if(error == null && next.q.count() > 0) + ((Action) next.q.peek()).execute(); } finally { @@ -124,25 +168,61 @@ boolean setState(Object newState) throws Exception{ } public Object deref() throws Exception{ - if(errors != null) - { - throw new Exception("Agent has errors", (Exception) RT.first(errors)); - } return state; } - public ISeq getErrors(){ - return errors; +public Throwable getError(){ + return aq.get().error; } -public void clearErrors(){ - errors = null; +public void setErrorMode(Keyword k){ + errorMode = k; +} + +public Keyword getErrorMode(){ + return errorMode; +} + +public void setErrorHandler(IFn f){ + errorHandler = f; +} + +public IFn getErrorHandler(){ + return errorHandler; +} + +synchronized public Object restart(Object newState, boolean clearActions){ + if(getError() == null) + { + throw new RuntimeException("Agent does not need a restart"); + } + validate(newState); + state = newState; + + if(clearActions) + aq.set(ActionQueue.EMPTY); + else + { + boolean restarted = false; + ActionQueue prior = null; + while(!restarted) + { + prior = aq.get(); + restarted = aq.compareAndSet(prior, new ActionQueue(prior.q, null)); + } + + if(prior.q.count() > 0) + ((Action) prior.q.peek()).execute(); + } + + return newState; } public Object dispatch(IFn fn, ISeq args, boolean solo) { - if(errors != null) + Throwable error = getError(); + if(error != null) { - throw new RuntimeException("Agent has errors", (Exception) RT.first(errors)); + throw new RuntimeException("Agent is failed, needs restart", error); } Action action = new Action(this, fn, args, solo); dispatchAction(action); @@ -164,22 +244,22 @@ else if(nested.get() != null) void enqueue(Action action){ boolean queued = false; - IPersistentStack prior = null; + ActionQueue prior = null; while(!queued) { - prior = q.get(); - queued = q.compareAndSet(prior, (IPersistentStack) prior.cons(action)); + prior = aq.get(); + queued = aq.compareAndSet(prior, new ActionQueue((IPersistentStack)prior.q.cons(action), prior.error)); } - if(prior.count() == 0) + if(prior.q.count() == 0 && prior.error == null) action.execute(); } public int getQueueCount(){ - return q.get().count(); + return aq.get().q.count(); } - static public int releasePendingSends(){ +static public int releasePendingSends(){ IPersistentVector sends = nested.get(); if(sends == null) return 0; diff --git a/test/clojure/test_clojure/agents.clj b/test/clojure/test_clojure/agents.clj index 63b6c8ccc3..6dc3579063 100644 --- a/test/clojure/test_clojure/agents.clj +++ b/test/clojure/test_clojure/agents.clj @@ -18,7 +18,7 @@ (send agt (fn [state] (throw (Throwable. "just testing Throwables")))) (try ;; Let the action finish; eat the "agent has errors" error that bubbles up - (await agt) + (await-for 100 agt) (catch RuntimeException _)) (is (instance? Throwable (first (agent-errors agt)))) (is (= 1 (count (agent-errors agt)))) @@ -27,9 +27,86 @@ (clear-agent-errors agt) (is (= nil @agt)) (send agt nil?) - (await agt) + (is (true? (await-for 100 agt))) (is (true? @agt)))) +(deftest default-modes + (is (= :fail (error-mode (agent nil)))) + (is (= :continue (error-mode (agent nil :error-handler println))))) + +(deftest continue-handler + (let [err (atom nil) + agt (agent 0 :error-mode :continue :error-handler #(reset! err %&))] + (send agt /) + (is (true? (await-for 100 agt))) + (is (= 0 @agt)) + (is (nil? (agent-error agt))) + (is (= agt (first @err))) + (is (true? (instance? ArithmeticException (second @err)))))) + +(deftest fail-handler + (let [err (atom nil) + agt (agent 0 :error-mode :fail :error-handler #(reset! err %&))] + (send agt /) + (Thread/sleep 100) + (is (true? (instance? ArithmeticException (agent-error agt)))) + (is (= 0 @agt)) + (is (= agt (first @err))) + (is (true? (instance? ArithmeticException (second @err)))) + (is (thrown? RuntimeException (send agt inc))))) + +(deftest restart-no-clear + (let [p (promise) + agt (agent 1 :error-mode :fail)] + (send agt (fn [v] @p)) + (send agt /) + (send agt inc) + (send agt inc) + (deliver p 0) + (Thread/sleep 100) + (is (= 0 @agt)) + (is (= ArithmeticException (class (agent-error agt)))) + (restart-agent agt 10) + (is (true? (await-for 100 agt))) + (is (= 12 @agt)) + (is (nil? (agent-error agt))))) + +(deftest restart-clear + (let [p (promise) + agt (agent 1 :error-mode :fail)] + (send agt (fn [v] @p)) + (send agt /) + (send agt inc) + (send agt inc) + (deliver p 0) + (Thread/sleep 100) + (is (= 0 @agt)) + (is (= ArithmeticException (class (agent-error agt)))) + (restart-agent agt 10 :clear-actions true) + (is (true? (await-for 100 agt))) + (is (= 10 @agt)) + (is (nil? (agent-error agt))) + (send agt inc) + (is (true? (await-for 100 agt))) + (is (= 11 @agt)) + (is (nil? (agent-error agt))))) + +(deftest invalid-restart + (let [p (promise) + agt (agent 2 :error-mode :fail :validator even?)] + (is (thrown? RuntimeException (restart-agent agt 4))) + (send agt (fn [v] @p)) + (send agt (partial + 2)) + (send agt (partial + 2)) + (deliver p 3) + (Thread/sleep 100) + (is (= 2 @agt)) + (is (= IllegalStateException (class (agent-error agt)))) + (is (thrown? RuntimeException (restart-agent agt 5))) + (restart-agent agt 6) + (is (true? (await-for 100 agt))) + (is (= 10 @agt)) + (is (nil? (agent-error agt))))) ; http://clojure.org/agents