-
Notifications
You must be signed in to change notification settings - Fork 11
/
queue.clj
30 lines (25 loc) · 1.04 KB
/
queue.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
(ns hyperfiddle.rcf.queue
(:require [hyperfiddle.rcf.time :as time])
(:import (java.util.concurrent LinkedBlockingQueue TimeUnit)))
(defn queue
  "Creates and returns a fresh, empty `LinkedBlockingQueue`."
  []
  (new LinkedBlockingQueue))
(defn get-queue
  "Returns a lazy sequence of the `:value` of every entry currently in `q`,
  in iteration order. The queue itself is only read, never drained."
  [^LinkedBlockingQueue q]
  (map (fn [entry] (get entry :value)) q))
(defn poll!
  "Takes the next entry off `q` and returns its `:value`.

  4-arity: reads the current time once, then asks `time/timeout?` whether the
  budget described by `start` and `timeout` is already spent. If so,
  `timeout-value` is returned without touching the queue. Otherwise the
  queue is polled, blocking for at most `time/remaining` milliseconds; if no
  entry arrives in that window, `timeout-value` is returned.

  5-arity: performs the same poll, then passes the result to `callback` and
  returns whatever `callback` returns."
  ([^LinkedBlockingQueue q, start, timeout, timeout-value]
   (let [now (time/current-time)]
     (if-not (time/timeout? now start timeout)
       (let [wait-ms (time/remaining now start timeout)
             entry   (.poll q wait-ms TimeUnit/MILLISECONDS)]
         ;; `entry` is nil when the poll timed out; a queued nil is still
         ;; found under :value, so it is not mistaken for a timeout.
         (get entry :value timeout-value))
       timeout-value)))
  ([^LinkedBlockingQueue q, start, timeout, timeout-value, callback]
   ;; TODO leverage this arity for non-blocking poll? call callback in (cc/future …)?
   (let [result (poll! q start timeout timeout-value)]
     (callback result))))
(defn poll-n!
  "Polls `n` values from `q` one after another via `poll!`, then invokes
  `callback` with all of them as positional arguments, in the order they
  were polled. Returns whatever `callback` returns.

  `q`, `start`, `timeout` and `missing-value` are forwarded unchanged to
  every `poll!` call; `missing-value` is what `poll!` yields for a poll that
  times out.

  `n` must be a positive integer. An `AssertionError` is thrown otherwise,
  before anything is polled."
  [q start timeout missing-value n callback]
  ;; There is no zero case below: with n = 0 the body would still poll once
  ;; (consuming a value or blocking) and only fail on the recursive call with
  ;; n = -1. Reject it up front so the queue is left untouched.
  (assert (pos-int? n) "poll-n! expects a positive number of values to poll")
  (poll! q start timeout missing-value
         (fn [x]
           (if (= 1 n)
             (callback x)
             ;; Accumulate polled values as leading arguments of `callback`.
             (poll-n! q start timeout missing-value (dec n) (partial callback x))))))
(defn offer!
  "Enqueues `val` on `q`, wrapped in a `{:value val}` map, and returns `val`.

  The wrapper map is what gets stored in the queue, so the stored element is
  never nil even when `val` is."
  [^LinkedBlockingQueue q, val]
  (let [entry {:value val}]
    (.offer q entry)
    val))