Skip to content

Commit

Permalink
Fix race condition in seque
Browse files Browse the repository at this point in the history
Signed-off-by: Stuart Halloway <stu@thinkrelevance.com>
  • Loading branch information
amalloy authored and stuarthalloway committed Aug 15, 2012
1 parent 54f790e commit 0e3a535
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 11 deletions.
29 changes: 18 additions & 11 deletions src/clj/clojure/core.clj
Expand Up @@ -4756,18 +4756,25 @@
n-or-q
(LinkedBlockingQueue. (int n-or-q)))
NIL (Object.) ;nil sentinel since LBQ doesn't support nils
agt (agent (seq s))
agt (agent (lazy-seq s)) ; never start with nil; that signifies we've already put eos
log-error (fn [q e]
(if (.offer q q)
(throw e)
e))
fill (fn [s]
(try
(loop [[x & xs :as s] s]
(if s
(if (.offer q (if (nil? x) NIL x))
(recur xs)
s)
(.put q q))) ; q itself is eos sentinel
(catch Exception e
(.put q q)
(throw e))))
(when s
(if (instance? Exception s) ; we failed to .offer an error earlier
(log-error q s)
(try
(loop [[x & xs :as s] (seq s)]
(if s
(if (.offer q (if (nil? x) NIL x))
(recur xs)
s)
(when-not (.offer q q) ; q itself is eos sentinel
()))) ; empty seq, not nil, so we know to put eos next time
(catch Exception e
(log-error q e))))))
drain (fn drain []
(lazy-seq
(let [x (.take q)]
Expand Down
22 changes: 22 additions & 0 deletions test/clojure/test_clojure/agents.clj
Expand Up @@ -151,6 +151,28 @@
(.join))
(is (= @a :thread-binding))))

;; check for a race condition that was causing seque to leak threads from the
;; send-off pool. Specifically, if we consume all items from the seque, and
;; the LBQ continues to grow, it means there was an agent action blocking on
;; the .put, which would block indefinitely outside of this test.
(deftest seque-threads
(let [queue-size 5
slow-seq (for [x (take (* 2 queue-size) (iterate inc 0))]
(do (Thread/sleep 25)
x))
small-lbq (java.util.concurrent.LinkedBlockingQueue. queue-size)
worker (seque small-lbq slow-seq)]
(doall worker)
(is (= worker slow-seq))
(Thread/sleep 250) ;; make sure agents have time to run or get blocked
(let [queue-backlog (.size small-lbq)]
(is (<= 0 queue-backlog queue-size))
(when-not (zero? queue-backlog)
(.take small-lbq)
(Thread/sleep 250) ;; see if agent was blocking, indicating a thread leak
(is (= (.size small-lbq)
(dec queue-backlog)))))))

; http://clojure.org/agents

; agent
Expand Down

0 comments on commit 0e3a535

Please sign in to comment.