-
Notifications
You must be signed in to change notification settings - Fork 11
/
queues.py
116 lines (100 loc) · 4.2 KB
/
queues.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# -*- coding: utf-8 -*-
import ssl as pyssl
from pika import ConnectionParameters, PlainCredentials, SSLOptions
class PikaClient(object):
"""Base class for connecting to RabbitMQ using Pika
Args:
host: RabbitMQ host
port: RabbitMQ port
user: RabbitMQ user
password: RabbitMQ password
connection_attempts: Maximum number of retry attempts
heartbeat: Time between RabbitMQ heartbeats
heartbeat_interval: DEPRECATED, use heartbeat
virtual_host: RabbitMQ virtual host
exchange: Default exchange that will be used
ssl: SSL Options
blocked_connection_timeout: If not None, the value is a non-negative timeout,
in seconds, for the connection to remain blocked (triggered by
Connection.Blocked from broker); if the timeout expires before connection
becomes unblocked, the connection will be torn down, triggering the
adapter-specific mechanism for informing client app about the closed
connection (e.g., on_close_callback or ConnectionClosed exception) with
`reason_code` of `InternalCloseReasons.BLOCKED_CONNECTION_TIMEOUT`.
"""
def __init__(
self,
host="localhost",
port=5672,
user="guest",
password="guest",
connection_attempts=3,
heartbeat_interval=3600,
virtual_host="/",
exchange="beer_garden",
ssl=None,
blocked_connection_timeout=None,
**kwargs
):
self._host = host
self._port = port
self._user = user
self._password = password
self._connection_attempts = connection_attempts
self._heartbeat = kwargs.get("heartbeat", heartbeat_interval)
self._blocked_connection_timeout = blocked_connection_timeout
self._virtual_host = virtual_host
self._exchange = exchange
ssl = ssl or {}
mode = pyssl.CERT_REQUIRED if ssl.get("ca_verify") else pyssl.CERT_NONE
self._ssl_enabled = ssl.get("enabled", False)
self._ssl_options = SSLOptions(
cafile=ssl.get("ca_cert", None), verify_mode=mode
)
# Save the 'normal' params so they don't need to be reconstructed
self._conn_params = self.connection_parameters()
@property
def connection_url(self):
"""str: Connection URL for this client's connection information"""
virtual_host = self._conn_params.virtual_host
if virtual_host == "/":
virtual_host = ""
return "amqp%s://%s:%s@%s:%s/%s" % (
"s" if self._ssl_enabled else "",
self._conn_params.credentials.username,
self._conn_params.credentials.password,
self._conn_params.host,
self._conn_params.port,
virtual_host,
)
def connection_parameters(self, **kwargs):
"""Get ``ConnectionParameters`` associated with this client
Will construct a ``ConnectionParameters`` object using parameters
passed at initialization as defaults. Any parameters passed in
kwargs will override initialization parameters.
Args:
**kwargs: Overrides for specific parameters
Returns:
:obj:`pika.ConnectionParameters`: ConnectionParameters object
"""
credentials = PlainCredentials(
username=kwargs.get("user", self._user),
password=kwargs.get("password", self._password),
)
return ConnectionParameters(
host=kwargs.get("host", self._host),
port=kwargs.get("port", self._port),
ssl=kwargs.get("ssl_enabled", self._ssl_enabled),
ssl_options=kwargs.get("ssl_options", self._ssl_options),
virtual_host=kwargs.get("virtual_host", self._virtual_host),
connection_attempts=kwargs.get(
"connection_attempts", self._connection_attempts
),
heartbeat=kwargs.get(
"heartbeat", kwargs.get("heartbeat_interval", self._heartbeat)
),
blocked_connection_timeout=kwargs.get(
"blocked_connection_timeout", self._blocked_connection_timeout
),
credentials=credentials,
)