Skip to content

Fix DagProcessorJob crash with ValueError: write to closed file on or…#65002

Open
Saddala wants to merge 2 commits intoapache:mainfrom
Saddala:fix/64959-dag-processor-ValueError-on-kill
Open

Fix DagProcessorJob crash with ValueError: write to closed file on or…#65002
Saddala wants to merge 2 commits intoapache:mainfrom
Saddala:fix/64959-dag-processor-ValueError-on-kill

Conversation

@Saddala
Copy link
Copy Markdown

@Saddala Saddala commented Apr 10, 2026

After SIGKILL, stale socket registrations on the shared selector cause the next select() callback to write to an already-closed BytesLogger handle, raising ValueError: write to closed file and crashing the entire DagProcessorJob.

  • Add _deregister_processor_sockets() to unregister and close all sockets for a killed processor before logger_filehandle.close() is called. Applied in both terminate_orphan_processes and _kill_timed_out_processors.
  • Guard target.log() in process_log_messages_from_subprocess with a ValueError catch that discards remaining buffered output cleanly.

closes: #64959


…phan kill

After SIGKILL, stale socket registrations on the shared selector cause
the next select() callback to write to an already-closed log handle.
Add _deregister_processor_sockets() to clean up sockets before closing
the handle, and guard target.log() in the supervisor against ValueError.

Closes: apache#64959
@boring-cyborg
Copy link
Copy Markdown

boring-cyborg bot commented Apr 10, 2026

Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide
Here are some useful points:

  • Pay attention to the quality of your code (ruff, mypy and type annotations). Our prek-hooks will help you with that.
  • In case of a new feature add useful documentation (in docstrings or in docs/ directory). Adding a new operator? Check this short guide Consider adding an example DAG that shows how users should use it.
  • Consider using Breeze environment for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations.
  • Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
  • Please follow ASF Code of Conduct for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack.
  • Be sure to read the Airflow Coding style.
  • Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits.
    Apache Airflow is a community-driven project and together we are making it better 🚀.
    In case of doubts contact the developers at:
    Mailing List: dev@airflow.apache.org
    Slack: https://s.apache.org/airflow-slack

- test_deregister_processor_sockets_suppresses_already_unregistered:
  KeyError from selector.unregister() is suppressed
- test_deregister_processor_sockets_suppresses_close_error:
  OSError from sock.close() is suppressed
- test_terminate_orphan_processes_deregisters_sockets:
  sockets are unregistered and closed before logger_filehandle.close()
- test_kill_timed_out_processors_deregisters_sockets:
  same coverage for the timed-out processors cleanup path
- test_process_log_messages_tolerates_closed_log_handle:
  generator returns cleanly on ValueError instead of propagating it

Generated-by: Claude Sonnet 4.6 following the guidelines
https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions
@Saddala
Copy link
Copy Markdown
Author

Saddala commented Apr 10, 2026

Closes the duplicate issue #64996 as well.
closes: #64996

@kaxil kaxil requested a review from Copilot April 10, 2026 19:55
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

DAG Processor Job crashes with "ValueError: write to closed file" — race condition in terminate_orphan_processes between SIGKILL and log drain

2 participants