CeleryKubernetesExecutor does not shutdown gracefully in 2.2.0 #18989

@dstandish

Apache Airflow version

2.2.0 (latest released)

Operating System

debian

Deployment

Official Apache Airflow Helm Chart

Running CeleryKubernetesExecutor

What happened

I noticed that the scheduler was restarting very frequently after upgrading to 2.2.0. This turned out to be caused by the slow liveness probe (see #19001).

When I looked at the logs, I found that the Kubernetes executor raised an error while trying to shut down.

[2021-10-14 16:00:45,192] {scheduler_job.py:161} INFO - Exiting gracefully upon receiving signal 15
[2021-10-14 16:00:46,194] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 33
[2021-10-14 16:00:46,673] {process_utils.py:66} INFO - Process psutil.Process(pid=33, status='terminated', exitcode=0, started='15:56:00') (33) terminated with exit code 0
[2021-10-14 16:00:46,741] {kubernetes_executor.py:787} INFO - Shutting down Kubernetes executor
[2021-10-14 16:00:46,741] {scheduler_job.py:609} ERROR - Exception when executing Executor.end
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 587, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 689, in _run_scheduler_loop
    time.sleep(min(self._scheduler_idle_sleep_time, next_event))
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 164, in _exit_gracefully
    sys.exit(os.EX_OK)
SystemExit: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 607, in _execute
    self.executor.end()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_kubernetes_executor.py", line 177, in end
    self.kubernetes_executor.end()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/kubernetes_executor.py", line 789, in end
    self._flush_task_queue()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/kubernetes_executor.py", line 742, in _flush_task_queue
    self.log.debug('Executor shutting down, task_queue approximate size=%d', self.task_queue.qsize())
  File "<string>", line 2, in qsize
  File "/usr/local/lib/python3.8/multiprocessing/managers.py", line 834, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 411, in _send_bytes
    self._send(header + buf)
  File "/usr/local/lib/python3.8/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
[2021-10-14 16:00:46,744] {process_utils.py:100} INFO - Sending Signals.SIGTERM to GPID 33
[2021-10-14 16:00:46,744] {scheduler_job.py:614} INFO - Exited execute loop
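
The traceback shows `qsize()` being called through a `multiprocessing` manager proxy (`managers.py`, `_callmethod`) and failing with `BrokenPipeError`, which suggests the manager process had already gone away by the time the executor tried to flush its queue. The following is a minimal sketch of that failure mode and one possible guard. It is not Airflow's code: `safe_qsize` is a hypothetical helper, and the assumption that the manager exits before `end()` runs is inferred from the log, not confirmed.

```python
import multiprocessing


def safe_qsize(queue_proxy):
    """Return the approximate queue size, or None if the queue is unreachable.

    Hypothetical helper for illustration only. A manager proxy talks to the
    manager process over a connection; once that process has exited, any
    call through the proxy can raise BrokenPipeError (a subclass of OSError)
    or EOFError.
    """
    try:
        return queue_proxy.qsize()
    except (OSError, EOFError):
        # The manager process is gone, so the size cannot be determined.
        return None


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    task_queue = manager.Queue()
    task_queue.put("task")

    # Manager still alive: the proxy call succeeds.
    print("size while manager is alive:", safe_qsize(task_queue))

    # Simulate the manager process exiting before the executor shuts down.
    manager.shutdown()

    # Without the guard, this call would raise instead of returning.
    print("size after manager shutdown:", safe_qsize(task_queue))
```

A guard like this only hides the symptom in the debug log call. Whether the queue should still be flushed when the manager is gone, or whether the manager should be kept alive until `end()` completes, is a separate design question.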

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
