/
Concurrent.fr
80 lines (62 loc) · 2.7 KB
/
Concurrent.fr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
{--
Support for concurrency.
Concurrency in Frege comes in 2 flavors.
The first is through 'Thread's, which are,
unlike in Haskell, _OS threads_.
The second possibility is to use a thread pool and an executor service
one can submit tasks to. But note that blocking asynchronous tasks,
unlike Haskell green threads, will block an OS thread on blocking actions.
-}
module frege.control.Concurrent where
import frege.java.util.Concurrent as C
--- A thread safe, shared variable, that is either full or empty.
--- Technically, this is just a 'BlockingQueue' restricted to length 1.
abstract newtype MVar a = MV (MutableIO (BlockingQueue a)) where
--- create an empty 'MVar'
newEmpty = ArrayBlockingQueue.new 1 >>= return . MV
--- create a 'MVar' filled with a value
new a = do m <- newEmpty; m.put a; return m
--- put a value in a 'MVar', blocks if full
put (MV q) a = q.put a
--- take a value from a 'MVar', blocks if empty
take (MV q) = q.take
--- put a value in a 'MVar', returns false if already full.
offer (MV q) a = q.offer a
--- get the value from a 'MVar', return 'Nothing' when empty
poll (MV q) = q.poll
--- like 'poll', but waits a specified number of milliseconds for a value to become available
wait (MV q) n = q.poll n TimeUnit.milliSeconds
-- Haskell compatibility
newEmptyMVar = MVar.newEmpty
newMVar = MVar.new
takeMVar = MVar.take
putMVar = MVar.put
tryTakeMVar = MVar.poll
tryPutMVar = MVar.offer
--- Create and start a new OS 'Thread' that runs an 'IO' action.
forkOS :: IO () -> IOMutable Thread
forkOS action = do
r <- Runnable.new action
t <- Thread.new r
t.start
return t
--- Run the 'IO' action asynchronously in an 'ExecutorService'
--- This is not suitable for not-ending processes!
--- The executor service may manage a fixed small number of concurrent threads only.
forkIO :: IO () -> IO ()
forkIO action = do
service <- ExecutorService.executorService ()
Runnable.new action >>= service.submit
--- Shutdwon the 'ExecutorService'
shutdown = ExecutorService.executorService () >>= _.shutdown
--- Run a 'IO' action asynchronously and return the result in a 'MVar'
async :: IO a -> IO (MVar (Exception | a))
async action = do
service <- ExecutorService.executorService ()
mvar <- newEmptyMVar
let embedded = do
action >>= mvar.put . Right
`catch` mvar.put . Left
`finally` mvar.offer (Left (error "async: no end?")) >> return ()
Runnable.new embedded >>= service.submit
return mvar