Skip to content

Commit

Permalink
make sure the scheduled executor uses daemon threads
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Mar 31, 2012
1 parent c9643db commit 56ce233
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 3 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject lamina "0.4.1-beta1"
(defproject lamina "0.4.1-beta2"
:description "event-driven data structures for clojure"
:dependencies [[org.clojure/clojure "1.3.0"]
[org.clojure/tools.logging "0.2.3"]
Expand Down
10 changes: 8 additions & 2 deletions src/lamina/core/timer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@
[java.util.concurrent
ScheduledThreadPoolExecutor
TimeUnit
TimeoutException]))
TimeoutException
ThreadFactory]))

(def delayed-executor
(ScheduledThreadPoolExecutor. (.availableProcessors (Runtime/getRuntime))))
(ScheduledThreadPoolExecutor.
(.availableProcessors (Runtime/getRuntime))
(reify ThreadFactory
(newThread [_ f]
(doto (Thread. f)
(.setDaemon true))))))

(defn delay-invoke [f delay]
(.schedule
Expand Down
54 changes: 54 additions & 0 deletions src/lamina/core/watch.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
;; Copyright (c) Zachary Tellman. All rights reserved.
;; The use and distribution terms for this software are covered by the
;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
;; which can be found in the file epl-v10.html at the root of this distribution.
;; By using this software in any fashion, you are agreeing to be bound by
;; the terms of this license.
;; You must not remove this notice, or any other, from this software.

(ns lamina.core.watch
(:use
[lamina.core channel utils])
(:import
[java.lang.ref
WeakReference]
[java.util
Collections
WeakHashMap]))

;;;

(deftype WeakRef [^WeakReference ref]
clojure.lang.IDeref
(deref [_] (.get ref)))

(defn weak-ref [x]
(WeakRef. x))

;;;

(defprotocol Watcher
(examine [_])
(update [_ val]))

(defn ref-watcher [reference callback]
(let [r (weak-ref reference)
tl (ThreadLocal.)
val (atom @reference)]
(reify Watcher
(examine [_]
(when-let [r @r]
(loop [x @val, y @r]
(if-not (= x y)
(if (compare-and-set! val x y)
(callback y)
(recur @val @r)))))))))

(defn watcher-channel [watchable]
(let [ch (channel @watchable)
k (gensym "watcher-channel")]
(add-watch watchable k
(fn [_ _ _ x] (enqueue ch x)))
(on-closed ch
#(remove-watch watchable k))
ch))

0 comments on commit 56ce233

Please sign in to comment.