Skip to content

Commit

Permalink
Remove first memory leak in the node controller
Browse files Browse the repository at this point in the history
There is at least one other (PINNED) memory leak :-/
  • Loading branch information
edsko committed Aug 13, 2012
1 parent 86b3422 commit 0a02c27
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 42 deletions.
12 changes: 8 additions & 4 deletions distributed-process/distributed-process.cabal
Expand Up @@ -46,7 +46,8 @@ Library
ghc-prim >= 0.2 && < 0.4,
distributed-static >= 0.1 && < 0.2,
rank1dynamic >= 0.1 && < 0.2,
syb >= 0.3 && < 0.4
syb >= 0.3 && < 0.4,
deepseq >= 1.3 && < 1.4
Exposed-modules: Control.Distributed.Process,
Control.Distributed.Process.Serializable,
Control.Distributed.Process.Closure,
Expand All @@ -55,8 +56,9 @@ Library
Control.Distributed.Process.Internal.CQueue,
Control.Distributed.Process.Internal.Types,
Control.Distributed.Process.Internal.Closure.TH,
Control.Distributed.Process.Internal.Closure.BuiltIn,
Control.Distributed.Process.Internal.Closure.BuiltIn
Control.Distributed.Process.Internal.Node
Control.Distributed.Process.Internal.StrictMVar
Extensions: RankNTypes,
ScopedTypeVariables,
FlexibleInstances,
Expand Down Expand Up @@ -89,7 +91,8 @@ Test-Suite TestCH
ghc-prim >= 0.2 && < 0.4,
ansi-terminal >= 0.5 && < 0.6,
distributed-static >= 0.1 && < 0.2,
rank1dynamic >= 0.1 && < 0.2
rank1dynamic >= 0.1 && < 0.2,
deepseq >= 1.3 && < 1.4
Extensions: RankNTypes,
ScopedTypeVariables,
FlexibleInstances,
Expand Down Expand Up @@ -123,7 +126,8 @@ Test-Suite TestClosure
ansi-terminal >= 0.5 && < 0.6,
distributed-static >= 0.1 && < 0.2,
rank1dynamic >= 0.1 && < 0.2,
syb >= 0.3 && < 0.4
syb >= 0.3 && < 0.4,
deepseq >= 1.3 && < 1.4
Other-modules: TestAuxiliary
Extensions: RankNTypes,
ScopedTypeVariables,
Expand Down
Expand Up @@ -9,7 +9,7 @@ import Data.Accessor ((^.), (^=))
import Data.Binary (Binary, encode)
import qualified Data.ByteString.Lazy as BSL (toChunks)
import qualified Data.ByteString as BSS (ByteString)
import Control.Concurrent.MVar (withMVar, modifyMVar_)
import Control.Distributed.Process.Internal.StrictMVar (withMVar, modifyMVar_)
import Control.Concurrent.Chan (writeChan)
import Control.Monad (unless)
import qualified Network.Transport as NT
Expand Down
Expand Up @@ -74,7 +74,7 @@ import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Applicative ((<$>))
import Control.Exception (Exception, throwIO, SomeException)
import qualified Control.Exception as Ex (catch, mask)
import Control.Concurrent.MVar (modifyMVar)
import Control.Distributed.Process.Internal.StrictMVar (modifyMVar)
import Control.Concurrent.Chan (writeChan)
import Control.Concurrent.STM
( STM
Expand Down Expand Up @@ -346,7 +346,7 @@ onException p what = p `catch` \e -> do _ <- what

-- | Lift 'Control.Exception.bracket'
bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket before after thing = do
bracket before after thing =
mask $ \restore -> do
a <- before
r <- restore (thing a) `onException` after a
Expand Down
@@ -0,0 +1,51 @@
-- | Like Control.Concurrent.MVar.Strict but reduce to HNF, not NF
module Control.Distributed.Process.Internal.StrictMVar
( StrictMVar
, newEmptyMVar
, newMVar
, takeMVar
, putMVar
, withMVar
, modifyMVar_
, modifyMVar
) where

import Control.Applicative ((<$>))
import Control.Monad ((>=>))
import Control.Exception (evaluate)
import qualified Control.Concurrent.MVar as MVar
( MVar
, newEmptyMVar
, newMVar
, takeMVar
, putMVar
, withMVar
, modifyMVar_
, modifyMVar
)

newtype StrictMVar a = StrictMVar (MVar.MVar a)

newEmptyMVar :: IO (StrictMVar a)
newEmptyMVar = StrictMVar <$> MVar.newEmptyMVar

newMVar :: a -> IO (StrictMVar a)
newMVar x = evaluate x >> StrictMVar <$> MVar.newMVar x

takeMVar :: StrictMVar a -> IO a
takeMVar (StrictMVar v) = MVar.takeMVar v

putMVar :: StrictMVar a -> a -> IO ()
putMVar (StrictMVar v) x = evaluate x >> MVar.putMVar v x

withMVar :: StrictMVar a -> (a -> IO b) -> IO b
withMVar (StrictMVar v) = MVar.withMVar v

modifyMVar_ :: StrictMVar a -> (a -> IO a) -> IO ()
modifyMVar_ (StrictMVar v) f = MVar.modifyMVar_ v (f >=> evaluate)

modifyMVar :: StrictMVar a -> (a -> IO (a, b)) -> IO b
modifyMVar (StrictMVar v) f = MVar.modifyMVar v (f >=> evaluateFst)
where
evaluateFst :: (a, b) -> IO (a, b)
evaluateFst (x, y) = evaluate x >> return (x, y)
Expand Up @@ -77,7 +77,6 @@ import qualified Data.Accessor.Container as DAC (mapMaybe)
import Control.Category ((>>>))
import Control.Exception (Exception)
import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar (MVar)
import Control.Concurrent.Chan (Chan)
import Control.Concurrent.STM (TChan, TVar)
import qualified Network.Transport as NT (EndPoint, EndPointAddress, Connection)
Expand All @@ -94,7 +93,9 @@ import Control.Distributed.Process.Serializable
, showFingerprint
)
import Control.Distributed.Process.Internal.CQueue (CQueue)
import Control.Distributed.Process.Internal.StrictMVar (StrictMVar)
import Control.Distributed.Static (RemoteTable, Closure)
import Control.DeepSeq (NFData(rnf))

-- import Control.Distributed.Process.Internal.Dynamic (Dynamic)
-- import Control.Distributed.Process.Internal.TypeRep (compareTypeRep) -- and Binary instances
Expand All @@ -104,29 +105,39 @@ import Control.Distributed.Static (RemoteTable, Closure)
--------------------------------------------------------------------------------

-- | Node identifier
newtype NodeId = NodeId { nodeAddress :: NT.EndPointAddress }
deriving (Eq, Ord, Binary)
data NodeId = NodeId { nodeAddress :: !(NT.EndPointAddress) }
deriving (Eq, Ord)

instance NFData NodeId -- Default implementation is okay

instance Binary NodeId where
put (NodeId nid) = put nid
get = NodeId <$> get

instance Show NodeId where
show (NodeId addr) = "nid://" ++ show addr

-- | A local process ID consists of a seed which distinguishes processes from
-- different instances of the same local node and a counter
data LocalProcessId = LocalProcessId
{ lpidUnique :: Int32
, lpidCounter :: Int32
{ lpidUnique :: !Int32
, lpidCounter :: !Int32
}
deriving (Eq, Ord, Typeable, Show)

instance NFData LocalProcessId -- Default implementation is okay

-- | Process identifier
data ProcessId = ProcessId
{ -- | The ID of the node the process is running on
processNodeId :: NodeId
processNodeId :: !NodeId
-- | Node-local identifier for the process
, processLocalId :: LocalProcessId
, processLocalId :: !LocalProcessId
}
deriving (Eq, Ord, Typeable)

instance NFData ProcessId -- Default implementation is okay

instance Show ProcessId where
show (ProcessId (NodeId addr) (LocalProcessId _ lid))
= "pid://" ++ show addr ++ ":" ++ show lid
Expand All @@ -138,6 +149,11 @@ data Identifier =
| SendPortIdentifier SendPortId
deriving (Eq, Ord)

instance NFData Identifier where
rnf (NodeIdentifier nid) = rnf nid
rnf (ProcessIdentifier pid) = rnf pid
rnf (SendPortIdentifier sid) = rnf sid

instance Show Identifier where
show (NodeIdentifier nid) = show nid
show (ProcessIdentifier pid) = show pid
Expand All @@ -159,7 +175,7 @@ data LocalNode = LocalNode
-- | The network endpoint associated with this node
, localEndPoint :: NT.EndPoint
-- | Local node state
, localState :: MVar LocalNodeState
, localState :: StrictMVar LocalNodeState
-- | Channel for the node controller
, localCtrlChan :: Chan NCMsg
-- | Runtime lookup table for supporting closures
Expand All @@ -169,17 +185,17 @@ data LocalNode = LocalNode

-- | Local node state
data LocalNodeState = LocalNodeState
{ _localProcesses :: Map LocalProcessId LocalProcess
, _localPidCounter :: Int32
, _localPidUnique :: Int32
, _localConnections :: Map (Identifier, Identifier) NT.Connection
{ _localProcesses :: !(Map LocalProcessId LocalProcess)
, _localPidCounter :: !Int32
, _localPidUnique :: !Int32
, _localConnections :: !(Map (Identifier, Identifier) NT.Connection)
}

-- | Processes running on our local node
data LocalProcess = LocalProcess
{ processQueue :: CQueue Message
, processId :: ProcessId
, processState :: MVar LocalProcessState
, processState :: StrictMVar LocalProcessState
, processThread :: ThreadId
, processNode :: LocalNode
}
Expand All @@ -190,10 +206,10 @@ runLocalProcess lproc proc = runReaderT (unProcess proc) lproc

-- | Local process state
data LocalProcessState = LocalProcessState
{ _monitorCounter :: Int32
, _spawnCounter :: Int32
, _channelCounter :: Int32
, _typedChannels :: Map LocalSendPortId TypedChannel
{ _monitorCounter :: !Int32
, _spawnCounter :: !Int32
, _channelCounter :: !Int32
, _typedChannels :: !(Map LocalSendPortId TypedChannel)
}

-- | The Cloud Haskell 'Process' type
Expand All @@ -214,12 +230,14 @@ type LocalSendPortId = Int32
-- to create a SendPort.
data SendPortId = SendPortId {
-- | The ID of the process that will receive messages sent on this port
sendPortProcessId :: ProcessId
sendPortProcessId :: !ProcessId
-- | Process-local ID of the channel
, sendPortLocalId :: LocalSendPortId
, sendPortLocalId :: !LocalSendPortId
}
deriving (Eq, Ord)

instance NFData SendPortId -- Default implementation is okay

instance Show SendPortId where
show (SendPortId (ProcessId (NodeId addr) (LocalProcessId _ plid)) clid)
= "cid://" ++ show addr ++ ":" ++ show plid ++ ":" ++ show clid
Expand Down Expand Up @@ -279,12 +297,14 @@ payloadToMessage payload = Message fp msg
-- | MonitorRef is opaque for regular Cloud Haskell processes
data MonitorRef = MonitorRef
{ -- | ID of the entity to be monitored
monitorRefIdent :: Identifier
monitorRefIdent :: !Identifier
-- | Unique to distinguish multiple monitor requests by the same process
, monitorRefCounter :: Int32
, monitorRefCounter :: !Int32
}
deriving (Eq, Ord, Show)

instance NFData MonitorRef -- Default implementation is okay

-- | Message sent by process monitors
data ProcessMonitorNotification =
ProcessMonitorNotification MonitorRef ProcessId DiedReason
Expand Down
40 changes: 26 additions & 14 deletions distributed-process/src/Control/Distributed/Process/Node.hs
Expand Up @@ -36,12 +36,13 @@ import Control.Category ((>>>))
import Control.Applicative ((<$>))
import Control.Monad (void, when, forever)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.State (MonadState, StateT, evalStateT, gets, modify)
import Control.Monad.State.Strict (MonadState, StateT, evalStateT, gets)
import qualified Control.Monad.State.Strict as StateT (get, put)
import Control.Monad.Reader (MonadReader, ReaderT, runReaderT, ask)
import Control.Exception (throwIO, SomeException, Exception, throwTo)
import qualified Control.Exception as Exception (catch)
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Distributed.Process.Internal.StrictMVar
( newMVar
, withMVar
, modifyMVar
Expand Down Expand Up @@ -69,7 +70,10 @@ import Data.Accessor (Accessor, accessor, (^.), (^=), (^:))
import qualified Data.Accessor.Container as DAC (mapDefault, mapMaybe)
import System.Random (randomIO)
import Control.Distributed.Static (RemoteTable, Closure)
import qualified Control.Distributed.Static as Static (unclosure)
import qualified Control.Distributed.Static as Static
( unclosure
, initRemoteTable
)
import Control.Distributed.Process.Internal.Types
( NodeId(..)
, LocalProcessId(..)
Expand Down Expand Up @@ -118,7 +122,7 @@ import Control.Distributed.Process.Internal.Node
)
import Control.Distributed.Process.Internal.Primitives (expect, register, finally)
import qualified Control.Distributed.Process.Internal.Closure.BuiltIn as BuiltIn (remoteTable)
import qualified Control.Distributed.Static as Static (initRemoteTable)
import Control.DeepSeq (force)

--------------------------------------------------------------------------------
-- Initialization --
Expand Down Expand Up @@ -365,11 +369,11 @@ runNodeController =

data NCState = NCState
{ -- Mapping from remote processes to linked local processes
_links :: Map Identifier (Set ProcessId)
_links :: !(Map Identifier (Set ProcessId))
-- Mapping from remote processes to monitoring local processes
, _monitors :: Map Identifier (Set (ProcessId, MonitorRef))
, _monitors :: !(Map Identifier (Set (ProcessId, MonitorRef)))
-- Process registry
, _registry :: Map String ProcessId
, _registry :: !(Map String ProcessId)
}

newtype NC a = NC { unNC :: StateT NCState (ReaderT LocalNode IO) a }
Expand Down Expand Up @@ -438,8 +442,8 @@ ncEffectMonitor from them mRef = do
case (shouldLink, isLocal node (ProcessIdentifier from)) of
(True, _) -> -- [Unified: first rule]
case mRef of
Just ref -> modify $ monitorsFor them ^: Set.insert (from, ref)
Nothing -> modify $ linksFor them ^: Set.insert from
Just ref -> modify' $ monitorsFor them ^: Set.insert (from, ref)
Nothing -> modify' $ linksFor them ^: Set.insert from
(False, True) -> -- [Unified: second rule]
notifyDied from them DiedUnknownId mRef
(False, False) -> -- [Unified: third rule]
Expand All @@ -466,15 +470,15 @@ ncEffectUnlink from them = do
postAsMessage from $ DidUnlinkNode nid
SendPortIdentifier cid ->
postAsMessage from $ DidUnlinkPort cid
modify $ linksFor them ^: Set.delete from
modify' $ linksFor them ^: Set.delete from

-- [Unified: Table 11]
ncEffectUnmonitor :: ProcessId -> MonitorRef -> NC ()
ncEffectUnmonitor from ref = do
node <- ask
when (isLocal node (ProcessIdentifier from)) $
postAsMessage from $ DidUnmonitor ref
modify $ monitorsFor (monitorRefIdent ref) ^: Set.delete (from, ref)
modify' $ monitorsFor (monitorRefIdent ref) ^: Set.delete (from, ref)

-- [Unified: Table 12]
ncEffectDied :: Identifier -> DiedReason -> NC ()
Expand All @@ -495,7 +499,7 @@ ncEffectDied ident reason = do
when (localOnly <= isLocal node (ProcessIdentifier us)) $
notifyDied us them reason (Just ref)

modify $ (links ^= unaffectedLinks) . (monitors ^= unaffectedMons)
modify' $ (links ^= force unaffectedLinks) . (monitors ^= force unaffectedMons)

-- [Unified: Table 13]
ncEffectSpawn :: ProcessId -> Closure (Process ()) -> SpawnRef -> NC ()
Expand All @@ -517,7 +521,7 @@ ncEffectSpawn pid cProc ref = do
-- but mentions it's "very similar to nsend" (Table 14)
ncEffectRegister :: String -> Maybe ProcessId -> NC ()
ncEffectRegister label mPid =
modify $ registryFor label ^= mPid
modify' $ registryFor label ^= mPid
-- An acknowledgement is not necessary. If we want a synchronous register,
-- it suffices to send a whereis requiry immediately after the register
-- (that may not suffice if we do decide for unreliable messaging instead)
Expand Down Expand Up @@ -691,7 +695,7 @@ registryFor ident = registry >>> DAC.mapMaybe ident
splitNotif :: Identifier
-> Map Identifier a
-> (Map Identifier a, Map Identifier a)
splitNotif ident = Map.partitionWithKey (const . impliesDeathOf ident)
splitNotif ident = Map.partitionWithKey (const . impliesDeathOf ident)

-- | Does the death of one entity (node, project, channel) imply the death
-- of another?
Expand All @@ -712,3 +716,11 @@ SendPortIdentifier cid `impliesDeathOf` SendPortIdentifier cid' =
cid' == cid
_ `impliesDeathOf` _ =
False

--------------------------------------------------------------------------------
-- Strict evaluation of the state --
--------------------------------------------------------------------------------

-- | Modify and evaluate the state
modify' :: MonadState s m => (s -> s) -> m ()
modify' f = StateT.get >>= \s -> StateT.put $! f s

0 comments on commit 0a02c27

Please sign in to comment.