Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Added EM.schedule, EM::Channel and EM::Queue

  • Loading branch information...
commit 953cf4a8f062a5a7c63671cf17cb37662258ee5c 1 parent faab343
@raggi raggi authored
View
43 examples/ex_channel.rb
@@ -0,0 +1,43 @@
+require File.dirname(__FILE__) + '/helper'
+
+EM.run do
+
+ # Create a channel to push data to, this could be stocks...
+ RandChannel = EM::Channel.new
+
+ # The server simply subscribes client connections to the channel on connect,
+ # and unsubscribes them on disconnect.
+ class Server < EM::Connection
+ def self.start(host = '127.0.0.1', port = 8000)
+ EM.start_server(host, port, self)
+ end
+
+ def post_init
+ @sid = RandChannel.subscribe { |m| send_data "#{m.inspect}\n" }
+ end
+
+ def unbind
+ RandChannel.unsubscribe @sid
+ end
+ end
+ Server.start
+
+ # Two client connections, that just print what they receive.
+ 2.times do
+ EM.connect('127.0.0.1', 8000) do |c|
+ c.extend EM::P::LineText2
+ def c.receive_line(line)
+ puts "Subscriber: #{signature} got #{line}"
+ end
+ EM.add_timer(2) { c.close_connection }
+ end
+ end
+
+ # This part of the example is more fake, but imagine sleep was in fact a
+ # long running calculation to achieve the value.
+ 40.times do
+ EM.defer lambda { v = sleep(rand * 2); RandChannel << [Time.now, v] }
+ end
+
+ EM.add_timer(5) { EM.stop }
+end
View
2  examples/ex_queue.rb
@@ -0,0 +1,2 @@
+require File.dirname(__FILE__) + '/helper'
+
View
2  examples/helper.rb
@@ -0,0 +1,2 @@
+$:.unshift File.expand_path(File.dirname(__FILE__) + '/../lib')
+require 'eventmachine'
View
16 lib/em/callback.rb
@@ -0,0 +1,16 @@
+module EventMachine
+ # Utility method for coercing arguments to an object that responds to #call
+ # Accepts an object and a method name to send to, or a block, or an object
+ # that responds to call.
+ def self.Callback(object = nil, method = nil, &blk)
+ if object && method
+ lambda { |*args| object.send method, *args }
+ else
+ if object.respond_to? :call
+ object
+ else
+ blk || raise(ArgumentError)
+ end
+ end
+ end
+end
View
43 lib/em/channel.rb
@@ -0,0 +1,43 @@
+module EventMachine
+ # == EventMachine::Channel
+ #
+ # Provides a simple interface to push items to a number of subscribers. The
+ # channel will schedule all operations on the main reactor thread for thread
+ # safe reactor operations.
+ #
+ # This provides a convenient way for connections to consume messages from
+ # long running code in defer, without threading issues.
+ #
+ # See examples/ex_channel.rb for example usage.
+ class Channel
+ def initialize
+ @subs = {}
+ @uid = 0
+ end
+
+ # Takes any arguments suitable for EM::Callback() and returns a subscriber
+ # id for use when unsubscribing.
+ def subscribe(*a, &b)
+ name = gen_id
+ EM.schedule { @subs[name] = EM::Callback(*a, &b) }
+ name
+ end
+
+ # Removes this subscriber from the list.
+ def unsubscribe(name)
+ EM.schedule { @subs.delete name }
+ end
+
+ # Add items to the channel, which are pushed out to all subscribers.
+ def push(*items)
+ items = items.dup
+ EM.schedule { @subs.values.each { |s| items.each { |i| s.call i } } }
+ end
+ alias << push
+
+ private
+ def gen_id
+ @uid += 1
+ end
+ end
+end
View
53 lib/em/queue.rb
@@ -0,0 +1,53 @@
+module EventMachine
+ # == EventMachine::Queue
+ #
+ # A cross thread, reactor scheduled, linear queue.
+ #
+ # This class provides a simple "Queue" like abstraction on top of the reactor
+ # scheduler. It services two primary purposes:
+ # * API sugar for stateful protocols
+ # * Pushing processing onto the same thread as the reactor
+ class Queue
+ def initialize
+ @items = []
+ @popq = []
+ end
+
+ # Pop items off the queue, running the block on the reactor thread. The pop
+ # will not happen immediately, but at some point in the future, either in
+ # the next tick, if the queue has data, or when the queue is populated.
+ def pop(*a, &b)
+ cb = EM::Callback(*a, &b)
+ EM.schedule do
+ if @items.empty?
+ @popq << cb
+ else
+ cb.call @items.shift
+ end
+ end
+ nil # Always returns nil
+ end
+
+ # Push items onto the queue in the reactor thread. The items will not appear
+ # in the queue immediately, but will be scheduled for addition during the
+ # next reactor tick.
+ def push(*items)
+ EM.schedule do
+ @items.concat items
+ @popq.shift.call @items.shift until @items.empty? || @popq.empty?
+ end
+ end
+
+ # N.B. This is a peek, it's not thread safe, and may only tend toward
+ # accuracy.
+ def empty?
+ @items.empty?
+ end
+
+ # N.B. This is a peek, it's not thread safe, and may only tend toward
+ # accuracy.
+ def size
+ @items.size
+ end
+ end
+end
View
24 lib/eventmachine.rb
@@ -86,6 +86,9 @@
require 'em/protocols'
require 'em/connection'
require 'em/filewatcher'
+require 'em/callback'
+require 'em/queue'
+require 'em/channel'
require 'shellwords'
@@ -236,6 +239,7 @@ def self.run blk=nil, tail=nil, &block
if @next_tick_queue && !@next_tick_queue.empty?
add_timer(0) { signal_loopbreak }
end
+ @reactor_thread = Thread.current
run_machine
ensure
begin
@@ -273,6 +277,22 @@ def self.run_block &block
run(&pr)
end
+ # Returns true if the calling thread is the same thread as the reactor.
+ def self.reactor_thread?
+ Thread.current == @reactor_thread
+ end
+
+ # Runs the given callback on the reactor thread, or immediately if called
+ # from the reactor thread. Accepts the same arguments as EM::Callback
+ def self.schedule(*a, &b)
+ cb = Callback(*a, &b)
+ if reactor_thread?
+ cb.call
+ else
+ next_tick { cb.call }
+ end
+ end
+
# fork_reactor forks a new process and calls EM#run inside of it, passing your block.
#--
# This implementation is subject to change, especially if we clean up the relationship
@@ -1030,8 +1050,8 @@ def self.defer op = nil, callback = nil, &blk
unless @threadpool
require 'thread'
@threadpool = []
- @threadqueue = Queue.new
- @resultqueue = Queue.new
+ @threadqueue = ::Queue.new
+ @resultqueue = ::Queue.new
spawn_threadpool
end
View
126 tests/test_basic.rb
@@ -42,16 +42,16 @@ def teardown
def test_libtype
lt = EventMachine.library_type
- em_lib = (ENV["EVENTMACHINE_LIBRARY"] || $eventmachine_library || :xxx).to_sym
-
- # Running from test runner, under jruby.
- if RUBY_PLATFORM == 'java'
- unless em_lib == :pure_ruby
- assert_equal( :java, lt )
- return
- end
- end
-
+ em_lib = (ENV["EVENTMACHINE_LIBRARY"] || $eventmachine_library || :xxx).to_sym
+
+ # Running from test runner, under jruby.
+ if RUBY_PLATFORM == 'java'
+ unless em_lib == :pure_ruby
+ assert_equal( :java, lt )
+ return
+ end
+ end
+
case em_lib
when :pure_ruby
assert_equal( :pure_ruby, lt )
@@ -60,12 +60,12 @@ def test_libtype
when :java
assert_equal( :java, lt )
else
- # Running from jruby as a standalone test.
- if RUBY_PLATFORM == 'java'
- assert_equal( :java, lt )
- else
- assert_equal( :extension, lt )
- end
+ # Running from jruby as a standalone test.
+ if RUBY_PLATFORM == 'java'
+ assert_equal( :java, lt )
+ else
+ assert_equal( :extension, lt )
+ end
end
end
@@ -116,10 +116,10 @@ def test_server
# even after the supplied block completes.
def test_run_block
assert !EM.reactor_running?
- a = nil
- EM.run_block { a = "Worked" }
- assert a
- assert !EM.reactor_running?
+ a = nil
+ EM.run_block { a = "Worked" }
+ assert a
+ assert !EM.reactor_running?
end
@@ -136,24 +136,24 @@ def test_run_block
TestPort = 9070
class UnbindError < EM::Connection
- def initialize *args
- super
- end
- def connection_completed
- close_connection_after_writing
- end
- def unbind
- raise "Blooey"
- end
+ def initialize *args
+ super
+ end
+ def connection_completed
+ close_connection_after_writing
+ end
+ def unbind
+ raise "Blooey"
+ end
end
def xxx_test_unbind_error
- assert_raises( RuntimeError ) {
- EM.run {
- EM.start_server TestHost, TestPort
- EM.connect TestHost, TestPort, UnbindError
- }
- }
+ assert_raises( RuntimeError ) {
+ EM.run {
+ EM.start_server TestHost, TestPort
+ EM.connect TestHost, TestPort, UnbindError
+ }
+ }
end
#------------------------------------
@@ -177,26 +177,26 @@ def xxx_test_unbind_error
#
class PostInitError < EM::Connection
- def post_init
- aaa bbb # should produce a Ruby exception
- end
+ def post_init
+ aaa bbb # should produce a Ruby exception
+ end
end
# This test causes issues, the machine becomes unreleasable after
# release_machine suffers an exception in event_callback.
def xxx_test_post_init_error
- assert_raises( EventMachine::ConnectionNotBound ) {
- EM.run {
- EM::Timer.new(1) {EM.stop}
- EM.start_server TestHost, TestPort
- EM.connect TestHost, TestPort, PostInitError
- }
- }
- EM.run {
- EM.stop
- }
- assert !EM.reactor_running?
+ assert_raises( EventMachine::ConnectionNotBound ) {
+ EM.run {
+ EM::Timer.new(1) {EM.stop}
+ EM.start_server TestHost, TestPort
+ EM.connect TestHost, TestPort, PostInitError
+ }
+ }
+ EM.run {
+ EM.stop
+ }
+ assert !EM.reactor_running?
end
-
+
module BrsTestSrv
def receive_data data
$received << data
@@ -211,21 +211,39 @@ def post_init
close_connection_after_writing
end
end
-
+
# From ticket #50
def test_byte_range_send
$received = ''
$sent = (0..255).to_a.pack('C*')
EM::run {
-
+
EM::start_server TestHost, TestPort, BrsTestSrv
-
+
EM::connect TestHost, TestPort, BrsTestCli
-
+
EM::add_timer(0.5) { assert(false, 'test timed out'); EM.stop; Kernel.warn "test timed out!" }
}
assert_equal($sent, $received)
end
+ def test_schedule_on_reactor_thread
+ x = false
+ EM.run do
+ EM.schedule { x = true }
+ EM.stop
+ end
+ assert x
+ end
+
+ def test_schedule_from_thread
+ x = false
+ assert !x
+ EM.run do
+ Thread.new { EM.schedule { x = true } }.join
+ EM.stop
+ end
+ assert x
+ end
end
View
51 tests/test_channel.rb
@@ -0,0 +1,51 @@
+$:.unshift "../lib"
+require 'eventmachine'
+require 'test/unit'
+
+class TestEventMachineChannel < Test::Unit::TestCase
+ def test_channel_subscribe
+ s = 0
+ EM.run do
+ c = EM::Channel.new
+ c.subscribe { |v| s = v; EM.stop }
+ c << 1
+ end
+ assert_equal 1, s
+ end
+
+ def test_channel_unsubscribe
+ s = 0
+ EM.run do
+ c = EM::Channel.new
+ subscription = c.subscribe { |v| s = v }
+ c.unsubscribe(subscription)
+ c << 1
+ EM.next_tick { EM.stop }
+ end
+ assert_not_equal 1, s
+ end
+
+ def test_channel_reactor_thread_push
+ out = []
+ c = EM::Channel.new
+ c.subscribe { |v| out << v }
+ Thread.new { c.push(1,2,3) }.join
+ assert out.empty?
+
+ EM.run { EM.next_tick { EM.stop } }
+
+ assert_equal [1,2,3], out
+ end
+
+ def test_channel_reactor_thread_callback
+ out = []
+ c = EM::Channel.new
+ Thread.new { c.subscribe { |v| out << v } }.join
+ c.push(1,2,3)
+ assert out.empty?
+
+ EM.run { EM.next_tick { EM.stop } }
+
+ assert_equal [1,2,3], out
+ end
+end
View
142 tests/test_next_tick.rb
@@ -33,77 +33,91 @@
class TestNextTick < Test::Unit::TestCase
- def setup
- end
+ def test_tick_arg
+ pr = proc {EM.stop}
+ EM.epoll
+ EM.run {
+ EM.next_tick pr
+ }
+ assert true
+ end
- def teardown
- end
+ def test_tick_block
+ EM.epoll
+ EM.run {
+ EM.next_tick {EM.stop}
+ }
+ assert true
+ end
- def test_tick_arg
- pr = proc {EM.stop}
- EM.epoll
- EM.run {
- EM.next_tick pr
- }
- assert true
- end
+ # This illustrates the solution to a long-standing problem.
+ # It's now possible to correctly nest calls to EM#run.
+ # See the source code commentary for EM#run for more info.
+ #
+ def test_run_run
+ EM.run {
+ EM.run {
+ EM.next_tick {EM.stop}
+ }
+ }
+ end
- def test_tick_block
- EM.epoll
- EM.run {
- EM.next_tick {EM.stop}
- }
- assert true
- end
+ def test_pre_run_queue
+ x = false
+ EM.next_tick { EM.stop; x = true }
+ EM.run { EM.add_timer(0.2) { EM.stop } }
+ assert x
+ end
- # This illustrates the solution to a long-standing problem.
- # It's now possible to correctly nest calls to EM#run.
- # See the source code commentary for EM#run for more info.
- #
- def test_run_run
- EM.run {
- EM.run {
- EM.next_tick {EM.stop}
- }
- }
- end
+ # We now support an additional parameter for EM#run.
+ # You can pass two procs to EM#run now. The first is executed as the normal
+ # run block. The second (if given) is scheduled for execution after the
+ # reactor loop completes.
+ # The reason for supporting this is subtle. There has always been an expectation
+ # that EM#run doesn't return until after the reactor loop ends. But now it's
+ # possible to nest calls to EM#run, which means that a nested call WILL
+ # RETURN. In order to write code that will run correctly either way, it's
+ # recommended to put any code which must execute after the reactor completes
+ # in the second parameter.
+ #
+ def test_run_run_2
+ a = proc {EM.stop}
+ b = proc {assert true}
+ EM.run a, b
+ end
- def test_pre_run_queue
- x = false
- EM.next_tick { EM.stop; x = true }
- EM.run { EM.add_timer(0.2) { EM.stop } }
- assert x
- end
- # We now support an additional parameter for EM#run.
- # You can pass two procs to EM#run now. The first is executed as the normal
- # run block. The second (if given) is scheduled for execution after the
- # reactor loop completes.
- # The reason for supporting this is subtle. There has always been an expectation
- # that EM#run doesn't return until after the reactor loop ends. But now it's
- # possible to nest calls to EM#run, which means that a nested call WILL
- # RETURN. In order to write code that will run correctly either way, it's
- # recommended to put any code which must execute after the reactor completes
- # in the second parameter.
- #
- def test_run_run_2
- a = proc {EM.stop}
- b = proc {assert true}
- EM.run a, b
- end
+ # This illustrates that EM#run returns when it's called nested.
+ # This isn't a feature, rather it's something to be wary of when writing code
+ # that must run correctly even if EM#run is called while a reactor is already
+ # running.
+ def test_run_run_3
+ a = []
+ EM.run {
+ EM.run proc {EM.stop}, proc {a << 2}
+ a << 1
+ }
+ assert_equal( [1,2], a )
+ end
- # This illustrates that EM#run returns when it's called nested.
- # This isn't a feature, rather it's something to be wary of when writing code
- # that must run correctly even if EM#run is called while a reactor is already
- # running.
- def test_run_run_3
- a = []
- EM.run {
- EM.run proc {EM.stop}, proc {a << 2}
- a << 1
- }
- assert_equal( [1,2], a )
- end
+ def test_schedule_on_reactor_thread
+ x = false
+ EM.run do
+ EM.schedule { x = true }
+ EM.stop
+ end
+ assert x
+ end
+
+ def test_schedule_from_thread
+ x = false
+ EM.run do
+ Thread.new { EM.schedule { x = true } }.join
+ assert !x
+ EM.next_tick { EM.stop }
+ end
+ assert x
+ end
end
View
44 tests/test_queue.rb
@@ -0,0 +1,44 @@
+$:.unshift "../lib"
+require 'eventmachine'
+require 'test/unit'
+
+class TestEventMachineQueue < Test::Unit::TestCase
+ def test_queue_push
+ s = 0
+ EM.run do
+ q = EM::Queue.new
+ q.push(1)
+ EM.next_tick { s = q.size; EM.stop }
+ end
+ assert_equal 1, s
+ end
+
+ def test_queue_pop
+ x,y,z = nil
+ EM.run do
+ q = EM::Queue.new
+ q.push(1,2,3)
+ q.pop { |v| x = v }
+ q.pop { |v| y = v }
+ q.pop { |v| z = v; EM.stop }
+ end
+ assert_equal 1, x
+ assert_equal 2, y
+ assert_equal 3, z
+ end
+
+ def test_queue_reactor_thread
+ q = EM::Queue.new
+
+ Thread.new { q.push(1,2,3) }.join
+ assert q.empty?
+ EM.run { EM.next_tick { EM.stop } }
+ assert_equal 3, q.size
+
+ x = nil
+ Thread.new { q.pop { |v| x = v } }.join
+ assert_equal nil, x
+ EM.run { EM.next_tick { EM.stop } }
+ assert_equal 1, x
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.