
FiberIterator not passing iterator to block, "call 'next' on nil" exception #148

Open
nelgau opened this Issue · 5 comments

2 participants

@nelgau

I'm using a FiberIterator to iterate over a list of Redis clients, merging the hashes they return with inject:

EM::Synchrony::FiberIterator.new(client_ops(fields), @concurrency).inject({}) do |rh, co, iter|
  iter.return rh.merge!(co[:client].mapped_hmget(@key, *co[:ops]))
end

Iterating with each works great. However, iterating with inject produces the following exception:

[...]/gems/eventmachine-1.0.0/lib/em/iterator.rb:197:in `block (2 levels) in inject': undefined method `next' for nil:NilClass (NoMethodError)

It seems that FiberIterator's each method is not passing an iterator into the foreach/blk proc. EventMachine's implementation of inject expects a non-nil iterator on which it can call next.
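A minimal, synchronous Ruby sketch of that failure mode follows. It is not EventMachine's actual code; `ToyIterator` and `NoopIter` are hypothetical names. It only models the layering described above, where `inject` is written in terms of `each` and advances by calling `next` on the iterator that `each` hands to the block. When `each` calls the block with the item alone, Ruby's lenient proc arity leaves `iter` as `nil`, which reproduces the same kind of `NoMethodError`.

```ruby
# Hypothetical stand-in for an iterator object: advancing is a no-op here
# because ToyIterator walks its items on its own.
class NoopIter
  def next; end
end

# Toy model (NOT EventMachine's code): inject is layered on top of each,
# and inject's per-item step advances by calling next on the iterator.
class ToyIterator
  def initialize(items, pass_iter: true)
    @items = items
    @pass_iter = pass_iter # false mimics a block called with (obj) only
  end

  def each(foreach)
    @items.each do |item|
      if @pass_iter
        foreach.call(item, NoopIter.new)
      else
        foreach.call(item) # the proc receives iter == nil
      end
    end
  end

  def inject(memo)
    step = proc { |item, iter|
      memo = yield(memo, item)
      iter.next # raises NoMethodError when iter is nil
    }
    each(step)
    memo
  end
end

sum = ToyIterator.new([1, 2, 3]).inject(0) { |s, n| s + n }
p sum # prints 6

begin
  ToyIterator.new([1, 2, 3], pass_iter: false).inject(0) { |s, n| s + n }
rescue NoMethodError => e
  puts "NoMethodError: #{e.message}"
end
```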

If I knew what the intended behavior was supposed to be, I would have submitted a patch. A temporary, ugly and wrong fix is the following code:

module EventMachine
  module Synchrony

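    # No-op object handed to the block so that EM's inject can call next
    # on it; FiberIterator still advances the real iterator after the block.
    class NextConsumer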
      def next
      end
    end

    class FiberIterator < EM::Synchrony::Iterator

      # execute each iterator block within its own fiber
      # and auto-advance the iterator after each call
      def each(foreach=nil, after=nil, &blk)
        fe = Proc.new do |obj, iter|
          Fiber.new { (foreach || blk).call(obj, NextConsumer.new); iter.next }.resume
        end

        super(fe, after)
      end

    end
  end
end
@igrigorik
Owner

Ah, yes. I don't think I ever got to inject, hence the commented-out spec:
https://github.com/igrigorik/em-synchrony/blob/master/spec/fiber_iterator_spec.rb#L22

I'd start with getting a failed spec, and then we can iterate from there!

@nelgau

Got it. I'll investigate a version of inject that will play nicely against that spec...

But I'm having some difficulty with the spec as defined. We're going to add the numbers 1 to 5 in order, two fibers at a time. Each of the two fibers will start with the current tally, add its number, and call return on the iterator with the sum, updating the tally. Only one of the two fibers will win this race, unless we introduce sequencing within return. I expect to see something like:

[:sync, 0, 1]
[:sync, 0, 2]
[:sync, 2, 3]
[:sync, 2, 4]
[:sync, 6, 5]

Failures:

  1) EventMachine::Synchrony::FiberIterator should sum values within the iterator
     Failure/Error: res.should == data.inject(:+)
       expected: 15
            got: 11 (using ==)

If I modify FiberIterator.each to not auto-advance and run the spec as given, this is indeed the result.
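To make the arithmetic concrete, here is a deterministic Ruby model of the race. It assumes a simplified schedule in which items run in batches of `concurrency` fibers, each fiber captures the tally when its block is invoked, and `Fiber.yield` stands in for `EM::Synchrony.sleep`. `racy_sum` is a hypothetical helper, not part of em-synchrony.

```ruby
# Simplified model of the race (not EventMachine itself): every fiber in a
# batch captures the same stale total, sleeps, then writes total + num back,
# so the last writer in each batch wins.
def racy_sum(data, concurrency)
  total = 0
  log = []
  data.each_slice(concurrency) do |batch|
    fibers = batch.map do |num|
      start = total # value captured when the block is invoked
      Fiber.new do
        Fiber.yield # stand-in for EM::Synchrony.sleep(0.1)
        log << [:sync, start, num]
        total = start + num # overwrites the other fiber's update
      end
    end
    fibers.each(&:resume) # run each fiber up to its "sleep"
    fibers.each(&:resume) # wake them; each writes its stale sum
  end
  [total, log]
end

total, log = racy_sum((1..5).to_a, 2)
log.each { |entry| p entry }
p total # prints 11, not 15
```

Under this model the log matches the expected sequence above: the contributions of 1 and 3 are overwritten, so the result is 15 - 1 - 3 = 11.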

To look at this from another perspective: if I introduce some persistent object that's carried through the process and accessible to all fibers, I get a natural ordering of updates. For example, here's the same spec with a hash instead of an integer:

it "should sum values within the iterator" do
  EM.synchrony do
    data = (1..5).to_a
    res = EM::Synchrony::FiberIterator.new(data, 2).inject({:total => 0}) do |total_hash, num, iter|
      EM::Synchrony.sleep(0.1)

      p [:sync, total_hash[:total], num]
      total_hash[:total] += num
      iter.return(total_hash)
    end

    res[:total].should == data.inject(:+)
    EventMachine.stop
  end
end

And the output is as expected:

[:sync, 0, 1]
[:sync, 1, 2]
[:sync, 3, 3]
[:sync, 6, 4]
[:sync, 10, 5]
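The same simplified batch model reproduces the ordered output above once all fibers share one mutable hash and read `:total` only after waking. Under cooperative scheduling, the read and the write happen with no `Fiber.yield` between them, so no other fiber can interleave. `shared_hash_sum` is a hypothetical helper used only for this sketch.

```ruby
# Same simplified batch model, but all fibers share one mutable hash and
# read :total only after waking, so each update sees the previous ones.
def shared_hash_sum(data, concurrency)
  state = { total: 0 }
  log = []
  data.each_slice(concurrency) do |batch|
    fibers = batch.map do |num|
      Fiber.new do
        Fiber.yield # stand-in for EM::Synchrony.sleep(0.1)
        log << [:sync, state[:total], num]
        state[:total] += num # read-modify-write with no yield in between
      end
    end
    fibers.each(&:resume) # run each fiber up to its "sleep"
    fibers.each(&:resume) # wake them; each updates the shared hash
  end
  [state, log]
end

state, log = shared_hash_sum((1..5).to_a, 2)
log.each { |entry| p entry }
p state[:total] # prints 15
```

If the block yielded between reading and writing `:total`, the lost-update race would come back, so this relies on where the fiber suspends rather than on any locking.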

I'm interested to know what you think and in which direction you think I should proceed. It seems like it's not possible to carry the semantics of inject over to this concurrent world without some tricks to avoid race conditions. Now, I'm very new to this world of fibers and EventMachine, so I could be wrong. If I seem so certain, forgive me. I'm really not. :)

@igrigorik
Owner

Humm.. yeah, this is a tricky one:

require 'eventmachine'

EM.run do
  EM::Iterator.new((1..5), 2).inject(0, proc{ |total,num,iter|
    EM.next_tick {
      p [total, num]
      total += num
      iter.return(total)
    }
  }, proc{ |results|
    p results
    EM.stop
  })
end

EM.run do
  total = []
  EM::Iterator.new((1..5), 2).inject(total, proc{ |total,num,iter|
    EM.next_tick {
      p [total, num]
      total.push num
      iter.return(total)
    }
  }, proc{ |results|
    p results.inject(:+)
    EM.stop
  })
end

Yields

[0, 1]
[1, 2]
[1, 3]
[4, 4]
[4, 5]
9
[[], 1]
[[1], 2]
[[1, 2], 3]
[[1, 2, 3], 4]
[[1, 2, 3, 4], 5]
15

So the behavior is consistent with EM's iterator implementation. I'm not really sure there is a clean solution here..

@tmm1 any suggestions?

@nelgau

Very interesting. I'll probably go with a map-reduce style implementation, as you give in your second example. Thanks!
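A minimal sketch of that map-reduce shape, applied to the original use case of merging per-client hashes: each concurrent task appends only its own partial result, and a single reduction runs after all tasks finish, so no task ever reads a shared running total. `fetch_fields` and `map_then_reduce` are hypothetical. `fetch_fields` stands in for a call like `co[:client].mapped_hmget(@key, *co[:ops])`, and `Fiber.yield` stands in for the I/O wait.

```ruby
# Hypothetical stand-in for a Redis call such as mapped_hmget: returns a
# hash of field => value for the requested fields.
def fetch_fields(ops)
  ops.map { |field| [field, "value-of-#{field}"] }.to_h
end

# Map step: each fiber appends its own partial hash (append only, no shared
# running total). Reduce step: merge everything once all fibers are done.
def map_then_reduce(op_groups, concurrency)
  partials = []
  op_groups.each_slice(concurrency) do |batch|
    fibers = batch.map do |ops|
      Fiber.new do
        Fiber.yield # simulated I/O wait
        partials << fetch_fields(ops)
      end
    end
    fibers.each(&:resume) # start every fiber in the batch
    fibers.each(&:resume) # let each finish and record its partial result
  end
  partials.inject({}) { |acc, h| acc.merge(h) }
end

result = map_then_reduce([%w[a b], %w[c], %w[d e]], 2)
p result
```

Because each partial hash covers distinct fields here, the merge order does not affect the result; if field sets could overlap, a later `merge` would overwrite earlier values.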

@igrigorik
Owner

We still need to fix the .next bug, but other than that... It's probably worth checking in the spec plus some docs on the gotchas of using it across multiple fibers. :)
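For the `.next` bug itself, one possible direction (an assumption, not a tested patch for em-synchrony) is to hand the block a wrapper whose `next` forwards to the real iterator at most once per item. EM's `inject` could then advance through it from its return callback, and FiberIterator's auto-advance afterwards would become a no-op instead of advancing twice. `AdvanceOnce` and `CountingIter` are hypothetical names used only for this sketch.

```ruby
# Hypothetical guard (not part of EventMachine or em-synchrony): forwards
# next to the wrapped iterator at most once.
class AdvanceOnce
  def initialize(iter)
    @iter = iter
    @advanced = false
  end

  def next
    return if @advanced
    @advanced = true
    @iter.next
  end
end

# Stand-in for EM's iterator object: counts how often it is advanced.
class CountingIter
  attr_reader :count

  def initialize
    @count = 0
  end

  def next
    @count += 1
  end
end

real = CountingIter.new
guard = AdvanceOnce.new(real)
guard.next # e.g. called via inject's return callback
guard.next # e.g. the fiber's auto-advance after the block finishes
p real.count # prints 1
```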
