Skip to content

Commit

Permalink
Updates
Browse files Browse the repository at this point in the history
  • Loading branch information
Fernando Alonso committed Nov 6, 2012
1 parent 3757a77 commit ec68124
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 17 deletions.
25 changes: 18 additions & 7 deletions lib/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ def vm_inner_inbound_start
# enable tcp communication, so workers can be outside the local server.

$stdout.puts "Connecting to the Inner Inbound (PULL) socket"
@socket = MaZMQ::Pull.new
@socket.connect :ipc, "/tmp/cucub-inner-inbound.sock"
#@socket = MaZMQ::Pull.new
#@socket.connect :ipc, "/tmp/cucub-inner-inbound.sock"
@socket = PanZMQ::Pull.new
@socket.connect "ipc:///tmp/cucub-inner-inbound.sock"
end

def vm_inner_outbound_start
Expand All @@ -47,12 +49,20 @@ def vm_inner_outbound_start
# enable tcp communication, so workers can be outside the local server.

$stdout.puts "Connecting to the Inner Inbound (PUSH) socket"
@socket = MaZMQ::Push.new
@socket.connect :ipc, "/tmp/cucub-inner-outbound.sock"
#@socket = MaZMQ::Push.new
#@socket.connect :ipc, "/tmp/cucub-inner-outbound.sock"
@socket = PanZMQ::Push.new
@socket.connect "ipc:///tmp/cucub-inner-outbound.sock"
end

def recv_string
@socket.recv_string(ZMQ::NOBLOCK)
#def recv_string
# @socket.recv_string(ZMQ::NOBLOCK)
#end

def receive(&block)
@socket.receive { |msg|
block.call(msg)
}
end

def send_string(msg)
Expand All @@ -72,7 +82,8 @@ def self.shutdown!
@@vm_inner_inbound.close if defined?(@@vm_inner_inbound)
@@vm_inner_outbound.close if defined?(@@vm_inner_outbound)

MaZMQ.terminate
#MaZMQ.terminate
PanZMQ.terminate
end

end
Expand Down
2 changes: 2 additions & 0 deletions lib/cucub-vm.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
require 'cucub-protocol'
require './lib/panzmq.rb'

require './lib/object'
require './lib/objects_hub'
require './lib/channel'
require './lib/actor'
require './lib/reactor'
require './lib/vm'
9 changes: 9 additions & 0 deletions lib/panzmq.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'ffi-rzmq'

module PanZMQ
@@context = nil
def self.context
Expand All @@ -13,6 +15,7 @@ def self.terminate
class Pull
def initialize
@socket = PanZMQ.context.socket ZMQ::PULL
#@socket.setsockopt(ZMQ::LINGER, 0)
@messages = []
@alive = true
end
Expand All @@ -36,19 +39,25 @@ def receive(&block)
end

def close
kill
@socket.close
end
end

class Push
def initialize
@socket = PanZMQ.context.socket ZMQ::PUSH
#@socket.setsockopt(ZMQ::LINGER, 0)
end

def connect(params)
@socket.connect params
end

def send_string(message)
@socket.send_string(message)
end

def bind(params)
@socket.bind params
end
Expand Down
17 changes: 7 additions & 10 deletions lib/reactor.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
require 'singleton'

require 'ffi-rzmq'
require 'fiber'

module Cucub
class Reactor
include Singleton

def initialize
@workers = []
@actors = []
end

def run
Expand All @@ -25,16 +24,15 @@ def plug_actors

def container
@reactor_fiber = Fiber.new {
@vm_inbound = PanZMQ::Pull.new
@vm_inbound.connect "ipc:///tmp/cucub-inner-inbound.sock"
init_channels
$stdout.puts "prepared to receive."

# worker is going to be a Class, fibered-aware, which can receive messages
# relay(@worker)

@vm_inbound.receive { |msg|
@inbound.receive { |msg|
$stdout.puts "received: #{msg.inspect}"
@workers.first.wire(msg)
@actors.first.wire(msg)
$stdout.puts "\n"
}
}
Expand All @@ -54,10 +52,9 @@ def stop
$stdout.puts "Stopping gracefully the reactor."
@running = false
@actors.each { |actor| actor.kill }
@vm_inbound.close
PanZMQ.terminate
EM.stop
#exit
#PanZMQ.terminate
#EM.stop
exit
end
end
end

0 comments on commit ec68124

Please sign in to comment.