
InterfaceError: connection already closed #106

Open
drpancake opened this issue Jun 7, 2021 · 16 comments

Comments

drpancake commented Jun 7, 2021

I'm pushing 500+ tasks per minute through Dramatiq as part of a Django app. Occasionally (once every 1-2 days) I suddenly get hundreds of instances of this error from the workers, and the only way to fix it is to restart the worker process.

Sentry reports that it's triggered in both before_process_message() and after_process_message(), in each case when they call Task.tasks.create_or_update_from_message().

I have CONN_MAX_AGE set to 0 and database connections are pooled via PgBouncer.
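
For reference, the relevant part of my settings looks roughly like this (values are illustrative; PgBouncer sits in front of Postgres):

DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "app",  # illustrative
        "HOST": "127.0.0.1",
        "PORT": "6432",  # PgBouncer, not Postgres directly
        "CONN_MAX_AGE": 0,  # close connections at the end of each request/task
    }
}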

Please let me know if I'm missing any other information.

drpancake (Author) commented:

I found this old issue in Celery that is perhaps helpful: celery/django-celery#121

Also this blog post: https://tryolabs.com/blog/2014/02/12/long-running-process-and-django-orm/

And one more lead: https://stackoverflow.com/a/37891165/3211027

Bogdanp (Owner) commented Jun 13, 2021

Is the DbConnectionsMiddleware correctly configured in your app? Make sure it has a higher priority than the admin middleware (i.e. comes before it in the middleware list).

drpancake (Author) commented:

> Is the DbConnectionsMiddleware correctly configured in your app? Make sure it has a higher priority than the admin middleware (i.e. comes before it in the middleware list).

@Bogdanp Yes, it's in the correct order. My MIDDLEWARE is set up as follows:

"dramatiq.middleware.AgeLimit",
"dramatiq.middleware.TimeLimit",
"dramatiq.middleware.Retries",
"django_dramatiq.middleware.DbConnectionsMiddleware",
"django_dramatiq.middleware.AdminMiddleware",
"ss.web.utils.dramatiq.SentryMiddleware",

I'm also now seeing this error around 50 times per hour: 'NoneType' object has no attribute 'decode'. It's raised from the decode() method in dramatiq.encoder.JSONEncoder.

I currently have CONN_MAX_AGE=0 set in Django. Would you recommend changing this setting?

Bogdanp (Owner) commented Jun 14, 2021

No, that setting should be fine (if a little inefficient). Re. the decoding error, are you using the Redis broker? If so, you should try upgrading to dramatiq v1.11.0, which has some related fixes and improvements.
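
For example (assuming a standard pip-based setup):

pip install -U "dramatiq[redis]>=1.11"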

drpancake (Author) commented Jun 14, 2021

@Bogdanp Yep, using Redis as the broker. I'll try that, thanks.

Any clues about the "connection already closed" error? I'm happy to dig into this and try to fix it myself, but I'm not exactly sure where to begin! I looked at the source and I can see that this library does handle old/broken DB connections, so I wonder if this is some obscure race condition that only happens because I'm running a lot of tasks.
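
For anyone else digging in: django.db.close_old_connections, which the middleware relies on, boils down to roughly this (paraphrased from the Django source):

from django.db import connections

def close_old_connections(**kwargs):
    for conn in connections.all():
        # Close the connection if it has errored out or has outlived
        # CONN_MAX_AGE. Connections are tracked per thread, so this only
        # sees the current thread's connections.
        conn.close_if_unusable_or_obsolete()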

Bogdanp (Owner) commented Jun 14, 2021

I'm really not sure what it could be. I would probably start by making sure the clean-up code in the connections middleware runs at the right time, and that there's nothing else that might be opening connections after it and leaving them open.
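
Schematically, the connections middleware only needs to do something like this (a simplified sketch, not the exact django_dramatiq source):

import dramatiq
from django import db

class DbConnectionsMiddleware(dramatiq.Middleware):
    # Close errored/expired connections around every message, so each task
    # starts and ends with a usable connection.
    def before_process_message(self, broker, message):
        db.close_old_connections()

    def after_process_message(self, broker, message, *, result=None, exception=None):
        db.close_old_connections()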

kylemacfarlane commented Aug 29, 2021

I don't actually use django_dramatiq, but I just ran into the same problem with plain Django + Dramatiq. Are you wrapping any DB calls in sync_to_async?

Here's a standalone Django management command to recreate it:

import asyncio
from time import sleep

from asgiref.sync import sync_to_async
from django.core.management.base import BaseCommand

from myapp.models import AModel  # placeholder: any model in your project


class Command(BaseCommand):
    def handle(self, *args, **options):
        async def test():
            print('Getting')
            return await sync_to_async(AModel.objects.get)(id=1234)

        # Each loop of the while block is akin to a Dramatiq task under a
        # long-running worker
        while True:
            task = asyncio.ensure_future(test())
            asyncio.get_event_loop().run_until_complete(asyncio.wait([task]))

            try:
                print(task.result())
            except Exception as e:
                print(e)

            sleep(5)

While the above is running, kill the connection in Postgres with the following:

SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE pid != pg_backend_pid() AND query != '';

Then it should start spitting out "connection already closed" errors.

It seems like extra DB cleanup is needed for any async threads, or could this be a Django bug?

kylemacfarlane commented Aug 29, 2021

A fix seems to be to send db.close_old_connections() through sync_to_async() so that it finds and cleans up any connections living in those threads.

import asyncio
from time import sleep

from asgiref.sync import sync_to_async
from django import db
from django.core.management.base import BaseCommand

from myapp.models import AModel  # placeholder: any model in your project


class Command(BaseCommand):
    def handle(self, *args, **options):
        async def test():
            return await sync_to_async(AModel.objects.get)(id=1234)

        # Each loop of the while block is akin to a Dramatiq task under a
        # long-running worker
        while True:
            try:
                print('SYNC', AModel.objects.get(id=1234))
            except Exception as e:
                print(e)

            task = asyncio.ensure_future(test())
            asyncio.get_event_loop().run_until_complete(asyncio.wait([task]))

            try:
                print('ASYNC', task.result())
            except Exception as e:
                print(e)

            # Remove dead sync connections (visible from this thread)
            db.close_old_connections()
            # Remove dead async connections: run the cleanup inside
            # sync_to_async's executor thread, where those connections live
            asyncio.get_event_loop().run_until_complete(
                sync_to_async(db.close_old_connections)()
            )

            sleep(5)
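
As far as I can tell, this works because Django tracks connections per thread: the ORM calls made through sync_to_async run (and open connections) in an executor thread, so a plain db.close_old_connections() from the main thread never sees them. Routing the cleanup through sync_to_async runs it in that same executor thread.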

drpancake (Author) commented:

Thanks for this.

> I don't actually use django_dramatiq, but I just ran into the same problem with plain Django + Dramatiq. Are you wrapping any DB calls in sync_to_async?

No, I'm not using any Django async functionality in my project. I tried putting db.close_old_connections() in various places, but it doesn't seem to help.

My current (horrible) solution is to restart my worker process every 15 minutes!
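
For anyone copying this workaround, it's just a crontab entry along these lines (the service name is made up):

*/15 * * * * systemctl restart dramatiq-worker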

kylemacfarlane commented:

It could also be a regular thread somewhere (or running dramatiq with --threads x and non-thread-safe code). From what I understand, sync_to_async is just a wrapper around a ThreadPoolExecutor.
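
A quick way to see why threads matter here: Django's connection handler is thread-local, so each thread gets its own connection object. A minimal sketch (runnable inside any Django project):

import threading
from django import db

def show_connection():
    # connections["default"] resolves per thread, so the two prints below
    # show different object ids.
    print(threading.current_thread().name, id(db.connections["default"]))

show_connection()
t = threading.Thread(target=show_connection)
t.start()
t.join()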

martyphee commented:

I'm having the same issue. The funny part is that it's only on our staging system; production seems fine. This was a lift-and-shift extraction of a microservice that I've been working on getting into production. No async code.

We're using:
django_dramatiq
dramatiq_sqs
sentry-dramatiq
django_periodiq

Settings

DATABASES = {"default": dj_database_url.parse(configuration.db.url, conn_max_age=500)}
.
.
.
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq_sqs.SQSBroker",
    "OPTIONS": {
        "namespace": "pricing-service_tasks",
    },
    "MIDDLEWARE": [
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Retries",
        "django_dramatiq.middleware.DbConnectionsMiddleware",
        "periodiq.PeriodiqMiddleware",
    ],
}

The only task running:

import random
from time import sleep

import dramatiq
from django.contrib.auth.models import User  # assuming the default auth User model
from periodiq import cron


@dramatiq.actor(periodic=cron("*/1 * * * *"))
def example_task():
    """For testing Dramatiq setup in staging and prod. Will delete/revert once I'm done."""
    sleep(5)
    user, _ = User.objects.get_or_create(email="test@dramatiq.test", is_active=False)
    user.first_name = str(random.choice(range(1, 999999)))
    user.save()

drpancake (Author) commented:

@martyphee Is it easily reproducible or somewhat random? Do you have a high throughput of tasks?

I'm also using Sentry and I wonder if that could be related.

For extra context, I'm averaging 20 tasks/second, and this issue usually appears after 24-48 hours. But it seems to strike at random.

martyphee commented:

@drpancake It seems to consistently happen on our staging servers. The picture below shows the task processors; each is now holding 64 connections. That's the max they hold, and we get into a situation where the connection pool, for lack of a better description, freaks out and throws tons of errors saying the connection is already closed. I think it might be hitting the max DB connections.
[screenshot: task processor pods, each holding 64 connections]

This is the only job running and it only runs once per minute as a test. There is nothing else hitting it.

It took about 24 hours to hit 64 connections per pod. I restarted the pods yesterday morning.

Sentry error:
[screenshot]

drpancake (Author) commented:

Interesting! Let me know if you make any breakthroughs. Perhaps you could try running the task more frequently to see if you hit the error sooner.

As much as I detest it, I may have to move over to Celery at some point as it's at least battle-tested for a high throughput of tasks like my use case.

mlissner commented:

> As much as I detest it, I may have to move over to Celery at some point as it's at least battle-tested for a high throughput of tasks like my use case.

FWIW, I'm having this problem with Celery and there's an issue about this here: celery/django-celery#121

Bummer to see dramatiq having this problem too. I was just thinking I could solve so many of precisely these kinds of issues by switching over.

drpancake (Author) commented:

@mlissner That's disheartening to hear! I'm still using Dramatiq, but I have a cron job that restarts the worker process every 15 minutes. Luckily, for my use case, tasks getting killed randomly is acceptable.
