Thread pool pattern for evented (EventMachine) fibers.
Use fiber_storm to write evented code in a synchronous manner using the thread pool pattern.
    require "fiber_storm"

    # Initialize with a pool of two fibers.
    FiberStorm.new(:size => 2) do |storm|
      start_time = Time.now

      # Create 3 executions to be run by the two fibers (i.e. max concurrency of 2).
      3.times do
        storm.execute do
          FiberStorm.sleep(1) # Fiber aware sleep.
          puts "#{Fiber.current} slept for 1 second"
        end
      end

      # "block" until all executions are finished.
      storm.join

      puts "Running time: " + (Time.now - start_time).to_s
    end
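To make the pool idea concrete, here is a minimal sketch in plain Ruby with no EventMachine involved. It is not how fiber_storm is implemented: TinyFiberPool and its methods are made-up names, and a bare Fiber.yield stands in for a fiber aware blocking call. It only illustrates how a fixed number of fibers can work through a longer list of jobs.

```ruby
# Hypothetical toy pool for illustration only; not fiber_storm's API.
class TinyFiberPool
  DONE = Object.new # sentinel returned when a job's fiber has finished

  attr_reader :max_in_flight

  def initialize(size)
    @size = size
    @queue = []         # jobs waiting for a free slot
    @paused = []        # fibers that yielded in the middle of a job
    @in_flight = 0
    @max_in_flight = 0  # highest concurrency observed
  end

  # Queue a job; nothing runs until join is called.
  def execute(&job)
    @queue << job
  end

  # Run until every queued job has finished.
  def join
    until @queue.empty? && @paused.empty?
      # Start new jobs while there is a free slot in the pool.
      start(@queue.shift) while @in_flight < @size && !@queue.empty?
      # Then give the oldest paused fiber another turn.
      fiber = @paused.shift
      step(fiber) if fiber
    end
  end

  private

  def start(job)
    @in_flight += 1
    @max_in_flight = [@max_in_flight, @in_flight].max
    step(Fiber.new { job.call; DONE })
  end

  # Resume a fiber once; free its slot if it finished, otherwise park it.
  def step(fiber)
    if fiber.resume.equal?(DONE)
      @in_flight -= 1
    else
      @paused << fiber
    end
  end
end

log = []
pool = TinyFiberPool.new(2)
3.times do |i|
  pool.execute do
    log << "job #{i} start"
    Fiber.yield # stands in for a fiber aware blocking call
    log << "job #{i} done"
  end
end
pool.join
```

With a pool size of 2, the third job only starts once one of the first two has finished, which is the same "max concurrency of 2" behavior described above.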
fiber_storm uses EventMachine under the hood to simulate blocking calls. FiberStorm#run and FiberStorm#new wrap the given block in calls to EM.run and EM.stop, and also run it inside a Fiber.
For example:
    FiberStorm.new do |storm|
      ...
    end
Is the same as:
    EM.run do
      Fiber.new do
        storm = FiberStorm.new
        ...
        EM.stop
      end.resume
    end
You can override this behavior like so:
    FiberStorm.new(:em_run => false, :em_stop => false) do |storm|
      ...
    end
In that case the block will not be wrapped with EM.run and EM.stop (though it will still be wrapped in a Fiber).
Some FiberStorm methods appear to block, though it’s just EventMachine + Fiber trickery and the program isn’t really halted (duh).
    FiberStorm.new :size => 2, :execute_blocks => true do |storm|
      storm.execute{ FiberStorm.sleep(1) }
      storm.execute{ FiberStorm.sleep(1) }
      storm.execute{ FiberStorm.sleep(1) } # Will "block" until there is an available fiber.
      storm.join # Will "block" until all executions are finished running.
    end
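The "appears to block" trick can be sketched without EventMachine. In the toy below, ToyLoop is a hypothetical stand-in for the reactor, and it uses a virtual clock so nothing actually waits. Its sleep only parks the calling fiber and lets the loop resume it later. This is an illustration under those assumptions, not fiber_storm's actual code.

```ruby
# Hypothetical toy event loop for illustration; fiber_storm itself relies on
# EventMachine for timers and I/O.
class ToyLoop
  attr_reader :now

  def initialize
    @now = 0       # virtual clock in seconds, so the example runs instantly
    @timers = []   # [wake_at, sequence, fiber] entries
    @sequence = 0  # tie-breaker so equal wake times fire in creation order
  end

  # Looks like a blocking sleep to the caller, but it only parks the current
  # fiber and tells the loop how long to wait before resuming it.
  def sleep(seconds)
    Fiber.yield(seconds)
  end

  # Start a block in its own fiber.
  def spawn(&block)
    step(Fiber.new { block.call; nil })
  end

  # Fire timers in order, advancing the virtual clock instead of waiting.
  def run
    until @timers.empty?
      entry = @timers.min_by { |wake_at, sequence, _fiber| [wake_at, sequence] }
      @timers.delete(entry)
      wake_at, _sequence, fiber = entry
      @now = wake_at
      step(fiber)
    end
  end

  private

  # Resume a fiber once; if it asked to sleep, schedule its wake-up.
  def step(fiber)
    delay = fiber.resume
    return if delay.nil? # the fiber finished
    @timers << [@now + delay, @sequence += 1, fiber]
  end
end

events = []
reactor = ToyLoop.new
reactor.spawn do
  events << [:a_start, reactor.now]
  reactor.sleep(2)
  events << [:a_done, reactor.now]
end
reactor.spawn do
  events << [:b_start, reactor.now]
  reactor.sleep(1)
  events << [:b_done, reactor.now]
end
reactor.run
```

Both blocks read like straight-line code, yet the two sleeps overlap: the virtual clock ends at 2, not 3, because neither sleep ever stops the loop.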
With more EventMachine + Fiber trickery, we can implement timeouts! Well, kind of… the timeout method relies on the block you give it to be “cooperative”. In other words, if the block never calls Fiber.yield then the timeout code will never get a chance to run.
    FiberStorm.new :timeout => 1 do |storm|
      execution = storm.execute{ FiberStorm.sleep(2) }
      execution.join # "block" until the execution is finished.
      execution.timeout? # => true

      execution = storm.execute{ sleep(2) } # Uh oh, that kind of sleep doesn't yield the fiber!
      execution.join # "block" until the execution is finished.
      execution.timeout? # => false

      execution = storm.execute{ 100000000.times {rand*rand} } # Block never calls Fiber.yield; won't timeout.
      execution.join # "block" until the execution is finished.
      execution.timeout? # => false
    end
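Here is a rough sketch of why cooperation matters, again in plain Ruby. run_with_deadline is a hypothetical helper, and the "clock" is just a counter of work done, so treat it as a model of the idea rather than fiber_storm's timeout code. The deadline is only checked when the fiber hands control back, so a block that never yields always runs to completion.

```ruby
# Hypothetical helper for illustration only. Runs the block in a fiber and
# checks the deadline each time the fiber yields control back.
def run_with_deadline(deadline, clock, &block)
  fiber = Fiber.new { block.call; :finished }
  loop do
    return :finished if fiber.resume == :finished
    # Only reachable after the block has called Fiber.yield.
    return :timed_out if clock.call > deadline
  end
end

ticks = 0            # stand-in for elapsed time
clock = -> { ticks }

# Cooperative: yields after each unit of work, so the deadline gets checked.
cooperative = run_with_deadline(2, clock) do
  5.times { ticks += 1; Fiber.yield }
end
ticks_when_stopped = ticks

# Uncooperative: never yields, so it blows past the deadline unnoticed.
ticks = 0
uncooperative = run_with_deadline(2, clock) do
  5.times { ticks += 1 }
end
```

The cooperative block is abandoned as soon as the counter passes the deadline, while the uncooperative one finishes all five units of work and is reported as finished, much like the plain sleep(2) case above.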
This gem is really meant to be used with other Fiber + EventMachine aware libraries that imitate synchronous behavior. I’m not really sure what is out there as of now, but the idea is that you would have, say, a Fiber aware ActiveRecord gem and Fiber aware memcache-client gem, etc. Then you could use fiber_storm to execute concurrent tasks using them and it wouldn’t look any different from normal non-evented code:
    FiberStorm.new :size => 2 do |storm|
      storm.execute do
        user = Rails.cache.fetch("User/1"){ User.find(1) }
        do_something_with_user(user)
      end

      storm.execute do
        user = Rails.cache.fetch("User/2"){ User.find(2) }
        do_something_with_user(user)
      end

      # Both of those will execute in "parallel" using Fibers and EventMachine, no threads.

      # "block" until both executions are finished.
      storm.join
    end
Some concepts borrowed from em-synchrony.
- Check out the latest master to make sure the feature hasn’t been implemented or the bug hasn’t been fixed yet.
- Check out the issue tracker to make sure someone hasn’t already requested it and/or contributed it.
- Fork the project.
- Start a feature/bugfix branch.
- Commit and push until you are happy with your contribution.
- Make sure to add tests for it. This is important so I don’t break it in a future version unintentionally.
- Please try not to mess with the Rakefile, version, or history. If you want to have your own version, or it is otherwise necessary, that is fine, but please isolate the change to its own commit so I can cherry-pick around it.
Copyright © 2011 Christopher J Bottaro. See LICENSE.txt for further details.