Browse files

Merge branch 'debug-trace'

  • Loading branch information...
2 parents 2419547 + 628f496 commit 6a84b5986a36af869b74f6260df516412cb30f52 @hyperthunk hyperthunk committed Jan 26, 2013
View
3 distributed-process/distributed-process.cabal
@@ -61,6 +61,7 @@ Library
Control.Distributed.Process.Internal.Primitives,
Control.Distributed.Process.Internal.CQueue,
Control.Distributed.Process.Internal.Types,
+ Control.Distributed.Process.Internal.Trace,
Control.Distributed.Process.Internal.Closure.BuiltIn,
Control.Distributed.Process.Internal.Messaging,
Control.Distributed.Process.Internal.StrictList,
@@ -145,7 +146,7 @@ Test-Suite TestStats
ScopedTypeVariables,
DeriveDataTypeable,
GeneralizedNewtypeDeriving
- ghc-options: -Wall -threaded -rtsopts -with-rtsopts=-N -fno-warn-unused-do-bind
+ ghc-options: -Wall -debug -eventlog -threaded -rtsopts -with-rtsopts=-N -fno-warn-unused-do-bind
HS-Source-Dirs: tests
View
19 distributed-process/src/Control/Distributed/Process/Internal/Primitives.hs
@@ -83,6 +83,8 @@ module Control.Distributed.Process.Internal.Primitives
-- * Reconnecting
, reconnect
, reconnectPort
+ -- * Tracing/Debugging
+ , trace
) where
#if ! MIN_VERSION_base(4,6,0)
@@ -168,6 +170,7 @@ import Control.Distributed.Process.Internal.Messaging
, sendPayload
, disconnect
)
+import qualified Control.Distributed.Process.Internal.Trace as Trace
import Control.Distributed.Process.Internal.WeakTQueue
( newTQueueIO
, readTQueue
@@ -893,6 +896,22 @@ reconnectPort them = do
liftIO $ disconnect node (ProcessIdentifier us) (SendPortIdentifier (sendPortId them))
--------------------------------------------------------------------------------
+-- Debugging/Tracing --
+--------------------------------------------------------------------------------
+
+-- | Send a message to the internal (system) trace facility. If tracing is
+-- enabled, this will create a custom trace event in the event log. Specifically,
+-- the environment variable @DISTRIBUTED_PROCESS_TRACE_FILE@ is set to a valid
+-- path, then trace output will be written to that file. Otherwise, if the GHC
+-- eventlog is enabled (i.e., you've compiled with @-eventlog@ set) and the
+-- relevant @+RTS@ options given, then the message will be sent to the event log.
+-- If neither facility is in use then trace messages will be silently dropped.
+trace :: String -> Process ()
+trace s = do
+ node <- processNode <$> ask
+ liftIO $ Trace.trace (localTracer node) s
+
+--------------------------------------------------------------------------------
-- Auxiliary functions --
--------------------------------------------------------------------------------
View
122 distributed-process/src/Control/Distributed/Process/Internal/Trace.hs
@@ -0,0 +1,122 @@
+-- | Simple (internal) system logging/tracing support.
+module Control.Distributed.Process.Internal.Trace
+ ( Tracer
+ , TraceArg(..)
+ , trace
+ , traceFormat
+ , startTracing
+ , stopTracer
+ ) where
+
+import Control.Concurrent (forkIO)
+import Control.Concurrent.Chan (writeChan)
+import Control.Concurrent.STM
+ ( TQueue
+ , newTQueueIO
+ , readTQueue
+ , writeTQueue
+ , atomically
+ )
+import Control.Distributed.Process.Internal.Types
+ ( Tracer(..)
+ , LocalNode(..)
+ , NCMsg(..)
+ , Identifier(ProcessIdentifier)
+ , ProcessSignal(NamedSend)
+ , forever'
+ , nullProcessId
+ , createMessage
+ )
+import Control.Exception
+ ( catch
+ , throwTo
+ , SomeException
+ , AsyncException(ThreadKilled)
+ )
+import Data.List (intersperse)
+import Data.Time.Clock (getCurrentTime)
+import Data.Time.Format (formatTime)
+import Debug.Trace (traceEventIO)
+
+import Prelude hiding (catch)
+
+import System.Environment (getEnv)
+import System.IO
+ ( Handle
+ , IOMode(AppendMode)
+ , BufferMode(..)
+ , openFile
+ , hClose
+ , hPutStrLn
+ , hSetBuffering
+ )
+import System.Locale (defaultTimeLocale)
+
+data TraceArg =
+ TraceStr String
+ | forall a. (Show a) => Trace a
+
+startTracing :: LocalNode -> IO LocalNode
+startTracing node = do
+ tracer <- defaultTracer node
+ return node { localTracer = tracer }
+
+defaultTracer :: LocalNode -> IO Tracer
+defaultTracer node = do
+ catch (getEnv "DISTRIBUTED_PROCESS_TRACE_FILE" >>= logfileTracer)
+ (\(_ :: IOError) -> defaultTracerAux node)
+
+defaultTracerAux :: LocalNode -> IO Tracer
+defaultTracerAux node = do
+ catch (getEnv "DISTRIBUTED_PROCESS_TRACE_CONSOLE" >> procTracer node)
+ (\(_ :: IOError) -> return (EventLogTracer traceEventIO))
+ where procTracer :: LocalNode -> IO Tracer
+ procTracer n = return $ (LocalNodeTracer n)
+
+logfileTracer :: FilePath -> IO Tracer
+logfileTracer p = do
+ q <- newTQueueIO
+ h <- openFile p AppendMode
+ hSetBuffering h LineBuffering
+ tid <- forkIO $ logger h q `catch` (\(_ :: SomeException) ->
+ hClose h >> return ())
+ return $ LogFileTracer tid q h
+ where logger :: Handle -> TQueue String -> IO ()
+ logger h q' = forever' $ do
+ msg <- atomically $ readTQueue q'
+ now <- getCurrentTime
+ hPutStrLn h $ msg ++ (formatTime defaultTimeLocale " - %c" now)
+
+-- TODO: compatibility layer for GHC/base versions (e.g., where's killThread?)
+
+stopTracer :: Tracer -> IO () -- overzealous but harmless duplication of hClose
+stopTracer (LogFileTracer tid _ h) = throwTo tid ThreadKilled >> hClose h
+stopTracer _ = return ()
+
+trace :: Tracer -> String -> IO ()
+trace (LogFileTracer _ q _) msg = atomically $ writeTQueue q msg
+trace (LocalNodeTracer n) msg = sendTraceMsg n msg
+trace (EventLogTracer t) msg = t msg
+trace InactiveTracer _ = return ()
+
+traceFormat :: Tracer
+ -> String
+ -> [TraceArg]
+ -> IO ()
+traceFormat t d ls =
+ trace t $ concat (intersperse d (map toS ls))
+ where toS :: TraceArg -> String
+ toS (TraceStr s) = s
+ toS (Trace a) = show a
+
+sendTraceMsg :: LocalNode -> String -> IO ()
+sendTraceMsg node string = do
+ now <- getCurrentTime
+ msg <- return $ (formatTime defaultTimeLocale "%c" now, string)
+ emptyPid <- return $ (nullProcessId (localNodeId node))
+ traceMsg <- return $ NCMsg {
+ ctrlMsgSender = ProcessIdentifier (emptyPid)
+ , ctrlMsgSignal = (NamedSend "logger" (createMessage msg))
+ }
+ writeChan (localCtrlChan node) traceMsg
+
View
80 distributed-process/src/Control/Distributed/Process/Internal/Types.hs
@@ -14,6 +14,7 @@ module Control.Distributed.Process.Internal.Types
, nullProcessId
-- * Local nodes and processes
, LocalNode(..)
+ , Tracer(..)
, LocalNodeState(..)
, LocalProcess(..)
, LocalProcessState(..)
@@ -67,6 +68,8 @@ module Control.Distributed.Process.Internal.Types
, channelCounter
, typedChannels
, typedChannelWithId
+ -- * Utilities
+ , forever'
) where
import System.Mem.Weak (Weak)
@@ -88,6 +91,7 @@ import Control.Exception (Exception)
import Control.Concurrent (ThreadId)
import Control.Concurrent.Chan (Chan)
import Control.Concurrent.STM (STM)
+import qualified Control.Concurrent.STM as STM (TQueue)
import qualified Network.Transport as NT (EndPoint, EndPointAddress, Connection)
import Control.Applicative (Applicative, Alternative, (<$>), (<*>))
import Control.Monad.Reader (MonadReader(..), ReaderT, runReaderT)
@@ -106,6 +110,7 @@ import Control.Distributed.Process.Internal.StrictMVar (StrictMVar)
import Control.Distributed.Process.Internal.WeakTQueue (TQueue)
import Control.Distributed.Static (RemoteTable, Closure)
import qualified Control.Distributed.Process.Internal.StrictContainerAccessors as DAC (mapMaybe)
+import System.IO (Handle)
--------------------------------------------------------------------------------
-- Node and process identifiers --
@@ -175,19 +180,28 @@ nullProcessId nid =
-- Local nodes and processes --
--------------------------------------------------------------------------------
+-- | Required for system tracing in the node controller
+data Tracer =
+ LogFileTracer !ThreadId !(STM.TQueue String) !Handle
+ | EventLogTracer !(String -> IO ())
+ | LocalNodeTracer !LocalNode
+ | InactiveTracer -- NB: never used, this is required to initialize LocalNode
+
-- | Local nodes
data LocalNode = LocalNode
{ -- | 'NodeId' of the node
- localNodeId :: !NodeId
+ localNodeId :: !NodeId
-- | The network endpoint associated with this node
- , localEndPoint :: !NT.EndPoint
+ , localEndPoint :: !NT.EndPoint
-- | Local node state
- , localState :: !(StrictMVar LocalNodeState)
+ , localState :: !(StrictMVar LocalNodeState)
-- | Channel for the node controller
- , localCtrlChan :: !(Chan NCMsg)
+ , localCtrlChan :: !(Chan NCMsg)
+ -- | Current active system debug/trace log
+ , localTracer :: !Tracer
-- | Runtime lookup table for supporting closures
-- TODO: this should be part of the CH state, not the local endpoint state
- , remoteTable :: !RemoteTable
+ , remoteTable :: !RemoteTable
}
data ImplicitReconnect = WithImplicitReconnect | NoImplicitReconnect
@@ -432,7 +446,7 @@ data WhereIsReply = WhereIsReply String (Maybe ProcessId)
data RegisterReply = RegisterReply String Bool
deriving (Show, Typeable)
--- | Provide information about a running process
+-- | Provide information about a running process
data ProcessInfo = ProcessInfo {
infoNode :: NodeId
, infoRegisteredNames :: [String]
@@ -441,21 +455,9 @@ data ProcessInfo = ProcessInfo {
, infoLinks :: [ProcessId]
} deriving (Show, Eq, Typeable)
-instance Binary ProcessInfo where
- get = ProcessInfo <$> get <*> get <*> get <*> get <*> get
- put pInfo = put (infoNode pInfo)
- >> put (infoRegisteredNames pInfo)
- >> put (infoMessageQueueLength pInfo)
- >> put (infoMonitors pInfo)
- >> put (infoLinks pInfo)
-
data ProcessInfoNone = ProcessInfoNone DiedReason
deriving (Show, Typeable)
-instance Binary ProcessInfoNone where
- get = ProcessInfoNone <$> get
- put (ProcessInfoNone r) = put r
-
--------------------------------------------------------------------------------
-- Node controller internal data types --
--------------------------------------------------------------------------------
@@ -531,16 +533,16 @@ instance Binary ProcessSignal where
get = do
header <- getWord8
case header of
- 0 -> Link <$> get
- 1 -> Unlink <$> get
- 2 -> Monitor <$> get
- 3 -> Unmonitor <$> get
- 4 -> Died <$> get <*> get
- 5 -> Spawn <$> get <*> get
- 6 -> WhereIs <$> get
- 7 -> Register <$> get <*> get <*> get <*> get
- 8 -> NamedSend <$> get <*> (payloadToMessage <$> get)
- 9 -> Kill <$> get <*> get
+ 0 -> Link <$> get
+ 1 -> Unlink <$> get
+ 2 -> Monitor <$> get
+ 3 -> Unmonitor <$> get
+ 4 -> Died <$> get <*> get
+ 5 -> Spawn <$> get <*> get
+ 6 -> WhereIs <$> get
+ 7 -> Register <$> get <*> get <*> get <*> get
+ 8 -> NamedSend <$> get <*> (payloadToMessage <$> get)
+ 9 -> Kill <$> get <*> get
10 -> Exit <$> get <*> (payloadToMessage <$> get)
30 -> GetInfo <$> get
_ -> fail "ProcessSignal.get: invalid"
@@ -589,6 +591,18 @@ instance Binary RegisterReply where
put (RegisterReply label ok) = put label >> put ok
get = RegisterReply <$> get <*> get
+instance Binary ProcessInfo where
+ get = ProcessInfo <$> get <*> get <*> get <*> get <*> get
+ put pInfo = put (infoNode pInfo)
+ >> put (infoRegisteredNames pInfo)
+ >> put (infoMessageQueueLength pInfo)
+ >> put (infoMonitors pInfo)
+ >> put (infoLinks pInfo)
+
+instance Binary ProcessInfoNone where
+ get = ProcessInfoNone <$> get
+ put (ProcessInfoNone r) = put r
+
--------------------------------------------------------------------------------
-- Accessors --
--------------------------------------------------------------------------------
@@ -625,3 +639,13 @@ typedChannels = accessor _typedChannels (\cs st -> st { _typedChannels = cs })
typedChannelWithId :: LocalSendPortId -> Accessor LocalProcessState (Maybe TypedChannel)
typedChannelWithId cid = typedChannels >>> DAC.mapMaybe cid
+
+--------------------------------------------------------------------------------
+-- Utilities --
+--------------------------------------------------------------------------------
+
+-- Like 'Control.Monad.forever' but sans space leak
+{-# INLINE forever' #-}
+forever' :: Monad m => m a -> m b
+forever' a = let a' = a >> a' in a'
+
View
94 distributed-process/src/Control/Distributed/Process/Node.hs
@@ -62,7 +62,9 @@ import Control.Distributed.Process.Internal.StrictMVar
, takeMVar
)
import Control.Concurrent.Chan (newChan, writeChan, readChan)
-import Control.Concurrent.STM (atomically)
+import Control.Concurrent.STM
+ ( atomically
+ )
import Control.Distributed.Process.Internal.CQueue
( CQueue
, enqueue
@@ -97,6 +99,7 @@ import Control.Distributed.Process.Internal.Types
, LocalProcessId(..)
, ProcessId(..)
, LocalNode(..)
+ , Tracer(InactiveTracer)
, LocalNodeState(..)
, LocalProcess(..)
, LocalProcessState(..)
@@ -108,6 +111,7 @@ import Control.Distributed.Process.Internal.Types
, localPidUnique
, localProcessWithId
, localConnections
+ , forever'
, MonitorRef(..)
, ProcessMonitorNotification(..)
, NodeMonitorNotification(..)
@@ -139,6 +143,12 @@ import Control.Distributed.Process.Internal.Types
, firstNonReservedProcessId
, ImplicitReconnect(WithImplicitReconnect, NoImplicitReconnect)
)
+import Control.Distributed.Process.Internal.Trace
+ ( TraceArg(..)
+ , traceFormat
+ , startTracing
+ , stopTracer
+ )
import Control.Distributed.Process.Serializable (Serializable)
import Control.Distributed.Process.Internal.Messaging
( sendBinary
@@ -195,16 +205,13 @@ createBareLocalNode endPoint rtable = do
, localEndPoint = endPoint
, localState = state
, localCtrlChan = ctrlChan
+ , localTracer = InactiveTracer
, remoteTable = rtable
}
- void . forkIO $ runNodeController node
- void . forkIO $ handleIncomingMessages node
- return node
-
--- Like 'Control.Monad.forever' but sans space leak
-{-# INLINE forever' #-}
-forever' :: Monad m => m a -> m b
-forever' a = let a' = a >> a' in a'
+ tracedNode <- startTracing node
+ void . forkIO $ runNodeController tracedNode
+ void . forkIO $ handleIncomingMessages tracedNode
+ return tracedNode
-- | Start and register the service processes on a node
-- (for now, this is only the logger)
@@ -218,6 +225,10 @@ startServiceProcesses node = do
[ match $ \((time, pid, string) ::(String, ProcessId, String)) -> do
liftIO . hPutStrLn stderr $ time ++ " " ++ show pid ++ ": " ++ string
loop
+ , match $ \((time, string) :: (String, String)) -> do
+ -- this is a 'trace' message from the local node tracer
+ liftIO . hPutStrLn stderr $ time ++ " [trace] " ++ string
+ loop
, match $ \(ch :: SendPort ()) -> -- a shutdown request
sendChan ch ()
]
@@ -267,6 +278,7 @@ forkProcess node proc = modifyMVar (localState node) startProcess
reason <- Exception.catch
(runLocalProcess lproc proc >> return DiedNormal)
(return . DiedException . (show :: SomeException -> String))
+
-- [Unified: Table 4, rules termination and exiting]
modifyMVar_ (localState node) (cleanupProcess pid)
writeChan (localCtrlChan node) NCMsg
@@ -423,22 +435,27 @@ handleIncomingMessages node = go initConnectionState
$ st
)
NT.ErrorEvent (NT.TransportError NT.EventEndPointFailed str) ->
- fail $ "Cloud Haskell fatal error: end point failed: " ++ str
+ fatal $ "Cloud Haskell fatal error: end point failed: " ++ str
NT.ErrorEvent (NT.TransportError NT.EventTransportFailed str) ->
- fail $ "Cloud Haskell fatal error: transport failed: " ++ str
+ fatal $ "Cloud Haskell fatal error: transport failed: " ++ str
NT.EndPointClosed ->
- return ()
+ stopTracer (localTracer node) >> return ()
NT.ReceivedMulticast _ _ ->
-- If we received a multicast message, something went horribly wrong
-- and we just give up
- fail "Cloud Haskell fatal error: received unexpected multicast"
+ fatal "Cloud Haskell fatal error: received unexpected multicast"
+
+ fatal :: String -> IO ()
+ fatal msg = stopTracer (localTracer node) >> fail msg
invalidRequest :: NT.ConnectionId -> ConnectionState -> IO ()
- invalidRequest cid st =
+ invalidRequest cid st = do
-- TODO: We should treat this as a fatal error on the part of the remote
-- node. That is, we should report the remote node as having died, and we
-- should close incoming connections (this requires a Transport layer
-- extension).
+ traceEventFmtIO node "" [(TraceStr "[network] invalid request: "),
+ (Trace cid)]
go ( incomingAt cid ^= Nothing
$ st
)
@@ -491,6 +508,35 @@ instance Show ProcessKillException where
"killed-by=" ++ show pid ++ ",reason=" ++ reason
--------------------------------------------------------------------------------
+-- Tracing/Debugging --
+--------------------------------------------------------------------------------
+
+-- [Issue #104]
+
+traceNotifyDied :: LocalNode -> Identifier -> DiedReason -> NC ()
+traceNotifyDied node ident reason =
+ case reason of
+ DiedNormal -> return ()
+ _ -> traceNcEventFmt node " "
+ [(TraceStr "[node-controller]"),
+ (Trace ident),
+ (Trace reason)]
+
+traceNcEventFmt :: LocalNode -> String -> [TraceArg] -> NC ()
+traceNcEventFmt node fmt args =
+ liftIO $ traceEventFmtIO node fmt args
+
+traceEventFmtIO :: LocalNode
+ -> String
+ -> [TraceArg]
+ -> IO ()
+traceEventFmtIO node fmt args =
+ withLocalTracer node $ \t -> traceFormat t fmt args
+
+withLocalTracer :: LocalNode -> (Tracer -> IO ()) -> IO ()
+withLocalTracer node act = act (localTracer node)
+
+--------------------------------------------------------------------------------
-- Core functionality --
--------------------------------------------------------------------------------
@@ -597,6 +643,7 @@ ncEffectUnmonitor from ref = do
ncEffectDied :: Identifier -> DiedReason -> NC ()
ncEffectDied ident reason = do
node <- ask
+ traceNotifyDied node ident reason
(affectedLinks, unaffectedLinks) <- gets (splitNotif ident . (^. links))
(affectedMons, unaffectedMons) <- gets (splitNotif ident . (^. monitors))
@@ -639,7 +686,6 @@ ncEffectDied ident reason = do
, ctrlMsgSignal = Died ident reason
}
-
-- [Unified: Table 13]
ncEffectSpawn :: ProcessId -> Closure (Process ()) -> SpawnRef -> NC ()
ncEffectSpawn pid cProc ref = do
@@ -841,18 +887,18 @@ notifyDied dest src reason mRef = do
-- | [Unified: Table 8]
destNid :: ProcessSignal -> Maybe NodeId
-destNid (Link ident) = Just $ nodeOf ident
-destNid (Unlink ident) = Just $ nodeOf ident
-destNid (Monitor ref) = Just $ nodeOf (monitorRefIdent ref)
-destNid (Unmonitor ref) = Just $ nodeOf (monitorRefIdent ref)
-destNid (Spawn _ _) = Nothing
-destNid (Register _ _ _ _) = Nothing
-destNid (WhereIs _) = Nothing
-destNid (NamedSend _ _) = Nothing
+destNid (Link ident) = Just $ nodeOf ident
+destNid (Unlink ident) = Just $ nodeOf ident
+destNid (Monitor ref) = Just $ nodeOf (monitorRefIdent ref)
+destNid (Unmonitor ref) = Just $ nodeOf (monitorRefIdent ref)
+destNid (Spawn _ _) = Nothing
+destNid (Register _ _ _ _) = Nothing
+destNid (WhereIs _) = Nothing
+destNid (NamedSend _ _) = Nothing
-- We don't need to forward 'Died' signals; if monitoring/linking is setup,
-- then when a local process dies the monitoring/linking machinery will take
-- care of notifying remote nodes
-destNid (Died _ _) = Nothing
+destNid (Died _ _) = Nothing
destNid (Kill pid _) = Just $ processNodeId pid
destNid (Exit pid _) = Just $ processNodeId pid
destNid (GetInfo pid) = Just $ processNodeId pid
View
24 distributed-process/tests/TestStats.hs
@@ -2,28 +2,28 @@
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Main where
-import Prelude hiding (catch, log)
-import Test.Framework
- ( Test
- , defaultMain
- , testGroup
+import Control.Concurrent.MVar
+ ( MVar
+ , newEmptyMVar
+ , putMVar
+ , takeMVar
)
-import Network.Transport.TCP
import Control.Distributed.Process
import Control.Distributed.Process.Node
( forkProcess
, newLocalNode
, initRemoteTable
, closeLocalNode
, LocalNode)
-import Control.Concurrent.MVar
- ( MVar
- , newEmptyMVar
- , putMVar
- , takeMVar
- )
import Data.Binary()
import Data.Typeable()
+import Network.Transport.TCP
+import Prelude hiding (catch, log)
+import Test.Framework
+ ( Test
+ , defaultMain
+ , testGroup
+ )
import Test.HUnit (Assertion)
import Test.HUnit.Base (assertBool)
import Test.Framework.Providers.HUnit (testCase)

0 comments on commit 6a84b59

Please sign in to comment.