Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DbConnectionsMiddleware and "Lost connection to MySQL server during query" #19

Closed
brunabxs opened this issue Aug 21, 2018 · 11 comments
Closed
Assignees
Labels

Comments

@brunabxs
Copy link

brunabxs commented Aug 21, 2018

Hi,

I am using DbConnectionsMiddleware and I am still having problems if two tasks are processed and MySQL closes the connection between these two executions.

I wrote a test to illustrate this example:

models.py

class SomeModel(models.Model):
    pass

tests.py

import dramatiq
import pytest
import time

from django_dramatiq.middleware import DbConnectionsMiddleware

from django.db import OperationalError


@pytest.fixture(scope='function')
def broker_stub():
    broker = StubBroker(middleware=[DbConnectionsMiddleware()])
    broker.emit_after('process_boot')
    dramatiq.set_broker(broker)
    yield broker
    broker.flush_all()
    broker.close()


@pytest.fixture()
def worker_stub(broker_stub):
    worker = Worker(broker_stub, worker_timeout=100, worker_threads=1)
    worker.start()
    yield worker
    worker.stop()


@pytest.fixture(scope='function')
def db_wait_timeout():
    from django.db import connection
    value = 10
    # set wait_timeout to force MySQL errors
    with connection.cursor() as cursor:
        cursor.execute(f'SET @@GLOBAL.wait_timeout={value};')
    connection.close()
    yield value
    # rollback wait_timeout to avoid django_db fixture cleanup errors
    with connection.cursor() as cursor:
        cursor.execute('SET @@GLOBAL.wait_timeout=28800;')
    connection.close()


@pytest.mark.django_db(transaction=True)
def test_must_not_raise_operational_error(self, db_wait_timeout, broker_stub, worker_stub):
    results = []

    @dramatiq.actor()
    def do_work():
        from .models import SomeModel
        try:
            model = SomeModel()
            model.save()
            results.append('success')
        except Exception as error:
            results.append(error)

    # initialize connections with database
    do_work.send()
    broker_stub.join(do_work.queue_name)
    worker_stub.join()

    # wait until all connections are terminated by MySQL
    time.sleep(db_wait_timeout + 5)

    # try to use available connections
    do_work.send()

    # wait until all connections are terminated by MySQL
    time.sleep(db_wait_timeout + 5)

    assert results == ['success', 'success']

I think the problem occurs because DbConnectionsMiddleware closes django db connections only:

  • before_consumer_thread_shutdown
  • before_worker_thread_shutdown
  • before_worker_shutdown

I think it needs also to close old connections:

  • before_process_message
  • after_process_message
class DbConnectionsMiddleware(Middleware):
    ...
    def _close_old_connections(self, *args, **kwargs):
        db.close_old_connections()
        
    def before_process_message(self, broker, message):
        self._close_old_connections()

    def after_process_message(self, broker, message, *, result=None, exception=None):
        self._close_old_connections()

Am I doing something wrong or what I've proposed makes sense?

@beigna
Copy link
Contributor

beigna commented Sep 11, 2018

Hi!, I don't know if my problem is related... But I'm having too much OperationalError: (2006, 'MySQL server has gone away')

I run dramatiq worker in two servers (with RabbitMQ as backend) and after 24 or 36 hours of running, my Sentry is flooded with 'MySQL server has gone away' and I must to restart the workers.

Thanks!

Django==2.1
django-dramatiq==0.4.1
dramatiq==1.3.0
DRAMATIQ_BROKER = {
    "BROKER": "dramatiq.brokers.rabbitmq.RabbitmqBroker",
    "OPTIONS": {
        "url": "amqp://shfdashfga@sdlhfgsd:5672/asdasd",
    },
    "MIDDLEWARE": [
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Retries",
        "django_dramatiq.middleware.DbConnectionsMiddleware",
        "project.middleware.DramatiqSentryMiddleware",
    ]
}

@beigna
Copy link
Contributor

beigna commented Jan 4, 2019

Hi, Happy new year!

I updated my dependencies Today

Django==2.1
django-dramatiq==0.5.1
dramatiq==1.4.1

The same issue :(
How can I force the worker to re-establish the connection to MySQL when this error occurs?
Can the worker open and close the connection with each message that acquire? (without write a custom middleware like @brunabxs proposes)

Thanks for your time!

OperationalError: (2006, 'MySQL server has gone away')
  File "dramatiq/worker.py", line 397, in process_message
    res = actor(*message.args, **message.kwargs)
  File "dramatiq/actor.py", line 213, in __call__
    return self.fn(*args, **kwargs)
  File "billing/tasks.py", line 23, in accounting
    service_binding=sb['id']
  File "django/db/models/manager.py", line 82, in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
  File "django/db/models/query.py", line 393, in get
    num = len(clone)
  File "django/db/models/query.py", line 250, in __len__
    self._fetch_all()
  File "django/db/models/query.py", line 1183, in _fetch_all
    self._result_cache = list(self._iterable_class(self))
  File "django/db/models/query.py", line 54, in __iter__
    results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
  File "django/db/models/sql/compiler.py", line 1061, in execute_sql
    cursor.execute(sql, params)
  File "raven/contrib/django/client.py", line 127, in execute
    return real_execute(self, sql, params)
  File "django/db/backends/utils.py", line 68, in execute
    return self._execute_with_wrappers(sql, params, many=False, executor=self._execute)
  File "django/db/backends/utils.py", line 77, in _execute_with_wrappers
    return executor(sql, params, many, context)
  File "django/db/backends/utils.py", line 85, in _execute
    return self.cursor.execute(sql, params)
  File "django/db/utils.py", line 89, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "django/db/backends/utils.py", line 85, in _execute
    return self.cursor.execute(sql, params)
  File "django/db/backends/mysql/base.py", line 71, in execute
    return self.cursor.execute(query, args)
  File "MySQLdb/cursors.py", line 250, in execute
    self.errorhandler(self, exc, value)
  File "MySQLdb/connections.py", line 50, in defaulterrorhandler
    raise errorvalue
  File "MySQLdb/cursors.py", line 247, in execute
    res = self._query(query)
  File "MySQLdb/cursors.py", line 412, in _query
    rowcount = self._do_query(q)
  File "MySQLdb/cursors.py", line 375, in _do_query
    db.query(q)
  File "MySQLdb/connections.py", line 276, in query
    _mysql.connection.query(self, query)

@Bogdanp
Copy link
Owner

Bogdanp commented Jan 5, 2019

This issue sounds like you're holding a connection to a MySQL server in a pool for long enough that the server times out the connection. This page has more information on the issue. The simplest way to fix this in a Django application (without touching the db) is probably to turn off connection pooling by setting the CONN_MAX_AGE setting to 0 (the default). A better fix would be to set CONN_MAX_AGE to a value lower than your MySQL server's wait_timeout.

@Bogdanp Bogdanp closed this as completed Jan 5, 2019
@beigna
Copy link
Contributor

beigna commented Jan 5, 2019

Thanks you @Bogdanp . I "fixed" my issue several months ago, using the workarround suggested by @brunabxs

Let my say that I'm not using a connection pool with Django. Monitoring the MySQL DB I can see how the worker keeps the conecction alive (even when no tasks queued). When the connection is lost, the worker starts failing forever.
My tasks are too simples (one or two selects and one insert with out transactions, using the Django's ORM obviously).

I tried to find the reason by verifying the source code but I could not do it :(

Thanks again! Greetings!

@Bogdanp
Copy link
Owner

Bogdanp commented Jan 5, 2019

@nachopro I just took a look at the Django source code and it appears things work differently than I initially thought. IMO, this is bad design on Django's part, but it looks like they expect you to iterate over and possibly close connections before and after every request (in our case task execution). I'll push a fix in a few minutes. The example @brunabxs gave is the appropriate way to fix this.

@Bogdanp Bogdanp reopened this Jan 5, 2019
@Bogdanp Bogdanp self-assigned this Jan 5, 2019
@Bogdanp Bogdanp added the bug label Jan 5, 2019
@Bogdanp Bogdanp closed this as completed in 1a9a5a4 Jan 5, 2019
@Bogdanp
Copy link
Owner

Bogdanp commented Jan 5, 2019

OK, this should now be fixed in 0.5.2.

@Bogdanp
Copy link
Owner

Bogdanp commented Jan 6, 2019

@nachopro let me know if 0.5.2 fixed the issue for you.

@beigna
Copy link
Contributor

beigna commented Jan 6, 2019

@Bogdanp Yes, it's working great 🌅🌞

@Bogdanp
Copy link
Owner

Bogdanp commented Jan 6, 2019

Awesome! Sorry it took so long to fix.

@beigna
Copy link
Contributor

beigna commented Jan 6, 2019

Do not worry. Dramatiq is a great piece of software and supports many different providers. It is impossible to be aware of everything.

Thank you very much and especially to @brunabxs , who proposed the solution.

@CapedHero
Copy link
Contributor

CapedHero commented Sep 28, 2021

FYI, I have spent over half a day trying to figure out why I had the "lost connection" issue happening for me on Postgres. The exceptions were happening in before_process_message logic, which was trying to save a task in the database.

The underlying reason turned out to be the config:

DRAMATIQ_BROKER = {
    ...
    "MIDDLEWARE": [
        "django_dramatiq.middleware.DbConnectionsMiddleware",  # AdminMiddleware MUST BE AFTER
        "django_dramatiq.middleware.AdminMiddleware",
    ],
}

@Bogdanp, do you think it's worth mentioning in the README?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

4 participants