|
2 | 2 | # encoding: utf-8 |
3 | 3 |
|
4 | 4 | require "bunny" |
| 5 | +require "thread" |
5 | 6 |
|
6 | 7 | conn = Bunny.new(:automatically_recover => false) |
7 | 8 | conn.start |
8 | 9 |
|
9 | 10 | ch = conn.create_channel |
10 | 11 |
|
| 12 | + |
11 | 13 | class FibonacciClient |
12 | 14 | attr_reader :reply_queue |
| 15 | + attr_accessor :response, :call_id |
| 16 | + attr_reader :lock, :condition |
13 | 17 |
|
14 | 18 | def initialize(ch, server_queue) |
15 | 19 | @ch = ch |
16 | 20 | @x = ch.default_exchange |
17 | 21 |
|
18 | 22 | @server_queue = server_queue |
19 | 23 | @reply_queue = ch.queue("", :exclusive => true) |
| 24 | + |
| 25 | + |
| 26 | + @lock = Mutex.new |
| 27 | + @condition = ConditionVariable.new |
| 28 | + that = self |
| 29 | + |
| 30 | + @reply_queue.subscribe do |delivery_info, properties, payload| |
| 31 | + if properties[:correlation_id] == that.call_id |
| 32 | + that.response = payload.to_i |
| 33 | + that.lock.synchronize{that.condition.signal} |
| 34 | + end |
| 35 | + end |
20 | 36 | end |
21 | 37 |
|
22 | 38 | def call(n) |
23 | | - correlation_id = self.generate_uuid |
| 39 | + self.call_id = self.generate_uuid |
24 | 40 |
|
25 | 41 | @x.publish(n.to_s, |
26 | 42 | :routing_key => @server_queue, |
27 | | - :correlation_id => correlation_id, |
| 43 | + :correlation_id => call_id, |
28 | 44 | :reply_to => @reply_queue.name) |
29 | 45 |
|
30 | | - response = nil |
31 | | - @reply_queue.subscribe(:block => true) do |delivery_info, properties, payload| |
32 | | - if properties[:correlation_id] == correlation_id |
33 | | - response = payload.to_i |
34 | | - |
35 | | - delivery_info.consumer.cancel |
36 | | - end |
37 | | - end |
38 | | - |
| 46 | + lock.synchronize{condition.wait(lock)} |
39 | 47 | response |
40 | 48 | end |
41 | 49 |
|
|
0 commit comments