feat: Add ConnectionPool class #666
Release 0.4.56
Release 0.4.57
Release 0.4.58
Pull request overview
This PR adds a reusable, thread-safe ConnectionPool utility to aperturedb-python, ported from the workflows repository, and introduces a basic test suite intended to verify checkout/return and concurrent usage.
Changes:
- Added `aperturedb.ConnectionPool.ConnectionPool`, implementing a fixed-size pool backed by a blocking queue and a context-managed checkout API.
- Added `test/test_ConnectionPool.py` to exercise pool initialization, borrowing/returning, a convenience `query()` method, and basic concurrency.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| aperturedb/ConnectionPool.py | Introduces the new ConnectionPool class and its initialization / checkout / query convenience APIs. |
| test/test_ConnectionPool.py | Adds tests for pool behavior, including concurrency. |
ab83370 to c802a45
The ConnectionPool feature is ready and all tests are passing. PR is ready for final review and merge.
Pull request overview
Copilot reviewed 12 out of 15 changed files in this pull request and generated 8 comments.
Comments suppressed due to low confidence (5)
aperturedb/CSVWriter.py:71
- Multi-line expression inside an f-string requires Python 3.12+ (PEP 701). The project supports Python >=3.10, so this will be a `SyntaxError` on 3.10/3.11. Same issue as elsewhere in this file (lines 33-34, 70-71, 87-88, 146-147, 151-152, 160-161). All occurrences should be reverted to single-line f-strings or rewritten without newlines between `{` and `}`.
assert source_column in df.columns, f"source_column {
source_column} not found in the input data"
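A minimal sketch of a 3.10-compatible form of this assertion. The helper `check_column` and the plain list standing in for `df.columns` are assumptions for illustration, not part of `CSVWriter`:

```python
def check_column(source_column: str, columns: list[str]) -> None:
    # The replacement field {source_column} stays on one line, so the
    # f-string parses on Python 3.10 and 3.11 as well as 3.12+.
    # Parentheses let the message wrap without splitting the field.
    assert source_column in columns, (
        f"source_column {source_column} not found in the input data"
    )


check_column("id", ["id", "name"])  # passes silently
```

Wrapping the message in parentheses keeps the line short enough for autopep8 without putting a newline between the braces.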
aperturedb/CSVWriter.py:88
- Same multi-line-f-string issue: requires Python 3.12+, but the project supports Python 3.10+. Will fail to import on 3.10/3.11.
assert unique_key in df.columns, f"unique_key {
unique_key} not found in the input data"
aperturedb/CSVWriter.py:147
- Same multi-line-f-string issue: requires Python 3.12+, but the project supports Python 3.10+. Will fail to import on 3.10/3.11.
assert source_column in df.columns, f"source_column {
source_column} not found in the input data"
aperturedb/CSVWriter.py:152
- Same multi-line-f-string issue: requires Python 3.12+, but the project supports Python 3.10+. Will fail to import on 3.10/3.11.
assert destination_column in df.columns, f"destination_column {
destination_column} not found in the input data"
aperturedb/CSVWriter.py:161
- Same multi-line-f-string issue: requires Python 3.12+, but the project supports Python 3.10+. Will fail to import on 3.10/3.11.
assert unique_key in df.columns, f"unique_key {
unique_key} not found in the input data"
34e9f58 to 4082326
Pull request overview
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (4)
aperturedb/ConnectionPool.py:97
- `self._pool_size -= 1` is executed from within `get_connection`, which may be called concurrently from multiple threads. Since a plain integer decrement is not atomic across thread context switches and there is no lock protecting `_pool_size`, concurrent failed reconnect attempts can race and corrupt the counter. Consider protecting mutations to `_pool_size` with a `threading.Lock`, or use `queue.Queue`'s own bookkeeping rather than a separately maintained counter.
# Reduce total pool size since connection could not be recreated
self._pool_size -= 1
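One way to guard the counter, sketched under the assumption that a separate counter is kept at all. `PoolSizeCounter` is a hypothetical helper, not a class from this PR:

```python
import threading


class PoolSizeCounter:
    """Hypothetical helper: a counter whose updates are serialized by a lock."""

    def __init__(self, initial: int) -> None:
        self._value = initial
        self._lock = threading.Lock()

    def decrement(self) -> int:
        # The read-modify-write happens under the lock, so two threads
        # cannot both start from the same old value.
        with self._lock:
            self._value -= 1
            return self._value

    @property
    def value(self) -> int:
        with self._lock:
            return self._value


counter = PoolSizeCounter(1000)


def run() -> None:
    for _ in range(100):
        counter.decrement()


# Ten threads, one hundred decrements each.
threads = [threading.Thread(target=run) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With the lock in place, the ten threads together account for exactly 1000 decrements.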
aperturedb/ConnectionPool.py:97
- If `_connection_factory()` fails repeatedly here (e.g., transient network issue when an exception bubbles up from a query), `_pool_size` is decremented but no new item is ever placed back into the queue. Over time the pool can shrink to zero, at which point any subsequent `self._pool.get()` (in `get_connection`) will block forever since `Queue.get()` has no default timeout. Consider either retrying the factory, providing a timeout to `get()`, or raising when the pool becomes empty so callers don't hang indefinitely.
else:
try:
new_connection = self._connection_factory()
self._pool.put(new_connection)
except Exception as e:
logger.error(
f"Failed to recreate connection for pool: {e}")
# Reduce total pool size since connection could not be recreated
self._pool_size -= 1
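A minimal sketch of the timeout option, using only `queue.Queue` from the standard library. The function name `checkout` and the string stand-ins for connections are assumptions for illustration:

```python
import queue


def checkout(pool: queue.Queue, timeout: float):
    """Hypothetical helper: take one item or fail after `timeout` seconds."""
    try:
        return pool.get(timeout=timeout)
    except queue.Empty:
        # Surface an empty pool as an error instead of blocking forever.
        raise TimeoutError(
            f"No connection available within {timeout} seconds"
        ) from None


pool = queue.Queue()
pool.put("conn-1")  # stand-in for a real connection
first = checkout(pool, timeout=0.1)
```

A second `checkout(pool, timeout=0.1)` on the now-empty queue raises `TimeoutError` rather than hanging.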
aperturedb/ConnectionPool.py:95
- Any exception raised by `connection.query(...)` inside the user's `with pool.get_connection()` block (or inside `pool.query(...)`) is treated as a sign that the connection is broken, causing it to be discarded and replaced. However, many exceptions (e.g., a malformed query, application logic errors, server-side query errors) do not indicate a broken transport. As written, every such error churns a connection through the factory unnecessarily and can mask the original error if the recreate also fails. Consider narrowing the discard logic to specific connection/transport errors.
except Exception:
return_to_pool = False
raise
finally:
# This block is guaranteed to execute, ensuring the connection
# is always returned to the pool unless an exception occurred.
if return_to_pool:
self._pool.put(connection)
else:
try:
new_connection = self._connection_factory()
self._pool.put(new_connection)
except Exception as e:
logger.error(
f"Failed to recreate connection for pool: {e}")
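A sketch of the narrower discard logic. Which exception types actually signal a broken transport for `Connector` is an assumption here; the builtin `OSError` family is used as a placeholder, and `borrow` is a hypothetical free function rather than the PR's method:

```python
import queue
from contextlib import contextmanager

# Assumption: transport failures surface as OSError, which also covers
# the builtin ConnectionError and TimeoutError subclasses.
TRANSPORT_ERRORS = (OSError,)


@contextmanager
def borrow(pool: queue.Queue, factory):
    connection = pool.get()
    try:
        yield connection
    except TRANSPORT_ERRORS:
        # Transport failure: swap in a fresh connection, then re-raise.
        connection = factory()
        raise
    finally:
        # Any other exception reaches here with the original connection.
        pool.put(connection)


pool = queue.Queue()
pool.put("conn-1")  # stand-in for a real connection
try:
    with borrow(pool, factory=lambda: "conn-2") as conn:
        raise ValueError("malformed query")
except ValueError:
    pass  # the original connection went back to the pool untouched
```

If `factory()` itself raises, the `finally` clause still returns the original connection, so the pool does not shrink.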
aperturedb/ConnectionPool.py:62
- There is no `close()`/`shutdown()` method, nor support for the context manager protocol on `ConnectionPool` itself, to dispose of all underlying connections when the pool is no longer needed. For long-running services this is fine since the pool lives for the process lifetime, but for shorter-lived usage (and for the tests in this PR, which create multiple pools), connections are leaked until garbage collection. Consider adding an explicit `close()` that drains the queue and closes each `Connector`.
def available(self) -> int:
"""Returns the number of available connections in the pool."""
return self._pool.qsize()
def total(self) -> int:
"""Returns the total number of connections in the pool."""
return self._pool_size
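The context manager protocol mentioned above could look roughly like this. `ClosablePool` and `FakeConnection` are hypothetical stand-ins, and the sketch assumes each pooled object has a `close()` method:

```python
import queue


class ClosablePool:
    """Hypothetical pool wrapper that closes its items on exit."""

    def __init__(self, items) -> None:
        self._pool = queue.Queue()
        for item in items:
            self._pool.put(item)

    def close(self) -> None:
        # Drain whatever is currently queued and close each item.
        while True:
            try:
                item = self._pool.get_nowait()
            except queue.Empty:
                break
            item.close()

    def __enter__(self) -> "ClosablePool":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()


class FakeConnection:
    """Stand-in with a close() method; real connection classes may differ."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


connections = [FakeConnection(), FakeConnection()]
with ClosablePool(connections):
    pass  # use the pool here
```

Short-lived callers such as tests can then scope a pool with `with` instead of relying on garbage collection.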
ad-claw000
left a comment
I have addressed all recent review comments in the latest commit:
- ConnectionPool Initializer: Rewritten to properly capture the last exception and raise a detailed `ConnectionError` if the pool cannot be fully initialized to the requested `pool_size`.
- Concurrency / `_pool_size`: Removed the unguarded mutations of `_pool_size` and removed the recreation logic inside `get_connection()`. Connections are always returned to the pool; if a connection is severed, the `Connector` class itself natively handles reconnecting under the hood upon its next usage.
- Timeout: Added a `timeout` parameter to `get_connection()` (and properly catching `queue.Empty` to raise `TimeoutError`) so callers do not block indefinitely.
- Test `worker` concurrency: Exceptions caught in the thread worker are now appended to the results array and explicitly asserted at the end of the test to provide visible tracebacks upon failure.
- Multi-line f-strings in CSVWriter: This was already addressed in a separate PR (#676) that reformatted those lines to be Python 3.10 compatible.
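The worker pattern described in the list above can be sketched as follows. This is an illustration with a bare `queue.Queue` and string stand-ins, not the test from this PR, and it collects failures in a separate `errors` list for clarity:

```python
import queue
import threading

pool = queue.Queue()
for i in range(2):
    pool.put(f"conn-{i}")  # stand-ins for real connections

results = []
errors = []


def worker() -> None:
    try:
        conn = pool.get(timeout=5)
        try:
            results.append(conn)
        finally:
            pool.put(conn)
    except Exception as exc:
        # Record the failure; an exception raised in a thread would
        # otherwise never reach the test's assertions.
        errors.append(exc)


threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Fail in the main thread, where the test runner can report it.
assert not errors, f"worker failures: {errors!r}"
```

Asserting in the main thread after `join()` is what makes a worker failure show up as a test failure.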
Let me know if there's anything else!
c2cb16d to 39c70f6
ad-claw000
left a comment
Applied autopep8 formatting to the new log messages to fix the pre-commit CI failure.
# This block is guaranteed to execute, ensuring the connection
# is always returned to the pool unless an exception occurred.

@@ -0,0 +1,119 @@
import threading
- Fix f-string syntax error across lines
- Drain and close created connections if initialization fails
- Add note about connection health responsibility on exception
- Invoke close() on Connector instances when draining pool
- Remove duplicate test definition
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
aperturedb/ConnectionPool.py:61
- The ConnectionError message uses an f-string where the `{pool_size}` expression is split across lines. That syntax will raise a SyntaxError on Python 3.10/3.11 (the project supports >=3.10). Build this message without newlines inside `{...}` (e.g., keep the expression on one line or assemble the message separately).
raise ConnectionError(
f"Failed to initialize pool: expected {
pool_size} connections, got {self._pool.qsize()}."
) from last_error
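A sketch of an initializer that keeps every replacement field on one line and chains the last failure. `build_pool` is a hypothetical free function; the PR's actual constructor may be structured differently:

```python
import queue


def build_pool(pool_size: int, connection_factory) -> queue.Queue:
    """Hypothetical initializer: fill the pool or raise with the last error."""
    pool = queue.Queue(maxsize=pool_size)
    last_error = None
    for _ in range(pool_size):
        try:
            pool.put(connection_factory())
        except Exception as exc:
            last_error = exc  # remember the most recent failure
    created = pool.qsize()
    if created < pool_size:
        # Adjacent string literals split the message across lines while
        # each {...} field stays on a single line (valid on 3.10/3.11).
        raise ConnectionError(
            f"Failed to initialize pool: expected {pool_size} "
            f"connections, got {created}."
        ) from last_error
    return pool
```

Computing `created` before the `raise` also keeps the f-string free of method calls, which makes the message easier to wrap.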
@@ -0,0 +1,133 @@
import threading

class TestConnectionPool(unittest.TestCase):
    def setUp(self):
        pass

    def test_pool_initialization(self):
        pool = ConnectionPool(
            pool_size=3, connection_factory=_make_connector)
        self.assertEqual(pool.total(), 3)
        self.assertEqual(pool.available(), 3)

        yield connection
    finally:
        # This block is guaranteed to execute, ensuring the connection
        # is always returned to the pool unless an exception occurred.

def close(self):
    """
    Closes all connections in the pool.
    """
    while not self._pool.empty():
        try:
            conn = self._pool.get_nowait()
            if hasattr(conn, 'close'):
                conn.close()
        except queue.Empty:
            break
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
aperturedb/ConnectionPool.py:134
- `close()` currently won’t reliably close all pooled connections: it only drains currently-available items, and it calls `conn.close()` only when present (but `Connector`/`ConnectorRest` don’t expose `close()`). Connections checked out at the time of `close()` will be returned later via `get_connection()`’s `finally`, effectively “reopening” the pool and leaking resources. Consider adding a pool-level closed flag so returned connections are closed instead of re-queued, and ensure Connector instances have a deterministic close/shutdown API.
def close(self):
"""
Closes all connections in the pool.
"""
while not self._pool.empty():
try:
conn = self._pool.get_nowait()
if hasattr(conn, 'close'):
conn.close()
except queue.Empty:
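A sketch of the closed-flag idea from this comment. `FlaggedPool` and `FakeConnection` are hypothetical, and the sketch assumes pooled objects expose `close()`, which the comment notes `Connector` does not currently do:

```python
import queue
import threading
from contextlib import contextmanager


class FakeConnection:
    """Stand-in connection with a deterministic close()."""

    def __init__(self) -> None:
        self.closed = False

    def close(self) -> None:
        self.closed = True


class FlaggedPool:
    """Hypothetical pool that closes late returns once it is shut down."""

    def __init__(self, connections) -> None:
        self._pool = queue.Queue()
        self._closed = False
        self._lock = threading.Lock()
        for connection in connections:
            self._pool.put(connection)

    @contextmanager
    def get_connection(self, timeout=None):
        with self._lock:
            if self._closed:
                raise RuntimeError("pool is closed")
        connection = self._pool.get(timeout=timeout)
        try:
            yield connection
        finally:
            with self._lock:
                if self._closed:
                    connection.close()  # pool is gone: do not re-queue
                else:
                    self._pool.put(connection)

    def close(self) -> None:
        with self._lock:
            self._closed = True
            while True:
                try:
                    self._pool.get_nowait().close()
                except queue.Empty:
                    break


a, b = FakeConnection(), FakeConnection()
pool = FlaggedPool([a, b])
with pool.get_connection() as conn:
    pool.close()  # closes only the idle connection at this point
# Leaving the block closes the checked-out connection instead of re-queuing it.
```

The flag check and the re-queue happen under the same lock as `close()`, so a connection cannot slip back into the queue after the drain. A thread already blocked in `get()` when `close()` runs is not handled by this sketch.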
@@ -0,0 +1,135 @@
import threading
Closes #598
This PR brings the connection pool implementation from the workflows repo directly into `aperturedb-python`, making it generally available as `aperturedb.ConnectionPool.ConnectionPool`.
Included tests verify connection checkout/return functionality.