Skip to content

Commit 8b95ef1

Browse files
committed
Implement receiveChanTimeout
This closes #47.
1 parent 94ca1b4 commit 8b95ef1

File tree

3 files changed

+72
-14
lines changed

3 files changed

+72
-14
lines changed

distributed-process/src/Control/Distributed/Process.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ module Control.Distributed.Process
1919
-- * Basic messaging
2020
, send
2121
, expect
22+
, expectTimeout
2223
-- * Channels
2324
, ReceivePort
2425
, SendPort
2526
, sendPortId
2627
, newChan
2728
, sendChan
2829
, receiveChan
30+
, receiveChanTimeout
2931
, mergePortsBiased
3032
, mergePortsRR
3133
-- * Advanced messaging
@@ -91,7 +93,6 @@ module Control.Distributed.Process
9193
, bracket_
9294
, finally
9395
-- * Auxiliary API
94-
, expectTimeout
9596
, spawnAsync
9697
, spawnSupervised
9798
, spawnLink
@@ -215,6 +216,7 @@ import Control.Distributed.Process.Internal.Primitives
215216
, finally
216217
-- Auxiliary API
217218
, expectTimeout
219+
, receiveChanTimeout
218220
, spawnAsync
219221
-- Reconnecting
220222
, reconnect

distributed-process/src/Control/Distributed/Process/Internal/Primitives.hs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ module Control.Distributed.Process.Internal.Primitives
5454
, finally
5555
-- * Auxiliary API
5656
, expectTimeout
57+
, receiveChanTimeout
5758
, spawnAsync
5859
, linkNode
5960
, linkPort
@@ -74,6 +75,7 @@ import Data.Binary (decode)
7475
import Data.Time.Clock (getCurrentTime)
7576
import Data.Time.Format (formatTime)
7677
import System.Locale (defaultTimeLocale)
78+
import System.Timeout (timeout)
7779
import Control.Monad.Reader (ask)
7880
import Control.Monad.IO.Class (MonadIO, liftIO)
7981
import Control.Applicative ((<$>))
@@ -188,7 +190,7 @@ newChan = do
188190
)
189191
where
190192
finalizer :: StrictMVar LocalProcessState -> LocalSendPortId -> IO ()
191-
finalizer processState lcid = modifyMVar_ processState $
193+
finalizer st lcid = modifyMVar_ st $
192194
return . (typedChannelWithId lcid ^= Nothing)
193195

194196
-- | Send a message on a typed channel
@@ -204,18 +206,29 @@ sendChan (SendPort cid) msg = do
204206
-- | Wait for a message on a typed channel
205207
receiveChan :: Serializable a => ReceivePort a -> Process a
206208
receiveChan = liftIO . atomically . receiveSTM
207-
where
208-
receiveSTM :: ReceivePort a -> STM a
209-
receiveSTM (ReceivePortSingle c) =
210-
readTQueue c
211-
receiveSTM (ReceivePortBiased ps) =
212-
foldr1 orElse (map receiveSTM ps)
213-
receiveSTM (ReceivePortRR psVar) = do
214-
ps <- readTVar psVar
215-
a <- foldr1 orElse (map receiveSTM ps)
216-
writeTVar psVar (rotate ps)
217-
return a
218209

210+
-- | Like 'receiveChan' but with a timeout. If the timeout is 0, do a
211+
-- non-blocking check for a message.
212+
receiveChanTimeout :: Serializable a => Int -> ReceivePort a -> Process (Maybe a)
213+
receiveChanTimeout 0 ch = liftIO . atomically $
214+
(Just <$> receiveSTM ch) `orElse` return Nothing
215+
receiveChanTimeout n ch = liftIO . timeout n . atomically $
216+
receiveSTM ch
217+
218+
-- | Receive a message from a typed channel as an STM transaction.
219+
--
220+
-- The transaction retries when no message is available.
221+
receiveSTM :: ReceivePort a -> STM a
222+
receiveSTM (ReceivePortSingle c) =
223+
readTQueue c
224+
receiveSTM (ReceivePortBiased ps) =
225+
foldr1 orElse (map receiveSTM ps)
226+
receiveSTM (ReceivePortRR psVar) = do
227+
ps <- readTVar psVar
228+
a <- foldr1 orElse (map receiveSTM ps)
229+
writeTVar psVar (rotate ps)
230+
return a
231+
where
219232
rotate :: [a] -> [a]
220233
rotate [] = []
221234
rotate (x:xs) = xs ++ [x]
@@ -459,7 +472,7 @@ finally a sequel = bracket_ (return ()) sequel a
459472

460473
-- | Like 'expect' but with a timeout
461474
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
462-
expectTimeout timeout = receiveTimeout timeout [match return]
475+
expectTimeout n = receiveTimeout n [match return]
463476

464477
-- | Asynchronous version of 'spawn'
465478
--

distributed-process/tests/TestCH.hs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,47 @@ testMatchAny transport = do
695695

696696
takeMVar clientDone
697697

698+
-- Test 'receiveChanTimeout'
699+
testReceiveChanTimeout :: NT.Transport -> IO ()
700+
testReceiveChanTimeout transport = do
701+
done <- newEmptyMVar
702+
sendPort <- newEmptyMVar
703+
704+
forkIO $ do
705+
localNode <- newLocalNode transport initRemoteTable
706+
runProcess localNode $ do
707+
-- Create a typed channel
708+
(sp, rp) <- newChan :: Process (SendPort Bool, ReceivePort Bool)
709+
liftIO $ putMVar sendPort sp
710+
711+
-- Wait for a message with a delay. No message arrives, we should get Nothing after 1 second
712+
Nothing <- receiveChanTimeout 1000000 rp
713+
714+
-- Wait for a message with a delay again. Now a message arrives after 0.5 seconds
715+
Just True <- receiveChanTimeout 1000000 rp
716+
717+
-- Wait for a message with zero timeout: non-blocking check. No message is available, we get Nothing
718+
Nothing <- receiveChanTimeout 0 rp
719+
720+
-- Again, but now there is a message available
721+
liftIO $ threadDelay 1000000
722+
Just False <- receiveChanTimeout 0 rp
723+
724+
liftIO $ putMVar done ()
725+
726+
forkIO $ do
727+
localNode <- newLocalNode transport initRemoteTable
728+
runProcess localNode $ do
729+
sp <- liftIO $ readMVar sendPort
730+
731+
liftIO $ threadDelay 1500000
732+
sendChan sp True
733+
734+
liftIO $ threadDelay 500000
735+
sendChan sp False
736+
737+
takeMVar done
738+
698739
main :: IO ()
699740
main = do
700741
Right (transport, transportInternals) <- createTransportExposeInternals "127.0.0.1" "8080" defaultTCPParameters
@@ -739,4 +780,6 @@ main = do
739780
, ("MonitorChannel", testMonitorChannel transport)
740781
-- Reconnect
741782
, ("Reconnect", testReconnect transport transportInternals)
783+
-- ReceiveChanTimeout
784+
, ("ReceiveChanTimeout", testReceiveChanTimeout transport)
742785
]

0 commit comments

Comments
 (0)