Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

several changes

- print a warning if the reactor is modified from a non-reactor
thread.

- start the logging client from the reactor thread to avoid some
startup race conditions

- capture all Exceptions rather than just those that descend from
StandardError.
  • Loading branch information...
commit 5ad1ab324c8bc5e9c1b51267893290e01cadbe36 1 parent c5977b9
Chuck Remes authored
View
133 lib/zm/reactor.rb
@@ -79,10 +79,10 @@ class Reactor
#
def initialize configuration = nil
configuration ||= Configuration.new
- @name = configuration.name || 'unnamed'
+ @name = configuration.name.to_s
@running = false
@thread = nil
- @poll_interval = determine_interval(configuration.poll_interval || 10)
+ @poll_interval = determine_interval(configuration.poll_interval)
@proc_queue = []
@proc_queue_mutex = Mutex.new
@@ -101,13 +101,11 @@ def initialize configuration = nil
@raw_to_socket = {}
Thread.abort_on_exception = true
- if configuration.log_endpoint
- @logger = LogClient.new self, configuration.log_endpoint
- @logging_enabled = true
- end
+ @log_endpoint = configuration.log_endpoint
+ @logging_enabled = @log_endpoint ? true : false
- @exception_handler = configuration.exception_handler if configuration.exception_handler
- @timers = ZMQMachine::Timers.new(@exception_handler)
+ @exception_handler = configuration.exception_handler
+ @timers = ZMQMachine::Timers.new(self, @exception_handler)
end
def shared_context?
@@ -132,6 +130,10 @@ def run blk = nil, &block
@running, @stopping = true, false
@thread = Thread.new do
+ Thread.current["reactor-name"] = @name
+
+ start_log_client
+
blk.call self if blk
while !@stopping && running? do
@@ -210,12 +212,16 @@ def next_tick blk = nil, &block
# Returns +true+ for a succesful close, +false+ otherwise.
#
def close_socket sock
- return false unless sock
+ if reactor_thread?
+ return false unless sock
- removed = delete_socket sock
- sock.raw_socket.close
+ removed = delete_socket sock
+ sock.raw_socket.close
- removed
+ removed
+ else
+ false
+ end
end
# Creates a REQ socket and attaches +handler_instance+ to the
@@ -358,28 +364,36 @@ def pull_socket handler_instance
# reactor to call the handler's on_writable method.
#
def register_writable sock
- @poller.register_writable sock.raw_socket
+ if reactor_thread?
+ @poller.register_writable sock.raw_socket
+ end
end
# Deregisters the +sock+ for POLLOUT. The handler will no longer
# receive calls to on_writable.
#
def deregister_writable sock
- @poller.deregister_writable sock.raw_socket
+ if reactor_thread?
+ @poller.deregister_writable sock.raw_socket
+ end
end
# Registers the +sock+ for POLLIN events that will cause the
# reactor to call the handler's on_readable method.
#
def register_readable sock
- @poller.register_readable sock.raw_socket
+ if reactor_thread?
+ @poller.register_readable sock.raw_socket
+ end
end
# Deregisters the +sock+ for POLLIN events. The handler will no longer
# receive calls to on_readable.
#
def deregister_readable sock
- @poller.deregister_readable sock.raw_socket
+ if reactor_thread?
+ @poller.deregister_readable sock.raw_socket
+ end
end
# Creates a timer that will fire a single time. Expects either a
@@ -390,7 +404,7 @@ def deregister_readable sock
#
def oneshot_timer delay, timer_proc = nil, &blk
blk ||= timer_proc
- @timers.add_oneshot delay, blk
+ timer = @timers.add_oneshot delay, blk
end
# Creates a timer that will fire once at a specific
@@ -399,8 +413,10 @@ def oneshot_timer delay, timer_proc = nil, &blk
# +exact_time+ may be either a Time object or a Numeric.
#
def oneshot_timer_at exact_time, timer_proc = nil, &blk
- blk ||= timer_proc
- @timers.add_oneshot_at exact_time, blk
+ if reactor_thread?
+ blk ||= timer_proc
+ timer = @timers.add_oneshot_at exact_time, blk
+ end
end
# Creates a timer that will fire every +delay+ milliseconds until
@@ -411,8 +427,10 @@ def oneshot_timer_at exact_time, timer_proc = nil, &blk
# milliseconds)
#
def periodical_timer delay, timer_proc = nil, &blk
- blk ||= timer_proc
- @timers.add_periodical delay, blk
+ if reactor_thread?
+ blk ||= timer_proc
+ timer = @timers.add_periodical delay, blk
+ end
end
# Cancels an existing timer if it hasn't already fired.
@@ -420,7 +438,9 @@ def periodical_timer delay, timer_proc = nil, &blk
# Returns true if cancelled, false if otherwise.
#
def cancel_timer timer
- @timers.cancel timer
+ if reactor_thread?
+ @timers.cancel timer
+ end
end
# Asks all timers to reschedule themselves starting from Timers.now.
@@ -433,10 +453,11 @@ def reschedule_timers
end
def list_timers
- @timers.list.each do |timer|
- name = timer.respond_to?(:name) ? timer.timer_proc.name : timer.timer_proc.to_s
- puts "fire time [#{Time.at(timer.fire_time / 1000)}], method [#{name}]"
+ list = @timers.list
+ list.each do |timer|
+ log :timer, timer.to_s
end
+ log(:timer, "No timers for reactor [#{@name}]") if list.empty?
end
def open_socket_count kind = :all
@@ -469,10 +490,34 @@ def open_socket_count kind = :all
#
def log level, message
if @logging_enabled
- @logger.write level, message
+ if reactor_thread?
+ @logger.write level, message
+ end
+ end
+ end
+
+ def reactor_thread?
+ unless thread_match?
+ str = "Reactor violation! Accessing reactor from a non-reactor thread!\n"
+ str << "Expected reactor thread [#{@name}] but got [#{self.class.current_thread_name}]\n"
+ str << "Begin backtrace:\n"
+ str << caller.join("\n")
+ str << "\nEnd backtrace.\n"
+ STDERR.print(str)
+ false
+ else
+ true
end
end
+ def thread_match?
+ @name == Reactor.current_thread_name
+ end
+
+ def self.current_thread_name
+ Thread.current['reactor-name']
+ end
+
private
@@ -481,7 +526,7 @@ def run_once
run_procs
run_timers
poll
- rescue => e
+ rescue Exception => e
if @exception_handler
@exception_handler.call(e)
else
@@ -493,6 +538,7 @@ def run_once
# Close each open socket and terminate the reactor context; this will
# release the native memory backing each of these objects
def cleanup
+ log(:info, "#{self.class}, Cleanup called, exiting reactor loop.")
@proc_queue_mutex.synchronize { @proc_queue.clear }
# work on a dup since #close_socket deletes from @sockets
@@ -542,6 +588,9 @@ def poll
reactor_socket = @raw_to_socket[sock]
reactor_socket.resume_write if reactor_socket
end
+
+ else
+ STDERR.print("#{self.class}, Poll returned an error, errno [#{ZMQ::Util.errno}] desc [#{ZMQ::Util.error_string}]\n")
end
end
@@ -549,16 +598,18 @@ def poll
end
def create_socket handler_instance, kind
- sock = nil
-
- begin
- sock = kind.new @context, handler_instance
- save_socket sock
- rescue ZMQ::ContextError => e
+ if reactor_thread?
sock = nil
- end
- sock
+ begin
+ sock = kind.new @context, handler_instance
+ save_socket sock
+ rescue ZMQ::ContextError => e
+ sock = nil
+ end
+
+ sock
+ end
end
def save_socket sock
@@ -570,14 +621,13 @@ def save_socket sock
# Returns true when all steps succeed, false otherwise
#
def delete_socket sock
- poll_deleted = @poller.delete(sock.raw_socket)
- sockets_deleted = @sockets.delete(sock)
- ffi_deleted = @raw_to_socket.delete(sock.raw_socket)
+ poll_deleted = @poller.delete(sock.raw_socket) ? true : false
+ sockets_deleted = @sockets.delete(sock) ? true : false
+ ffi_deleted = @raw_to_socket.delete(sock.raw_socket) ? true : false
poll_deleted && sockets_deleted && ffi_deleted
end
-
# Unnecessary to convert the number to microseconds; the ffi-rzmq
# library does this for us.
#
@@ -586,6 +636,11 @@ def determine_interval interval
interval <= 0 ? 1.0 : interval.to_i
end
+ def start_log_client
+ if @logging_enabled
+ @logger = LogClient.new self, @log_endpoint
+ end
+ end
end # class Reactor
View
17 lib/zm/server/base.rb
@@ -17,7 +17,7 @@ def initialize configuration
def shutdown
@reactor.log :debug, "#{self.class}#shutdown_socket, closing reactor socket"
@on_read = nil
- @reactor.close_socket @socket
+ @reactor.close_socket(@socket)
end
def on_attach socket
@@ -47,9 +47,11 @@ def on_attach socket
# received*.
#
def write messages, verbose = false
- @verbose = verbose
- @message_queue << messages
- write_queue_to_socket
+ if @reactor.reactor_thread?
+ @verbose = verbose
+ @message_queue << messages
+ write_queue_to_socket
+ end
end
# Prints each message when global debugging is enabled.
@@ -57,7 +59,11 @@ def write messages, verbose = false
# Forwards +messages+ on to the :on_read callback given in the constructor.
#
def on_readable socket, messages
- @on_read.call socket, messages
+ if @reactor.reactor_thread?
+ @on_read.call socket, messages
+ else
+ STDERR.print("error, #{self.class} Thread violation! Expected [#{Reactor.current_thread_name}] but got [#{Thread.current['reactor-name']}]\n")
+ end
close_messages messages
end
@@ -114,6 +120,7 @@ def write_queue_to_socket
elsif ZMQ::Util.errno == ZMQ::EAGAIN
# schedule another write attempt in 10 ms; break out of the loop
@reactor.log :debug, "#{self.class}#write_queue_to_socket, failed to write messages; scheduling next attempt"
+ STDERR.print("debug, #{self.class}#write_queue_to_socket, failed to write messages; scheduling next attempt\n")
@reactor.oneshot_timer 10, method(:write_queue_to_socket)
break
end
View
6 lib/zm/server/rep.rb
@@ -18,7 +18,11 @@ module XREP
include Base
def on_readable socket, messages, envelope
- @on_read.call socket, messages, envelope
+ if @reactor.reactor_thread?
+ @on_read.call socket, messages, envelope
+ else
+ STDERR.print("error, #{self.class} Thread violation! Expected [#{Reactor.current_thread_name}] but got [#{Thread.current['reactor-name']}]\n")
+ end
close_messages(envelope + messages)
end
View
6 lib/zm/server/req.rb
@@ -18,7 +18,11 @@ module XREQ
include Base
def on_readable socket, messages, envelope
- @on_read.call socket, messages, envelope
+ if @reactor.reactor_thread?
+ @on_read.call socket, messages, envelope
+ else
+ STDERR.print("error, #{self.class} Thread violation! Expected [#{Reactor.current_thread_name}] but got [#{Thread.current['reactor-name']}]\n")
+ end
close_messages(envelope + messages)
end
View
11 lib/zm/sockets/base.rb
@@ -108,8 +108,8 @@ def connect address
# ZMQ::Util.errno to check for errors.
#
def send_message message, multipart = false
- flag = multipart ? (ZMQ::SNDMORE | ZMQ::Util.nonblocking_flag) : ZMQ::Util.nonblocking_flag
- @raw_socket.send(message, flag)
+ flag = multipart ? (ZMQ::SNDMORE | ZMQ::NonBlocking) : ZMQ::NonBlocking
+ @raw_socket.sendmsg(message, flag)
end
# Convenience method to send a string on the socket. It handles
@@ -119,7 +119,7 @@ def send_message message, multipart = false
# details on the error.
#
def send_message_string message, multipart = false
- @raw_socket.send_string message, ZMQ::Util.nonblocking_flag | (multipart ? ZMQ::SNDMORE : 0)
+ @raw_socket.send_string message, ZMQ::NonBlocking | (multipart ? ZMQ::SNDMORE : 0)
end
# Convenience method for sending a multi-part message. The
@@ -152,7 +152,7 @@ def resume_read
while ZMQ::Util.resultcode_ok?(rc) && more
parts = []
- rc = @raw_socket.recvmsgs parts, ZMQ::Util.nonblocking_flag
+ rc = @raw_socket.recvmsgs parts, ZMQ::NonBlocking
if ZMQ::Util.resultcode_ok?(rc)
@handler.on_readable self, parts
@@ -161,7 +161,10 @@ def resume_read
if eagain?
more = false
elsif valid_socket_error?
+ STDERR.print("#{self.class} Received a valid socket error [#{ZMQ::Util.errno}], [#{ZMQ::Util.error_string}]\n")
@handler.on_readable_error self, rc
+ else
+ STDERR.print("#{self.class} Unhandled read error [#{ZMQ::Util.errno}], [#{ZMQ::Util.error_string}]\n")
end
end
end
View
5 lib/zm/sockets/envelope_help.rb
@@ -28,7 +28,7 @@ def resume_read
while ZMQ::Util.resultcode_ok?(rc) && more
parts, envelope = [], []
- rc = @raw_socket.recv_multipart parts, envelope, ZMQ::Util.nonblocking_flag
+ rc = @raw_socket.recv_multipart parts, envelope, ZMQ::NonBlocking
if ZMQ::Util.resultcode_ok?(rc)
@handler.on_readable self, parts, envelope
@@ -37,7 +37,10 @@ def resume_read
if eagain?
more = false
elsif valid_socket_error?
+ STDERR.print("#{self.class} Received a valid socket error [#{ZMQ::Util.errno}], [#{ZMQ::Util.error_string}]\n")
@handler.on_readable_error self, rc
+ else
+ STDERR.print("#{self.class} Unhandled read error [#{ZMQ::Util.errno}], [#{ZMQ::Util.error_string}]\n")
end
end
end
View
2  lib/zm/sockets/rep.rb
@@ -50,7 +50,7 @@ def initialize context, handler
# Attach a handler to the REP socket.
#
- # A REP socket must alternate between recv.send (i.e.
+ # A REP socket must alternate between recv.sendmsg (i.e.
# it cannot receive twice in a row without an intervening
# send). This socket expects its +handler+ to
# implement at least the #on_readable method. This method

0 comments on commit 5ad1ab3

Please sign in to comment.
Something went wrong with that request. Please try again.