Skip to content

Commit

Permalink
Initial Work
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvc committed Jan 29, 2011
0 parents commit 66fe48d
Show file tree
Hide file tree
Showing 13 changed files with 350 additions and 0 deletions.
18 changes: 18 additions & 0 deletions .bnsignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# The list of files that should be ignored by Mr Bones.
# Lines that start with '#' are comments.
#
# A .gitignore file can be used instead by setting it as the ignore
# file in your Rakefile:
#
# Bones {
# ignore_file '.gitignore'
# }
#
# For a project with a C extension, the following would be a good set of
# exclude patterns (uncomment them if you want to use them):
# *.[oa]
# *~
announcement.txt
coverage
doc
pkg
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
*.swp
*.swo
*.swn

*.rbc
4 changes: 4 additions & 0 deletions History.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
== 1.0.0 / 2011-01-29

* 1 major enhancement
* Birthday!
45 changes: 45 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
## em-zeromq ##

Low level event machine support for ZeroMQ

# DESCRIPTION: #

WARNING THIS IS ALPHA QUALITY AT THE MOMENT

A minimal test case passes, the basics of this are working.
It needs work for sure, for instance, you cannot remove a socket yet


# Using: #

This only works with ZeroMQ 2.1.x which is still unreleased
Build+Install ZeroMQ 2.1 from HEAD ( https://github.com/zeromq/zeromq2 )

Run the specs, see specs for examples

Want to help out? Ask! Much work to be done here

# LICENSE: #

(The MIT License)

Copyright (c) 2009 FIXME (different license?)

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
'Software'), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
17 changes: 17 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

begin
require 'bones'
rescue LoadError
abort '### Please install the "bones" gem ###'
end

task :default => 'test:run'
task 'gem:release' => 'test:run'

Bones {
name 'em-zeromq'
authors 'Andrew Cholakian'
email 'andrew@andrewvc.com'
url 'https://github.com/andrewvc/em-zeromq'
}

7 changes: 7 additions & 0 deletions bin/em-zeromq
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env ruby

require File.expand_path(
File.join(File.dirname(__FILE__), %w[.. lib em-zeromq]))

# Put your code here

67 changes: 67 additions & 0 deletions lib/em-zeromq.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
require 'eventmachine'
require 'ffi-rzmq'

module EmZeromq

# :stopdoc:
LIBPATH = ::File.expand_path(::File.dirname(__FILE__)) + ::File::SEPARATOR
PATH = ::File.dirname(LIBPATH) + ::File::SEPARATOR
# :startdoc:

# Returns the version string for the library.
#
def self.version
@version ||= File.read(path('version.txt')).strip
end

# Returns the library path for the module. If any arguments are given,
# they will be joined to the end of the libray path using
# <tt>File.join</tt>.
#
def self.libpath( *args, &block )
rv = args.empty? ? LIBPATH : ::File.join(LIBPATH, args.flatten)
if block
begin
$LOAD_PATH.unshift LIBPATH
rv = block.call
ensure
$LOAD_PATH.shift
end
end
return rv
end

# Returns the lpath for the module. If any arguments are given,
# they will be joined to the end of the path using
# <tt>File.join</tt>.
#
def self.path( *args, &block )
rv = args.empty? ? PATH : ::File.join(PATH, args.flatten)
if block
begin
$LOAD_PATH.unshift PATH
rv = block.call
ensure
$LOAD_PATH.shift
end
end
return rv
end

# Utility method used to require all files ending in .rb that lie in the
# directory below this file that has the same name as the filename passed
# in. Optionally, a specific _directory_ name can be passed in such that
# the _filename_ does not have to be equivalent to the directory.
#
def self.require_all_libs_relative_to( fname, dir = nil )
dir ||= ::File.basename(fname, '.*')
search_me = ::File.expand_path(
::File.join(::File.dirname(fname), dir, '**', '*.rb'))

Dir.glob(search_me).sort.each {|rb| require rb}
end

end # module EmZeromq

EmZeromq.require_all_libs_relative_to(__FILE__)

42 changes: 42 additions & 0 deletions lib/em-zeromq/connection.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@

module EventMachine
module ZeroMQ
class Connection < EventMachine::Connection
attr_accessor :on_readable, :on_writable, :handler
attr_reader :socket, :socket_type, :address

def initialize(socket, socket_type, address, handler)
@socket = socket
@socket_type = socket_type
@handler = handler
@address = address
end

def notify_readable
return unless read_capable?
messages = []

#complete_messages = (@socket.getsockopt(ZMQ::EVENTS) & ZMQ::POLLIN) == ZMQ::POLLIN
while (msg = ZMQ::Message.new) && (@socket.recv(msg, ZMQ::NOBLOCK)) && msg.copy_out_string
messages << msg
end

@handler.on_readable(@socket, messages)
end

def notify_writable
@handler.on_writable(@socket)
end

private

def read_capable?
@read_capable ||= EM::ZeroMQ::READABLE_TYPES.include? @socket_type
end

def write_capable?
@write_capable ||= EM::ZeroMQ::WRITABLE_TYPES.include? @socket_type
end
end
end
end
30 changes: 30 additions & 0 deletions lib/em-zeromq/zeromq.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
module EventMachine
module ZeroMQ
READABLE_TYPES = [ZMQ::SUB, ZMQ::PULL, ZMQ::REQ, ZMQ::REP, ZMQ::XREQ, ZMQ::XREP, ZMQ::PAIR]
WRITABLE_TYPES = [ZMQ::PUB, ZMQ::PUSH, ZMQ::REQ, ZMQ::REP, ZMQ::XREQ, ZMQ::XREP, ZMQ::PAIR]

def self.create(context, socket_type, bind_or_connect, address, handler)
socket = context.socket socket_type

unless [:bind, :connect].include?(bind_or_connect)
raise ArgumentError, "Invalid Option '#{bind_or_connect}' try :bind or :connect"
end

if bind_or_connect == :bind
socket.bind address
else
socket.connect address
end

conn = EM.watch(socket.getsockopt(ZMQ::FD), EventMachine::ZeroMQ::Connection, socket, socket_type, address, handler)
conn.notify_readable = true if EM::ZeroMQ::READABLE_TYPES.include? socket_type

#Given the nature of ZMQ this isn't that useful, and will generally
#cause perf problems as it repeatedly triggers. If people really want to
#use it, they should do so explicitly
conn.notify_writable = false
conn
end

end
end
72 changes: 72 additions & 0 deletions spec/em-zeromq_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
require File.join(File.dirname(__FILE__), %w[spec_helper])

describe EventMachine::ZeroMQ do
class EMTestPullHandler
attr_reader :received
def initialize
@received = []
end
def on_readable(socket, messages)
@received += messages
end
end
class EMTestPushHandler
def initialize(&block)
@on_writable_callback = block
end
def on_writable(socket)
@on_writable_callback.call(socket) if @on_writable_callback
end
end

def make_pull(addr, b_or_c, handler=EMTestPullHandler.new)
EM::ZeroMQ.create SPEC_CTX, ZMQ::PULL, b_or_c, addr, handler
end

def make_push(addr, b_or_c, handler=EMTestPushHandler.new)
EM::ZeroMQ.create SPEC_CTX, ZMQ::PUSH, b_or_c, addr, handler
end

it "Should instantiate a connection given valid opts" do
pull_conn = nil
run_reactor do
pull_conn = make_pull(rand_addr, :bind, EMTestPullHandler.new)
end
pull_conn.should be_a(EventMachine::ZeroMQ::Connection)
end

describe "sending/receiving a single message via PUB/SUB" do
before(:all) do
results = {}
@test_message = test_message = "TMsg#{rand(999)}"

run_reactor(0.5) do
results[:pull_hndlr] = pull_hndlr = EMTestPullHandler.new
pull_conn = make_pull rand_addr, :bind, pull_hndlr
push_conn = make_push pull_conn.address, :connect

push_conn.socket.send_string test_message, ZMQ::NOBLOCK

EM::Timer.new(0.1) { results[:specs_ran] = true }
end

@results = results
end

it "should run completely" do
@results[:specs_ran].should be_true
end

it "should receive one message" do
@results[:pull_hndlr].received.length.should == 1
end

it "should receive the message as a ZMQ::Message" do
@results[:pull_hndlr].received.first.should be_a(ZMQ::Message)
end

it "should receive the message intact" do
@results[:pull_hndlr].received.first.copy_out_string.should == @test_message
end
end
end
42 changes: 42 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
require 'set'
Thread.abort_on_exception = true

require File.expand_path(
File.join(File.dirname(__FILE__), %w[.. lib em-zeromq]))

def run_reactor(time=0.2,&block)
Thread.new do
EM.run do
yield
end
end
sleep time
EM.stop rescue nil
sleep 0.1
end

USED_RAND_ADDRS = Set.new
def rand_addr(scheme='tcp')
addr = nil
loop do
case scheme
when 'tcp'
addr = "tcp://127.0.0.1:#{rand(10_000) + 20_000}"
when 'inproc'
addr = "inproc://testinp-#{rand(10_000) + 20_000}"
end

if USED_RAND_ADDRS.include? addr
next
else
USED_RAND_ADDRS << addr
break
end
end
addr
end

SPEC_CTX = ZMQ::Context.new 1
def spec_ctx
SPEC_CTX
end
Empty file added test/test_em-zeromq.rb
Empty file.
1 change: 1 addition & 0 deletions version.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
0.0.0

0 comments on commit 66fe48d

Please sign in to comment.