diff --git a/network-transport-inmemory/LICENSE b/network-transport-inmemory/LICENSE new file mode 100644 index 00000000..f3459e44 --- /dev/null +++ b/network-transport-inmemory/LICENSE @@ -0,0 +1,31 @@ +Copyright Well-Typed LLP, 2011-2012 + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of the owner nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + diff --git a/network-transport-inmemory/Setup.hs b/network-transport-inmemory/Setup.hs new file mode 100644 index 00000000..9a994af6 --- /dev/null +++ b/network-transport-inmemory/Setup.hs @@ -0,0 +1,2 @@ +import Distribution.Simple +main = defaultMain diff --git a/network-transport-inmemory/network-transport-inmemory.cabal b/network-transport-inmemory/network-transport-inmemory.cabal new file mode 100644 index 00000000..a1b6e282 --- /dev/null +++ b/network-transport-inmemory/network-transport-inmemory.cabal @@ -0,0 +1,64 @@ +Name: network-transport-inmemory +Version: 0.2.0 +Cabal-Version: >=1.8 +Build-Type: Simple +License: BSD3 +License-file: LICENSE +Copyright: Well-Typed LLP +Author: Duncan Coutts, Nicolas Wu, Edsko de Vries +Maintainer: edsko@well-typed.com, dcoutts@well-typed.com +Stability: experimental +Homepage: http://github.com/haskell-distributed/distributed-process +Bug-Reports: mailto:edsko@well-typed.com +Synopsis: In-memory instantation of Network.Transport +Description: In-memory instantation of Network.Transport +Tested-With: GHC==7.0.4 GHC==7.2.2 GHC==7.4.1 GHC==7.4.2 +Category: Network + +Library + Build-Depends: base >= 4.3 && < 5, + network-transport >= 0.2 && < 0.3, + data-accessor >= 0.2 && < 0.3, + bytestring >= 0.9 && < 0.10, + containers >= 0.4 && < 0.5 + Exposed-modules: Network.Transport.Chan + ghc-options: -Wall -fno-warn-unused-do-bind + HS-Source-Dirs: src + +Test-Suite TestMulticastInMemory + Type: exitcode-stdio-1.0 + Build-Depends: base >= 4.3 && < 5, + network-transport >= 0.2 && < 0.3, + data-accessor >= 0.2 && < 0.3, + bytestring >= 0.9 && < 0.10, + containers >= 0.4 && < 0.5, + random >= 1.0 && < 1.1, + ansi-terminal >= 0.5 && < 0.6 + Main-Is: TestMulticastInMemory.hs + ghc-options: -Wall -fno-warn-unused-do-bind + Extensions: ExistentialQuantification, + FlexibleInstances, + DeriveDataTypeable, + RankNTypes, + OverloadedStrings + HS-Source-Dirs: tests src + +Test-Suite TestInMemory + Type: exitcode-stdio-1.0 + Build-Depends: base >= 4.3 && < 5, + network-transport >= 0.2 && < 0.3, + data-accessor >= 0.2 && < 0.3, + bytestring >= 0.9 && < 0.10, + containers >= 0.4 && < 0.5, + random >= 1.0 && < 1.1, + ansi-terminal >= 0.5 && < 0.6, + mtl >= 2.0 && < 2.2 + Main-Is: TestInMemory.hs + ghc-options: -Wall -fno-warn-unused-do-bind + Extensions: ExistentialQuantification, + FlexibleInstances, + DeriveDataTypeable, + RankNTypes, + OverloadedStrings, + OverlappingInstances + HS-Source-Dirs: tests src diff --git a/network-transport/src/Network/Transport/Chan.hs b/network-transport-inmemory/src/Network/Transport/Chan.hs similarity index 100% rename from network-transport/src/Network/Transport/Chan.hs rename to network-transport-inmemory/src/Network/Transport/Chan.hs diff --git a/network-transport/tests/TestAuxiliary.hs b/network-transport-inmemory/tests/TestAuxiliary.hs similarity index 100% rename from network-transport/tests/TestAuxiliary.hs rename to network-transport-inmemory/tests/TestAuxiliary.hs diff --git a/network-transport/tests/TestInMemory.hs b/network-transport-inmemory/tests/TestInMemory.hs similarity index 100% rename from network-transport/tests/TestInMemory.hs rename to network-transport-inmemory/tests/TestInMemory.hs diff --git a/network-transport/tests/TestMulticast.hs b/network-transport-inmemory/tests/TestMulticast.hs similarity index 100% rename from network-transport/tests/TestMulticast.hs rename to network-transport-inmemory/tests/TestMulticast.hs diff --git a/network-transport/tests/TestMulticastInMemory.hs b/network-transport-inmemory/tests/TestMulticastInMemory.hs similarity index 100% rename from network-transport/tests/TestMulticastInMemory.hs rename to network-transport-inmemory/tests/TestMulticastInMemory.hs diff --git a/network-transport/tests/TestTransport.hs b/network-transport-inmemory/tests/TestTransport.hs similarity index 100% rename from network-transport/tests/TestTransport.hs rename to network-transport-inmemory/tests/TestTransport.hs diff --git a/network-transport/tests/Traced.hs b/network-transport-inmemory/tests/Traced.hs similarity index 100% rename from network-transport/tests/Traced.hs rename to network-transport-inmemory/tests/Traced.hs diff --git a/network-transport-tcp/LICENSE b/network-transport-tcp/LICENSE new file mode 100644 index 00000000..f3459e44 --- /dev/null +++ b/network-transport-tcp/LICENSE @@ -0,0 +1,31 @@ +Copyright Well-Typed LLP, 2011-2012 + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + + * Redistributions in binary form must reproduce the above + copyright notice, this list of conditions and the following + disclaimer in the documentation and/or other materials provided + with the distribution. + + * Neither the name of the owner nor the names of other + contributors may be used to endorse or promote products derived + from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + diff --git a/network-transport-tcp/Setup.hs b/network-transport-tcp/Setup.hs new file mode 100644 index 00000000..9a994af6 --- /dev/null +++ b/network-transport-tcp/Setup.hs @@ -0,0 +1,2 @@ +import Distribution.Simple +main = defaultMain diff --git a/network-transport-tcp/network-transport-tcp.cabal b/network-transport-tcp/network-transport-tcp.cabal new file mode 100644 index 00000000..a739aeed --- /dev/null +++ b/network-transport-tcp/network-transport-tcp.cabal @@ -0,0 +1,49 @@ +Name: network-transport-tcp +Version: 0.2.0 +Cabal-Version: >=1.8 +Build-Type: Simple +License: BSD3 +License-file: LICENSE +Copyright: Well-Typed LLP +Author: Duncan Coutts, Nicolas Wu, Edsko de Vries +Maintainer: edsko@well-typed.com, dcoutts@well-typed.com +Stability: experimental +Homepage: http://github.com/haskell-distributed/distributed-process +Bug-Reports: mailto:edsko@well-typed.com +Synopsis: TCP instantation of Network.Transport +Description: TCP instantation of Network.Transport +Tested-With: GHC==7.0.4 GHC==7.2.2 GHC==7.4.1 GHC==7.4.2 +Category: Network + +Library + Build-Depends: base >= 4.3 && < 5, + network-transport >= 0.2 && < 0.3, + data-accessor >= 0.2 && < 0.3, + containers >= 0.4 && < 0.5, + bytestring >= 0.9 && < 0.10, + network >= 2.3 && < 2.4 + Exposed-modules: Network.Transport.TCP, + Network.Transport.TCP.Internal + ghc-options: -Wall -fno-warn-unused-do-bind + HS-Source-Dirs: src + +Test-Suite TestTCP + Type: exitcode-stdio-1.0 + Main-Is: TestTCP.hs + Build-Depends: base >= 4.3 && < 5, + network-transport >= 0.2 && < 0.3, + data-accessor >= 0.2 && < 0.3, + containers >= 0.4 && < 0.5, + bytestring >= 0.9 && < 0.10, + network >= 2.3 && < 2.4, + random >= 1.0 && < 1.1, + ansi-terminal >= 0.5 && < 0.6, + mtl >= 2.0 && < 2.2 + ghc-options: -Wall -fno-warn-unused-do-bind -threaded -rtsopts -with-rtsopts=-N + Extensions: ExistentialQuantification, + FlexibleInstances, + DeriveDataTypeable, + RankNTypes, + OverlappingInstances, + OverloadedStrings + HS-Source-Dirs: tests src diff --git a/network-transport/src/Network/Transport/TCP.hs b/network-transport-tcp/src/Network/Transport/TCP.hs similarity index 99% rename from network-transport/src/Network/Transport/TCP.hs rename to network-transport-tcp/src/Network/Transport/TCP.hs index 5bf8a9e4..dd203313 100644 --- a/network-transport/src/Network/Transport/TCP.hs +++ b/network-transport-tcp/src/Network/Transport/TCP.hs @@ -30,7 +30,7 @@ module Network.Transport.TCP ( -- * Main API import Prelude hiding (catch, mapM_) import Network.Transport -import Network.Transport.Internal.TCP ( forkServer +import Network.Transport.TCP.Internal ( forkServer , recvWithLength , recvInt32 , tryCloseSocket @@ -364,8 +364,16 @@ data ValidRemoteEndPointState = ValidRemoteEndPointState , _nextCtrlRequestId :: !ControlRequestId } +-- | Local identifier for an endpoint within this transport type EndPointId = Int32 + +-- | Control request ID +-- +-- Control requests are asynchronous; the request ID makes it possible to match +-- requests and replies type ControlRequestId = Int32 + +-- | Pair of local and a remote endpoint (for conciseness in signatures) type EndPointPair = (LocalEndPoint, RemoteEndPoint) -- | Control headers @@ -380,7 +388,7 @@ data ControlHeader = | CloseSocket deriving (Enum, Bounded, Show) --- Response sent by /B/ to /A/ when /A/ tries to connect +-- | Response sent by /B/ to /A/ when /A/ tries to connect data ConnectionRequestResponse = -- | /B/ accepts the connection ConnectionRequestAccepted @@ -390,7 +398,7 @@ data ConnectionRequestResponse = | ConnectionRequestCrossed deriving (Enum, Bounded, Show) --- Parameters for setting up the TCP transport +-- | Parameters for setting up the TCP transport data TCPParameters = TCPParameters { -- | Backlog for 'listen'. -- Defaults to SOMAXCONN. @@ -403,7 +411,7 @@ data TCPParameters = TCPParameters { , tcpReuseClientAddr :: Bool } --- Internal functionality we expose for unit testing +-- | Internal functionality we expose for unit testing data TransportInternals = TransportInternals { -- | The ID of the thread that listens for new incoming connections transportThread :: ThreadId diff --git a/network-transport/src/Network/Transport/Internal/TCP.hs b/network-transport-tcp/src/Network/Transport/TCP/Internal.hs similarity index 99% rename from network-transport/src/Network/Transport/Internal/TCP.hs rename to network-transport-tcp/src/Network/Transport/TCP/Internal.hs index 851564e7..eabcf0a6 100644 --- a/network-transport/src/Network/Transport/Internal/TCP.hs +++ b/network-transport-tcp/src/Network/Transport/TCP/Internal.hs @@ -1,5 +1,5 @@ -- | Utility functions for TCP sockets -module Network.Transport.Internal.TCP ( forkServer +module Network.Transport.TCP.Internal ( forkServer , recvWithLength , recvExact , recvInt32 diff --git a/network-transport-tcp/tests/TestAuxiliary.hs b/network-transport-tcp/tests/TestAuxiliary.hs new file mode 100644 index 00000000..d912ee6e --- /dev/null +++ b/network-transport-tcp/tests/TestAuxiliary.hs @@ -0,0 +1,108 @@ +{-# OPTIONS_GHC -fno-warn-orphans #-} +module TestAuxiliary ( -- Running tests + runTest + , runTests + -- Writing tests + , forkTry + , trySome + , randomThreadDelay + ) where + +import Prelude hiding (catch) +import Control.Concurrent (myThreadId, forkIO, ThreadId, throwTo, threadDelay) +import Control.Concurrent.Chan (Chan) +import Control.Monad (liftM2, unless) +import Control.Exception (SomeException, try, catch) +import System.Timeout (timeout) +import System.IO (stdout, hFlush) +import System.Console.ANSI ( SGR(SetColor, Reset) + , Color(Red, Green) + , ConsoleLayer(Foreground) + , ColorIntensity(Vivid) + , setSGR + ) +import System.Random (randomIO) +import Network.Transport +import Traced (Traceable(..), traceShow) + +-- | Like fork, but throw exceptions in the child thread to the parent +forkTry :: IO () -> IO ThreadId +forkTry p = do + tid <- myThreadId + forkIO $ catch p (\e -> throwTo tid (e :: SomeException)) + +-- | Like try, but specialized to SomeException +trySome :: IO a -> IO (Either SomeException a) +trySome = try + +-- | Run the given test, catching timeouts and exceptions +runTest :: String -> IO () -> IO Bool +runTest description test = do + putStr $ "Running " ++ show description ++ ": " + hFlush stdout + done <- try . timeout 60000000 $ test -- 60 seconds + case done of + Left err -> failed $ "(exception: " ++ show (err :: SomeException) ++ ")" + Right Nothing -> failed $ "(timeout)" + Right (Just ()) -> ok + where + failed :: String -> IO Bool + failed err = do + setSGR [SetColor Foreground Vivid Red] + putStr "failed " + setSGR [Reset] + putStrLn err + return False + + ok :: IO Bool + ok = do + setSGR [SetColor Foreground Vivid Green] + putStrLn "ok" + setSGR [Reset] + return True + +-- | Run a bunch of tests and throw an exception if any fails +runTests :: [(String, IO ())] -> IO () +runTests tests = do + success <- foldr (liftM2 (&&) . uncurry runTest) (return True) $ tests + unless success $ fail "Some tests failed" + +-- | Random thread delay between 0 and the specified max +randomThreadDelay :: Int -> IO () +randomThreadDelay maxDelay = do + delay <- randomIO :: IO Int + threadDelay (delay `mod` maxDelay) + +-------------------------------------------------------------------------------- +-- traceShow instances -- +-------------------------------------------------------------------------------- + +instance Traceable EndPoint where + trace = const Nothing + +instance Traceable Transport where + trace = const Nothing + +instance Traceable Connection where + trace = const Nothing + +instance Traceable Event where + trace = traceShow + +instance Show err => Traceable (TransportError err) where + trace = traceShow + +instance Traceable EndPointAddress where + trace = traceShow . endPointAddressToByteString + +instance Traceable SomeException where + trace = traceShow + +instance Traceable ThreadId where + trace = const Nothing + +instance Traceable (Chan a) where + trace = const Nothing + +instance Traceable Float where + trace = traceShow diff --git a/network-transport/tests/TestTCP.hs b/network-transport-tcp/tests/TestTCP.hs similarity index 99% rename from network-transport/tests/TestTCP.hs rename to network-transport-tcp/tests/TestTCP.hs index f087145d..fcc387e5 100644 --- a/network-transport/tests/TestTCP.hs +++ b/network-transport-tcp/tests/TestTCP.hs @@ -36,7 +36,7 @@ import Network.Transport.Internal ( encodeInt32 , tryIO , void ) -import Network.Transport.Internal.TCP (recvInt32, forkServer, recvWithLength) +import Network.Transport.TCP.Internal (recvInt32, forkServer, recvWithLength) import qualified Network.Socket as N ( sClose , ServiceName , Socket diff --git a/network-transport-tcp/tests/TestTransport.hs b/network-transport-tcp/tests/TestTransport.hs new file mode 100644 index 00000000..e528e327 --- /dev/null +++ b/network-transport-tcp/tests/TestTransport.hs @@ -0,0 +1,956 @@ +{-# LANGUAGE RebindableSyntax #-} +module TestTransport where + +import Prelude hiding (catch, (>>=), (>>), return, fail) +import TestAuxiliary (forkTry, runTests, trySome, randomThreadDelay) +import Control.Concurrent (forkIO, killThread, yield) +import Control.Concurrent.MVar (newEmptyMVar, takeMVar, putMVar, readMVar, tryTakeMVar, modifyMVar_, newMVar) +import Control.Exception (evaluate, throw, throwIO, bracket) +import Control.Monad (replicateM, replicateM_, when, guard, forM_, unless) +import Control.Monad.Error () +import Control.Applicative ((<$>)) +import Network.Transport +import Network.Transport.Internal (tlog, tryIO, timeoutMaybe) +import Network.Transport.Util (spawn) +import System.Random (randomIO) +import Data.ByteString (ByteString) +import Data.ByteString.Char8 (pack) +import Data.Map (Map) +import qualified Data.Map as Map (empty, insert, delete, findWithDefault, adjust, null, toList, map) +import Data.String (fromString) +import Data.List (permutations) +import Traced + +-- | Server that echoes messages straight back to the origin endpoint. +echoServer :: EndPoint -> IO () +echoServer endpoint = do + go Map.empty + where + go :: Map ConnectionId Connection -> IO () + go cs = do + event <- receive endpoint + case event of + ConnectionOpened cid rel addr -> do + tlog $ "Opened new connection " ++ show cid + Right conn <- connect endpoint addr rel defaultConnectHints + go (Map.insert cid conn cs) + Received cid payload -> do + send (Map.findWithDefault (error $ "Received: Invalid cid " ++ show cid) cid cs) payload + go cs + ConnectionClosed cid -> do + tlog $ "Close connection " ++ show cid + close (Map.findWithDefault (error $ "ConnectionClosed: Invalid cid " ++ show cid) cid cs) + go (Map.delete cid cs) + ReceivedMulticast _ _ -> + -- Ignore + go cs + ErrorEvent _ -> + putStrLn $ "Echo server received error event: " ++ show event + EndPointClosed -> + return () + +-- | Ping client used in a few tests +ping :: EndPoint -> EndPointAddress -> Int -> ByteString -> IO () +ping endpoint server numPings msg = do + -- Open connection to the server + tlog "Connect to echo server" + Right conn <- connect endpoint server ReliableOrdered defaultConnectHints + + -- Wait for the server to open reply connection + tlog "Wait for ConnectionOpened message" + ConnectionOpened cid _ _ <- receive endpoint + + -- Send pings and wait for reply + tlog "Send ping and wait for reply" + replicateM_ numPings $ do + send conn [msg] + Received cid' [reply] <- receive endpoint ; True <- return $ cid == cid' && reply == msg + return () + + -- Close the connection + tlog "Close the connection" + close conn + + -- Wait for the server to close its connection to us + tlog "Wait for ConnectionClosed message" + ConnectionClosed cid' <- receive endpoint ; True <- return $ cid == cid' + + -- Done + tlog "Ping client done" + +-- | Basic ping test +testPingPong :: Transport -> Int -> IO () +testPingPong transport numPings = do + tlog "Starting ping pong test" + server <- spawn transport echoServer + result <- newEmptyMVar + + -- Client + forkTry $ do + tlog "Ping client" + Right endpoint <- newEndPoint transport + ping endpoint server numPings "ping" + putMVar result () + + takeMVar result + +-- | Test that endpoints don't get confused +testEndPoints :: Transport -> Int -> IO () +testEndPoints transport numPings = do + server <- spawn transport echoServer + dones <- replicateM 2 newEmptyMVar + + forM_ (zip dones ['A'..]) $ \(done, name) -> forkTry $ do + let name' :: ByteString + name' = pack [name] + Right endpoint <- newEndPoint transport + tlog $ "Ping client " ++ show name' ++ ": " ++ show (address endpoint) + ping endpoint server numPings name' + putMVar done () + + forM_ dones takeMVar + +-- Test that connections don't get confused +testConnections :: Transport -> Int -> IO () +testConnections transport numPings = do + server <- spawn transport echoServer + result <- newEmptyMVar + + -- Client + forkTry $ do + Right endpoint <- newEndPoint transport + + -- Open two connections to the server + Right conn1 <- connect endpoint server ReliableOrdered defaultConnectHints + ConnectionOpened serv1 _ _ <- receive endpoint + + Right conn2 <- connect endpoint server ReliableOrdered defaultConnectHints + ConnectionOpened serv2 _ _ <- receive endpoint + + -- One thread to send "pingA" on the first connection + forkTry $ replicateM_ numPings $ send conn1 ["pingA"] + + -- One thread to send "pingB" on the second connection + forkTry $ replicateM_ numPings $ send conn2 ["pingB"] + + -- Verify server responses + let verifyResponse 0 = putMVar result () + verifyResponse n = do + event <- receive endpoint + case event of + Received cid [payload] -> do + when (cid == serv1 && payload /= "pingA") $ error "Wrong message" + when (cid == serv2 && payload /= "pingB") $ error "Wrong message" + verifyResponse (n - 1) + _ -> + verifyResponse n + verifyResponse (2 * numPings) + + takeMVar result + +-- | Test that closing one connection does not close the other +testCloseOneConnection :: Transport -> Int -> IO () +testCloseOneConnection transport numPings = do + server <- spawn transport echoServer + result <- newEmptyMVar + + -- Client + forkTry $ do + Right endpoint <- newEndPoint transport + + -- Open two connections to the server + Right conn1 <- connect endpoint server ReliableOrdered defaultConnectHints + ConnectionOpened serv1 _ _ <- receive endpoint + + Right conn2 <- connect endpoint server ReliableOrdered defaultConnectHints + ConnectionOpened serv2 _ _ <- receive endpoint + + -- One thread to send "pingA" on the first connection + forkTry $ do + replicateM_ numPings $ send conn1 ["pingA"] + close conn1 + + -- One thread to send "pingB" on the second connection + forkTry $ replicateM_ (numPings * 2) $ send conn2 ["pingB"] + + -- Verify server responses + let verifyResponse 0 = putMVar result () + verifyResponse n = do + event <- receive endpoint + case event of + Received cid [payload] -> do + when (cid == serv1 && payload /= "pingA") $ error "Wrong message" + when (cid == serv2 && payload /= "pingB") $ error "Wrong message" + verifyResponse (n - 1) + _ -> + verifyResponse n + verifyResponse (3 * numPings) + + takeMVar result + +-- | Test that if A connects to B and B connects to A, B can still send to A after +-- A closes its connection to B (for instance, in the TCP transport, the socket pair +-- connecting A and B should not yet be closed). +testCloseOneDirection :: Transport -> Int -> IO () +testCloseOneDirection transport numPings = do + addrA <- newEmptyMVar + addrB <- newEmptyMVar + doneA <- newEmptyMVar + doneB <- newEmptyMVar + + -- A + forkTry $ do + tlog "A" + Right endpoint <- newEndPoint transport + tlog (show (address endpoint)) + putMVar addrA (address endpoint) + + -- Connect to B + tlog "Connect to B" + Right conn <- readMVar addrB >>= \addr -> connect endpoint addr ReliableOrdered defaultConnectHints + + -- Wait for B to connect to us + tlog "Wait for B" + ConnectionOpened cid _ _ <- receive endpoint + + -- Send pings to B + tlog "Send pings to B" + replicateM_ numPings $ send conn ["ping"] + + -- Close our connection to B + tlog "Close connection" + close conn + + -- Wait for B's pongs + tlog "Wait for pongs from B" + replicateM_ numPings $ do Received _ _ <- receive endpoint ; return () + + -- Wait for B to close it's connection to us + tlog "Wait for B to close connection" + ConnectionClosed cid' <- receive endpoint + guard (cid == cid') + + -- Done + tlog "Done" + putMVar doneA () + + -- B + forkTry $ do + tlog "B" + Right endpoint <- newEndPoint transport + tlog (show (address endpoint)) + putMVar addrB (address endpoint) + + -- Wait for A to connect + tlog "Wait for A to connect" + ConnectionOpened cid _ _ <- receive endpoint + + -- Connect to A + tlog "Connect to A" + Right conn <- readMVar addrA >>= \addr -> connect endpoint addr ReliableOrdered defaultConnectHints + + -- Wait for A's pings + tlog "Wait for pings from A" + replicateM_ numPings $ do Received _ _ <- receive endpoint ; return () + + -- Wait for A to close it's connection to us + tlog "Wait for A to close connection" + ConnectionClosed cid' <- receive endpoint + guard (cid == cid') + + -- Send pongs to A + tlog "Send pongs to A" + replicateM_ numPings $ send conn ["pong"] + + -- Close our connection to A + tlog "Close connection to A" + close conn + + -- Done + tlog "Done" + putMVar doneB () + + mapM_ takeMVar [doneA, doneB] + +-- | Collect events and order them by connection ID +collect :: EndPoint -> Maybe Int -> Maybe Int -> IO [(ConnectionId, [[ByteString]])] +collect endPoint maxEvents timeout = go maxEvents Map.empty Map.empty + where + -- TODO: for more serious use of this function we'd need to make these arguments strict + go (Just 0) open closed = finish open closed + go n open closed = do + mEvent <- tryIO . timeoutMaybe timeout (userError "timeout") $ receive endPoint + case mEvent of + Left _ -> finish open closed + Right event -> do + let n' = (\x -> x - 1) <$> n + case event of + ConnectionOpened cid _ _ -> + go n' (Map.insert cid [] open) closed + ConnectionClosed cid -> + let list = Map.findWithDefault (error "Invalid ConnectionClosed") cid open in + go n' (Map.delete cid open) (Map.insert cid list closed) + Received cid msg -> + go n' (Map.adjust (msg :) cid open) closed + ReceivedMulticast _ _ -> + fail "Unexpected multicast" + ErrorEvent _ -> + fail "Unexpected error" + EndPointClosed -> + fail "Unexpected endpoint closure" + + finish open closed = + if Map.null open + then return . Map.toList . Map.map reverse $ closed + else fail $ "Open connections: " ++ show (map fst . Map.toList $ open) + +-- | Open connection, close it, then reopen it +-- (In the TCP transport this means the socket will be closed, then reopened) +-- +-- Note that B cannot expect to receive all of A's messages on the first connection +-- before receiving the messages on the second connection. What might (and sometimes +-- does) happen is that finishes sending all of its messages on the first connection +-- (in the TCP transport, the first socket pair) while B is behind on reading _from_ +-- this connection (socket pair) -- the messages are "in transit" on the network +-- (these tests are done on localhost, so there are in some OS buffer). Then when +-- A opens the second connection (socket pair) B will spawn a new thread for this +-- connection, and hence might start interleaving messages from the first and second +-- connection. +-- +-- This is correct behaviour, however: the transport API guarantees reliability and +-- ordering _per connection_, but not _across_ connections. +testCloseReopen :: Transport -> Int -> IO () +testCloseReopen transport numPings = do + addrB <- newEmptyMVar + doneB <- newEmptyMVar + + let numRepeats = 2 :: Int + + -- A + forkTry $ do + Right endpoint <- newEndPoint transport + + forM_ [1 .. numRepeats] $ \i -> do + tlog "A connecting" + -- Connect to B + Right conn <- readMVar addrB >>= \addr -> connect endpoint addr ReliableOrdered defaultConnectHints + + tlog "A pinging" + -- Say hi + forM_ [1 .. numPings] $ \j -> send conn [pack $ "ping" ++ show i ++ "/" ++ show j] + + tlog "A closing" + -- Disconnect again + close conn + + tlog "A finishing" + + -- B + forkTry $ do + Right endpoint <- newEndPoint transport + putMVar addrB (address endpoint) + + eventss <- collect endpoint (Just (numRepeats * (numPings + 2))) Nothing + + forM_ (zip [1 .. numRepeats] eventss) $ \(i, (_, events)) -> do + forM_ (zip [1 .. numPings] events) $ \(j, event) -> do + guard (event == [pack $ "ping" ++ show i ++ "/" ++ show j]) + + putMVar doneB () + + takeMVar doneB + +-- | Test lots of parallel connection attempts +testParallelConnects :: Transport -> Int -> IO () +testParallelConnects transport numPings = do + server <- spawn transport echoServer + done <- newEmptyMVar + + Right endpoint <- newEndPoint transport + + -- Spawn lots of clients + forM_ [1 .. numPings] $ \i -> forkTry $ do + Right conn <- connect endpoint server ReliableOrdered defaultConnectHints + send conn [pack $ "ping" ++ show i] + send conn [pack $ "ping" ++ show i] + close conn + + forkTry $ do + eventss <- collect endpoint (Just (numPings * 4)) Nothing + -- Check that no pings got sent to the wrong connection + forM_ eventss $ \(_, [[ping1], [ping2]]) -> + guard (ping1 == ping2) + putMVar done () + + takeMVar done + +-- | Test that sending on a closed connection gives an error +testSendAfterClose :: Transport -> Int -> IO () +testSendAfterClose transport numRepeats = do + server <- spawn transport echoServer + clientDone <- newEmptyMVar + + forkTry $ do + Right endpoint <- newEndPoint transport + + -- We request two lightweight connections + replicateM numRepeats $ do + Right conn1 <- connect endpoint server ReliableOrdered defaultConnectHints + Right conn2 <- connect endpoint server ReliableOrdered defaultConnectHints + + -- Close the second, but leave the first open; then output on the second + -- connection (i.e., on a closed connection while there is still another + -- connection open) + close conn2 + Left (TransportError SendClosed _) <- send conn2 ["ping2"] + + -- Now close the first connection, and output on it (i.e., output while + -- there are no lightweight connection at all anymore) + close conn1 + Left (TransportError SendClosed _) <- send conn2 ["ping2"] + + return () + + putMVar clientDone () + + takeMVar clientDone + +-- | Test that closing the same connection twice has no effect +testCloseTwice :: Transport -> Int -> IO () +testCloseTwice transport numRepeats = do + server <- spawn transport echoServer + clientDone <- newEmptyMVar + + forkTry $ do + Right endpoint <- newEndPoint transport + + replicateM numRepeats $ do + -- We request two lightweight connections + Right conn1 <- connect endpoint server ReliableOrdered defaultConnectHints + Right conn2 <- connect endpoint server ReliableOrdered defaultConnectHints + + -- Close the second one twice + close conn2 + close conn2 + + -- Then send a message on the first and close that twice too + send conn1 ["ping"] + close conn1 + + -- Verify expected response from the echo server + ConnectionOpened cid1 _ _ <- receive endpoint + ConnectionOpened cid2 _ _ <- receive endpoint + ConnectionClosed cid2' <- receive endpoint ; True <- return $ cid2' == cid2 + Received cid1' ["ping"] <- receive endpoint ; True <- return $ cid1' == cid1 + ConnectionClosed cid1'' <- receive endpoint ; True <- return $ cid1'' == cid1 + + return () + + putMVar clientDone () + + takeMVar clientDone + +-- | Test that we can connect an endpoint to itself +testConnectToSelf :: Transport -> Int -> IO () +testConnectToSelf transport numPings = do + done <- newEmptyMVar + Right endpoint <- newEndPoint transport + + tlog "Creating self-connection" + Right conn <- connect endpoint (address endpoint) ReliableOrdered defaultConnectHints + + tlog "Talk to myself" + + -- One thread to write to the endpoint + forkTry $ do + tlog $ "writing" + + tlog $ "Sending ping" + replicateM_ numPings $ send conn ["ping"] + + tlog $ "Closing connection" + close conn + + -- And one thread to read + forkTry $ do + tlog $ "reading" + + tlog "Waiting for ConnectionOpened" + ConnectionOpened cid _ addr <- receive endpoint ; True <- return $ addr == address endpoint + + tlog "Waiting for Received" + replicateM_ numPings $ do + Received cid' ["ping"] <- receive endpoint ; True <- return $ cid == cid' + return () + + tlog "Waiting for ConnectionClosed" + ConnectionClosed cid' <- receive endpoint ; True <- return $ cid == cid' + + tlog "Done" + putMVar done () + + takeMVar done + +-- | Test that we can connect an endpoint to itself multiple times +testConnectToSelfTwice :: Transport -> Int -> IO () +testConnectToSelfTwice transport numPings = do + done <- newEmptyMVar + Right endpoint <- newEndPoint transport + + tlog "Creating self-connection" + Right conn1 <- connect endpoint (address endpoint) ReliableOrdered defaultConnectHints + Right conn2 <- connect endpoint (address endpoint) ReliableOrdered defaultConnectHints + + tlog "Talk to myself" + + -- One thread to write to the endpoint using the first connection + forkTry $ do + tlog $ "writing" + + tlog $ "Sending ping" + replicateM_ numPings $ send conn1 ["pingA"] + + tlog $ "Closing connection" + close conn1 + + -- One thread to write to the endpoint using the second connection + forkTry $ do + tlog $ "writing" + + tlog $ "Sending ping" + replicateM_ numPings $ send conn2 ["pingB"] + + tlog $ "Closing connection" + close conn2 + + -- And one thread to read + forkTry $ do + tlog $ "reading" + + [(_, events1), (_, events2)] <- collect endpoint (Just (2 * (numPings + 2))) Nothing + True <- return $ events1 == replicate numPings ["pingA"] + True <- return $ events2 == replicate numPings ["pingB"] + + tlog "Done" + putMVar done () + + takeMVar done + +-- | Test that we self-connections no longer work once we close our endpoint +-- or our transport +testCloseSelf :: IO (Either String Transport) -> IO () +testCloseSelf newTransport = do + Right transport <- newTransport + Right endpoint1 <- newEndPoint transport + Right endpoint2 <- newEndPoint transport + Right conn1 <- connect endpoint1 (address endpoint1) ReliableOrdered defaultConnectHints + Right conn2 <- connect endpoint1 (address endpoint1) ReliableOrdered defaultConnectHints + Right conn3 <- connect endpoint2 (address endpoint2) ReliableOrdered defaultConnectHints + + -- Close the conneciton and try to send + close conn1 + Left (TransportError SendClosed _) <- send conn1 ["ping"] + + -- Close the first endpoint. We should not be able to use the first + -- connection anymore, or open more self connections, but the self connection + -- to the second endpoint should still be fine + closeEndPoint endpoint1 + Left (TransportError SendFailed _) <- send conn2 ["ping"] + Left (TransportError ConnectFailed _) <- connect endpoint1 (address endpoint1) ReliableOrdered defaultConnectHints + Right () <- send conn3 ["ping"] + + -- Close the transport; now the second should no longer work + closeTransport transport + Left (TransportError SendFailed _) <- send conn3 ["ping"] + Left (TransportError ConnectFailed _) <- connect endpoint2 (address endpoint2) ReliableOrdered defaultConnectHints + + return () + +-- | Test various aspects of 'closeEndPoint' +testCloseEndPoint :: Transport -> Int -> IO () +testCloseEndPoint transport _ = do + serverDone <- newEmptyMVar + clientDone <- newEmptyMVar + clientAddr1 <- newEmptyMVar + clientAddr2 <- newEmptyMVar + serverAddr <- newEmptyMVar + + -- Server + forkTry $ do + Right endpoint <- newEndPoint transport + putMVar serverAddr (address endpoint) + + -- First test (see client) + do + theirAddr <- readMVar clientAddr1 + ConnectionOpened cid ReliableOrdered addr <- receive endpoint ; True <- return $ addr == theirAddr + ConnectionClosed cid' <- receive endpoint ; True <- return $ cid == cid' + return () + + -- Second test + do + theirAddr <- readMVar clientAddr2 + + ConnectionOpened cid ReliableOrdered addr <- receive endpoint ; True <- return $ addr == theirAddr + Received cid' ["ping"] <- receive endpoint ; True <- return $ cid == cid' + + Right conn <- connect endpoint theirAddr ReliableOrdered defaultConnectHints + send conn ["pong"] + + ConnectionClosed cid'' <- receive endpoint ; True <- return $ cid == cid'' + ErrorEvent (TransportError (EventConnectionLost (Just addr') []) _) <- receive endpoint ; True <- return $ addr' == theirAddr + + Left (TransportError SendFailed _) <- send conn ["pong2"] + + return () + + putMVar serverDone () + + -- Client + forkTry $ do + theirAddr <- readMVar serverAddr + + -- First test: close endpoint with one outgoing but no incoming connections + do + Right endpoint <- newEndPoint transport + putMVar clientAddr1 (address endpoint) + + -- Connect to the server, then close the endpoint without disconnecting explicitly + Right _ <- connect endpoint theirAddr ReliableOrdered defaultConnectHints + closeEndPoint endpoint + EndPointClosed <- receive endpoint + return () + + -- Second test: close endpoint with one outgoing and one incoming connection + do + Right endpoint <- newEndPoint transport + putMVar clientAddr2 (address endpoint) + + Right conn <- connect endpoint theirAddr ReliableOrdered defaultConnectHints + send conn ["ping"] + + -- Reply from the server + ConnectionOpened cid ReliableOrdered addr <- receive endpoint ; True <- return $ addr == theirAddr + Received cid' ["pong"] <- receive endpoint ; True <- return $ cid == cid' + + -- Close the endpoint + closeEndPoint endpoint + EndPointClosed <- receive endpoint + + -- Attempt to send should fail with connection closed + Left (TransportError SendFailed _) <- send conn ["ping2"] + + -- An attempt to close the already closed connection should just return + () <- close conn + + -- And so should an attempt to connect + Left (TransportError ConnectFailed _) <- connect endpoint theirAddr ReliableOrdered defaultConnectHints + + return () + + putMVar clientDone () + + mapM_ takeMVar [serverDone, clientDone] + +-- Test closeTransport +-- +-- This tests many of the same things that testEndPoint does, and some more +testCloseTransport :: IO (Either String Transport) -> IO () +testCloseTransport newTransport = do + serverDone <- newEmptyMVar + clientDone <- newEmptyMVar + clientAddr1 <- newEmptyMVar + clientAddr2 <- newEmptyMVar + serverAddr <- newEmptyMVar + + -- Server + forkTry $ do + Right transport <- newTransport + Right endpoint <- newEndPoint transport + putMVar serverAddr (address endpoint) + + -- Client sets up first endpoint + theirAddr1 <- readMVar clientAddr1 + ConnectionOpened cid1 ReliableOrdered addr <- receive endpoint ; True <- return $ addr == theirAddr1 + + -- Client sets up second endpoint + theirAddr2 <- readMVar clientAddr2 + + ConnectionOpened cid2 ReliableOrdered addr' <- receive endpoint ; True <- return $ addr' == theirAddr2 + Received cid2' ["ping"] <- receive endpoint ; True <- return $ cid2' == cid2 + + Right conn <- connect endpoint theirAddr2 ReliableOrdered defaultConnectHints + send conn ["pong"] + + -- Client now closes down its transport. We should receive connection closed messages (we don't know the precise order, however) + evs <- replicateM 3 $ receive endpoint + let expected = [ ConnectionClosed cid1 + , ConnectionClosed cid2 + , ErrorEvent (TransportError (EventConnectionLost (Just theirAddr2) []) "") + ] + True <- return $ any (== expected) (permutations evs) + + -- An attempt to send to the endpoint should now fail + Left (TransportError SendFailed _) <- send conn ["pong2"] + + putMVar serverDone () + + -- Client + forkTry $ do + Right transport <- newTransport + theirAddr <- readMVar serverAddr + + -- Set up endpoint with one outgoing but no incoming connections + Right endpoint1 <- newEndPoint transport + putMVar clientAddr1 (address endpoint1) + + -- Connect to the server, then close the endpoint without disconnecting explicitly + Right _ <- connect endpoint1 theirAddr ReliableOrdered defaultConnectHints + + -- Set up an endpoint with one outgoing and out incoming connection + Right endpoint2 <- newEndPoint transport + putMVar clientAddr2 (address endpoint2) + + Right conn <- connect endpoint2 theirAddr ReliableOrdered defaultConnectHints + send conn ["ping"] + + -- Reply from the server + ConnectionOpened cid ReliableOrdered addr <- receive endpoint2 ; True <- return $ addr == theirAddr + Received cid' ["pong"] <- receive endpoint2 ; True <- return $ cid == cid' + + -- Now shut down the entire transport + closeTransport transport + + -- Both endpoints should report that they have been closed + EndPointClosed <- receive endpoint1 + EndPointClosed <- receive endpoint2 + + -- Attempt to send should fail with connection closed + Left (TransportError SendFailed _) <- send conn ["ping2"] + + -- An attempt to close the already closed connection should just return + () <- close conn + + -- And so should an attempt to connect on either endpoint + Left (TransportError ConnectFailed _) <- connect endpoint1 theirAddr ReliableOrdered defaultConnectHints + Left (TransportError ConnectFailed _) <- connect endpoint2 theirAddr ReliableOrdered defaultConnectHints + + -- And finally, so should an attempt to create a new endpoint + Left (TransportError NewEndPointFailed _) <- newEndPoint transport + + putMVar clientDone () + + mapM_ takeMVar [serverDone, clientDone] + +-- | Remote node attempts to connect to a closed local endpoint +testConnectClosedEndPoint :: Transport -> IO () +testConnectClosedEndPoint transport = do + serverAddr <- newEmptyMVar + serverClosed <- newEmptyMVar + clientDone <- newEmptyMVar + + -- Server + forkTry $ do + Right endpoint <- newEndPoint transport + putMVar serverAddr (address endpoint) + + closeEndPoint endpoint + putMVar serverClosed () + + -- Client + forkTry $ do + Right endpoint <- newEndPoint transport + readMVar serverClosed + + Left (TransportError ConnectNotFound _) <- readMVar serverAddr >>= \addr -> connect endpoint addr ReliableOrdered defaultConnectHints + + putMVar clientDone () + + takeMVar clientDone + +-- | We should receive an exception when doing a 'receive' after we have been +-- notified that an endpoint has been closed +testExceptionOnReceive :: IO (Either String Transport) -> IO () +testExceptionOnReceive newTransport = do + Right transport <- newTransport + + -- Test one: when we close an endpoint specifically + Right endpoint1 <- newEndPoint transport + closeEndPoint endpoint1 + EndPointClosed <- receive endpoint1 + Left _ <- trySome (receive endpoint1 >>= evaluate) + + -- Test two: when we close the entire transport + Right endpoint2 <- newEndPoint transport + closeTransport transport + EndPointClosed <- receive endpoint2 + Left _ <- trySome (receive endpoint2 >>= evaluate) + + return () + +-- | Test what happens when the argument to 'send' is an exceptional value +testSendException :: IO (Either String Transport) -> IO () +testSendException newTransport = do + Right transport <- newTransport + Right endpoint1 <- newEndPoint transport + Right endpoint2 <- newEndPoint transport + + -- Connect endpoint1 to endpoint2 + Right conn <- connect endpoint1 (address endpoint2) ReliableOrdered defaultConnectHints + ConnectionOpened _ _ _ <- receive endpoint2 + + -- Send an exceptional value + Left (TransportError SendFailed _) <- send conn (throw $ userError "uhoh") + + -- This will have been as a failure to send by endpoint1, which will + -- therefore have closed the socket. In turn this will have caused endpoint2 + -- to report that the connection was lost + ErrorEvent (TransportError (EventConnectionLost _ []) _) <- receive endpoint1 + ErrorEvent (TransportError (EventConnectionLost _ [_]) _) <- receive endpoint2 + + -- A new connection will re-establish the connection + Right conn2 <- connect endpoint1 (address endpoint2) ReliableOrdered defaultConnectHints + send conn2 ["ping"] + close conn2 + + ConnectionOpened _ _ _ <- receive endpoint2 + Received _ ["ping"] <- receive endpoint2 + ConnectionClosed _ <- receive endpoint2 + + return () + +-- | If threads get killed while executing a 'connect', 'send', or 'close', this +-- should not affect other threads. +-- +-- The intention of this test is to see what happens when a asynchronous +-- exception happes _while executing a send_. This is exceedingly difficult to +-- guarantee, however. Hence we run a large number of tests and insert random +-- thread delays -- and even then it might not happen. Moreover, it will only +-- happen when we run on multiple cores. +testKill :: IO (Either String Transport) -> Int -> IO () +testKill newTransport numThreads = do + Right transport1 <- newTransport + Right transport2 <- newTransport + Right endpoint1 <- newEndPoint transport1 + Right endpoint2 <- newEndPoint transport2 + + threads <- replicateM numThreads . forkIO $ do + randomThreadDelay 100 + bracket (connect endpoint1 (address endpoint2) ReliableOrdered defaultConnectHints) + -- Note that we should not insert a randomThreadDelay into the + -- exception handler itself as this means that the exception handler + -- could be interrupted and we might not close + (\(Right conn) -> close conn) + (\(Right conn) -> do randomThreadDelay 100 + Right () <- send conn ["ping"] + randomThreadDelay 100) + + numAlive <- newMVar (0 :: Int) + + -- Kill half of those threads + forkIO . forM_ threads $ \tid -> do + shouldKill <- randomIO + if shouldKill + then randomThreadDelay 600 >> killThread tid + else modifyMVar_ numAlive (return . (+ 1)) + + -- Since it is impossible to predict when the kill exactly happens, we don't + -- know how many connects were opened and how many pings were sent. But we + -- should not have any open connections (if we do, collect will throw an + -- error) and we should have at least the number of pings equal to the number + -- of threads we did *not* kill + eventss <- collect endpoint2 Nothing (Just 1000000) + let actualPings = sum . map (length . snd) $ eventss + expectedPings <- takeMVar numAlive + unless (actualPings >= expectedPings) $ + throwIO (userError "Missing pings") + +-- print (actualPings, expectedPings) + + +-- | Set up conditions with a high likelyhood of "crossing" (for transports +-- that multiplex lightweight connections across heavyweight connections) +testCrossing :: Transport -> Int -> IO () +testCrossing transport numRepeats = do + [aAddr, bAddr] <- replicateM 2 newEmptyMVar + [aDone, bDone] <- replicateM 2 newEmptyMVar + [aTimeout, bTimeout] <- replicateM 2 newEmptyMVar + go <- newEmptyMVar + + let hints = defaultConnectHints { + connectTimeout = Just 5000000 + } + + -- A + forkTry $ do + Right endpoint <- newEndPoint transport + putMVar aAddr (address endpoint) + theirAddress <- readMVar bAddr + + replicateM_ numRepeats $ do + takeMVar go >> yield + -- Because we are creating lots of connections, it's possible that + -- connect times out (for instance, in the TCP transport, + -- Network.Socket.connect may time out). We shouldn't regard this as an + -- error in the Transport, though. + connectResult <- connect endpoint theirAddress ReliableOrdered hints + case connectResult of + Right conn -> close conn + Left (TransportError ConnectTimeout _) -> putMVar aTimeout () + Left (TransportError ConnectFailed _) -> readMVar bTimeout + Left err -> throwIO . userError $ "testCrossed: " ++ show err + putMVar aDone () + + -- B + forkTry $ do + Right endpoint <- newEndPoint transport + putMVar bAddr (address endpoint) + theirAddress <- readMVar aAddr + + replicateM_ numRepeats $ do + takeMVar go >> yield + connectResult <- connect endpoint theirAddress ReliableOrdered hints + case connectResult of + Right conn -> close conn + Left (TransportError ConnectTimeout _) -> putMVar bTimeout () + Left (TransportError ConnectFailed _) -> readMVar aTimeout + Left err -> throwIO . userError $ "testCrossed: " ++ show err + putMVar bDone () + + -- Driver + forM_ [1 .. numRepeats] $ \_i -> do + -- putStrLn $ "Round " ++ show _i + tryTakeMVar aTimeout + tryTakeMVar bTimeout + putMVar go () + putMVar go () + takeMVar aDone + takeMVar bDone + +-- Transport tests +testTransport :: IO (Either String Transport) -> IO () +testTransport newTransport = do + Right transport <- newTransport + runTests + [ ("PingPong", testPingPong transport numPings) + , ("EndPoints", testEndPoints transport numPings) + , ("Connections", testConnections transport numPings) + , ("CloseOneConnection", testCloseOneConnection transport numPings) + , ("CloseOneDirection", testCloseOneDirection transport numPings) + , ("CloseReopen", testCloseReopen transport numPings) + , ("ParallelConnects", testParallelConnects transport numPings) + , ("SendAfterClose", testSendAfterClose transport 1000) + , ("Crossing", testCrossing transport 100) + , ("CloseTwice", testCloseTwice transport 100) + , ("ConnectToSelf", testConnectToSelf transport numPings) + , ("ConnectToSelfTwice", testConnectToSelfTwice transport numPings) + , ("CloseSelf", testCloseSelf newTransport) + , ("CloseEndPoint", testCloseEndPoint transport numPings) + , ("CloseTransport", testCloseTransport newTransport) + , ("ConnectClosedEndPoint", testConnectClosedEndPoint transport) + , ("ExceptionOnReceive", testExceptionOnReceive newTransport) + , ("SendException", testSendException newTransport) + , ("Kill", testKill newTransport 10000) + ] + where + numPings = 10000 :: Int diff --git a/network-transport-tcp/tests/Traced.hs b/network-transport-tcp/tests/Traced.hs new file mode 100644 index 00000000..a7735efa --- /dev/null +++ b/network-transport-tcp/tests/Traced.hs @@ -0,0 +1,191 @@ +-- | Add tracing to the IO monad (see examples). +-- +-- [Usage] +-- +-- > {-# LANGUAGE RebindableSyntax #-} +-- > import Prelude hiding (catch, (>>=), (>>), return, fail) +-- > import Traced +-- +-- [Example] +-- +-- > test1 :: IO Int +-- > test1 = do +-- > Left x <- return (Left 1 :: Either Int Int) +-- > putStrLn "Hello world" +-- > Right y <- return (Left 2 :: Either Int Int) +-- > return (x + y) +-- +-- outputs +-- +-- > Hello world +-- > *** Exception: user error (Pattern match failure in do expression at Traced.hs:187:3-9) +-- > Trace: +-- > 0 Left 2 +-- > 1 Left 1 +-- +-- [Guards] +-- +-- Use the following idiom instead of using 'Control.Monad.guard': +-- +-- > test2 :: IO Int +-- > test2 = do +-- > Left x <- return (Left 1 :: Either Int Int) +-- > True <- return (x == 3) +-- > return x +-- +-- The advantage of this idiom is that it gives you line number information when the guard fails: +-- +-- > *Traced> test2 +-- > *** Exception: user error (Pattern match failure in do expression at Traced.hs:193:3-6) +-- > Trace: +-- > 0 Left 1 +module Traced ( MonadS(..) + , return + , (>>=) + , (>>) + , fail + , ifThenElse + , Showable(..) + , Traceable(..) + , traceShow + ) where + +import Prelude hiding ((>>=), return, fail, catch, (>>)) +import qualified Prelude +import Control.Exception (catches, Handler(..), SomeException, throwIO, Exception(..), IOException) +import Control.Applicative ((<$>)) +import Data.Typeable (Typeable) +import Data.Maybe (catMaybes) +import Data.ByteString (ByteString) +import Data.Int (Int32) +import Control.Concurrent.MVar (MVar) + +-------------------------------------------------------------------------------- +-- MonadS class -- +-------------------------------------------------------------------------------- + +-- | Like 'Monad' but bind is only defined for 'Trace'able instances +class MonadS m where + returnS :: a -> m a + bindS :: Traceable a => m a -> (a -> m b) -> m b + failS :: String -> m a + seqS :: m a -> m b -> m b + +-- | Redefinition of 'Prelude.>>=' +(>>=) :: (MonadS m, Traceable a) => m a -> (a -> m b) -> m b +(>>=) = bindS + +-- | Redefinition of 'Prelude.>>' +(>>) :: MonadS m => m a -> m b -> m b +(>>) = seqS + +-- | Redefinition of 'Prelude.return' +return :: MonadS m => a -> m a +return = returnS + +-- | Redefinition of 'Prelude.fail' +fail :: MonadS m => String -> m a +fail = failS + +-------------------------------------------------------------------------------- +-- Trace typeclass (for adding elements to a trace -- +-------------------------------------------------------------------------------- + +data Showable = forall a. Show a => Showable a + +instance Show Showable where + show (Showable x) = show x + +mapShowable :: (forall a. Show a => a -> Showable) -> Showable -> Showable +mapShowable f (Showable x) = f x + +traceShow :: Show a => a -> Maybe Showable +traceShow = Just . Showable + +class Traceable a where + trace :: a -> Maybe Showable + +instance (Traceable a, Traceable b) => Traceable (Either a b) where + trace (Left x) = (mapShowable $ Showable . (Left :: forall c. c -> Either c ())) <$> trace x + trace (Right y) = (mapShowable $ Showable . (Right :: forall c. c -> Either () c)) <$> trace y + +instance (Traceable a, Traceable b) => Traceable (a, b) where + trace (x, y) = case (trace x, trace y) of + (Nothing, Nothing) -> Nothing + (Just t1, Nothing) -> traceShow t1 + (Nothing, Just t2) -> traceShow t2 + (Just t1, Just t2) -> traceShow (t1, t2) + +instance (Traceable a, Traceable b, Traceable c) => Traceable (a, b, c) where + trace (x, y, z) = case (trace x, trace y, trace z) of + (Nothing, Nothing, Nothing) -> Nothing + (Just t1, Nothing, Nothing) -> traceShow t1 + (Nothing, Just t2, Nothing) -> traceShow t2 + (Just t1, Just t2, Nothing) -> traceShow (t1, t2) + (Nothing, Nothing, Just t3) -> traceShow t3 + (Just t1, Nothing, Just t3) -> traceShow (t1, t3) + (Nothing, Just t2, Just t3) -> traceShow (t2, t3) + (Just t1, Just t2, Just t3) -> traceShow (t1, t2, t3) + +instance Traceable a => Traceable (Maybe a) where + trace Nothing = traceShow (Nothing :: Maybe ()) + trace (Just x) = mapShowable (Showable . Just) <$> trace x + +instance Traceable a => Traceable [a] where + trace = traceShow . catMaybes . map trace + +instance Traceable () where + trace = const Nothing + +instance Traceable Int where + trace = traceShow + +instance Traceable Int32 where + trace = traceShow + +instance Traceable Bool where + trace = const Nothing + +instance Traceable ByteString where + trace = traceShow + +instance Traceable (MVar a) where + trace = const Nothing + +instance Traceable [Char] where + trace = traceShow + +instance Traceable IOException where + trace = traceShow + +-------------------------------------------------------------------------------- +-- IO instance for MonadS -- +-------------------------------------------------------------------------------- + +data TracedException = TracedException [String] SomeException + deriving Typeable + +instance Exception TracedException + +-- | Add tracing to 'IO' (see examples) +instance MonadS IO where + returnS = Prelude.return + bindS = \x f -> x Prelude.>>= \a -> catches (f a) (traceHandlers a) + failS = Prelude.fail + seqS = (Prelude.>>) + +instance Show TracedException where + show (TracedException ts ex) = + show ex ++ "\nTrace:\n" ++ unlines (map (\(i, t) -> show i ++ "\t" ++ t) (zip ([0..] :: [Int]) (take 10 . reverse $ ts))) + +traceHandlers :: Traceable a => a -> [Handler b] +traceHandlers a = case trace a of + Nothing -> [ Handler $ \ex -> throwIO (ex :: SomeException) ] + Just t -> [ Handler $ \(TracedException ts ex) -> throwIO $ TracedException (show t : ts) ex + , Handler $ \ex -> throwIO $ TracedException [show t] (ex :: SomeException) + ] + +-- | Definition of 'ifThenElse' for use with RebindableSyntax +ifThenElse :: Bool -> a -> a -> a +ifThenElse True x _ = x +ifThenElse False _ y = y diff --git a/network-transport/LICENSE b/network-transport/LICENSE index bbc98067..f3459e44 100644 --- a/network-transport/LICENSE +++ b/network-transport/LICENSE @@ -1,4 +1,4 @@ -Copyright Well-Typed LLP, 2011 +Copyright Well-Typed LLP, 2011-2012 All rights reserved. diff --git a/network-transport/network-transport.cabal b/network-transport/network-transport.cabal index db933017..0979c44c 100644 --- a/network-transport/network-transport.cabal +++ b/network-transport/network-transport.cabal @@ -1,80 +1,75 @@ Name: network-transport -Version: 0.1.0 -Description: Network Transport -Author: Duncan Coutts, Nicolas Wu, Edsko de Vries -Maintainer: dcoutts@well-typed.com +Version: 0.2.0 +Cabal-Version: >=1.2.3 +Build-Type: Simple License: BSD3 -License-file: LICENSE +License-File: LICENSE +Copyright: Well-Typed LLP +Author: Duncan Coutts, Nicolas Wu, Edsko de Vries +Maintainer: edsko@well-typed.com, dcoutts@well-typed.com +Stability: experimental +Homepage: http://github.com/haskell-distributed/distributed-process +Bug-Reports: mailto:edsko@well-typed.com Synopsis: Network abstraction layer +Description: "Network.Transport" is a Network Abstraction Layer which provides + the following high-level concepts: + . + * Nodes in the network are represented by 'EndPoint's. These are + heavyweight stateful objects. + . + * Each 'EndPoint' has an 'EndPointAddress'. + . + * Connections can be established from one 'EndPoint' to another + using the 'EndPointAddress' of the remote end. + . + * The 'EndPointAddress' can be serialised and sent over the + network, where as 'EndPoint's and connections cannot. + . + * Connections between 'EndPoint's are unidirectional and lightweight. + . + * Outgoing messages are sent via a 'Connection' object that + represents the sending end of the connection. + . + * Incoming messages for /all/ of the incoming connections on + an 'EndPoint' are collected via a shared receive queue. + . + * In addition to incoming messages, 'EndPoint's are notified of + other 'Event's such as new connections or broken connections. + . + This design was heavily influenced by the design of the Common + Communication Interface + (). + Important design goals are: + . + * Connections should be lightweight: it should be no problem to + create thousands of connections between endpoints. + . + * Error handling is explicit: every function declares as part of + its type which errors it can return (no exceptions are thrown) + . + * Error handling is "abstract": errors that originate from + implementation specific problems (such as "no more sockets" in + the TCP implementation) get mapped to generic errors + ("insufficient resources") at the Transport level. + . + This package provides the generic interface only; you will + probably also want to install at least one transport + implementation (network-transport-*). +Tested-With: GHC==7.0.4 GHC==7.2.2 GHC==7.4.1 GHC==7.4.2 Category: Network -Homepage: http://github.com/haskell-distributed -Build-Type: Simple -Cabal-Version: >=1.9.2 Library - Build-Depends: base >= 4 && < 5, - bytestring, - containers, - data-accessor, - network, - mtl, - binary, - transformers - Exposed-modules: Network.Transport, - Network.Transport.Chan, - Network.Transport.TCP, + Build-Depends: base >= 4.3 && < 5, + binary >= 0.5 && < 0.6, + bytestring >= 0.9 && < 0.10, + transformers >= 0.2 && < 0.4 + Exposed-Modules: Network.Transport, Network.Transport.Util Network.Transport.Internal - Network.Transport.Internal.TCP - extensions: OverloadedStrings, ForeignFunctionInterface, DeriveDataTypeable, RankNTypes, ScopedTypeVariables, GeneralizedNewtypeDeriving - ghc-options: -Wall -fno-warn-unused-do-bind + Extensions: ForeignFunctionInterface, + RankNTypes, + ScopedTypeVariables, + DeriveDataTypeable, + GeneralizedNewtypeDeriving + GHC-Options: -Wall -fno-warn-unused-do-bind HS-Source-Dirs: src - -Test-Suite TestTCP - Type: exitcode-stdio-1.0 - Main-Is: TestTCP.hs - Build-Depends: base >= 4, - bytestring, - containers, - data-accessor, - network, - mtl, - transformers, - ansi-terminal, - binary, - random - extensions: OverloadedStrings, ForeignFunctionInterface, DeriveDataTypeable, RankNTypes, ScopedTypeVariables, GeneralizedNewtypeDeriving, ExistentialQuantification, FlexibleInstances, OverlappingInstances - ghc-options: -Wall -fno-warn-unused-do-bind -threaded -rtsopts -with-rtsopts=-N - HS-Source-Dirs: tests src - -Test-Suite TestMulticastInMemory - Type: exitcode-stdio-1.0 - Main-Is: TestMulticastInMemory.hs - Build-Depends: base >= 4, - bytestring, - containers, - data-accessor, - mtl, - transformers, - ansi-terminal, - binary, - random - extensions: OverloadedStrings, ForeignFunctionInterface, DeriveDataTypeable, RankNTypes, ScopedTypeVariables, GeneralizedNewtypeDeriving, ExistentialQuantification, FlexibleInstances, OverlappingInstances - ghc-options: -Wall -fno-warn-unused-do-bind - HS-Source-Dirs: tests src - -Test-Suite TestInMemory - Type: exitcode-stdio-1.0 - Main-Is: TestInMemory.hs - Build-Depends: base >= 4, - bytestring, - containers, - data-accessor, - mtl, - transformers, - ansi-terminal, - binary, - random - extensions: OverloadedStrings, ForeignFunctionInterface, DeriveDataTypeable, RankNTypes, ScopedTypeVariables, GeneralizedNewtypeDeriving, ExistentialQuantification, FlexibleInstances, OverlappingInstances - ghc-options: -Wall -fno-warn-unused-do-bind - HS-Source-Dirs: tests src diff --git a/network-transport/src/Network/Transport.hs b/network-transport/src/Network/Transport.hs index 684e45ab..6c327706 100644 --- a/network-transport/src/Network/Transport.hs +++ b/network-transport/src/Network/Transport.hs @@ -130,13 +130,13 @@ instance Show MulticastAddress where -- placeholders only. -- -------------------------------------------------------------------------------- --- Hints used by 'connect' +-- | Hints used by 'connect' data ConnectHints = ConnectHints { -- Timeout connectTimeout :: Maybe Int } --- Default hints for connecting +-- | Default hints for connecting defaultConnectHints :: ConnectHints defaultConnectHints = ConnectHints { connectTimeout = Nothing