Permalink
Browse files

extend AbstractMessage with deferred type checking

  • Loading branch information...
1 parent c88c5c0 commit 8af66f4dcdd1d9e567ccc9febcbac9465759ce23 @hyperthunk hyperthunk committed Jan 14, 2013
@@ -39,6 +39,7 @@ module Control.Distributed.Process
, matchUnknown
, AbstractMessage(..)
, matchAny
+ , matchAnyIf
, matchChan
-- * Process management
, spawn
@@ -192,6 +193,7 @@ import Control.Distributed.Process.Internal.Primitives
, matchUnknown
, AbstractMessage(..)
, matchAny
+ , matchAnyIf
, matchChan
-- Process management
, terminate
@@ -21,6 +21,7 @@ module Control.Distributed.Process.Internal.Primitives
, matchUnknown
, AbstractMessage(..)
, matchAny
+ , matchAnyIf
, matchChan
-- * Process management
, terminate
@@ -311,24 +312,62 @@ matchIf c p = Match $ MatchMsg $ \msg ->
!decoded = decode (messageEncoding msg)
_ -> Nothing
+-- | Represents a received message and provides two operations on it.
data AbstractMessage = AbstractMessage {
- forward :: ProcessId -> Process ()
+ forward :: ProcessId -> Process () -- ^ forward the message to @ProcessId@
+ , maybeHandleMessage :: forall a b. (Serializable a)
+ => (a -> Process b) -> Process (Maybe b) -- ^ handle the message if it is of the given type
}
--- | Match against an arbitrary message
+-- | Match against an arbitrary message. 'matchAny' removes the first available
+-- message from the process mailbox, and via the 'AbstractMessage' type,
+-- supports forwarding /or/ handling the message /if/ it is of the correct
+-- type. If /not/ of the right type, then the 'AbstractMessage'
+-- @maybeHandleMessage@ function will not evaluate the supplied expression,
+-- /but/ the message will still have been removed from the process mailbox!
+--
matchAny :: forall b. (AbstractMessage -> Process b) -> Match b
-matchAny p = Match $ MatchMsg $ Just . p . abstract
- where
- abstract :: Message -> AbstractMessage
- abstract msg = AbstractMessage {
- forward = \them -> do
- proc <- ask
- liftIO $ sendPayload (processNode proc)
- (ProcessIdentifier (processId proc))
- (ProcessIdentifier them)
- NoImplicitReconnect
- (messageToPayload msg)
- }
+matchAny p = Match $ MatchMsg $ Just . p . abstract
+
+-- | Match against an arbitrary message. 'matchAnyIf' will /only/ remove the
+-- message from the process mailbox, /if/ the supplied condition matches. The
+-- success (or failure) of runtime type checks in @maybeHandleMessage@ does not
+-- count here, i.e., if the condition evaluates to @True@ then the message will
+-- be removed from the process mailbox and decoded, but that does /not/
+-- guarantee that an expression passed to @maybeHandleMessage@ will pass the
+-- runtime type checks and therefore be evaluated. If the types do not match
+-- up, then @maybeHandleMessage@ returns 'Nothing'.
+matchAnyIf :: forall a b. (Serializable a)
+ => (a -> Bool)
+ -> (AbstractMessage -> Process b)
+ -> Match b
+matchAnyIf c p = Match $ MatchMsg $ \msg ->
+ case messageFingerprint msg == fingerprint (undefined :: a) of
+ True | c decoded -> Just (p (abstract msg))
+ where
+ decoded :: a
+ -- Make sure the value is fully decoded so that we don't hang to
+ -- bytestrings when the calling process doesn't evaluate immediately
+ !decoded = decode (messageEncoding msg)
+ _ -> Nothing
+
+abstract :: Message -> AbstractMessage
+abstract msg = AbstractMessage {
+ forward = \them -> do
+ proc <- ask
+ liftIO $ sendPayload (processNode proc)
+ (ProcessIdentifier (processId proc))
+ (ProcessIdentifier them)
+ NoImplicitReconnect
+ (messageToPayload msg)
+ , maybeHandleMessage = \(proc :: (a -> Process b)) -> do
+ case messageFingerprint msg == fingerprint (undefined :: a) of
+ True -> do { r <- proc (decoded :: a); return (Just r) }
+ where
+ decoded :: a
+ !decoded = decode (messageEncoding msg)
+ _ -> return Nothing
+ }
-- | Remove any message from the queue
matchUnknown :: Process b -> Match b
@@ -757,6 +757,121 @@ testMatchAny transport = do
takeMVar clientDone
+-- | Test 'matchAny'. This repeats the 'testMath' but with a proxy server
+-- in between, however we block 'Divide' requests ....
+testMatchAnyHandle :: NT.Transport -> Assertion
+testMatchAnyHandle transport = do
+ proxyAddr <- newEmptyMVar
+ clientDone <- newEmptyMVar
+
+ -- Math server
+ forkIO $ do
+ localNode <- newLocalNode transport initRemoteTable
+ mathServer <- forkProcess localNode math
+ proxyServer <- forkProcess localNode $ forever $ do
+ receiveWait [
+ matchAny (maybeForward mathServer)
+ ]
+ putMVar proxyAddr proxyServer
+
+ -- Client
+ forkIO $ do
+ localNode <- newLocalNode transport initRemoteTable
+ mathServer <- readMVar proxyAddr
+
+ runProcess localNode $ do
+ pid <- getSelfPid
+ send mathServer (Add pid 1 2)
+ 3 <- expect :: Process Double
+ send mathServer (Divide pid 8 2)
+ Nothing <- (expectTimeout 100000) :: Process (Maybe Double)
+ liftIO $ putMVar clientDone ()
+
+ takeMVar clientDone
+ where maybeForward :: ProcessId -> AbstractMessage -> Process (Maybe ())
+ maybeForward s msg =
+ maybeHandleMessage msg (\m@(Add _ _ _) -> send s m)
+
+testMatchAnyNoHandle :: NT.Transport -> Assertion
+testMatchAnyNoHandle transport = do
+ addr <- newEmptyMVar
+ clientDone <- newEmptyMVar
+ serverDone <- newEmptyMVar
+
+ -- Math server
+ forkIO $ do
+ localNode <- newLocalNode transport initRemoteTable
+ server <- forkProcess localNode $ forever $ do
+ receiveWait [
+ matchAnyIf
+ -- the condition has type `Add -> Bool`
+ (\(Add _ _ _) -> True)
+ -- the match `AbstractMessage -> Process ()` will succeed!
+ (\m -> do
+ -- `String -> Process ()` does *not* match the input types however
+ r <- (maybeHandleMessage m (\(_ :: String) -> die "NONSENSE" ))
+ case r of
+ Nothing -> return ()
+ Just _ -> die "NONSENSE")
+ ]
+ -- we *must* have removed the message from our mailbox though!!!
+ Nothing <- receiveTimeout 100000 [ match (\(Add _ _ _) -> return ()) ]
+ liftIO $ putMVar serverDone ()
+ putMVar addr server
+
+ -- Client
+ forkIO $ do
+ localNode <- newLocalNode transport initRemoteTable
+ server <- readMVar addr
+
+ runProcess localNode $ do
+ pid <- getSelfPid
+ send server (Add pid 1 2)
+ -- we only care about the client having sent a message, so we're done
+ liftIO $ putMVar clientDone ()
+
+ takeMVar clientDone
+ takeMVar serverDone
+
+-- | Test 'matchAnyIf'. We provide an /echo/ server, but it ignores requests
+-- unless the text body @/= "bar"@ - this case should time out rather than
+-- removing the message from the process mailbox.
+testMatchAnyIf :: NT.Transport -> Assertion
+testMatchAnyIf transport = do
+ echoAddr <- newEmptyMVar
+ clientDone <- newEmptyMVar
+
+ -- echo server
+ forkIO $ do
+ localNode <- newLocalNode transport initRemoteTable
+ echoServer <- forkProcess localNode $ forever $ do
+ receiveWait [
+ matchAnyIf (\(_ :: ProcessId, (s :: String)) -> s /= "bar")
+ handleMessage
+ ]
+ putMVar echoAddr echoServer
+
+ -- Client
+ forkIO $ do
+ localNode <- newLocalNode transport initRemoteTable
+ server <- readMVar echoAddr
+
+ runProcess localNode $ do
+ pid <- getSelfPid
+ send server (pid, "foo")
+ "foo" <- expect
+ send server (pid, "baz")
+ "baz" <- expect
+ send server (pid, "bar")
+ Nothing <- (expectTimeout 100000) :: Process (Maybe Double)
+ liftIO $ putMVar clientDone ()
+
+ takeMVar clientDone
+ where handleMessage :: AbstractMessage -> Process (Maybe ())
+ handleMessage msg =
+ maybeHandleMessage msg (\(pid :: ProcessId, (m :: String))
+ -> do { send pid m; return () })
+
-- Test 'receiveChanTimeout'
testReceiveChanTimeout :: NT.Transport -> Assertion
testReceiveChanTimeout transport = do
@@ -987,6 +1102,9 @@ tests (transport, transportInternals) = [
, testCase "RemoteRegistry" (testRemoteRegistry transport)
, testCase "SpawnLocal" (testSpawnLocal transport)
, testCase "MatchAny" (testMatchAny transport)
+ , testCase "MatchAnyHandle" (testMatchAnyHandle transport)
+ , testCase "MatchAnyNoHandle" (testMatchAnyNoHandle transport)
+ , testCase "MatchAnyIf" (testMatchAnyIf transport)
, testCase "ReceiveChanTimeout" (testReceiveChanTimeout transport)
, testCase "ReceiveChanFeatures" (testReceiveChanFeatures transport)
, testCase "KillLocal" (testKillLocal transport)

0 comments on commit 8af66f4

Please sign in to comment.