Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge pull request #304 from devin-c/ruby_inter_broker_routing

Ruby inter-broker routing examples
  • Loading branch information...
commit 906a96780daa80e2d3aba4c6303bff3629ec78c2 2 parents a94d57a + e7a3119
@hintjens hintjens authored
View
66 examples/Ruby/peering1.rb
@@ -0,0 +1,66 @@
+# Broker peering simulation (part 1)
+# Prototypes the state flow
+#
+# Translated from C by Devin Christensen: http://github.com/devin-c
+
+require "rubygems"
+require "ffi-rzmq"
+
+class Broker
+ def initialize(name, peers)
+ raise ArgumentError, "A broker require's a name" unless name
+ raise ArgumentError, "A broker require's peers" unless peers.any?
+
+ @name = name
+ @peers = peers
+ @context = ZMQ::Context.new
+
+ setup_state_backend
+ setup_state_frontend
+ end
+
+ def run
+ poller = ZMQ::Poller.new
+ poller.register_readable @state_frontend
+
+ until poller.poll(1000) == -1 do
+ if poller.readables.any?
+ @state_frontend.recv_string peer_name = ""
+ @state_frontend.recv_string available = ""
+
+ puts "#{peer_name} - #{available} workers free"
+ else
+ @state_backend.send_strings [@name, rand(10).to_s]
+ end
+ end
+
+ @state_frontend.close
+ @state_backend.close
+ @context.terminate
+ end
+
+ private
+ def setup_state_backend
+ @state_backend = @context.socket ZMQ::PUB
+ @state_backend.bind "ipc://#{@name}-state.ipc"
+ end
+
+ def setup_state_frontend
+ @state_frontend = @context.socket ZMQ::SUB
+
+ @peers.each do |peer|
+ puts "I: connecting to state backend at #{peer}"
+ @state_frontend.connect "ipc://#{peer}-state.ipc"
+ @state_frontend.setsockopt ZMQ::SUBSCRIBE, peer
+ end
+ end
+end
+
+begin
+ broker = Broker.new(ARGV.shift, ARGV)
+
+ broker.run
+rescue ArgumentError
+ puts "usage: ruby peering1.rb broker_name [peer_name ...]"
+end
+
View
178 examples/Ruby/peering2.rb
@@ -0,0 +1,178 @@
+# Broker peering simulation (part 2)
+# Prototypes the request-reply flow
+#
+# Translated from C by Devin Christensen: http://github.com/devin-c
+
+require "rubygems"
+require "ffi-rzmq"
+
+NUMBER_OF_CIENTS = 10
+NUMBER_OF_WORKERS = 3
+WORKER_READY = "\x01"
+
+class Client
+ def initialize(broker_name)
+ @context = ZMQ::Context.new
+ @socket = @context.socket ZMQ::REQ
+ @socket.connect "ipc://#{broker_name}-localfe.ipc"
+ end
+
+ def run
+ loop do
+ break if @socket.send_string("HELLO") == -1
+ break if @socket.recv_string(reply = "") == -1
+ puts "Client: #{reply}"
+ sleep 1
+ end
+
+ @socket.close
+ @context.terminate
+ end
+end
+
+class Worker
+ def initialize(broker_name)
+ @context = ZMQ::Context.new
+ @socket = @context.socket ZMQ::REQ
+ @socket.connect "ipc://#{broker_name}-localbe.ipc"
+ end
+
+ def run
+ @socket.send_string WORKER_READY
+
+ loop do
+ break if @socket.recv_strings(frames = []) == -1
+ puts "Worker: #{frames.last}"
+ break if @socket.send_strings(frames[0..-2] + ["OK"]) == -1
+ end
+
+ @socket.close
+ @context.terminate
+ end
+end
+
+class Broker
+ attr_reader :name
+
+ def initialize(name, peers)
+ raise ArgumentError, "A broker require's a name" unless name
+ raise ArgumentError, "A broker require's peers" unless peers.any?
+
+ puts "I: preparing broker at #{name}..."
+
+ @name = name
+ @peers = peers
+ @context = ZMQ::Context.new
+ @available_workers = []
+
+ setup_cloud_backend
+ setup_cloud_frontend
+ setup_local_backend
+ setup_local_frontend
+ end
+
+ def run
+ poller = ZMQ::Poller.new
+
+ poller.register_readable @cloud_backend
+ poller.register_readable @local_backend
+
+ poller.register_readable @cloud_frontend
+ poller.register_readable @local_frontend
+
+ while poller.poll > 0
+ poller.readables.each do |readable|
+ if @available_workers.any?
+ if readable === @local_frontend
+ @local_frontend.recv_strings frames = []
+ route_to_backend frames, true
+ elsif readable === @cloud_frontend
+ @cloud_frontend.recv_strings frames = []
+ route_to_backend frames, false
+ end
+ else
+ if readable === @local_backend
+ @local_backend.recv_strings frames = []
+ @available_workers << frames.shift(2)[0]
+
+ route_to_frontend(frames) unless frames == [WORKER_READY]
+ elsif readable === @cloud_backend
+ @cloud_backend.recv_strings frames = []
+
+ route_to_frontend frames[2..-1]
+ end
+ end
+ end
+ end
+
+ @cloud_backend.close
+ @local_backend.close
+ @cloud_frontend.close
+ @local_frontend.close
+ @context.terminate
+ end
+
+ private
+ def route_to_frontend(frames)
+ if @peers.include? frames[0]
+ @cloud_frontend.send_strings frames
+ else
+ @local_frontend.send_strings frames
+ end
+ end
+
+ def route_to_backend(frames, reroutable = false)
+ if reroutable && rand(5) == 0
+ @cloud_backend.send_strings [@peers.sample, ""] + frames
+ else
+ @local_backend.send_strings [@available_workers.shift, ""] + frames
+ end
+ end
+
+ def setup_cloud_backend
+ @cloud_backend = @context.socket ZMQ::ROUTER
+ @cloud_backend.identity = @name
+
+ @peers.each do |peer|
+ puts "I: connecting to cloud frontend at #{peer}"
+ @cloud_backend.connect "ipc://#{peer}-cloud.ipc"
+ end
+ end
+
+ def setup_cloud_frontend
+ @cloud_frontend = @context.socket ZMQ::ROUTER
+ @cloud_frontend.identity = @name
+ @cloud_frontend.bind "ipc://#{@name}-cloud.ipc"
+ end
+
+ def setup_local_backend
+ @local_backend = @context.socket ZMQ::ROUTER
+ @local_backend.bind "ipc://#{@name}-localbe.ipc"
+ end
+
+ def setup_local_frontend
+ @local_frontend = @context.socket ZMQ::ROUTER
+ @local_frontend.bind "ipc://#{@name}-localfe.ipc"
+ end
+end
+
+begin
+ broker = Broker.new(ARGV.shift, ARGV)
+
+ puts "Press Enter when all the brokers are started: "
+
+ STDIN.getc
+
+ NUMBER_OF_WORKERS.times do
+ Thread.new { Worker.new(broker.name).run }
+ end
+
+ NUMBER_OF_CIENTS.times do
+ Thread.new { Client.new(broker.name).run }
+ end
+
+ broker.run
+
+rescue ArgumentError
+ puts "usage: ruby peering2.rb broker_name [peer_name ...]"
+end
View
256 examples/Ruby/peering3.rb
@@ -0,0 +1,256 @@
+# Broker peering simulation (part 3)
+# Prototypes the full flow of status and tasks
+#
+# Translated from C by Devin Christensen: http://github.com/devin-c
+
+require "rubygems"
+require "ffi-rzmq"
+
+NUMBER_OF_CIENTS = 10
+NUMBER_OF_WORKERS = 3
+WORKER_READY = "\x01"
+
+class Client
+ def initialize(broker_name)
+ @context = ZMQ::Context.new
+ @frontend = @context.socket ZMQ::REQ
+ @monitor = @context.socket ZMQ::PUSH
+ @frontend.connect "ipc://#{broker_name}-localfe.ipc"
+ @monitor.connect "ipc://#{broker_name}-monitor.ipc"
+ end
+
+ def run
+ poller = ZMQ::Poller.new
+ poller.register_readable @frontend
+
+ catch(:exit) do
+ loop do
+ sleep rand 5
+
+ rand(15).times do
+ task_id = "%04X" % rand(0x10000)
+
+ @frontend.send_string task_id
+
+ if poller.poll(10_000) == 1
+ @frontend.recv_string reply = ""
+ throw :exit unless reply == task_id
+ @monitor.send_string "#{reply}"
+ else
+ @monitor.send_string "E:CLIENT EXIT - lost task #{task_id}"
+ throw :exit
+ end
+ end
+ end
+ end
+
+ @frontend.close
+ @monitor.close
+ @context.terminate
+ end
+end
+
+class Worker
+ def initialize(broker_name)
+ @context = ZMQ::Context.new
+ @backend = @context.socket ZMQ::REQ
+ @backend.connect "ipc://#{broker_name}-localbe.ipc"
+ end
+
+ def run
+ @backend.send_string WORKER_READY
+
+ loop do
+ @backend.recv_strings frames = []
+ sleep rand 2 # Sleep either 0 or 1 second
+ @backend.send_strings frames
+ end
+
+ @backend.close
+ @context.terminate
+ end
+end
+
+class Broker
+ attr_reader :name
+
+ def initialize(name, peers)
+ raise ArgumentError, "A broker require's a name" unless name
+ raise ArgumentError, "A broker require's peers" unless peers.any?
+
+ puts "I: preparing broker at #{name}..."
+
+ @name = name
+ @peers = peers
+ @context = ZMQ::Context.new
+ @available_workers = []
+ @peers_capacity = {}
+
+ setup_cloud_backend
+ setup_cloud_frontend
+ setup_local_backend
+ setup_local_frontend
+ setup_state_frontend
+ setup_state_backend
+ setup_monitor
+ end
+
+ def run
+ poller = ZMQ::Poller.new
+
+ poller.register_readable @cloud_backend
+ poller.register_readable @cloud_frontend
+ poller.register_readable @local_backend
+ poller.register_readable @local_frontend
+ poller.register_readable @state_frontend
+ poller.register_readable @monitor
+
+ while poller.poll > 0
+ cached_local_capacity = @available_workers.size
+
+ poller.readables.each do |readable|
+ case readable
+ when @local_frontend
+
+ # Route local tasks to local or cloud workers
+ if total_capacity > 0
+ @local_frontend.recv_strings frames = []
+ route_to_backend frames
+ end
+
+ when @cloud_frontend
+
+ # Route tasks from the cloud to local workers only
+ if @available_workers.any?
+ @cloud_frontend.recv_strings frames = []
+ route_to_backend frames
+ end
+
+ when @local_backend
+ @local_backend.recv_strings frames = []
+ @available_workers << frames.shift(2)[0]
+
+ route_to_frontend(frames) unless frames == [WORKER_READY]
+
+ when @cloud_backend
+ @cloud_backend.recv_strings frames = []
+
+ route_to_frontend frames[2..-1]
+
+ when @state_frontend
+ @state_frontend.recv_string peer = ""
+ @state_frontend.recv_string capacity = ""
+ @peers_capacity[peer] = capacity.to_i
+
+ when @monitor
+ @monitor.recv_string message = ""
+ puts message
+ end
+ end
+
+ unless cached_local_capacity == @available_workers.size
+ @state_backend.send_strings [@name, @available_workers.size.to_s]
+ end
+ end
+
+ @cloud_backend.close
+ @local_backend.close
+ @cloud_frontend.close
+ @local_frontend.close
+ @context.terminate
+ end
+
+ private
+ def total_capacity
+ cloud_capacity = @peers_capacity.reduce(0) do |sum, (peer, capacity)|
+ sum + capacity
+ end
+
+ cloud_capacity + @available_workers.size
+ end
+
+ def route_to_backend(frames)
+
+ # Route to local workers whenever they're available
+ if @available_workers.any?
+ @local_backend.send_strings [@available_workers.shift, ""] + frames
+
+ # When there are no local workers available, route to the peer with
+ # the greatest capacity
+ else
+ peer = @peers_capacity.max_by { |x| x[1] }[0]
+ @cloud_backend.send_strings [peer, ""] + frames
+ end
+
+ def route_to_frontend(frames)
+ if @peers.include? frames[0]
+ @cloud_frontend.send_strings frames
+ else
+ @local_frontend.send_strings frames
+ end
+ end
+
+ end
+
+ def setup_cloud_backend
+ @cloud_backend = @context.socket ZMQ::ROUTER
+ @cloud_backend.identity = @name
+
+ @peers.each do |peer|
+ puts "I: connecting to cloud frontend at #{peer}"
+ @cloud_backend.connect "ipc://#{peer}-cloud.ipc"
+ end
+ end
+
+ def setup_cloud_frontend
+ @cloud_frontend = @context.socket ZMQ::ROUTER
+ @cloud_frontend.identity = @name
+ @cloud_frontend.bind "ipc://#{@name}-cloud.ipc"
+ end
+
+ def setup_local_backend
+ @local_backend = @context.socket ZMQ::ROUTER
+ @local_backend.bind "ipc://#{@name}-localbe.ipc"
+ end
+
+ def setup_local_frontend
+ @local_frontend = @context.socket ZMQ::ROUTER
+ @local_frontend.bind "ipc://#{@name}-localfe.ipc"
+ end
+
+ def setup_monitor
+ @monitor = @context.socket ZMQ::PULL
+ @monitor.bind "ipc://#{@name}-monitor.ipc"
+ end
+
+ def setup_state_backend
+ @state_backend = @context.socket ZMQ::PUB
+ @state_backend.bind "ipc://#{@name}-state.ipc"
+ end
+
+ def setup_state_frontend
+ @state_frontend = @context.socket ZMQ::SUB
+
+ @peers.each do |peer|
+ puts "I: connecting to state backend at #{peer}"
+ @state_frontend.connect "ipc://#{peer}-state.ipc"
+ @state_frontend.setsockopt ZMQ::SUBSCRIBE, peer
+ end
+ end
+end
+
+begin
+ broker = Broker.new(ARGV.shift, ARGV)
+
+ NUMBER_OF_WORKERS.times do
+ Thread.new { Worker.new(broker.name).run }
+ end
+
+ NUMBER_OF_CIENTS.times do
+ Thread.new { Client.new(broker.name).run }
+ end
+
+ broker.run
+rescue ArgumentError
+ puts "usage: ruby peering3.rb broker_name [peer_name ...]"
+end
Please sign in to comment.
Something went wrong with that request. Please try again.