From afc2d471b45a43268324c122eaf0de5c03f6c0b2 Mon Sep 17 00:00:00 2001
From: Ash Berlin-Taylor
Date: Tue, 6 Oct 2020 14:37:40 +0100
Subject: [PATCH] Prevent race condition in trying to collect result from
 DagFileProcessor

A rare race condition was noticed in the Scheduler HA PR where the
"test_dags_with_system_exit" test would occasionally fail with the
following symptoms:

- The pipe was "readable" as returned by `multiprocessing.connection.wait`
- On reading it yielded an EOFError, meaning the other side had closed
  the connection
- But the process was still alive/running

This previously would result in the Manager process dying with an
error.

This PR makes a few changes:

- It ensures that the pipe is simplex, not duplex (we only ever send
  data one way), as this is simpler
- We ensure that the "other" end of the pipe is correctly closed in both
  parent and child processes. Without this the pipe would (sometimes) be
  kept open until the child process had exited anyway.
- When we get an EOFError on reading and the process is still alive, we
  give it a few seconds to shut down cleanly, and then kill it.
---
 airflow/jobs/scheduler_job.py | 38 ++++++++++++++++++++++++++++++++---
 1 file changed, 35 insertions(+), 3 deletions(-)

diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e0a9f09d03c19..ef593f693c986 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -120,6 +120,7 @@ def file_path(self) -> str:
     @staticmethod
     def _run_file_processor(
         result_channel: MultiprocessingConnection,
+        parent_channel: MultiprocessingConnection,
         file_path: str,
         pickle_dags: bool,
         dag_ids: Optional[List[str]],
@@ -131,6 +132,8 @@ def _run_file_processor(
 
         :param result_channel: the connection to use for passing back the result
         :type result_channel: multiprocessing.Connection
+        :param parent_channel: the parent end of the channel to close in the child
+        :type parent_channel: multiprocessing.Connection
         :param file_path: the file to process
         :type file_path: str
         :param pickle_dags: whether to pickle the DAGs found in the file and
@@ -149,6 +152,13 @@ def _run_file_processor(
         # This helper runs in the newly created process
         log: logging.Logger = logging.getLogger("airflow.processor")
 
+        # Since we share all open FDs from the parent, we need to close the parent side of the pipe here in
+        # the child, else it won't get closed properly until we exit.
+        log.info("Closing parent pipe")
+
+        parent_channel.close()
+        del parent_channel
+
         set_context(log, file_path)
         setproctitle("airflow scheduler - DagFileProcessor {}".format(file_path))
 
@@ -183,11 +193,12 @@ def _run_file_processor(
             log.exception("Got an exception! Propagating...")
             raise
         finally:
-            result_channel.close()
             # We re-initialized the ORM within this Process above so we need to
             # tear it down manually here
             settings.dispose_orm()
 
+            result_channel.close()
+
     def start(self) -> None:
         """
         Launch the process and start processing the DAG.
@@ -195,11 +206,12 @@ def start(self) -> None:
         start_method = self._get_multiprocessing_start_method()
         context = multiprocessing.get_context(start_method)
 
-        self._parent_channel, _child_channel = context.Pipe()
+        _parent_channel, _child_channel = context.Pipe(duplex=False)
         process = context.Process(
             target=type(self)._run_file_processor,
             args=(
                 _child_channel,
+                _parent_channel,
                 self.file_path,
                 self._pickle_dags,
                 self._dag_ids,
@@ -212,6 +224,15 @@ def start(self) -> None:
         self._start_time = timezone.utcnow()
         process.start()
 
+        # Close the child side of the pipe now the subprocess has started -- otherwise this would prevent it
+        # from closing in some cases
+        _child_channel.close()
+        del _child_channel
+
+        # Don't store it on self until after we've started the child process - we don't want to keep it from
+        # getting GCd/closed
+        self._parent_channel = _parent_channel
+
     def kill(self) -> None:
         """
         Kill the process launched to process the file, and ensure consistent state.
@@ -245,6 +266,8 @@ def _kill_process(self) -> None:
         if self._process.is_alive() and self._process.pid:
             self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
             os.kill(self._process.pid, signal.SIGKILL)
+        if self._parent_channel:
+            self._parent_channel.close()
 
     @property
     def pid(self) -> int:
@@ -293,7 +316,16 @@ def done(self) -> bool:
                 self._parent_channel.close()
                 return True
         except EOFError:
-            pass
+            # If we get an EOFError, it means the child end of the pipe has been closed. This only happens
+            # in the finally block. But due to a possible race condition, the process may have not yet
+            # terminated (it could be doing cleanup/python shutdown still). So we kill it here after a
+            # "suitable" timeout.
+            self._done = True
+            # Arbitrary timeout -- error/race condition only, so this doesn't need to be tunable.
+            self._process.join(timeout=5)
+            if self._process.is_alive():
+                # Didn't shut down cleanly - kill it
+                self._kill_process()
 
         if not self._process.is_alive():
             self._done = True
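
Reviewer note: below is a minimal standalone sketch of the pipe-handling
pattern the hunks above implement. It is not Airflow code; the `child`
helper and the sleep are illustrative assumptions standing in for the
DagFileProcessor and its slow interpreter shutdown.

    import multiprocessing
    import os
    import signal
    import time


    def child(result_channel, parent_channel):
        # The child inherits every FD the parent had open, including the
        # parent's read end of the pipe; close that copy here, else it
        # stays open until this process exits.
        parent_channel.close()
        try:
            result_channel.send("done")  # the single value ever sent
        finally:
            result_channel.close()
            time.sleep(10)  # simulate slow shutdown after closing the pipe


    if __name__ == "__main__":
        ctx = multiprocessing.get_context("spawn")
        # duplex=False gives a simplex pipe: the first connection is
        # read-only (parent side), the second write-only (child side).
        parent_channel, child_channel = ctx.Pipe(duplex=False)
        process = ctx.Process(target=child, args=(child_channel, parent_channel))
        process.start()
        # Close the parent's copy of the write end; once the child's copy
        # closes too, recv() below raises EOFError instead of blocking.
        child_channel.close()

        try:
            print("result:", parent_channel.recv())
        except EOFError:
            # The child closed its end without sending anything; it may
            # still be shutting down, so allow a grace period, then kill.
            process.join(timeout=5)
            if process.is_alive() and process.pid:
                os.kill(process.pid, signal.SIGKILL)
        finally:
            parent_channel.close()
            process.join()

Promptly closing both "other" ends is what turns the child's exit into a
reliable EOF on the parent's read end, which is the invariant the done()
change relies on.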