diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index e0a9f09d03c1..ef593f693c98 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -120,6 +120,7 @@ def file_path(self) -> str:
     @staticmethod
     def _run_file_processor(
         result_channel: MultiprocessingConnection,
+        parent_channel: MultiprocessingConnection,
         file_path: str,
         pickle_dags: bool,
         dag_ids: Optional[List[str]],
@@ -131,6 +132,8 @@ def _run_file_processor(
 
         :param result_channel: the connection to use for passing back the result
         :type result_channel: multiprocessing.Connection
+        :param parent_channel: the parent end of the channel to close in the child
+        :type parent_channel: multiprocessing.Connection
         :param file_path: the file to process
         :type file_path: str
         :param pickle_dags: whether to pickle the DAGs found in the file and
@@ -149,6 +152,13 @@ def _run_file_processor(
         # This helper runs in the newly created process
         log: logging.Logger = logging.getLogger("airflow.processor")
 
+        # Since we share all open FDs from the parent, we need to close the parent side of the pipe here in
+        # the child, else it won't get closed properly until we exit.
+        log.info("Closing parent pipe")
+
+        parent_channel.close()
+        del parent_channel
+
         set_context(log, file_path)
         setproctitle("airflow scheduler - DagFileProcessor {}".format(file_path))
 
@@ -183,11 +193,12 @@ def _run_file_processor(
             log.exception("Got an exception! Propagating...")
             raise
         finally:
-            result_channel.close()
             # We re-initialized the ORM within this Process above so we need to
             # tear it down manually here
             settings.dispose_orm()
 
+            result_channel.close()
+
     def start(self) -> None:
         """
         Launch the process and start processing the DAG.
@@ -195,11 +206,12 @@ def start(self) -> None:
         start_method = self._get_multiprocessing_start_method()
         context = multiprocessing.get_context(start_method)
 
-        self._parent_channel, _child_channel = context.Pipe()
+        _parent_channel, _child_channel = context.Pipe(duplex=False)
         process = context.Process(
             target=type(self)._run_file_processor,
             args=(
                 _child_channel,
+                _parent_channel,
                 self.file_path,
                 self._pickle_dags,
                 self._dag_ids,
@@ -212,6 +224,15 @@ def start(self) -> None:
         self._start_time = timezone.utcnow()
         process.start()
 
+        # Close the child side of the pipe now the subprocess has started -- otherwise this would prevent it
+        # from closing in some cases
+        _child_channel.close()
+        del _child_channel
+
+        # Don't store it on self until after we've started the child process - we don't want to keep it from
+        # getting GCd/closed
+        self._parent_channel = _parent_channel
+
     def kill(self) -> None:
         """
         Kill the process launched to process the file, and ensure consistent state.
@@ -245,6 +266,8 @@ def _kill_process(self) -> None:
         if self._process.is_alive() and self._process.pid:
             self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
             os.kill(self._process.pid, signal.SIGKILL)
+        if self._parent_channel:
+            self._parent_channel.close()
 
     @property
     def pid(self) -> int:
@@ -293,7 +316,16 @@ def done(self) -> bool:
                 self._parent_channel.close()
                 return True
             except EOFError:
-                pass
+                # If we get an EOFError, it means the child end of the pipe has been closed. This only happens
+                # in the finally block. But due to a possible race condition, the process may have not yet
+                # terminated (it could be doing cleanup/python shutdown still). So we kill it here after a
+                # "suitable" timeout.
+                self._done = True
+                # Arbitrary timeout -- error/race condition only, so this doesn't need to be tunable.
+                self._process.join(timeout=5)
+                if self._process.is_alive():
+                    # Didn't shut down cleanly - kill it
+                    self._kill_process()
 
         if not self._process.is_alive():
             self._done = True
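
For context (not part of the patch): the fix relies on standard multiprocessing.Pipe semantics -- recv() on the read end only raises EOFError once every copy of the write end has been closed, and a forked child inherits every FD the parent had open. The sketch below is a minimal, hypothetical illustration of that pattern, not Airflow code; the worker/run_parent names and the "fork" start method (Unix-only) are assumptions. Each side closes the pipe end it does not own, the parent treats EOFError as "the child is finishing", and it falls back to SIGKILL after a bounded join, mirroring what the diff does in _run_file_processor, start() and done.

import multiprocessing
import os
import signal


def worker(result_channel, parent_channel):
    # Hypothetical stand-in for _run_file_processor. With the "fork" start
    # method the child inherits the parent's read end of the pipe; close it
    # here so the parent is the only remaining reader (as the patch does).
    parent_channel.close()
    try:
        result_channel.send("done")
    finally:
        # Closing the last write end is what lets the parent's recv() raise
        # EOFError once the buffered message has been consumed.
        result_channel.close()


def run_parent():
    ctx = multiprocessing.get_context("fork")  # Unix-only assumption
    # duplex=False returns (read_end, write_end): the parent keeps the read
    # end, the child gets the write end -- same shape as the patched start().
    parent_channel, child_channel = ctx.Pipe(duplex=False)
    proc = ctx.Process(target=worker, args=(child_channel, parent_channel))
    proc.start()

    # Close our copy of the write end now that the child has started; if we
    # kept it open, recv() below would block forever instead of seeing EOF.
    child_channel.close()

    try:
        while True:
            print("got:", parent_channel.recv())
    except EOFError:
        # The child has closed its end but may still be running interpreter
        # shutdown, so wait a bounded time before resorting to SIGKILL --
        # the same race the patched done property handles.
        proc.join(timeout=5)
        if proc.is_alive() and proc.pid:
            os.kill(proc.pid, signal.SIGKILL)
    finally:
        parent_channel.close()


if __name__ == "__main__":
    run_parent()

If either of the two close() calls after fork is dropped, the parent never sees EOF and done would report the processor as alive long after it exited, which is the leak this change addresses.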