Skip to content
Browse files

Made the Reactor to run inside a thread, in order to be able to use i…

…t inside other apps
  • Loading branch information...
1 parent 127e72a commit de6c961431a9e32e0de2ab94f8fb1b9aee47d400 @krakatoa krakatoa committed Jan 21, 2013
Showing with 105 additions and 26 deletions.
  1. +1 −1 lib/channel.rb
  2. +7 −7 lib/cucub-vm.rb
  3. +90 −17 lib/reactor.rb
  4. +6 −1 lib/vm.rb
  5. +1 −0 lib/vm/servolux.rb
View
2 lib/channel.rb
@@ -59,7 +59,7 @@ def vm_inner_outbound_start
end
#def recv_string
- # @socket.recv_string(ZMQ::NOBLOCK)
+ # @socket.recv_string#(ZMQ::NOBLOCK)
#end
def on_receive(&block)
View
14 lib/cucub-vm.rb
@@ -1,9 +1,9 @@
require 'cucub-protocol'
-require './lib/panzmq.rb'
+require 'pan-zmq'
-require './lib/object'
-require './lib/objects_hub'
-require './lib/channel'
-require './lib/actor'
-require './lib/reactor'
-require './lib/vm'
+require_relative './object'
+require_relative './objects_hub'
+require_relative './channel'
+require_relative './actor'
+require_relative './reactor'
+require_relative './vm'
View
107 lib/reactor.rb
@@ -2,49 +2,121 @@
require 'fiber'
+require "logger"
+
module Cucub
class Reactor
include Singleton
+ attr_accessor :logger
+
def initialize
+ if not Cucub::VM.instance.threaded
+ self.logger = Logger.new($stderr)
+ self.logger.level = Logger::DEBUG
+
+ self.class.send(:include, ::Servolux::Threaded)
+ end
+
@actors = []
- end
- def run
- plug_actors
+ @reactor_thread = nil
- container.resume
- end
+ @state = :idle
- def plug_actors
- Cucub::ObjectsHub.instance.objects.each do |object|
- actor = Cucub::Actor.new(object)
- plug_actor(actor)
+ trap("INT") { self.stop }
+
+ if not Cucub::VM.instance.threaded
+ start
+ join
end
+ self
end
- def container
- @reactor_fiber = Fiber.new {
- init_channels
- $stdout.puts "prepared to receive."
+ def run
+ if @state == :idle
- # worker is going to be a Class, fibered-aware, which can receive messages
- # relay(@worker)
+ $stdout.puts "idle"
+
+ plug_actors
+ init_channels
+ $stdout.puts "receiving"
+ # This should be on inbound creation method
@inbound.on_receive { |msg|
$stdout.puts "received: #{msg.inspect}"
#msg[msg.size - 1] = unwrap_message(msg) #.last)
-
+
msg = unwrap_message(msg)
#@actors.first.wire(msg)
@actors.first.process(msg)
$stdout.puts "\n"
}
- PanZMQ::Poller.instance.poll
+
+ @state = :preparing_container
+ end
+ if @state == :preparing_container
+ $stdout.puts "preparing container"
+ container_prepare
+ end
+
+ # container.resume
+
+ begin
+ $stdout.puts "running container"
+ container_run
+ # @reactor_thread.join
+ rescue Exception => e
+ puts "handled exception"
+ end
+
+ end
+
+ def plug_actors
+ Cucub::ObjectsHub.instance.objects.each do |object|
+ actor = Cucub::Actor.new(object)
+ plug_actor(actor)
+ end
+ end
+
+ def container_prepare
+ #@reactor_fiber = Fiber.new {
+ @reactor_thread ||= Thread.new {
+
+ begin
+
+ # worker is going to be a Class, fibered-aware, which can receive messages
+ # relay(@worker)
+
+ $stdout.puts "prepared to receive."
+
+ while @state != :stopped
+ case @state
+ when :preparing_container
+ Thread.stop
+ when :running
+ PanZMQ::Poller.instance.poll
+ end
+ end
+
+ rescue Exception => e
+ puts "exception: #{e.exception}"
+ end
}
end
+ def container_run
+ @state = :running
+ @reactor_thread.run
+ # @reactor_thread.join
+ end
+
+ def container_stop
+ @state = :stopped
+ # @reactor_thread.raise "terminated!"
+ end
+
def unwrap_message(message)
unserialized = Cucub::Message.parse(message)
unserialized
@@ -67,6 +139,7 @@ def init_channels
end
def stop
+ @state = :stopped
Cucub::Channel.shutdown!
kill_actors
exit
View
7 lib/vm.rb
@@ -1,17 +1,22 @@
require 'singleton'
-require './lib/vm/configuration'
+require_relative './vm/configuration'
require 'pan-zmq'
module Cucub
class VM
include Singleton
+ attr_reader :threaded
+
def initialize
#@dispatcher = Cucub::Dispatcher.instance
@running = true
end
def start!(vm_opts={})
+ vm_opts[:threaded] = true if vm_opts[:threaded].nil?
+ @threaded = vm_opts[:threaded]
+
if @running
@config_filepath = vm_opts[:config]
self.init_classes
View
1 lib/vm/servolux.rb
@@ -7,6 +7,7 @@ class Servolux < Servolux::Server
attr_accessor :vm_opts
def run
+ @vm_opts[:threaded] = false
Cucub::VM.instance.start!(@vm_opts)
Process.waitall
end

0 comments on commit de6c961

Please sign in to comment.
Something went wrong with that request. Please try again.