Skip to content

Commit

Permalink
Initial Celluloid::ZMQ::Socket API
Browse files Browse the repository at this point in the history
  • Loading branch information
tarcieri committed Feb 21, 2012
1 parent dd82398 commit 958c474
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 5 deletions.
11 changes: 6 additions & 5 deletions lib/celluloid/zmq.rb
Expand Up @@ -2,6 +2,7 @@

require 'celluloid/io'
require 'celluloid/zmq/reactor'
require 'celluloid/zmq/sockets'
require 'celluloid/zmq/version'
require 'celluloid/zmq/waker'

Expand All @@ -18,18 +19,18 @@ def included(klass)
end

# Obtain a 0MQ context (or lazily initialize it)
def context
def context(threads = 1)
return @context if @context
@context = ::ZMQ::Context.new(1)
at_exit { @context.close }
@context = ::ZMQ::Context.new(threads)
at_exit { @context.terminate }
@context
end
alias_method :init, :context
end

extend Forwardable

# Wait for the given IO object to become readable/writeable
def_delegators 'current_actor.mailbox.reactor',
:wait_readable, :wait_writeable
def_delegators 'current_actor.mailbox.reactor', :wait_readable, :wait_writeable
end
end
124 changes: 124 additions & 0 deletions lib/celluloid/zmq/sockets.rb
@@ -0,0 +1,124 @@
module Celluloid
module ZMQ
module ReadableSocket
# Read a message from the socket
def read(buffer = '')
Celluloid.current_actor.wait_readable(@socket) if evented?

unless ::ZMQ::Util.resultcode_ok? @socket.recv_string buffer
raise IOError, "error receiving ZMQ string: #{::ZMQ::Util.error_string}"
end
buffer
end
end

module WritableSocket
# Send a message to the socket
def send(message)
Celluloid.current_actor.wait_writable(@socket) if evented?

if ::ZMQ::Util.resultcode_ok? socket.send_string message
raise IOError, "error sending 0MQ message: #{::ZMQ::Util.error_string}"
end
message
end
alias_method :<<, :send
end

class Socket
# Create a new socket
def initialize(type)
@socket = Celluloid::ZMQ.context.socket ::ZMQ.const_get(type.to_s.upscase)

unless ::ZMQ::Util.resultcode_ok? @socket.setsockopt(::ZMQ::LINGER, 0)
@socket.close
raise IOError, "couldn't set ZMQ::LINGER: #{::ZMQ::Util.error_string}"
end
end

# Connect to the given 0MQ address
# Address should be in the form: tcp://1.2.3.4:5678/
def connect(addr)
unless ::ZMQ::Util.resultcode_ok? @socket.connect addr
raise IOError, "error connecting to #{addr}: #{::ZMQ::Util.error_string}"
end
end

# Bind to the given 0MQ address
# Address should be in the form: tcp://1.2.3.4:5678/
def bind(addr)
unless ::ZMQ::Util.resultcode_ok? @socket.bind(addr)
raise IOError, "couldn't bind to #{addr}: #{::ZMQ::Util.error_string}"
end
end

# Close the socket
def close
@socket.close
end

# Does the 0MQ socket support evented operation?
def evented?
actor = Thread.current[:actor]
actor && actor.mailbox.is_a?(Celluloid::IO::Mailbox) && actor.mailbox.reactor.is_a?(Celluloid::ZMQ::Reactor)
end

# Hide ffi-rzmq internals
alias_method :inspect, :to_s
end

# ReqSockets are the counterpart of RepSockets (REQ/REP)
class ReqSocket < Socket
include ReadableSocket

def initialize
super :req
end
end

# RepSockets are the counterpart of ReqSockets (REQ/REP)
class RepSocket < Socket
include WritableSocket

def initialize
super :rep
end
end

# PushSockets are the counterpart of PullSockets (PUSH/PULL)
class PushSocket < Socket
include WritableSocket

def initialize
super :push
end
end

# PullSockets are the counterpart of PushSockets (PUSH/PULL)
class PullSocket < Socket
include ReadableSocket

def initialize
super :pull
end
end

# PubSockets are the counterpart of SubSockets (PUB/SUB)
class PubSocket < Socket
include WritableSocket

def initialize
super :pub
end
end

# SubSockets are the counterpart of PubSockets (PUB/SUB)
class SubSocket < Socket
include ReadableSocket

def initialize
super :sub
end
end
end
end

0 comments on commit 958c474

Please sign in to comment.