Skip to content

Commit

Permalink
Merge branch 'semi-typed-recv'
Browse files Browse the repository at this point in the history
Merge semi-typed-recv into master

This changeset provides deferred matching for AbstractMessage.
The 'maybeHandleMessage' capability enables a common pattern, where
one layer of code is managing the process' mailbox and knows nothing
about which types can be handled, whilst another layer does not access
the mailbox, but is able to handle specific message types.

Building on this 'deferred type checking' we provide an exception
handler that can process a range of different exit reasons.
  • Loading branch information
Tim Watson committed Jan 18, 2013
2 parents c88c5c0 + e2cd2d5 commit e7cd226
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 20 deletions.
4 changes: 4 additions & 0 deletions distributed-process/ChangeLog
@@ -1,3 +1,7 @@

* Expose deferred type checking for AbstractMessage
* Provide improved exception handling for deferred type checked exit reasons

2012-11-22 Edsko de Vries <edsko@well-typed.com> 0.4.1 2012-11-22 Edsko de Vries <edsko@well-typed.com> 0.4.1


* Make behaviour of 'register' more Erlang-like (register will now fail if the * Make behaviour of 'register' more Erlang-like (register will now fail if the
Expand Down
12 changes: 10 additions & 2 deletions distributed-process/src/Control/Distributed/Process.hs
Expand Up @@ -5,7 +5,7 @@
-- Peyton Jones -- Peyton Jones
-- (<http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/>), -- (<http://research.microsoft.com/en-us/um/people/simonpj/papers/parallel/>),
-- although some of the details are different. The precise message passing -- although some of the details are different. The precise message passing
-- semantics are based on /A unified semantics for future Erlang/ by Hans -- semantics are based on /A unified semantics for future Erlang/ by Hans
-- Svensson, Lars-Åke Fredlund and Clara Benac Earle. -- Svensson, Lars-Åke Fredlund and Clara Benac Earle.
module Control.Distributed.Process module Control.Distributed.Process
( -- * Basic types ( -- * Basic types
Expand Down Expand Up @@ -39,6 +39,7 @@ module Control.Distributed.Process
, matchUnknown , matchUnknown
, AbstractMessage(..) , AbstractMessage(..)
, matchAny , matchAny
, matchAnyIf
, matchChan , matchChan
-- * Process management -- * Process management
, spawn , spawn
Expand All @@ -48,6 +49,7 @@ module Control.Distributed.Process
, kill , kill
, exit , exit
, catchExit , catchExit
, catchesExit
, ProcessTerminationException(..) , ProcessTerminationException(..)
, ProcessRegistrationException(..) , ProcessRegistrationException(..)
, SpawnRef , SpawnRef
Expand Down Expand Up @@ -192,13 +194,15 @@ import Control.Distributed.Process.Internal.Primitives
, matchUnknown , matchUnknown
, AbstractMessage(..) , AbstractMessage(..)
, matchAny , matchAny
, matchAnyIf
, matchChan , matchChan
-- Process management -- Process management
, terminate , terminate
, ProcessTerminationException(..) , ProcessTerminationException(..)
, die , die
, exit , exit
, catchExit , catchExit
, catchesExit
, kill , kill
, getSelfPid , getSelfPid
, getSelfNode , getSelfNode
Expand Down Expand Up @@ -361,7 +365,11 @@ spawnMonitor nid proc = do
-- "Control.Distributed.Process.Closure". -- "Control.Distributed.Process.Closure".
-- --
-- See also 'spawn'. -- See also 'spawn'.
call :: Serializable a => Static (SerializableDict a) -> NodeId -> Closure (Process a) -> Process a call :: Serializable a
=> Static (SerializableDict a)
-> NodeId
-> Closure (Process a)
-> Process a
call dict nid proc = do call dict nid proc = do
us <- getSelfPid us <- getSelfPid
(pid, mRef) <- spawnMonitor nid (proc `bindCP` cpSend dict us) (pid, mRef) <- spawnMonitor nid (proc `bindCP` cpSend dict us)
Expand Down
Expand Up @@ -21,6 +21,7 @@ module Control.Distributed.Process.Internal.Primitives
, matchUnknown , matchUnknown
, AbstractMessage(..) , AbstractMessage(..)
, matchAny , matchAny
, matchAnyIf
, matchChan , matchChan
-- * Process management -- * Process management
, terminate , terminate
Expand All @@ -29,6 +30,9 @@ module Control.Distributed.Process.Internal.Primitives
, kill , kill
, exit , exit
, catchExit , catchExit
, catchesExit
-- keep the exception constructor hidden, so that handling exit
-- reasons must take place via the 'catchExit' family of primitives
, ProcessExitException() , ProcessExitException()
, getSelfPid , getSelfPid
, getSelfNode , getSelfNode
Expand Down Expand Up @@ -311,24 +315,68 @@ matchIf c p = Match $ MatchMsg $ \msg ->
!decoded = decode (messageEncoding msg) !decoded = decode (messageEncoding msg)
_ -> Nothing _ -> Nothing


-- | Represents a received message and provides two basic operations on it.
data AbstractMessage = AbstractMessage { data AbstractMessage = AbstractMessage {
forward :: ProcessId -> Process () forward :: ProcessId -> Process () -- ^ forward the message to @ProcessId@
, maybeHandleMessage :: forall a b. (Serializable a)
=> (a -> Process b) -> Process (Maybe b) {- ^ Handle the message.
If the type of the message matches the type of the first argument to
the supplied expression, then the expression will be evaluated against
it. If this runtime type checking fails, then @Nothing@ will be returned
to indicate the fact. If the check succeeds and evaluation proceeds
however, the resulting value with be wrapped with @Just@.
-}
} }


-- | Match against an arbitrary message -- | Match against an arbitrary message. 'matchAny' removes the first available
-- message from the process mailbox, and via the 'AbstractMessage' type,
-- supports forwarding /or/ handling the message /if/ it is of the correct
-- type. If /not/ of the right type, then the 'AbstractMessage'
-- @maybeHandleMessage@ function will not evaluate the supplied expression,
-- /but/ the message will still have been removed from the process mailbox!
--
matchAny :: forall b. (AbstractMessage -> Process b) -> Match b matchAny :: forall b. (AbstractMessage -> Process b) -> Match b
matchAny p = Match $ MatchMsg $ Just . p . abstract matchAny p = Match $ MatchMsg $ Just . p . abstract
where
abstract :: Message -> AbstractMessage -- | Match against an arbitrary message. 'matchAnyIf' will /only/ remove the
abstract msg = AbstractMessage { -- message from the process mailbox, /if/ the supplied condition matches. The
forward = \them -> do -- success (or failure) of runtime type checks in @maybeHandleMessage@ does not
proc <- ask -- count here, i.e., if the condition evaluates to @True@ then the message will
liftIO $ sendPayload (processNode proc) -- be removed from the process mailbox and decoded, but that does /not/
(ProcessIdentifier (processId proc)) -- guarantee that an expression passed to @maybeHandleMessage@ will pass the
(ProcessIdentifier them) -- runtime type checks and therefore be evaluated. If the types do not match
NoImplicitReconnect -- up, then @maybeHandleMessage@ returns 'Nothing'.
(messageToPayload msg) matchAnyIf :: forall a b. (Serializable a)
} => (a -> Bool)
-> (AbstractMessage -> Process b)
-> Match b
matchAnyIf c p = Match $ MatchMsg $ \msg ->
case messageFingerprint msg == fingerprint (undefined :: a) of
True | c decoded -> Just (p (abstract msg))
where
decoded :: a
-- Make sure the value is fully decoded so that we don't hang to
-- bytestrings when the calling process doesn't evaluate immediately
!decoded = decode (messageEncoding msg)
_ -> Nothing

abstract :: Message -> AbstractMessage
abstract msg = AbstractMessage {
forward = \them -> do
proc <- ask
liftIO $ sendPayload (processNode proc)
(ProcessIdentifier (processId proc))
(ProcessIdentifier them)
NoImplicitReconnect
(messageToPayload msg)
, maybeHandleMessage = \(proc :: (a -> Process b)) -> do
case messageFingerprint msg == fingerprint (undefined :: a) of
True -> do { r <- proc (decoded :: a); return (Just r) }
where
decoded :: a
!decoded = decode (messageEncoding msg)
_ -> return Nothing
}


-- | Remove any message from the queue -- | Remove any message from the queue
matchUnknown :: Process b -> Match b matchUnknown :: Process b -> Match b
Expand All @@ -349,29 +397,39 @@ terminate :: Process a
terminate = liftIO $ throwIO ProcessTerminationException terminate = liftIO $ throwIO ProcessTerminationException


-- [Issue #110] -- [Issue #110]
-- | Die immediately - throws a 'ProcessExitException' with the given @reason@ -- | Die immediately - throws a 'ProcessExitException' with the given @reason@.
die :: Serializable a => a -> Process b die :: Serializable a => a -> Process b
die reason = do die reason = do
pid <- getSelfPid pid <- getSelfPid
liftIO $ throwIO (ProcessExitException pid (createMessage reason)) liftIO $ throwIO (ProcessExitException pid (createMessage reason))


-- | Forceful request to kill a process -- | Forceful request to kill a process. Where 'exit' provides an exception
-- that can be caught and handled, 'kill' throws an unexposed exception type
-- which cannot be handled explicitly (by type).
kill :: ProcessId -> String -> Process () kill :: ProcessId -> String -> Process ()
-- NOTE: We send the message to our local node controller, which will then -- NOTE: We send the message to our local node controller, which will then
-- forward it to a remote node controller (if applicable). Sending it directly -- forward it to a remote node controller (if applicable). Sending it directly
-- to a remote node controller means that that the message may overtake a -- to a remote node controller means that that the message may overtake a
-- 'monitor' or 'link' request. -- 'monitor' or 'link' request.
kill them reason = sendCtrlMsg Nothing (Kill them reason) kill them reason = sendCtrlMsg Nothing (Kill them reason)


-- | Graceful request to exit a process -- | Graceful request to exit a process. Throws 'ProcessExitException' with the
-- supplied @reason@ encoded as a message. Any /exit signal/ raised in this
-- manner can be handled using the 'catchExit' family of functions.
exit :: Serializable a => ProcessId -> a -> Process () exit :: Serializable a => ProcessId -> a -> Process ()
-- NOTE: We send the message to our local node controller, which will then -- NOTE: We send the message to our local node controller, which will then
-- forward it to a remote node controller (if applicable). Sending it directly -- forward it to a remote node controller (if applicable). Sending it directly
-- to a remote node controller means that that the message may overtake a -- to a remote node controller means that that the message may overtake a
-- 'monitor' or 'link' request. -- 'monitor' or 'link' request.
exit them reason = sendCtrlMsg Nothing (Exit them (createMessage reason)) exit them reason = sendCtrlMsg Nothing (Exit them (createMessage reason))


-- | Catches ProcessExitException -- | Catches 'ProcessExitException'. The handler will not be applied unless its
-- type matches the encoded data stored in the exception (see the /reason/
-- argument given to the 'exit' primitive). If the handler cannot be applied,
-- the exception will be re-thrown.
--
-- To handle 'ProcessExitException' without regard for /reason/, see 'catch'.
-- To handle multiple /reasons/ of differing types, see 'catchesExit'.
catchExit :: forall a b . (Show a, Serializable a) catchExit :: forall a b . (Show a, Serializable a)
=> Process b => Process b
-> (ProcessId -> a -> Process b) -> (ProcessId -> a -> Process b)
Expand All @@ -388,6 +446,29 @@ catchExit act exitHandler = catch act handleExit
-- bytestrings if the caller doesn't use the value immediately -- bytestrings if the caller doesn't use the value immediately
!decoded = decode (messageEncoding msg) !decoded = decode (messageEncoding msg)


-- | As 'Control.Exception.catches' but allows for multiple handlers. Because
-- 'ProcessExitException' stores the exit @reason@ as a typed, encoded message,
-- a handler must accept an input of the expected type. In order to handle
-- a list of potentially different handlers (and therefore input types), a
-- handler passed to 'catchesExit' must accept 'AbstractMessage' and return
-- @Maybe@ (i.e., @Just p@ if it handled the exit reason, otherwise @Nothing@).
--
-- See 'maybeHandleMessage' and 'AsbtractMessage' for more details.
catchesExit :: Process b
-> [(ProcessId -> AbstractMessage -> (Process (Maybe b)))]
-> Process b
catchesExit act handlers = catch act ((flip handleExit) handlers)
where
handleExit :: ProcessExitException
-> [(ProcessId -> AbstractMessage -> Process (Maybe b))]
-> Process b
handleExit ex [] = liftIO $ throwIO ex
handleExit ex@(ProcessExitException from msg) (h:hs) = do
r <- h from (abstract msg)
case r of
Nothing -> handleExit ex hs
Just p -> return p

-- | Our own process ID -- | Our own process ID
getSelfPid :: Process ProcessId getSelfPid :: Process ProcessId
getSelfPid = processId <$> ask getSelfPid = processId <$> ask
Expand Down
135 changes: 135 additions & 0 deletions distributed-process/tests/TestCH.hs
Expand Up @@ -757,6 +757,121 @@ testMatchAny transport = do


takeMVar clientDone takeMVar clientDone


-- | Test 'matchAny'. This repeats the 'testMath' but with a proxy server
-- in between, however we block 'Divide' requests ....
testMatchAnyHandle :: NT.Transport -> Assertion
testMatchAnyHandle transport = do
proxyAddr <- newEmptyMVar
clientDone <- newEmptyMVar

-- Math server
forkIO $ do
localNode <- newLocalNode transport initRemoteTable
mathServer <- forkProcess localNode math
proxyServer <- forkProcess localNode $ forever $ do
receiveWait [
matchAny (maybeForward mathServer)
]
putMVar proxyAddr proxyServer

-- Client
forkIO $ do
localNode <- newLocalNode transport initRemoteTable
mathServer <- readMVar proxyAddr

runProcess localNode $ do
pid <- getSelfPid
send mathServer (Add pid 1 2)
3 <- expect :: Process Double
send mathServer (Divide pid 8 2)
Nothing <- (expectTimeout 100000) :: Process (Maybe Double)
liftIO $ putMVar clientDone ()

takeMVar clientDone
where maybeForward :: ProcessId -> AbstractMessage -> Process (Maybe ())
maybeForward s msg =
maybeHandleMessage msg (\m@(Add _ _ _) -> send s m)

testMatchAnyNoHandle :: NT.Transport -> Assertion
testMatchAnyNoHandle transport = do
addr <- newEmptyMVar
clientDone <- newEmptyMVar
serverDone <- newEmptyMVar

-- Math server
forkIO $ do
localNode <- newLocalNode transport initRemoteTable
server <- forkProcess localNode $ forever $ do
receiveWait [
matchAnyIf
-- the condition has type `Add -> Bool`
(\(Add _ _ _) -> True)
-- the match `AbstractMessage -> Process ()` will succeed!
(\m -> do
-- `String -> Process ()` does *not* match the input types however
r <- (maybeHandleMessage m (\(_ :: String) -> die "NONSENSE" ))
case r of
Nothing -> return ()
Just _ -> die "NONSENSE")
]
-- we *must* have removed the message from our mailbox though!!!
Nothing <- receiveTimeout 100000 [ match (\(Add _ _ _) -> return ()) ]
liftIO $ putMVar serverDone ()
putMVar addr server

-- Client
forkIO $ do
localNode <- newLocalNode transport initRemoteTable
server <- readMVar addr

runProcess localNode $ do
pid <- getSelfPid
send server (Add pid 1 2)
-- we only care about the client having sent a message, so we're done
liftIO $ putMVar clientDone ()

takeMVar clientDone
takeMVar serverDone

-- | Test 'matchAnyIf'. We provide an /echo/ server, but it ignores requests
-- unless the text body @/= "bar"@ - this case should time out rather than
-- removing the message from the process mailbox.
testMatchAnyIf :: NT.Transport -> Assertion
testMatchAnyIf transport = do
echoAddr <- newEmptyMVar
clientDone <- newEmptyMVar

-- echo server
forkIO $ do
localNode <- newLocalNode transport initRemoteTable
echoServer <- forkProcess localNode $ forever $ do
receiveWait [
matchAnyIf (\(_ :: ProcessId, (s :: String)) -> s /= "bar")
handleMessage
]
putMVar echoAddr echoServer

-- Client
forkIO $ do
localNode <- newLocalNode transport initRemoteTable
server <- readMVar echoAddr

runProcess localNode $ do
pid <- getSelfPid
send server (pid, "foo")
"foo" <- expect
send server (pid, "baz")
"baz" <- expect
send server (pid, "bar")
Nothing <- (expectTimeout 100000) :: Process (Maybe Double)
liftIO $ putMVar clientDone ()

takeMVar clientDone
where handleMessage :: AbstractMessage -> Process (Maybe ())
handleMessage msg =
maybeHandleMessage msg (\(pid :: ProcessId, (m :: String))
-> do { send pid m; return () })

-- Test 'receiveChanTimeout' -- Test 'receiveChanTimeout'
testReceiveChanTimeout :: NT.Transport -> Assertion testReceiveChanTimeout :: NT.Transport -> Assertion
testReceiveChanTimeout transport = do testReceiveChanTimeout transport = do
Expand Down Expand Up @@ -894,6 +1009,22 @@ testKillRemote transport = do


takeMVar done takeMVar done


testCatchesExit :: NT.Transport -> Assertion
testCatchesExit transport = do
localNode <- newLocalNode transport initRemoteTable
done <- newEmptyMVar

_ <- forkProcess localNode $ do
(die ("foobar", 123 :: Int))
`catchesExit` [
(\_ m -> maybeHandleMessage m (\(_ :: String) -> return ()))
, (\_ m -> maybeHandleMessage m (\(_ :: Maybe Int) -> return ()))
, (\_ m -> maybeHandleMessage m (\(_ :: String, _ :: Int)
-> (liftIO $ putMVar done ()) >> return ()))
]

takeMVar done

testDie :: NT.Transport -> Assertion testDie :: NT.Transport -> Assertion
testDie transport = do testDie transport = do
localNode <- newLocalNode transport initRemoteTable localNode <- newLocalNode transport initRemoteTable
Expand Down Expand Up @@ -987,12 +1118,16 @@ tests (transport, transportInternals) = [
, testCase "RemoteRegistry" (testRemoteRegistry transport) , testCase "RemoteRegistry" (testRemoteRegistry transport)
, testCase "SpawnLocal" (testSpawnLocal transport) , testCase "SpawnLocal" (testSpawnLocal transport)
, testCase "MatchAny" (testMatchAny transport) , testCase "MatchAny" (testMatchAny transport)
, testCase "MatchAnyHandle" (testMatchAnyHandle transport)
, testCase "MatchAnyNoHandle" (testMatchAnyNoHandle transport)
, testCase "MatchAnyIf" (testMatchAnyIf transport)
, testCase "ReceiveChanTimeout" (testReceiveChanTimeout transport) , testCase "ReceiveChanTimeout" (testReceiveChanTimeout transport)
, testCase "ReceiveChanFeatures" (testReceiveChanFeatures transport) , testCase "ReceiveChanFeatures" (testReceiveChanFeatures transport)
, testCase "KillLocal" (testKillLocal transport) , testCase "KillLocal" (testKillLocal transport)
, testCase "KillRemote" (testKillRemote transport) , testCase "KillRemote" (testKillRemote transport)
, testCase "Die" (testDie transport) , testCase "Die" (testDie transport)
, testCase "PrettyExit" (testPrettyExit transport) , testCase "PrettyExit" (testPrettyExit transport)
, testCase "CatchesExit" (testCatchesExit transport)
, testCase "ExitLocal" (testExitLocal transport) , testCase "ExitLocal" (testExitLocal transport)
, testCase "ExitRemote" (testExitRemote transport) , testCase "ExitRemote" (testExitRemote transport)
] ]
Expand Down

0 comments on commit e7cd226

Please sign in to comment.