Skip to content

Commit

Permalink
Bump to 0.2.2/Exception handling/fix runProcess
Browse files Browse the repository at this point in the history
runProcess only marked the process as finished if the process completed
successfully; if the process threw an exception, a 'blocked indefinitely on
MVar' exception would be thrown
  • Loading branch information
edsko committed Jul 31, 2012
1 parent e06c6bf commit b9b4aaa
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 12 deletions.
6 changes: 6 additions & 0 deletions distributed-process/ChangeLog
@@ -1,3 +1,9 @@
2012-07-31 Edsko de Vries <edsko@well-typed.com> 0.2.2.0

* Add exception handling primitives
* Fix runProcess: if the process threw an exception, a 'waiting indefinitely on
MVar' exception would be thrown.

2012-07-21 Edsko de Vries <edsko@well-typed.com> 0.2.1.4

* Bugfix in the node controller
Expand Down
2 changes: 1 addition & 1 deletion distributed-process/distributed-process.cabal
@@ -1,5 +1,5 @@
Name: distributed-process
Version: 0.2.1.4
Version: 0.2.2.0
Cabal-Version: >=1.8
Build-Type: Simple
License: BSD3
Expand Down
16 changes: 14 additions & 2 deletions distributed-process/src/Control/Distributed/Process.hs
Expand Up @@ -80,8 +80,14 @@ module Control.Distributed.Process
, whereisRemoteAsync
, nsendRemote
, WhereIsReply(..)
-- * Auxiliary API
-- * Exception handling
, catch
, mask
, onException
, bracket
, bracket_
, finally
-- * Auxiliary API
, expectTimeout
, spawnAsync
, spawnSupervised
Expand Down Expand Up @@ -190,8 +196,14 @@ import Control.Distributed.Process.Internal.Primitives
, nsendRemote
-- Closures
, unClosure
-- Auxiliary API
-- Exception handling
, catch
, mask
, onException
, bracket
, bracket_
, finally
-- Auxiliary API
, expectTimeout
, spawnAsync
)
Expand Down
Expand Up @@ -43,8 +43,14 @@ module Control.Distributed.Process.Internal.Primitives
, nsendRemote
-- * Closures
, unClosure
-- * Auxiliary API
-- * Exception handling
, catch
, mask
, onException
, bracket
, bracket_
, finally
-- * Auxiliary API
, expectTimeout
, spawnAsync
, linkNode
Expand All @@ -67,8 +73,8 @@ import System.Locale (defaultTimeLocale)
import Control.Monad.Reader (ask)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Applicative ((<$>))
import Control.Exception (Exception, throw)
import qualified Control.Exception as Ex (catch)
import Control.Exception (Exception, throw, throwIO, SomeException)
import qualified Control.Exception as Ex (catch, mask)
import Control.Concurrent.MVar (modifyMVar)
import Control.Concurrent.Chan (writeChan)
import Control.Concurrent.STM
Expand Down Expand Up @@ -258,7 +264,7 @@ instance Exception ProcessTerminationException

-- | Terminate (throws a ProcessTerminationException)
terminate :: Process a
terminate = liftIO $ throw ProcessTerminationException
terminate = liftIO $ throwIO ProcessTerminationException

-- | Our own process ID
getSelfPid :: Process ProcessId
Expand Down Expand Up @@ -317,15 +323,51 @@ unmonitor ref = do
]

--------------------------------------------------------------------------------
-- Auxiliary API --
-- Exception handling --
--------------------------------------------------------------------------------

-- | Catch exceptions within a process
-- | Lift 'Control.Exception.catch'
catch :: Exception e => Process a -> (e -> Process a) -> Process a
catch p h = do
lproc <- ask
liftIO $ Ex.catch (runLocalProcess lproc p) (runLocalProcess lproc . h)

-- | Lift 'Control.Exception.mask'
mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
mask p = do
lproc <- ask
liftIO $ Ex.mask $ \restore ->
runLocalProcess lproc (p (liftRestore lproc restore))
where
liftRestore :: LocalProcess -> (forall a. IO a -> IO a) -> (forall a. Process a -> Process a)
liftRestore lproc restoreIO = liftIO . restoreIO . runLocalProcess lproc

-- | Lift 'Control.Exception.onException'
onException :: Process a -> Process b -> Process a
onException p what = p `catch` \e -> do _ <- what
liftIO $ throwIO (e :: SomeException)

-- | Lift 'Control.Exception.bracket'
bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket before after thing = do
mask $ \restore -> do
a <- before
r <- restore (thing a) `onException` after a
_ <- after a
return r

-- | Lift 'Control.Exception.bracket_'
bracket_ :: Process a -> Process b -> Process c -> Process c
bracket_ before after thing = bracket before (const after) (const thing)

-- | Lift 'Control.Exception.finally'
finally :: Process a -> Process b -> Process a
finally a sequel = bracket_ (return ()) sequel a

--------------------------------------------------------------------------------
-- Auxiliary API --
--------------------------------------------------------------------------------

-- | Like 'expect' but with a timeout
expectTimeout :: forall a. Serializable a => Int -> Process (Maybe a)
expectTimeout timeout = receiveTimeout timeout [match return]
Expand Down Expand Up @@ -464,7 +506,7 @@ unClosure :: forall a. Typeable a => Closure a -> Process a
unClosure (Closure (Static label) env) = do
rtable <- remoteTable . processNode <$> ask
case resolveClosure rtable label env of
Nothing -> throw . userError $ "Unregistered closure " ++ show label
Nothing -> error $ "Unregistered closure " ++ show label
Just dyn -> return $ fromDyn dyn (throw (typeError dyn))
where
typeError dyn = userError $ "lookupStatic type error: "
Expand Down
4 changes: 2 additions & 2 deletions distributed-process/src/Control/Distributed/Process/Node.hs
Expand Up @@ -121,7 +121,7 @@ import Control.Distributed.Process.Internal.Node
, sendMessage
, sendPayload
)
import Control.Distributed.Process.Internal.Primitives (expect, register)
import Control.Distributed.Process.Internal.Primitives (expect, register, finally)
import qualified Control.Distributed.Process.Internal.Closure.Static as Static (__remoteTable)
import qualified Control.Distributed.Process.Internal.Closure.CP as CP (__remoteTable)

Expand Down Expand Up @@ -189,7 +189,7 @@ closeLocalNode node =
runProcess :: LocalNode -> Process () -> IO ()
runProcess node proc = do
done <- newEmptyMVar
void $ forkProcess node (proc >> liftIO (putMVar done ()))
void $ forkProcess node (proc `finally` liftIO (putMVar done ()))
takeMVar done

-- | Spawn a new process on a local node
Expand Down

0 comments on commit b9b4aaa

Please sign in to comment.