diff --git a/README.md b/README.md index 3d4242e7e..72dd698bf 100644 --- a/README.md +++ b/README.md @@ -38,10 +38,10 @@ b = compute(a) # => Computed(10000x10000 Array{Float64,2} in 9 parts each of (max size) 4000x4000) ``` -The result is an object containing metadata about the various pieces. They may be created on different workers and will stay there until a another worker needs it. However, you can request and get the whole data with the `gather` function. +The result is an object containing metadata about the various pieces. They may be created on different workers and will stay there until a another worker needs it. However, you can request and get the whole data with the `collect` function. ```julia -gather(b) +collect(b) # => 10000x10000 Array{Float64,2}: .... ``` @@ -131,7 +131,7 @@ X[[20,30,40], :] X[20:40, [30,40,60]] ``` -Note that indexing again results in a `Computation` object of the type `GetIndex`. You can use it as input to another computation or call `gather` on it to get the indexed sub array. +Note that indexing again results in a `Computation` object of the type `GetIndex`. You can use it as input to another computation or call `collect` on it to get the indexed sub array. #### Sparse matrix support @@ -164,7 +164,7 @@ julia> compute(save(s1, "s1")) julia> x = load(Context(), "s1") Computed(10000x10000 Array{Float64,2} in 3x3 parts each of (max size) 4000x4000) -julia> gather(x) +julia> collect(x) 10000x10000 sparse matrix with 999793 Float64 entries: ... ``` @@ -205,7 +205,7 @@ result = compute(save(saved_A+saved_A', "ApAt")) - `reduceblock(f, c)` - reduce each block of data by applying `f` to the block. In block distributed array, the result has the same dimensionality as the input. - `reducebykey(f, c)` - given a collection of tuples or pairs, use the first element of the tuples as the key, and reduce the values of each key. Computes a Dict of results. -*Note: all these operations result in a `Computation` object. You need to call `compute` or `gather` on them to actually do the computation.* +*Note: all these operations result in a `Computation` object. You need to call `compute` or `collect` on them to actually do the computation.* **Array API** - Unary element-wise operations: @@ -229,10 +229,10 @@ $, &, (.!=), (.<), (.<=), (.==), (.>), - `*` on Computations can either stand for matrix-matrix or matrix-vector multiplications. - transpose on a matrix can be done using the `x'` syntax -**Compute and gather** +**Compute and collect** - `compute(ctx, c)` - compute a computation `c` -- `gather(ctx, c)` - compute the result, and collate the result on the host process (usually pid 1). +- `collect(ctx, c)` - compute the result, and collate the result on the host process (usually pid 1). **Context** diff --git a/src/Dagger.jl b/src/Dagger.jl index 8babe3c72..4b15ffe56 100644 --- a/src/Dagger.jl +++ b/src/Dagger.jl @@ -2,17 +2,19 @@ module Dagger using Compat +import Base.collect + include("lib/util.jl") include("lib/logging.jl") include("lib/dumbref.jl") -# Data and slices of data +# Distributed data +include("processor.jl") include("thunk.jl") include("domain.jl") include("chunks.jl") # Task scheduling -include("processor.jl") include("compute.jl") # Array computations diff --git a/src/array/darray.jl b/src/array/darray.jl index e03c3773d..cb5133d9e 100644 --- a/src/array/darray.jl +++ b/src/array/darray.jl @@ -76,8 +76,10 @@ domain(x::AbstractArray) = ArrayDomain([1:l for l in size(x)]) compute(ctx, x::ArrayOp) = compute(ctx, cached_stage(ctx, x)::DArray) -gather(ctx, x::ArrayOp) = - gather(ctx, cached_stage(ctx, x)::DArray) +collect(ctx::Context, x::ArrayOp) = + collect(ctx, compute(ctx, x)) + +collect(x::ArrayOp) = collect(Context(), x) @compat function Base.show(io::IO, ::MIME"text/plain", x::ArrayOp) write(io, string(typeof(x))) @@ -101,12 +103,12 @@ domainchunks(d::DArray) = d.subdomains size(x::DArray) = size(domain(x)) stage(ctx, c::DArray) = c -function gather(ctx, d::DArray) +function collect(ctx::Context, d::DArray) a = compute(ctx, d, persist=false) ps_input = chunks(a) ps = Array{Any}(size(ps_input)) @sync for i in 1:length(ps_input) - @async ps[i] = gather(ctx, ps_input[i]) + @async ps[i] = collect(ctx, ps_input[i]) end if isempty(ps) emptyarray(Array{eltype(d), ndims(d)}, size(d)...) diff --git a/src/array/sort.jl b/src/array/sort.jl index 313cb89a2..bf93e642c 100644 --- a/src/array/sort.jl +++ b/src/array/sort.jl @@ -41,9 +41,9 @@ function compute(ctx, s::Sort) DArray(compute(ctx, shuffle_merge(inp, splitter_ranks, splitters, s.order))) end -function delayed_map_and_gather(f, ctx, Xs...) +function delayed_map_and_collect(f, ctx, Xs...) result_parts = map(delayed(f, get_result=true), Xs...) - gather(ctx, delayed(tuple)(result_parts...)) + collect(ctx, delayed(tuple)(result_parts...)) end function pselect(ctx, A, ranks, ord) @@ -72,7 +72,7 @@ function pselect(ctx, A, ranks, ord) # find medians chunk_ranges = [vec(active_ranges[i,:]) for i in 1:Nc] - chunk_medians = delayed_map_and_gather(ctx, chunk_ranges, cs) do ranges, data + chunk_medians = delayed_map_and_collect(ctx, chunk_ranges, cs) do ranges, data # as many ranges as ranks to find map(r->submedian(data, r), ranges) end @@ -85,7 +85,7 @@ function pselect(ctx, A, ranks, ord) Ms = vec(mapslices(x->weightedmedian(x, ls, ord), median_matrix, 1)) # scatter weighted - LEGs = delayed_map_and_gather(ctx, cs, chunk_ranges) do chunk, ranges + LEGs = delayed_map_and_collect(ctx, cs, chunk_ranges) do chunk, ranges # for each median found right now, locate G,T,E vals map((range, m)->locate_pivot(chunk, range, m, ord), ranges, Ms) end diff --git a/src/chunks.jl b/src/chunks.jl index 44793e942..538780dd1 100644 --- a/src/chunks.jl +++ b/src/chunks.jl @@ -1,5 +1,5 @@ -export chunk, gather +export chunk, collect export domain, UnitDomain, project, alignfirst, ArrayDomain @@ -52,14 +52,14 @@ function unrelease{T}(c::Chunk{T,MemToken}) end unrelease(c::Chunk) = c -function gather(ctx, chunk::Chunk) +function collect(ctx::Context, chunk::Chunk) # delegate fetching to handle by default. - gather(ctx, chunk.handle) + collect(ctx, chunk.handle) end ### ChunkIO -function gather(ctx, ref::MemToken) +function collect(ctx::Context, ref::MemToken) res = fetch(ref) if isnull(res) throw(KeyError(ref)) diff --git a/src/compute.jl b/src/compute.jl index f7a1d6dde..bb6354a90 100644 --- a/src/compute.jl +++ b/src/compute.jl @@ -1,12 +1,17 @@ export stage, cached_stage, compute, debug_compute, cached, free! using Compat +compute(x) = compute(Context(), x) +compute(ctx, c::Chunk) = c + +collect(ctx::Context, c) = collect(ctx, compute(ctx, c)) +collect(d::Union{Chunk,Thunk}) = collect(Context(), d) + @compat abstract type Computation end -compute(x) = compute(Context(), x) -gather(d) = gather(Context(), d) compute(ctx, c::Computation) = compute(ctx, stage(ctx, c)) -gather(ctx, c) = gather(compute(ctx, c)) +collect(c::Computation) = collect(Context(), c) + function finish_task!(state, node, node_order; free=true) if istask(node) && node.cache @@ -278,7 +283,7 @@ function start_state(deps::Dict, node_order) end _move(ctx, to_proc, x) = x -_move(ctx, to_proc::OSProc, x::Union{Chunk, Thunk}) = gather(ctx, x) +_move(ctx, to_proc::OSProc, x::Union{Chunk, Thunk}) = collect(ctx, x) function do_task(ctx, proc, thunk_id, f, data, send_result, persist) @dbg timespan_start(ctx, :comm, thunk_id, proc) @@ -319,3 +324,6 @@ function debug_compute(arg; profile=false) dbgctx = Context(procs(ctx), LocalEventLog(), profile) debug_compute(dbgctx, arg) end + +Base.@deprecate gather(ctx, x) collect(ctx, x) +Base.@deprecate gather(x) collect(x) diff --git a/src/file-io.jl b/src/file-io.jl index b0cd73f00..ee315835f 100644 --- a/src/file-io.jl +++ b/src/file-io.jl @@ -62,7 +62,7 @@ function save(ctx, io::IO, chunk::Chunk, file_path) write(io, meta) data_offset = position(io) - save(ctx, io, gather(ctx, chunk)) + save(ctx, io, collect(ctx, chunk)) Chunk(chunktype(chunk), domain(chunk), FileReader(file_path, chunktype(chunk), data_offset, false), false) end @@ -172,7 +172,7 @@ function save(ctx, io::IO, m::BitArray) save(ctx, io, convert(Array{Bool}, m)) end -function gather{X,T<:Array}(ctx, c::Chunk{X,FileReader{T}}) +function collect{X,T<:Array}(ctx::Context, c::Chunk{X,FileReader{T}}) h = c.handle io = open(h.file, "r+") seek(io, h.data_offset) @@ -182,7 +182,7 @@ function gather{X,T<:Array}(ctx, c::Chunk{X,FileReader{T}}) arr end -function gather{X,T<:BitArray}(ctx, c::Chunk{X, FileReader{T}}) +function collect{X,T<:BitArray}(ctx::Context, c::Chunk{X, FileReader{T}}) h = c.handle io = open(h.file, "r+") seek(io, h.data_offset) @@ -210,7 +210,7 @@ function save{Tv, Ti}(ctx, io::IO, m::SparseMatrixCSC{Tv,Ti}) m end -function gather{X, T<:SparseMatrixCSC}(ctx, c::Chunk{X, FileReader{T}}) +function collect{X, T<:SparseMatrixCSC}(ctx::Context, c::Chunk{X, FileReader{T}}) h = c.handle io = open(h.file, "r+") seek(io, h.data_offset) @@ -237,7 +237,7 @@ function gather{X, T<:SparseMatrixCSC}(ctx, c::Chunk{X, FileReader{T}}) end function getsub{X,T<:AbstractArray}(ctx, c::Chunk{X,FileReader{T}}, d) - Chunk(gather(ctx, c)[d]) + Chunk(collect(ctx, c)[d]) end diff --git a/test/array.jl b/test/array.jl index 76d27139c..fb0fe84ad 100644 --- a/test/array.jl +++ b/test/array.jl @@ -3,7 +3,7 @@ import Dagger: chunks, DArray, domainchunks @testset "rand" begin function test_rand(X) X1 = compute(X) - X2 = gather(X1) + X2 = collect(X1) @test isa(X, Dagger.ArrayOp) @test isa(X1, Dagger.DArray) @@ -13,14 +13,14 @@ import Dagger: chunks, DArray, domainchunks @test domain(X1) == ArrayDomain(1:100, 1:100) @test domainchunks(X1) |> size == (10, 10) @test domainchunks(X1) == partition(Blocks(10, 10), ArrayDomain(1:100, 1:100)) - @test gather(X1) == gather(X1) + @test collect(X1) == collect(X1) end X = rand(Blocks(10, 10), 100, 100) test_rand(X) Xsp = sprand(Blocks(10, 10), 100, 100, 0.1) test_rand(Xsp) R = rand(Blocks(10), 20) - r = gather(R) + r = collect(R) @test r[1:10] != r[11:20] end @testset "sum(ones(...))" begin @@ -31,14 +31,14 @@ end @testset "distributing an array" begin function test_dist(X) X1 = Distribute(Blocks(10, 20), X) - @test gather(X1) == X + @test collect(X1) == X Xc = compute(X1) @test chunks(Xc) |> size == (10, 5) @test domainchunks(Xc) |> size == (10, 5) @test map(x->size(x) == (10, 20), domainchunks(Xc)) |> all end x = [1 2; 3 4] - @test gather(Distribute(Blocks(1,1), x)) == x + @test collect(Distribute(Blocks(1,1), x)) == x #test_dist(rand(100, 100)) #test_dist(sprand(100, 100, 0.1)) end @@ -47,7 +47,7 @@ end function test_transpose(X) x, y = size(X) X1 = Distribute(Blocks(10, 20), X) - @test gather(X1') == X' + @test collect(X1') == X' Xc = compute(X1') @test chunks(Xc) |> size == (div(y, 20), div(x,10)) @test domainchunks(Xc) |> size == (div(y, 20), div(x, 10)) @@ -66,8 +66,8 @@ end @test_throws DimensionMismatch compute(X1*X1) X2 = compute(X1'*X1) X3 = compute(X1*X1') - @test norm(gather(X2) - X'X) < tol - @test norm(gather(X3) - X*X') < tol + @test norm(collect(X2) - X'X) < tol + @test norm(collect(X3) - X*X') < tol @test chunks(X2) |> size == (2, 2) @test chunks(X3) |> size == (4, 4) @test map(x->size(x) == (20, 20), domainchunks(X2)) |> all @@ -78,16 +78,16 @@ end x = rand(10,10) X = Distribute(Blocks(3,3), x) y = rand(10) - @test norm(gather(X*y) - x*y) < 1e-13 + @test norm(collect(X*y) - x*y) < 1e-13 end @testset "concat" begin m = rand(75,75) x = Distribute(Blocks(10,20), m) y = Distribute(Blocks(10,10), m) - @test hcat(m,m) == gather(hcat(x,x)) - @test vcat(m,m) == gather(vcat(x,x)) - @test hcat(m,m) == gather(hcat(x,y)) + @test hcat(m,m) == collect(hcat(x,x)) + @test vcat(m,m) == collect(vcat(x,x)) + @test hcat(m,m) == collect(hcat(x,y)) @test_throws DimensionMismatch compute(vcat(x,y)) end @@ -96,25 +96,25 @@ end X = Distribute(Blocks(3,3), x) y = rand(10) - @test Diagonal(y)*x == gather(Diagonal(y)*X) + @test Diagonal(y)*x == collect(Diagonal(y)*X) end @testset "Getindex" begin function test_getindex(x) X = Distribute(Blocks(3,3), x) - @test gather(X[3:8, 2:7]) == x[3:8, 2:7] + @test collect(X[3:8, 2:7]) == x[3:8, 2:7] ragged_idx = [1,2,9,7,6,2,4,5] - @test gather(X[ragged_idx, 2:7]) == x[ragged_idx, 2:7] - @test gather(X[ragged_idx, reverse(ragged_idx)]) == x[ragged_idx, reverse(ragged_idx)] + @test collect(X[ragged_idx, 2:7]) == x[ragged_idx, 2:7] + @test collect(X[ragged_idx, reverse(ragged_idx)]) == x[ragged_idx, reverse(ragged_idx)] ragged_idx = [1,2,9,7,6,2,4,5] - @test gather(X[[2,7,10], :]) == x[[2,7,10], :] - @test gather(X[[], ragged_idx]) == x[[], ragged_idx] - @test gather(X[[], []]) == x[[], []] + @test collect(X[[2,7,10], :]) == x[[2,7,10], :] + @test collect(X[[], ragged_idx]) == x[[], ragged_idx] + @test collect(X[[], []]) == x[[], []] @testset "dimensionality reduction" begin # THESE NEED FIXING!! - @test vec(gather(X[ragged_idx, 5])) == vec(x[ragged_idx, 5]) - @test vec(gather(X[5, ragged_idx])) == vec(x[5, ragged_idx]) + @test vec(collect(X[ragged_idx, 5])) == vec(x[ragged_idx, 5]) + @test vec(collect(X[5, ragged_idx])) == vec(x[5, ragged_idx]) @test X[5, 5] == x[5,5] end end @@ -126,42 +126,42 @@ end @testset "cleanup" begin X = Distribute(Blocks(10,10), rand(10,10)) - @test gather(sin.(X)) == gather(sin.(X)) + @test collect(sin.(X)) == collect(sin.(X)) end @testset "sort" begin x = rand(1:10, 10) X = Distribute(Blocks(3), x) - @test gather(sort(X)) == sort(x) - @test gather(sort(X, rev=true, alg=Base.Sort.DEFAULT_STABLE)) == sort(x, rev=true, alg=Base.Sort.DEFAULT_STABLE) + @test collect(sort(X)) == sort(x) + @test collect(sort(X, rev=true, alg=Base.Sort.DEFAULT_STABLE)) == sort(x, rev=true, alg=Base.Sort.DEFAULT_STABLE) X = Distribute(Blocks(1), x) - @test gather(sort(X)) == sort(x) - @test gather(sort(X, rev=true)) == sort(x, rev=true) + @test collect(sort(X)) == sort(x) + @test collect(sort(X, rev=true)) == sort(x, rev=true) X = Distribute(Blocks(10), x) - @test gather(sort(X)) == sort(x) - @test gather(sort(X, rev=true)) == sort(x, rev=true) + @test collect(sort(X)) == sort(x) + @test collect(sort(X, rev=true)) == sort(x, rev=true) x = [("A",1), ("A",2), ("B",1)] y = compute(Distribute(Blocks(1), x)) - @test gather(sort(y)) == x + @test collect(sort(y)) == x x = ones(10) y = compute(Distribute(Blocks(3), x)) - @test map(x->length(gather(x)), compute(sort(y)).chunks) == [3,3,3,1] + @test map(x->length(collect(x)), compute(sort(y)).chunks) == [3,3,3,1] end @testset "reducedim" begin x = rand(1:10, 10, 5) X = Distribute(Blocks(3,3), x) - @test reducedim(+, x, 1) == gather(reducedim(+, X, 1)) - @test reducedim(+, x, 2) == gather(reducedim(+, X, 2)) + @test reducedim(+, x, 1) == collect(reducedim(+, X, 1)) + @test reducedim(+, x, 2) == collect(reducedim(+, X, 2)) x = rand(1:10, 10, 5) X = Distribute(Blocks(10, 10), x) - @test sum(x, 1) == gather(sum(X, 1)) - @test sum(x, 2) == gather(sum(X, 2)) + @test sum(x, 1) == collect(sum(X, 1)) + @test sum(x, 2) == collect(sum(X, 2)) end @testset "setindex" begin @@ -169,6 +169,6 @@ end y=copy(x) y[3:8, 2:7]=1.0 X = Distribute(Blocks(3,3), x) - @test gather(setindex(X,1.0, 3:8, 2:7)) == y - @test gather(X) == x + @test collect(setindex(X,1.0, 3:8, 2:7)) == y + @test collect(X) == x end diff --git a/test/cache.jl b/test/cache.jl index b0ba73e14..e884d48dd 100644 --- a/test/cache.jl +++ b/test/cache.jl @@ -16,15 +16,15 @@ using Dagger sum1 = delayed((x...)->sum([x...]))(map(delayed(sum), thunks1)...) thunks2 = map(delayed(-), thunks1) sum2 = delayed((x...)->sum([x...]))(map(delayed(sum), thunks2)...) - s1 = gather(sum1) - @test -s1 == gather(sum2) - @test s1 == gather(sum1) - @test -gather(sum1) == gather(sum2) + s1 = collect(sum1) + @test -s1 == collect(sum2) + @test s1 == collect(sum1) + @test -collect(sum1) == collect(sum2) thunks1 = map(delayed(_ -> rand(10^6), cache=true), workers()) sum1 = delayed((x...)->sum([x...]))(map(delayed(sum), thunks1)...) thunks2 = map(delayed(-), thunks1) sum2 = delayed((x...)->sum([x...]))(map(delayed(sum), thunks2)...) - s1 = gather(sum1) # this should evict thunk1s from memory - @test -s1 != gather(sum2) + s1 = collect(sum1) # this should evict thunk1s from memory + @test -s1 != collect(sum2) end