Skip to content

Commit

Permalink
Implementing ConnectionPool.connection.
Browse files Browse the repository at this point in the history
  • Loading branch information
dhermes committed Nov 17, 2015
1 parent acc61bf commit c8ceb50
Show file tree
Hide file tree
Showing 2 changed files with 161 additions and 13 deletions.
71 changes: 65 additions & 6 deletions gcloud_bigtable/happybase/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
"""Google Cloud Bigtable HappyBase pool module."""


import contextlib
import six
import threading

from gcloud_bigtable.happybase.connection import Connection
from gcloud_bigtable.happybase.connection import _get_cluster


_MIN_POOL_SIZE = 1
"""Minimum allowable size of a connection pool."""


class NoConnectionsAvailable(RuntimeError):
"""Exception raised when no connections are available.
Expand Down Expand Up @@ -58,7 +63,7 @@ def __init__(self, size, **kwargs):
if not isinstance(size, six.integer_types):
raise TypeError('Pool size arg must be an integer')

if size <= 0:
if size < _MIN_POOL_SIZE:
raise ValueError('Pool size must be positive')

self._lock = threading.Lock()
Expand All @@ -75,17 +80,71 @@ def __init__(self, size, **kwargs):
connection = Connection(**connection_kwargs)
self._queue.put(connection)

def _acquire_connection(self, timeout=None):
"""Acquire a connection from the pool.
:type timeout: int
:param timeout: (Optional) Time (in seconds) to wait for a connection
to open.
:rtype: :class:`.Connection`
:returns: An active connection from the queue stored on the pool.
:raises: :class:`NoConnectionsAvailable` if ``Queue.get`` fails
before the ``timeout`` (only if a timeout is specified).
"""
try:
return self._queue.get(block=True, timeout=timeout)
except six.moves.queue.Empty:
raise NoConnectionsAvailable('No connection available from pool '
'within specified timeout')

@contextlib.contextmanager
def connection(self, timeout=None):
"""Obtain a connection from the pool.
Intended to be used as a context manager, but not implemented with
that functionality yet.
Must be used as a context manager, for example::
with pool.connection() as connection:
pass # do something with the connection
If ``timeout`` is omitted, this method waits forever for a connection
to become available.
:type timeout: int
:param timeout: (Optional) Time (in seconds) to wait for a connection
to open.
:raises: :class:`NotImplementedError <exceptions.NotImplementedError>`
temporarily until the method is implemented.
:rtype: :class:`.Connection`
:returns: An active connection from the pool.
:raises: :class:`NoConnectionsAvailable` if no connection can be
retrieved from the pool before the ``timeout`` (only if
a timeout is specified).
"""
raise NotImplementedError('Temporarily not implemented.')
connection = getattr(self._thread_connections, 'current', None)

retrieved_new_cnxn = False
if connection is None:
# In this case we need to actually grab a connection from the
# pool. After retrieval, the connection is stored on a thread
# local so that nested connection requests from the same
# thread can re-use the same connection instance.
#
# NOTE: This code acquires a lock before assigning to the
# thread local; see
# ('https://emptysqua.re/blog/'
# 'another-thing-about-pythons-threadlocals/')
retrieved_new_cnxn = True
connection = self._acquire_connection(timeout)
with self._lock:
self._thread_connections.current = connection

# This is a no-op for connections that have already been opened
# since they just call Client.start().
connection.open()
yield connection

# Remove thread local reference after the outermost 'with' block
# ends. Afterwards the thread no longer owns the connection.
if retrieved_new_cnxn:
del self._thread_connections.current
self._queue.put(connection)
103 changes: 96 additions & 7 deletions gcloud_bigtable/happybase/test_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,77 @@ def test_constructor_non_positive_size(self):
with self.assertRaises(ValueError):
self._makeOne(size)

def test_connection(self):
size = 1
timeout = 10
cluster = _Cluster() # Avoid implicit environ check.
pool = self._makeOne(size, cluster=cluster)
with self.assertRaises(NotImplementedError):
pool.connection(timeout=timeout)
def _makeOneWithMockQueue(self, queue_return):
from gcloud_bigtable._testing import _Monkey
from gcloud_bigtable.happybase import pool as MUT

# We are going to use a fake queue, so we don't want any connections
# or clusters to be created in the constructor.
size = -1
cluster = object()
with _Monkey(MUT, _MIN_POOL_SIZE=size):
pool = self._makeOne(size, cluster=cluster)

pool._queue = _Queue(queue_return)
return pool

def test__acquire_connection(self):
queue_return = object()
pool = self._makeOneWithMockQueue(queue_return)

timeout = 432
connection = pool._acquire_connection(timeout=timeout)
self.assertTrue(connection is queue_return)
self.assertEqual(pool._queue._get_calls, [(True, timeout)])
self.assertEqual(pool._queue._put_calls, [])

def test__acquire_connection_failure(self):
from gcloud_bigtable.happybase.pool import NoConnectionsAvailable

pool = self._makeOneWithMockQueue(None)
timeout = 1027
with self.assertRaises(NoConnectionsAvailable):
pool._acquire_connection(timeout=timeout)
self.assertEqual(pool._queue._get_calls, [(True, timeout)])
self.assertEqual(pool._queue._put_calls, [])

def test_connection_is_context_manager(self):
import contextlib

queue_return = _Connection()
pool = self._makeOneWithMockQueue(queue_return)
cnxn_context = pool.connection()
self.assertTrue(isinstance(cnxn_context,
contextlib.GeneratorContextManager))

def test_connection_no_current_cnxn(self):
queue_return = _Connection()
pool = self._makeOneWithMockQueue(queue_return)
timeout = 55

self.assertFalse(hasattr(pool._thread_connections, 'current'))
with pool.connection(timeout=timeout) as connection:
self.assertEqual(pool._thread_connections.current, queue_return)
self.assertTrue(connection is queue_return)
self.assertFalse(hasattr(pool._thread_connections, 'current'))

self.assertEqual(pool._queue._get_calls, [(True, timeout)])
self.assertEqual(pool._queue._put_calls,
[(queue_return, None, None)])

def test_connection_with_current_cnxn(self):
current_cnxn = _Connection()
queue_return = _Connection()
pool = self._makeOneWithMockQueue(queue_return)
pool._thread_connections.current = current_cnxn
timeout = 8001

with pool.connection(timeout=timeout) as connection:
self.assertTrue(connection is current_cnxn)

self.assertEqual(pool._queue._get_calls, [])
self.assertEqual(pool._queue._put_calls, [])
self.assertEqual(pool._thread_connections.current, current_cnxn)


class _Client(object):
Expand All @@ -152,6 +216,12 @@ def stop(self):
self.stop_calls += 1


class _Connection(object):

def open(self):
pass


class _Cluster(object):

def __init__(self, copies=()):
Expand All @@ -166,3 +236,22 @@ def copy(self):
return result
else:
return self


class _Queue(object):

def __init__(self, result=None):
self.result = result
self._get_calls = []
self._put_calls = []

def get(self, block=None, timeout=None):
self._get_calls.append((block, timeout))
if self.result is None:
import six
raise six.moves.queue.Empty
else:
return self.result

def put(self, item, block=None, timeout=None):
self._put_calls.append((item, block, timeout))

0 comments on commit c8ceb50

Please sign in to comment.