Skip to content

Commit

Permalink
Merge branch 'pr_34'
Browse files Browse the repository at this point in the history
  • Loading branch information
iconara committed Oct 27, 2015
2 parents 85c946b + c469090 commit 6c51254
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 47 deletions.
103 changes: 56 additions & 47 deletions lib/ione/future.rb
Expand Up @@ -879,64 +879,56 @@ class ReducingFuture < CompletableFuture
def initialize(futures, initial_value, reducer)
super()
@futures = Array(futures)
@remaining = @futures.size
@initial_value = initial_value
@accumulator = initial_value.nil? ? NO_INITIAL_VALUE : initial_value
@initial_value = initial_value.nil? ? NO_INITIAL_VALUE : initial_value
@reducer = reducer
end

private

def reduce_one(value)
unless failed?
@lock.lock
begin
if @accumulator.equal?(NO_INITIAL_VALUE)
@accumulator = value
else
@accumulator = @reducer.call(@accumulator, value)
end
@remaining -= 1
rescue => e
@lock.unlock
fail(e)
else
@lock.unlock
end
unless failed?
if @remaining == 0
resolve(@accumulator)
:done
else
:continue
end
end
end
end
end

# @private
class OrderedReducingFuture < ReducingFuture
def initialize(futures, initial_value, reducer)
super
if @remaining > 0
reduce_next(0)
if @futures.empty?
resolve(@initial_value.equal?(NO_INITIAL_VALUE) ? nil : @initial_value)
elsif @initial_value.equal?(NO_INITIAL_VALUE)
@futures.shift.on_complete(&method(:reduce_next))
else
resolve(@initial_value)
reduce_next(@initial_value, nil)
end
end

private

def reduce_next(i)
@futures[i].on_complete do |v, e|
unless failed?
if e
fail(e)
elsif reduce_one(v) == :continue
reduce_next(i + 1)
def reduce_next(accumulator, e)
if e
@futures = nil
fail(e)
elsif @futures.empty?
@futures = nil
resolve(accumulator)
else
outer = Thread.current
looping = more = true
while more
more = false
@futures.shift.on_complete do |v, ee|
if ee
reduce_next(nil, ee)
else
begin
accumulator = @reducer.call(accumulator, v)
if @futures.empty? || !looping || !Thread.current.equal?(outer)
reduce_next(accumulator, nil)
else
more = true
end
rescue => eee
reduce_next(nil, eee)
end
end
end
end
looping = false
end
end
end
Expand All @@ -945,20 +937,37 @@ def reduce_next(i)
class UnorderedReducingFuture < ReducingFuture
def initialize(futures, initial_value, reducer)
super
if @remaining > 0
futures.each do |f|
if @futures.empty?
resolve(@initial_value.equal?(NO_INITIAL_VALUE) ? nil : @initial_value)
else
accumulator = @initial_value
remaining = @futures.size
@futures.each do |f|
f.on_complete do |v, e|
unless failed?
if e
fail(e)
else
reduce_one(v)
done = false
@lock.lock
begin
accumulator = accumulator.equal?(NO_INITIAL_VALUE) ? v : @reducer.call(accumulator, v)
remaining -= 1
done = (remaining == 0)
rescue => ee
@lock.unlock
fail(ee)
else
@lock.unlock
end
if done
@futures = nil
resolve(accumulator)
end
end
end
end
end
else
resolve(@initial_value)
end
end
end
Expand Down
12 changes: 12 additions & 0 deletions spec/ione/future_spec.rb
Expand Up @@ -916,6 +916,12 @@ def delayed(*context, &listener)
future.value.should == 6
end

it 'handles a really long list of futures' do
futures = Array.new(10000, Future.resolved(1))
future = Future.reduce(futures, 0, &:+)
future.value.should eq(10000)
end

context 'when the :ordered option is false' do
it 'calls the block with the values in the order of completion, when the :ordered option is false' do
promises = [Promise.new, Promise.new, Promise.new]
Expand Down Expand Up @@ -949,6 +955,12 @@ def delayed(*context, &listener)
future.should be_failed
end

it 'handles a really long list of futures' do
futures = Array.new(10000, Future.resolved(1))
future = Future.reduce(futures, 0, ordered: false, &:+)
future.value.should eq(10000)
end

context 'when the list of futures is empty' do
it 'returns a future that resolves to the initial value' do
Future.reduce([], :foo, ordered: false).value.should == :foo
Expand Down

0 comments on commit 6c51254

Please sign in to comment.