Permalink
Browse files

Merge pull request #8 from fonzo14/master

each_job  fiber compliant
  • Loading branch information...
2 parents 3f163fd + df8b358 commit 351704a942b318ddc1b44e167fcf8e798ad2ccf6 @dj2 dj2 committed Aug 30, 2011
Showing with 49 additions and 8 deletions.
  1. +18 −8 lib/em-jack/connection.rb
  2. +31 −0 spec/em-jack/fiber_spec.rb
View
@@ -219,16 +219,26 @@ def put(msg, opts = nil, &blk)
end
def each_job(timeout = nil, &blk)
- work = Proc.new do
- r = reserve(timeout)
- r.callback do |job|
- blk.call(job)
- EM.next_tick { work.call }
+ if (@fiberized)
+ work = Proc.new do
+ Fiber.new do
+ job = reserve(timeout)
+ blk.call(job)
+ end.resume
+ EM.next_tick { work.call }
end
- r.errback do
- EM.next_tick { work.call }
+ else
+ work = Proc.new do
+ r = reserve(timeout)
+ r.callback do |job|
+ blk.call(job)
+ EM.next_tick { work.call }
+ end
+ r.errback do
+ EM.next_tick { work.call }
+ end
end
- end
+ end
work.call
end
View
@@ -21,5 +21,36 @@
end.resume
end
end
+ it "should process each job" do
+ EM.run do
+ EM.add_timer(10) { EM.stop }
+
+ job_body = ''
+
+ f = Fiber.new do
+ bean = EMJack::Connection.new
+ bean.fiber!
+
+ bean.put("hello!")
+ bean.put("bonjour!")
+
+ mock = double()
+ mock.should_receive(:foo).with("hello!")
+ mock.should_receive(:foo).with("bonjour!")
+
+ bean.each_job(0) do |job|
+ mock.foo(job.body)
+ job_body = job.body
+ job.delete
+ end
+
+ end
+
+ f.resume
+
+ EM.add_timer(1) { EM.stop unless f.alive?; job_body.should eq "bonjour!" unless f.alive? }
+
+ end
+ end
end
end

0 comments on commit 351704a

Please sign in to comment.