Skip to content

Commit

Permalink
Haddock comments, and add module Workers
Browse files Browse the repository at this point in the history
  • Loading branch information
Dan Rosén committed Feb 7, 2013
1 parent c52ac72 commit 4c92a53
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 74 deletions.
30 changes: 20 additions & 10 deletions Control/Concurrent/STM/DTVar.hs
@@ -1,7 +1,4 @@
-- | TVars with a dirty bit that can allow for one listener
--
-- I guess one could generalize this with a list or channel of
-- listeners that wake up when the variable is dirty
-- | TVars with a dirty bit that allows for one listener
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Control.Concurrent.STM.DTVar
( DTVar
Expand All @@ -18,9 +15,13 @@ module Control.Concurrent.STM.DTVar
, modifyDTVar
) where

-- I guess one could generalize this with a list or channel of
-- listeners that wake up when the variable is dirty

import Control.Monad
import Control.Concurrent.STM

-- | TVars with a dirty bit which allows for one listener.
data DTVar a = DTVar
{ var :: TVar a
, dirty :: TVar Bool
Expand All @@ -33,48 +34,57 @@ newDTVar x = do
t_var <- newTVar True
return (DTVar a_var t_var)

-- | `newDTVar` in `IO`
newDTVarIO :: a -> IO (DTVar a)
newDTVarIO = atomically . newDTVar

-- | Write a value to a DTVar, making it dirty
writeDTVar :: DTVar a -> a -> STM ()
writeDTVar (DTVar v d) x = do
writeTVar v x
writeTVar d True

-- | `writeDTVar` in `IO`
writeDTVarIO :: DTVar a -> a -> IO ()
writeDTVarIO u x = atomically (writeDTVar u x)

-- | Reads a DTVar without changing the dirty bit
-- | Reads a DTVar /without/ changing the dirty bit
readDTVar :: DTVar a -> STM a
readDTVar = readTVar . var

-- | Reads a DTVar without changing the dirty bit
-- | `readDTVar` in `IO`
readDTVarIO :: DTVar a -> IO a
readDTVarIO = atomically . readDTVar

-- | Listens until the dirty bit is true, then reads and sets dirty to false
-- | Listens until the dirty bit is true, then removes the dirty bit and
-- returns the read element
listenDTVar :: DTVar a -> STM a
listenDTVar u = do
d <- readTVar (dirty u)
unless d retry
writeTVar (dirty u) False
readTVar (var u)

-- | `listenDTVar` in `IO`
listenDTVarIO :: DTVar a -> IO a
listenDTVarIO = atomically . listenDTVar

-- | Listen until any of the dirty bits are true, then removes all dirty bits
-- and returns all `DTVar`'s values, in order.
listenDTVars :: [DTVar a] -> STM [a]
listenDTVars us = do
ds <- mapM (readTVar . dirty) us
unless (or ds || null ds) retry
mapM_ ((`writeTVar` False) . dirty) us
mapM (readTVar . var) us

-- | `listenDTVars` in `IO`
listenDTVarsIO :: [DTVar a] -> IO [a]
listenDTVarsIO = atomically . listenDTVars

-- | Modify a DTVar, making it dirty.
modifyDTVar :: DTVar a -> (a -> a) -> STM ()
modifyDTVar u f = do
x <- readTVar (var u)
writeDTVar u (f x)
modifyDTVar d f = do
x <- readDTVar d
writeDTVar d (f x)

35 changes: 25 additions & 10 deletions Control/Concurrent/STM/Promise.hs
@@ -1,60 +1,75 @@
{-# LANGUAGE DeriveFunctor #-}
-- | Promises that allow spawning and cancelling in `IO`, and an `STM` result
module Control.Concurrent.STM.Promise
( Promise(..), an
, PromiseResult(..)
, isAn, isUnfinished, isCancelled
, eitherResult, bothResults, manyResults
, eitherResult, bothResults
) where

import Control.Monad.STM

-- | A promise
data Promise a = Promise
{ spawn :: IO ()
-- ^ Instruction for spawning
, cancel :: IO ()
-- ^ Instruction for cancelling
, result :: STM (PromiseResult a)
-- ^ The result of a computation
}
deriving Functor

-- | The result of the promise
data PromiseResult a
= Unfinished
-- ^ Not finished yet (or not even spawned yet))
| Cancelled
-- ^ Cancelled
| An a
-- ^ A result
deriving (Functor, Eq, Ord, Show)

-- | Gets the result (partial function)
an :: PromiseResult a -> a
an (An a) = a
an _ = error "an: on non-An result!"

-- | Is this a result?
isAn :: PromiseResult a -> Bool
isAn An{} = True
isAn _ = False

-- | Is this unfinished?
isUnfinished :: PromiseResult a -> Bool
isUnfinished Unfinished{} = True
isUnfinished _ = False

-- | Is this cancelled?
isCancelled :: PromiseResult a -> Bool
isCancelled Cancelled{} = True
isCancelled _ = False

-- | If either is finished (`An`), return one of them (favor the first one)
--
-- If either is `Unfinished`, this is also `Unfinished`.
--
-- Otherwise, both are `Cancelled` and so is this.
eitherResult :: PromiseResult a -> PromiseResult a -> PromiseResult a
eitherResult (An a) _ = An a
eitherResult _ (An e) = An e
eitherResult Unfinished _ = Unfinished
eitherResult _ Unfinished = Unfinished
eitherResult _ _ = Cancelled

bothResults :: PromiseResult a -> PromiseResult a -> PromiseResult (a,a)
-- | If both are finished (`An`), return them in a tuple.
--
-- If either is `Cancelled`, this is also `Cancelled`.
--
-- Otherwise, both are `Unfinished` and so is this.
bothResults :: PromiseResult a -> PromiseResult b -> PromiseResult (a,b)
bothResults (An a) (An e) = An (a,e)
bothResults Cancelled _ = Cancelled
bothResults _ Cancelled = Cancelled
bothResults _ _ = Unfinished

manyResults :: PromiseResult a -> PromiseResult a -> PromiseResult (Either a (a,a))
manyResults (An a) (An e) = An $ Right (a,e)
manyResults (An a) _ = An $ Left a
manyResults _ (An e) = An $ Left e
manyResults Unfinished _ = Unfinished
manyResults _ Unfinished = Unfinished
manyResults _ _ = Cancelled

9 changes: 8 additions & 1 deletion Control/Concurrent/STM/Promise/Process.hs
@@ -1,4 +1,5 @@
{-# LANGUAGE RecordWildCards #-}
-- | Promises for processes
module Control.Concurrent.STM.Promise.Process
(processPromise, ProcessResult(..), ExitCode(..)) where

Expand All @@ -14,14 +15,20 @@ import System.Process
import System.IO
import System.Exit

-- | The result from a process
data ProcessResult = ProcessResult
{ stderr :: String
, stdout :: String
, excode :: ExitCode
}
deriving (Eq, Ord, Show)

processPromise :: FilePath -> [String] -> String -> IO (Promise ProcessResult)
-- | Make a `Promise`
processPromise
:: FilePath -- ^ Program to run
-> [String] -- ^ Arguments
-> String -- ^ Input string (stdin)
-> IO (Promise ProcessResult) -- ^ Promise object
processPromise cmd args input = do

pid_var <- newTVarIO Nothing
Expand Down
47 changes: 32 additions & 15 deletions Control/Concurrent/STM/Promise/Tree.hs
@@ -1,5 +1,12 @@
{-# LANGUAGE DeriveFunctor, DeriveDataTypeable, DeriveTraversable, DeriveFoldable, RecordWildCards #-}
module Control.Concurrent.STM.Promise.Tree where
{-# LANGUAGE DeriveFunctor, DeriveDataTypeable, DeriveTraversable, DeriveFoldable #-}
-- | A tree of computation
module Control.Concurrent.STM.Promise.Tree
(Label(..)
,Tree(..)
,requireAll,requireAny,tryAll
,showTree
,interleave
,watchTree) where

import Control.Monad hiding (mapM_)
import Prelude hiding (mapM_, foldr1)
Expand All @@ -13,16 +20,15 @@ import Data.Traversable
import Data.Foldable
import Data.Function

(.:) = (.) . (.)

-- | Both/Either labels
data Label
= Both
-- ^ Both of these must succeed with an An
| Either
-- ^ Either of these must succeed with an An, and that one is returned
deriving (Eq, Ord, Show, Typeable)

-- Both/Either-trees
-- | Both/Either-trees
data Tree a
= Node Label (Tree a) (Tree a)
-- ^ Combine two trees with the semantics of `Label`
Expand All @@ -32,15 +38,19 @@ data Tree a
-- ^ There is a mean of recovering this computation, by returning mempty
deriving (Eq, Ord, Show, Typeable, Traversable, Foldable, Functor)

-- | All of these must succeed
requireAll :: [Tree a] -> Tree a
requireAll = foldr1 (Node Both)

-- | Any of these must succeed
requireAny :: [Tree a] -> Tree a
requireAny = foldr1 (Node Either)

-- | As many as possible should succeed, try all.
tryAll :: [Tree a] -> Tree a
tryAll = foldr1 (Node Both) . map Recoverable

-- | Shows a tree
showTree :: Show a => Tree a -> String
showTree = go (2 :: Int)
where
Expand All @@ -54,15 +64,18 @@ showTree = go (2 :: Int)
True ? f = f
False ? _ = id

-- | Cancel a tree
cancelTree :: Tree (Promise a) -> IO ()
cancelTree = mapM_ spawn

-- | A simple scheduling
interleave :: Tree a -> [a]
interleave (Leaf a) = return a
interleave (Node Either t1 t2) = interleave t1 /\/ interleave t2
interleave (Node Both t1 t2) = interleave t1 ++ interleave t2
interleave (Recoverable t) = interleave t

-- | Interleave two lists
(/\/) :: [a] -> [a] -> [a]
(x:xs) /\/ ys = x:(ys /\/ xs)
[] /\/ ys = ys
Expand All @@ -74,9 +87,12 @@ interleave (Recoverable t) = interleave t
-- This would add some more flexibility. To recover the function underneath, you would
-- just supply projections to monoid and instantiate (a -> Promise b) with id

-- | Assuming some other thread(s) evaluate the promises in the tree, this gives
-- a live view of the progress, and cancels unnecessary subtrees (due to `Either`).
watchTree :: Monoid a => Tree (Promise a) -> IO (DTVar (Tree (PromiseResult a)))
watchTree = go where
go t = case t of
watchTree = go
where
go t0 = case t0 of

Leaf a ->

Expand All @@ -103,7 +119,7 @@ watchTree = go where
Node lbl t1 t2 -> do

let combine = case lbl of
Both -> fmap (uncurry mappend) .: bothResults
Both -> \ x y -> fmap (uncurry mappend) (bothResults x y)
Either -> eitherResult

d1 <- go t1
Expand All @@ -125,11 +141,12 @@ watchTree = go where
loop
else do
atomically $ write (Leaf r)
cancelTree t

forkWrapTVar :: Tree (PromiseResult a) -> ((Tree (PromiseResult a) -> STM ()) -> IO ()) -> IO (DTVar (Tree (PromiseResult a)))
forkWrapTVar init_tree mk = do
v <- newDTVarIO init_tree
void $ forkIO $ mk (writeDTVar v)
return v
cancelTree t0

forkWrapTVar :: Tree (PromiseResult a) -> ((Tree (PromiseResult a) -> STM ()) -> IO ()) ->
IO (DTVar (Tree (PromiseResult a)))
forkWrapTVar init_tree mk = do
v <- newDTVarIO init_tree
void $ forkIO $ mk (writeDTVar v)
return v

44 changes: 44 additions & 0 deletions Control/Concurrent/STM/Promise/Workers.hs
@@ -0,0 +1,44 @@
-- | Evaluating promises in parallel
module Control.Concurrent.STM.Promise.Workers (workers,worker,evaluatePromise) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM.Promise
import Control.Monad

maybeIO :: Maybe a -> (a -> IO b) -> IO (Maybe b)
maybeIO m f = maybe (return Nothing) (fmap Just . f) m

-- | Evaluates a single promise, maybe using a timeout in microseconds.
evaluatePromise :: Maybe Int -> Promise a -> IO ()
evaluatePromise m_t promise = do
m_thr <- maybeIO m_t $ \ timeout -> forkIO $ do
threadDelay timeout
cancel promise

spawn promise

atomically $ do
status <- result promise
when (isUnfinished status) retry

void $ maybeIO m_thr killThread

-- | Evaluates a channel of promises, maybe using a timeout in microseconds.
-- Stops when the channel is empty.
worker :: Maybe Int -> TChan (Promise a) -> IO ()
worker m_t ch = go where
go = do
m_promise <- atomically $ tryReadTChan ch
case m_promise of
Just promise -> evaluatePromise m_t promise >> go
Nothing -> return ()


-- | Evaluate these promises on n processors, maybe using a timeout in microseconds.
workers :: Maybe Int -> Int -> [Promise a] -> IO ()
workers m_t n xs = do
ch <- newTChanIO
atomically $ mapM_ (writeTChan ch) xs
replicateM_ n $ forkIO $ worker m_t ch

6 changes: 2 additions & 4 deletions stm-promise.cabal
Expand Up @@ -29,13 +29,13 @@ library
Control.Concurrent.STM.Promise,
Control.Concurrent.STM.Promise.Process,
Control.Concurrent.STM.Promise.Tree,
Control.Concurrent.STM.Promise.Workers,
Control.Concurrent.STM.DTVar

build-depends:
base,
stm,
process,
semigroups
process

test-suite trees
type: exitcode-stdio-1.0
Expand All @@ -48,8 +48,6 @@ test-suite trees
build-depends:
base,
stm-promise,
semigroups,
stm,
testing-feat,
QuickCheck >= 2.4

0 comments on commit 4c92a53

Please sign in to comment.