Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Support maxclients error #186 #325

Merged
merged 3 commits into from
Nov 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions aioredis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
ReadOnlyError,
RedisError,
ReplyError,
MaxClientsError,
ChannelClosedError,
WatchVariableError,
PoolClosedError,
Expand Down Expand Up @@ -43,6 +44,7 @@
# Errors
'RedisError',
'ReplyError',
'MaxClientsError',
'ProtocolError',
'PipelineError',
'MultiExecError',
Expand Down
13 changes: 11 additions & 2 deletions aioredis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
ReplyError,
WatchVariableError,
ReadOnlyError,
MaxClientsError
)
from .pubsub import Channel
from .abc import AbcChannel
Expand Down Expand Up @@ -155,6 +156,7 @@ def __init__(self, reader, writer, *, address, encoding=None,
)
self._reader_task = asyncio.ensure_future(self._read_data(),
loop=self._loop)
self._close_msg = None
self._db = 0
self._closing = False
self._closed = False
Expand Down Expand Up @@ -200,11 +202,13 @@ async def _read_data(self):
last_error = ConnectionClosedError("Reader at end of file")
break

if isinstance(obj, MaxClientsError):
last_error = obj
break
if self._in_pubsub:
self._process_pubsub(obj)
else:
self._process_data(obj)

self._closing = True
self._loop.call_soon(self._do_close, last_error)

Expand Down Expand Up @@ -279,7 +283,8 @@ def execute(self, command, *args, encoding=_NOTSET):
is broken.
"""
if self._reader is None or self._reader.at_eof():
raise ConnectionClosedError("Connection closed or corrupted")
msg = self._close_msg or "Connection closed or corrupted"
raise ConnectionClosedError(msg)
if command is None:
raise TypeError("command must not be None")
if None in set(args):
Expand Down Expand Up @@ -356,6 +361,10 @@ def _do_close(self, exc):
self._reader_task = None
self._writer = None
self._reader = None

if exc is not None:
self._close_msg = str(exc)

while self._waiters:
waiter, *spam = self._waiters.popleft()
logger.debug("Cancelling waiter %r", (waiter, spam))
Expand Down
20 changes: 19 additions & 1 deletion aioredis/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
'RedisError',
'ProtocolError',
'ReplyError',
'MaxClientsError',
'PipelineError',
'MultiExecError',
'WatchVariableError',
Expand All @@ -25,8 +26,25 @@ class ProtocolError(RedisError):
class ReplyError(RedisError):
"""Raised for redis error replies (-ERR)."""

_REPLY = None

class PipelineError(ReplyError):
def __new__(cls, *args):
msg, *_ = args
for c in cls.__subclasses__():
if msg == c._REPLY:
return c(*args)

return super().__new__(cls, *args)


class MaxClientsError(ReplyError):
"""Raised for redis server when the maximum number of client has been
reached."""

_REPLY = "ERR max number of clients reached"


class PipelineError(RedisError):
"""Raised if command within pipeline raised error."""

def __init__(self, errors):
Expand Down
3 changes: 3 additions & 0 deletions aioredis/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,9 @@ async def _fill_free(self, *, override_min):
self._acquiring += 1
try:
conn = await self._create_new_connection(address)
# check the healthy of that connection, if
# something went wrong just trigger the Exception
await conn.execute('ping')
self._pool.append(conn)
finally:
self._acquiring -= 1
Expand Down
15 changes: 15 additions & 0 deletions tests/connection_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
RedisError,
ReplyError,
Channel,
MaxClientsError
)


Expand Down Expand Up @@ -104,6 +105,20 @@ async def test_connect_unixsocket_timeout(create_connection, loop, server):
server.unixsocket, db=0, loop=loop, timeout=0.1)


@pytest.mark.run_loop
@pytest.redis_version(2, 8, 0, reason="maxclients config setting")
async def test_connect_maxclients(create_connection, loop, start_server):
server = start_server('server-maxclients')
conn = await create_connection(
server.tcp_address, loop=loop)
await conn.execute(b'CONFIG', b'SET', 'maxclients', 1)

with pytest.raises((MaxClientsError, ConnectionError)):
conn2 = await create_connection(
server.tcp_address, loop=loop)
await conn2.execute('ping')


def test_global_loop(create_connection, loop, server):
asyncio.set_event_loop(loop)

Expand Down
23 changes: 23 additions & 0 deletions tests/errors_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from aioredis.errors import ReplyError
from aioredis.errors import MaxClientsError


class TestReplyError:

def test_return_default_class(self):
assert isinstance(ReplyError(None), ReplyError)

def test_return_adhoc_class(self):
class MyError(ReplyError):
_REPLY = "my error"

assert isinstance(ReplyError("my error"), MyError)


class TestMaxClientsError:

def test_return_max_clients_error(self):
assert isinstance(
ReplyError("ERR max number of clients reached"),
MaxClientsError
)
9 changes: 5 additions & 4 deletions tests/pool_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
ReplyError,
PoolClosedError,
ConnectionClosedError,
ConnectionsPool
ConnectionsPool,
MaxClientsError
)


Expand Down Expand Up @@ -467,16 +468,16 @@ async def test_pool_check_closed_when_exception(
await redis.config_set('maxclients', 2)

with pytest.logs('aioredis', 'DEBUG') as cm:
with pytest.raises(Exception):
with pytest.raises((MaxClientsError, ConnectionError)):
await create_pool(address=tuple(server.tcp_address),
minsize=2, loop=loop)
minsize=3, loop=loop)

assert len(cm.output) >= 3
connect_msg = (
"DEBUG:aioredis:Creating tcp connection"
" to ('localhost', {})".format(server.tcp_address.port))
assert cm.output[:2] == [connect_msg, connect_msg]
assert cm.output[-1] == "DEBUG:aioredis:Closed 1 connections"
assert cm.output[-1] == "DEBUG:aioredis:Closed 1 connection(s)"


@pytest.mark.run_loop
Expand Down
2 changes: 1 addition & 1 deletion tests/transaction_commands_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ async def test_discard(redis):
fut2 = tr.connection.execute('MULTI')
fut3 = tr.connection.execute('incr', 'foo')

with pytest.raises(ReplyError):
with pytest.raises(MultiExecError):
await tr.execute()
with pytest.raises(TypeError):
await fut1
Expand Down