Skip to content

Commit

Permalink
Eliminate Sync bug. Log Improve. Rewrite.
Browse files Browse the repository at this point in the history
The rewrite of PeerMgrP had forgotten to synchronize on some events.
Thus the ChokeMgrP was never informed about new peers.

While here, move around and improve logging, and rewrite the Console
Process to be monad transformed.
  • Loading branch information
jlouis committed Feb 2, 2010
1 parent cab198e commit 7eaa87e
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 162 deletions.
10 changes: 3 additions & 7 deletions src/ChokeMgrP.hs
Expand Up @@ -72,6 +72,7 @@ start logC ch infoC ur weSeed supC = do
Tick -> tick
RemovePeer t -> removePeer t
AddPeer t pCh -> do
logDebug $ "Adding peer " ++ show t
weSeed <- gets seeding
addPeer' pCh weSeed t)
infoEvent = do
Expand All @@ -88,13 +89,8 @@ start logC ch infoC ur weSeed supC = do
TimerP.register 10 Tick ch
updateDB
runRechokeRound
removePeer tid = modify (\db -> db { peerMap = M.delete tid (peerMap db) })

addPeer :: ChokeMgrChannel -> PeerPid -> PeerChannel -> IO ()
addPeer ch pid = sync . transmit ch . (AddPeer pid)

removePeer :: ChokeMgrChannel -> PeerPid -> IO ()
removePeer ch pid = sync $ transmit ch $ RemovePeer pid
removePeer tid = do logDebug $ "Removing peer " ++ show tid
modify (\db -> db { peerMap = M.delete tid (peerMap db) })

-- INTERNAL FUNCTIONS
----------------------------------------------------------------------
Expand Down
28 changes: 17 additions & 11 deletions src/ConsoleP.hs
Expand Up @@ -36,8 +36,11 @@ where
import Control.Concurrent
import Control.Concurrent.CML
import Control.Exception
import Control.Monad
import Control.Monad.Reader

import Prelude hiding (catch)
import Process

import Logging
import Supervisor
Expand All @@ -47,23 +50,26 @@ data Cmd = Quit -- Quit the program

type CmdChannel = Channel Cmd

data CF = CF { cmdCh :: CmdChannel
, logCh :: LogChannel }

instance Logging CF where
getLogger cf = ("ConsoleP", logCh cf)

-- | Start the logging process and return a channel to it. Sending on this
-- Channel means writing stuff out on stdOut
start :: LogChannel -> Channel () -> SupervisorChan -> IO ThreadId
start logC waitC supC = do
cmdC <- readerP logC -- We shouldn't be doing this in the long run
spawn $ startup cmdC
where startup cmdC = logger cmdC `catches`
[Handler (\ThreadKilled -> return ()),
Handler (\(ex :: SomeException) ->
putStrLn $ "Unknown Message to the consoleP: " ++ show ex)]
logger cmdC = (sync $ choose [logEvent, quitEvent cmdC]) >> logger cmdC
logEvent = wrap (receive logC (const True))
print
quitEvent cmdC = wrap (receive cmdC (==Quit))
(\_ -> sync $ transmit waitC ())

spawnP (CF cmdC logC) () (catchP (forever lp) (defaultStopHandler supC))
where
lp = syncP =<< quitEvent
quitEvent = do
ch <- asks cmdCh
ev <- recvP ch (==Quit)
wrapP ev
(\_ -> syncP =<< sendP waitC ())


readerP :: LogChannel -> IO CmdChannel
readerP logCh = do cmdCh <- channel
Expand Down
84 changes: 11 additions & 73 deletions src/Logging.hs
Expand Up @@ -6,94 +6,32 @@ module Logging (
-- * Types
, LogChannel
, LogPriority(..)
-- * Interface
, logInfo
, logDebug
, logWarn
, logFatal
, logError
-- * Spawning
, startLogger
-- * Interface (deprecated)
, logMsg
, logMsg'
)
where

import Control.Concurrent
import Control.Concurrent.CML
import Control.Monad.Reader

import Data.Monoid

import LoggingTypes
import Prelude hiding (log)

import Process


data LogPriority = Debug -- ^ Fine grained debug info
| Warn -- ^ Potentially harmful situations
| Info -- ^ Informational messages, progress reports
| Error -- ^ Errors that are continuable
| Fatal -- ^ Severe errors. Will probably make the application abort
| None -- ^ No logging at all
deriving (Show, Eq, Ord)


-- Logging filters
type LogFilter = String -> LogPriority

matchP :: String -> LogPriority -> LogFilter
matchP process prio = \s -> if s == process then prio else None

matchAny :: LogPriority -> LogFilter
matchAny prio = const prio

matchNone :: LogFilter
matchNone = const None

instance Monoid LogFilter where
mempty = const None
mappend f g = \x ->
let fx = f x
in if fx /= None then fx else g x

-- | The level by which we log
logLevel :: LogFilter
#ifdef DEBUG
logLevel = matchAny Debug
#else
logLevel = matchAny Info
#endif


-- | The class of types where we have a logger inside them somewhere
class Logging a where
-- | Returns a channel for logging and an Identifying string to use
getLogger :: a -> (String, LogChannel)

instance Logging LogChannel where
getLogger ch = ("Unknown", ch)


-- TODO: Consider generalizing this to any member of Show
data LogMsg = Mes LogPriority String String

instance Show LogMsg where
show (Mes pri name str) = show name ++ "(" ++ show pri ++ "):\t" ++ str

type LogChannel = Channel LogMsg

-- | If a process has access to a logging channel, it is able to log messages to the world
log :: Logging a => LogPriority -> String -> Process a b ()
log prio msg = do
(name, logC) <- asks getLogger
when (prio >= logLevel name)
(liftIO $ logMsg' logC name prio msg)

logInfo, logDebug, logFatal, logWarn, logError :: Logging a => String -> Process a b ()
logInfo = log Info
logDebug = log Debug
logFatal = log Fatal
logWarn = log Warn
logError = log Error
startLogger :: LogChannel -> IO ThreadId
startLogger logC =
spawnP logC () (forever lp)
where
lp = do m <- syncP =<< logEv
liftIO $ print m
logEv = recvP logC (const True)

-- | Log a message to a channel
logMsg :: LogChannel -> String -> IO ()
Expand Down
11 changes: 7 additions & 4 deletions src/Main.hs
Expand Up @@ -21,6 +21,7 @@ import qualified PeerMgrP
import qualified PieceMgrP (start, createPieceDb)
import qualified Process ()
import qualified ChokeMgrP (start)
import Logging
import qualified StatusP
import Supervisor
import qualified TimerP()
Expand Down Expand Up @@ -48,13 +49,14 @@ download name = do
Right bc ->
do print bc
(h, haveMap, pieceMap) <- openAndCheckFile bc
logC <- channel
Logging.startLogger logC
-- setup channels
trackerC <- channel
statusC <- channel
waitC <- channel
pieceMgrC <- channel
supC <- channel
logC <- channel
fspC <- channel
statInC <- channel
pmC <- channel
Expand All @@ -68,7 +70,8 @@ download name = do
left = bytesLeft haveMap pieceMap
clientState = determineState haveMap
-- Create main supervisor process
allForOne [ Worker $ ConsoleP.start logC waitC
allForOne "MainSup"
[ Worker $ ConsoleP.start logC waitC
, Worker $ FSP.start h logC pieceMap fspC
, Worker $ PeerMgrP.start pmC pid (infoHash ti)
pieceMap pieceMgrC fspC logC chokeC statInC (pieceCount ti)
Expand All @@ -77,10 +80,10 @@ download name = do
, Worker $ StatusP.start logC left clientState statusC statInC
, Worker $ TrackerP.start ti pid defaultPort logC statusC statInC
trackerC pmC
, Worker $ ChokeMgrP.start logC chokeC chokeInfoC 100 -- 100 is upload rate in Kilobytes
, Worker $ ChokeMgrP.start logC chokeC chokeInfoC 100 -- 100 is upload rate in KB
(case clientState of
Seeding -> True
Leeching -> False)
] supC
] logC supC
sync $ receive waitC (const True)
return ()
13 changes: 7 additions & 6 deletions src/PeerMgrP.hs
Expand Up @@ -56,7 +56,7 @@ start :: Channel [Peer] -> PeerId -> InfoHash -> PieceMap -> PieceMgrChannel ->
start ch pid ih pm pieceMgrC fsC logC chokeMgrC statC nPieces supC =
do mgrC <- channel
fakeChan <- channel
pool <- liftM snd $ oneForOne [] fakeChan
pool <- liftM snd $ oneForOne "PeerPool" [] logC fakeChan
spawnP (CF ch pieceMgrC mgrC fsC pool chokeMgrC logC)
(ST [] M.empty pid ih) (catchP (forever lp)
(defaultStopHandler supC))
Expand All @@ -70,14 +70,15 @@ start ch pid ih pm pieceMgrC fsC logC chokeMgrC statC nPieces supC =
modify (\s -> s { peersInQueue = ps ++ peersInQueue s }))
peerEvent = do
ev <- recvPC mgrCh
wrapP ev (\msg -> case msg of
Connect tid c -> newPeer tid c
Disconnect tid -> removePeer tid)
wrapP ev (\msg -> do
case msg of
Connect tid c -> newPeer tid c
Disconnect tid -> removePeer tid)
newPeer tid c = do logDebug $ "Adding new peer " ++ show tid
sendPC chokeMgrCh (AddPeer tid c)
sendPC chokeMgrCh (AddPeer tid c) >>= syncP
modify (\s -> s { peers = M.insert tid c (peers s)})
removePeer tid = do logDebug $ "Removing peer " ++ show tid
sendPC chokeMgrCh (RemovePeer tid)
sendPC chokeMgrCh (RemovePeer tid) >>= syncP
modify (\s -> s { peers = M.delete tid (peers s)})
numPeers = 40
fillPeers = do
Expand Down
2 changes: 1 addition & 1 deletion src/PeerP.hs
Expand Up @@ -84,7 +84,7 @@ connect (host, port, pid, ih, pm) pool pieceMgrC fsC logC statC mgrC nPieces =
do logMsg logC "entering peerP loop code"
supC <- channel -- TODO: Should be linked later on
children <- peerChildren logC h mgrC pieceMgrC fsC statC pm nPieces
sync $ transmit pool $ SpawnNew (Supervisor $ allForOne children)
sync $ transmit pool $ SpawnNew (Supervisor $ allForOne "PeerSup" children logC)
return ()

-- INTERNAL FUNCTIONS
Expand Down
80 changes: 69 additions & 11 deletions src/Process.hs
Expand Up @@ -20,13 +20,19 @@ module Process (
, sendPC
, recvP
, recvPC
, recvWrapPC
, wrapP
, stopP
-- * Helpers
, defaultStopHandler
-- * Interface
, logInfo
, logDebug
, logWarn
, logFatal
, logError
)
where

import Data.Monoid
import Control.Applicative

import Control.Concurrent
Expand All @@ -39,11 +45,12 @@ import Control.Monad.State

import Data.Typeable

import LoggingTypes
import Prelude hiding (catch, log)

import System.IO

import Supervisor
-- import Supervisor


-- | A @Process a b c@ is the type of processes with access to configuration data @a@, state @b@
Expand Down Expand Up @@ -75,23 +82,26 @@ spawnP c st p = spawn proc

-- | Run the process monad for its side effect, with a stopHandler if exceptions
-- are raised in the process
catchP :: Process a b () -> Process a b () -> Process a b ()
catchP :: Logging a => Process a b () -> Process a b () -> Process a b ()
catchP proc stopH = cleanupP proc stopH (return ())

-- | Run the process monad for its side effect. @cleanupP p sh ch@ describes to
-- run @p@. If @p@ dies by a kill from a supervisor, run @ch@. Otherwise it runs
-- @ch >> sh@ on death.
cleanupP :: Process a b () -> Process a b () -> Process a b () -> Process a b ()
cleanupP :: Logging a => Process a b () -> Process a b () -> Process a b () -> Process a b ()
cleanupP proc stopH cleanupH = do
st <- get
c <- ask
(a, s') <- liftIO $ runP c st proc `catches`
[ Handler (\ThreadKilled -> do
do runP c st cleanupH)
runP c st ( do logDebug $ "Process Terminating by Supervisor"
cleanupH ))
, Handler (\StopException ->
do runP c st (cleanupH >> stopH)) -- This one is ok
runP c st (do logDebug $ "Process Terminating gracefully"
cleanupH >> stopH)) -- This one is ok
, Handler (\(ex :: SomeException) ->
do hPrint stderr ex; runP c st (cleanupH >> stopH))
runP c st (do logFatal $ "Process exiting due to ex: " ++ show ex
cleanupH >> stopH))
]
put s'
return a
Expand Down Expand Up @@ -128,10 +138,58 @@ wrapP ev p = do
c <- ask
return $ wrap ev (\(v, s) -> runP c s (p v))

-- Convenience function
recvWrapPC :: (a -> Channel c) -> (c -> Process a b y) -> Process a b (Event (y, b))
recvWrapPC sel p = do
ev <- recvPC sel
wrapP ev p

chooseP :: [Process a b (Event (c, b))] -> Process a b (Event (c, b))
chooseP events = (sequence events) >>= (return . choose)

defaultStopHandler supC = do
t <- liftIO $ myThreadId
syncP =<< (sendP supC $ IAmDying t)
------ LOGGING




-- | If a process has access to a logging channel, it is able to log messages to the world
log :: Logging a => LogPriority -> String -> Process a b ()
log prio msg = do
(name, logC) <- asks getLogger
when (prio >= logLevel name)
(liftIO $ logMsg' logC name prio msg)
where logMsg' c name pri = sync . transmit c . Mes pri name

logInfo, logDebug, logFatal, logWarn, logError :: Logging a => String -> Process a b ()
logInfo = log Info

-- Logging filters
type LogFilter = String -> LogPriority

matchP :: String -> LogPriority -> LogFilter
matchP process prio = \s -> if s == process then prio else None

matchAny :: LogPriority -> LogFilter
matchAny prio = const prio

matchNone :: LogFilter
matchNone = const None

instance Monoid LogFilter where
mempty = const None
mappend f g = \x ->
let fx = f x
in if fx /= None then fx else g x

-- | The level by which we log
logLevel :: LogFilter
#ifdef DEBUG
logLevel = matchAny Debug
#else
logLevel = matchAny Info
#endif

logDebug = log Debug
logFatal = log Fatal
logWarn = log Warn
logError = log Error

0 comments on commit 7eaa87e

Please sign in to comment.