From b24cb803845487d3cbe8851f69c40a0583604aed Mon Sep 17 00:00:00 2001 From: Brandon Bloom Date: Fri, 12 Jul 2013 17:05:20 -0400 Subject: [PATCH] Add constrained read/write-only ports --- src/main/clojure/cljs/core/async.cljs | 10 ++ .../cljs/core/async/impl/channels.cljs | 70 ++++++----- .../cljs/core/async/impl/protocols.cljs | 8 +- src/main/clojure/clojure/core/async.clj | 10 ++ .../clojure/core/async/impl/channels.clj | 111 ++++++++++-------- .../clojure/core/async/impl/protocols.clj | 6 +- .../clojure/core/async/impl/timers.clj | 7 +- src/test/clojure/clojure/core/async_test.clj | 11 ++ 8 files changed, 142 insertions(+), 91 deletions(-) diff --git a/src/main/clojure/cljs/core/async.cljs b/src/main/clojure/cljs/core/async.cljs index f0e08056..302e9964 100644 --- a/src/main/clojure/cljs/core/async.cljs +++ b/src/main/clojure/cljs/core/async.cljs @@ -37,6 +37,16 @@ ([] (chan nil)) ([buf-or-n] (channels/chan (if (number? buf-or-n) (buffer buf-or-n) buf-or-n)))) +(defn port + "Returns a send-only port wrapping the given channel." + [channel] + (channels/>port channel)) + (defn timeout "Returns a channel that will close after msecs" [msecs] diff --git a/src/main/clojure/cljs/core/async/impl/channels.cljs b/src/main/clojure/cljs/core/async/impl/channels.cljs index 7146f344..58291a96 100644 --- a/src/main/clojure/cljs/core/async/impl/channels.cljs +++ b/src/main/clojure/cljs/core/async/impl/channels.cljs @@ -38,34 +38,6 @@ (do (.splice takes idx 1) (recur idx))))))) - impl/WritePort - (put! [this val handler] - (assert (not (nil? val)) "Can't put nil in on a channel") - (cleanup this) - (if @closed - (box nil) - (let [[put-cb take-cb] (loop [taker-idx 0] - (when (< taker-idx (.-length takes)) - (let [taker (aget takes taker-idx)] - (if (and (impl/active? taker) - (impl/active? handler)) - (do (.splice takes taker-idx 1) - [(impl/commit handler) (impl/commit taker)]) - (recur (inc taker-idx))))))] - (if (and put-cb take-cb) - (do (dispatch/run (fn [] (take-cb val))) - (box nil)) - (if (and buf (not (impl/full? buf))) - (let [put-cb (and (impl/active? handler) - (impl/commit handler))] - (if put-cb - (do (impl/add! buf val) - (box nil)) - nil)) - (do - (.unshift puts [handler val]) - nil)))))) - impl/ReadPort (take! [this handler] (cleanup this) @@ -93,7 +65,34 @@ nil) (do (.unshift takes handler) nil)))))) - impl/Channel + + impl/WritePort + (put! [this val handler] + (assert (not (nil? val)) "Can't put nil in on a channel") + (cleanup this) + (if @closed + (box nil) + (let [[put-cb take-cb] (loop [taker-idx 0] + (when (< taker-idx (.-length takes)) + (let [taker (aget takes taker-idx)] + (if (and (impl/active? taker) + (impl/active? handler)) + (do (.splice takes taker-idx 1) + [(impl/commit handler) (impl/commit taker)]) + (recur (inc taker-idx))))))] + (if (and put-cb take-cb) + (do (dispatch/run (fn [] (take-cb val))) + (box nil)) + (if (and buf (not (impl/full? buf))) + (let [put-cb (and (impl/active? handler) + (impl/commit handler))] + (if put-cb + (do (impl/add! buf val) + (box nil)) + nil)) + (do + (.unshift puts [handler val]) + nil)))))) (close! [this] (cleanup this) (if @closed @@ -109,3 +108,16 @@ (defn chan [buf] (ManyToManyChannel. (make-array 0) (make-array 0) buf (atom nil))) +(defn port [c] + (reify + impl/WritePort + (put! [this val handler] + (impl/put! c val handler)) + (close! [this] + (impl/close! c)))) diff --git a/src/main/clojure/cljs/core/async/impl/protocols.cljs b/src/main/clojure/cljs/core/async/impl/protocols.cljs index 8618bed5..81fc1014 100644 --- a/src/main/clojure/cljs/core/async/impl/protocols.cljs +++ b/src/main/clojure/cljs/core/async/impl/protocols.cljs @@ -12,11 +12,11 @@ (take! [port fn1-handler] "derefable val if taken, nil if take was enqueued")) (defprotocol WritePort - (put! [port val fn0-handler] "derefable nil if put, nil if put was enqueued. Must throw on nil val.")) - -(defprotocol Channel + (put! [port val fn0-handler] "derefable nil if put, nil if put was enqueued. Must throw on nil val.") (close! [chan])) +(defprotocol Channel) + (defprotocol Handler (active? [h] "returns true if has callback. Must work w/o lock") #_(lock-id [h] "a unique id for lock acquisition order, 0 if no lock") @@ -25,4 +25,4 @@ (defprotocol Buffer (full? [b]) (remove! [b]) - (add! [b itm])) \ No newline at end of file + (add! [b itm])) diff --git a/src/main/clojure/clojure/core/async.clj b/src/main/clojure/clojure/core/async.clj index 2ba4a3bd..35923bf9 100644 --- a/src/main/clojure/clojure/core/async.clj +++ b/src/main/clojure/clojure/core/async.clj @@ -58,6 +58,16 @@ ([] (chan nil)) ([buf-or-n] (channels/chan (if (number? buf-or-n) (buffer buf-or-n) buf-or-n)))) +(defn port + "Returns a send-only port wrapping the given channel." + [channel] + (channels/>port channel)) + (defn timeout "Returns a channel that will close after msecs" [msecs] diff --git a/src/main/clojure/clojure/core/async/impl/channels.clj b/src/main/clojure/clojure/core/async/impl/channels.clj index 6fdd037e..f32d04fa 100644 --- a/src/main/clojure/clojure/core/async/impl/channels.clj +++ b/src/main/clojure/clojure/core/async/impl/channels.clj @@ -42,55 +42,6 @@ (when (.hasNext iter) (recur (.next iter))))))) - impl/WritePort - (put! - [this val handler] - (when (nil? val) - (throw (IllegalArgumentException. "Can't put nil on channel"))) - (.lock mutex) - (cleanup this) - (if @closed - (do (.unlock mutex) - (box nil)) - (let [^Lock handler handler - iter (.iterator takes) - [put-cb take-cb] (when (.hasNext iter) - (loop [^Lock taker (.next iter)] - (if (< (impl/lock-id handler) (impl/lock-id taker)) - (do (.lock handler) (.lock taker)) - (do (.lock taker) (.lock handler))) - (let [ret (when (and (impl/active? handler) (impl/active? taker)) - [(impl/commit handler) (impl/commit taker)])] - (.unlock handler) - (.unlock taker) - (if ret - (do - (.remove iter) - ret) - (when (.hasNext iter) - (recur (.next iter)))))))] - (if (and put-cb take-cb) - (do - (.unlock mutex) - (dispatch/run (fn [] (take-cb val))) - (box nil)) - (if (and buf (not (impl/full? buf))) - (do - (.lock handler) - (let [put-cb (and (impl/active? handler) (impl/commit handler))] - (.unlock handler) - (if put-cb - (do (impl/add! buf val) - (.unlock mutex) - (box nil)) - (do (.unlock mutex) - nil)))) - (do - (when (impl/active? handler) - (.add puts [handler val])) - (.unlock mutex) - nil)))))) - impl/ReadPort (take! [this handler] @@ -159,7 +110,54 @@ (.unlock mutex) nil))))))) - impl/Channel + impl/WritePort + (put! + [this val handler] + (when (nil? val) + (throw (IllegalArgumentException. "Can't put nil on channel"))) + (.lock mutex) + (cleanup this) + (if @closed + (do (.unlock mutex) + (box nil)) + (let [^Lock handler handler + iter (.iterator takes) + [put-cb take-cb] (when (.hasNext iter) + (loop [^Lock taker (.next iter)] + (if (< (impl/lock-id handler) (impl/lock-id taker)) + (do (.lock handler) (.lock taker)) + (do (.lock taker) (.lock handler))) + (let [ret (when (and (impl/active? handler) (impl/active? taker)) + [(impl/commit handler) (impl/commit taker)])] + (.unlock handler) + (.unlock taker) + (if ret + (do + (.remove iter) + ret) + (when (.hasNext iter) + (recur (.next iter)))))))] + (if (and put-cb take-cb) + (do + (.unlock mutex) + (dispatch/run (fn [] (take-cb val))) + (box nil)) + (if (and buf (not (impl/full? buf))) + (do + (.lock handler) + (let [put-cb (and (impl/active? handler) (impl/commit handler))] + (.unlock handler) + (if put-cb + (do (impl/add! buf val) + (.unlock mutex) + (box nil)) + (do (.unlock mutex) + nil)))) + (do + (when (impl/active? handler) + (.add puts [handler val])) + (.unlock mutex) + nil)))))) (close! [this] (.lock mutex) @@ -186,3 +184,16 @@ (defn chan [buf] (ManyToManyChannel. (LinkedList.) (LinkedList.) buf (atom nil) (mutex/mutex))) +(defn port [c] + (reify + impl/WritePort + (put! [this val handler] + (impl/put! c val handler)) + (close! [this] + (impl/close! c)))) diff --git a/src/main/clojure/clojure/core/async/impl/protocols.clj b/src/main/clojure/clojure/core/async/impl/protocols.clj index 6c72973e..30670fb8 100644 --- a/src/main/clojure/clojure/core/async/impl/protocols.clj +++ b/src/main/clojure/clojure/core/async/impl/protocols.clj @@ -13,11 +13,11 @@ (take! [port fn1-handler] "derefable val if taken, nil if take was enqueued")) (defprotocol WritePort - (put! [port val fn0-handler] "derefable nil if put, nil if put was enqueued. Must throw on nil val.")) - -(defprotocol Channel + (put! [port val fn0-handler] "derefable nil if put, nil if put was enqueued. Must throw on nil val.") (close! [chan])) +(defprotocol Channel) + (defprotocol Handler (active? [h] "returns true if has callback. Must work w/o lock") (lock-id [h] "a unique id for lock acquisition order, 0 if no lock") diff --git a/src/main/clojure/clojure/core/async/impl/timers.clj b/src/main/clojure/clojure/core/async/impl/timers.clj index 2e4a89f4..54336f2f 100644 --- a/src/main/clojure/clojure/core/async/impl/timers.clj +++ b/src/main/clojure/clojure/core/async/impl/timers.clj @@ -35,10 +35,7 @@ -1 (if (= timestamp ostamp) 0 - 1)))) - impl/Channel - (close! [this] - (impl/close! channel))) + 1))))) (defn timeout "returns a channel that will close after msecs" @@ -59,7 +56,7 @@ (loop [] (let [^TimeoutQueueEntry tqe (.take q)] (.remove timeouts-map (.timestamp tqe) tqe) - (impl/close! tqe)) + (impl/close! (.channel tqe))) (recur)))) (defonce timeout-daemon diff --git a/src/test/clojure/clojure/core/async_test.clj b/src/test/clojure/clojure/core/async_test.clj index 2829e3ed..6bfa1013 100644 --- a/src/test/clojure/clojure/core/async_test.clj +++ b/src/test/clojure/clojure/core/async_test.clj @@ -117,3 +117,14 @@ (put! c :enqueues #(deliver p :proceeded)) ;; enqueue a put (port c)] + (is (thrown? IllegalArgumentException (>!! ro :foo))) + (>!! wo :foo) + (is (thrown? IllegalArgumentException (