/
JobControl.hs
92 lines (77 loc) · 2.19 KB
/
JobControl.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
-----------------------------------------------------------------------------
-- |
-- Module : Distribution.Client.JobControl
-- Copyright : (c) Duncan Coutts 2012
-- License : BSD-like
--
-- Maintainer : cabal-devel@haskell.org
-- Stability : provisional
-- Portability : portable
--
-- A job control concurrency abstraction
-----------------------------------------------------------------------------
module Distribution.Client.JobControl (
JobControl,
newSerialJobControl,
newParallelJobControl,
spawnJob,
collectJob,
JobLimit,
newJobLimit,
withJobLimit,
Lock,
newLock,
criticalSection
) where
import Control.Monad
import Control.Concurrent
import Control.Exception
data JobControl m a = JobControl {
spawnJob :: m a -> m (),
collectJob :: m a
}
newSerialJobControl :: IO (JobControl IO a)
newSerialJobControl = do
queue <- newChan
return JobControl {
spawnJob = spawn queue,
collectJob = collect queue
}
where
spawn :: Chan (IO a) -> IO a -> IO ()
spawn = writeChan
collect :: Chan (IO a) -> IO a
collect = join . readChan
newParallelJobControl :: IO (JobControl IO a)
#if MIN_VERSION_base(4,3,0)
newParallelJobControl = do
resultVar <- newEmptyMVar
return JobControl {
spawnJob = spawn resultVar,
collectJob = collect resultVar
}
where
spawn :: MVar (Either SomeException a) -> IO a -> IO ()
spawn resultVar job =
mask $ \restore ->
forkIO (do res <- try (restore job)
putMVar resultVar res)
>> return ()
collect :: MVar (Either SomeException a) -> IO a
collect resultVar =
takeMVar resultVar >>= either throw return
#else
newParallelJobControl = newSerialJobControl
#endif
data JobLimit = JobLimit QSem
newJobLimit :: Int -> IO JobLimit
newJobLimit n =
fmap JobLimit (newQSem n)
withJobLimit :: JobLimit -> IO a -> IO a
withJobLimit (JobLimit sem) =
bracket_ (waitQSem sem) (signalQSem sem)
newtype Lock = Lock (MVar ())
newLock :: IO Lock
newLock = fmap Lock $ newMVar ()
criticalSection :: Lock -> IO a -> IO a
criticalSection (Lock lck) act = bracket_ (takeMVar lck) (putMVar lck ()) act