Update to v4 results in "RuntimeError: Event loop is closed" #332

Closed
stumpylog opened this issue Oct 12, 2022 · 35 comments · Fixed by #347

Comments

@stumpylog

stumpylog commented Oct 12, 2022

After upgrading to channels-redis==4.0.0, all of our Celery tasks report the following traceback:

future: <Task finished name='Task-9' coro=<Connection.disconnect() done, defined at /usr/local/lib/python3.9/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/redis/asyncio/connection.py", line 828, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "/usr/local/lib/python3.9/asyncio/streams.py", line 353, in close
    return self._transport.close()
  File "/usr/local/lib/python3.9/asyncio/selector_events.py", line 698, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 751, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
[2022-10-12 07:30:31,972] [ERROR] [asyncio] Task exception was never retrieved

Downgrading the image to channels-redis==3.4.1 resolves the issue, so I'm starting here. This is probably related to #312.

Image OS is Debian Bullseye, amd64. The Django application is running with gunicorn.

Probably related packages:

celery==5.2.7
channels==3.0.5
channels-redis==4.0.0
hiredis==2.0.0
redis==4.3.4

Full Pipfile.lock: https://github.com/paperless-ngx/paperless-ngx/blob/dev/Pipfile.lock

@ljodal
Contributor

ljodal commented Oct 12, 2022

I’m seeing the exact same thing. It seems like the same issue as reported in #312.

@carltongibson
Member

There was lots of discussion on the other issue. If you could investigate, that would be handy.

A minimal reproduce would help too. It's hard to say anything without one. 🤔

@stumpylog
Author

The application is inherited, so I'm not really sure how the whole channels thing works yet or what connects where.

So far, the best I can do is narrow it down to the only task that uses async_to_sync to send status updates. It is called multiple times (roughly 10) to update a status from 0% to 100%.

@aaronmader

If I use ahaltindis's workaround detailed here, my code works. So it definitely seems to be a conflict/issue with async_to_sync.

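import asyncio

from channels.layers import get_channel_layer

channel_layer = get_channel_layer()
# Reuse this thread's event loop rather than letting async_to_sync
# create (and then close) a fresh loop for every call.
loop = asyncio.get_event_loop()
coroutine = channel_layer.group_send(
    group_name,
    {
        'type': 'task.message',
        'text': context
    })
loop.run_until_complete(coroutine)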

Perhaps the channels or channels_redis documentation could be improved to provide an example of how users should trigger a group_send from a synchronous context (such that several calls can be made)? Perhaps a 'sync' version of group_send could be made available?
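One possible shape for a reusable synchronous group_send, sketched under assumptions: keep a single long-lived event loop on a background thread and submit coroutines to it with `asyncio.run_coroutine_threadsafe()`, so every call runs on the same loop and any connections opened on it stay valid between calls. `BackgroundLoop` and `fake_group_send` are hypothetical names; `fake_group_send` only stands in for `channel_layer.group_send`, and none of this is an API provided by channels or channels_redis.

```python
import asyncio
import threading


class BackgroundLoop:
    """Hypothetical helper: one event loop running forever on a daemon thread."""

    def __init__(self) -> None:
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro, timeout=None):
        # Schedule the coroutine on the background loop and block the caller until it finishes.
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result(timeout)

    def close(self) -> None:
        # Ask the loop to stop from its own thread, wait for the thread, then close the loop.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()


async def fake_group_send(group: str, message: dict) -> asyncio.AbstractEventLoop:
    # Stand-in for channel_layer.group_send(); reports which loop it ran on.
    await asyncio.sleep(0)
    return asyncio.get_running_loop()


bg = BackgroundLoop()
loops = {
    bg.run(fake_group_send("status", {"type": "task.message", "text": pct}))
    for pct in range(0, 101, 10)
}
bg.close()
print(len(loops))  # 1: all eleven calls ran on the same loop
```

With a real layer the call would presumably look like `bg.run(channel_layer.group_send(group_name, payload))`. The design trade-off is that the layer's connections then belong to that one background loop, so in a forking server the helper would likely need to be created per process, after the fork.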

@ljodal
Contributor

ljodal commented Oct 13, 2022

I'm pretty sure the issue we have is very similar to what's reported above. We do this async_to_sync(channel_layer.group_send)(group_name, channel_layer_payload) from a sync gunicorn worker and I think that's where this stems from. The stack trace doesn't go back to our code though, so kinda hard to tell exactly.

lunika added a commit to openfun/marsha that referenced this issue Oct 19, 2022
Since we are using channels-redis 4, we have connection issues to redis.
We must downgrade it to version <4 and block its upgrade, also
django-channels upgrade, while this issue exists.
An issue describing what we are experiencing is open on the channels-redis repo
django/channels_redis#332
@nblam1994

I also get this issue after upgrading to 4.0.0.

@Elabbasy00

Same here, and sometimes I get:

Task exception was never retrieved
future: <Task finished name='Task-378' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-378' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-378' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop
Task exception was never retrieved
future: <Task finished name='Task-381' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-381' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-381' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop
Task exception was never retrieved
future: <Task finished name='Task-382' coro=<Connection.disconnect() done, defined at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError("Task <Task pending name='Task-382' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop")>
Traceback (most recent call last):
  File "/home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py", line 831, in disconnect
    await self._writer.wait_closed()  # type: ignore[union-attr]
  File "/usr/lib/python3.10/asyncio/streams.py", line 344, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-382' coro=<Connection.disconnect() running at /home/elabbasy/Desktop/BOT_TOOL/webserver/server/venv/lib/python3.10/site-packages/redis/asyncio/connection.py:831>> got Future <Future pending> attached to a different loop

@carltongibson
Member

Any chance of a minimal reproduce? It's hard to say anything with just a traceback

@Elabbasy00

Elabbasy00 commented Oct 29, 2022

asgiref==3.5.2
channels==4.0.0
channels-redis==4.0.0
daphne==4.0.0

views.py

# passing request.data to serializer
if serializer.is_valid():
    serializer.save(interpreter=request.user)
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        "admins",
        {"type": "chat_message", "message": {"command": "admin_new_ticket", "ticket": serializer.data}},
    )
    return Response(serializer.data, status=status.HTTP_200_OK)

And just a normal consumer:

import json
from channels.generic.websocket import AsyncWebsocketConsumer


class AdminsConsumer(AsyncWebsocketConsumer):

    async def connect(self):
        self.room_group_name = 'admins'

        # Join room group
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        # Leave room group
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    # Receive message from WebSocket
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json["message"]

        # Send message to room group
        await self.channel_layer.group_send(
            self.room_group_name, {"type": "chat_message", "message": message}
        )

    # Receive message from room group
    async def chat_message(self, event):
        message = event["message"]

        # Send message to WebSocket
        await self.send(text_data=json.dumps({"message": message}))

@abtinmo

abtinmo commented Nov 6, 2022

Same issue here:

Task exception was never retrieved
future: <Task finished name='Task-90' coro=<Connection.disconnect() done, defined at /usr/local/lib/python3.8/site-packages/redis/asyncio/connection.py:819> exception=RuntimeError('Event loop is closed')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/redis/asyncio/connection.py", line 828, in disconnect
    self._writer.close()  # type: ignore[union-attr]
  File "/usr/local/lib/python3.8/asyncio/streams.py", line 353, in close
    return self._transport.close()
  File "/usr/local/lib/python3.8/asyncio/selector_events.py", line 692, in close
    self._loop.call_soon(self._call_connection_lost, None)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 719, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 508, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed

dependencies:

channels==4.0.0
channels-redis==4.0.0
daphne==4.0.0
Django==3.2.16
django-redis==5.0.0
redis==4.3.4

code:

async_to_sync(channel_layer.group_send)(
    group_name,
    {"type": "store.manager.handout", "data": {"type": request.POST["message_type"]}}
)

The workaround in this comment did not work either; I'm getting:

There is no current event loop in thread 'ThreadPoolExecutor-0_0'.
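That message is what `asyncio.get_event_loop()` raises when it is called in a non-main thread that has no event loop set, which is the situation inside a thread pool worker. A minimal stdlib sketch of the failure and of one hypothetical alternative: lazily create one loop per thread and call `run_until_complete()` on it. `thread_loop()`, `naive_get_loop()` and `fake_send()` are illustrative names only, and `fake_send` stands in for a real `group_send`.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()


def naive_get_loop():
    # What the earlier workaround does. In a non-main thread with no loop set,
    # this raises RuntimeError("There is no current event loop in thread ...").
    return asyncio.get_event_loop()


def thread_loop() -> asyncio.AbstractEventLoop:
    # Hypothetical helper: create one loop per thread on first use, then reuse it.
    loop = getattr(_local, "loop", None)
    if loop is None or loop.is_closed():
        loop = _local.loop = asyncio.new_event_loop()
    return loop


async def fake_send(message: str) -> str:
    # Stand-in for channel_layer.group_send().
    await asyncio.sleep(0)
    return message


def send_three() -> list:
    loop = thread_loop()
    return [loop.run_until_complete(fake_send(f"update {i}")) for i in range(3)]


naive_error = None
with ThreadPoolExecutor(max_workers=1) as pool:
    try:
        pool.submit(naive_get_loop).result()
    except RuntimeError as exc:
        naive_error = str(exc)
    results = pool.submit(send_three).result()
    # Close the worker thread's loop before the pool shuts down.
    pool.submit(lambda: thread_loop().close()).result()

print(naive_error)
print(results)  # ['update 0', 'update 1', 'update 2']
```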

@bachloxo

bachloxo commented Nov 7, 2022

Any update on this issue? I'm still facing it.

@Elabbasy00

As a workaround, downgrading channels-redis to 3.4.1 works for me.

@ilysenko

Any update on this issue? I'm still facing it.

Me too.

@realsuayip

@carltongibson You can also reproduce the error using this repo:

https://github.com/realsuayip/zaida/tree/490e0c5a49a750bc56a63f9cba5c9514ed91eee4

Steps to reproduce:

1 - Clone the repo
2 - Run "python3 docker.py up"
3 - Once all the containers are running, run "python3 docker.py test"

Hope it helps.

@dvf

dvf commented Nov 19, 2022

Hoping we can resolve this soon

@carltongibson
Member

Can we stop with the "me too" and "any update" comments please.

If you have significant new information to add then please do. (Great!) Otherwise it's just noise.

I'm planning new releases over the new year, and looking into this is part of that.

@dvf

dvf commented Nov 19, 2022

@carltongibson this is a major breaking issue, so I'm just trying to bring more attention to it.

Tomorrow I'll go through the source and try to make a contribution.

@sevdog
Contributor

sevdog commented Nov 23, 2022

I am experiencing this issue in a Daphne instance which only runs consumers for websockets, using a customized (derived) version of RedisChannelLayer.

The code that is executed does not (or should not) have any calls to async_to_sync, but it uses database_sync_to_async (multiple times while processing a single message).

Here are my requirements:

# python 3.9
asgiref==3.5.2
Django==3.2.16
channels==4.0.0
channels-redis==4.0.0

The error message produced is very similar to the one reported in aio-libs-abandoned/aioredis-py#1103 (from what I have seen, the fix for that problem was also ported to redis-py).

Maybe the problem is caused by the way this layer tries to close/clear the connection pool.

@dvf

dvf commented Nov 23, 2022

For anyone else with this issue, it doesn't exhibit when using the newer RedisPubSubChannelLayer. It's not in the Channels docs because it's still in beta.

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.pubsub.RedisPubSubChannelLayer",
        "CONFIG": {
            "hosts": ["rediss://***:***@***.com:6379"],
        },
    },
}

@glenaddict

Thanks @carltongibson, I can confirm this fixes the problem.

@sevdog
Contributor

sevdog commented Jan 25, 2023

@carltongibson thank you for the suggestion. However, I am a bit concerned that forcing users to call this method when using async_to_sync is a bit dirty, because the close_pools method is only defined in RedisChannelLayer and is not part of any specification.

Having to call the .close_pools method directly is going to be a problem when someone wants to move to another layer implementation (i.e. from RedisChannelLayer to RedisPubSubChannelLayer).

I believe that RedisChannelLayer should have a method like the one defined in the pub-sub layer to intercept the loop close event:

import types


def _wrap_close(proxy, loop):
    original_impl = loop.close

    def _wrapper(self, *args, **kwargs):
        if loop in proxy._layers:
            layer = proxy._layers[loop]
            del proxy._layers[loop]
            loop.run_until_complete(layer.flush())

        self.close = original_impl
        return self.close(*args, **kwargs)

    loop.close = types.MethodType(_wrapper, loop)

This way it will be transparent to users and more robust.
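The idea above, running cleanup on a loop just before that loop is closed, can be sketched generically with only the standard library. `run_before_close()` is a hypothetical helper, not part of channels_redis; it shadows `close` on a single loop instance with `types.MethodType`, the same trick `_wrap_close` uses. Any code that ends with `loop.close()` (the demo calls it directly) then triggers the cleanup while the loop can still run it.

```python
import asyncio
import types


def run_before_close(loop: asyncio.AbstractEventLoop, cleanup) -> None:
    """Hypothetical helper: run the coroutine function `cleanup` on `loop` right before it closes."""
    original_close = loop.close  # bound method from the loop's class

    def _close(self, *args, **kwargs):
        # Restore the original method first, so the hook fires at most once.
        self.close = original_close
        if not self.is_closed():
            # Assumes the loop is stopped but still open, so it can run the cleanup.
            self.run_until_complete(cleanup())
        return original_close(*args, **kwargs)

    # Shadow `close` on this loop instance only.
    loop.close = types.MethodType(_close, loop)


events = []


async def open_connection():
    events.append("opened")


async def close_connection():
    # A real layer would disconnect its redis connections here, on their own loop.
    events.append("closed on its own loop")


loop = asyncio.new_event_loop()
run_before_close(loop, close_connection)
loop.run_until_complete(open_connection())
loop.close()
print(events, loop.is_closed())  # ['opened', 'closed on its own loop'] True
```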

@carltongibson
Member

@sevdog That may be worth it yes 🤔 — have you got a cycle to put that in a PR? (As per progress here, I've begun working towards the next releases, so it would be timely.)

@sevdog
Contributor

sevdog commented Jan 25, 2023

@carltongibson I will see if I can get a time slot for this and open a PR.

I would like to find a way to merge the connection management algorithms from RedisPubSubChannelLayer and RedisChannelLayer (IMO it does not make sense to have two completely different implementations of that logic), but it could take more time to handle.

@carltongibson
Member

@sevdog That would be nice. There's no urgency though. poco a poco

@btel

btel commented Feb 8, 2023

I have the same issue when sending messages over channels. I tried @carltongibson's solution, but it implies that I need to re-create the connection each time before sending a message. Is this correct? Thanks for the help!

@carltongibson
Member

The channel layer encapsulates the connection handling, but connections are per event loop, and async_to_sync() creates a new event loop each time, so...

(It shouldn't really be an issue... If it's performance critical, it's likely you're already looking at avoiding async_to_sync altogether.)

@btel

btel commented Feb 8, 2023

Thanks for the quick reply. So if I understand correctly, to send a message through a channel outside a consumer I should do something like:

import channels.layers
from asgiref.sync import async_to_sync

async def closing_send(channel, message):
    channel_layer = channels.layers.get_channel_layer()
    await channel_layer.send(channel, message)
    await channel_layer.close_pools()

async_to_sync(closing_send)("test-channel", {"type": "hello"})

But the layer returned by get_channel_layer() is cached (in the channels.layers.channel_layers object), so if other threads are using its connections (like websockets), they will also be disconnected.

I tested this and it seems to work fine:

import channels.layers
from asgiref.sync import async_to_sync
from channels import DEFAULT_CHANNEL_LAYER

async def closing_send(channel, message):
    channel_layer = channels.layers.channel_layers.make_backend(DEFAULT_CHANNEL_LAYER)
    await channel_layer.send(channel, message)
    await channel_layer.close_pools()

async_to_sync(closing_send)("test-channel", {"type": "hello"})

Did I get it right?
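The same pattern can be made slightly more robust by closing the pools in a `finally` block, so they are closed on their own loop even if the send raises. A sketch, assuming a layer object that exposes `send()` and `close_pools()` as `RedisChannelLayer` does; `temporary_layer` and `FakeLayer` are hypothetical names, and `FakeLayer` only stands in for a real layer so the example runs without Redis.

```python
import asyncio
import contextlib


@contextlib.asynccontextmanager
async def temporary_layer(factory):
    """Hypothetical helper: build a layer, yield it, and always close its pools afterwards."""
    layer = factory()
    try:
        yield layer
    finally:
        # Runs on the loop that opened the connections, before that loop exits.
        await layer.close_pools()


class FakeLayer:
    """Stand-in for a channel layer; a real one might come from
    channels.layers.channel_layers.make_backend(DEFAULT_CHANNEL_LAYER)."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.closed = False

    async def send(self, channel: str, message: dict) -> None:
        if self.fail:
            raise ConnectionError("simulated send failure")

    async def close_pools(self) -> None:
        self.closed = True


async def closing_send(layer_factory, channel, message):
    async with temporary_layer(layer_factory) as layer:
        await layer.send(channel, message)
        return layer


ok = asyncio.run(closing_send(FakeLayer, "test-channel", {"type": "hello"}))
print(ok.closed)  # True

failing = FakeLayer(fail=True)
try:
    asyncio.run(closing_send(lambda: failing, "test-channel", {"type": "hello"}))
except ConnectionError:
    pass
print(failing.closed)  # True: pools were closed even though send() raised
```

With a real layer, the factory would presumably be `lambda: channel_layers.make_backend(DEFAULT_CHANNEL_LAYER)`, called through `async_to_sync(closing_send)(...)`.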

@carltongibson
Member

@btel That looks like the example yes. As per #332 (comment) I want to look into encapsulating that for the next set of releases

@eric-musliner

eric-musliner commented Feb 8, 2023

This should be mentioned in the docs until an official fix is released.

Edit: Until this is resolved, I wouldn't consider 4.0 a stable release.

@ianepperson

I've been running into this too. I see this issue is marked as "documentation" - but I don't think this is a request to update the docs; it's a bug, right? A pretty important one too.

@carltongibson
Member

It's a lack of correctly dealing with the connection shutdown, when using the asgiref.sync helpers to integrate with sync code.

I marked it as documentation because if you call it with the understanding that the connection needs to be closed before the event loop exits, then everything works as expected.

Nonetheless @sevdog is looking at whether we can have this handled automatically for you as well. If that pans out it'll be in the next set of releases, which I'm working towards already.

@sevdog
Contributor

sevdog commented Feb 16, 2023

I was looking at this just yesterday: refactoring the two layers to share the same connection-handling codebase is a bit complex. The two implementations have a lot of differences and share very few elements, so it may take more time (or at least more concentration) to handle that.

In the meantime I have prepared #347 to address this issue in RedisChannelLayer.

sevdog added a commit to sevdog/channels_redis that referenced this issue Feb 24, 2023
carltongibson pushed a commit to sevdog/channels_redis that referenced this issue Mar 9, 2023
sevdog added a commit to sevdog/channels_redis that referenced this issue Mar 9, 2023
nftchance added a commit to FlipsideCrypto/badger that referenced this issue Mar 11, 2023
The indexer was broken due to (django/channels_redis#332).

This resulted in the indexer constantly erroring out, and not having a way to recover because it was constantly stuck searching for the object in the database.

While there are still a few nuances with this, the API is accessible, is not blocked by the indexer, and "can" be independently scaled if that need arises; although is not implemented today.

There may still be a few bugs here, but we **cannot** move to version 4 of `channels-redis` otherwise everything will implode and leave you with no idea what is going on wrong (the package handles things on the backend that you don't think about and results in you debugging absolutely your entire project only to realize the library is unstable and usage should be delayed until stable v5 is released.)
carltongibson pushed a commit to sevdog/channels_redis that referenced this issue Mar 28, 2023
sevdog added a commit to sevdog/channels_redis that referenced this issue Apr 3, 2023
hmpf added a commit to hmpf/Argus that referenced this issue Apr 18, 2023
hmpf added a commit to Uninett/Argus that referenced this issue Apr 18, 2023
Projects
Status: Done