Skip to content

Commit

Permalink
add non-blocking offer! and poll! to api
Browse files Browse the repository at this point in the history
Same as ASYNC-104 resolved by b5e2ff1
  • Loading branch information
dnolen committed Apr 3, 2015
1 parent ae25507 commit d8047c0
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 15 deletions.
30 changes: 25 additions & 5 deletions src/main/clojure/cljs/core/async.cljs
Expand Up @@ -9,11 +9,14 @@
(:require-macros [cljs.core.async.impl.ioc-macros :as ioc]
[cljs.core.async.macros :refer [go go-loop]]))

(defn- fn-handler [f]
(reify
impl/Handler
(active? [_] true)
(commit [_] f)))
(defn- fn-handler
([f] (fn-handler f true))
([f blockable]
(reify
impl/Handler
(active? [_] true)
(blockable? [_] blockable)
(commit [_] f))))

(defn buffer
"Returns a fixed buffer of size n. When full, puts will block/park."
Expand Down Expand Up @@ -152,6 +155,7 @@
(reify
impl/Handler
(active? [_] @flag)
(blockable? [_] true)
(commit [_]
(reset! flag nil)
true))))
Expand All @@ -160,6 +164,7 @@
(reify
impl/Handler
(active? [_] (impl/active? flag))
(blockable? [_] true)
(commit [_]
(impl/commit flag)
cb)))
Expand Down Expand Up @@ -216,6 +221,20 @@
[ports & {:as opts}]
(throw (js/Error. "alts! used not in (go ...) block")))

(defn offer!
"Puts a val into port if it's possible to do so immediately.
nil values are not allowed. Never blocks. Returns true if offer succeeds."
[port val]
(let [ret (impl/put! port val (fn-handler nop false))]
(when ret @ret)))

(defn poll!
"Takes a val from port if it's possible to do so immediately.
Never blocks. Returns value if successful, nil otherwise."
[port]
(let [ret (impl/take! port (fn-handler nop false))]
(when ret @ret)))

;;;;;;; channel ops

(defn pipe
Expand Down Expand Up @@ -744,6 +763,7 @@
(reify
impl/Handler
(active? [_] (impl/active? fn1))
(blockable? [_] true)
#_(lock-id [_] (impl/lock-id fn1))
(commit [_]
(let [f1 (impl/commit fn1)]
Expand Down
20 changes: 11 additions & 9 deletions src/main/clojure/cljs/core/async/impl/channels.cljs
Expand Up @@ -78,11 +78,12 @@
(do (set! dirty-puts 0)
(.cleanup puts put-active?))
(set! dirty-puts (inc dirty-puts)))
(assert (< (.-length puts) impl/MAX-QUEUE-SIZE)
(str "No more than " impl/MAX-QUEUE-SIZE
" pending puts are allowed on a single channel."
" Consider using a windowed buffer."))
(.unbounded-unshift puts (PutBox. handler val))
(when (impl/blockable? handler)
(assert (< (.-length puts) impl/MAX-QUEUE-SIZE)
(str "No more than " impl/MAX-QUEUE-SIZE
" pending puts are allowed on a single channel."
" Consider using a windowed buffer."))
(.unbounded-unshift puts (PutBox. handler val)))
nil)))))))
impl/ReadPort
(take! [this ^not-native handler]
Expand Down Expand Up @@ -129,10 +130,11 @@
(do (set! dirty-takes 0)
(.cleanup takes impl/active?))
(set! dirty-takes (inc dirty-takes)))
(assert (< (.-length takes) impl/MAX-QUEUE-SIZE)
(str "No more than " impl/MAX-QUEUE-SIZE
" pending takes are allowed on a single channel."))
(.unbounded-unshift takes handler)
(when (impl/blockable? handler)
(assert (< (.-length takes) impl/MAX-QUEUE-SIZE)
(str "No more than " impl/MAX-QUEUE-SIZE
" pending takes are allowed on a single channel."))
(.unbounded-unshift takes handler))
nil)))))))
impl/Channel
(closed? [_] closed)
Expand Down
1 change: 1 addition & 0 deletions src/main/clojure/cljs/core/async/impl/ioc_helpers.cljs
Expand Up @@ -27,6 +27,7 @@
(reify
impl/Handler
(active? [_] true)
(blockable? [_] true)
(commit [_] f)))


Expand Down
1 change: 1 addition & 0 deletions src/main/clojure/cljs/core/async/impl/protocols.cljs
Expand Up @@ -23,6 +23,7 @@

(defprotocol Handler
(active? [h] "returns true if has callback. Must work w/o lock")
(blockable? [h] "returns true if this handler may be blocked, otherwise it must not block")
#_(lock-id [h] "a unique id for lock acquisition order, 0 if no lock")
(commit [h] "commit to fulfilling its end of the transfer, returns cb. Must be called within lock"))

Expand Down
17 changes: 16 additions & 1 deletion src/test/cljs/cljs/core/async/tests.cljs
Expand Up @@ -4,7 +4,7 @@
(:require
[cljs.core.async :refer
[buffer dropping-buffer sliding-buffer put! take! chan promise-chan
close! take partition-by] :as async]
close! take partition-by offer! poll!] :as async]
[cljs.core.async.impl.dispatch :as dispatch]
[cljs.core.async.impl.buffers :as buff]
[cljs.core.async.impl.timers :as timers :refer [timeout]]
Expand Down Expand Up @@ -439,3 +439,18 @@
(testing "then takes return nil"
(is (= nil (<! t1) (<! t1) (<! t2) (<! t2)))))
(inc! l))))))

(deftest test-offer-poll-go
(let [c (chan 2)]
(is (= [true true 5 6 nil]
[(offer! c 5) (offer! c 6) (poll! c) (poll! c) (poll! c)])))
(let [c (chan 2)]
(is (true? (offer! c 1)))
(is (true? (offer! c 2)))
(is (nil? (offer! c 3)))
(is (= 1 (poll! c)))
(is (= 2 (poll! c)))
(is (nil? (poll! c))))
(let [c (chan)]
(is (nil? (offer! c 1)))
(is (nil? (poll! c)))))

0 comments on commit d8047c0

Please sign in to comment.