Skip to content

Commit 2d91ef2

Browse files
author
Jakub Stastny aka botanicus
committed
Ruby: RPC examples.
1 parent 9f5c3d0 commit 2d91ef2

File tree

2 files changed

+94
-0
lines changed

2 files changed

+94
-0
lines changed

ruby/rpc_client.rb

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#!/usr/bin/env ruby
2+
# encoding: utf-8
3+
4+
# Note: This is just proof of concept. For
5+
# real-world usage, you are strongly advised
6+
# to use https://github.com/ruby-amqp/rpc
7+
# or some other RPC library.
8+
9+
require "amqp"
10+
11+
class FibonacciRpcClient
12+
def initialize
13+
subscribe_to_callback_queue
14+
end
15+
16+
def connection
17+
@connection ||= AMQP.connect(:host => "localhost")
18+
end
19+
20+
def channel
21+
@channel ||= AMQP::Channel.new(self.connection)
22+
end
23+
24+
def callback_queue
25+
@callback_queue ||= self.channel.queue("", :exclusive => true)
26+
end
27+
28+
def requests
29+
@requests ||= Hash.new
30+
end
31+
32+
def call(n, &block)
33+
corr_id = rand(10_000_000).to_s
34+
self.requests[corr_id] = nil
35+
self.callback_queue.append_callback(:declare) do
36+
AMQP::Exchange.default.publish(n.to_s, :routing_key => "rpc_queue", :reply_to => self.callback_queue.name, :correlation_id => corr_id)
37+
38+
EM.add_periodic_timer(0.1) do
39+
# p self.requests
40+
if result = self.requests[corr_id]
41+
block.call(result.to_i)
42+
EM.stop
43+
end
44+
end
45+
end
46+
end
47+
48+
private
49+
def subscribe_to_callback_queue
50+
self.callback_queue.subscribe do |header, body|
51+
corr_id = header.correlation_id
52+
unless self.requests[corr_id]
53+
self.requests[corr_id] = body
54+
end
55+
end
56+
end
57+
end
58+
59+
EM.run do
60+
fibonacci_rpc = FibonacciRpcClient.new()
61+
62+
puts " [x] Requesting fib(30)"
63+
fibonacci_rpc.call(30) do |response|
64+
puts " [.] Got #{response}"
65+
end
66+
end

ruby/rpc_server.rb

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/usr/bin/env ruby
2+
# encoding: utf-8
3+
4+
require "amqp"
5+
6+
def fib(n)
7+
return n if n == 0 || n == 1
8+
return fib(n - 1) + fib(n - 2)
9+
end
10+
11+
AMQP.start(:host => "localhost") do |connection|
12+
channel = AMQP::Channel.new(connection)
13+
queue = channel.queue("rpc_queue")
14+
15+
channel.prefetch(1)
16+
17+
queue.subscribe(:ack => true) do |header, body|
18+
n = body.to_i
19+
20+
puts " [.] fib(#{n})"
21+
response = fib(n)
22+
23+
AMQP::Exchange.default.publish(response.to_s, :routing_key => header.reply_to, :correlation_id => header.correlation_id)
24+
header.ack
25+
end
26+
27+
puts " [x] Awaiting RPC requests"
28+
end

0 commit comments

Comments
 (0)