Skip to content

Commit

Permalink
Merge pull request #264 from haskell-distributed/fd/nc-local-coms
Browse files Browse the repository at this point in the history
Stop using the transport for local communication.
  • Loading branch information
facundominguez committed Dec 2, 2015
2 parents 10c6034 + 07c5422 commit 4c0c890
Showing 1 changed file with 54 additions and 76 deletions.
130 changes: 54 additions & 76 deletions src/Control/Distributed/Process/Node.hs
Expand Up @@ -169,6 +169,7 @@ import Control.Distributed.Process.Internal.Types
, payloadToMessage
, messageToPayload
, createUnencodedMessage
, unsafeCreateUnencodedMessage
, runLocalProcess
, firstNonReservedProcessId
, ImplicitReconnect(WithImplicitReconnect,NoImplicitReconnect)
Expand Down Expand Up @@ -198,7 +199,6 @@ import Control.Distributed.Process.Management.Internal.Types
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Internal.Messaging
( sendBinary
, sendMessage
, sendPayload
, closeImplicitReconnections
, impliesDeathOf
Expand Down Expand Up @@ -661,6 +661,30 @@ instance Show ProcessKillException where
show (ProcessKillException pid reason) =
"killed-by=" ++ show pid ++ ",reason=" ++ reason

ncSendToProcess :: ProcessId -> Message -> NC ()
ncSendToProcess pid msg = do
node <- ask
if processNodeId pid == localNodeId node
then ncEffectLocalSend node pid msg
else liftIO $ sendBinary node
(NodeIdentifier $ localNodeId node)
(NodeIdentifier $ processNodeId pid)
WithImplicitReconnect
NCMsg { ctrlMsgSender = NodeIdentifier $ localNodeId node
, ctrlMsgSignal = UnreliableSend (processLocalId pid) msg
}

ncSendToNode :: NodeId -> NCMsg -> NC ()
ncSendToNode to msg = do
node <- ask
liftIO $ if to == localNodeId node
then writeChan (localCtrlChan node) $! msg
else sendBinary node
(NodeIdentifier $ localNodeId node)
(NodeIdentifier to)
WithImplicitReconnect
msg

--------------------------------------------------------------------------------
-- Tracing/Debugging --
--------------------------------------------------------------------------------
Expand Down Expand Up @@ -703,11 +727,7 @@ nodeController = do
-- [Unified: Table 7, rule nc_forward]
case destNid (ctrlMsgSignal msg) of
Just nid' | nid' /= localNodeId node ->
liftIO $ sendBinary node
(ctrlMsgSender msg)
(NodeIdentifier nid')
WithImplicitReconnect
msg
ncSendToNode nid' msg
_ ->
return ()

Expand All @@ -728,8 +748,8 @@ nodeController = do
ncEffectRegister from label atnode pid force
NCMsg (ProcessIdentifier from) (WhereIs label) ->
ncEffectWhereIs from label
NCMsg (ProcessIdentifier from) (NamedSend label msg') ->
ncEffectNamedSend from label msg'
NCMsg _ (NamedSend label msg') ->
ncEffectNamedSend label msg'
NCMsg _ (UnreliableSend lpid msg') ->
ncEffectLocalSend node (ProcessId (localNodeId node) lpid) msg'
NCMsg _ (LocalSend to msg') ->
Expand Down Expand Up @@ -774,14 +794,10 @@ ncEffectMonitor from them mRef = do
-- TODO: this is the right sender according to the Unified semantics,
-- but perhaps having 'them' as the sender would make more sense
-- (see also: notifyDied)
liftIO $ sendBinary node
(NodeIdentifier $ localNodeId node)
(NodeIdentifier $ processNodeId from)
WithImplicitReconnect
NCMsg
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
, ctrlMsgSignal = Died them DiedUnknownId
}
ncSendToNode (processNodeId from) $ NCMsg
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
, ctrlMsgSignal = Died them DiedUnknownId
}

-- [Unified: Table 11]
ncEffectUnlink :: ProcessId -> Identifier -> NC ()
Expand Down Expand Up @@ -842,15 +858,10 @@ ncEffectDied ident reason = do
False -> return $ Just (pid,nidlist) )
modify' $ registeredOnNodes ^= (Map.fromList (catMaybes remaining))
where
forwardNameDeath node nid =
liftIO $ sendBinary node
(NodeIdentifier $ localNodeId node)
(NodeIdentifier $ nid)
WithImplicitReconnect
NCMsg
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
, ctrlMsgSignal = Died ident reason
}
forwardNameDeath node nid = ncSendToNode nid
NCMsg { ctrlMsgSender = NodeIdentifier (localNodeId node)
, ctrlMsgSignal = Died ident reason
}

-- [Unified: Table 13]
ncEffectSpawn :: ProcessId -> Closure (Process ()) -> SpawnRef -> NC ()
Expand All @@ -864,11 +875,7 @@ ncEffectSpawn pid cProc ref = do
Right p -> p
node <- ask
pid' <- liftIO $ forkProcess node proc
liftIO $ sendMessage node
(NodeIdentifier (localNodeId node))
(ProcessIdentifier pid)
WithImplicitReconnect
(DidSpawn ref pid')
ncSendToProcess pid $ unsafeCreateUnencodedMessage $ DidSpawn ref pid'

-- Unified semantics does not explicitly describe how to implement 'register',
-- but mentions it's "very similar to nsend" (Table 14)
Expand All @@ -893,11 +900,8 @@ ncEffectRegister from label atnode mPid reregistration = do
(Just p) -> liftIO $ trace node (MxRegistered p label)
Nothing -> liftIO $ trace node (MxUnRegistered (fromJust currentVal) label)
newVal <- gets (^. registeredHereFor label)
liftIO $ sendMessage node
(NodeIdentifier (localNodeId node))
(ProcessIdentifier from)
WithImplicitReconnect
(RegisterReply label isOk newVal)
ncSendToProcess from $ unsafeCreateUnencodedMessage $
RegisterReply label isOk newVal
else let operation =
case reregistration of
True -> flip decList
Expand Down Expand Up @@ -927,39 +931,23 @@ ncEffectRegister from label atnode mPid reregistration = do
decList (x:xs) tag = x:decList xs tag
forward node to reg =
when (not $ isLocal node (NodeIdentifier to)) $
liftIO $ sendBinary node
(ProcessIdentifier from)
(NodeIdentifier to)
WithImplicitReconnect
NCMsg
{ ctrlMsgSender = ProcessIdentifier from
, ctrlMsgSignal = reg
}
ncSendToNode to $ NCMsg { ctrlMsgSender = ProcessIdentifier from
, ctrlMsgSignal = reg
}


-- Unified semantics does not explicitly describe 'whereis'
ncEffectWhereIs :: ProcessId -> String -> NC ()
ncEffectWhereIs from label = do
node <- ask
mPid <- gets (^. registeredHereFor label)
liftIO $ sendMessage node
(NodeIdentifier (localNodeId node))
(ProcessIdentifier from)
WithImplicitReconnect
(WhereIsReply label mPid)
ncSendToProcess from $ unsafeCreateUnencodedMessage $ WhereIsReply label mPid

-- [Unified: Table 14]
ncEffectNamedSend :: ProcessId -> String -> Message -> NC ()
ncEffectNamedSend from label msg = do
ncEffectNamedSend :: String -> Message -> NC ()
ncEffectNamedSend label msg = do
mPid <- gets (^. registeredHereFor label)
node <- ask
-- If mPid is Nothing, we just ignore the named send (as per Table 14)
forM_ mPid $ \pid ->
liftIO $ sendPayload node
(ProcessIdentifier from)
(ProcessIdentifier pid)
NoImplicitReconnect
(messageToPayload msg)
forM_ mPid (`ncSendToProcess` msg)

-- [Issue #DP-20]
ncEffectLocalSend :: LocalNode -> ProcessId -> Message -> NC ()
Expand Down Expand Up @@ -1017,7 +1005,7 @@ ncEffectGetInfo from pid =
$ return . (^. localProcessWithId lpid)
case mProc of
Nothing -> dispatch (isLocal node (ProcessIdentifier from))
from node (ProcessInfoNone DiedUnknownId)
from (ProcessInfoNone DiedUnknownId)
Just proc -> do
itsLinks <- gets (^. linksFor them)
itsMons <- gets (^. monitorsFor them)
Expand All @@ -1027,7 +1015,6 @@ ncEffectGetInfo from pid =
let reg = registeredNames registered
dispatch (isLocal node (ProcessIdentifier from))
from
node
ProcessInfo {
infoNode = (processNodeId pid)
, infoRegisteredNames = reg
Expand All @@ -1038,16 +1025,11 @@ ncEffectGetInfo from pid =
where dispatch :: (Serializable a, Show a)
=> Bool
-> ProcessId
-> LocalNode
-> a
-> NC ()
dispatch True dest _ pInfo = postAsMessage dest $ pInfo
dispatch False dest node pInfo = do
liftIO $ sendMessage node
(NodeIdentifier (localNodeId node))
(ProcessIdentifier dest)
WithImplicitReconnect
pInfo
dispatch True dest pInfo = postAsMessage dest $ pInfo
dispatch False dest pInfo =
ncSendToProcess dest $ unsafeCreateUnencodedMessage pInfo

registeredNames = Map.foldlWithKey (\ks k v -> if v == pid
then (k:ks)
Expand Down Expand Up @@ -1094,14 +1076,10 @@ notifyDied dest src reason mRef = do
throwException dest $ PortLinkException pid reason
(False, _, _) ->
-- The change in sender comes from [Unified: Table 10]
liftIO $ sendBinary node
(NodeIdentifier $ localNodeId node)
(NodeIdentifier $ processNodeId dest)
WithImplicitReconnect
NCMsg
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
, ctrlMsgSignal = Died src reason
}
ncSendToNode (processNodeId dest) $ NCMsg
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
, ctrlMsgSignal = Died src reason
}

-- | [Unified: Table 8]
destNid :: ProcessSignal -> Maybe NodeId
Expand Down

0 comments on commit 4c0c890

Please sign in to comment.