Skip to content

Commit

Permalink
Merge branch 'master' of git://github.com/fl00r/eventmachine into enu…
Browse files Browse the repository at this point in the history
…merable
  • Loading branch information
ibc committed Feb 24, 2012
2 parents 7021a11 + 2caaf73 commit 9ea2829
Show file tree
Hide file tree
Showing 2 changed files with 163 additions and 9 deletions.
62 changes: 53 additions & 9 deletions lib/em/iterator.rb
Expand Up @@ -40,7 +40,52 @@ module EventMachine
# async_http_get(url){ iter.next }
# end
#

# Support for Enumerable in Ruby 1.9+
module IteratorWithEnumerable
def setup_list(list)
raise ArgumentError, 'argument must be an Enumerable' unless list.respond_to?(:each)
list.to_enum
end

def next_item
@next_item
end

# We can't check just next_item as far as it can return nil in two cases:
# when our enumerator is stopped and when it stores nil value
def next?
begin
@next_item = @list.next
true
rescue StopIteration
false
rescue => e
raise e
end
end
end

# Ruby 1.8 uses continuations in Enumerable, so we should use Arrays
module IteratorWithArray
def setup_list(list)
raise ArgumentError, 'argument must be an array' unless list.respond_to?(:to_a)
list.dup.to_a
end

def next_item
@list.shift
end

def next?
@list.any?
end
end

class Iterator
include IteratorWithEnumerable if defined? Fiber
include IteratorWithArray unless defined? Fiber

# Create a new parallel async iterator with specified concurrency.
#
# i = EM::Iterator.new(1..100, 10)
Expand All @@ -49,8 +94,7 @@ class Iterator
# is started via #each, #map or #inject
#
def initialize(list, concurrency = 1)
raise ArgumentError, 'argument must be an array' unless list.respond_to?(:to_a)
@list = list.to_a.dup
@list = setup_list(list)
@concurrency = concurrency

@started = false
Expand Down Expand Up @@ -97,12 +141,8 @@ def each(foreach=nil, after=nil, &blk)
@process_next = proc{
# p [:process_next, :pending=, @pending, :workers=, @workers, :ended=, @ended, :concurrency=, @concurrency, :list=, @list]
unless @ended or @workers > @concurrency
if @list.empty?
@ended = true
@workers -= 1
all_done.call
else
item = @list.shift
if next?
item = next_item
@pending += 1

is_done = false
Expand All @@ -123,6 +163,10 @@ class << on_done
end

foreach.call(item, on_done)
else
@ended = true
@workers -= 1
all_done.call
end
else
@workers -= 1
Expand Down Expand Up @@ -267,4 +311,4 @@ def spawn_workers
p results
})
}
end
end
110 changes: 110 additions & 0 deletions tests/test_iterator.rb
@@ -0,0 +1,110 @@
require 'em_test_helper'

class TestEnumerable
include Enumerable

def each
while (num = rand(20)) != 10 do
yield num
end
end
end

class TestEnumerable2
include Enumerable

def each
arr = ('a'..'g').to_a
while it = arr.shift do
yield it
end
end
end

class TestEnumerable3
include Enumerable

def each
('a'..'f').each.with_index do |ltr, index|
break if ltr == 'e'
eval("@#{ltr} = #{index}")
yield ltr
end
end
end

class TestIterator < Test::Unit::TestCase

def test_iterator_with_array
assert_nothing_raised do
EM.run {
after = proc{ EM.stop }
EM::Iterator.new(0..10, 10).each(nil, after){ |num,iter| iter.next }
}
end
end

def test_iterator_with_array_with_result
nums = []
EM.run {
after = proc{ EM.stop }
EM::Iterator.new((0..10)).each(nil, after){ |num,iter|
nums << num
iter.next
}
}
res = (0..10).to_a
assert_equal res, nums
end

def test_iterator_with_enumerable
assert_nothing_raised do
EM.run {
en = TestEnumerable.new
after = proc{ EM.stop }
EM::Iterator.new(en, 10).each(nil, after){ |num, iter| iter.next }
}
end
end

def test_iterator_with_enumerable_with_result
letters = []
EM.run {
en = TestEnumerable2.new
after = proc{ EM.stop }
EM::Iterator.new(en, 10).each(nil, after){ |ltr,iter|
letters << ltr
iter.next
}
}
res = ('a'..'g').to_a
assert_equal res, letters
end

def test_iterator_with_array_with_nils
nums = []
EM.run {
after = proc{ EM.stop }
EM::Iterator.new(["Hello", nil, "World", nil]).each(nil, after){ |num,iter|
nums << num
iter.next
}
}
res = "Hello World"
assert_equal res, nums.compact.join(" ")
end

def test_iterator_for_lazyness
enumerable = TestEnumerable3.new
enumerator = enumerable.to_enum
EM.run {
after = proc{ EM.stop }
EM::Iterator.new(enumerator).each(nil, after){ |num,iter|
iter.next
}
}
assert_equal 0, enumerable.instance_variable_get(:@a)
assert_equal 3, enumerable.instance_variable_get(:@d)
assert_equal false, enumerable.instance_variable_defined?(:@e)
end
end

0 comments on commit 9ea2829

Please sign in to comment.