/
IOQueues.purs
72 lines (57 loc) · 1.98 KB
/
IOQueues.purs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
module IOQueues where
import Queue.Types
( READ, WRITE, readOnly, writeOnly, allowReading, allowWriting, Handler
, class Queue, class QueueScope, once, on, put)
import Queue.Types (new) as Q
import Prelude (Unit, bind, discard, (>>=), pure, (<$>), (<$), (<<<))
import Data.Either (Either (..))
import Effect (Effect)
import Effect.Aff (Aff, makeAff, nonCanceler)
-- | Represents an asynchronously invokable function `input -> Aff output`
newtype IOQueues q input output = IOQueues
{ input :: q (read :: READ) input
, output :: q (write :: WRITE) output
}
new :: forall input output q. Queue q => QueueScope q => Effect (IOQueues q input output)
new = do
input <- readOnly <$> Q.new
output <- writeOnly <$> Q.new
pure (IOQueues {input,output})
-- * Invoking
-- | Invoke the queue in `Aff`.
callAsync :: forall input output q
. Queue q
=> QueueScope q
=> IOQueues q input output
-> input
-> Aff output
callAsync qs x = makeAff \resolve ->
nonCanceler <$ callAsyncEff qs (resolve <<< Right) x
-- | Invoke the queue in `Eff`
callAsyncEff :: forall input output q
. Queue q
=> QueueScope q
=> IOQueues q input output
-> Handler output
-> input
-> Effect Unit
callAsyncEff (IOQueues {input,output}) f x = do
once (allowReading output) f
put (allowWriting input) x
-- * Binding
-- | For binding the receiver
registerSync :: forall input output q
. Queue q
=> IOQueues q input output
-> (input -> Effect output)
-> Effect Unit
registerSync (IOQueues {input,output}) f =
on input \x -> f x >>= put output
-- | Bind a receiver only once
registerSyncOnce :: forall input output q
. Queue q
=> IOQueues q input output
-> (input -> Effect output)
-> Effect Unit
registerSyncOnce (IOQueues {input,output}) f =
once input \x -> f x >>= put output