This repository has been archived by the owner on Feb 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 335
/
pool.py
479 lines (410 loc) · 16.2 KB
/
pool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
import asyncio
import collections
import types
from .connection import create_connection, _PUBSUB_COMMANDS
from .log import logger
from .util import parse_url, CloseEvent
from .errors import PoolClosedError
from .abc import AbcPool
from .locks import Lock
async def create_pool(address, *, db=None, password=None, ssl=None,
encoding=None, minsize=1, maxsize=10,
parser=None, loop=None, create_connection_timeout=None,
pool_cls=None, connection_cls=None):
# FIXME: rewrite docstring
"""Creates Redis Pool.
By default it creates pool of Redis instances, but it is
also possible to create pool of plain connections by passing
``lambda conn: conn`` as commands_factory.
*commands_factory* parameter is deprecated since v0.2.9
All arguments are the same as for create_connection.
Returns RedisPool instance or a pool_cls if it is given.
"""
if pool_cls:
assert issubclass(pool_cls, AbcPool),\
"pool_class does not meet the AbcPool contract"
cls = pool_cls
else:
cls = ConnectionsPool
if isinstance(address, str):
address, options = parse_url(address)
db = options.setdefault('db', db)
password = options.setdefault('password', password)
encoding = options.setdefault('encoding', encoding)
create_connection_timeout = options.setdefault(
'timeout', create_connection_timeout)
if 'ssl' in options:
assert options['ssl'] or (not options['ssl'] and not ssl), (
"Conflicting ssl options are set", options['ssl'], ssl)
ssl = ssl or options['ssl']
# TODO: minsize/maxsize
pool = cls(address, db, password, encoding,
minsize=minsize, maxsize=maxsize,
ssl=ssl, parser=parser,
create_connection_timeout=create_connection_timeout,
connection_cls=connection_cls,
loop=loop)
try:
await pool._fill_free(override_min=False)
except Exception:
pool.close()
await pool.wait_closed()
await pool.wait_closed()
raise
return pool
class ConnectionsPool(AbcPool):
"""Redis connections pool."""
def __init__(self, address, db=None, password=None, encoding=None,
*, minsize, maxsize, ssl=None, parser=None,
create_connection_timeout=None,
connection_cls=None,
loop=None):
assert isinstance(minsize, int) and minsize >= 0, (
"minsize must be int >= 0", minsize, type(minsize))
assert maxsize is not None, "Arbitrary pool size is disallowed."
assert isinstance(maxsize, int) and maxsize > 0, (
"maxsize must be int > 0", maxsize, type(maxsize))
assert minsize <= maxsize, (
"Invalid pool min/max sizes", minsize, maxsize)
if loop is None:
loop = asyncio.get_event_loop()
self._address = address
self._db = db
self._password = password
self._ssl = ssl
self._encoding = encoding
self._parser_class = parser
self._minsize = minsize
self._create_connection_timeout = create_connection_timeout
self._loop = loop
self._pool = collections.deque(maxlen=maxsize)
self._used = set()
self._acquiring = 0
self._cond = asyncio.Condition(lock=Lock(loop=loop), loop=loop)
self._close_state = CloseEvent(self._do_close, loop=loop)
self._pubsub_conn = None
self._connection_cls = connection_cls
def __repr__(self):
return '<{} [db:{}, size:[{}:{}], free:{}]>'.format(
self.__class__.__name__, self.db,
self.minsize, self.maxsize, self.freesize)
@property
def minsize(self):
"""Minimum pool size."""
return self._minsize
@property
def maxsize(self):
"""Maximum pool size."""
return self._pool.maxlen
@property
def size(self):
"""Current pool size."""
return self.freesize + len(self._used) + self._acquiring
@property
def freesize(self):
"""Current number of free connections."""
return len(self._pool)
@property
def address(self):
return self._address
async def clear(self):
"""Clear pool connections.
Close and remove all free connections.
"""
async with self._cond:
await self._do_clear()
async def _do_clear(self):
waiters = []
while self._pool:
conn = self._pool.popleft()
conn.close()
waiters.append(conn.wait_closed())
await asyncio.gather(*waiters, loop=self._loop)
async def _do_close(self):
async with self._cond:
assert not self._acquiring, self._acquiring
waiters = []
while self._pool:
conn = self._pool.popleft()
conn.close()
waiters.append(conn.wait_closed())
for conn in self._used:
conn.close()
waiters.append(conn.wait_closed())
await asyncio.gather(*waiters, loop=self._loop)
# TODO: close _pubsub_conn connection
logger.debug("Closed %d connection(s)", len(waiters))
def close(self):
"""Close all free and in-progress connections and mark pool as closed.
"""
if not self._close_state.is_set():
self._close_state.set()
@property
def closed(self):
"""True if pool is closed."""
return self._close_state.is_set()
async def wait_closed(self):
"""Wait until pool gets closed."""
await self._close_state.wait()
@property
def db(self):
"""Currently selected db index."""
return self._db or 0
@property
def encoding(self):
"""Current set codec or None."""
return self._encoding
def execute(self, command, *args, **kw):
"""Executes redis command in a free connection and returns
future waiting for result.
Picks connection from free pool and send command through
that connection.
If no connection is found, returns coroutine waiting for
free connection to execute command.
"""
conn, address = self.get_connection(command, args)
if conn is not None:
fut = conn.execute(command, *args, **kw)
return self._check_result(fut, command, args, kw)
else:
coro = self._wait_execute(address, command, args, kw)
return self._check_result(coro, command, args, kw)
def execute_pubsub(self, command, *channels):
"""Executes Redis (p)subscribe/(p)unsubscribe commands.
ConnectionsPool picks separate connection for pub/sub
and uses it until explicitly closed or disconnected
(unsubscribing from all channels/patterns will leave connection
locked for pub/sub use).
There is no auto-reconnect for this PUB/SUB connection.
Returns asyncio.gather coroutine waiting for all channels/patterns
to receive answers.
"""
conn, address = self.get_connection(command)
if conn is not None:
return conn.execute_pubsub(command, *channels)
else:
return self._wait_execute_pubsub(address, command, channels, {})
def get_connection(self, command, args=()):
"""Get free connection from pool.
Returns connection.
"""
# TODO: find a better way to determine if connection is free
# and not havily used.
command = command.upper().strip()
is_pubsub = command in _PUBSUB_COMMANDS
if is_pubsub and self._pubsub_conn:
if not self._pubsub_conn.closed:
return self._pubsub_conn, self._pubsub_conn.address
self._pubsub_conn = None
for i in range(self.freesize):
conn = self._pool[0]
self._pool.rotate(1)
if conn.closed: # or conn._waiters: (eg: busy connection)
continue
if conn.in_pubsub:
continue
if is_pubsub:
self._pubsub_conn = conn
self._pool.remove(conn)
self._used.add(conn)
return conn, conn.address
return None, self._address # figure out
def _check_result(self, fut, *data):
"""Hook to check result or catch exception (like MovedError).
This method can be coroutine.
"""
return fut
async def _wait_execute(self, address, command, args, kw):
"""Acquire connection and execute command."""
conn = await self.acquire(command, args)
try:
return (await conn.execute(command, *args, **kw))
finally:
self.release(conn)
async def _wait_execute_pubsub(self, address, command, args, kw):
if self.closed:
raise PoolClosedError("Pool is closed")
assert self._pubsub_conn is None or self._pubsub_conn.closed, (
"Expected no or closed connection", self._pubsub_conn)
async with self._cond:
if self.closed:
raise PoolClosedError("Pool is closed")
if self._pubsub_conn is None or self._pubsub_conn.closed:
conn = await self._create_new_connection(address)
self._pubsub_conn = conn
conn = self._pubsub_conn
return (await conn.execute_pubsub(command, *args, **kw))
async def select(self, db):
"""Changes db index for all free connections.
All previously acquired connections will be closed when released.
"""
res = True
async with self._cond:
for i in range(self.freesize):
res = res and (await self._pool[i].select(db))
self._db = db
return res
async def auth(self, password):
self._password = password
async with self._cond:
for i in range(self.freesize):
await self._pool[i].auth(password)
@property
def in_pubsub(self):
if self._pubsub_conn and not self._pubsub_conn.closed:
return self._pubsub_conn.in_pubsub
return 0
@property
def pubsub_channels(self):
if self._pubsub_conn and not self._pubsub_conn.closed:
return self._pubsub_conn.pubsub_channels
return types.MappingProxyType({})
@property
def pubsub_patterns(self):
if self._pubsub_conn and not self._pubsub_conn.closed:
return self._pubsub_conn.pubsub_patterns
return types.MappingProxyType({})
async def acquire(self, command=None, args=()):
"""Acquires a connection from free pool.
Creates new connection if needed.
"""
if self.closed:
raise PoolClosedError("Pool is closed")
async with self._cond:
if self.closed:
raise PoolClosedError("Pool is closed")
while True:
await self._fill_free(override_min=True)
if self.freesize:
conn = self._pool.popleft()
assert not conn.closed, conn
assert conn not in self._used, (conn, self._used)
self._used.add(conn)
return conn
else:
await self._cond.wait()
def release(self, conn):
"""Returns used connection back into pool.
When returned connection has db index that differs from one in pool
the connection will be closed and dropped.
When queue of free connections is full the connection will be dropped.
"""
assert conn in self._used, (
"Invalid connection, maybe from other pool", conn)
self._used.remove(conn)
if not conn.closed:
if conn.in_transaction:
logger.warning(
"Connection %r is in transaction, closing it.", conn)
conn.close()
elif conn.in_pubsub:
logger.warning(
"Connection %r is in subscribe mode, closing it.", conn)
conn.close()
elif conn._waiters:
logger.warning(
"Connection %r has pending commands, closing it.", conn)
conn.close()
elif conn.db == self.db:
if self.maxsize and self.freesize < self.maxsize:
self._pool.append(conn)
else:
# consider this connection as old and close it.
conn.close()
else:
conn.close()
# FIXME: check event loop is not closed
asyncio.ensure_future(self._wakeup(), loop=self._loop)
def _drop_closed(self):
for i in range(self.freesize):
conn = self._pool[0]
if conn.closed:
self._pool.popleft()
else:
self._pool.rotate(-1)
async def _fill_free(self, *, override_min):
# drop closed connections first
self._drop_closed()
# address = self._address
while self.size < self.minsize:
self._acquiring += 1
try:
conn = await self._create_new_connection(self._address)
# check the healthy of that connection, if
# something went wrong just trigger the Exception
await conn.execute('ping')
self._pool.append(conn)
finally:
self._acquiring -= 1
# connection may be closed at yield point
self._drop_closed()
if self.freesize:
return
if override_min:
while not self._pool and self.size < self.maxsize:
self._acquiring += 1
try:
conn = await self._create_new_connection(self._address)
self._pool.append(conn)
finally:
self._acquiring -= 1
# connection may be closed at yield point
self._drop_closed()
def _create_new_connection(self, address):
return create_connection(address,
db=self._db,
password=self._password,
ssl=self._ssl,
encoding=self._encoding,
parser=self._parser_class,
timeout=self._create_connection_timeout,
connection_cls=self._connection_cls,
loop=self._loop)
async def _wakeup(self, closing_conn=None):
async with self._cond:
self._cond.notify()
if closing_conn is not None:
await closing_conn.wait_closed()
def __enter__(self):
raise RuntimeError(
"'await' should be used as a context manager expression")
def __exit__(self, *args):
pass # pragma: nocover
def __await__(self):
# To make `with await pool` work
conn = yield from self.acquire().__await__()
return _ConnectionContextManager(self, conn)
def get(self):
'''Return async context manager for working with connection.
async with pool.get() as conn:
await conn.execute('get', 'my-key')
'''
return _AsyncConnectionContextManager(self)
class _ConnectionContextManager:
__slots__ = ('_pool', '_conn')
def __init__(self, pool, conn):
self._pool = pool
self._conn = conn
def __enter__(self):
return self._conn
def __exit__(self, exc_type, exc_value, tb):
try:
self._pool.release(self._conn)
finally:
self._pool = None
self._conn = None
class _AsyncConnectionContextManager:
__slots__ = ('_pool', '_conn')
def __init__(self, pool):
self._pool = pool
self._conn = None
async def __aenter__(self):
conn = await self._pool.acquire()
self._conn = conn
return self._conn
async def __aexit__(self, exc_type, exc_value, tb):
try:
self._pool.release(self._conn)
finally:
self._pool = None
self._conn = None