
scheduler gets stuck without a trace #7935

Closed
dimberman opened this issue Mar 27, 2020 · 71 comments · Fixed by #15112
Labels
area:Scheduler (Scheduler or dag parsing Issues), kind:bug (This is a clearly a bug)

Comments

@dimberman
Contributor

Apache Airflow version:

Kubernetes version (if you are using kubernetes) (use kubectl version):

Environment:

  • Cloud provider or hardware configuration:
  • OS (e.g. from /etc/os-release):
  • Kernel (e.g. uname -a):
  • Install tools:
  • Others:

What happened:

The scheduler gets stuck without a trace or error. When this happens, the CPU usage of the scheduler service is at 100%. No jobs get submitted and everything comes to a halt. It looks like it goes into some kind of infinite loop.
The only way I could make it run again is by manually restarting the scheduler service. But again, after running some tasks it gets stuck. I've tried with both the Celery and Local executors, but the same issue occurs. I am using the -n 3 parameter while starting the scheduler.

Scheduler configs:
job_heartbeat_sec = 5
scheduler_heartbeat_sec = 5
executor = LocalExecutor
parallelism = 32

Please help. I would be happy to provide any other information needed.

What you expected to happen:

How to reproduce it:

Anything else we need to know:

Moved here from https://issues.apache.org/jira/browse/AIRFLOW-401

@abhijit-kottur

I'm running Airflow 1.10.4 with Celery in k8s. The scheduler pod gets stuck while starting up, at the 'Resetting orphaned tasks' step.

[2020-03-31 19:34:36,955] {{__init__.py:51}} INFO - Using executor CeleryExecutor
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
[2020-03-31 19:34:37,533] {{scheduler_job.py:1288}} INFO - Starting the scheduler
[2020-03-31 19:34:37,534] {{scheduler_job.py:1296}} INFO - Running execute loop for -1 seconds
[2020-03-31 19:34:37,534] {{scheduler_job.py:1297}} INFO - Processing each file at most -1 times
[2020-03-31 19:34:37,535] {{scheduler_job.py:1300}} INFO - Searching for files in /usr/local/airflow/dags
[2020-03-31 19:34:38,124] {{scheduler_job.py:1302}} INFO - There are 39 files in /usr/local/airflow/dags
[2020-03-31 19:34:38,124] {{scheduler_job.py:1349}} INFO - Resetting orphaned tasks for active dag runs

This causes the UI to say

The scheduler does not appear to be running. Last heartbeat was received 5 minutes ago

The same thing happens even after restarting the scheduler pod (regardless of the CPU usage).

Any leads to solve this?

@mik-laj
Member

mik-laj commented Mar 31, 2020

What database are you using?

@abhijit-kottur

@mik-laj PostgreSQL. That's running as a pod too.

@NiGhtFurRy

We are also facing the scheduler stuck issue, which sometimes gets resolved by restarting the scheduler pod. There is no log trace in the scheduler process. We are using Airflow 1.10.9 with Postgres and Redis.

@leerobert

leerobert commented Apr 21, 2020

We're also seeing this same issue... no idea how to debug. airflow 1.10.9 with postgres / rabbitmq

@chrismclennon
Contributor

chrismclennon commented May 20, 2020

I see a similar issue on 1.10.9 where the scheduler runs fine on start but typically after 10 to 15 days the CPU utilization actually drops to near 0%. The scheduler health check in the webserver does still pass, but no jobs get scheduled. A restart fixes this.

Seeing as I observe a CPU drop instead of a CPU spike, I'm not sure if these are the same issues, but they share symptoms.

@gmcoringa

I see a similar issue on 1.10.10... there are no logs to indicate the problem. Airflow with mysql, redis and celery executor.

PS: we still run the scheduler with the argument -n 10

@chrismclennon
Contributor

I've anecdotally noticed that since dropping the -n 25 argument from our scheduler invocation, I haven't seen this issue come up. Before, it would crop up every ~10 days or so, and it's been about a month now without incident.

@mik-laj
Member

mik-laj commented Jun 20, 2020

Could someone try to run py-spy when this incident occurs? It may bring us closer to a solution: it lets us check what code is currently being executed without restarting the application.
https://github.com/benfred/py-spy

@sylr

sylr commented Jul 30, 2020

root@airflow-scheduler-5b76d7466f-dxdn2:/usr/local/airflow# ps auxf
USER        PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root       5229  0.5  0.0  19932  3596 pts/0    Ss   13:25   0:00 bash
root       5234  0.0  0.0  38308  3376 pts/0    R+   13:25   0:00  \_ ps auxf
root          1  2.7  0.6 847400 111092 ?       Ssl  12:48   1:01 /usr/local/bin/python /usr/local/bin/airflow scheduler -n -1
root         19  0.7  0.5 480420 86124 ?        S    12:48   0:16 airflow scheduler -- DagFileProcessorManager
root       5179  0.1  0.0      0     0 ?        Z    13:17   0:00  \_ [airflow schedul] <defunct>
root       5180  0.1  0.0      0     0 ?        Z    13:17   0:00  \_ [airflow schedul] <defunct>
root       5135  0.0  0.5 847416 96960 ?        S    13:17   0:00 /usr/local/bin/python /usr/local/bin/airflow scheduler -n -1
root       5136  0.0  0.0      0     0 ?        Z    13:17   0:00 [/usr/local/bin/] <defunct>
Collecting samples from 'airflow scheduler -- DagFileProcessorManager' (python v3.7.8)
Total Samples 3106
GIL: 0.00%, Active: 1.00%, Threads: 1

  %Own   %Total  OwnTime  TotalTime  Function (filename:line)
  1.00%   1.00%   0.200s    0.200s   _send (multiprocessing/connection.py:368)
  0.00%   1.00%   0.000s    0.200s   start (airflow/utils/dag_processing.py:554)
  0.00%   1.00%   0.000s    0.200s   wrapper (airflow/utils/cli.py:75)
  0.00%   1.00%   0.000s    0.200s   _run_processor_manager (airflow/utils/dag_processing.py:624)
  0.00%   1.00%   0.000s    0.200s   run (airflow/jobs/base_job.py:221)
  0.00%   1.00%   0.000s    0.200s   _Popen (multiprocessing/context.py:277)
  0.00%   1.00%   0.000s    0.200s   <module> (airflow:37)
  0.00%   1.00%   0.000s    0.200s   _send_bytes (multiprocessing/connection.py:404)
  0.00%   1.00%   0.000s    0.200s   _launch (multiprocessing/popen_fork.py:74)
  0.00%   1.00%   0.000s    0.200s   scheduler (airflow/bin/cli.py:1040)
  0.00%   1.00%   0.000s    0.200s   send (multiprocessing/connection.py:206)
  0.00%   1.00%   0.000s    0.200s   start (airflow/utils/dag_processing.py:861)
  0.00%   1.00%   0.000s    0.200s   _Popen (multiprocessing/context.py:223)
  0.00%   1.00%   0.000s    0.200s   _execute_helper (airflow/jobs/scheduler_job.py:1415)
  0.00%   1.00%   0.000s    0.200s   _bootstrap (multiprocessing/process.py:297)
  0.00%   1.00%   0.000s    0.200s   _execute (airflow/jobs/scheduler_job.py:1382)
  0.00%   1.00%   0.000s    0.200s   start (multiprocessing/process.py:112)
  0.00%   1.00%   0.000s    0.200s   run (multiprocessing/process.py:99)
  0.00%   1.00%   0.000s    0.200s   __init__ (multiprocessing/popen_fork.py:20)

@sylr

sylr commented Jul 31, 2020

Happened again today

root@airflow-scheduler-5b76d7466f-9w89s:/usr/local/airflow# py-spy dump --pid=18 --nonblocking
Process 18: airflow scheduler -- DagFileProcessorManager
Python v3.7.8 (/usr/local/bin/python3.7)

Thread 0x7F1E7B360700 (active): "MainThread"
    _send (multiprocessing/connection.py:368)
    _send_bytes (multiprocessing/connection.py:404)
    send (multiprocessing/connection.py:206)
    start (airflow/utils/dag_processing.py:886)
    _run_processor_manager (airflow/utils/dag_processing.py:624)
    run (multiprocessing/process.py:99)
    _bootstrap (multiprocessing/process.py:297)
    _launch (multiprocessing/popen_fork.py:74)
    __init__ (multiprocessing/popen_fork.py:20)
    _Popen (multiprocessing/context.py:277)
    _Popen (multiprocessing/context.py:223)
    start (multiprocessing/process.py:112)
    start (airflow/utils/dag_processing.py:554)
    _execute_helper (airflow/jobs/scheduler_job.py:1415)
    _execute (airflow/jobs/scheduler_job.py:1382)
    run (airflow/jobs/base_job.py:221)
    scheduler (airflow/bin/cli.py:1040)
    wrapper (airflow/utils/cli.py:75)
    <module> (airflow:37)
root@airflow-scheduler-5b76d7466f-9w89s:/usr/local/airflow# py-spy dump --pid=18 --native
Process 18: airflow scheduler -- DagFileProcessorManager
Python v3.7.8 (/usr/local/bin/python3.7)

Thread 18 (idle): "MainThread"
    __write (libpthread-2.24.so)
    _send (multiprocessing/connection.py:368)
    _send_bytes (multiprocessing/connection.py:404)
    send (multiprocessing/connection.py:206)
    start (airflow/utils/dag_processing.py:886)
    _run_processor_manager (airflow/utils/dag_processing.py:624)
    run (multiprocessing/process.py:99)
    _bootstrap (multiprocessing/process.py:297)
    _launch (multiprocessing/popen_fork.py:74)
    __init__ (multiprocessing/popen_fork.py:20)
    _Popen (multiprocessing/context.py:277)
    _Popen (multiprocessing/context.py:223)
    start (multiprocessing/process.py:112)
    start (airflow/utils/dag_processing.py:554)
    _execute_helper (airflow/jobs/scheduler_job.py:1415)
    _execute (airflow/jobs/scheduler_job.py:1382)
    run (airflow/jobs/base_job.py:221)
    scheduler (airflow/bin/cli.py:1040)
    wrapper (airflow/utils/cli.py:75)
    <module> (airflow:37)

@mik-laj does it help?

@sylr

sylr commented Jul 31, 2020

OK, so I have more info. Here is the situation when the scheduler gets stuck:

root@airflow-scheduler-5b76d7466f-9w89s:/usr/local/airflow# ps auxf
USER        PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root       6040  0.0  0.0  19936  3964 pts/0    Ss   20:18   0:00 bash
root       6056  0.0  0.0  38308  3140 pts/0    R+   20:19   0:00  \_ ps auxf
root          1  2.9  0.7 851904 115828 ?       Ssl  Jul30  54:46 /usr/local/bin/python /usr/local/bin/airflow scheduler -n -1
root         18  0.9  0.5 480420 86616 ?        S    Jul30  18:20 airflow scheduler -- DagFileProcessorManager
root       6020  0.1  0.0      0     0 ?        Z    20:08   0:00  \_ [airflow schedul] <defunct>
root       6021  0.1  0.0      0     0 ?        Z    20:08   0:00  \_ [airflow schedul] <defunct>
root       5977  0.0  0.6 851920 100824 ?       S    20:08   0:00 /usr/local/bin/python /usr/local/bin/airflow scheduler -n -1
root       5978  0.0  0.6 851920 100424 ?       S    20:08   0:00 /usr/local/bin/python /usr/local/bin/airflow scheduler -n -1

I managed to revive the scheduler by killing both PIDs 5977 & 5978.

root@airflow-scheduler-5b76d7466f-9w89s:/usr/local/airflow# py-spy dump --pid 5977
Process 5977: /usr/local/bin/python /usr/local/bin/airflow scheduler -n -1
Python v3.7.8 (/usr/local/bin/python3.7)

Thread 5977 (idle): "MainThread"
    _flush_std_streams (multiprocessing/util.py:435)
    _bootstrap (multiprocessing/process.py:317)
    _launch (multiprocessing/popen_fork.py:74)
    __init__ (multiprocessing/popen_fork.py:20)
    _Popen (multiprocessing/context.py:277)
    start (multiprocessing/process.py:112)
    _repopulate_pool (multiprocessing/pool.py:241)
    __init__ (multiprocessing/pool.py:176)
    Pool (multiprocessing/context.py:119)
    sync (airflow/executors/celery_executor.py:247)
    heartbeat (airflow/executors/base_executor.py:134)
    _validate_and_run_task_instances (airflow/jobs/scheduler_job.py:1505)
    _execute_helper (airflow/jobs/scheduler_job.py:1443)
    _execute (airflow/jobs/scheduler_job.py:1382)
    run (airflow/jobs/base_job.py:221)
    scheduler (airflow/bin/cli.py:1040)
    wrapper (airflow/utils/cli.py:75)
    <module> (airflow:37)

root@airflow-scheduler-5b76d7466f-9w89s:/usr/local/airflow# py-spy dump --pid 5978
Process 5978: /usr/local/bin/python /usr/local/bin/airflow scheduler -n -1
Python v3.7.8 (/usr/local/bin/python3.7)

Thread 5978 (idle): "MainThread"
    _flush_std_streams (multiprocessing/util.py:435)
    _bootstrap (multiprocessing/process.py:317)
    _launch (multiprocessing/popen_fork.py:74)
    __init__ (multiprocessing/popen_fork.py:20)
    _Popen (multiprocessing/context.py:277)
    start (multiprocessing/process.py:112)
    _repopulate_pool (multiprocessing/pool.py:241)
    __init__ (multiprocessing/pool.py:176)
    Pool (multiprocessing/context.py:119)
    sync (airflow/executors/celery_executor.py:247)
    heartbeat (airflow/executors/base_executor.py:134)
    _validate_and_run_task_instances (airflow/jobs/scheduler_job.py:1505)
    _execute_helper (airflow/jobs/scheduler_job.py:1443)
    _execute (airflow/jobs/scheduler_job.py:1382)
    run (airflow/jobs/base_job.py:221)
    scheduler (airflow/bin/cli.py:1040)
    wrapper (airflow/utils/cli.py:75)
    <module> (airflow:37)
root@airflow-scheduler-5b76d7466f-9w89s:/usr/local/airflow# kill -9 5978
root@airflow-scheduler-5b76d7466f-9w89s:/usr/local/airflow# kill -9 5977
root@airflow-scheduler-5b76d7466f-9w89s:/usr/local/airflow# ps auxf
USER        PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
root       6040  0.0  0.0  19936  3964 pts/0    Ss   20:18   0:00 bash
root       6071  0.0  0.0  38308  3176 pts/0    R+   20:21   0:00  \_ ps auxf
root          1  2.9  0.7 851904 115828 ?       Ssl  Jul30  54:46 /usr/local/bin/python /usr/local/bin/airflow scheduler -n -1
root         18  0.9  0.5 480420 86616 ?        S    Jul30  18:20 airflow scheduler -- DagFileProcessorManager
root       6069  0.0  0.5 485184 87268 ?        R    20:21   0:00  \_ airflow scheduler - DagFileProcessor /usr/local/airflow/dags/datafactory-kafka2adls-link-1.py
root       6070  0.0  0.5 483640 85684 ?        R    20:21   0:00  \_ airflow scheduler - DagFileProcessor /usr/local/airflow/dags/datafactory-kafka2adls-sfdc-history-1.py

@norwoodj

norwoodj commented Aug 19, 2020

We also have this issue:
Apache Airflow version: 1.10.10

Kubernetes version (if you are using kubernetes) (use kubectl version): v1.14.10-gke.42

Environment:

Cloud provider or hardware configuration: Google Cloud Kubernetes
OS (e.g. from /etc/os-release): "Debian GNU/Linux 10 (buster)"
Kernel (e.g. uname -a): Linux airflow-scheduler-77fc4ff87c-k2td5 4.14.138+ #1 SMP Tue Sep 3 02:58:08 PDT 2019 x86_64 GNU/Linux
Install tools:
Others:
What happened: After running correctly for one to a few hours the scheduler simply stops scheduling tasks. No errors appear in any airflow logs (worker and web included). I see CPU go down when it hits the stopping point. We are using postgres/redis

@sglickman

This is happening to us also. No errors appear in the logs but the scheduler will not create new pods, pipelines stall with tasks in 'queued' state, and the scheduler pod must be deleted in order to get things running again.

@pingdink

Any fix for this issue yet? Our scheduler has no heartbeat, CPU spikes then drops, and the scheduler is back up after 15 minutes. This is slowing our team down a lot.

@ashwinshankar77
Contributor

Hi, this is happening at Slack too. We are using celery executor. The scheduler just gets stuck, no trace in the logs. Seeing a lot of defunct processes. Restart fixes it. @turbaszek @kaxil @potiuk any ideas what is going on?

@msumit
Contributor

msumit commented Sep 2, 2020

We are also facing the same issue with the Airflow 1.10.4 - MySQL - Celery combination. Found that the scheduler's DagFileProcessorManager gets hung and we have to kill it to get the scheduler back.

@ashwinshankar77
Contributor

@msumit I see the exact same symptom. Please let us know if you find something.

@sdzharkov

sdzharkov commented Sep 2, 2020

We've experienced this issue twice now, with the CPU spiking to 100% and failing to schedule any tasks after. Our config is Airflow 1.10.6 - Celery - Postgres running on AWS ECS. I went back into our Cloudwatch logs and noticed the following collection of logs at the time the bug occurred:

  | 2020-07-20T07:21:21.346Z | Process DagFileProcessor4357938-Process:
  | 2020-07-20T07:21:21.346Z | Traceback (most recent call last):
  | 2020-07-20T07:21:21.346Z | File "/usr/local/lib/python3.7/logging/__init__.py", line 1029, in emit
  | 2020-07-20T07:21:21.346Z | self.flush()
  | 2020-07-20T07:21:21.346Z | File "/usr/local/lib/python3.7/logging/__init__.py", line 1009, in flush
  | 2020-07-20T07:21:21.346Z | self.stream.flush()
  | 2020-07-20T07:21:21.346Z | OSError: [Errno 28] No space left on device
  | 2020-07-20T07:21:21.346Z | During handling of the above exception, another exception occurred:

Which would point to the scheduler running out of disk space, likely due to log buildup (I added log cleanup tasks retroactively). I'm not sure if this is related to the scheduler getting stuck though.

@dlamblin
Contributor

dlamblin commented Sep 7, 2020

Is disk space everyone's issue? I recall either v1.10.5 or v1.10.6 had some not-fit-for-production issue that was fixed in the next version. 1.10.9 has been working okay for us, and importantly -n > -1 is not recommended anymore.

I'm curious whether you could work around it with AIRFLOW__CORE__BASE_LOG_FOLDER=/dev/null (probably not, because it tries to make sub-dirs, right?).

In the meantime we have a systemd timer service (or you could use cron) that basically runs (GNU) find:

find <base_log_dir> -mindepth 2 -type f -mtime +6 -delete -or -type d -empty -delete

E.G.

$ tree -D dir/
dir/
└── [Sep  6 23:10]  dir
    ├── [Sep  6 23:10]  dir
    │   └── [Jan  1  2020]  file.txt
    ├── [Sep  6 23:09]  diry
    └── [Sep  6 23:10]  dirz
        └── [Sep  6 23:10]  file.txt

4 directories, 2 files
$ find dir -mindepth 2 -type f -mtime +6 -delete -or -type d -empty -delete
$ tree -D dir/
dir/
└── [Sep  6 23:13]  dir
    └── [Sep  6 23:10]  dirz
        └── [Sep  6 23:10]  file.txt

2 directories, 1 file
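If you prefer to do the cleanup from Python (e.g. as a housekeeping DAG or a cron script), a rough equivalent of that find command might look like the sketch below. This is illustrative only; BASE_LOG_FOLDER is an assumption, so point it at your configured base_log_folder.

# Rough Python equivalent of the GNU find command above (illustrative sketch).
import os
import time

BASE_LOG_FOLDER = "/usr/local/airflow/logs"  # assumption: adjust to your base_log_folder
MAX_AGE = 7 * 24 * 3600                      # roughly "-mtime +6"

now = time.time()
for root, dirs, files in os.walk(BASE_LOG_FOLDER, topdown=False):
    for name in files:
        path = os.path.join(root, name)
        if now - os.path.getmtime(path) > MAX_AGE:
            os.remove(path)
    # prune directories left empty by the deletions above
    if root != BASE_LOG_FOLDER and not os.listdir(root):
        os.rmdir(root)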

@msumit
Contributor

msumit commented Sep 11, 2020

All system vitals like disk, CPU, and memory are absolutely fine whenever the hang happens for us. When the process is stuck, it doesn't respond to any kill signals except 9 & 11.

I did an strace on the stuck process; it shows the following:
futex(0x14d9390, FUTEX_WAIT_PRIVATE, 0, NULL

Then I killed the process with kill -11 and loaded the core in gdb; below is the stack trace:

(gdb) bt
#0 0x00007fe49b18b49b in raise () from /lib64/libpthread.so.0
#1
#2 0x00007fe49b189adb in do_futex_wait.constprop.1 () from /lib64/libpthread.so.0
#3 0x00007fe49b189b6f in __new_sem_wait_slow.constprop.0 () from /lib64/libpthread.so.0
#4 0x00007fe49b189c0b in sem_wait@@GLIBC_2.2.5 () from /lib64/libpthread.so.0
#5 0x0000000000430bc5 in PyThread_acquire_lock_timed ()
#6 0x0000000000521a4c in acquire_timed ()
#7 0x0000000000521af6 in rlock_acquire ()
#8 0x00000000004826cd in _PyCFunction_FastCallDict ()
#9 0x00000000004f4143 in call_function ()
#10 0x00000000004f7971 in _PyEval_EvalFrameDefault ()
#11 0x00000000004f33c0 in _PyFunction_FastCall ()
#12 0x00000000004f40d6 in call_function ()
#13 0x00000000004f7971 in _PyEval_EvalFrameDefault ()
#14 0x00000000004f33c0 in _PyFunction_FastCall ()
#15 0x00000000004f40d6 in call_function ()
#16 0x00000000004f7971 in _PyEval_EvalFrameDefault ()
#17 0x00000000004f33c0 in _PyFunction_FastCall ()
#18 0x00000000004f40d6 in call_function ()
#19 0x00000000004f7971 in _PyEval_EvalFrameDefault ()
#20 0x00000000004f33c0 in _PyFunction_FastCall ()
#21 0x00000000004f40d6 in call_function ()

mik-laj added the area:Scheduler and kind:bug labels Sep 30, 2020
@norwoodj

norwoodj commented Oct 6, 2020

If it helps, the last time this happened with debug logging on, the scheduler logged the attached ending.log before freezing forever and never heartbeating again.

@mik-laj
Member

mik-laj commented Oct 7, 2020

#11306
This change improves scheduling process management a little and may help us. Could you check it?

@teastburn

We also are experiencing a similar issue at Nextdoor with 1.10.12 / Postgres / Celery / AWS ECS. Ours looks much like @sylr's post #7935 (comment), where we have many extra processes spawned that, judging by their program args, appear identical to the main scheduler process, and everything is stuck. However, ours has CPU go to 0 and RAM spike up quite high.

@teastburn

We have a change that correlates (causation is not yet verified) with fixing the issue @sylr mentioned here, where many scheduler main processes spawn at the same time and then disappear (which caused an OOM error for us).

The change was the following:

AIRFLOW__CORE__SQL_ALCHEMY_POOL_SIZE
- 5
+ 11
AIRFLOW__CORE__SQL_ALCHEMY_MAX_OVERFLOW
- 10
+ 30
AIRFLOW__CORE__SQL_ALCHEMY_POOL_RECYCLE
- 3600
+ 1800

And we run MAX_THREADS=10. Is it possible that reaching pool_size or pool_size + max_overflow caused processes to back up or spawn oddly? Before this change the scheduler was getting stuck 1-2 times per day; we have not seen the issue in the 6 days since the change.
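For anyone who wants to see what exhausting pool_size + max_overflow looks like, here is a minimal sketch (not Airflow code; the throwaway SQLite URL is only there so it runs anywhere): once every pooled connection is checked out, the next checkout blocks for pool_timeout seconds and then raises TimeoutError, which is the kind of back-up suspected above.

import time
from sqlalchemy import create_engine, exc
from sqlalchemy.pool import QueuePool

# pool_size + max_overflow = 1 connection total, 2-second wait before giving up
engine = create_engine(
    "sqlite:///pool_demo.db",
    poolclass=QueuePool,
    pool_size=1,
    max_overflow=0,
    pool_timeout=2,
)

held = engine.connect()        # checks out the only pooled connection
start = time.time()
try:
    engine.connect()           # nothing free: blocks ~2s, then raises
except exc.TimeoutError as err:
    print(f"pool exhausted after {time.time() - start:.1f}s: {err}")
finally:
    held.close()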

We do not see the issue of many processes spawning at once anymore like this:

$ while true; do pgrep -f 'airflow scheduler' | wc -l; sleep .5; done
39
4
4
4
39
39
39
39
39
5
5
5
5
5
5
5
3
3
3
38
3
3
2
2
2
2
2
37
2
2
2
2
2
2
2
7
2
8
3
8
2
4
3
3
3
3
2
2
2
2
2
2
2
2
4
3
3
3
9
3
3
3
13
3
3
3
17
2
2
2
2
2
2
2
24
2
2
4

Can anyone else verify this change helps or not?

@dispensable

Same issue here with 1.10.12 + RabbitMQ + Celery + k8s. The scheduler keeps logging [2020-10-23 08:10:21,387] {{scheduler_job.py:237}} WARNING - Killing PID 30918, while on the container side ps shows [airflow schedul] <defunct> processes generated by airflow scheduler - DagFileProcessor <example_dag.py> over and over again. And the scheduler just gets stuck and never schedules any tasks.

@maijh

maijh commented Feb 5, 2021

Seeing this on 1.10.9

@milton0825
Contributor

milton0825 commented Feb 5, 2021

Seeing this on 1.10.8 with the Celery executor.
We are running the scheduler with a run duration of 900 seconds. It would run fine for a couple of days, then suddenly just freeze.

Thread 1 (idle): "MainThread"
    wait (threading.py:295)
    wait (threading.py:551)
    wait (multiprocessing/pool.py:635)
    get (multiprocessing/pool.py:638)
    map (multiprocessing/pool.py:266)
    trigger_tasks (lyft_etl/airflow/executors/lyft_celery_executor.py:90)
    heartbeat (airflow/executors/base_executor.py:130)
    _validate_and_run_task_instances (airflow/jobs/scheduler_job.py:1536)
    _execute_helper (airflow/jobs/scheduler_job.py:1473)
    _execute (airflow/jobs/scheduler_job.py:1412)
    run (airflow/jobs/base_job.py:221)
    scheduler (airflow/bin/cli.py:1117)
    wrapper (airflow/utils/cli.py:75)
    <module> (airflow/bin/airflow:37)
    <module> (airflow:7)
Thread 97067 (idle): "Thread-5667"
    _handle_workers (multiprocessing/pool.py:406)
    run (threading.py:864)
    _bootstrap_inner (threading.py:916)
    _bootstrap (threading.py:884)
Thread 97068 (idle): "Thread-5668"
    wait (threading.py:295)

@dimberman
Contributor Author

@ashb perhaps there is a race condition somewhere in the scheduler loop? It would be interesting to see this same thread trace on 2.0.

@ashb
Member

ashb commented Feb 7, 2021

Airflow doesn't use threads, so I'm not sure why there are two threads in the above trace.

Oh, multiprocessing uses threads internally.

@dhuang
Contributor

dhuang commented Feb 9, 2021

Started seeing this for the first time ever after we upgraded from 1.10.5 to 1.10.14.

@MatthewRBruce
Contributor

MatthewRBruce commented Feb 18, 2021

We just saw this on 2.0.1 when we added a largish number of new DAGs (We're adding around 6000 DAGs total, but this seems to lock up when about 200 try to be scheduled at once).

Here's py-spy stacktraces from our scheduler:

Process 6: /usr/local/bin/python /usr/local/bin/airflow scheduler
Python v3.8.7 (/usr/local/bin/python3.8)
Thread 0x7FF5C09C8740 (active): "MainThread"
    _send (multiprocessing/connection.py:368)
    _send_bytes (multiprocessing/connection.py:411)
    send (multiprocessing/connection.py:206)
    send_callback_to_execute (airflow/utils/dag_processing.py:283)
    _send_dag_callbacks_to_processor (airflow/jobs/scheduler_job.py:1795)
    _schedule_dag_run (airflow/jobs/scheduler_job.py:1762)
    _do_scheduling (airflow/jobs/scheduler_job.py:1521)
    _run_scheduler_loop (airflow/jobs/scheduler_job.py:1382)
    _execute (airflow/jobs/scheduler_job.py:1280)
    run (airflow/jobs/base_job.py:237)
    scheduler (airflow/cli/commands/scheduler_command.py:63)
    wrapper (airflow/utils/cli.py:89)
    command (airflow/cli/cli_parser.py:48)
    main (airflow/__main__.py:40)
    <module> (airflow:8)
 
Process 77: airflow scheduler -- DagFileProcessorManager
Python v3.8.7 (/usr/local/bin/python3.8)
Thread 0x7FF5C09C8740 (active): "MainThread"
    _send (multiprocessing/connection.py:368)
    _send_bytes (multiprocessing/connection.py:405)
    send (multiprocessing/connection.py:206)
    _run_parsing_loop (airflow/utils/dag_processing.py:698)
    start (airflow/utils/dag_processing.py:596)
    _run_processor_manager (airflow/utils/dag_processing.py:365)
    run (multiprocessing/process.py:108)
    _bootstrap (multiprocessing/process.py:315)
    _launch (multiprocessing/popen_fork.py:75)
    __init__ (multiprocessing/popen_fork.py:19)
    _Popen (multiprocessing/context.py:277)
    start (multiprocessing/process.py:121)
    start (airflow/utils/dag_processing.py:248)
    _execute (airflow/jobs/scheduler_job.py:1276)
    run (airflow/jobs/base_job.py:237)
    scheduler (airflow/cli/commands/scheduler_command.py:63)
    wrapper (airflow/utils/cli.py:89)
    command (airflow/cli/cli_parser.py:48)
    main (airflow/__main__.py:40)
    <module> (airflow:8)

What I think is happening is that the pipe between the DagFileProcessorAgent and the DagFileProcessorManager is full and is causing the Scheduler to deadlock.

From what I can see, the DagFileProcessorAgent only pulls data off the pipe in its heartbeat and wait_until_finished functions (result = self._parent_signal_conn.recv()), and the SchedulerJob is responsible for calling its heartbeat function each scheduler loop (self.processor_agent.heartbeat()).

However, the SchedulerJob is blocked from calling heartbeat because it is stuck forever trying to send data to that same full pipe via _send_dag_callbacks_to_processor in the _do_scheduling function, causing a deadlock.
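A minimal sketch of that failure mode outside Airflow (illustrative only, not Airflow code): if both ends of a multiprocessing.Pipe only ever send and neither one drains the other side, both eventually block in _send_bytes once the OS pipe buffers fill, which matches the stack traces above.

import multiprocessing

PAYLOAD = b"x" * 1_000_000  # big enough to fill the OS pipe buffers

def manager_side(conn):
    # keeps pushing results and never receives -- standing in for the
    # DagFileProcessorManager writing to the agent's signal connection
    while True:
        conn.send(PAYLOAD)

if __name__ == "__main__":
    agent_conn, manager_conn = multiprocessing.Pipe()
    multiprocessing.Process(target=manager_side, args=(manager_conn,), daemon=True).start()
    # the "scheduler" side also only sends (e.g. a callback request) and never
    # reaches the recv() that would normally happen in processor_agent.heartbeat()
    agent_conn.send(PAYLOAD)   # blocks forever once the buffers are full
    print("never reached")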

@ashb
Member

ashb commented Feb 20, 2021

Nice debugging @MatthewRBruce - and your diagnosis seems sound. We'll start on a fix next week.

kaxil added this to the Airflow 2.0.2 milestone Feb 20, 2021
@milton0825
Contributor

milton0825 commented Feb 24, 2021

I have a theory of why the Airflow scheduler may get stuck at CeleryExecutor._send_tasks_to_celery (our scheduler got stuck in a different place 😃).

The return value from send_task_to_executor may be huge, since the traceback is included in case of failure, and it looks like it is a known bug [1] in CPython that huge output can cause a deadlock in multiprocessing.Pool.

For example, the following code easily deadlocks on Python 3.6.3:

import multiprocessing

def f(x):
    # each call returns ~1 MB; with enough of these the result pipe backs up
    return ' ' * 1000000

if __name__ == '__main__':
    with multiprocessing.Pool(1) as p:
        r = p.map(f, ('hi' * 100000))

[1] https://bugs.python.org/issue35267

@ashb
Member

ashb commented Feb 24, 2021

@milton0825 Sounds plausible for what I know of your use case 😁 You're still on 1.10.x right? The scheduler on 2.0 sends a lot less data over the MP pipes (it doesn't send the DAG; that gets written to the DB), so that particular issue won't apply to 2.0+.

@milton0825
Contributor

Right we are still on 1.10.8

@SaithZhang

SaithZhang commented Feb 26, 2021

Seeing this on 1.10.14 + CeleryExecutor + Python 3.8. Will this be fixed on 1.10.x? For some reason our company has to use MySQL 5.6.

 ps -ef |grep airflow
root       9522      1  1 15:24 ?        00:00:13 /data/anaconda3/envs/airflow/bin/python /data/anaconda3/envs/airflow/bin/airflow webserver -D
root       9528      1  0 15:24 ?        00:00:00 gunicorn: master [airflow-webserver]
root      21238      1  0 15:31 ?        00:00:04 /data/anaconda3/envs/airflow/bin/python /data/anaconda3/envs/airflow/bin/airflow scheduler -D
root      21239  21238  1 15:31 ?        00:00:09 airflow scheduler -- DagFileProcessorManager
root      38695   9528  1 15:42 ?        00:00:01 [ready] gunicorn: worker [airflow-webserver]
root      39492   9528  2 15:43 ?        00:00:01 [ready] gunicorn: worker [airflow-webserver]
root      39644   9528  4 15:43 ?        00:00:01 [ready] gunicorn: worker [airflow-webserver]
root      40455   9528 51 15:44 ?        00:00:01 [ready] gunicorn: worker [airflow-webserver]
root      40503  21239  0 15:44 ?        00:00:00 [airflow schedul] <defunct>
root      40504  21239  0 15:44 ?        00:00:00 [airflow schedul] <defunct>

The [airflow schedul] <defunct> processes keep restarting all the time.

@ashb
Member

ashb commented Feb 26, 2021

@DreamyWen unlikely I'm afraid, at least not by me. I'll happily review a PR if anyone has time to submit it, but I can't put any time into fixing this on the 1.10 release branch, sorry.

@leonsmith
Contributor

leonsmith commented Mar 23, 2021

+1 on this issue.

Airflow 2.0.1

CeleryExecutor

~7000 DAGs. Seems to happen under load (when we have a bunch of DAGs that all kick off at midnight).

py-spy dump --pid 132 --locals
Process 132: /usr/local/bin/python /usr/local/bin/airflow scheduler
Python v3.8.3 (/usr/local/bin/python)
Thread 132 (idle): "MainThread"
  _send (multiprocessing/connection.py:368)
      Arguments::
          self: <Connection at 0x7f5db7aac550>
          buf: <bytes at 0x5564f22e5260>
          write: <builtin_function_or_method at 0x7f5dbed8a540>
      Locals::
          remaining: 1213
  _send_bytes (multiprocessing/connection.py:411)
      Arguments::
          self: <Connection at 0x7f5db7aac550>
          buf: <memoryview at 0x7f5db66f4a00>
      Locals::
          n: 1209
          header: <bytes at 0x7f5dbc01fb10>
  send (multiprocessing/connection.py:206)
      Arguments::
          self: <Connection at 0x7f5db7aac550>
          obj: <TaskCallbackRequest at 0x7f5db7398940>
  send_callback_to_execute (airflow/utils/dag_processing.py:283)
      Arguments::
          self: <DagFileProcessorAgent at 0x7f5db7aac880>
          request: <TaskCallbackRequest at 0x7f5db7398940>
  _process_executor_events (airflow/jobs/scheduler_job.py:1242)
      Arguments::
          self: <SchedulerJob at 0x7f5dbed3dd00>
          session: <Session at 0x7f5db80cf6a0>
      Locals::
          ti_primary_key_to_try_number_map: {("redeacted", "redeacted", <datetime.datetime at 0x7f5db768b540>): 1, ...}
          event_buffer: {...}
          tis_with_right_state: [("redeacted", "redeacted", <datetime.datetime at 0x7f5db768b540>, 1), ...]
          ti_key: ("redeacted", "redeacted", ...)
          value: ("failed", None)
          state: "failed"
          _: None
          filter_for_tis: <BooleanClauseList at 0x7f5db7427df0>
          tis: [<TaskInstance at 0x7f5dbbfd77c0>, <TaskInstance at 0x7f5dbbfd7880>, <TaskInstance at 0x7f5dbbfdd820>, ...]
          ti: <TaskInstance at 0x7f5dbbffba90>
          try_number: 1
          buffer_key: ("redeacted", ...)
          info: None
          msg: "Executor reports task instance %s finished (%s) although the task says its %s. (Info: %s) Was the task killed externally?"
          request: <TaskCallbackRequest at 0x7f5db7398940>
  wrapper (airflow/utils/session.py:62)
      Locals::
          args: (<SchedulerJob at 0x7f5dbed3dd00>)
          kwargs: {"session": <Session at 0x7f5db80cf6a0>}
  _run_scheduler_loop (airflow/jobs/scheduler_job.py:1386)
      Arguments::
          self: <SchedulerJob at 0x7f5dbed3dd00>
      Locals::
          is_unit_test: False
          call_regular_interval: <function at 0x7f5db7ac3040>
          loop_count: 1
          timer: <Timer at 0x7f5db76808b0>
          session: <Session at 0x7f5db80cf6a0>
          num_queued_tis: 17
  _execute (airflow/jobs/scheduler_job.py:1280)
      Arguments::
          self: <SchedulerJob at 0x7f5dbed3dd00>
      Locals::
          pickle_dags: False
          async_mode: True
          processor_timeout_seconds: 600
          processor_timeout: <datetime.timedelta at 0x7f5db7ab9300>
          execute_start_time: <datetime.datetime at 0x7f5db7727510>
  run (airflow/jobs/base_job.py:237)
      Arguments::
          self: <SchedulerJob at 0x7f5dbed3dd00>
      Locals::
          session: <Session at 0x7f5db80cf6a0>
  scheduler (airflow/cli/commands/scheduler_command.py:63)
      Arguments::
          args: <Namespace at 0x7f5db816f6a0>
      Locals::
          job: <SchedulerJob at 0x7f5dbed3dd00>
  wrapper (airflow/utils/cli.py:89)
      Locals::
          args: (<Namespace at 0x7f5db816f6a0>)
          kwargs: {}
          metrics: {"sub_command": "scheduler", "start_datetime": <datetime.datetime at 0x7f5db80f5db0>, ...}
  command (airflow/cli/cli_parser.py:48)
      Locals::
          args: (<Namespace at 0x7f5db816f6a0>)
          kwargs: {}
          func: <function at 0x7f5db8090790>
  main (airflow/__main__.py:40)
      Locals::
          parser: <DefaultHelpParser at 0x7f5dbec13700>
          args: <Namespace at 0x7f5db816f6a0>
  <module> (airflow:8)
py-spy dump --pid 134 --locals
Process 134: airflow scheduler -- DagFileProcessorManager
Python v3.8.3 (/usr/local/bin/python)
Thread 134 (idle): "MainThread"
  _send (multiprocessing/connection.py:368)
      Arguments::
          self: <Connection at 0x7f5db77274f0>
          buf: <bytes at 0x5564f1a76590>
          write: <builtin_function_or_method at 0x7f5dbed8a540>
      Locals::
          remaining: 2276
  _send_bytes (multiprocessing/connection.py:411)
      Arguments::
          self: <Connection at 0x7f5db77274f0>
          buf: <memoryview at 0x7f5db77d7c40>
      Locals::
          n: 2272
          header: <bytes at 0x7f5db6eb1f60>
  send (multiprocessing/connection.py:206)
      Arguments::
          self: <Connection at 0x7f5db77274f0>
          obj: (...)
  _run_parsing_loop (airflow/utils/dag_processing.py:698)
      Locals::
          poll_time: 0.9996239839999816
          loop_start_time: 690.422146969
          ready: [<Connection at 0x7f5db77274f0>]
          agent_signal: <TaskCallbackRequest at 0x7f5db678c8e0>
          sentinel: <Connection at 0x7f5db77274f0>
          processor: <DagFileProcessorProcess at 0x7f5db6eb1910>
          all_files_processed: False
          max_runs_reached: False
          dag_parsing_stat: (...)
          loop_duration: 0.0003760160000183532
  start (airflow/utils/dag_processing.py:596)
      Arguments::
          self: <DagFileProcessorManager at 0x7f5dbcb9c880>
  _run_processor_manager (airflow/utils/dag_processing.py:365)
      Arguments::
          dag_directory: "/code/src/dags"
          max_runs: -1
          processor_factory: <function at 0x7f5db7b30ee0>
          processor_timeout: <datetime.timedelta at 0x7f5db7ab9300>
          signal_conn: <Connection at 0x7f5db77274f0>
          dag_ids: []
          pickle_dags: False
          async_mode: True
      Locals::
          processor_manager: <DagFileProcessorManager at 0x7f5dbcb9c880>
  run (multiprocessing/process.py:108)
      Arguments::
          self: <ForkProcess at 0x7f5db7727220>
  _bootstrap (multiprocessing/process.py:315)
      Arguments::
          self: <ForkProcess at 0x7f5db7727220>
          parent_sentinel: 8
      Locals::
          util: <module at 0x7f5db8011e00>
          context: <module at 0x7f5dbcb8ba90>
  _launch (multiprocessing/popen_fork.py:75)
      Arguments::
          self: <Popen at 0x7f5db7727820>
          process_obj: <ForkProcess at 0x7f5db7727220>
      Locals::
          code: 1
          parent_r: 6
          child_w: 7
          child_r: 8
          parent_w: 9
  __init__ (multiprocessing/popen_fork.py:19)
      Arguments::
          self: <Popen at 0x7f5db7727820>
          process_obj: <ForkProcess at 0x7f5db7727220>
  _Popen (multiprocessing/context.py:276)
      Arguments::
          process_obj: <ForkProcess at 0x7f5db7727220>
      Locals::
          Popen: <type at 0x5564f1a439e0>
  start (multiprocessing/process.py:121)
      Arguments::
          self: <ForkProcess at 0x7f5db7727220>
  start (airflow/utils/dag_processing.py:248)
      Arguments::
          self: <DagFileProcessorAgent at 0x7f5db7aac880>
      Locals::
          mp_start_method: "fork"
          context: <ForkContext at 0x7f5dbcb9ce80>
          child_signal_conn: <Connection at 0x7f5db77274f0>
          process: <ForkProcess at 0x7f5db7727220>
  _execute (airflow/jobs/scheduler_job.py:1276)
      Arguments::
          self: <SchedulerJob at 0x7f5dbed3dd00>
      Locals::
          pickle_dags: False
          async_mode: True
          processor_timeout_seconds: 600
          processor_timeout: <datetime.timedelta at 0x7f5db7ab9300>
  run (airflow/jobs/base_job.py:237)
      Arguments::
          self: <SchedulerJob at 0x7f5dbed3dd00>
      Locals::
          session: <Session at 0x7f5db80cf6a0>
  scheduler (airflow/cli/commands/scheduler_command.py:63)
      Arguments::
          args: <Namespace at 0x7f5db816f6a0>
      Locals::
          job: <SchedulerJob at 0x7f5dbed3dd00>
  wrapper (airflow/utils/cli.py:89)
      Locals::
          args: (<Namespace at 0x7f5db816f6a0>)
          kwargs: {}
          metrics: {"sub_command": "scheduler", "start_datetime": <datetime.datetime at 0x7f5db80f5db0>, ...}
  command (airflow/cli/cli_parser.py:48)
      Locals::
          args: (<Namespace at 0x7f5db816f6a0>)
          kwargs: {}
          func: <function at 0x7f5db8090790>
  main (airflow/__main__.py:40)
      Locals::
          parser: <DefaultHelpParser at 0x7f5dbec13700>
          args: <Namespace at 0x7f5db816f6a0>
  <module> (airflow:8)

@oleksandr-yatsuk

We had the same issue with Airflow on Google Cloud until we increased the setting AIRFLOW__CORE__SQL_ALCHEMY_MAX_OVERFLOW.
The default value was 5; with a change to 60 our Airflow server started to perform very well, including on complex DAGs with around 1000 tasks each.
Any scale-up was hitting the database's concurrent-connections limit, so the scheduler was not able to perform fast.

@dimberman
Contributor Author

@ashb considering what @oleksandr-yatsuk found, maybe this is a database issue?

@leonsmith
Contributor

No freezes since bumping AIRFLOW__CORE__SQL_ALCHEMY_MAX_OVERFLOW like @oleksandr-yatsuk suggested

ashb self-assigned this Mar 30, 2021
@ashb
Member

ashb commented Mar 31, 2021

I've got a fix for the case reported by @MatthewRBruce (for 2.0.1) coming in 2.0.2

@yuqian90
Contributor

yuqian90 commented May 12, 2021

Hi @ashb I would like to report that we've been seeing something similar to this issue in Airflow 2.0.2 recently.

We are using airflow 2.0.2 with a single airflow-scheduler + a few airflow-worker using CeleryExecutor and postgres backend running dozens of dags each with hundreds to a few thousand tasks. Python version is 3.8.7.

Here's what we saw:
airflow-scheduler sometimes stops heartbeating and stops scheduling any tasks. This seems to happen at random times, about once or twice a week. When this happens, the last line in the scheduler log shows the following, i.e. it stopped writing out any log after receiving signal 15. I did strace the airflow scheduler process. It did not capture any other process sending it signal 15. So most likely the signal 15 was sent by the scheduler to itself.

May 11 21:19:56 airflow[12643]: [2021-05-11 21:19:56,908] {base_executor.py:82} INFO - Adding to queue: ['airflow', 'tasks', 'run', ...]
May 11 21:19:56 airflow[12643]: [2021-05-11 21:19:56,973] {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15

When the scheduler was in this state, there was also a child airflow scheduler process shown in ps, spawned by the main airflow scheduler process. I forgot to take a py-spy dump, but I did use py-spy top to look at the child airflow scheduler process, and this is what I saw. It seems to be stuck somewhere in celery_executor.py::_send_tasks_to_celery. This sounds similar to what @milton0825 reported previously, although he mentioned he was using Airflow 1.10.8.

When I manually SIGTERMed the child airflow scheduler process, it died, and immediately the main airflow scheduler started to heartbeat and schedule tasks again like nothing ever happened. So I suspect the scheduler got stuck somewhere while spawning a child process. But I still don't understand how it produced an "Exiting gracefully upon receiving signal 15" in the log.

Total Samples 7859
GIL: 0.00%, Active: 0.00%, Threads: 1

  %Own   %Total  OwnTime  TotalTime  Function (filename:line)
  0.00%   0.00%   0.540s    0.540s   __enter__ (multiprocessing/synchronize.py:95)
  0.00%   0.00%   0.000s    0.540s   worker (multiprocessing/pool.py:114)
  0.00%   0.00%   0.000s    0.540s   _bootstrap (multiprocessing/process.py:315)
  0.00%   0.00%   0.000s    0.540s   _repopulate_pool (multiprocessing/pool.py:303)
  0.00%   0.00%   0.000s    0.540s   main (airflow/__main__.py:40)
  0.00%   0.00%   0.000s    0.540s   start (multiprocessing/process.py:121)
  0.00%   0.00%   0.000s    0.540s   _send_tasks_to_celery (airflow/executors/celery_executor.py:330)
  0.00%   0.00%   0.000s    0.540s   Pool (multiprocessing/context.py:119)
  0.00%   0.00%   0.000s    0.540s   run (airflow/jobs/base_job.py:237)
  0.00%   0.00%   0.000s    0.540s   _repopulate_pool_static (multiprocessing/pool.py:326)
  0.00%   0.00%   0.000s    0.540s   heartbeat (airflow/executors/base_executor.py:158)
  0.00%   0.00%   0.000s    0.540s   _launch (multiprocessing/popen_fork.py:75)
  0.00%   0.00%   0.000s    0.540s   wrapper (airflow/utils/cli.py:89)
  0.00%   0.00%   0.000s    0.540s   __init__ (multiprocessing/pool.py:212)
  0.00%   0.00%   0.000s    0.540s   _Popen (multiprocessing/context.py:277)

One other observation was that when the airflow scheduler was in the stuck state, the DagFileProcessor processes started by airflow scheduler were still running. I could see them writing out logs to dag_processor_manager.log.

@davidcaron
Contributor

@yuqian90 I have almost the exact same environment as you, and I have the same problem.

  • Airflow 2.0.2
  • CeleryExecutor (postgres result backend, and redis broker)
  • Single scheduler
  • Python version 3.8.5

The problem happens roughly twice per day.

I get the same last log message you do: Exiting gracefully upon receiving signal 15 and the exact same py-spy output.

As a last resort, I plan to watch for a hung subprocess of the scheduler and kill it in a cron job... just like you, when I kill the subprocess manually, the main scheduler process continues as if nothing happened.
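For reference, a watchdog along those lines could look roughly like the sketch below. It is hypothetical (assumes psutil is installed and that the stuck processes are long-lived child forks of the main scheduler, as described above); tune the threshold and the matching before relying on it.

# Hypothetical watchdog sketch: terminate long-lived child forks of the main
# "airflow scheduler" process (excluding the DagFileProcessorManager), on the
# theory that healthy executor helper forks only live for a few seconds.
import time
import psutil

THRESHOLD_SECONDS = 600

for proc in psutil.process_iter(["cmdline", "create_time"]):
    try:
        cmdline = " ".join(proc.info["cmdline"] or [])
        if "airflow scheduler" not in cmdline or "DagFileProcessorManager" in cmdline:
            continue
        parent = proc.parent()
        if parent is None or "airflow scheduler" not in " ".join(parent.cmdline() or []):
            continue  # only consider children of the main scheduler process
        if time.time() - proc.info["create_time"] > THRESHOLD_SECONDS:
            proc.terminate()  # or proc.kill() if SIGTERM is ignored
    except psutil.NoSuchProcess:
        continue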

@yuqian90
Contributor

The same behaviour in my previous comment happened again, so I took a py-spy dump of both the main airflow scheduler and the child process. When the scheduler was stuck, the main airflow scheduler was stuck in celery_executor.py::_send_tasks_to_celery, in __exit__ of multiprocessing.Pool. The code suggests the _terminate_pool() method does send a SIGTERM. That seems to explain why there's an "Exiting gracefully upon receiving signal 15" in the scheduler log, although it's not clear why the SIGTERM is sent to the main scheduler process itself.

The child airflow scheduler is stuck in _send_tasks_to_celery when trying to get the lock of SimpleQueue.

This is the py-spy dump of the main airflow scheduler process when it got stuck:

Python v3.8.7

Thread 0x7FB54794E740 (active): "MainThread"
    poll (multiprocessing/popen_fork.py:27)
    wait (multiprocessing/popen_fork.py:47)
    join (multiprocessing/process.py:149)
    _terminate_pool (multiprocessing/pool.py:729)
    __call__ (multiprocessing/util.py:224)
    terminate (multiprocessing/pool.py:654)
    __exit__ (multiprocessing/pool.py:736)
    _send_tasks_to_celery (airflow/executors/celery_executor.py:331)
    _process_tasks (airflow/executors/celery_executor.py:272)
    trigger_tasks (airflow/executors/celery_executor.py:263)
    heartbeat (airflow/executors/base_executor.py:158)
    _run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
    _execute (airflow/jobs/scheduler_job.py:1284)
    run (airflow/jobs/base_job.py:237)
    scheduler (airflow/cli/commands/scheduler_command.py:63)
    wrapper (airflow/utils/cli.py:89)
    command (airflow/cli/cli_parser.py:48)
    main (airflow/__main__.py:40)
    <module> (airflow:8)

This is the py-spy dump of the child airflow scheduler process when it got stuck:

Python v3.8.7

Thread 16232 (idle): "MainThread"
    __enter__ (multiprocessing/synchronize.py:95)
    get (multiprocessing/queues.py:355)
    worker (multiprocessing/pool.py:114)
    run (multiprocessing/process.py:108)
    _bootstrap (multiprocessing/process.py:315)
    _launch (multiprocessing/popen_fork.py:75)
    __init__ (multiprocessing/popen_fork.py:19)
    _Popen (multiprocessing/context.py:277)
    start (multiprocessing/process.py:121)
    _repopulate_pool_static (multiprocessing/pool.py:326)
    _repopulate_pool (multiprocessing/pool.py:303)
    __init__ (multiprocessing/pool.py:212)
    Pool (multiprocessing/context.py:119)
    _send_tasks_to_celery (airflow/executors/celery_executor.py:330)
    _process_tasks (airflow/executors/celery_executor.py:272)
    trigger_tasks (airflow/executors/celery_executor.py:263)
    heartbeat (airflow/executors/base_executor.py:158)
    _run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
    _execute (airflow/jobs/scheduler_job.py:1284)
    run (airflow/jobs/base_job.py:237)
    scheduler (airflow/cli/commands/scheduler_command.py:63)
    wrapper (airflow/utils/cli.py:89)
    command (airflow/cli/cli_parser.py:48)
    main (airflow/__main__.py:40)
    <module> (airflow:8)
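That pattern (a pool worker forked while the queue's lock happened to be held, so the copied lock can never be released in the child) can be reproduced outside Airflow in a few lines. A minimal sketch, assuming Linux's fork start method (this is not Airflow code, just the underlying hazard):

import multiprocessing
import threading
import time

lock = threading.Lock()

def hold_lock_briefly():
    with lock:
        time.sleep(1)

def child():
    # the forked child copies the lock in its "held" state, but the thread
    # that held it does not exist in the child, so this acquire never returns
    lock.acquire()
    print("never reached")

if __name__ == "__main__":
    multiprocessing.set_start_method("fork")  # the default on Linux
    threading.Thread(target=hold_lock_briefly).start()
    time.sleep(0.1)                           # ensure the lock is held at fork time
    p = multiprocessing.Process(target=child, daemon=True)
    p.start()
    p.join(timeout=3)
    print("child still stuck:", p.is_alive())  # True: it will never make progress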

@sterling-jackson

sterling-jackson commented May 18, 2021

Have been struggling with this since we migrated our lower environments to 2.0. The scheduler works for a couple of days, then stops scheduling, but doesn't trigger any heartbeat errors. Not sure it's helpful, but our PROD instance is running smoothly with Airflow 1.10.9 and Python 3.7.8.

Restarting the scheduler brings it back to life after Docker restarts the service.

  • Airflow 2.0.2
  • LocalExecutor (EC2)
  • Single scheduler, running in a Docker container, with and without host networking
  • Postgres backend running on RDS
  • Less than 100 DAGs running on this instance
  • Tasks executed on EKS via KubernetesPodOperator
  • Python version 3.8.9

@ashb
Member

ashb commented May 18, 2021

@sterling-jackson Your use case might be fixed by 2.1.0 (currently in RC stage)

@yuqian90
Contributor

yuqian90 commented May 19, 2021

Hi @ashb @davidcaron I managed to reproduce this issue consistently with a small reproducing example and traced the problem down to reset_signals() in celery_executor.py. Since it feels like a different issue from the original one reported here, I opened a new issue:
#15938

@thesuperzapper
Contributor

I just wanted to share that the User-Community Airflow Helm Chart now has a mitigation for this issue that will automatically restart the scheduler if no tasks are created within some threshold time.

It's called the scheduler "Task Creation Check", but it's not enabled by default because the "threshold" must be longer than your shortest DAG schedule_interval, which we don't know unless the user tells us.
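For anyone rolling their own version of that check, a rough sketch of the idea is below. This is hypothetical (not the chart's actual implementation): it assumes the probe runs somewhere the Airflow metadata database is reachable, and the threshold must exceed your shortest schedule_interval.

# Hypothetical "task creation check" sketch: exit non-zero if nothing has been
# queued recently, so an orchestrator (Kubernetes, systemd, etc.) can restart
# the scheduler.
import sys
from datetime import timedelta

from airflow.models import TaskInstance
from airflow.utils import timezone
from airflow.utils.session import create_session

THRESHOLD = timedelta(minutes=30)  # must be longer than your shortest schedule_interval

with create_session() as session:
    latest = (
        session.query(TaskInstance.queued_dttm)
        .filter(TaskInstance.queued_dttm.isnot(None))
        .order_by(TaskInstance.queued_dttm.desc())
        .first()
    )

if latest is None or timezone.utcnow() - latest[0] > THRESHOLD:
    sys.exit(1)  # nothing queued within the threshold: treat the scheduler as unhealthy
sys.exit(0)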
