Skip to content
Permalink
Browse files

fixing tests, filter> and remove> still fail

  • Loading branch information...
halgari committed Dec 13, 2013
1 parent 17f9f54 commit 9fcae99576c0735a804bbd4cbec81307e2d34d90
@@ -390,6 +390,7 @@
(clojure.lang.Var/resetThreadBindingFrame binds)
(let [ret (try (f)
(catch Throwable t
(println t)
nil))]
(when-not (nil? ret)
(>!! c ret))
@@ -473,6 +474,7 @@

impl/WritePort
(put! [_ val fn1]
(println (impl/closed? ch))
(if (p val)
(impl/put! ch val fn1)
(channels/box (impl/closed? ch))))))
@@ -516,10 +518,10 @@
(let [val (<! in)]
(if (nil? val)
(close! out)
(let [vals (f val)
ok (core/reduce #(>! out %2) (impl/closed? out) vals)]
(when ok
(recur)))))))
(do (doseq [v (f val)]
(>! out v))
(when-not (impl/closed? out)
(recur)))))))

(defn mapcat<
"Takes a function and a source channel, and returns a channel which
@@ -621,11 +623,10 @@
([ch coll] (onto-chan ch coll true))
([ch coll close?]
(go-loop [vs (seq coll)]
(if vs
(when (>! ch (first vs))
(recur (next vs)))
(when close?
(close! ch))))))
(if (and vs (>! ch (first vs)))
(recur (next vs))
(when close?
(close! ch))))))

(defn to-chan
"Creates and returns a channel which contains the contents of coll,
@@ -667,8 +668,8 @@
(untap-all* [_] (reset! cs {}) nil))
dchan (chan 1)
dctr (atom nil)
done #(when (zero? (swap! dctr dec))
(put! dchan true))]
done (fn [_] (when (zero? (swap! dctr dec))
(put! dchan true)))]
(go-loop []
(let [val (<! ch)]
(if (nil? val)
@@ -26,6 +26,8 @@
([^Executor executor-svc]
(reify impl/Executor
(impl/exec [this r]
(.execute executor-svc ^Runnable r)))))


(.execute executor-svc ^Runnable (fn []
(try
(r)
(catch Throwable ex
(println ex)))))))))
@@ -961,8 +961,8 @@
nil))

(defn put! [state blk c val]
(if-let [cb (impl/put! c val (fn-handler (fn []
(aset-all! state VALUE-IDX nil STATE-IDX blk)
(if-let [cb (impl/put! c val (fn-handler (fn [ret-val]
(aset-all! state VALUE-IDX ret-val STATE-IDX blk)
(run-state-machine-wrapped state))))]
(do (aset-all! state VALUE-IDX @cb STATE-IDX blk)
:recur)
@@ -360,7 +360,7 @@
(deftest close-on-exception-tests
(testing "threads"
(is (nil? (<!! (thread (assert false "This exception is expected")))))
(is (nil? (<!! (thread (alts! [(identity-chan 42)])
(is (nil? (<!! (thread (alts!! [(identity-chan 42)])
(assert false "This exception is expected"))))))
(testing "go blocks"
(is (nil? (<!! (go (assert false "This exception is expected")))))
@@ -7,12 +7,6 @@
(defn default-chan []
(chan 1))

(defn drain [c]
(close! c)
(dorun (take-while #(not (nil? %))
(repeatedly #(<!! c)))))


(deftest buffers-tests
(is (not (unblocking-buffer? (buffer 1))))
(is (unblocking-buffer? (dropping-buffer 1)))
@@ -30,8 +24,7 @@
(let [c (default-chan)
_ (>!! c 42)
blocking (deref (future (>!! c 43)) DEREF_WAIT :blocked)]
(is (= blocking :blocked))
#_(drain c)))
(is (= blocking :blocked))))

(deftest unfulfilled-readers-block
(let [c (default-chan)
@@ -46,7 +39,7 @@
(deftest test-<!!-and-put!
(let [executed (promise)
test-channel (chan nil)]
(put! test-channel :test-val #(deliver executed true))
(put! test-channel :test-val (fn [_] (deliver executed true)))
(is (not (realized? executed)) "The provided callback does not execute until
a reader can consume the written value.")
(is (= :test-val (<!! test-channel))
@@ -65,27 +58,27 @@

(deftest take!-on-caller?
(is (apply not= (let [starting-thread (Thread/currentThread)
test-channel (chan nil)
read-promise (promise)]
(take! test-channel (fn [_] (deliver read-promise (Thread/currentThread))) true)
(>!! test-channel :foo)
[starting-thread @read-promise]))
test-channel (chan nil)
read-promise (promise)]
(take! test-channel (fn [_] (deliver read-promise (Thread/currentThread))) true)
(>!! test-channel :foo)
[starting-thread @read-promise]))
"When on-caller? requested, but no value is immediately
available, take!'s callback executes on another thread.")
(is (apply = (let [starting-thread (Thread/currentThread)
test-channel (chan nil)
read-promise (promise)]
(put! test-channel :foo (constantly nil))
(take! test-channel (fn [_] (deliver read-promise (Thread/currentThread))) true)
[starting-thread @read-promise]))
test-channel (chan nil)
read-promise (promise)]
(put! test-channel :foo (constantly nil))
(take! test-channel (fn [_] (deliver read-promise (Thread/currentThread))) true)
[starting-thread @read-promise]))
"When on-caller? requested, and a value is ready to read,
take!'s callback executes on the same thread.")
(is (apply not= (let [starting-thread (Thread/currentThread)
test-channel (chan nil)
read-promise (promise)]
(put! test-channel :foo (constantly nil))
(take! test-channel (fn [_] (deliver read-promise (Thread/currentThread))) false)
[starting-thread @read-promise]))
test-channel (chan nil)
read-promise (promise)]
(put! test-channel :foo (constantly nil))
(take! test-channel (fn [_] (deliver read-promise (Thread/currentThread))) false)
[starting-thread @read-promise]))
"When on-caller? is false, and a value is ready to read,
take!'s callback executes on a different thread."))

@@ -94,27 +87,28 @@
test-channel (chan nil)
write-promise (promise)]
(take! test-channel (fn [_] nil))
(put! test-channel :foo #(deliver write-promise (Thread/currentThread)) true)
(put! test-channel :foo (fn [_] (deliver write-promise (Thread/currentThread))) true)
[starting-thread @write-promise]))
"When on-caller? requested, and a reader can consume the value,
put!'s callback executes on the same thread.")
(is (apply not= (let [starting-thread (Thread/currentThread)
test-channel (chan nil)
write-promise (promise)]
(take! test-channel (fn [_] nil))
(put! test-channel :foo #(deliver write-promise (Thread/currentThread)) false)
(put! test-channel :foo (fn [_] (deliver write-promise (Thread/currentThread))) false)
[starting-thread @write-promise]))
"When on-caller? is false, but a reader can consume the value,
put!'s callback executes on a different thread.")
(is (apply not= (let [starting-thread (Thread/currentThread)
test-channel (chan nil)
write-promise (promise)]
(put! test-channel :foo #(deliver write-promise (Thread/currentThread)) true)
(put! test-channel :foo (fn [_] (deliver write-promise (Thread/currentThread))) true)
(take! test-channel (fn [_] nil))
[starting-thread @write-promise]))
"When on-caller? requested, but no reader can consume the value,
put!'s callback executes on a different thread."))


(deftest limit-async-take!-put!
(testing "async put! limit"
(let [c (chan)]
@@ -129,14 +123,14 @@
(take! c (fn [x])))
(is (thrown? AssertionError
(take! c (fn [x]))))
(is (nil? (>!! c 42)))))) ;; make sure the channel unlocks
(is (true? (>!! c 42)))))) ;; make sure the channel unlocks

(deftest puts-fulfill-when-buffer-available
(is (= :proceeded
(let [c (chan 1)
p (promise)]
(>!! c :full) ;; fill up the channel
(put! c :enqueues #(deliver p :proceeded)) ;; enqueue a put
(put! c :enqueues (fn [_] (deliver p :proceeded))) ;; enqueue a put
(<!! c) ;; make room in the buffer
(deref p 250 :timeout)))))

@@ -164,6 +158,11 @@
(testing "remove<"
(is (= [1 3 5]
(<!! (a/into [] (a/remove< even? (a/to-chan [1 2 3 4 5 6])))))))

(testing "onto-chan"
(is (= (range 10)
(<!! (a/into [] (a/to-chan (range 10)))))))

(testing "filter>"
(is (= [2 4 6]
(let [out (chan)
@@ -186,6 +185,8 @@
in (mapcat> range out)]
(a/onto-chan in [1 2 3])
(<!! (a/into [] out))))))


(testing "pipe"
(is (= [1 2 3 4 5]
(let [out (chan)]
@@ -273,4 +274,5 @@
(<!! (a/into [] (a/partition 2 (a/to-chan [1 2 2 3])))))))
(testing "partition-by"
(is (= [["a" "b"] [1 :2 3] ["c"]]
(<!! (a/into [] (a/partition-by string? (a/to-chan ["a" "b" 1 :2 3 "c"]))))))))
(<!! (a/into [] (a/partition-by string? (a/to-chan ["a" "b" 1 :2 3 "c"])))))))
)

0 comments on commit 9fcae99

Please sign in to comment.
You can’t perform that action at this time.