Skip to content

Commit

Permalink
Merge pull request #76 from JuliaParallel/mb
Browse files Browse the repository at this point in the history
Fix setindex! with SubDArray source (+ remove Julia 0.4 support)
  • Loading branch information
andreasnoack committed Aug 4, 2016
2 parents 166642e + feff2c6 commit 4e1a280
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 94 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ os:
- linux
- osx
julia:
- 0.4
- 0.5
- nightly
notifications:
email: false
Expand Down
3 changes: 1 addition & 2 deletions REQUIRE
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
julia 0.4
Compat 0.7.14
julia 0.5-
Primes
211 changes: 130 additions & 81 deletions src/DistributedArrays.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,8 @@ module DistributedArrays
using Compat
import Compat.view

if VERSION >= v"0.5.0-dev+4340"
using Primes
using Primes: factor
end

if VERSION < v"0.5.0-"
typealias Future RemoteRef
typealias RemoteChannel RemoteRef
typealias AbstractSerializer SerializationState # On 0.4 fallback to the only concrete implementation
end
using Primes
using Primes: factor

importall Base
import Base.Callable
Expand Down Expand Up @@ -195,40 +187,24 @@ function DArray(refs)

DArray(identity, refs, ndims, reshape(npids, dimdist), nindexes, ncuts)
end
if VERSION < v"0.5.0-"
macro DArray(ex::Expr)
if ex.head !== :comprehension
throw(ArgumentError("invalid @DArray syntax"))
end
ex.args[1] = esc(ex.args[1])
ndim = length(ex.args) - 1
ranges = map(r->esc(r.args[2]), ex.args[2:end])
for d = 1:ndim
var = ex.args[d+1].args[1]
ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] )
end
return :( DArray((I::Tuple{Vararg{UnitRange{Int}}})->($ex),
tuple($(map(r->:(length($r)), ranges)...))) )

macro DArray(ex0::Expr)
if ex0.head !== :comprehension
throw(ArgumentError("invalid @DArray syntax"))
end
else
macro DArray(ex0::Expr)
if ex0.head !== :comprehension
throw(ArgumentError("invalid @DArray syntax"))
end
ex = ex0.args[1]
if ex.head !== :generator
throw(ArgumentError("invalid @DArray syntax"))
end
ex.args[1] = esc(ex.args[1])
ndim = length(ex.args) - 1
ranges = map(r->esc(r.args[2]), ex.args[2:end])
for d = 1:ndim
var = ex.args[d+1].args[1]
ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] )
end
return :( DArray((I::Tuple{Vararg{UnitRange{Int}}})->($ex0),
tuple($(map(r->:(length($r)), ranges)...))) )
ex = ex0.args[1]
if ex.head !== :generator
throw(ArgumentError("invalid @DArray syntax"))
end
ex.args[1] = esc(ex.args[1])
ndim = length(ex.args) - 1
ranges = map(r->esc(r.args[2]), ex.args[2:end])
for d = 1:ndim
var = ex.args[d+1].args[1]
ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] )
end
return :( DArray((I::Tuple{Vararg{UnitRange{Int}}})->($ex0),
tuple($(map(r->:(length($r)), ranges)...))) )
end

# new DArray similar to an existing one
Expand Down Expand Up @@ -656,29 +632,127 @@ function Base.setindex!(a::Array, d::DArray,
return a
end

# We also want to optimize setindex! with a SubDArray source, but this is hard
# and only works on 0.5.

# Similar to Base.indexin, but just create a logical mask. Note that this
# must return a logical mask in order to support merging multiple masks
# together into one linear index since we need to know how many elements to
# skip at the end. In many cases range intersection would be much faster
# than generating a logical mask, but that loses the endpoint information.
indexin_mask(a, b::Number) = a .== b
indexin_mask(a, r::Range{Int}) = [i in r for i in a]
indexin_mask(a, b::AbstractArray{Int}) = indexin_mask(a, IntSet(b))
indexin_mask(a, b::AbstractArray) = indexin_mask(a, Set(b))
indexin_mask(a, b) = [i in b for i in a]

import Base: tail
# Given a tuple of indices and a tuple of masks, restrict the indices to the
# valid regions. This is, effectively, reversing Base.setindex_shape_check.
# We can't just use indexing into MergedIndices here because getindex is much
# pickier about singleton dimensions than setindex! is.
restrict_indices(::Tuple{}, ::Tuple{}) = ()
function restrict_indices(a::Tuple{Any, Vararg{Any}}, b::Tuple{Any, Vararg{Any}})
if (length(a[1]) == length(b[1]) == 1) || (length(a[1]) > 1 && length(b[1]) > 1)
(vec(a[1])[vec(b[1])], restrict_indices(tail(a), tail(b))...)
elseif length(a[1]) == 1
(a[1], restrict_indices(tail(a), b))
elseif length(b[1]) == 1 && b[1][1]
restrict_indices(a, tail(b))
else
throw(DimensionMismatch("this should be caught by setindex_shape_check; please submit an issue"))
end
end
# The final indices are funky - they're allowed to accumulate together.
# An easy (albeit very inefficient) fix for too many masks is to use the
# outer product to merge them. But we can do that lazily with a custom type:
function restrict_indices(a::Tuple{Any}, b::Tuple{Any, Any, Vararg{Any}})
(vec(a[1])[vec(ProductIndices(b, map(length, b)))],)
end
# But too many indices is much harder; this requires merging the indices
# in `a` before applying the final mask in `b`.
function restrict_indices(a::Tuple{Any, Any, Vararg{Any}}, b::Tuple{Any})
if length(a[1]) == 1
(a[1], restrict_indices(tail(a), b))
else
# When one mask spans multiple indices, we need to merge the indices
# together. At this point, we can just use indexing to merge them since
# there's no longer special handling of singleton dimensions
(view(MergedIndices(a, map(length, a)), b[1]),)
end
end

immutable ProductIndices{I,N} <: AbstractArray{Bool, N}
indices::I
sz::NTuple{N,Int}
end
Base.size(P::ProductIndices) = P.sz
# This gets passed to map to avoid breaking propagation of inbounds
Base.@propagate_inbounds propagate_getindex(A, I...) = A[I...]
Base.@propagate_inbounds Base.getindex{_,N}(P::ProductIndices{_,N}, I::Vararg{Int, N}) =
Bool((&)(map(propagate_getindex, P.indices, I)...))

immutable MergedIndices{I,N} <: AbstractArray{CartesianIndex{N}, N}
indices::I
sz::NTuple{N,Int}
end
Base.size(M::MergedIndices) = M.sz
Base.@propagate_inbounds Base.getindex{_,N}(M::MergedIndices{_,N}, I::Vararg{Int, N}) =
CartesianIndex(map(propagate_getindex, M.indices, I))
# Additionally, we optimize bounds checking when using MergedIndices as an
# array index since checking, e.g., A[1:500, 1:500] is *way* faster than
# checking an array of 500^2 elements of CartesianIndex{2}. This optimization
# also applies to reshapes of MergedIndices since the outer shape of the
# container doesn't affect the index elements themselves. We can go even
# farther and say that even restricted views of MergedIndices must be valid
# over the entire array. This is overly strict in general, but in this
# use-case all the merged indices must be valid at some point, so it's ok.
typealias ReshapedMergedIndices{T,N,M<:MergedIndices} Base.ReshapedArray{T,N,M}
typealias SubMergedIndices{T,N,M<:Union{MergedIndices, ReshapedMergedIndices}} SubArray{T,N,M}
typealias MergedIndicesOrSub Union{MergedIndices, ReshapedMergedIndices, SubMergedIndices}
import Base: checkbounds_indices
@inline checkbounds_indices(::Type{Bool}, inds::Tuple{}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
@inline checkbounds_indices(::Type{Bool}, inds::Tuple{Any}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))
@inline checkbounds_indices(::Type{Bool}, inds::Tuple, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) =
checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...))

# The tricky thing here is that we want to optimize the accesses into the
# distributed array, but in doing so, we lose track of which indices in I we
# should be using.
#
# I’ve come to the conclusion that the function is utterly insane.
# There are *6* flavors of indices with four different reference points:
# 1. Find the indices of each portion of the DArray.
# 2. Find the valid subset of indices for the SubArray into that portion.
# 3. Find the portion of the `I` indices that should be used when you access the
# `K` indices in the subarray. This guy is nasty. It’s totally backwards
# from all other arrays, wherein we simply iterate over the source array’s
# elements. You need to *both* know which elements in `J` were skipped
# (`indexin_mask`) and which dimensions should match up (`restrict_indices`)
# 4. If `K` doesn’t correspond to an entire chunk, reinterpret `K` in terms of
# the local portion of the source array
function Base.setindex!(a::Array, s::SubDArray,
I::Union{UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...)
Base.setindex_shape_check(s, Base.index_lengths(a, I...)...)
n = length(I)
d = s.parent
J = s.indexes
if length(J) < n
a[I...] = convert(Array,s)
return a
end
offs = [isa(J[i],Int) ? J[i]-1 : first(J[i])-1 for i=1:n]
J = Base.decolon(d, s.indexes...)
@sync for i = 1:length(d.pids)
K_c = Any[d.indexes[i]...]
K = [ intersect(J[j],K_c[j]) for j=1:n ]
K_c = d.indexes[i]
K = map(intersect, J, K_c)
if !any(isempty, K)
idxs = [ I[j][K[j]-offs[j]] for j=1:n ]
K_mask = map(indexin_mask, J, K_c)
idxs = restrict_indices(Base.decolon(a, I...), K_mask)
if isequal(K, K_c)
# whole chunk
@async a[idxs...] = chunk(d, i)
else
# partial chunk
@async a[idxs...] =
remotecall_fetch(d.pids[i]) do
view(localpart(d), [K[j]-first(K_c[j])+1 for j=1:n]...)
view(localpart(d), [K[j]-first(K_c[j])+1 for j=1:length(J)]...)
end
end
end
Expand Down Expand Up @@ -1395,16 +1469,7 @@ function compute_boundaries{T}(d::DVector{T}; kwargs...)
np = length(pids)
sample_sz_on_wrkr = 512

if VERSION < v"0.5.0-"
results = Array(Any,np)
@sync begin
for (i,p) in enumerate(pids)
@async results[i] = remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...)
end
end
else
results = asyncmap(p -> remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...), pids)
end
results = asyncmap(p -> remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...), pids)

samples = Array(T,0)
for x in results
Expand Down Expand Up @@ -1455,14 +1520,7 @@ function Base.sort{T}(d::DVector{T}; sample=true, kwargs...)

elseif sample==false
# Assume an uniform distribution between min and max values
if VERSION < v"0.5.0-"
minmax=Array(Tuple, np)
@sync for (i,p) in enumerate(pids)
@async minmax[i] = remotecall_fetch(d->(minimum(localpart(d)), maximum(localpart(d))), p, d)
end
else
minmax=asyncmap(p->remotecall_fetch(d->(minimum(localpart(d)), maximum(localpart(d))), p, d), pids)
end
minmax=asyncmap(p->remotecall_fetch(d->(minimum(localpart(d)), maximum(localpart(d))), p, d), pids)
min_d = minimum(T[x[1] for x in minmax])
max_d = maximum(T[x[2] for x in minmax])

Expand Down Expand Up @@ -1503,19 +1561,10 @@ function Base.sort{T}(d::DVector{T}; sample=true, kwargs...)
end

local_sort_results = Array(Tuple, np)
if VERSION < v"0.5.0-"
@sync begin
for (i,p) in enumerate(pids)
@async local_sort_results[i] =
remotecall_fetch(
scatter_n_sort_localparts, p, presorted ? nothing : d, i, refs, boundaries; kwargs...)
end
end
else
Base.asyncmap!((i,p) -> remotecall_fetch(

Base.asyncmap!((i,p) -> remotecall_fetch(
scatter_n_sort_localparts, p, presorted ? nothing : d, i, refs, boundaries; kwargs...),
local_sort_results, 1:np, pids)
end

# Construct a new DArray from the sorted refs. Remove parts with 0-length since
# the DArray constructor_from_refs does not yet support it. This implies that
Expand Down
28 changes: 18 additions & 10 deletions test/darray.jl
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,22 @@ check_leaks()
@test fetch(@spawnat MYID localpart(D)[1,1]) == D[1,1]
@test fetch(@spawnat OTHERIDS localpart(D)[1,1]) == D[1,101]
close(D2)

S2 = convert(Vector{Float64}, D[4, 23:176])
@test A[4, 23:176] == S2

S3 = convert(Vector{Float64}, D[23:176, 197])
@test A[23:176, 197] == S3

S4 = zeros(4)
setindex!(S4, D[3:4, 99:100], :)
@test S4 == vec(D[3:4, 99:100])
@test S4 == vec(A[3:4, 99:100])

S5 = zeros(2,2)
setindex!(S5, D[1,1:4], :, 1:2)
@test vec(S5) == D[1, 1:4]
@test vec(S5) == A[1, 1:4]
end
close(D)
end
Expand Down Expand Up @@ -610,20 +626,12 @@ check_leaks()
# Commented out tests that need to be enabled in due course when DArray support is more complete
@testset "test mapslices" begin
a = drand((5,5), workers(), [1, min(nworkers(), 5)])
if VERSION < v"0.5.0-dev+4361"
h = mapslices(v -> hist(v,0:0.1:1)[2], a, 1)
else
h = mapslices(v -> fit(Histogram,v,0:0.1:1).weights, a, 1)
end
h = mapslices(v -> fit(Histogram,v,0:0.1:1).weights, a, 1)
# H = mapslices(v -> hist(v,0:0.1:1)[2], a, 2)
# s = mapslices(sort, a, [1])
# S = mapslices(sort, a, [2])
for i = 1:5
if VERSION < v"0.5.0-dev+4361"
@test h[:,i] == hist(a[:,i],0:0.1:1)[2]
else
@test h[:,i] == fit(Histogram, a[:,i],0:0.1:1).weights
end
@test h[:,i] == fit(Histogram, a[:,i],0:0.1:1).weights
# @test vec(H[i,:]) => hist(vec(a[i,:]),0:0.1:1)[2]
# @test s[:,i] => sort(a[:,i])
# @test vec(S[i,:]) => sort(vec(a[i,:]))
Expand Down

0 comments on commit 4e1a280

Please sign in to comment.