Permalink
Browse files

initialize socket options

  • Loading branch information...
1 parent bf278f7 commit 0112facd9e720fb2909e2c16c232af6fa4f09c67 @igrigorik committed Nov 15, 2010
Showing with 18 additions and 5 deletions.
  1. +18 −2 lib/zdevice.rb
  2. +0 −1 spec/relay_spec.rb
  3. +0 −2 spec/zdevice_spec.rb
View
@@ -60,7 +60,7 @@ def initialize(name, ctx, conf = {}, &blk)
@sockets[name] = ZSocket.new(name, ctx, c)
(class << self; self; end).class_eval do
define_method "#{name}" do
- @sockets[name]
+ @sockets[name].socket
end
end
end
@@ -82,15 +82,31 @@ def initialize(name, ctx = nil, conf = {})
raise 'missing type' if !conf.key? :type
@name = name
+ conf[:option] ||= {}
@type = case conf.delete(:type).downcase
when :pub then ZMQ::PUB
when :sub then ZMQ::SUB
else 1
end
+ # if no filter is specified, then accept all messages by default
+ if @type == ZMQ::SUB
+ conf[:option][:subscribe] = '' if !conf[:option][:subscribe]
+ end
+
@socket = ctx.socket @type
(conf[:bind] || []).each { |addr| @socket.bind addr }
- (conf[:connect] || []).each { |addr| @socket.connect(addr); @socket.setsockopt(ZMQ::SUBSCRIBE, '') }
+ (conf[:connect] || []).each { |addr| @socket.connect(addr) }
+
+ conf[:option].each do |k, v|
+ flag = case k
+ when :subscribe then ZMQ::SUBSCRIBE
+ when :hwm then ZMQ::HWM
+ when :swap then ZMQ::SWAP
+ end
+
+ [v].flatten.map {|val| @socket.setsockopt(flag, val) }
+ end
end
def close
View
@@ -43,7 +43,6 @@
Thread.new do
loop do
pub.send ZMQ::Message.new("queue test")
- sleep(1)
end
end
@@ -58,8 +58,6 @@
s.type.should >= 0
s.close
end
-
- it "should setup subscription filters"
end
end

0 comments on commit 0112fac

Please sign in to comment.