Skip to content

Commit

Permalink
Make parallel reduce work with Dict (#268)
Browse files Browse the repository at this point in the history
Use `SplittablesBase.amount` instead of `length` for implementing
parallel `reduce`.  `Dict` still does not work with `dreduce` because
it uses index-based partitioning.
  • Loading branch information
tkf committed May 11, 2020
1 parent 62ad9f6 commit 62f35e6
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Project.toml
Expand Up @@ -21,7 +21,7 @@ BangBang = "0.3.18"
InitialValues = "0.2.1"
Requires = "0.5, 1.0"
Setfield = "0.3, 0.4, 0.5, 0.6"
SplittablesBase = "0.1"
SplittablesBase = "0.1.2"
Tables = "0.2, 1.0"
julia = "1"

Expand Down
4 changes: 2 additions & 2 deletions benchmark/Manifest.toml
Expand Up @@ -201,11 +201,11 @@ uuid = "2f01184e-e22b-5df5-ae63-d93ebab69eaf"

[[SplittablesBase]]
deps = ["Setfield", "Test"]
git-tree-sha1 = "394055c80b68d7da0d85c9829e8fbf1243b69b19"
git-tree-sha1 = "1af78869dfc6ebc890064f72f8e0b58a0fbafc0b"
repo-rev = "master"
repo-url = "https://github.com/tkf/SplittablesBase.jl.git"
uuid = "171d559e-b47b-412a-8079-5efa626c420e"
version = "0.1.0"
version = "0.1.2-DEV"

[[Statistics]]
deps = ["LinearAlgebra", "SparseArrays"]
Expand Down
4 changes: 2 additions & 2 deletions docs/Manifest.toml
Expand Up @@ -279,11 +279,11 @@ version = "1.0.0"

[[SplittablesBase]]
deps = ["Setfield", "Test"]
git-tree-sha1 = "66813cea8bdd5b4713b5ff93ebe7f155706bffe8"
git-tree-sha1 = "1af78869dfc6ebc890064f72f8e0b58a0fbafc0b"
repo-rev = "master"
repo-url = "https://github.com/tkf/SplittablesBase.jl.git"
uuid = "171d559e-b47b-412a-8079-5efa626c420e"
version = "0.1.1-DEV"
version = "0.1.2-DEV"

[[StaticArrays]]
deps = ["LinearAlgebra", "Random", "Statistics"]
Expand Down
2 changes: 1 addition & 1 deletion src/Transducers.jl
Expand Up @@ -23,7 +23,7 @@ using Logging: LogLevel, @logmsg
using Requires
using InitialValues: InitialValues, InitialValue, SpecificInitialValue, Init,
hasinitialvalue, asmonoid
using SplittablesBase: halve
using SplittablesBase: amount, halve

import Setfield
using Setfield: @lens, @set, set
Expand Down
4 changes: 2 additions & 2 deletions src/dreduce.jl
Expand Up @@ -19,7 +19,7 @@ See also: [Parallel processing tutorial](@ref tutorial-parallel),
# Keyword Arguments
- `pool::AbstractWorkerPool`: Passed to `Distributed.remotecall`.
- `basesize::Integer = length(array) ÷ nworkers()`: A size of chunk in
- `basesize::Integer = amount(array) ÷ nworkers()`: A size of chunk in
`array` that is processed by each worker. A smaller size may be
required when computation time for processing each item can
fluctuate a lot.
Expand Down Expand Up @@ -50,7 +50,7 @@ See [`dreduce`](@ref) and [`transduce`](@ref).
function dtransduce(
xform::Transducer, step, init, coll;
simd::SIMDFlag = Val(false),
basesize::Integer = max(1, length(coll) ÷ Distributed.nworkers()),
basesize::Integer = max(1, amount(coll) ÷ Distributed.nworkers()),
threads_basesize::Integer = max(1, basesize ÷ Threads.nthreads()),
pool::Distributed.AbstractWorkerPool = Distributed.default_worker_pool(),
_remote_reduce = _transduce_assoc_nocomplete,
Expand Down
8 changes: 4 additions & 4 deletions src/reduce.jl
Expand Up @@ -17,7 +17,7 @@ See also: [Parallel processing tutorial](@ref tutorial-parallel),
[`foldl`](@ref), [`dreduce`](@ref).
# Keyword Arguments
- `basesize::Integer = length(reducible) ÷ nthreads()`: A size of
- `basesize::Integer = amount(reducible) ÷ nthreads()`: A size of
chunk in `reducible` that is processed by each worker. A smaller
size may be required when:
* computation time for processing each item fluctuates a lot
Expand Down Expand Up @@ -81,11 +81,11 @@ Check if `reducible` collection is considered small compared to
`basesize` (an integer). Fold functions such as [`reduce`](@ref)
switch to sequential `__foldl__` when `issmall` returns `true`.
Default implementation is `length(reducible) <= basesize`.
Default implementation is `amount(reducible) <= basesize`.
"""
issmall

issmall(reducible, basesize) = length(reducible) <= basesize
issmall(reducible, basesize) = amount(reducible) <= basesize

issmall(reducible::SizedReducible) =
issmall(reducible.reducible, max(reducible.basesize, 1))
Expand Down Expand Up @@ -135,7 +135,7 @@ function transduce_assoc(
init,
coll;
simd::SIMDFlag = Val(false),
basesize::Integer = length(coll) ÷ Threads.nthreads(),
basesize::Integer = amount(coll) ÷ Threads.nthreads(),
stoppable::Union{Bool,Nothing} = nothing,
)
rf = _reducingfunction(xform, step; init = init, simd = simd)
Expand Down
4 changes: 2 additions & 2 deletions test/environments/jl10/Manifest.toml
Expand Up @@ -309,11 +309,11 @@ version = "1.0.0"

[[SplittablesBase]]
deps = ["Setfield", "Test"]
git-tree-sha1 = "394055c80b68d7da0d85c9829e8fbf1243b69b19"
git-tree-sha1 = "1af78869dfc6ebc890064f72f8e0b58a0fbafc0b"
repo-rev = "master"
repo-url = "https://github.com/tkf/SplittablesBase.jl.git"
uuid = "171d559e-b47b-412a-8079-5efa626c420e"
version = "0.1.0"
version = "0.1.2-DEV"

[[StaticArrays]]
deps = ["LinearAlgebra", "Random", "Statistics"]
Expand Down
4 changes: 2 additions & 2 deletions test/environments/main/Manifest.toml
Expand Up @@ -311,11 +311,11 @@ version = "1.0.0"

[[SplittablesBase]]
deps = ["Setfield", "Test"]
git-tree-sha1 = "66813cea8bdd5b4713b5ff93ebe7f155706bffe8"
git-tree-sha1 = "1af78869dfc6ebc890064f72f8e0b58a0fbafc0b"
repo-rev = "master"
repo-url = "https://github.com/tkf/SplittablesBase.jl.git"
uuid = "171d559e-b47b-412a-8079-5efa626c420e"
version = "0.1.1-DEV"
version = "0.1.2-DEV"

[[StaticArrays]]
deps = ["LinearAlgebra", "Random", "Statistics"]
Expand Down
16 changes: 16 additions & 0 deletions test/threads/test_fold.jl
Expand Up @@ -26,4 +26,20 @@ const parseint = Base.Fix1(parse, Int)
end
end

# TODO: make them work with `dreduce`
@testset "$_fold" for _fold in [foldl, reduce]
    # Bind `fold` to the function under test.  `foldl` is used as-is, while
    # `reduce` is wrapped so that every call gets `basesize = 1`.  Since the
    # default `issmall` is `amount(reducible) <= basesize`, this makes `reduce`
    # keep splitting the collection until each piece has at most one element,
    # instead of immediately falling back to a sequential fold.
    #
    # An anonymous function is assigned here (rather than a named local method
    # definition) so that both branches are plain assignments to `fold`.
    if _fold === foldl
        fold = _fold
    else
        fold = (args...; kw...) -> _fold(args...; basesize = 1, kw...)
    end
    @testset "dict" begin
        # Keys are the characters '1'..'4', values are the integers 1..4.
        dict = Dict(zip("1234", 1:4))
        # Sum over the values (the `last` element of each key-value pair).
        @test fold(+, Map(last), dict) == 10
        @test fold(add, Map(last), dict) == 10
        # Sum over the keys after parsing each `Char` key as an `Int`.
        @test fold(+, Map(parseint ∘ first), dict) == 10
        @test fold(add, Map(parseint ∘ first), dict) == 10
    end
end

end # module

0 comments on commit 62f35e6

Please sign in to comment.