Permalink
Browse files

Merge pull request #304 from devin-c/ruby_inter_broker_routing

Ruby inter-broker routing examples
  • Loading branch information...
2 parents a94d57a + e7a3119 commit 906a96780daa80e2d3aba4c6303bff3629ec78c2 @hintjens hintjens committed Mar 18, 2013
Showing with 500 additions and 0 deletions.
  1. +66 −0 examples/Ruby/peering1.rb
  2. +178 −0 examples/Ruby/peering2.rb
  3. +256 −0 examples/Ruby/peering3.rb
View
@@ -0,0 +1,66 @@
+# Broker peering simulation (part 1)
+# Prototypes the state flow
+#
+# Translated from C by Devin Christensen: http://github.com/devin-c
+
+require "rubygems"
+require "ffi-rzmq"
+
+class Broker
+ def initialize(name, peers)
+ raise ArgumentError, "A broker require's a name" unless name
+ raise ArgumentError, "A broker require's peers" unless peers.any?
+
+ @name = name
+ @peers = peers
+ @context = ZMQ::Context.new
+
+ setup_state_backend
+ setup_state_frontend
+ end
+
+ def run
+ poller = ZMQ::Poller.new
+ poller.register_readable @state_frontend
+
+ until poller.poll(1000) == -1 do
+ if poller.readables.any?
+ @state_frontend.recv_string peer_name = ""
+ @state_frontend.recv_string available = ""
+
+ puts "#{peer_name} - #{available} workers free"
+ else
+ @state_backend.send_strings [@name, rand(10).to_s]
+ end
+ end
+
+ @state_frontend.close
+ @state_backend.close
+ @context.terminate
+ end
+
+ private
+ def setup_state_backend
+ @state_backend = @context.socket ZMQ::PUB
+ @state_backend.bind "ipc://#{@name}-state.ipc"
+ end
+
+ def setup_state_frontend
+ @state_frontend = @context.socket ZMQ::SUB
+
+ @peers.each do |peer|
+ puts "I: connecting to state backend at #{peer}"
+ @state_frontend.connect "ipc://#{peer}-state.ipc"
+ @state_frontend.setsockopt ZMQ::SUBSCRIBE, peer
+ end
+ end
+end
+
+begin
+ broker = Broker.new(ARGV.shift, ARGV)
+
+ broker.run
+rescue ArgumentError
+ puts "usage: ruby peering1.rb broker_name [peer_name ...]"
+end
+
View
@@ -0,0 +1,178 @@
+# Broker peering simulation (part 2)
+# Prototypes the request-reply flow
+#
+# Translated from C by Devin Christensen: http://github.com/devin-c
+
+require "rubygems"
+require "ffi-rzmq"
+
+NUMBER_OF_CIENTS = 10
+NUMBER_OF_WORKERS = 3
+WORKER_READY = "\x01"
+
+class Client
+ def initialize(broker_name)
+ @context = ZMQ::Context.new
+ @socket = @context.socket ZMQ::REQ
+ @socket.connect "ipc://#{broker_name}-localfe.ipc"
+ end
+
+ def run
+ loop do
+ break if @socket.send_string("HELLO") == -1
+ break if @socket.recv_string(reply = "") == -1
+ puts "Client: #{reply}"
+ sleep 1
+ end
+
+ @socket.close
+ @context.terminate
+ end
+end
+
+class Worker
+ def initialize(broker_name)
+ @context = ZMQ::Context.new
+ @socket = @context.socket ZMQ::REQ
+ @socket.connect "ipc://#{broker_name}-localbe.ipc"
+ end
+
+ def run
+ @socket.send_string WORKER_READY
+
+ loop do
+ break if @socket.recv_strings(frames = []) == -1
+ puts "Worker: #{frames.last}"
+ break if @socket.send_strings(frames[0..-2] + ["OK"]) == -1
+ end
+
+ @socket.close
+ @context.terminate
+ end
+end
+
+class Broker
+ attr_reader :name
+
+ def initialize(name, peers)
+ raise ArgumentError, "A broker require's a name" unless name
+ raise ArgumentError, "A broker require's peers" unless peers.any?
+
+ puts "I: preparing broker at #{name}..."
+
+ @name = name
+ @peers = peers
+ @context = ZMQ::Context.new
+ @available_workers = []
+
+ setup_cloud_backend
+ setup_cloud_frontend
+ setup_local_backend
+ setup_local_frontend
+ end
+
+ def run
+ poller = ZMQ::Poller.new
+
+ poller.register_readable @cloud_backend
+ poller.register_readable @local_backend
+
+ poller.register_readable @cloud_frontend
+ poller.register_readable @local_frontend
+
+ while poller.poll > 0
+ poller.readables.each do |readable|
+ if @available_workers.any?
+ if readable === @local_frontend
+ @local_frontend.recv_strings frames = []
+ route_to_backend frames, true
+ elsif readable === @cloud_frontend
+ @cloud_frontend.recv_strings frames = []
+ route_to_backend frames, false
+ end
+ else
+ if readable === @local_backend
+ @local_backend.recv_strings frames = []
+ @available_workers << frames.shift(2)[0]
+
+ route_to_frontend(frames) unless frames == [WORKER_READY]
+ elsif readable === @cloud_backend
+ @cloud_backend.recv_strings frames = []
+
+ route_to_frontend frames[2..-1]
+ end
+ end
+ end
+ end
+
+ @cloud_backend.close
+ @local_backend.close
+ @cloud_frontend.close
+ @local_frontend.close
+ @context.terminate
+ end
+
+ private
+ def route_to_frontend(frames)
+ if @peers.include? frames[0]
+ @cloud_frontend.send_strings frames
+ else
+ @local_frontend.send_strings frames
+ end
+ end
+
+ def route_to_backend(frames, reroutable = false)
+ if reroutable && rand(5) == 0
+ @cloud_backend.send_strings [@peers.sample, ""] + frames
+ else
+ @local_backend.send_strings [@available_workers.shift, ""] + frames
+ end
+ end
+
+ def setup_cloud_backend
+ @cloud_backend = @context.socket ZMQ::ROUTER
+ @cloud_backend.identity = @name
+
+ @peers.each do |peer|
+ puts "I: connecting to cloud frontend at #{peer}"
+ @cloud_backend.connect "ipc://#{peer}-cloud.ipc"
+ end
+ end
+
+ def setup_cloud_frontend
+ @cloud_frontend = @context.socket ZMQ::ROUTER
+ @cloud_frontend.identity = @name
+ @cloud_frontend.bind "ipc://#{@name}-cloud.ipc"
+ end
+
+ def setup_local_backend
+ @local_backend = @context.socket ZMQ::ROUTER
+ @local_backend.bind "ipc://#{@name}-localbe.ipc"
+ end
+
+ def setup_local_frontend
+ @local_frontend = @context.socket ZMQ::ROUTER
+ @local_frontend.bind "ipc://#{@name}-localfe.ipc"
+ end
+end
+
+begin
+ broker = Broker.new(ARGV.shift, ARGV)
+
+ puts "Press Enter when all the brokers are started: "
+
+ STDIN.getc
+
+ NUMBER_OF_WORKERS.times do
+ Thread.new { Worker.new(broker.name).run }
+ end
+
+ NUMBER_OF_CIENTS.times do
+ Thread.new { Client.new(broker.name).run }
+ end
+
+ broker.run
+
+rescue ArgumentError
+ puts "usage: ruby peering2.rb broker_name [peer_name ...]"
+end
Oops, something went wrong.

0 comments on commit 906a967

Please sign in to comment.