rename gather to collect
Shashi Gowda committed Jun 8, 2017
1 parent 90c21e2 commit 8cec4e8
Showing 9 changed files with 84 additions and 72 deletions.
14 changes: 7 additions & 7 deletions README.md
@@ -38,10 +38,10 @@ b = compute(a)
# => Computed(10000x10000 Array{Float64,2} in 9 parts each of (max size) 4000x4000)
```

-The result is an object containing metadata about the various pieces. They may be created on different workers and will stay there until another worker needs them. However, you can fetch the whole data with the `gather` function.
+The result is an object containing metadata about the various pieces. They may be created on different workers and will stay there until another worker needs them. However, you can fetch the whole data with the `collect` function.

```julia
-gather(b)
+collect(b)
# => 10000x10000 Array{Float64,2}:
....
```
@@ -131,7 +131,7 @@ X[[20,30,40], :]
X[20:40, [30,40,60]]
```

-Note that indexing again results in a `Computation` object of type `GetIndex`. You can use it as input to another computation or call `gather` on it to get the indexed subarray.
+Note that indexing again results in a `Computation` object of type `GetIndex`. You can use it as input to another computation or call `collect` on it to get the indexed subarray.
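For illustration, a minimal sketch (assuming `X` is the distributed array from the snippet above):

```julia
sub = X[20:40, [30,40,60]]  # lazy: builds a GetIndex computation, no data moves yet
collect(sub)                # materializes the 21x3 sub-array on the calling process
```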

#### Sparse matrix support

@@ -164,7 +164,7 @@ julia> compute(save(s1, "s1"))
julia> x = load(Context(), "s1")
Computed(10000x10000 Array{Float64,2} in 3x3 parts each of (max size) 4000x4000)
-julia> gather(x)
+julia> collect(x)
10000x10000 sparse matrix with 999793 Float64 entries:
...
```
@@ -205,7 +205,7 @@ result = compute(save(saved_A+saved_A', "ApAt"))
- `reduceblock(f, c)` - reduce each block of data by applying `f` to the block. In a block-distributed array, the result has the same dimensionality as the input.
- `reducebykey(f, c)` - given a collection of tuples or pairs, use the first element of each tuple as the key, and reduce the values of each key. Computes a `Dict` of results.

-*Note: all these operations result in a `Computation` object. You need to call `compute` or `gather` on them to actually run the computation.*
+*Note: all these operations result in a `Computation` object. You need to call `compute` or `collect` on them to actually run the computation.*
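For example, a minimal sketch of this two-step pattern (assuming `pairs` is a distributed collection of `key => 1` tuples):

```julia
counts = reducebykey(+, pairs)  # lazy Computation describing the reduction
collect(counts)                 # runs it and returns a Dict of key => count
```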

**Array API**
- Unary element-wise operations:
@@ -229,10 +229,10 @@ $, &, (.!=), (.<), (.<=), (.==), (.>),
- `*` on Computations can stand for either matrix-matrix or matrix-vector multiplication.
- A matrix can be transposed with the `x'` syntax (see the sketch below).
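For instance, a sketch (assuming `X` from the earlier examples):

```julia
collect(X * X')  # matrix-matrix product with the lazy transpose of X
```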

-**Compute and gather**
+**Compute and collect**

- `compute(ctx, c)` - compute a computation `c`
-- `gather(ctx, c)` - compute the result, and collate it on the host process (usually pid 1).
+- `collect(ctx, c)` - compute the result, and collate it on the host process (usually pid 1).
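A minimal end-to-end sketch of the explicit-context variants (assuming `a` from the README's opening example):

```julia
ctx = Context()      # default context over the available worker processes
b = compute(ctx, a)  # run the computation; the pieces stay on the workers
A = collect(ctx, b)  # fetch the pieces and assemble the full array on pid 1
```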

**Context**

6 changes: 4 additions & 2 deletions src/Dagger.jl
@@ -2,17 +2,19 @@ module Dagger

using Compat

+import Base.collect

include("lib/util.jl")
include("lib/logging.jl")
include("lib/dumbref.jl")

-# Data and slices of data
+# Distributed data
+include("processor.jl")
include("thunk.jl")
include("domain.jl")
include("chunks.jl")

# Task scheduling
include("processor.jl")
include("compute.jl")

# Array computations
10 changes: 6 additions & 4 deletions src/array/darray.jl
@@ -76,8 +76,10 @@ domain(x::AbstractArray) = ArrayDomain([1:l for l in size(x)])
compute(ctx, x::ArrayOp) =
    compute(ctx, cached_stage(ctx, x)::DArray)

-gather(ctx, x::ArrayOp) =
-    gather(ctx, cached_stage(ctx, x)::DArray)
+collect(ctx::Context, x::ArrayOp) =
+    collect(ctx, compute(ctx, x))
+
+collect(x::ArrayOp) = collect(Context(), x)

@compat function Base.show(io::IO, ::MIME"text/plain", x::ArrayOp)
    write(io, string(typeof(x)))
@@ -101,12 +103,12 @@ domainchunks(d::DArray) = d.subdomains
size(x::DArray) = size(domain(x))
stage(ctx, c::DArray) = c

-function gather(ctx, d::DArray)
+function collect(ctx::Context, d::DArray)
    a = compute(ctx, d, persist=false)
    ps_input = chunks(a)
    ps = Array{Any}(size(ps_input))
    @sync for i in 1:length(ps_input)
-        @async ps[i] = gather(ctx, ps_input[i])
+        @async ps[i] = collect(ctx, ps_input[i])
    end
    if isempty(ps)
        emptyarray(Array{eltype(d), ndims(d)}, size(d)...)
8 changes: 4 additions & 4 deletions src/array/sort.jl
@@ -41,9 +41,9 @@ function compute(ctx, s::Sort)
    DArray(compute(ctx, shuffle_merge(inp, splitter_ranks, splitters, s.order)))
end

-function delayed_map_and_gather(f, ctx, Xs...)
+function delayed_map_and_collect(f, ctx, Xs...)
    result_parts = map(delayed(f, get_result=true), Xs...)
-    gather(ctx, delayed(tuple)(result_parts...))
+    collect(ctx, delayed(tuple)(result_parts...))
end

function pselect(ctx, A, ranks, ord)
@@ -72,7 +72,7 @@ function pselect(ctx, A, ranks, ord)
    # find medians
    chunk_ranges = [vec(active_ranges[i,:]) for i in 1:Nc]

-    chunk_medians = delayed_map_and_gather(ctx, chunk_ranges, cs) do ranges, data
+    chunk_medians = delayed_map_and_collect(ctx, chunk_ranges, cs) do ranges, data
        # as many ranges as ranks to find
        map(r->submedian(data, r), ranges)
    end
@@ -85,7 +85,7 @@ function pselect(ctx, A, ranks, ord)
    Ms = vec(mapslices(x->weightedmedian(x, ls, ord), median_matrix, 1))

    # scatter weighted
-    LEGs = delayed_map_and_gather(ctx, cs, chunk_ranges) do chunk, ranges
+    LEGs = delayed_map_and_collect(ctx, cs, chunk_ranges) do chunk, ranges
        # for each median found right now, locate G,T,E vals
        map((range, m)->locate_pivot(chunk, range, m, ord), ranges, Ms)
    end
8 changes: 4 additions & 4 deletions src/chunks.jl
@@ -1,5 +1,5 @@

-export chunk, gather
+export chunk, collect

export domain, UnitDomain, project, alignfirst, ArrayDomain

@@ -52,14 +52,14 @@ function unrelease{T}(c::Chunk{T,MemToken})
end
unrelease(c::Chunk) = c

-function gather(ctx, chunk::Chunk)
+function collect(ctx::Context, chunk::Chunk)
    # delegate fetching to the handle by default.
-    gather(ctx, chunk.handle)
+    collect(ctx, chunk.handle)
end


### ChunkIO
-function gather(ctx, ref::MemToken)
+function collect(ctx::Context, ref::MemToken)
    res = fetch(ref)
    if isnull(res)
        throw(KeyError(ref))
16 changes: 12 additions & 4 deletions src/compute.jl
@@ -1,12 +1,17 @@
export stage, cached_stage, compute, debug_compute, cached, free!
using Compat

+compute(x) = compute(Context(), x)
+compute(ctx, c::Chunk) = c
+
+collect(ctx::Context, c) = collect(ctx, compute(ctx, c))
+collect(d::Union{Chunk,Thunk}) = collect(Context(), d)

@compat abstract type Computation end

-compute(x) = compute(Context(), x)
-gather(d) = gather(Context(), d)
compute(ctx, c::Computation) = compute(ctx, stage(ctx, c))
-gather(ctx, c) = gather(compute(ctx, c))
+collect(c::Computation) = collect(Context(), c)


function finish_task!(state, node, node_order; free=true)
    if istask(node) && node.cache
@@ -278,7 +283,7 @@ function start_state(deps::Dict, node_order)
end

_move(ctx, to_proc, x) = x
-_move(ctx, to_proc::OSProc, x::Union{Chunk, Thunk}) = gather(ctx, x)
+_move(ctx, to_proc::OSProc, x::Union{Chunk, Thunk}) = collect(ctx, x)

function do_task(ctx, proc, thunk_id, f, data, send_result, persist)
    @dbg timespan_start(ctx, :comm, thunk_id, proc)
@@ -319,3 +324,6 @@ function debug_compute(arg; profile=false)
    dbgctx = Context(procs(ctx), LocalEventLog(), profile)
    debug_compute(dbgctx, arg)
end

+Base.@deprecate gather(ctx, x) collect(ctx, x)
+Base.@deprecate gather(x) collect(x)
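With these deprecations, existing call sites keep working during the transition; a sketch of the intended behavior (the exact message is generated by `Base.@deprecate`):

```julia
collect(b)  # the new spelling
gather(b)   # still works, but emits a deprecation warning pointing to collect
```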
10 changes: 5 additions & 5 deletions src/file-io.jl
@@ -62,7 +62,7 @@ function save(ctx, io::IO, chunk::Chunk, file_path)
    write(io, meta)
    data_offset = position(io)

-    save(ctx, io, gather(ctx, chunk))
+    save(ctx, io, collect(ctx, chunk))

    Chunk(chunktype(chunk), domain(chunk), FileReader(file_path, chunktype(chunk), data_offset, false), false)
end
@@ -172,7 +172,7 @@ function save(ctx, io::IO, m::BitArray)
    save(ctx, io, convert(Array{Bool}, m))
end

-function gather{X,T<:Array}(ctx, c::Chunk{X,FileReader{T}})
+function collect{X,T<:Array}(ctx::Context, c::Chunk{X,FileReader{T}})
    h = c.handle
    io = open(h.file, "r+")
    seek(io, h.data_offset)
@@ -182,7 +182,7 @@ function gather{X,T<:Array}(ctx, c::Chunk{X,FileReader{T}})
    arr
end

-function gather{X,T<:BitArray}(ctx, c::Chunk{X, FileReader{T}})
+function collect{X,T<:BitArray}(ctx::Context, c::Chunk{X, FileReader{T}})
    h = c.handle
    io = open(h.file, "r+")
    seek(io, h.data_offset)
@@ -210,7 +210,7 @@ function save{Tv, Ti}(ctx, io::IO, m::SparseMatrixCSC{Tv,Ti})
    m
end

-function gather{X, T<:SparseMatrixCSC}(ctx, c::Chunk{X, FileReader{T}})
+function collect{X, T<:SparseMatrixCSC}(ctx::Context, c::Chunk{X, FileReader{T}})
    h = c.handle
    io = open(h.file, "r+")
    seek(io, h.data_offset)
@@ -237,7 +237,7 @@ function gather{X, T<:SparseMatrixCSC}(ctx, c::Chunk{X, FileReader{T}})
end

function getsub{X,T<:AbstractArray}(ctx, c::Chunk{X,FileReader{T}}, d)
-    Chunk(gather(ctx, c)[d])
+    Chunk(collect(ctx, c)[d])
end


