Skip to content

Commit

Permalink
Add CUDA mapreduce.
Browse files Browse the repository at this point in the history
  • Loading branch information
sriharshakandala committed Feb 28, 2023
1 parent dcdef25 commit 44b1aeb
Show file tree
Hide file tree
Showing 3 changed files with 335 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/Fields/Fields.jl
Expand Up @@ -11,6 +11,7 @@ import ..Geometry: Geometry, Cartesian12Vector
import ..Utilities: PlusHalf, half

using ..RecursiveApply
using CUDA
using ClimaComms
import Adapt

Expand Down Expand Up @@ -290,6 +291,7 @@ A `Field` containing the `Δz` values on the same space as the given field.
Δz_field(space::AbstractSpace) = Field(Spaces.Δz_data(space), space)

include("broadcast.jl")
include("mapreduce_cuda.jl")
include("mapreduce.jl")
include("compat_diffeq.jl")
include("fieldvector.jl")
Expand Down
236 changes: 236 additions & 0 deletions src/Fields/mapreduce_cuda.jl
@@ -0,0 +1,236 @@
local_sum_cuda(
field::Field{V},
) where {
Nij,
A <: AbstractArray,
V <:
Union{DataLayouts.IJFH{<:Any, Nij, A}, DataLayouts.VIJFH{<:Any, Nij, A}},
} = mapreduce_cuda(identity, +, field, weighting = true)

local_maximum_cuda(
fn,
field::Field{V},
) where {
Nij,
A <: AbstractArray,
V <:
Union{DataLayouts.IJFH{<:Any, Nij, A}, DataLayouts.VIJFH{<:Any, Nij, A}},
} = mapreduce_cuda(fn, max, field)

local_minimum_cuda(
fn,
field::Field{V},
) where {
Nij,
A <: AbstractArray,
V <:
Union{DataLayouts.IJFH{<:Any, Nij, A}, DataLayouts.VIJFH{<:Any, Nij, A}},
} = mapreduce_cuda(fn, min, field)

function mapreduce_cuda(
f,
op,
field::Field{V};
weighting = false,
opargs...,
) where {
Nij,
A <: AbstractArray,
V <:
Union{DataLayouts.IJFH{<:Any, Nij, A}, DataLayouts.VIJFH{<:Any, Nij, A}},
}
data = Fields.field_values(field)
pdata = parent(data)
T = eltype(pdata)
(_, _, _, Nv, Nh) = size(data)
Nf = div(length(pdata), prod(size(data))) # length of field dimension
wt = Spaces.weighted_jacobian(axes(field)) # wt always IJFH layout
pwt = parent(wt)

nitems = Nv * Nij * Nij * Nh
max_threads = 256# 512 1024
nthreads = min(max_threads, nitems)
# perform n ops during loading to shmem
n_ops_on_load = cld(nitems, nthreads) == 1 ? 0 : 7
effective_blksize = nthreads * (n_ops_on_load + 1)
nblocks = cld(nitems, effective_blksize)

reduce_cuda = CuArray{T}(undef, nblocks, Nf)
reduce_cuda_fld = CuArray{T}(undef, Nf)
shmemsize = nthreads
# place each field on a different block
@cuda threads = (nthreads) blocks = (nblocks, Nf) shmem =
shmemsize * sizeof(T) mapreduce_cuda_kernel!(
reduce_cuda,
f,
op,
pdata,
pwt,
weighting,
Val(shmemsize),
Val(n_ops_on_load),
)
# reduce block data
if nblocks > 1
nthreads = min(32, nblocks)
shmemsize = nthreads
@cuda threads = (nthreads) blocks = (Nf) shmem = shmemsize * sizeof(T) reduce_cuda_blocks_kernel!(
reduce_cuda_fld,
reduce_cuda,
op,
Val(shmemsize),
)
end

return tuple(Array(reduce_cuda_fld)...)
end

function mapreduce_cuda_kernel!(
reduce_cuda::AbstractArray{T, 2},
f,
op,
pdata::AbstractArray{T, N},
pwt::AbstractArray{T, 4},
weighting::Bool,
::Val{shmemsize},
::Val{n_ops_on_load},
) where {T, N, shmemsize, n_ops_on_load}
blksize = blockDim().x
nblk = gridDim().x
tidx = threadIdx().x
bidx = blockIdx().x
fidx = blockIdx().y
effective_blksize = blksize * (n_ops_on_load + 1)
#gidx = tidx + (bidx - 1) * effective_blksize + (fidx - 1) * effective_blksize * nblk
gidx = _get_gidx(tidx, bidx, fidx, effective_blksize, nblk)
reduction = CUDA.CuStaticSharedArray(T, shmemsize)
reduction[tidx] = 0
(Nv, Nij, Nf, Nh) = _get_dims(pdata)
nitems = Nv * Nij * Nij * Nf * Nh
iidx, jidx, hidx = _get_idxs(Nv, Nij, Nf, Nh, fidx, gidx)

# load shmem
if gidx nitems
if weighting
reduction[tidx] = f(pdata[gidx]) * pwt[iidx, jidx, 1, hidx]
for n_ops in 1:n_ops_on_load
gidx2 = _get_gidx(
tidx + blksize * n_ops,
bidx,
fidx,
effective_blksize,
nblk,
)
if gidx2 nitems
iidx2, jidx2, hidx2 =
_get_idxs(Nv, Nij, Nf, Nh, fidx, gidx2)
reduction[tidx] = op(
reduction[tidx],
f(pdata[gidx2]) * pwt[iidx2, jidx2, 1, hidx2],
)
end
end
else
reduction[tidx] = f(pdata[gidx])
for n_ops in 1:n_ops_on_load
gidx2 = _get_gidx(
tidx + blksize * n_ops,
bidx,
fidx,
effective_blksize,
nblk,
)
if gidx2 nitems
iidx2, jidx2, hidx2 =
_get_idxs(Nv, Nij, Nf, Nh, fidx, gidx2)
reduction[tidx] = op(reduction[tidx], f(pdata[gidx2]))
end
end
end
end
sync_threads()
_cuda_intrablock_reduce!(op, reduction, tidx, blksize)

tidx == 1 && (reduce_cuda[bidx, fidx] = reduction[1])
return nothing
end

@inline function _get_gidx(tidx, bidx, fidx, effective_blksize, nblk)
return tidx +
(bidx - 1) * effective_blksize +
(fidx - 1) * effective_blksize * nblk
end

@inline function _get_dims(pdata::AbstractArray{FT, 4}) where {FT}
(Nij, _, Nf, Nh) = size(pdata)
return (1, Nij, Nf, Nh)
end

@inline function _get_dims(pdata::AbstractArray{FT, 5}) where {FT}
(Nv, Nij, _, Nf, Nh) = size(pdata)
return (Nv, Nij, Nf, Nh)
end

@inline function _get_idxs(Nv, Nij, Nf, Nh, fidx, gidx)
hidx = cld(gidx, Nv * Nij * Nij * Nf)
offset = ((hidx - 1) * Nf + (fidx - 1)) * Nv * Nij * Nij
jidx = cld(gidx - offset, Nv * Nij)
offset += (jidx - 1) * Nv * Nij
iidx = cld(gidx - offset, Nv)
return (iidx, jidx, hidx)
end

@inline function _cuda_reduce!(op, reduction, tidx, reduction_size, N)
if reduction_size > N
if tidx reduction_size - N
@inbounds reduction[tidx] = op(reduction[tidx], reduction[tidx + N])
end
N > 32 && sync_threads()
return N
end
return reduction_size
end

function reduce_cuda_blocks_kernel!(
reduce_cuda_fld::AbstractArray{T, 1},
reduce_cuda::AbstractArray{T, 2},
op,
::Val{shmemsize},
) where {T, shmemsize}
blksize = blockDim().x
fidx = blockIdx().x
tidx = threadIdx().x
nitems = size(reduce_cuda, 1)
nloads = cld(nitems, blksize) - 1
reduction = CUDA.CuStaticSharedArray(T, shmemsize)

reduction[tidx] = reduce_cuda[tidx, fidx]

for i in 1:nloads
idx = tidx + blksize * i
if idx nitems
reduction[tidx] = op(reduction[tidx], reduce_cuda[idx, fidx])
end
end

blksize > 32 && sync_threads()
_cuda_intrablock_reduce!(op, reduction, tidx, blksize)

tidx == 1 && (reduce_cuda_fld[fidx] = reduction[1])
return nothing
end

@inline function _cuda_intrablock_reduce!(op, reduction, tidx, blksize)
# assumes max_threads ≤ 1024 which is the current max on any CUDA device
newsize = _cuda_reduce!(op, reduction, tidx, blksize, 512)
newsize = _cuda_reduce!(op, reduction, tidx, newsize, 256)
newsize = _cuda_reduce!(op, reduction, tidx, newsize, 128)
newsize = _cuda_reduce!(op, reduction, tidx, newsize, 64)
newsize = _cuda_reduce!(op, reduction, tidx, newsize, 32)
newsize = _cuda_reduce!(op, reduction, tidx, newsize, 16)
newsize = _cuda_reduce!(op, reduction, tidx, newsize, 8)
newsize = _cuda_reduce!(op, reduction, tidx, newsize, 4)
newsize = _cuda_reduce!(op, reduction, tidx, newsize, 2)
newsize = _cuda_reduce!(op, reduction, tidx, newsize, 1)
return nothing
end
97 changes: 97 additions & 0 deletions test/Fields/reduction_cuda.jl
@@ -0,0 +1,97 @@
using Test
using CUDA
using ClimaComms

import ClimaCore:
Device,
Domains,
Fields,
Geometry,
Meshes,
Operators,
Spaces,
Topologies,
DataLayouts

# set initial condition for steady-state test
function set_initial_condition(space)
Y = map(Fields.local_geometry_field(space)) do local_geometry
h = 1.0
return h
end
return Y
end

function set_elevation(space, h₀)
Y = map(Fields.local_geometry_field(space)) do local_geometry
coord = local_geometry.coordinates

ϕ = coord.lat
λ = coord.long
FT = eltype(λ)
ϕₘ = FT(0) # degrees (equator)
λₘ = FT(3 / 2 * 180) # degrees
rₘ =
FT(acos(sind(ϕₘ) * sind(ϕ) + cosd(ϕₘ) * cosd(ϕ) * cosd- λₘ))) # Great circle distance (rads)
Rₘ = FT(3π / 4) # Moutain radius
ζₘ = FT/ 16) # Mountain oscillation half-width
zₛ = ifelse(
rₘ < Rₘ,
FT(h₀ / 2) * (1 + cospi(rₘ / Rₘ)) * (cospi(rₘ / ζₘ))^2,
FT(0),
)
return zₛ
end
return Y
end

@testset "test cuda reduction op on surface of sphere" begin
FT = Float64

device = Device.device()
context = ClimaComms.SingletonCommsContext(device)
context_cpu = ClimaComms.SingletonCommsContext() # CPU context for comparison

# Set up discretization
ne = 72
Nq = 4
ndof = ne * ne * 6 * Nq * Nq
println(
"running reduction test on $device device; problem size Ne = $ne; Nq = $Nq; ndof = $ndof; FT = $FT",
)
R = FT(6.37122e6) # radius of earth
domain = Domains.SphereDomain(R)
mesh = Meshes.EquiangularCubedSphere(domain, ne)
quad = Spaces.Quadratures.GLL{Nq}()
grid_topology = Topologies.Topology2D(context, mesh)
grid_topology_cpu = Topologies.Topology2D(context_cpu, mesh)
space = Spaces.SpectralElementSpace2D(grid_topology, quad)
space_cpu = Spaces.SpectralElementSpace2D(grid_topology_cpu, quad)

coords = Fields.coordinate_field(space)
Y = set_initial_condition(space)
Y_cpu = set_initial_condition(space_cpu)

h₀ = FT(200)
Z = set_elevation(space, h₀)
Z_cpu = set_elevation(space_cpu, h₀)

result = Fields.local_sum_cuda(Y)
result_cpu = Fields.local_sum(Y_cpu)

local_max = Fields.local_maximum_cuda(identity, Z)
local_max_cpu = Base.maximum(identity, Z_cpu)

local_min = Fields.local_minimum_cuda(identity, Z)
local_min_cpu = Base.minimum(identity, Z_cpu)
# test weighted sum
@test result[1] 4 * pi * R^2 rtol = 1e-5
@test result[1] result_cpu[1]
# test maximum
@test local_max[1] h₀
@test local_max[1] local_max_cpu
# test minimum
@test local_min[1] FT(0)
@test local_min[1] local_min_cpu

end

0 comments on commit 44b1aeb

Please sign in to comment.