Skip to content

Commit

Permalink
Stricter locking around pool connection management.
Browse files Browse the repository at this point in the history
  • Loading branch information
coleifer committed Dec 12, 2023
1 parent 110faf3 commit ea3fb11
Showing 1 changed file with 38 additions and 23 deletions.
61 changes: 38 additions & 23 deletions playhouse/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ class Meta:
That's it!
"""
import functools
import heapq
import logging
import random
import threading
import time
from collections import namedtuple
from itertools import chain
Expand Down Expand Up @@ -67,6 +69,14 @@ class MaxConnectionsExceeded(ValueError): pass
'checked_out'))


def locked(fn):
@functools.wraps(fn)
def inner(self, *args, **kwargs):
with self._pool_lock:
return fn(self, *args, **kwargs)
return inner


class PooledDatabase(object):
def __init__(self, database, max_connections=20, stale_timeout=None,
timeout=None, **kwargs):
Expand All @@ -76,6 +86,8 @@ def __init__(self, database, max_connections=20, stale_timeout=None,
if self._wait_timeout == 0:
self._wait_timeout = float('inf')

self._pool_lock = threading.RLock()

# Available / idle connections stored in a heap, sorted oldest first.
self._connections = []

Expand Down Expand Up @@ -119,6 +131,7 @@ def connect(self, reuse_if_open=False):
raise MaxConnectionsExceeded('Max connections exceeded, timed out '
'attempting to connect.')

@locked
def _connect(self):
while True:
try:
Expand Down Expand Up @@ -154,7 +167,7 @@ def _connect(self):
len(self._in_use) >= self._max_connections):
raise MaxConnectionsExceeded('Exceeded maximum connections.')
conn = super(PooledDatabase, self)._connect()
ts = time.time() - random.random() / 1000
ts = time.time()
key = self.conn_key(conn)
logger.debug('Created new connection %s.', key)

Expand All @@ -173,6 +186,7 @@ def _can_reuse(self, conn):
# Called on check-in to make sure the connection can be re-used.
return True

@locked
def _close(self, conn, close_conn=False):
key = self.conn_key(conn)
if close_conn:
Expand All @@ -188,6 +202,7 @@ def _close(self, conn, close_conn=False):
else:
logger.debug('Closed %s.', key)

@locked
def manual_close(self):
"""
Close the underlying connection without returning it to the pool.
Expand All @@ -206,40 +221,40 @@ def manual_close(self):
self.close()
self._close(conn, close_conn=True)

@locked
def close_idle(self):
# Close any open connections that are not currently in-use.
with self._lock:
for _, conn in self._connections:
self._close(conn, close_conn=True)
self._connections = []
for _, conn in self._connections:
self._close(conn, close_conn=True)
self._connections = []

@locked
def close_stale(self, age=600):
# Close any connections that are in-use but were checked out quite some
# time ago and can be considered stale.
with self._lock:
in_use = {}
cutoff = time.time() - age
n = 0
for key, pool_conn in self._in_use.items():
if pool_conn.checked_out < cutoff:
self._close(pool_conn.connection, close_conn=True)
n += 1
else:
in_use[key] = pool_conn
self._in_use = in_use
in_use = {}
cutoff = time.time() - age
n = 0
for key, pool_conn in self._in_use.items():
if pool_conn.checked_out < cutoff:
self._close(pool_conn.connection, close_conn=True)
n += 1
else:
in_use[key] = pool_conn
self._in_use = in_use
return n

@locked
def close_all(self):
# Close all connections -- available and in-use. Warning: may break any
# active connections used by other threads.
self.close()
with self._lock:
for _, conn in self._connections:
self._close(conn, close_conn=True)
for pool_conn in self._in_use.values():
self._close(pool_conn.connection, close_conn=True)
self._connections = []
self._in_use = {}
for _, conn in self._connections:
self._close(conn, close_conn=True)
for pool_conn in self._in_use.values():
self._close(pool_conn.connection, close_conn=True)
self._connections = []
self._in_use = {}


class PooledMySQLDatabase(PooledDatabase, MySQLDatabase):
Expand Down

0 comments on commit ea3fb11

Please sign in to comment.