-
-
Notifications
You must be signed in to change notification settings - Fork 39
/
queue.py
159 lines (127 loc) · 6.54 KB
/
queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
"""AMQPStorm Channel.Queue."""
import logging
from pamqp.specification import Queue as pamqp_queue
from amqpstorm import compatibility
from amqpstorm.base import Handler
from amqpstorm.exception import AMQPInvalidArgument
LOGGER = logging.getLogger(__name__)
class Queue(Handler):
    """Queue operations for a RabbitMQ channel.

    Each method validates its arguments, builds the corresponding pamqp
    ``Queue`` frame and hands it to the channel's ``rpc_request``.
    """
    __slots__ = []

    def declare(self, queue='', passive=False, durable=False,
                exclusive=False, auto_delete=False, arguments=None):
        """Send a Queue.Declare request.

        Arguments are validated in the order they appear in the
        signature; the first invalid one raises.

        :param str queue: Queue name
        :param bool passive: Do not create
        :param bool durable: Durable queue
        :param bool exclusive: Request exclusive access
        :param bool auto_delete: Automatically delete when not in use
        :param dict arguments: Queue key/value arguments

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        # Guard clauses: every failed check raises, so plain ``if``
        # statements behave exactly like an if/elif chain.
        if not compatibility.is_string(queue):
            raise AMQPInvalidArgument('queue should be a string')
        if not isinstance(passive, bool):
            raise AMQPInvalidArgument('passive should be a boolean')
        if not isinstance(durable, bool):
            raise AMQPInvalidArgument('durable should be a boolean')
        if not isinstance(exclusive, bool):
            raise AMQPInvalidArgument('exclusive should be a boolean')
        if not isinstance(auto_delete, bool):
            raise AMQPInvalidArgument('auto_delete should be a boolean')
        if not (arguments is None or isinstance(arguments, dict)):
            raise AMQPInvalidArgument('arguments should be a dict or None')

        frame = pamqp_queue.Declare(
            queue=queue,
            passive=passive,
            durable=durable,
            exclusive=exclusive,
            auto_delete=auto_delete,
            arguments=arguments,
        )
        return self._channel.rpc_request(frame)

    def delete(self, queue='', if_unused=False, if_empty=False):
        """Send a Queue.Delete request.

        :param str queue: Queue name
        :param bool if_unused: Delete only if unused
        :param bool if_empty: Delete only if empty

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        if not compatibility.is_string(queue):
            raise AMQPInvalidArgument('queue should be a string')
        if not isinstance(if_unused, bool):
            raise AMQPInvalidArgument('if_unused should be a boolean')
        if not isinstance(if_empty, bool):
            raise AMQPInvalidArgument('if_empty should be a boolean')

        frame = pamqp_queue.Delete(
            queue=queue,
            if_unused=if_unused,
            if_empty=if_empty,
        )
        return self._channel.rpc_request(frame)

    def purge(self, queue):
        """Send a Queue.Purge request.

        :param str queue: Queue name

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        if not compatibility.is_string(queue):
            raise AMQPInvalidArgument('queue should be a string')

        return self._channel.rpc_request(pamqp_queue.Purge(queue=queue))

    def bind(self, queue='', exchange='', routing_key='', arguments=None):
        """Send a Queue.Bind request.

        :param str queue: Queue name
        :param str exchange: Exchange name
        :param str routing_key: The routing key to use
        :param dict arguments: Bind key/value arguments

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        if not compatibility.is_string(queue):
            raise AMQPInvalidArgument('queue should be a string')
        if not compatibility.is_string(exchange):
            raise AMQPInvalidArgument('exchange should be a string')
        if not compatibility.is_string(routing_key):
            raise AMQPInvalidArgument('routing_key should be a string')
        if not (arguments is None or isinstance(arguments, dict)):
            raise AMQPInvalidArgument('arguments should be a dict or None')

        frame = pamqp_queue.Bind(
            queue=queue,
            exchange=exchange,
            routing_key=routing_key,
            arguments=arguments,
        )
        return self._channel.rpc_request(frame)

    def unbind(self, queue='', exchange='', routing_key='', arguments=None):
        """Send a Queue.Unbind request.

        :param str queue: Queue name
        :param str exchange: Exchange name
        :param str routing_key: The routing key used
        :param dict arguments: Unbind key/value arguments

        :raises AMQPInvalidArgument: Invalid Parameters
        :raises AMQPChannelError: Raises if the channel encountered an error.
        :raises AMQPConnectionError: Raises if the connection
                                     encountered an error.

        :rtype: dict
        """
        if not compatibility.is_string(queue):
            raise AMQPInvalidArgument('queue should be a string')
        if not compatibility.is_string(exchange):
            raise AMQPInvalidArgument('exchange should be a string')
        if not compatibility.is_string(routing_key):
            raise AMQPInvalidArgument('routing_key should be a string')
        if not (arguments is None or isinstance(arguments, dict)):
            raise AMQPInvalidArgument('arguments should be a dict or None')

        frame = pamqp_queue.Unbind(
            queue=queue,
            exchange=exchange,
            routing_key=routing_key,
            arguments=arguments,
        )
        return self._channel.rpc_request(frame)