From 4c92a531c4b4003a1079418376bdddf6f0160c93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20Ros=C3=A9n?= Date: Thu, 7 Feb 2013 10:58:07 +0100 Subject: [PATCH] Haddock comments, and add module Workers --- Control/Concurrent/STM/DTVar.hs | 30 ++++++++++----- Control/Concurrent/STM/Promise.hs | 35 ++++++++++++----- Control/Concurrent/STM/Promise/Process.hs | 9 ++++- Control/Concurrent/STM/Promise/Tree.hs | 47 +++++++++++++++-------- Control/Concurrent/STM/Promise/Workers.hs | 44 +++++++++++++++++++++ stm-promise.cabal | 6 +-- test/Trees.hs | 37 ++---------------- 7 files changed, 134 insertions(+), 74 deletions(-) create mode 100644 Control/Concurrent/STM/Promise/Workers.hs diff --git a/Control/Concurrent/STM/DTVar.hs b/Control/Concurrent/STM/DTVar.hs index 8473ac5..63ac6e8 100644 --- a/Control/Concurrent/STM/DTVar.hs +++ b/Control/Concurrent/STM/DTVar.hs @@ -1,7 +1,4 @@ --- | TVars with a dirty bit that can allow for one listener --- --- I guess one could generalize this with a list or channel of --- listeners that wake up when the variable is dirty +-- | TVars with a dirty bit that allows for one listener {-# LANGUAGE GeneralizedNewtypeDeriving #-} module Control.Concurrent.STM.DTVar ( DTVar @@ -18,9 +15,13 @@ module Control.Concurrent.STM.DTVar , modifyDTVar ) where +-- I guess one could generalize this with a list or channel of +-- listeners that wake up when the variable is dirty + import Control.Monad import Control.Concurrent.STM +-- | TVars with a dirty bit which allows for one listener. data DTVar a = DTVar { var :: TVar a , dirty :: TVar Bool @@ -33,26 +34,30 @@ newDTVar x = do t_var <- newTVar True return (DTVar a_var t_var) +-- | `newDTVar` in `IO` newDTVarIO :: a -> IO (DTVar a) newDTVarIO = atomically . newDTVar +-- | Write a value to a DTVar, making it dirty writeDTVar :: DTVar a -> a -> STM () writeDTVar (DTVar v d) x = do writeTVar v x writeTVar d True +-- | `writeDTVar` in `IO` writeDTVarIO :: DTVar a -> a -> IO () writeDTVarIO u x = atomically (writeDTVar u x) --- | Reads a DTVar without changing the dirty bit +-- | Reads a DTVar /without/ changing the dirty bit readDTVar :: DTVar a -> STM a readDTVar = readTVar . var --- | Reads a DTVar without changing the dirty bit +-- | `readDTVar` in `IO` readDTVarIO :: DTVar a -> IO a readDTVarIO = atomically . readDTVar --- | Listens until the dirty bit is true, then reads and sets dirty to false +-- | Listens until the dirty bit is true, then removes the dirty bit and +-- returns the read element listenDTVar :: DTVar a -> STM a listenDTVar u = do d <- readTVar (dirty u) @@ -60,9 +65,12 @@ listenDTVar u = do writeTVar (dirty u) False readTVar (var u) +-- | `listenDTVar` in `IO` listenDTVarIO :: DTVar a -> IO a listenDTVarIO = atomically . listenDTVar +-- | Listen until any of the dirty bits are true, then removes all dirty bits +-- and returns all `DTVar`'s values, in order. listenDTVars :: [DTVar a] -> STM [a] listenDTVars us = do ds <- mapM (readTVar . dirty) us @@ -70,11 +78,13 @@ listenDTVars us = do mapM_ ((`writeTVar` False) . dirty) us mapM (readTVar . var) us +-- | `listenDTVars` in `IO` listenDTVarsIO :: [DTVar a] -> IO [a] listenDTVarsIO = atomically . listenDTVars +-- | Modify a DTVar, making it dirty. modifyDTVar :: DTVar a -> (a -> a) -> STM () -modifyDTVar u f = do - x <- readTVar (var u) - writeDTVar u (f x) +modifyDTVar d f = do + x <- readDTVar d + writeDTVar d (f x) diff --git a/Control/Concurrent/STM/Promise.hs b/Control/Concurrent/STM/Promise.hs index 5a54389..5153a2b 100644 --- a/Control/Concurrent/STM/Promise.hs +++ b/Control/Concurrent/STM/Promise.hs @@ -1,42 +1,60 @@ {-# LANGUAGE DeriveFunctor #-} +-- | Promises that allow spawning and cancelling in `IO`, and an `STM` result module Control.Concurrent.STM.Promise ( Promise(..), an , PromiseResult(..) , isAn, isUnfinished, isCancelled - , eitherResult, bothResults, manyResults + , eitherResult, bothResults ) where import Control.Monad.STM +-- | A promise data Promise a = Promise { spawn :: IO () + -- ^ Instruction for spawning , cancel :: IO () + -- ^ Instruction for cancelling , result :: STM (PromiseResult a) + -- ^ The result of a computation } deriving Functor +-- | The result of the promise data PromiseResult a = Unfinished + -- ^ Not finished yet (or not even spawned yet)) | Cancelled + -- ^ Cancelled | An a + -- ^ A result deriving (Functor, Eq, Ord, Show) +-- | Gets the result (partial function) an :: PromiseResult a -> a an (An a) = a an _ = error "an: on non-An result!" +-- | Is this a result? isAn :: PromiseResult a -> Bool isAn An{} = True isAn _ = False +-- | Is this unfinished? isUnfinished :: PromiseResult a -> Bool isUnfinished Unfinished{} = True isUnfinished _ = False +-- | Is this cancelled? isCancelled :: PromiseResult a -> Bool isCancelled Cancelled{} = True isCancelled _ = False +-- | If either is finished (`An`), return one of them (favor the first one) +-- +-- If either is `Unfinished`, this is also `Unfinished`. +-- +-- Otherwise, both are `Cancelled` and so is this. eitherResult :: PromiseResult a -> PromiseResult a -> PromiseResult a eitherResult (An a) _ = An a eitherResult _ (An e) = An e @@ -44,17 +62,14 @@ eitherResult Unfinished _ = Unfinished eitherResult _ Unfinished = Unfinished eitherResult _ _ = Cancelled -bothResults :: PromiseResult a -> PromiseResult a -> PromiseResult (a,a) +-- | If both are finished (`An`), return them in a tuple. +-- +-- If either is `Cancelled`, this is also `Cancelled`. +-- +-- Otherwise, both are `Unfinished` and so is this. +bothResults :: PromiseResult a -> PromiseResult b -> PromiseResult (a,b) bothResults (An a) (An e) = An (a,e) bothResults Cancelled _ = Cancelled bothResults _ Cancelled = Cancelled bothResults _ _ = Unfinished -manyResults :: PromiseResult a -> PromiseResult a -> PromiseResult (Either a (a,a)) -manyResults (An a) (An e) = An $ Right (a,e) -manyResults (An a) _ = An $ Left a -manyResults _ (An e) = An $ Left e -manyResults Unfinished _ = Unfinished -manyResults _ Unfinished = Unfinished -manyResults _ _ = Cancelled - diff --git a/Control/Concurrent/STM/Promise/Process.hs b/Control/Concurrent/STM/Promise/Process.hs index 7074e52..8f131e5 100644 --- a/Control/Concurrent/STM/Promise/Process.hs +++ b/Control/Concurrent/STM/Promise/Process.hs @@ -1,4 +1,5 @@ {-# LANGUAGE RecordWildCards #-} +-- | Promises for processes module Control.Concurrent.STM.Promise.Process (processPromise, ProcessResult(..), ExitCode(..)) where @@ -14,6 +15,7 @@ import System.Process import System.IO import System.Exit +-- | The result from a process data ProcessResult = ProcessResult { stderr :: String , stdout :: String @@ -21,7 +23,12 @@ data ProcessResult = ProcessResult } deriving (Eq, Ord, Show) -processPromise :: FilePath -> [String] -> String -> IO (Promise ProcessResult) +-- | Make a `Promise` +processPromise + :: FilePath -- ^ Program to run + -> [String] -- ^ Arguments + -> String -- ^ Input string (stdin) + -> IO (Promise ProcessResult) -- ^ Promise object processPromise cmd args input = do pid_var <- newTVarIO Nothing diff --git a/Control/Concurrent/STM/Promise/Tree.hs b/Control/Concurrent/STM/Promise/Tree.hs index b6ac1c5..38cd4c0 100644 --- a/Control/Concurrent/STM/Promise/Tree.hs +++ b/Control/Concurrent/STM/Promise/Tree.hs @@ -1,5 +1,12 @@ -{-# LANGUAGE DeriveFunctor, DeriveDataTypeable, DeriveTraversable, DeriveFoldable, RecordWildCards #-} -module Control.Concurrent.STM.Promise.Tree where +{-# LANGUAGE DeriveFunctor, DeriveDataTypeable, DeriveTraversable, DeriveFoldable #-} +-- | A tree of computation +module Control.Concurrent.STM.Promise.Tree + (Label(..) + ,Tree(..) + ,requireAll,requireAny,tryAll + ,showTree + ,interleave + ,watchTree) where import Control.Monad hiding (mapM_) import Prelude hiding (mapM_, foldr1) @@ -13,8 +20,7 @@ import Data.Traversable import Data.Foldable import Data.Function -(.:) = (.) . (.) - +-- | Both/Either labels data Label = Both -- ^ Both of these must succeed with an An @@ -22,7 +28,7 @@ data Label -- ^ Either of these must succeed with an An, and that one is returned deriving (Eq, Ord, Show, Typeable) --- Both/Either-trees +-- | Both/Either-trees data Tree a = Node Label (Tree a) (Tree a) -- ^ Combine two trees with the semantics of `Label` @@ -32,15 +38,19 @@ data Tree a -- ^ There is a mean of recovering this computation, by returning mempty deriving (Eq, Ord, Show, Typeable, Traversable, Foldable, Functor) +-- | All of these must succeed requireAll :: [Tree a] -> Tree a requireAll = foldr1 (Node Both) +-- | Any of these must succeed requireAny :: [Tree a] -> Tree a requireAny = foldr1 (Node Either) +-- | As many as possible should succeed, try all. tryAll :: [Tree a] -> Tree a tryAll = foldr1 (Node Both) . map Recoverable +-- | Shows a tree showTree :: Show a => Tree a -> String showTree = go (2 :: Int) where @@ -54,15 +64,18 @@ showTree = go (2 :: Int) True ? f = f False ? _ = id +-- | Cancel a tree cancelTree :: Tree (Promise a) -> IO () cancelTree = mapM_ spawn +-- | A simple scheduling interleave :: Tree a -> [a] interleave (Leaf a) = return a interleave (Node Either t1 t2) = interleave t1 /\/ interleave t2 interleave (Node Both t1 t2) = interleave t1 ++ interleave t2 interleave (Recoverable t) = interleave t +-- | Interleave two lists (/\/) :: [a] -> [a] -> [a] (x:xs) /\/ ys = x:(ys /\/ xs) [] /\/ ys = ys @@ -74,9 +87,12 @@ interleave (Recoverable t) = interleave t -- This would add some more flexibility. To recover the function underneath, you would -- just supply projections to monoid and instantiate (a -> Promise b) with id +-- | Assuming some other thread(s) evaluate the promises in the tree, this gives +-- a live view of the progress, and cancels unnecessary subtrees (due to `Either`). watchTree :: Monoid a => Tree (Promise a) -> IO (DTVar (Tree (PromiseResult a))) -watchTree = go where - go t = case t of +watchTree = go + where + go t0 = case t0 of Leaf a -> @@ -103,7 +119,7 @@ watchTree = go where Node lbl t1 t2 -> do let combine = case lbl of - Both -> fmap (uncurry mappend) .: bothResults + Both -> \ x y -> fmap (uncurry mappend) (bothResults x y) Either -> eitherResult d1 <- go t1 @@ -125,11 +141,12 @@ watchTree = go where loop else do atomically $ write (Leaf r) - cancelTree t - -forkWrapTVar :: Tree (PromiseResult a) -> ((Tree (PromiseResult a) -> STM ()) -> IO ()) -> IO (DTVar (Tree (PromiseResult a))) -forkWrapTVar init_tree mk = do - v <- newDTVarIO init_tree - void $ forkIO $ mk (writeDTVar v) - return v + cancelTree t0 + + forkWrapTVar :: Tree (PromiseResult a) -> ((Tree (PromiseResult a) -> STM ()) -> IO ()) -> + IO (DTVar (Tree (PromiseResult a))) + forkWrapTVar init_tree mk = do + v <- newDTVarIO init_tree + void $ forkIO $ mk (writeDTVar v) + return v diff --git a/Control/Concurrent/STM/Promise/Workers.hs b/Control/Concurrent/STM/Promise/Workers.hs new file mode 100644 index 0000000..1b71739 --- /dev/null +++ b/Control/Concurrent/STM/Promise/Workers.hs @@ -0,0 +1,44 @@ +-- | Evaluating promises in parallel +module Control.Concurrent.STM.Promise.Workers (workers,worker,evaluatePromise) where + +import Control.Concurrent +import Control.Concurrent.STM +import Control.Concurrent.STM.Promise +import Control.Monad + +maybeIO :: Maybe a -> (a -> IO b) -> IO (Maybe b) +maybeIO m f = maybe (return Nothing) (fmap Just . f) m + +-- | Evaluates a single promise, maybe using a timeout in microseconds. +evaluatePromise :: Maybe Int -> Promise a -> IO () +evaluatePromise m_t promise = do + m_thr <- maybeIO m_t $ \ timeout -> forkIO $ do + threadDelay timeout + cancel promise + + spawn promise + + atomically $ do + status <- result promise + when (isUnfinished status) retry + + void $ maybeIO m_thr killThread + +-- | Evaluates a channel of promises, maybe using a timeout in microseconds. +-- Stops when the channel is empty. +worker :: Maybe Int -> TChan (Promise a) -> IO () +worker m_t ch = go where + go = do + m_promise <- atomically $ tryReadTChan ch + case m_promise of + Just promise -> evaluatePromise m_t promise >> go + Nothing -> return () + + +-- | Evaluate these promises on n processors, maybe using a timeout in microseconds. +workers :: Maybe Int -> Int -> [Promise a] -> IO () +workers m_t n xs = do + ch <- newTChanIO + atomically $ mapM_ (writeTChan ch) xs + replicateM_ n $ forkIO $ worker m_t ch + diff --git a/stm-promise.cabal b/stm-promise.cabal index aacb193..33d5c73 100644 --- a/stm-promise.cabal +++ b/stm-promise.cabal @@ -29,13 +29,13 @@ library Control.Concurrent.STM.Promise, Control.Concurrent.STM.Promise.Process, Control.Concurrent.STM.Promise.Tree, + Control.Concurrent.STM.Promise.Workers, Control.Concurrent.STM.DTVar build-depends: base, stm, - process, - semigroups + process test-suite trees type: exitcode-stdio-1.0 @@ -48,8 +48,6 @@ test-suite trees build-depends: base, stm-promise, - semigroups, stm, - testing-feat, QuickCheck >= 2.4 diff --git a/test/Trees.hs b/test/Trees.hs index 818a635..48ae62e 100644 --- a/test/Trees.hs +++ b/test/Trees.hs @@ -7,9 +7,9 @@ import Control.Concurrent.STM import Control.Concurrent.STM.DTVar import Control.Concurrent.STM.Promise import Control.Concurrent.STM.Promise.Tree +import Control.Concurrent.STM.Promise.Workers import Control.Monad import Data.List -import Data.Ord import Data.Function import Data.IORef import Data.Monoid @@ -36,7 +36,7 @@ instance Monoid Int where eval :: (Ord a,Monoid a) => Tree a -> [a] eval = go where - go t = case t of + go t0 = case t0 of Leaf x -> return x Node Both t1 t2 -> nubSorted $ mappend <$> go t1 <*> go t2 Node Either t1 t2 -> nubSorted $ go t1 ++ go t2 @@ -74,37 +74,6 @@ delayPromise b t = do return Promise{..} -maybeIO :: Maybe a -> (a -> IO b) -> IO (Maybe b) -maybeIO m f = maybe (return Nothing) (fmap Just . f) m - -worker :: Maybe Int -> TChan (Promise a) -> IO () -worker m_t ch = fix $ \ loop -> do - m_promise <- atomically $ tryReadTChan ch - case m_promise of - Nothing -> return () - Just promise -> do - m_thr <- maybeIO m_t $ \ timeout -> forkIO $ do - threadDelay timeout - cancel promise - - spawn promise - - atomically $ do - status <- result promise - when (isUnfinished status) retry - - void $ maybeIO m_thr killThread - - loop - --- Evaluate these promises on n processors, using maybe a timeout in --- microseconds. This function should be put in a library somewhere. -workers :: Maybe Int -> Int -> [Promise a] -> IO () -workers m_t n xs = do - ch <- newTChanIO - atomically $ mapM_ (writeTChan ch) xs - replicateM_ n $ forkIO $ worker m_t ch - mkPromiseTree :: Arbitrary a => Int -> Tree a -> Gen (IO (Tree (Promise a))) mkPromiseTree timeout = go where go t = case t of @@ -143,7 +112,7 @@ runTest a size = do (prop_equal a (modifyIORef tests succ) (modifyIORef cancelled succ) 10 10000) ts <- readIORef tests cs <- readIORef cancelled - return $ ((ts,cs),isSuccess res) + return ((ts,cs),isSuccess res) main :: IO ()