Skip to content

Commit

Permalink
Merge 7011b6b into 8250553
Browse files Browse the repository at this point in the history
  • Loading branch information
Donovan Keme committed Aug 7, 2015
2 parents 8250553 + 7011b6b commit 6f6af77
Show file tree
Hide file tree
Showing 25 changed files with 365 additions and 266 deletions.
4 changes: 4 additions & 0 deletions .env-ci
@@ -0,0 +1,4 @@
CELLULOID_SPECS_LOG_STRATEGY=stderr
CELLULOID_SPECS_LOG_LEVEL=3
CELLULOID_SPECS_LOG_FILE=log/ci.log
CELLULOID_SPECS_LOG_SYNC=false
4 changes: 4 additions & 0 deletions .env-dev
@@ -0,0 +1,4 @@
CELLULOID_SPECS_LOG_STRATEGY=single
CELLULOID_SPECS_LOG_FILE=log/test.log
CELLULOID_SPECS_LOG_LEVEL=0
CELLULOID_SPECS_LOG_SYNC=true
3 changes: 3 additions & 0 deletions .gitmodules
@@ -0,0 +1,3 @@
[submodule "culture"]
path = culture
url = http://github.com/celluloid/culture.git
2 changes: 0 additions & 2 deletions .travis.yml
Expand Up @@ -12,8 +12,6 @@ rvm:
matrix:
fast_finish: true
allow_failures:
- rvm: jruby
- rvm: rbx-2
- rvm: ruby-head
- rvm: jruby-head

Expand Down
20 changes: 13 additions & 7 deletions CHANGES.md
@@ -1,37 +1,43 @@
0.70.0
-----
* Adapted to be compliant with version 0.17.0 of Celluloid.
* Added `write_to` for use with `Router` sockets.
* Added more direct set/get of socket identity.

0.16.1 (2015-04-26)
-------------------
-----
* Support for XPUB sockets
* Support for reading multipart messages
* Spec cleanup

0.16.0 (2014-09-04)
-------------------
-----
* Support for setting socket options
* More specs

0.15.0 (2013-09-04)
-------------------
-----
* Tracking release for Celluloid 0.15

0.14.0 (2013-05-07)
-------------------
-----
* Add pubsub example
* Add identity support to Sockets
* Depend on EventedMailbox from core instead of celluloid-io
* Remove overhead for IO waiting by calling directly to the reactor

0.13.0
------
-----
* Feature: Support for DealerSocket and RouterSocket
* Support for the #more_parts? method on sockets
* Celluloid 0.13 compatibility fixes

0.12.0
------
-----
* Tracking release for Celluloid 0.12.0

0.10.0
------
-----
* Factor celluloid-zmq into its own gem
* #linger= support

Expand Down
10 changes: 2 additions & 8 deletions Gemfile
@@ -1,8 +1,2 @@
source 'http://rubygems.org'

gem 'coveralls', require: false
gem 'celluloid', github: 'celluloid/celluloid', branch: 'master'
gem 'transpec', github: 'yujinakayama/transpec', tag: 'v3.1.0'

# Specify your gem's dependencies in celluloid-zmq.gemspec
gemspec
require File.expand_path("../culture/sync", __FILE__)
Celluloid::Sync::Gemfile[self]
7 changes: 2 additions & 5 deletions celluloid-zmq.gemspec
@@ -1,5 +1,5 @@
# -*- encoding: utf-8 -*-
require File.expand_path('../lib/celluloid/zmq/version', __FILE__)
require File.expand_path("../culture/sync", __FILE__)

Gem::Specification.new do |gem|
gem.authors = ["Tony Arcieri"]
Expand All @@ -12,13 +12,10 @@ Gem::Specification.new do |gem|
gem.name = "celluloid-zmq"
gem.version = Celluloid::ZMQ::VERSION

gem.add_dependency "celluloid", "~> 0.16"
Celluloid::Sync::Gemspec[gem]
gem.add_dependency "ffi"
gem.add_dependency "ffi-rzmq"

gem.add_development_dependency "rake"
gem.add_development_dependency "rspec", "~> 3.0"

# Files
ignores = File.read(".gitignore").split(/\r?\n/).reject{ |f| f =~ /^(#.+|\s*)$/ }.map {|f| Dir[f] }.flatten
gem.files = (Dir['**/*','.gitignore'] - ignores).reject {|f| !File.file?(f) }
Expand Down
1 change: 1 addition & 0 deletions culture
Submodule culture added at 5f9252
14 changes: 8 additions & 6 deletions examples/publish_subscribe.rb
@@ -1,16 +1,18 @@
require 'celluloid/zmq'
require 'celluloid/zmq/current'

Celluloid::ZMQ.init

class PublishSubscribe
include Celluloid::ZMQ

def run
link = "tcp://127.0.0.1:5555"

s1 = PubSocket.new
s2 = SubSocket.new
s3 = SubSocket.new
s4 = SubSocket.new
s5 = SubSocket.new
s1 = Socket::Pub.new
s2 = Socket::Sub.new
s3 = Socket::Sub.new
s4 = Socket::Sub.new
s5 = Socket::Sub.new

s1.linger = 100
s2.subscribe('') # receive all
Expand Down
20 changes: 17 additions & 3 deletions lib/celluloid/zmq.rb
@@ -1,16 +1,23 @@
require 'ffi-rzmq'

require 'celluloid'
$CELLULOID_ZMQ_BACKPORTED = (ENV["CELLULOID_ZMQ_BACKPORTED"] != "false") unless defined?($CELLULOID_ZMQ_BACKPORTED)

require ($CELLULOID_ZMQ_BACKPORTED) ? 'celluloid' : 'celluloid/current'

require 'celluloid/zmq/mailbox'
require 'celluloid/zmq/reactor'
require 'celluloid/zmq/sockets'
require 'celluloid/zmq/socket'
require 'celluloid/zmq/version'
require 'celluloid/zmq/waker'

require 'celluloid/zmq/socket/readable'
require 'celluloid/zmq/socket/writable'
require 'celluloid/zmq/socket/types'

module Celluloid
# Actors which run alongside 0MQ sockets
module ZMQ
UninitializedError = Class.new StandardError
class UninitializedError < Celluloid::Error; end

class << self
attr_writer :context
Expand Down Expand Up @@ -65,5 +72,12 @@ def wait_writable(socket)
end
module_function :wait_writable

def result_ok?(result)
::ZMQ::Util.resultcode_ok?(result)
end
module_function :result_ok?

end
end

require 'celluloid/zmq/deprecate' unless $CELLULOID_BACKPORTED == false || $CELLULOID_ZMQ_BACKPORTED == false
2 changes: 2 additions & 0 deletions lib/celluloid/zmq/current.rb
@@ -0,0 +1,2 @@
$CELLULOID_ZMQ_BACKPORTED = false
require "celluloid/zmq"
15 changes: 15 additions & 0 deletions lib/celluloid/zmq/deprecate.rb
@@ -0,0 +1,15 @@
module Celluloid
module ZMQ
ReadableSocket = Socket::Readable
WritableSocket = Socket::Writable
RepSocket = Socket::Rep
ReqSocket = Socket::Req
DealerSocket = Socket::Dealer
RouterSocket = Socket::Router
PushSocket = Socket::Push
PullSocket = Socket::Pull
PubSocket = Socket::Pub
XPubSocket = Socket::XPub
SubSocket = Socket::Sub
end
end
2 changes: 1 addition & 1 deletion lib/celluloid/zmq/mailbox.rb
@@ -1,7 +1,7 @@
module Celluloid
module ZMQ
# Replacement mailbox for Celluloid::ZMQ actors
class Mailbox < Celluloid::EventedMailbox
class Mailbox < Celluloid::Mailbox::Evented
def initialize
super(Reactor)
end
Expand Down
7 changes: 4 additions & 3 deletions lib/celluloid/zmq/reactor.rb
Expand Up @@ -3,10 +3,11 @@ module ZMQ
# React to incoming 0MQ and Celluloid events. This is kinda sorta supposed
# to resemble the Reactor design pattern.
class Reactor
extend Forwardable

extend Forwardable
def_delegator :@waker, :signal, :wakeup
def_delegator :@waker, :cleanup, :shutdown
def_delegator ZMQ, :result_ok?

def initialize
@waker = Waker.new
Expand All @@ -15,7 +16,7 @@ def initialize
@writers = {}

rc = @poller.register @waker.socket, ::ZMQ::POLLIN
unless ::ZMQ::Util.resultcode_ok? rc
unless result_ok? rc
raise "0MQ poll error: #{::ZMQ::Util.error_string}"
end
end
Expand Down Expand Up @@ -55,7 +56,7 @@ def run_once(timeout = nil)

rc = @poller.poll(timeout)

unless ::ZMQ::Util.resultcode_ok? rc
unless result_ok? rc
raise IOError, "0MQ poll error: #{::ZMQ::Util.error_string}"
end

Expand Down
75 changes: 75 additions & 0 deletions lib/celluloid/zmq/socket.rb
@@ -0,0 +1,75 @@
module Celluloid
module ZMQ
class Socket
extend Forwardable
def_delegator ZMQ, :result_ok?
# Create a new socket
def initialize(type)
@socket = Celluloid::ZMQ.context.socket ::ZMQ.const_get(type.to_s.upcase)
@linger = 0
end
attr_reader :linger

# Connect to the given 0MQ address
# Address should be in the form: tcp://1.2.3.4:5678/
def connect(addr)
unless result_ok? @socket.connect addr
raise IOError, "error connecting to #{addr}: #{::ZMQ::Util.error_string}"
end
true
end

def linger=(value)
@linger = value || -1

unless result_ok? @socket.setsockopt(::ZMQ::LINGER, value)
raise IOError, "couldn't set linger: #{::ZMQ::Util.error_string}"
end
end

def identity=(value)
unless result_ok? @socket.setsockopt(::ZMQ::IDENTITY, "#{value}")
raise IOError, "couldn't set identity: #{::ZMQ::Util.error_string}"
end
#de @socket.identity = value
end

def identity
#de @socket.identity
get(::ZMQ::IDENTITY)
end

def set(option, value, length = nil)
unless result_ok? @socket.setsockopt(option, value, length)
raise IOError, "couldn't set value for option #{option}: #{::ZMQ::Util.error_string}"
end
end

def get(option)
option_value = []

unless result_ok? @socket.getsockopt(option, option_value)
raise IOError, "couldn't get value for option #{option}: #{::ZMQ::Util.error_string}"
end

option_value[0]
end

# Bind to the given 0MQ address
# Address should be in the form: tcp://1.2.3.4:5678/
def bind(addr)
unless result_ok? @socket.bind(addr)
raise IOError, "couldn't bind to #{addr}: #{::ZMQ::Util.error_string}"
end
end

# Close the socket
def close
@socket.close
end

# Hide ffi-rzmq internals
alias_method :inspect, :to_s
end
end
end
46 changes: 46 additions & 0 deletions lib/celluloid/zmq/socket/readable.rb
@@ -0,0 +1,46 @@
module Celluloid
module ZMQ
class Socket
# Readable 0MQ sockets have a read method
module Readable
extend Forwardable
def_delegator ZMQ, :result_ok?

# always set LINGER on readable sockets
def bind(addr)
self.linger = @linger
super(addr)
end

def connect(addr)
self.linger = @linger
super(addr)
end

# Read a message from the socket
def read(buffer = '')
ZMQ.wait_readable(@socket) if ZMQ.evented?

unless result_ok? @socket.recv_string buffer
raise IOError, "error receiving ZMQ string: #{::ZMQ::Util.error_string}"
end
buffer
end

# Multiparts message ?
def_delegator :@socket, :more_parts?

# Reads a multipart message, stores it into the given buffer and returns
# the buffer.
def read_multipart(buffer = [])
ZMQ.wait_readable(@socket) if ZMQ.evented?

unless result_ok? @socket.recv_strings buffer
raise IOError, "error receiving ZMQ string: #{::ZMQ::Util.error_string}"
end
buffer
end
end
end
end
end

0 comments on commit 6f6af77

Please sign in to comment.