Skip to content

Commit

Permalink
fixup! Composer core patch
Browse files Browse the repository at this point in the history
Fix processor cleanup on DagFileProcessorManager - cherrypick from
Community apache/airflow#22685

Change-Id: I7e505840325b5f61bd96238424caedf9f9afe19e
GitOrigin-RevId: 9e20cd0bf3bd2fa67115b1d4a81a6e2009e49936
  • Loading branch information
Cloud Composer Team committed Aug 30, 2022
1 parent f9d0926 commit 1d5920e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
9 changes: 9 additions & 0 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,7 @@ def _find_zombies(self, session):
def _kill_timed_out_processors(self):
"""Kill any file processors that timeout to defend against process hangs."""
now = timezone.utcnow()
processors_to_remove = []
for file_path, processor in self._processors.items():
duration = now - processor.start_time
if duration > self._processor_timeout:
Expand All @@ -1152,6 +1153,14 @@ def _kill_timed_out_processors(self):
Stats.incr('dag_file_processor_timeouts')
processor.kill()

# Clean up processor references
self.waitables.pop(processor.waitable_handle)
processors_to_remove.append(file_path)

# Clean up `self._processors` after iterating over it
for proc in processors_to_remove:
self._processors.pop(proc)

def max_runs_reached(self):
""":return: whether all file paths have been processed max_runs times"""
if self._max_runs == -1: # Unlimited runs.
Expand Down
7 changes: 7 additions & 0 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
import signal
import threading
import time
from contextlib import redirect_stderr, redirect_stdout, suppress
from datetime import timedelta
from multiprocessing.connection import Connection as MultiprocessingConnection
Expand Down Expand Up @@ -243,6 +244,12 @@ def _kill_process(self) -> None:
if self._process.is_alive() and self._process.pid:
self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
os.kill(self._process.pid, signal.SIGKILL)

# Reap the spawned zombie. We active wait, because in Python 3.9 `waitpid` might lead to an
# exception, due to change in Python standard library and possibility of race condition
# see https://bugs.python.org/issue42558
while self._process._popen.poll() is None: # type: ignore
time.sleep(0.001)
if self._parent_channel:
self._parent_channel.close()

Expand Down
10 changes: 9 additions & 1 deletion tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,10 +643,14 @@ def test_deactivate_stale_dags(self):

assert len(active_dags) == 0

@mock.patch(
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock
)
@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.kill")
def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid, mock_waitable_handle):
mock_pid.return_value = 1234
mock_waitable_handle.return_value = 3
manager = DagFileProcessorManager(
dag_directory='directory',
max_runs=1,
Expand All @@ -660,8 +664,12 @@ def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
processor = DagFileProcessorProcess('abc.txt', False, [], [])
processor._start_time = timezone.make_aware(datetime.min)
manager._processors = {'abc.txt': processor}
manager.waitables[3] = processor
initial_waitables = len(manager.waitables)
manager._kill_timed_out_processors()
mock_kill.assert_called_once_with()
assert len(manager._processors) == 0
assert len(manager.waitables) == initial_waitables - 1

@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess")
Expand Down

0 comments on commit 1d5920e

Please sign in to comment.