Permalink
Browse files

Merge branch 'master' of git://github.com/bos/event

Merge commit 'bos/master'
  • Loading branch information...
tibbe committed Jan 6, 2010
2 parents 9de2f0a + ac77966 commit 9fd7df526afbc8c74bf2b7f470ad73c8a98d234c
View
@@ -1,9 +1,12 @@
*~
+*#
+.#*
dist/*
.hpc/*
+.*.swp
*.hi
*.o
Setup.exe*
Setup
TAGS
-config.mk
+config.mk
View
@@ -0,0 +1,63 @@
+module Args
+ (
+ theLast
+ , ljust
+ , parseArgs
+ , positive
+ ) where
+
+import Data.Monoid (Monoid(..), Last(..), getLast)
+import System.Console.GetOpt (OptDescr, ArgOrder(Permute), getOpt, usageInfo)
+import System.Environment (getArgs, getProgName)
+import System.Exit (ExitCode(..), exitWith)
+import System.IO (hPutStrLn, stderr)
+
+-- | Deconstructor for 'Last' values.
+theLast :: (cfg -> Last a) -- ^ Field to access.
+ -> cfg
+ -> a
+theLast f cfg = case f cfg of
+ Last Nothing -> error "some horrible config sin has occurred"
+ Last (Just a) -> a
+
+-- | Parse command line options.
+parseArgs :: Monoid cfg => cfg -> [OptDescr (IO cfg)] -> [String]
+ -> IO (cfg, [String])
+parseArgs defCfg options args =
+ case getOpt Permute options args of
+ (_, _, (err:_)) -> parseError err
+ (opts, rest, _) -> do
+ cfg <- (mappend defCfg . mconcat) `fmap` sequence opts
+ return (cfg, rest)
+
+-- | Constructor for 'Last' values.
+ljust :: a -> Last a
+ljust = Last . Just
+
+-- | Parse a positive number.
+positive :: (Num a, Ord a, Read a) =>
+ String -> (Last a -> cfg) -> String -> IO cfg
+positive q f s =
+ case reads s of
+ [(n,"")] | n > 0 -> return . f $ ljust n
+ | otherwise -> parseError $ q ++ " must be positive"
+ _ -> parseError $ "invalid " ++ q ++ " provided"
+
+-- | Display an error message from a command line parsing failure, and
+-- exit.
+parseError :: String -> IO a
+parseError msg = do
+ progName <- getProgName
+ hPutStrLn stderr $ "Error: " ++ msg
+ hPutStrLn stderr $ "Run \"" ++ progName ++ " --help\" for usage information\n"
+ exitWith (ExitFailure 64)
+
+printUsage :: [OptDescr b] -> ExitCode -> IO a
+printUsage options exitCode = do
+ p <- getProgName
+ putStr (usageInfo ("Usage: " ++ p ++ " [OPTIONS] [ARGS]") options)
+ mapM_ putStrLn [
+ ""
+ , "hi mom!"
+ ]
+ exitWith exitCode
View
@@ -7,26 +7,110 @@
module Main where
-import Control.Monad
-import Data.Array.IArray
-import Data.Array.Unboxed
-import System.Posix.IO
-import System.Posix.Resource
-import System.Posix.Types
+import Args (ljust, parseArgs, positive, theLast)
+import Control.Concurrent (MVar, forkIO, takeMVar, newEmptyMVar, putMVar)
+import Control.Monad (forM_, replicateM, when)
+import Data.Array.Unboxed (UArray, listArray)
+import Data.Function (on)
+import Data.IORef (IORef, atomicModifyIORef, newIORef)
+import Data.Int (Int32)
+import Data.Monoid (Monoid(..), Last(..), getLast)
+import Foreign.C.Error (throwErrnoIfMinus1Retry)
+import Foreign.Marshal.Alloc (alloca)
+import Foreign.Ptr (Ptr)
+import Foreign.C.Types (CChar)
+import System.Console.GetOpt (ArgDescr(ReqArg), OptDescr(..))
+import System.Environment (getArgs)
+import System.Event (Event(..), evtRead, evtWrite, loop, new, registerFd)
+import System.Posix.IO (createPipe)
+import System.Posix.Resource (ResourceLimit(..), ResourceLimits(..),
+ Resource(..), setResourceLimit)
+import System.Posix.Internals (c_close, c_read, c_write)
+import System.Posix.Types (Fd(..))
-numPipes :: Int
-numPipes = 1024
+data Config = Config {
+ cfgNumPipes :: Last Int
+ }
+
+defaultConfig :: Config
+defaultConfig = Config {
+ cfgNumPipes = ljust 1024
+ }
+
+instance Monoid Config where
+ mempty = Config {
+ cfgNumPipes = mempty
+ }
+ mappend a b = Config {
+ cfgNumPipes = app cfgNumPipes a b
+ }
+ where app = on mappend
+
+defaultOptions :: [OptDescr (IO Config)]
+defaultOptions = [
+ Option ['n'] ["num-pipes"]
+ (ReqArg (positive "number of pipes" $ \n -> mempty { cfgNumPipes = n }) "N")
+ "number of pipes to use"
+ ]
+
+readCallback :: MVar () -> IORef Int -> Fd -> Event -> IO ()
+readCallback done ref fd _ = do
+ a <- atomicModifyIORef ref (\a -> let !b = a+1 in (b,b))
+ print ("read",fd,a)
+ if a > 10
+ then do
+ close fd
+ putMVar done ()
+ else do
+ readByte fd
+
+writeCallback :: IORef Int -> Fd -> Event -> IO ()
+writeCallback ref fd _ = do
+ a <- atomicModifyIORef ref (\a -> let !b = a+1 in (b,b))
+ print ("write",fd,a)
+ if a > 10
+ then close fd
+ else do
+ writeByte fd
main :: IO ()
main = do
- -- Increase the maximum number of file descriptors to fit the
- -- number of pipes.
- let lim = ResourceLimit $ fromIntegral numPipes * 2 + 50
+ (cfg, args) <- parseArgs defaultConfig defaultOptions =<< getArgs
+ let numPipes = theLast cfgNumPipes cfg
+ lim = ResourceLimit $ fromIntegral numPipes * 2 + 50
setResourceLimit ResourceOpenFiles
ResourceLimits { softLimit = lim, hardLimit = lim }
- -- Create the pipes.
- ps <- concatMap (\(Fd x, Fd y) -> [fromIntegral x, fromIntegral y]) `fmap`
- replicateM numPipes createPipe
- let pipes = listArray (0, numPipes) ps :: UArray Int Int
- return ()
+ pipePairs <- replicateM numPipes createPipe
+ print pipePairs
+ let pipes = concatMap (\(r,w) -> [r,w]) pipePairs
+
+ mgr <- new
+ forkIO $ loop mgr
+ rref <- newIORef 0
+ wref <- newIORef 0
+ done <- newEmptyMVar
+ forM_ pipePairs $ \(r,w) -> do
+ registerFd mgr (readCallback done rref) r evtRead
+ registerFd mgr (writeCallback wref) w evtWrite
+
+ let pipeArray :: UArray Int Int32
+ pipeArray = listArray (0, numPipes) . map fromIntegral $ pipes
+ takeMVar done
+
+readByte :: Fd -> IO ()
+readByte (Fd fd) =
+ alloca $ \p -> do
+ n <- throwErrnoIfMinus1Retry "readByte" $ c_read fd p 1
+ when (n /= 1) . error $ "readByte returned " ++ show n
+
+writeByte :: Fd -> IO ()
+writeByte (Fd fd) =
+ alloca $ \p -> do
+ n <- throwErrnoIfMinus1Retry "writeByte" $ c_write fd p 1
+ when (n /= 1) . error $ "writeByte returned " ++ show n
+
+close :: Fd -> IO ()
+close (Fd fd) = do
+ c_close fd
+ return ()
View
@@ -31,10 +31,9 @@ library
build-depends:
array,
- base < 4.1,
- containers == 0.2.*,
+ base < 5,
+ containers,
ghc-prim,
- old-locale,
time < 1.2,
unix
@@ -69,8 +68,8 @@ Executable test
build-depends:
array,
- base < 4.1,
- containers == 0.2.*,
+ base < 5,
+ containers,
ghc-prim,
HUnit < 1.3,
old-locale,
View
@@ -10,7 +10,9 @@ module System.Event
new,
-- * Registering interest in I/O events
- Event(..),
+ Event,
+ evtRead,
+ evtWrite,
IOCallback,
registerFd,
@@ -27,7 +29,7 @@ module System.Event
------------------------------------------------------------------------
-- Imports
-import Control.Monad (sequence_)
+import Control.Monad (liftM3, sequence_)
import Data.IntMap as IM
import Data.IORef
import Data.Maybe (maybe)
@@ -36,23 +38,23 @@ import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, diffUTCTime,
import Data.Unique
import System.Posix.Types (Fd)
-import System.Event.Internal (Backend, Event(..), Timeout(..))
+import System.Event.Internal (Backend, Event, evtRead, evtWrite, Timeout(..), wmWakeup)
import qualified System.Event.Internal as I
import qualified System.Event.TimeoutTable as TT
-#ifdef BACKEND_KQUEUE
-import qualified System.Event.KQueue as KQueue
-#elif BACKEND_EPOLL
-import qualified System.Event.EPoll as EPoll
+#if defined(BACKEND_KQUEUE)
+import qualified System.Event.KQueue as Backend
+#elif defined(BACKEND_EPOLL)
+import qualified System.Event.EPoll as Backend
#else
# error not implemented for this operating system
#endif
------------------------------------------------------------------------
-- Types
--- | Vector of I/O callbacks, indexed by file descriptor.
-type IOCallbacks = IntMap ([Event] -> IO ())
+-- | Callback invoked on I/O events.
+type IOCallback = Fd -> Event -> IO ()
-- FIXME: choose a quicker time representation than UTCTime? We'll be calling
-- "getCurrentTime" a lot.
@@ -64,7 +66,7 @@ type TimeoutTable = TT.TimeoutTable TimeRep TimeoutKey TimeoutCallback
-- | The event manager state.
data EventManager = forall a. Backend a => EventManager
{ _elBackend :: !a -- ^ Backend
- , _elIOCallbacks :: !(IORef IOCallbacks) -- ^ I/O callbacks
+ , _elIOCallbacks :: !(IORef (IntMap IOCallback)) -- ^ I/O callbacks
, _elTimeoutTable :: !(IORef TimeoutTable) -- ^ Timeout table
}
@@ -73,25 +75,14 @@ data EventManager = forall a. Backend a => EventManager
-- | Create a new event manager.
new :: IO EventManager
-new = do
-#ifdef BACKEND_KQUEUE
- be <- KQueue.new
-#elif BACKEND_EPOLL
- be <- EPoll.new
-#endif
- cbs <- newIORef empty
- tms <- newIORef TT.empty
- return $ EventManager be cbs tms
+new = liftM3 EventManager Backend.new (newIORef empty) (newIORef TT.empty)
------------------------------------------------------------------------
-- Event loop
-- | Start handling events. This function never returns.
loop :: EventManager -> IO ()
-loop mgr@(EventManager be _ tt) = do
- now <- getCurrentTime
- go now
-
+loop mgr@(EventManager be _ tt) = go =<< getCurrentTime
where
go now = do
timeout <- mkTimeout now
@@ -132,57 +123,50 @@ loop mgr@(EventManager be _ tt) = do
------------------------------------------------------------------------
-- Registering interest in I/O events
--- | Callback invoked on I/O events.
-type IOCallback = [Event] -> IO ()
-
-- | @registerFd mgr cb fd evs@ registers interest in the events @evs@
-- on the file descriptor @fd@. @cb@ is called for each event that
-- occurs.
-registerFd :: EventManager -> IOCallback -> Fd -> [Event] -> IO ()
+registerFd :: EventManager -> IOCallback -> Fd -> Event -> IO ()
registerFd (EventManager be cbs _) cb fd evs = do
atomicModifyIORef cbs $ \c -> (IM.insert (fromIntegral fd) cb c, ())
I.set be (fromIntegral fd) evs
- -- TODO: uncomment once wakeup is implemented in the backends
-
- -- I.wakeup be
+ I.wakeup be wmWakeup
------------------------------------------------------------------------
-- Registering interest in timeout events
registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
-registerTimeout (EventManager _ _ tt) ms cb = do
+registerTimeout (EventManager be _ tt) ms cb = do
now <- getCurrentTime
let expTime = addUTCTime (1000 * fromIntegral ms) now
key <- newUnique
atomicModifyIORef tt $ \tab -> (TT.insert expTime key cb tab, ())
- -- I.wakeup be
+ I.wakeup be wmWakeup
return key
clearTimeout :: EventManager -> TimeoutKey -> IO ()
-clearTimeout (EventManager _ _ tt) key = do
+clearTimeout (EventManager be _ tt) key = do
atomicModifyIORef tt $ \tab -> (TT.delete key tab, ())
- -- I.wakeup be
- return ()
+ I.wakeup be wmWakeup
updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
-updateTimeout (EventManager _ _ tt) key ms = do
+updateTimeout (EventManager be _ tt) key ms = do
now <- getCurrentTime
let expTime = addUTCTime (1000 * fromIntegral ms) now
atomicModifyIORef tt $ \tab -> (TT.update key expTime tab, ())
- -- I.wakeup be
- return ()
+ I.wakeup be wmWakeup
------------------------------------------------------------------------
-- Utilities
-- | Call the callback corresponding to the given file descriptor.
-onFdEvent :: EventManager -> Fd -> [Event] -> IO ()
+onFdEvent :: EventManager -> Fd -> Event -> IO ()
onFdEvent (EventManager _ cbs' _) fd evs = do
cbs <- readIORef cbs'
case IM.lookup (fromIntegral fd) cbs of
- Just cb -> cb evs
+ Just cb -> cb fd evs
Nothing -> return () -- TODO: error?
onTimeoutEvent :: EventManager -> TimeRep -> IO ()
Oops, something went wrong.

0 comments on commit 9fd7df5

Please sign in to comment.