-
-
Notifications
You must be signed in to change notification settings - Fork 39
/
exchange.py
137 lines (111 loc) · 5.88 KB
/
exchange.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
"""AMQPStorm Channel.Exchange."""
import logging
from pamqp.specification import Exchange as pamqp_exchange
from amqpstorm import compatibility
from amqpstorm.base import Handler
from amqpstorm.exception import AMQPInvalidArgument
LOGGER = logging.getLogger(__name__)
class Exchange(Handler):
"""RabbitMQ Exchange Operations."""
__slots__ = []
def declare(self, exchange='', exchange_type='direct', passive=False,
durable=False, auto_delete=False, arguments=None):
"""Declare an Exchange.
:param str exchange: Exchange name
:param str exchange_type: Exchange type
:param bool passive: Do not create
:param bool durable: Durable exchange
:param bool auto_delete: Automatically delete when not in use
:param dict arguments: Exchange key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(exchange):
raise AMQPInvalidArgument('exchange should be a string')
elif not compatibility.is_string(exchange_type):
raise AMQPInvalidArgument('exchange_type should be a string')
elif not isinstance(passive, bool):
raise AMQPInvalidArgument('passive should be a boolean')
elif not isinstance(durable, bool):
raise AMQPInvalidArgument('durable should be a boolean')
elif not isinstance(auto_delete, bool):
raise AMQPInvalidArgument('auto_delete should be a boolean')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
declare_frame = pamqp_exchange.Declare(exchange=exchange,
exchange_type=exchange_type,
passive=passive,
durable=durable,
auto_delete=auto_delete,
arguments=arguments)
return self._channel.rpc_request(declare_frame)
def delete(self, exchange='', if_unused=False):
"""Delete an Exchange.
:param str exchange: Exchange name
:param bool if_unused: Delete only if unused
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(exchange):
raise AMQPInvalidArgument('exchange should be a string')
delete_frame = pamqp_exchange.Delete(exchange=exchange,
if_unused=if_unused)
return self._channel.rpc_request(delete_frame)
def bind(self, destination='', source='', routing_key='',
arguments=None):
"""Bind an Exchange.
:param str destination: Exchange name
:param str source: Exchange to bind to
:param str routing_key: The routing key to use
:param dict arguments: Bind key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(destination):
raise AMQPInvalidArgument('destination should be a string')
elif not compatibility.is_string(source):
raise AMQPInvalidArgument('source should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
bind_frame = pamqp_exchange.Bind(destination=destination,
source=source,
routing_key=routing_key,
arguments=arguments)
return self._channel.rpc_request(bind_frame)
def unbind(self, destination='', source='', routing_key='',
arguments=None):
"""Unbind an Exchange.
:param str destination: Exchange name
:param str source: Exchange to unbind from
:param str routing_key: The routing key used
:param dict arguments: Unbind key/value arguments
:raises AMQPInvalidArgument: Invalid Parameters
:raises AMQPChannelError: Raises if the channel encountered an error.
:raises AMQPConnectionError: Raises if the connection
encountered an error.
:rtype: dict
"""
if not compatibility.is_string(destination):
raise AMQPInvalidArgument('destination should be a string')
elif not compatibility.is_string(source):
raise AMQPInvalidArgument('source should be a string')
elif not compatibility.is_string(routing_key):
raise AMQPInvalidArgument('routing_key should be a string')
elif arguments is not None and not isinstance(arguments, dict):
raise AMQPInvalidArgument('arguments should be a dict or None')
unbind_frame = pamqp_exchange.Unbind(destination=destination,
source=source,
routing_key=routing_key,
arguments=arguments)
return self._channel.rpc_request(unbind_frame)