Skip to content
Browse files

Merge branch 'clj-1.3'

  • Loading branch information...
2 parents e4e3d16 + 4383f0b commit 94579827d220c4e99f9c31b3d5d12e773cad7c8f @ztellman ztellman committed
View
4 .gitignore
@@ -4,6 +4,6 @@ lib
classes
push
local-push
-DS_Store
+*DS_Store
autodoc
-.lein-failures
+.lein*
View
4 .travis.yml
@@ -0,0 +1,4 @@
+language: clojure
+before_install:
+ - lein plugin install lein-multi 1.1.0
+script: lein multi test
View
17 project.clj
@@ -1,9 +1,14 @@
-(defproject lamina "0.4.0"
+(defproject lamina "0.4.1"
:description "event-driven data structures for clojure"
- :dependencies [[org.clojure/clojure "1.2.0"]
- [org.clojure/clojure-contrib "1.2.0"]
- [potemkin "0.1.0"]]
- ;;:jvm-opts ["-agentlib:jdwp=transport=dt_socket,address=8030,server=y,suspend=n"]
+ :dependencies [[org.clojure/clojure "1.3.0"]
+ [org.clojure/tools.logging "0.2.3"]
+ [potemkin "0.1.3"]]
+ :multi-deps {:all [[org.clojure/tools.logging "0.2.3"]
+ [potemkin "0.1.3"]]
+ "1.2" [[org.clojure/clojure "1.2.1"]]}
+ :repositories {"sonatype-oss-public" "https://oss.sonatype.org/content/groups/public/"}
+ :exclusions [org.clojure/contrib
+ org.clojure/clojure-contrib]
:license {:name "Eclipse Public License - v 1.0"
:url "http://www.eclipse.org/legal/epl-v10.html"
- :distribution :repo})
+ :distribution :repo})
View
18 src/lamina/api.clj
@@ -14,14 +14,14 @@
[lamina.core.channel :as channel]
[lamina.core.seq :as seq]))
-(import-fn #'pipeline/poll-result)
-(import-fn #'pipeline/success-result)
-(import-fn #'pipeline/error-result)
-(import-fn #'pipeline/success!)
-(import-fn #'pipeline/error!)
-(import-fn #'pipeline/closed-result)
-(import-fn #'pipeline/drained-result)
+(import-fn pipeline/poll-result)
+(import-fn pipeline/success-result)
+(import-fn pipeline/error-result)
+(import-fn pipeline/success!)
+(import-fn pipeline/error!)
+(import-fn pipeline/closed-result)
+(import-fn pipeline/drained-result)
-(import-fn #'seq/copy)
+(import-fn seq/copy)
-(import-fn #'channel/dequeue)
+(import-fn channel/dequeue)
View
4 src/lamina/connections.clj
@@ -51,7 +51,7 @@
(success! @result ::close)
(reset! result (success-result ::close))
(reset! latch false)
- (run-pipeline connection close))))
+ (run-pipeline connection #(when (channel? %) (close %))))))
;; run connection loop
(run-pipeline nil
@@ -76,6 +76,8 @@
(new-connection-callback ch))
(fn [_]
(future (success! @result ch))
+ (Thread/yield)
+ (when-not @latch (close ch))
(wait-for-close ch))))
;; wait here for connection to drop
(fn [_]
View
107 src/lamina/core.clj
@@ -28,28 +28,28 @@
;;;; CHANNELS
;; core channel functions
-(import-fn #'channel/receive)
-(import-fn #'channel/cancel-callback)
-(import-fn #'channel/enqueue)
-(import-fn #'channel/enqueue-and-close)
-(import-fn #'channel/close)
-(import-fn #'channel/on-closed)
-(import-fn #'channel/on-drained)
-(import-fn #'channel/drained?)
-(import-fn #'channel/closed?)
-(import-fn #'channel/channel?)
-(import-fn #'seq/receive-all)
-(import-fn #'channel/poll)
+(import-fn channel/receive)
+(import-fn channel/cancel-callback)
+(import-fn channel/enqueue)
+(import-fn channel/enqueue-and-close)
+(import-fn channel/close)
+(import-fn channel/on-closed)
+(import-fn channel/on-drained)
+(import-fn channel/drained?)
+(import-fn channel/closed?)
+(import-fn channel/channel?)
+(import-fn seq/receive-all)
+(import-fn channel/poll)
;; channel variants
-(import-fn #'channel/splice)
-(import-fn #'channel/channel)
-(import-fn #'channel/channel-pair)
-(import-fn #'channel/permanent-channel)
-(import-fn #'channel/constant-channel)
-(import-fn #'channel/closed-channel)
-(import-fn #'channel/timed-channel)
-(import-fn #'channel/proxy-channel)
+(import-fn channel/splice)
+(import-fn channel/channel)
+(import-fn channel/channel-pair)
+(import-fn channel/permanent-channel)
+(import-fn channel/constant-channel)
+(import-fn channel/closed-channel)
+(import-fn channel/timed-channel)
+(import-fn channel/proxy-channel)
(def nil-channel channel/nil-channel)
@@ -76,45 +76,46 @@
(apply receive-all ch callbacks)
ch))
-(import-fn #'seq/fork)
-(import-fn #'seq/map*)
-(import-fn #'seq/filter*)
-(import-fn #'seq/remove*)
-(import-fn #'seq/receive-in-order)
-(import-fn #'seq/reduce*)
-(import-fn #'seq/reductions*)
-(import-fn #'seq/take*)
-(import-fn #'seq/take-while*)
-(import-fn #'seq/partition*)
-(import-fn #'seq/partition-all*)
-
-(import-fn #'op/sample-every)
+(import-fn seq/fork)
+(import-fn seq/mapcat*)
+(import-fn seq/map*)
+(import-fn seq/filter*)
+(import-fn seq/remove*)
+(import-fn seq/receive-in-order)
+(import-fn seq/reduce*)
+(import-fn seq/reductions*)
+(import-fn seq/take*)
+(import-fn seq/take-while*)
+(import-fn seq/partition*)
+(import-fn seq/partition-all*)
+
+(import-fn op/sample-every)
;; named channels
-(import-fn #'named/named-channel)
-(import-fn #'named/release-named-channel)
+(import-fn named/named-channel)
+(import-fn named/release-named-channel)
;; synchronous channel functions
-(import-fn #'seq/lazy-channel-seq)
-(import-fn #'seq/channel-seq)
-(import-fn #'seq/wait-for-message)
+(import-fn seq/lazy-channel-seq)
+(import-fn seq/channel-seq)
+(import-fn seq/wait-for-message)
;;;; PIPELINES
;; core pipeline functions
-(import-fn #'pipeline/result-channel)
-(import-fn #'pipeline/pipeline)
-(import-fn #'pipeline/run-pipeline)
+(import-fn pipeline/result-channel)
+(import-fn pipeline/pipeline)
+(import-fn pipeline/run-pipeline)
;; pipeline stage helpers
-(import-fn #'pipeline/result-channel?)
-(import-fn #'pipeline/read-channel)
+(import-fn pipeline/result-channel?)
+(import-fn pipeline/read-channel)
(def read-channel* read-channel)
-(import-fn #'pipeline/read-merge)
+(import-fn pipeline/read-merge)
-(import-fn #'pipeline/on-success)
-(import-fn #'pipeline/on-error)
+(import-fn pipeline/on-success)
+(import-fn pipeline/on-error)
(defmacro do-stage
"Creates a pipeline stage that emits the same value it receives, but performs some side-effect
@@ -124,16 +125,16 @@
~@body
x#))
-(import-fn #'pipeline/wait-stage)
+(import-fn pipeline/wait-stage)
;; redirect signals
-(import-fn #'pipeline/redirect)
-(import-fn #'pipeline/restart)
-(import-fn #'pipeline/complete)
+(import-fn pipeline/redirect)
+(import-fn pipeline/restart)
+(import-fn pipeline/complete)
;; pipeline result hooks
-(import-fn #'pipeline/wait-for-result)
-(import-fn #'pipeline/siphon-result)
+(import-fn pipeline/wait-for-result)
+(import-fn pipeline/siphon-result)
;;;
@@ -159,7 +160,7 @@
;;;
-(import-fn #'x-utils/compact)
+(import-fn x-utils/compact)
(defmacro force-all
"Forces a sequence of results. Subsequent expressions will wait on all results being
View
7 src/lamina/core/expr.clj
@@ -16,7 +16,7 @@
;;;
-(def *debug* false)
+(def ^{:dynamic true} *debug* false)
(defmacro debug-print [& args]
(when *debug*
@@ -58,7 +58,6 @@
(and
(seq? expr)
(< 1 (count expr))
- (symbol? (first expr))
(not (apply first= expr special-forms))))
(defn transform-expr [expr]
@@ -75,6 +74,7 @@
result#)
`(let [~@(apply concat non-constant-args)]
(run-pipeline []
+ :error-handler (fn [_#])
~@(map
(fn [arg]
`(read-merge
@@ -109,6 +109,7 @@
(if (= (resolve class-name) clojure.lang.LazySeq)
(transform-lazy-seq expr)
`(run-pipeline []
+ :error-handler (fn [_#])
~@(map
(fn [arg] `(read-merge (constantly ~arg) conj))
args)
@@ -118,6 +119,7 @@
(defn transform-throw [[_ exception]]
`(run-pipeline ~exception
+ :error-handler (fn [_#])
(fn [exception#]
(throw exception#))))
@@ -184,6 +186,7 @@
%)
body)))]
`(run-pipeline nil
+ :error-handler (fn [_#])
(fn [_#]
~body))))
View
1 src/lamina/core/expr/utils.clj
@@ -92,6 +92,7 @@
(if (empty? dependencies)
expr*
`(run-pipeline nil
+ :error-handler (fn [_#])
~@(map
(fn [v] `(read-merge (constantly ~v) (constantly nil)))
dependencies)
View
6 src/lamina/core/expr/walk.clj
@@ -14,9 +14,9 @@
(declare walk-exprs)
-(def *recur-point* nil)
-(def *final-walk* false)
-(def *special-walk-handlers* {})
+(def ^{:dynamic true} *recur-point* nil)
+(def ^{:dynamic true} *final-walk* false)
+(def ^{:dynamic true} *special-walk-handlers* {})
(defn walk-bindings [f bindings]
(vec
View
19 src/lamina/core/observable.clj
@@ -11,7 +11,7 @@
(:use
[clojure.walk])
(:require
- [clojure.contrib.logging :as log])
+ [clojure.tools.logging :as log])
(:import
[java.util.concurrent
ScheduledThreadPoolExecutor
@@ -42,19 +42,19 @@
(when message-callback
(message-callback msgs))
(catch Exception e
- (log/error "Error in message callback" e))))
+ (log/error e "Error in message callback"))))
(on-close [_]
(try
(when close-callback
(close-callback))
(catch Exception e
- (log/error "Error in close callback" e))))
+ (log/error e "Error in close callback"))))
(on-observers-changed [_ observers]
(try
(when observers-callback
(observers-callback observers))
(catch Exception e
- (log/error "Error in observers-changed callback" e)))))))
+ (log/error e "Error in observers-changed callback")))))))
;;;
@@ -195,11 +195,12 @@
(when-not (empty? msgs)
(let [msg (first msgs)]
(when (compare-and-set! val ::empty msg)
- (locking observers
- (let [s (vals @observers)]
- (reset! observers nil)
- (doseq [o s]
- (on-message o [msg])))))))
+ (let [s (locking observers
+ (let [s (vals @observers)]
+ (reset! observers nil)
+ s))]
+ (doseq [o s]
+ (on-message o [msg]))))))
false)
(close [this]
(message this [nil]))
View
17 src/lamina/core/pipeline.clj
@@ -9,11 +9,10 @@
(ns ^{:skip-wiki true}
lamina.core.pipeline
(:use
- [clojure.contrib.def :only (defmacro- defvar)]
[lamina.core.channel]
[clojure.pprint])
(:require
- [clojure.contrib.logging :as log])
+ [clojure.tools.logging :as log])
(:import
[java.util.concurrent
TimeoutException
@@ -23,9 +22,9 @@
(def instrument-exceptions false)
-(def *inside-pipeline?* false)
+(def ^{:dynamic true} *inside-pipeline?* false)
-(def *current-executor* nil)
+(def ^{:dynamic true} *current-executor* nil)
(defmacro with-executor [executor & body]
`(let [f# (fn [] ~@body)]
@@ -43,7 +42,7 @@
;;;
-(declare wait-for-result)
+(declare ^{:dynamic true} wait-for-result)
(deftype ResultChannel [success error metadata]
Object
@@ -78,10 +77,10 @@
(defn result-channel? [x]
(instance? ResultChannel x))
-(defn on-success [^ResultChannel ch & callbacks]
+(defn ^{:dynamic true} on-success [^ResultChannel ch & callbacks]
(apply receive (.success ch) callbacks))
-(defn on-error [^ResultChannel ch & callbacks]
+(defn ^{:dynamic true} on-error [^ResultChannel ch & callbacks]
(apply receive (.error ch) callbacks))
(defn success! [^ResultChannel ch value]
@@ -277,7 +276,7 @@
(when current-stack
(.printStackTrace current-stack))
(when (instance? Throwable ex)
- (log/warn "lamina.core.pipeline" ex))))))
+ (log/warn ex "Unhandled exception in pipeline."))))))
val))]
(when-not (every? ifn? stages)
(throw (Exception. "Every stage in a pipeline must be a function.")))
@@ -324,7 +323,7 @@
(read-channel ch -1))
([ch timeout]
(if (drained? ch)
- (throw (Exception. "Cannot read from a drained channel."))
+ (success-result nil)
(let [msg (dequeue ch ::none)]
(if-not (= ::none msg)
(success-result msg)
View
23 src/lamina/core/seq.clj
@@ -9,8 +9,7 @@
(ns ^{:skip-wiki true}
lamina.core.seq
(:use
- [lamina.core channel pipeline utils]
- [clojure.contrib.generic.functor])
+ [lamina.core channel pipeline utils])
(:require
[lamina.core.observable :as o]
[lamina.core.queue :as q])
@@ -245,30 +244,28 @@
(constant-channel)
(channel)))
-(defn map*
- "Returns a channel which will consume all messages from 'ch', and emit (f msg)."
+(defn mapcat*
+ "Returns a channel which will consume all messages from 'ch', and emit each message in (f msg)."
[f ch]
(let [f (unwrap-fn f)
ch* (dst-channel ch)]
(siphon ch
{ch* #(if (and (drained? ch) (= [nil] %))
%
- (map f %))})
+ (mapcat f %))})
(on-drained ch #(close ch*))
ch*))
+(defn map*
+ "Returns a channel which will consume all messages from 'ch', and emit (f msg)."
+ [f ch]
+ (mapcat* (comp list f) ch))
+
(defn filter*
"Returns a channel which will consume all messages from 'ch', but only emit messages
for which (f msg) is true."
[f ch]
- (let [f (unwrap-fn f)
- ch* (dst-channel ch)]
- (siphon ch
- {ch* #(if (and (drained? ch) (= [nil] %))
- %
- (filter f %))})
- (on-drained ch #(close ch*))
- ch*))
+ (mapcat* #(when (f %) (list %)) ch))
(defn remove*
"Returns a channel which will consume all messages from 'ch', but only emit messages
View
10 src/lamina/core/timer.clj
@@ -12,10 +12,16 @@
[java.util.concurrent
ScheduledThreadPoolExecutor
TimeUnit
- TimeoutException]))
+ TimeoutException
+ ThreadFactory]))
(def delayed-executor
- (ScheduledThreadPoolExecutor. (.availableProcessors (Runtime/getRuntime))))
+ (ScheduledThreadPoolExecutor.
+ (.availableProcessors (Runtime/getRuntime))
+ (reify ThreadFactory
+ (newThread [_ f]
+ (doto (Thread. f)
+ (.setDaemon true))))))
(defn delay-invoke [f delay]
(.schedule
View
10 src/lamina/executors.clj
@@ -13,12 +13,12 @@
(:require
[lamina.executors.core :as x]))
-(import-fn #'x/set-default-executor)
-(import-fn #'x/set-local-executor)
+(import-fn x/set-default-executor)
+(import-fn x/set-local-executor)
-(import-fn #'x/thread-pool)
-(import-fn #'x/thread-pool?)
-(import-fn #'x/shutdown-thread-pool)
+(import-fn x/thread-pool)
+(import-fn x/thread-pool?)
+(import-fn x/shutdown-thread-pool)
(defmacro with-thread-pool
"Executes the body on the specified thread pool. Returns a result-channel representing the
View
30 src/lamina/executors/core.clj
@@ -11,9 +11,10 @@
[lamina.core.pipeline]
[lamina.core.channel :only (close channel enqueue receive drained?)]
[lamina.core.seq :only (receive-all siphon fork channel-seq)]
+ [lamina.core.timer :only (delay-invoke)]
[lamina trace])
(:require
- [clojure.contrib.logging :as log])
+ [clojure.tools.logging :as log])
(:import
[java.util.concurrent
ExecutorService
@@ -29,7 +30,7 @@
(declare default-executor)
(def ns-executors (atom {}))
-(def *thread-pool-options* nil)
+(def ^{:dynamic true} *thread-pool-options* nil)
(defn set-default-executor
"Sets the default executor used by task."
@@ -76,6 +77,25 @@
:thread-wrapper (fn [f] (.run ^Runnable f))
:name (str (gensym "thread-pool."))})
+(defn contract-pool-size [^ThreadPoolExecutor pool min-thread-count]
+ (let [active (.getActiveCount pool)
+ pool-size (.getPoolSize pool)]
+ (if (< min-thread-count active pool-size)
+ (.setCorePoolSize pool (dec pool-size)))))
+
+(defn expand-pool-size [^ThreadPoolExecutor pool max-thread-count]
+ (let [active (.getActiveCount pool)
+ pool-size (.getPoolSize pool)]
+ (when (= pool-size active)
+ (.setCorePoolSize pool (min max-thread-count (inc active))))))
+
+(defn periodically-contract-pool-size [^ThreadPoolExecutor pool min-thread-count interval]
+ (when-not (.isShutdown pool)
+ (contract-pool-size pool min-thread-count)
+ (delay-invoke
+ #(periodically-contract-pool-size pool min-thread-count interval)
+ interval)))
+
(defn thread-pool
"Creates a thread pool that will grow to a specified size when necessary, and dispose
of unused threads after a certain amount of inactivity.
@@ -108,6 +128,7 @@
timeouts-probe (canonical-probe [(:name options) :timeouts])]
(register-probe threads-probe results-probe errors-probe)
(siphon-probes (:name options) (:probes options))
+ (periodically-contract-pool-size pool min-thread-count (* 1000 (:idle-threshold options)))
^{::options options}
(reify Executor LaminaThreadPool
(shutdown-thread-pool [_]
@@ -119,10 +140,7 @@
(execute [this f]
(trace threads-probe
(thread-pool-state pool))
- (let [active (.getActiveCount pool)]
- (if (= (.getPoolSize pool) active)
- (.setCorePoolSize pool (min max-thread-count (inc active)))
- (.setCorePoolSize pool (max min-thread-count (inc active)))))
+ (expand-pool-size pool max-thread-count)
(.execute pool
(fn []
(binding [*current-executor* this]
View
12 src/lamina/trace.clj
@@ -11,7 +11,7 @@
[lamina.core channel seq pipeline]
potemkin)
(:require
- [clojure.contrib.logging :as log]
+ [clojure.tools.logging :as log]
[lamina.trace.core :as trace]))
;;;
@@ -25,10 +25,10 @@
;;;
-(import-fn #'trace/register-probe)
-(import-fn #'trace/canonical-probe)
-(import-fn #'trace/on-new-probe)
-(import-fn #'trace/probe-channel)
+(import-fn trace/register-probe)
+(import-fn trace/canonical-probe)
+(import-fn trace/on-new-probe)
+(import-fn trace/probe-channel)
(defn registered-probes []
@trace/probe-switches)
@@ -86,7 +86,7 @@
:start-time (/ start 1e6)
:end-time (/ end 1e6)
:duration (/ (- end start) 1e6)}))
- (log/error nil ex)))))
+ (log/error ex (str "Unconsumed error on " probe))))))
(defn trace-wrap [f options]
(when-not (:name options)
View
9 src/lamina/trace/core.clj
@@ -8,19 +8,18 @@
(ns lamina.trace.core
(:use
- [lamina.core channel seq]
- [clojure.contrib.core :only (dissoc-in)])
+ [lamina.core channel seq])
(:require
[clojure.string :as str]
[lamina.core.queue :as q]
[lamina.core.observable :as o]
- [clojure.contrib.logging :as log]))
+ [clojure.tools.logging :as log]))
;;;
(def probe-channels (ref {}))
(def probe-switches (atom {}))
-(def *probe-prefix* nil)
+(def ^{:dynamic true} *probe-prefix* nil)
(def new-probe-publisher (channel))
(receive-all new-probe-publisher (fn [_] ))
@@ -29,7 +28,7 @@
(defn logger [level]
#(if (instance? Throwable %)
- (log/log level nil %)
+ (log/log level % "Error.")
(log/log level (str %))))
(defmacro def-log-channel [channel-name level]
View
45 test/lamina/test/channel.clj
@@ -12,9 +12,7 @@
[lamina.core.channel :only (listen)])
(:use
[clojure.test]
- [clojure.contrib.def]
- [clojure.walk]
- [clojure.contrib.combinatorics]))
+ [clojure.walk]))
;;
@@ -30,7 +28,7 @@
(catch Exception e
(.printStackTrace e)))))
-(declare callback)
+(declare ^{:dynamic true} callback)
(defmacro output-of [f & body]
`(let [coll# (atom [])]
@@ -129,7 +127,7 @@
(deftest test-simple-listen
(let [ch (channel)
coll (atom [])
- num 1e3]
+ num 1000]
(async-enqueue ch (range num) true)
(dotimes [_ num]
(let [watch (atom false)
@@ -147,7 +145,7 @@
(let [ch (channel)
coll (atom [])
waiting-for (ref 0)
- num 1e3
+ num 1000
latch (promise)]
(async-enqueue ch (range num) true)
(while (< (count @coll) num)
@@ -162,7 +160,7 @@
(is (= (range num) @coll))))
(deftest test-on-closed
- (let [num 1e3
+ (let [num 1000
cnt (atom 0)]
(dotimes [i num]
(let [ch (channel)]
@@ -176,7 +174,7 @@
(deftest test-simple-poll
(let [ch (channel)
- num 1e3]
+ num 1000]
(let [coll (atom [])]
(async-enqueue ch (range num) false)
(dotimes [i num]
@@ -187,7 +185,7 @@
(deftest test-poll
(let [u (channel)
v (channel)
- num 1e3]
+ num 1000]
(let [colls {:u (atom [])
:v (atom [])}]
(async-enqueue u (range num) false)
@@ -205,7 +203,7 @@
;; synchronous methods
(deftest test-wait-for-message
- (let [num 1e2]
+ (let [num 100]
(let [ch (channel)]
(async-enqueue ch (range num) false)
(dotimes [i num]
@@ -217,7 +215,7 @@
(let [ch (closed-channel 1 nil)]
(is (= [1] (channel-seq ch))))
- (let [in (range 1e3)
+ (let [in (range 1000)
target (last in)
ch (channel)]
(async-enqueue ch in false)
@@ -232,7 +230,7 @@
;; fork
(deftest test-receive-all
- (dotimes [i 1e2]
+ (dotimes [i 100]
(let [result (atom [])]
(let [s (range 10)]
(let [ch (channel)]
@@ -246,7 +244,7 @@
(is (= @result s))))))))
(deftest test-fork
- (dotimes [i 1e2]
+ (dotimes [i 100]
(let [s (range 10)]
(let [ch (channel)]
(async-enqueue ch s false)
@@ -256,7 +254,7 @@
(is (= s (lazy-channel-seq ch*))))))))
(deftest test-fork-receive-all
- (dotimes [i 1e2]
+ (dotimes [i 100]
(let [result (atom [])]
(let [s (range 10)]
(let [ch (channel)]
@@ -332,6 +330,23 @@
(is (drained? ch*))
(is (drained? ch)))))
+(deftest test-mapcat*
+ (let [s [1 2 3]
+ f range]
+
+ (let [ch (apply closed-channel s)
+ ch* (mapcat* f ch)]
+ (is (= (mapcat f s) (channel-seq ch*)))
+ (is (drained? ch))
+ (is (drained? ch*)))
+
+ (let [ch (channel)
+ ch* (mapcat* f ch)]
+ (async-enqueue ch s true)
+ (is (= (mapcat f s) (channel-seq ch* 2500)))
+ (is (drained? ch))
+ (is (drained? ch*)))))
+
(deftest test-map*
(let [s (range 10)
f #(* % 2)]
@@ -419,7 +434,7 @@
(let [ch (channel)
ch* (partition-all* 4 3 ch)]
(async-enqueue ch s false)
- (is (= (partition-all 4 3 s) (channel-seq ch* 2500)))
+ (is (= (partition-all 4 3 s) (channel-seq ch* 5000)))
(is (drained? ch))
(is (drained? ch*)))))
View
51 test/lamina/test/connections.clj
@@ -12,15 +12,16 @@
[lamina core connections trace]
[lamina.core.pipeline :only (success-result error-result)])
(:require
- [clojure.contrib.logging :as log])
+ [clojure.tools.logging :as log])
(:import java.util.concurrent.TimeoutException))
;;;
(def probes
- {:connection:lost log-info
- :connection:opened log-info
- :connection:failed log-info})
+ {;;:connection:lost log-info
+ ;;:connection:opened log-info
+ ;;:connection:failed log-info
+ :errors (sink (fn [_]))})
(defn simple-echo-server []
(let [[a b] (channel-pair)]
@@ -41,7 +42,10 @@
read-channel
#(when-not (drained? b)
(enqueue b %))
- (fn [_] (restart)))
+ (fn [_]
+ (if (drained? b)
+ (do (close a) (close b))
+ (restart))))
[a #(do (close a) (close b))]))
(defn error-server []
@@ -136,7 +140,7 @@
(with-server alternating-delay-echo-server
(when initially-disconnected
(stop-server))
- (let [f (client-fn #(connect) {:probes (comment probes) :description "dropped-and-restored"})]
+ (let [f (client-fn #(connect) {:probes probes :description "dropped-and-restored"})]
(when-not initially-disconnected
(stop-server))
(try
@@ -150,19 +154,18 @@
(defn persistent-connection-stress-test [client-fn]
(with-server simple-echo-server
(let [continue (atom true)
- f (client-fn #(connect) {:probes (comment probes) :description "stress-test"})]
- ; periodically drop
- (.start
- (Thread.
- #(loop []
- (Thread/sleep 4000)
- (stop-server)
- ;;(println "Disconnecting")
- (Thread/sleep 100)
- (start-server)
- ;;(println "Reconnecting")
- (when @continue
- (recur)))))
+ f (client-fn #(connect) {:probes probes :description "stress-test"})]
+ ;; periodically drop
+ (future
+ (loop []
+ (Thread/sleep 4000)
+ (stop-server)
+ ;;(println "Disconnecting")
+ (Thread/sleep 100)
+ (start-server)
+ ;;(println "Reconnecting")
+ (when @continue
+ (recur))))
(try
(let [s (range 1e4)]
(is (= s
@@ -198,7 +201,7 @@
(defn errors-propagate [client-fn]
(with-server error-server
(start-server)
- (let [f (client-fn #(connect) {:description "error-server"})]
+ (let [f (client-fn #(connect) {:probes probes, :description "error-server"})]
(is (thrown? RuntimeException @(f "fail?" 100)))
(is (thrown? RuntimeException @(f "fail?" 100))))))
@@ -223,19 +226,19 @@
~@body)))
(deftest test-server-error-handler
- (with-handler (fn [_ _] (throw exception)) nil
+ (with-handler (fn [_ _] (throw exception)) {:probes probes}
(enqueue ch 1 2)
(is (= [exception exception] (channel-seq ch))))
- (with-handler (fn [_ _] (run-pipeline nil :error-handler (fn [_]) (fn [_] (throw exception)))) nil
+ (with-handler (fn [_ _] (run-pipeline nil :error-handler (fn [_]) (fn [_] (throw exception)))) {:probes probes}
(enqueue ch 1 2)
(is (= [exception exception] (channel-seq ch))))
- (with-handler (fn [_ _] (throw exception)) {:include-request true}
+ (with-handler (fn [_ _] (throw exception)) {:include-request true, :probes probes}
(enqueue ch 1 2)
(is (= [{:request 1, :response exception} {:request 2, :response exception}] (channel-seq ch))))
- (with-handler (fn [_ _] (run-pipeline nil :error-handler (fn [_]) (fn [_] (throw exception)))) {:include-request true}
+ (with-handler (fn [_ _] (run-pipeline nil :error-handler (fn [_]) (fn [_] (throw exception)))) {:include-request true, :probes probes}
(enqueue ch 1 2)
(is (= [{:request 1, :response exception} {:request 2, :response exception}] (channel-seq ch)))))
View
2 test/lamina/test/executors.clj
@@ -11,7 +11,7 @@
[clojure test walk]
[lamina core executors api])
(:require
- [clojure.contrib.logging :as log])
+ [clojure.tools.logging :as log])
(:import java.util.concurrent.TimeoutException))
(def pool (thread-pool {:max-thread-pool 1}))
View
13 test/lamina/test/expr.clj
@@ -11,7 +11,7 @@
[lamina.core]
[clojure.test]))
-(def *sleep-interval* 10)
+(def ^{:dynamic true} *sleep-interval* 10)
(defmacro task* [& body]
`(task
@@ -46,7 +46,8 @@
(is= 6 (task* (+ 1 (task* (+ 2 3)))))
(is= 6 (reduce #(task* (+ %1 %2)) [1 2 3]))
(is= 6 (->> (range 3) (map inc) (reduce +)))
- (is= 6 (->> (range 3) (map #(task* (+ 1 %))) (reduce #(task* (+ %1 %2))))))
+ (is= 6 (->> (range 3) (map #(task* (+ 1 %))) (reduce #(task* (+ %1 %2)))))
+ (is= 1 (let [a (task* {:status true})] (if (= true (:status a)) 1 2))))
(deftest test-exceptions
(is= 3
@@ -144,15 +145,15 @@
(deftest test-recur
(is= [0 1 2]
(for [x (range 3)] (task* x)))
- (is= (range 100)
+ (is= (range 50)
((fn [x]
- (if (= 100 (count x))
+ (if (= 50 (count x))
x
(recur (task* (conj x (count x))))))
[]))
- (is= (range 100)
+ (is= (range 50)
(loop [i 0 accum []]
- (if (< 99 i)
+ (if (< 49 i)
accum
(recur (inc i) (conj accum i)))))
(is= 4
View
2 test/lamina/test/observable.clj
@@ -11,7 +11,7 @@
[clojure test walk]
[lamina.core.observable]))
-(declare accumulator)
+(declare ^{:dynamic true} accumulator)
(defn sub [key o]
(subscribe o {key (observer #(apply swap! accumulator conj %))}))
View
12 test/lamina/test/pipeline.clj
@@ -30,6 +30,8 @@
(wait-for-result (pipeline 0) 100)
(catch TimeoutException e
(is false))
+ #_(catch clojure.lang.ArityException e
+ (is false))
(catch Exception e
(is true))))
@@ -56,8 +58,8 @@
;;;
(deftest test-basic-pipelines
- (test-pipeline (apply pipeline (take 1e3 (repeat inc))) 1e3)
- (test-pipeline (apply pipeline (take 1e3 (repeat (fn [x] (task (inc x)))))) 1e3)
+ (test-pipeline (apply pipeline (take 1e3 (repeat inc))) 1000)
+ (test-pipeline (apply pipeline (take 1e3 (repeat (fn [x] (task (inc x)))))) 1000)
(test-pipeline (apply pipeline (take 100 (repeat slow-inc))) 100)
(test-pipeline (pipeline #(assoc {} :result %) :result) 0))
@@ -86,8 +88,8 @@
(assert-failure (pipeline :error-handler (fn [_]) inc fail))
(assert-failure (pipeline :error-handler (fn [_]) inc fail inc))
(assert-failure (pipeline :error-handler (fn [_]) slow-inc slow-fail))
- (assert-failure (pipeline :error-handler (fn [_]) inc (pipeline :error-handler (fn [_ _]) inc fail) inc))
- (assert-failure (pipeline :error-handler (fn [_]) inc #(redirect (pipeline :error-handler (fn [_ _]) inc fail) %))))
+ (assert-failure (pipeline :error-handler (fn [_]) inc (pipeline :error-handler (fn [_]) inc fail) inc))
+ (assert-failure (pipeline :error-handler (fn [_]) inc #(redirect (pipeline :error-handler (fn [_]) inc fail) %))))
(deftest test-redirection-and-error-handlers
@@ -195,7 +197,7 @@
500)
(is (= @t1 @t2))))
-(declare to-be-bound)
+(declare ^{:dynamic true} to-be-bound)
(deftest test-bindings
(let [t1 (atom nil)
View
2 test/lamina/test/queue.clj
@@ -13,7 +13,7 @@
(:require
[lamina.core.observable :as o]))
-(declare accumulator)
+(declare ^{:dynamic true} accumulator)
(defmacro output= [value & body]
`(binding [accumulator (atom [])]

0 comments on commit 9457982

Please sign in to comment.
Something went wrong with that request. Please try again.