I have an external dependency with a heavy setup phase (several auth requests) that is hard to modify. The dependency saves the asyncio event loop on its first invocation and reuses it afterwards.
I need to run an async method inside a Celery task, and this method uses the dependency internally.
I can't use asgiref's async_to_sync because it doesn't let me preserve the loop: subsequent task executions raise Event loop is closed when I invoke the dependency's calls.
I handled it the following way:
class ExampleModel(models.Model):
    name = models.CharField()


async def my_method():
    example_entry = await ExampleModel.objects.aget(id=1)
    # external async dep call here
    return example_entry


@shared_task(name="example_task", bind=True)
def example_task(self):
    utils.get_event_loop().run_until_complete(my_method())
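For readers who want to reproduce this: utils.get_event_loop() is my own helper and is not shown above. A minimal sketch of what such a helper could look like, assuming it simply caches one loop per worker process (the `_loop` variable and the function body are illustrative, not the exact code from my project):

```python
import asyncio

# Hypothetical module-level cache: one loop per worker process.
_loop = None


def get_event_loop():
    """Return a single long-lived loop, creating it on first use."""
    global _loop
    if _loop is None or _loop.is_closed():
        _loop = asyncio.new_event_loop()
    return _loop


async def my_method():
    # Stands in for the ORM call and the external async dependency call.
    return asyncio.get_running_loop()


# Two "task executions" see the same loop, and it is never closed in between.
first = get_event_loop().run_until_complete(my_method())
second = get_event_loop().run_until_complete(my_method())
```

The point of the helper is only that the loop survives between task executions, so the dependency can keep using the loop it saved on its first call.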
This worked perfectly for a few months, until the database was restarted and the connection was lost. After that, every task failed with django.db.utils.InterfaceError: connection already closed.
To dig in, I created a set of test tasks:
@shared_task(name="debug_task_async_to_sync", bind=True)
def debug_task_async_to_sync(self):
    import asgiref.sync
    res = asgiref.sync.async_to_sync(my_method)()
    print("async_to_sync", res)


@shared_task(name="debug_task_loop", bind=True)
def debug_task_loop(self):
    res = utils.get_event_loop().run_until_complete(my_method())
    print("loop", res)


@shared_task(name="debug_task")
def debug_task():
    res = models.ExampleModel.objects.get(id=1)
    print("sync", res)
and restarted the database while they were executing.
Both the sync debug_task and debug_task_async_to_sync reconnect to the database. The "loop" task keeps failing on every subsequent invocation. Moreover, this happens at the same time: the non-"loop" tasks re-create the connection while the "loop" one keeps failing.
I can't switch from the loop approach to async_to_sync because of the dependency mentioned above, and I couldn't find a way to preserve the loop with async_to_sync. Is there any way to fix DB reconnections?
The traceback for "connection already closed":
Traceback (most recent call last):
  File "/home/user/.../venv/lib/python3.11/site-packages/celery/app/trace.py", line 477, in trace_task
    R = retval = fun(*args, **kwargs)
                 ^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/celery/app/trace.py", line 760, in __protected_call__
    return self.run(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.../project/app/tasks/misc.py", line 27, in debug_task_loop
    res = utils.get_event_loop().run_until_complete(my_method())
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.11.0/lib/python3.11/asyncio/base_events.py", line 650, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/home/user/.../project/app/tasks/misc.py", line 14, in my_method
    return await models.ExampleModel.objects.aget(id=1)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/django/db/models/query.py", line 649, in aget
    return await sync_to_async(self.get)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/asgiref/sync.py", line 479, in __call__
    ret: _R = await loop.run_in_executor(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.pyenv/versions/3.11.0/lib/python3.11/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/asgiref/sync.py", line 538, in thread_handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/django/db/models/query.py", line 633, in get
    num = len(clone)
          ^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/django/db/models/query.py", line 380, in __len__
    self._fetch_all()
  File "/home/user/.../venv/lib/python3.11/site-packages/django/db/models/query.py", line 1881, in _fetch_all
    self._result_cache = list(self._iterable_class(self))
                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/django/db/models/query.py", line 208, in __iter__
    for row in compiler.results_iter(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/django/db/models/sql/compiler.py", line 1513, in results_iter
    results = self.execute_sql(
              ^^^^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/django/db/models/sql/compiler.py", line 1560, in execute_sql
    cursor = self.connection.cursor()
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/django/utils/asyncio.py", line 26, in inner
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/django/db/backends/base/base.py", line 330, in cursor
    return self._cursor()
           ^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/django/db/backends/base/base.py", line 307, in _cursor
    with self.wrap_database_errors:
  File "/home/user/.../venv/lib/python3.11/site-packages/django/db/utils.py", line 91, in __exit__
    raise dj_exc_value.with_traceback(traceback) from exc_value
  File "/home/user/.../venv/lib/python3.11/site-packages/django/db/backends/base/base.py", line 308, in _cursor
    return self._prepare_cursor(self.create_cursor(name))
                                ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/django/utils/asyncio.py", line 26, in inner
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/home/user/.../venv/lib/python3.11/site-packages/django/db/backends/postgresql/base.py", line 330, in create_cursor
    cursor = self.connection.cursor()
             ^^^^^^^^^^^^^^^^^^^^^^^^
django.db.utils.InterfaceError: connection already closed
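One thing the trace does show: in the "loop" variant the ORM call goes through loop.run_in_executor and concurrent/futures/thread.py, so the query runs in an executor thread rather than in the thread that runs the Celery task. My working assumption (not verified) is that the broken connection is cached per thread, so any stale-connection cleanup done in the task's thread never reaches it. The stdlib-only sketch below simulates that mechanism; `_local`, `get_connection`, `close_stale_connection` and `query` are hypothetical stand-ins, not Django or Celery APIs.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()                     # stand-in for a per-thread connection cache
_executor = ThreadPoolExecutor(max_workers=1)  # stand-in for the thread running sync ORM code


def get_connection():
    """Return this thread's cached 'connection', opening one if needed."""
    if getattr(_local, "conn", None) is None:
        _local.conn = {"usable": True}
    return _local.conn


def close_stale_connection():
    """Drop the calling thread's connection if it is no longer usable."""
    conn = getattr(_local, "conn", None)
    if conn is not None and not conn["usable"]:
        _local.conn = None


def query():
    if not get_connection()["usable"]:
        raise ConnectionError("connection already closed")
    return "row"


def break_connection():
    get_connection()["usable"] = False


async def run_query(cleanup_in_worker):
    loop = asyncio.get_running_loop()
    if cleanup_in_worker:
        # Run the cleanup in the same thread that owns the connection.
        await loop.run_in_executor(_executor, close_stale_connection)
    return await loop.run_in_executor(_executor, query)


loop = asyncio.new_event_loop()
ok_before = loop.run_until_complete(run_query(False))

# Simulate the database restart: the executor thread's connection goes bad.
_executor.submit(break_connection).result()

# Cleanup in the calling thread does not touch the executor thread's connection.
close_stale_connection()
try:
    loop.run_until_complete(run_query(False))
    failed_without_cleanup = False
except ConnectionError:
    failed_without_cleanup = True

recovered = loop.run_until_complete(run_query(True))
```

If the assumption holds, the Django analogue would be to call `await sync_to_async(close_old_connections)()` (with `close_old_connections` imported from `django.db`) at the start of my_method, so the cleanup runs in the same thread as the ORM calls. I have not confirmed that this fixes the reconnection in my setup.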