Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Fix "ERR max number of clients reached" on connect #186

Closed
2 tasks
popravich opened this issue Jan 18, 2017 · 9 comments
Closed
2 tasks

Fix "ERR max number of clients reached" on connect #186

popravich opened this issue Jan 18, 2017 · 9 comments
Assignees
Milestone

Comments

@popravich
Copy link
Contributor

When creating connection to Redis instance with clients limit reached
ERR max number of clients reached message is sent.
_read_data task will receive it however there would be no _waiters to process it —
no command issues / not in pub/sub mode.
Currently there is an assertion on non-empty _waiters list.

TODO:

  • raise MaxClientsError (or any of ReplyError, RedisError);
  • keep the assertion on non-empty _waiters list to track other possible cases
    when messages received before command sent.
@popravich popravich added the bug label Jan 18, 2017
@popravich popravich added this to the v1.0 milestone Jan 18, 2017
@popravich popravich self-assigned this Jan 18, 2017
@popravich popravich added the task label Jan 18, 2017
@pfreixes
Copy link
Contributor

Im gonna work on that if you don't mind

@pfreixes
Copy link
Contributor

I have a proposal, just catch the error and reraise it at first attempt to send a command. The connection is also closed. Just a POC, thoughts?

diff --git a/aioredis/connection.py b/aioredis/connection.py
index b31c886..adf251d 100644
--- a/aioredis/connection.py
+++ b/aioredis/connection.py
@@ -23,6 +23,7 @@ from .errors import (
     ReplyError,
     WatchVariableError,
     ReadOnlyError,
+    MaxClientsError
     )
 from .pubsub import Channel
 from .abc import AbcChannel
@@ -150,6 +151,7 @@ class RedisConnection(AbcConnection):
         self._pubsub_channels = coerced_keys_dict()
         self._pubsub_patterns = coerced_keys_dict()
         self._encoding = encoding
+        self._exc_before_command = None

     def __repr__(self):
         return '<RedisConnection [db:{}]>'.format(self._db)
@@ -167,28 +169,34 @@ class RedisConnection(AbcConnection):
                 #       before response
                 logger.error("Exception on data read %r", exc, exc_info=True)
                 break
+
             if data == b'' and self._reader.at_eof():
                 logger.debug("Connection has been closed by server")
                 break
-            self._parser.feed(data)
-            while True:
-                try:
-                    obj = self._parser.gets()
-                except ProtocolError as exc:
-                    # ProtocolError is fatal
-                    # so connection must be closed
-                    if self._in_transaction is not None:
-                        self._transaction_error = exc
-                    self._closing = True
-                    self._do_close(exc)
-                    return
-                else:
-                    if obj is False:
-                        break
-                    if self._in_pubsub:
-                        self._process_pubsub(obj)
+            elif data == b'-ERR max number of clients reached\r\n':
+                self._exc_before_command = MaxClientsError()
+                break
+            else:
+                self._parser.feed(data)
+                while True:
+                    try:
+                        obj = self._parser.gets()
+                    except ProtocolError as exc:
+                        # ProtocolError is fatal
+                        # so connection must be closed
+                        if self._in_transaction is not None:
+                            self._transaction_error = exc
+                        self._closing = True
+                        self._do_close(exc)
+                        return
                     else:
-                        self._process_data(obj)
+                        if obj is False:
+                            break
+                        if self._in_pubsub:
+                            self._process_pubsub(obj)
+                        else:
+                            self._process_data(obj)
+
         self._closing = True
         self._do_close(None)

@@ -256,6 +264,14 @@ class RedisConnection(AbcConnection):
         * ProtocolError when response can not be decoded meaning connection
           is broken.
         """
+        if self._exc_before_command is not None:
+            # indeed the connection is closed, but the first execute attempt
+            # will receive the original error received. Further attempts to
+            # access to that instance will return a closed connection error.
+            exc = self._exc_before_command
+            self._exc_before_command = None
+            raise exc
+
         if self._reader is None or self._reader.at_eof():
             raise ConnectionClosedError("Connection closed or corrupted")
         if command is None:

@pfreixes
Copy link
Contributor

any feedback @popravich ?

@popravich
Copy link
Contributor Author

I don't like these lines:

elif data == b'-ERR max number of clients reached\r\n':
    self._exc_before_command = MaxClientsError()
    break

the problem here is that we can send some commands (either auth or select) to server before we receive this data, so there can be several scenarios:

  • we read data from socket and it equals error message:
    • connection will be closed and pending commands will be just cancelled without proper error.
  • we read data from socket and it not equals error message (two error messages received):
    • pending command gets its MaxClientsError response, but the second error crashes _reader_task.

I think we can do something like in except ProtocolError block:

if isinstance(obj, MaxClientsError):
    self._closing = True
    self._do_close(obj)
    return

I think this can work.

@pfreixes
Copy link
Contributor

pfreixes commented Jul 3, 2017

Yeps, thanks for your feedback. Agree that the original place that I've proposed was not the best one.

Have in mind that the block except ProtocolError won't catch any exception, the parsed object comming from that message is a ReplyError object. Indeed, if the command is sent before the data arrives to the socket buffer, the user will receive an exception as a ReplyError exception with the proper max number of clients reached message. For me, this is the expected behaviour.

Right now the issue might be only, and its a matter of consistency, if the command is sent after the data arrives to the socket buffer. In that case, the client will receive a Connection error meanwhile in background the async task in charge of read data will print the AssertionError becuase of the len assertion.

Are you keen on change this last scenario? Raising the same ReplyError by the first attempt to use that connection? and just a simple ConnectionError for the following ones?

@popravich
Copy link
Contributor Author

Have in mind that the block except ProtocolError won't catch any exception, the parsed object comming from that message is a ReplyError object.

I was talking about something like this:

diff --git a/aioredis/connection.py b/aioredis/connection.py
index b31c886..0fd26f2 100644
--- a/aioredis/connection.py
+++ b/aioredis/connection.py
@@ -23,6 +23,7 @@ from .errors import (
     ReplyError,
     WatchVariableError,
     ReadOnlyError,
+    MaxClientsError,
     )
 from .pubsub import Channel
 from .abc import AbcChannel
@@ -185,6 +186,10 @@ class RedisConnection(AbcConnection):
                 else:
                     if obj is False:
                         break
+                    if isinstance(obj, MaxClientsError):
+                        self._closing = True
+                        self._do_close(obj)
+                        return
                     if self._in_pubsub:
                         self._process_pubsub(obj)
                     else:
diff --git a/aioredis/errors.py b/aioredis/errors.py
index 6d93b38..5177c07 100644
--- a/aioredis/errors.py
+++ b/aioredis/errors.py
@@ -25,6 +25,14 @@ class ProtocolError(RedisError):
 class ReplyError(RedisError):
     """Raised for redis error replies (-ERR)."""

+    def __new__(cls, *args):
+        # TODO: detect error type and find proper subclass
+        return super().__new__(*args)
+
+
+class MaxClientsError(ReplyError):
+    """...."""
+

 class PipelineError(ReplyError):
     """Raised if command within pipeline raised error."""

So, if max number of clients reached is received before any command sent —
reader task will close connection with this reply error.
And if some command is sent (ie future added to _waiters)
the connection will be closed and pending future will be resolved with correct exception: MaxClientsError.

Are you keen on change this last scenario? Raising the same ReplyError by the first attempt to use that connection? and just a simple ConnectionError for the following ones?

I think we should keep it as is but we can update ConnectionClosedError message with close exception message:

diff --git a/aioredis/connection.py b/aioredis/connection.py
index b31c886..2f2d199 100644
--- a/aioredis/connection.py
+++ b/aioredis/connection.py
@@ -142,6 +143,7 @@ class RedisConnection(AbcConnection):
         self._db = 0
         self._closing = False
         self._closed = False
+        self._close_msg = None
         self._close_waiter = create_future(loop=self._loop)
         self._reader_task.add_done_callback(self._close_waiter.set_result)
         self._in_transaction = None
@@ -257,6 +263,8 @@ class RedisConnection(AbcConnection):
           is broken.
         """
         if self._reader is None or self._reader.at_eof():
+            if self._close_msg is not None:
+                raise ConnectionClosedError(self._close_msg)
             raise ConnectionClosedError("Connection closed or corrupted")
         if command is None:
             raise TypeError("command must not be None")
@@ -333,6 +341,8 @@ class RedisConnection(AbcConnection):
         self._reader_task = None
         self._writer = None
         self._reader = None
+        if exc is not None:
+            self._close_msg = str(exc)
         while self._waiters:
             waiter, *spam = self._waiters.popleft()
             logger.debug("Cancelling waiter %r", (waiter, spam))

@pfreixes
Copy link
Contributor

pfreixes commented Jul 4, 2017

+1 I like it. Clean and consistent.

@pfreixes
Copy link
Contributor

pfreixes commented Jul 4, 2017

If the base class ReplyError is going to make some introspection to return the proper class, the PipelineError must change the parent class. Using the RedisError ? It will help to keep the solution cleaner.

Or there was a reason to make the PipelineError a derivated one of the ReplyError ?

@popravich
Copy link
Contributor Author

PipelineError must change the parent class. Using the RedisError

I think its okay to change the base class.

pfreixes added a commit to pfreixes/aioredis that referenced this issue Jul 4, 2017
…d#186

When a new connection tries to get connected to the redis server and it
has been configured with a maximum number of connections, the client
will get a `MaxClientsError` exception if this limit is reached out.

Also if a pool initialization with a `min_size` that can meet the
requirements - beucase of that limit u others issues - the pool creation
will return also a `MaxClientsError`.
popravich pushed a commit that referenced this issue Nov 14, 2017
When a new connection tries to get connected to the redis server and it
has been configured with a maximum number of connections, the client
will get a `MaxClientsError` exception if this limit is reached out.

Also if a pool initialization with a `min_size` that can meet the
requirements - beucase of that limit u others issues - the pool creation
will return also a `MaxClientsError`.
popravich added a commit that referenced this issue Nov 14, 2017
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Projects
None yet
Development

No branches or pull requests

2 participants