Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/main/clojure/cljs/core/async.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@
([] (chan nil))
([buf-or-n] (channels/chan (if (number? buf-or-n) (buffer buf-or-n) buf-or-n))))

(defn <port
"Returns a receive-only port wrapping the given channel."
[channel]
(channels/<port channel))

(defn >port
"Returns a send-only port wrapping the given channel."
[channel]
(channels/>port channel))

(defn timeout
"Returns a channel that will close after msecs"
[msecs]
Expand Down
70 changes: 41 additions & 29 deletions src/main/clojure/cljs/core/async/impl/channels.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -38,34 +38,6 @@
(do (.splice takes idx 1)
(recur idx)))))))

impl/WritePort
(put! [this val handler]
(assert (not (nil? val)) "Can't put nil in on a channel")
(cleanup this)
(if @closed
(box nil)
(let [[put-cb take-cb] (loop [taker-idx 0]
(when (< taker-idx (.-length takes))
(let [taker (aget takes taker-idx)]
(if (and (impl/active? taker)
(impl/active? handler))
(do (.splice takes taker-idx 1)
[(impl/commit handler) (impl/commit taker)])
(recur (inc taker-idx))))))]
(if (and put-cb take-cb)
(do (dispatch/run (fn [] (take-cb val)))
(box nil))
(if (and buf (not (impl/full? buf)))
(let [put-cb (and (impl/active? handler)
(impl/commit handler))]
(if put-cb
(do (impl/add! buf val)
(box nil))
nil))
(do
(.unshift puts [handler val])
nil))))))

impl/ReadPort
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This didn't change. Only moved.

(take! [this handler]
(cleanup this)
Expand Down Expand Up @@ -93,7 +65,34 @@
nil)
(do (.unshift takes handler)
nil))))))
impl/Channel

impl/WritePort
(put! [this val handler]
(assert (not (nil? val)) "Can't put nil in on a channel")
(cleanup this)
(if @closed
(box nil)
(let [[put-cb take-cb] (loop [taker-idx 0]
(when (< taker-idx (.-length takes))
(let [taker (aget takes taker-idx)]
(if (and (impl/active? taker)
(impl/active? handler))
(do (.splice takes taker-idx 1)
[(impl/commit handler) (impl/commit taker)])
(recur (inc taker-idx))))))]
(if (and put-cb take-cb)
(do (dispatch/run (fn [] (take-cb val)))
(box nil))
(if (and buf (not (impl/full? buf)))
(let [put-cb (and (impl/active? handler)
(impl/commit handler))]
(if put-cb
(do (impl/add! buf val)
(box nil))
nil))
(do
(.unshift puts [handler val])
nil))))))
(close! [this]
(cleanup this)
(if @closed
Expand All @@ -109,3 +108,16 @@
(defn chan [buf]
(ManyToManyChannel. (make-array 0) (make-array 0) buf (atom nil)))

(defn <port [c]
(reify
impl/ReadPort
(take! [this fn1-handler]
(impl/take! c fn1-handler))))

(defn >port [c]
(reify
impl/WritePort
(put! [this val handler]
(impl/put! c val handler))
(close! [this]
(impl/close! c))))
8 changes: 4 additions & 4 deletions src/main/clojure/cljs/core/async/impl/protocols.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
(take! [port fn1-handler] "derefable val if taken, nil if take was enqueued"))

(defprotocol WritePort
(put! [port val fn0-handler] "derefable nil if put, nil if put was enqueued. Must throw on nil val."))

(defprotocol Channel
(put! [port val fn0-handler] "derefable nil if put, nil if put was enqueued. Must throw on nil val.")
(close! [chan]))

(defprotocol Channel)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't in use any more. Should it stick around as a marker protocol?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also raises the question of whether or not there should be type predicates for channels & ports.


(defprotocol Handler
(active? [h] "returns true if has callback. Must work w/o lock")
#_(lock-id [h] "a unique id for lock acquisition order, 0 if no lock")
Expand All @@ -25,4 +25,4 @@
(defprotocol Buffer
(full? [b])
(remove! [b])
(add! [b itm]))
(add! [b itm]))
10 changes: 10 additions & 0 deletions src/main/clojure/clojure/core/async.clj
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@
([] (chan nil))
([buf-or-n] (channels/chan (if (number? buf-or-n) (buffer buf-or-n) buf-or-n))))

(defn <port
"Returns a receive-only port wrapping the given channel."
[channel]
(channels/<port channel))

(defn >port
"Returns a send-only port wrapping the given channel."
[channel]
(channels/>port channel))

(defn timeout
"Returns a channel that will close after msecs"
[msecs]
Expand Down
111 changes: 61 additions & 50 deletions src/main/clojure/clojure/core/async/impl/channels.clj
Original file line number Diff line number Diff line change
Expand Up @@ -42,55 +42,6 @@
(when (.hasNext iter)
(recur (.next iter)))))))

impl/WritePort
(put!
[this val handler]
(when (nil? val)
(throw (IllegalArgumentException. "Can't put nil on channel")))
(.lock mutex)
(cleanup this)
(if @closed
(do (.unlock mutex)
(box nil))
(let [^Lock handler handler
iter (.iterator takes)
[put-cb take-cb] (when (.hasNext iter)
(loop [^Lock taker (.next iter)]
(if (< (impl/lock-id handler) (impl/lock-id taker))
(do (.lock handler) (.lock taker))
(do (.lock taker) (.lock handler)))
(let [ret (when (and (impl/active? handler) (impl/active? taker))
[(impl/commit handler) (impl/commit taker)])]
(.unlock handler)
(.unlock taker)
(if ret
(do
(.remove iter)
ret)
(when (.hasNext iter)
(recur (.next iter)))))))]
(if (and put-cb take-cb)
(do
(.unlock mutex)
(dispatch/run (fn [] (take-cb val)))
(box nil))
(if (and buf (not (impl/full? buf)))
(do
(.lock handler)
(let [put-cb (and (impl/active? handler) (impl/commit handler))]
(.unlock handler)
(if put-cb
(do (impl/add! buf val)
(.unlock mutex)
(box nil))
(do (.unlock mutex)
nil))))
(do
(when (impl/active? handler)
(.add puts [handler val]))
(.unlock mutex)
nil))))))

impl/ReadPort
(take!
[this handler]
Expand Down Expand Up @@ -159,7 +110,54 @@
(.unlock mutex)
nil)))))))

impl/Channel
impl/WritePort
(put!
[this val handler]
(when (nil? val)
(throw (IllegalArgumentException. "Can't put nil on channel")))
(.lock mutex)
(cleanup this)
(if @closed
(do (.unlock mutex)
(box nil))
(let [^Lock handler handler
iter (.iterator takes)
[put-cb take-cb] (when (.hasNext iter)
(loop [^Lock taker (.next iter)]
(if (< (impl/lock-id handler) (impl/lock-id taker))
(do (.lock handler) (.lock taker))
(do (.lock taker) (.lock handler)))
(let [ret (when (and (impl/active? handler) (impl/active? taker))
[(impl/commit handler) (impl/commit taker)])]
(.unlock handler)
(.unlock taker)
(if ret
(do
(.remove iter)
ret)
(when (.hasNext iter)
(recur (.next iter)))))))]
(if (and put-cb take-cb)
(do
(.unlock mutex)
(dispatch/run (fn [] (take-cb val)))
(box nil))
(if (and buf (not (impl/full? buf)))
(do
(.lock handler)
(let [put-cb (and (impl/active? handler) (impl/commit handler))]
(.unlock handler)
(if put-cb
(do (impl/add! buf val)
(.unlock mutex)
(box nil))
(do (.unlock mutex)
nil))))
(do
(when (impl/active? handler)
(.add puts [handler val]))
(.unlock mutex)
nil))))))
(close!
[this]
(.lock mutex)
Expand All @@ -186,3 +184,16 @@
(defn chan [buf]
(ManyToManyChannel. (LinkedList.) (LinkedList.) buf (atom nil) (mutex/mutex)))

(defn <port [c]
(reify
impl/ReadPort
(take! [this fn1-handler]
(impl/take! c fn1-handler))))

(defn >port [c]
(reify
impl/WritePort
(put! [this val handler]
(impl/put! c val handler))
(close! [this]
(impl/close! c))))
6 changes: 3 additions & 3 deletions src/main/clojure/clojure/core/async/impl/protocols.clj
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
(take! [port fn1-handler] "derefable val if taken, nil if take was enqueued"))

(defprotocol WritePort
(put! [port val fn0-handler] "derefable nil if put, nil if put was enqueued. Must throw on nil val."))

(defprotocol Channel
(put! [port val fn0-handler] "derefable nil if put, nil if put was enqueued. Must throw on nil val.")
(close! [chan]))

(defprotocol Channel)

(defprotocol Handler
(active? [h] "returns true if has callback. Must work w/o lock")
(lock-id [h] "a unique id for lock acquisition order, 0 if no lock")
Expand Down
7 changes: 2 additions & 5 deletions src/main/clojure/clojure/core/async/impl/timers.clj
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,7 @@
-1
(if (= timestamp ostamp)
0
1))))
impl/Channel
(close! [this]
(impl/close! channel)))
1)))))

(defn timeout
"returns a channel that will close after msecs"
Expand All @@ -59,7 +56,7 @@
(loop []
(let [^TimeoutQueueEntry tqe (.take q)]
(.remove timeouts-map (.timestamp tqe) tqe)
(impl/close! tqe))
(impl/close! (.channel tqe)))
(recur))))

(defonce timeout-daemon
Expand Down
11 changes: 11 additions & 0 deletions src/test/clojure/clojure/core/async_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,14 @@
(put! c :enqueues #(deliver p :proceeded)) ;; enqueue a put
(<!! c) ;; make room in the buffer
(deref p 250 :timeout)))))

(deftest constrained-ports
(let [c (chan 1)
ro (<port c)
wo (>port c)]
(is (thrown? IllegalArgumentException (>!! ro :foo)))
(>!! wo :foo)
(is (thrown? IllegalArgumentException (<!! wo)))
(is (= (<!! ro) :foo))
(is (thrown? IllegalArgumentException (close! ro)))
(close! wo)))