Skip to content

Commit

Permalink
speed up select 2x
Browse files Browse the repository at this point in the history
  • Loading branch information
Shashi Gowda committed Aug 8, 2016
1 parent 888b4a1 commit 66c08ba
Showing 1 changed file with 13 additions and 12 deletions.
25 changes: 13 additions & 12 deletions src/array/sort.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,35 @@ Base.sort(x::LazyArray; kwargs...) =
Sort(x, Dict(kwargs))

size(x::LazyArray) = size(x.input)

function compute(ctx, s::Sort)
inp = cached_stage(ctx, s.input)
inp = compute(Context(), mappart(p->sort(p; s.kwargs...), s.input)).result
ps = parts(inp)

sorted_parts = map(p->Thunk(x->sort(x; s.kwargs...), (p,)), ps)
blockwise_sorted = compute(Context(), Cat(Any, domain(inp), sorted_parts))
persist!(blockwise_sorted)
persist!(inp)

ls = map(length, parts(domain(inp)))
splitter_ranks = cumsum(ls)[1:end-1]

splitters = select(ctx, blockwise_sorted, splitter_ranks)
ComputedArray(compute(Context(), shuffle_merge(blockwise_sorted, splitters)))
splitters = select(ctx, inp, splitter_ranks)
ComputedArray(compute(Context(), shuffle_merge(inp, splitters)))
end

function mappart_eager(f, ctx, xs, T=Any)
function mappart_eager(f, ctx, xs, T, name)
ps = parts(xs)
thunks = Thunk[Thunk(f(i), (ps[i],))
master=OSProc(1)
#@dbg timespan_start(ctx, name, 0, master)
thunks = Thunk[Thunk(f(i), (ps[i],), get_result=true)
for i in 1:length(ps)]

gather(ctx, Thunk((xs...)->T[xs...], (thunks...)))
res = compute(ctx, Thunk((xs...)->T[xs...], (thunks...), meta=true))
#@dbg timespan_end(ctx, name, 0, master)
res
end

function broadcast1(ctx, f, xs::Cat, m,T)
ps = parts(xs)
@assert size(m, 1) == length(ps)
mappart_eager(ctx, xs,Vector{T}) do i
mappart_eager(ctx, xs,Vector{T},:broadcast1) do i
inp = vec(m[i,:])
function (p)
map(x->f(p, x)::T, inp)
Expand All @@ -48,7 +49,7 @@ end
function broadcast2(ctx, f, xs::Cat, m,v,T)
ps = parts(xs)
@assert size(m, 1) == length(ps)
mappart_eager(ctx, xs,Vector{T}) do i
mappart_eager(ctx, xs,Vector{T},:broadcast2) do i
inp = vec(m[i,:])
function (p)
map((x,y)->f(p, x, y)::T, inp, vec(v))
Expand Down

0 comments on commit 66c08ba

Please sign in to comment.