|
4 | 4 |
|
5 | 5 | class FibonacciClient(object): |
6 | 6 | def __init__(self): |
7 | | - self.connection = pika.AsyncoreConnection(pika.ConnectionParameters( |
| 7 | + self.connection = pika.BlockingConnection(pika.ConnectionParameters( |
8 | 8 | host='localhost')) |
9 | 9 |
|
10 | 10 | self.channel = self.connection.channel() |
11 | 11 |
|
12 | 12 | result = self.channel.queue_declare(exclusive=True) |
13 | | - self.callback_queue = result.queue |
| 13 | + self.callback_queue = result.method.queue |
14 | 14 |
|
15 | | - self.requests = {} |
16 | 15 | self.channel.basic_consume(self.on_response, no_ack=True, |
17 | 16 | queue=self.callback_queue) |
18 | 17 |
|
19 | 18 | def on_response(self, ch, method, props, body): |
20 | | - corr_id = props.correlation_id |
21 | | - if corr_id in self.requests: |
22 | | - self.requests[corr_id] = body |
| 19 | + if self.corr_id == props.correlation_id: |
| 20 | + self.response = body |
| 21 | + self.channel.stop_consuming() |
23 | 22 |
|
24 | 23 | def call(self, n): |
25 | | - corr_id = str(uuid.uuid4()) |
26 | | - self.requests[corr_id] = None |
| 24 | + self.corr_id = str(uuid.uuid4()) |
27 | 25 | self.channel.basic_publish(exchange='', |
28 | 26 | routing_key='rpc_queue', |
29 | 27 | properties=pika.BasicProperties( |
30 | 28 | reply_to = self.callback_queue, |
31 | | - correlation_id = corr_id, |
| 29 | + correlation_id = self.corr_id, |
32 | 30 | ), |
33 | 31 | body=str(n)) |
34 | | - while self.requests[corr_id] is None: |
35 | | - pika.asyncore_loop(count=1) |
36 | | - response = self.requests[corr_id] |
37 | | - del self.requests[corr_id] |
38 | | - return int(response) |
| 32 | + self.channel.start_consuming() |
| 33 | + return int(self.response) |
39 | 34 |
|
40 | 35 |
|
41 | 36 | fibonacci_rpc = FibonacciClient() |
|
0 commit comments