
Transport endpoint is not connected #45597

@asemelianov

Apache Airflow version

2.10.4

If "Other Airflow 2 version" selected, which one?

No response

What happened?

We use the official Airflow Helm chart with the CeleryKubernetesExecutor and csi-s3 configured. Our scheduler restarts at irregular intervals, and sometimes it does not come back up at all. According to the logs, the problem is the csi-s3 driver: it unmounts the directory holding the logs, and until we manually restart the scheduler service, the scheduler hangs with the error below. Only the scheduler is affected; the other services run fine.

--- Logging error ---
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/logging/handlers.py", line 73, in emit
    if self.shouldRollover(record):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/logging/handlers.py", line 193, in shouldRollover
    pos = self.stream.tell()
          ^^^^^^^^^^^^^^^^^^
OSError: [Errno 107] Transport endpoint is not connected
Call stack:
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 62, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 59, in scheduler
    run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 62, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 48, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 421, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 450, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 983, in _execute
    self.processor_agent.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 172, in start
    process.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.12/multiprocessing/context.py", line 282, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 71, in _launch
    code = process_obj._bootstrap(parent_sentinel=child_r)
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 247, in _run_processor_manager
    processor_manager.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 489, in start
    return self._run_parsing_loop()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 667, in _run_parsing_loop
    self.collect_results()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 1196, in collect_results
    self._collect_results_from_processor(processor)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 1146, in _collect_results_from_processor
    self.log.error(
Message: 'Processor for %s exited with return code %s.'
Arguments: ('/opt/airflow/dags/repo/prod/ozon/ozon_dag.py', 1)
--- Logging error ---
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/logging/handlers.py", line 73, in emit
    if self.shouldRollover(record):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/logging/handlers.py", line 193, in shouldRollover
    pos = self.stream.tell()
          ^^^^^^^^^^^^^^^^^^
OSError: [Errno 107] Transport endpoint is not connected
Call stack:
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 62, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 59, in scheduler
    run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 62, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 48, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 421, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 450, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 983, in _execute
    self.processor_agent.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 172, in start
    process.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.12/multiprocessing/context.py", line 282, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 71, in _launch
    code = process_obj._bootstrap(parent_sentinel=child_r)
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 247, in _run_processor_manager
    processor_manager.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 489, in start
    return self._run_parsing_loop()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 667, in _run_parsing_loop
    self.collect_results()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 1196, in collect_results
    self._collect_results_from_processor(processor)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 1146, in _collect_results_from_processor
    self.log.error(
Message: 'Processor for %s exited with return code %s.'
Arguments: ('/opt/airflow/dags/repo/prod/utils/dag_status.py', 1)
[2025-01-11T22:44:51.127+0000] {process_utils.py:132} INFO - Sending 15 to group 55. PIDs of all processes in the group: [55]
[2025-01-11T22:44:51.127+0000] {process_utils.py:87} INFO - Sending the signal 15 to group 55
--- Logging error ---
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/logging/handlers.py", line 73, in emit
    if self.shouldRollover(record):
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/logging/handlers.py", line 193, in shouldRollover
    pos = self.stream.tell()
          ^^^^^^^^^^^^^^^^^^
OSError: [Errno 107] Transport endpoint is not connected
Call stack:
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/__main__.py", line 62, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/cli_config.py", line 49, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/cli.py", line 116, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/providers_configuration_loader.py", line 55, in wrapped_function
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 59, in scheduler
    run_command_with_daemon_option(
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/daemon_utils.py", line 86, in run_command_with_daemon_option
    callback()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 62, in <lambda>
    callback=lambda: _run_scheduler_job(args),
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/cli/commands/scheduler_command.py", line 48, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 97, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 421, in run_job
    return execute_job(job, execute_callable=execute_callable)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/job.py", line 450, in execute_job
    ret = execute_callable()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 983, in _execute
    self.processor_agent.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 172, in start
    process.start()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.12/multiprocessing/context.py", line 282, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.12/multiprocessing/popen_fork.py", line 71, in _launch
    code = process_obj._bootstrap(parent_sentinel=child_r)
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.12/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 247, in _run_processor_manager
    processor_manager.start()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 489, in start
    return self._run_parsing_loop()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 573, in _run_parsing_loop
    ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
  File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 1136, in wait
    ready = selector.select(timeout)
  File "/usr/local/lib/python3.12/selectors.py", line 415, in select
    fd_event_list = self._selector.poll(timeout)
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/dag_processing/manager.py", line 465, in _exit_gracefully
    self.log.info("Exiting gracefully upon receiving signal %s", signum)
Message: 'Exiting gracefully upon receiving signal %s'
Arguments: (15,)
[2025-01-11T22:44:51.500+0000] {process_utils.py:80} INFO - Process psutil.Process(pid=55, status='terminated', exitcode=0, started='2025-01-09 12:30:47') (55) terminated with exit code 0
[2025-01-11T22:44:51.501+0000] {kubernetes_executor.py:760} INFO - Shutting down Kubernetes executor
[2025-01-11T22:44:51.501+0000] {scheduler_job_runner.py:1011} ERROR - Exception when executing Executor.end on CeleryKubernetesExecutor(parallelism=32)
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 987, in _execute
    self._run_scheduler_loop()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1176, in _run_scheduler_loop
    time.sleep(min(self._scheduler_idle_sleep_time, next_event or 0))
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 263, in _exit_gracefully
    sys.exit(os.EX_OK)
SystemExit: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/jobs/scheduler_job_runner.py", line 1009, in _execute
    executor.end()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/celery/executors/celery_kubernetes_executor.py", line 254, in end
    self.kubernetes_executor.end()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 763, in end
    self._flush_task_queue()
  File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py", line 719, in _flush_task_queue
    self.log.debug("Executor shutting down, task_queue approximate size=%d", self.task_queue.qsize())
                                                                             ^^^^^^^^^^^^^^^^^^^^^^^
  File "<string>", line 2, in qsize
  File "/usr/local/lib/python3.12/multiprocessing/managers.py", line 820, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 427, in _send_bytes
    self._send(header + buf)
  File "/usr/local/lib/python3.12/multiprocessing/connection.py", line 384, in _send
    n = write(self._handle, buf)
        ^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [Errno 32] Broken pipe
[2025-01-11T22:44:51.521+0000] {process_utils.py:132} INFO - Sending 15 to group 55. PIDs of all processes in the group: []
[2025-01-11T22:44:51.521+0000] {process_utils.py:87} INFO - Sending the signal 15 to group 55
[2025-01-11T22:44:51.521+0000] {process_utils.py:101} INFO - Sending the signal 15 to process 55 as process group is missing.
[2025-01-11T22:44:51.521+0000] {scheduler_job_runner.py:1017} INFO - Exited execute loop
INFO: detected pid 1, running init handler
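
Until the mount problem itself is addressed, the OSError 107 in the tracebacks above could in principle be worked around on the logging side. The failure originates in `RotatingFileHandler.shouldRollover()` calling `self.stream.tell()` on a file whose FUSE mount is gone. A minimal sketch of a handler that probes the stream and reopens it (the class name and the probe-and-reopen strategy are mine, not anything Airflow ships):

```python
import logging
import logging.handlers


class ReopeningRotatingFileHandler(logging.handlers.RotatingFileHandler):
    """RotatingFileHandler that reopens its stream when the underlying
    file becomes unusable, e.g. after a FUSE (csi-s3) mount drops with
    OSError 107 (Transport endpoint is not connected)."""

    def emit(self, record: logging.LogRecord) -> None:
        try:
            # shouldRollover() will call stream.tell(); probe it up front
            # so a dead stream is detected before the standard emit path.
            if self.stream is not None:
                self.stream.tell()
        except OSError:
            try:
                self.stream.close()
            except OSError:
                pass  # closing a dead stream can fail as well
            self.stream = self._open()  # reopen on the (re)mounted path
        super().emit(record)
```

Wiring this in would mean pointing `logging_config_class` at a logging config that uses this handler for the processor-manager log; whether that actually helps depends on how quickly csi-s3 remounts the volume.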

What you think should happen instead?

No response

How to reproduce

We launch Airflow and tasks execute normally, but after some time the scheduler service restarts. Sometimes it comes back up after the restart and continues working; other times it does not, and the service stops launching new tasks.
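
A deployment-level mitigation (not a fix for csi-s3 itself) is to make the probe fail fast when the log mount goes stale, so Kubernetes restarts the pod instead of the scheduler hanging. The official chart allows overriding the scheduler's liveness probe command; the snippet below is only a sketch, and the log-directory path and exact values keys should be checked against your chart version:

```yaml
scheduler:
  livenessProbe:
    # Fail the probe when the mounted log directory is unreachable,
    # so Kubernetes restarts the scheduler pod automatically.
    # /opt/airflow/logs is an assumption; use your csi-s3 mount path.
    command:
      - sh
      - -c
      - stat /opt/airflow/logs
    periodSeconds: 60
    failureThreshold: 5
```

Note that overriding `command` replaces the chart's default scheduler health check, so in practice you would chain the `stat` with the chart's default `airflow jobs check` invocation rather than drop it.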

Operating System

Debian GNU/Linux 12 (bookworm)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==9.0.0
apache-airflow-providers-celery==3.8.3
apache-airflow-providers-cncf-kubernetes==9.0.1
apache-airflow-providers-common-compat==1.2.1
apache-airflow-providers-common-io==1.4.2
apache-airflow-providers-common-sql==1.19.0
apache-airflow-providers-docker==3.14.0
apache-airflow-providers-elasticsearch==5.5.2
apache-airflow-providers-fab==1.5.0
apache-airflow-providers-ftp==3.11.1
apache-airflow-providers-google==10.25.0
apache-airflow-providers-grpc==3.6.0
apache-airflow-providers-hashicorp==3.8.0
apache-airflow-providers-http==4.13.2
apache-airflow-providers-imap==3.7.0
apache-airflow-providers-microsoft-azure==11.0.0
apache-airflow-providers-mysql==5.7.3
apache-airflow-providers-odbc==4.8.0
apache-airflow-providers-openlineage==1.13.0
apache-airflow-providers-postgres==5.13.1
apache-airflow-providers-redis==3.8.0
apache-airflow-providers-sendgrid==3.6.0
apache-airflow-providers-sftp==4.11.1
apache-airflow-providers-slack==8.9.1
apache-airflow-providers-smtp==1.8.0
apache-airflow-providers-snowflake==5.8.0
apache-airflow-providers-sqlite==3.9.0
apache-airflow-providers-ssh==3.14.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

k8s

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

  • area:Scheduler (including HA (high availability) scheduler)
  • area:core
  • area:logging
  • kind:bug (this is clearly a bug)
  • needs-triage (new issues that we didn't triage yet)
