Database QueuePool issue with 1500+ task flows #6492
Comments
Thanks for the issue! I see you're using the ephemeral server, which means that each client needs to connect to your Postgres database. This is only intended to facilitate onboarding and small flows. Once you start to scale out, you should run the API in a separate process.
If that doesn't resolve it, definitely let me know and we can dig into this issue.
Getting the following error, but I am trying again.
This stack trace might be more useful, but I am not getting the QueuePool error!
Are you running your flow on the same machine that you started the server on? It doesn't look like the server is available at that URL.
Yes I am using the same machine, and things started out just fine. I can still see the Orion server process running and the UI is up at my defined PREFECT_API_URL.
So just to clarify: some of your tasks are running and successfully connecting to the API, but eventually one fails with this error?
That looks to be what is happening, yes.
Checking in on this issue. Is this something being looked into? It is still tagged "As Designed". Thank you.
I also get the same error.
It looks like the client is somehow getting saturated and tasks are failing to connect to the API. This will require further investigation. cc @zangell44 on the performance raft.
Update: @gabcoyne pointed me towards hooking into the cloud API and that worked once, but then I adjusted something in the flow and I get the same error. Here is the stack trace:
Is there an update on this issue? I am running into the same error when trying to run a flow with a large number of tasks.
We are still investigating this, thanks for reporting!
Any update on this? I'm hitting basically the same error.
Hi @ratulotron! We're actively working on these types of issues and hope to have pull requests to resolve them within the next few weeks. With that said, I was actually attempting to reproduce this issue yesterday and was unable to do so on the most recent version. Have you tried it recently? If you have and are still seeing issues, do you have an example that's failing?
In that case, it's possible I'm running things the wrong way. I have the following code as a deployment at the moment, without any concurrency limit, as that didn't seem to affect the error. I am using a local PostgreSQL database as well. There are around 30k items; I can see a lot of entries get processed (i.e. the ID gets printed), then all of a sudden I get the error. Edit: removed unused ElasticSearch-related stuff from the snippet.

```python
from prefect import flow, task, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
import json

FILENAME = "items.json"

# Task to push a single doc
@task(
    retries=3,
    retry_delay_seconds=300,
    timeout_seconds=10,
)
def create_doc(company):
    logger = get_run_logger()
    item_id = company["item_id"]
    logger.info("Now processing <%s>!", item_id)
    return item_id

@flow(task_runner=ConcurrentTaskRunner())
def push_to_elasticsearch():
    with open(FILENAME, "r", encoding="utf-8") as f:
        for line in f:
            data = json.loads(line)
            create_doc.submit(data)

if __name__ == "__main__":
    push_to_elasticsearch()
```

Error trace:
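A common mitigation for this kind of pool saturation is to bound how many tasks are in flight at once. As a hedged sketch (plain Python, no Prefect; `batched` is a helper name introduced here, not a Prefect API), the 30k-line file could be consumed in fixed-size chunks, with the flow resolving each chunk's futures before submitting the next:

```python
from itertools import islice

def batched(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while chunk := list(islice(it, size)):
        yield chunk

# Example: five items split into chunks of two.
chunks = list(batched(range(5), 2))
```

Inside the flow, waiting on each chunk's futures (e.g. calling `.result()` on them) before submitting the next chunk would keep at most `size` task runs hitting the API at a time.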
Despite the work in #8887 I'm encountering a locked database with SQLite:

12:48:19.505 | INFO | Task run 'Test Task-2809' - WorkerThread-338
12:48:19.512 | ERROR | prefect.server - Encountered exception in request:
Traceback (most recent call last):
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
self._adapt_connection._handle_exception(error)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
raise error
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
self.await_(_cursor.execute(operation, parameters))
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/cursor.py", line 37, in execute
await self._execute(self._cursor.execute, sql, parameters)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/cursor.py", line 31, in _execute
return await self._conn._execute(fn, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/core.py", line 137, in _execute
return await future
^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/core.py", line 110, in run
result = function()
^^^^^^^^^^
sqlite3.OperationalError: database is locked

    model = await models.task_runs.create_task_run(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/dev/prefect/src/prefect/server/database/dependencies.py", line 119, in async_wrapper
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/dev/prefect/src/prefect/server/models/task_runs.py", line 63, in create_task_run
await session.execute(insert_stmt)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 214, in execute
result = await greenlet_spawn(
^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
result = context.throw(*sys.exc_info())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
result = conn._execute_20(statement, params or {}, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
self._handle_dbapi_exception(
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
util.raise_(
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
self._adapt_connection._handle_exception(error)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
raise error
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
self.await_(_cursor.execute(operation, parameters))
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/cursor.py", line 37, in execute
await self._execute(self._cursor.execute, sql, parameters)
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/cursor.py", line 31, in _execute
return await self._conn._execute(fn, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/core.py", line 137, in _execute
return await future
^^^^^^^^^^^^
File "/Users/mz/.pyenv/versions/prefect-311/lib/python3.11/site-packages/aiosqlite/core.py", line 110, in run
result = function()
^^^^^^^^^^
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: INSERT INTO task_run (id, created, updated, name, run_count, total_run_time, task_key, dynamic_key, cache_key, cache_expiration, task_version, flow_run_run_count, empirical_policy, task_inputs, tags, flow_run_id) VALUES (:id, :created, :updated, :name, :run_count, :total_run_time, :task_key, :dynamic_key, :cache_key, :cache_expiration, :task_version, :flow_run_run_count, :empirical_policy, :task_inputs, :tags, :flow_run_id) ON CONFLICT (flow_run_id, task_key, dynamic_key) DO NOTHING]
[parameters: {'id': 'd6372ac9-effe-48bc-a87e-74059ea593e7', 'created': '2023-03-23 17:47:19.015579', 'updated': '2023-03-23 17:47:19.126383', 'name': 'Test Task-80', 'run_count': 0, 'total_run_time': '1970-01-01 00:00:00.000000', 'task_key': '__main__.test_task', 'dynamic_key': '80', 'cache_key': None, 'cache_expiration': None, 'task_version': None, 'flow_run_run_count': 0, 'empirical_policy': '{"max_retries": 0, "retry_delay_seconds": 0.0, "retries": 0, "retry_delay": 0, "retry_jitter_factor": null}', 'task_inputs': '{"my_range": []}', 'tags': '[]', 'flow_run_id': 'e3dafbe4-ee87-4e99-9c69-a83bd593d236'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
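For context, SQLite raises "database is locked" when a writer holds the lock longer than the connection's busy timeout. Here is a hedged sketch with the stdlib `sqlite3` module (the path, table name, and timeout value are illustrative, not Prefect's actual configuration) of the two knobs that usually help: a longer busy timeout and WAL journal mode.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "example.db")

# timeout= is the busy timeout: how long this connection waits for the
# lock before raising "database is locked".
conn = sqlite3.connect(db_path, timeout=30)

# WAL mode lets readers proceed while a single writer holds the lock.
mode = conn.execute("PRAGMA journal_mode=WAL").fetchone()[0]

conn.execute("CREATE TABLE task_run_demo (id INTEGER PRIMARY KEY)")
conn.execute("INSERT INTO task_run_demo VALUES (1)")
conn.commit()
count = conn.execute("SELECT COUNT(*) FROM task_run_demo").fetchone()[0]
conn.close()
```

Even with both knobs, SQLite serializes all writers, so past a certain task volume the practical fix is still moving to PostgreSQL.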
Has there been any solution to this issue? I'm facing the same problem.
@madkinsz My problem is similar to the OP's. In my flow, I basically call a
I was running a local Prefect server with a PostgreSQL database. I had created a concurrency limit of 80 for the tag associated with my task.
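The tag-based concurrency limit described above can also be enforced client-side. Below is a minimal sketch in plain asyncio (no Prefect; `limit=80` just mirrors the tag limit mentioned in the comment) of capping how many coroutines are in flight at once with a semaphore:

```python
import asyncio

async def run_bounded(coro_fns, limit=80):
    """Run coroutine factories with at most `limit` in flight at once."""
    sem = asyncio.Semaphore(limit)

    async def bounded(fn):
        async with sem:
            return await fn()

    # gather() preserves input order in its results.
    return await asyncio.gather(*(bounded(fn) for fn in coro_fns))

async def work(i):
    await asyncio.sleep(0)
    return i * 2

results = asyncio.run(
    run_bounded([lambda i=i: work(i) for i in range(10)], limit=3)
)
```

A server-side tag limit still queues runs centrally, but a client-side bound like this keeps connection demand from ever exceeding the pool in the first place.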
@madkinsz This is the issue I was running into in #8935 with the SQLite backend; it seems to affect the Postgres backend too. I debugged this a bit more by adding DEBUG logging to the server/client. My settings are added with a .env file:
Then I run https://www.dropbox.com/sh/5whhrsarcce6fe6/AACE7g_LJoB-LIlsWwY_hcYLa?dl=0
@zanieb @justquick Hitting the same error when trying to execute tasks on 200+ Dask workers. It seems like Prefect does not allow configuring this, and it appears to be caused by the
We should add retries for this error like we did in #9632
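To illustrate the shape of that fix (a generic hedged sketch, not the actual change from #9632), transient "database is locked"-style failures can be wrapped in a retry with jittered exponential backoff:

```python
import random
import time

def retry_on(exc_type, attempts=5, base_delay=0.01):
    """Retry the wrapped function on `exc_type`, backing off between tries."""
    def decorator(fn):
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return fn(*args, **kwargs)
                except exc_type:
                    if attempt == attempts - 1:
                        raise
                    # Jitter spreads retries out so hundreds of task runs
                    # don't all hammer the database at the same instant.
                    time.sleep(base_delay * (2 ** attempt) * random.uniform(0.5, 1.5))
        return wrapper
    return decorator

calls = {"n": 0}

@retry_on(RuntimeError, attempts=4)
def flaky_insert():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("database is locked")
    return "ok"

result = flaky_insert()
```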
This is a common but complex race condition. I'm actually running into it when using an asyncio queue pool for talking to Redis. Do you have a solution to address this in the pool settings? Adding retries would be a band-aid and still might not fix the issue in subsequent runs, since it can affect hundreds of coroutines.
@justquick Not sure if this helps, but we work around this by creating several Prefect server pods in our k8s cluster instead of a single one. This increases the number of connections you can have at the same time on your DB. Still a workaround, though...
I need to eventually run 10,000 tasks, but in setting up Prefect for the first time and running just 100, I am also getting both errors mentioned...
I am using Postgres, and the server I am testing on has 8 CPUs and 16 GB of RAM. When it does run fine, there is plenty of headroom. I set concurrency limits and the worker pool limit to 50 (UPDATE: now 100 after the fix below). My code is similar to this flow...

UPDATE: I just did a few runs in a row without a single error. Not sure if it's coincidence or whether the following "AI recommendations" fixed it. Not sure how to tell if this config.toml is even doing anything. I also upped limits in Postgres. I am no expert though.
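For reference, Prefect 2 exposes the server's SQLAlchemy pool through settings such as `PREFECT_SQLALCHEMY_POOL_SIZE` and `PREFECT_SQLALCHEMY_MAX_OVERFLOW`. The values below are illustrative guesses, not recommendations, and the setting names should be verified against your installed version; they must be present in the server process's environment before it starts:

```python
import os

# Hypothetical sizing: pool_size + max_overflow should comfortably exceed
# the number of task runs hitting the API concurrently.
os.environ["PREFECT_SQLALCHEMY_POOL_SIZE"] = "20"
os.environ["PREFECT_SQLALCHEMY_MAX_OVERFLOW"] = "40"
os.environ["PREFECT_API_DATABASE_TIMEOUT"] = "60"

pool_capacity = int(os.environ["PREFECT_SQLALCHEMY_POOL_SIZE"]) + int(
    os.environ["PREFECT_SQLALCHEMY_MAX_OVERFLOW"]
)
```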
I was having the same problem. It was confusing because I was not getting any failures on my first several flow runs, and then suddenly I couldn't successfully complete any flow run without this popping up. I tried all the solutions I could find here and in other threads, but the only thing that seems to have fixed my issue for now was simply truncating the task_run table in Postgres.

My guess is the task_run table grew so large (it was over 500k records before truncation) that the insert statements for creating a new task became significantly less performant, which started tripping the timeout error.

Ultimately, I'm happy I found a fix for this, but I'm not sure what the long-term solution would look like. I do see quite a few indexes on that table which I'm sure are contributing to the slow inserts once the table grows large enough, so maybe the ORM could drop them at the start of a flow run and recreate them after all the tasks are finished? I'm curious what other solutions the dev team might have.

Edit:
I've opened this PR which I believe addresses this issue. Would love to see this merged into the
First check
Bug summary
I have a flow with a current size of 3000 tasks that I map across the ConcurrentTaskRunner. However, when I try to run the flow with around 1500 tasks or more, I get this QueuePool error.
Reproduction
Error
Versions
Additional context
This is a flow I am porting from Prefect 1.0 where it runs just fine at the 3000 task level. I even have flows that run more than 3000 mapped tasks without any issue on Prefect 1.0, which I would like to port to 2.0.