From bc312637949430e03869e61a1414f4a21c17e6ed Mon Sep 17 00:00:00 2001 From: Andreas Noack Date: Fri, 20 Jul 2018 16:00:00 +0200 Subject: [PATCH 01/18] Add allowscalar functionality to disable scalar indexing. Various other fixes --- README.md | 16 ++-- REQUIRE | 2 +- src/DistributedArrays.jl | 14 ++-- src/core.jl | 10 +-- src/darray.jl | 143 ++++++++++++++++++--------------- src/linalg.jl | 34 ++++---- src/mapreduce.jl | 168 +++++++++++++++++++-------------------- src/serialize.jl | 42 +++++----- src/spmd.jl | 16 ++-- test/darray.jl | 23 +++--- test/runtests.jl | 11 ++- 11 files changed, 248 insertions(+), 231 deletions(-) diff --git a/README.md b/README.md index fe12b4c..961e6c3 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ Distributed Arrays will only work on Julia v0.4.0 or later. `DArray`s have been removed from Julia Base library in v0.4 so it is now necessary to import the `DistributedArrays` package on all spawned processes. ```julia -@everywhere using DistributedArrays +using DistributedArrays ``` Distributed Arrays @@ -76,12 +76,12 @@ Indexing via symbols is used for this, specifically symbols `:L`,`:LP`,`:l`,`:lp are all equivalent. For example, `d[:L]` returns the localpart of `d` while `d[:L]=v` sets `v` as the localpart of `d`. -* `localindexes(a::DArray)` gives a tuple of the index ranges owned by the +* `localindices(a::DArray)` gives a tuple of the index ranges owned by the local process. * `convert(Array, a::DArray)` brings all the data to the local process. -Indexing a `DArray` (square brackets) with ranges of indexes always +Indexing a `DArray` (square brackets) with ranges of indices always creates a `SubArray`, not copying any data. 
@@ -205,7 +205,7 @@ seen in this simple example: ```julia julia> addprocs(8); -julia> @everywhere using DistributedArrays +julia> using DistributedArrays julia> A = fill(1.1, (100,100)); @@ -227,7 +227,7 @@ Garbage Collection and DArrays ------------------------------ When a DArray is constructed (typically on the master process), the returned DArray objects stores information on how the -array is distributed, which procesor holds which indexes and so on. When the DArray object +array is distributed, which procesor holds which indices and so on. When the DArray object on the master process is garbage collected, all particpating workers are notified and localparts of the DArray freed on each worker. @@ -317,10 +317,10 @@ Example This toy example exchanges data with each of its neighbors `n` times. ``` -using DistributedArrays +using Distributed addprocs(8) -@everywhere importall DistributedArrays -@everywhere importall DistributedArrays.SPMD +using DistributedArrays +using DistributedArrays.SPMD d_in=d=DArray(I->fill(myid(), (map(length,I)...)), (nworkers(), 2), workers(), [nworkers(),1]) d_out=ddata() diff --git a/REQUIRE b/REQUIRE index 48df863..d5a55b7 100644 --- a/REQUIRE +++ b/REQUIRE @@ -1,2 +1,2 @@ -julia 0.6 +julia 0.7-beta Primes diff --git a/src/DistributedArrays.jl b/src/DistributedArrays.jl index a441516..31d9aee 100644 --- a/src/DistributedArrays.jl +++ b/src/DistributedArrays.jl @@ -2,16 +2,20 @@ __precompile__() module DistributedArrays -importall Base +using Distributed +using Serialization +using LinearAlgebra + +import Base: +, -, *, div, mod, rem, &, |, xor import Base.Callable -import Base.BLAS: axpy! +import LinearAlgebra.BLAS: axpy! 
-using Primes -using Primes: factor +import Primes +import Primes: factor # DArray exports export DArray, SubDArray, SubOrDArray, @DArray -export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindexes, ppeval +export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindices, ppeval # non-array distributed data export ddata, gather diff --git a/src/core.jl b/src/core.jl index 4f1f5cc..034e901 100644 --- a/src/core.jl +++ b/src/core.jl @@ -17,7 +17,7 @@ release_localpart(id::Tuple) = (delete!(registry, id); nothing) release_localpart(d) = release_localpart(d.id) function close_by_id(id, pids) -# @schedule println("Finalizer for : ", id) +# @async println("Finalizer for : ", id) global refs @sync begin for p in pids @@ -31,10 +31,10 @@ function close_by_id(id, pids) nothing end -function close(d::DArray) -# @schedule println("close : ", d.id, ", object_id : ", object_id(d), ", myid : ", myid() ) +function Base.close(d::DArray) +# @async println("close : ", d.id, ", object_id : ", object_id(d), ", myid : ", myid() ) if (myid() == d.id[1]) && d.release - @schedule close_by_id(d.id, d.pids) + @async close_by_id(d.id, d.pids) d.release = false end nothing @@ -55,7 +55,7 @@ end Get the vector of processes storing pieces of DArray `d`. 
""" -Base.procs(d::DArray) = d.pids +Distributed.procs(d::DArray) = d.pids """ localpart(A) diff --git a/src/darray.jl b/src/darray.jl index 0084466..4c2453d 100644 --- a/src/darray.jl +++ b/src/darray.jl @@ -26,22 +26,22 @@ mutable struct DArray{T,N,A} <: AbstractArray{T,N} id::Tuple dims::NTuple{N,Int} pids::Array{Int,N} # pids[i]==p ⇒ processor p has piece i - indexes::Array{NTuple{N,UnitRange{Int}},N} # indexes held by piece i + indices::Array{NTuple{N,UnitRange{Int}},N} # indices held by piece i cuts::Vector{Vector{Int}} # cuts[d][i] = first index of chunk i in dimension d - localpart::Nullable{A} + localpart::Union{A,Missing} release::Bool - function DArray{T,N,A}(id, dims, pids, indexes, cuts, lp) where {T,N,A} + function DArray{T,N,A}(id, dims, pids, indices, cuts, lp) where {T,N,A} # check invariants - if dims != map(last, last(indexes)) - throw(ArgumentError("dimension of DArray (dim) and indexes do not match")) + if dims != map(last, last(indices)) + throw(ArgumentError("dimension of DArray (dim) and indices do not match")) end release = (myid() == id[1]) d = d_from_weakref_or_d(id) if d === nothing - d = new(id, dims, pids, indexes, cuts, lp, release) + d = new(id, dims, pids, indices, cuts, lp, release) end if release @@ -49,7 +49,7 @@ mutable struct DArray{T,N,A} <: AbstractArray{T,N} registry[id] = WeakRef(d) # println("Installing finalizer for : ", d.id, ", : ", object_id(d), ", isbits: ", isbits(d)) - finalizer(d, close) + finalizer(close, d) end d end @@ -63,7 +63,7 @@ function d_from_weakref_or_d(id) return d end -eltype(::Type{DArray{T}}) where {T} = T +Base.eltype(::Type{DArray{T}}) where {T} = T empty_localpart(T,N,A) = A(Array{T}(ntuple(zero, N))) const SubDArray{T,N,D<:DArray} = SubArray{T,N,D} @@ -148,7 +148,7 @@ function ddata(;T::Type=Any, init::Function=I->nothing, pids=workers(), data::Ve d = registry[id] d = isa(d, WeakRef) ? 
d.value : d else - d = DArray{T,1,T}(id, (npids,), pids, idxs, cuts, Nullable{T}()) + d = DArray{T,1,T}(id, (npids,), pids, idxs, cuts, missing) end d end @@ -189,18 +189,18 @@ function DArray(refs) id = next_did() npids = [r.where for r in refs] - nsizes = Array{Tuple}(dimdist) + nsizes = Array{Tuple}(undef, dimdist) @sync for i in 1:length(refs) let i=i @async nsizes[i] = remotecall_fetch(sz_localpart_ref, npids[i], refs[i], id) end end - nindexes = Array{NTuple{length(dimdist),UnitRange{Int}}}(dimdist...) + nindices = Array{NTuple{length(dimdist),UnitRange{Int}}}(dimdist...) - for i in 1:length(nindexes) - subidx = ind2sub(dimdist, i) - nindexes[i] = ntuple(length(subidx)) do x + for i in 1:length(nindices) + subidx = CartesianIndices(dimdist)[i] + nindices[i] = ntuple(length(subidx)) do x idx_in_dim = subidx[x] startidx = 1 for j in 1:(idx_in_dim-1) @@ -212,11 +212,11 @@ function DArray(refs) end end - lastidxs = hcat([Int[last(idx_in_d)+1 for idx_in_d in idx] for idx in nindexes]...) - ncuts = Array{Int,1}[unshift!(sort(unique(lastidxs[x,:])), 1) for x in 1:length(dimdist)] + lastidxs = hcat([Int[last(idx_in_d)+1 for idx_in_d in idx] for idx in nindices]...) + ncuts = Array{Int,1}[pushfirst!(sort(unique(lastidxs[x,:])), 1) for x in 1:length(dimdist)] ndims = tuple([sort(unique(lastidxs[x,:]))[end]-1 for x in 1:length(dimdist)]...) 
- DArray(id, refs, ndims, reshape(npids, dimdist), nindexes, ncuts) + DArray(id, refs, ndims, reshape(npids, dimdist), nindices, ncuts) end macro DArray(ex0::Expr) @@ -239,7 +239,7 @@ macro DArray(ex0::Expr) end # new DArray similar to an existing one -DArray(init, d::DArray) = DArray(next_did(), init, size(d), procs(d), d.indexes, d.cuts) +DArray(init, d::DArray) = DArray(next_did(), init, size(d), procs(d), d.indices, d.cuts) sz_localpart_ref(ref, id) = size(fetch(ref)) @@ -273,7 +273,7 @@ function defaultdist(dims, pids) fac = f[k] (d, dno) = findmax(dims) # resolve ties to highest dim - dno = last(find(dims .== d)) + dno = findlast(isequal(d), dims) if dims[dno] >= fac dims[dno] = div(dims[dno], fac) chunks[dno] *= fac @@ -283,21 +283,21 @@ function defaultdist(dims, pids) return chunks end -# get array of start indexes for dividing sz into nc chunks +# get array of start indices for dividing sz into nc chunks function defaultdist(sz::Int, nc::Int) if sz >= nc - return round.(Int, linspace(1, sz+1, nc+1)) + return round.(Int, range(1, stop=sz+1, length=nc+1)) else - return [[1:(sz+1);], zeros(Int, nc-sz);] + return [[1:(sz+1);]; zeros(Int, nc-sz);] end end -# compute indexes array for dividing dims into chunks +# compute indices array for dividing dims into chunks function chunk_idxs(dims, chunks) cuts = map(defaultdist, dims, chunks) n = length(dims) - idxs = Array{NTuple{n,UnitRange{Int}}}(chunks...) - for cidx in CartesianRange(tuple(chunks...)) + idxs = Array{NTuple{n,UnitRange{Int}}}(undef, chunks...) + for cidx in CartesianIndices(tuple(chunks...)) idxs[cidx.I...] = ntuple(i -> (cuts[i][cidx[i]]:cuts[i][cidx[i] + 1] - 1), n) end return (idxs, cuts) @@ -330,7 +330,7 @@ function localpart(d::DArray{T,N,A}) where {T,N,A} return empty_localpart(T,N,A)::A end - return get(d.localpart)::A + return d.localpart::A end localpart(d::DArray, localidx...) = localpart(d)[localidx...] 
@@ -349,20 +349,20 @@ end # fetch localpart of d at pids[i] -fetch(d::DArray{T,N,A}, i) where {T,N,A} = remotecall_fetch(localpart, d.pids[i], d) +Base.fetch(d::DArray{T,N,A}, i) where {T,N,A} = remotecall_fetch(localpart, d.pids[i], d) """ - localindexes(d) + localindices(d) -A tuple describing the indexes owned by the local process. +A tuple describing the indices owned by the local process. Returns a tuple with empty ranges if no local part exists on the calling process. """ -function localindexes(d::DArray) +function localindices(d::DArray) lpidx = localpartindex(d) if lpidx == 0 return ntuple(i -> 1:0, ndims(d)) end - return d.indexes[lpidx] + return d.indices[lpidx] end # find which piece holds index (I...) @@ -459,59 +459,48 @@ Distribute a local array `A` like the distributed array `DA`. function distribute(A::AbstractArray, DA::DArray) size(DA) == size(A) || throw(DimensionMismatch("Distributed array has size $(size(DA)) but array has $(size(A))")) - s = verified_destination_serializer(procs(DA), size(DA.indexes)) do pididx - A[DA.indexes[pididx]...] + s = verified_destination_serializer(procs(DA), size(DA.indices)) do pididx + A[DA.indices[pididx]...] end return DArray(I->localpart(s), DA) end -Base.convert(::Type{DArray{T,N,S}}, A::S) where {T,N,S<:AbstractArray} = distribute(convert(AbstractArray{T,N}, A)) +(::Type{DArray{T,N,S}})(A::S) where {T,N,S<:AbstractArray} = distribute(convert(AbstractArray{T,N}, A)) -Base.convert(::Type{Array{S,N}}, d::DArray{T,N}) where {S,T,N} = begin - a = Array{S}(size(d)) +function (::Type{Array{S,N}})(d::DArray{T,N}) where {S,T,N} + a = Array{S}(undef, size(d)) @sync begin for i = 1:length(d.pids) - @async a[d.indexes[i]...] = chunk(d, i) + @async a[d.indices[i]...] 
= chunk(d, i) end end return a end -Base.convert(::Type{Array{S,N}}, s::SubDArray{T,N}) where {S,T,N} = begin - I = s.indexes +function (::Type{Array{S,N}})(s::SubDArray{T,N}) where {S,T,N} + I = s.indices d = s.parent if isa(I,Tuple{Vararg{UnitRange{Int}}}) && S<:T && T<:S l = locate(d, map(first, I)...) - if isequal(d.indexes[l...], I) + if isequal(d.indices[l...], I) # SubDArray corresponds to a chunk return chunk(d, l...) end end - a = Array{S}(size(s)) + a = Array{S}(undef, size(s)) a[[1:size(a,i) for i=1:N]...] = s return a end -function Base.convert(::Type{DArray}, SD::SubArray{T,N}) where {T,N} +function (::Type{DArray})(SD::SubArray{T,N}) where {T,N} D = SD.parent DArray(size(SD), procs(D)) do I - TR = typeof(SD.indexes[1]) - lindices = Array{TR}(0) - for (i,r) in zip(I, SD.indexes) - st = step(r) - lrstart = first(r) + st*(first(i)-1) - lrend = first(r) + st*(last(i)-1) - if TR <: UnitRange - push!(lindices, lrstart:lrend) - else - push!(lindices, lrstart:st:lrend) - end - end + lindices = Base.reindex(SD, SD.indices, I) convert(Array, D[lindices...]) end end -Base.reshape(A::DArray{T,1,S}, d::Dims) where {T,S<:Array} = begin +function Base.reshape(A::DArray{T,1,S}, d::Dims) where {T,S<:Array} if prod(d) != length(A) throw(DimensionMismatch("dimensions must be consistent with array size")) end @@ -525,7 +514,7 @@ Base.reshape(A::DArray{T,1,S}, d::Dims) where {T,S<:Array} = begin sztail = size(B)[2:end] for i=1:div(length(B),nr) - i2 = ind2sub(sztail, i) + i2 = CartesianIndices(sztail)[i] globalidx = [ I[j][i2[j-1]] for j=2:nd ] a = sub2ind(d, d1offs, globalidx...) @@ -537,27 +526,49 @@ Base.reshape(A::DArray{T,1,S}, d::Dims) where {T,S<:Array} = begin end ## indexing ## +const _allowscalar = Ref(true) +allowscalar(flag = true) = (_allowscalar[] = flag) +_scalarindexingallowed() = _allowscalar[] || throw(ErrorException("scalar indexing disabled")) getlocalindex(d::DArray, idx...) = localpart(d)[idx...] 
function getindex_tuple(d::DArray{T}, I::Tuple{Vararg{Int}}) where T chidx = locate(d, I...) - idxs = d.indexes[chidx...] + idxs = d.indices[chidx...] localidx = ntuple(i -> (I[i] - first(idxs[i]) + 1), ndims(d)) pid = d.pids[chidx...] return remotecall_fetch(getlocalindex, pid, d, localidx...)::T end -Base.getindex(d::DArray, i::Int) = getindex_tuple(d, ind2sub(size(d), i)) -Base.getindex(d::DArray, i::Int...) = getindex_tuple(d, i) +function Base.getindex(d::DArray, i::Int) + _scalarindexingallowed() + return getindex_tuple(d, CartesianIndices(d)[i]) +end +function Base.getindex(d::DArray, i::Int...) + _scalarindexingallowed() + return getindex_tuple(d, i) +end Base.getindex(d::DArray) = d[1] Base.getindex(d::DArray, I::Union{Int,UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) = view(d, I...) +function Base.isassigned(D::DArray, i::Integer...) + try + getindex_tuple(D, i) + true + catch e + if isa(e, BoundsError) || isa(e, UndefRefError) + return false + else + rethrow(e) + end + end +end + -Base.copy!(dest::SubOrDArray, src::SubOrDArray) = begin +function Base.copy!(dest::SubOrDArray, src::SubOrDArray) asyncmap(procs(dest)) do p remotecall_fetch(p) do - localpart(dest)[:] = src[localindexes(dest)...] + localpart(dest)[:] = src[localindices(dest)...] end end return dest @@ -570,7 +581,7 @@ function Base.setindex!(a::Array, d::DArray, I::Union{UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) n = length(I) @sync for i = 1:length(d.pids) - K = d.indexes[i] + K = d.indices[i] @async a[[I[j][K[j]] for j=1:n]...] = chunk(d, i) end return a @@ -585,7 +596,7 @@ end # skip at the end. In many cases range intersection would be much faster # than generating a logical mask, but that loses the endpoint information. 
indexin_mask(a, b::Number) = a .== b -indexin_mask(a, r::Range{Int}) = [i in r for i in a] +indexin_mask(a, r::AbstractRange{Int}) = [i in r for i in a] indexin_mask(a, b::AbstractArray{Int}) = indexin_mask(a, IntSet(b)) indexin_mask(a, b::AbstractArray) = indexin_mask(a, Set(b)) indexin_mask(a, b) = [i in b for i in a] @@ -683,9 +694,9 @@ function Base.setindex!(a::Array, s::SubDArray, Base.setindex_shape_check(s, Base.index_lengths(Inew...)...) n = length(Inew) d = s.parent - J = Base.to_indices(d, s.indexes) + J = Base.to_indices(d, s.indices) @sync for i = 1:length(d.pids) - K_c = d.indexes[i] + K_c = d.indices[i] K = map(intersect, J, K_c) if !any(isempty, K) K_mask = map(indexin_mask, J, K_c) @@ -705,7 +716,7 @@ function Base.setindex!(a::Array, s::SubDArray, return a end -Base.fill!(A::DArray, x) = begin +function Base.fill!(A::DArray, x) @sync for p in procs(A) @async remotecall_fetch((A,x)->(fill!(localpart(A), x); nothing), p, A, x) end diff --git a/src/linalg.jl b/src/linalg.jl index c39837f..3218483 100644 --- a/src/linalg.jl +++ b/src/linalg.jl @@ -1,4 +1,4 @@ -function Base.ctranspose(D::DArray{T,2}) where T +function Base.copy(D::Adjoint{T,<:DArray{T,2}}) where T DArray(reverse(size(D)), procs(D)) do I lp = Array{T}(map(length, I)) rp = convert(Array, D[reverse(I)...]) @@ -6,7 +6,7 @@ function Base.ctranspose(D::DArray{T,2}) where T end end -function Base.transpose(D::DArray{T,2}) where T +function Base.copy(D::Transpose{T,<:DArray{T,2}}) where T DArray(reverse(size(D)), procs(D)) do I lp = Array{T}(map(length, I)) rp = convert(Array, D[reverse(I)...]) @@ -59,9 +59,9 @@ function norm(x::DVector, p::Real = 2) return norm(results, p) end -Base.scale!(A::DArray, x::Number) = begin +function LinearAlgebra.rmul!(A::DArray, x::Number) @sync for p in procs(A) - @async remotecall_fetch((A,x)->(scale!(localpart(A), x); nothing), p, A, x) + @async remotecall_fetch((A,x)->(rmul!(localpart(A), x); nothing), p, A, x) end return A end @@ -108,7 +108,7 @@ function 
A_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVec if β != one(β) @sync for p in y.pids if β != zero(β) - @async remotecall_fetch(y -> (scale!(localpart(y), β); nothing), p, y) + @async remotecall_fetch(y -> (rmul!(localpart(y), β); nothing), p, y) else @async remotecall_fetch(y -> (fill!(localpart(y), 0); nothing), p, y) end @@ -150,7 +150,7 @@ function Ac_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVe if β != one(β) @sync for p in y.pids if β != zero(β) - @async remotecall_fetch(() -> (scale!(localpart(y), β); nothing), p) + @async remotecall_fetch(() -> (rmul!(localpart(y), β); nothing), p) else @async remotecall_fetch(() -> (fill!(localpart(y), 0); nothing), p) end @@ -168,21 +168,23 @@ function Ac_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVe return y end -function Base.LinAlg.scale!(b::AbstractVector, DA::DMatrix) - s = verified_destination_serializer(procs(DA), size(DA.indexes)) do pididx - b[DA.indexes[pididx][1]] +function LinearAlgebra.lmul!(D::Diagonal, DA::DMatrix) + d = D.diag + s = verified_destination_serializer(procs(DA), size(DA.indices)) do pididx + d[DA.indices[pididx][1]] end map_localparts!(DA) do lDA - scale!(localpart(s), lDA) + lmul!(Diagonal(localpart(s)), lDA) end end -function Base.LinAlg.scale!(DA::DMatrix, b::AbstractVector) - s = verified_destination_serializer(procs(DA), size(DA.indexes)) do pididx - b[DA.indexes[pididx][2]] +function LinearAlgebra.rmul!(DA::DMatrix, D::Diagonal) + d = D.diag + s = verified_destination_serializer(procs(DA), size(DA.indices)) do pididx + d[DA.indices[pididx][2]] end map_localparts!(DA) do lDA - scale!(lDA, localpart(s)) + rmul!(lDA, Diagonal(localpart(s))) end end @@ -217,7 +219,7 @@ function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::D p = (tA == 'N') ? 
procs(A)[i,j] : procs(A)[j,i] R[i,j,k] = remotecall(p) do if tA == 'T' - return localpart(A).'*convert(localtype(B), Bjk) + return transpose(localpart(A))*convert(localtype(B), Bjk) elseif tA == 'C' return localpart(A)'*convert(localtype(B), Bjk) else @@ -232,7 +234,7 @@ function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::D if β != one(β) @sync for p in C.pids if β != zero(β) - @async remotecall_fetch(() -> (scale!(localpart(C), β); nothing), p) + @async remotecall_fetch(() -> (rmul!(localpart(C), β); nothing), p) else @async remotecall_fetch(() -> (fill!(localpart(C), 0); nothing), p) end diff --git a/src/mapreduce.jl b/src/mapreduce.jl index 44c32dc..025ad4c 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -2,14 +2,18 @@ Base.map(f, d0::DArray, ds::AbstractArray...) = broadcast(f, d0, ds...) -Base.map!(f::F, dest::DArray, src::DArray) where {F} = begin - @sync for p in procs(dest) - @async remotecall_fetch(() -> (map!(f, localpart(dest), src[localindexes(dest)...]); nothing), p) +function Base.map!(f::F, dest::DArray, src::DArray) where {F} + asyncmap(procs(dest)) do p + remotecall_fetch(p) do + map!(f, localpart(dest), src[localindices(dest)...]) + return nothing + end end return dest end -Base.Broadcast._containertype(::Type{D}) where {D<:DArray} = DArray +# Base.Broadcast._containertype(::Type{D}) where {D<:DArray} = DArray +Base.BroadcastStyle(::Type{<:DArray}) = Broadcast.ArrayStyle{DArray}() Base.Broadcast.promote_containertype(::Type{DArray}, ::Type{DArray}) = DArray Base.Broadcast.promote_containertype(::Type{DArray}, ::Type{Array}) = DArray @@ -26,7 +30,7 @@ Base.Broadcast.broadcast_indices(::Type{DArray}, A::Ref) = () function Base.Broadcast.broadcast_c(f, ::Type{DArray}, As...) T = Base.Broadcast._broadcast_eltype(f, As...) shape = Base.Broadcast.broadcast_indices(As...) 
- iter = Base.CartesianRange(shape) + iter = Base.CartesianIndices(shape) D = DArray(map(length, shape)) do I Base.Broadcast.broadcast_c(f, Array, map(a -> isa(a, Union{Number,Ref}) ? a : @@ -36,32 +40,28 @@ function Base.Broadcast.broadcast_c(f, ::Type{DArray}, As...) end function Base.reduce(f, d::DArray) - results=[] - @sync begin - for p in procs(d) - @async push!(results, remotecall_fetch((f,d)->reduce(f, localpart(d)), p, f, d)) + results = asyncmap(procs(d)) do p + remotecall_fetch(p, f, d) do (f, d) + return reduce(f, localpart(d)) end end reduce(f, results) end -function _mapreduce(f, opt, d::DArray) -# TODO Change to an @async remotecall_fetch - will reduce one extra network hop - -# once bug in master is fixed. - results=[] - @sync begin - for p in procs(d) - @async push!(results, remotecall_fetch((f,opt,d)->mapreduce(f, opt, localpart(d)), p, f, opt, d)) - end +function Base._mapreduce(f, op, ::IndexCartesian, d::DArray) + results = asyncmap(procs(d)) do p + remotecall_fetch((_f,_op,_d)->mapreduce(_f, _op, localpart(_d)), p, f, op, d) end - reduce(opt, results) + + reduce(op, results) end -Base.mapreduce(f, opt::Union{typeof(|), typeof(&)}, d::DArray) = _mapreduce(f, opt, d) -Base.mapreduce(f, opt::Function, d::DArray) = _mapreduce(f, opt, d) -Base.mapreduce(f, opt, d::DArray) = _mapreduce(f, opt, d) +Base._mapreduce(f, op, ::IndexCartesian, d::SubDArray) = Base._mapreduce(f, op, IndexCartesian(), DArray(d)) +# Base.mapreduce(f, opt::Union{typeof(|), typeof(&)}, d::DArray) = _mapreduce(f, opt, d) +# Base.mapreduce(f, opt::Function, d::DArray) = _mapreduce(f, opt, d) +# Base.mapreduce(f, opt, d::DArray) = _mapreduce(f, opt, d) # mapreducedim -Base.reducedim_initarray(A::DArray, region, v0, ::Type{R}) where {R} = begin +function Base.reducedim_initarray(A::DArray, region, v0, ::Type{R}) where {R} # Store reduction on lowest pids pids = A.pids[ntuple(i -> i in region ? (1:1) : (:), ndims(A))...] 
chunks = similar(pids, Future) @@ -72,28 +72,28 @@ Base.reducedim_initarray(A::DArray, region, v0, ::Type{R}) where {R} = begin end Base.reducedim_initarray(A::DArray, region, v0::T) where {T} = Base.reducedim_initarray(A, region, v0, T) -Base.reducedim_initarray0(A::DArray, region, v0, ::Type{R}) where {R} = begin - # Store reduction on lowest pids - pids = A.pids[ntuple(i -> i in region ? (1:1) : (:), ndims(A))...] - chunks = similar(pids, Future) - @sync for i in eachindex(pids) - @async chunks[i...] = remotecall_wait(() -> Base.reducedim_initarray0(localpart(A), region, v0, R), pids[i...]) - end - return DArray(chunks) -end -Base.reducedim_initarray0(A::DArray, region, v0::T) where {T} = Base.reducedim_initarray0(A, region, v0, T) +# function Base.reducedim_initarray0(A::DArray, region, v0, ::Type{R}) where {R} +# # Store reduction on lowest pids +# pids = A.pids[ntuple(i -> i in region ? (1:1) : (:), ndims(A))...] +# chunks = similar(pids, Future) +# @sync for i in eachindex(pids) +# @async chunks[i...] = remotecall_wait(() -> Base.reducedim_initarray0(localpart(A), region, v0, R), pids[i...]) +# end +# return DArray(chunks) +# end +# Base.reducedim_initarray0(A::DArray, region, v0::T) where {T} = Base.reducedim_initarray0(A, region, v0, T) # Compute mapreducedim of each localpart and store the result in a new DArray -mapreducedim_within(f, op, A::DArray, region) = begin +function mapreducedim_within(f, op, A::DArray, region) arraysize = [size(A)...] - gridsize = [size(A.indexes)...] + gridsize = [size(A.indices)...] arraysize[[region...]] = gridsize[[region...]] - indx = similar(A.indexes) - for i in CartesianRange(size(indx)) - indx[i] = ntuple(j -> j in region ? (i.I[j]:i.I[j]) : A.indexes[i][j], ndims(A)) + indx = similar(A.indices) + for i in CartesianIndices(size(indx)) + indx[i] = ntuple(j -> j in region ? (i.I[j]:i.I[j]) : A.indices[i][j], ndims(A)) end cuts = [i in region ? 
collect(1:arraysize[i] + 1) : A.cuts[i] for i in 1:ndims(A)] - return DArray(next_did(), I -> mapreducedim(f, op, localpart(A), region), + return DArray(next_did(), I -> mapreduce(f, op, localpart(A), dims=region), tuple(arraysize...), procs(A), indx, cuts) end @@ -101,9 +101,9 @@ end # has been run on each localpart with mapreducedim_within. Eventually, we might # want to write mapreducedim_between! as a binary reduction. function mapreducedim_between!(f, op, R::DArray, A::DArray, region) - @sync for p in procs(R) - @async remotecall_fetch(p, f, op, R, A, region) do f, op, R, A, region - localind = [r for r = localindexes(A)] + asyncmap(procs(R)) do p + remotecall_fetch(p, f, op, R, A, region) do f, op, R, A, region + localind = [r for r = localindices(A)] localind[[region...]] = [1:n for n = size(A)[[region...]]] B = convert(Array, A[localind...]) Base.mapreducedim!(f, op, localpart(R), B) @@ -113,7 +113,7 @@ function mapreducedim_between!(f, op, R::DArray, A::DArray, region) return R end -Base.mapreducedim!(f, op, R::DArray, A::DArray) = begin +function Base.mapreducedim!(f, op, R::DArray, A::DArray) lsize = Base.check_reducedims(R,A) if isempty(A) return copy(R) @@ -126,29 +126,27 @@ Base.mapreducedim!(f, op, R::DArray, A::DArray) = begin return mapreducedim_between!(identity, op, R, B, region) end -Base.mapreducedim(f, op, R::DArray, A::DArray) = begin - Base.mapreducedim!(f, op, Base.reducedim_initarray(A, region, v0), A) -end +# function Base.mapreducedim(f, op, R::DArray, A::DArray) +# Base.mapreducedim!(f, op, Base.reducedim_initarray(A, region, v0), A) +# end function nnz(A::DArray) - B = Array{Any}(size(A.pids)) - @sync begin - for i in eachindex(A.pids) - @async B[i...] 
= remotecall_fetch(x -> nnz(localpart(x)), A.pids[i...], A) - end + B = asyncmap(A.pids) do p + remotecall_fetch(nnz∘localpart, p, A) end return reduce(+, B) end # reduce like -for (fn, fr) in ((:sum, :+), - (:prod, :*), - (:maximum, :max), - (:minimum, :min), - (:any, :|), - (:all, :&)) - @eval (Base.$fn)(d::DArray) = reduce($fr, d) -end +# for (fn, fr) in ((:sum, :+), +# (:prod, :*), +# (:maximum, :max), +# (:minimum, :min), +# (:any, :|), +# (:all, :&)) +# @eval (Base.$fn)(d::DArray) = reduce($fr, d) +# @eval (Base.$fn)(f, d::DArray) = mapreduce(f, $fr, d) +# end function Base.extrema(d::DArray) r = asyncmap(procs(d)) do p @@ -160,22 +158,22 @@ function Base.extrema(d::DArray) end # mapreduce like -for (fn, fr1, fr2) in ((:maxabs, :abs, :max), - (:minabs, :abs, :min), - (:sumabs, :abs, :+), - (:sumabs2, :abs2, :+)) - @eval (Base.$fn)(d::DArray) = mapreduce($fr1, $fr2, d) -end +# for (fn, fr1, fr2) in ((:abs, :max), +# (:minabs, :abs, :min), +# (:sumabs, :abs, :+), +# (:sumabs2, :abs2, :+)) +# @eval (Base.$fn)(d::DArray) = mapreduce($fr1, $fr2, d) +# end # semi mapreduce -for (fn, fr) in ((:any, :|), - (:all, :&), - (:count, :+)) - @eval begin - (Base.$fn)(f::typeof(identity), d::DArray) = mapreduce(f, $fr, d) - (Base.$fn)(f::Callable, d::DArray) = mapreduce(f, $fr, d) - end -end +# for (fn, fr) in ((:any, :|), +# (:all, :&), +# (:count, :+)) +# @eval begin +# (Base.$fn)(f::typeof(identity), d::DArray) = mapreduce(f, $fr, d) +# (Base.$fn)(f::Callable, d::DArray) = mapreduce(f, $fr, d) +# end +# end # Unary vector functions (-)(D::DArray) = map(-, D) @@ -187,8 +185,8 @@ map_localparts(f::Callable, d1::DArray, d2::DArray) = DArray(d1) do I end function map_localparts(f::Callable, DA::DArray, A::Array) - s = verified_destination_serializer(procs(DA), size(DA.indexes)) do pididx - A[DA.indexes[pididx]...] + s = verified_destination_serializer(procs(DA), size(DA.indices)) do pididx + A[DA.indices[pididx]...] 
end DArray(DA) do I f(localpart(DA), localpart(s)) @@ -196,8 +194,8 @@ function map_localparts(f::Callable, DA::DArray, A::Array) end function map_localparts(f::Callable, A::Array, DA::DArray) - s = verified_destination_serializer(procs(DA), size(DA.indexes)) do pididx - A[DA.indexes[pididx]...] + s = verified_destination_serializer(procs(DA), size(DA.indices)) do pididx + A[DA.indices[pididx]...] end DArray(DA) do I f(localpart(s), localpart(DA)) @@ -205,8 +203,8 @@ function map_localparts(f::Callable, A::Array, DA::DArray) end function map_localparts!(f::Callable, d::DArray) - @sync for p in procs(d) - @async remotecall_fetch((f,d)->(f(localpart(d)); nothing), p, f, d) + asyncmap(procs(d)) do p + remotecall_fetch((f,d)->(f(localpart(d)); nothing), p, f, d) end return d end @@ -224,7 +222,7 @@ function samedist(A::DArray, B::DArray) B end -for f in (:+, :-, :div, :mod, :rem, :&, :|, :$) +for f in (:+, :-, :div, :mod, :rem, :&, :|, :xor) @eval begin function ($f)(A::DArray{T}, B::DArray{T}) where T B = samedist(A, B) @@ -236,7 +234,7 @@ for f in (:+, :-, :div, :mod, :rem, :&, :|, :$) end function mapslices(f::Function, D::DArray{T,N,A}, dims::AbstractVector) where {T,N,A} - if !all(t -> t == 1, size(D.indexes)[dims]) + if !all(t -> t == 1, size(D.indices)[dims]) p = ones(Int, ndims(D)) nondims = filter(t -> !(t in dims), 1:ndims(D)) p[nondims] = defaultdist([size(D)...][[nondims...]], procs(D)) @@ -326,17 +324,17 @@ the input arrays are sliced. 
#### Examples ```jl -addprocs(JULIA_CPU_CORES) +addprocs(Sys.CPU_THREADS) using DistributedArrays -A = drandn((10, 10, JULIA_CPU_CORES), workers(), [1, 1, JULIA_CPU_CORES]) +A = drandn((10, 10, Sys.CPU_THREADS), workers(), [1, 1, Sys.CPU_THREADS]) ppeval(eigvals, A) ppeval(eigvals, A, randn(10,10)) # broadcasting second argument -B = drandn((10, JULIA_CPU_CORES), workers(), [1, JULIA_CPU_CORES]) +B = drandn((10, Sys.CPU_THREADS), workers(), [1, Sys.CPU_THREADS]) ppeval(*, A, B) ``` @@ -345,7 +343,7 @@ function ppeval(f, D...; dim::NTuple = map(t -> isa(t, DArray) ? ndims(t) : 0, D #Ensure that the complete DArray is available on the specified dims on all processors for i = 1:length(D) if isa(D[i], DArray) - for idxs in D[i].indexes + for idxs in D[i].indices for d in setdiff(1:ndims(D[i]), dim[i]) if length(idxs[d]) != size(D[i], d) throw(DimensionMismatch(string("dimension $d is distributed. ", @@ -363,5 +361,5 @@ function ppeval(f, D...; dim::NTuple = map(t -> isa(t, DArray) ? ndims(t) : 0, D # the DArray constructor. sd = [size(D[1].pids)...] nd = remotecall_fetch((r)->ndims(fetch(r)), refs[1].where, refs[1]) - DArray(reshape(refs, tuple([sd[1:nd - 1], sd[end];]...))) + DArray(reshape(refs, tuple([sd[1:nd - 1]; sd[end];]...))) end diff --git a/src/serialize.jl b/src/serialize.jl index 4e0ec96..28b5a9a 100644 --- a/src/serialize.jl +++ b/src/serialize.jl @@ -1,20 +1,20 @@ -function Base.serialize(S::AbstractSerializer, d::DArray{T,N,A}) where {T,N,A} +function Serialization.serialize(S::AbstractSerializer, d::DArray{T,N,A}) where {T,N,A} # Only send the ident for participating workers - we expect the DArray to exist in the # remote registry. DO NOT send the localpart. 
- destpid = Base.worker_id_from_socket(S.io) + destpid = Distributed.worker_id_from_socket(S.io) Serializer.serialize_type(S, typeof(d)) if (destpid in d.pids) || (destpid == d.id[1]) serialize(S, (true, d.id)) # (id_only, id) else serialize(S, (false, d.id)) - for n in [:dims, :pids, :indexes, :cuts] + for n in [:dims, :pids, :indices, :cuts] serialize(S, getfield(d, n)) end serialize(S, A) end end -function Base.deserialize(S::AbstractSerializer, t::Type{DT}) where DT<:DArray +function Serialization.deserialize(S::AbstractSerializer, t::Type{DT}) where DT<:DArray what = deserialize(S) id_only = what[1] id = what[2] @@ -32,26 +32,26 @@ function Base.deserialize(S::AbstractSerializer, t::Type{DT}) where DT<:DArray # We are not a participating worker, deser fields and instantiate locally. dims = deserialize(S) pids = deserialize(S) - indexes = deserialize(S) + indices = deserialize(S) cuts = deserialize(S) A = deserialize(S) T=eltype(DT) N=length(dims) - return DT(id, dims, pids, indexes, cuts, empty_localpart(T,N,A)) + return DT(id, dims, pids, indices, cuts, empty_localpart(T,N,A)) end end # Serialize only those parts of the object as required by the destination worker. mutable struct DestinationSerializer - generate::Nullable{Function} # Function to generate the part to be serialized - pids::Nullable{Array} # MUST have the same shape as the distribution + generate::Union{Function,Missing} # Function to generate the part to be serialized + pids::Union{Array,Missing} # MUST have the same shape as the distribution - deser_obj::Nullable{Any} # Deserialized part + deser_obj::Union{Any,Missing} # Deserialized part DestinationSerializer(f,p,d) = new(f,p,d) end -DestinationSerializer(f::Function, pids::Array) = DestinationSerializer(f, pids, Nullable{Any}()) +DestinationSerializer(f::Function, pids::Array) = DestinationSerializer(f, pids, missing) # contructs a DestinationSerializer after verifying that the shape of pids. 
function verified_destination_serializer(f::Function, pids::Array, verify_size) @@ -59,28 +59,28 @@ function verified_destination_serializer(f::Function, pids::Array, verify_size) return DestinationSerializer(f, pids) end -DestinationSerializer(deser_obj::Any) = DestinationSerializer(Nullable{Function}(), Nullable{Array}(), deser_obj) +DestinationSerializer(deser_obj::Any) = DestinationSerializer(missing, missing, deser_obj) -function Base.serialize(S::AbstractSerializer, s::DestinationSerializer) - pid = Base.worker_id_from_socket(S.io) - pididx = findfirst(get(s.pids), pid) - Serializer.serialize_type(S, typeof(s)) - serialize(S, get(s.generate)(pididx)) +function Serialization.serialize(S::AbstractSerializer, s::DestinationSerializer) + pid = Distributed.worker_id_from_socket(S.io) + pididx = findfirst(isequal(pid), s.pids) + Serialization.serialize_type(S, typeof(s)) + serialize(S, s.generate(pididx)) end -function Base.deserialize(S::AbstractSerializer, t::Type{T}) where T<:DestinationSerializer +function Serialization.deserialize(S::AbstractSerializer, t::Type{T}) where T<:DestinationSerializer lpart = deserialize(S) return DestinationSerializer(lpart) end function localpart(s::DestinationSerializer) - if !isnull(s.deser_obj) - return get(s.deser_obj) - elseif !isnull(s.generate) && (myid() in get(s.pids)) + if !ismissing(s.deser_obj) + return s.deser_obj + elseif !ismissing(s.generate) && (myid() in s.pids) # Handle the special case where myid() is part of s.pids. 
# In this case serialize/deserialize is not called as the remotecall is executed locally - return get(s.generate)(findfirst(get(s.pids), myid())) + return s.generate(findfirst(isequal(myid()), s.pids)) else throw(ErrorException(string("Invalid state in DestinationSerializer."))) end diff --git a/src/spmd.jl b/src/spmd.jl index 6e9513b..2cbe3ff 100644 --- a/src/spmd.jl +++ b/src/spmd.jl @@ -1,5 +1,7 @@ module SPMD +using Distributed + import DistributedArrays: gather, next_did, close export sendto, recvfrom, recvfrom_any, barrier, bcast, scatter, gather export context_local_storage, context, spmd, close @@ -7,10 +9,10 @@ export context_local_storage, context, spmd, close mutable struct WorkerDataChannel pid::Int - rc::Nullable{RemoteChannel} + rc::Union{RemoteChannel,Missing} lock::ReentrantLock - WorkerDataChannel(pid) = new(pid, Nullable{RemoteChannel}(), ReentrantLock()) + WorkerDataChannel(pid) = new(pid, missing, ReentrantLock()) end mutable struct SPMDContext @@ -59,14 +61,14 @@ const map_ctxts = Dict{Tuple, SPMDContext}() function get_dc(wc::WorkerDataChannel) lock(wc.lock) try - if isnull(wc.rc) + if ismissing(wc.rc) if wc.pid == myid() myrc = RemoteChannel(()->Channel(typemax(Int))) - wc.rc = Nullable{RemoteChannel}(myrc) + wc.rc = myrc # start a task to transfer incoming messages into local # channels based on the execution context - @schedule begin + @async begin while true msg = take!(myrc) ctxt_id = msg[1] # First element of the message tuple is the context id. 
@@ -75,13 +77,13 @@ function get_dc(wc::WorkerDataChannel) end end else - wc.rc = Nullable{RemoteChannel}(remotecall_fetch(()->get_remote_dc(myid()), wc.pid)) + wc.rc = remotecall_fetch(()->get_remote_dc(myid()), wc.pid) end end finally unlock(wc.lock) end - return get(wc.rc) + return wc.rc end function get_ctxt_from_id(ctxt_id) diff --git a/test/darray.jl b/test/darray.jl index 7238ca0..f90a115 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -1,4 +1,4 @@ -using SpecialFunctions +using Test, LinearAlgebra, SpecialFunctions @testset "test distribute and other constructors" begin A = rand(1:100, (100,100)) @@ -197,22 +197,23 @@ check_leaks() @testset "test scale" begin A = randn(100,100) DA = distribute(A) - @test scale!(DA, 2) == scale!(A, 2) + @test rmul!(DA, 2) == rmul!(A, 2) close(DA) end check_leaks() -@testset "test scale!(b, A)" begin +@testset "test rmul!(Diagonal, A)" begin A = randn(100, 100) b = randn(100) + D = Diagonal(b) DA = distribute(A) - @test scale!(b, A) == scale!(b, DA) + @test lmul!(D, A) == lmul!(D, DA) close(DA) A = randn(100, 100) b = randn(100) DA = distribute(A) - @test scale!(A, b) == scale!(DA, b) + @test rmul!(A, D) == rmul!(DA, D) close(DA) end @@ -644,12 +645,12 @@ check_leaks() end @testset "test transpose real" begin A = drand(Float64, 200, 100) - @test A.' == Array(A).' + @test transpose(A) == transpose(Array(A)) close(A) end @testset "test ctranspose complex" begin A = drand(Complex128, 100, 200) - @test A.' == Array(A).' 
+ @test transpose(A) == transpose(Array(A)) close(A) end @@ -827,8 +828,8 @@ check_leaks() for (x, y) in ((drandn(20), drandn(20)), (drandn(20, 2), drandn(20, 2))) - @test Array(LinAlg.axpy!(2.0, x, copy(y))) ≈ LinAlg.axpy!(2.0, Array(x), Array(y)) - @test_throws DimensionMismatch LinAlg.axpy!(2.0, x, zeros(length(x) + 1)) + @test Array(axpy!(2.0, x, copy(y))) ≈ axpy!(2.0, Array(x), Array(y)) + @test_throws DimensionMismatch axpy!(2.0, x, zeros(length(x) + 1)) close(x) close(y) end @@ -867,11 +868,11 @@ end b = convert(Array, B) AB = A * B - AtB = A.' * B + AtB = transpose(A) * B AcB = A' * B ab = a * b - atb = a.' * b + atb = transpose(a) * b acb = a' * b @test AB ≈ ab diff --git a/test/runtests.jl b/test/runtests.jl index ada8d35..85967ca 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,17 +1,16 @@ -using Base.Test - -using DistributedArrays +using Test, Distributed # add at least 3 worker processes if nworkers() < 3 - n = max(3, min(8, Sys.CPU_CORES)) + n = max(3, min(8, Sys.CPU_THREADS)) addprocs(n; exeflags=`--check-bounds=yes`) end @assert nprocs() > 3 @assert nworkers() >= 3 -@everywhere importall DistributedArrays -@everywhere importall DistributedArrays.SPMD +@everywhere using DistributedArrays +@everywhere using DistributedArrays.SPMD +@everywhere using Random @everywhere srand(1234 + myid()) From cbc570bf5994914b3c5da9c4226979af05e7ace6 Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Thu, 12 Jul 2018 16:24:47 +0100 Subject: [PATCH 02/18] WIP: make it work on julia-0.7 --- .travis.yml | 2 +- src/DistributedArrays.jl | 2 +- src/darray.jl | 22 +++++++++++----------- src/linalg.jl | 2 +- src/mapreduce.jl | 40 ++++++++++++---------------------------- src/serialize.jl | 24 ++++++++++++------------ src/spmd.jl | 10 +++++----- test/darray.jl | 3 ++- test/runtests.jl | 1 + 9 files changed, 46 insertions(+), 60 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6eab514..41106db 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ os: - linux 
- osx julia: - - 0.6 + - 0.7 - nightly matrix: # allow_failures: diff --git a/src/DistributedArrays.jl b/src/DistributedArrays.jl index 31d9aee..941e4f6 100644 --- a/src/DistributedArrays.jl +++ b/src/DistributedArrays.jl @@ -8,7 +8,7 @@ using LinearAlgebra import Base: +, -, *, div, mod, rem, &, |, xor import Base.Callable -import LinearAlgebra.BLAS: axpy! +import LinearAlgebra: axpy!, dot, norm, import Primes import Primes: factor diff --git a/src/darray.jl b/src/darray.jl index 4c2453d..f448f20 100644 --- a/src/darray.jl +++ b/src/darray.jl @@ -28,8 +28,7 @@ mutable struct DArray{T,N,A} <: AbstractArray{T,N} pids::Array{Int,N} # pids[i]==p ⇒ processor p has piece i indices::Array{NTuple{N,UnitRange{Int}},N} # indices held by piece i cuts::Vector{Vector{Int}} # cuts[d][i] = first index of chunk i in dimension d - localpart::Union{A,Missing} - + localpart::Union{A,Nothing} release::Bool function DArray{T,N,A}(id, dims, pids, indices, cuts, lp) where {T,N,A} @@ -64,7 +63,7 @@ function d_from_weakref_or_d(id) end Base.eltype(::Type{DArray{T}}) where {T} = T -empty_localpart(T,N,A) = A(Array{T}(ntuple(zero, N))) +empty_localpart(T,N,A) = A(Array{T}(undef, ntuple(zero, N))) const SubDArray{T,N,D<:DArray} = SubArray{T,N,D} const SubOrDArray{T,N} = Union{DArray{T,N}, SubDArray{T,N}} @@ -148,7 +147,7 @@ function ddata(;T::Type=Any, init::Function=I->nothing, pids=workers(), data::Ve d = registry[id] d = isa(d, WeakRef) ? d.value : d else - d = DArray{T,1,T}(id, (npids,), pids, idxs, cuts, missing) + d = DArray{T,1,T}(id, (npids,), pids, idxs, cuts, nothing) end d end @@ -196,10 +195,10 @@ function DArray(refs) end end - nindices = Array{NTuple{length(dimdist),UnitRange{Int}}}(dimdist...) + nindices = Array{NTuple{length(dimdist),UnitRange{Int}}}(undef, dimdist...) 
for i in 1:length(nindices) - subidx = CartesianIndices(dimdist)[i] + subidx = CartesianIndices(dimdist)[i] #ind2sub(dimdist, i) nindices[i] = ntuple(length(subidx)) do x idx_in_dim = subidx[x] startidx = 1 @@ -243,7 +242,7 @@ DArray(init, d::DArray) = DArray(next_did(), init, size(d), procs(d), d.indices, sz_localpart_ref(ref, id) = size(fetch(ref)) -Base.similar(d::DArray, T::Type, dims::Dims) = DArray(I->Array{T}(map(length,I)), dims, procs(d)) +Base.similar(d::DArray, T::Type, dims::Dims) = DArray(I->Array{T}(undef, map(length,I)), dims, procs(d)) Base.similar(d::DArray, T::Type) = similar(d, T, size(d)) Base.similar(d::DArray{T}, dims::Dims) where {T} = similar(d, T, dims) Base.similar(d::DArray{T}) where {T} = similar(d, T, size(d)) @@ -288,7 +287,7 @@ function defaultdist(sz::Int, nc::Int) if sz >= nc return round.(Int, range(1, stop=sz+1, length=nc+1)) else - return [[1:(sz+1);]; zeros(Int, nc-sz);] + return [[1:(sz+1);]; zeros(Int, nc-sz)] end end @@ -488,7 +487,7 @@ function (::Type{Array{S,N}})(s::SubDArray{T,N}) where {S,T,N} end end a = Array{S}(undef, size(s)) - a[[1:size(a,i) for i=1:N]...] = s + a[[1:size(a,i) for i=1:N]...] .= s return a end @@ -509,7 +508,7 @@ function Base.reshape(A::DArray{T,1,S}, d::Dims) where {T,S<:Array} d1offs = first(I[1]) nd = length(I) - B = Array{T}(sz) + B = Array{T}(undef, sz) nr = size(B,1) sztail = size(B)[2:end] @@ -565,7 +564,7 @@ function Base.isassigned(D::DArray, i::Integer...) end -function Base.copy!(dest::SubOrDArray, src::SubOrDArray) +Base.copyto!(dest::SubOrDArray, src::SubOrDArray) = begin asyncmap(procs(dest)) do p remotecall_fetch(p) do localpart(dest)[:] = src[localindices(dest)...] @@ -573,6 +572,7 @@ function Base.copy!(dest::SubOrDArray, src::SubOrDArray) end return dest end +Base.copy!(dest::SubOrDArray, src::SubOrDArray) = copyto!(dest, src) # local copies are obtained by convert(Array, ) or assigning from # a SubDArray to a local Array. 
diff --git a/src/linalg.jl b/src/linalg.jl index 3218483..c5bcc51 100644 --- a/src/linalg.jl +++ b/src/linalg.jl @@ -221,7 +221,7 @@ function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::D if tA == 'T' return transpose(localpart(A))*convert(localtype(B), Bjk) elseif tA == 'C' - return localpart(A)'*convert(localtype(B), Bjk) + return ctranspose(localpart(A))*convert(localtype(B), Bjk) else return localpart(A)*convert(localtype(B), Bjk) end diff --git a/src/mapreduce.jl b/src/mapreduce.jl index 025ad4c..e5ebf11 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -1,5 +1,7 @@ ## higher-order functions ## +import Base: +, -, div, mod, rem, &, |, xor + Base.map(f, d0::DArray, ds::AbstractArray...) = broadcast(f, d0, ds...) function Base.map!(f::F, dest::DArray, src::DArray) where {F} @@ -12,14 +14,13 @@ function Base.map!(f::F, dest::DArray, src::DArray) where {F} return dest end -# Base.Broadcast._containertype(::Type{D}) where {D<:DArray} = DArray -Base.BroadcastStyle(::Type{<:DArray}) = Broadcast.ArrayStyle{DArray}() +#Base.Broadcast._containertype(::Type{D}) where {D<:DArray} = DArray -Base.Broadcast.promote_containertype(::Type{DArray}, ::Type{DArray}) = DArray -Base.Broadcast.promote_containertype(::Type{DArray}, ::Type{Array}) = DArray -Base.Broadcast.promote_containertype(::Type{DArray}, ct) = DArray -Base.Broadcast.promote_containertype(::Type{Array}, ::Type{DArray}) = DArray -Base.Broadcast.promote_containertype(ct, ::Type{DArray}) = DArray +Base.BroadcastStyle(::Type{DArray}, ::Type{DArray}) = DArray +Base.BroadcastStyle(::Type{DArray}, ::Type{Array}) = DArray +Base.BroadcastStyle(::Type{DArray}, ct) = DArray +#Base.Broadcast.promote_containertype(::Type{Array}, ::Type{DArray}) = DArray +#Base.Broadcast.promote_containertype(ct, ::Type{DArray}) = DArray Base.Broadcast.broadcast_indices(::Type{DArray}, A) = indices(A) Base.Broadcast.broadcast_indices(::Type{DArray}, A::Ref) = () @@ -27,7 +28,7 @@ 
Base.Broadcast.broadcast_indices(::Type{DArray}, A::Ref) = () # FixMe! ## 1. Support for arbitrary indices including OneTo ## 2. This is as type unstable as it can be. Overhead might not matter too much for DArrays though. -function Base.Broadcast.broadcast_c(f, ::Type{DArray}, As...) +function Base.broadcast(f, ::Type{DArray}, ::Nothing, ::Nothing, As...) T = Base.Broadcast._broadcast_eltype(f, As...) shape = Base.Broadcast.broadcast_indices(As...) iter = Base.CartesianIndices(shape) @@ -89,7 +90,8 @@ function mapreducedim_within(f, op, A::DArray, region) gridsize = [size(A.indices)...] arraysize[[region...]] = gridsize[[region...]] indx = similar(A.indices) - for i in CartesianIndices(size(indx)) + + for i in CartesianIndices(indx) indx[i] = ntuple(j -> j in region ? (i.I[j]:i.I[j]) : A.indices[i][j], ndims(A)) end cuts = [i in region ? collect(1:arraysize[i] + 1) : A.cuts[i] for i in 1:ndims(A)] @@ -157,24 +159,6 @@ function Base.extrema(d::DArray) return reduce((t,s) -> (min(t[1], s[1]), max(t[2], s[2])), r) end -# mapreduce like -# for (fn, fr1, fr2) in ((:abs, :max), -# (:minabs, :abs, :min), -# (:sumabs, :abs, :+), -# (:sumabs2, :abs2, :+)) -# @eval (Base.$fn)(d::DArray) = mapreduce($fr1, $fr2, d) -# end - -# semi mapreduce -# for (fn, fr) in ((:any, :|), -# (:all, :&), -# (:count, :+)) -# @eval begin -# (Base.$fn)(f::typeof(identity), d::DArray) = mapreduce(f, $fr, d) -# (Base.$fn)(f::Callable, d::DArray) = mapreduce(f, $fr, d) -# end -# end - # Unary vector functions (-)(D::DArray) = map(-, D) @@ -361,5 +345,5 @@ function ppeval(f, D...; dim::NTuple = map(t -> isa(t, DArray) ? ndims(t) : 0, D # the DArray constructor. sd = [size(D[1].pids)...] 
nd = remotecall_fetch((r)->ndims(fetch(r)), refs[1].where, refs[1]) - DArray(reshape(refs, tuple([sd[1:nd - 1]; sd[end];]...))) + DArray(reshape(refs, tuple([sd[1:nd - 1]; sd[end]]...))) end diff --git a/src/serialize.jl b/src/serialize.jl index 28b5a9a..a36d6b8 100644 --- a/src/serialize.jl +++ b/src/serialize.jl @@ -1,8 +1,8 @@ function Serialization.serialize(S::AbstractSerializer, d::DArray{T,N,A}) where {T,N,A} # Only send the ident for participating workers - we expect the DArray to exist in the # remote registry. DO NOT send the localpart. - destpid = Distributed.worker_id_from_socket(S.io) - Serializer.serialize_type(S, typeof(d)) + destpid = worker_id_from_socket(S.io) + serialize_type(S, typeof(d)) if (destpid in d.pids) || (destpid == d.id[1]) serialize(S, (true, d.id)) # (id_only, id) else @@ -43,15 +43,14 @@ end # Serialize only those parts of the object as required by the destination worker. mutable struct DestinationSerializer - generate::Union{Function,Missing} # Function to generate the part to be serialized - pids::Union{Array,Missing} # MUST have the same shape as the distribution - - deser_obj::Union{Any,Missing} # Deserialized part + generate::Union{Function,Nothing} # Function to generate the part to be serialized + pids::Union{Array,Nothing} # MUST have the same shape as the distribution + deser_obj::Any # Deserialized part DestinationSerializer(f,p,d) = new(f,p,d) end -DestinationSerializer(f::Function, pids::Array) = DestinationSerializer(f, pids, missing) +DestinationSerializer(f::Function, pids::Array) = DestinationSerializer(f, pids, nothing) # contructs a DestinationSerializer after verifying that the shape of pids. 
function verified_destination_serializer(f::Function, pids::Array, verify_size) @@ -59,12 +58,13 @@ function verified_destination_serializer(f::Function, pids::Array, verify_size) return DestinationSerializer(f, pids) end -DestinationSerializer(deser_obj::Any) = DestinationSerializer(missing, missing, deser_obj) +DestinationSerializer(deser_obj::Any) = DestinationSerializer(nothing, nothing, deser_obj) function Serialization.serialize(S::AbstractSerializer, s::DestinationSerializer) - pid = Distributed.worker_id_from_socket(S.io) + pid = worker_id_from_socket(S.io) pididx = findfirst(isequal(pid), s.pids) - Serialization.serialize_type(S, typeof(s)) + @assert pididx !== nothing + serialize_type(S, typeof(s)) serialize(S, s.generate(pididx)) end @@ -75,9 +75,9 @@ end function localpart(s::DestinationSerializer) - if !ismissing(s.deser_obj) + if s.deser_obj !== nothing return s.deser_obj - elseif !ismissing(s.generate) && (myid() in s.pids) + elseif s.generate !== nothing && (myid() in s.pids) # Handle the special case where myid() is part of s.pids. 
# In this case serialize/deserialize is not called as the remotecall is executed locally return s.generate(findfirst(isequal(myid()), s.pids)) diff --git a/src/spmd.jl b/src/spmd.jl index 2cbe3ff..b1f9f88 100644 --- a/src/spmd.jl +++ b/src/spmd.jl @@ -4,15 +4,15 @@ using Distributed import DistributedArrays: gather, next_did, close export sendto, recvfrom, recvfrom_any, barrier, bcast, scatter, gather -export context_local_storage, context, spmd, close +export context_local_storage, context, spmd mutable struct WorkerDataChannel pid::Int - rc::Union{RemoteChannel,Missing} + rc::Union{RemoteChannel,Nothing} lock::ReentrantLock - WorkerDataChannel(pid) = new(pid, missing, ReentrantLock()) + WorkerDataChannel(pid) = new(pid, nothing, ReentrantLock()) end mutable struct SPMDContext @@ -61,7 +61,7 @@ const map_ctxts = Dict{Tuple, SPMDContext}() function get_dc(wc::WorkerDataChannel) lock(wc.lock) try - if ismissing(wc.rc) + if wc.rc === nothing if wc.pid == myid() myrc = RemoteChannel(()->Channel(typemax(Int))) wc.rc = myrc @@ -254,7 +254,7 @@ function delete_ctxt_id(ctxt_id) nothing end -function close(ctxt::SPMDContext) +function Base.close(ctxt::SPMDContext) for p in ctxt.pids Base.remote_do(delete_ctxt_id, p, ctxt.id) end diff --git a/test/darray.jl b/test/darray.jl index f90a115..dc57149 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -118,7 +118,8 @@ check_leaks() end @testset "test invalid use of @DArray" begin - @test_throws ArgumentError eval(:((@DArray [1,2,3,4]))) + #@test_throws ArgumentError eval(:((@DArray [1,2,3,4]))) + @test_throws LoadError eval(:((@DArray [1,2,3,4]))) end end diff --git a/test/runtests.jl b/test/runtests.jl index 85967ca..a94a277 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -11,6 +11,7 @@ end @everywhere using DistributedArrays @everywhere using DistributedArrays.SPMD @everywhere using Random +@everywhere using LinearAlgebra @everywhere srand(1234 + myid()) From 2f2ef7bed4707fd5041fa1d5a8e1dffabdfaba48 Mon Sep 17 
00:00:00 2001 From: Frank Otto Date: Fri, 13 Jul 2018 10:53:49 +0100 Subject: [PATCH 03/18] femtocleaning This caught some additional deprecations. --- src/linalg.jl | 2 +- src/sort.jl | 2 +- test/spmd.jl | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/linalg.jl b/src/linalg.jl index c5bcc51..7be1325 100644 --- a/src/linalg.jl +++ b/src/linalg.jl @@ -2,7 +2,7 @@ function Base.copy(D::Adjoint{T,<:DArray{T,2}}) where T DArray(reverse(size(D)), procs(D)) do I lp = Array{T}(map(length, I)) rp = convert(Array, D[reverse(I)...]) - ctranspose!(lp, rp) + adjoint!(lp, rp) end end diff --git a/src/sort.jl b/src/sort.jl index 5e9456d..df5cad0 100644 --- a/src/sort.jl +++ b/src/sort.jl @@ -66,7 +66,7 @@ function compute_boundaries(d::DVector{T}; kwargs...) where T results = asyncmap(p -> remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...), pids) - samples = Array{T}(0) + samples = Array{T}(undef, 0) for x in results append!(samples, x[1]) end diff --git a/test/spmd.jl b/test/spmd.jl index 6e7adf3..73412c9 100644 --- a/test/spmd.jl +++ b/test/spmd.jl @@ -105,7 +105,7 @@ spmd(spmd_test1) end # run foo_spmd on all workers, many of them, all concurrently using implictly different contexts. 
-in_arrays = map(x->DArray(I->fill(myid(), (map(length,I)...)), (nworkers(), 2), workers(), [nworkers(),1]), 1:8) +in_arrays = map(x->DArray(I->fill(myid(), (map(length,I)...,)), (nworkers(), 2), workers(), [nworkers(),1]), 1:8) out_arrays = map(x->ddata(), 1:8) @sync for i in 1:8 @@ -151,7 +151,7 @@ println("SPMD: Passed testing of spmd function run concurrently") end -in_arrays = map(x->DArray(I->fill(myid(), (map(length,I)...)), (nworkers(), 2), workers(), [nworkers(),1]), 1:8) +in_arrays = map(x->DArray(I->fill(myid(), (map(length,I)...,)), (nworkers(), 2), workers(), [nworkers(),1]), 1:8) out_arrays = map(x->ddata(), 1:8) contexts = map(x->context(workers()), 1:8) From b32e9301994d23337adede701d94500a7f1d338a Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Fri, 13 Jul 2018 11:34:13 +0100 Subject: [PATCH 04/18] In tests, use the package before adding procs. This fixes (works around?) `localpart` getting dispatched wrongly on the workers. No idea why. --- test/runtests.jl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/runtests.jl b/test/runtests.jl index a94a277..b98613b 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,4 +1,6 @@ -using Test, Distributed +using Test +using Distributed +using DistributedArrays # add at least 3 worker processes if nworkers() < 3 From 83af5b66679a9c8abce0774f865234be7290fe24 Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Fri, 13 Jul 2018 13:01:07 +0100 Subject: [PATCH 05/18] Remove reducedim_initarray0 extension. It has been dropped from Base. 
--- src/mapreduce.jl | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/mapreduce.jl b/src/mapreduce.jl index e5ebf11..3912f16 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -73,17 +73,6 @@ function Base.reducedim_initarray(A::DArray, region, v0, ::Type{R}) where {R} end Base.reducedim_initarray(A::DArray, region, v0::T) where {T} = Base.reducedim_initarray(A, region, v0, T) -# function Base.reducedim_initarray0(A::DArray, region, v0, ::Type{R}) where {R} -# # Store reduction on lowest pids -# pids = A.pids[ntuple(i -> i in region ? (1:1) : (:), ndims(A))...] -# chunks = similar(pids, Future) -# @sync for i in eachindex(pids) -# @async chunks[i...] = remotecall_wait(() -> Base.reducedim_initarray0(localpart(A), region, v0, R), pids[i...]) -# end -# return DArray(chunks) -# end -# Base.reducedim_initarray0(A::DArray, region, v0::T) where {T} = Base.reducedim_initarray0(A, region, v0, T) - # Compute mapreducedim of each localpart and store the result in a new DArray function mapreducedim_within(f, op, A::DArray, region) arraysize = [size(A)...] From c7a2c5a6500581e5ebabb58c396016a1831046fc Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Fri, 13 Jul 2018 17:27:11 +0100 Subject: [PATCH 06/18] New implementation of broadcast for DArray. All mapreduce tests passing. --- src/darray.jl | 2 +- src/mapreduce.jl | 67 ++++++++++++++++++++++++------------------------ test/darray.jl | 1 + 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/src/darray.jl b/src/darray.jl index f448f20..9d7a37b 100644 --- a/src/darray.jl +++ b/src/darray.jl @@ -708,7 +708,7 @@ function Base.setindex!(a::Array, s::SubDArray, # partial chunk @async a[idxs...] = remotecall_fetch(d.pids[i]) do - view(localpart(d), [K[j]-first(K_c[j])+1 for j=1:length(J)]...) + view(localpart(d), [K[j].-first(K_c[j]).+1 for j=1:length(J)]...) 
end end end diff --git a/src/mapreduce.jl b/src/mapreduce.jl index 3912f16..d0439c2 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -14,32 +14,42 @@ function Base.map!(f::F, dest::DArray, src::DArray) where {F} return dest end -#Base.Broadcast._containertype(::Type{D}) where {D<:DArray} = DArray - -Base.BroadcastStyle(::Type{DArray}, ::Type{DArray}) = DArray -Base.BroadcastStyle(::Type{DArray}, ::Type{Array}) = DArray -Base.BroadcastStyle(::Type{DArray}, ct) = DArray -#Base.Broadcast.promote_containertype(::Type{Array}, ::Type{DArray}) = DArray -#Base.Broadcast.promote_containertype(ct, ::Type{DArray}) = DArray - -Base.Broadcast.broadcast_indices(::Type{DArray}, A) = indices(A) -Base.Broadcast.broadcast_indices(::Type{DArray}, A::Ref) = () - -# FixMe! -## 1. Support for arbitrary indices including OneTo -## 2. This is as type unstable as it can be. Overhead might not matter too much for DArrays though. -function Base.broadcast(f, ::Type{DArray}, ::Nothing, ::Nothing, As...) - T = Base.Broadcast._broadcast_eltype(f, As...) - shape = Base.Broadcast.broadcast_indices(As...) - iter = Base.CartesianIndices(shape) - D = DArray(map(length, shape)) do I - Base.Broadcast.broadcast_c(f, Array, - map(a -> isa(a, Union{Number,Ref}) ? a : - localtype(a)(a[ntuple(i -> i > ndims(a) ? 1 : (size(a, i) == 1 ? (1:1) : I[i]), length(shape))...]), As)...) +# new broadcasting implementation for julia-0.7 + +Base.BroadcastStyle(::Type{<:DArray}) = Broadcast.ArrayStyle{DArray}() +Base.BroadcastStyle(::Type{<:DArray}, ::Any) = Broadcast.ArrayStyle{DArray}() + +function Base.similar(bc::Broadcast.Broadcasted{Broadcast.ArrayStyle{DArray}}, ::Type{ElType}) where {ElType} + DA = find_darray(bc) + similar(DA, ElType) +end + +"`DA = find_darray(As)` returns the first DArray among the arguments." 
+find_darray(bc::Base.Broadcast.Broadcasted) = find_darray(bc.args) +find_darray(args::Tuple) = find_darray(find_darray(args[1]), Base.tail(args)) +find_darray(x) = x +find_darray(a::DArray, rest) = a +find_darray(::Any, rest) = find_darray(rest) + +function Base.copyto!(dest::DArray, bc::Broadcast.Broadcasted{Broadcast.ArrayStyle{DArray}}) + @sync for p in procs(dest) + @async remotecall_fetch(p) do + copyto!(localpart(dest), rewrite_local(bc)) + end end - return D + dest end +""" +Transform a Broadcasted{Broadcast.ArrayStyle{DArray}} object into an equivalent +Broadcasted{Broadcast.DefaultArrayStyle} object for the localparts. +""" +rewrite_local(bc::Broadcast.Broadcasted{Broadcast.ArrayStyle{DArray}}) = Broadcast.broadcasted(bc.f, rewrite_local(bc.args)...) +rewrite_local(args::Tuple) = map(rewrite_local, args) +rewrite_local(a::DArray) = localpart(a) +rewrite_local(x) = x + + function Base.reduce(f, d::DArray) results = asyncmap(procs(d)) do p remotecall_fetch(p, f, d) do (f, d) @@ -128,17 +138,6 @@ function nnz(A::DArray) return reduce(+, B) end -# reduce like -# for (fn, fr) in ((:sum, :+), -# (:prod, :*), -# (:maximum, :max), -# (:minimum, :min), -# (:any, :|), -# (:all, :&)) -# @eval (Base.$fn)(d::DArray) = reduce($fr, d) -# @eval (Base.$fn)(f, d::DArray) = mapreduce(f, $fr, d) -# end - function Base.extrema(d::DArray) r = asyncmap(procs(d)) do p remotecall_fetch(p) do diff --git a/test/darray.jl b/test/darray.jl index dc57149..06cefe7 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -182,6 +182,7 @@ check_leaks() @testset "test map / reduce" begin D2 = map(x->1, D) + @test D2 isa DArray @test reduce(+, D2) == 100 close(D2) end From 0b7af3c304347f84e476c4be9c41a18f423895f1 Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Fri, 13 Jul 2018 18:00:58 +0100 Subject: [PATCH 07/18] scale! -> [lr]mul! 
--- test/darray.jl | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/test/darray.jl b/test/darray.jl index 06cefe7..1e64d59 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -1,4 +1,9 @@ +<<<<<<< HEAD using Test, LinearAlgebra, SpecialFunctions +======= +using SpecialFunctions +using LinearAlgebra +>>>>>>> scale! -> [lr]mul! @testset "test distribute and other constructors" begin A = rand(1:100, (100,100)) @@ -196,7 +201,7 @@ end check_leaks() -@testset "test scale" begin +@testset "test rmul" begin A = randn(100,100) DA = distribute(A) @test rmul!(DA, 2) == rmul!(A, 2) @@ -205,17 +210,29 @@ end check_leaks() +<<<<<<< HEAD @testset "test rmul!(Diagonal, A)" begin +======= +@testset "test [lr]mul!(b, A)" begin +>>>>>>> scale! -> [lr]mul! A = randn(100, 100) b = randn(100) D = Diagonal(b) DA = distribute(A) +<<<<<<< HEAD @test lmul!(D, A) == lmul!(D, DA) +======= + @test lmul!(Diagonal(b), A) == lmul!(Diagonal(b), DA) +>>>>>>> scale! -> [lr]mul! close(DA) A = randn(100, 100) b = randn(100) DA = distribute(A) +<<<<<<< HEAD @test rmul!(A, D) == rmul!(DA, D) +======= + @test rmul!(A, Diagonal(b)) == rmul!(A, Diagonal(b)) +>>>>>>> scale! -> [lr]mul! close(DA) end From ce8bb40a3b383548dfbfa595761ac69c47b00562 Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Sun, 15 Jul 2018 11:09:51 +0100 Subject: [PATCH 08/18] Fix more deprecations. Note: (map)reducedim probably currently falls back to the method for AbstractArray. 
--- src/linalg.jl | 18 +++++----- src/mapreduce.jl | 4 --- test/darray.jl | 85 ++++++++++++++++++++---------------------------- 3 files changed, 45 insertions(+), 62 deletions(-) diff --git a/src/linalg.jl b/src/linalg.jl index 7be1325..93a2eab 100644 --- a/src/linalg.jl +++ b/src/linalg.jl @@ -1,6 +1,6 @@ function Base.copy(D::Adjoint{T,<:DArray{T,2}}) where T DArray(reverse(size(D)), procs(D)) do I - lp = Array{T}(map(length, I)) + lp = Array{T}(undef, map(length, I)) rp = convert(Array, D[reverse(I)...]) adjoint!(lp, rp) end @@ -8,7 +8,7 @@ end function Base.copy(D::Transpose{T,<:DArray{T,2}}) where T DArray(reverse(size(D)), procs(D)) do I - lp = Array{T}(map(length, I)) + lp = Array{T}(undef, map(length, I)) rp = convert(Array, D[reverse(I)...]) transpose!(lp, rp) end @@ -94,7 +94,7 @@ function A_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVec end # Multiply on each tile of A - R = Array{Future}(size(A.pids)...) + R = Array{Future}(undef, size(A.pids)) for j = 1:size(A.pids, 2) xj = x[A.cuts[2][j]:A.cuts[2][j + 1] - 1] for i = 1:size(A.pids, 1) @@ -138,7 +138,7 @@ function Ac_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVe end # Multiply on each tile of A - R = Array{Future}(reverse(size(A.pids))...) 
+ R = Array{Future}(undef, reverse(size(A.pids))) for j = 1:size(A.pids, 1) xj = x[A.cuts[1][j]:A.cuts[1][j + 1] - 1] for i = 1:size(A.pids, 2) @@ -206,9 +206,9 @@ function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::D # Multiply on each tile of A if tA == 'N' - R = Array{Future}(size(procs(A))..., size(procs(C), 2)) + R = Array{Future}(undef, size(procs(A))..., size(procs(C), 2)) else - R = Array{Future}(reverse(size(procs(A)))..., size(procs(C), 2)) + R = Array{Future}(undef, reverse(size(procs(A)))..., size(procs(C), 2)) end for j = 1:size(A.pids, Ad2) for k = 1:size(C.pids, 2) @@ -221,7 +221,7 @@ function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::D if tA == 'T' return transpose(localpart(A))*convert(localtype(B), Bjk) elseif tA == 'C' - return ctranspose(localpart(A))*convert(localtype(B), Bjk) + return adjoint(localpart(A))*convert(localtype(B), Bjk) else return localpart(A)*convert(localtype(B), Bjk) end @@ -261,12 +261,12 @@ At_mul_B!(C::DMatrix, A::DMatrix, B::AbstractMatrix) = At_mul_B!(one(eltype(C)), _matmul_op = (t,s) -> t*s + t*s -function (*)(A::DMatrix, x::AbstractVector) +function Base.:*(A::DMatrix, x::AbstractVector) T = Base.promote_op(_matmul_op, eltype(A), eltype(x)) y = DArray(I -> Array{T}(map(length, I)), (size(A, 1),), procs(A)[:,1], (size(procs(A), 1),)) return A_mul_B!(one(T), A, x, zero(T), y) end -function (*)(A::DMatrix, B::AbstractMatrix) +function Base.:*(A::DMatrix, B::AbstractMatrix) T = Base.promote_op(_matmul_op, eltype(A), eltype(B)) C = DArray(I -> Array{T}(map(length, I)), (size(A, 1), size(B, 2)), diff --git a/src/mapreduce.jl b/src/mapreduce.jl index d0439c2..468812d 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -127,10 +127,6 @@ function Base.mapreducedim!(f, op, R::DArray, A::DArray) return mapreducedim_between!(identity, op, R, B, region) end -# function Base.mapreducedim(f, op, R::DArray, A::DArray) -# Base.mapreducedim!(f, op, Base.reducedim_initarray(A, 
region, v0), A) -# end - function nnz(A::DArray) B = asyncmap(A.pids) do p remotecall_fetch(nnz∘localpart, p, A) diff --git a/test/darray.jl b/test/darray.jl index 1e64d59..b108e5b 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -1,9 +1,6 @@ -<<<<<<< HEAD using Test, LinearAlgebra, SpecialFunctions -======= -using SpecialFunctions -using LinearAlgebra ->>>>>>> scale! -> [lr]mul! +using Statistics: mean +@everywhere using SparseArrays: sprandn @testset "test distribute and other constructors" begin A = rand(1:100, (100,100)) @@ -210,29 +207,17 @@ end check_leaks() -<<<<<<< HEAD @testset "test rmul!(Diagonal, A)" begin -======= -@testset "test [lr]mul!(b, A)" begin ->>>>>>> scale! -> [lr]mul! A = randn(100, 100) b = randn(100) D = Diagonal(b) DA = distribute(A) -<<<<<<< HEAD @test lmul!(D, A) == lmul!(D, DA) -======= - @test lmul!(Diagonal(b), A) == lmul!(Diagonal(b), DA) ->>>>>>> scale! -> [lr]mul! close(DA) A = randn(100, 100) b = randn(100) DA = distribute(A) -<<<<<<< HEAD @test rmul!(A, D) == rmul!(DA, D) -======= - @test rmul!(A, Diagonal(b)) == rmul!(A, Diagonal(b)) ->>>>>>> scale! -> [lr]mul! 
close(DA) end @@ -242,6 +227,7 @@ check_leaks() for _ = 1:25, f = [x -> Int128(2x), x -> Int128(x^2), x -> Int128(x^2 + 2x - 1)], opt = [+, *] A = rand(1:5, rand(2:30)) DA = distribute(A) + @test DA isa DArray @test mapreduce(f, opt, DA) - mapreduce(f, opt, A) == 0 close(DA) end @@ -252,15 +238,16 @@ check_leaks() @testset "test mapreducedim on DArrays" begin D = DArray(I->fill(myid(), map(length,I)), (73,73), [MYID, OTHERIDS]) D2 = map(x->1, D) - @test mapreducedim(t -> t*t, +, D2, 1) == mapreducedim(t -> t*t, +, convert(Array, D2), 1) - @test mapreducedim(t -> t*t, +, D2, 2) == mapreducedim(t -> t*t, +, convert(Array, D2), 2) - @test mapreducedim(t -> t*t, +, D2, (1,2)) == mapreducedim(t -> t*t, +, convert(Array, D2), (1,2)) + @test D2 isa DArray + @test mapreduce(t -> t*t, +, D2, dims=1) == mapreduce(t -> t*t, +, convert(Array, D2), dims=1) + @test mapreduce(t -> t*t, +, D2, dims=2) == mapreduce(t -> t*t, +, convert(Array, D2), dims=2) + @test mapreduce(t -> t*t, +, D2, dims=(1,2)) == mapreduce(t -> t*t, +, convert(Array, D2), dims=(1,2)) # Test non-regularly chunked DArrays r1 = DistributedArrays.remotecall(() -> sprandn(3, 10, 0.1), workers()[1]) r2 = DistributedArrays.remotecall(() -> sprandn(7, 10, 0.1), workers()[2]) D = DArray(reshape([r1; r2], (2,1))) - @test Array(sum(D, 2)) == sum(Array(D), 2) + @test Array(sum(D, dims=2)) == sum(Array(D), dims=2) # close(D) # close(D2) @@ -275,10 +262,10 @@ check_leaks() A = convert(Array, DA) @testset "dimension $dms" for dms in (1, 2, 3, (1,2), (1,3), (2,3), (1,2,3)) - @test mapreducedim(t -> t*t, +, A, dms) ≈ mapreducedim(t -> t*t, +, DA, dms) - @test mapreducedim(t -> t*t, +, A, dms, 1.0) ≈ mapreducedim(t -> t*t, +, DA, dms, 1.0) - @test reducedim(*, A, dms) ≈ reducedim(*, DA, dms) - @test reducedim(*, A, dms, 2.0) ≈ reducedim(*, DA, dms, 2.0) + @test mapreduce(t -> t*t, +, A, dims=dms) ≈ mapreduce(t -> t*t, +, DA, dims=dms) + @test mapreduce(t -> t*t, +, A, dims=dms, init=1.0) ≈ mapreduce(t -> t*t, +, DA, 
dims=dms, init=1.0) + @test reduce(*, A, dims=dms) ≈ reduce(*, DA, dims=dms) + @test reduce(*, A, dims=dms, init=2.0) ≈ reduce(*, DA, dims=dms, init=2.0) end close(DA) d_closeall() # temp created by the mapreduce above @@ -293,7 +280,7 @@ check_leaks() @testset "test $f for dimension $dms" for f in (mean, ), dms in (1, 2, 3, (1,2), (1,3), (2,3), (1,2,3)) # std is pending implementation - @test f(DA,dms) ≈ f(A,dms) + @test f(DA, dims=dms) ≈ f(A, dims=dms) end close(DA) @@ -308,7 +295,7 @@ check_leaks() # sum either throws an ArgumentError or a CompositeException of ArgumentErrors try - sum(DA, -1) + sum(DA, dims=-1) catch err if isa(err, CompositeException) @test !isempty(err.exceptions) @@ -322,7 +309,7 @@ check_leaks() end end try - sum(DA, 0) + sum(DA, dims=0) catch err if isa(err, CompositeException) @test !isempty(err.exceptions) @@ -337,9 +324,9 @@ check_leaks() end @test sum(DA) ≈ sum(A) - @test sum(DA,1) ≈ sum(A,1) - @test sum(DA,2) ≈ sum(A,2) - @test sum(DA,3) ≈ sum(A,3) + @test sum(DA, dims=1) ≈ sum(A, dims=1) + @test sum(DA, dims=2) ≈ sum(A, dims=2) + @test sum(DA, dims=3) ≈ sum(A, dims=3) close(DA) d_closeall() # temporaries created above end @@ -360,7 +347,7 @@ end check_leaks() -# test length / endof +# test length / lastindex @testset "test collections API" begin A = randn(23,23) DA = distribute(A) @@ -369,8 +356,8 @@ check_leaks() @test length(DA) == length(A) end - @testset "test endof" begin - @test endof(DA) == endof(A) + @testset "test lastindex" begin + @test lastindex(DA) == lastindex(A) end close(DA) end @@ -378,7 +365,7 @@ end check_leaks() @testset "test max / min / sum" begin - a = map(x -> Int(round(rand() * 100)) - 50, Array{Int}(100,1000)) + a = map(x -> Int(round(rand() * 100)) - 50, Array{Int}(undef,100,1000)) d = distribute(a) @test sum(d) == sum(a) @@ -395,7 +382,7 @@ end check_leaks() @testset "test all / any" begin - a = map(x->Int(round(rand() * 100)) - 50, Array{Int}(100,1000)) + a = map(x->Int(round(rand() * 100)) - 50, 
Array{Int}(undef,100,1000)) a = [true for i in 1:100] d = distribute(a) @@ -651,25 +638,25 @@ end check_leaks() -@testset "test c/transpose" begin - @testset "test ctranspose real" begin +@testset "test transpose/adjoint" begin + @testset "test transpose real" begin A = drand(Float64, 100, 200) - @test A' == Array(A)' + @test transpose(A) == transpose(Array(A)) close(A) end - @testset "test ctranspose complex" begin - A = drand(Complex128, 200, 100) - @test A' == Array(A)' + @testset "test transpose complex" begin + A = drand(ComplexF64, 200, 100) + @test transpose(A) == transpose(Array(A)) close(A) end - @testset "test transpose real" begin + @testset "test adjoint real" begin A = drand(Float64, 200, 100) - @test transpose(A) == transpose(Array(A)) + @test adjoint(A) == adjoint(Array(A)) close(A) end - @testset "test ctranspose complex" begin - A = drand(Complex128, 100, 200) - @test transpose(A) == transpose(Array(A)) + @testset "test adjoint complex" begin + A = drand(ComplexF64, 100, 200) + @test adjoint(A) == adjoint(Array(A)) close(A) end @@ -712,8 +699,8 @@ check_leaks() sinh, sinpi, sqrt, tan, tand, tanh, trigamma) @test f.(a) == f.(b) end - a = a + 1 - b = b + 1 + a = a .+ 1 + b = b .+ 1 @testset "$f" for f in (asec, asecd, acosh, acsc, acscd, acoth) @test f.(a) == f.(b) end From a77e35b72a0e7bd499b5b8d8f60081ac35c8c8c4 Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Mon, 16 Jul 2018 19:38:49 +0100 Subject: [PATCH 09/18] Fix more deprecations. 
--- src/linalg.jl | 12 +++++----- src/mapreduce.jl | 18 +++++++++------ src/sort.jl | 8 +++---- test/darray.jl | 57 ++++++++++++++++++++++++------------------------ 4 files changed, 50 insertions(+), 45 deletions(-) diff --git a/src/linalg.jl b/src/linalg.jl index 93a2eab..ca05c19 100644 --- a/src/linalg.jl +++ b/src/linalg.jl @@ -25,7 +25,7 @@ function axpy!(α, x::DArray, y::DArray) end asyncmap(procs(y)) do p @async remotecall_fetch(p) do - Base.axpy!(α, localpart(x), localpart(y)) + axpy!(α, localpart(x), localpart(y)) return nothing end end @@ -192,7 +192,7 @@ end function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix, tA) # error checks Ad1, Ad2 = (tA == 'N') ? (1,2) : (2,1) - mA, nA = size(A, Ad1, Ad2) + mA, nA = (size(A, Ad1), size(A, Ad2)) mB, nB = size(B) if mB != nA throw(DimensionMismatch("matrix A has dimensions ($mA, $nA), matrix B has dimensions ($mB, $nB)")) @@ -263,12 +263,12 @@ _matmul_op = (t,s) -> t*s + t*s function Base.:*(A::DMatrix, x::AbstractVector) T = Base.promote_op(_matmul_op, eltype(A), eltype(x)) - y = DArray(I -> Array{T}(map(length, I)), (size(A, 1),), procs(A)[:,1], (size(procs(A), 1),)) + y = DArray(I -> Array{T}(undef, map(length, I)), (size(A, 1),), procs(A)[:,1], (size(procs(A), 1),)) return A_mul_B!(one(T), A, x, zero(T), y) end function Base.:*(A::DMatrix, B::AbstractMatrix) T = Base.promote_op(_matmul_op, eltype(A), eltype(B)) - C = DArray(I -> Array{T}(map(length, I)), + C = DArray(I -> Array{T}(undef, map(length, I)), (size(A, 1), size(B, 2)), procs(A)[:,1:min(size(procs(A), 2), size(procs(B), 2))], (size(procs(A), 1), min(size(procs(A), 2), size(procs(B), 2)))) @@ -277,7 +277,7 @@ end function Ac_mul_B(A::DMatrix, x::AbstractVector) T = Base.promote_op(_matmul_op, eltype(A), eltype(x)) - y = DArray(I -> Array{T}(map(length, I)), + y = DArray(I -> Array{T}(undef, map(length, I)), (size(A, 2),), procs(A)[1,:], (size(procs(A), 2),)) @@ -285,7 +285,7 @@ function Ac_mul_B(A::DMatrix, 
x::AbstractVector) end function Ac_mul_B(A::DMatrix, B::AbstractMatrix) T = Base.promote_op(_matmul_op, eltype(A), eltype(B)) - C = DArray(I -> Array{T}(map(length, I)), (size(A, 2), + C = DArray(I -> Array{T}(undef, map(length, I)), (size(A, 2), size(B, 2)), procs(A)[1:min(size(procs(A), 1), size(procs(B), 2)),:], (size(procs(A), 2), min(size(procs(A), 1), size(procs(B), 2)))) diff --git a/src/mapreduce.jl b/src/mapreduce.jl index 468812d..3b28c77 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -1,6 +1,7 @@ ## higher-order functions ## import Base: +, -, div, mod, rem, &, |, xor +import SparseArrays: nnz Base.map(f, d0::DArray, ds::AbstractArray...) = broadcast(f, d0, ds...) @@ -21,7 +22,7 @@ Base.BroadcastStyle(::Type{<:DArray}, ::Any) = Broadcast.ArrayStyle{DArray}() function Base.similar(bc::Broadcast.Broadcasted{Broadcast.ArrayStyle{DArray}}, ::Type{ElType}) where {ElType} DA = find_darray(bc) - similar(DA, ElType) + DArray(I -> Array{ElType}(undef, map(length,I)), DA) end "`DA = find_darray(As)` returns the first DArray among the arguments." @@ -201,7 +202,10 @@ for f in (:+, :-, :div, :mod, :rem, :&, :|, :xor) end end -function mapslices(f::Function, D::DArray{T,N,A}, dims::AbstractVector) where {T,N,A} +function Base.mapslices(f, D::DArray{T,N,A}; dims) where {T,N,A} + if !(dims isa AbstractVector) + dims = [dims...] 
+ end if !all(t -> t == 1, size(D.indices)[dims]) p = ones(Int, ndims(D)) nondims = filter(t -> !(t in dims), 1:ndims(D)) @@ -209,10 +213,10 @@ function mapslices(f::Function, D::DArray{T,N,A}, dims::AbstractVector) where {T DD = DArray(size(D), procs(D), p) do I return convert(A, D[I...]) end - return mapslices(f, DD, dims) + return mapslices(f, DD, dims=dims) end - refs = Future[remotecall((x,y,z)->mapslices(x,localpart(y),z), p, f, D, dims) for p in procs(D)] + refs = Future[remotecall((x,y,z)->mapslices(x,localpart(y),dims=z), p, f, D, dims) for p in procs(D)] DArray(reshape(refs, size(procs(D)))) end @@ -246,7 +250,7 @@ function _ppeval(f, A...; dim = map(ndims, A)) push!(ridx, 1) Rsize = map(last, ridx) Rsize[end] = dimlength - R = Array{eltype(R1)}(Rsize...) + R = Array{eltype(R1)}(undef, Rsize...) for i = 1:dimlength for j = 1:narg @@ -273,7 +277,7 @@ Evaluates the callable argument `f` on slices of the elements of the `D` tuple. `f` can be any callable object that accepts sliced or broadcasted elements of `D`. The result returned from `f` must be either an array or a scalar. -`D` has any number of elements and the alements can have any type. If an element +`D` has any number of elements and the elements can have any type. If an element of `D` is a distributed array along the dimension specified by `dim`. If an element of `D` is not distributed, the element is by default broadcasted and applied on all evaluations of `f`. @@ -286,7 +290,7 @@ broadcasted to all evaluations of `f`. #### Result `ppeval` returns a distributed array of dimension `p+1` where the first `p` -sizes correspond to the sizes of return values of `f`. The last dimention of +sizes correspond to the sizes of return values of `f`. The last dimension of the return array from `ppeval` has the same length as the dimension over which the input arrays are sliced. 
diff --git a/src/sort.jl b/src/sort.jl index df5cad0..a7d34f0 100644 --- a/src/sort.jl +++ b/src/sort.jl @@ -38,7 +38,7 @@ function scatter_n_sort_localparts(d, myidx, refs, boundaries::Array{T}; by = id end if p_till == p_sorted - @async put!(r, Array{T}(0)) + @async put!(r, Array{T}(undef,0)) else v = sorted[p_sorted:p_till-1] @async put!(r, v) @@ -66,7 +66,7 @@ function compute_boundaries(d::DVector{T}; kwargs...) where T results = asyncmap(p -> remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...), pids) - samples = Array{T}(undef, 0) + samples = Array{T}(undef,0) for x in results append!(samples, x[1]) end @@ -128,7 +128,7 @@ function Base.sort(d::DVector{T}; sample=true, kwargs...) where T @assert lb<=ub - s = Array{T}(np) + s = Array{T}(undef,np) part = abs(ub - lb)/np (isnan(part) || isinf(part)) && throw(ArgumentError("lower and upper bounds must not be infinities")) @@ -155,7 +155,7 @@ function Base.sort(d::DVector{T}; sample=true, kwargs...) where T throw(ArgumentError("keyword arg `sample` must be Boolean, Tuple(Min,Max) or an actual sample of data : " * string(sample))) end - local_sort_results = Array{Tuple}(np) + local_sort_results = Array{Tuple}(undef,np) Base.asyncmap!((i,p) -> remotecall_fetch( scatter_n_sort_localparts, p, presorted ? 
nothing : d, i, refs, boundaries; kwargs...), diff --git a/test/darray.jl b/test/darray.jl index b108e5b..28c7c36 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -1,5 +1,6 @@ using Test, LinearAlgebra, SpecialFunctions using Statistics: mean +using SparseArrays: nnz @everywhere using SparseArrays: sprandn @testset "test distribute and other constructors" begin @@ -713,35 +714,35 @@ check_leaks() @testset "test mapslices" begin A = randn(5,5,5) D = distribute(A, procs = workers(), dist = [1, 1, min(nworkers(), 5)]) - @test mapslices(svdvals, D, (1,2)) ≈ mapslices(svdvals, A, (1,2)) - @test mapslices(svdvals, D, (1,3)) ≈ mapslices(svdvals, A, (1,3)) - @test mapslices(svdvals, D, (2,3)) ≈ mapslices(svdvals, A, (2,3)) - @test mapslices(sort, D, (1,)) ≈ mapslices(sort, A, (1,)) - @test mapslices(sort, D, (2,)) ≈ mapslices(sort, A, (2,)) - @test mapslices(sort, D, (3,)) ≈ mapslices(sort, A, (3,)) + @test mapslices(svdvals, D, dims=(1,2)) ≈ mapslices(svdvals, A, dims=(1,2)) + @test mapslices(svdvals, D, dims=(1,3)) ≈ mapslices(svdvals, A, dims=(1,3)) + @test mapslices(svdvals, D, dims=(2,3)) ≈ mapslices(svdvals, A, dims=(2,3)) + @test mapslices(sort, D, dims=(1,)) ≈ mapslices(sort, A, dims=(1,)) + @test mapslices(sort, D, dims=(2,)) ≈ mapslices(sort, A, dims=(2,)) + @test mapslices(sort, D, dims=(3,)) ≈ mapslices(sort, A, dims=(3,)) # issue #3613 - B = mapslices(sum, dones(Float64, (2,3,4), workers(), [1,1,min(nworkers(),4)]), [1,2]) + B = mapslices(sum, dones(Float64, (2,3,4), workers(), [1,1,min(nworkers(),4)]), dims=[1,2]) @test size(B) == (1,1,4) @test all(B.==6) # issue #5141 - C1 = mapslices(x-> maximum(-x), D, []) + C1 = mapslices(x-> maximum(-x), D, dims=[]) @test C1 == -D # issue #5177 c = dones(Float64, (2,3,4,5), workers(), [1,1,1,min(nworkers(),5)]) - m1 = mapslices(x-> ones(2,3), c, [1,2]) - m2 = mapslices(x-> ones(2,4), c, [1,3]) - m3 = mapslices(x-> ones(3,4), c, [2,3]) + m1 = mapslices(x-> ones(2,3), c, dims=[1,2]) + m2 = mapslices(x-> ones(2,4), c, 
dims=[1,3]) + m3 = mapslices(x-> ones(3,4), c, dims=[2,3]) @test size(m1) == size(m2) == size(m3) == size(c) - n1 = mapslices(x-> ones(6), c, [1,2]) - n2 = mapslices(x-> ones(6), c, [1,3]) - n3 = mapslices(x-> ones(6), c, [2,3]) - n1a = mapslices(x-> ones(1,6), c, [1,2]) - n2a = mapslices(x-> ones(1,6), c, [1,3]) - n3a = mapslices(x-> ones(1,6), c, [2,3]) + n1 = mapslices(x-> ones(6), c, dims=[1,2]) + n2 = mapslices(x-> ones(6), c, dims=[1,3]) + n3 = mapslices(x-> ones(6), c, dims=[2,3]) + n1a = mapslices(x-> ones(1,6), c, dims=[1,2]) + n2a = mapslices(x-> ones(1,6), c, dims=[1,3]) + n3a = mapslices(x-> ones(1,6), c, dims=[2,3]) @test (size(n1a) == (1,6,4,5) && size(n2a) == (1,3,6,5) && size(n3a) == (2,1,6,5)) @test (size(n1) == (6,1,4,5) && size(n2) == (6,3,1,5) && size(n3) == (2,6,1,5)) close(D) @@ -757,11 +758,11 @@ check_leaks() c = drand(20,20) d = convert(Array, c) - @testset "$f" for f in (:+, :-, :.+, :.-, :.*, :./, :.%) + @testset "$f" for f in (:+, :-, :*, :/, :%) x = rand() - @test @eval ($f)($a, $x) == ($f)($b, $x) - @test @eval ($f)($x, $a) == ($f)($x, $b) - @test @eval ($f)($a, $c) == ($f)($b, $d) + @test @eval ($f).($a, $x) == ($f).($b, $x) + @test @eval ($f).($x, $a) == ($f).($x, $b) + @test @eval ($f).($a, $c) == ($f).($b, $d) end close(a) @@ -769,10 +770,10 @@ check_leaks() a = dones(Int, 20, 20) b = convert(Array, a) - @testset "$f" for f in (:.<<, :.>>) - @test @eval ($f)($a, 2) == ($f)($b, 2) - @test @eval ($f)(2, $a) == ($f)(2, $b) - @test @eval ($f)($a, $a) == ($f)($b, $b) + @testset "$f" for f in (:<<, :>>) + @test @eval ($f).($a, 2) == ($f).($b, 2) + @test @eval ($f).(2, $a) == ($f).(2, $b) + @test @eval ($f).($a, $a) == ($f).($b, $b) end @testset "$f" for f in (:rem,) @@ -792,7 +793,7 @@ check_leaks() nrows = 20 * nwrkrs ncols = 10 * nwrkrs a = drand((nrows,ncols), wrkrs, (1, nwrkrs)) - m = mean(a, 1) + m = mean(a, dims=1) c = a .- m d = convert(Array, a) .- convert(Array, m) @test c == d @@ -854,7 +855,7 @@ check_leaks() R[:, i] = 
convert(Array, A)[:, :, i]*convert(Array, B)[:, i] end @test convert(Array, ppeval(*, A, B)) ≈ R - @test sum(ppeval(eigvals, A)) ≈ sum(ppeval(eigvals, A, eye(10, 10))) + @test sum(ppeval(eigvals, A)) ≈ sum(ppeval(eigvals, A, Matrix{Float64}(I,10,10))) close(A) close(B) d_closeall() # close the temporaries created above @@ -887,7 +888,7 @@ end d_closeall() # close the temporaries created above end -@testset "sort, T = $T" for i in 0:6, T in [Int, Float64] +@testset "sort, T = $T, 10^$i elements" for i in 0:6, T in [Int, Float64] d = DistributedArrays.drand(T, 10^i) @testset "sample = $sample" for sample in Any[true, false, (minimum(d),maximum(d)), rand(T, 10^i>512 ? 512 : 10^i)] d2 = DistributedArrays.sort(d; sample=sample) From 86d883c6e883a139098c8ed4c407d903ad8f6bd2 Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Mon, 16 Jul 2018 20:27:22 +0100 Subject: [PATCH 10/18] Add constructor for Array from DArray. This fixes extreme slowness of convert(Array, ::DArray) which was falling back to the generic constructor from AbstractArray. --- src/darray.jl | 2 +- test/darray.jl | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/darray.jl b/src/darray.jl index 9d7a37b..40a4dc3 100644 --- a/src/darray.jl +++ b/src/darray.jl @@ -153,7 +153,7 @@ function ddata(;T::Type=Any, init::Function=I->nothing, pids=workers(), data::Ve end function gather(d::DArray{T,1,T}) where T - a=Array{T}(length(procs(d))) + a=Array{T}(undef, length(procs(d))) @sync for (i,p) in enumerate(procs(d)) @async a[i] = remotecall_fetch(localpart, p, d) end diff --git a/test/darray.jl b/test/darray.jl index 28c7c36..42cac37 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -892,9 +892,10 @@ end d = DistributedArrays.drand(T, 10^i) @testset "sample = $sample" for sample in Any[true, false, (minimum(d),maximum(d)), rand(T, 10^i>512 ? 
512 : 10^i)] d2 = DistributedArrays.sort(d; sample=sample) - + a = convert(Array, d) + a2 = convert(Array, d2) @test length(d) == length(d2) - @test sort(convert(Array, d)) == convert(Array, d2) + @test sort(a) == a2 end d_closeall() # close the temporaries created above end From cd94d5c4f431e95549ec8d4f6a3f2a8345e823ef Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Tue, 17 Jul 2018 13:28:43 +0100 Subject: [PATCH 11/18] Fix SPMD deadlock. There was a leftover get from the Nullable times. This should cause an exception, but resulted in a deadlock. Julia bug? --- src/spmd.jl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/spmd.jl b/src/spmd.jl index b1f9f88..b68d8cb 100644 --- a/src/spmd.jl +++ b/src/spmd.jl @@ -24,7 +24,7 @@ mutable struct SPMDContext function SPMDContext(id) ctxt = new(id, Channel(typemax(Int)), Dict{Any,Any}(), [], false) - finalizer(ctxt, finalize_ctxt) + finalizer(finalize_ctxt, ctxt) ctxt end end @@ -256,7 +256,7 @@ end function Base.close(ctxt::SPMDContext) for p in ctxt.pids - Base.remote_do(delete_ctxt_id, p, ctxt.id) + remote_do(delete_ctxt_id, p, ctxt.id) end ctxt.release = false end From 30621dd1fb5889b719463152cbaa5ca7835a4cdc Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Tue, 17 Jul 2018 13:45:18 +0100 Subject: [PATCH 12/18] Fixed deprecations in SPMD. All tests pass without deprecation warnings now. 
--- src/spmd.jl | 8 ++++---- test/spmd.jl | 6 +++--- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/spmd.jl b/src/spmd.jl index b68d8cb..91ad898 100644 --- a/src/spmd.jl +++ b/src/spmd.jl @@ -198,7 +198,7 @@ function scatter(x, pid::Int; tag=nothing, pids=procs()) p == pid && continue send_msg(p, :scatter, x[cnt*(i-1)+1:cnt*i], tag) end - myidx = findfirst(sort(pids), pid) + myidx = findfirst(isequal(pid), sort(pids)) return x[cnt*(myidx-1)+1:cnt*myidx] else _, data = get_msg(:scatter, pid, tag) @@ -208,13 +208,13 @@ end function gather(x, pid::Int; tag=nothing, pids=procs()) if myid() == pid - gathered_data = Array{Any}(length(pids)) - myidx = findfirst(sort(pids), pid) + gathered_data = Array{Any}(undef, length(pids)) + myidx = findfirst(isequal(pid), sort(pids)) gathered_data[myidx] = x n = length(pids) - 1 while n > 0 from, data_x = get_msg(:gather, false, tag) - fromidx = findfirst(sort(pids), from) + fromidx = findfirst(isequal(from), sort(pids)) gathered_data[fromidx] = data_x n=n-1 end diff --git a/test/spmd.jl b/test/spmd.jl index 73412c9..0bd289e 100644 --- a/test/spmd.jl +++ b/test/spmd.jl @@ -76,7 +76,7 @@ spmd(spmd_test1) # define the function everywhere @everywhere function foo_spmd(d_in, d_out, n) pids=sort(vec(procs(d_in))) - pididx = findfirst(pids, myid()) + pididx = findfirst(isequal(myid()), pids) mylp = localpart(d_in) localsum = 0 @@ -122,7 +122,7 @@ println("SPMD: Passed testing of spmd function run concurrently") # define the function everywhere @everywhere function foo_spmd2(d_in, d_out, n) pids=sort(vec(procs(d_in))) - pididx = findfirst(pids, myid()) + pididx = findfirst(isequal(myid()), pids) mylp = localpart(d_in) # see if we have a value in the local store. @@ -170,7 +170,7 @@ end # verify localstores with appropriate context store values exist. 
@everywhere begin if myid() != 1 - n = 0 + local n = 0 for (k,v) in DistributedArrays.SPMD.map_ctxts store = v.store localsum = store[:LOCALSUM] From d6277003b448dfc0be7c8171edbab7411cbb30c7 Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Thu, 19 Jul 2018 11:20:20 +0100 Subject: [PATCH 13/18] Fixup README. - update compatibility status - update example code Also, remove superfluous method for mapreduce. --- README.md | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 961e6c3..0a8f032 100644 --- a/README.md +++ b/README.md @@ -7,9 +7,7 @@ Distributed Arrays for Julia ***NOTE*** -Distributed Arrays will only work on Julia v0.4.0 or later. - -`DArray`s have been removed from Julia Base library in v0.4 so it is now necessary to import the `DistributedArrays` package on all spawned processes. +This package will only work on Julia v0.7 or later. ```julia using DistributedArrays @@ -154,7 +152,7 @@ following code accomplishes this:: left = mod(first(I[2])-2,size(d,2))+1 right = mod( last(I[2]) ,size(d,2))+1 - old = Array(Bool, length(I[1])+2, length(I[2])+2) + old = Array{Bool}(undef, length(I[1])+2, length(I[2])+2) old[1 , 1 ] = d[top , left] # left side old[2:end-1, 1 ] = d[I[1], left] old[end , 1 ] = d[bot , left] @@ -318,17 +316,18 @@ This toy example exchanges data with each of its neighbors `n` times. 
``` using Distributed -addprocs(8) using DistributedArrays -using DistributedArrays.SPMD +addprocs(8) +@everywhere using DistributedArrays +@everywhere using DistributedArrays.SPMD -d_in=d=DArray(I->fill(myid(), (map(length,I)...)), (nworkers(), 2), workers(), [nworkers(),1]) -d_out=ddata() +d_in=d=DArray(I->fill(myid(), (map(length,I)...,)), (nworkers(), 2), workers(), [nworkers(),1]) +d_out=ddata(); # TODO cannot show # define the function everywhere @everywhere function foo_spmd(d_in, d_out, n) pids = sort(vec(procs(d_in))) - pididx = findfirst(pids, myid()) + pididx = findfirst(isequal(myid()), pids) mylp = d_in[:L] localsum = 0 @@ -352,7 +351,7 @@ d_out=ddata() end # run foo_spmd on all workers -spmd(foo_spmd, d_in, d_out, 10) +spmd(foo_spmd, d_in, d_out, 10, pids=workers()) # print values of d_in and d_out after the run println(d_in) From 83961fd8044becf8dc9f13a15676fbab50afce16 Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Fri, 20 Jul 2018 12:51:54 +0100 Subject: [PATCH 14/18] Add deepcopy and tests. `deepcopy(::DArray)` calls `similar` and deepcopies the localparts. Tests to ensure that `copy` makes a shallow copy and `deepcopy` makes a deep copy. --- src/darray.jl | 10 ++++++++++ test/darray.jl | 38 ++++++++++++++++++++++++++++++++++---- 2 files changed, 44 insertions(+), 4 deletions(-) diff --git a/src/darray.jl b/src/darray.jl index 40a4dc3..dd92858 100644 --- a/src/darray.jl +++ b/src/darray.jl @@ -574,6 +574,16 @@ Base.copyto!(dest::SubOrDArray, src::SubOrDArray) = begin end Base.copy!(dest::SubOrDArray, src::SubOrDArray) = copyto!(dest, src) +function Base.deepcopy(src::DArray) + dest = similar(src) + asyncmap(procs(src)) do p + remotecall_fetch(p) do + dest[:L] = deepcopy(src[:L]) + end + end + return dest +end + # local copies are obtained by convert(Array, ) or assigning from # a SubDArray to a local Array.
diff --git a/test/darray.jl b/test/darray.jl index 42cac37..6ee06ea 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -60,21 +60,51 @@ end check_leaks() -@testset "test DArray equality" begin +@testset "test DArray equality/copy/deepcopy" begin D = drand((200,200), [MYID, OTHERIDS]) - DC = copy(D) @testset "test isequal(::DArray, ::DArray)" begin + DC = copy(D) @test D == DC + close(DC) end - @testset "test copy(::DArray) does a copy of each localpart" begin + @testset "test [deep]copy(::DArray) does a copy of each localpart" begin + DC = copy(D) @spawnat OTHERIDS localpart(DC)[1] = 0 @test fetch(@spawnat OTHERIDS localpart(D)[1] != 0) + DD = deepcopy(D) + @spawnat OTHERIDS localpart(DD)[1] = 0 + @test fetch(@spawnat OTHERIDS localpart(D)[1] != 0) + close(DC) + close(DD) + end + + @testset "test copy(::DArray) is shallow" begin + DA = @DArray [rand(100) for i=1:10] + DC = copy(DA) + id = procs(DC)[1] + @test DA == DC + fetch(@spawnat id localpart(DC)[1] .= -1.0) + @test DA == DC + @test fetch(@spawnat id all(localpart(DA)[1] .== -1.0)) + close(DA) + close(DC) + end + + @testset "test deepcopy(::DArray) is not shallow" begin + DA = @DArray [rand(100) for i=1:10] + DC = deepcopy(DA) + id = procs(DC)[1] + @test DA == DC + fetch(@spawnat id localpart(DC)[1] .= -1.0) + @test DA != DC + @test fetch(@spawnat id all(localpart(DA)[1] .>= 0.0)) + close(DA) + close(DC) end close(D) - close(DC) end check_leaks() From 81b20b75bce654f89d1a69d1318805dc1a9cd387 Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Fri, 20 Jul 2018 13:01:30 +0100 Subject: [PATCH 15/18] Update README. 
--- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0a8f032..7a271bd 100644 --- a/README.md +++ b/README.md @@ -322,7 +322,7 @@ addprocs(8) @everywhere using DistributedArrays.SPMD d_in=d=DArray(I->fill(myid(), (map(length,I)...,)), (nworkers(), 2), workers(), [nworkers(),1]) -d_out=ddata(); # TODO cannot show +d_out=ddata(); # define the function everywhere @everywhere function foo_spmd(d_in, d_out, n) From f509f592b5e7836cf602ac432c088f0d7f271065 Mon Sep 17 00:00:00 2001 From: Frank Otto Date: Fri, 20 Jul 2018 14:26:58 +0100 Subject: [PATCH 16/18] Cleanup. Document change linearindexes -> linearindices. Clean up comments and unused code. --- src/darray.jl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/darray.jl b/src/darray.jl index dd92858..ab28026 100644 --- a/src/darray.jl +++ b/src/darray.jl @@ -198,7 +198,7 @@ function DArray(refs) nindices = Array{NTuple{length(dimdist),UnitRange{Int}}}(undef, dimdist...) for i in 1:length(nindices) - subidx = CartesianIndices(dimdist)[i] #ind2sub(dimdist, i) + subidx = CartesianIndices(dimdist)[i] nindices[i] = ntuple(length(subidx)) do x idx_in_dim = subidx[x] startidx = 1 From 24a150ba8673c4ca8448c4ab2ae0363a2f0deb0b Mon Sep 17 00:00:00 2001 From: Andreas Noack Date: Sat, 28 Jul 2018 09:19:04 +0200 Subject: [PATCH 17/18] Define necessary methods to make tests pass without hitting scalar indexing and some other cleanup. 
--- src/DistributedArrays.jl | 2 +- src/core.jl | 3 +- src/darray.jl | 52 ++++++++++++++++++++----- src/linalg.jl | 82 +++++++++++++++++++++++++++------------- src/mapreduce.jl | 35 +++++++++++++++-- src/serialize.jl | 4 +- test/darray.jl | 34 ++++++++++------- test/runtests.jl | 7 +++- 8 files changed, 161 insertions(+), 58 deletions(-) diff --git a/src/DistributedArrays.jl b/src/DistributedArrays.jl index 941e4f6..755855c 100644 --- a/src/DistributedArrays.jl +++ b/src/DistributedArrays.jl @@ -8,7 +8,7 @@ using LinearAlgebra import Base: +, -, *, div, mod, rem, &, |, xor import Base.Callable -import LinearAlgebra: axpy!, dot, norm, +import LinearAlgebra: axpy!, dot, norm import Primes import Primes: factor diff --git a/src/core.jl b/src/core.jl index 034e901..abb8415 100644 --- a/src/core.jl +++ b/src/core.jl @@ -55,7 +55,8 @@ end Get the vector of processes storing pieces of DArray `d`. """ -Distributed.procs(d::DArray) = d.pids +Distributed.procs(d::DArray) = d.pids +Distributed.procs(d::SubDArray) = procs(parent(d)) """ localpart(A) diff --git a/src/darray.jl b/src/darray.jl index ab28026..e2108b8 100644 --- a/src/darray.jl +++ b/src/darray.jl @@ -364,9 +364,41 @@ function localindices(d::DArray) return d.indices[lpidx] end -# find which piece holds index (I...) -locate(d::DArray, I::Int...) 
= - ntuple(i -> searchsortedlast(d.cuts[i], I[i]), ndims(d)) +# Equality +function Base.:(==)(d::DArray{<:Any,<:Any,A}, a::AbstractArray) where A + if size(d) != size(a) + return false + else + b = asyncmap(procs(d)) do p + remotecall_fetch(p) do + localpart(d) == A(a[localindices(d)...]) + end + end + return all(b) + end +end +Base.:(==)(d::SubDArray, a::AbstractArray) = copy(d) == a +Base.:(==)(a::AbstractArray, d::DArray) = d == a +Base.:(==)(a::AbstractArray, d::SubDArray) = d == a +Base.:(==)(d1::DArray, d2::DArray) = invoke(==, Tuple{DArray, AbstractArray}, d1, d2) +Base.:(==)(d1::SubDArray, d2::DArray) = copy(d1) == d2 +Base.:(==)(d1::DArray, d2::SubDArray) = d1 == copy(d2) +Base.:(==)(d1::SubDArray, d2::SubDArray) = copy(d1) == copy(d2) + +""" + locate(d::DArray, I::Int...) + +Determine the index of `procs(d)` that hold element `I`. +""" +function locate(d::DArray, I::Int...) + ntuple(ndims(d)) do i + fi = searchsortedlast(d.cuts[i], I[i]) + if fi >= length(d.cuts[i]) + throw(ArgumentError("element not contained in array")) + end + return fi + end +end chunk(d::DArray{T,N,A}, i...) where {T,N,A} = remotecall_fetch(localpart, d.pids[i...], d)::A @@ -479,7 +511,7 @@ end function (::Type{Array{S,N}})(s::SubDArray{T,N}) where {S,T,N} I = s.indices d = s.parent - if isa(I,Tuple{Vararg{UnitRange{Int}}}) && S<:T && T<:S + if isa(I,Tuple{Vararg{UnitRange{Int}}}) && S<:T && T<:S && !isempty(s) l = locate(d, map(first, I)...) if isequal(d.indices[l...], I) # SubDArray corresponds to a chunk @@ -487,7 +519,7 @@ function (::Type{Array{S,N}})(s::SubDArray{T,N}) where {S,T,N} end end a = Array{S}(undef, size(s)) - a[[1:size(a,i) for i=1:N]...] .= s + a[[1:size(a,i) for i=1:N]...] = s return a end @@ -540,7 +572,7 @@ end function Base.getindex(d::DArray, i::Int) _scalarindexingallowed() - return getindex_tuple(d, CartesianIndices(d)[i]) + return getindex_tuple(d, Tuple(CartesianIndices(d)[i])) end function Base.getindex(d::DArray, i::Int...) 
_scalarindexingallowed() @@ -548,7 +580,7 @@ function Base.getindex(d::DArray, i::Int...) end Base.getindex(d::DArray) = d[1] -Base.getindex(d::DArray, I::Union{Int,UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) = view(d, I...) +Base.getindex(d::SubOrDArray, I::Union{Int,UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) = view(d, I...) function Base.isassigned(D::DArray, i::Integer...) try @@ -564,15 +596,15 @@ function Base.isassigned(D::DArray, i::Integer...) end -Base.copyto!(dest::SubOrDArray, src::SubOrDArray) = begin +function Base.copyto!(dest::SubOrDArray, src::AbstractArray) asyncmap(procs(dest)) do p remotecall_fetch(p) do - localpart(dest)[:] = src[localindices(dest)...] + ldest = localpart(dest) + ldest[:] = Array(view(src, localindices(dest)...)) end end return dest end -Base.copy!(dest::SubOrDArray, src::SubOrDArray) = copyto!(dest, src) function Base.deepcopy(src::DArray) dest = similar(src) diff --git a/src/linalg.jl b/src/linalg.jl index ca05c19..caf95cc 100644 --- a/src/linalg.jl +++ b/src/linalg.jl @@ -1,4 +1,5 @@ -function Base.copy(D::Adjoint{T,<:DArray{T,2}}) where T +function Base.copy(Dadj::Adjoint{T,<:DArray{T,2}}) where T + D = parent(Dadj) DArray(reverse(size(D)), procs(D)) do I lp = Array{T}(undef, map(length, I)) rp = convert(Array, D[reverse(I)...]) @@ -6,7 +7,8 @@ function Base.copy(D::Adjoint{T,<:DArray{T,2}}) where T end end -function Base.copy(D::Transpose{T,<:DArray{T,2}}) where T +function Base.copy(Dtr::Transpose{T,<:DArray{T,2}}) where T + D = parent(Dtr) DArray(reverse(size(D)), procs(D)) do I lp = Array{T}(undef, map(length, I)) rp = convert(Array, D[reverse(I)...]) @@ -49,7 +51,7 @@ function dot(x::DVector, y::DVector) return reduce(+, results) end -function norm(x::DVector, p::Real = 2) +function norm(x::DArray, p::Real = 2) results = [] @sync begin for pp in procs(x) @@ -83,7 +85,7 @@ function add!(dest, src, scale = one(dest[1])) return dest end -function A_mul_B!(α::Number, A::DMatrix, x::AbstractVector, 
β::Number, y::DVector) +function mul!(y::DVector, A::DMatrix, x::AbstractVector, α::Number = 1, β::Number = 0) # error checks if size(A, 2) != length(x) @@ -106,11 +108,14 @@ function A_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVec # Scale y if necessary if β != one(β) - @sync for p in y.pids - if β != zero(β) - @async remotecall_fetch(y -> (rmul!(localpart(y), β); nothing), p, y) - else - @async remotecall_fetch(y -> (fill!(localpart(y), 0); nothing), p, y) + asyncmap(procs(y)) do p + remotecall_fetch(p) do + if !iszero(β) + rmul!(localpart(y), β) + else + fill!(localpart(y), 0) + end + return nothing end end end @@ -127,7 +132,9 @@ function A_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVec return y end -function Ac_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVector) +function mul!(y::DVector, adjA::Adjoint{<:Number,<:DMatrix}, x::AbstractVector, α::Number = 1, β::Number = 0) + + A = parent(adjA) # error checks if size(A, 1) != length(x) @@ -148,11 +155,14 @@ function Ac_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVe # Scale y if necessary if β != one(β) - @sync for p in y.pids - if β != zero(β) - @async remotecall_fetch(() -> (rmul!(localpart(y), β); nothing), p) - else - @async remotecall_fetch(() -> (fill!(localpart(y), 0); nothing), p) + asyncmap(procs(y)) do p + remotecall_fetch(p) do + if !iszero(β) + rmul!(localpart(y), β) + else + fill!(localpart(y), 0) + end + return nothing end end end @@ -189,7 +199,7 @@ function LinearAlgebra.rmul!(DA::DMatrix, D::Diagonal) end # Level 3 -function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix, tA) +function _matmatmul!(C::DMatrix, A::DMatrix, B::AbstractMatrix, α::Number, β::Number, tA) # error checks Ad1, Ad2 = (tA == 'N') ? 
(1,2) : (2,1) mA, nA = (size(A, Ad1), size(A, Ad2)) @@ -254,17 +264,16 @@ function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::D return C end -A_mul_B!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix) = _matmatmul!(α, A, B, β, C, 'N') -Ac_mul_B!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix) = _matmatmul!(α, A, B, β, C, 'C') -At_mul_B!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix) = _matmatmul!(α, A, B, β, C, 'T') -At_mul_B!(C::DMatrix, A::DMatrix, B::AbstractMatrix) = At_mul_B!(one(eltype(C)), A, B, zero(eltype(C)), C) +mul!(C::DMatrix, A::DMatrix, B::AbstractMatrix, α::Number = 1, β::Number = 0) = _matmatmul!(C, A, B, α, β, 'N') +mul!(C::DMatrix, A::Adjoint{<:Number,<:DMatrix}, B::AbstractMatrix, α::Number = 1, β::Number = 0) = _matmatmul!(C, parent(A), B, α, β, 'C') +mul!(C::DMatrix, A::Transpose{<:Number,<:DMatrix}, B::AbstractMatrix, α::Number = 1, β::Number = 0) = _matmatmul!(C, parent(A), B, α, β, 'T') _matmul_op = (t,s) -> t*s + t*s function Base.:*(A::DMatrix, x::AbstractVector) T = Base.promote_op(_matmul_op, eltype(A), eltype(x)) y = DArray(I -> Array{T}(undef, map(length, I)), (size(A, 1),), procs(A)[:,1], (size(procs(A), 1),)) - return A_mul_B!(one(T), A, x, zero(T), y) + return mul!(y, A, x) end function Base.:*(A::DMatrix, B::AbstractMatrix) T = Base.promote_op(_matmul_op, eltype(A), eltype(B)) @@ -272,22 +281,43 @@ function Base.:*(A::DMatrix, B::AbstractMatrix) (size(A, 1), size(B, 2)), procs(A)[:,1:min(size(procs(A), 2), size(procs(B), 2))], (size(procs(A), 1), min(size(procs(A), 2), size(procs(B), 2)))) - return A_mul_B!(one(T), A, B, zero(T), C) + return mul!(C, A, B) +end + +function Base.:*(adjA::Adjoint{<:Any,<:DMatrix}, x::AbstractVector) + A = parent(adjA) + T = Base.promote_op(_matmul_op, eltype(A), eltype(x)) + y = DArray(I -> Array{T}(undef, map(length, I)), + (size(A, 2),), + procs(A)[1,:], + (size(procs(A), 2),)) + return mul!(y, adjA, x) +end +function 
Base.:*(adjA::Adjoint{<:Any,<:DMatrix}, B::AbstractMatrix) + A = parent(adjA) + T = Base.promote_op(_matmul_op, eltype(A), eltype(B)) + C = DArray(I -> Array{T}(undef, map(length, I)), (size(A, 2), + size(B, 2)), + procs(A)[1:min(size(procs(A), 1), size(procs(B), 2)),:], + (size(procs(A), 2), min(size(procs(A), 1), size(procs(B), 2)))) + return mul!(C, adjA, B) end -function Ac_mul_B(A::DMatrix, x::AbstractVector) +function Base.:*(trA::Transpose{<:Any,<:DMatrix}, x::AbstractVector) + A = parent(trA) T = Base.promote_op(_matmul_op, eltype(A), eltype(x)) y = DArray(I -> Array{T}(undef, map(length, I)), (size(A, 2),), procs(A)[1,:], (size(procs(A), 2),)) - return Ac_mul_B!(one(T), A, x, zero(T), y) + return mul!(y, trA, x) end -function Ac_mul_B(A::DMatrix, B::AbstractMatrix) +function Base.:*(trA::Transpose{<:Any,<:DMatrix}, B::AbstractMatrix) + A = parent(trA) T = Base.promote_op(_matmul_op, eltype(A), eltype(B)) C = DArray(I -> Array{T}(undef, map(length, I)), (size(A, 2), size(B, 2)), procs(A)[1:min(size(procs(A), 1), size(procs(B), 2)),:], (size(procs(A), 2), min(size(procs(A), 1), size(procs(B), 2)))) - return Ac_mul_B!(one(T), A, B, zero(T), C) + return mul!(C, trA, B) end diff --git a/src/mapreduce.jl b/src/mapreduce.jl index 3b28c77..d8da385 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -5,10 +5,10 @@ import SparseArrays: nnz Base.map(f, d0::DArray, ds::AbstractArray...) = broadcast(f, d0, ds...) 
-function Base.map!(f::F, dest::DArray, src::DArray) where {F} +function Base.map!(f::F, dest::DArray, src::DArray{<:Any,<:Any,A}) where {F,A} asyncmap(procs(dest)) do p remotecall_fetch(p) do - map!(f, localpart(dest), src[localindices(dest)...]) + map!(f, localpart(dest), A(view(src, localindices(dest)...))) return nothing end end @@ -53,7 +53,7 @@ rewrite_local(x) = x function Base.reduce(f, d::DArray) results = asyncmap(procs(d)) do p - remotecall_fetch(p, f, d) do (f, d) + remotecall_fetch(p) do return reduce(f, localpart(d)) end end @@ -122,12 +122,39 @@ function Base.mapreducedim!(f, op, R::DArray, A::DArray) end region = tuple(collect(1:ndims(A))[[size(R)...] .!= [size(A)...]]...) if isempty(region) - return copy!(R, A) + return copyto!(R, A) end B = mapreducedim_within(f, op, A, region) return mapreducedim_between!(identity, op, R, B, region) end +function Base._all(f, A::DArray, ::Colon) + B = asyncmap(procs(A)) do p + remotecall_fetch(p) do + all(f, localpart(A)) + end + end + return all(B) +end + +function Base._any(f, A::DArray, ::Colon) + B = asyncmap(procs(A)) do p + remotecall_fetch(p) do + any(f, localpart(A)) + end + end + return any(B) +end + +function Base.count(f, A::DArray) + B = asyncmap(procs(A)) do p + remotecall_fetch(p) do + count(f, localpart(A)) + end + end + return sum(B) +end + function nnz(A::DArray) B = asyncmap(A.pids) do p remotecall_fetch(nnz∘localpart, p, A) diff --git a/src/serialize.jl b/src/serialize.jl index a36d6b8..385a89f 100644 --- a/src/serialize.jl +++ b/src/serialize.jl @@ -2,7 +2,7 @@ function Serialization.serialize(S::AbstractSerializer, d::DArray{T,N,A}) where # Only send the ident for participating workers - we expect the DArray to exist in the # remote registry. DO NOT send the localpart. 
destpid = worker_id_from_socket(S.io) - serialize_type(S, typeof(d)) + Serialization.serialize_type(S, typeof(d)) if (destpid in d.pids) || (destpid == d.id[1]) serialize(S, (true, d.id)) # (id_only, id) else @@ -64,7 +64,7 @@ function Serialization.serialize(S::AbstractSerializer, s::DestinationSerializer pid = worker_id_from_socket(S.io) pididx = findfirst(isequal(pid), s.pids) @assert pididx !== nothing - serialize_type(S, typeof(s)) + Serialization.serialize_type(S, typeof(s)) serialize(S, s.generate(pididx)) end diff --git a/test/darray.jl b/test/darray.jl index 6ee06ea..4875f72 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -161,32 +161,40 @@ check_leaks() @testset "test DArray / Array conversion" begin D = drand((200,200), [MYID, OTHERIDS]) - @testset "test convert(::Array, ::(Sub)DArray)" begin - S = convert(Matrix{Float64}, D[1:150, 1:150]) - A = convert(Matrix{Float64}, D) + @testset "test construct Array from (Sub)DArray" begin + S = Matrix{Float64}(D[1:150, 1:150]) + A = Matrix{Float64}(D) @test A[1:150,1:150] == S - D2 = convert(DArray{Float64,2,Matrix{Float64}}, A) + D2 = DArray{Float64,2,Matrix{Float64}}(A) @test D2 == D + DistributedArrays.allowscalar(true) @test fetch(@spawnat MYID localpart(D)[1,1]) == D[1,1] @test fetch(@spawnat OTHERIDS localpart(D)[1,1]) == D[1,101] + DistributedArrays.allowscalar(false) close(D2) - S2 = convert(Vector{Float64}, D[4, 23:176]) + S2 = Vector{Float64}(D[4, 23:176]) @test A[4, 23:176] == S2 - S3 = convert(Vector{Float64}, D[23:176, 197]) + S3 = Vector{Float64}(D[23:176, 197]) @test A[23:176, 197] == S3 S4 = zeros(4) setindex!(S4, D[3:4, 99:100], :) + # FixMe! Hitting the AbstractArray fallback here is extremely unfortunate but vec() becomes a ReshapedArray which makes it diffuclt to hit DArray methods. 
Unless this can be fixed in Base, we might have to add special methods for ReshapedArray{DArray} + DistributedArrays.allowscalar(true) @test S4 == vec(D[3:4, 99:100]) @test S4 == vec(A[3:4, 99:100]) + DistributedArrays.allowscalar(false) S5 = zeros(2,2) setindex!(S5, D[1,1:4], :, 1:2) + # FixMe! Hitting the AbstractArray fallback here is extremely unfortunate but vec() becomes a ReshapedArray which makes it diffuclt to hit DArray methods. Unless this can be fixed in Base, we might have to add special methods for ReshapedArray{DArray} + DistributedArrays.allowscalar(true) @test vec(S5) == D[1, 1:4] @test vec(S5) == A[1, 1:4] + DistributedArrays.allowscalar(false) end close(D) end @@ -198,7 +206,7 @@ check_leaks() r1 = remotecall_wait(() -> randn(3,10), workers()[1]) r2 = remotecall_wait(() -> randn(7,10), workers()[2]) D2 = DArray(reshape([r1; r2], 2, 1)) - copy!(D2, D1) + copyto!(D2, D1) @test D1 == D2 close(D1) close(D2) @@ -672,22 +680,22 @@ check_leaks() @testset "test transpose/adjoint" begin @testset "test transpose real" begin A = drand(Float64, 100, 200) - @test transpose(A) == transpose(Array(A)) + @test copy(transpose(A)) == transpose(Array(A)) close(A) end @testset "test transpose complex" begin A = drand(ComplexF64, 200, 100) - @test transpose(A) == transpose(Array(A)) + @test copy(transpose(A)) == transpose(Array(A)) close(A) end @testset "test adjoint real" begin A = drand(Float64, 200, 100) - @test adjoint(A) == adjoint(Array(A)) + @test copy(adjoint(A)) == adjoint(Array(A)) close(A) end @testset "test adjoint complex" begin A = drand(ComplexF64, 100, 200) - @test adjoint(A) == adjoint(Array(A)) + @test copy(adjoint(A)) == adjoint(Array(A)) close(A) end @@ -701,11 +709,11 @@ check_leaks() s = view(a, 1:5, 5:8) @test isa(s, SubDArray) - @test s == convert(DArray, s) + @test s == DArray(s) s = view(a, 6:5, 5:8) @test isa(s, SubDArray) - @test s == convert(DArray, s) + @test s == DArray(s) close(a) d_closeall() # close the temporaries created above end 
diff --git a/test/runtests.jl b/test/runtests.jl index b98613b..9b66444 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -2,6 +2,10 @@ using Test using Distributed using DistributedArrays +# Disable scalar indexing to avoid falling back on generic methods +# for AbstractArray +DistributedArrays.allowscalar(false) + # add at least 3 worker processes if nworkers() < 3 n = max(3, min(8, Sys.CPU_THREADS)) @@ -10,6 +14,7 @@ end @assert nprocs() > 3 @assert nworkers() >= 3 +@everywhere using Distributed @everywhere using DistributedArrays @everywhere using DistributedArrays.SPMD @everywhere using Random @@ -23,7 +28,7 @@ const OTHERIDS = filter(id-> id != MYID, procs())[rand(1:(nprocs()-1))] function check_leaks() if length(DistributedArrays.refs) > 0 sleep(0.1) # allow time for any cleanup to complete and test again - length(DistributedArrays.refs) > 0 && warn("Probable leak of ", length(DistributedArrays.refs), " darrays") + length(DistributedArrays.refs) > 0 && @warn("Probable leak of ", length(DistributedArrays.refs), " darrays") end end From 810e14228f30828a0f2ad363553a54aa81ff69af Mon Sep 17 00:00:00 2001 From: Andreas Noack Date: Sat, 28 Jul 2018 14:28:58 +0200 Subject: [PATCH 18/18] Fix leaks in comparisons --- src/darray.jl | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/darray.jl b/src/darray.jl index e2108b8..ac6657e 100644 --- a/src/darray.jl +++ b/src/darray.jl @@ -377,13 +377,33 @@ function Base.:(==)(d::DArray{<:Any,<:Any,A}, a::AbstractArray) where A return all(b) end end -Base.:(==)(d::SubDArray, a::AbstractArray) = copy(d) == a +function Base.:(==)(d::SubDArray, a::AbstractArray) + cd = copy(d) + t = cd == a + close(cd) + return t +end Base.:(==)(a::AbstractArray, d::DArray) = d == a Base.:(==)(a::AbstractArray, d::SubDArray) = d == a Base.:(==)(d1::DArray, d2::DArray) = invoke(==, Tuple{DArray, AbstractArray}, d1, d2) -Base.:(==)(d1::SubDArray, d2::DArray) = copy(d1) == d2 
-Base.:(==)(d1::DArray, d2::SubDArray) = d1 == copy(d2) -Base.:(==)(d1::SubDArray, d2::SubDArray) = copy(d1) == copy(d2) +function Base.:(==)(d1::SubDArray, d2::DArray) + cd1 = copy(d1) + t = cd1 == d2 + close(cd1) + return t +end +function Base.:(==)(d1::DArray, d2::SubDArray) + cd2 = copy(d2) + t = d1 == cd2 + close(cd2) + return t +end +function Base.:(==)(d1::SubDArray, d2::SubDArray) + cd1 = copy(d1) + t = cd1 == d2 + close(cd1) + return t +end """ locate(d::DArray, I::Int...)