Skip to content

Commit

Permalink
Change representation of ReceiveChan
Browse files Browse the repository at this point in the history
This makes it easier to add additional functionality.
  • Loading branch information
edsko committed Nov 6, 2012
1 parent 4189949 commit a91fb7e
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 31 deletions.
Expand Up @@ -89,6 +89,7 @@ import Control.Distributed.Process.Internal.StrictMVar
import Control.Concurrent.Chan (writeChan)
import Control.Concurrent.STM
( STM
, TVar
, atomically
, orElse
, newTVar
Expand Down Expand Up @@ -181,7 +182,7 @@ newChan = do
let sport = SendPort cid
chan <- liftIO newTQueueIO
chan' <- mkWeakTQueue chan $ finalizer (processState proc) lcid
let rport = ReceivePortSingle chan
let rport = ReceivePort $ readTQueue chan
let tch = TypedChannel chan'
return ( (channelCounter ^: (+ 1))
. (typedChannelWithId lcid ^= Just tch)
Expand Down Expand Up @@ -215,35 +216,30 @@ receiveChanTimeout 0 ch = liftIO . atomically $
receiveChanTimeout n ch = liftIO . timeout n . atomically $
receiveSTM ch

-- | Receive a message from a typed channel as an STM transaction.
--
-- The transaction retries when no message is available.
receiveSTM :: ReceivePort a -> STM a
receiveSTM (ReceivePortSingle c) =
readTQueue c
receiveSTM (ReceivePortBiased ps) =
foldr1 orElse (map receiveSTM ps)
receiveSTM (ReceivePortRR psVar) = do
ps <- readTVar psVar
a <- foldr1 orElse (map receiveSTM ps)
writeTVar psVar (rotate ps)
return a
where
rotate :: [a] -> [a]
rotate [] = []
rotate (x:xs) = xs ++ [x]

-- | Merge a list of typed channels.
--
-- The result port is left-biased: if there are messages available on more
-- than one port, the first available message is returned.
mergePortsBiased :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsBiased = return . ReceivePortBiased
mergePortsBiased = return . ReceivePort. foldr1 orElse . map receiveSTM

-- | Like 'mergePortsBiased', but with a round-robin scheduler (rather than
-- left-biased)
mergePortsRR :: Serializable a => [ReceivePort a] -> Process (ReceivePort a)
mergePortsRR ps = liftIO . atomically $ ReceivePortRR <$> newTVar ps
mergePortsRR = \ps -> do
psVar <- liftIO . atomically $ newTVar (map receiveSTM ps)
return $ ReceivePort (rr psVar)
where
rotate :: [a] -> [a]
rotate [] = []
rotate (x:xs) = xs ++ [x]

rr :: TVar [STM a] -> STM a
rr psVar = do
ps <- readTVar psVar
a <- foldr1 orElse ps
writeTVar psVar (rotate ps)
return a

--------------------------------------------------------------------------------
-- Advanced messaging --
Expand Down
Expand Up @@ -82,7 +82,7 @@ import Control.Category ((>>>))
import Control.Exception (Exception)
import Control.Concurrent (ThreadId)
import Control.Concurrent.Chan (Chan)
import Control.Concurrent.STM (TVar)
import Control.Concurrent.STM (STM, TVar)
import qualified Network.Transport as NT (EndPoint, EndPointAddress, Connection)
import Control.Applicative (Applicative, (<$>), (<*>))
import Control.Monad.Reader (MonadReader(..), ReaderT, runReaderT)
Expand Down Expand Up @@ -262,6 +262,10 @@ newtype SendPort a = SendPort {
deriving (Typeable, Binary, Show, Eq, Ord)

-- | The receive end of a typed channel (not serializable)
newtype ReceivePort a = ReceivePort { receiveSTM :: STM a }
deriving Typeable

{-
data ReceivePort a =
-- | A single receive port
ReceivePortSingle (TQueue a)
Expand All @@ -270,6 +274,7 @@ data ReceivePort a =
-- | A round-robin combination of receive ports
| ReceivePortRR (TVar [ReceivePort a])
deriving Typeable
-}

--------------------------------------------------------------------------------
-- Messages --
Expand Down
23 changes: 15 additions & 8 deletions distributed-process/tests/TestCH.hs
Expand Up @@ -443,29 +443,34 @@ testMergeChannels transport = do
where
-- Single layer of merging
testFlat :: LocalNode -> Bool -> String -> IO ()
testFlat localNode biased expected =
runProcess localNode $ do
testFlat localNode biased expected = do
done <- newEmptyMVar
forkProcess localNode $ do
rs <- mapM charChannel "abc"
m <- mergePorts biased rs
xs <- replicateM 9 $ receiveChan m
True <- return $ xs == expected
return ()
liftIO $ putMVar done ()
takeMVar done

-- Two layers of merging
testNested :: LocalNode -> Bool -> Bool -> String -> IO ()
testNested localNode biasedInner biasedOuter expected =
runProcess localNode $ do
testNested localNode biasedInner biasedOuter expected = do
done <- newEmptyMVar
forkProcess localNode $ do
rss <- mapM (mapM charChannel) ["abc", "def", "ghi"]
ms <- mapM (mergePorts biasedInner) rss
m <- mergePorts biasedOuter ms
xs <- replicateM (9 * 3) $ receiveChan m
True <- return $ xs == expected
return ()
liftIO $ putMVar done ()
takeMVar done

-- Test that if no messages are (immediately) available, the scheduler makes no difference
testBlocked :: LocalNode -> Bool -> IO ()
testBlocked localNode biased = do
vs <- replicateM 3 newEmptyMVar
done <- newEmptyMVar

forkProcess localNode $ do
[sa, sb, sc] <- liftIO $ mapM readMVar vs
Expand Down Expand Up @@ -496,13 +501,15 @@ testMergeChannels transport = do
, (sa, 'a')
]

runProcess localNode $ do
forkProcess localNode $ do
(ss, rs) <- unzip <$> replicateM 3 newChan
liftIO $ mapM_ (uncurry putMVar) $ zip vs ss
m <- mergePorts biased rs
xs <- replicateM (6 * 3) $ receiveChan m
True <- return $ xs == "abcacbbacbcacabcba"
return ()
liftIO $ putMVar done ()

takeMVar done

mergePorts :: Serializable a => Bool -> [ReceivePort a] -> Process (ReceivePort a)
mergePorts True = mergePortsBiased
Expand Down
2 changes: 1 addition & 1 deletion distributed-process/tests/TestClosure.hs
Expand Up @@ -31,7 +31,7 @@ import Control.Distributed.Process.Internal.Types (NodeId(nodeAddress))
import Control.Distributed.Static (staticLabel, staticClosure)

import Test.HUnit (Assertion)
import Test.Framework (Test, defaultMain, testGroup)
import Test.Framework (Test, defaultMain)
import Test.Framework.Providers.HUnit (testCase)

--------------------------------------------------------------------------------
Expand Down

0 comments on commit a91fb7e

Please sign in to comment.