Permalink
Browse files

take 2 on NREPL-17; no more per-session executors, coping with JDK5/6…

… differences
  • Loading branch information...
1 parent 26dbf11 commit 9f11cfddaa245d6544bf232e58b0fa0e4c11ab8b @cemerick cemerick committed Apr 16, 2012
@@ -68,49 +68,81 @@
(defn- configure-thread-factory
"Returns a new ThreadFactory for the given session. This implementation
generates daemon threads, with names that include the session id."
- [session]
- (let [session-thread-counter (AtomicLong. 0)
- session-id (-> session meta :id)]
+ []
+ (let [session-thread-counter (AtomicLong. 0)]
(reify ThreadFactory
(newThread [_ runnable]
(doto (Thread. runnable
- (format "nREPL-session-%s-worker-%s"
- session-id (.getAndIncrement session-thread-counter)))
+ (format "nREPL-worker-%s" (.getAndIncrement session-thread-counter)))
(.setDaemon true))))))
-(defn- session-executor
- "Returns a single-threaded Executor for the given session, configured to
- use an unbounded queue and allow unused threads to expire after 30s."
- [session & {:keys [keep-alive queue thread-factory]
- :or {keep-alive 30000
- queue (LinkedBlockingQueue.)}}]
- (ThreadPoolExecutor. 0 1 (long 30000) TimeUnit/MILLISECONDS queue
- (or thread-factory (configure-thread-factory session))))
+(def ^{:private true} jdk6? (try
+ (Class/forName "java.util.ServiceLoader")
+ true
+ (catch ClassNotFoundException e false)))
+
+(defn- configure-executor
+ "Returns a ThreadPoolExecutor, configured (by default) to
+ have no core threads, use an unbounded queue, create only daemon threads,
+ and allow unused threads to expire after 30s."
+ [& {:keys [keep-alive queue thread-factory]
+ :or {keep-alive 30000
+ queue (LinkedBlockingQueue.)}}]
+ ; ThreadPoolExecutor in JDK5 *will not run* submitted jobs if the core pool size is zero and
+ ; the queue has not yet rejected a job (see http://kirkwylie.blogspot.com/2008/10/java5-vs-java6-threadpoolexecutor.html)
+ (ThreadPoolExecutor. (if jdk6? 0 1) Integer/MAX_VALUE
+ (long 30000) TimeUnit/MILLISECONDS
+ queue
+ (or thread-factory (configure-thread-factory))))
(defn- prep-session
[session]
(locking session
(returning session
- (when-not (-> session meta :executor)
- (alter-meta! session assoc :executor (session-executor session))))))
+ (when-not (-> session meta :queue)
+ (alter-meta! session assoc :queue (atom clojure.lang.PersistentQueue/EMPTY))))))
+
+(declare run-next)
+(defn- run-next*
+ [session executor]
+ (let [qa (-> session meta :queue)]
+ (loop []
+ (let [q @qa
+ qn (pop q)]
+ (if-not (compare-and-set! qa q qn)
+ (recur)
+ (when (seq qn)
+ (.execute executor (run-next session executor (peek qn)))))))))
+
+(defn- run-next
+ [session executor f]
+ #(try
+ (f)
+ (finally
+ (run-next* session executor))))
(defn- queue-eval
- "Evaluates the function on the given session's queue/executor.
- The session's value will be reset to the return value of the function."
- [session f]
- (.submit (-> session prep-session meta :executor) f))
+ "Queues the function for the given session."
+ [session executor f]
+ (let [qa (-> session prep-session meta :queue)]
+ (loop []
+ (let [q @qa]
+ (if-not (compare-and-set! qa q (conj q f))
+ (recur)
+ (when (empty? q)
+ (.execute executor (run-next session executor f))))))))
(defn interruptible-eval
"Evaluation middleware that supports interrupts. Returns a handler that supports
\"eval\" and \"interrupt\" :op-erations that delegates to the given handler
otherwise."
- [h]
+ [h & {:keys [executor] :or {executor (configure-executor)}}]
(fn [{:keys [op session interrupt-id id transport] :as msg}]
(case op
"eval"
(if-not (:code msg)
(t/send transport (response-for msg :status #{:error :no-code}))
- (queue-eval session
+ (queue-eval session executor
(comp
(partial reset! session)
(fn []
@@ -144,3 +176,4 @@
(t/send transport (response-for msg :status #{:error :interrupt-id-mismatch :done}))))
(h msg))))
+
@@ -15,7 +15,7 @@
[clojure.set :as set])
(:import (java.util.concurrent BlockingQueue LinkedBlockingQueue TimeUnit)))
-(println (format "Testing with Clojure v%s" (clojure-version)))
+(println (format "Testing with Clojure v%s on %s" (clojure-version) (System/getProperty "java.version")))
(defn- internal-eval
([expr] (internal-eval nil expr))

0 comments on commit 9f11cfd

Please sign in to comment.