Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Add back new working QSem and QSemN implementations (#7417)

We decided not to break existing users without providing an easy
migration path.  For the time being I've made these implementations,
which fix the bugs in the old versions and perform reasonably well.

In due course we should move the concurrency functionality, including
these modules, out of base and into a separate package.
  • Loading branch information...
commit ea3abf16eda97e573ee63fb08ce330d3aeceaeae 1 parent 1bfecaf
Simon Marlow authored December 10, 2012
4  Control/Concurrent.hs
@@ -72,6 +72,8 @@ module Control.Concurrent (
72 72
 
73 73
         module Control.Concurrent.MVar,
74 74
         module Control.Concurrent.Chan,
  75
+        module Control.Concurrent.QSem,
  76
+        module Control.Concurrent.QSemN,
75 77
 
76 78
 #ifdef __GLASGOW_HASKELL__
77 79
         -- * Bound Threads
@@ -137,6 +139,8 @@ import Hugs.ConcBase
137 139
 
138 140
 import Control.Concurrent.MVar
139 141
 import Control.Concurrent.Chan
  142
+import Control.Concurrent.QSem
  143
+import Control.Concurrent.QSemN
140 144
 
141 145
 #ifdef __HUGS__
142 146
 type ThreadId = ()
133  Control/Concurrent/QSem.hs
... ...
@@ -0,0 +1,133 @@
  1
+{-# LANGUAGE Trustworthy #-}
  2
+{-# LANGUAGE CPP #-}
  3
+#ifdef __GLASGOW_HASKELL__
  4
+{-# LANGUAGE DeriveDataTypeable, BangPatterns #-}
  5
+#endif
  6
+{-# OPTIONS_GHC -funbox-strict-fields #-}
  7
+
  8
+-----------------------------------------------------------------------------
  9
+-- |
  10
+-- Module      :  Control.Concurrent.QSem
  11
+-- Copyright   :  (c) The University of Glasgow 2001
  12
+-- License     :  BSD-style (see the file libraries/base/LICENSE)
  13
+-- 
  14
+-- Maintainer  :  libraries@haskell.org
  15
+-- Stability   :  experimental
  16
+-- Portability :  non-portable (concurrency)
  17
+--
  18
+-- Simple quantity semaphores.
  19
+--
  20
+-----------------------------------------------------------------------------
  21
+
  22
+module Control.Concurrent.QSem
  23
+        ( -- * Simple Quantity Semaphores
  24
+          QSem,         -- abstract
  25
+          newQSem,      -- :: Int  -> IO QSem
  26
+          waitQSem,     -- :: QSem -> IO ()
  27
+          signalQSem    -- :: QSem -> IO ()
  28
+        ) where
  29
+
  30
+import Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar, tryTakeMVar
  31
+                          , putMVar, newMVar, tryPutMVar)
  32
+import Control.Exception
  33
+import Data.Maybe
  34
+
  35
+-- | 'QSem' is a quantity semaphore in which the resource is aqcuired
  36
+-- and released in units of one. It provides guaranteed FIFO ordering
  37
+-- for satisfying blocked `waitQSem` calls.
  38
+--
  39
+-- The pattern
  40
+--
  41
+-- >   bracket_ waitQSem signalQSem (...)
  42
+--
  43
+-- is safe; it never loses a unit of the resource.
  44
+--
  45
+data QSem = QSem !(MVar (Int, [MVar ()], [MVar ()]))
  46
+
  47
+-- The semaphore state (i, xs, ys):
  48
+--
  49
+--   i is the current resource value
  50
+--
  51
+--   (xs,ys) is the queue of blocked threads, where the queue is
  52
+--           given by xs ++ reverse ys.  We can enqueue new blocked threads
  53
+--           by consing onto ys, and dequeue by removing from the head of xs.
  54
+--
  55
+-- A blocked thread is represented by an empty (MVar ()).  To unblock
  56
+-- the thread, we put () into the MVar.
  57
+--
  58
+-- A thread can dequeue itself by also putting () into the MVar, which
  59
+-- it must do if it receives an exception while blocked in waitQSem.
  60
+-- This means that when unblocking a thread in signalQSem we must
  61
+-- first check whether the MVar is already full; the MVar lock on the
  62
+-- semaphore itself resolves race conditions between signalQSem and a
  63
+-- thread attempting to dequeue itself.
  64
+
  65
+-- |Build a new 'QSem' with a supplied initial quantity.
  66
+--  The initial quantity must be at least 0.
  67
+newQSem :: Int -> IO QSem
  68
+newQSem initial
  69
+  | initial < 0 = fail "newQSem: Initial quantity must be non-negative"
  70
+  | otherwise   = do
  71
+      sem <- newMVar (initial, [], [])
  72
+      return (QSem sem)
  73
+
  74
+-- |Wait for a unit to become available
  75
+waitQSem :: QSem -> IO ()
  76
+waitQSem (QSem m) =
  77
+  mask_ $ do
  78
+    (i,b1,b2) <- takeMVar m
  79
+    if i == 0
  80
+       then do
  81
+         b <- newEmptyMVar
  82
+         putMVar m (i, b1, b:b2)
  83
+         wait b
  84
+       else do
  85
+         let !z = i-1
  86
+         putMVar m (z, b1, b2)
  87
+         return ()
  88
+  where
  89
+    wait b = takeMVar b `onException` do
  90
+                (uninterruptibleMask_ $ do -- Note [signal uninterruptible]
  91
+                   (i,b1,b2) <- takeMVar m
  92
+                   r <- tryTakeMVar b
  93
+                   r' <- if isJust r
  94
+                            then signal (i,b1,b2)
  95
+                            else do putMVar b (); return (i,b1,b2)
  96
+                   putMVar m r')
  97
+
  98
+-- |Signal that a unit of the 'QSem' is available
  99
+signalQSem :: QSem -> IO ()
  100
+signalQSem (QSem m) =
  101
+  uninterruptibleMask_ $ do -- Note [signal uninterruptible]
  102
+    r <- takeMVar m
  103
+    r' <- signal r
  104
+    putMVar m r'
  105
+
  106
+-- Note [signal uninterruptible]
  107
+--
  108
+--   If we have
  109
+--
  110
+--      bracket waitQSem signalQSem (...)
  111
+--
  112
+--   and an exception arrives at the signalQSem, then we must not lose
  113
+--   the resource.  The signalQSem is masked by bracket, but taking
  114
+--   the MVar might block, and so it would be interruptible.  Hence we
  115
+--   need an uninterruptibleMask here.
  116
+--
  117
+--   This isn't ideal: during high contention, some threads won't be
  118
+--   interruptible.  The QSemSTM implementation has better behaviour
  119
+--   here, but it performs much worse than this one in some
  120
+--   benchmarks.
  121
+
  122
+signal :: (Int,[MVar ()],[MVar ()]) -> IO (Int,[MVar ()],[MVar ()])
  123
+signal (i,a1,a2) =
  124
+ if i == 0
  125
+   then loop a1 a2
  126
+   else let !z = i+1 in return (z, a1, a2)
  127
+ where
  128
+   loop [] [] = return (1, [], [])
  129
+   loop [] b2 = loop (reverse b2) []
  130
+   loop (b:bs) b2 = do
  131
+     r <- tryPutMVar b ()
  132
+     if r then return (0, bs, b2)
  133
+          else loop bs b2
127  Control/Concurrent/QSemN.hs
... ...
@@ -0,0 +1,127 @@
  1
+{-# LANGUAGE Trustworthy #-}
  2
+{-# LANGUAGE CPP #-}
  3
+#ifdef __GLASGOW_HASKELL__
  4
+{-# LANGUAGE DeriveDataTypeable, BangPatterns #-}
  5
+#endif
  6
+{-# OPTIONS_GHC -funbox-strict-fields #-}
  7
+
  8
+-----------------------------------------------------------------------------
  9
+-- |
  10
+-- Module      :  Control.Concurrent.QSemN
  11
+-- Copyright   :  (c) The University of Glasgow 2001
  12
+-- License     :  BSD-style (see the file libraries/base/LICENSE)
  13
+-- 
  14
+-- Maintainer  :  libraries@haskell.org
  15
+-- Stability   :  experimental
  16
+-- Portability :  non-portable (concurrency)
  17
+--
  18
+-- Quantity semaphores in which each thread may wait for an arbitrary
  19
+-- \"amount\".
  20
+--
  21
+-----------------------------------------------------------------------------
  22
+
  23
+module Control.Concurrent.QSemN
  24
+        (  -- * General Quantity Semaphores
  25
+          QSemN,        -- abstract
  26
+          newQSemN,     -- :: Int   -> IO QSemN
  27
+          waitQSemN,    -- :: QSemN -> Int -> IO ()
  28
+          signalQSemN   -- :: QSemN -> Int -> IO ()
  29
+      ) where
  30
+
  31
+import Control.Concurrent.MVar ( MVar, newEmptyMVar, takeMVar, tryTakeMVar
  32
+                          , putMVar, newMVar
  33
+                          , tryPutMVar, isEmptyMVar)
  34
+import Data.Typeable
  35
+import Control.Exception
  36
+import Data.Maybe
  37
+
  38
+-- | 'QSemN' is a quantity semaphore in which the resource is aqcuired
  39
+-- and released in units of one. It provides guaranteed FIFO ordering
  40
+-- for satisfying blocked `waitQSemN` calls.
  41
+--
  42
+-- The pattern
  43
+--
  44
+-- >   bracket_ (waitQSemN n) (signalQSemN n) (...)
  45
+--
  46
+-- is safe; it never loses any of the resource.
  47
+--
  48
+data QSemN = QSemN !(MVar (Int, [(Int, MVar ())], [(Int, MVar ())]))
  49
+  deriving Typeable
  50
+
  51
+-- The semaphore state (i, xs, ys):
  52
+--
  53
+--   i is the current resource value
  54
+--
  55
+--   (xs,ys) is the queue of blocked threads, where the queue is
  56
+--           given by xs ++ reverse ys.  We can enqueue new blocked threads
  57
+--           by consing onto ys, and dequeue by removing from the head of xs.
  58
+--
  59
+-- A blocked thread is represented by an empty (MVar ()).  To unblock
  60
+-- the thread, we put () into the MVar.
  61
+--
  62
+-- A thread can dequeue itself by also putting () into the MVar, which
  63
+-- it must do if it receives an exception while blocked in waitQSemN.
  64
+-- This means that when unblocking a thread in signalQSemN we must
  65
+-- first check whether the MVar is already full; the MVar lock on the
  66
+-- semaphore itself resolves race conditions between signalQSemN and a
  67
+-- thread attempting to dequeue itself.
  68
+
  69
+-- |Build a new 'QSemN' with a supplied initial quantity.
  70
+--  The initial quantity must be at least 0.
  71
+newQSemN :: Int -> IO QSemN
  72
+newQSemN initial
  73
+  | initial < 0 = fail "newQSemN: Initial quantity must be non-negative"
  74
+  | otherwise   = do
  75
+      sem <- newMVar (initial, [], [])
  76
+      return (QSemN sem)
  77
+
  78
+-- |Wait for the specified quantity to become available
  79
+waitQSemN :: QSemN -> Int -> IO ()
  80
+waitQSemN (QSemN m) sz =
  81
+  mask_ $ do
  82
+    (i,b1,b2) <- takeMVar m
  83
+    let z = i-sz
  84
+    if z < 0
  85
+       then do
  86
+         b <- newEmptyMVar
  87
+         putMVar m (i, b1, (sz,b):b2)
  88
+         wait b
  89
+       else do
  90
+         putMVar m (z, b1, b2)
  91
+         return ()
  92
+  where
  93
+    wait b = do
  94
+        takeMVar b `onException`
  95
+                (uninterruptibleMask_ $ do -- Note [signal uninterruptible]
  96
+                   (i,b1,b2) <- takeMVar m
  97
+                   r <- tryTakeMVar b
  98
+                   r' <- if isJust r
  99
+                            then signal sz (i,b1,b2)
  100
+                            else do putMVar b (); return (i,b1,b2)
  101
+                   putMVar m r')
  102
+
  103
+-- |Signal that a given quantity is now available from the 'QSemN'.
  104
+signalQSemN :: QSemN -> Int -> IO ()
  105
+signalQSemN (QSemN m) sz = uninterruptibleMask_ $ do
  106
+  r <- takeMVar m
  107
+  r' <- signal sz r
  108
+  putMVar m r'
  109
+
  110
+signal :: Int
  111
+       -> (Int,[(Int,MVar ())],[(Int,MVar ())])
  112
+       -> IO (Int,[(Int,MVar ())],[(Int,MVar ())])
  113
+
  114
+signal sz0 (i,a1,a2) = loop (sz0 + i) a1 a2
  115
+ where
  116
+   loop 0  bs b2 = return (0,  bs, b2)
  117
+   loop sz [] [] = return (sz, [], [])
  118
+   loop sz [] b2 = loop sz (reverse b2) []
  119
+   loop sz ((j,b):bs) b2
  120
+     | j > sz = do
  121
+       r <- isEmptyMVar b
  122
+       if r then return (sz, (j,b):bs, b2)
  123
+            else loop sz bs b2
  124
+     | otherwise = do
  125
+       r <- tryPutMVar b ()
  126
+       if r then loop (sz-j) bs b2
  127
+            else loop sz bs b2
2  base.cabal
@@ -120,6 +120,8 @@ Library {
120 120
         Control.Concurrent,
121 121
         Control.Concurrent.Chan,
122 122
         Control.Concurrent.MVar,
  123
+        Control.Concurrent.QSem,
  124
+        Control.Concurrent.QSemN,
123 125
         Control.Exception,
124 126
         Control.Exception.Base
125 127
         Control.Monad,
3  tests/all.T
@@ -118,3 +118,6 @@ test('4006', if_msys(expect_fail), compile_and_run, [''])
118 118
 test('5943', normal, compile_and_run, [''])
119 119
 test('T5962', normal, compile_and_run, [''])
120 120
 test('T7034', normal, compile_and_run, [''])
  121
+
  122
+test('qsem001', normal, compile_and_run, [''])
  123
+test('qsemn001', normal, compile_and_run, [''])
88  tests/qsem001.hs
... ...
@@ -0,0 +1,88 @@
  1
+{-# LANGUAGE CPP #-}
  2
+import Control.Concurrent.QSem as OldQ
  3
+
  4
+import Control.Concurrent.Chan
  5
+import Control.Concurrent (forkIO, threadDelay, killThread, yield)
  6
+import Control.Concurrent.MVar
  7
+import Control.Exception
  8
+import Control.Monad
  9
+
  10
+new = newQSem
  11
+wait = waitQSem
  12
+signal = signalQSem
  13
+
  14
+--------
  15
+-- dummy test-framework
  16
+
  17
+type Assertion = IO ()
  18
+
  19
+x @?= y = when (x /= y) $ error (show x ++ " /= " ++ show y)
  20
+
  21
+testCase :: String -> IO () -> IO ()
  22
+testCase n io = putStrLn ("test " ++ n) >> io
  23
+
  24
+defaultMain = sequence
  25
+------
  26
+
  27
+main = defaultMain tests
  28
+
  29
+tests = [
  30
+    testCase "sem1" sem1,
  31
+    testCase "sem2" sem2,
  32
+    testCase "sem_kill" sem_kill,
  33
+    testCase "sem_fifo" sem_fifo,
  34
+    testCase "sem_bracket" sem_bracket
  35
+ ]
  36
+
  37
+sem1 :: Assertion
  38
+sem1 = do
  39
+  q <- new 0
  40
+  signal q
  41
+  wait q
  42
+
  43
+sem2 :: Assertion
  44
+sem2 = do
  45
+  q <- new 0
  46
+  signal q
  47
+  signal q
  48
+  wait q
  49
+  wait q
  50
+
  51
+sem_fifo :: Assertion
  52
+sem_fifo = do
  53
+  c <- newChan
  54
+  q <- new 0
  55
+  t1 <- forkIO $ do wait q; writeChan c 'a'
  56
+  threadDelay 10000
  57
+  t2 <- forkIO $ do wait q; writeChan c 'b'
  58
+  threadDelay 10000
  59
+  t3 <- forkIO $ do wait q; writeChan c 'c'
  60
+  threadDelay 10000
  61
+  signal q
  62
+  a <- readChan c
  63
+  signal q
  64
+  b <- readChan c
  65
+  signal q
  66
+  c <- readChan c
  67
+  [a,b,c] @?= "abc"
  68
+
  69
+sem_kill :: Assertion
  70
+sem_kill  = do
  71
+  q <- new 0
  72
+  t <- forkIO $ do wait q
  73
+  threadDelay 100000
  74
+  killThread t
  75
+  m <- newEmptyMVar
  76
+  t <- forkIO $ do wait q; putMVar m ()
  77
+  signal q
  78
+  takeMVar m
  79
+
  80
+
  81
+sem_bracket :: Assertion
  82
+sem_bracket = do
  83
+  q <- new 1
  84
+  ts <- forM [1..100000] $ \n -> do
  85
+     forkIO $ do bracket_ (wait q) (signal q) (return ())
  86
+  mapM_ killThread ts
  87
+  wait q
  88
+
110  tests/qsemn001.hs
... ...
@@ -0,0 +1,110 @@
  1
+{-# LANGUAGE CPP #-}
  2
+import Control.Concurrent
  3
+import Control.Exception
  4
+import Control.Monad
  5
+import Control.Concurrent.STM
  6
+
  7
+new = newQSemN
  8
+wait = waitQSemN
  9
+signal = signalQSemN
  10
+
  11
+--------
  12
+-- dummy test-framework
  13
+
  14
+type Assertion = IO ()
  15
+
  16
+x @?= y = when (x /= y) $ error (show x ++ " /= " ++ show y)
  17
+
  18
+testCase :: String -> IO () -> IO ()
  19
+testCase n io = putStrLn ("test " ++ n) >> io
  20
+
  21
+defaultMain = sequence
  22
+------
  23
+
  24
+main = defaultMain tests
  25
+
  26
+tests = [
  27
+    testCase "semn" semn,
  28
+    testCase "semn2" semn2,
  29
+    testCase "semn3" semn3,
  30
+    testCase "semn_kill" semn_kill,
  31
+    testCase "semn_bracket" sem_bracket
  32
+ ]
  33
+
  34
+semn :: Assertion
  35
+semn = do
  36
+  c <- newTChanIO
  37
+  q <- new 0
  38
+  t1 <- forkIO $ do wait q 1; atomically $ writeTChan c 'a'
  39
+  threadDelay 10000
  40
+  t2 <- forkIO $ do wait q 2; atomically $ writeTChan c 'b'
  41
+  threadDelay 10000
  42
+  t3 <- forkIO $ do wait q 3; atomically $ writeTChan c 'c'
  43
+  threadDelay 10000
  44
+  signal q 1
  45
+  a <- atomically $ readTChan c
  46
+  signal q 2
  47
+  b <- atomically $ readTChan c
  48
+  signal q 3
  49
+  c <- atomically $ readTChan c
  50
+  [a,b,c] @?= "abc"
  51
+
  52
+semn2 :: Assertion
  53
+semn2 = do
  54
+  c <- newTChanIO
  55
+  q <- new 0
  56
+  t1 <- forkIO $ do wait q 1; threadDelay 10000; atomically $ writeTChan c 'a'
  57
+  threadDelay 10000
  58
+  t2 <- forkIO $ do wait q 2; threadDelay 20000; atomically $ writeTChan c 'b'
  59
+  threadDelay 10000
  60
+  t3 <- forkIO $ do wait q 3; threadDelay 30000; atomically $ writeTChan c 'c'
  61
+  threadDelay 10000
  62
+  signal q 6
  63
+  a <- atomically $ readTChan c
  64
+  b <- atomically $ readTChan c
  65
+  c <- atomically $ readTChan c
  66
+  [a,b,c] @?= "abc"
  67
+
  68
+semn3 :: Assertion
  69
+semn3 = do
  70
+  c <- newTChanIO
  71
+  q <- new 0
  72
+  t1 <- forkIO $ do wait q 1; threadDelay 10000; atomically $ writeTChan c 'a'
  73
+  threadDelay 10000
  74
+  t2 <- forkIO $ do wait q 2; threadDelay 20000; atomically $ writeTChan c 'b'
  75
+  threadDelay 10000
  76
+  t3 <- forkIO $ do wait q 3; threadDelay 30000; atomically $ writeTChan c 'c'
  77
+  threadDelay 10000
  78
+  signal q 3
  79
+  a <- atomically $ readTChan c
  80
+  b <- atomically $ readTChan c
  81
+  threadDelay 10000
  82
+  [a,b] @?= "ab"
  83
+  d <- atomically $ isEmptyTChan c
  84
+  d @?= True
  85
+  signal q 1
  86
+  threadDelay 10000
  87
+  d <- atomically $ isEmptyTChan c
  88
+  d @?= True
  89
+  signal q 2
  90
+  x <- atomically $ readTChan c
  91
+  x @?= 'c'
  92
+
  93
+semn_kill :: Assertion
  94
+semn_kill  = do
  95
+  q <- new 0
  96
+  t <- forkIO $ do wait q 1
  97
+  threadDelay 10000
  98
+  killThread t
  99
+  m <- newEmptyMVar
  100
+  t <- forkIO $ do wait q 1; putMVar m ()
  101
+  signal q 1
  102
+  takeMVar m
  103
+
  104
+sem_bracket :: Assertion
  105
+sem_bracket = do
  106
+  q <- new 1
  107
+  ts <- forM [1..100000] $ \n -> do
  108
+     forkIO $ do bracket_ (wait q 1) (signal q 1) (return ())
  109
+  mapM_ killThread ts
  110
+  wait q 1

0 notes on commit ea3abf1

Please sign in to comment.
Something went wrong with that request. Please try again.