Permalink
Browse files

Refactored Server::Channel class

  • Loading branch information...
krakatoa committed Feb 20, 2013
1 parent 0484ae6 commit f05778d57518413b9ca9f4acd9ecbc5c94cdf35c
Showing with 121 additions and 131 deletions.
  1. +1 −1 lib/dispatcher.rb
  2. +120 −130 lib/server/channel.rb
View
@@ -39,7 +39,7 @@ def start(block=nil)
@ipc_get = Cucub::Channel.ipc_get
=end
- @box = Cucub::Server::Channel.reply
+ @box = Cucub::Server::Channel.box
####
=begin
View
@@ -1,115 +1,133 @@
module Cucub
class Server
- module Channel
-=begin
- def self.initialize
- Cucub::Channel.local_push
- end
-=end
+ class Channel
+
+ def initialize(kind)
+ @kind = kind
+ case @kind
+ when :box
+ box_start
+ when :inner_inbound
+ inner_inbound_start
+ when :inner_outbound
+ inner_outbound_start
+ end
+ end
- def self.reply
- # Es usado dentro del Reactor
- return @reply if @reply.is_a? PanZMQ::Reply
- $stdout.puts "Initializing Outer Inbound (REPLY) socket"
- @reply = PanZMQ::Reply.new
- @reply.bind("tcp://#{Cucub::Server.instance.address}:6441")
- @reply.register
-
- # TODO Do this setting on Dispatcher ?
- # TODO Decouple into a Different handler: OuterInboundRouting
- @reply.on_receive { |msg|
- # Chequear por llamada async / sync
-
- # Async - returns job id
- #### Cucub::Queue.instance.push(msg)
-
- # TODO Sync - returns object return
- # Usar EM Defer
- #Cucub::LiveObject.pass(msg)
+ def box_start
+ # Es usado dentro del Reactor
+ $stdout.puts "Initializing Outer Inbound (REPLY) socket"
+ @socket = PanZMQ::Reply.new
+ @socket.bind("tcp://#{Cucub::Server.instance.address}:6441")
+ @socket.register
- message = Cucub::Message.parse(msg)
- puts "received@server msg to: #{message.header.to.class_name}"
- # TODO check why this class_name is in capitals and the configurations.classes are in downcase
- destination = Cucub::Dispatcher.instance.router.random_with_least_assignments(message.header.to.class_name.downcase, Cucub::Server.instance.stats_collector.vm_stats)
+ # TODO Do this setting on Dispatcher ?
+ # TODO Decouple into a Different handler: OuterInboundRouting
+ @socket.on_receive { |msg|
+ # Chequear por llamada async / sync
- # TODO Route messages according class_name and object_uuid
+ # Async - returns job id
+ #### Cucub::Queue.instance.push(msg)
+
+ # TODO Sync - returns object return
+ # Usar EM Defer
+ #Cucub::LiveObject.pass(msg)
- @inner_inbound.send_string("#{destination.klass}##{destination.uid} #{msg}")
- Cucub::Server.instance.stats_collector.sent_msg_to(destination.uid, destination.klass)
+ message = Cucub::Message.parse(msg)
+ puts "received@server msg to: #{message.header.to.class_name}"
+ # TODO check why this class_name is in capitals and the configurations.classes are in downcase
+ destination = Cucub::Dispatcher.instance.router.random_with_least_assignments(message.header.to.class_name.downcase, Cucub::Server.instance.stats_collector.vm_stats)
- @reply.send_string("Cucub::Reply ok!")
- }
+ # TODO Route messages according class_name and object_uuid
- @reply
- end
+ Cucub::Server::Channel.inner_inbound.send_string("#{destination.klass}##{destination.uid} #{msg}")
+ Cucub::Server.instance.stats_collector.sent_msg_to(destination.uid, destination.klass)
- ### Right now, and in earlier versions, cucub-server will support only REPLY socket for inbound.
-=begin
- def self.pull
- # Es usado dentro del Reactor
- return @pull if @pull.is_a? MaZMQ::Pull
- @pull = MaZMQ::Pull.new
- @pull.bind(:tcp, Cucub.address, 6442)
+ @socket.send_string("Cucub::Reply ok!")
+ }
-####
- @pull.on_read { |msg|
- Cucub::Queue.instance.push(msg)
- }
+ @socket
+ end
- @pull
- end
-=end
+ def inner_inbound_start
+ # a ZMQ PUSH-PULL socket which binds to the local server as a 'pusher',
+ # so workers connect to it in a 1-N way to consume messages.
+
+ # It works by setting up an IPC socket. In a future, it might be considered to
+ # enable tcp communication, so workers can be outside the local server.
- def self.inner_inbound
- # a ZMQ PUSH-PULL socket which binds to the local server as a 'pusher',
- # so workers connect to it in a 1-N way to consume messages.
+ $stdout.puts "Initializing Inner Inbound (PUSH) socket"
+ @socket = PanZMQ::Broadcast.new
+ @socket.bind "ipc:///tmp/cucub-inner-inbound.sock"
+ @socket
+ end
- # It works by setting up an IPC socket. In a future, it might be considered to
- # enable tcp communication, so workers can be outside the local server.
+ def inner_outbound_start
+ # a ZMQ PUSH-PULL socket which binds to the local server as a 'puller',
+ # so workers connect to it in a 1-N way to push their messages.
+
+ # It works by setting up an IPC socket. In a future, it might be considered to
+ # enable tcp communication, so workers can be outside the local server.
+
+ $stdout.puts "Initializing Inner Outbound (PULL) socket"
+ @socket = PanZMQ::Pull.new
+ @socket.bind "ipc:///tmp/cucub-inner-outbound.sock"
+ @socket.register
+ @socket.on_receive {|msg|
+ message = Cucub::Message.parse(msg)
+ puts "Received @inner_outbound@server: #{message.inspect}"
+ case message.header.to.layer
+ when :server
+ message.unlock(:inner_outbound)
+ case message.body.action
+ when "register"
+ Cucub::Server.instance.register_vm(message.body.additionals)
+ when "ready"
+ puts "READY: #{message.inspect}"
+ message.body.additionals.each do |done|
+ ready = Cucub::Message.parse(done)
+ ready.unlock(:msgpack)
+ # puts "==== #{ready.inspect}"
+ Cucub::Server.instance.stats_collector.mark_done(message.header.from.object_uuid, ready.header.to.class_name.underscore)
+ end
+ puts "STATS: #{Cucub::Server.instance.stats_collector.vm_stats}"
+ end
+ end
+
+ # Cucub::Server.instance.register_vm()
+ }
+ @socket
+ end
- return @inner_inbound if @inner_inbound.is_a? PanZMQ::Broadcast
- $stdout.puts "Initializing Inner Inbound (PUSH) socket"
- @inner_inbound = PanZMQ::Broadcast.new
- @inner_inbound.bind "ipc:///tmp/cucub-inner-inbound.sock"
- end
+ def self.box
+ return @@box if defined?(@@box) and @@box.is_a? Cucub::Server::Channel
- def self.inner_outbound
- # a ZMQ PUSH-PULL socket which binds to the local server as a 'puller',
- # so workers connect to it in a 1-N way to push their messages.
+ @@box = Cucub::Server::Channel.new(:box)
+ end
- # It works by setting up an IPC socket. In a future, it might be considered to
- # enable tcp communication, so workers can be outside the local server.
-
- return @inner_outbound if @inner_outbound.is_a? PanZMQ::Pull
- $stdout.puts "Initializing Inner Outbound (PULL) socket"
- @inner_outbound = PanZMQ::Pull.new
- @inner_outbound.bind "ipc:///tmp/cucub-inner-outbound.sock"
- @inner_outbound.register
- @inner_outbound.on_receive {|msg|
- message = Cucub::Message.parse(msg)
- puts "Received @inner_outbound@server: #{message.inspect}"
- case message.header.to.layer
- when :server
- message.unlock(:inner_outbound)
- case message.body.action
- when "register"
- Cucub::Server.instance.register_vm(message.body.additionals)
- when "ready"
- puts "READY: #{message.inspect}"
- message.body.additionals.each do |done|
- ready = Cucub::Message.parse(done)
- ready.unlock(:msgpack)
- # puts "==== #{ready.inspect}"
- Cucub::Server.instance.stats_collector.mark_done(message.header.from.object_uuid, ready.header.to.class_name.underscore)
- end
- puts "STATS: #{Cucub::Server.instance.stats_collector.vm_stats}"
- end
- end
+ def self.inner_inbound
+ return @@inner_inbound if defined?(@@inner_inbound) and @@inner_inbound.is_a? Cucub::Server::Channel
+ @@inner_inbound = Cucub::Server::Channel.new(:inner_inbound)
+ end
- # Cucub::Server.instance.register_vm()
- }
- end
+ def self.inner_outbound
+ return @@inner_outbound if defined?(@@inner_outbound) and @@inner_outbound.is_a? Cucub::Server::Channel
+
+ @@inner_outbound = Cucub::Server::Channel.new(:inner_outbound)
+ end
+
+ def self.channels
+ channels = []
+ channels << @@box if defined?(@@box)
+ channels << @@inner_inbound if defined?(@@inner_inbound)
+ channels << @@inner_outbound if defined?(@@inner_outbound)
+ channels
+ end
+
+ def send_string(msg)
+ @socket.send_string(msg)
+ end
####
=begin
@@ -152,47 +170,19 @@ def self.ipc_get
@ipc_get
end
=end
-
- def self.shutdown!
- if @reply
- puts 'Closing reply channel.'
- @reply.close
+ def close
+ @socket.close
end
- if @pull
- puts 'Closing pull channel.'
- @pull.close
- end
+ def self.shutdown!
+ @@box.close if defined?(@@box)
- if @inner_outbound
- puts "Closing inner outbound channel."
- @inner_outbound.close
- end
+ @@inner_inbound.close if defined?(@@inner_inbound)
- if @inner_inbound
- puts "Closing inner inbound channel."
- @inner_inbound.close
- end
-
-=begin
- if @local_push
- puts 'Closing local_push channel.'
- @local_push.shutdown!
- end
-
- if @ipc_get
- puts 'Closing ipc_get channel.'
- @ipc_get.close
- end
-
- if @ipc_set
- puts 'Closing ipc_set channel.'
- @ipc_set.close
- end
-=end
+ @@inner_outbound.close if defined?(@@inner_outbound)
- PanZMQ.terminate
+ PanZMQ.terminate
+ end
end
end
- end
end

0 comments on commit f05778d

Please sign in to comment.