Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

First draft of timeout support

  • Loading branch information...
commit 8fb9d815fd660b05e953d74beb8174f38596d048 1 parent b4e785a
@gregorycollins authored tibbe committed
View
2  event.cabal
@@ -30,6 +30,8 @@ library
base < 4.1,
containers == 0.2.*,
ghc-prim,
+ old-locale,
+ time < 1.2,
unix
if os(darwin)
View
172 src/System/Event.hs
@@ -1,4 +1,7 @@
-{-# LANGUAGE CPP, ExistentialQuantification, ForeignFunctionInterface #-}
+{-# LANGUAGE BangPatterns #-}
+{-# LANGUAGE CPP #-}
+{-# LANGUAGE ExistentialQuantification #-}
+{-# LANGUAGE ForeignFunctionInterface #-}
module System.Event
( -- * Types
@@ -7,22 +10,39 @@ module System.Event
-- * Creation
new,
- -- * Registering interest in events
+ -- * Registering interest in I/O events
Event(..),
Callback,
- set,
+ setFD,
+
+ -- * Registering timeout callbacks
+ setTimeout,
+ updateTimeout,
+ clearTimeout,
-- * Event loop
loop
) where
-import Data.IntMap as IM
-import Data.IORef
-import Foreign.C.Types (CInt)
-
-import System.Event.Internal (Backend, Event(..))
+------------------------------------------------------------------------
+-- Imports
+
+import Control.Monad (sequence_)
+import Data.IntMap as IM
+import Data.IORef
+import Data.Maybe (maybe)
+import Data.Time.Clock ( NominalDiffTime
+ , UTCTime
+ , addUTCTime
+ , diffUTCTime
+ , getCurrentTime)
+import Data.Unique
+import System.Posix.Types (Fd(..))
+
+import System.Event.Internal (Backend, Event(..), Timeout(..))
import qualified System.Event.Internal as I
+import qualified System.Event.TimeoutTable as TT
#ifdef BACKEND_KQUEUE
import qualified System.Event.KQueue as KQueue
@@ -32,16 +52,26 @@ import qualified System.Event.EPoll as EPoll
# error not implemented for this operating system
#endif
+
------------------------------------------------------------------------
-- Types
-- | Vector of callbacks indexed by file descriptor.
type Callbacks = IntMap ([Event] -> IO ())
+-- FIXME: choose a quicker time representation than UTCTime? We'll be calling
+-- "getCurrentTime" a lot.
+type TimeRep = UTCTime
+type TimeoutKey = Unique
+type TimeoutCallback = IO ()
+type TimeoutTable = TT.TimeoutTable TimeRep TimeoutKey TimeoutCallback
+
-- | The event loop state.
data EventLoop = forall a. Backend a => EventLoop
- !a -- Backend
- (IORef Callbacks)
+ { _elBackend :: !a -- ^ Backend
+ , _elIOCallbacks :: !(IORef Callbacks) -- ^ I/O callbacks
+ , _elTimeoutTable :: !(IORef TimeoutTable) -- ^ Timeout table
+ }
------------------------------------------------------------------------
-- Creation
@@ -55,20 +85,59 @@ new = do
be <- EPoll.new
#endif
cbs <- newIORef empty
- return $ EventLoop be cbs
+ tms <- newIORef TT.empty
+ return $ EventLoop be cbs tms
------------------------------------------------------------------------
-- Event loop
-- | Start handling events. This function never returns.
loop :: EventLoop -> IO ()
-loop el = loop'
- where loop' = runOnce el >> loop'
+loop el@(EventLoop be _ tt) = do
+ now <- getCurrentTime
+ go now
+
+ where
+ --------------------------------------------------------------------------
+ go now = do
+ timeout <- mkTimeout now
+ reason <- I.poll be timeout ioCallback
+
+ now' <- getCurrentTime
+
+ case reason of
+ I.TimedOut -> timeoutCallback now'
+ _ -> return ()
+
+ go now'
+
+ --------------------------------------------------------------------------
+ inMs :: NominalDiffTime -> Maybe Timeout
+ inMs d =
+ if v <= 0 then Nothing else Just $ Timeout v
+ where
+ v = floor (1000 * d)
+
+ --------------------------------------------------------------------------
+ timeoutCallback = onTimeoutEvent el
+ ioCallback = onFdEvent el
+
+ --------------------------------------------------------------------------
+ mkTimeout now = do
+ tt' <- readIORef tt
+
+ let mbOldest = TT.findOldest tt'
+
+ -- if there are expired items in the timeout table then we need to run
+ -- the callback now; normally this would be handled within I.poll but
+ -- it could happen if e.g. one of the timeout callbacks took a long
+ -- time
+ maybe (return Forever)
+ (\(tm,_,_) -> maybe (timeoutCallback now >> mkTimeout now)
+ return
+ (inMs $ diffUTCTime tm now))
+ mbOldest
-runOnce :: EventLoop -> IO ()
-runOnce (EventLoop be cbs) = do
- cbs' <- readIORef cbs
- I.poll be (onFdEvent cbs')
------------------------------------------------------------------------
-- Registering interest in events
@@ -78,17 +147,74 @@ type Callback = [Event] -> IO ()
-- | @set el cb fd evs@ registers interest in the events @evs@ on the
-- file descriptor @fd@. @cb@ is called for each event that occurs.
-set :: EventLoop -> Callback -> CInt -> [Event] -> IO ()
-set (EventLoop be cbs) cb fd evs = do
- modifyIORef cbs (IM.insert (fromIntegral fd) cb)
- I.set be fd evs
+setFD :: EventLoop -> Callback -> Fd -> [Event] -> IO ()
+setFD (EventLoop be cbs _) cb fd evs = do
+ atomicModifyIORef cbs $ \c -> (IM.insert (fromIntegral fd) cb c, ())
+ I.set be (fromIntegral fd) evs
+ -- TODO: uncomment once wakeup is implemented in the backends
+
+ -- I.wakeup be
+
+
+------------------------------------------------------------------------
+-- Registering timeout callbacks
+
+setTimeout :: EventLoop -> Int -> TimeoutCallback -> IO TimeoutKey
+setTimeout (EventLoop _ _ tt) ms cb = do
+ now <- getCurrentTime
+ let expTime = addUTCTime (1000 * fromIntegral ms) now
+ key <- newUnique
+
+ atomicModifyIORef tt $ \tab -> (TT.insert expTime key cb tab, ())
+ -- I.wakeup be
+ return key
+
+
+clearTimeout :: EventLoop -> TimeoutKey -> IO ()
+clearTimeout (EventLoop _ _ tt) key = do
+ atomicModifyIORef tt $ \tab -> (TT.delete key tab, ())
+ -- I.wakeup be
+ return ()
+
+
+updateTimeout :: EventLoop -> TimeoutKey -> Int -> IO ()
+updateTimeout (EventLoop _ _ tt) key ms = do
+ now <- getCurrentTime
+ let expTime = addUTCTime (1000 * fromIntegral ms) now
+
+ atomicModifyIORef tt $ \tab -> (TT.update key expTime tab, ())
+ -- I.wakeup be
+ return ()
+
------------------------------------------------------------------------
-- Utilities
-- | Call the callback corresponding to the given file descriptor.
-onFdEvent :: Callbacks -> CInt -> [Event] -> IO ()
-onFdEvent cbs fd evs =
+onFdEvent :: EventLoop -> Fd -> [Event] -> IO ()
+onFdEvent (EventLoop _ cbs' _) fd evs = do
+ cbs <- readIORef cbs'
case IM.lookup (fromIntegral fd) cbs of
Just cb -> cb evs
Nothing -> return () -- TODO: error?
+
+
+onTimeoutEvent :: EventLoop -> TimeRep -> IO ()
+onTimeoutEvent (EventLoop _ _ tt) now = do
+ touts <- atomicModifyIORef tt grabExpired
+ sequence_ touts
+
+ where
+ grabExpired :: TimeoutTable -> (TimeoutTable, [TimeoutCallback])
+ grabExpired table = go [] table
+
+ go l table =
+ case TT.findOldest table of
+ Nothing -> (table,l)
+ Just (t,k,c) -> if expired t
+ then let !table' = TT.delete k table
+ in go (c:l) table'
+ else (table, l)
+
+ expired t = diffUTCTime now t >= 0
+
View
60 src/System/Event/EPoll.hsc
@@ -4,7 +4,7 @@ module System.Event.EPoll where
#include <sys/epoll.h>
-import Control.Monad (liftM2, when)
+import Control.Monad (liftM, liftM2, when)
import Data.Bits ((.|.))
import Foreign.C.Error (throwErrnoIfMinus1)
import Foreign.C.Types (CInt, CUInt)
@@ -12,21 +12,23 @@ import Foreign.Marshal.Error (void)
import Foreign.Marshal.Utils (with)
import Foreign.Ptr (Ptr)
import Foreign.Storable (Storable(..))
+import System.Posix.Types (Fd(..))
import qualified System.Event.Array as A
import qualified System.Event.Internal as E
+import System.Event.Internal (Timeout(..))
-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
newtype EPollFd = EPollFd
- { unEPollFd :: CInt
+ { unEPollFd :: Fd
}
-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
data Event = Event
{ eventTypes :: EventType
- , eventFd :: CInt
+ , eventFd :: Fd
}
instance Storable Event where
@@ -89,11 +91,11 @@ foreign import ccall unsafe "sys/epoll.h epoll_wait"
epollCreate :: IO EPollFd
epollCreate =
- EPollFd `fmap` throwErrnoIfMinus1 "epollCreate" (c_epoll_create size)
+ EPollFd `fmap` throwErrnoIfMinus1 "epollCreate" (liftM Fd $ c_epoll_create size)
where
-- From manpage EPOLL_CREATE(2): "Since Linux 2.6.8, the size argument is
- -- unused. (The kernel dynamically sizes the required data structures without
- -- needing this initial hint.)" We pass 256 because libev does.
+ -- unused. (The kernel dynamically sizes the required data structures
+ -- without needing this initial hint.)" We pass 256 because libev does.
size = 256 :: CInt
epollControl :: EPollFd -> ControlOp -> CInt -> Ptr Event -> IO ()
@@ -101,7 +103,11 @@ epollControl epfd op fd event =
void $
throwErrnoIfMinus1
"epollControl"
- (c_epoll_ctl (unEPollFd epfd) (unControlOp op) fd event)
+ (c_epoll_ctl
+ (fromIntegral $ unEPollFd epfd)
+ (unControlOp op)
+ (fromIntegral fd)
+ event)
epollWait :: EPollFd -> Ptr Event -> Int -> Int -> IO Int
epollWait epfd events maxNumEvents maxNumMilliseconds =
@@ -109,7 +115,7 @@ epollWait epfd events maxNumEvents maxNumMilliseconds =
throwErrnoIfMinus1
"epollWait"
(c_epoll_wait
- (unEPollFd epfd)
+ (fromIntegral $ unEPollFd epfd)
events
(fromIntegral maxNumEvents)
(fromIntegral maxNumMilliseconds)
@@ -123,37 +129,49 @@ data EPoll = EPoll
}
instance E.Backend EPoll where
- new = new
- set = set
- poll = poll
+ new = new
+ set = set
+ poll = poll
+ wakeup = wakeup
new :: IO EPoll
new = liftM2 EPoll epollCreate (A.new 64)
-set :: EPoll -> CInt -> [E.Event] -> IO ()
+set :: EPoll -> Fd -> [E.Event] -> IO ()
set ep fd events =
- with e $ epollControl (epollEpfd ep) controlOpAdd fd
+ with e $ epollControl (epollEpfd ep) controlOpAdd (fromIntegral fd)
where
e = Event ets fd
ets = combineEventTypes (map fromEvent events)
-poll :: EPoll -> (CInt -> [E.Event] -> IO ()) -> IO ()
-poll ep f = do
+poll :: EPoll -- ^ state
+ -> Timeout -- ^ timeout in milliseconds
+ -> (Fd -> [E.Event] -> IO ()) -- ^ I/O callback
+ -> IO E.Result
+poll ep timeout f = do
let epfd = epollEpfd ep
let events = epollEvents ep
n <- A.unsafeLoad events $ \es cap ->
- epollWait epfd es cap maxNumMilliseconds
+ epollWait epfd es cap $ fromTimeout timeout
- cap <- A.capacity events
- when (n == cap) $ A.ensureCapacity events (2 * cap)
+ if n == 0 then
+ return E.TimedOut
+ else do
+ cap <- A.capacity events
+ when (n == cap) $ A.ensureCapacity events (2 * cap)
- A.mapM_ events $ \e -> f (eventFd e) []
- where
- maxNumMilliseconds = 1000
+ A.mapM_ events $ \e -> f (eventFd e) []
+
+ return E.Activity
-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
fromEvent :: E.Event -> EventType
fromEvent E.Read = eventTypeReadyForRead
fromEvent E.Write = eventTypeReadyForWrite
+
+
+fromTimeout :: Timeout -> Int
+fromTimeout Forever = -1
+fromTimeout (Timeout ms) = fromIntegral ms
View
25 src/System/Event/Internal.hs
@@ -1,11 +1,19 @@
module System.Event.Internal where
import Foreign.C.Types (CInt)
+import System.Posix.Types (Fd)
-- | An I/O event.
data Event = Read -- ^ The file descriptor is ready to be read
| Write -- ^ The file descriptor is ready to be written to
+-- | A type alias for timeouts
+data Timeout = Timeout CInt
+ | Forever
+
+-- | Indicates whether poll returned because of activity or timeout
+data Result = Activity | TimedOut
+
-- | Event notification backend.
class Backend a where
-- | Create a new backend.
@@ -13,13 +21,20 @@ class Backend a where
-- | Poll backend for new events. The provided callback is called
-- once per file descriptor with new events.
- poll :: a
- -> (CInt -> [Event] -> IO ()) -- Callback
- -> IO ()
+ poll :: a -- ^ backend state
+ -> Timeout -- ^ timeout in milliseconds
+ -> (Fd -> [Event] -> IO ()) -- ^ I/O callback
+ -> IO Result
-- | Register interest in the given events on the given file
-- descriptor.
set :: a
- -> CInt -- File descriptor
- -> [Event] -- Events to watch for
+ -> Fd -- ^ file descriptor
+ -> [Event] -- ^ events to watch for
-> IO ()
+
+ -- | This should cause the underlying polling mechanism to unblock. An
+ -- | example of how to do this is provided by the GHC runtime system: when
+ -- | you create the Backend, create a pipe and register interest in the
+ -- | read end.
+ wakeup :: a -> IO ()
View
52 src/System/Event/KQueue.hsc
@@ -10,9 +10,10 @@ import Foreign.Ptr
import Foreign.Storable
import Foreign.Marshal.Alloc
import Prelude hiding (filter)
+import System.Posix.Types (Fd(..))
import qualified System.Event.Internal as E
-
+import System.Event.Internal (Timeout(..))
import qualified System.Event.Array as A
#include <sys/types.h>
@@ -126,7 +127,21 @@ kevent k chs chlen evs evlen ts
c_kevent k chs (fromIntegral chlen) evs (fromIntegral evlen) ts
withTimeSpec :: TimeSpec -> (Ptr TimeSpec -> IO a) -> IO a
-withTimeSpec ts f = alloca $ \ptr -> poke ptr ts >> f ptr
+withTimeSpec ts f =
+ if tv_sec ts < 0 then
+ f nullPtr
+ else
+ alloca $ \ptr -> poke ptr ts >> f ptr
+
+msToTimeSpec :: Timeout -> TimeSpec
+msToTimeSpec Forever = TimeSpec (-1) (-1)
+msToTimeSpec (Timeout ms) = TimeSpec (toEnum sec) (toEnum nanosec)
+ where
+ sec :: Int
+ sec = fromEnum $ ms `div` 1000
+
+ nanosec :: Int
+ nanosec = (fromEnum ms - 1000*sec) * 1000000
------------------------------------------------------------------------
-- Exported interface
@@ -141,6 +156,7 @@ instance E.Backend EventQueue where
new = new
poll = poll
set q fd evs = set q fd (combineFilters $ map fromEvent evs) flagAdd
+ wakeup = undefined
new :: IO EventQueue
new = do
@@ -149,30 +165,36 @@ new = do
events' <- A.new 64
return $ EventQueue kq' changes' events'
-set :: EventQueue -> CInt -> Filter -> Flag -> IO ()
+set :: EventQueue -> Fd -> Filter -> Flag -> IO ()
set q fd fltr flg =
A.snoc (changes q) (Event (fromIntegral fd) fltr flg 0 0 nullPtr)
-poll :: EventQueue -> (CInt -> [E.Event] -> IO ()) -> IO ()
-poll q f = do
+poll :: EventQueue
+ -> Timeout
+ -> (Fd -> [E.Event] -> IO ())
+ -> IO E.Result
+poll q tout f = do
changesLen <- A.length (changes q)
len <- A.length (events q)
- when (changesLen > len) $ do
- A.ensureCapacity (events q) (2 * changesLen)
+ when (changesLen > len) $ A.ensureCapacity (events q) (2 * changesLen)
res <- A.useAsPtr (changes q) $ \changesPtr chLen ->
A.useAsPtr (events q) $ \eventsPtr evLen ->
- withTimeSpec (TimeSpec 1 0) $ \tsPtr ->
+ withTimeSpec (msToTimeSpec tout) $ \tsPtr ->
kevent (kq q) changesPtr chLen eventsPtr evLen tsPtr
- when (res > 0) $ putStrLn "events!"
+ if res == 0 then
+ return E.TimedOut
+ else do
+ eventsLen <- A.length (events q)
+ when (res == eventsLen) $ do
+ A.ensureCapacity (events q) (2 * eventsLen)
- eventsLen <- A.length (events q)
- when (res == eventsLen) $ do
- A.ensureCapacity (events q) (2 * eventsLen)
+ A.mapM_ (events q) $ \e -> do
+ let fd = fromIntegral (ident e)
+ -- TODO: Send the list of events to the callback
+ f fd []
- A.mapM_ (events q) $ \e -> do
- let fd = fromIntegral (ident e)
- f fd []
+ return E.Activity
fromEvent :: E.Event -> Filter
fromEvent E.Read = filterRead
Please sign in to comment.
Something went wrong with that request. Please try again.