Skip to content
Browse files

Implement the wakeup machinery.

This is based on a pipe mechanism.

The IO manager does not yet do anything with a wakeup message. The setup of the wakeup
fd is probably happening in the wrong place to make that work well.
  • Loading branch information...
1 parent e3d3daf commit bd5d9ea338ea0f62f1b7def764b685c2a941ed64 @bos bos committed
Showing with 108 additions and 30 deletions.
  1. +8 −12 src/System/Event.hs
  2. +10 −3 src/System/Event/EPoll.hsc
  3. +79 −7 src/System/Event/Internal.hsc
  4. +11 −8 src/System/Event/KQueue.hsc
View
20 src/System/Event.hs
@@ -36,7 +36,7 @@ import Data.Time.Clock (NominalDiffTime, UTCTime, addUTCTime, diffUTCTime,
import Data.Unique
import System.Posix.Types (Fd)
-import System.Event.Internal (Backend, Event(..), Timeout(..))
+import System.Event.Internal (Backend, Event(..), Timeout(..), wmWakeup)
import qualified System.Event.Internal as I
import qualified System.Event.TimeoutTable as TT
@@ -138,37 +138,33 @@ registerFd :: EventManager -> IOCallback -> Fd -> [Event] -> IO ()
registerFd (EventManager be cbs _) cb fd evs = do
atomicModifyIORef cbs $ \c -> (IM.insert (fromIntegral fd) cb c, ())
I.set be (fromIntegral fd) evs
- -- TODO: uncomment once wakeup is implemented in the backends
-
- -- I.wakeup be
+ I.wakeup be wmWakeup
------------------------------------------------------------------------
-- Registering interest in timeout events
registerTimeout :: EventManager -> Int -> TimeoutCallback -> IO TimeoutKey
-registerTimeout (EventManager _ _ tt) ms cb = do
+registerTimeout (EventManager be _ tt) ms cb = do
now <- getCurrentTime
let expTime = addUTCTime (1000 * fromIntegral ms) now
key <- newUnique
atomicModifyIORef tt $ \tab -> (TT.insert expTime key cb tab, ())
- -- I.wakeup be
+ I.wakeup be wmWakeup
return key
clearTimeout :: EventManager -> TimeoutKey -> IO ()
-clearTimeout (EventManager _ _ tt) key = do
+clearTimeout (EventManager be _ tt) key = do
atomicModifyIORef tt $ \tab -> (TT.delete key tab, ())
- -- I.wakeup be
- return ()
+ I.wakeup be wmWakeup
updateTimeout :: EventManager -> TimeoutKey -> Int -> IO ()
-updateTimeout (EventManager _ _ tt) key ms = do
+updateTimeout (EventManager be _ tt) key ms = do
now <- getCurrentTime
let expTime = addUTCTime (1000 * fromIntegral ms) now
atomicModifyIORef tt $ \tab -> (TT.update key expTime tab, ())
- -- I.wakeup be
- return ()
+ I.wakeup be wmWakeup
------------------------------------------------------------------------
-- Utilities
View
13 src/System/Event/EPoll.hsc
@@ -4,7 +4,7 @@ module System.Event.EPoll where
#include <sys/epoll.h>
-import Control.Monad (liftM, liftM2, when)
+import Control.Monad (liftM, liftM3, when)
import Data.Bits ((.|.))
import Foreign.C.Error (throwErrnoIfMinus1)
import Foreign.C.Types (CInt, CUInt)
@@ -126,16 +126,20 @@ epollWait epfd events maxNumEvents maxNumMilliseconds =
data EPoll = EPoll
{ epollEpfd :: !EPollFd
, epollEvents :: !(A.Array Event)
+ , epollWakeup :: !E.Wakeup
}
instance E.Backend EPoll where
new = new
set = set
poll = poll
- wakeup = undefined
+ wakeup = wakeup
new :: IO EPoll
-new = liftM2 EPoll epollCreate (A.new 64)
+new = do
+ ep <- liftM3 EPoll epollCreate (A.new 64) E.createWakeup
+ set ep (E.wakeupReadFd . epollWakeup $ ep) [E.Read]
+ return ep
set :: EPoll -> Fd -> [E.Event] -> IO ()
set ep fd events =
@@ -165,6 +169,9 @@ poll ep timeout f = do
return E.Activity
+wakeup :: EPoll -> E.WakeupMessage -> IO ()
+wakeup ep = E.writeWakeupMessage (epollWakeup ep)
+
-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --
fromEvent :: E.Event -> EventType
View
86 src/System/Event/Internal.hsc
@@ -1,6 +1,32 @@
-module System.Event.Internal where
+module System.Event.Internal
+ (
+ -- * Core types
+ Backend(..)
+ , Event(..)
+ , Result(..)
+ , Timeout(..)
+ -- * Managing the IO manager
+ , WakeupMessage
+ , Wakeup
+ , createWakeup
+ , closeWakeup
+ , readWakeupMessage
+ , writeWakeupMessage
+ , wakeupReadFd
+ , wakeupWriteFd
+ -- ** Wakeup message types
+ , wmWakeup
+ , wmDie
+ ) where
-import Foreign.C.Types (CInt)
+import Control.Monad (liftM)
+import Foreign.C.Error (throwErrnoIfMinus1_)
+import Foreign.C.Types (CChar, CInt)
+import Foreign.Marshal (alloca)
+import Foreign.Marshal.Array (allocaArray)
+import Foreign.Storable (peek, peekElemOff, poke)
+import System.Posix.Internals (c_close, c_pipe, c_read, c_write,
+ setCloseOnExec, setNonBlockingFD)
import System.Posix.Types (Fd)
-- | An I/O event.
@@ -14,6 +40,20 @@ data Timeout = Timeout CInt
-- | Indicates whether poll returned because of activity or timeout
data Result = Activity | TimedOut
+newtype WakeupMessage = WM CChar
+
+wmWakeup :: WakeupMessage
+wmWakeup = WM 1
+
+wmDie :: WakeupMessage
+wmDie = WM 2
+
+-- | The structure used to tell the IO manager thread what to do.
+data Wakeup = W {
+ wakeupReadFd :: {-# UNPACK #-} !Fd
+ , wakeupWriteFd :: {-# UNPACK #-} !Fd
+ }
+
-- | Event notification backend.
class Backend a where
-- | Create a new backend.
@@ -33,8 +73,40 @@ class Backend a where
-> [Event] -- ^ events to watch for
-> IO ()
- -- | This should cause the underlying polling mechanism to unblock. An
- -- | example of how to do this is provided by the GHC runtime system: when
- -- | you create the Backend, create a pipe and register interest in the
- -- | read end.
- wakeup :: a -> IO ()
+ -- | This should cause the underlying polling mechanism to
+ -- unblock. See 'createWakeup' and friends for a possible
+ -- implementation.
+ wakeup :: a -> WakeupMessage -> IO ()
+
+-- | Create the structure (usually a pipe) used for waking up the IO
+-- manager thread from another thread.
+createWakeup :: IO Wakeup
+createWakeup = allocaArray 2 $ \fds -> do
+ throwErrnoIfMinus1_ "createWakeupFds" $ c_pipe fds
+ rd_end <- peekElemOff fds 0
+ wr_end <- peekElemOff fds 1
+#if __GLASGOW_HASKELL__ >= 611
+ setNonBlockingFD wr_end True
+#else
+ setNonBlockingFD wr_end
+#endif
+ setCloseOnExec rd_end
+ setCloseOnExec wr_end
+ return W { wakeupReadFd = fromIntegral rd_end
+ , wakeupWriteFd = fromIntegral wr_end }
+
+-- | Close the wakeup structure used by the IO manager thread.
+closeWakeup :: Wakeup -> IO ()
+closeWakeup (W rd_end wr_end) = do
+ throwErrnoIfMinus1_ "closeWakeupFds" $ c_close (fromIntegral rd_end)
+ throwErrnoIfMinus1_ "closeWakeupFds" $ c_close (fromIntegral wr_end)
+
+readWakeupMessage :: Wakeup -> IO WakeupMessage
+readWakeupMessage (W fd _) = alloca $ \p -> do
+ throwErrnoIfMinus1_ "readWakeupMessage" $ c_read (fromIntegral fd) p 1
+ WM `liftM` peek p
+
+writeWakeupMessage :: Wakeup -> WakeupMessage -> IO ()
+writeWakeupMessage (W _ fd) (WM m) = alloca $ \p -> do
+ poke p m
+ throwErrnoIfMinus1_ "writeWakeupMessage" $ c_write (fromIntegral fd) p 1
View
19 src/System/Event/KQueue.hsc
@@ -149,23 +149,23 @@ msToTimeSpec (Timeout ms) = TimeSpec (toEnum sec) (toEnum nanosec)
-- Exported interface
data EventQueue = EventQueue {
- kq :: !EventQ
- , changes :: !(A.Array Event)
- , events :: !(A.Array Event)
+ kq :: !EventQ
+ , changes :: !(A.Array Event)
+ , events :: !(A.Array Event)
+ , eqWakeup :: !E.Wakeup
}
instance E.Backend EventQueue where
new = new
poll = poll
set q fd evs = set q fd (combineFilters $ map fromEvent evs) flagAdd
- wakeup = undefined
+ wakeup = wakeup
new :: IO EventQueue
new = do
- kq' <- kqueue
- changes' <- A.empty
- events' <- A.new 64
- return $ EventQueue kq' changes' events'
+ eq <- liftM4 EventQueue kqueue A.empty (A.new 64) E.createWakeup
+ set eq (E.wakeupReadFd . eqWakeup $ eq) [E.Read]
+ return eq
set :: EventQueue -> Fd -> Filter -> Flag -> IO ()
set q fd fltr flg =
@@ -201,6 +201,9 @@ poll q tout f = do
return E.Activity
+wakeup :: EventQueue -> E.WakeupMessage -> IO ()
+wakeup ep = E.writeWakeupMessage (eqWakeup ep)
+
fromEvent :: E.Event -> Filter
fromEvent E.Read = filterRead
fromEvent E.Write = filterWrite

0 comments on commit bd5d9ea

Please sign in to comment.
Something went wrong with that request. Please try again.