Skip to content

Commit

Permalink
Avoid stack overflow error with Future.reduce
Browse files Browse the repository at this point in the history
This completely splits the implementation between the ordered and
unordered reduce, as it becomes rather difficult to share code between
the two.

This tweaks the internal interface slightly, in particular by not using
an instance variable for the accumulator in either case.

With a long list of resolved futures, the previous implementation of an
ordered reduce caused a stack-overflow exception, as each call to
reduce_next happened within the previous call (synchronously).

This changes the behavior to loop over futures instead of using
recursion while on_complete dispatches synchronously in the same thread.
The implementation is heavily based on the non-recursive implementation
of Future.after
  • Loading branch information
grddev committed Oct 26, 2015
1 parent 878c4ab commit 892dcca
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 47 deletions.
103 changes: 56 additions & 47 deletions lib/ione/future.rb
Expand Up @@ -874,64 +874,56 @@ class ReducingFuture < CompletableFuture
def initialize(futures, initial_value, reducer)
super()
@futures = Array(futures)
@remaining = @futures.size
@initial_value = initial_value
@accumulator = initial_value.nil? ? NO_INITIAL_VALUE : initial_value
@initial_value = initial_value.nil? ? NO_INITIAL_VALUE : initial_value
@reducer = reducer
end

private

def reduce_one(value)
unless failed?
@lock.lock
begin
if @accumulator.equal?(NO_INITIAL_VALUE)
@accumulator = value
else
@accumulator = @reducer.call(@accumulator, value)
end
@remaining -= 1
rescue => e
@lock.unlock
fail(e)
else
@lock.unlock
end
unless failed?
if @remaining == 0
resolve(@accumulator)
:done
else
:continue
end
end
end
end
end

# @private
class OrderedReducingFuture < ReducingFuture
def initialize(futures, initial_value, reducer)
super
if @remaining > 0
reduce_next(0)
if @futures.empty?
resolve(@initial_value.equal?(NO_INITIAL_VALUE) ? nil : @initial_value)
elsif @initial_value.equal?(NO_INITIAL_VALUE)
@futures.shift.on_complete(&method(:reduce_next))
else
resolve(@initial_value)
reduce_next(@initial_value, nil)
end
end

private

def reduce_next(i)
@futures[i].on_complete do |v, e|
unless failed?
if e
fail(e)
elsif reduce_one(v) == :continue
reduce_next(i + 1)
def reduce_next(accumulator, e)
if e
@futures = nil
fail(e)
elsif @futures.empty?
@futures = nil
resolve(accumulator)
else
outer = Thread.current
looping = more = true
while more
more = false
@futures.shift.on_complete do |v, ee|
if ee
reduce_next(nil, ee)
else
begin
accumulator = @reducer.call(accumulator, v)
if @futures.empty? || !looping || !Thread.current.equal?(outer)
reduce_next(accumulator, nil)
else
more = true
end
rescue => eee
reduce_next(nil, eee)
end
end
end
end
looping = false
end
end
end
Expand All @@ -940,20 +932,37 @@ def reduce_next(i)
class UnorderedReducingFuture < ReducingFuture
def initialize(futures, initial_value, reducer)
super
if @remaining > 0
futures.each do |f|
if @futures.empty?
resolve(@initial_value.equal?(NO_INITIAL_VALUE) ? nil : @initial_value)
else
accumulator = @initial_value
remaining = @futures.size
@futures.each do |f|
f.on_complete do |v, e|
unless failed?
if e
fail(e)
else
reduce_one(v)
done = false
@lock.lock
begin
accumulator = accumulator.equal?(NO_INITIAL_VALUE) ? v : @reducer.call(accumulator, v)
remaining -= 1
done = (remaining == 0)
rescue => ee
@lock.unlock
fail(ee)
else
@lock.unlock
end
if done
@futures = nil
resolve(accumulator)
end
end
end
end
end
else
resolve(@initial_value)
end
end
end
Expand Down
12 changes: 12 additions & 0 deletions spec/ione/future_spec.rb
Expand Up @@ -916,6 +916,12 @@ def delayed(*context, &listener)
future.value.should == 6
end

it 'handles a really long list of futures' do
futures = Array.new(10000, Future.resolved(1))
future = Future.reduce(futures, 0, &:+)
future.value.should eq(10000)
end

context 'when the :ordered option is false' do
it 'calls the block with the values in the order of completion, when the :ordered option is false' do
promises = [Promise.new, Promise.new, Promise.new]
Expand Down Expand Up @@ -949,6 +955,12 @@ def delayed(*context, &listener)
future.should be_failed
end

it 'handles a really long list of futures' do
futures = Array.new(10000, Future.resolved(1))
future = Future.reduce(futures, 0, ordered: false, &:+)
future.value.should eq(10000)
end

context 'when the list of futures is empty' do
it 'returns a future that resolves to the initial value' do
Future.reduce([], :foo, ordered: false).value.should == :foo
Expand Down

0 comments on commit 892dcca

Please sign in to comment.