diff --git a/libraries/base/LwConc/Substrate.hs b/libraries/base/LwConc/Substrate.hs index dab89f753c68..332aba091d89 100644 --- a/libraries/base/LwConc/Substrate.hs +++ b/libraries/base/LwConc/Substrate.hs @@ -44,6 +44,7 @@ module LwConc.Substrate , newPVar -- a -> PTM (PVar a) , newPVarIO -- a -> IO (PVar a) , readPVar -- PVar a -> PTM a +, readPVarIO -- PVar a -> IO a , writePVar -- PVar a -> a -> PTM () ------------------------------------------------------------------------------ @@ -268,6 +269,12 @@ newPVarIO val = IO $ \s1# -> readPVar :: PVar a -> PTM a readPVar (PVar tvar#) = PTM $ \s# -> readTVar# tvar# s# +-- |Return the current value stored in a PVar +{-# INLINE readPVarIO #-} +readPVarIO :: PVar a -> IO a +readPVarIO (PVar tvar#) = IO $ \s# -> readTVar# tvar# s# + + -- |Write the supplied value into a PVar {-# INLINE writePVar #-} writePVar :: PVar a -> a -> PTM () diff --git a/libraries/lwconc/LwConc/PTM.hs b/libraries/lwconc/LwConc/PTM.hs new file mode 100644 index 000000000000..8956c931534d --- /dev/null +++ b/libraries/lwconc/LwConc/PTM.hs @@ -0,0 +1,45 @@ +{-# LANGUAGE CPP #-} + +#if __GLASGOW_HASKELL__ >= 701 +{-# LANGUAGE Trustworthy #-} +#endif + +----------------------------------------------------------------------------- +-- | +-- Module : LwConc.PTM +-- Copyright : (c) The University of Glasgow 2004 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable (requires PTM) +-- +-- Software Transactional Memory: a modular composable concurrency +-- abstraction. See +-- +-- * /Composable memory transactions/, by Tim Harris, Simon Marlow, Simon +-- Peyton Jones, and Maurice Herlihy, in /ACM Conference on Principles +-- and Practice of Parallel Programming/ 2005. +-- +-- +----------------------------------------------------------------------------- + +module LwConc.PTM ( + module LwConc.PTM.TVar, +#ifdef __GLASGOW_HASKELL__ + module LwConc.PTM.TMVar, + module LwConc.PTM.TChan, + module LwConc.PTM.TQueue, + module LwConc.PTM.TBQueue, +#endif + module LwConc.PTM.TArray + ) where + +import LwConc.PTM.TVar +#ifdef __GLASGOW_HASKELL__ +import LwConc.PTM.TMVar +import LwConc.PTM.TChan +#endif +import LwConc.PTM.TArray +import LwConc.PTM.TQueue +import LwConc.PTM.TBQueue diff --git a/libraries/lwconc/LwConc/PTM/TArray.hs b/libraries/lwconc/LwConc/PTM/TArray.hs new file mode 100644 index 000000000000..0aace06bf181 --- /dev/null +++ b/libraries/lwconc/LwConc/PTM/TArray.hs @@ -0,0 +1,67 @@ +{-# LANGUAGE CPP, DeriveDataTypeable, FlexibleInstances, MultiParamTypeClasses #-} + +#if __GLASGOW_HASKELL__ >= 701 +{-# LANGUAGE Trustworthy #-} +#endif + +----------------------------------------------------------------------------- +-- | +-- Module : LwConc.PTM.TArray +-- Copyright : (c) The University of Glasgow 2005 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable (requires PTM) +-- +-- TArrays: transactional arrays, for use in the PTM monad +-- +----------------------------------------------------------------------------- + +module LwConc.PTM.TArray ( + TArray +) where + +import Data.Array (Array, bounds) +import Data.Array.Base (listArray, arrEleBottom, unsafeAt, MArray(..), + IArray(numElements)) +import Data.Ix (rangeSize) +import Data.Typeable (Typeable) +#ifdef __GLASGOW_HASKELL__ +import LwConc.Substrate +#else +import Control.Sequential.PTM (PTM) +#endif + +-- |TArray is a transactional array, supporting the usual 'MArray' +-- interface for mutable arrays. +-- +-- It is currently implemented as @Array ix (TVar e)@, +-- but it may be replaced by a more efficient implementation in the future +-- (the interface will remain the same, however). +-- +newtype TArray i e = TArray (Array i (PVar e)) deriving (Eq, Typeable) + +instance MArray TArray e PTM where + getBounds (TArray a) = return (bounds a) + newArray b e = do + a <- rep (rangeSize b) (newPVar e) + return $ TArray (listArray b a) + newArray_ b = do + a <- rep (rangeSize b) (newPVar arrEleBottom) + return $ TArray (listArray b a) + unsafeRead (TArray a) i = readPVar $ unsafeAt a i + unsafeWrite (TArray a) i e = writePVar (unsafeAt a i) e + getNumElements (TArray a) = return (numElements a) + +-- | Like 'replicateM' but uses an accumulator to prevent stack overflows. +-- Unlike 'replicateM' the returned list is in reversed order. +-- This doesn't matter though since this function is only used to create +-- arrays with identical elements. +rep :: Monad m => Int -> m a -> m [a] +rep n m = go n [] + where + go 0 xs = return xs + go i xs = do + x <- m + go (i-1) (x:xs) diff --git a/libraries/lwconc/LwConc/PTM/TBQueue.hs b/libraries/lwconc/LwConc/PTM/TBQueue.hs new file mode 100644 index 000000000000..4e1db0db793c --- /dev/null +++ b/libraries/lwconc/LwConc/PTM/TBQueue.hs @@ -0,0 +1,179 @@ +{-# OPTIONS_GHC -fno-warn-name-shadowing #-} +{-# LANGUAGE CPP, DeriveDataTypeable #-} + +#if __GLASGOW_HASKELL__ >= 701 +{-# LANGUAGE Trustworthy #-} +#endif + +----------------------------------------------------------------------------- +-- | +-- Module : LwConc.PTM.TBQueue +-- Copyright : (c) The University of Glasgow 2012 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable (requires PTM) +-- +-- 'TBQueue' is a bounded version of 'TQueue'. The queue has a maximum +-- capacity set when it is created. If the queue already contains the +-- maximum number of elements, then 'writeTBQueue' blocks until an +-- element is removed from the queue. +-- +-- The implementation is based on the traditional purely-functional +-- queue representation that uses two lists to obtain amortised /O(1)/ +-- enqueue and dequeue operations. +-- +----------------------------------------------------------------------------- + +module LwConc.PTM.TBQueue ( + -- * TBQueue + TBQueue, + newTBQueue, + newTBQueueIO, + readTBQueue, + -- tryReadTBQueue, + peekTBQueue, + -- tryPeekTBQueue, + writeTBQueue, + unGetTBQueue, + isEmptyTBQueue, + ) where + +import Data.Typeable +import LwConc.Substrate + +#define _UPK_(x) {-# UNPACK #-} !(x) + +-- | 'TBQueue' is an abstract type representing a bounded FIFO channel. +data TBQueue a + = TBQueue _UPK_(PVar Int) -- CR: read capacity + _UPK_(PVar [a]) -- R: elements waiting to be read + _UPK_(PVar Int) -- CW: write capacity + _UPK_(PVar [a]) -- W: elements written (head is most recent) + deriving Typeable + +instance Eq (TBQueue a) where + TBQueue a _ _ _ == TBQueue b _ _ _ = a == b + +-- Total channel capacity remaining is CR + CW. Reads only need to +-- access CR, writes usually need to access only CW but sometimes need +-- CR. So in the common case we avoid contention between CR and CW. +-- +-- - when removing an element from R: +-- CR := CR + 1 +-- +-- - when adding an element to W: +-- if CW is non-zero +-- then CW := CW - 1 +-- then if CR is non-zero +-- then CW := CR - 1; CR := 0 +-- else **FULL** + +-- |Build and returns a new instance of 'TBQueue' +newTBQueue :: Int -- ^ maximum number of elements the queue can hold + -> PTM (TBQueue a) +newTBQueue size = do + read <- newPVar [] + write <- newPVar [] + rsize <- newPVar 0 + wsize <- newPVar size + return (TBQueue rsize read wsize write) + +-- |@IO@ version of 'newTBQueue'. This is useful for creating top-level +-- 'TBQueue's using 'System.IO.Unsafe.unsafePerformIO', because using +-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't +-- possible. +newTBQueueIO :: Int -> IO (TBQueue a) +newTBQueueIO size = do + read <- newPVarIO [] + write <- newPVarIO [] + rsize <- newPVarIO 0 + wsize <- newPVarIO size + return (TBQueue rsize read wsize write) + +-- |Write a value to a 'TBQueue'; blocks if the queue is full. +writeTBQueue :: TBQueue a -> a -> PTM () +writeTBQueue (TBQueue rsize _read wsize write) a = do + w <- readPVar wsize + if (w /= 0) + then do writePVar wsize (w - 1) + else do + r <- readPVar rsize + if (r /= 0) + then do writePVar rsize 0 + writePVar wsize (r - 1) + else retry + listend <- readPVar write + writePVar write (a:listend) + +-- |Read the next value from the 'TBQueue'. +readTBQueue :: TBQueue a -> PTM a +readTBQueue (TBQueue rsize read _wsize write) = do + xs <- readPVar read + r <- readPVar rsize + writePVar rsize (r + 1) + case xs of + (x:xs') -> do + writePVar read xs' + return x + [] -> do + ys <- readPVar write + case ys of + [] -> retry + _ -> do + let (z:zs) = reverse ys -- NB. lazy: we want the transaction to be + -- short, otherwise it will conflict + writePVar write [] + writePVar read zs + return z + +-- | A version of 'readTBQueue' which does not retry. Instead it +-- returns @Nothing@ if no value is available. +-- tryReadTBQueue :: TBQueue a -> PTM (Maybe a) +-- tryReadTBQueue c = fmap Just (readTBQueue c) `orElse` return Nothing + +-- | Get the next value from the @TBQueue@ without removing it, +-- retrying if the channel is empty. +peekTBQueue :: TBQueue a -> PTM a +peekTBQueue c = do + x <- readTBQueue c + unGetTBQueue c x + return x + +-- | A version of 'peekTBQueue' which does not retry. Instead it +-- returns @Nothing@ if no value is available. +-- tryPeekTBQueue :: TBQueue a -> PTM (Maybe a) +-- tryPeekTBQueue c = do +-- m <- tryReadTBQueue c +-- case m of +-- Nothing -> return Nothing +-- Just x -> do +-- unGetTBQueue c x +-- return m + +-- |Put a data item back onto a channel, where it will be the next item read. +-- Blocks if the queue is full. +unGetTBQueue :: TBQueue a -> a -> PTM () +unGetTBQueue (TBQueue rsize read wsize _write) a = do + r <- readPVar rsize + if (r > 0) + then do writePVar rsize (r - 1) + else do + w <- readPVar wsize + if (w > 0) + then writePVar wsize (w - 1) + else retry + xs <- readPVar read + writePVar read (a:xs) + +-- |Returns 'True' if the supplied 'TBQueue' is empty. +isEmptyTBQueue :: TBQueue a -> PTM Bool +isEmptyTBQueue (TBQueue _rsize read _wsize write) = do + xs <- readPVar read + case xs of + (_:_) -> return False + [] -> do ys <- readPVar write + case ys of + [] -> return True + _ -> return False diff --git a/libraries/lwconc/LwConc/PTM/TChan.hs b/libraries/lwconc/LwConc/PTM/TChan.hs new file mode 100644 index 000000000000..43c8a535bfe4 --- /dev/null +++ b/libraries/lwconc/LwConc/PTM/TChan.hs @@ -0,0 +1,198 @@ +{-# OPTIONS_GHC -fno-warn-name-shadowing #-} +{-# LANGUAGE CPP, DeriveDataTypeable #-} + +#if __GLASGOW_HASKELL__ >= 701 +{-# LANGUAGE Trustworthy #-} +#endif + +----------------------------------------------------------------------------- +-- | +-- Module : LwConc.PTM.TChan +-- Copyright : (c) The University of Glasgow 2004 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable (requires PTM) +-- +-- TChan: Transactional channels +-- (GHC only) +-- +----------------------------------------------------------------------------- + +module LwConc.PTM.TChan ( +#ifdef __GLASGOW_HASKELL__ + -- * TChans + TChan, + + -- ** Construction + newTChan, + newTChanIO, + newBroadcastTChan, + newBroadcastTChanIO, + dupTChan, + cloneTChan, + + -- ** Reading and writing + readTChan, + tryReadTChan, + peekTChan, + tryPeekTChan, + writeTChan, + unGetTChan, + isEmptyTChan +#endif + ) where + +#ifdef __GLASGOW_HASKELL__ +import LwConc.Substrate + +import Data.Typeable (Typeable) + +#define _UPK_(x) {-# UNPACK #-} !(x) + +-- | 'TChan' is an abstract type representing an unbounded FIFO channel. +data TChan a = TChan _UPK_(PVar (TVarList a)) + _UPK_(PVar (TVarList a)) + deriving (Eq, Typeable) + +type TVarList a = PVar (TList a) +data TList a = TNil | TCons a _UPK_(TVarList a) + +-- |Build and return a new instance of 'TChan' +newTChan :: PTM (TChan a) +newTChan = do + hole <- newPVar TNil + read <- newPVar hole + write <- newPVar hole + return (TChan read write) + +-- |@IO@ version of 'newTChan'. This is useful for creating top-level +-- 'TChan's using 'System.IO.Unsafe.unsafePerformIO', because using +-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't +-- possible. +newTChanIO :: IO (TChan a) +newTChanIO = do + hole <- newPVarIO TNil + read <- newPVarIO hole + write <- newPVarIO hole + return (TChan read write) + +-- | Create a write-only 'TChan'. More precisely, 'readTChan' will 'retry' +-- even after items have been written to the channel. The only way to read +-- a broadcast channel is to duplicate it with 'dupTChan'. +-- +-- Consider a server that broadcasts messages to clients: +-- +-- >serve :: TChan Message -> Client -> IO loop +-- >serve broadcastChan client = do +-- > myChan <- dupTChan broadcastChan +-- > forever $ do +-- > message <- readTChan myChan +-- > send client message +-- +-- The problem with using 'newTChan' to create the broadcast channel is that if +-- it is only written to and never read, items will pile up in memory. By +-- using 'newBroadcastTChan' to create the broadcast channel, items can be +-- garbage collected after clients have seen them. +newBroadcastTChan :: PTM (TChan a) +newBroadcastTChan = do + write_hole <- newPVar TNil + read <- newPVar (error "reading from a TChan created by newBroadcastTChan; use dupTChan first") + write <- newPVar write_hole + return (TChan read write) + +-- | @IO@ version of 'newBroadcastTChan'. +newBroadcastTChanIO :: IO (TChan a) +newBroadcastTChanIO = do + dummy_hole <- newPVarIO TNil + write_hole <- newPVarIO TNil + read <- newPVarIO dummy_hole + write <- newPVarIO write_hole + return (TChan read write) + +-- |Write a value to a 'TChan'. +writeTChan :: TChan a -> a -> PTM () +writeTChan (TChan _read write) a = do + listend <- readPVar write -- listend == TVar pointing to TNil + new_listend <- newPVar TNil + writePVar listend (TCons a new_listend) + writePVar write new_listend + +-- |Read the next value from the 'TChan'. +readTChan :: TChan a -> PTM a +readTChan (TChan read _write) = do + listhead <- readPVar read + head <- readPVar listhead + case head of + TNil -> retry + TCons a tail -> do + writePVar read tail + return a + +-- | A version of 'readTChan' which does not retry. Instead it +-- returns @Nothing@ if no value is available. +tryReadTChan :: TChan a -> PTM (Maybe a) +tryReadTChan (TChan read _write) = do + listhead <- readPVar read + head <- readPVar listhead + case head of + TNil -> return Nothing + TCons a tl -> do + writePVar read tl + return (Just a) + +-- | Get the next value from the @TChan@ without removing it, +-- retrying if the channel is empty. +peekTChan :: TChan a -> PTM a +peekTChan (TChan read _write) = do + listhead <- readPVar read + head <- readPVar listhead + case head of + TNil -> retry + TCons a _ -> return a + +-- | A version of 'peekTChan' which does not retry. Instead it +-- returns @Nothing@ if no value is available. +tryPeekTChan :: TChan a -> PTM (Maybe a) +tryPeekTChan (TChan read _write) = do + listhead <- readPVar read + head <- readPVar listhead + case head of + TNil -> return Nothing + TCons a _ -> return (Just a) + +-- |Duplicate a 'TChan': the duplicate channel begins empty, but data written to +-- either channel from then on will be available from both. Hence this creates +-- a kind of broadcast channel, where data written by anyone is seen by +-- everyone else. +dupTChan :: TChan a -> PTM (TChan a) +dupTChan (TChan _read write) = do + hole <- readPVar write + new_read <- newPVar hole + return (TChan new_read write) + +-- |Put a data item back onto a channel, where it will be the next item read. +unGetTChan :: TChan a -> a -> PTM () +unGetTChan (TChan read _write) a = do + listhead <- readPVar read + newhead <- newPVar (TCons a listhead) + writePVar read newhead + +-- |Returns 'True' if the supplied 'TChan' is empty. +isEmptyTChan :: TChan a -> PTM Bool +isEmptyTChan (TChan read _write) = do + listhead <- readPVar read + head <- readPVar listhead + case head of + TNil -> return True + TCons _ _ -> return False + +-- |Clone a 'TChan': similar to dupTChan, but the cloned channel starts with the +-- same content available as the original channel. +cloneTChan :: TChan a -> PTM (TChan a) +cloneTChan (TChan read write) = do + readpos <- readPVar read + new_read <- newPVar readpos + return (TChan new_read write) +#endif diff --git a/libraries/lwconc/LwConc/PTM/TMVar.hs b/libraries/lwconc/LwConc/PTM/TMVar.hs new file mode 100644 index 000000000000..7e98e4556723 --- /dev/null +++ b/libraries/lwconc/LwConc/PTM/TMVar.hs @@ -0,0 +1,153 @@ +{-# LANGUAGE CPP, DeriveDataTypeable #-} + +#if __GLASGOW_HASKELL__ >= 701 +{-# LANGUAGE Trustworthy #-} +#endif + +----------------------------------------------------------------------------- +-- | +-- Module : LwConc.PTM.TMVar +-- Copyright : (c) The University of Glasgow 2004 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable (requires PTM) +-- +-- TMVar: Transactional MVars, for use in the PTM monad +-- (GHC only) +-- +----------------------------------------------------------------------------- + +module LwConc.PTM.TMVar ( +#ifdef __GLASGOW_HASKELL__ + -- * TMVars + TMVar, + newTMVar, + newEmptyTMVar, + newTMVarIO, + newEmptyTMVarIO, + takeTMVar, + putTMVar, + readTMVar, + tryReadTMVar, + swapTMVar, + tryTakeTMVar, + tryPutTMVar, + isEmptyTMVar +#endif + ) where + +#ifdef __GLASGOW_HASKELL__ +import LwConc.Substrate + +import Data.Typeable (Typeable) + +newtype TMVar a = TMVar (PVar (Maybe a)) deriving (Eq, Typeable) +{- ^ +A 'TMVar' is a synchronising variable, used +for communication between concurrent threads. It can be thought of +as a box, which may be empty or full. +-} + +-- |Create a 'TMVar' which contains the supplied value. +newTMVar :: a -> PTM (TMVar a) +newTMVar a = do + t <- newPVar (Just a) + return (TMVar t) + +-- |@IO@ version of 'newTMVar'. This is useful for creating top-level +-- 'TMVar's using 'System.IO.Unsafe.unsafePerformIO', because using +-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't +-- possible. +newTMVarIO :: a -> IO (TMVar a) +newTMVarIO a = do + t <- newPVarIO (Just a) + return (TMVar t) + +-- |Create a 'TMVar' which is initially empty. +newEmptyTMVar :: PTM (TMVar a) +newEmptyTMVar = do + t <- newPVar Nothing + return (TMVar t) + +-- |@IO@ version of 'newEmptyTMVar'. This is useful for creating top-level +-- 'TMVar's using 'System.IO.Unsafe.unsafePerformIO', because using +-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't +-- possible. +newEmptyTMVarIO :: IO (TMVar a) +newEmptyTMVarIO = do + t <- newPVarIO Nothing + return (TMVar t) + +-- |Return the contents of the 'TMVar'. If the 'TMVar' is currently +-- empty, the transaction will 'retry'. After a 'takeTMVar', +-- the 'TMVar' is left empty. +takeTMVar :: TMVar a -> PTM a +takeTMVar (TMVar t) = do + m <- readPVar t + case m of + Nothing -> retry + Just a -> do writePVar t Nothing; return a + +-- | A version of 'takeTMVar' that does not 'retry'. The 'tryTakeTMVar' +-- function returns 'Nothing' if the 'TMVar' was empty, or @'Just' a@ if +-- the 'TMVar' was full with contents @a@. After 'tryTakeTMVar', the +-- 'TMVar' is left empty. +tryTakeTMVar :: TMVar a -> PTM (Maybe a) +tryTakeTMVar (TMVar t) = do + m <- readPVar t + case m of + Nothing -> return Nothing + Just a -> do writePVar t Nothing; return (Just a) + +-- |Put a value into a 'TMVar'. If the 'TMVar' is currently full, +-- 'putTMVar' will 'retry'. +putTMVar :: TMVar a -> a -> PTM () +putTMVar (TMVar t) a = do + m <- readPVar t + case m of + Nothing -> do writePVar t (Just a); return () + Just _ -> retry + +-- | A version of 'putTMVar' that does not 'retry'. The 'tryPutTMVar' +-- function attempts to put the value @a@ into the 'TMVar', returning +-- 'True' if it was successful, or 'False' otherwise. +tryPutTMVar :: TMVar a -> a -> PTM Bool +tryPutTMVar (TMVar t) a = do + m <- readPVar t + case m of + Nothing -> do writePVar t (Just a); return True + Just _ -> return False + +-- | This is a combination of 'takeTMVar' and 'putTMVar'; ie. it +-- takes the value from the 'TMVar', puts it back, and also returns +-- it. +readTMVar :: TMVar a -> PTM a +readTMVar (TMVar t) = do + m <- readPVar t + case m of + Nothing -> retry + Just a -> return a + +-- | A version of 'readTMVar' which does not retry. Instead it +-- returns @Nothing@ if no value is available. +tryReadTMVar :: TMVar a -> PTM (Maybe a) +tryReadTMVar (TMVar t) = readPVar t + +-- |Swap the contents of a 'TMVar' for a new value. +swapTMVar :: TMVar a -> a -> PTM a +swapTMVar (TMVar t) new = do + m <- readPVar t + case m of + Nothing -> retry + Just old -> do writePVar t (Just new); return old + +-- |Check whether a given 'TMVar' is empty. +isEmptyTMVar :: TMVar a -> PTM Bool +isEmptyTMVar (TMVar t) = do + m <- readPVar t + case m of + Nothing -> return True + Just _ -> return False +#endif diff --git a/libraries/lwconc/LwConc/PTM/TQueue.hs b/libraries/lwconc/LwConc/PTM/TQueue.hs new file mode 100644 index 000000000000..b298e247384f --- /dev/null +++ b/libraries/lwconc/LwConc/PTM/TQueue.hs @@ -0,0 +1,136 @@ +{-# OPTIONS_GHC -fno-warn-name-shadowing #-} +{-# LANGUAGE CPP, DeriveDataTypeable #-} + +#if __GLASGOW_HASKELL__ >= 701 +{-# LANGUAGE Trustworthy #-} +#endif + +----------------------------------------------------------------------------- +-- | +-- Module : LwConc.PTM.TQueue +-- Copyright : (c) The University of Glasgow 2012 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable (requires PTM) +-- +-- A 'TQueue' is like a 'TChan', with two important differences: +-- +-- * it has faster throughput than both 'TChan' and 'Chan' (although +-- the costs are amortised, so the cost of individual operations +-- can vary a lot). +-- +-- * it does /not/ provide equivalents of the 'dupTChan' and +-- 'cloneTChan' operations. +-- +-- The implementation is based on the traditional purely-functional +-- queue representation that uses two lists to obtain amortised /O(1)/ +-- enqueue and dequeue operations. +-- +----------------------------------------------------------------------------- + +module LwConc.PTM.TQueue ( + -- * TQueue + TQueue, + newTQueue, + newTQueueIO, + readTQueue, + -- tryReadTQueue, + peekTQueue, + -- tryPeekTQueue, + writeTQueue, + unGetTQueue, + isEmptyTQueue, + ) where + +import LwConc.Substrate +import Data.Typeable (Typeable) + +-- | 'TQueue' is an abstract type representing an unbounded FIFO channel. +data TQueue a = TQueue {-# UNPACK #-} !(PVar [a]) + {-# UNPACK #-} !(PVar [a]) + deriving Typeable + +instance Eq (TQueue a) where + TQueue a _ == TQueue b _ = a == b + +-- |Build and returns a new instance of 'TQueue' +newTQueue :: PTM (TQueue a) +newTQueue = do + read <- newPVar [] + write <- newPVar [] + return (TQueue read write) + +-- |@IO@ version of 'newTQueue'. This is useful for creating top-level +-- 'TQueue's using 'System.IO.Unsafe.unsafePerformIO', because using +-- 'atomically' inside 'System.IO.Unsafe.unsafePerformIO' isn't +-- possible. +newTQueueIO :: IO (TQueue a) +newTQueueIO = do + read <- newPVarIO [] + write <- newPVarIO [] + return (TQueue read write) + +-- |Write a value to a 'TQueue'. +writeTQueue :: TQueue a -> a -> PTM () +writeTQueue (TQueue _read write) a = do + listend <- readPVar write + writePVar write (a:listend) + +-- |Read the next value from the 'TQueue'. +readTQueue :: TQueue a -> PTM a +readTQueue (TQueue read write) = do + xs <- readPVar read + case xs of + (x:xs') -> do writePVar read xs' + return x + [] -> do ys <- readPVar write + case ys of + [] -> retry + _ -> case reverse ys of + [] -> error "readTQueue" + (z:zs) -> do writePVar write [] + writePVar read zs + return z + +-- | A version of 'readTQueue' which does not retry. Instead it +-- returns @Nothing@ if no value is available. +-- tryReadTQueue :: TQueue a -> PTM (Maybe a) +-- tryReadTQueue c = fmap Just (readTQueue c) `orElse` return Nothing + +-- | Get the next value from the @TQueue@ without removing it, +-- retrying if the channel is empty. +peekTQueue :: TQueue a -> PTM a +peekTQueue c = do + x <- readTQueue c + unGetTQueue c x + return x + +-- | A version of 'peekTQueue' which does not retry. Instead it +-- returns @Nothing@ if no value is available. +-- tryPeekTQueue :: TQueue a -> PTM (Maybe a) +-- tryPeekTQueue c = do +-- m <- tryReadTQueue c +-- case m of +-- Nothing -> return Nothing +-- Just x -> do +-- unGetTQueue c x +-- return m + +-- |Put a data item back onto a channel, where it will be the next item read. +unGetTQueue :: TQueue a -> a -> PTM () +unGetTQueue (TQueue read _write) a = do + xs <- readPVar read + writePVar read (a:xs) + +-- |Returns 'True' if the supplied 'TQueue' is empty. +isEmptyTQueue :: TQueue a -> PTM Bool +isEmptyTQueue (TQueue read write) = do + xs <- readPVar read + case xs of + (_:_) -> return False + [] -> do ys <- readPVar write + case ys of + [] -> return True + _ -> return False diff --git a/libraries/lwconc/LwConc/PTM/TSem.hs b/libraries/lwconc/LwConc/PTM/TSem.hs new file mode 100644 index 000000000000..709b720d9acc --- /dev/null +++ b/libraries/lwconc/LwConc/PTM/TSem.hs @@ -0,0 +1,53 @@ +----------------------------------------------------------------------------- +-- | +-- Module : LwConc.PTM.TSem +-- Copyright : (c) The University of Glasgow 2012 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable (requires PTM) +-- +-- 'TSem': transactional semaphores. +-- +----------------------------------------------------------------------------- + +{-# LANGUAGE DeriveDataTypeable #-} +module LwConc.PTM.TSem ( + TSem, newTSem, waitTSem, signalTSem + ) where + +import LwConc.Substrate +import Control.Monad +import Data.Typeable + +-- | 'TSem' is a transactional semaphore. It holds a certain number +-- of units, and units may be acquired or released by 'waitTSem' and +-- 'signalTSem' respectively. When the 'TSem' is empty, 'waitTSem' +-- blocks. +-- +-- Note that 'TSem' has no concept of fairness, and there is no +-- guarantee that threads blocked in `waitTSem` will be unblocked in +-- the same order; in fact they will all be unblocked at the same time +-- and will fight over the 'TSem'. Hence 'TSem' is not suitable if +-- you expect there to be a high number of threads contending for the +-- resource. However, like other PTM abstractions, 'TSem' is +-- composable. +-- +newtype TSem = TSem (PVar Int) + deriving (Eq, Typeable) + +newTSem :: Int -> PTM TSem +newTSem i = fmap TSem (newPVar i) + +waitTSem :: TSem -> PTM () +waitTSem (TSem t) = do + i <- readPVar t + when (i <= 0) retry + writePVar t $! (i-1) + +signalTSem :: TSem -> PTM () +signalTSem (TSem t) = do + i <- readPVar t + writePVar t $! i+1 + diff --git a/libraries/lwconc/LwConc/PTM/TVar.hs b/libraries/lwconc/LwConc/PTM/TVar.hs new file mode 100644 index 000000000000..bb1402d1c5aa --- /dev/null +++ b/libraries/lwconc/LwConc/PTM/TVar.hs @@ -0,0 +1,90 @@ +{-# LANGUAGE CPP #-} + +#if __GLASGOW_HASKELL__ >= 701 +{-# LANGUAGE Trustworthy #-} +#endif + +----------------------------------------------------------------------------- +-- | +-- Module : LwConc.PTM.TVar +-- Copyright : (c) The University of Glasgow 2004 +-- License : BSD-style (see the file libraries/base/LICENSE) +-- +-- Maintainer : libraries@haskell.org +-- Stability : experimental +-- Portability : non-portable (requires PTM) +-- +-- TVar: Transactional variables +-- +----------------------------------------------------------------------------- + +module LwConc.PTM.TVar ( + -- * TVars + TVar, + newTVar, + newTVarIO, + readTVar, + readTVarIO, + writeTVar, + modifyTVar, + modifyTVar', + swapTVar, +#ifdef __GLASGOW_HASKELL__ + -- registerDelay +#endif + ) where + +#ifdef __GLASGOW_HASKELL__ +import LwConc.Substrate +#else +import Control.Sequential.PTM +#endif + +#if ! (MIN_VERSION_base(4,2,0)) +readPVarIO = atomically . readPVar +#endif + +type TVar = PVar + +newTVar :: a -> PTM (TVar a) +newTVar = newPVar + +newTVarIO :: a -> IO (TVar a) +newTVarIO = newPVarIO + +readTVar :: TVar a -> PTM a +readTVar = readPVar + +writeTVar :: TVar a -> a -> PTM () +writeTVar = writePVar + +readTVarIO :: TVar a -> IO a +readTVarIO = readPVarIO + +-- Like 'modifyIORef' but for 'TVar'. +-- | Mutate the contents of a 'TVar'. /N.B./, this version is +-- non-strict. +modifyTVar :: TVar a -> (a -> a) -> PTM () +modifyTVar var f = do + x <- readPVar var + writePVar var (f x) +{-# INLINE modifyTVar #-} + + +-- | Strict version of 'modifyTVar'. +modifyTVar' :: TVar a -> (a -> a) -> PTM () +modifyTVar' var f = do + x <- readPVar var + writePVar var $! f x +{-# INLINE modifyTVar' #-} + + +-- Like 'swapTMVar' but for 'TVar'. +-- | Swap the contents of a 'TVar' for a new value. +swapTVar :: TVar a -> a -> PTM a +swapTVar var new = do + old <- readPVar var + writePVar var new + return old +{-# INLINE swapTVar #-} + diff --git a/libraries/lwconc/LwConc/Sequential/PTM.hs b/libraries/lwconc/LwConc/Sequential/PTM.hs new file mode 100644 index 000000000000..7296835aca76 --- /dev/null +++ b/libraries/lwconc/LwConc/Sequential/PTM.hs @@ -0,0 +1,100 @@ +-- Transactional memory for sequential implementations. +-- Transactions do not run concurrently, but are atomic in the face +-- of exceptions. + +{-# LANGUAGE CPP #-} + +#if __GLASGOW_HASKELL__ >= 701 +{-# LANGUAGE Trustworthy #-} +{-# LANGUAGE ScopedTypeVariables #-} +#endif + +-- #hide +module LwConc.Sequential.PTM ( + PTM, atomically, throwPTM, catchPTM, + TVar, newTVar, newTVarIO, readTVar, readTVarIO, writeTVar + ) where + +#if __GLASGOW_HASKELL__ < 705 +import Prelude hiding (catch) +#endif +import Control.Applicative (Applicative(pure, (<*>))) +import Control.Exception +import Data.IORef + +-- The reference contains a rollback action to be executed on exceptions +newtype PTM a = PTM (IORef (IO ()) -> IO a) + +unPTM :: PTM a -> IORef (IO ()) -> IO a +unPTM (PTM f) = f + +instance Functor PTM where + fmap f (PTM m) = PTM (fmap f . m) + +instance Applicative PTM where + pure = PTM . const . pure + PTM mf <*> PTM mx = PTM $ \ r -> mf r <*> mx r + +instance Monad PTM where + return = pure + PTM m >>= k = PTM $ \ r -> do + x <- m r + unPTM (k x) r + +#ifdef BASE4 +atomically :: PTM a -> IO a +atomically (PTM m) = do + r <- newIORef (return ()) + m r `onException` do + rollback <- readIORef r + rollback +#else +atomically :: PTM a -> IO a +atomically (PTM m) = do + r <- newIORef (return ()) + m r `catch` \ (ex::SomeException) -> do + rollback <- readIORef r + rollback + throw ex +#endif + +throwPTM :: Exception e => e -> PTM a +throwPTM = PTM . const . throwIO + +catchPTM :: Exception e => PTM a -> (e -> PTM a) -> PTM a +catchPTM (PTM m) h = PTM $ \ r -> do + old_rollback <- readIORef r + writeIORef r (return ()) + res <- try (m r) + rollback_m <- readIORef r + case res of + Left ex -> do + rollback_m + writeIORef r old_rollback + unPTM (h ex) r + Right a -> do + writeIORef r (rollback_m >> old_rollback) + return a + +newtype TVar a = TVar (IORef a) + deriving (Eq) + +newTVar :: a -> PTM (TVar a) +newTVar a = PTM (const (newTVarIO a)) + +newTVarIO :: a -> IO (TVar a) +newTVarIO a = do + ref <- newIORef a + return (TVar ref) + +readTVar :: TVar a -> PTM a +readTVar (TVar ref) = PTM (const (readIORef ref)) + +readTVarIO :: TVar a -> IO a +readTVarIO (TVar ref) = readIORef ref + +writeTVar :: TVar a -> a -> PTM () +writeTVar (TVar ref) a = PTM $ \ r -> do + oldval <- readIORef ref + modifyIORef r (writeIORef ref oldval >>) + writeIORef ref a diff --git a/libraries/lwconc/lwconc.cabal b/libraries/lwconc/lwconc.cabal index e2c76bf97b34..219de7bea3b2 100644 --- a/libraries/lwconc/lwconc.cabal +++ b/libraries/lwconc/lwconc.cabal @@ -19,6 +19,16 @@ Library LwConc.ConcurrentList LwConc.MVarList LwConc.MVar + LwConc.PTM + LwConc.PTM.TArray + LwConc.PTM.TVar + LwConc.PTM.TChan + LwConc.PTM.TMVar + LwConc.PTM.TQueue + LwConc.PTM.TBQueue + LwConc.PTM.TSem + other-modules: + LwConc.Sequential.PTM extensions: CPP Build-Depends: base >= 4.2 && < 5, array,