Description
Output of ps -elf:
F S UID PID PPID C PRI NI ADDR SZ WCHAN STIME TTY TIME CMD
4 S airflow 1 0 0 80 0 - 2921 do_wai 05:08 ? 00:00:00 sh /airflow/start-airflow.sh
4 S airflow 8 1 1 80 0 - 122377 poll_s 05:08 ? 00:02:08 /opt/conda/envs/airflow_app/bin/python /opt/conda/envs/airflow_app/bin/airflow scheduler
5 S airflow 19 8 2 80 0 - 288560 futex_ 05:08 ? 00:03:00 /opt/conda/envs/airflow_app/bin/python /opt/conda/envs/airflow_app/bin/airflow scheduler
5 S airflow 27 8 1 80 0 - 308108 futex_ 05:08 ? 00:01:34 /opt/conda/envs/airflow_app/bin/python /opt/conda/envs/airflow_app/bin/airflow scheduler
5 S airflow 34 8 0 80 0 - 121993 sk_wai 05:08 ? 00:00:13 /opt/conda/envs/airflow_app/bin/python /opt/conda/envs/airflow_app/bin/airflow scheduler
5 S airflow 38 8 4 80 0 - 123007 poll_s 05:08 ? 00:05:23 airflow scheduler -- DagFileProcessorManager
4 S airflow 47723 0 0 80 0 - 3476 do_wai 07:12 pts/0 00:00:00 bash
4 R airflow 48145 47723 0 80 0 - 13456 - 07:12 pts/0 00:00:00 ps -elf
5 Z airflow 48457 38 0 80 0 - 0 do_exi 07:12 ? 00:00:00 [airflow schedul]
5 Z airflow 48459 38 0 80 0 - 0 do_exi 07:12 ? 00:00:00 [airflow schedul]
5 Z airflow 48464 38 0 80 0 - 0 do_exi 07:12 ? 00:00:00 [airflow schedul]
5 Z airflow 48467 38 0 80 0 - 0 do_exi 07:12 ? 00:00:00 [airflow schedul]
5 S airflow 48470 38 0 80 0 - 124282 sk_wai 07:12 ? 00:00:00 airflow scheduler - DagFileProcessor /airflow/dags/job1.zip
5 Z airflow 48475 38 0 80 0 - 0 do_exi 07:12 ? 00:00:00 [airflow schedul]
5 R airflow 48477 38 0 80 0 - 126660 - 07:12 ? 00:00:00 airflow scheduler - DagFileProcessor /airflow/dags/BashPipeline-v2-0-0-job.zip
5 Z airflow 48479 38 0 80 0 - 0 do_exi 07:12 ? 00:00:00 [airflow schedul]
5 Z airflow 48481 38 0 80 0 - 0 do_exi 07:12 ? 00:00:00 [airflow schedul]
5 Z airflow 48484 38 0 80 0 - 0 do_exi 07:12 ? 00:00:00 [airflow schedul]
Apache Airflow version: 1.10.10
Kubernetes version (if you are using kubernetes) (use kubectl version): 1.19.5
Environment: Airflow Docker image (python:3.6-slim-buster)
- OS (e.g. from /etc/os-release): RHEL 7.5
- Kernel (e.g. uname -a): 3.10.0-1160.11.1.el7.x86_64
What happened: DagFileProcessorManager is not clearing defunct processes after running for some duration (usually hours).
What you expected to happen: The defunct processes should be cleared up by the DagFileProcessorManager.
I have the num_runs parameter set to -1, which causes the DagFileProcessorManager to keep spawning processes to try to schedule DAGs indefinitely (as expected). However, as this process keeps running, we eventually end up with some defunct processes. Ideally these defunct processes would have been cleared when the DagFileProcessorManager exited, but because of the num_runs=-1 setting it never exits, so the defunct processes are never reaped.
In the provided output of ps -elf, we can see that process 38 has created several child processes which went defunct and will now only disappear if the parent is killed or collects their exit status.
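To illustrate the mechanism (a minimal standalone sketch, not Airflow code): a child process that has exited stays in the process table as <defunct> until its parent waits on it, which is exactly what a long-lived parent never does here.

import os
import time

# Hypothetical illustration of zombie reaping; run on Linux and watch
# `ps -elf | grep defunct` from another shell.
pid = os.fork()
if pid == 0:
    # Child: exit immediately.
    os._exit(0)
else:
    # Parent: the child is now a zombie because we have not waited on it.
    time.sleep(30)        # the child shows up in state Z during this window
    os.waitpid(pid, 0)    # collecting the exit status reaps the zombie
    time.sleep(30)        # the defunct entry is gone now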
How to reproduce it: Run the Airflow Docker image with num_runs not set in the config or set to -1, and have a few DAGs in the dags folder for Airflow to keep loading.
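For reference, this is roughly the relevant piece of my airflow.cfg (surrounding options omitted; if I recall correctly the same effect can be achieved by starting the scheduler with the --num_runs CLI option):

[scheduler]
# -1 means unlimited: the DagFileProcessorManager keeps cycling over the
# DAG files forever instead of exiting after a fixed number of runs.
num_runs = -1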
Anything else we need to know:
Since processes go defunct while waiting for the parent to collect their exit code, I'm assuming that adding a call to read the exit code plus a terminate call on finished processors in dag_processing.py (DagFileProcessorManager.collect_results) would collect the status of each process and clean up the processes launched by the DagFileProcessorManager once they have finished executing.
All the defunct processes have actually stopped; I can see this in the logs where processor.done is checked in collect_results. For example, one of the defunct processes is 48464, and I see this statement in the log: "DEBUG - Waiting for <Process(DagFileProcessor48464-Process, stopped)>".
for file_path, processor in finished_processors.items():
    if processor.result is None:
        self.log.error(
            "Processor for %s exited with return code %s.",
            processor.file_path, processor.exit_code
        )
    else:
        for simple_dag in processor.result[0]:
            simple_dags.append(simple_dag)
    self.log.info("Processor for %s exited with return code %s.", processor.file_path, processor.exit_code)  # -> this line
    processor.terminate()  # -> and this line
How often does this problem occur? Once? Every time etc? Every time, after a few hours of starting the airflow scheduler.
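As a sanity check on that idea, here is a minimal standalone multiprocessing sketch (not Airflow code): a finished child stays defunct until the parent joins or polls it; on Unix, join() (and reading exitcode or is_alive(), which perform a non-blocking waitpid) reaps the entry, which is the effect the proposed change is after.

import time
from multiprocessing import Process

def work():
    # Stand-in for a DAG-file processor child that finishes quickly.
    pass

if __name__ == "__main__":
    p = Process(target=work)
    p.start()
    time.sleep(5)
    # The child has exited but has not been waited on yet, so
    # `ps -elf` shows it in state Z (<defunct>).
    p.join()             # reaps the child; the defunct entry disappears
    print(p.exitcode)    # 0 once the child has been collected
    time.sleep(30)       # window to confirm with `ps -elf` that no zombie remains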