-
-
Notifications
You must be signed in to change notification settings - Fork 39
/
flask_threaded_rpc_client.py
89 lines (68 loc) · 2.73 KB
/
flask_threaded_rpc_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import threading
from time import sleep
from flask import Flask
import amqpstorm
from amqpstorm import Message
app = Flask(__name__)
class RpcClient(object):
"""Asynchronous Rpc client."""
def __init__(self, host, username, password, rpc_queue):
self.queue = {}
self.host = host
self.username = username
self.password = password
self.channel = None
self.connection = None
self.callback_queue = None
self.rpc_queue = rpc_queue
self.open()
def open(self):
"""Open Connection."""
self.connection = amqpstorm.Connection(self.host, self.username,
self.password)
self.channel = self.connection.channel()
self.channel.queue.declare(self.rpc_queue)
result = self.channel.queue.declare(exclusive=True)
self.callback_queue = result['queue']
self.channel.basic.consume(self._on_response, no_ack=True,
queue=self.callback_queue)
self._create_process_thread()
def _create_process_thread(self):
"""Create a thread responsible for consuming messages in response
to RPC requests.
"""
thread = threading.Thread(target=self._process_data_events)
thread.setDaemon(True)
thread.start()
def _process_data_events(self):
"""Process Data Events using the Process Thread."""
self.channel.start_consuming(to_tuple=False)
def _on_response(self, message):
"""On Response store the message with the correlation id in a local
dictionary.
"""
self.queue[message.correlation_id] = message.body
def send_request(self, payload):
# Create the Message object.
message = Message.create(self.channel, payload)
message.reply_to = self.callback_queue
# Create an entry in our local dictionary, using the automatically
# generated correlation_id as our key.
self.queue[message.correlation_id] = None
# Publish the RPC request.
message.publish(routing_key=self.rpc_queue)
# Return the Unique ID used to identify the request.
return message.correlation_id
@app.route('/rpc_call/<payload>')
def rpc_call(payload):
"""Simple Flask implementation for making asynchronous Rpc calls. """
# Send the request and store the requests Unique ID.
corr_id = RPC_CLIENT.send_request(payload)
# Wait until we have received a response.
while RPC_CLIENT.queue[corr_id] is None:
sleep(0.1)
# Return the response to the user.
return RPC_CLIENT.queue[corr_id]
if __name__ == '__main__':
RPC_CLIENT = RpcClient('127.0.0.1', 'guest', 'guest', 'rpc_queue')
app.run()