Permalink
Browse files

Initial celluloid-zmq gem

  • Loading branch information...
0 parents commit 0673758991c8bf7c7cb26ee6f30585937ec2d60e @tarcieri tarcieri committed Dec 13, 2011
@@ -0,0 +1,17 @@
+*.gem
+*.rbc
+.bundle
+.config
+.yardoc
+Gemfile.lock
+InstalledFiles
+_yardoc
+coverage
+doc/
+lib/bundler/man
+pkg
+rdoc
+spec/reports
+test/tmp
+test/version_tmp
+tmp
@@ -0,0 +1,4 @@
+--color
+--format documentation
+--backtrace
+--default_path spec
@@ -0,0 +1,6 @@
+source 'http://rubygems.org'
+
+gem 'celluloid', :git => 'git://github.com/tarcieri/celluloid'
+
+# Specify your gem's dependencies in celluloid-zmq.gemspec
+gemspec
@@ -0,0 +1,52 @@
+Celluloid::ZMQ
+==============
+
+This gem uses the ffi-rzmq library to provide Celluloid actors that can
+interact with 0MQ sockets.
+
+Celluloid::ZMQ provides two methods for multiplexing 0MQ sockets with
+receiving messages over Celluloid's actor protocol:
+
+* Celluloid::ZMQ#wait_readable(socket): wait until a message is available to
+ read from the given 0MQ socket
+* Celluloid::ZMQ#wait_writeable(socket): waits until there's space in the
+ given socket's message buffer to send another message
+
+Usage:
+
+ require 'celluloid-zmq'
+
+ ZMQ_CONTEXT = ZMQ::Context.new(1)
+
+ class MyZmqCell
+ include Celluloid::ZMQ
+
+ def initialize(addr)
+ @socket = ZMQ_CONTEXT.socket(ZMQ::PUSH)
+
+ unless ZMQ::Util.resultcode_ok? @socket.connect addr
+ @socket.close
+ raise "error connecting to #{addr}: #{ZMQ::Util.error_string}"
+ end
+ end
+
+ def write(message)
+ wait_writeable @socket
+ unless ZMQ::Util.resultcode_ok? @socket.send_string message
+ raise "error sending 0MQ message: #{ZMQ::Util.error_string}"
+ end
+ end
+
+ def read
+ wait_readable @socket
+ message = ''
+
+ rc = @socket.recv_string message
+ if ZMQ::Util.resultcode_ok? rc
+ handle_message message
+ else
+ raise "error receiving ZMQ string: #{ZMQ::Util.error_string}"
+ end
+ end
+ end
+
@@ -0,0 +1,7 @@
+#!/usr/bin/env rake
+require 'bundler/gem_tasks'
+require 'rspec/core/rake_task'
+
+RSpec::Core::RakeTask.new
+
+task :default => :spec
@@ -0,0 +1,26 @@
+# -*- encoding: utf-8 -*-
+require File.expand_path('../lib/celluloid/zmq/version', __FILE__)
+
+Gem::Specification.new do |gem|
+ gem.authors = ["Tony Arcieri"]
+ gem.email = ["tony.arcieri@gmail.com"]
+ gem.description = "Celluloid bindings to the ffi-rzmq library"
+ gem.summary = "Celluloid::ZMQ provides concurrent Celluloid actors that can listen for 0MQ events"
+ gem.homepage = "http://github.com/tarcieri/dcell"
+
+ gem.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
+ gem.files = `git ls-files`.split("\n")
+ gem.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
+ gem.name = "celluloid-zmq"
+ gem.require_paths = ["lib"]
+ gem.version = Celluloid::ZMQ::VERSION
+
+ gem.add_dependency "celluloid", ">= 0.6.2"
+ gem.add_dependency "ffi"
+ gem.add_dependency "ffi-rzmq"
+ gem.add_dependency "redis"
+ gem.add_dependency "redis-namespace"
+
+ gem.add_development_dependency "rake"
+ gem.add_development_dependency "rspec", ">= 2.7.0"
+end
@@ -0,0 +1,27 @@
+require 'ffi-rzmq'
+
+require 'celluloid'
+require 'celluloid/zmq/mailbox'
+require 'celluloid/zmq/reactor'
+require 'celluloid/zmq/version'
+
+module Celluloid
+ # Actors which run alongside 0MQ sockets
+ module ZMQ
+ def self.included(klass)
+ klass.send :include, ::Celluloid
+ klass.use_mailbox Celluloid::ZMQ::Mailbox
+ end
+
+ # Wait for the given IO object to become readable
+ def wait_readable(socket)
+ # Law of demeter be damned!
+ current_actor.mailbox.reactor.wait_readable(socket)
+ end
+
+ # Wait for the given IO object to become writeable
+ def wait_writeable(socket)
+ current_actor.mailbox.reactor.wait_writeable(socket)
+ end
+ end
+end
@@ -0,0 +1,13 @@
+module Celluloid
+ module ZMQ
+ # A Celluloid mailbox for Actors that wait on 0MQ sockets
+ class Mailbox < Celluloid::IO::Mailbox
+ def initialize
+ @messages = []
+ @lock = Mutex.new
+ @waker = Celluloid::IO::Waker.new
+ @reactor = Reactor.new(@waker)
+ end
+ end
+ end
+end
@@ -0,0 +1,74 @@
+module Celluloid
+ module ZMQ
+ # React to incoming 0MQ and Celluloid events. This is kinda sorta supposed
+ # to resemble the Reactor design pattern.
+ class Reactor
+ def initialize(waker)
+ @waker = waker
+ @poller = ::ZMQ::Poller.new
+ @readers = {}
+ @writers = {}
+
+ # FIXME: The way things are presently implemented is super ghetto
+ # The ZMQ::Poller should be able to wait on the waker somehow
+ # but I can't get it to work :(
+ #result = @poller.register(nil, ::ZMQ::POLLIN, @waker.io.fileno)
+ #
+ #unless ::ZMQ::Util.resultcode_ok?(result)
+ # raise "couldn't register waker with 0MQ poller"
+ #end
+ end
+
+ # Wait for the given ZMQ socket to become readable
+ def wait_readable(socket)
+ monitor_zmq socket, @readers, ::ZMQ::POLLIN
+ end
+
+ # Wait for the given ZMQ socket to become writeable
+ def wait_writeable(socket)
+ monitor_zmq socket, @writers, ::ZMQ::POLLOUT
+ end
+
+ # Monitor the given ZMQ socket with the given options
+ def monitor_zmq(socket, set, type)
+ if set.has_key? socket
+ raise ArgumentError, "another method is already waiting on #{socket.inspect}"
+ else
+ set[socket] = Fiber.current
+ end
+
+ @poller.register socket, type
+ Fiber.yield
+
+ @poller.deregister socket, type
+ socket
+ end
+
+ # Run the reactor, waiting for events, and calling the given block if
+ # the reactor is awoken by the waker
+ def run_once(timeout = nil)
+ # FIXME: This approach is super ghetto. Find some way to make the
+ # ZMQ::Poller wait on the waker's file descriptor
+ if @poller.size == 0
+ readable, _ = select [@waker.io], [], [], timeout
+ yield if readable and readable.include? @waker.io
+ else
+ if ::ZMQ::Util.resultcode_ok? @poller.poll(100)
+ @poller.readables.each do |sock|
+ fiber = @readers.delete sock
+ fiber.resume if fiber
+ end
+
+ @poller.writables.each do |sock|
+ fiber = @writers.delete sock
+ fiber.resume if fiber
+ end
+ end
+
+ readable, _ = select [@waker.io], [], [], 0
+ yield if readable and readable.include? @waker.io
+ end
+ end
+ end
+ end
+end
@@ -0,0 +1,5 @@
+module Celluloid
+ module ZMQ
+ VERSION = "0.0.1"
+ end
+end
@@ -0,0 +1,6 @@
+require 'spec_helper'
+require 'celluloid/rspec'
+
+describe Celluloid::ZMQ do
+ it_behaves_like "a Celluloid Actor", Celluloid::ZMQ
+end
@@ -0,0 +1,6 @@
+require 'spec_helper'
+require 'celluloid/rspec'
+
+describe Celluloid::ZMQ::Mailbox do
+ it_behaves_like "a Celluloid Mailbox"
+end
@@ -0,0 +1,2 @@
+require 'bundler/setup'
+require 'celluloid/zmq'

0 comments on commit 0673758

Please sign in to comment.