Skip to content
This repository
Browse code

Merge pull request #300 from devin-c/ruby_lbbroker

Add load-balanced broker example for ruby
  • Loading branch information...
commit 1bcc8f436e77c248ab621589a6c343bd8e50c8bc 2 parents 23b80a8 + 2c1b7a2
Pieter Hintjens hintjens authored

Showing 1 changed file with 95 additions and 0 deletions. Show diff stats Hide diff stats

  1. +95 0 examples/Ruby/lbbroker.rb
95 examples/Ruby/lbbroker.rb
... ... @@ -0,0 +1,95 @@
  1 +# Load-balancing broker
  2 +# Clients and workers are shown here in-process
  3 +
  4 +require 'rubygems'
  5 +require 'ffi-rzmq'
  6 +
  7 +CLIENT_SIZE = 10
  8 +WORKER_SIZE = 3
  9 +
  10 +def client_task(identity)
  11 + context = ZMQ::Context.new
  12 + client = context.socket ZMQ::REQ
  13 + client.identity = identity
  14 + client.connect "ipc://frontend.ipc"
  15 +
  16 + client.send_string "HELLO"
  17 + client.recv_string reply = ""
  18 +
  19 + puts "#{identity}: #{reply}"
  20 +
  21 + client.close
  22 + context.destroy
  23 +end
  24 +
  25 +def worker_task(identity)
  26 + context = ZMQ::Context.new
  27 + worker = context.socket ZMQ::REQ
  28 + worker.identity = identity
  29 + worker.connect "ipc://backend.ipc"
  30 +
  31 + worker.send_string "READY"
  32 +
  33 + loop do
  34 + worker.recv_string client = ""
  35 + worker.recv_string empty = ""
  36 + worker.recv_string request = ""
  37 +
  38 + puts "#{identity}: #{request} from #{client}"
  39 +
  40 + worker.send_strings [client, empty, "OK from #{identity}"]
  41 + end
  42 +
  43 + worker.close
  44 + context.destroy
  45 +end
  46 +
  47 +def main_task
  48 + context = ZMQ::Context.new
  49 + frontend = context.socket ZMQ::ROUTER
  50 + backend = context.socket ZMQ::ROUTER
  51 +
  52 + frontend.bind "ipc://frontend.ipc"
  53 + backend.bind "ipc://backend.ipc"
  54 +
  55 + CLIENT_SIZE.times do |client_id|
  56 + Thread.new { client_task "CLIENT-#{client_id}" }
  57 + end
  58 +
  59 + WORKER_SIZE.times do |worker_id|
  60 + Thread.new { worker_task "WORKER-#{worker_id}" }
  61 + end
  62 +
  63 + available_workers = []
  64 + poller = ZMQ::Poller.new
  65 + poller.register_readable backend
  66 + poller.register_readable frontend
  67 +
  68 + # The poller will continuously poll the backend and will poll the
  69 + # frontend when there is at least one worker available.
  70 +
  71 + while poller.poll > 0
  72 + poller.readables.each do |readable|
  73 + if readable === backend
  74 + backend.recv_string worker = ""
  75 + backend.recv_string empty = ""
  76 + backend.recv_strings reply = []
  77 +
  78 + frontend.send_strings reply unless reply[0] == "READY"
  79 +
  80 + # Add this worker to the list of available workers
  81 + available_workers << worker
  82 + elsif readable === frontend && available_workers.any?
  83 + # Read the request from the client and forward it to the LRU worker
  84 + frontend.recv_strings request = []
  85 + backend.send_strings [available_workers.shift, ""] + request
  86 + end
  87 + end
  88 + end
  89 +
  90 + frontend.close
  91 + backend.close
  92 + context.destroy
  93 +end
  94 +
  95 +main_task

0 comments on commit 1bcc8f4

Please sign in to comment.
Something went wrong with that request. Please try again.