Skip to content

Commit

Permalink
Merge pull request #25 from gmr/autoclean
Browse files Browse the repository at this point in the history
Autoclean
  • Loading branch information
gmr committed Sep 27, 2017
2 parents 8560f4a + 23aec15 commit ee7f6cf
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 42 deletions.
4 changes: 3 additions & 1 deletion docs/history.rst
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
Version History
===============
- Next Release
- Log a warning when a tornado_session.Result is ``__del__'d`` without ``free`` being called.
- Free when tornado_session.Result is ``__del__'d`` without ``free`` being called.
- Auto-clean the pool after Results.free TTL+1 in tornado_session.TornadoSession
- Dont raise NotImplementedError in Results.free for synchronous use, just treat as a noop
- 1.9.1 2016-10-25
- Add better exception handling around connections and getting the logged in user
- 1.9.0 2016-07-01
Expand Down
2 changes: 1 addition & 1 deletion queries/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
Unicode and Unicode arrays.
"""
__version__ = '1.9.1'
__version__ = '1.10.0'
version = __version__

import logging
Expand Down
32 changes: 23 additions & 9 deletions queries/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,13 @@ class Pool(object):
def __init__(self,
pool_id,
idle_ttl=DEFAULT_IDLE_TTL,
max_size=DEFAULT_MAX_SIZE):
max_size=DEFAULT_MAX_SIZE,
time_method=None):
self.connections = {}
self._id = pool_id
self.idle_ttl = idle_ttl
self.max_size = max_size
self.time_method = time_method or time.time

def __contains__(self, connection):
"""Return True if the pool contains the connection"""
Expand Down Expand Up @@ -207,7 +209,7 @@ def free(self, connection):

if self.idle_connections == list(self.connections.values()):
with self._lock:
self.idle_start = time.time()
self.idle_start = self.time_method()
LOGGER.debug('Pool %s freed connection %s', self.id, id(connection))

def get(self, session):
Expand Down Expand Up @@ -258,7 +260,7 @@ def idle_duration(self):
"""
if self.idle_start is None:
return 0
return time.time() - self.idle_start
return self.time_method() - self.idle_start

@property
def is_full(self):
Expand Down Expand Up @@ -375,6 +377,8 @@ def instance(cls):
"""Only allow a single PoolManager instance to exist, returning the
handle for it.
:param callable time_method: Override the default :py:meth`time.time`
method for time calculations. Only applied on first invocation.
:rtype: PoolManager
"""
Expand Down Expand Up @@ -407,13 +411,11 @@ def clean(cls, pid):
with cls._lock:
cls._ensure_pool_exists(pid)
cls._pools[pid].clean()

# If the pool has no open connections, remove it
if not len(cls._pools[pid]):
del cls._pools[pid]
cls._maybe_remove_pool(pid)

@classmethod
def create(cls, pid, idle_ttl=DEFAULT_IDLE_TTL, max_size=DEFAULT_MAX_SIZE):
def create(cls, pid, idle_ttl=DEFAULT_IDLE_TTL, max_size=DEFAULT_MAX_SIZE,
time_method=None):
"""Create a new pool, with the ability to pass in values to override
the default idle TTL and the default maximum size.
Expand All @@ -426,14 +428,16 @@ def create(cls, pid, idle_ttl=DEFAULT_IDLE_TTL, max_size=DEFAULT_MAX_SIZE):
:param str pid: The pool ID
:param int idle_ttl: Time in seconds for the idle TTL
:param int max_size: The maximum pool size
:param callable time_method: Override the use of :py:meth:`time.time`
method for time values.
:raises: KeyError
"""
if pid in cls._pools:
raise KeyError('Pool %s already exists' % pid)
with cls._lock:
LOGGER.debug("Creating Pool: %s (%i/%i)", pid, idle_ttl, max_size)
cls._pools[pid] = Pool(pid, idle_ttl, max_size)
cls._pools[pid] = Pool(pid, idle_ttl, max_size, time_method)

@classmethod
def get(cls, pid, session):
Expand Down Expand Up @@ -595,6 +599,16 @@ def _ensure_pool_exists(cls, pid):
if pid not in cls._pools:
raise KeyError('Pool %s has not been created' % pid)

@classmethod
def _maybe_remove_pool(cls, pid):
"""If the pool has no open connections, remove it
:param str pid: The pool id to clean
"""
if not len(cls._pools[pid]):
del cls._pools[pid]


class QueriesException(Exception):
"""Base Exception for all other Queries exceptions"""
Expand Down
2 changes: 1 addition & 1 deletion queries/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def free(self):
connections.
"""
raise NotImplementedError
LOGGER.debug('Invoking synchronous free has no effect')

def items(self):
"""Return all of the rows that are in the result set.
Expand Down
7 changes: 1 addition & 6 deletions queries/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,10 @@
from psycopg2 import extensions
from psycopg2 import extras

from queries import pool
from queries import results
from queries import utils
from queries import pool, results, utils, DEFAULT_URI, PYPY

LOGGER = logging.getLogger(__name__)

from queries import DEFAULT_URI
from queries import PYPY

DEFAULT_ENCODING = 'UTF8'


Expand Down
19 changes: 15 additions & 4 deletions queries/tornado_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ def __init__(self, cursor, cleanup, fd):
self._fd = fd
self._freed = False

@gen.coroutine
def free(self):
"""Release the results and connection lock from the TornadoSession
object. This **must** be called after you finish processing the results
Expand All @@ -124,7 +123,8 @@ def free(self):

def __del__(self):
if not self._freed:
LOGGER.warning('%s not freed - %r', self.__class__.__name__, self)
LOGGER.warning('Auto-freeing result on deletion')
self.free()


class TornadoSession(session.Session):
Expand Down Expand Up @@ -162,15 +162,18 @@ def __init__(self, uri=DEFAULT_URI,
"""
self._connections = dict()
self._futures = dict()
self._cleanup_callback = None
self._pool_idle_ttl = pool_idle_ttl

self._cursor_factory = cursor_factory
self._ioloop = io_loop or ioloop.IOLoop.instance()
self._ioloop = io_loop or ioloop.IOLoop.current()
self._pool_manager = pool.PoolManager.instance()
self._uri = uri

# Ensure the pool exists in the pool manager
if self.pid not in self._pool_manager:
self._pool_manager.create(self.pid, pool_idle_ttl, pool_max_size)
self._pool_manager.create(self.pid, pool_idle_ttl, pool_max_size,
self._ioloop.time)

@property
def connection(self):
Expand Down Expand Up @@ -438,6 +441,14 @@ def _exec_cleanup(self, cursor, fd):
self._pool_manager.free(self.pid, self._connections[fd])
self._ioloop.remove_handler(fd)

# If the cleanup callback exists, remove it
if self._cleanup_callback:
self._ioloop.remove_timeout(self._cleanup_callback)

# Create a new cleanup callback to clean the pool of idle connections
self._cleanup_callback = self._ioloop.add_timeout(
self._pool_idle_ttl + 1, self._pool_manager.clean, self.pid)

if fd in self._connections:
del self._connections[fd]
if fd in self._futures:
Expand Down
15 changes: 11 additions & 4 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
[flake8]
exclude = .git,build,dist,docs,env

[nosetests]
with-coverage=1
cover-package=queries
cover-branches=1
cover-erase=1
cover-branches = 1
cover-erase = 1
cover-html = 1
cover-html-dir = build/coverage
cover-package = queries
logging-level = DEBUG
verbosity = 2
with-coverage = 1
5 changes: 2 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
'Programming Language :: Python :: 2.6',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: Implementation :: CPython',
'Programming Language :: Python :: Implementation :: PyPy',
'Topic :: Database',
'Topic :: Software Development :: Libraries']

setup(name='queries',
version='1.9.1',
version='1.10.0',
description="Simplified PostgreSQL client built upon Psycopg2",
maintainer="Gavin M. Roy",
maintainer_email="gavinmroy@gmail.com",
Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
coverage
flake8
mock
nose
codecov
Expand Down
9 changes: 4 additions & 5 deletions tests/results_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def test_iter_on_empty(self):
self.cursor.rowcount = 0
with mock.patch.object(self.obj, '_rewind') as rewind:
[x for x in self.obj]
assert not rewind.called, '_rewind should not be called on empty result'
assert not rewind.called, \
'_rewind should not be called on empty result'

def test_iter_rewinds(self):
self.cursor.__iter__ = mock.Mock(return_value=iter([1, 2, 3]))
Expand Down Expand Up @@ -102,15 +103,13 @@ def test_count_returns_rowcount(self):
self.cursor.rowcount = 2
self.assertEqual(self.obj.count(), 2)

def test_free_raises_exception(self):
self.assertRaises(NotImplementedError, self.obj.free)

def test_items_returns_on_empty(self):
self.cursor.rowcount = 0
self.cursor.scroll = mock.Mock()
self.cursor.fetchall = mock.Mock()
self.obj.items()
assert not self.cursor.scroll.called, 'Cursor.scroll should not be called on empty result'
assert not self.cursor.scroll.called, \
'Cursor.scroll should not be called on empty result'

def test_items_invokes_scroll(self):
self.cursor.scroll = mock.Mock()
Expand Down
16 changes: 8 additions & 8 deletions tests/tornado_session_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ class SessionInitTests(unittest.TestCase):
def setUp(self):
self.obj = tornado_session.TornadoSession()

#def test_creates_empty_callback_dict(self):
# self.assertDictEqual(self.obj._futures, {})
def test_creates_empty_callback_dict(self):
self.assertDictEqual(self.obj._futures, {})

# def test_creates_empty_connections_dict(self):
# self.assertDictEqual(self.obj._connections, {})
def test_creates_empty_connections_dict(self):
self.assertDictEqual(self.obj._connections, {})

def test_sets_default_cursor_factory(self):
self.assertEqual(self.obj._cursor_factory, extras.RealDictCursor)
Expand Down Expand Up @@ -105,8 +105,8 @@ def test_connect_gets_pooled_connection(self):
conn = yield self.obj._connect()
self.obj._pool_manager.add(self.obj.pid, conn)
with mock.patch.object(self.obj._pool_manager, 'get') as get:
with mock.patch.object(self.io_loop, 'add_handler') as add_handler:
second_result = yield self.obj._connect()
with mock.patch.object(self.io_loop, 'add_handler'):
yield self.obj._connect()
get.assert_called_once_with(self.obj.pid, self.obj)

@testing.gen_test
Expand All @@ -116,7 +116,7 @@ def test_connect_pooled_connection_invokes_add_handler(self):
with mock.patch.object(self.obj._pool_manager, 'get') as get:
get.return_value = self.conn
with mock.patch.object(self.io_loop, 'add_handler') as add_handler:
second_result = yield self.obj._connect()
yield self.obj._connect()
add_handler.assert_called_once_with(self.conn.fileno(),
self.obj._on_io_events,
ioloop.IOLoop.WRITE)
Expand Down Expand Up @@ -232,5 +232,5 @@ def test_validate_invokes_connect(self):
future.set_result(connection)
_connect.return_value = future
obj = tornado_session.TornadoSession(io_loop=self.io_loop)
result = yield obj.validate()
yield obj.validate()
_connect.assert_called_once_with()

0 comments on commit ee7f6cf

Please sign in to comment.