forked from tibbe/event
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Simple.hs
116 lines (101 loc) · 3.39 KB
/
Simple.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
-- Flow:
--
-- 1. Create N pipes.
--
-- Modelled after:
-- http://levent.svn.sourceforge.net/viewvc/levent/trunk/libevent/test/bench.c
module Main where
import Args (ljust, parseArgs, positive, theLast)
import Control.Concurrent (MVar, forkIO, takeMVar, newEmptyMVar, putMVar)
import Control.Monad (forM_, replicateM, when)
import Data.Array.Unboxed (UArray, listArray)
import Data.Function (on)
import Data.IORef (IORef, atomicModifyIORef, newIORef)
import Data.Int (Int32)
import Data.Monoid (Monoid(..), Last(..), getLast)
import Foreign.C.Error (throwErrnoIfMinus1Retry)
import Foreign.Marshal.Alloc (alloca)
import Foreign.Ptr (Ptr)
import Foreign.C.Types (CChar)
import System.Console.GetOpt (ArgDescr(ReqArg), OptDescr(..))
import System.Environment (getArgs)
import System.Event (Event(..), evtRead, evtWrite, loop, new, registerFd)
import System.Posix.IO (createPipe)
import System.Posix.Resource (ResourceLimit(..), ResourceLimits(..),
Resource(..), setResourceLimit)
import System.Posix.Internals (c_close, c_read, c_write)
import System.Posix.Types (Fd(..))
data Config = Config {
cfgNumPipes :: Last Int
}
defaultConfig :: Config
defaultConfig = Config {
cfgNumPipes = ljust 1024
}
instance Monoid Config where
mempty = Config {
cfgNumPipes = mempty
}
mappend a b = Config {
cfgNumPipes = app cfgNumPipes a b
}
where app = on mappend
defaultOptions :: [OptDescr (IO Config)]
defaultOptions = [
Option ['n'] ["num-pipes"]
(ReqArg (positive "number of pipes" $ \n -> mempty { cfgNumPipes = n }) "N")
"number of pipes to use"
]
readCallback :: MVar () -> IORef Int -> Fd -> Event -> IO ()
readCallback done ref fd _ = do
a <- atomicModifyIORef ref (\a -> let !b = a+1 in (b,b))
print ("read",fd,a)
if a > 10
then do
close fd
putMVar done ()
else do
readByte fd
writeCallback :: IORef Int -> Fd -> Event -> IO ()
writeCallback ref fd _ = do
a <- atomicModifyIORef ref (\a -> let !b = a+1 in (b,b))
print ("write",fd,a)
if a > 10
then close fd
else do
writeByte fd
main :: IO ()
main = do
(cfg, args) <- parseArgs defaultConfig defaultOptions =<< getArgs
let numPipes = theLast cfgNumPipes cfg
lim = ResourceLimit $ fromIntegral numPipes * 2 + 50
setResourceLimit ResourceOpenFiles
ResourceLimits { softLimit = lim, hardLimit = lim }
pipePairs <- replicateM numPipes createPipe
print pipePairs
let pipes = concatMap (\(r,w) -> [r,w]) pipePairs
mgr <- new
forkIO $ loop mgr
rref <- newIORef 0
wref <- newIORef 0
done <- newEmptyMVar
forM_ pipePairs $ \(r,w) -> do
registerFd mgr (readCallback done rref) r evtRead
registerFd mgr (writeCallback wref) w evtWrite
let pipeArray :: UArray Int Int32
pipeArray = listArray (0, numPipes) . map fromIntegral $ pipes
takeMVar done
readByte :: Fd -> IO ()
readByte (Fd fd) =
alloca $ \p -> do
n <- throwErrnoIfMinus1Retry "readByte" $ c_read fd p 1
when (n /= 1) . error $ "readByte returned " ++ show n
writeByte :: Fd -> IO ()
writeByte (Fd fd) =
alloca $ \p -> do
n <- throwErrnoIfMinus1Retry "writeByte" $ c_write fd p 1
when (n /= 1) . error $ "writeByte returned " ++ show n
close :: Fd -> IO ()
close (Fd fd) = do
c_close fd
return ()