Skip to content

Commit

Permalink
Add Distributed.jl-based parallelization of reduce
Browse files Browse the repository at this point in the history
  • Loading branch information
tkf committed Oct 12, 2019
1 parent d14a2ea commit 2e8bfb3
Show file tree
Hide file tree
Showing 10 changed files with 103 additions and 4 deletions.
1 change: 1 addition & 0 deletions Project.toml
Expand Up @@ -6,6 +6,7 @@ version = "0.4.2"
[deps]
ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197"
BangBang = "198e06fe-97b7-11e9-32a5-e1d131e6ad66"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
InitialValues = "22cec73e-a1b8-11e9-2c92-598750a2cf9c"
Markdown = "d6f4376e-aef5-505a-96c1-9c027394607a"
Requires = "ae029012-a4dd-5104-9daa-d747884805df"
Expand Down
2 changes: 1 addition & 1 deletion benchmark/Manifest.toml
Expand Up @@ -191,7 +191,7 @@ uuid = "0796e94c-ce3b-5d07-9a54-7f471281c624"
version = "0.5.6"

[[Transducers]]
deps = ["ArgCheck", "BangBang", "InitialValues", "Markdown", "Requires", "Setfield"]
deps = ["ArgCheck", "BangBang", "Distributed", "InitialValues", "Markdown", "Requires", "Setfield"]
path = ".."
uuid = "28d57a85-8fef-5791-bfe6-a80928e7c999"
version = "0.4.2"
Expand Down
2 changes: 1 addition & 1 deletion docs/Manifest.toml
Expand Up @@ -258,7 +258,7 @@ uuid = "0796e94c-ce3b-5d07-9a54-7f471281c624"
version = "0.5.6"

[[Transducers]]
deps = ["ArgCheck", "BangBang", "InitialValues", "Markdown", "Requires", "Setfield"]
deps = ["ArgCheck", "BangBang", "Distributed", "InitialValues", "Markdown", "Requires", "Setfield"]
path = ".."
uuid = "28d57a85-8fef-5791-bfe6-a80928e7c999"
version = "0.4.2"
Expand Down
1 change: 1 addition & 0 deletions docs/Project.toml
Expand Up @@ -2,6 +2,7 @@
ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197"
BangBang = "198e06fe-97b7-11e9-32a5-e1d131e6ad66"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
InitialValues = "22cec73e-a1b8-11e9-2c92-598750a2cf9c"
Literate = "98b081ad-f1c9-55d3-8b20-4c87d4299306"
Expand Down
2 changes: 2 additions & 0 deletions docs/src/manual.md
Expand Up @@ -14,6 +14,8 @@ foldl
foreach
mapreduce
reduce
dreduce
dtransduce
eduction
map!
copy!
Expand Down
4 changes: 3 additions & 1 deletion src/Transducers.jl
Expand Up @@ -5,7 +5,7 @@ export Transducer, Map, Filter, Cat, MapCat, Take, PartitionBy, Scan, Zip,
Interpose, Dedupe, Partition, Iterated, Count, GroupBy, ReduceIf,
TakeLast, FlagFirst, MapSplat, ScanEmit, Enumerate, NotA, OfType,
transduce, eduction, setinput, Reduced, reduced, unreduced, ifunreduced,
Completing, OnInit, CopyInit, right, reducingfunction,
Completing, OnInit, CopyInit, right, reducingfunction, dreduce, dtransduce,
AdHocFoldable

# Deprecated:
Expand All @@ -15,6 +15,7 @@ using Base.Broadcast: Broadcasted

using ArgCheck
using BangBang: push!!, empty!!, setindex!!
using Distributed: Distributed, @everywhere
using Requires
using InitialValues: InitialValues, InitialValue, SpecificInitialValue, Init,
hasinitialvalue
Expand Down Expand Up @@ -47,6 +48,7 @@ include("library.jl")
include("simd.jl")
include("processes.jl")
include("reduce.jl")
include("dreduce.jl")
include("air.jl")
include("lister.jl")
include("show.jl")
Expand Down
68 changes: 68 additions & 0 deletions src/dreduce.jl
@@ -0,0 +1,68 @@
"""
    dreduce(step, xform::Transducer, array; [init, simd, basesize, pool])

Distributed.jl-based parallelization of `reduce`.  Input collection must
be indexable.

This is a thin wrapper around [`dtransduce`](@ref): `step` is wrapped in
`Completing`, all keyword arguments other than `init` are forwarded to
`dtransduce`, and the result is passed through `unreduced`.

# Keyword Arguments
- `pool::AbstractWorkerPool`: Passed to `Distributed.remotecall`.
- `basesize::Integer`: The size of each chunk of `itr` that is processed
  by a worker.  Unlike `reduce`, this defaults to
  `length(itr) ÷ nworkers()` so that inter-process communication is
  minimized.  A smaller size may be required when the computation time
  for processing each item can fluctuate a lot.
- For other keyword arguments, see [`reduce`](@ref).

# Examples
```jldoctest
julia> using Transducers
       using Distributed
       using BangBang

julia> dreduce(append!!, Map(vcat), 1:3; init=Union{}[])
3-element Array{Int64,1}:
 1
 2
 3
```
"""
dreduce(step, xform::Transducer, itr; init=MissingInit(), kwargs...) =
    # The keyword arguments belong to `dtransduce`, not to `unreduced`.
    unreduced(dtransduce(xform, Completing(step), init, itr; kwargs...))

"""
    dtransduce(xform::Transducer, step, init, array; [simd, basesize, pool])

Distributed.jl-based counterpart of `transduce`.  The collection is cut
into consecutive chunks of at most `basesize` items; each chunk is folded
by `foldl_nocomplete` in a `Distributed.remotecall` on `pool`, and the
fetched per-chunk results are merged left-to-right with `combine` before
`complete` is called once on the final accumulator.

If any per-chunk result is a `Reduced`, the first such result is returned
as-is.

# Keyword Arguments
- `simd`: Passed to `maybe_usesimd`.
- `basesize::Integer`: Chunk size; must be positive.  Defaults to
  `length(coll) ÷ nworkers()`, but never less than 1.
- `pool::AbstractWorkerPool`: Passed to `Distributed.remotecall`.

See [`dreduce`](@ref) and [`transduce`](@ref).
"""
function dtransduce(
    xform::Transducer, step, init, coll;
    simd::SIMDFlag = Val(false),
    # Clamp to 1: when `coll` has fewer items than there are workers the
    # integer division yields 0, which would fail the check below even
    # though the caller did nothing wrong.
    basesize::Integer = max(1, length(coll) ÷ Distributed.nworkers()),
    pool::Distributed.AbstractWorkerPool = Distributed.default_worker_pool(),
)
    @argcheck basesize > 0
    # Workers must have this package loaded before they can run `rf`.
    load_me_everywhere()
    rf = maybe_usesimd(Reduction(xform, step), simd)
    # One remote call per chunk; the last chunk may be shorter than `basesize`.
    futures = map(firstindex(coll):basesize:lastindex(coll)) do start
        Distributed.remotecall(
            foldl_nocomplete,
            pool,
            rf,
            init,
            coll[start:min(end, start - 1 + basesize)],
        )
    end
    # TODO: Cancel remote computation when there is a Reduced.
    results = map(fetch, futures)
    i = findfirst(isreduced, results)
    # NOTE(review): results of the chunks before index `i` are not combined
    # into the returned value -- confirm this is intended.
    i === nothing || return results[i]
    c = foldl(results) do a, b
        combine(rf, a, b)
    end
    return complete(rf, c)
end

"""
    load_me_everywhere()

Call `Base.require` for the package enclosing this function on every
process, using `@everywhere`.
"""
function load_me_everywhere()
    # `@__MODULE__` is resolved where this function is defined, so this is
    # the identifier of the enclosing package, not of the caller's module.
    this_package = Base.PkgId(@__MODULE__)
    @everywhere Base.require($this_package)
end
2 changes: 1 addition & 1 deletion test/Manifest.toml
Expand Up @@ -276,7 +276,7 @@ uuid = "0796e94c-ce3b-5d07-9a54-7f471281c624"
version = "0.5.6"

[[Transducers]]
deps = ["ArgCheck", "BangBang", "InitialValues", "Markdown", "Requires", "Setfield"]
deps = ["ArgCheck", "BangBang", "Distributed", "InitialValues", "Markdown", "Requires", "Setfield"]
path = ".."
uuid = "28d57a85-8fef-5791-bfe6-a80928e7c999"
version = "0.4.2"
Expand Down
1 change: 1 addition & 0 deletions test/Project.toml
Expand Up @@ -3,6 +3,7 @@ ArgCheck = "dce04be8-c92d-5529-be00-80e4d2c0e197"
BangBang = "198e06fe-97b7-11e9-32a5-e1d131e6ad66"
BlockArrays = "8e7c35d0-a365-5155-bbbb-fb81a777f24e"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b"
Documenter = "e30172f5-a6a5-5a46-863b-614d45cd2de4"
InitialValues = "22cec73e-a1b8-11e9-2c92-598750a2cf9c"
InteractiveUtils = "b77e0a4c-d291-57a0-90e8-8db25a27a240"
Expand Down
24 changes: 24 additions & 0 deletions test/test_distributed_reduce.jl
@@ -0,0 +1,24 @@
module TestDistributedReduce

include("preamble.jl")
using BangBang
using Distributed

# Only on CI: start extra worker processes so that the reduction below is
# actually spread over several processes.
get(ENV, "CI", "false") == "true" && addprocs(3)
@info "Testing with:" nworkers()

@testset begin
    # Define a uniquely named helper on every process; it pairs each input
    # with the id of the process that handled it.
    tagger_name = gensym(:attach_pid)
    @everywhere $tagger_name(x) = [(x, getpid())]
    tagger = getproperty(Main, tagger_name)

    # OS process ids of all workers, used to check where the work ran.
    worker_pids = [remotecall_fetch(getpid, id) for id in workers()]
    inputs = 1:10
    tagged = dreduce(append!!, Map(tagger), inputs; init=Union{}[])

    # Input order is preserved by the reduction ...
    @test map(first, tagged) == inputs
    # ... and every worker took part in it.
    @test Set(map(last, tagged)) == Set(worker_pids)
end

end # module

0 comments on commit 2e8bfb3

Please sign in to comment.