
Airflow scheduling only 1 dag at a time leaving other dags in a queued state #18051

Closed
1 of 2 tasks
r-richmond opened this issue Sep 7, 2021 · 6 comments
Labels
area:core kind:bug This is a clearly a bug

Comments

@r-richmond
Contributor

r-richmond commented Sep 7, 2021

Apache Airflow version

2.1.3 (latest released)

Operating System

Linux

Versions of Apache Airflow Providers

I don't think they are relevant, but I can provide them upon request.

Deployment

Other Docker-based deployment

Deployment details

No response

What happened

Completed an update to Airflow 2.1.3 on Thursday and received an error report over the weekend that jobs were not being run. Upon investigating, I discovered that only 1 DAG was running, with everything else stuck in the queued state.
The one running DAG was a long-running backfill (30 days, ETA 2 weeks left, ~180 DAG runs) that has max_active_runs=1.

The logs on the Airflow worker were normal; however, the scheduler logs were displaying the error raise OSError("handle is closed") (see below for the complete logs).

Restarting the scheduler & workers did nothing. However, upon turning off this long-running DAG (another one just like it started after turning off that one, so I turned it off too), all of the DAGs began scheduling normally.

What you expected to happen

Airflow should be able to continue scheduling other DAGs normally regardless of the existence of this DAG that is slowly catching up.

Note this may be related to one of the following:
#17975
#13542
#17945

EDIT: Upon reading #17945 again, it seems like it will resolve our issue. I'll mark this as closed once I can verify that the change fixes the issue (i.e. once a new version is released).

How to reproduce

Steps to attempt to reproduce (apologies for not going this far myself); a minimal DAG sketch follows these steps:

  1. Introduce a DAG that needs a large number of DAG runs to catch up to current (catchup=True).
  2. Set max_active_runs=1 for this DAG.
  3. Keep the DAG simple (e.g. a single task that sleeps for an hour).
  4. Submit the DAG with a start date 2 years in the past and a daily schedule.
  5. Check whether the scheduler can still schedule other DAGs while this DAG is turned on.
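
A minimal sketch of such a DAG, assuming Airflow 2.1.x imports (the dag_id, start date, and sleep duration are illustrative choices of mine, not from the report):

import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical repro DAG: roughly two years of daily runs to catch up,
# but only one run allowed at a time.
with DAG(
    dag_id="slow_backfill_repro",      # illustrative name
    start_date=datetime(2019, 9, 7),   # ~2 years before the report
    schedule_interval="@daily",
    catchup=True,                      # step 1: many runs needed to catch up
    max_active_runs=1,                 # step 2: one active run at a time
) as dag:
    PythonOperator(
        task_id="sleep_an_hour",
        python_callable=lambda: time.sleep(60 * 60),  # step 3: a task that sleeps for an hour
    )

With this DAG turned on, other DAGs should keep scheduling normally; per the report, on 2.1.3 they instead pile up in the queued state.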

Anything else

Airflow Scheduler Logs

Logs from after the issue started and after restarting the scheduler.

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2021-09-07 02:55:51,944] {scheduler_job.py:661} INFO - Starting the scheduler
[2021-09-07 02:55:51,944] {scheduler_job.py:666} INFO - Processing each file at most -1 times
[2021-09-07 02:55:52,055] {manager.py:254} INFO - Launched DagFileProcessorManager with pid: 11
[2021-09-07 02:55:52,058] {scheduler_job.py:1197} INFO - Resetting orphaned tasks for active dag runs
[2021-09-07 02:55:52,062] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2021-09-07 02:59:34,363] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=612)
Process ForkProcess-1:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
[2021-09-07 02:59:35,405] {manager.py:401} WARNING - DagFileProcessorManager (PID=11) exited with exit code 1 - re-launching
[2021-09-07 02:59:35,410] {manager.py:254} INFO - Launched DagFileProcessorManager with pid: 819
[2021-09-07 02:59:35,418] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2021-09-07 03:00:52,433] {scheduler_job.py:1197} INFO - Resetting orphaned tasks for active dag runs
[2021-09-07 03:00:52,441] {scheduler_job.py:1219} INFO - Marked 1 SchedulerJob instances as failed
[2021-09-07 03:00:52,889] {celery_executor.py:483} INFO - Adopted the following 1 tasks from a dead executor
	<TaskInstance: queue_backfill.f_queue_time_cdc 2020-12-24 05:00:00+00:00 [running]> in state STARTED
[2021-09-07 03:03:22,623] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=1626)
[2021-09-07 03:03:22,624] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=1626)
Process ForkProcess-2:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
[2021-09-07 03:03:23,543] {manager.py:401} WARNING - DagFileProcessorManager (PID=819) exited with exit code 1 - re-launching
[2021-09-07 03:03:23,548] {manager.py:254} INFO - Launched DagFileProcessorManager with pid: 1806
[2021-09-07 03:03:23,557] {settings.py:51} INFO - Configured default timezone Timezone('UTC')
[2021-09-07 03:05:54,184] {scheduler_job.py:1197} INFO - Resetting orphaned tasks for active dag runs
[2021-09-07 03:07:42,551] {processor.py:243} WARNING - Killing DAGFileProcessorProcess (PID=2711)
Process ForkProcess-3:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
[2021-09-07 03:07:42,784] {manager.py:401} WARNING - DagFileProcessorManager (PID=1806) exited with exit code 1 - re-launching

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@r-richmond r-richmond added area:core kind:bug This is a clearly a bug labels Sep 7, 2021
@datarian

datarian commented Sep 7, 2021

I am also observing the same issue.

I can add that it is not limited to max_active_runs = 1. We tried increasing max_active_runs in the hope of speeding things up, but the same deadlock for other DAGs was still present.

We have Airflow 2.1.3 deployed on k8s using the unofficial helm chart at https://github.com/airflow-helm/charts.

We mostly have default config settings. Changes related to the scheduler are these:

AIRFLOW__SCHEDULER__PARSING_PROCESSES: "4"
AIRFLOW__SCHEDULER__PROCESSOR_POLL_INTERVAL: "3"
AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL: "60"

@ephraimbuddy
Contributor

For now, increase AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE to a higher number.
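
For example, using the same environment-variable style as the config shown above (the value 100 is only an illustrative choice, not a recommendation from this thread):

# environment-variable form (value chosen for illustration only)
AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE: "100"

# equivalent airflow.cfg form
[scheduler]
max_dagruns_per_loop_to_schedule = 100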

@r-richmond, #17945 will resolve it but would introduce another bug where execution_order won't be followed. We are working on a better fix: #18061

@r-richmond
Contributor Author

Thank you @ephraimbuddy for the update. I was about to proceed with cherry-picking #17945, and your comment saved me some extra experimentation work.

@r-richmond
Contributor Author

r-richmond commented Sep 22, 2021

Going to close this out since I assume #18061 tackled it and I'm on 2.1.4 now. If this issue presents itself again, I'll re-open.

Thanks again @ephraimbuddy

Note: the only thing I still find strange is the error in the scheduler logs I originally posted, in particular this traceback:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 370, in _run_processor_manager
    processor_manager.start()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 610, in start
    return self._run_parsing_loop()
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 671, in _run_parsing_loop
    self._collect_results_from_processor(processor)
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/manager.py", line 981, in _collect_results_from_processor
    if processor.result is not None:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 321, in result
    if not self.done:
  File "/usr/local/lib/python3.8/site-packages/airflow/dag_processing/processor.py", line 286, in done
    if self._parent_channel.poll():
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 255, in poll
    self._check_closed()
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")

However, that hasn't come up again, so... fingers crossed it won't happen again?

@pcolladosoto
Contributor

Hi! After reading through the issue, I think this might be related to #22191. Should this be the case, it has been fixed in #22685 and should make its way into Airflow 2.3.0 🎉

@r-richmond
Contributor Author

@pcolladosoto after reading the linked issue, I think you may be right. Thank you so much for your write-up and fix. It definitely looks like you found the root cause of the OSError I saw in the logs.

Now I/we wait for 2.3 :)
