From 29491c3de870ad384cd3669318d72aab190ccd49 Mon Sep 17 00:00:00 2001 From: Alexander Vershilov Date: Thu, 29 Jan 2015 03:30:40 +0300 Subject: [PATCH 1/7] Implement nsend according to semantics. --- .../Process/Internal/Primitives.hs | 4 ++-- src/Control/Distributed/Process/Node.hs | 22 ++++++++++++++----- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/Control/Distributed/Process/Internal/Primitives.hs b/src/Control/Distributed/Process/Internal/Primitives.hs index e73783eb..e6cd873f 100644 --- a/src/Control/Distributed/Process/Internal/Primitives.hs +++ b/src/Control/Distributed/Process/Internal/Primitives.hs @@ -121,6 +121,7 @@ import Prelude hiding (catch) #endif import Data.Binary (decode) +import Data.Foldable (traverse_) import Data.Time.Clock (getCurrentTime) import Data.Time.Format (formatTime) import System.Locale (defaultTimeLocale) @@ -1110,8 +1111,7 @@ whereisRemoteAsync nid label = -- | Named send to a process in the local registry (asynchronous) nsend :: Serializable a => String -> a -> Process () -nsend label msg = - sendCtrlMsg Nothing (NamedSend label (createUnencodedMessage msg)) +nsend label msg = traverse_ (`send` msg) =<< whereis label -- | Named send to a process in the local registry (asynchronous). -- This function makes /no/ attempt to serialize and (in the case when the diff --git a/src/Control/Distributed/Process/Node.hs b/src/Control/Distributed/Process/Node.hs index 0b1dbac6..e7849cdc 100644 --- a/src/Control/Distributed/Process/Node.hs +++ b/src/Control/Distributed/Process/Node.hs @@ -159,7 +159,8 @@ import Control.Distributed.Process.Internal.Types , createUnencodedMessage , runLocalProcess , firstNonReservedProcessId - , ImplicitReconnect(WithImplicitReconnect) + , ImplicitReconnect(NoImplicitReconnect, WithImplicitReconnect) + , messageToPayload ) import Control.Distributed.Process.Management.Internal.Agent ( mxAgentController @@ -187,6 +188,7 @@ import Control.Distributed.Process.Serializable (Serializable) import Control.Distributed.Process.Internal.Messaging ( sendBinary , sendMessage + , sendPayload , closeImplicitReconnections , impliesDeathOf ) @@ -686,8 +688,8 @@ nodeController = do ncEffectRegister from label atnode pid force NCMsg (ProcessIdentifier from) (WhereIs label) -> ncEffectWhereIs from label - NCMsg _ (NamedSend label msg') -> - ncEffectNamedSend label msg' + NCMsg (ProcessIdentifier from) (NamedSend label msg') -> + ncEffectNamedSend from label msg' NCMsg _ (LocalSend to msg') -> ncEffectLocalSend node to msg' NCMsg _ (LocalPortSend to msg') -> @@ -904,11 +906,19 @@ ncEffectWhereIs from label = do (WhereIsReply label mPid) -- [Unified: Table 14] -ncEffectNamedSend :: String -> Message -> NC () -ncEffectNamedSend label msg = do +ncEffectNamedSend :: ProcessId -> String -> Message -> NC () +ncEffectNamedSend from label msg = do mPid <- gets (^. registeredHereFor label) -- If mPid is Nothing, we just ignore the named send (as per Table 14) - forM_ mPid $ \pid -> postMessage pid msg + node <- ask + forM_ mPid $ \pid -> + if isLocal node (ProcessIdentifier pid) + then postMessage pid msg + else liftIO $ sendPayload node + (ProcessIdentifier from) + (ProcessIdentifier pid) + NoImplicitReconnect + (messageToPayload msg) -- [Issue #DP-20] ncEffectLocalSend :: LocalNode -> ProcessId -> Message -> NC () From 4600e7852bb9d01c6896793ba589d8e43f25e1e8 Mon Sep 17 00:00:00 2001 From: Alexander Vershilov Date: Fri, 30 Jan 2015 00:48:37 +0300 Subject: [PATCH 2/7] Update spec: Registry --- doc/semantics/CloudHaskellSemantics.tex | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/doc/semantics/CloudHaskellSemantics.tex b/doc/semantics/CloudHaskellSemantics.tex index 5a223e5b..625acc12 100644 --- a/doc/semantics/CloudHaskellSemantics.tex +++ b/doc/semantics/CloudHaskellSemantics.tex @@ -700,6 +700,23 @@ \subsection{Ordering and Typed Channels} for all messages from $P$ to $Q$, rather than using an ordered connection per typed channel plus one for direct messages. +\subsection{Registry} +The identifiers of both local and remote processes can be stored in the Registry. +The operation \texttt{registerRemoteAsync} can register processes at remote nodes. +When a message is sent to a remote node using \texttt{nsendRemote} there is no +guarantee that the process that should receive the message is located at the +node; thus it may be necessary to relay the message to a process on yet another node. + +Both operations \texttt{nsend} and \texttt{nsendRemote} discard the messages if +no process is registered with the given name, or already dead. + +Both \texttt{nsend} and \texttt{nsendRemote} guarantee ordering between messages +sent between two processes using one of those mechanism, however ordering between +messages sent by \texttt{send} and \texttt{nsend} is not preserved. + +Current implementation for monitoring processes that are stored in registry +are left unspecified, this may be changed in future. + \bibliographystyle{apalike} \bibliography{references} From f8781385c8e2edcc81b4c6d360b4edaff1788cd4 Mon Sep 17 00:00:00 2001 From: Alexander Vershilov Date: Tue, 24 Feb 2015 23:04:31 +0300 Subject: [PATCH 3/7] Initial implementation of RegistryAgent Registry agent monitors all processes that were added to registry and removes them from registry if they die. --- distributed-process.cabal | 1 + src/Control/Distributed/Process/Node.hs | 2 + .../Distributed/Process/Node/RegistryAgent.hs | 69 +++++++++++++++++++ 3 files changed, 72 insertions(+) create mode 100644 src/Control/Distributed/Process/Node/RegistryAgent.hs diff --git a/distributed-process.cabal b/distributed-process.cabal index 3a938843..878a7e5c 100644 --- a/distributed-process.cabal +++ b/distributed-process.cabal @@ -71,6 +71,7 @@ Library Control.Distributed.Process.Internal.WeakTQueue, Control.Distributed.Process.Management, Control.Distributed.Process.Node, + Control.Distributed.Process.Node.RegistryAgent Control.Distributed.Process.Serializable, Control.Distributed.Process.UnsafePrimitives Control.Distributed.Process.Management.Internal.Agent, diff --git a/src/Control/Distributed/Process/Node.hs b/src/Control/Distributed/Process/Node.hs index e7849cdc..4a6a8336 100644 --- a/src/Control/Distributed/Process/Node.hs +++ b/src/Control/Distributed/Process/Node.hs @@ -68,6 +68,7 @@ import Control.Exception ) import qualified Control.Exception as Exception (Handler(..), catches, finally) import Control.Concurrent (forkIO, forkIOWithUnmask, myThreadId) +import Control.Distributed.Process.Node.RegistryAgent (registryMonitorAgent) import Control.Distributed.Process.Internal.StrictMVar ( newMVar , withMVar @@ -298,6 +299,7 @@ startServiceProcesses node = do runProcess node $ register Table.mxTableCoordinator tableCoordinatorPid logger <- forkProcess node loop runProcess node $ register "logger" logger + runProcess node $ void $ registryMonitorAgent where fork = forkProcess node diff --git a/src/Control/Distributed/Process/Node/RegistryAgent.hs b/src/Control/Distributed/Process/Node/RegistryAgent.hs new file mode 100644 index 00000000..3f590021 --- /dev/null +++ b/src/Control/Distributed/Process/Node/RegistryAgent.hs @@ -0,0 +1,69 @@ +----------------------------------------------------------------------------- +---- | +---- Module : Control.Distributed.Process.Node.RegistryAgent +---- Copyright : (c) Tweag I/O 2015 +---- License : BSD3 (see the file LICENSE) +---- +---- Maintainer : Tim Watson +---- Stability : experimental +---- Portability : non-portable (requires concurrency) +---- +---- This module provides registry monitoring agent, implemented as +---- a /distributed-process Management Agent/. Once 'node' starts it run this +---- agent, such agent will monitor every remove process that is added to node +---- and remove Processes from registry if they die. +---- +------------------------------------------------------------------------------- + +module Control.Distributed.Process.Node.RegistryAgent + ( registryMonitorAgent + ) where + +import Control.Distributed.Process.Management +import Control.Distributed.Process.Internal.Types +import Control.Distributed.Process.Internal.Primitives +import Control.Monad (when) +import Data.Foldable +import Data.Map.Strict (Map) +import qualified Data.Map.Strict as Map + +registryMonitorAgentId :: MxAgentId +registryMonitorAgentId = MxAgentId "service.registry.monitoring" + +registryMonitorAgent :: Process ProcessId +registryMonitorAgent = do + mxAgent registryMonitorAgentId initState + [ mxSink $ \(ProcessMonitorNotification mr pid _) -> do + hm <- mxGetLocal + traverse_ (\(label, mref) -> when (mr == mref) $ do + liftMX $ do + mynid <- getSelfNode + sendCtrlMsg Nothing (Unmonitor mref) + sendCtrlMsg Nothing (Register label mynid Nothing False) + mxSetLocal $! pid `Map.delete` hm + ) (pid `Map.lookup` hm) + mxReady + , mxSink $ \ev -> + let act = case ev of + MxRegistered pid label -> do + hm <- mxGetLocal + case pid `Map.lookup` hm of + Nothing -> do + mon <- liftMX (monitor pid) + mxUpdateLocal (Map.insert pid (label,mon)) + Just _ -> return () + MxUnRegistered pid _ -> do + hm <- mxGetLocal + traverse_ (\(_,mref) -> do + liftMX $ sendCtrlMsg Nothing (Unmonitor mref) + mxSetLocal $! pid `Map.delete` hm + ) (pid `Map.lookup` hm) + _ -> return () + in act >> mxReady + -- remove async answers from mailbox + , mxSink $ \RegisterReply{} -> mxReady + , mxSink $ \DidUnmonitor{} -> mxReady + ] + where + initState :: Map ProcessId (String,MonitorRef) + initState = Map.empty From 1df7b962288d98bf41a1cb7e1fce2119d32f5f58 Mon Sep 17 00:00:00 2001 From: Alexander Vershilov Date: Wed, 25 Feb 2015 14:24:11 +0300 Subject: [PATCH 4/7] TMP: use forM_ instead of traverse_ --- src/Control/Distributed/Process/Node/RegistryAgent.hs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/Control/Distributed/Process/Node/RegistryAgent.hs b/src/Control/Distributed/Process/Node/RegistryAgent.hs index 3f590021..95104357 100644 --- a/src/Control/Distributed/Process/Node/RegistryAgent.hs +++ b/src/Control/Distributed/Process/Node/RegistryAgent.hs @@ -23,7 +23,7 @@ import Control.Distributed.Process.Management import Control.Distributed.Process.Internal.Types import Control.Distributed.Process.Internal.Primitives import Control.Monad (when) -import Data.Foldable +import Data.Foldable (forM_) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map @@ -35,13 +35,12 @@ registryMonitorAgent = do mxAgent registryMonitorAgentId initState [ mxSink $ \(ProcessMonitorNotification mr pid _) -> do hm <- mxGetLocal - traverse_ (\(label, mref) -> when (mr == mref) $ do + forM_ (pid `Map.lookup` hm) $ \(label, mref) -> when (mr == mref) $ do liftMX $ do mynid <- getSelfNode sendCtrlMsg Nothing (Unmonitor mref) sendCtrlMsg Nothing (Register label mynid Nothing False) mxSetLocal $! pid `Map.delete` hm - ) (pid `Map.lookup` hm) mxReady , mxSink $ \ev -> let act = case ev of @@ -54,10 +53,9 @@ registryMonitorAgent = do Just _ -> return () MxUnRegistered pid _ -> do hm <- mxGetLocal - traverse_ (\(_,mref) -> do + forM_ (pid `Map.lookup` hm) $ \(_,mref) -> do liftMX $ sendCtrlMsg Nothing (Unmonitor mref) mxSetLocal $! pid `Map.delete` hm - ) (pid `Map.lookup` hm) _ -> return () in act >> mxReady -- remove async answers from mailbox From 98d63caaee7822ff704d96bf340761769f0b0d9b Mon Sep 17 00:00:00 2001 From: Alexander Vershilov Date: Wed, 25 Feb 2015 14:25:08 +0300 Subject: [PATCH 5/7] TMP: remove redundant unmonitor call --- src/Control/Distributed/Process/Node/RegistryAgent.hs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Control/Distributed/Process/Node/RegistryAgent.hs b/src/Control/Distributed/Process/Node/RegistryAgent.hs index 95104357..d2705034 100644 --- a/src/Control/Distributed/Process/Node/RegistryAgent.hs +++ b/src/Control/Distributed/Process/Node/RegistryAgent.hs @@ -38,7 +38,6 @@ registryMonitorAgent = do forM_ (pid `Map.lookup` hm) $ \(label, mref) -> when (mr == mref) $ do liftMX $ do mynid <- getSelfNode - sendCtrlMsg Nothing (Unmonitor mref) sendCtrlMsg Nothing (Register label mynid Nothing False) mxSetLocal $! pid `Map.delete` hm mxReady From 23c2ef6f9a830ee371756f8314b78b8aca50beb6 Mon Sep 17 00:00:00 2001 From: Alexander Vershilov Date: Wed, 25 Feb 2015 14:56:16 +0300 Subject: [PATCH 6/7] Export monitorAsync from Internal modules. --- src/Control/Distributed/Process/Internal/Primitives.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Control/Distributed/Process/Internal/Primitives.hs b/src/Control/Distributed/Process/Internal/Primitives.hs index e6cd873f..3ea65bf3 100644 --- a/src/Control/Distributed/Process/Internal/Primitives.hs +++ b/src/Control/Distributed/Process/Internal/Primitives.hs @@ -71,6 +71,7 @@ module Control.Distributed.Process.Internal.Primitives , unlink , monitor , unmonitor + , unmonitorAsync , withMonitor -- * Logging , say From 3ed9bf7cfafc742dca05012a207e0374afc57b8d Mon Sep 17 00:00:00 2001 From: Alexander Vershilov Date: Wed, 25 Feb 2015 14:56:48 +0300 Subject: [PATCH 7/7] TMP: simplify agent --- .../Distributed/Process/Node/RegistryAgent.hs | 28 +++++++++---------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/src/Control/Distributed/Process/Node/RegistryAgent.hs b/src/Control/Distributed/Process/Node/RegistryAgent.hs index d2705034..05228223 100644 --- a/src/Control/Distributed/Process/Node/RegistryAgent.hs +++ b/src/Control/Distributed/Process/Node/RegistryAgent.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE BangPatterns #-} ----------------------------------------------------------------------------- ---- | ---- Module : Control.Distributed.Process.Node.RegistryAgent @@ -22,7 +23,6 @@ module Control.Distributed.Process.Node.RegistryAgent import Control.Distributed.Process.Management import Control.Distributed.Process.Internal.Types import Control.Distributed.Process.Internal.Primitives -import Control.Monad (when) import Data.Foldable (forM_) import Data.Map.Strict (Map) import qualified Data.Map.Strict as Map @@ -33,28 +33,26 @@ registryMonitorAgentId = MxAgentId "service.registry.monitoring" registryMonitorAgent :: Process ProcessId registryMonitorAgent = do mxAgent registryMonitorAgentId initState - [ mxSink $ \(ProcessMonitorNotification mr pid _) -> do - hm <- mxGetLocal - forM_ (pid `Map.lookup` hm) $ \(label, mref) -> when (mr == mref) $ do - liftMX $ do - mynid <- getSelfNode - sendCtrlMsg Nothing (Register label mynid Nothing False) - mxSetLocal $! pid `Map.delete` hm + [ mxSink $ \(ProcessMonitorNotification _ pid _) -> do + mxUpdateLocal (Map.delete pid) mxReady , mxSink $ \ev -> let act = case ev of - MxRegistered pid label -> do + MxRegistered pid _ -> do hm <- mxGetLocal case pid `Map.lookup` hm of Nothing -> do - mon <- liftMX (monitor pid) - mxUpdateLocal (Map.insert pid (label,mon)) + mon <- liftMX $ monitor pid + mxUpdateLocal (Map.insert pid (mon, 1)) Just _ -> return () MxUnRegistered pid _ -> do hm <- mxGetLocal - forM_ (pid `Map.lookup` hm) $ \(_,mref) -> do - liftMX $ sendCtrlMsg Nothing (Unmonitor mref) - mxSetLocal $! pid `Map.delete` hm + forM_ (pid `Map.lookup` hm) $ \(mref, i) -> + let !i' = succ i + in if i' == 0 + then do liftMX $ unmonitorAsync mref + mxSetLocal $! pid `Map.delete` hm + else mxSetLocal $ Map.insert pid (mref,i') hm _ -> return () in act >> mxReady -- remove async answers from mailbox @@ -62,5 +60,5 @@ registryMonitorAgent = do , mxSink $ \DidUnmonitor{} -> mxReady ] where - initState :: Map ProcessId (String,MonitorRef) + initState :: Map ProcessId (MonitorRef,Int) initState = Map.empty