Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

small refactor of lib/em/spawnable.rb #230

Closed
wants to merge 9 commits into from
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ This unique combination makes EventMachine a premier choice for designers of cri
applications, including Web servers and proxies, email and IM production systems, authentication/authorization
processors, and many more.

EventMachine has been around since yearly 2000s and is a mature and battle tested library.
EventMachine has been around since the early 2000s and is a mature and battle tested library.


## What EventMachine is good for? ##
Expand Down
18 changes: 6 additions & 12 deletions lib/em/spawnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,24 @@ module EventMachine
class SpawnedProcess
# Send a message to the spawned process
def notify *x
me = self
EM.next_tick {
# A notification executes in the context of this
# SpawnedProcess object. That makes self and notify
# work as one would expect.
#
y = me.call(*x)
y = call(*x)
if y and y.respond_to?(:pull_out_yield_block)
a,b = y.pull_out_yield_block
set_receiver a
self.notify if b
notify if b
end
}
end
alias_method :resume, :notify
alias_method :run, :notify # for formulations like (EM.spawn {xxx}).run

def set_receiver blk
(class << self ; self ; end).class_eval do
remove_method :call if method_defined? :call
define_method :call, blk
end
class << self; self; end.instance_eval { define_method :call, blk }
end

end
Expand All @@ -66,16 +62,14 @@ def pull_out_yield_block

# Spawn an erlang-style process
def self.spawn &block
s = SpawnedProcess.new
s.set_receiver block
s
SpawnedProcess.new.tap { |s| s.set_receiver block }
end

def self.yield &block # :nodoc:
return YieldBlockFromSpawnedProcess.new( block, false )
YieldBlockFromSpawnedProcess.new( block, false )
end

def self.yield_and_notify &block # :nodoc:
return YieldBlockFromSpawnedProcess.new( block, true )
YieldBlockFromSpawnedProcess.new( block, true )
end
end
5 changes: 2 additions & 3 deletions lib/em/threaded_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ def initialize

@running = true
@queue = ::Queue.new
@thread = Thread.new do
@queue.pop.call while @running
end
@thread = Thread.new { @queue.pop.call while @running }
end

# Called on the EM thread, generally in a perform block to return a
Expand All @@ -75,6 +73,7 @@ def dispatch
completion.fail e
end
end
@thread.run
completion
end

Expand Down
89 changes: 41 additions & 48 deletions lib/eventmachine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ class << self

# System errnos
# @private

ERRNOS = Errno::constants.grep(/^E/).inject(Hash.new(:unknown)) { |hash, name|
errno = Errno.__send__(:const_get, name)
errno = Errno.const_get name
hash[errno::Errno] = errno
hash
}
Expand Down Expand Up @@ -154,13 +155,10 @@ def self.run blk=nil, tail=nil, &block
# which throws something inside of #run. Without the ensure, the second test
# will start without release_machine being called and will immediately throw

#


tail and @tails.unshift(tail)

if reactor_running?
(b = blk || block) and b.call # next_tick(b)
(b = blk || block) and b.call
else
@conns = {}
@acceptors = {}
Expand Down Expand Up @@ -219,11 +217,10 @@ def self.run blk=nil, tail=nil, &block
# finishes running, until user code calls {EventMachine.stop})
#
def self.run_block &block
pr = proc {
run do
block.call
EventMachine::stop
}
run(&pr)
end
end

# @return [Boolean] true if the calling thread is the same thread as the reactor.
Expand Down Expand Up @@ -338,7 +335,7 @@ def self.add_periodic_timer *args, &block
interval = args.shift
code = args.shift || block

EventMachine::PeriodicTimer.new(interval, code)
PeriodicTimer.new(interval, code)
end


Expand Down Expand Up @@ -401,7 +398,7 @@ def self.cancel_timer timer_or_sig
#
#
def self.stop_event_loop
EventMachine::stop
stop
end

# Initiates a TCP server (socket acceptor) on the specified IP address and port.
Expand Down Expand Up @@ -704,7 +701,7 @@ def self.bind_connect bind_addr, bind_port, server, port=nil, handler=nil, *args
# }
#
# @author Riham Aldakkak (eSpace Technologies)
def EventMachine::watch io, handler=nil, *args, &blk
def self.watch io, handler=nil, *args, &blk
attach_io io, true, handler, *args, &blk
end

Expand All @@ -714,12 +711,12 @@ def EventMachine::watch io, handler=nil, *args, &blk
#
# To watch a fd instead, use {EventMachine.watch}, which will not alter the state of the socket
# and fire notify_readable and notify_writable events instead.
def EventMachine::attach io, handler=nil, *args, &blk
def self.attach io, handler=nil, *args, &blk
attach_io io, false, handler, *args, &blk
end

# @private
def EventMachine::attach_io io, watch_mode, handler=nil, *args
def self.attach_io io, watch_mode, handler=nil, *args
klass = klass_from_handler(Connection, handler, *args)

if !watch_mode and klass.public_instance_methods.any?{|m| [:notify_readable, :notify_writable].include? m.to_sym }
Expand All @@ -739,7 +736,7 @@ def EventMachine::attach_io io, watch_mode, handler=nil, *args
c.instance_variable_set(:@fd, fd)

@conns[s] = c
block_given? and yield c
yield c if block_given?
c
end

Expand All @@ -758,13 +755,12 @@ def self.reconnect server, port, handler
# We may want to change it yet again and call the block, if any.

raise "invalid handler" unless handler.respond_to?(:connection_completed)
#raise "still connected" if @conns.has_key?(handler.signature)
return handler if @conns.has_key?(handler.signature)

s = connect_server server, port
handler.signature = s
@conns[s] = handler
block_given? and yield handler
yield handler if block_given?
handler
end

Expand Down Expand Up @@ -844,7 +840,7 @@ def self.open_datagram_socket address, port, handler=nil, *args
s = open_udp_socket address, port.to_i
c = klass.new s, *args
@conns[s] = c
block_given? and yield c
yield c if block_given?
c
end

Expand Down Expand Up @@ -919,7 +915,7 @@ def self.get_max_timers
#
# @return [Integer] Number of connections currently held by the reactor.
def self.connection_count
self.get_connection_count
get_connection_count
end

# The is the responder for the loopback-signalled event.
Expand All @@ -936,10 +932,7 @@ def self.run_deferred_callbacks
cback.call result if cback
end

@next_tick_mutex.synchronize do
jobs, @next_tick_queue = @next_tick_queue, []
jobs
end.each { |j| j.call }
@next_tick_mutex.synchronize { @next_tick_queue.dup | @next_tick_queue.clear }.each &:[]
end


Expand Down Expand Up @@ -1005,7 +998,7 @@ def self.defer op = nil, callback = nil, &blk
# @private
def self.spawn_threadpool
until @threadpool.size == @threadpool_size.to_i
thread = Thread.new do
@threadpool << Thread.new do
Thread.current.abort_on_exception = true
while true
op, cback = *@threadqueue.pop
Expand All @@ -1014,7 +1007,6 @@ def self.spawn_threadpool
EventMachine.signal_loopbreak
end
end
@threadpool << thread
end
end

Expand Down Expand Up @@ -1065,7 +1057,7 @@ def self.next_tick pr=nil, &block
# @note This method has no effective implementation on Windows or in the pure-Ruby
# implementation of EventMachine
def self.set_effective_user username
EventMachine::setuid_string username
setuid_string username
end


Expand All @@ -1084,7 +1076,7 @@ def self.set_effective_user username
# @param [Integer] n_descriptors The maximum number of file or socket descriptors that your process may open
# @return [Integer] The new descriptor table size.
def self.set_descriptor_table_size n_descriptors=nil
EventMachine::set_rlimit_nofile n_descriptors
set_rlimit_nofile n_descriptors
end


Expand Down Expand Up @@ -1118,12 +1110,12 @@ def self.popen cmd, handler=nil, *args
# Perhaps misnamed since the underlying function uses socketpair and is full-duplex.

klass = klass_from_handler(Connection, handler, *args)
w = Shellwords::shellwords( cmd )
w.unshift( w.first ) if w.first
s = invoke_popen( w )
w = Shellwords::shellwords cmd
w.unshift(w.first) if w.first
s = invoke_popen w
c = klass.new s, *args
@conns[s] = c
yield(c) if block_given?
yield c if block_given?
c
end

Expand All @@ -1138,7 +1130,7 @@ def self.popen cmd, handler=nil, *args
#
# @return [Boolean] true if the EventMachine reactor loop is currently running
def self.reactor_running?
(@reactor_running || false)
!!@reactor_running
end


Expand All @@ -1151,7 +1143,7 @@ def self.open_keyboard handler=nil, *args
s = read_keyboard
c = klass.new s, *args
@conns[s] = c
block_given? and yield c
yield c if block_given?
c
end

Expand Down Expand Up @@ -1222,12 +1214,12 @@ def self.open_keyboard handler=nil, *args
def self.watch_file(filename, handler=nil, *args)
klass = klass_from_handler(FileWatch, handler, *args)

s = EM::watch_filename(filename)
s = watch_filename filename
c = klass.new s, *args
# we have to set the path like this because of how Connection.new works
c.instance_variable_set("@path", filename)
@conns[s] = c
block_given? and yield c
yield c if block_given?
c
end

Expand Down Expand Up @@ -1255,12 +1247,12 @@ def self.watch_process(pid, handler=nil, *args)

klass = klass_from_handler(ProcessWatch, handler, *args)

s = EM::watch_pid(pid)
s = watch_pid pid
c = klass.new s, *args
# we have to set the path like this because of how Connection.new works
c.instance_variable_set("@pid", pid)
@conns[s] = c
block_given? and yield c
yield c if block_given?
c
end

Expand Down Expand Up @@ -1341,7 +1333,7 @@ def self.error_handler cb = nil, &blk
#
# @see EventMachine.disable_proxy
def self.enable_proxy(from, to, bufsize=0, length=0)
EM::start_proxy(from.signature, to.signature, bufsize, length)
start_proxy(from.signature, to.signature, bufsize, length)
end

# Takes just one argument, a {Connection} that has proxying enabled via {EventMachine.enable_proxy}.
Expand All @@ -1351,7 +1343,7 @@ def self.enable_proxy(from, to, bufsize=0, length=0)
# @param [EventMachine::Connection] from Source of data that is being proxied
# @see EventMachine.enable_proxy
def self.disable_proxy(from)
EM::stop_proxy(from.signature)
stop_proxy(from.signature)
end

# Retrieve the heartbeat interval. This is how often EventMachine will check for dead connections
Expand All @@ -1360,7 +1352,7 @@ def self.disable_proxy(from)
#
# @return [Integer] Heartbeat interval, in seconds
def self.heartbeat_interval
EM::get_heartbeat_interval
get_heartbeat_interval
end

# Set the heartbeat interval. This is how often EventMachine will check for dead connections
Expand All @@ -1369,7 +1361,7 @@ def self.heartbeat_interval
#
# @param [Integer] time Heartbeat interval, in seconds
def self.heartbeat_interval=(time)
EM::set_heartbeat_interval time.to_f
set_heartbeat_interval time.to_f
end

private
Expand Down Expand Up @@ -1416,7 +1408,7 @@ def self.event_callback conn_binding, opcode, data # :nodoc:
raise NoHandlerForAcceptedConnection unless accep
c = accep.new data, *args
@conns[data] = c
blk and blk.call(c)
blk.call c if blk
c # (needed?)
##
# The remaining code is a fallback for the pure ruby and java reactors.
Expand Down Expand Up @@ -1452,28 +1444,29 @@ def self._open_file_for_writing filename, handler=nil # :nodoc:
s = _write_file filename
c = klass.new s
@conns[s] = c
block_given? and yield c
yield c if block_given?
c
end

private
def self.klass_from_handler(klass = Connection, handler = nil, *args)
klass = if handler and handler.is_a?(Class)
klass = if handler.is_a?(Class)
raise ArgumentError, "must provide module or subclass of #{klass.name}" unless klass >= handler
handler
elsif handler
begin
if handler.const_defined?(:EM_CONNECTION_CLASS)
handler::EM_CONNECTION_CLASS
rescue NameError
handler::const_set(:EM_CONNECTION_CLASS, Class.new(klass) {include handler})
else
handler.const_set :EM_CONNECTION_CLASS, Class.new(klass) { include handler }
end
else
klass
end

arity = klass.instance_method(:initialize).arity
arity = klass.instance_method(:initialize).arity
expected = arity >= 0 ? arity : -(arity + 1)
if (arity >= 0 and args.size != expected) or (arity < 0 and args.size < expected)

if arity >= 0 && args.size != expected or arity < 0 && args.size < expected
raise ArgumentError, "wrong number of arguments for #{klass}#initialize (#{args.size} for #{expected})"
end

Expand Down