Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
removed clock dependency and fixed watch
  • Loading branch information
CetinSert committed Aug 18, 2012
1 parent 8615526 commit 729a332
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 44 deletions.
5 changes: 2 additions & 3 deletions PortFusion.cabal
Expand Up @@ -85,8 +85,7 @@ executable PortFusion
buildable: True
build-depends: base >= 4 && <= 5,
bytestring -any,
splice -any,
clock -any
splice -any
if os(windows)
build-depends: network >= 2.3.0.13
if arch(i386)
Expand Down Expand Up @@ -163,4 +162,4 @@ executable PortFusion
if arch(mips)
cpp-options: -D__ARCH__="MIPS"
else
cpp-options: -D__ARCH__="Unknown"
cpp-options: -D__ARCH__="Unknown"
73 changes: 32 additions & 41 deletions src/Main.hs
Expand Up @@ -41,15 +41,14 @@ import System.IO.Unsafe

import GHC.Conc (numCapabilities)
import Network.Socket.Splice -- corsis library: SPLICE
import System.Clock -- corsis library: CLOCK

---------------------------------------------------------------------------------------------UTILITY

type Seconds = Int
secs :: Int -> Seconds; secs = (* 1000000)
wait :: Seconds -> IO (); wait = threadDelay . secs
schedule :: Seconds -> IO () -> IO ThreadId; schedule s a = forkIO $! wait s >> a
now :: IO Seconds; now = sec <$> getTime Monotonic
--now :: IO Seconds; now = sec <$> getTime Monotonic

{-# INLINE (<>) #-}; (<>) :: ByteString -> ByteString -> ByteString; (<>) = B.append
{-# INLINE (//) #-}; (//) :: a -> (a -> b) -> b; x // f = f x
Expand Down Expand Up @@ -133,7 +132,7 @@ instance Disposable Handle where (✖) = try_ . hClose
instance Disposable (Ptr a) where (✖) = free
instance Disposable (StablePtr a) where (✖) = freeStablePtr

----------------------------------------------------------------------------------------------------
--------------------------------------------------------------------------------------------ADDRPORT

type Host = ByteString
type Port = PortNumber
Expand Down Expand Up @@ -167,14 +166,13 @@ faf x = LS $! case x of { AF_INET6 -> sf; AF_UNSPEC -> sf; AF_INET -> "IPv4"; _-

----------------------------------------------------------------------------------------------EVENTS

type Message = Request
data ServiceAction = Listen | Watch | Drop deriving Show
data PeerAction = Accept | Open | Close | Receive Message | Send Message deriving Show
data PeerAction = Accept | Open | Close | Receive Request | Send Request deriving Show
data FusionAction = Establish | Terminate deriving Show

data Event = ServiceAction :^: (LiteralString, AddrPort)
| PeerAction :.: PeerLink
| FusionAction ::: FusionLink deriving Show
| PeerAction :.: PeerLink
| FusionAction ::: FusionLink deriving Show

------------------------------------------------------------------------------------------------WIRE

Expand Down Expand Up @@ -205,7 +203,6 @@ main = withSocketsDo $! tryWith (const . print $! LS "INVALID SYNTAX") $! do
mapM_ (forkIO . run) tasks
void Prelude.getChar


parse :: [String] -> [Task]
parse [ "]", ap, "[" ] = [(:><:) $! read ap ]
parse [ lp, lh, "-", fp, fh, "[", ap ] = [(read lp, B.pack lh) :-<: ((read fp, B.pack fh),read ap) ]
Expand All @@ -217,57 +214,54 @@ parse m = concatMap parse $! map (map B.unpack . filter (not . B.null) . B.split

data Vectors = V {-# UNPACK #-} !(Ptr (Word16) ) -- number of clients
{-# UNPACK #-} !(Ptr (StablePtr Socket)) -- server socket
{-# UNPACK #-} !(Ptr (Seconds) ) -- watch thread

portVectors :: MVar Vectors
initialized :: MVar Bool
initialize :: IO ()

portVectors = unsafePerformIO $! newEmptyMVar
initialized = unsafePerformIO $! newMVar False
initialize = initialized `modifyMVar_` \initialized ->
{-# UNPACK #-} !(Ptr (Word16) ) -- watch thread number
portVectors :: MVar Vectors; portVectors = unsafePerformIO $! newEmptyMVar
initialized :: MVar Bool; initialized = unsafePerformIO $! newMVar False
initialize :: IO (); initialize = initialized `modifyMVar_` \initialized ->
when (not initialized) init >> return True
where init = putMVar portVectors =<< V <$> mallocArray pc <*> mallocArray pc <*> mallocArray pc
pc = 65536

{-# INLINE (|.) #-}; (|.)::Storable a=>Ptr a -> Int -> IO a ; (|.) a i = peekElemOff a i
{-# INLINE (|^) #-}; (|^)::Storable a=>Ptr a -> Int -> a -> IO (); (|^) a i v = pokeElemOff a i v

(-@<) :: AddrPort -> IO Socket
(-@<) ap@(_ :@: p') = do
let p = fromIntegral p'
withMVar portVectors $! \ !(V !c !s !t) -> do
t |^ p =<< now
t |^ p =<< (1 +) <$> t |. p
n <- c |. p
case compare n 0 of
GT -> do c |^ p $! n+1; s |. p >>= deRefStablePtr
EQ -> do l <- (ap @<) ; s |^ p =<< newStablePtr l; c |^ p $! n+1; return l
LT -> error "-@< ERRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR"
LT -> error "-@< ERRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR"

(-✖) :: AddrPort -> IO ()
(-✖) ap@(_ :@: p') = do
let p = fromIntegral p'
withMVar portVectors $! \(V !c _ !t) -> do
withMVar portVectors $! \(V !c _ _) -> do
n <- c |. p
case compare n 1 of
GT -> c |^ p $! n-1
EQ -> do last <- t |. p
span <- abs . (last -) <$> now
when (span >= 10) $! do
print $! Watch :^: (faf AF_UNSPEC, ap)
void . schedule 10 $! do
withMVar portVectors $! \ !(V !c !s !t) -> do
last <- t |. p
span <- abs . (last -) <$> now
when (span >= 10) $! do
n <- c |. p
c |^ p $! n-1
print $! Drop :^: (faf AF_UNSPEC, ap)
sv <- s |. p; deRefStablePtr sv >>= (✖); (sv )
LT -> error "-x ERRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR"
GT -> do c |^ p $! n-1
EQ -> watch ap p
LT -> error "-x ERRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRRR"
where
watch ap p = void . forkIO $! withMVar portVectors $! \(V _ _ !t) -> do
tp <- t |. p
print $! Watch :^: (LS . B.pack $! show tp, ap)
void . schedule 10 $! do
withMVar portVectors $! \ !(V !c !s !t) -> do
n <- c |. p
tp' <- t |. p
if n == 1 && tp == tp'
then do print $! Drop :^: (faf AF_UNSPEC, ap)
c |^ p $! n-1
sv <- s |. p; deRefStablePtr sv >>= (✖); (sv )
else when (n == 1) $! watch ap p

-----------------------------------------------------------------------------------------------CHECK

(|<>|) :: (MVar ThreadId -> IO ()) -> (MVar ThreadId -> IO ()) -> IO ()
(|<>|) :: (MVar ThreadId -> IO ()) -> (MVar ThreadId -> IO ()) -> IO () -- marry
a |<>| b = do
ma <- newEmptyMVar ; mb <- newEmptyMVar
ta <- forkIO $! a mb; tb <- forkIO $! b ma
Expand Down Expand Up @@ -318,7 +312,6 @@ run ((:><:) fp) = do
e <- rh ! rp
o >-< e $! return () -- any exception disposes o ^


--- :: Task -> IO () - distributed reverse
run ((lp,lh) :-<: ((fp,fh),rp)) = do

Expand All @@ -334,7 +327,6 @@ run ((lp,lh) :-<: ((fp,fh),rp)) = do
e <- lh ! lp `X.onException` (f )
f >-< e $! return ()


--- :: Task -> IO () - distributed forward
run (lp :>-: ((fh,fp),(rh,rp))) = do

Expand All @@ -352,7 +344,6 @@ run (lp :>-: ((fh,fp),(rh,rp))) = do
s <: m
f >-< c $! return ()


--- :: Task -> IO () - direct forward
run (lp :>=: (rh, rp)) = do

Expand All @@ -375,5 +366,5 @@ run (lp :>=: (rh, rp)) = do
a >- b $! j; b >- a $! j

(>-) :: Peer -> Peer -> ErrorIO () -> IO ()
(Peer as ah >- Peer bs bh) j =
(Peer as ah >- Peer bs bh) j =
void . forkIO . tryWith (const j) $! splice chunk (as, Just ah) (bs, Just bh)

0 comments on commit 729a332

Please sign in to comment.