Skip to content

Commit

Permalink
Cleaned up examples and benchmarks. Found strange bug. Tweaked Proces…
Browse files Browse the repository at this point in the history
…s.hs for strict bytestrings.

The bug is that DemoTransport.hs will hang when trying to "killThread"
(Pipes backend, demo3 and demo4), but ONLY under GHC 7.0.4 and 7.2.1.
Under GHC 6.12.3 and GHC 7.4.1 it works fine!

At first I thought this may be an issue with non-allocating threads not
being preempted by the runtime system (and therefore not servicing the
ThreadKilled asynchronous exception).  But it's hard to explain that
pattern of outcomes on different GHC versions.
  • Loading branch information
rrnewton committed Feb 25, 2012
1 parent ff8d9c6 commit 48e257a
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 24 deletions.
6 changes: 4 additions & 2 deletions distributed-process/src/Control/Distributed/Process.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ module Control.Distributed.Process (
import qualified Network.Transport as Trans
import Network.Transport (Transport)

import qualified Data.ByteString.Lazy.Char8 as BS
import Data.ByteString.Lazy.Char8 (ByteString)
-- import qualified Data.ByteString.Lazy.Char8 as BS
-- import Data.ByteString.Lazy.Char8 (ByteString)
import qualified Data.ByteString.Char8 as BS
import Data.ByteString.Char8 (ByteString)
import qualified Data.IntMap as IntMap
import Data.IntMap (IntMap)
import Control.Applicative
Expand Down
4 changes: 3 additions & 1 deletion examples/DemoProcess.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module DemoProcess where
module Main where

import Control.Distributed.Process
import Control.Concurrent (threadDelay)
Expand Down Expand Up @@ -32,3 +32,5 @@ logger n = do
str <- expect
liftIO . putStrLn $ show n ++ ": " ++ str
logger (n+1)

main = demo1
17 changes: 15 additions & 2 deletions examples/DemoTransport.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import Network.Transport.TCP (mkTransport, TCPConfig (..))
import Control.Concurrent
import Control.Monad

import qualified Data.ByteString.Lazy.Char8 as BS
-- import qualified Data.ByteString.Lazy.Char8 as BS
import qualified Data.ByteString.Char8 as BS

import Data.IORef
import Debug.Trace
Expand All @@ -23,6 +24,9 @@ tcp = mkTCPOff 8080
mvar = return Network.Transport.MVar.mkTransport
pipes = return Network.Transport.Pipes.mkTransport

tcp :: IO (IO Transport)
mvar :: IO (IO Transport)
pipes :: IO (IO Transport)

-------------------------------------------
-- Example programs using backend directly
Expand Down Expand Up @@ -103,11 +107,13 @@ demo3 mktrans = do
send sourceEnd2 [BS.pack "hello2"]

threadDelay 100000

putStrLn "demo3: Time, up, killing threads & closing transports.."
killThread threadId1
killThread threadId2
putStrLn "demo3: Threads killed."
closeTransport trans1
closeTransport trans2
putStrLn "demo3: Done."


-- | Check that two different connections on the same transport can be created.
Expand All @@ -129,9 +135,14 @@ demo4 mktrans = do

threadDelay 100000

putStrLn "demo4: Time, up, killing threads & closing transports.."
killThread threadId1
killThread threadId2
putStrLn "demo4: Threads killed."
closeTransport trans
putStrLn "demo4: Done."



--------------------------------------------------------------------------------

Expand All @@ -151,10 +162,12 @@ mkTCPOff off = do
runWAllTranports :: (IO Transport -> IO ()) -> Int -> IO ()
runWAllTranports demo offset = do
putStrLn "------------------------------------------------------------"
{-
putStrLn " MVAR transport:"
mvar >>= demo
putStrLn "\n TCP transport:"
tcp >>= demo
-}
putStrLn "\n PIPES transport:"
pipes >>= demo
putStrLn "\n"
Expand Down
15 changes: 11 additions & 4 deletions examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ ifeq ($(GHC),)
GHC=ghc
endif

all:
$(GHC) -threaded --make -i../network-transport/src DemoTransport.hs -o DemoTransport.exe -fforce-recomp
INCLUDES= -i../network-transport/src -i../network-transport-pipes/src -i../distributed-process/src

EXMPLS= DemoTransport.exe DemoProcess.exe

clean:
rm -f *.hi *.o DemoTransport.exe
all: $(EXMPLS)

$(EXMPLS): %.exe: %.hs
$(GHC) -O2 -rtsopts -threaded --make $(INCLUDES) $< -o $@

# transport:
# $(GHC) -threaded --make -i../network-transport/src DemoTransport.hs -o DemoTransport.exe -fforce-recomp

clean:
rm -f *.hi *.o $(EXMPLS)
7 changes: 4 additions & 3 deletions network-transport-pipes/network-transport-pipes.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ Cabal-Version: >=1.2

Library
Build-Depends: base >= 3 && < 5,
binary >= 0.5,
-- binary >= 0.5,
cereal,
bytestring >= 0.9,
network-transport >= 0.0.1.1,
containers >= 0.4,
-- containers >= 0.4,
containers >= 0.3,
-- network >= 2.3,
-- safe >= 0.3,
unix >= 2.5.0.0,
unix >= 2.4,
random,
directory,
unix-bytestring >= 0.3.5.3
Expand Down
28 changes: 18 additions & 10 deletions network-transport-pipes/src/Network/Transport/Pipes.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

-- * Add support for messages greater than 4096 bytes.
-- * debug ODD problem with CEREAL below
-- * switch to unix-bytestring after that package is updated for 7.4.1

----------------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -88,9 +87,12 @@ fileFlags =

mkTransport :: IO Transport
mkTransport = do
uid <- randomIO :: IO Word64
-- uid <- randomIO :: IO Word64
-- For backwards compatibility we shouldn't assume a Random instance for Word64:
uid <- randomIO :: IO Int

lock <- newMVar ()
let filename = "/tmp/pipe_"++show uid
let filename = "/tmp/pipe_"++show (abs uid)
createNamedPipe filename $ unionFileModes ownerReadMode ownerWriteMode

return Transport
Expand Down Expand Up @@ -128,6 +130,7 @@ mkTransport = do
SourceEnd
{ send = \bsls -> do
-- ThreadSafe: This may happen on multiple processes/threads:
-- We don't need to lock because the write is atomic (for <4096):
let msgsize :: Word32 = fromIntegral$ foldl' (\n s -> n + BS.length s) 0 bsls
when (msgsize > 4096)$ -- TODO, look up PIPE_BUF in foreign code
error "Message larger than blocksize written atomically to a named pipe. Unimplemented."
Expand Down Expand Up @@ -162,10 +165,14 @@ mkTransport = do
{ receive = do
fd <- readMVar mv -- Make sure Fd is there before

-- This should only happen on a single process. But it may
-- This should only happen on a *single* process. But it may
-- happen on multiple threads so we grab a lock.

-- If we don't lock, we cannot atomically read the length
-- and THEN read the payload. (A single read could be
-- atomic, but not two reads.)
takeMVar lock

let spinread :: Int -> IO BS.ByteString
spinread desired = do

Expand All @@ -174,23 +181,24 @@ mkTransport = do

case BS.length bs of
n | n == desired -> return bs
0 -> do threadDelay (10*1000)
spinread desired
-- Because we're in non-blocking mode we deal with failed reads thusly:
-- 0 -> do threadDelay (10*1000)
-- spinread desired
0 -> error$ "receive: read zero bytes from pipe. Even in non-blocking mode should have had an EAGAIN error instead."
l -> error$ "Inclomplete read expected either 0 bytes or complete msg ("++
show desired ++" bytes) got "++ show l ++ " bytes"

hdr <- spinread sizeof_header
let -- decoded :: BSS.ByteString
let
#ifdef CEREAL
-- decoded = decode (BSS.concat$ BS.toChunks hdr)
decoded = decode hdr
#else
-- decoded = decode hdr
decoded = decode (BS.fromChunks [hdr])
#endif
payload <- case decoded of
Left err -> error$ "ERROR: "++ err
Right size -> spinread (fromIntegral (size::Word32))

putMVar lock ()
return [payload] -- How terribly listy.
, closeTargetEnd = do
Expand Down
4 changes: 2 additions & 2 deletions network-transport/network-transport.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ Cabal-Version: >=1.2

Library
Build-Depends: base >= 3 && < 5,
binary >= 0.5,
-- binary >= 0.5,
bytestring >= 0.9,
cereal >= 0.3,
containers >= 0.4,
network >= 2.3,
safe >= 0.3,
unix >= 2.5.0.0
unix >= 2.4
Exposed-modules: Network.Transport,
Network.Transport.MVar,
Network.Transport.TCP
Expand Down

1 comment on commit 48e257a

@rrnewton
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note -- I relaxed the dependencies here to work on older GHC's. At least as far back as 6.12.3.

Please sign in to comment.