Permalink
Browse files

Merged

Abhishek Kulkarni's intercommunicator client/server function support
  • Loading branch information...
2 parents 94b3931 + d322638 commit 818bbceb9cd70188676689c48a25967042f9d5c6 @bjpop committed Aug 25, 2012
View
@@ -23,5 +23,6 @@ docs/HCAR/Issue19/template.pdf
/test/examples/programs/speed/BidirectionalBandwidth
/test/examples/programs/speed/AllToAll
/test/examples/programs/speed/serializable/Bandwidth
+/test/examples/clientserver/
/sender.log
/receivers.log
View
@@ -106,3 +106,9 @@ AusHac Australian Haskell Hackathon, hosted at UNSW from the 16th to the
The next major injection of effort happened when Dmitry Astapov started
contributing to the project in August 2010.
+
+Contributions have also been made by:
+
+ - Abhishek Kulkarni: support for MPI-2 intercommunicator client/server
+ functions
+ - Andres Löh: bug fixes
View
@@ -69,14 +69,16 @@ description:
category: FFI, Distributed Computing
license: BSD3
license-file: LICENSE
-copyright: (c) 2010 Bernard James Pope, Dmitry Astapov
+copyright: (c) 2010, 2011, 2012 Bernard James Pope, Dmitry Astapov, Abhishek Kulkarni, Andres Löh
author: Bernard James Pope (Bernie Pope)
maintainer: florbitous@gmail.com
homepage: http://github.com/bjpop/haskell-mpi
build-type: Simple
stability: experimental
tested-with: GHC==6.10.4, GHC==6.12.1
extra-source-files: src/cbits/*.c src/include/*.h README.txt
+ test/examples/clientserver/*.c
+ test/examples/clientserver/*.hs
test/examples/HaskellAndC/Makefile
test/examples/HaskellAndC/*.c
test/examples/HaskellAndC/*.hs
@@ -100,11 +102,12 @@ flag mpich14
default: False
Library
- extra-libraries: mpi
if flag(mpich14)
- extra-libraries: mpl
+ extra-libraries: mpich, opa, mpl
+ else
+ extra-libraries: mpi, open-rte, open-pal
build-tools: c2hs
- ghc-options: -Wall -fno-warn-name-shadowing -fno-warn-orphans
+ ghc-options: -O2 -Wall -fno-warn-name-shadowing -fno-warn-orphans
c-sources:
src/cbits/init_wrapper.c,
src/cbits/constants.c
@@ -132,9 +135,10 @@ executable haskell-mpi-testsuite
./test
./src
build-tools: c2hs
- extra-libraries: mpi
if flag(mpich14)
- extra-libraries: mpl
+ extra-libraries: mpich, opa, mpl
+ else
+ extra-libraries: mpi, open-rte, open-pal
ghc-options: -Wall -fno-warn-name-shadowing -fno-warn-orphans
c-sources:
src/cbits/init_wrapper.c,
@@ -46,6 +46,7 @@ module Control.Parallel.MPI.Base
, commTestInter
, commRemoteSize
, commCompare
+ , commFree
, commSetErrhandler
, commGetErrhandler
, commGroup
@@ -149,6 +150,11 @@ module Control.Parallel.MPI.Base
, commSpawnSimple
, argvNull
, errcodesIgnore
+ , openPort
+ , closePort
+ , commAccept
+ , commConnect
+ , commDisconnect
-- * Error handling.
, MPIError(..)
@@ -48,7 +48,7 @@ module Control.Parallel.MPI.Internal
Comm, commWorld, commSelf, commNull, commTestInter,
commSize, commRemoteSize,
commRank,
- commCompare, commGroup, commGetAttr,
+ commCompare, commFree, commGroup, commGetAttr,
-- ** Process groups.
Group, groupEmpty, groupRank, groupSize, groupUnion,
@@ -59,6 +59,7 @@ module Control.Parallel.MPI.Internal
-- ** Dynamic process management
commGetParent, commSpawn, commSpawnSimple, argvNull, errcodesIgnore,
+ openPort, closePort, commAccept, commConnect, commDisconnect,
-- * Error handling.
Errhandler, errorsAreFatal, errorsReturn, errorsThrowExceptions, commSetErrhandler, commGetErrhandler,
@@ -148,6 +149,8 @@ type MPIComm = {# type MPI_Comm #}
-}
newtype Comm = MkComm { fromComm :: MPIComm } deriving Eq
peekComm ptr = MkComm <$> peek ptr
+withComm comm f = alloca $ \ptr -> do poke ptr (fromComm comm)
+ f (castPtr ptr)
foreign import ccall "&mpi_comm_world" commWorld_ :: Ptr MPIComm
foreign import ccall "&mpi_comm_self" commSelf_ :: Ptr MPIComm
@@ -163,12 +166,17 @@ commSelf :: Comm
commSelf = MkComm <$> unsafePerformIO $ peek commSelf_
foreign import ccall "&mpi_max_processor_name" max_processor_name_ :: Ptr CInt
+foreign import ccall "&mpi_max_port_name" max_port_name_ :: Ptr CInt
foreign import ccall "&mpi_max_error_string" max_error_string_ :: Ptr CInt
-- | Max length of "processor name" as returned by 'getProcessorName'
maxProcessorName :: CInt
maxProcessorName = unsafePerformIO $ peek max_processor_name_
+-- | Max length of "port name" as returned by 'openPort'
+maxPortName :: CInt
+maxPortName = unsafePerformIO $ peek max_port_name_
+
-- | Max length of error description as returned by 'errorString'
maxErrorString :: CInt
maxErrorString = unsafePerformIO $ peek max_error_string_
@@ -187,14 +195,14 @@ maxErrorString = unsafePerformIO $ peek max_error_string_
-- may be called before the MPI environment has been initialized and after it
-- has been finalized.
-- This function corresponds to @MPI_Initialized@.
-{# fun unsafe Initialized as ^ {alloca- `Bool' peekBool*} -> `()' checkError*- #}
+{# fun unsafe Initialized as ^ {alloca- `Bool' peekBool*} -> `()' discard*- #}
-- | Determine if the MPI environment has been finalized. Returns @True@ if the
-- environment has been finalized and @False@ otherwise. This function
-- may be called before the MPI environment has been initialized and after it
-- has been finalized.
-- This function corresponds to @MPI_Finalized@.
-{# fun unsafe Finalized as ^ {alloca- `Bool' peekBool*} -> `()' checkError*- #}
+{# fun unsafe Finalized as ^ {alloca- `Bool' peekBool*} -> `()' discard*- #}
-- | Initialize the MPI environment with a /required/ level of thread support.
-- See the documentation for 'init' for more information about MPI initialization.
@@ -838,12 +846,54 @@ foreign import ccall unsafe "&mpi_errcodes_ignore" mpiErrcodesIgnore_ :: Ptr (Pt
argvNull = unsafePerformIO $ peek mpiArgvNull_
errcodesIgnore = unsafePerformIO $ peek mpiErrcodesIgnore_
-{-| Simplified version of `commSpawn' that does not support argument passing and spawn error code checking -}
+{-| Simplified version of `commSpawn' that does not support argument passing and spawn error code checking. -}
commSpawnSimple rank program maxprocs =
commSpawn program argvNull maxprocs infoNull rank commSelf errcodesIgnore
foreign import ccall "&mpi_undefined" mpiUndefined_ :: Ptr CInt
+{-| Opens up a port (network address) on the server where clients
+ can establish connections using @commConnect@.
+
+Refer to MPI Report v2.2, Section 10.4 "Establishing communication"
+for more details on client/server programming with MPI. -}
+openPort :: Info -> IO String
+openPort info = do
+ allocaBytes (fromIntegral maxPortName) $ \ptr -> do
+ openPort' info ptr
+ peekCStringLen (ptr, fromIntegral maxPortName)
+ where
+ openPort' = {# fun unsafe Open_port as openPort_
+ {fromInfo `Info', id `Ptr CChar'} -> `()' checkError*- #}
+
+-- | Closes the specified port on the server.
+{# fun unsafe Close_port as ^
+ {`String'} -> `()' checkError*- #}
+
+{-| @commAccept@ allows a connection from a client. The intercommunicator
+ object returned can be used to communicate with the client. -}
+{# fun unsafe Comm_accept as ^
+ { `String'
+ , fromInfo `Info'
+ , fromRank `Rank'
+ , fromComm `Comm'
+ , alloca- `Comm' peekComm*} -> `()' checkError*- #}
+
+{-| @commConnect@ creates a connection to the server. The intercommunicator
+ object returned can be used to communicate with the server. -}
+{# fun unsafe Comm_connect as ^
+ { `String'
+ , fromInfo `Info'
+ , fromRank `Rank'
+ , fromComm `Comm'
+ , alloca- `Comm' peekComm*} -> `()' checkError*- #}
+
+-- | Free a communicator object.
+{# fun Comm_free as ^ {withComm* `Comm'} -> `()' checkError*- #}
+
+-- | Stop pending communication and deallocate a communicator object.
+{# fun Comm_disconnect as ^ {withComm* `Comm'} -> `()' checkError*- #}
+
-- | Predefined constant that might be returned as @Rank@ by calls
-- like 'groupTranslateRanks'. Corresponds to @MPI_UNDEFINED@. Please
-- refer to \"MPI Report Constant And Predefined Handle Index\" for a
View
@@ -40,6 +40,7 @@ MPI_CONST (int, mpi_graph, MPI_GRAPH)
MPI_CONST (int, mpi_universe_size, MPI_UNIVERSE_SIZE)
MPI_CONST (char **, mpi_argv_null, MPI_ARGV_NULL)
MPI_CONST (int *, mpi_errcodes_ignore, MPI_ERRCODES_IGNORE)
+MPI_CONST (int, mpi_max_port_name, MPI_MAX_PORT_NAME)
/* MPI predefined handles */
MPI_CONST (MPI_Comm, mpi_comm_world, MPI_COMM_WORLD)
View
@@ -23,6 +23,7 @@ otherTests threadSupport _ =
, testCase "test requestNull" $ testRequestNull
, testCase "Info objects" $ testInfoObjects
, testCase "anySource/anySize values" anySourceTagTest
+ , testCase "openClosePort" openClosePortTest
]
queryThreadTest :: ThreadSupport -> IO ()
@@ -112,3 +113,8 @@ anySourceTagTest = do
else putStrLn ("anySource is not -1, but rather " ++ show anySource)
if (anyTag) == (toEnum (-1)) then return ()
else putStrLn ("anyTag is not -1, but rather " ++ show anyTag)
+
+openClosePortTest :: IO ()
+openClosePortTest = do
+ port <- openPort infoNull
+ closePort port
@@ -0,0 +1,28 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include "mpi.h"
+
+int main( int argc, char **argv )
+{
+ MPI_Comm server;
+ char port_name[MPI_MAX_PORT_NAME];
+ int i, tag;
+
+ if (argc < 2) {
+ fprintf(stderr, "server port name required.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ MPI_Init(&argc, &argv);
+ strcpy(port_name, argv[1]); /* assume server's name is cmd-line arg */
+ MPI_Comm_connect(port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &server);
+ for (i = 0; i < 5; i++) {
+ tag = 2; /* Action to perform */
+ MPI_Send(&i, 1, MPI_INT, 0, tag, server);
+ }
+ MPI_Send(&i, 0, MPI_INT, 0, 1, server);
+ MPI_Comm_disconnect(&server);
+ MPI_Finalize();
+ return 0;
+}
@@ -0,0 +1,27 @@
+{-# LANGUAGE ScopedTypeVariables #-}
+
+-- Based on the simple client-server example from page 326/327 of
+-- "MPI Standard version 2.2"
+
+module Main where
+
+import System.Environment (getArgs)
+import System.Exit
+import Foreign.C.Types
+import Control.Monad (forM_, when)
+import Control.Parallel.MPI.Fast
+
+main :: IO ()
+main = do
+ args <- getArgs
+ when (length args /= 1) $ do
+ putStr "server port name required.\n"
+ exitWith (ExitFailure 1)
+ sendRequest $ head args
+
+sendRequest :: String -> IO ()
+sendRequest port = mpi $ do
+ server <- commConnect port infoNull 0 commWorld
+ forM_ [0..4] $ \(i::CInt) -> send server 0 2 i
+ send server 0 1 (0xdeadbeef::CInt)
+ commDisconnect server
@@ -0,0 +1,25 @@
+EXE = ServerC ClientC ServerH ClientH
+all: $(EXE)
+
+# Set C compiler to use -m32 if ghc is set to produce 32 bit executables
+# as is usually (always?) the case on OS X and ghc 6.12
+
+ServerC: Server.c
+ mpicc -O2 -Wall Server.c -o ServerC
+# mpicc -m32 -O2 -Wall Server.c -o ServerC
+
+ClientC: Client.c
+ mpicc -O2 -Wall Client.c -o ClientC
+# mpicc -m32 -O2 -Wall Server.c -o ServerC
+
+ServerH: Server.hs
+ ghc --make -O2 Server.hs -o ServerH
+
+ClientH: Client.hs
+ ghc --make -O2 Client.hs -o ClientH
+
+clean:
+ /bin/rm -f *.o *.hi
+
+clobber: clean
+ /bin/rm -f $(EXE)
@@ -0,0 +1,45 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include "mpi.h"
+
+int main(int argc, char **argv)
+{
+ MPI_Comm client;
+ MPI_Status status;
+ char port_name[MPI_MAX_PORT_NAME];
+ int size, again, i;
+
+ MPI_Init(&argc, &argv);
+ MPI_Comm_size(MPI_COMM_WORLD, &size);
+ if (size != 1) {
+ fprintf(stderr, "Server too big");
+ exit(EXIT_FAILURE);
+ }
+
+ MPI_Open_port(MPI_INFO_NULL, port_name);
+ printf("Server available at port: %s\n", port_name);
+ while (1) {
+ MPI_Comm_accept(port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &client);
+ again = 1;
+ while (again) {
+ MPI_Recv(&i, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, client, &status);
+ switch (status.MPI_TAG) {
+ case 0:
+ MPI_Comm_free(&client);
+ MPI_Close_port(port_name);
+ MPI_Finalize();
+ return 0;
+ case 1:
+ MPI_Comm_disconnect(&client);
+ again = 0;
+ break;
+ case 2: /* do something */
+ printf("Received: %d\n", i);
+ break;
+ default:
+ /* Unexpected message type */
+ MPI_Abort(MPI_COMM_WORLD, 1);
+ }
+ }
+ }
+}
@@ -0,0 +1,37 @@
+{-# LANGUAGE ScopedTypeVariables #-}
+
+-- Based on the simple client-server example from page 326/327 of
+-- "MPI Standard version 2.2"
+
+module Main where
+
+import System.Exit
+import Foreign.C.Types
+import Control.Monad (forever)
+import Control.Parallel.MPI.Fast
+
+main :: IO ()
+main = mpi $ do
+ size <- commSize commWorld
+ if size == 1
+ then do
+ port <- openPort infoNull
+ putStrLn $ "Server available at port: " ++ show port ++ "."
+ forever $ do
+ clientComm <- commAccept port infoNull 0 commWorld
+ handleRequest port clientComm
+ else putStrLn $ "Server too big."
+
+handleRequest :: String -> Comm -> IO ()
+handleRequest port client = do
+ (msg::CInt, status) <- intoNewVal $ recv client anySource anyTag
+ case (status_tag status) of
+ 0 -> do
+ commFree client
+ closePort port
+ exitWith (ExitFailure 1)
+ 1 -> commDisconnect client
+ 2 -> do
+ putStrLn $ "Received: " ++ (show msg)
+ handleRequest port client
+ _ -> abort commWorld 1

0 comments on commit 818bbce

Please sign in to comment.