Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ jobs:
- ubuntu-latest
- windows-latest
- macOS-latest
exclude:
# For Julia 1.6 no aarch64 binary exists
- version: 'min'
os: macOS-latest
include:
- version: 'min'
os: macOS-13 # uses x64
steps:
- uses: actions/checkout@v5
- uses: julia-actions/setup-julia@v2
Expand Down
6 changes: 4 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ SparseArrays = "2f01184e-e22b-5df5-ae63-d93ebab69eaf"
Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2"

[compat]
ExplicitImports = "1.13.2"
Primes = "0.4, 0.5"
Statistics = "<0.0.1, 1"
julia = "1"
julia = "1.10"

[extras]
ExplicitImports = "7d51a73a-1435-4ff3-83d9-f097790105c7"
SpecialFunctions = "276daf66-3868-5448-9aa4-cd146d93841b"
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"

[targets]
test = ["Test", "SpecialFunctions"]
test = ["ExplicitImports", "Test", "SpecialFunctions"]
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ julia> import Pkg; Pkg.add("DistributedArrays")

## Project Status

The package is tested against Julia `0.7`, `1.0` and the nightly builds of the Julia `master` branch on Linux, and macOS.
The package is tested against
Julia 1.10.0 (oldest supported Julia version),
the Julia LTS version,
the latest stable release of Julia,
and the pre-release version of Julia.

## Questions and Contributions

Expand Down
22 changes: 10 additions & 12 deletions src/DistributedArrays.jl
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
__precompile__()

module DistributedArrays

using Distributed
using Serialization
using LinearAlgebra
using Statistics
using Base: Callable
using Base.Broadcast: BroadcastStyle, Broadcasted

import Base: +, -, *, div, mod, rem, &, |, xor
import Base.Callable
import LinearAlgebra: axpy!, dot, norm, mul!
using Distributed: Distributed, RemoteChannel, Future, myid, nworkers, procs, remotecall, remotecall_fetch, remotecall_wait, worker_id_from_socket, workers
using LinearAlgebra: LinearAlgebra, Adjoint, Diagonal, I, Transpose, adjoint, adjoint!, axpy!, dot, lmul!, mul!, norm, rmul!, transpose, transpose!
using Random: Random, rand!
using Serialization: Serialization, AbstractSerializer, deserialize, serialize
using SparseArrays: SparseArrays, nnz
using Statistics: Statistics

import Primes
import Primes: factor
using Primes: factor

# DArray exports
export DArray, SubDArray, SubOrDArray, @DArray
Expand All @@ -22,7 +20,7 @@ export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindices,
export ddata, gather

# immediate release of localparts
export close, d_closeall
export d_closeall

include("darray.jl")
include("core.jl")
Expand Down
9 changes: 3 additions & 6 deletions src/broadcast.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@
# Distributed broadcast implementation
##

using Base.Broadcast
import Base.Broadcast: BroadcastStyle, Broadcasted

# We define a custom ArrayStyle here since we need to keep track of
# the fact that it is Distributed and what kind of underlying broadcast behaviour
# we will encounter.
Expand All @@ -13,11 +10,11 @@ DArrayStyle(::S) where {S} = DArrayStyle{S}()
DArrayStyle(::S, ::Val{N}) where {S,N} = DArrayStyle(S(Val(N)))
DArrayStyle(::Val{N}) where N = DArrayStyle{Broadcast.DefaultArrayStyle{N}}()

BroadcastStyle(::Type{<:DArray{<:Any, N, A}}) where {N, A} = DArrayStyle(BroadcastStyle(A), Val(N))
Broadcast.BroadcastStyle(::Type{<:DArray{<:Any, N, A}}) where {N, A} = DArrayStyle(BroadcastStyle(A), Val(N))

# promotion rules
# TODO: test this
function BroadcastStyle(::DArrayStyle{AStyle}, ::DArrayStyle{BStyle}) where {AStyle, BStyle}
function Broadcast.BroadcastStyle(::DArrayStyle{AStyle}, ::DArrayStyle{BStyle}) where {AStyle, BStyle}
DArrayStyle(BroadcastStyle(AStyle, BStyle))
end

Expand Down Expand Up @@ -78,7 +75,7 @@ end
# create a local version of the broadcast, by constructing views
# Note: creates copies of the argument
lbc = bclocal(dbc, dest.indices[lpidx])
Base.copyto!(localpart(dest), lbc)
copyto!(localpart(dest), lbc)
return nothing
end
end
Expand Down
29 changes: 8 additions & 21 deletions src/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,7 @@ _localindex(i::AbstractUnitRange, offset) = (first(i)-offset):(last(i)-offset)
Equivalent to `Array(view(A, I...))` but optimised for the case that the data is local.
Can return a view into `localpart(A)`
"""
function makelocal(A::DArray{<:Any, <:Any, AT}, I::Vararg{Any, N}) where {N, AT}
Base.@_inline_meta
@inline function makelocal(A::DArray{<:Any, <:Any, AT}, I::Vararg{Any, N}) where {N, AT}
J = map(i->Base.unalias(A, i), to_indices(A, I))
J = map(j-> isa(j, Base.Slice) ? j.indices : j, J)
@boundscheck checkbounds(A, J...)
Expand Down Expand Up @@ -597,17 +596,10 @@ function Base.copyto!(a::Array, s::SubDArray)
return a
end

if VERSION < v"1.2"
# This is an internal API that has changed
reindex(A, I, J) = Base.reindex(A, I, J)
else
reindex(A, I, J) = Base.reindex(I, J)
end

function DArray(SD::SubArray{T,N}) where {T,N}
D = SD.parent
DArray(size(SD), procs(D)) do I
lindices = reindex(SD, SD.indices, I)
lindices = Base.reindex(SD.indices, I)
convert(Array, D[lindices...])
end
end
Expand Down Expand Up @@ -661,9 +653,7 @@ function Base.getindex(d::DArray, i::Int...)
end

Base.getindex(d::DArray) = d[1]
if VERSION > v"1.1-"
Base.getindex(d::SubDArray, I::Int...) = invoke(getindex, Tuple{SubArray{<:Any,N},Vararg{Int,N}} where N, d, I...)
end
Base.getindex(d::SubOrDArray, I::Union{Int,UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) = view(d, I...)

function Base.isassigned(D::DArray, i::Integer...)
Expand Down Expand Up @@ -793,13 +783,12 @@ Base.@propagate_inbounds Base.getindex(M::MergedIndices{J,N}, I::Vararg{Int, N})
const ReshapedMergedIndices{T,N,M<:MergedIndices} = Base.ReshapedArray{T,N,M}
const SubMergedIndices{T,N,M<:Union{MergedIndices, ReshapedMergedIndices}} = SubArray{T,N,M}
const MergedIndicesOrSub = Union{MergedIndices, ReshapedMergedIndices, SubMergedIndices}
import Base: checkbounds_indices
@inline checkbounds_indices(::Type{Bool}, inds::Tuple{}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
@inline checkbounds_indices(::Type{Bool}, inds::Tuple{Any}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
@inline checkbounds_indices(::Type{Bool}, inds::Tuple, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
@inline Base.checkbounds_indices(::Type{Bool}, inds::Tuple{}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
Base.checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
@inline Base.checkbounds_indices(::Type{Bool}, inds::Tuple{Any}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
Base.checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
@inline Base.checkbounds_indices(::Type{Bool}, inds::Tuple, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
Base.checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))

# The tricky thing here is that we want to optimize the accesses into the
# distributed array, but in doing so, we lose track of which indices in I we
Expand Down Expand Up @@ -851,8 +840,6 @@ function Base.fill!(A::DArray, x)
return A
end

using Random

function Random.rand!(A::DArray, ::Type{T}) where T
asyncmap(procs(A)) do p
remotecall_wait((A, T)->rand!(localpart(A), T), p, A, T)
Expand Down
6 changes: 3 additions & 3 deletions src/linalg.jl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const DMatrix{T,A} = DArray{T,2,A}

# Level 1

function axpy!(α, x::DArray, y::DArray)
function LinearAlgebra.axpy!(α, x::DArray, y::DArray)
if length(x) != length(y)
throw(DimensionMismatch("vectors must have same length"))
end
Expand All @@ -34,7 +34,7 @@ function axpy!(α, x::DArray, y::DArray)
return y
end

function dot(x::DVector, y::DVector)
function LinearAlgebra.dot(x::DVector, y::DVector)
if length(x) != length(y)
throw(DimensionMismatch(""))
end
Expand All @@ -46,7 +46,7 @@ function dot(x::DVector, y::DVector)
return reduce(+, results)
end

function norm(x::DArray, p::Real = 2)
function LinearAlgebra.norm(x::DArray, p::Real = 2)
results = []
@sync begin
for pp in procs(x)
Expand Down
19 changes: 6 additions & 13 deletions src/mapreduce.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
## higher-order functions ##

import Base: +, -, div, mod, rem, &, |, xor
import SparseArrays: nnz

Base.map(f, d0::DArray, ds::AbstractArray...) = broadcast(f, d0, ds...)

function Base.map!(f::F, dest::DArray, src::DArray{<:Any,<:Any,A}) where {F,A}
Expand Down Expand Up @@ -120,7 +117,7 @@ function Base.count(f, A::DArray)
return sum(B)
end

function nnz(A::DArray)
function SparseArrays.nnz(A::DArray)
B = asyncmap(A.pids) do p
remotecall_fetch(nnz∘localpart, p, A)
end
Expand All @@ -136,14 +133,10 @@ function Base.extrema(d::DArray)
return reduce((t,s) -> (min(t[1], s[1]), max(t[2], s[2])), r)
end

if VERSION < v"1.3"
Statistics._mean(A::DArray, region) = sum(A, dims = region) ./ prod((size(A, i) for i in region))
else
Statistics._mean(f, A::DArray, region) = sum(f, A, dims = region) ./ prod((size(A, i) for i in region))
end
Statistics._mean(f, A::DArray, region) = sum(f, A, dims = region) ./ prod((size(A, i) for i in region))

# Unary vector functions
(-)(D::DArray) = map(-, D)
Base.:(-)(D::DArray) = map(-, D)


map_localparts(f::Callable, d::DArray) = DArray(i->f(localpart(d)), d)
Expand Down Expand Up @@ -191,12 +184,12 @@ end

for f in (:+, :-, :div, :mod, :rem, :&, :|, :xor)
@eval begin
function ($f)(A::DArray{T}, B::DArray{T}) where T
function Base.$f(A::DArray{T}, B::DArray{T}) where T
B = samedist(A, B)
map_localparts($f, A, B)
end
($f)(A::DArray{T}, B::Array{T}) where {T} = map_localparts($f, A, B)
($f)(A::Array{T}, B::DArray{T}) where {T} = map_localparts($f, A, B)
Base.$f(A::DArray{T}, B::Array{T}) where {T} = map_localparts($f, A, B)
Base.$f(A::Array{T}, B::DArray{T}) where {T} = map_localparts($f, A, B)
end
end

Expand Down
6 changes: 3 additions & 3 deletions src/spmd.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
module SPMD

using Distributed
using Distributed: RemoteChannel, myid, procs, remote_do, remotecall_fetch
using ..DistributedArrays: DistributedArrays, gather, next_did

import DistributedArrays: gather, next_did, close
export sendto, recvfrom, recvfrom_any, barrier, bcast, scatter, gather
export context_local_storage, context, spmd

Expand Down Expand Up @@ -206,7 +206,7 @@ function scatter(x, pid::Int; tag=nothing, pids=procs())
end
end

function gather(x, pid::Int; tag=nothing, pids=procs())
function DistributedArrays.gather(x, pid::Int; tag=nothing, pids=procs())
if myid() == pid
gathered_data = Array{Any}(undef, length(pids))
myidx = findfirst(isequal(pid), sort(pids))
Expand Down
2 changes: 0 additions & 2 deletions test/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -351,9 +351,7 @@ check_leaks()

unpack(ex::Base.CapturedException) = unpack(ex.ex)
unpack(ex::Distributed.RemoteException) = unpack(ex.captured)
if VERSION >= v"1.3.0-alpha.110"
unpack(ex::Base.TaskFailedException) = unpack(ex.task.exception)
end
unpack(ex) = ex

@testset "test sum on DArrays" begin
Expand Down
64 changes: 64 additions & 0 deletions test/explicit_imports.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using DistributedArrays, Test
import ExplicitImports

@testset "ExplicitImports" begin
# No implicit imports in DistributedArrays (ie. no `using MyPkg`)
@test ExplicitImports.check_no_implicit_imports(DistributedArrays) === nothing

# No non-owning imports in DistributedArrays (ie. no `using LinearAlgebra: map`)
@test ExplicitImports.check_all_explicit_imports_via_owners(DistributedArrays) === nothing

# Limit non-public imports in DistributedArrays (ie. `using MyPkg: _non_public_internal_func`)
# to a few selected types and functions
@test ExplicitImports.check_all_explicit_imports_are_public(
DistributedArrays;
ignore = (
# Base
:Broadcasted,
:Callable,
(VERSION < v"1.11" ? (:tail,) : ())...,
),
) === nothing

# No stale imports in DistributedArrays (ie. no `using MyPkg: func` where `func` is not used in DistributedArrays)
@test ExplicitImports.check_no_stale_explicit_imports(DistributedArrays) === nothing

# No non-owning accesses in DistributedArrays (ie. no `... LinearAlgebra.map(...)`)
@test ExplicitImports.check_all_qualified_accesses_via_owners(DistributedArrays) === nothing

# Limit non-public accesses in DistributedArrays (ie. no `... MyPkg._non_public_internal_func(...)`)
# to a few selected types and methods from Base
@test ExplicitImports.check_all_qualified_accesses_are_public(
DistributedArrays;
ignore = (
# Base.Broadcast
:AbstractArrayStyle,
:DefaultArrayStyle,
:broadcasted,
:throwdm,
# Base
(VERSION < v"1.11" ? (Symbol("@propagate_inbounds"),) : ())...,
:ReshapedArray,
:Slice,
:_all,
:_any,
:_mapreduce,
:check_reducedims,
:checkbounds_indices,
:index_lengths,
:mapreducedim!,
:promote_op,
:reducedim_initarray,
:reindex,
:setindex_shape_check,
:unalias,
# Serialization
:serialize_type,
# Statistics
:_mean,
),
) === nothing

# No self-qualified accesses in DistributedArrays (ie. no `... DistributedArrays.func(...)`)
@test ExplicitImports.check_no_self_qualified_accesses(DistributedArrays) === nothing
end
1 change: 1 addition & 0 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ function check_leaks()
end
end

include("explicit_imports.jl")
include("darray.jl")
include("spmd.jl")

Loading