Skip to content

Commit

Permalink
Fix processor cleanup on DagFileProcessorManager (#22685)
Browse files Browse the repository at this point in the history
* Fix processor cleanup

References to processors weren't being cleaned up after
killing them in the event of a timeout. This lead to
a crash caused by an unhandled exception when trying to
read from a closed end of a pipe.

* Reap the zombie when killing the processor

When calling `_kill_process()` we're generating
zombies which weren't being `wait()`ed for. This
led to a process leak we fix by just calling
`waitpid()` on the appropriate PIDs.

* Reap resulting zombies in a safe way

According to @potiuk's and @malthe's input, the way
we were reaping the zombies could cause some racy and
unwanted situations. As seen on the discussion over at
`https://bugs.python.org/issue42558` we can safely
reap the spawned zombies with the changes we have
introduced.

* Explain why we are actively waiting

As suggested by @potiuk explaining why we chose to actively wait on an scenario such as this one can indeed be useful for anybody taking a look at the code some time from now...

Co-authored-by: Jarek Potiuk <jarek@potiuk.com>

* Fix small typo and triling whitespace

After accepting the changes proposed on the PR
we found a small typo (we make those on a daily basis)
and a trailing whitespace we though was nice to delete.
Hope we made the right choice!

* Fix call to `poll()`

We were calling `poll()` through the `_process` attribute
and, as shown on the static checks triggered by GitHub,
it's not defined for the `BaseProcess` class. We instead
have to call `poll()` through `BaseProcess`'s `_popen`
attribute.

* Fix processor cleanup

References to processors weren't being cleaned up after
killing them in the event of a timeout. This lead to
a crash caused by an unhandled exception when trying to
read from a closed end of a pipe.

* Reap the zombie when killing the processor

When calling `_kill_process()` we're generating
zombies which weren't being `wait()`ed for. This
led to a process leak we fix by just calling
`waitpid()` on the appropriate PIDs.

* Reap resulting zombies in a safe way

According to @potiuk's and @malthe's input, the way
we were reaping the zombies could cause some racy and
unwanted situations. As seen on the discussion over at
`https://bugs.python.org/issue42558` we can safely
reap the spawned zombies with the changes we have
introduced.

* Explain why we are actively waiting

As suggested by @potiuk explaining why we chose to actively wait on an scenario such as this one can indeed be useful for anybody taking a look at the code some time from now...

Co-authored-by: Jarek Potiuk <jarek@potiuk.com>

* Fix small typo and triling whitespace

After accepting the changes proposed on the PR
we found a small typo (we make those on a daily basis)
and a trailing whitespace we though was nice to delete.
Hope we made the right choice!

* Fix call to `poll()`

We were calling `poll()` through the `_process` attribute
and, as shown on the static checks triggered by GitHub,
it's not defined for the `BaseProcess` class. We instead
have to call `poll()` through `BaseProcess`'s `_popen`
attribute.

* Prevent static check from failing

After reading through `multiprocessing`'s implementation we
really didn't know why the static check on line `239` was
failing: the process should contain a `_popen` attribute...
That's when we found line `223` and discovered the trailing
`# type: ignore` comment. After reading up on it we found
that it instructs *MyPy* not to statically check that very
line. Given we're having trouble with the exact same attribute
we decided to include the same directive for the static checker.
Hope we made the right call!

* Fix test for `_kill_timed_out_processors()`

We hadn't updated the tests for the method whose
body we've altered. This caused the tests to fail
when trying to retrieve a processor's *waitable*,
a property similar to a *file descriptor* in
UNIX-like systems. We have added a mock property to
the `processor` and we've also updated the `manager`'s
attributes so as to faithfully recreate the state of
the data sctructures at a moment when a `processor`
is to be terminated.

Please note the `assertions` at the end are meant to
check we reach the `manager`'s expected state. We have
chosen to check the number of processor's against an
explicit value because we're defining `manager._processors`
explicitly within the test. On the other hand, `manager.waitables`
can have a different length depending on the call to
`DagFileProcessorManager`'s `__init__()`. In this test the
expected initial length is `1` given we're passing `MagicMock()`
as the `signal_conn` when instantiating the manager. However,
if this were to be changed the tests would 'inexplicably' fail.
Instead of checking `manager.waitables`' length against a hardcoded
value we decided to instead compare it to its initial length
so as to emphasize we're interested in the change in length, not
its absolute value.

* Fix `black` checks and `mock` decorators

One of the methods we are to mock required a rather
long `@mock.patch` decorator which didn't pass the
checks made by `black` on the precommit hooks. On
top of that, we messed up the ordering of the
`@mock.patch` decorators which meant we didn't
set them up properly. This manifested as a `KeyError`
on the method we're currently testing. O_o

Co-authored-by: Jarek Potiuk <jarek@potiuk.com>
  • Loading branch information
pcolladosoto and potiuk committed Apr 6, 2022
1 parent c0c08b2 commit 4a06f89
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 1 deletion.
9 changes: 9 additions & 0 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,7 @@ def prepare_file_path_queue(self):
def _kill_timed_out_processors(self):
"""Kill any file processors that timeout to defend against process hangs."""
now = timezone.utcnow()
processors_to_remove = []
for file_path, processor in self._processors.items():
duration = now - processor.start_time
if duration > self._processor_timeout:
Expand All @@ -1080,6 +1081,14 @@ def _kill_timed_out_processors(self):
Stats.incr('dag_file_processor_timeouts')
processor.kill()

# Clean up processor references
self.waitables.pop(processor.waitable_handle)
processors_to_remove.append(file_path)

# Clean up `self._processors` after iterating over it
for proc in processors_to_remove:
self._processors.pop(proc)

def max_runs_reached(self):
""":return: whether all file paths have been processed max_runs times"""
if self._max_runs == -1: # Unlimited runs.
Expand Down
7 changes: 7 additions & 0 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import os
import signal
import threading
import time
from contextlib import redirect_stderr, redirect_stdout, suppress
from datetime import timedelta
from multiprocessing.connection import Connection as MultiprocessingConnection
Expand Down Expand Up @@ -231,6 +232,12 @@ def _kill_process(self) -> None:
if self._process.is_alive() and self._process.pid:
self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
os.kill(self._process.pid, signal.SIGKILL)

# Reap the spawned zombie. We active wait, because in Python 3.9 `waitpid` might lead to an
# exception, due to change in Python standard library and possibility of race condition
# see https://bugs.python.org/issue42558
while self._process._popen.poll() is None: # type: ignore
time.sleep(0.001)
if self._parent_channel:
self._parent_channel.close()

Expand Down
10 changes: 9 additions & 1 deletion tests/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,10 +501,14 @@ def test_deactivate_stale_dags(self):

assert len(active_dags) == 0

@mock.patch(
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", new_callable=PropertyMock
)
@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.kill")
def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid, mock_waitable_handle):
mock_pid.return_value = 1234
mock_waitable_handle.return_value = 3
manager = DagFileProcessorManager(
dag_directory='directory',
max_runs=1,
Expand All @@ -518,8 +522,12 @@ def test_kill_timed_out_processors_kill(self, mock_kill, mock_pid):
processor = DagFileProcessorProcess('abc.txt', False, [], [])
processor._start_time = timezone.make_aware(datetime.min)
manager._processors = {'abc.txt': processor}
manager.waitables[3] = processor
initial_waitables = len(manager.waitables)
manager._kill_timed_out_processors()
mock_kill.assert_called_once_with()
assert len(manager._processors) == 0
assert len(manager.waitables) == initial_waitables - 1

@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess.pid", new_callable=PropertyMock)
@mock.patch("airflow.dag_processing.processor.DagFileProcessorProcess")
Expand Down

0 comments on commit 4a06f89

Please sign in to comment.