This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Pubsub: health check races with get_message to read from the socket #1217

Open
Tracked by #1225
bmerry opened this issue Nov 22, 2021 · 5 comments · May be fixed by #1207
Comments

@bmerry
Collaborator

bmerry commented Nov 22, 2021

Describe the bug

This is somewhat related to #1206 (both have to do with pubsub and health checks) but is a different failure mode. I think this is the aioredis equivalent of the redis-py bug I linked from #1206, and possibly the approach used in its corresponding PR will work here too (I haven't had a chance to review that PR).

When a subscribe command is issued on a PubSub that currently has no subscriptions, and the connection hasn't been used for a while (specifically, longer than the health check interval), the underlying connection issues a PING to check its health and tries to read the PONG. However, another async task may be blocked in get_message, also trying to read from the same socket. This leads to a RuntimeError from asyncio's StreamReader.
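The race can be demonstrated without Redis at all. Here is a minimal standalone sketch (plain asyncio, no aioredis) in which two coroutines awaiting read() on the same StreamReader raise the same RuntimeError that appears in the traceback below:

```python
import asyncio

# Minimal standalone sketch (no Redis involved): two coroutines reading from
# the same asyncio StreamReader trigger the same RuntimeError as the health
# check racing get_message.
async def demo():
    reader = asyncio.StreamReader()
    # First reader blocks waiting for data, like get_message polling the socket.
    poller = asyncio.ensure_future(reader.read(1))
    await asyncio.sleep(0)  # let the poller reach its await
    try:
        # Second read on the same stream, like check_health waiting for the PONG.
        await reader.read(1)
    except RuntimeError as exc:
        return str(exc)
    finally:
        poller.cancel()

print(asyncio.run(demo()))
```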

To Reproduce

  1. Install the PR from #1207 (which fixes #1206), or master.
  2. Start a Redis server on localhost.
  3. Run the script below. It will crash.
#!/usr/bin/env python3

import asyncio

import aioredis


async def poll(ps):
    while True:
        message = await ps.get_message(timeout=10)
        if message is not None:
            print(message)


async def main():
    r = aioredis.Redis.from_url("redis://localhost", health_check_interval=1)  # short interval to trigger the race quickly
    ps = r.pubsub()
    await ps.subscribe("foo")
    poller = asyncio.create_task(poll(ps))
    await ps.unsubscribe("foo")
    await asyncio.sleep(2)  # let the health check interval elapse
    await ps.subscribe("baz")  # issues a health check PING that races with poll()
    await asyncio.sleep(0.1)
    poller.cancel()
    try:
        await poller
    except asyncio.CancelledError:
        pass

asyncio.run(main())

Expected behavior

The script should run without errors.

Logs/tracebacks

Traceback (most recent call last):
  File "./crashit_aioredis2.py", line 30, in <module>
    asyncio.run(main())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "./crashit_aioredis2.py", line 22, in main
    await ps.subscribe("baz")
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4131, in subscribe
    ret_val = await self.execute_command("SUBSCRIBE", *new_channels.keys())
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4020, in execute_command
    await self._execute(connection, connection.send_command, *args, **kwargs)
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4024, in _execute
    return await command(*args, **kwargs)
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 885, in send_command
    await self.send_packed_command(
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 854, in send_packed_command
    await self.check_health()
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 824, in check_health
    if str_if_bytes(await self.read_response()) != "PONG":
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 900, in read_response
    response = await self._parser.read_response()
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 537, in read_response
    await self.read_from_socket()
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 498, in read_from_socket
    buffer = await self._stream.read(self._read_size)
  File "/usr/lib/python3.8/asyncio/streams.py", line 684, in read
    await self._wait_for_data('read')
  File "/usr/lib/python3.8/asyncio/streams.py", line 503, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data
Task exception was never retrieved
future: <Task finished name='Task-2' coro=<poll() done, defined at ./crashit_aioredis2.py:8> exception=ConnectionError('Connection closed by server.')>
Traceback (most recent call last):
  File "./crashit_aioredis2.py", line 10, in poll
    message = await ps.get_message(timeout=10)
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4171, in get_message
    response = await self.parse_response(block=False, timeout=timeout)
  File "/home/bmerry/src/aioredis/aioredis/client.py", line 4048, in parse_response
    if not block and not await conn.can_read(timeout=timeout):
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 893, in can_read
    return await self._parser.can_read(timeout)
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 484, in can_read
    return await self.read_from_socket(timeout=timeout, raise_on_timeout=False)
  File "/home/bmerry/src/aioredis/aioredis/connection.py", line 500, in read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
aioredis.exceptions.ConnectionError: Connection closed by server.

Python Version

$ python --version
Python 3.8.10

aioredis Version

a708bd14b1a8bec0a1f3d469bf5384eb2726b5fa

Additional context

No response

@bmerry bmerry added the bug label Nov 22, 2021
@bmerry
Collaborator Author

bmerry commented Nov 22, 2021

cc @Andrew-Chen-Wang since you worked on #1207.

@Andrew-Chen-Wang
Collaborator

Andrew-Chen-Wang commented Nov 22, 2021

I would have created two sockets, but I understand that isn't an option in terms of scaling. One solution might be to implement a lock on the socket during single-command execution such as SUBSCRIBE, which would block get_message() from trying to receive a message.

This would be a PubSub-specific lock to prevent this failure mode. Because all replies arrive on a single stream, though, the subscribe call could read unrelated data. That would mean we'd need to store all responses not related to the locked single-execution command and stream them out to get_message after we do get a response for subscribe.

It's not ideal, but that's all I've got. The situation gets convoluted if, in the get_message poller, you run more commands like GET or SET: then we're working through a backlog until all single-execution commands have completed. It'd be a nasty queue implementation...

I'll play around with it today.
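A minimal standalone sketch of that lock-plus-buffer idea (all class and method names here are hypothetical, and an asyncio.Queue stands in for the socket; this is not the aioredis API):

```python
import asyncio

class LockedReader:
    """Hypothetical sketch: one lock serializes all socket reads, and replies
    that arrive out of turn are buffered for get_message."""

    def __init__(self):
        self._wire = asyncio.Queue()      # stand-in for the socket
        self._lock = asyncio.Lock()       # serializes all reads
        self._buffered = asyncio.Queue()  # pubsub messages read out of turn

    def feed(self, item):
        self._wire.put_nowait(item)       # pretend the server sent `item`

    async def execute_command(self, name):
        # Hold the lock across the whole command so get_message can't race us.
        async with self._lock:
            while True:
                reply = await self._wire.get()
                if reply[0] == name:              # the reply to our command
                    return reply
                self._buffered.put_nowait(reply)  # unrelated message: save it

    async def get_message(self):
        if not self._buffered.empty():
            return self._buffered.get_nowait()
        async with self._lock:
            return await self._wire.get()

async def demo():
    r = LockedReader()
    # The server interleaves a pubsub message before the SUBSCRIBE reply.
    r.feed(("message", "foo", "hello"))
    r.feed(("subscribe", "foo", 1))
    reply = await r.execute_command("subscribe")
    msg = await r.get_message()
    return reply, msg

print(asyncio.run(demo()))
# → (('subscribe', 'foo', 1), ('message', 'foo', 'hello'))
```

The key property is that the subscribe reply and the buffered message both end up in the right place even though they arrived interleaved on one stream.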

@Andrew-Chen-Wang
Collaborator

Andrew-Chen-Wang commented Nov 22, 2021

@bmerry After thinking about it for a while, I think this is expected behavior, as shown in asyncio/streams.py in the StreamReader (where the RuntimeError about a second coroutine is raised):

    async def _wait_for_data(self, func_name):
        """Wait until feed_data() or feed_eof() is called.

        If stream was paused, automatically resume it.
        """
        # StreamReader uses a future to link the protocol feed_data() method
        # to a read coroutine. Running two read coroutines at the same time
        # would have an unexpected behaviour. It would not possible to know
        # which coroutine would get the next data.
        if self._waiter is not None:
            raise RuntimeError(
                f'{func_name}() called while another coroutine is '
                f'already waiting for incoming data')

        assert not self._eof, '_wait_for_data after EOF'

        # Waiting for data while paused will make deadlock, so prevent it.
        # This is essential for readexactly(n) for case when n > self._limit.
        if self._paused:
            self._paused = False
            self._transport.resume_reading()

        self._waiter = self._loop.create_future()
        try:
            await self._waiter
        finally:
            self._waiter = None

Specifically:

StreamReader uses a future to link the protocol feed_data() method to a read coroutine. Running two read coroutines at the same time would have an unexpected behaviour. It would not possible to know which coroutine would get the next data.

Again, we want to implement a lock feature so that we can get an ordered stream of response data, but the queue for this would be interesting (I'm imagining a queue or ordered dict keyed by coroutine, so the queue can pair response data with the waiting coroutine).

@Andrew-Chen-Wang Andrew-Chen-Wang added the need investigation Need to look into described issue. label Nov 22, 2021
@bmerry
Collaborator Author

bmerry commented Nov 23, 2021

It may be worth following the approach in redis/redis-py#1737, which looks promising (basically, get_message doesn't try to read from the socket while there are no subscriptions, and commands don't do health checks if there are subscriptions).
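A hedged sketch of that approach (the names and stub method bodies below are illustrative, not the actual aioredis or redis-py code): get_message stays off the socket while there are no subscriptions, and check_health skips its PING while there are, so the two sides never read concurrently.

```python
import asyncio

class PubSubStateSketch:
    """Hypothetical sketch of the redis/redis-py#1737 idea."""

    def __init__(self):
        self.subscribed = asyncio.Event()  # set while >= 1 subscription is active
        self.pings_sent = 0

    async def _read_message(self):
        return {"type": "message"}  # stand-in for a real socket read

    async def get_message(self, timeout=None):
        if not self.subscribed.is_set():
            # No subscriptions: don't touch the socket; just wait (bounded by
            # the caller's timeout) until a SUBSCRIBE happens.
            try:
                await asyncio.wait_for(self.subscribed.wait(), timeout)
            except asyncio.TimeoutError:
                return None
        return await self._read_message()

    async def check_health(self):
        if self.subscribed.is_set():
            return  # a PING now would race get_message for the reply
        self.pings_sent += 1  # stand-in for a real PING/PONG round trip

async def demo():
    ps = PubSubStateSketch()
    first = await ps.get_message(timeout=0.01)  # no subscription: None, no read
    await ps.check_health()                     # safe: nobody is reading
    ps.subscribed.set()                         # SUBSCRIBE completed
    await ps.check_health()                     # skipped: the poller owns the socket
    second = await ps.get_message(timeout=0.01)
    return first, second, ps.pings_sent

print(asyncio.run(demo()))
# → (None, {'type': 'message'}, 1)
```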

@Andrew-Chen-Wang
Collaborator

That sounds good to me 👍

@Andrew-Chen-Wang Andrew-Chen-Wang removed the need investigation Need to look into described issue. label Dec 28, 2021
@Andrew-Chen-Wang Andrew-Chen-Wang linked a pull request Dec 28, 2021 that will close this issue
2 participants