Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Add support for MPI-2 intercommunicator client/server functions #11

Merged
merged 6 commits into from

2 participants

@adk9

The added functions are:

MPI_Open_port
MPI_Close_port
MPI_Comm_accept
MPI_Comm_connect
MPI_Comm_disconnect

These are detailed in section 10.4 of the MPI-2.2 standard, and are useful for writing client/server applications in MPI.

@bjpop bjpop merged commit d56e34e into bjpop:master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Feb 24, 2012
  1. @adk9

    Add support for MPI-2 intercommunicator communication

    adk9 authored
    functions like:
    MPI_Open_port
    MPI_Close_port
    MPI_Comm_accept
    MPI_Comm_connect
    MPI_Comm_disconnect
  2. @adk9
  3. @adk9

    fix indentation issues

    adk9 authored
Commits on Mar 26, 2012
  1. @adk9

    update gitignore

    adk9 authored
  2. @adk9
  3. @adk9
This page is out of date. Refresh to see the latest.
View
1  .gitignore
@@ -23,5 +23,6 @@ docs/HCAR/Issue19/template.pdf
/test/examples/programs/speed/BidirectionalBandwidth
/test/examples/programs/speed/AllToAll
/test/examples/programs/speed/serializable/Bandwidth
+/test/examples/clientserver/
/sender.log
/receivers.log
View
2  haskell-mpi.cabal
@@ -77,6 +77,8 @@ build-type: Simple
stability: experimental
tested-with: GHC==6.10.4, GHC==6.12.1
extra-source-files: src/cbits/*.c src/include/*.h README.txt
+ test/examples/clientserver/*.c
+ test/examples/clientserver/*.hs
test/examples/HaskellAndC/Makefile
test/examples/HaskellAndC/*.c
test/examples/HaskellAndC/*.hs
View
6 src/Control/Parallel/MPI/Base.hs
@@ -46,6 +46,7 @@ module Control.Parallel.MPI.Base
, commTestInter
, commRemoteSize
, commCompare
+ , commFree
, commSetErrhandler
, commGetErrhandler
, commGroup
@@ -149,6 +150,11 @@ module Control.Parallel.MPI.Base
, commSpawnSimple
, argvNull
, errcodesIgnore
+ , openPort
+ , closePort
+ , commAccept
+ , commConnect
+ , commDisconnect
-- * Error handling.
, MPIError(..)
View
54 src/Control/Parallel/MPI/Internal.chs
@@ -48,7 +48,7 @@ module Control.Parallel.MPI.Internal
Comm, commWorld, commSelf, commNull, commTestInter,
commSize, commRemoteSize,
commRank,
- commCompare, commGroup, commGetAttr,
+ commCompare, commFree, commGroup, commGetAttr,
-- ** Process groups.
Group, groupEmpty, groupRank, groupSize, groupUnion,
@@ -59,6 +59,7 @@ module Control.Parallel.MPI.Internal
-- ** Dynamic process management
commGetParent, commSpawn, commSpawnSimple, argvNull, errcodesIgnore,
+ openPort, closePort, commAccept, commConnect, commDisconnect,
-- * Error handling.
Errhandler, errorsAreFatal, errorsReturn, errorsThrowExceptions, commSetErrhandler, commGetErrhandler,
@@ -148,6 +149,8 @@ type MPIComm = {# type MPI_Comm #}
-}
newtype Comm = MkComm { fromComm :: MPIComm } deriving Eq
peekComm ptr = MkComm <$> peek ptr
+withComm comm f = alloca $ \ptr -> do poke ptr (fromComm comm)
+ f (castPtr ptr)
foreign import ccall "&mpi_comm_world" commWorld_ :: Ptr MPIComm
foreign import ccall "&mpi_comm_self" commSelf_ :: Ptr MPIComm
@@ -163,12 +166,17 @@ commSelf :: Comm
commSelf = MkComm <$> unsafePerformIO $ peek commSelf_
foreign import ccall "&mpi_max_processor_name" max_processor_name_ :: Ptr CInt
+foreign import ccall "&mpi_max_port_name" max_port_name_ :: Ptr CInt
foreign import ccall "&mpi_max_error_string" max_error_string_ :: Ptr CInt
-- | Max length of "processor name" as returned by 'getProcessorName'
maxProcessorName :: CInt
maxProcessorName = unsafePerformIO $ peek max_processor_name_
+-- | Max length of "port name" as returned by 'openPort'
+maxPortName :: CInt
+maxPortName = unsafePerformIO $ peek max_port_name_
+
-- | Max length of error description as returned by 'errorString'
maxErrorString :: CInt
maxErrorString = unsafePerformIO $ peek max_error_string_
@@ -838,10 +846,52 @@ foreign import ccall unsafe "&mpi_errcodes_ignore" mpiErrcodesIgnore_ :: Ptr (Pt
argvNull = unsafePerformIO $ peek mpiArgvNull_
errcodesIgnore = unsafePerformIO $ peek mpiErrcodesIgnore_
-{-| Simplified version of `commSpawn' that does not support argument passing and spawn error code checking -}
+{-| Simplified version of `commSpawn' that does not support argument passing and spawn error code checking. -}
commSpawnSimple rank program maxprocs =
commSpawn program argvNull maxprocs infoNull rank commSelf errcodesIgnore
+{-| Opens up a port (network address) on the server where clients
+ can establish connections using @commConnect@.
+
+Refer to MPI Report v2.2, Section 10.4 "Establishing communication"
+for more details on client/server programming with MPI. -}
+openPort :: Info -> IO String
+openPort info = do
+ allocaBytes (fromIntegral maxPortName) $ \ptr -> do
+ openPort' info ptr
+ peekCStringLen (ptr, fromIntegral maxPortName)
+ where
+ openPort' = {# fun unsafe Open_port as openPort_
+ {fromInfo `Info', id `Ptr CChar'} -> `()' checkError*- #}
+
+-- | Closes the specified port on the server.
+{# fun unsafe Close_port as ^
+ {`String'} -> `()' checkError*- #}
+
+{-| @commAccept@ allows a connection from a client. The intercommunicator
+ object returned can be used to communicate with the client. -}
+{# fun unsafe Comm_accept as ^
+ { `String'
+ , fromInfo `Info'
+ , fromRank `Rank'
+ , fromComm `Comm'
+ , alloca- `Comm' peekComm*} -> `()' checkError*- #}
+
+{-| @commConnect@ creates a connection to the server. The intercommunicator
+ object returned can be used to communicate with the server. -}
+{# fun unsafe Comm_connect as ^
+ { `String'
+ , fromInfo `Info'
+ , fromRank `Rank'
+ , fromComm `Comm'
+ , alloca- `Comm' peekComm*} -> `()' checkError*- #}
+
+-- | Free a communicator object.
+{# fun Comm_free as ^ {withComm* `Comm'} -> `()' checkError*- #}
+
+-- | Stop pending communication and deallocate a communicator object.
+{# fun Comm_disconnect as ^ {withComm* `Comm'} -> `()' checkError*- #}
+
foreign import ccall "&mpi_undefined" mpiUndefined_ :: Ptr Int
-- | Predefined constant that might be returned as @Rank@ by calls
View
1  src/cbits/constants.c
@@ -40,6 +40,7 @@ MPI_CONST (int, mpi_graph, MPI_GRAPH)
MPI_CONST (int, mpi_universe_size, MPI_UNIVERSE_SIZE)
MPI_CONST (char **, mpi_argv_null, MPI_ARGV_NULL)
MPI_CONST (int *, mpi_errcodes_ignore, MPI_ERRCODES_IGNORE)
+MPI_CONST (int, mpi_max_port_name, MPI_MAX_PORT_NAME)
/* MPI predefined handles */
MPI_CONST (MPI_Comm, mpi_comm_world, MPI_COMM_WORLD)
View
6 test/OtherTests.hs
@@ -23,6 +23,7 @@ otherTests threadSupport _ =
, testCase "test requestNull" $ testRequestNull
, testCase "Info objects" $ testInfoObjects
, testCase "anySource/anySize values" anySourceTagTest
+ , testCase "openClosePort" openClosePortTest
]
queryThreadTest :: ThreadSupport -> IO ()
@@ -112,3 +113,8 @@ anySourceTagTest = do
else putStrLn ("anySource is not -1, but rather " ++ show anySource)
if (anyTag) == (toEnum (-1)) then return ()
else putStrLn ("anyTag is not -1, but rather " ++ show anyTag)
+
+openClosePortTest :: IO ()
+openClosePortTest = do
+ port <- openPort infoNull
+ closePort port
View
28 test/examples/clientserver/Client.c
@@ -0,0 +1,28 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include "mpi.h"
+
+int main( int argc, char **argv )
+{
+ MPI_Comm server;
+ char port_name[MPI_MAX_PORT_NAME];
+ int i, tag;
+
+ if (argc < 2) {
+ fprintf(stderr, "server port name required.\n");
+ exit(EXIT_FAILURE);
+ }
+
+ MPI_Init(&argc, &argv);
+ strcpy(port_name, argv[1]); /* assume server's name is cmd-line arg */
+ MPI_Comm_connect(port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &server);
+ for (i = 0; i < 5; i++) {
+ tag = 2; /* Action to perform */
+ MPI_Send(&i, 1, MPI_INT, 0, tag, server);
+ }
+ MPI_Send(&i, 0, MPI_INT, 0, 1, server);
+ MPI_Comm_disconnect(&server);
+ MPI_Finalize();
+ return 0;
+}
View
27 test/examples/clientserver/Client.hs
@@ -0,0 +1,27 @@
+{-# LANGUAGE ScopedTypeVariables #-}
+
+-- Based on the simple client-server example from page 326/327 of
+-- "MPI Standard version 2.2"
+
+module Main where
+
+import System.Environment (getArgs)
+import System.Exit
+import Foreign.C.Types
+import Control.Monad (forM_, when)
+import Control.Parallel.MPI.Fast
+
+main :: IO ()
+main = do
+ args <- getArgs
+ when (length args /= 1) $ do
+ putStr "server port name required.\n"
+ exitWith (ExitFailure 1)
+ sendRequest $ head args
+
+sendRequest :: String -> IO ()
+sendRequest port = mpi $ do
+ server <- commConnect port infoNull 0 commWorld
+ forM_ [0..4] $ \(i::CInt) -> send server 0 2 i
+ send server 0 1 (0xdeadbeef::CInt)
+ commDisconnect server
View
25 test/examples/clientserver/Makefile
@@ -0,0 +1,25 @@
+EXE = ServerC ClientC ServerH ClientH
+all: $(EXE)
+
+# Set C compiler to use -m32 if ghc is set to produce 32 bit executables
+# as is usually (always?) the case on OS X and ghc 6.12
+
+ServerC: Server.c
+ mpicc -O2 -Wall Server.c -o ServerC
+# mpicc -m32 -O2 -Wall Server.c -o ServerC
+
+ClientC: Client.c
+ mpicc -O2 -Wall Client.c -o ClientC
+# mpicc -m32 -O2 -Wall Server.c -o ServerC
+
+ServerH: Server.hs
+ ghc --make -O2 Server.hs -o ServerH
+
+ClientH: Client.hs
+ ghc --make -O2 Client.hs -o ClientH
+
+clean:
+ /bin/rm -f *.o *.hi
+
+clobber: clean
+ /bin/rm -f $(EXE)
View
45 test/examples/clientserver/Server.c
@@ -0,0 +1,45 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include "mpi.h"
+
+int main(int argc, char **argv)
+{
+ MPI_Comm client;
+ MPI_Status status;
+ char port_name[MPI_MAX_PORT_NAME];
+ int size, again, i;
+
+ MPI_Init(&argc, &argv);
+ MPI_Comm_size(MPI_COMM_WORLD, &size);
+ if (size != 1) {
+ fprintf(stderr, "Server too big");
+ exit(EXIT_FAILURE);
+ }
+
+ MPI_Open_port(MPI_INFO_NULL, port_name);
+ printf("Server available at port: %s\n", port_name);
+ while (1) {
+ MPI_Comm_accept(port_name, MPI_INFO_NULL, 0, MPI_COMM_WORLD, &client);
+ again = 1;
+ while (again) {
+ MPI_Recv(&i, 1, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, client, &status);
+ switch (status.MPI_TAG) {
+ case 0:
+ MPI_Comm_free(&client);
+ MPI_Close_port(port_name);
+ MPI_Finalize();
+ return 0;
+ case 1:
+ MPI_Comm_disconnect(&client);
+ again = 0;
+ break;
+ case 2: /* do something */
+ printf("Received: %d\n", i);
+ break;
+ default:
+ /* Unexpected message type */
+ MPI_Abort(MPI_COMM_WORLD, 1);
+ }
+ }
+ }
+}
View
37 test/examples/clientserver/Server.hs
@@ -0,0 +1,37 @@
+{-# LANGUAGE ScopedTypeVariables #-}
+
+-- Based on the simple client-server example from page 326/327 of
+-- "MPI Standard version 2.2"
+
+module Main where
+
+import System.Exit
+import Foreign.C.Types
+import Control.Monad (forever)
+import Control.Parallel.MPI.Fast
+
+main :: IO ()
+main = mpi $ do
+ size <- commSize commWorld
+ if size == 1
+ then do
+ port <- openPort infoNull
+ putStrLn $ "Server available at port: " ++ show port ++ "."
+ forever $ do
+ clientComm <- commAccept port infoNull 0 commWorld
+ handleRequest port clientComm
+ else putStrLn $ "Server too big."
+
+handleRequest :: String -> Comm -> IO ()
+handleRequest port client = do
+ (msg::CInt, status) <- intoNewVal $ recv client anySource anyTag
+ case (status_tag status) of
+ 0 -> do
+ commFree client
+ closePort port
+ exitWith (ExitFailure 1)
+ 1 -> commDisconnect client
+ 2 -> do
+ putStrLn $ "Received: " ++ (show msg)
+ handleRequest port client
+ _ -> abort commWorld 1
Something went wrong with that request. Please try again.