
Dispose connections when running tasks with os.fork & CeleryExecutor #13265

Merged
merged 1 commit into apache:master from fix-celery-task-hangs on Dec 23, 2020

Conversation

kaxil
Member

@kaxil kaxil commented Dec 22, 2020

Without this fix, when using CeleryExecutor with the default config (i.e. `AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=False`), tasks are run with os.fork and the pooled connections are shared with the forked process. This causes Celery tasks to hang indefinitely (tasks stay in the queued state) with the following error if there are not enough DB connections:

[2020-12-22 18:49:39,085: WARNING/ForkPoolWorker-2] Failed to log action with (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq

> It’s critical that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process.

SQLAlchemy docs: https://docs.sqlalchemy.org/en/14/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork
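For reference, the pattern those docs describe boils down to disposing of the engine in the child process right after the fork, so the child opens fresh connections instead of reusing the parent's pooled ones. A minimal standalone sketch of that pattern (the connection URL and `child_work` function are placeholders, not Airflow code):

```python
import os

from sqlalchemy import create_engine, text

# Engine created in the parent process; its pooled connections must not be
# reused after a fork.
engine = create_engine("postgresql+psycopg2://user:pass@localhost/airflow")  # placeholder URL


def child_work() -> None:
    # Per the SQLAlchemy pooling docs: drop the connections inherited from the
    # parent so this process starts with brand-new ones.
    engine.dispose()
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))


pid = os.fork()
if pid == 0:
    child_work()
    os._exit(0)
else:
    os.waitpid(pid, 0)
```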

This is also consistent with what we do in LocalExecutor:

def run(self):
    # We know we've just started a new process, so lets disconnect from the metadata db now
    settings.engine.pool.dispose()
    settings.engine.dispose()
    return super().run()
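Applied to the Celery path, the change amounts to doing the same thing in the forked child before it touches the metadata DB. A simplified sketch of the idea, not the exact diff (the `_execute_in_fork` shape below is condensed from how the executor runs a task command):

```python
import os

from airflow import settings


def _execute_in_fork(command_to_exec):
    """Simplified sketch: run a task command in a forked child process."""
    pid = os.fork()
    if pid:
        # Parent (Celery worker): just wait for the child to finish.
        _, ret = os.waitpid(pid, 0)
        return ret

    # Child: dispose of the pooled connections inherited from the worker
    # before doing anything that talks to the metadata DB.
    settings.engine.pool.dispose()
    settings.engine.dispose()

    # ... parse and execute `command_to_exec` here ...
    os._exit(0)
```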



@kaxil kaxil requested a review from ashb December 22, 2020 22:54
@boring-cyborg boring-cyborg bot added the area:Scheduler label Dec 22, 2020
@kaxil kaxil added this to the Airflow 2.0.1 milestone Dec 22, 2020
@kaxil kaxil changed the title Dispose connections when running tasks in fork with CeleryExecutor Dispose connections when running tasks with os.fork & CeleryExecutor Dec 22, 2020
@github-actions

The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest master at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.

@github-actions github-actions bot added the full tests needed label Dec 23, 2020
@kaxil kaxil merged commit 7f8be97 into apache:master Dec 23, 2020
@kaxil kaxil deleted the fix-celery-task-hangs branch December 23, 2020 09:27
kaxil added a commit that referenced this pull request Jan 21, 2021
…13265)

Without this fix, when using CeleryExecutor and the default config (i.e. `AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=False`), tasks are run in a fork and the pooled connections are shared with the forked process. This causes Celery tasks to hang indefinitely (tasks stay in the queued state) with the following error:

```
[2020-12-22 18:49:39,085: WARNING/ForkPoolWorker-2] Failed to log action with (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq
```

>It’s critical that when using a connection pool, and by extension when using an Engine created via create_engine(), that the pooled connections are not shared to a forked process.

SQLAlchemy docs: https://docs.sqlalchemy.org/en/14/core/pooling.html#using-connection-pools-with-multiprocessing-or-os-fork

(cherry picked from commit 7f8be97)
@syun64
Contributor

syun64 commented Mar 28, 2023

Hi @kaxil, I'm running an Airflow cluster with v2.5.0, CeleryExecutor, and SQLAlchemy 1.4.4, and I actually ran into the same error noted in this PR.

Traceback (most recent call last):
sqlalchemy/engine/base.py", line 1057, in _rollback_impl
	self.engine.dialect.do_rollback(self.connection)
sqlalchemy/engine/default.py", line 683, in do_rollback
	dbapi_connection.rollback()
psycopg2.DatabaseError: error with status PGRES_TUPLES_OK and no message from the libpq

It seems to have happened when making the call:

airflow/jobs/scheduler_job.py", line 889, in _run_scheduler_loop
num_finished_events = self._process_executor_events(session=session)

The link you noted in the PR description recommends calling the dispose function with close=False (the default is close=True) to ensure that the new process will not touch any of the parent process’ connections and will instead start with new connections.

Was there a reason we opted to leave the close option out of the function call, or is this something we could add to address more edge cases where the CeleryExecutor could go into a bad state when forking with a PostgreSQL connection?
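For concreteness, here is a rough sketch of what the close=False variant from those docs might look like in the forked child; the helper name is hypothetical, and if I'm reading the SQLAlchemy changelog right, the close parameter of Engine.dispose() only exists in 1.4.33+, so it wouldn't be available on 1.4.4:

```python
from airflow import settings


def _detach_inherited_pool() -> None:
    # Hypothetical helper, called in the forked child before any DB access.
    # close=False leaves the parent's sockets untouched and simply gives this
    # process an empty pool to start from (requires SQLAlchemy 1.4.33+).
    settings.engine.dispose(close=False)
```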

@syun64
Contributor

syun64 commented Mar 28, 2023

Actually, after some more debugging, it looks like this issue isn't specific to processing the executor events.

This is an error traceback that occurred while the scheduler was fetching active_runs_of_dags and that put the scheduler into a bad state.

Traceback (most recent call last):
  File "/airflow/jobs/scheduler_job.py", line 759, in _execute
    self._run_scheduler_loop()
  File "/airflow/jobs/scheduler_job.py", line 885, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
  File "/airflow/jobs/scheduler_job.py", line 958, in _do_scheduling
    self._start_queued_dagruns(session)
  File "/airflow/jobs/scheduler_job.py", line 1213, in _start_queued_dagruns
    DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session),
  File "/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/airflow/models/dagrun.py", line 277, in active_runs_of_dags
    query = query.filter(cls.dag_id.in_(list(set(dag_ids))))
  File "/airflow/jobs/scheduler_job.py", line 1213, in <genexpr>
    DagRun.active_runs_of_dags((dr.dag_id for dr in dag_runs), only_running=True, session=session),
  File "/sqlalchemy/orm/query.py", line 2900, in __iter__
    result = self._iter()
  File "/sqlalchemy/orm/query.py", line 2915, in _iter
    result = self.session.execute(
  File "/sqlalchemy/orm/session.py", line 1714, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/sqlalchemy/engine/base.py", line 1705, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
  File "/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
    util.raise_(
  File "/sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.DatabaseError: (psycopg2.DatabaseError) error with status PGRES_TUPLES_OK and no message from the libpq

This failure happened as soon as the application came up, and I'm now wondering if this is related to the connection pool unintentionally being shared across forked processes as well... To continue my investigation, I'm testing out disabling connection pooling on my Airflow cluster (setting sql_alchemy_pool_enabled to False) to see if I'm still able to observe this error.

If that flag does resolve the issue, I wonder if that means there are still some edge cases that make the use of pooling unsafe with CeleryExecutor (unless we do the converse and launch all tasks with a new Python interpreter).

@syun64
Contributor

syun64 commented Apr 4, 2023

Just following up on my own bug report on this PR for anyone facing a similar issue: I wasn't able to find the root cause, but I was able to resolve the issue by disabling connection pooling, i.e. setting sql_alchemy_pool_enabled to False.

I am not enough of an expert in database connections to put much weight on this, but there seem to be folks recommending the use of NullPool when DB access is already managed through PgBouncer, and this is anecdotally consistent with our experience debugging our issue. Using NullPool resolved all of the connection issues we were having with our deployment, where some of the schedulers would go into a bad state with the PostgreSQL DB connection. The number of connections is still managed by PgBouncer, and we saw no visible performance degradation as a result of disabling connection pools.
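For anyone comparing notes: as far as I can tell, sql_alchemy_pool_enabled = False effectively makes Airflow build its engine with SQLAlchemy's NullPool, so every session opens and closes a real connection and PgBouncer does the actual pooling. A rough standalone equivalent (the connection URL is a placeholder pointing at PgBouncer):

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import NullPool

# Placeholder URL; in a real deployment this would point at PgBouncer.
engine = create_engine(
    "postgresql+psycopg2://airflow:***@pgbouncer:6543/airflow",
    poolclass=NullPool,  # no client-side pool: each checkout opens a fresh DB connection
)

with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
# The connection is really closed here rather than returned to a pool, so there
# is nothing to share (or corrupt) across forked processes.
```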
