DistributedComputations part #2
simone-silvestri committed Sep 19, 2023
1 parent 58d92ec commit cab51e5
Showing 23 changed files with 77 additions and 77 deletions.
2 changes: 1 addition & 1 deletion benchmark/distributed_nonhydrostatic_model_mpi.jl
@@ -28,7 +28,7 @@ local_rank = MPI.Comm_rank(comm)
@info "Setting up distributed nonhydrostatic model with N=($Nx, $Ny, $Nz) grid points and ranks=($Rx, $Ry, $Rz) on rank $local_rank..."

topo = (Periodic, Periodic, Periodic)
- arch = MultiProcess(CPU(), topology=topo, ranks=(Rx, Ry, Rz), communicator=MPI.COMM_WORLD)
+ arch = Distributed(CPU(), topology=topo, ranks=(Rx, Ry, Rz), communicator=MPI.COMM_WORLD)
distributed_grid = RectilinearGrid(arch, topology=topo, size=(Nx, Ny, Nz), extent=(1, 1, 1))
model = NonhydrostaticModel(grid=distributed_grid)

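
For reference, a minimal driver in the spirit of the benchmark above, using the renamed `Distributed` constructor. The 2 × 2 × 1 rank layout, the resolution, and the `Simulation` call are illustrative assumptions, not part of this commit; launch with something like `mpiexec -n 4 julia --project driver.jl`.

using MPI
using Oceananigans
using Oceananigans.DistributedComputations: Distributed

MPI.Init()

topo = (Periodic, Periodic, Periodic)

# 2 x 2 x 1 rank layout; each rank builds and advances only its local slice of the 64^3 grid.
arch = Distributed(CPU(), topology=topo, ranks=(2, 2, 1), communicator=MPI.COMM_WORLD)
grid = RectilinearGrid(arch, topology=topo, size=(64, 64, 64), extent=(1, 1, 1))

model = NonhydrostaticModel(grid=grid)

simulation = Simulation(model, Δt=1e-3, stop_iteration=10)
run!(simulation)
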
2 changes: 1 addition & 1 deletion benchmark/distributed_shallow_water_model_mpi.jl
@@ -30,7 +30,7 @@ Ry = parse(Int, ARGS[4])
@info "Setting up distributed shallow water model with N=($Nx, $Ny) grid points and ranks=($Rx, $Ry) on rank $local_rank..."

topo = (Periodic, Periodic, Flat)
- arch = MultiProcess(CPU(), topology=topo, ranks=(Rx, Ry, 1), communicator=MPI.COMM_WORLD)
+ arch = Distributed(CPU(), topology=topo, ranks=(Rx, Ry, 1), communicator=MPI.COMM_WORLD)
distributed_grid = RectilinearGrid(arch, topology=topo, size=(Nx, Ny), extent=(1, 1))
model = ShallowWaterModel(grid=distributed_grid, gravitational_acceleration=1.0)
set!(model, h=1)
2 changes: 1 addition & 1 deletion src/DistributedComputations/DistributedComputations.jl
@@ -1,7 +1,7 @@
module DistributedComputations

export
- MultiProcess, child_architecture, reconstruct_global_grid,
+ Distributed, child_architecture, reconstruct_global_grid,
inject_halo_communication_boundary_conditions,
DistributedFFTBasedPoissonSolver

34 changes: 17 additions & 17 deletions src/DistributedComputations/distributed_architectures.jl
@@ -6,7 +6,7 @@ import Oceananigans.Architectures: device, arch_array, array_type, child_archite
import Oceananigans.Grids: zeros
import Oceananigans.Utils: sync_device!

- struct MultiProcess{A, M, R, I, ρ, C, γ, T} <: AbstractArchitecture
+ struct Distributed{A, M, R, I, ρ, C, γ, T} <: AbstractArchitecture
child_architecture :: A
local_rank :: R
local_index :: I
@@ -22,7 +22,7 @@ end
#####

"""
- MultiProcess(child_architecture = CPU();
+ Distributed(child_architecture = CPU();
topology,
ranks,
devices = nothing,
@@ -57,7 +57,7 @@ Keyword arguments
- `communicator`: the MPI communicator, `MPI.COMM_WORLD`. This keyword argument should not be tampered with
if not for testing or developing. Change at your own risk!
"""
- function MultiProcess(child_architecture = CPU();
+ function Distributed(child_architecture = CPU();
topology,
ranks,
devices = nothing,
@@ -101,28 +101,28 @@ function MultiProcess(child_architecture = CPU();
M = typeof(mpi_requests)
T = typeof(Ref(0))

- return MultiProcess{A, M, R, I, ρ, C, γ, T}(child_architecture, local_rank, local_index, ranks, local_connectivity, communicator, mpi_requests, Ref(0))
+ return Distributed{A, M, R, I, ρ, C, γ, T}(child_architecture, local_rank, local_index, ranks, local_connectivity, communicator, mpi_requests, Ref(0))
end

- const MultiCPUProcess = MultiProcess{CPU}
- const MultiGPUProcess = MultiProcess{GPU}
+ const DistributedCPU = Distributed{CPU}
+ const DistributedGPU = Distributed{GPU}

- const BlockingMultiProcess = MultiProcess{<:Any, <:Nothing}
+ const BlockingDistributed = Distributed{<:Any, <:Nothing}

#####
##### All the architectures
#####

- child_architecture(arch::MultiProcess) = arch.child_architecture
- device(arch::MultiProcess) = device(child_architecture(arch))
- arch_array(arch::MultiProcess, A) = arch_array(child_architecture(arch), A)
- zeros(FT, arch::MultiProcess, N...) = zeros(FT, child_architecture(arch), N...)
- array_type(arch::MultiProcess) = array_type(child_architecture(arch))
- sync_device!(arch::MultiProcess) = sync_device!(arch.child_architecture)
+ child_architecture(arch::Distributed) = arch.child_architecture
+ device(arch::Distributed) = device(child_architecture(arch))
+ arch_array(arch::Distributed, A) = arch_array(child_architecture(arch), A)
+ zeros(FT, arch::Distributed, N...) = zeros(FT, child_architecture(arch), N...)
+ array_type(arch::Distributed) = array_type(child_architecture(arch))
+ sync_device!(arch::Distributed) = sync_device!(arch.child_architecture)

- cpu_architecture(arch::MultiCPUProcess) = arch
- cpu_architecture(arch::MultiGPUProcess) =
- MultiProcess(CPU(), arch.local_rank, arch.local_index, arch.ranks,
+ cpu_architecture(arch::DistributedCPU) = arch
+ cpu_architecture(arch::DistributedGPU) =
+ Distributed(CPU(), arch.local_rank, arch.local_index, arch.ranks,
arch.connectivity, arch.communicator, arch.mpi_requests, arch.mpi_tag)

#####
@@ -223,7 +223,7 @@ end
##### Pretty printing
#####

- function Base.show(io::IO, arch::MultiProcess)
+ function Base.show(io::IO, arch::Distributed)
c = arch.connectivity
print(io, "Distributed architecture (rank $(arch.local_rank)/$(prod(arch.ranks)-1)) [index $(arch.local_index) / $(arch.ranks)]\n",
"└── child architecture: $(typeof(child_architecture(arch))) \n",
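
As a quick orientation for the renamed type, here is a hedged sketch of how the constructor and the helpers touched in this file compose; the topology and rank layout are illustrative, and the communicator is passed explicitly as in the benchmarks above.

using MPI
using Oceananigans
using Oceananigans.DistributedComputations: Distributed, child_architecture

MPI.Init()

arch = Distributed(CPU(), topology=(Periodic, Periodic, Periodic),
                   ranks=(2, 1, 1), communicator=MPI.COMM_WORLD)

child_architecture(arch)   # CPU(): device, arrays, and zeros(...) defer to the child architecture
arch.local_rank            # this process' rank within arch.communicator
arch.ranks                 # (2, 1, 1): the layout used to decompose the domain
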
@@ -33,7 +33,7 @@ Return a FFT-based solver for the Poisson equation,
∇²φ = b
```
- for `MultiProcess`itectures.
+ for `Distributed`itectures.
Supported configurations
========================
@@ -80,7 +80,7 @@ Restrictions
============
The algorithm for two-dimensional decompositions requires that `Nz = size(global_grid, 3)` is larger
- than either `Rx = ranks[1]` or `Ry = ranks[2]`, where `ranks` are configured when building `MultiProcess`.
+ than either `Rx = ranks[1]` or `Ry = ranks[2]`, where `ranks` are configured when building `Distributed`.
If `Nz` does not satisfy this condition, we can only support a one-dimensional decomposition.
Algorithm for one-dimensional decompositions
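
The restriction quoted above amounts to a simple pre-flight check. A standalone sketch with illustrative numbers (not code from this commit):

# Per the docstring above, a two-dimensional decomposition needs the number of
# vertical levels Nz to be larger than either Rx or Ry; otherwise only a
# one-dimensional (slab) decomposition is supported.
Nz, Rx, Ry = 64, 4, 2

if !(Nz > Rx || Nz > Ry)
    @warn "Nz = $Nz is too small for a ($Rx, $Ry) decomposition; use a one-dimensional decomposition."
end
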
22 changes: 11 additions & 11 deletions src/DistributedComputations/distributed_grids.jl
@@ -13,20 +13,20 @@ using Oceananigans.ImmersedBoundaries

import Oceananigans.Grids: RectilinearGrid, LatitudeLongitudeGrid, with_halo

- const DistributedGrid{FT, TX, TY, TZ} = AbstractGrid{FT, TX, TY, TZ, <:MultiProcess}
+ const DistributedGrid{FT, TX, TY, TZ} = AbstractGrid{FT, TX, TY, TZ, <:Distributed}
const DistributedRectilinearGrid{FT, TX, TY, TZ, FX, FY, FZ, VX, VY, VZ} =
- RectilinearGrid{FT, TX, TY, TZ, FX, FY, FZ, VX, VY, VZ, <:MultiProcess} where {FT, TX, TY, TZ, FX, FY, FZ, VX, VY, VZ}
+ RectilinearGrid{FT, TX, TY, TZ, FX, FY, FZ, VX, VY, VZ, <:Distributed} where {FT, TX, TY, TZ, FX, FY, FZ, VX, VY, VZ}
const DistributedLatitudeLongitudeGrid{FT, TX, TY, TZ, M, MY, FX, FY, FZ, VX, VY, VZ} =
- LatitudeLongitudeGrid{FT, TX, TY, TZ, M, MY, FX, FY, FZ, VX, VY, VZ, <:MultiProcess} where {FT, TX, TY, TZ, M, MY, FX, FY, FZ, VX, VY, VZ}
+ LatitudeLongitudeGrid{FT, TX, TY, TZ, M, MY, FX, FY, FZ, VX, VY, VZ, <:Distributed} where {FT, TX, TY, TZ, M, MY, FX, FY, FZ, VX, VY, VZ}

- const DistributedImmersedBoundaryGrid = ImmersedBoundaryGrid{FT, TX, TY, TZ, <:DistributedGrid, I, M, <:MultiProcess} where {FT, TX, TY, TZ, I, M}
+ const DistributedImmersedBoundaryGrid = ImmersedBoundaryGrid{FT, TX, TY, TZ, <:DistributedGrid, I, M, <:Distributed} where {FT, TX, TY, TZ, I, M}

"""
- RectilinearGrid(arch::MultiProcess, FT=Float64; kw...)
+ RectilinearGrid(arch::Distributed, FT=Float64; kw...)
Return the rank-local portion of `RectilinearGrid` on `arch`itecture.
"""
- function RectilinearGrid(arch::MultiProcess,
+ function RectilinearGrid(arch::Distributed,
FT::DataType = Float64;
size,
x = nothing,
@@ -69,11 +69,11 @@ function RectilinearGrid(arch::MultiProcess,
end

"""
- LatitudeLongitudeGrid(arch::MultiProcess, FT=Float64; kw...)
+ LatitudeLongitudeGrid(arch::Distributed, FT=Float64; kw...)
Return the rank-local portion of `LatitudeLongitudeGrid` on `arch`itecture.
"""
- function LatitudeLongitudeGrid(arch::MultiProcess,
+ function LatitudeLongitudeGrid(arch::Distributed,
FT::DataType = Float64;
precompute_metrics = true,
size,
@@ -321,17 +321,17 @@ function scatter_grid_properties(global_grid)
return x, y, z, topo, halo
end

- function scatter_local_grids(arch::MultiProcess, global_grid::RectilinearGrid, local_size)
+ function scatter_local_grids(arch::Distributed, global_grid::RectilinearGrid, local_size)
x, y, z, topo, halo = scatter_grid_properties(global_grid)
return RectilinearGrid(arch, eltype(global_grid); size=local_size, x=x, y=y, z=z, halo=halo, topology=topo)
end

- function scatter_local_grids(arch::MultiProcess, global_grid::LatitudeLongitudeGrid, local_size)
+ function scatter_local_grids(arch::Distributed, global_grid::LatitudeLongitudeGrid, local_size)
x, y, z, topo, halo = scatter_grid_properties(global_grid)
return LatitudeLongitudeGrid(arch, eltype(global_grid); size=local_size, longitude=x, latitude=y, z=z, halo=halo, topology=topo)
end

- function scatter_local_grids(arch::MultiProcess, global_grid::ImmersedBoundaryGrid, local_size)
+ function scatter_local_grids(arch::Distributed, global_grid::ImmersedBoundaryGrid, local_size)
ib = global_grid.immersed_boundary
ug = global_grid.underlying_grid

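
A hedged usage sketch for the rank-local grid constructors whose signatures change above; the global size, topology, and rank layout are illustrative. `LatitudeLongitudeGrid(arch, ...)` follows the same pattern with `longitude`, `latitude`, and `z` keywords, as in `scatter_local_grids` above.

using MPI
using Oceananigans
using Oceananigans.DistributedComputations: Distributed

MPI.Init()

topo = (Periodic, Periodic, Bounded)
arch = Distributed(CPU(), topology=topo, ranks=(2, 2, 1), communicator=MPI.COMM_WORLD)

# The size below is the global size; per the docstrings above each rank receives
# only its local portion (here, presumably 64 x 64 x 16 under a (2, 2, 1) layout).
grid = RectilinearGrid(arch, topology=topo, size=(128, 128, 16), extent=(1, 1, 1))
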
@@ -1,6 +1,6 @@
import Oceananigans.Utils: launch!

- function launch!(arch::MultiProcess, args...; kwargs...)
+ function launch!(arch::Distributed, args...; kwargs...)
child_arch = child_architecture(arch)
return launch!(child_arch, args...; kwargs...)
end
8 changes: 4 additions & 4 deletions src/DistributedComputations/halo_communication.jl
@@ -123,7 +123,7 @@ end

# Overlapping communication and computation, store requests in a `MPI.Request`
# pool to be waited upon after tendency calculation
- if async && !(arch isa BlockingMultiProcess)
+ if async && !(arch isa BlockingDistributed)
push!(arch.mpi_requests, requests...)
return nothing
end
@@ -238,7 +238,7 @@ for (side, opposite_side) in zip([:west, :south], [:east, :north])
fill_opposite_side_send_buffers! = Symbol("fill_$(opposite_side)_send_buffers!")

@eval begin
- function $fill_both_halo!(c, bc_side::DCBCT, bc_opposite_side::DCBCT, size, offset, loc, arch::MultiProcess,
+ function $fill_both_halo!(c, bc_side::DCBCT, bc_opposite_side::DCBCT, size, offset, loc, arch::Distributed,
grid::DistributedGrid, buffers, args...; only_local_halos = false, kwargs...)

only_local_halos && return nothing
@@ -255,7 +255,7 @@ for (side, opposite_side) in zip([:west, :south], [:east, :north])
return [send_req1, send_req2, recv_req1, recv_req2]
end

- function $fill_both_halo!(c, bc_side::DCBCT, bc_opposite_side, size, offset, loc, arch::MultiProcess,
+ function $fill_both_halo!(c, bc_side::DCBCT, bc_opposite_side, size, offset, loc, arch::Distributed,
grid::DistributedGrid, buffers, args...; only_local_halos = false, kwargs...)

$fill_opposite_side_halo!(c, bc_opposite_side, size, offset, loc, arch, grid, buffers, args...; kwargs...)
@@ -271,7 +271,7 @@ for (side, opposite_side) in zip([:west, :south], [:east, :north])
return [send_req, recv_req]
end

- function $fill_both_halo!(c, bc_side, bc_opposite_side::DCBCT, size, offset, loc, arch::MultiProcess,
+ function $fill_both_halo!(c, bc_side, bc_opposite_side::DCBCT, size, offset, loc, arch::Distributed,
grid::DistributedGrid, buffers, args...; only_local_halos = false, kwargs...)

$fill_side_halo!(c, bc_side, size, offset, loc, arch, grid, buffers, args...; kwargs...)
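
When the architecture is not a `BlockingDistributed`, the code above stores requests in `arch.mpi_requests` and returns, overlapping halo communication with tendency computation. A generic MPI.jl sketch of that overlap pattern (not Oceananigans internals; it assumes MPI.jl's keyword-style non-blocking API and illustrative buffers and tags):

using MPI

MPI.Init()
comm  = MPI.COMM_WORLD
rank  = MPI.Comm_rank(comm)
nproc = MPI.Comm_size(comm)
left  = mod(rank - 1, nproc)
right = mod(rank + 1, nproc)

send_buffer = fill(Float64(rank), 4)
recv_buffer = similar(send_buffer)

# Post the exchange without blocking, analogous to pushing onto arch.mpi_requests ...
requests = MPI.Request[
    MPI.Isend(send_buffer, comm; dest=right, tag=0),
    MPI.Irecv!(recv_buffer, comm; source=left, tag=0),
]

# ... compute interior tendencies that do not touch the halos here ...

MPI.Waitall(requests)   # ... then complete communication before the boundary work
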
@@ -16,7 +16,7 @@ function complete_communication_and_compute_boundary!(model, ::DistributedGrid,
end

# Fallback
- complete_communication_and_compute_boundary!(model, ::DistributedGrid, ::BlockingMultiProcess) = nothing
+ complete_communication_and_compute_boundary!(model, ::DistributedGrid, ::BlockingDistributed) = nothing
complete_communication_and_compute_boundary!(model, grid, arch) = nothing

compute_boundary_tendencies!(model) = nothing
@@ -26,7 +26,7 @@ interior_tendency_kernel_parameters(grid) = :xyz
interior_tendency_kernel_parameters(grid::DistributedGrid) =
interior_tendency_kernel_parameters(grid, architecture(grid))

- interior_tendency_kernel_parameters(grid, ::BlockingMultiProcess) = :xyz
+ interior_tendency_kernel_parameters(grid, ::BlockingDistributed) = :xyz

function interior_tendency_kernel_parameters(grid, arch)
Rx, Ry, _ = arch.ranks
12 changes: 6 additions & 6 deletions src/DistributedComputations/partition_assemble.jl
@@ -1,20 +1,20 @@
using Oceananigans.Architectures: arch_array

- all_reduce(val, arch::MultiProcess; op = +) =
+ all_reduce(val, arch::Distributed; op = +) =
MPI.Allreduce(val, op, arch.communicator)

all_reduce(val, arch; kwargs...) = val

"""
- concatenate_local_sizes(n, arch::MultiProcess)
+ concatenate_local_sizes(n, arch::Distributed)
Return a 3-Tuple containing a vector of `size(grid, idx)` for each rank in
all 3 directions.
"""
- concatenate_local_sizes(n, arch::MultiProcess) =
+ concatenate_local_sizes(n, arch::Distributed) =
Tuple(concatenate_local_sizes(n, arch, i) for i in 1:length(n))

- function concatenate_local_sizes(n, arch::MultiProcess, idx)
+ function concatenate_local_sizes(n, arch::Distributed, idx)
R = arch.ranks[idx]
r = arch.local_index[idx]
n = n isa Number ? n : n[idx]
@@ -106,7 +106,7 @@ partition_global_array(arch, c_global::AbstractArray, n) = c_global
partition_global_array(arch, c_global::Function, n) = c_global

# Here we assume that we cannot partition in z (we should remove support for that)
- function partition_global_array(arch::MultiProcess, c_global::AbstractArray, n)
+ function partition_global_array(arch::Distributed, c_global::AbstractArray, n)
c_global = arch_array(CPU(), c_global)

ri, rj, rk = arch.local_index
@@ -141,7 +141,7 @@ construct_global_array(arch, c_local::AbstractArray, n) = c_local
construct_global_array(arch, c_local::Function, N) = c_local

# TODO: This does not work for 3D parallelizations!!!
- function construct_global_array(arch::MultiProcess, c_local::AbstractArray, n)
+ function construct_global_array(arch::Distributed, c_local::AbstractArray, n)
c_local = arch_array(CPU(), c_local)

ri, rj, rk = arch.local_index
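
To make the `concatenate_local_sizes` docstring above concrete, here is a standalone sketch of the balanced split a rank-local size typically comes from; this is illustrative arithmetic, not the implementation renamed in this commit.

# Split N global points over R ranks as evenly as possible (1-based rank index r);
# the first N % R ranks receive one extra point.
local_size(N, R, r) = N ÷ R + (r <= N % R ? 1 : 0)

[local_size(10, 4, r) for r in 1:4]           # => [3, 3, 2, 2]
sum(local_size(10, 4, r) for r in 1:4) == 10  # the local sizes assemble back to the global N
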
@@ -1,6 +1,6 @@
using Oceananigans.AbstractOperations: GridMetricOperation, Δz
using Oceananigans.DistributedComputations: DistributedGrid, DistributedField
- using Oceananigans.DistributedComputations: BlockingMultiProcess, complete_halo_communication!
+ using Oceananigans.DistributedComputations: BlockingDistributed, complete_halo_communication!
using Oceananigans.Models.HydrostaticFreeSurfaceModels: SplitExplicitState, SplitExplicitFreeSurface

import Oceananigans.Models.HydrostaticFreeSurfaceModels: FreeSurface, SplitExplicitAuxiliaryFields
@@ -93,7 +93,7 @@ end

const DistributedSplitExplicit = SplitExplicitFreeSurface{<:DistributedField}

- wait_free_surface_communication!(::DistributedSplitExplicit, ::BlockingMultiProcess) = nothing
+ wait_free_surface_communication!(::DistributedSplitExplicit, ::BlockingDistributed) = nothing

function wait_free_surface_communication!(free_surface::DistributedSplitExplicit, arch)

@@ -206,8 +206,8 @@ function validate_vertical_velocity_boundary_conditions(w)
return nothing
end

- validate_free_surface(::MultiProcess, free_surface::SplitExplicitFreeSurface) = free_surface
- validate_free_surface(arch::MultiProcess, free_surface) = error("$(typeof(free_surface)) is not supported with $(typeof(arch))")
+ validate_free_surface(::Distributed, free_surface::SplitExplicitFreeSurface) = free_surface
+ validate_free_surface(arch::Distributed, free_surface) = error("$(typeof(free_surface)) is not supported with $(typeof(arch))")
validate_free_surface(arch, free_surface) = free_surface

validate_momentum_advection(momentum_advection, ibg::ImmersedBoundaryGrid) = validate_momentum_advection(momentum_advection, ibg.underlying_grid)
4 changes: 2 additions & 2 deletions src/Models/NonhydrostaticModels/NonhydrostaticModels.jl
@@ -11,15 +11,15 @@ using Oceananigans.Utils
using Oceananigans.Grids
using Oceananigans.Grids: XYRegRectilinearGrid, XZRegRectilinearGrid, YZRegRectilinearGrid
using Oceananigans.Solvers
- using Oceananigans.DistributedComputations: MultiProcess, DistributedFFTBasedPoissonSolver, reconstruct_global_grid
+ using Oceananigans.DistributedComputations: Distributed, DistributedFFTBasedPoissonSolver, reconstruct_global_grid
using Oceananigans.ImmersedBoundaries: ImmersedBoundaryGrid
using Oceananigans.Utils: SumOfArrays

import Oceananigans: fields, prognostic_fields
import Oceananigans.Advection: cell_advection_timescale
import Oceananigans.TimeSteppers: step_lagrangian_particles!

- function PressureSolver(arch::MultiProcess, local_grid::RegRectilinearGrid)
+ function PressureSolver(arch::Distributed, local_grid::RegRectilinearGrid)
global_grid = reconstruct_global_grid(local_grid)
return DistributedFFTBasedPoissonSolver(global_grid, local_grid)
end
2 changes: 1 addition & 1 deletion src/Models/NonhydrostaticModels/nonhydrostatic_model.jl
@@ -2,7 +2,7 @@ using CUDA: has_cuda
using OrderedCollections: OrderedDict

using Oceananigans.Architectures: AbstractArchitecture
- using Oceananigans.DistributedComputations: MultiProcess
+ using Oceananigans.DistributedComputations: Distributed
using Oceananigans.Advection: CenteredSecondOrder
using Oceananigans.BuoyancyModels: validate_buoyancy, regularize_buoyancy, SeawaterBuoyancy
using Oceananigans.Biogeochemistry: validate_biogeochemistry, AbstractBiogeochemistry, biogeochemical_auxiliary_fields
4 changes: 2 additions & 2 deletions src/OutputWriters/output_writer_utils.jl
@@ -44,7 +44,7 @@ saveproperty!(file, address, grid::AbstractGrid) = _saveproperty!(file, add

function saveproperty!(file, address, grid::DistributedGrid)
arch = architecture(grid)
- cpu_arch = MultiProcess(CPU(); topology = topology(grid),
+ cpu_arch = Distributed(CPU(); topology = topology(grid),
ranks = arch.ranks)
_saveproperty!(file, address, on_architecture(cpu_arch, grid))
end
@@ -86,7 +86,7 @@ serializeproperty!(file, address, grid::AbstractGrid) = file[address] = on_archi

function serializeproperty!(file, address, grid::DistributedGrid)
arch = architecture(grid)
- cpu_arch = MultiProcess(CPU(); topology = topology(grid),
+ cpu_arch = Distributed(CPU(); topology = topology(grid),
ranks = arch.ranks)
file[address] = on_architecture(cpu_arch, grid)
end