From cbe501544778699d526da7d9a436e6c9689d8653 Mon Sep 17 00:00:00 2001 From: Pau Freixes Date: Wed, 5 Jul 2017 01:03:54 +0200 Subject: [PATCH 1/3] Support for MaxClientsError for connection and pool #186 When a new connection tries to get connected to the redis server and it has been configured with a maximum number of connections, the client will get a `MaxClientsError` exception if this limit is reached out. Also if a pool initialization with a `min_size` that can meet the requirements - beucase of that limit u others issues - the pool creation will return also a `MaxClientsError`. --- aioredis/__init__.py | 2 ++ aioredis/connection.py | 14 ++++++++++++-- aioredis/errors.py | 20 +++++++++++++++++++- aioredis/pool.py | 3 +++ tests/connection_test.py | 14 ++++++++++++++ tests/errors_test.py | 23 +++++++++++++++++++++++ tests/pool_test.py | 9 +++++---- tests/transaction_commands_test.py | 2 +- 8 files changed, 79 insertions(+), 8 deletions(-) create mode 100644 tests/errors_test.py diff --git a/aioredis/__init__.py b/aioredis/__init__.py index 8a7c3fe84..7d454d276 100644 --- a/aioredis/__init__.py +++ b/aioredis/__init__.py @@ -16,6 +16,7 @@ ReadOnlyError, RedisError, ReplyError, + MaxClientsError, ChannelClosedError, WatchVariableError, PoolClosedError, @@ -43,6 +44,7 @@ # Errors 'RedisError', 'ReplyError', + 'MaxClientsError', 'ProtocolError', 'PipelineError', 'MultiExecError', diff --git a/aioredis/connection.py b/aioredis/connection.py index 818ab9ff2..ebc2d8d5b 100644 --- a/aioredis/connection.py +++ b/aioredis/connection.py @@ -24,6 +24,7 @@ ReplyError, WatchVariableError, ReadOnlyError, + MaxClientsError ) from .pubsub import Channel from .abc import AbcChannel @@ -155,6 +156,7 @@ def __init__(self, reader, writer, *, address, encoding=None, ) self._reader_task = asyncio.ensure_future(self._read_data(), loop=self._loop) + self._close_msg = None self._db = 0 self._closing = False self._closed = False @@ -200,11 +202,13 @@ async def _read_data(self): last_error = ConnectionClosedError("Reader at end of file") break + if isinstance(obj, MaxClientsError): + last_error = obj + break if self._in_pubsub: self._process_pubsub(obj) else: self._process_data(obj) - self._closing = True self._loop.call_soon(self._do_close, last_error) @@ -279,7 +283,9 @@ def execute(self, command, *args, encoding=_NOTSET): is broken. """ if self._reader is None or self._reader.at_eof(): - raise ConnectionClosedError("Connection closed or corrupted") + msg = self._close_msg if self._close_msg else\ + "Connection closed or corrupted" + raise ConnectionClosedError(msg) if command is None: raise TypeError("command must not be None") if None in set(args): @@ -356,6 +362,10 @@ def _do_close(self, exc): self._reader_task = None self._writer = None self._reader = None + + if exc is not None: + self._close_msg = str(exc) + while self._waiters: waiter, *spam = self._waiters.popleft() logger.debug("Cancelling waiter %r", (waiter, spam)) diff --git a/aioredis/errors.py b/aioredis/errors.py index 5d3b7c660..e0d6a3242 100644 --- a/aioredis/errors.py +++ b/aioredis/errors.py @@ -2,6 +2,7 @@ 'RedisError', 'ProtocolError', 'ReplyError', + 'MaxClientsError', 'PipelineError', 'MultiExecError', 'WatchVariableError', @@ -25,8 +26,25 @@ class ProtocolError(RedisError): class ReplyError(RedisError): """Raised for redis error replies (-ERR).""" + _REPLY = None -class PipelineError(ReplyError): + def __new__(cls, *args): + msg, *_ = args + for c in cls.__subclasses__(): + if msg == c._REPLY: + return c(*args) + + return super().__new__(cls, *args) + + +class MaxClientsError(ReplyError): + """Raised for redis server when the maximum number of client has been + reached.""" + + _REPLY = "ERR max number of clients reached" + + +class PipelineError(RedisError): """Raised if command within pipeline raised error.""" def __init__(self, errors): diff --git a/aioredis/pool.py b/aioredis/pool.py index 8ac5ba64c..8fb06cff7 100644 --- a/aioredis/pool.py +++ b/aioredis/pool.py @@ -386,6 +386,9 @@ async def _fill_free(self, *, override_min): self._acquiring += 1 try: conn = await self._create_new_connection(address) + # check the healthy of that connection, if + # something went wrong just trigger the Exception + await conn.execute('ping') self._pool.append(conn) finally: self._acquiring -= 1 diff --git a/tests/connection_test.py b/tests/connection_test.py index 63363342e..9b60111f8 100644 --- a/tests/connection_test.py +++ b/tests/connection_test.py @@ -12,6 +12,7 @@ RedisError, ReplyError, Channel, + MaxClientsError ) @@ -104,6 +105,19 @@ async def test_connect_unixsocket_timeout(create_connection, loop, server): server.unixsocket, db=0, loop=loop, timeout=0.1) +@pytest.mark.run_loop +def test_connect_maxclients(request, create_connection, loop, start_server): + server = start_server('server-maxclients') + conn = yield from create_connection( + server.tcp_address, loop=loop) + yield from conn.execute(b'CONFIG', b'SET', 'maxclients', 1) + + with pytest.raises(MaxClientsError): + conn2 = yield from create_connection( + server.tcp_address, loop=loop) + yield from conn2.execute('ping') + + def test_global_loop(create_connection, loop, server): asyncio.set_event_loop(loop) diff --git a/tests/errors_test.py b/tests/errors_test.py new file mode 100644 index 000000000..df9f68cc6 --- /dev/null +++ b/tests/errors_test.py @@ -0,0 +1,23 @@ +from aioredis.errors import ReplyError +from aioredis.errors import MaxClientsError + + +class TestReplyError: + + def test_return_default_class(self): + assert isinstance(ReplyError(None), ReplyError) + + def test_return_adhoc_class(self): + class MyError(ReplyError): + _REPLY = "my error" + + assert isinstance(ReplyError("my error"), MyError) + + +class TestMaxClientsError: + + def test_return_max_clients_error(self): + assert isinstance( + ReplyError("ERR max number of clients reached"), + MaxClientsError + ) diff --git a/tests/pool_test.py b/tests/pool_test.py index 99e17973d..ba1fa4815 100644 --- a/tests/pool_test.py +++ b/tests/pool_test.py @@ -9,7 +9,8 @@ ReplyError, PoolClosedError, ConnectionClosedError, - ConnectionsPool + ConnectionsPool, + MaxClientsError ) @@ -467,16 +468,16 @@ async def test_pool_check_closed_when_exception( await redis.config_set('maxclients', 2) with pytest.logs('aioredis', 'DEBUG') as cm: - with pytest.raises(Exception): + with pytest.raises(MaxClientsError): await create_pool(address=tuple(server.tcp_address), - minsize=2, loop=loop) + minsize=3, loop=loop) assert len(cm.output) >= 3 connect_msg = ( "DEBUG:aioredis:Creating tcp connection" " to ('localhost', {})".format(server.tcp_address.port)) assert cm.output[:2] == [connect_msg, connect_msg] - assert cm.output[-1] == "DEBUG:aioredis:Closed 1 connections" + assert cm.output[-1] == "DEBUG:aioredis:Closed 1 connection(s)" @pytest.mark.run_loop diff --git a/tests/transaction_commands_test.py b/tests/transaction_commands_test.py index d6bc7ffdb..232492d75 100644 --- a/tests/transaction_commands_test.py +++ b/tests/transaction_commands_test.py @@ -97,7 +97,7 @@ async def test_discard(redis): fut2 = tr.connection.execute('MULTI') fut3 = tr.connection.execute('incr', 'foo') - with pytest.raises(ReplyError): + with pytest.raises(MultiExecError): await tr.execute() with pytest.raises(TypeError): await fut1 From d4faefb4294708004e07bd8ff4687e1db3a5f5d0 Mon Sep 17 00:00:00 2001 From: Pau Freixes Date: Wed, 5 Jul 2017 09:04:57 +0200 Subject: [PATCH 2/3] Maxclients test must run only for Redis >= 2.8.0 --- tests/connection_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/connection_test.py b/tests/connection_test.py index 9b60111f8..121c255cb 100644 --- a/tests/connection_test.py +++ b/tests/connection_test.py @@ -106,6 +106,7 @@ async def test_connect_unixsocket_timeout(create_connection, loop, server): @pytest.mark.run_loop +@pytest.redis_version(2, 8, 0, reason="maxclients config setting") def test_connect_maxclients(request, create_connection, loop, start_server): server = start_server('server-maxclients') conn = yield from create_connection( From 29c360ba9a58758e432b15d513902f73ebb5c828 Mon Sep 17 00:00:00 2001 From: Alexey Popravka Date: Tue, 14 Nov 2017 11:38:21 +0200 Subject: [PATCH 3/3] fix tests (await syntax) --- aioredis/connection.py | 3 +-- tests/connection_test.py | 12 ++++++------ tests/pool_test.py | 2 +- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/aioredis/connection.py b/aioredis/connection.py index ebc2d8d5b..13239f15c 100644 --- a/aioredis/connection.py +++ b/aioredis/connection.py @@ -283,8 +283,7 @@ def execute(self, command, *args, encoding=_NOTSET): is broken. """ if self._reader is None or self._reader.at_eof(): - msg = self._close_msg if self._close_msg else\ - "Connection closed or corrupted" + msg = self._close_msg or "Connection closed or corrupted" raise ConnectionClosedError(msg) if command is None: raise TypeError("command must not be None") diff --git a/tests/connection_test.py b/tests/connection_test.py index 121c255cb..e8815dcb2 100644 --- a/tests/connection_test.py +++ b/tests/connection_test.py @@ -107,16 +107,16 @@ async def test_connect_unixsocket_timeout(create_connection, loop, server): @pytest.mark.run_loop @pytest.redis_version(2, 8, 0, reason="maxclients config setting") -def test_connect_maxclients(request, create_connection, loop, start_server): +async def test_connect_maxclients(create_connection, loop, start_server): server = start_server('server-maxclients') - conn = yield from create_connection( + conn = await create_connection( server.tcp_address, loop=loop) - yield from conn.execute(b'CONFIG', b'SET', 'maxclients', 1) + await conn.execute(b'CONFIG', b'SET', 'maxclients', 1) - with pytest.raises(MaxClientsError): - conn2 = yield from create_connection( + with pytest.raises((MaxClientsError, ConnectionError)): + conn2 = await create_connection( server.tcp_address, loop=loop) - yield from conn2.execute('ping') + await conn2.execute('ping') def test_global_loop(create_connection, loop, server): diff --git a/tests/pool_test.py b/tests/pool_test.py index ba1fa4815..5c3bbbf26 100644 --- a/tests/pool_test.py +++ b/tests/pool_test.py @@ -468,7 +468,7 @@ async def test_pool_check_closed_when_exception( await redis.config_set('maxclients', 2) with pytest.logs('aioredis', 'DEBUG') as cm: - with pytest.raises(MaxClientsError): + with pytest.raises((MaxClientsError, ConnectionError)): await create_pool(address=tuple(server.tcp_address), minsize=3, loop=loop)