Permalink
Browse files

abandoning project so send up all changes in my local repository

  • Loading branch information...
1 parent 54fa8aa commit 4c1eaa1150509d5c96ca8cbe77127e057aaac702 @chuckremes committed Jan 9, 2013
Showing with 89 additions and 43 deletions.
  1. +19 −0 History.txt
  2. +0 −1 lib/zm/log_server.rb
  3. +2 −1 lib/zm/reactor.rb
  4. +15 −3 lib/zm/server/sub.rb
  5. +27 −18 lib/zm/sockets/base.rb
  6. +19 −17 lib/zm/sockets/envelope_help.rb
  7. +4 −0 lib/zm/sockets/sub.rb
  8. +1 −1 version.txt
  9. +2 −2 zmqmachine.gemspec
View
19 History.txt
@@ -1,3 +1,22 @@
+== 2013-01-09
+ * Project has been abandoned by author. Use celluloid, dcell
+ or celluloid-io instead.
+
+== 0.8.1 / 2012-02-03
+ * Move socket close functionality into Socket::Base module.
+ No longer exposes internal socket structure to outside
+ objects. Also, sets internal raw socket to nil after closing.
+ This can be checked by #resume_read & #resume_write so that
+ we don't try to read/write to a socket that has been closed.
+ This can happen if we close a socket during another socket's
+ callback; the closed socket may have data on it that we
+ haven't yet processed in Reactor#poll, but by closing it we
+ have already determined we don't want the data.
+
+ * Add some sugar to the Socket::Sub class for adding and
+ removing subscription filters. We no longer default to
+ subscribing to *all* data when creating one of these sockets.
+
== 0.8.0 / 2012-01-19
* Fixed some issues so this gem is compatible with ffi-rzmq
version 0.9.5 at a minimum.
View
1 lib/zm/log_server.rb
@@ -62,7 +62,6 @@ def on_read socket, messages
def write messages
# no op
end
-
end # class LogServer
end
View
3 lib/zm/reactor.rb
@@ -216,7 +216,7 @@ def close_socket sock
return false unless sock
removed = delete_socket sock
- sock.raw_socket.close
+ sock.close
removed
else
@@ -633,6 +633,7 @@ def delete_socket sock
#
def determine_interval interval
# set a lower bound of 1 millisec so we don't burn up the CPU
+ interval ||= 10
interval <= 0 ? 1.0 : interval.to_i
end
View
18 lib/zm/server/sub.rb
@@ -7,14 +7,22 @@ module SUB
include Base
def initialize configuration
- @topic = configuration.topic || ''
+ @topic = configuration.topic
super
end
def write messages
# no op
close messages
end
+
+ def add_subscription_filter(string)
+ subscribe(@socket, string)
+ end
+
+ def remove_subscription_filter(string)
+ unsubscribe(@socket, string)
+ end
private
@@ -24,13 +32,17 @@ def allocate_socket
end
def register_for_events socket
- subscribe socket, @topic
+ subscribe socket, @topic if @topic
super
end
def subscribe socket, topic
- rc = socket.subscribe topic
+ rc = socket.subscribe topic.to_s
+ end
+
+ def unsubscribe socket, topic
+ rc = socket.unsubscribe topic.to_s
end
View
45 lib/zm/sockets/base.rb
@@ -81,6 +81,11 @@ def attach handler
handler.on_attach self
end
+ def close
+ @raw_socket.close
+ @raw_socket = nil
+ end
+
# Creates a 0mq socket endpoint for the transport given in the
# +address+. Other 0mq sockets may then #connect to this bound
# endpoint.
@@ -147,24 +152,26 @@ def identity=(value) @raw_socket.identity = value; end
# Used by the reactor. Never called by user code.
#
def resume_read
- rc = 0
- more = true
-
- while ZMQ::Util.resultcode_ok?(rc) && more
- parts = []
- rc = @raw_socket.recvmsgs parts, ZMQ::NonBlocking
-
- if ZMQ::Util.resultcode_ok?(rc)
- @handler.on_readable self, parts
- else
- # verify errno corresponds to EAGAIN
- if eagain?
- more = false
- elsif valid_socket_error?
- STDERR.print("#{self.class} Received a valid socket error [#{ZMQ::Util.errno}], [#{ZMQ::Util.error_string}]\n")
- @handler.on_readable_error self, rc
+ if @raw_socket
+ rc = 0
+ more = true
+
+ while ZMQ::Util.resultcode_ok?(rc) && more
+ parts = []
+ rc = @raw_socket.recvmsgs parts, ZMQ::NonBlocking
+
+ if ZMQ::Util.resultcode_ok?(rc)
+ @handler.on_readable self, parts
else
- STDERR.print("#{self.class} Unhandled read error [#{ZMQ::Util.errno}], [#{ZMQ::Util.error_string}]\n")
+ # verify errno corresponds to EAGAIN
+ if eagain?
+ more = false
+ elsif valid_socket_error?
+ STDERR.print("#{self.class} Received a valid socket error [#{ZMQ::Util.errno}], [#{ZMQ::Util.error_string}]\n")
+ @handler.on_readable_error self, rc
+ else
+ STDERR.print("#{self.class} Unhandled read error [#{ZMQ::Util.errno}], [#{ZMQ::Util.error_string}]\n")
+ end
end
end
end
@@ -173,7 +180,9 @@ def resume_read
# Used by the reactor. Never called by user code.
#
def resume_write
- @handler.on_writable self
+ if @raw_socket
+ @handler.on_writable self
+ end
end
def inspect
View
36 lib/zm/sockets/envelope_help.rb
@@ -23,24 +23,26 @@ def send_messages messages, envelope = nil
# Used by the reactor. Never called by user code.
#
def resume_read
- rc = 0
- more = true
-
- while ZMQ::Util.resultcode_ok?(rc) && more
- parts, envelope = [], []
- rc = @raw_socket.recv_multipart parts, envelope, ZMQ::NonBlocking
-
- if ZMQ::Util.resultcode_ok?(rc)
- @handler.on_readable self, parts, envelope
- else
- # verify errno corresponds to EAGAIN
- if eagain?
- more = false
- elsif valid_socket_error?
- STDERR.print("#{self.class} Received a valid socket error [#{ZMQ::Util.errno}], [#{ZMQ::Util.error_string}]\n")
- @handler.on_readable_error self, rc
+ if @raw_socket
+ rc = 0
+ more = true
+
+ while ZMQ::Util.resultcode_ok?(rc) && more
+ parts, envelope = [], []
+ rc = @raw_socket.recv_multipart parts, envelope, ZMQ::NonBlocking
+
+ if ZMQ::Util.resultcode_ok?(rc)
+ @handler.on_readable self, parts, envelope
else
- STDERR.print("#{self.class} Unhandled read error [#{ZMQ::Util.errno}], [#{ZMQ::Util.error_string}]\n")
+ # verify errno corresponds to EAGAIN
+ if eagain?
+ more = false
+ elsif valid_socket_error?
+ STDERR.print("#{self.class} Received a valid socket error [#{ZMQ::Util.errno}], [#{ZMQ::Util.error_string}]\n")
+ @handler.on_readable_error self, rc
+ else
+ STDERR.print("#{self.class} Unhandled read error [#{ZMQ::Util.errno}], [#{ZMQ::Util.error_string}]\n")
+ end
end
end
end
View
4 lib/zm/sockets/sub.rb
@@ -73,6 +73,10 @@ def subscribe topic
def subscribe_all
subscribe ''
end
+
+ def unsubscribe topic
+ @raw_socket.setsockopt(ZMQ::UNSUBSCRIBE, topic)
+ end
private
View
2 version.txt
@@ -1 +1 @@
-0.8.0
+0.8.1
View
4 zmqmachine.gemspec
@@ -2,7 +2,7 @@
Gem::Specification.new do |s|
s.name = %q{zmqmachine}
- s.version = "0.8.0"
+ s.version = "0.8.1"
s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
s.authors = ["Chuck Remes"]
@@ -20,7 +20,7 @@ and asynchronous.
It is possible to extend the 0mq library to "poll" normal file
descriptors. This isn't on my roadmap but patches are accepted.}
- s.email = %q{cremes@mac.com}
+ s.email = %q{git@chuckremes.com}
s.extra_rdoc_files = ["History.txt", "README.rdoc", "version.txt"]
s.files = `git ls-files`.split("\n")
s.homepage = %q{http://github.com/chuckremes/zmqmachine}

0 comments on commit 4c1eaa1

Please sign in to comment.