Skip to content

Commit

Permalink
Add MPICH support
Browse files Browse the repository at this point in the history
  • Loading branch information
andreasnoack committed Jul 26, 2016
1 parent e29e72e commit 1c7e0c0
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 6 deletions.
14 changes: 10 additions & 4 deletions src/cman.jl
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,9 @@ function setup_worker(host, port, cookie)
Base.wait_connected(io)
redirect_stdout(io)
redirect_stderr(io)

# Send our MPI rank to the manager
rank = MPI.Comm_rank(MPI.COMM_WORLD)
Base.serialize(io, rank)

# Hand over control to Base
if cookie == nothing
Base.start_worker(io)
Expand Down Expand Up @@ -296,6 +294,9 @@ function start_send_event_loop(mgr::MPIManager, rank::Integer)
# When data are available, send them
while nb_available(w_s) > 0
data = takebuf_array(w_s.buffer)
if length(data) == 0
warn("sending zero sized message")
end
push!(reqs, MPI.Isend(data, rank, 0, mgr.comm))
end
if !isempty(reqs)
Expand Down Expand Up @@ -383,10 +384,11 @@ function start_main_loop(mode::TransportMode=TCP_TRANSPORT_ALL;
cookie = MPI.bcast(nothing, 0, comm)
Base.init_worker(cookie, mgr)
else
Base.init_worker(mgr)
Base.init_worker(mgr)#;println("D")
end
# Start a worker event loop
receive_event_loop(mgr)
#println("DD")
receive_event_loop(mgr)#;println("E")
MPI.Finalize()
exit()
end
Expand All @@ -402,6 +404,9 @@ function receive_event_loop(mgr::MPIManager)
(hasdata, stat) = MPI.Iprobe(MPI.ANY_SOURCE, 0, mgr.comm)
if hasdata
count = Get_count(stat, UInt8)
if count == 0
warn("receive empty msg: ", stat.source)
end
buf = Array(UInt8, count)
from_rank = stat.source
MPI.Recv!(buf, from_rank, 0, mgr.comm)
Expand All @@ -417,6 +422,7 @@ function receive_event_loop(mgr::MPIManager)
(r_s, w_s) = streams
end
write(r_s, buf)
yield()
else
# TODO: Need a better way to integrate with libuv's event loop
yield()
Expand Down
96 changes: 94 additions & 2 deletions src/mpi-base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@ if VERSION >= v"0.5.0-"
fieldoffsets{T}(::Type{T}) = Int[fieldoffset(T, i) for i in 1:nfields(T)]
end

MPIName = :OpenMPI

MPIString = readstring(`mpirun --version`)
if ismatch(r"Open MPI", MPIString)
MPIName = :OpenMPI
elseif ismatch(r"mpich", MPIString)
MPIName = :MPICH
elseif ismatch(r"Intel", MPIString)
MPIName = :Intel
else
error("Your MPI implementation is now known to MPI.jl. Please file a bug report.")
end

if MPIName == :OpenMPI

Expand Down Expand Up @@ -60,8 +68,71 @@ if MPIName == :OpenMPI

const ANY_SOURCE = Cint(-1)
const UNDEFINED = Cint(-32766)

elseif MPIName in (:MPICH, :Intel)

typealias MPI_Comm Cint
typealias MPI_Datatype Cint
typealias MPI_Errhandler Cint
typealias MPI_Group Cint
typealias MPI_Message Cint
typealias MPI_Op Cint
typealias MPI_Request Cint
typealias MPI_User_function Cint

const CHAR = MPI_Datatype(0x4c000101)
const SIGNED_CHAR = MPI_Datatype(0x4c000118)
const UNSIGNED_CHAR = MPI_Datatype(0x4c000102)
const BYTE = MPI_Datatype(0x4c00010d)
const WCHAR = MPI_Datatype(0x4c00040e)
const SHORT = MPI_Datatype(0x4c000203)
const UNSIGNED_SHORT = MPI_Datatype(0x4c000204)
const INT = MPI_Datatype(0x4c000405)
const UNSIGNED = MPI_Datatype(0x4c000406)
const LONG = MPI_Datatype(0x4c000807)
const UNSIGNED_LONG = MPI_Datatype(0x4c000808)
const FLOAT = MPI_Datatype(0x4c00040a)
const DOUBLE = MPI_Datatype(0x4c00080b)
const LONG_DOUBLE = MPI_Datatype(0x4c00100c)
const LONG_LONG_INT = MPI_Datatype(0x4c000809)
const UNSIGNED_LONG_LONG = MPI_Datatype(0x4c000819)
const LONG_LONG = LONG_LONG_INT

const INT8_T = MPI_Datatype(0x4c000137)
const INT16_T = MPI_Datatype(0x4c000238)
const INT32_T = MPI_Datatype(0x4c000439)
const INT64_T = MPI_Datatype(0x4c00083a)
const UINT8_T = MPI_Datatype(0x4c00013b)
const UINT16_T = MPI_Datatype(0x4c00023c)
const UINT32_T = MPI_Datatype(0x4c00043d)
const UINT64_T = MPI_Datatype(0x4c00083e)

const C_BOOL = MPI_Datatype(0x4c00013f)
const C_FLOAT_COMPLEX = MPI_Datatype(0x4c000840)
const C_COMPLEX = C_FLOAT_COMPLEX
const C_DOUBLE_COMPLEX = MPI_Datatype(0x4c001041)
const C_LONG_DOUBLE_COMPLEX = MPI_Datatype(0x4c002042)

const MAX = MPI_Op(0x58000001)
const MIN = MPI_Op(0x58000002)
const SUM = MPI_Op(0x58000003)
const PROD = MPI_Op(0x58000004)
const LAND = MPI_Op(0x58000005)
const BAND = MPI_Op(0x58000006)
const LOR = MPI_Op(0x58000007)
const BOR = MPI_Op(0x58000008)
const LXOR = MPI_Op(0x58000009)
const BXOR = MPI_Op(0x5800000a)
const MINLOC = MPI_Op(0x5800000b)
const MAXLOC = MPI_Op(0x5800000c)
const REPLACE = MPI_Op(0x5800000d)
const NO_OP = MPI_Op(0x5800000e)

const ANY_SOURCE = Cint(-2)
const UNDEFINED = Cint(-32766)
end


immutable Comm
val::MPI_Comm
end
Expand All @@ -73,7 +144,20 @@ end

if MPIName == :OpenMPI
const COMM_WORLD = Comm(cglobal((:ompi_mpi_comm_world, libmpi)))

const REQUEST_NULL = Request(cglobal((:ompi_request_null, libmpi)), nothing)
elseif MPIName in (:MPICH, :Intel)
const COMM_WORLD = Comm(MPI_Comm(0x44000000))
const COMM_SELF = MPI_Comm(0x44000001)

const COMM_NULL = Comm(MPI_Comm(0x04000000))
const OP_NULL = MPI_Op(0x18000000)
const GROUP_NULL = MPI_Group(0x08000000)
const DATATYPE_NULL = MPI_Datatype(0x0c000000)
const REQUEST_NULL = Request(MPI_Request(0x2c000000), nothing)
const ERRHANDLER_NULL = MPI_Errhandler(0x14000000)
const MESSAGE_NULL = MPI_Message(0x2c000000)
const MESSAGE_NO_PROC = MPI_Message(0x6c000000)
end

if MPIName == :OpenMPI
Expand All @@ -89,6 +173,14 @@ if MPIName == :OpenMPI
# The following fields is buffer space
# buf::NTuple{6,Cint}
end
elseif MPIName in (:MPICH, :Intel)
immutable Status
count_lo::Cint
count_hi_and_cancelled::Cint
source::Cint
tag::Cint
error::Cint
end
end

const mpitype_dict = Dict{DataType, MPI_Datatype}()
Expand Down

0 comments on commit 1c7e0c0

Please sign in to comment.