/
One.purs
169 lines (144 loc) · 5.11 KB
/
One.purs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
-- | Queues with at most one handler - this is useful for sending messages to a single, solitary
-- | handler (user interface component, websocket connection, what have you).
module Queue.One
( module Queue.Types
, Queue (..)
, new
) where
import Queue.Types
( kind SCOPE, READ, WRITE, class QueueScope, Handler, class QueueExtra, allowWriting, writeOnly
, class Queue, put, putMany, pop, popMany, take, takeAll, takeMany, on, once, del, read, length, draw, drain)
import Queue.Types (new) as Q
import Prelude (pure, bind, unit, discard, (<$>), (<$), ($))
import Data.Either (Either (..))
import Data.Maybe (Maybe (..))
import Data.Traversable (traverse_, for_)
import Data.Array (reverse, uncons, snoc, take, drop, length) as Array
import Control.Monad.Rec.Class (forever)
import Effect (Effect)
import Effect.Aff (error, killFiber, joinFiber, delay, forkAff)
import Effect.Aff.AVar as AVar
import Effect.Ref (Ref)
import Effect.Ref (read, write, new) as Ref
import Effect.Class (liftEffect)
-- | A singleton queue: one that has at most __one__ handler at any time.
-- |
-- | The mutable cell is in one of two states:
-- |
-- | * `Left backlog` - no handler is registered; the array holds the messages
-- |   that have not been consumed yet.
-- | * `Right handler` - a handler is registered and there is no backlog.
-- |
-- | `rw` is a phantom scope row (for example `read :: READ, write :: WRITE`);
-- | it does not appear in the representation.
newtype Queue (rw :: # SCOPE) a = Queue (Ref (Either (Array a) (Handler a)))
-- | Create a queue whose scope row grants both `read` and `write`.
-- | This is the `new` imported from `Queue.Types` (qualified as `Q`), pinned
-- | to this module's `Queue` type by the signature.
new :: forall a. Effect (Queue (read :: READ, write :: WRITE) a)
new = Q.new
-- | `Queue` class operations for the single-handler queue.
-- |
-- | Throughout, the `Ref` holds `Left backlog` when no handler is registered
-- | and `Right handler` when one is. Operations that inspect or remove
-- | messages (`popMany`, `takeMany`, `takeAll`, `read`, `length`) see an empty
-- | queue whenever a handler is registered.
instance queueQueueOne :: Queue Queue where
  -- A fresh queue starts with an empty backlog and no handler.
  new = Queue <$> Ref.new (Left [])

  -- Deliver every message in order. The state is re-read for each message, so
  -- if the handler is removed part-way through a batch (for example by a
  -- `once` handler), the remaining messages are appended to the backlog.
  putMany (Queue queue) xss = do
    for_ xss \x -> do -- left-to-right
      ePH <- Ref.read queue
      case ePH of
        Left pending -> Ref.write (Left (Array.snoc pending x)) queue
        Right f -> f x

  -- Remove up to `n` messages from the *end* of the backlog, returned
  -- newest-first. The remaining backlog keeps its original order.
  popMany (Queue queue) n = do
    ePH <- Ref.read queue
    case ePH of
      Right _ -> pure []
      Left pending ->
        let pending' = Array.reverse pending
        in  Array.take n pending' <$ Ref.write (Left (Array.reverse (Array.drop n pending'))) queue

  -- Remove up to `n` messages from the *front* of the backlog, oldest-first.
  takeMany (Queue queue) n = do
    ePH <- Ref.read queue
    case ePH of
      Right _ -> pure []
      Left pending -> Array.take n pending <$ Ref.write (Left (Array.drop n pending)) queue

  -- Remove and return the whole backlog.
  takeAll (Queue queue) = do
    ePH <- Ref.read queue
    case ePH of
      Right _ -> pure []
      Left pending -> pending <$ Ref.write (Left []) queue

  -- Register `f` as the handler, replacing any existing one. The handler is
  -- installed *before* the backlog is flushed to it, so messages put by `f`
  -- itself during the flush are delivered to `f` rather than queued.
  on (Queue queue) f = do
    ePH <- Ref.read queue
    case ePH of
      Left pending -> do
        Ref.write (Right f) queue
        traverse_ f pending
      Right _ ->
        Ref.write (Right f) queue

  -- Run `f'` on exactly one message. If the backlog is non-empty its oldest
  -- message is consumed immediately; otherwise a self-removing handler is
  -- installed (replacing any existing handler).
  once q@(Queue queue) f' = do
    let f x = do
          del q
          f' x
    ePH <- Ref.read queue
    case ePH of
      Left pending -> case Array.uncons pending of
        Nothing -> Ref.write (Right f) queue
        Just {head,tail} -> do
          -- Commit the shortened backlog *before* running the callback.
          -- Writing afterwards would overwrite whatever the callback did to
          -- this queue (messages it put, a handler it registered), and the
          -- callback would observe a backlog that still contains `head`.
          Ref.write (Left tail) queue
          -- No handler is registered in this branch, so there is nothing to
          -- `del`; invoke the user callback directly.
          f' head
      Right _ ->
        Ref.write (Right f) queue

  -- Unregister the handler, leaving an empty backlog. Does nothing (and keeps
  -- the backlog) when no handler is registered.
  del (Queue queue) = do
    ePH <- Ref.read queue
    case ePH of
      Left _ -> pure unit
      Right _ -> Ref.write (Left []) queue

  -- Return the current backlog without modifying the queue.
  read (Queue queue) = do
    ePH <- Ref.read queue
    case ePH of
      Left pending -> pure pending
      Right _ -> pure []

  -- Number of messages currently in the backlog.
  length (Queue queue) = do
    ePH <- Ref.read queue
    pure $ case ePH of
      Right _ -> 0
      Left pending -> Array.length pending
-- | Scope conversions for the single-handler queue.
-- |
-- | Every conversion unwraps the newtype and re-wraps the very same `Ref`, so
-- | the result shares its state with the argument; only the scope row in the
-- | type changes.
instance queueScopeQueueOne :: QueueScope Queue where
  -- read-side conversions
  readOnly (Queue ref) = Queue ref
  allowReading (Queue ref) = Queue ref
  -- write-side conversions
  writeOnly (Queue ref) = Queue ref
  allowWriting (Queue ref) = Queue ref
-- | Time-based combinators for the single-handler queue.
-- |
-- | Each combinator creates a private inbox queue, forks one or more
-- | long-running fibers that move messages from the inbox to `output`, and
-- | returns a write-only view of the inbox together with those fibers.
-- | An `AVar` is used as a slot holding the most recently forked delayed write.
instance queueExtraQueueOne :: QueueExtra Queue where
  -- For every message drawn from the inbox, fork a write to `output` that
  -- fires after `toWaitFurther`, then kill whichever delayed write was stored
  -- in the slot before it.
  debounceStatic toWaitFurther output = do
    inbox <- liftEffect new
    slot <- AVar.empty
    writer <- forkAff $ forever do
      x <- draw inbox
      delayedWrite <- forkAff do
        delay toWaitFurther
        liftEffect (put (allowWriting output) x)
      previous <- AVar.tryTake slot
      -- kills old writer if it exists, preventing bounce
      for_ previous (killFiber (error "Killing writer"))
      AVar.put delayedWrite slot
    pure {input: writeOnly inbox, writer}

  -- For every message drawn from the inbox, first join the delayed write held
  -- in the slot (if any), then fork a new delayed write for this message.
  throttleStatic toWaitFurther output = do
    inbox <- liftEffect new
    slot <- AVar.empty
    writer <- forkAff $ forever do
      x <- draw inbox
      previous <- AVar.tryTake slot
      -- wait until previous write is done
      for_ previous joinFiber
      delayedWrite <- forkAff do
        delay toWaitFurther
        liftEffect (put (allowWriting output) x)
      AVar.put delayedWrite slot
    pure {input: writeOnly inbox, writer}

  -- Two fibers cooperate here. `writer` repeatedly waits `timeBetween`, runs
  -- `xM`, and puts its result on `output`. `listener` forwards every inbox
  -- message to `output`, killing the delayed write found in the slot first.
  intersperseStatic timeBetween xM output = do
    inbox <- liftEffect new
    slot <- AVar.empty
    writer <- forkAff $ forever do
      previous <- AVar.tryTake slot
      for_ previous joinFiber
      delayedWrite <- forkAff do
        delay timeBetween
        x <- xM
        liftEffect (put (allowWriting output) x)
      AVar.put delayedWrite slot
    listener <- forkAff $ forever do
      y <- draw inbox
      scheduled <- AVar.tryTake slot
      -- kill interspersed message
      for_ scheduled (killFiber (error "Killing listener"))
      liftEffect (put (allowWriting output) y)
    pure {input: writeOnly inbox, writer, listener}