/
Queue.purs
119 lines (90 loc) · 3.8 KB
/
Queue.purs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
module Queue
( module Queue.Scope
, Queue (..), Handler
, newQueue, readOnly, allowWriting, writeOnly, allowReading
, putQueue, putManyQueue
, onQueue, onceQueue
, readQueue, takeQueue, delQueue, drainQueue
) where
import Queue.Scope (kind SCOPE, READ, WRITE)
import Prelude
import Data.Either (Either (..))
import Data.Maybe (Maybe (..))
import Data.Traversable (traverse_)
import Data.Array as Array
import Control.Monad.Eff (Eff, kind Effect)
import Control.Monad.Eff.Ref (REF, Ref, newRef, readRef, writeRef)
type Handler eff a = a -> Eff eff Unit
newtype Queue (rw :: # SCOPE) (eff :: # Effect) a = Queue (Ref (Either (Array a) (Array (Handler eff a))))
newQueue :: forall eff a. Eff (ref :: REF | eff) (Queue (read :: READ, write :: WRITE) (ref :: REF | eff) a)
newQueue = Queue <$> newRef (Left [])
readOnly :: forall rw eff a. Queue (read :: READ | rw) eff a -> Queue (read :: READ) eff a
readOnly (Queue q) = Queue q
allowWriting :: forall rw eff a. Queue (read :: READ) eff a -> Queue (read :: READ | rw) eff a
allowWriting (Queue q) = Queue q
writeOnly :: forall rw eff a. Queue (write :: WRITE | rw) eff a -> Queue (write :: WRITE) eff a
writeOnly (Queue q) = Queue q
allowReading :: forall rw eff a. Queue (write :: WRITE) eff a -> Queue (write :: WRITE | rw) eff a
allowReading (Queue q) = Queue q
putQueue :: forall rw eff a. Queue (write :: WRITE | rw) (ref :: REF | eff) a -> a -> Eff (ref :: REF | eff) Unit
putQueue q x = putManyQueue q [x]
putManyQueue :: forall rw eff a. Queue (write :: WRITE | rw) (ref :: REF | eff) a -> Array a -> Eff (ref :: REF | eff) Unit
putManyQueue (Queue queue) xs = do
ePH <- readRef queue
case ePH of
Left pending -> writeRef queue (Left (pending <> xs))
Right handlers -> traverse_ (\x -> traverse_ (\f -> f x) handlers) xs
onQueue :: forall rw eff a. Queue (read :: READ | rw) (ref :: REF | eff) a -> Handler (ref :: REF | eff) a -> Eff (ref :: REF | eff) Unit
onQueue (Queue queue) f = do
ePH <- readRef queue
case ePH of
Left pending -> do
traverse_ f pending
writeRef queue (Right [f])
Right handlers ->
writeRef queue (Right (handlers <> [f]))
-- | Treat this as the only handler, and on the next input, clear all handlers.
onceQueue :: forall rw eff a. Queue (read :: READ | rw) (ref :: REF | eff) a -> Handler (ref :: REF | eff) a -> Eff (ref :: REF | eff) Unit
onceQueue q@(Queue queue) f' = do
hasRun <- newRef false
let f x = do
r <- readRef hasRun
unless r (f' x)
writeRef hasRun true
delQueue q
ePH <- readRef queue
case ePH of
Left pending -> do
case Array.uncons pending of
Nothing ->
writeRef queue (Right [f])
Just {head,tail} -> do
f head
writeRef queue (Left tail)
Right handlers ->
writeRef queue (Right (handlers <> [f]))
readQueue :: forall rw eff a. Queue rw (ref :: REF | eff) a -> Eff (ref :: REF | eff) (Array a)
readQueue (Queue queue) = do
ePH <- readRef queue
case ePH of
Left pending -> pure pending
Right _ -> pure []
takeQueue :: forall rw eff a. Queue (write :: WRITE | rw) (ref :: REF | eff) a -> Eff (ref :: REF | eff) (Array a)
takeQueue (Queue queue) = do
ePH <- readRef queue
case ePH of
Left pending -> do
writeRef queue (Left [])
pure pending
Right _ -> pure []
-- | Removes the registered callbacks, if any.
delQueue :: forall rw eff a. Queue (read :: READ | rw) (ref :: REF | eff) a -> Eff (ref :: REF | eff) Unit
delQueue (Queue queue) = do
ePH <- readRef queue
case ePH of
Left _ -> pure unit
Right _ -> writeRef queue (Left [])
-- | Adds a listener that does nothing, and "drains" any pending messages.
drainQueue :: forall rw eff a. Queue (read :: READ | rw) (ref :: REF | eff) a -> Eff (ref :: REF | eff) Unit
drainQueue q =
onQueue q \_ -> pure unit