Skip to content

Commit 24163c2

Browse files
committed
Fixed de-registration of remote processes when the process terminates
Added reregister and reregisterRemoteAsync (keeping in line with Erlang's approach) Changed SimpleLocalNet to use reregisterRemoteAsync to adjust logging (currently without waiting for result)
1 parent 37b0263 commit 24163c2

File tree

5 files changed

+92
-31
lines changed

5 files changed

+92
-31
lines changed

distributed-process-simplelocalnet/src/Control/Distributed/Process/Backend/SimpleLocalnet.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ import Control.Distributed.Process
115115
, WhereIsReply(..)
116116
, whereis
117117
, whereisRemoteAsync
118-
, registerRemote
118+
, reregisterRemoteAsync
119119
, getSelfPid
120120
, register
121121
, expect
@@ -244,7 +244,7 @@ apiRedirectLogsHere backend = do
244244
mLogger <- whereis "logger"
245245
forM_ mLogger $ \logger -> do
246246
nids <- liftIO $ findPeers backend 1000000
247-
forM_ nids $ \nid -> registerRemote nid "logger" logger
247+
forM_ nids $ \nid -> reregisterRemoteAsync nid "logger" logger -- ignore async response
248248

249249
--------------------------------------------------------------------------------
250250
-- Slaves --

distributed-process/src/Control/Distributed/Process.hs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ module Control.Distributed.Process
4444
, call
4545
, terminate
4646
, ProcessTerminationException(..)
47+
, ProcessRegistrationException(..)
4748
, SpawnRef
4849
, getSelfPid
4950
, getSelfNode
@@ -77,10 +78,12 @@ module Control.Distributed.Process
7778
, say
7879
-- * Registry
7980
, register
81+
, reregister
8082
, unregister
8183
, whereis
8284
, nsend
8385
, registerRemoteAsync
86+
, reregisterRemoteAsync
8487
, unregisterRemoteAsync
8588
, whereisRemoteAsync
8689
, nsendRemote
@@ -135,6 +138,7 @@ import Control.Distributed.Process.Internal.Types
135138
, ProcessLinkException(..)
136139
, NodeLinkException(..)
137140
, PortLinkException(..)
141+
, ProcessRegistrationException(..)
138142
, DiedReason(..)
139143
, SpawnRef(..)
140144
, DidSpawn(..)
@@ -197,10 +201,12 @@ import Control.Distributed.Process.Internal.Primitives
197201
, say
198202
-- Registry
199203
, register
204+
, reregister
200205
, unregister
201206
, whereis
202207
, nsend
203208
, registerRemoteAsync
209+
, reregisterRemoteAsync
204210
, unregisterRemoteAsync
205211
, whereisRemoteAsync
206212
, nsendRemote

distributed-process/src/Control/Distributed/Process/Internal/Primitives.hs

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,12 @@ module Control.Distributed.Process.Internal.Primitives
3535
, say
3636
-- * Registry
3737
, register
38+
, reregister
3839
, unregister
3940
, whereis
4041
, nsend
4142
, registerRemoteAsync
43+
, reregisterRemoteAsync
4244
, unregisterRemoteAsync
4345
, whereisRemoteAsync
4446
, nsendRemote
@@ -542,11 +544,23 @@ say string = do
542544
--------------------------------------------------------------------------------
543545

544546
-- | Register a process with the local registry (asynchronous).
547+
-- This version will wait until a response is gotten from the
548+
-- management process. The name must not already be registered.
549+
-- The process need not be on this node.
550+
-- A bad registration will result in a 'ProcessRegistrationException'
545551
--
546552
-- The process to be registered does not have to be local itself.
547553
register :: String -> ProcessId -> Process ()
548-
register label pid = do
549-
sendCtrlMsg Nothing (Register label (Just pid))
554+
register = registerImpl False
555+
556+
-- | Like 'register', but will replace an existing registration.
557+
-- The name must already be registered.
558+
reregister :: String -> ProcessId -> Process ()
559+
reregister = registerImpl True
560+
561+
registerImpl :: Bool -> String -> ProcessId -> Process ()
562+
registerImpl force label pid = do
563+
sendCtrlMsg Nothing (Register label (Just pid) force)
550564
receiveWait [ matchIf (\(RegisterReply label' _) -> label == label')
551565
(\(RegisterReply _ ok) -> handleRegistrationReply label ok)
552566
]
@@ -559,12 +573,18 @@ register label pid = do
559573
-- See comments in 'whereisRemoteAsync'
560574
registerRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
561575
registerRemoteAsync nid label pid =
562-
sendCtrlMsg (Just nid) (Register label (Just pid))
576+
sendCtrlMsg (Just nid) (Register label (Just pid) False)
577+
578+
reregisterRemoteAsync :: NodeId -> String -> ProcessId -> Process ()
579+
reregisterRemoteAsync nid label pid =
580+
sendCtrlMsg (Just nid) (Register label (Just pid) True)
563581

564582
-- | Remove a process from the local registry (asynchronous).
583+
-- This version will wait until a response is gotten from the
584+
-- management process. The name must already be registered.
565585
unregister :: String -> Process ()
566586
unregister label = do
567-
sendCtrlMsg Nothing (Register label Nothing)
587+
sendCtrlMsg Nothing (Register label Nothing False)
568588
receiveWait [ matchIf (\(RegisterReply label' _) -> label == label')
569589
(\(RegisterReply _ ok) -> handleRegistrationReply label ok)
570590
]
@@ -583,7 +603,7 @@ handleRegistrationReply label ok =
583603
-- See comments in 'whereisRemoteAsync'
584604
unregisterRemoteAsync :: NodeId -> String -> Process ()
585605
unregisterRemoteAsync nid label =
586-
sendCtrlMsg (Just nid) (Register label Nothing)
606+
sendCtrlMsg (Just nid) (Register label Nothing False)
587607

588608
-- | Query the local process registry
589609
whereis :: String -> Process (Maybe ProcessId)

distributed-process/src/Control/Distributed/Process/Internal/Types.hs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ data ProcessSignal =
440440
| Died Identifier !DiedReason
441441
| Spawn !(Closure (Process ())) !SpawnRef
442442
| WhereIs !String
443-
| Register !String !(Maybe ProcessId) -- Use 'Nothing' to unregister
443+
| Register !String !(Maybe ProcessId) !Bool -- Use 'Nothing' to unregister, use True to force reregister
444444
| NamedSend !String !Message
445445
deriving Show
446446

@@ -484,7 +484,7 @@ instance Binary ProcessSignal where
484484
put (Died who reason) = putWord8 4 >> put who >> put reason
485485
put (Spawn proc ref) = putWord8 5 >> put proc >> put ref
486486
put (WhereIs label) = putWord8 6 >> put label
487-
put (Register label pid) = putWord8 7 >> put label >> put pid
487+
put (Register label pid force)= putWord8 7 >> put label >> put pid >> put force
488488
put (NamedSend label msg) = putWord8 8 >> put label >> put (messageToPayload msg)
489489
get = do
490490
header <- getWord8
@@ -496,7 +496,7 @@ instance Binary ProcessSignal where
496496
4 -> Died <$> get <*> get
497497
5 -> Spawn <$> get <*> get
498498
6 -> WhereIs <$> get
499-
7 -> Register <$> get <*> get
499+
7 -> Register <$> get <*> get <*> get
500500
8 -> NamedSend <$> get <*> (payloadToMessage <$> get)
501501
_ -> fail "ProcessSignal.get: invalid"
502502

distributed-process/src/Control/Distributed/Process/Node.hs

Lines changed: 56 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,15 @@ import Data.Map (Map)
2424
import qualified Data.Map as Map
2525
( empty
2626
, toList
27+
, fromList
2728
, partitionWithKey
2829
, elems
2930
, filterWithKey
3031
)
3132
import Data.Set (Set)
3233
import qualified Data.Set as Set (empty, insert, delete, member)
3334
import Data.Foldable (forM_)
34-
import Data.Maybe (isJust, isNothing)
35+
import Data.Maybe (isJust, isNothing, catMaybes)
3536
import Data.Typeable (Typeable)
3637
import Control.Category ((>>>))
3738
import Control.Applicative ((<$>))
@@ -438,8 +439,8 @@ data NCState = NCState
438439
_links :: !(Map Identifier (Set ProcessId))
439440
-- Mapping from remote processes to monitoring local processes
440441
, _monitors :: !(Map Identifier (Set (ProcessId, MonitorRef)))
441-
-- Process registry
442-
, _registry :: !(Map String ProcessId)
442+
-- Process registry: names and where they live, mapped to the PIDs
443+
, _registry :: !(Map (String,NodeId) ProcessId)
443444
}
444445

445446
newtype NC a = NC { unNC :: StateT NCState (ReaderT LocalNode IO) a }
@@ -486,8 +487,8 @@ nodeController = do
486487
ncEffectDied ident reason
487488
NCMsg (ProcessIdentifier from) (Spawn proc ref) ->
488489
ncEffectSpawn from proc ref
489-
NCMsg (ProcessIdentifier from) (Register label pid) ->
490-
ncEffectRegister from label pid
490+
NCMsg (ProcessIdentifier from) (Register label pid force) ->
491+
ncEffectRegister from label pid force
491492
NCMsg (ProcessIdentifier from) (WhereIs label) ->
492493
ncEffectWhereIs from label
493494
NCMsg from (NamedSend label msg') ->
@@ -555,6 +556,8 @@ ncEffectDied ident reason = do
555556
(affectedLinks, unaffectedLinks) <- gets (splitNotif ident . (^. links))
556557
(affectedMons, unaffectedMons) <- gets (splitNotif ident . (^. monitors))
557558

559+
-- _registry :: !(Map (String,NodeId) ProcessId)
560+
558561
let localOnly = case ident of NodeIdentifier _ -> True ; _ -> False
559562

560563
forM_ (Map.toList affectedLinks) $ \(them, uss) ->
@@ -565,10 +568,27 @@ ncEffectDied ident reason = do
565568
forM_ (Map.toList affectedMons) $ \(them, refs) ->
566569
forM_ refs $ \(us, ref) ->
567570
when (localOnly <= isLocal node (ProcessIdentifier us)) $
568-
notifyDied us them reason (Just ref)
571+
notifyDied us them reason (Just ref)
569572

570573
modify' $ (links ^= unaffectedLinks) . (monitors ^= unaffectedMons)
571574

575+
remaining <- fmap Map.toList (gets (^. registry)) >>= mapM ( \whl@((_,nid),pid) ->
576+
case ident `impliesDeathOf` ProcessIdentifier pid ||
577+
ident `impliesDeathOf` NodeIdentifier nid of
578+
True ->
579+
do when (not $ isLocal node (NodeIdentifier nid)) $
580+
liftIO $ sendBinary node
581+
(NodeIdentifier $ localNodeId node)
582+
(NodeIdentifier $ nid)
583+
WithImplicitReconnect
584+
NCMsg
585+
{ ctrlMsgSender = NodeIdentifier (localNodeId node)
586+
, ctrlMsgSignal = Died ident reason
587+
}
588+
return Nothing
589+
False -> return $ Just whl)
590+
modify' $ registry ^= Map.fromList (catMaybes remaining)
591+
572592
-- [Unified: Table 13]
573593
ncEffectSpawn :: ProcessId -> Closure (Process ()) -> SpawnRef -> NC ()
574594
ncEffectSpawn pid cProc ref = do
@@ -589,18 +609,33 @@ ncEffectSpawn pid cProc ref = do
589609
-- Unified semantics does not explicitly describe how to implement 'register',
590610
-- but mentions it's "very similar to nsend" (Table 14)
591611
-- We send a response indicated if the operation is invalid
592-
ncEffectRegister :: ProcessId -> String -> Maybe ProcessId -> NC ()
593-
ncEffectRegister from label mPid = do
612+
ncEffectRegister :: ProcessId -> String -> Maybe ProcessId -> Bool -> NC ()
613+
ncEffectRegister from label mPid reregistration = do
594614
node <- ask
595-
currentVal <- gets (^. registryFor label)
596-
let isOk =
615+
currentVal <- gets (^. registryFor (label, localNodeId node))
616+
isOk <-
597617
case mPid of
598618
Nothing -> -- unregister request
599-
isJust currentVal
600-
Just _ -> -- register request
601-
isNothing currentVal
602-
when (isOk) $
603-
modify' $ registryFor label ^= mPid
619+
return $ isJust currentVal
620+
Just thepid -> -- register request
621+
do isvalidlocal <- isValidLocalIdentifier (ProcessIdentifier thepid)
622+
return $ (isNothing currentVal /= reregistration) &&
623+
(not (isLocal node (ProcessIdentifier thepid) ) || isvalidlocal )
624+
if isLocal node (ProcessIdentifier from)
625+
then when (isOk) $
626+
do modify' $ registryFor (label, localNodeId node) ^= mPid
627+
let namedPid =
628+
head $ catMaybes [mPid, currentVal]
629+
when (not $ isLocal node (ProcessIdentifier namedPid)) $
630+
liftIO $ sendBinary node
631+
(ProcessIdentifier from)
632+
(NodeIdentifier (processNodeId namedPid))
633+
WithImplicitReconnect
634+
NCMsg
635+
{ ctrlMsgSender = ProcessIdentifier from
636+
, ctrlMsgSignal = Register label mPid reregistration
637+
}
638+
else modify' $ registryFor (label,processNodeId from) ^= mPid
604639
liftIO $ sendMessage node
605640
(NodeIdentifier (localNodeId node))
606641
(ProcessIdentifier from)
@@ -611,7 +646,7 @@ ncEffectRegister from label mPid = do
611646
ncEffectWhereIs :: ProcessId -> String -> NC ()
612647
ncEffectWhereIs from label = do
613648
node <- ask
614-
mPid <- gets (^. registryFor label)
649+
mPid <- gets (^. registryFor (label, localNodeId node))
615650
liftIO $ sendMessage node
616651
(NodeIdentifier (localNodeId node))
617652
(ProcessIdentifier from)
@@ -621,9 +656,9 @@ ncEffectWhereIs from label = do
621656
-- [Unified: Table 14]
622657
ncEffectNamedSend :: Identifier -> String -> Message -> NC ()
623658
ncEffectNamedSend from label msg = do
624-
mPid <- gets (^. registryFor label)
659+
node <- ask
660+
mPid <- gets (^. registryFor (label, localNodeId node))
625661
-- If mPid is Nothing, we just ignore the named send (as per Table 14)
626-
node <- ask
627662
forM_ mPid $ \pid ->
628663
liftIO $ sendPayload node
629664
from
@@ -673,7 +708,7 @@ destNid (Unlink ident) = Just $ nodeOf ident
673708
destNid (Monitor ref) = Just $ nodeOf (monitorRefIdent ref)
674709
destNid (Unmonitor ref) = Just $ nodeOf (monitorRefIdent ref)
675710
destNid (Spawn _ _) = Nothing
676-
destNid (Register _ _) = Nothing
711+
destNid (Register _ _ _)= Nothing
677712
destNid (WhereIs _) = Nothing
678713
destNid (NamedSend _ _) = Nothing
679714
-- We don't need to forward 'Died' signals; if monitoring/linking is setup,
@@ -745,7 +780,7 @@ links = accessor _links (\ls st -> st { _links = ls })
745780
monitors :: Accessor NCState (Map Identifier (Set (ProcessId, MonitorRef)))
746781
monitors = accessor _monitors (\ms st -> st { _monitors = ms })
747782

748-
registry :: Accessor NCState (Map String ProcessId)
783+
registry :: Accessor NCState (Map (String,NodeId) ProcessId)
749784
registry = accessor _registry (\ry st -> st { _registry = ry })
750785

751786
linksFor :: Identifier -> Accessor NCState (Set ProcessId)
@@ -754,7 +789,7 @@ linksFor ident = links >>> DAC.mapDefault Set.empty ident
754789
monitorsFor :: Identifier -> Accessor NCState (Set (ProcessId, MonitorRef))
755790
monitorsFor ident = monitors >>> DAC.mapDefault Set.empty ident
756791

757-
registryFor :: String -> Accessor NCState (Maybe ProcessId)
792+
registryFor :: (String, NodeId) -> Accessor NCState (Maybe ProcessId)
758793
registryFor ident = registry >>> DAC.mapMaybe ident
759794

760795
-- | @splitNotif ident@ splits a notifications map into those

0 commit comments

Comments
 (0)