Skip to content

Commit

Permalink
Merge 8f19709 into 2ec5fa9
Browse files Browse the repository at this point in the history
  • Loading branch information
tkf committed Mar 12, 2020
2 parents 2ec5fa9 + 8f19709 commit e36e025
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 27 deletions.
10 changes: 5 additions & 5 deletions benchmark/multi-thread/bench_overhead.jl
Expand Up @@ -13,21 +13,21 @@ let n = 64
Map(_ -> nothing),
1:$n;
basesize = 1,
# terminatable = nothing,
# stoppable = nothing,
)
s["terminatable=true"] = @benchmarkable reduce(
s["stoppable=true"] = @benchmarkable reduce(
right,
Map(_ -> nothing),
1:$n;
basesize = 1,
terminatable = true,
stoppable = true,
)
s["terminatable=false"] = @benchmarkable reduce(
s["stoppable=false"] = @benchmarkable reduce(
right,
Map(_ -> nothing),
1:$n;
basesize = 1,
terminatable = false,
stoppable = false,
)
end

Expand Down
4 changes: 2 additions & 2 deletions src/progress.jl
Expand Up @@ -172,14 +172,14 @@ end

_reduce(
ctx,
terminatable,
stoppable,
task,
rf,
init,
coll::SizedReducible{<:ProgressLoggingFoldable},
) =
_reduce_progress(rf, init, coll) do rf, init, coll
_reduce(ctx, terminatable, task, rf, init, coll)
_reduce(ctx, stoppable, task, rf, init, coll)
end

if VERSION >= v"1.2"
Expand Down
34 changes: 17 additions & 17 deletions src/reduce.jl
@@ -1,5 +1,5 @@
"""
reduce(step, xf, reducible; [init, simd, basesize, terminatable]) :: T
reduce(step, xf, reducible; [init, simd, basesize, stoppable]) :: T
Thread-based parallelization of [`foldl`](@ref). The "bottom"
reduction function `step(::T, ::T) :: T` must be associative and
Expand All @@ -23,10 +23,10 @@ See also: [Parallel processing tutorial](@ref tutorial-parallel),
* computation time for processing each item fluctuates a lot
* computation can be terminated by [`reduced`](@ref) or
transducers using it, such as [`ReduceIf`](@ref)
- `terminatable::Bool`: [This option usually does not have to be set
- `stoppable::Bool`: [This option usually does not have to be set
manually.] Transducers.jl's `reduce` has a slight overhead for
supporting terminatable reduction with [`reduced`](@ref). It can be
disabled by passing `terminatable = false`. It is automatically set
supporting stoppable reduction with [`reduced`](@ref). It can be
disabled by passing `stoppable = false`. It is automatically set
when needed.
- For other keyword arguments, see [`foldl`](@ref).
Expand Down Expand Up @@ -130,18 +130,18 @@ function transduce_assoc(
coll;
simd::SIMDFlag = Val(false),
basesize::Integer = length(coll) ÷ Threads.nthreads(),
terminatable::Union{Bool,Nothing} = nothing,
stoppable::Union{Bool,Nothing} = nothing,
)
rf = maybe_usesimd(Reduction(xform, step), simd)
if terminatable === nothing
terminatable = _might_return_reduced(rf, init, coll)
if stoppable === nothing
stoppable = _might_return_reduced(rf, init, coll)
end
acc = @return_if_reduced _transduce_assoc_nocomplete(
rf,
init,
coll,
basesize,
terminatable,
stoppable,
)
result = complete(rf, acc)
if unreduced(result) isa DefaultInit
Expand All @@ -158,10 +158,10 @@ else
maybe_collect(coll) = collect(coll)
end

function _transduce_assoc_nocomplete(rf, init, coll, basesize, terminatable = true)
function _transduce_assoc_nocomplete(rf, init, coll, basesize, stoppable = true)
reducible = SizedReducible(maybe_collect(coll), basesize)
@static if VERSION >= v"1.3-alpha"
return _reduce(TaskContext(), terminatable, DummyTask(), rf, init, reducible)
return _reduce(TaskContext(), stoppable, DummyTask(), rf, init, reducible)
else
return _reduce_threads_for(rf, init, reducible)
end
Expand All @@ -175,7 +175,7 @@ end

function _reduce(
ctx,
terminatable,
stoppable,
next_task,
rf::R,
init::I,
Expand All @@ -184,12 +184,12 @@ function _reduce(
if should_abort(ctx)
# As other tasks may be calling `fetch` on `next_task`, it
# _must_ be scheduled at some point to avoid dead lock:
terminatable && schedule(next_task)
stoppable && schedule(next_task)
# Maybe use `error=false`? Or pass something and get it via `yieldto`?
return init
end
if issmall(reducible)
terminatable && schedule(next_task)
stoppable && schedule(next_task)
acc = _reduce_basecase(rf, init, reducible)
if acc isa Reduced
cancel!(ctx)
Expand All @@ -198,9 +198,9 @@ function _reduce(
else
left, right = _halve(reducible)
fg, bg = splitcontext(ctx)
task = nonsticky!(@task _reduce(bg, terminatable, next_task, rf, init, right))
terminatable || schedule(task)
a0 = _reduce(fg, terminatable, task, rf, init, left)
task = nonsticky!(@task _reduce(bg, stoppable, next_task, rf, init, right))
stoppable || schedule(task)
a0 = _reduce(fg, stoppable, task, rf, init, left)
b0 = fetch(task)
a = @return_if_reduced a0
should_abort(ctx) && return a # slight optimization
Expand Down Expand Up @@ -253,7 +253,7 @@ combine_step(rf) =
end

# The output of `reduce` is correct regardless of the value of
# `terminatable`. Thus, we can use `return_type` here purely for
# `stoppable`. Thus, we can use `return_type` here purely for
# optimization.
_might_return_reduced(rf, init, coll) =
Base.typeintersect(
Expand Down
6 changes: 3 additions & 3 deletions test/threads/test_parallel_reduce.jl
Expand Up @@ -38,13 +38,13 @@ end
end

@testset "early termination (grid)" begin
@testset for needle in 1:20, len in 1:20, terminatable in [true, false]
@testset for needle in 1:20, len in 1:20, stoppable in [true, false]
@test reduce(
right,
ReduceIf(x -> x >= needle),
1:len;
basesize = 1,
terminatable = terminatable,
stoppable = stoppable,
) == min(needle, len)
end
end
Expand All @@ -56,7 +56,7 @@ end
xs = collect(enumerate(rand(rng, 1:100, 100 * basesize)))
xf = ReduceIf(x -> x[2] >= p)
@test reduce(right, xf, xs; basesize=basesize) == foldl(right, xf, xs)
@test reduce(right, xf, xs; basesize = basesize, terminatable = false) ==
@test reduce(right, xf, xs; basesize = basesize, stoppable = false) ==
foldl(right, xf, xs)
end
end
Expand Down

0 comments on commit e36e025

Please sign in to comment.