10 changes: 4 additions & 6 deletions src/CartesianDiscreteModels.jl
@@ -4,24 +4,22 @@ function Gridap.CartesianDiscreteModel(comm::Communicator,subdomains::Tuple,args
end

function Gridap.CartesianDiscreteModel(
comm::Communicator,subdomains::Tuple,gdesc::CartesianDescriptor{D,T,F}) where {D,T,F}
comm::Communicator,subdomains::Tuple,gdesc::CartesianDescriptor)

nsubdoms = prod(subdomains)
ngcells = prod(Tuple(gdesc.partition))

S = CartesianDiscreteModel{D,T,F}

models = ScatteredVector{S}(comm) do (isubdom)
models = DistributedData(comm) do isubdom

ldesc = local_cartesian_descriptor(gdesc,subdomains,isubdom)
#TODO face labeling has wrong ids
CartesianDiscreteModel(ldesc)
end

gids = GhostedVector{Int}(comm) do (isubdom)
gids = DistributedIndexSet(comm) do isubdom

lid_to_gid, lid_to_owner = local_cartesian_gids(gdesc,subdomains,isubdom)
GhostedVectorPart(ngcells,lid_to_gid,lid_to_gid,lid_to_owner)
IndexSet(lid_to_gid,lid_to_owner)
end

DistributedDiscreteModel(models,gids)
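
Note: a minimal usage sketch of the refactored constructor with the sequential backend. The domain, partition, and subdomain layout are illustrative; the field names models, gids, and lid_to_gid are taken from this diff (DistributedIndexSet/IndexSet are defined elsewhere in the package), so treat them as assumptions:

comm = SequentialCommunicator((2,2))              # 4 parts arranged as 2x2 subdomains
gdesc = CartesianDescriptor((0,1,0,1),(4,4))      # global 4x4 grid on the unit square
dmodel = CartesianDiscreteModel(comm,(2,2),gdesc)

# Each part holds a local CartesianDiscreteModel plus an IndexSet with the
# local-to-global cell ids and the owner part of each local cell.
do_on_parts(comm,dmodel.models,dmodel.gids) do part, model, gids
  @assert num_cells(model) == length(gids.lid_to_gid)
end
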
99 changes: 61 additions & 38 deletions src/Communicators.jl
@@ -21,6 +21,11 @@ function i_am_master(::Communicator)
@abstractmethod
end

# We need to compare communicators to perform some checks
function Base.:(==)(a::Communicator,b::Communicator)
@abstractmethod
end

# All communicators that are to be executed in the master-to-workers
# model inherit from this one
abstract type OrchestratedCommunicator <: Communicator end
@@ -42,6 +47,12 @@ function SequentialCommunicator(nparts::Tuple)
SequentialCommunicator(prod(nparts))
end

# All objects to be used with this communicator need to implement this
# function
function get_part(comm::SequentialCommunicator,object,part::Integer)
@abstractmethod
end

function num_parts(a::SequentialCommunicator)
a.nparts
end
@@ -50,47 +61,59 @@ function num_workers(a::SequentialCommunicator)
1
end

function do_on_parts(task::Function,a::SequentialCommunicator,args...)
for part in 1:num_parts(a)
largs = map(a->get_distributed_data(a).parts[part],args)
task(part,largs...)
end
function Base.:(==)(a::SequentialCommunicator,b::SequentialCommunicator)
a.nparts == b.nparts
end

struct MPICommunicator <: CollaborativeCommunicator
comm::MPI.Comm
master_rank::Int
function MPICommunicator(comm::MPI.Comm,master_rank::Int=0)
new(comm,master_rank)
function do_on_parts(task::Function,comm::SequentialCommunicator,args...)
for part in 1:num_parts(comm)
largs = map(a->get_part(comm,get_distributed_data(a),part),args)
task(part,largs...)
end
end
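
Note: a small sketch of how this get_part-based loop is driven; it uses the sequential DistributedData constructor defined in src/DistributedData.jl below, and the values are illustrative:

comm = SequentialCommunicator(3)

# Per-part data: each part stores its own id squared.
squares = DistributedData(comm) do part
  part^2
end

# The task runs once per part, receiving the per-part restriction of each argument.
do_on_parts(comm,squares) do part, sq
  @assert sq == part^2
end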

function MPICommunicator()
# TODO copy the communicator
MPICommunicator(MPI.COMM_WORLD)
end

function i_am_master(comm::MPICommunicator)
MPI.Comm_rank(comm.comm) == comm.master_rank
end

function do_on_parts(task::Function,comm::MPICommunicator,args...)
part = get_part(comm)
largs = map(a->get_distributed_data(a).part,args)
task(part,largs...)
end

function num_parts(comm::MPICommunicator)
@notimplementedif comm.comm !== MPI.COMM_WORLD
MPI.Comm_size(comm.comm)
end

function num_workers(comm::MPICommunicator)
MPI.Comm_size(comm.comm)
end

function get_part(comm::MPICommunicator)
@notimplementedif comm.comm !== MPI.COMM_WORLD
MPI.Comm_rank(comm.comm) + 1
end
# Refactoring not yet done for MPI
#
#struct MPICommunicator <: CollaborativeCommunicator
# comm::MPI.Comm
# master_rank::Int
# function MPICommunicator(comm::MPI.Comm,master_rank::Int=0)
# new(comm,master_rank)
# end
#end
#
#function MPICommunicator()
# # TODO copy the communicator
# MPICommunicator(MPI.COMM_WORLD)
#end
#
## All objects associated with this communicator need to implement this
## function
#function get_part(comm::MPICommunicator,object)
# @abstractmethod
#end
#
#function i_am_master(comm::MPICommunicator)
# MPI.Comm_rank(comm.comm) == comm.master_rank
#end
#
#function do_on_parts(task::Function,comm::MPICommunicator,args...)
# part = get_part(comm)
# largs = map(a->get_part(get_distributed_data(a)),args)
# task(part,largs...)
#end
#
#function num_parts(comm::MPICommunicator)
# @notimplementedif comm.comm !== MPI.COMM_WORLD
# MPI.Comm_size(comm.comm)
#end
#
#function num_workers(comm::MPICommunicator)
# MPI.Comm_size(comm.comm)
#end
#
#function get_part(comm::MPICommunicator)
# @notimplementedif comm.comm !== MPI.COMM_WORLD
# MPI.Comm_rank(comm.comm) + 1
#end

132 changes: 105 additions & 27 deletions src/DistributedData.jl
@@ -1,51 +1,129 @@
abstract type DistributedData end
# Data distributed in parts of type T in a communicator
# Formerly, ScatteredVector
abstract type DistributedData{T} end

function get_comm(a::DistributedData)
@abstractmethod
end

function num_parts(a::DistributedData)
function num_parts(a)
num_parts(get_comm(a))
end

# Construct a DistributedData object in a communicator
function DistributedData{T}(initializer::Function,::Communicator,args...) where T
@abstractmethod
end

function DistributedData(initializer::Function,::Communicator,args...)
@abstractmethod
end

function get_distributed_data(a::DistributedData)
# The comm argument can be omitted if it can be determined from the first
# data argument.
function DistributedData{T}(initializer::Function,args...) where T
comm = get_comm(get_distributed_data(first(args)))
DistributedData{T}(initializer,comm,args...)
end

function DistributedData(initializer::Function,args...)
comm = get_comm(get_distributed_data(first(args)))
DistributedData(initializer,comm,args...)
end

get_part_type(::Type{<:DistributedData{T}}) where T = T

get_part_type(::DistributedData{T}) where T = T

function gather!(a::AbstractVector,b::DistributedData)
@abstractmethod
end

function gather(b::DistributedData{T}) where T
if i_am_master(get_comm(b))
a = zeros(T,num_parts(b))
else
a = zeros(T,0)
end
gather!(a,b)
a
end
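
Note: a sketch of the gather!/gather contract with the sequential backend; it assumes i_am_master returns true for a SequentialCommunicator, which is not shown in this hunk:

comm = SequentialCommunicator(4)

b = DistributedData{Int}(comm) do part
  10*part
end

a = gather(b)  # on the master: [10, 20, 30, 40]; elsewhere: an empty Vector{Int}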

# Provide some "distributed behavior" to objects that are not instances
# of `DistributedData`
function scatter(comm::Communicator,b::AbstractVector)
@abstractmethod
end

function scatter(comm::Communicator,v)
if i_am_master(comm)
part_to_v = fill(v,num_parts(comm))
else
T = eltype(v)
part_to_v = T[]
end
scatter(comm,part_to_v)
end
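
Note: the two scatter methods behave differently, as sketched below: a vector is split entry by entry among the parts, while any other value is replicated on every part (values illustrative, and again assuming the sequential communicator reports itself as master):

comm = SequentialCommunicator(3)

parts = scatter(comm,[10,20,30])  # part i receives the i-th entry
flag  = scatter(comm,true)        # every part receives a copy of the value

do_on_parts(comm,parts,flag) do part, p, f
  @assert p == 10*part && f
end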

# This can be defined for types that are not DistributedData
# to interpret them as DistributedData. See e.g., the DistributedDiscreteModel
# Return an object for which
# its restriction to the parts of a communicator is defined.
# The returned object is not necessarily an instance
# of DistributedData
# Do nothing by default.
function get_distributed_data(object)
@abstractmethod
object
end

function get_comm(object)
get_comm(get_distributed_data(object))
# Specializations

struct SequentialDistributedData{T} <: DistributedData{T}
comm::SequentialCommunicator
parts::Vector{T}
end

function num_parts(object)
num_parts(get_distributed_data(object))
get_part(comm::SequentialCommunicator,a::SequentialDistributedData,part::Integer) = a.parts[part]

get_comm(a::SequentialDistributedData) = a.comm

function DistributedData{T}(initializer::Function,comm::SequentialCommunicator,args...) where T
nparts = num_parts(comm)
parts = [initializer(i,map(a->get_part(comm,get_distributed_data(a),i),args)...) for i in 1:nparts]
SequentialDistributedData{T}(comm,parts)
end

#function get_distributed_data(d::Dict)
function DistributedData(initializer::Function,comm::SequentialCommunicator,args...)
nparts = num_parts(comm)
parts = [initializer(i,map(a->get_part(comm,get_distributed_data(a),i),args)...) for i in 1:nparts]
SequentialDistributedData(comm,parts)
end
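
Note: the args... forwarding in these constructors lets new per-part data be built from existing distributed objects; an illustrative sketch:

comm = SequentialCommunicator(2)

a = DistributedData(comm) do part
  collect(1:part)   # part 1 -> [1], part 2 -> [1, 2]
end

# The communicator is recovered from the first argument via get_distributed_data.
b = DistributedData(a) do part, av
  sum(av)           # part 1 -> 1, part 2 -> 3
end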

function gather!(a::AbstractVector,b::SequentialDistributedData)
@assert length(a) == num_parts(b)
copyto!(a,b.parts)
end

function scatter(comm::SequentialCommunicator,b::AbstractVector)
@assert length(b) == num_parts(comm)
SequentialDistributedData(comm,b)
end

#struct MPIDistributedData{T} <: DistributedData{T}
# part::T
# comm::MPICommunicator
#end
#
# T = typeof(d)
# thekeys = keys(d)
# thevals = values(d)
#get_comm(a::MPIDistributedData) = a.comm
#
# v = first(thevals)
# comm = get_comm(get_distributed_data(v))
# nparts = num_parts(get_distributed_data(v))
#num_parts(a::MPIDistributedData) = num_parts(a.comm)
#
# function initializer(part, vals...)
# ld = T()
# for (i,k) in enumerate(thekeys)
# ld[k] = vals[i]
# end
# ld
# end
#function DistributedData{T}(initializer::Function,comm::MPICommunicator,args...) where T
# largs = map(a->get_distributed_data(a).part,args)
# i = get_part(comm)
# part = initializer(i,largs...)
# MPIDistributedData{T}(part,comm)
#end
#
# ScatteredVector{T}(initializer,comm,nparts,values(d)...)
#function DistributedData(initializer::Function,comm::MPICommunicator,args...) where T
# largs = map(a->get_distributed_data(a).part,args)
# i = get_part(comm)
# part = initializer(i,largs...)
# MPIDistributedData(part,comm)
#end
61 changes: 3 additions & 58 deletions src/DistributedDiscreteModels.jl
@@ -1,14 +1,13 @@
struct DistributedDiscreteModel
models::ScatteredVector{<:DiscreteModel}
gids::GhostedVector{Int}
models::DistributedData{<:DiscreteModel}
gids::DistributedIndexSet
end

function get_distributed_data(dmodel::DistributedDiscreteModel)
models = dmodel.models
gids = dmodel.gids
comm = get_comm(models)

ScatteredVector(comm,models,gids) do part, model, lgids
DistributedData(models,gids) do part, model, lgids
model, lgids
end
end
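
Note: with this overload a DistributedDiscreteModel can be passed straight to do_on_parts; a hedged sketch (the model construction and the destructured tuple follow the code above, the rest is illustrative):

comm = SequentialCommunicator((2,2))
dmodel = CartesianDiscreteModel(comm,(2,2),CartesianDescriptor((0,1,0,1),(4,4)))

do_on_parts(comm,dmodel) do part, (model,lgids)
  # Each part receives the (model, lgids) tuple built in get_distributed_data.
  @assert isa(model,DiscreteModel)
end
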
@@ -25,58 +24,4 @@ function Gridap.writevtk(model::DistributedDiscreteModel,filebase::String)

end

function Gridap.Triangulation(dmodel::DistributedDiscreteModel,args...)
comm = get_comm(dmodel)
trians = ScatteredVector(comm,dmodel.models) do part, model
Triangulation(model,args...)
end
DistributedTriangulation(trians)
end

function Gridap.BoundaryTriangulation(dmodel::DistributedDiscreteModel,args...)
comm = get_comm(dmodel)
trians = ScatteredVector(comm,dmodel.models) do part, model
BoundaryTriangulation(model,args...)
end
DistributedTriangulation(trians)
end

function Gridap.SkeletonTriangulation(dmodel::DistributedDiscreteModel,args...)
comm = get_comm(dmodel)
trians = ScatteredVector(comm,dmodel.models) do part, model
SkeletonTriangulation(model,args...)
end
DistributedTriangulation(trians)
end


function remove_ghost_cells(dtrian::DistributedTriangulation,dmodel)

trians = ScatteredVector(dtrian,dmodel.gids) do part, trian, gids

tcell_to_mcell = get_cell_id(trian)
mcell_to_isowned = gids.lid_to_owner .== part
tcell_to_isowned = reindex(mcell_to_isowned,tcell_to_mcell)
ocell_to_tcell = findall(tcell_to_isowned)
TriangulationPortion(trian,ocell_to_tcell)
end

DistributedTriangulation(trians)

end

function include_ghost_cells(dtrian::DistributedTriangulation)

trians = ScatteredVector(dtrian) do part, trian
trian.oldtrian
end

DistributedTriangulation(trians)
end

#TODO move to Gridap

function Gridap.Geometry.get_cell_id(trian::TriangulationPortion)
reindex(get_cell_id(trian.oldtrian),trian.cell_to_oldcell)
end
