Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Introduced Fiber aware actor+ffi-rzmq

  • Loading branch information...
commit 3757a771baf78314bc38bb9edaa9056da7be681b 1 parent dbcf53b
@krakatoa krakatoa authored
Showing with 150 additions and 8 deletions.
  1. +34 −0 lib/actor.rb
  2. +12 −0 lib/channel.rb
  3. +60 −0 lib/panzmq.rb
  4. +44 −8 lib/reactor.rb
View
34 lib/actor.rb
@@ -0,0 +1,34 @@
+module Cucub
+ class Actor
+ def initialize
+ state = :alive
+ set_fiber
+ end
+
+ def set_fiber
+ @fiber = Fiber.new { |args, origin|
+ while alive?
+ puts "> at Actor"
+ msg = args.shift
+ puts msg
+
+ # play
+
+ origin.transfer
+ end
+ }
+ end
+
+ def wire(msg)
+ @fiber.transfer(msg, Fiber.current)
+ end
+
+ def kill
+ @state = :dead
+ end
+
+ def alive?
+ @state == :alive
+ end
+ end
+end
View
12 lib/channel.rb
@@ -51,6 +51,18 @@ def vm_inner_outbound_start
@socket.connect :ipc, "/tmp/cucub-inner-outbound.sock"
end
+ def recv_string
+ @socket.recv_string(ZMQ::NOBLOCK)
+ end
+
+ def send_string(msg)
+ @socket.send_string(msg)
+ end
+
+ def socket
+ @socket
+ end
+
def close
$stdout.puts "Closing #{@kind} socket."
@socket.close
View
60 lib/panzmq.rb
@@ -0,0 +1,60 @@
+module PanZMQ
+ @@context = nil
+ def self.context
+ # Como MaZMQ estaria funcionando siempre en EM, el proceso en el cual corre seria siempre unico, y por esa razon (repasando http://zguide.zeromq.org/page:all#Getting-the-Context-Right), usamos un unico Contexto en toda la aplicacion. Y el usuario no tiene que instanciar uno.
+ @@context ||= ZMQ::Context.new
+ @@context
+ end
+
+ def self.terminate
+ @@context.terminate if @@context
+ end
+
+ class Pull
+ def initialize
+ @socket = PanZMQ.context.socket ZMQ::PULL
+ @messages = []
+ @alive = true
+ end
+
+ def kill
+ @alive = false
+ end
+
+ def connect(params)
+ @socket.connect params
+ end
+
+ def bind(params)
+ @socket.bind params
+ end
+
+ def receive(&block)
+ while (@socket.recv_strings(@messages) == 0 and @alive)
+ yield @messages
+ end
+ end
+
+ def close
+ @socket.close
+ end
+ end
+
+ class Push
+ def initialize
+ @socket = PanZMQ.context.socket ZMQ::PUSH
+ end
+
+ def connect(params)
+ @socket.connect params
+ end
+
+ def bind(params)
+ @socket.bind params
+ end
+
+ def close
+ @socket.close
+ end
+ end
+end
View
52 lib/reactor.rb
@@ -1,18 +1,48 @@
require 'singleton'
+require 'ffi-rzmq'
+require 'fiber'
+
module Cucub
class Reactor
include Singleton
+ def initialize
+ @workers = []
+ end
+
def run
- EM.epoll
- EM.run do
- $stdout.puts "inside reactor"
- self.init_channels
- #Cucub::Channel.channels.each {|channel|
- # $stdout.puts channel.inspect
- #}
- end
+ plug_actors
+
+ container.resume
+ end
+
+ def plug_actors
+ @actor = Cucub::Actor.new
+
+ plug_actor(@actor)
+ end
+
+ def container
+ @reactor_fiber = Fiber.new {
+ @vm_inbound = PanZMQ::Pull.new
+ @vm_inbound.connect "ipc:///tmp/cucub-inner-inbound.sock"
+ $stdout.puts "prepared to receive."
+
+ # worker is going to be a Class, fibered-aware, which can receive messages
+ # relay(@worker)
+
+ @vm_inbound.receive { |msg|
+ $stdout.puts "received: #{msg.inspect}"
+ @workers.first.wire(msg)
+ $stdout.puts "\n"
+ }
+ }
+ end
+
+ def plug_actor(actor)
+ @actors << actor
+ # maybe send plugged event ?
end
def init_channels
@@ -21,7 +51,13 @@ def init_channels
end
def stop
+ $stdout.puts "Stopping gracefully the reactor."
+ @running = false
+ @actors.each { |actor| actor.kill }
+ @vm_inbound.close
+ PanZMQ.terminate
EM.stop
+ #exit
end
end
end

0 comments on commit 3757a77

Please sign in to comment.
Something went wrong with that request. Please try again.