Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

added set-agent-send(-off)-executor! and send-via

  • Loading branch information...
commit f5f4faf95051f794c9bfa0315e4457b600c84cef 1 parent 5af28af
Rich Hickey authored August 17, 2012
26  src/clj/clojure/core.clj
@@ -1877,6 +1877,28 @@
1877 1877
                             (if (:error-handler opts) :continue :fail)))
1878 1878
        a)))
1879 1879
 
  1880
+(defn set-agent-send-executor!
  1881
+  "Sets the ExecutorService to be used by send"
  1882
+  {:added "1.5"}
  1883
+  [executor]
  1884
+  (set! clojure.lang.Agent/pooledExecutor executor))
  1885
+
  1886
+(defn set-agent-send-off-executor!
  1887
+  "Sets the ExecutorService to be used by send-off"
  1888
+  {:added "1.5"}
  1889
+  [executor]
  1890
+  (set! clojure.lang.Agent/soloExecutor executor))
  1891
+
  1892
+(defn send-via
  1893
+  "Dispatch an action to an agent. Returns the agent immediately.
  1894
+  Subsequently, in a thread supplied by executor, the state of the agent
  1895
+  will be set to the value of:
  1896
+
  1897
+  (apply action-fn state-of-agent args)"
  1898
+  {:added "1.5"}
  1899
+  [executor ^clojure.lang.Agent a f & args]
  1900
+  (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args executor))
  1901
+
1880 1902
 (defn send
1881 1903
   "Dispatch an action to an agent. Returns the agent immediately.
1882 1904
   Subsequently, in a thread from a thread pool, the state of the agent
@@ -1886,7 +1908,7 @@
1886 1908
   {:added "1.0"
1887 1909
    :static true}
1888 1910
   [^clojure.lang.Agent a f & args]
1889  
-  (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args false))
  1911
+  (apply send-via clojure.lang.Agent/pooledExecutor a f args))
1890 1912
 
1891 1913
 (defn send-off
1892 1914
   "Dispatch a potentially blocking action to an agent. Returns the
@@ -1897,7 +1919,7 @@
1897 1919
   {:added "1.0"
1898 1920
    :static true}
1899 1921
   [^clojure.lang.Agent a f & args]
1900  
-  (.dispatch a (binding [*agent* a] (binding-conveyor-fn f)) args true))
  1922
+  (apply send-via clojure.lang.Agent/soloExecutor a f args))
1901 1923
 
1902 1924
 (defn release-pending-sends
1903 1925
   "Normally, actions sent directly or indirectly during another action
20  src/jvm/clojure/lang/Agent.java
@@ -12,6 +12,7 @@
12 12
 
13 13
 package clojure.lang;
14 14
 
  15
+import java.util.concurrent.Executor;
15 16
 import java.util.concurrent.ExecutorService;
16 17
 import java.util.concurrent.Executors;
17 18
 import java.util.concurrent.ThreadFactory;
@@ -45,11 +46,11 @@ public ActionQueue( IPersistentStack q, Throwable error )
45 46
 
46 47
 final private static AtomicLong sendOffThreadPoolCounter = new AtomicLong(0);
47 48
 
48  
-final public static ExecutorService pooledExecutor =
  49
+volatile public static ExecutorService pooledExecutor =
49 50
 	Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors(), 
50 51
 		createThreadFactory("clojure-agent-send-pool-%d", sendThreadPoolCounter));
51 52
 
52  
-final public static ExecutorService soloExecutor = Executors.newCachedThreadPool(
  53
+volatile public static ExecutorService soloExecutor = Executors.newCachedThreadPool(
53 54
 	createThreadFactory("clojure-agent-send-off-pool-%d", sendOffThreadPoolCounter));
54 55
 
55 56
 final static ThreadLocal<IPersistentVector> nested = new ThreadLocal<IPersistentVector>();
@@ -73,23 +74,20 @@ public static void shutdown(){
73 74
 	final Agent agent;
74 75
 	final IFn fn;
75 76
 	final ISeq args;
76  
-	final boolean solo;
  77
+	final Executor exec;
77 78
 
78 79
 
79  
-	public Action(Agent agent, IFn fn, ISeq args, boolean solo){
  80
+	public Action(Agent agent, IFn fn, ISeq args, Executor exec){
80 81
 		this.agent = agent;
81 82
 		this.args = args;
82 83
 		this.fn = fn;
83  
-		this.solo = solo;
  84
+		this.exec = exec;
84 85
 	}
85 86
 
86 87
 	void execute(){
87 88
 		try
88 89
 			{
89  
-			if(solo)
90  
-				soloExecutor.execute(this);
91  
-			else
92  
-				pooledExecutor.execute(this);
  90
+			exec.execute(this);
93 91
 			}
94 92
 		catch(Throwable error)
95 93
 			{
@@ -233,13 +231,13 @@ synchronized public Object restart(Object newState, boolean clearActions){
233 231
 	return newState;
234 232
 }
235 233
 
236  
-public Object dispatch(IFn fn, ISeq args, boolean solo) {
  234
+public Object dispatch(IFn fn, ISeq args, Executor exec) {
237 235
 	Throwable error = getError();
238 236
 	if(error != null)
239 237
 		{
240 238
 		throw Util.runtimeException("Agent is failed, needs restart", error);
241 239
 		}
242  
-	Action action = new Action(this, fn, args, solo);
  240
+	Action action = new Action(this, fn, args, exec);
243 241
 	dispatchAction(action);
244 242
 
245 243
 	return this;

0 notes on commit f5f4faf

Please sign in to comment.
Something went wrong with that request. Please try again.