-
Notifications
You must be signed in to change notification settings - Fork 1
/
blocking.clj
115 lines (113 loc) · 3.99 KB
/
blocking.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
(ns clj-tape.blocking
(:require [clj-tape.core :as ctc]))
(defprotocol ConcurrentQueue
  "Queues that are accessed from several threads and offer an atomic take!."
  (is-closed? [q]
    "True when the queue has been closed.")
  (take! [q] [q timeout timeout-val]
    "Peek at the head item and remove it as one atomic step."))
(defn make-blocking-queue
  "Wrap `queue` (which must satisfy `ctc/Queue`) in an object implementing both
  `ctc/Queue` and `ConcurrentQueue`. The wrapped `queue` object itself is used
  as the monitor for locking, waiting and notification.

  Options:
    :timeout  default wait, in milliseconds, applied when an operation finds
              the queue empty. nil (the default) waits until an item arrives
              or the wrapper is closed; 0 never waits.

  Return conventions of the wrapper:
    - operations that check the closed flag return nil once it is set;
    - `peek` / `remove!` (1-arity) return :timeout when the wait elapses;
    - `take!` returns its `timeout-val` argument when the wait elapses;
    - `put!` returns true after storing the item and waking all waiters."
  [queue & {:keys [timeout] :as options}]
  {:pre [(satisfies? ctc/Queue queue)
         (or (nil? timeout) (integer? timeout))]}
  (let [closed (ref false)
        ;; Blocks on the monitor of `queue`; the caller must already hold it.
        ;; Returns :ready, :timeout or :closed.
        ;; A timed wait is used only when `do-timeout` is truthy AND a
        ;; millisecond value is available; otherwise the wait is unbounded.
        ;; (Previously `(wait true)` without a configured timeout passed nil
        ;; to `.wait`.)
        wait (fn [do-timeout & [wait-timeout]]
               (let [millis (or wait-timeout timeout)
                     timed? (boolean (and do-timeout millis))]
                 (if (= 0 millis)
                   :timeout
                   (loop []
                     (if timed?
                       (.wait ^Object queue (long millis))
                       (.wait ^Object queue))
                     (cond
                       ;; check the flag first: the wrapped queue must not be
                       ;; touched after it has been closed
                       @closed :closed
                       (ctc/is-empty? queue) (if timed? :timeout (recur))
                       :else :ready)))))]
    (reify
      ConcurrentQueue
      (is-closed? [_] @closed)
      (take! [this]
        (take! this nil nil))
      (take! [_ wait-timeout timeout-val]
        (when-not @closed
          (locking queue
            (if-not (ctc/is-empty? queue)
              (let [item (ctc/peek queue)]
                (ctc/remove! queue)
                item)
              (loop [status (wait (or wait-timeout timeout) wait-timeout)]
                (case status
                  :timeout timeout-val
                  :closed nil
                  ;; a falsy head is treated as "nothing there yet": wait again
                  :ready (if-let [item (ctc/peek queue)]
                           (do (ctc/remove! queue)
                               item)
                           (recur (wait (or wait-timeout timeout) wait-timeout)))))))))
      ctc/Queue
      (ctc/is-empty? [_] (ctc/is-empty? queue))
      (ctc/put! [_ data]
        (when-not @closed
          (locking queue
            (ctc/put! queue data)
            ;; wake every thread blocked in `wait`
            (.notifyAll ^Object queue)
            true)))
      (ctc/peek [_]
        (when-not @closed
          (locking queue
            (if (ctc/is-empty? queue)
              (case (wait true)
                :timeout :timeout
                :ready (ctc/peek queue)
                :closed nil)
              (ctc/peek queue)))))
      (ctc/peek [_ n]
        ;; The previous implementation returned a lazy `for` that escaped the
        ;; lock and peeked the same head element n times. Delegate to the
        ;; wrapped queue's own n-arity peek and realize the result while the
        ;; monitor is still held.
        ;; NOTE(review): assumes the wrapped queue's `(peek q n)` returns up to
        ;; n leading items - confirm against clj-tape.core.
        (when-not @closed
          (locking queue
            (cond
              ;; nothing requested: never block (as-list on an empty queue)
              (not (pos? n)) ()
              (not (ctc/is-empty? queue)) (doall (ctc/peek queue n))
              :else (case (wait false)
                      :ready (doall (ctc/peek queue n))
                      :timeout ()
                      :closed nil)))))
      (ctc/as-list [this]
        ;; size and peek are taken under one lock so the queue cannot be
        ;; emptied in between, which would make the peek block
        (locking queue
          (ctc/peek this (ctc/size queue))))
      (ctc/remove! [_]
        (when-not @closed
          (locking queue
            (if (ctc/is-empty? queue)
              (case (wait true)
                :timeout :timeout
                :ready (do (ctc/remove! queue)
                           true)
                :closed nil)
              (ctc/remove! queue)))))
      (ctc/remove! [_ n]
        ;; Removes up to n items, waiting for each one when the queue is empty.
        ;; Stops early when the wait reports :timeout or :closed, so a close
        ;; during the loop can no longer leave this thread blocked forever.
        ;; Always returns nil.
        (when-not @closed
          (locking queue
            (loop [remaining n]
              (when (and (pos? remaining)
                         (or (not (ctc/is-empty? queue))
                             (= :ready (wait false))))
                (ctc/remove! queue)
                (recur (dec remaining)))))))
      (ctc/clear! [_] (locking queue (ctc/clear! queue)))
      (ctc/size [_] (locking queue (ctc/size queue)))
      (ctc/close! [_]
        (locking queue
          (dosync (ref-set closed true))
          ;; Wake all waiters so they observe the closed flag; they resume only
          ;; after this thread leaves the monitor.
          (.notifyAll ^Object queue)
          ;; NOTE(review): closing drains every remaining item before closing
          ;; the wrapped queue - confirm this data loss is intended.
          (while (not (ctc/is-empty? queue))
            (ctc/remove! queue))
          (ctc/close! queue)))
      (ctc/delete! [this]
        (locking queue
          (ctc/close! this)
          (ctc/delete! queue))))))