Permalink
Browse files

connect timeouts are now app-controlled; connections are retried in a…

…ll modes

1) Connections would not be retried due to missing tryRun. 2) Testing
connection retries on all platforms led to another discovery:
system-controlled timeouts on socket connect were way too long on OS X
and FreeBSD unlike Windows and Linux - where they are unnoticeably
short. Timeouts on socket connect are now controlled by PortFusion. 3)
Error type is simplified.
  • Loading branch information...
Cetin Sert Cetin Sert
Cetin Sert authored and Cetin Sert committed May 11, 2012
1 parent 4714728 commit 5b51226424b4da4fe8abcd33f4178bee3f1b84e3
Showing with 29 additions and 28 deletions.
  1. +29 −28 src/Main.hs
View
@@ -14,6 +14,7 @@ import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B hiding (map,concatMap,filter,reverse)
import qualified Control.Exception as X
import System.Environment
import System.Timeout
import System.IO hiding (hGetLine,hPutStr,hGetContents)
import Data.String (IsString,fromString)
import GHC.Conc (threadDelay)
@@ -33,8 +34,11 @@ import Network.Socket.Splice -- corsis library: SPLICE
type Seconds = Int
wait :: Seconds -> IO ()
wait secs = GHC.Conc.threadDelay $ 1000000 * secs
secs :: Int -> Seconds
secs n = n * 1000000
wait :: Seconds -> IO ()
wait n = GHC.Conc.threadDelay $ secs n
schedule :: Seconds -> IO () -> IO ThreadId
schedule s a = forkIO $ do wait s; a
@@ -65,7 +69,7 @@ a =>> f = do r <- a; _ <- f r; return r
(//) :: a -> (a -> b) -> b
x // f = f x
(???) :: IO a -> [IO a] -> IO a
(???) :: ErrorIO a -> [IO a] -> IO a
e ??? as = foldr (?>) e as
where x ?> y = x `X.catch` (\(_ :: X.SomeException) -> y)
@@ -80,11 +84,11 @@ data PeerLink = PeerLink (Maybe SockAddr) (Maybe SockAddr) deriving Show
data FusionLink = FusionLink (Maybe SockAddr) (Maybe Port ) (Maybe SockAddr)
deriving Show
data PeerFault = Loss | Impatience deriving (Show,Typeable)
data ProtocolException = Error PeerFault PeerLink deriving (Show,Typeable)
data ConnectionException = Refused deriving (Show,Typeable)
instance X.Exception ProtocolException where
instance X.Exception ConnectionException where
data ProtocolException = Loss PeerLink
| Impatience PeerLink
| Silence [SockAddr] deriving (Show,Typeable)
instance X.Exception ProtocolException where
(<@>) :: Socket -> IO PeerLink
(<@>) s = PeerLink <$> (att $ getSocketName s) <*> (att $ getPeerName s)
@@ -99,26 +103,30 @@ a @>-<@ b =
s <- socket AF_INET6 Stream 0 =>> opt
bindSocket s $ SockAddrInet6 p 0 iN6ADDR_ANY 0
#else
s <- socket AF_INET Stream 0 =>> opt -- Windows XP does not have
bindSocket s $ SockAddrInet p iNADDR_ANY -- a dual-stack sockets API
s <- socket AF_INET Stream 0 =>> opt -- Windows XP does not have
bindSocket s $ SockAddrInet p iNADDR_ANY -- a dual-stack sockets API
#endif
listen s maxListenQueue
print $ Listen :^: p
print $! Listen :^: p
return s
where opt s = mapM_ (\o -> setSocketOption s o 1) [ ReuseAddr, KeepAlive ]
(<@) :: Socket -> IO Socket
(<@) s = do (c,_) <- accept s; configure c; print . (:.:) Accept =<< (c <@>); return c
(.@.) :: Host -> Port -> IO Socket
h .@. p = (X.throwIO Refused ???) . map c =<< getAddrInfo hint host port
h .@. p = do -- (X.throwIO (e Nothing) ???) . map c =<< getAddrInfo hint host port
as <- getAddrInfo hint host port
X.throwIO (Silence $ map addrAddress as) ??? map c as
where hint = Just $! defaultHints { addrSocketType = Stream }
host = Just $! B.unpack h
port = Just $! show p
c a = do s <- socket (addrFamily a) Stream 0x6 =>> configure
s `connect` addrAddress a
print . (:.:) Open =<< (s <@>)
return s
c a = do s <- socket (addrFamily a) Stream 0x6 =>> configure
r <- s `connect` addrAddress a // timeout (secs 3)
case r of
Nothing -> do (s ); X.throw $! Silence [addrAddress a]
Just _ -> do print . (:.:) Open =<< (s <@>)
return s
configure :: Socket -> IO ()
configure x = do
@@ -143,8 +151,8 @@ configure x = do
(!) h p = (!@) =<< h .@. p
class Disposable a where (✖) :: a -> IO () -- ✖ ✿ @
-- ✖ ✿ @
class Disposable a where (✖) :: a -> IO ()
instance Disposable Socket where
(✖) s = do
try_ $ do
@@ -222,7 +230,6 @@ main = withSocketsDo $ tryWith (const . print $ LS "INVALID SYNTAX") $ do
ss = map (map B.unpack . filter (not . B.null) . B.split ' ' . B.pack) m
type PortVector a = SVM.IOVector a
portVectors :: MVar (PortVector Word16, PortVector (StablePtr Socket))
@@ -275,8 +282,8 @@ initPortVectors = do
let f x = do free p; maybe (n x) y $ (X.fromException x :: Maybe X.AsyncException)
tryWith f $ hGetBufSome h p 1 >>= \b -> f . X.toException $
case b of
0 -> Error Loss l
_ -> Error Impatience l
0 -> Loss l
_ -> Impatience l
(|<>|) :: (MVar ThreadId -> IO ()) -> (MVar ThreadId -> IO ()) -> IO ()
a |<>| b = do
@@ -288,11 +295,7 @@ a |<>| b = do
putMVar mb tb
run :: Task -> IO ()
--- :: Task -> IO () -- serve
run :: Task -> IO () -- serve
run ((:><:) fp) = do
f <- (fp @<)
@@ -373,7 +376,6 @@ run (lp :>=: (rh, rp)) = do
r >-< c $ return ()
---- data stream IO
(>-<) :: Peer -> Peer -> ErrorIO () -> IO ()
(a@(Peer as _) >-< b@(Peer bs _)) h = do
!t <- as @>-<@ bs
@@ -384,7 +386,6 @@ run (lp :>=: (rh, rp)) = do
b >- a $ j
a >- b $ j
(>-) :: Peer -> Peer -> ErrorIO () -> IO ()
(Peer as ah >- Peer bs bh) j =
void . forkIO . tryWith (const j) $ splice chunk (as, Just ah) (bs, Just bh)

0 comments on commit 5b51226

Please sign in to comment.