From 15b4d97985bbaecf117343140ef464f633191368 Mon Sep 17 00:00:00 2001 From: Changaco Date: Mon, 7 Oct 2019 15:30:41 +0200 Subject: [PATCH 1/2] switch `idle_connections` back to a LIFO queue I used a FIFO queue so that the `getconn` method would close old connections automatically, but doing this can result in more connections being kept open than necessary, thus defeating the purpose of the `idle_timeout` attribute. --- psycopg2_pool/__init__.py | 42 +++++++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/psycopg2_pool/__init__.py b/psycopg2_pool/__init__.py index 4c9c638..e5749d3 100644 --- a/psycopg2_pool/__init__.py +++ b/psycopg2_pool/__init__.py @@ -54,7 +54,7 @@ class ConnectionPool(object): .. attribute:: idle_connections - The pool of unused connections, first in first out. + The pool of unused connections, last in first out. Type: :class:`collections.deque`. .. attribute:: return_times @@ -113,7 +113,7 @@ def getconn(self): while True: try: # Attempt to take an idle connection from the pool. - conn = self.idle_connections.popleft() + conn = self.idle_connections.pop() except IndexError: # We don't have any idle connection available, open a new one. if len(self.connections_in_use) >= self.maxconn: @@ -143,6 +143,7 @@ def putconn(self, conn): self.connections_in_use.discard(conn) # Determine if the connection should be kept or discarded. + current_time = uptime() if self.idle_timeout == 0 and len(self.idle_connections) >= self.minconn: conn.close() else: @@ -154,9 +155,42 @@ def putconn(self, conn): if status != _ext.TRANSACTION_STATUS_IDLE: # The connection is still in a transaction, roll it back. conn.rollback() - self.return_times[conn] = uptime() + self.return_times[conn] = current_time self.idle_connections.append(conn) + # Clean up the idle connections. + if self.idle_timeout: + # We cap the number of iterations to ensure that we don't end up in + # an infinite loop. + for i in range(len(self.idle_connections)): + try: + conn = self.idle_connections[0] + except IndexError: + break + return_time = self.return_times.get(conn) + if return_time is None: + # The connection's return time is missing, give up. + break + if return_time < (current_time - self.idle_timeout): + # This connection has been idle too long, attempt to drop it. + try: + popped_conn = self.idle_connections.popleft() + except IndexError: + # Another thread removed this connection from the queue. + continue + if popped_conn == conn: + # Okay, we can close and discard this connection. + self.return_times.pop(conn, None) + conn.close() + else: + # We got a different connection, put it back. + self.idle_connections.appendleft(popped_conn) + continue + else: + # The leftmost connection isn't too old, so we can assume + # that the other ones aren't either. + break + # Open new connections if we've dropped below minconn. while (len(self.idle_connections) + len(self.connections_in_use)) < self.minconn: self._connect() @@ -167,7 +201,7 @@ def clear(self): This method could be useful if you have periods of high activity that result in many connections being opened, followed by prolonged periods - with zero activity (no connections checked out of the pool at all), + with zero activity (no calls to :meth:`getconn` or :meth:`putconn`), *and* you care about closing those extraneous connections during the inactivity period. It's up to you to call this method in that case. From eb131b69167802ca02ca67cdaa16623557e32b83 Mon Sep 17 00:00:00 2001 From: Changaco Date: Tue, 8 Oct 2019 10:59:26 +0200 Subject: [PATCH 2/2] add a regression test --- tests.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests.py b/tests.py index 356b7cd..a389a2d 100644 --- a/tests.py +++ b/tests.py @@ -172,6 +172,34 @@ def test_caching(self): assert total_cons_after_get == total_cons + def test_extraneous_connections_are_discarded(self): + pool = ConnectionPool(minconn=0, idle_timeout=120) + # Get multiple connections then put them all back + conns = [pool.getconn() for i in range(3)] + for conn in conns: + pool.putconn(conn) + # Simulate 1 minute passing + for conn in conns: + pool.return_times[conn] -= 60 + # Check out one connection multiple times, we should always get the same one + last_conn = conns[-1] + for i in range(len(conns)): + conn = pool.getconn() + assert conn is last_conn + pool.putconn(conn) + # Simulate another minute passing + for conn in conns: + pool.return_times[conn] -= 60 + # Get a connection then return it, the other connections should be + # discarded by `putconn` because they're too old now + conn = pool.getconn() + assert conn is last_conn + assert list(pool.idle_connections) == conns[:2] + assert set(pool.return_times) == set(conns[:2]) + pool.putconn(conn) + assert list(pool.idle_connections) == [conn] + assert set(pool.return_times) == set([conn]) + def test_clear(self): pool = ConnectionPool(0, 10) conn1 = pool.getconn()