-
Notifications
You must be signed in to change notification settings - Fork 1
/
semaphore.clj
43 lines (38 loc) · 1.54 KB
/
semaphore.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
(ns react.semaphore
(:require [react.future :as future])
(:import [java.util.concurrent RejectedExecutionException]))
(defprotocol IAsyncSemaphore
  "Contract for a semaphore whose permits are requested and handed back
  through explicit `acquire` / `release` calls."
  (acquire [this]
    "Request one permit from this semaphore.")
  (release [this]
    "Hand one permit back to this semaphore."))
(def cas!
  "Short alias for `clojure.core/compare-and-set!`."
  compare-and-set!)
;; Counting semaphore built on an atom and optimistic compare-and-set retries.
;;
;; Fields:
;;   state       - atom holding a map {:permits <number> :waiters <vector>}.
;;                 :waiters holds the promises of callers that asked for a
;;                 permit when none was free, oldest first.
;;   max-waiters - upper bound on (count :waiters); once reached, `acquire`
;;                 returns a failed future instead of queueing.
;;
;; Both methods read the state once, compute a replacement map, and try to
;; install it with `cas!`; when another thread changed the atom in between,
;; the CAS fails and the method retries via `recur`.
;;
;; NOTE(review): the exact semantics of the `react.future` helpers
;; (`successful`, `promise`, `failed`, `success`) are not visible here; the
;; comments below describe how this type uses them -- confirm against that ns.
(deftype AsyncSemaphore [state max-waiters]
  IAsyncSemaphore
  ;; Returns (without blocking the calling thread on the semaphore):
  ;;   * the value of (future/successful nil) when a permit was taken,
  ;;   * a fresh (future/promise) that `release` later completes, when the
  ;;     caller had to queue,
  ;;   * a (future/failed ...) wrapping RejectedExecutionException when the
  ;;     waiter queue is already at `max-waiters`.
  (acquire [this]
    (if-let [p (let [{:keys [permits waiters] :as s} @state]
                 (if (> permits 0)
                   ;; OK: a permit is free -- take it. A nil result here
                   ;; means the CAS lost a race and we must retry.
                   (when (cas! state s (update-in s [:permits] dec))
                     (future/successful nil))
                   (if (< (count waiters) max-waiters)
                     ;; WAIT: append a new promise to the tail of the queue.
                     ;; :waiters is kept as a vector (see `release`) so that
                     ;; `conj` appends and `count` is constant-time.
                     (let [p (future/promise)]
                       (when (cas! state s (update-in s [:waiters] conj p))
                         p))
                     ;; Exception: queue is full, reject without touching state.
                     (future/failed (RejectedExecutionException. "Max waiters exceeded")))))]
      p
      (recur)))
  ;; Gives one permit back. When nobody is queued the permit count is
  ;; incremented; otherwise the permit is handed directly to the oldest
  ;; waiter by completing its promise with nil. Returns nil.
  (release [this]
    (when-not (let [{:keys [permits waiters] :as s} @state]
                (if (empty? waiters)
                  ;; INC: no one waiting, just return the permit to the pool.
                  (cas! state s (update-in s [:permits] inc))
                  ;; Notify first waiter. The remaining waiters are rebuilt
                  ;; into a vector: plain `rest` yields a seq, and `conj` on a
                  ;; seq prepends, which would serve later waiters
                  ;; newest-first instead of in arrival order.
                  ;; This copy is O(number of waiters) per hand-off.
                  (when (cas! state s (assoc s :waiters (vec (rest waiters))))
                    (future/success (first waiters) nil)
                    ;; Report success explicitly so a nil/false return value
                    ;; from `future/success` cannot trigger a second release.
                    true)))
      (recur))))
(defn async-semaphore
  "Creates an AsyncSemaphore that starts with `permits` available permits
  and an empty waiter vector.

  Keyword options:
    :max-waiters - passed through as the semaphore's waiter limit;
                   defaults to Integer/MAX_VALUE when the key is absent."
  [permits & {:keys [max-waiters]
              :or   {max-waiters Integer/MAX_VALUE}}]
  (let [initial-state {:permits permits
                       :waiters []}]
    (AsyncSemaphore. (atom initial-state) max-waiters)))